aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c294
1 files changed, 167 insertions, 127 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index e3094404e2..694460d702 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -1,19 +1,19 @@
/*
* %CopyrightBegin%
- *
- * Copyright Ericsson AB 1996-2009. All Rights Reserved.
- *
+ *
+ * Copyright Ericsson AB 1996-2010. All Rights Reserved.
+ *
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
- *
+ *
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
- *
+ *
* %CopyrightEnd%
*/
@@ -97,6 +97,8 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
#define PASS_THROUGH 'p' /* This code should go */
int erts_is_alive; /* System must be blocked on change */
+int erts_dist_buf_busy_limit;
+
/* distribution trap functions */
Export* dsend2_trap = NULL;
@@ -160,7 +162,7 @@ Uint erts_dist_cache_size(void)
static ErtsProcList *
get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs)
{
- ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&dep->qlock));
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&dep->qlock));
dep->qflgs &= ~unset_qflgs;
if (dep->qflgs & ERTS_DE_QFLG_EXIT) {
/* No resume when exit has been scheduled */
@@ -228,6 +230,7 @@ int is_node_name_atom(Eterm a)
typedef struct {
DistEntry *dep;
+ Eterm *lhp;
} NetExitsContext;
/*
@@ -253,8 +256,9 @@ static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp)
erts_destroy_monitor(rmon);
}
} else {
- Eterm lhp[3];
+ DeclareTmpHeapNoproc(lhp,3);
Eterm watched;
+ UseTmpHeapNoproc(3);
ASSERT(mon->type == MON_TARGET);
rmon = erts_remove_monitor(&(rp->monitors),mon->ref);
/* ASSERT(rmon != NULL); can happen during process exit */
@@ -271,6 +275,7 @@ static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp)
watched, am_noconnection);
erts_destroy_monitor(rmon);
}
+ UnUseTmpHeapNoproc(3);
}
erts_smp_proc_unlock(rp, rp_locks);
done:
@@ -450,17 +455,17 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
if (dep->status & ERTS_DE_SFLG_EXITING) {
#ifdef DEBUG
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
#endif
}
else {
dep->status |= ERTS_DE_SFLG_EXITING;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
dep->qflgs |= ERTS_DE_QFLG_EXIT;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
erts_smp_de_links_lock(dep);
@@ -574,7 +579,7 @@ static void clear_dist_entry(DistEntry *dep)
erts_smp_de_links_unlock(dep);
#endif
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
if (!dep->out_queue.last)
obuf = dep->finalized_out_queue.first;
@@ -590,7 +595,7 @@ static void clear_dist_entry(DistEntry *dep)
dep->status = 0;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
erts_smp_atomic_set(&dep->dist_cmd_scheduled, 0);
dep->send = NULL;
erts_smp_de_rwunlock(dep);
@@ -608,10 +613,10 @@ static void clear_dist_entry(DistEntry *dep)
}
if (obufsize) {
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
}
@@ -632,19 +637,27 @@ static void clear_dist_entry(DistEntry *dep)
int
erts_dsig_send_link(ErtsDSigData *dsdp, Eterm local, Eterm remote)
{
- Eterm ctl_heap[4];
+ DeclareTmpHeapNoproc(ctl_heap,4);
Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_LINK), local, remote);
+ int res;
+ UseTmpHeapNoproc(4);
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ UnUseTmpHeapNoproc(4);
+ return res;
}
int
erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote)
{
- Eterm ctl_heap[4];
+ DeclareTmpHeapNoproc(ctl_heap,4);
Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_UNLINK), local, remote);
+ int res;
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ UseTmpHeapNoproc(4);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ UnUseTmpHeapNoproc(4);
+ return res;
}
@@ -656,7 +669,10 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
Eterm ref, Eterm reason)
{
Eterm ctl;
- Eterm ctl_heap[6];
+ DeclareTmpHeapNoproc(ctl_heap,6);
+ int res;
+
+ UseTmpHeapNoproc(6);
ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT),
watched, watcher, ref, reason);
@@ -667,7 +683,9 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
erts_smp_de_links_unlock(dsdp->dep);
#endif
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ UnUseTmpHeapNoproc(6);
+ return res;
}
/* We want to monitor a process (named or unnamed) on another node, we send:
@@ -678,13 +696,17 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
Eterm ref)
{
Eterm ctl;
- Eterm ctl_heap[5];
+ DeclareTmpHeapNoproc(ctl_heap,5);
+ int res;
+ UseTmpHeapNoproc(5);
ctl = TUPLE4(&ctl_heap[0],
make_small(DOP_MONITOR_P),
watcher, watched, ref);
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ UnUseTmpHeapNoproc(5);
+ return res;
}
/* A local process monitoring a remote one wants to stop monitoring, either
@@ -696,23 +718,29 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
Eterm watched, Eterm ref, int force)
{
Eterm ctl;
- Eterm ctl_heap[5];
+ DeclareTmpHeapNoproc(ctl_heap,5);
+ int res;
+ UseTmpHeapNoproc(5);
ctl = TUPLE4(&ctl_heap[0],
make_small(DOP_DEMONITOR_P),
watcher, watched, ref);
- return dsig_send(dsdp, ctl, THE_NON_VALUE, force);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, force);
+ UnUseTmpHeapNoproc(5);
+ return res;
}
int
erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message)
{
Eterm ctl;
- Eterm ctl_heap[5];
+ DeclareTmpHeapNoproc(ctl_heap,5);
Eterm token = NIL;
Process *sender = dsdp->proc;
+ int res;
+ UseTmpHeapNoproc(5);
if (SEQ_TRACE_TOKEN(sender) != NIL) {
seq_trace_update_send(sender);
token = SEQ_TRACE_TOKEN(sender);
@@ -724,17 +752,21 @@ erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message)
make_small(DOP_SEND_TT), am_Cookie, remote, token);
else
ctl = TUPLE3(&ctl_heap[0], make_small(DOP_SEND), am_Cookie, remote);
- return dsig_send(dsdp, ctl, message, 0);
+ res = dsig_send(dsdp, ctl, message, 0);
+ UnUseTmpHeapNoproc(5);
+ return res;
}
int
erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message)
{
Eterm ctl;
- Eterm ctl_heap[6];
+ DeclareTmpHeapNoproc(ctl_heap,6);
Eterm token = NIL;
Process *sender = dsdp->proc;
+ int res;
+ UseTmpHeapNoproc(6);
if (SEQ_TRACE_TOKEN(sender) != NIL) {
seq_trace_update_send(sender);
token = SEQ_TRACE_TOKEN(sender);
@@ -747,7 +779,9 @@ erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message)
else
ctl = TUPLE4(&ctl_heap[0], make_small(DOP_REG_SEND),
sender->id, am_Cookie, remote_name);
- return dsig_send(dsdp, ctl, message, 0);
+ res = dsig_send(dsdp, ctl, message, 0);
+ UnUseTmpHeapNoproc(6);
+ return res;
}
/* local has died, deliver the exit signal to remote */
@@ -756,8 +790,10 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
Eterm reason, Eterm token)
{
Eterm ctl;
- Eterm ctl_heap[6];
+ DeclareTmpHeapNoproc(ctl_heap,6);
+ int res;
+ UseTmpHeapNoproc(6);
if (token != NIL) {
seq_trace_update_send(dsdp->proc);
seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local);
@@ -767,38 +803,58 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
}
/* forced, i.e ignore busy */
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ UnUseTmpHeapNoproc(6);
+ return res;
}
int
erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
{
- Eterm ctl_heap[5];
- Eterm ctl = TUPLE4(&ctl_heap[0],
- make_small(DOP_EXIT), local, remote, reason);
+ DeclareTmpHeapNoproc(ctl_heap,5);
+ int res;
+ Eterm ctl;
+
+ UseTmpHeapNoproc(5);
+ ctl = TUPLE4(&ctl_heap[0],
+ make_small(DOP_EXIT), local, remote, reason);
/* forced, i.e ignore busy */
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ UnUseTmpHeapNoproc(5);
+ return res;
}
int
erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
{
- Eterm ctl_heap[5];
- Eterm ctl = TUPLE4(&ctl_heap[0],
- make_small(DOP_EXIT2), local, remote, reason);
+ DeclareTmpHeapNoproc(ctl_heap,5);
+ int res;
+ Eterm ctl;
+
+ UseTmpHeapNoproc(5);
+ ctl = TUPLE4(&ctl_heap[0],
+ make_small(DOP_EXIT2), local, remote, reason);
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ UnUseTmpHeapNoproc(5);
+ return res;
}
int
erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
{
- Eterm ctl_heap[4];
- Eterm ctl = TUPLE3(&ctl_heap[0],
- make_small(DOP_GROUP_LEADER), leader, remote);
+ DeclareTmpHeapNoproc(ctl_heap,4);
+ int res;
+ Eterm ctl;
+
+ UseTmpHeapNoproc(4);
+ ctl = TUPLE3(&ctl_heap[0],
+ make_small(DOP_GROUP_LEADER), leader, remote);
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ UnUseTmpHeapNoproc(4);
+ return res;
}
#if defined(PURIFY)
@@ -832,6 +888,7 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
**
** assert hlen == 0 !!!
*/
+
int erts_net_message(Port *prt,
DistEntry *dep,
byte *hbuf,
@@ -839,6 +896,7 @@ int erts_net_message(Port *prt,
byte *buf,
int len)
{
+#define DIST_CTL_DEFAULT_SIZE 64
ErtsDistExternal ede;
byte *t;
Sint ctl_len;
@@ -850,7 +908,7 @@ int erts_net_message(Port *prt,
Eterm *tuple;
Eterm reason;
Process* rp;
- Eterm ctl_default[64];
+ DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
Eterm* ctl = ctl_default;
ErlOffHeap off_heap;
Eterm* hp;
@@ -864,24 +922,25 @@ int erts_net_message(Port *prt,
int orig_len = len;
#endif
+ UseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
/* Thanks to Luke Gorrie */
- off_heap.mso = NULL;
-#ifndef HYBRID /* FIND ME! */
- off_heap.funs = NULL;
-#endif
+ off_heap.first = NULL;
off_heap.overhead = 0;
- off_heap.externals = NULL;
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (!erts_is_alive)
+ if (!erts_is_alive) {
+ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
return 0;
+ }
if (hlen > 0)
goto data_error;
- if (len == 0) /* HANDLE TICK !!! */
+ if (len == 0) { /* HANDLE TICK !!! */
+ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
return 0;
+ }
#ifdef ERTS_RAW_DIST_MSG_DBG
erts_fprintf(stderr, "<< ");
@@ -922,7 +981,8 @@ int erts_net_message(Port *prt,
goto data_error;
}
orig_ctl_len = ctl_len;
- if (ctl_len > sizeof(ctl_default)/sizeof(ctl_default[0])) {
+
+ if (ctl_len > DIST_CTL_DEFAULT_SIZE) {
ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm));
}
hp = ctl;
@@ -1202,7 +1262,7 @@ int erts_net_message(Port *prt,
{DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */
- Eterm lhp[3];
+ DeclareTmpHeapNoproc(lhp,3);
Eterm sysname;
ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_MSG_SEND|ERTS_PROC_LOCK_LINK;
@@ -1237,6 +1297,7 @@ int erts_net_message(Port *prt,
erts_smp_proc_unlock(rp, rp_locks);
break;
}
+ UseTmpHeapNoproc(3);
watched = (is_not_nil(mon->name)
? TUPLE2(&lhp[0], mon->name, sysname)
@@ -1246,6 +1307,7 @@ int erts_net_message(Port *prt,
ref, am_process, watched, reason);
erts_smp_proc_unlock(rp, rp_locks);
erts_destroy_monitor(mon);
+ UnUseTmpHeapNoproc(3);
break;
}
@@ -1370,45 +1432,29 @@ int erts_net_message(Port *prt,
}
}
- if (off_heap.mso) {
- erts_cleanup_mso(off_heap.mso);
- }
- if (off_heap.externals) {
- erts_cleanup_externals(off_heap.externals);
- }
+ erts_cleanup_offheap(&off_heap);
#ifndef HYBRID /* FIND ME! */
- if (off_heap.funs) {
- erts_cleanup_funs(off_heap.funs);
- }
if (ctl != ctl_default) {
erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
}
#endif
+ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
ERTS_SMP_CHK_NO_PROC_LOCKS;
return 0;
data_error:
- if (off_heap.mso) {
- erts_cleanup_mso(off_heap.mso);
- }
- if (off_heap.externals) {
- erts_cleanup_externals(off_heap.externals);
- }
+ erts_cleanup_offheap(&off_heap);
#ifndef HYBRID /* FIND ME! */
- if (off_heap.funs) {
- erts_cleanup_funs(off_heap.funs);
- }
if (ctl != ctl_default) {
erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
}
#endif
+ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
erts_do_exit_port(prt, dep->cid, am_killed);
ERTS_SMP_CHK_NO_PROC_LOCKS;
return -1;
}
-#define ERTS_DE_BUSY_LIMIT (128*1024)
-
static int
dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
{
@@ -1492,18 +1538,18 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
}
else {
ErtsProcList *plp = NULL;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
dep->qsize += size_obuf(obuf);
- if (dep->qsize >= ERTS_DE_BUSY_LIMIT)
+ if (dep->qsize >= erts_dist_buf_busy_limit)
dep->qflgs |= ERTS_DE_QFLG_BUSY;
if (!force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) {
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
plp = erts_proclist_create(c_p);
plp->next = NULL;
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
suspended = 1;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
}
/* Enqueue obuf on dist entry */
@@ -1529,7 +1575,7 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
}
}
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
erts_schedule_dist_command(NULL, dep);
erts_smp_de_runlock(dep);
@@ -1554,9 +1600,9 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
*/
data_size >>= (10-4);
-#if defined(ARCH_64)
+#if defined(ARCH_64) && !HALFWORD_HEAP
data_size &= 0x003fffffffffffff;
-#elif defined(ARCH_32)
+#elif defined(ARCH_32) || HALFWORD_HEAP
data_size &= 0x003fffff;
#else
# error "Ohh come on ... !?!"
@@ -1640,9 +1686,9 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
}
-#if defined(ARCH_64)
+#if defined(ARCH_64) && !HALFWORD_HEAP
#define ERTS_PORT_REDS_MASK__ 0x003fffffffffffffL
-#elif defined(ARCH_32)
+#elif defined(ARCH_32) || HALFWORD_HEAP
#define ERTS_PORT_REDS_MASK__ 0x003fffff
#else
# error "Ohh come on ... !?!"
@@ -1662,10 +1708,8 @@ erts_dist_command(Port *prt, int reds_limit)
{
Sint reds = ERTS_PORT_REDS_DIST_CMD_START;
int prt_busy;
- int de_busy;
Uint32 status;
Uint32 flags;
- Uint32 qflgs;
Sint obufsize = 0;
ErtsDistOutputQueue oq, foq;
DistEntry *dep = prt->dist_entry;
@@ -1700,13 +1744,12 @@ erts_dist_command(Port *prt, int reds_limit)
* a mess.
*/
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
oq.first = dep->out_queue.first;
oq.last = dep->out_queue.last;
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
- qflgs = dep->qflgs;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
foq.first = dep->finalized_out_queue.first;
foq.last = dep->finalized_out_queue.last;
@@ -1717,17 +1760,8 @@ erts_dist_command(Port *prt, int reds_limit)
goto preempted;
prt_busy = (int) (prt->status & ERTS_PORT_SFLG_PORT_BUSY);
- de_busy = (int) (qflgs & ERTS_DE_QFLG_BUSY);
- if (prt_busy) {
- if (!de_busy) {
- erts_smp_spin_lock(&dep->qlock);
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- erts_smp_spin_unlock(&dep->qlock);
- de_busy = 1;
- }
- }
- else if (foq.first) {
+ if (!prt_busy && foq.first) {
int preempt = 0;
do {
Uint size;
@@ -1745,10 +1779,7 @@ erts_dist_command(Port *prt, int reds_limit)
free_dist_obuf(fob);
preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD);
if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) {
- erts_smp_spin_lock(&dep->qlock);
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- erts_smp_spin_unlock(&dep->qlock);
- de_busy = prt_busy = 1;
+ prt_busy = 1;
break;
}
} while (foq.first && !preempt);
@@ -1831,10 +1862,7 @@ erts_dist_command(Port *prt, int reds_limit)
free_dist_obuf(fob);
preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD);
if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) {
- erts_smp_spin_lock(&dep->qlock);
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- erts_smp_spin_unlock(&dep->qlock);
- de_busy = prt_busy = 1;
+ prt_busy = 1;
if (oq.first && !preempt)
goto finalize_only;
}
@@ -1861,22 +1889,23 @@ erts_dist_command(Port *prt, int reds_limit)
* dist entry in a non-busy state and resume suspended
* processes.
*/
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
obufsize = 0;
- if (de_busy && !prt_busy && dep->qsize < ERTS_DE_BUSY_LIMIT) {
+ if (!prt_busy
+ && (dep->qflgs & ERTS_DE_QFLG_BUSY)
+ && dep->qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
int resumed;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
resumed = erts_resume_processes(suspendees);
reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
- de_busy = 0;
}
else
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
ASSERT(!oq.first && !oq.last);
@@ -1885,10 +1914,10 @@ erts_dist_command(Port *prt, int reds_limit)
if (obufsize != 0) {
ASSERT(obufsize > 0);
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
ASSERT(foq.first || !foq.last);
@@ -1938,9 +1967,9 @@ erts_dist_command(Port *prt, int reds_limit)
foq.last = NULL;
#ifdef DEBUG
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize == obufsize);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
#endif
}
else {
@@ -1949,14 +1978,14 @@ erts_dist_command(Port *prt, int reds_limit)
* Unhandle buffers need to be put back first
* in out_queue.
*/
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
dep->qsize -= obufsize;
obufsize = 0;
oq.last->next = dep->out_queue.first;
dep->out_queue.first = oq.first;
if (!dep->out_queue.last)
dep->out_queue.last = oq.last;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
erts_schedule_dist_command(prt, NULL);
@@ -1980,10 +2009,10 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
dep->status |= ERTS_DE_SFLG_EXITING;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
dep->qflgs |= ERTS_DE_QFLG_EXIT;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
erts_schedule_dist_command(NULL, dep);
}
@@ -2354,13 +2383,13 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
ErtsProcList *plp = erts_proclist_create(BIF_P);
plp->next = NULL;
erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
if (dep->suspended.last)
dep->suspended.last->next = plp;
else
dep->suspended.first = plp;
dep->suspended.last = plp;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
goto yield;
}
@@ -2388,9 +2417,9 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
ASSERT(dep->send);
#ifdef DEBUG
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize == 0);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
#endif
erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
@@ -2547,12 +2576,15 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
int visible = 0;
int hidden = 0;
int this = 0;
- Uint buf[2]; /* For one cons-cell */
+ DeclareTmpHeap(buf,2,BIF_P); /* For one cons-cell */
DistEntry *dep;
Eterm arg_list = BIF_ARG_1;
#ifdef DEBUG
Eterm* endp;
#endif
+
+ UseTmpHeap(2,BIF_P);
+
if (is_atom(BIF_ARG_1))
arg_list = CONS(buf, BIF_ARG_1, NIL);
@@ -2563,13 +2595,14 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
case am_known: visible = hidden = not_connected = this = 1; break;
case am_this: this = 1; break;
case am_connected: visible = hidden = 1; break;
- default: BIF_ERROR(BIF_P, BADARG); break;
+ default: goto error; break;
}
arg_list = CDR(list_val(arg_list));
}
- if (is_not_nil(arg_list))
- BIF_ERROR(BIF_P, BADARG);
+ if (is_not_nil(arg_list)) {
+ goto error;
+ }
length = 0;
@@ -2591,7 +2624,7 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
if (length == 0) {
erts_smp_rwmtx_rwunlock(&erts_dist_table_rwmtx);
- BIF_RET(result);
+ goto done;
}
hp = HAlloc(BIF_P, 2*length);
@@ -2620,7 +2653,14 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
}
ASSERT(endp == hp);
erts_smp_rwmtx_rwunlock(&erts_dist_table_rwmtx);
+
+done:
+ UnUseTmpHeap(2,BIF_P);
BIF_RET(result);
+
+error:
+ UnUseTmpHeap(2,BIF_P);
+ BIF_ERROR(BIF_P,BADARG);
}
/**********************************************************************/