aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c465
1 files changed, 280 insertions, 185 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index e3094404e2..b1cdd0660a 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -1,19 +1,19 @@
/*
* %CopyrightBegin%
- *
- * Copyright Ericsson AB 1996-2009. All Rights Reserved.
- *
+ *
+ * Copyright Ericsson AB 1996-2011. All Rights Reserved.
+ *
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
- *
+ *
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
- *
+ *
* %CopyrightEnd%
*/
@@ -97,6 +97,8 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
#define PASS_THROUGH 'p' /* This code should go */
int erts_is_alive; /* System must be blocked on change */
+int erts_dist_buf_busy_limit;
+
/* distribution trap functions */
Export* dsend2_trap = NULL;
@@ -160,7 +162,7 @@ Uint erts_dist_cache_size(void)
static ErtsProcList *
get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs)
{
- ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&dep->qlock));
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&dep->qlock));
dep->qflgs &= ~unset_qflgs;
if (dep->qflgs & ERTS_DE_QFLG_EXIT) {
/* No resume when exit has been scheduled */
@@ -228,6 +230,7 @@ int is_node_name_atom(Eterm a)
typedef struct {
DistEntry *dep;
+ Eterm *lhp;
} NetExitsContext;
/*
@@ -253,8 +256,9 @@ static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp)
erts_destroy_monitor(rmon);
}
} else {
- Eterm lhp[3];
+ DeclareTmpHeapNoproc(lhp,3);
Eterm watched;
+ UseTmpHeapNoproc(3);
ASSERT(mon->type == MON_TARGET);
rmon = erts_remove_monitor(&(rp->monitors),mon->ref);
/* ASSERT(rmon != NULL); can happen during process exit */
@@ -271,6 +275,7 @@ static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp)
watched, am_noconnection);
erts_destroy_monitor(rmon);
}
+ UnUseTmpHeapNoproc(3);
}
erts_smp_proc_unlock(rp, rp_locks);
done:
@@ -450,17 +455,17 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
if (dep->status & ERTS_DE_SFLG_EXITING) {
#ifdef DEBUG
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
#endif
}
else {
dep->status |= ERTS_DE_SFLG_EXITING;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
dep->qflgs |= ERTS_DE_QFLG_EXIT;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
erts_smp_de_links_lock(dep);
@@ -574,7 +579,7 @@ static void clear_dist_entry(DistEntry *dep)
erts_smp_de_links_unlock(dep);
#endif
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
if (!dep->out_queue.last)
obuf = dep->finalized_out_queue.first;
@@ -590,7 +595,7 @@ static void clear_dist_entry(DistEntry *dep)
dep->status = 0;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
erts_smp_atomic_set(&dep->dist_cmd_scheduled, 0);
dep->send = NULL;
erts_smp_de_rwunlock(dep);
@@ -608,10 +613,10 @@ static void clear_dist_entry(DistEntry *dep)
}
if (obufsize) {
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
}
@@ -632,19 +637,27 @@ static void clear_dist_entry(DistEntry *dep)
int
erts_dsig_send_link(ErtsDSigData *dsdp, Eterm local, Eterm remote)
{
- Eterm ctl_heap[4];
+ DeclareTmpHeapNoproc(ctl_heap,4);
Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_LINK), local, remote);
+ int res;
+ UseTmpHeapNoproc(4);
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ UnUseTmpHeapNoproc(4);
+ return res;
}
int
erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote)
{
- Eterm ctl_heap[4];
+ DeclareTmpHeapNoproc(ctl_heap,4);
Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_UNLINK), local, remote);
+ int res;
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ UseTmpHeapNoproc(4);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ UnUseTmpHeapNoproc(4);
+ return res;
}
@@ -656,7 +669,10 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
Eterm ref, Eterm reason)
{
Eterm ctl;
- Eterm ctl_heap[6];
+ DeclareTmpHeapNoproc(ctl_heap,6);
+ int res;
+
+ UseTmpHeapNoproc(6);
ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT),
watched, watcher, ref, reason);
@@ -667,7 +683,9 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
erts_smp_de_links_unlock(dsdp->dep);
#endif
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ UnUseTmpHeapNoproc(6);
+ return res;
}
/* We want to monitor a process (named or unnamed) on another node, we send:
@@ -678,13 +696,17 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
Eterm ref)
{
Eterm ctl;
- Eterm ctl_heap[5];
+ DeclareTmpHeapNoproc(ctl_heap,5);
+ int res;
+ UseTmpHeapNoproc(5);
ctl = TUPLE4(&ctl_heap[0],
make_small(DOP_MONITOR_P),
watcher, watched, ref);
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ UnUseTmpHeapNoproc(5);
+ return res;
}
/* A local process monitoring a remote one wants to stop monitoring, either
@@ -696,23 +718,29 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
Eterm watched, Eterm ref, int force)
{
Eterm ctl;
- Eterm ctl_heap[5];
+ DeclareTmpHeapNoproc(ctl_heap,5);
+ int res;
+ UseTmpHeapNoproc(5);
ctl = TUPLE4(&ctl_heap[0],
make_small(DOP_DEMONITOR_P),
watcher, watched, ref);
- return dsig_send(dsdp, ctl, THE_NON_VALUE, force);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, force);
+ UnUseTmpHeapNoproc(5);
+ return res;
}
int
erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message)
{
Eterm ctl;
- Eterm ctl_heap[5];
+ DeclareTmpHeapNoproc(ctl_heap,5);
Eterm token = NIL;
Process *sender = dsdp->proc;
+ int res;
+ UseTmpHeapNoproc(5);
if (SEQ_TRACE_TOKEN(sender) != NIL) {
seq_trace_update_send(sender);
token = SEQ_TRACE_TOKEN(sender);
@@ -724,17 +752,21 @@ erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message)
make_small(DOP_SEND_TT), am_Cookie, remote, token);
else
ctl = TUPLE3(&ctl_heap[0], make_small(DOP_SEND), am_Cookie, remote);
- return dsig_send(dsdp, ctl, message, 0);
+ res = dsig_send(dsdp, ctl, message, 0);
+ UnUseTmpHeapNoproc(5);
+ return res;
}
int
erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message)
{
Eterm ctl;
- Eterm ctl_heap[6];
+ DeclareTmpHeapNoproc(ctl_heap,6);
Eterm token = NIL;
Process *sender = dsdp->proc;
+ int res;
+ UseTmpHeapNoproc(6);
if (SEQ_TRACE_TOKEN(sender) != NIL) {
seq_trace_update_send(sender);
token = SEQ_TRACE_TOKEN(sender);
@@ -747,7 +779,9 @@ erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message)
else
ctl = TUPLE4(&ctl_heap[0], make_small(DOP_REG_SEND),
sender->id, am_Cookie, remote_name);
- return dsig_send(dsdp, ctl, message, 0);
+ res = dsig_send(dsdp, ctl, message, 0);
+ UnUseTmpHeapNoproc(6);
+ return res;
}
/* local has died, deliver the exit signal to remote */
@@ -756,8 +790,10 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
Eterm reason, Eterm token)
{
Eterm ctl;
- Eterm ctl_heap[6];
+ DeclareTmpHeapNoproc(ctl_heap,6);
+ int res;
+ UseTmpHeapNoproc(6);
if (token != NIL) {
seq_trace_update_send(dsdp->proc);
seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local);
@@ -767,38 +803,58 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
}
/* forced, i.e ignore busy */
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ UnUseTmpHeapNoproc(6);
+ return res;
}
int
erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
{
- Eterm ctl_heap[5];
- Eterm ctl = TUPLE4(&ctl_heap[0],
- make_small(DOP_EXIT), local, remote, reason);
+ DeclareTmpHeapNoproc(ctl_heap,5);
+ int res;
+ Eterm ctl;
+
+ UseTmpHeapNoproc(5);
+ ctl = TUPLE4(&ctl_heap[0],
+ make_small(DOP_EXIT), local, remote, reason);
/* forced, i.e ignore busy */
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ UnUseTmpHeapNoproc(5);
+ return res;
}
int
erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
{
- Eterm ctl_heap[5];
- Eterm ctl = TUPLE4(&ctl_heap[0],
- make_small(DOP_EXIT2), local, remote, reason);
+ DeclareTmpHeapNoproc(ctl_heap,5);
+ int res;
+ Eterm ctl;
+
+ UseTmpHeapNoproc(5);
+ ctl = TUPLE4(&ctl_heap[0],
+ make_small(DOP_EXIT2), local, remote, reason);
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ UnUseTmpHeapNoproc(5);
+ return res;
}
int
erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
{
- Eterm ctl_heap[4];
- Eterm ctl = TUPLE3(&ctl_heap[0],
- make_small(DOP_GROUP_LEADER), leader, remote);
+ DeclareTmpHeapNoproc(ctl_heap,4);
+ int res;
+ Eterm ctl;
+
+ UseTmpHeapNoproc(4);
+ ctl = TUPLE3(&ctl_heap[0],
+ make_small(DOP_GROUP_LEADER), leader, remote);
- return dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ UnUseTmpHeapNoproc(4);
+ return res;
}
#if defined(PURIFY)
@@ -808,11 +864,15 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
#include <valgrind/valgrind.h>
#include <valgrind/memcheck.h>
+#ifndef HAVE_VALGRIND_PRINTF_XML
+#define VALGRIND_PRINTF_XML VALGRIND_PRINTF
+#endif
+
# define PURIFY_MSG(msg) \
do { \
char buf__[1]; size_t bufsz__ = sizeof(buf__); \
if (erts_sys_getenv("VALGRIND_LOG_XML", buf__, &bufsz__) >= 0) { \
- VALGRIND_PRINTF("<erlang_error_log>" \
+ VALGRIND_PRINTF_XML("<erlang_error_log>" \
"%s, line %d: %s</erlang_error_log>\n", \
__FILE__, __LINE__, msg); \
} else { \
@@ -832,6 +892,7 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
**
** assert hlen == 0 !!!
*/
+
int erts_net_message(Port *prt,
DistEntry *dep,
byte *hbuf,
@@ -839,10 +900,10 @@ int erts_net_message(Port *prt,
byte *buf,
int len)
{
+#define DIST_CTL_DEFAULT_SIZE 64
ErtsDistExternal ede;
byte *t;
Sint ctl_len;
- int orig_ctl_len;
Eterm arg;
Eterm from, to;
Eterm watcher, watched;
@@ -850,7 +911,7 @@ int erts_net_message(Port *prt,
Eterm *tuple;
Eterm reason;
Process* rp;
- Eterm ctl_default[64];
+ DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
Eterm* ctl = ctl_default;
ErlOffHeap off_heap;
Eterm* hp;
@@ -859,29 +920,31 @@ int erts_net_message(Port *prt,
Eterm token_size;
ErtsMonitor *mon;
ErtsLink *lnk;
+ Uint tuple_arity;
int res;
#ifdef ERTS_DIST_MSG_DBG
int orig_len = len;
#endif
+ UseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
/* Thanks to Luke Gorrie */
- off_heap.mso = NULL;
-#ifndef HYBRID /* FIND ME! */
- off_heap.funs = NULL;
-#endif
+ off_heap.first = NULL;
off_heap.overhead = 0;
- off_heap.externals = NULL;
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (!erts_is_alive)
+ if (!erts_is_alive) {
+ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
return 0;
+ }
if (hlen > 0)
goto data_error;
- if (len == 0) /* HANDLE TICK !!! */
+ if (len == 0) { /* HANDLE TICK !!! */
+ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
return 0;
+ }
#ifdef ERTS_RAW_DIST_MSG_DBG
erts_fprintf(stderr, "<< ");
@@ -921,8 +984,8 @@ int erts_net_message(Port *prt,
PURIFY_MSG("data error");
goto data_error;
}
- orig_ctl_len = ctl_len;
- if (ctl_len > sizeof(ctl_default)/sizeof(ctl_default[0])) {
+
+ if (ctl_len > DIST_CTL_DEFAULT_SIZE) {
ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm));
}
hp = ctl;
@@ -943,29 +1006,23 @@ int erts_net_message(Port *prt,
#endif
if (is_not_tuple(arg) ||
- (tuple = tuple_val(arg), arityval(*tuple) < 1) ||
+ (tuple = tuple_val(arg), (tuple_arity = arityval(*tuple)) < 1) ||
is_not_small(tuple[1])) {
- erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- erts_dsprintf(dsbufp, "Invalid distribution message: %.200T", arg);
- erts_send_error_to_logger_nogl(dsbufp);
- goto data_error;
+ goto invalid_message;
}
token_size = 0;
switch (type = unsigned_val(tuple[1])) {
case DOP_LINK:
+ if (tuple_arity != 3) {
+ goto invalid_message;
+ }
from = tuple[2];
to = tuple[3]; /* local proc to link to */
if (is_not_pid(from) || is_not_pid(to)) {
- erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- PURIFY_MSG("data error");
- erts_dsprintf(dsbufp,
- "Invalid DOP_LINK distribution message: %.200T",
- arg);
- erts_send_error_to_logger_nogl(dsbufp);
- goto data_error;
+ goto invalid_message;
}
rp = erts_pid2proc_opt(NULL, 0,
@@ -1004,8 +1061,14 @@ int erts_net_message(Port *prt,
case DOP_UNLINK: {
ErtsDistLinkData dld;
+ if (tuple_arity != 3) {
+ goto invalid_message;
+ }
from = tuple[2];
to = tuple[3];
+ if (is_not_pid(from) || is_not_pid(to)) {
+ goto invalid_message;
+ }
rp = erts_pid2proc_opt(NULL, 0,
to, ERTS_PROC_LOCK_LINK,
@@ -1032,11 +1095,19 @@ int erts_net_message(Port *prt,
/* A remote process wants to monitor us, we get:
{DOP_MONITOR_P, Remote pid, local pid or name, ref} */
Eterm name;
+
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
watcher = tuple[2];
watched = tuple[3]; /* local proc to monitor */
ref = tuple[4];
+ if (is_not_ref(ref)) {
+ goto invalid_message;
+ }
+
if (is_atom(watched)) {
name = watched;
rp = erts_whereis_process(NULL, 0,
@@ -1078,10 +1149,17 @@ int erts_net_message(Port *prt,
We get {DOP_DEMONITOR_P, Remote pid, Local pid or name, ref},
We need only the ref of course */
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
/* watcher = tuple[2]; */
/* watched = tuple[3]; May be an atom in case of monitor name */
ref = tuple[4];
+ if(is_not_ref(ref)) {
+ goto invalid_message;
+ }
+
erts_smp_de_links_lock(dep);
mon = erts_remove_monitor(&(dep->monitors),ref);
erts_smp_de_links_unlock(dep);
@@ -1106,10 +1184,11 @@ int erts_net_message(Port *prt,
erts_destroy_monitor(mon);
break;
- case DOP_NODE_LINK: /* XXX never sent ?? */
- break;
-
case DOP_REG_SEND_TT:
+ if (tuple_arity != 5) {
+ goto invalid_message;
+ }
+
token_size = size_object(tuple[5]);
/* Fall through ... */
case DOP_REG_SEND:
@@ -1120,12 +1199,19 @@ int erts_net_message(Port *prt,
* There is intentionally no testing of the cookie (it is always '')
* from R9B and onwards.
*/
+ if (type != DOP_REG_SEND_TT && tuple_arity != 4) {
+ goto invalid_message;
+ }
+
#ifdef ERTS_DIST_MSG_DBG
dist_msg_dbg(&ede, "MSG", buf, orig_len);
#endif
from = tuple[2];
to = tuple[4];
+ if (is_not_pid(from) || is_not_atom(to)){
+ goto invalid_message;
+ }
rp = erts_whereis_process(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC);
if (rp) {
Uint xsize = (type == DOP_REG_SEND
@@ -1157,6 +1243,10 @@ int erts_net_message(Port *prt,
break;
case DOP_SEND_TT:
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
+
token_size = size_object(tuple[4]);
/* Fall through ... */
case DOP_SEND:
@@ -1167,8 +1257,13 @@ int erts_net_message(Port *prt,
#ifdef ERTS_DIST_MSG_DBG
dist_msg_dbg(&ede, "MSG", buf, orig_len);
#endif
-
+ if (type != DOP_SEND_TT && tuple_arity != 3) {
+ goto invalid_message;
+ }
to = tuple[3];
+ if (is_not_pid(to)) {
+ goto invalid_message;
+ }
rp = erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC);
if (rp) {
Uint xsize = type == DOP_SEND ? 0 : ERTS_HEAP_FRAG_SIZE(token_size);
@@ -1202,15 +1297,23 @@ int erts_net_message(Port *prt,
{DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */
- Eterm lhp[3];
+ DeclareTmpHeapNoproc(lhp,3);
Eterm sysname;
ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_MSG_SEND|ERTS_PROC_LOCK_LINK;
+ if (tuple_arity != 5) {
+ goto invalid_message;
+ }
+
/* watched = tuple[2]; */ /* remote proc which died */
/* watcher = tuple[3]; */
ref = tuple[4];
reason = tuple[5];
+ if(is_not_ref(ref)) {
+ goto invalid_message;
+ }
+
erts_smp_de_links_lock(dep);
sysname = dep->sysname;
mon = erts_remove_monitor(&(dep->monitors), ref);
@@ -1237,6 +1340,7 @@ int erts_net_message(Port *prt,
erts_smp_proc_unlock(rp, rp_locks);
break;
}
+ UseTmpHeapNoproc(3);
watched = (is_not_nil(mon->name)
? TUPLE2(&lhp[0], mon->name, sysname)
@@ -1246,6 +1350,7 @@ int erts_net_message(Port *prt,
ref, am_process, watched, reason);
erts_smp_proc_unlock(rp, rp_locks);
erts_destroy_monitor(mon);
+ UnUseTmpHeapNoproc(3);
break;
}
@@ -1255,24 +1360,25 @@ int erts_net_message(Port *prt,
ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND;
/* 'from', which 'to' is linked to, died */
if (type == DOP_EXIT) {
- from = tuple[2];
- to = tuple[3];
- reason = tuple[4];
- token = NIL;
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
+
+ from = tuple[2];
+ to = tuple[3];
+ reason = tuple[4];
+ token = NIL;
} else {
- from = tuple[2];
- to = tuple[3];
- token = tuple[4];
- reason = tuple[5];
+ if (tuple_arity != 5) {
+ goto invalid_message;
+ }
+ from = tuple[2];
+ to = tuple[3];
+ token = tuple[4];
+ reason = tuple[5];
}
- if (is_not_internal_pid(to)) {
- erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- PURIFY_MSG("data error");
- erts_dsprintf(dsbufp,
- "Invalid DOP_EXIT distribution message: %.200T",
- arg);
- erts_send_error_to_logger_nogl(dsbufp);
- goto data_error;
+ if (is_not_pid(from) || is_not_internal_pid(to)) {
+ goto invalid_message;
}
rp = erts_pid2proc(NULL, 0, to, rp_locks);
@@ -1319,15 +1425,24 @@ int erts_net_message(Port *prt,
ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
/* 'from' is send an exit signal to 'to' */
if (type == DOP_EXIT2) {
- from = tuple[2];
- to = tuple[3];
- reason = tuple[4];
- token = NIL;
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
+ from = tuple[2];
+ to = tuple[3];
+ reason = tuple[4];
+ token = NIL;
} else {
- from = tuple[2];
- to = tuple[3];
- token = tuple[4];
- reason = tuple[5];
+ if (tuple_arity != 5) {
+ goto invalid_message;
+ }
+ from = tuple[2];
+ to = tuple[3];
+ token = tuple[4];
+ reason = tuple[5];
+ }
+ if (is_not_pid(from) || is_not_internal_pid(to)) {
+ goto invalid_message;
}
rp = erts_pid2proc_opt(NULL, 0, to, rp_locks,
ERTS_P2P_FLG_SMP_INC_REFC);
@@ -1346,10 +1461,14 @@ int erts_net_message(Port *prt,
break;
}
case DOP_GROUP_LEADER:
+ if (tuple_arity != 3) {
+ goto invalid_message;
+ }
from = tuple[2]; /* Group leader */
to = tuple[3]; /* new member */
- if (is_not_pid(from))
- break;
+ if (is_not_pid(from) || is_not_pid(to)) {
+ goto invalid_message;
+ }
rp = erts_pid2proc(NULL, 0, to, ERTS_PROC_LOCK_MAIN);
if (!rp)
@@ -1358,57 +1477,39 @@ int erts_net_message(Port *prt,
erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
break;
- default: {
- erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- erts_dsprintf(dsbufp,
- "Illegal value in distribution dispatch switch: "
- "%.200T",
- arg);
- erts_send_error_to_logger_nogl(dsbufp);
- PURIFY_MSG("data error");
- goto data_error;
- }
+ default:
+ goto invalid_message;
}
- if (off_heap.mso) {
- erts_cleanup_mso(off_heap.mso);
- }
- if (off_heap.externals) {
- erts_cleanup_externals(off_heap.externals);
- }
+ erts_cleanup_offheap(&off_heap);
#ifndef HYBRID /* FIND ME! */
- if (off_heap.funs) {
- erts_cleanup_funs(off_heap.funs);
- }
if (ctl != ctl_default) {
erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
}
#endif
+ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
ERTS_SMP_CHK_NO_PROC_LOCKS;
return 0;
-
- data_error:
- if (off_heap.mso) {
- erts_cleanup_mso(off_heap.mso);
- }
- if (off_heap.externals) {
- erts_cleanup_externals(off_heap.externals);
+ invalid_message:
+ {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ erts_dsprintf(dsbufp, "Invalid distribution message: %.200T", arg);
+ erts_send_error_to_logger_nogl(dsbufp);
}
+ data_error:
+ PURIFY_MSG("data error");
+ erts_cleanup_offheap(&off_heap);
#ifndef HYBRID /* FIND ME! */
- if (off_heap.funs) {
- erts_cleanup_funs(off_heap.funs);
- }
if (ctl != ctl_default) {
erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
}
#endif
+ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
erts_do_exit_port(prt, dep->cid, am_killed);
ERTS_SMP_CHK_NO_PROC_LOCKS;
return -1;
}
-#define ERTS_DE_BUSY_LIMIT (128*1024)
-
static int
dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
{
@@ -1492,18 +1593,18 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
}
else {
ErtsProcList *plp = NULL;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
dep->qsize += size_obuf(obuf);
- if (dep->qsize >= ERTS_DE_BUSY_LIMIT)
+ if (dep->qsize >= erts_dist_buf_busy_limit)
dep->qflgs |= ERTS_DE_QFLG_BUSY;
if (!force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) {
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
plp = erts_proclist_create(c_p);
plp->next = NULL;
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
suspended = 1;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
}
/* Enqueue obuf on dist entry */
@@ -1529,7 +1630,7 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
}
}
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
erts_schedule_dist_command(NULL, dep);
erts_smp_de_runlock(dep);
@@ -1554,9 +1655,9 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
*/
data_size >>= (10-4);
-#if defined(ARCH_64)
+#if defined(ARCH_64) && !HALFWORD_HEAP
data_size &= 0x003fffffffffffff;
-#elif defined(ARCH_32)
+#elif defined(ARCH_32) || HALFWORD_HEAP
data_size &= 0x003fffff;
#else
# error "Ohh come on ... !?!"
@@ -1586,7 +1687,7 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf)
if (size > (Uint) INT_MAX)
erl_exit(ERTS_ABORT_EXIT,
"Absurdly large distribution output data buffer "
- "(%bpu bytes) passed.\n",
+ "(%beu bytes) passed.\n",
size);
prt->caller = NIL;
@@ -1613,7 +1714,7 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
if (size > (Uint) INT_MAX)
erl_exit(ERTS_ABORT_EXIT,
"Absurdly large distribution output data buffer "
- "(%bpu bytes) passed.\n",
+ "(%beu bytes) passed.\n",
size);
iov[0].iov_base = NULL;
@@ -1640,9 +1741,9 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
}
-#if defined(ARCH_64)
+#if defined(ARCH_64) && !HALFWORD_HEAP
#define ERTS_PORT_REDS_MASK__ 0x003fffffffffffffL
-#elif defined(ARCH_32)
+#elif defined(ARCH_32) || HALFWORD_HEAP
#define ERTS_PORT_REDS_MASK__ 0x003fffff
#else
# error "Ohh come on ... !?!"
@@ -1662,10 +1763,8 @@ erts_dist_command(Port *prt, int reds_limit)
{
Sint reds = ERTS_PORT_REDS_DIST_CMD_START;
int prt_busy;
- int de_busy;
Uint32 status;
Uint32 flags;
- Uint32 qflgs;
Sint obufsize = 0;
ErtsDistOutputQueue oq, foq;
DistEntry *dep = prt->dist_entry;
@@ -1700,13 +1799,12 @@ erts_dist_command(Port *prt, int reds_limit)
* a mess.
*/
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
oq.first = dep->out_queue.first;
oq.last = dep->out_queue.last;
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
- qflgs = dep->qflgs;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
foq.first = dep->finalized_out_queue.first;
foq.last = dep->finalized_out_queue.last;
@@ -1717,17 +1815,8 @@ erts_dist_command(Port *prt, int reds_limit)
goto preempted;
prt_busy = (int) (prt->status & ERTS_PORT_SFLG_PORT_BUSY);
- de_busy = (int) (qflgs & ERTS_DE_QFLG_BUSY);
- if (prt_busy) {
- if (!de_busy) {
- erts_smp_spin_lock(&dep->qlock);
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- erts_smp_spin_unlock(&dep->qlock);
- de_busy = 1;
- }
- }
- else if (foq.first) {
+ if (!prt_busy && foq.first) {
int preempt = 0;
do {
Uint size;
@@ -1745,10 +1834,7 @@ erts_dist_command(Port *prt, int reds_limit)
free_dist_obuf(fob);
preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD);
if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) {
- erts_smp_spin_lock(&dep->qlock);
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- erts_smp_spin_unlock(&dep->qlock);
- de_busy = prt_busy = 1;
+ prt_busy = 1;
break;
}
} while (foq.first && !preempt);
@@ -1831,10 +1917,7 @@ erts_dist_command(Port *prt, int reds_limit)
free_dist_obuf(fob);
preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD);
if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) {
- erts_smp_spin_lock(&dep->qlock);
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- erts_smp_spin_unlock(&dep->qlock);
- de_busy = prt_busy = 1;
+ prt_busy = 1;
if (oq.first && !preempt)
goto finalize_only;
}
@@ -1861,22 +1944,23 @@ erts_dist_command(Port *prt, int reds_limit)
* dist entry in a non-busy state and resume suspended
* processes.
*/
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
obufsize = 0;
- if (de_busy && !prt_busy && dep->qsize < ERTS_DE_BUSY_LIMIT) {
+ if (!prt_busy
+ && (dep->qflgs & ERTS_DE_QFLG_BUSY)
+ && dep->qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
int resumed;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
resumed = erts_resume_processes(suspendees);
reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
- de_busy = 0;
}
else
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
ASSERT(!oq.first && !oq.last);
@@ -1885,10 +1969,10 @@ erts_dist_command(Port *prt, int reds_limit)
if (obufsize != 0) {
ASSERT(obufsize > 0);
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
ASSERT(foq.first || !foq.last);
@@ -1938,9 +2022,9 @@ erts_dist_command(Port *prt, int reds_limit)
foq.last = NULL;
#ifdef DEBUG
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize == obufsize);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
#endif
}
else {
@@ -1949,14 +2033,14 @@ erts_dist_command(Port *prt, int reds_limit)
* Unhandle buffers need to be put back first
* in out_queue.
*/
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
dep->qsize -= obufsize;
obufsize = 0;
oq.last->next = dep->out_queue.first;
dep->out_queue.first = oq.first;
if (!dep->out_queue.last)
dep->out_queue.last = oq.last;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
erts_schedule_dist_command(prt, NULL);
@@ -1980,10 +2064,10 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
dep->status |= ERTS_DE_SFLG_EXITING;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
dep->qflgs |= ERTS_DE_QFLG_EXIT;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
erts_schedule_dist_command(NULL, dep);
}
@@ -2354,13 +2438,13 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
ErtsProcList *plp = erts_proclist_create(BIF_P);
plp->next = NULL;
erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
if (dep->suspended.last)
dep->suspended.last->next = plp;
else
dep->suspended.first = plp;
dep->suspended.last = plp;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
goto yield;
}
@@ -2388,9 +2472,9 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
ASSERT(dep->send);
#ifdef DEBUG
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize == 0);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
#endif
erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
@@ -2547,12 +2631,15 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
int visible = 0;
int hidden = 0;
int this = 0;
- Uint buf[2]; /* For one cons-cell */
+ DeclareTmpHeap(buf,2,BIF_P); /* For one cons-cell */
DistEntry *dep;
Eterm arg_list = BIF_ARG_1;
#ifdef DEBUG
Eterm* endp;
#endif
+
+ UseTmpHeap(2,BIF_P);
+
if (is_atom(BIF_ARG_1))
arg_list = CONS(buf, BIF_ARG_1, NIL);
@@ -2563,13 +2650,14 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
case am_known: visible = hidden = not_connected = this = 1; break;
case am_this: this = 1; break;
case am_connected: visible = hidden = 1; break;
- default: BIF_ERROR(BIF_P, BADARG); break;
+ default: goto error; break;
}
arg_list = CDR(list_val(arg_list));
}
- if (is_not_nil(arg_list))
- BIF_ERROR(BIF_P, BADARG);
+ if (is_not_nil(arg_list)) {
+ goto error;
+ }
length = 0;
@@ -2591,7 +2679,7 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
if (length == 0) {
erts_smp_rwmtx_rwunlock(&erts_dist_table_rwmtx);
- BIF_RET(result);
+ goto done;
}
hp = HAlloc(BIF_P, 2*length);
@@ -2620,7 +2708,14 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
}
ASSERT(endp == hp);
erts_smp_rwmtx_rwunlock(&erts_dist_table_rwmtx);
+
+done:
+ UnUseTmpHeap(2,BIF_P);
BIF_RET(result);
+
+error:
+ UnUseTmpHeap(2,BIF_P);
+ BIF_ERROR(BIF_P,BADARG);
}
/**********************************************************************/