aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c228
1 files changed, 111 insertions, 117 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 23897a49ae..982f1066df 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2014. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2017. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
/* define this to get a lot of debug output */
/* #define ERTS_DIST_MSG_DBG */
+/* #define ERTS_RAW_DIST_MSG_DBG */
#ifdef HAVE_CONFIG_H
# include "config.h"
@@ -45,6 +46,8 @@
#include "erl_thr_progress.h"
#include "dtrace-wrapper.h"
+#define DIST_CTL_DEFAULT_SIZE 64
+
/* Turn this on to get printouts of all distribution messages
* which go on the line
*/
@@ -66,9 +69,13 @@ static void bw(byte *buf, ErlDrvSizeT sz)
static void
dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
{
+ ErtsHeapFactory factory;
+ DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
+ Eterm* ctl = ctl_default;
byte *extp = edep->extp;
Eterm msg;
- Sint size = erts_decode_dist_ext_size(edep);
+ Sint ctl_len;
+ Sint size = ctl_len = erts_decode_dist_ext_size(edep);
if (size < 0) {
erts_fprintf(stderr,
"DIST MSG DEBUG: erts_decode_dist_ext_size(%s) failed:\n",
@@ -76,10 +83,9 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
bw(buf, sz);
}
else {
- Eterm *hp;
ErlHeapFragment *mbuf = new_message_buffer(size);
- hp = mbuf->mem;
- msg = erts_decode_dist_ext(&hp, &mbuf->off_heap, edep);
+ erts_factory_static_init(&factory, ctl, ctl_len, &mbuf->off_heap);
+ msg = erts_decode_dist_ext(&factory, edep);
if (is_value(msg))
erts_fprintf(stderr, " %s: %T\n", what, msg);
else {
@@ -253,12 +259,12 @@ static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp)
DistEntry *dep = ((NetExitsContext *) vnecp)->dep;
ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK;
- rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks);
+ rp = erts_pid2proc(NULL, 0, mon->u.pid, rp_locks);
if (!rp)
goto done;
if (mon->type == MON_ORIGIN) {
- /* local pid is beeing monitored */
+ /* local pid is being monitored */
rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
/* ASSERT(rmon != NULL); nope, can happen during process exit */
if (rmon != NULL) {
@@ -272,10 +278,11 @@ static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp)
rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
/* ASSERT(rmon != NULL); can happen during process exit */
if (rmon != NULL) {
+ ASSERT(rmon->type == MON_ORIGIN);
ASSERT(is_atom(rmon->name) || is_nil(rmon->name));
watched = (is_atom(rmon->name)
? TUPLE2(lhp, rmon->name, dep->sysname)
- : rmon->pid);
+ : rmon->u.pid);
#ifdef ERTS_SMP
rp_locks |= ERTS_PROC_LOCKS_MSG_SEND;
erts_smp_proc_lock(rp, ERTS_PROC_LOCKS_MSG_SEND);
@@ -332,7 +339,7 @@ static void doit_link_net_exits_sub(ErtsLink *sublnk, void *vlnecp)
erts_destroy_link(rlnk);
if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
/* We didn't exit the process and it is traced */
- trace_proc(NULL, rp, am_getting_unlinked, sublnk->pid);
+ trace_proc(NULL, 0, rp, am_getting_unlinked, sublnk->pid);
}
}
erts_smp_proc_unlock(rp, rp_locks);
@@ -373,10 +380,11 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp)
ASSERT(lnk->type == LINK_NODE);
if (is_internal_pid(lnk->pid)) {
ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK;
- rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks);
- if (!rp) {
+ ErlOffHeap *ohp;
+ rp = erts_proc_lookup(lnk->pid);
+ if (!rp)
goto done;
- }
+ erts_smp_proc_lock(rp, rp_locks);
rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name);
if (rlnk != NULL) {
ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE));
@@ -384,12 +392,14 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp)
}
n = ERTS_LINK_REFC(lnk);
for (i = 0; i < n; ++i) {
- ErlHeapFragment* bp;
- ErlOffHeap *ohp;
Eterm tup;
- Eterm *hp = erts_alloc_message_heap(3,&bp,&ohp,rp,&rp_locks);
+ Eterm *hp;
+ ErtsMessage *msgp;
+
+ msgp = erts_alloc_message_heap(rp, &rp_locks,
+ 3, &hp, &ohp);
tup = TUPLE2(hp, am_nodedown, name);
- erts_queue_message(rp, &rp_locks, bp, tup, NIL);
+ erts_queue_message(rp, rp_locks, msgp, tup, am_system);
}
erts_smp_proc_unlock(rp, rp_locks);
}
@@ -619,7 +629,6 @@ alloc_dist_obuf(Uint size)
ErtsDistOutputBuf *obuf;
Uint obuf_size = sizeof(ErtsDistOutputBuf)+sizeof(byte)*(size-1);
Binary *bin = erts_bin_drv_alloc(obuf_size);
- erts_refc_init(&bin->refc, 1);
obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0];
#ifdef DEBUG
obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN;
@@ -633,8 +642,7 @@ free_dist_obuf(ErtsDistOutputBuf *obuf)
{
Binary *bin = ErtsDistOutputBuf2Binary(obuf);
ASSERT(obuf->dbg_pattern == ERTS_DIST_OUTPUT_BUF_DBG_PATTERN);
- if (erts_refc_dectest(&bin->refc, 0) == 0)
- erts_bin_free(bin);
+ erts_bin_release(bin);
}
static ERTS_INLINE Sint
@@ -704,7 +712,7 @@ static void clear_dist_entry(DistEntry *dep)
}
}
-void erts_dsend_context_dtor(Binary* ctx_bin)
+int erts_dsend_context_dtor(Binary* ctx_bin)
{
ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin);
switch (ctx->dss.phase) {
@@ -721,6 +729,8 @@ void erts_dsend_context_dtor(Binary* ctx_bin)
}
if (ctx->dep_to_deref)
erts_deref_dist_entry(ctx->dep_to_deref);
+
+ return 1;
}
Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx)
@@ -732,24 +742,16 @@ Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx)
Binary* ctx_bin = erts_create_magic_binary(sizeof(struct exported_ctx),
erts_dsend_context_dtor);
struct exported_ctx* dst = ERTS_MAGIC_BIN_DATA(ctx_bin);
- Uint ctl_size = !HALFWORD_HEAP ? 0 : (arityval(ctx->ctl_heap[0]) + 1);
- Eterm* hp = HAlloc(p, ctl_size + PROC_BIN_SIZE);
+ Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
sys_memcpy(&dst->ctx, ctx, sizeof(ErtsSendContext));
ASSERT(ctx->dss.ctl == make_tuple(ctx->ctl_heap));
-#if !HALFWORD_HEAP
dst->ctx.dss.ctl = make_tuple(dst->ctx.ctl_heap);
-#else
- /* Must put control tuple in low mem */
- sys_memcpy(hp, ctx->ctl_heap, ctl_size*sizeof(Eterm));
- dst->ctx.dss.ctl = make_tuple(hp);
- hp += ctl_size;
-#endif
if (ctx->dss.acmp) {
sys_memcpy(&dst->acm, ctx->dss.acmp, sizeof(ErtsAtomCacheMap));
dst->ctx.dss.acmp = &dst->acm;
}
- return erts_mk_magic_binary_term(&hp, &MSO(p), ctx_bin);
+ return erts_mk_magic_ref(&hp, &MSO(p), ctx_bin);
}
@@ -794,7 +796,7 @@ erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote)
}
-/* A local process that's beeing monitored by a remote one exits. We send:
+/* A local process that's being monitored by a remote one exits. We send:
{DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason},
which is rather sad as only the ref is needed, no pid's... */
int
@@ -881,11 +883,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
DTRACE_CHARBUF(receiver_name, 64);
#endif
- if (SEQ_TRACE_TOKEN(sender) != NIL
-#ifdef USE_VM_PROBES
- && SEQ_TRACE_TOKEN(sender) != am_have_dt_utag
-#endif
- ) {
+ if (have_seqtrace(SEQ_TRACE_TOKEN(sender))) {
seq_trace_update_send(sender);
token = SEQ_TRACE_TOKEN(sender);
seq_trace_output(token, message, SEQ_TRACE_SEND, remote, sender);
@@ -900,7 +898,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
"%T", remote);
msize = size_object(message);
- if (token != NIL && token != am_have_dt_utag) {
+ if (have_seqtrace(token)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
@@ -910,9 +908,9 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
if (token != NIL)
ctl = TUPLE4(&ctx->ctl_heap[0],
- make_small(DOP_SEND_TT), am_Cookie, remote, token);
+ make_small(DOP_SEND_TT), am_Empty, remote, token);
else
- ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_SEND), am_Cookie, remote);
+ ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_SEND), am_Empty, remote);
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
@@ -942,11 +940,7 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message,
DTRACE_CHARBUF(receiver_name, 128);
#endif
- if (SEQ_TRACE_TOKEN(sender) != NIL
-#ifdef USE_VM_PROBES
- && SEQ_TRACE_TOKEN(sender) != am_have_dt_utag
-#endif
- ) {
+ if (have_seqtrace(SEQ_TRACE_TOKEN(sender))) {
seq_trace_update_send(sender);
token = SEQ_TRACE_TOKEN(sender);
seq_trace_output(token, message, SEQ_TRACE_SEND, remote_name, sender);
@@ -961,7 +955,7 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message,
erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
"{%T,%s}", remote_name, node_name);
msize = size_object(message);
- if (token != NIL && token != am_have_dt_utag) {
+ if (have_seqtrace(token)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
@@ -971,10 +965,10 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message,
if (token != NIL)
ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_REG_SEND_TT),
- sender->common.id, am_Cookie, remote_name, token);
+ sender->common.id, am_Empty, remote_name, token);
else
ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_REG_SEND),
- sender->common.id, am_Cookie, remote_name);
+ sender->common.id, am_Empty, remote_name);
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
@@ -1006,11 +1000,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
#endif
UseTmpHeapNoproc(6);
- if (token != NIL
-#ifdef USE_VM_PROBES
- && token != am_have_dt_utag
-#endif
- ) {
+ if (have_seqtrace(token)) {
seq_trace_update_send(dsdp->proc);
seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local);
ctl = TUPLE5(&ctl_heap[0],
@@ -1029,7 +1019,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
"{%T,%s}", remote, node_name);
erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)),
"%T", reason);
- if (token != NIL && token != am_have_dt_utag) {
+ if (have_seqtrace(token)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
@@ -1136,7 +1126,6 @@ int erts_net_message(Port *prt,
byte *buf,
ErlDrvSizeT len)
{
-#define DIST_CTL_DEFAULT_SIZE 64
ErtsDistExternal ede;
byte *t;
Sint ctl_len;
@@ -1149,7 +1138,6 @@ int erts_net_message(Port *prt,
Process* rp;
DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
Eterm* ctl = ctl_default;
- ErlOffHeap off_heap;
ErtsHeapFactory factory;
Eterm* hp;
Sint type;
@@ -1164,9 +1152,6 @@ int erts_net_message(Port *prt,
#endif
UseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
- /* Thanks to Luke Gorrie */
- off_heap.first = NULL;
- off_heap.overhead = 0;
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -1227,15 +1212,15 @@ int erts_net_message(Port *prt,
}
hp = ctl;
- erts_factory_static_init(&factory, ctl, ctl_len, &off_heap);
+ erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF);
arg = erts_decode_dist_ext(&factory, &ede);
if (is_non_value(arg)) {
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, "DIST MSG DEBUG: erts_dist_ext_size(CTL) failed:\n");
+ erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext(CTL) failed:\n");
bw(buf, orig_len);
#endif
PURIFY_MSG("data error");
- goto data_error;
+ goto decode_error;
}
ctl_len = t - buf;
@@ -1292,7 +1277,7 @@ int erts_net_message(Port *prt,
erts_smp_de_links_unlock(dep);
if (IS_TRACED_FL(rp, F_TRACE_PROCS))
- trace_proc(NULL, rp, am_getting_linked, from);
+ trace_proc(NULL, 0, rp, am_getting_linked, from);
erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
break;
@@ -1317,7 +1302,7 @@ int erts_net_message(Port *prt,
lnk = erts_remove_link(&ERTS_P_LINKS(rp), from);
if (IS_TRACED_FL(rp, F_TRACE_PROCS) && lnk != NULL) {
- trace_proc(NULL, rp, am_getting_unlinked, from);
+ trace_proc(NULL, 0, rp, am_getting_unlinked, from);
}
erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
@@ -1405,7 +1390,7 @@ int erts_net_message(Port *prt,
if (mon == NULL) {
break;
}
- watched = mon->pid;
+ watched = mon->u.pid;
erts_destroy_monitor(mon);
rp = erts_pid2proc_opt(NULL, 0,
watched, ERTS_PROC_LOCK_LINK,
@@ -1466,14 +1451,14 @@ int erts_net_message(Port *prt,
ErlOffHeap *ohp;
ASSERT(xsize);
heap_frag = erts_dist_ext_trailer(ede_copy);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size);
+ ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
hp = heap_frag->mem;
ohp = &heap_frag->off_heap;
token = tuple[5];
token = copy_struct(token, token_size, &hp, ohp);
}
- erts_queue_dist_message(rp, &locks, ede_copy, token);
+ erts_queue_dist_message(rp, locks, ede_copy, token, from);
if (locks)
erts_smp_proc_unlock(rp, locks);
}
@@ -1515,14 +1500,14 @@ int erts_net_message(Port *prt,
ErlOffHeap *ohp;
ASSERT(xsize);
heap_frag = erts_dist_ext_trailer(ede_copy);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size);
+ ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
hp = heap_frag->mem;
ohp = &heap_frag->off_heap;
token = tuple[4];
token = copy_struct(token, token_size, &hp, ohp);
}
- erts_queue_dist_message(rp, &locks, ede_copy, token);
+ erts_queue_dist_message(rp, locks, ede_copy, token, tuple[2]);
if (locks)
erts_smp_proc_unlock(rp, locks);
}
@@ -1563,7 +1548,7 @@ int erts_net_message(Port *prt,
if (mon == NULL) {
break;
}
- rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks);
+ rp = erts_pid2proc(NULL, 0, mon->u.pid, rp_locks);
erts_destroy_monitor(mon);
if (rp == NULL) {
@@ -1580,7 +1565,7 @@ int erts_net_message(Port *prt,
watched = (is_not_nil(mon->name)
? TUPLE2(&lhp[0], mon->name, sysname)
- : mon->pid);
+ : mon->u.pid);
erts_queue_monitor_message(rp, &rp_locks,
ref, am_process, watched, reason);
@@ -1645,7 +1630,11 @@ int erts_net_message(Port *prt,
ERTS_XSIG_FLG_IGN_KILL);
if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
/* We didn't exit the process and it is traced */
- trace_proc(NULL, rp, am_getting_unlinked, from);
+ if (rp_locks & ERTS_PROC_LOCKS_XSIG_SEND) {
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND);
+ rp_locks &= ~ERTS_PROC_LOCKS_XSIG_SEND;
+ }
+ trace_proc(NULL, 0, rp, am_getting_unlinked, from);
}
}
erts_smp_proc_unlock(rp, rp_locks);
@@ -1715,7 +1704,7 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- erts_cleanup_offheap(&off_heap);
+ erts_factory_close(&factory);
if (ctl != ctl_default) {
erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
}
@@ -1728,14 +1717,15 @@ int erts_net_message(Port *prt,
erts_dsprintf(dsbufp, "Invalid distribution message: %.200T", arg);
erts_send_error_to_logger_nogl(dsbufp);
}
- data_error:
+decode_error:
PURIFY_MSG("data error");
- erts_cleanup_offheap(&off_heap);
+ erts_factory_close(&factory);
if (ctl != ctl_default) {
erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
}
+data_error:
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
- erts_deliver_port_exit(prt, dep->cid, am_killed, 0);
+ erts_deliver_port_exit(prt, dep->cid, am_killed, 0, 1);
ERTS_SMP_CHK_NO_PROC_LOCKS;
return -1;
}
@@ -1790,8 +1780,8 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
#ifdef ERTS_DIST_MSG_DBG
erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl);
- if (is_value(msg))
- erts_fprintf(stderr, " MSG: %T\n", msg);
+ if (is_value(ctx->msg))
+ erts_fprintf(stderr, " MSG: %T\n", ctx->msg);
#endif
ctx->data_size = ctx->pass_through_size;
@@ -1958,7 +1948,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
goto done;
}
default:
- erl_exit(ERTS_ABORT_EXIT, "dsig_send invalid phase (%d)\n", (int)ctx->phase);
+ erts_exit(ERTS_ABORT_EXIT, "dsig_send invalid phase (%d)\n", (int)ctx->phase);
}
}
@@ -1979,7 +1969,7 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
if (size > (Uint) INT_MAX)
- erl_exit(ERTS_ABORT_EXIT,
+ erts_exit(ERTS_DUMP_EXIT,
"Absurdly large distribution output data buffer "
"(%beu bytes) passed.\n",
size);
@@ -2019,7 +2009,7 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
if (size > (Uint) INT_MAX)
- erl_exit(ERTS_ABORT_EXIT,
+ erts_exit(ERTS_DUMP_EXIT,
"Absurdly large distribution output data buffer "
"(%beu bytes) passed.\n",
size);
@@ -2061,9 +2051,9 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
}
-#if defined(ARCH_64) && !HALFWORD_HEAP
+#if defined(ARCH_64)
#define ERTS_PORT_REDS_MASK__ 0x003fffffffffffffL
-#elif defined(ARCH_32) || HALFWORD_HEAP
+#elif defined(ARCH_32)
#define ERTS_PORT_REDS_MASK__ 0x003fffff
#else
# error "Ohh come on ... !?!"
@@ -2093,7 +2083,7 @@ erts_dist_command(Port *prt, int reds_limit)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- erts_refc_inc(&dep->refc, 1); /* Otherwise dist_entry might be
+ erts_smp_refc_inc(&dep->refc, 1); /* Otherwise dist_entry might be
removed if port command fails */
erts_smp_atomic_set_mb(&dep->dist_cmd_scheduled, 0);
@@ -2105,7 +2095,7 @@ erts_dist_command(Port *prt, int reds_limit)
erts_smp_de_runlock(dep);
if (status & ERTS_DE_SFLG_EXITING) {
- erts_deliver_port_exit(prt, prt->common.id, am_killed, 0);
+ erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1);
erts_deref_dist_entry(dep);
return reds + ERTS_PORT_REDS_DIST_CMD_EXIT;
}
@@ -2414,36 +2404,36 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
}
struct print_to_data {
- int to;
+ fmtfn_t to;
void *arg;
};
static void doit_print_monitor_info(ErtsMonitor *mon, void *vptdp)
{
- int to = ((struct print_to_data *) vptdp)->to;
+ fmtfn_t to = ((struct print_to_data *) vptdp)->to;
void *arg = ((struct print_to_data *) vptdp)->arg;
Process *rp;
ErtsMonitor *rmon;
- rp = erts_proc_lookup(mon->pid);
+ rp = erts_proc_lookup(mon->u.pid);
if (!rp || (rmon = erts_lookup_monitor(ERTS_P_MONITORS(rp), mon->ref)) == NULL) {
- erts_print(to, arg, "Warning, stray monitor for: %T\n", mon->pid);
+ erts_print(to, arg, "Warning, stray monitor for: %T\n", mon->u.pid);
} else if (mon->type == MON_ORIGIN) {
/* Local pid is being monitored */
erts_print(to, arg, "Remotely monitored by: %T %T\n",
- mon->pid, rmon->pid);
+ mon->u.pid, rmon->u.pid);
} else {
- erts_print(to, arg, "Remote monitoring: %T ", mon->pid);
- if (is_not_atom(rmon->pid))
- erts_print(to, arg, "%T\n", rmon->pid);
+ erts_print(to, arg, "Remote monitoring: %T ", mon->u.pid);
+ if (is_not_atom(rmon->u.pid))
+ erts_print(to, arg, "%T\n", rmon->u.pid);
else
erts_print(to, arg, "{%T, %T}\n",
rmon->name,
- rmon->pid); /* which in this case is the
+ rmon->u.pid); /* which in this case is the
remote system name... */
}
}
-static void print_monitor_info(int to, void *arg, ErtsMonitor *mon)
+static void print_monitor_info(fmtfn_t to, void *arg, ErtsMonitor *mon)
{
struct print_to_data ptd = {to, arg};
erts_doforall_monitors(mon,&doit_print_monitor_info,&ptd);
@@ -2469,7 +2459,7 @@ static void doit_print_link_info(ErtsLink *lnk, void *vptdp)
}
}
-static void print_link_info(int to, void *arg, ErtsLink *lnk)
+static void print_link_info(fmtfn_t to, void *arg, ErtsLink *lnk)
{
struct print_to_data ptd = {to, arg};
erts_doforall_links(lnk, &doit_print_link_info, (void *) &ptd);
@@ -2490,7 +2480,7 @@ static void doit_print_nodelink_info(ErtsLink *lnk, void *vpcontext)
"Remote monitoring: %T %T\n", lnk->pid, pcontext->sysname);
}
-static void print_nodelink_info(int to, void *arg, ErtsLink *lnk, Eterm sysname)
+static void print_nodelink_info(fmtfn_t to, void *arg, ErtsLink *lnk, Eterm sysname)
{
PrintNodeLinkContext context = {{to, arg}, sysname};
erts_doforall_links(lnk, &doit_print_nodelink_info, &context);
@@ -2498,7 +2488,7 @@ static void print_nodelink_info(int to, void *arg, ErtsLink *lnk, Eterm sysname)
static int
-info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected)
+info_dist_entry(fmtfn_t to, void *arg, DistEntry *dep, int visible, int connected)
{
if (visible && connected) {
@@ -2526,7 +2516,7 @@ info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected)
erts_print(to, arg, "Name: %T", dep->sysname);
#ifdef DEBUG
- erts_print(to, arg, " (refc=%d)", erts_refc_read(&dep->refc, 0));
+ erts_print(to, arg, " (refc=%d)", erts_smp_refc_read(&dep->refc, 0));
#endif
erts_print(to, arg, "\n");
if (!connected && is_nil(dep->cid)) {
@@ -2547,7 +2537,7 @@ info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected)
return 0;
}
-int distribution_info(int to, void *arg) /* Called by break handler */
+int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */
{
DistEntry *dep;
@@ -2572,7 +2562,9 @@ int distribution_info(int to, void *arg) /* Called by break handler */
}
for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) {
- info_dist_entry(to, arg, dep, 0, 0);
+ if (dep != erts_this_dist_entry) {
+ info_dist_entry(to, arg, dep, 0, 0);
+ }
}
return(0);
@@ -2650,13 +2642,8 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
if (!net_kernel)
goto error;
- /* By setting dist_entry==erts_this_dist_entry and DISTRIBUTION on
- net_kernel do_net_exist will be called when net_kernel
- is terminated !! */
- (void) ERTS_PROC_SET_DIST_ENTRY(net_kernel,
- ERTS_PROC_LOCK_MAIN,
- erts_this_dist_entry);
- erts_refc_inc(&erts_this_dist_entry->refc, 2);
+ /* By setting F_DISTRIBUTION on net_kernel,
+ * do_net_exist will be called when net_kernel is terminated !! */
net_kernel->flags |= F_DISTRIBUTION;
if (net_kernel != BIF_P)
@@ -3017,11 +3004,11 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx);
- ASSERT(erts_no_of_not_connected_dist_entries >= 0);
+ ASSERT(erts_no_of_not_connected_dist_entries > 0);
ASSERT(erts_no_of_hidden_dist_entries >= 0);
ASSERT(erts_no_of_visible_dist_entries >= 0);
if(not_connected)
- length += erts_no_of_not_connected_dist_entries;
+ length += (erts_no_of_not_connected_dist_entries - 1);
if(hidden)
length += erts_no_of_hidden_dist_entries;
if(visible)
@@ -3043,8 +3030,10 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
#endif
if(not_connected)
for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) {
- result = CONS(hp, dep->sysname, result);
- hp += 2;
+ if (dep != erts_this_dist_entry) {
+ result = CONS(hp, dep->sysname, result);
+ hp += 2;
+ }
}
if(hidden)
for(dep = erts_hidden_dist_entries; dep; dep = dep->next) {
@@ -3275,11 +3264,16 @@ send_nodes_mon_msg(Process *rp,
Uint sz)
{
Eterm msg;
- ErlHeapFragment* bp;
+ Eterm *hp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
- Eterm *hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, rp_locksp);
#ifdef DEBUG
- Eterm *hend = hp + sz;
+ Eterm *hend;
+#endif
+
+ mp = erts_alloc_message_heap(rp, rp_locksp, sz, &hp, &ohp);
+#ifdef DEBUG
+ hend = hp + sz;
#endif
if (!nmp->opts) {
@@ -3325,7 +3319,7 @@ send_nodes_mon_msg(Process *rp,
}
ASSERT(hend == hp);
- erts_queue_message(rp, rp_locksp, bp, msg, NIL);
+ erts_queue_message(rp, *rp_locksp, mp, msg, am_system);
}
static void
@@ -3382,7 +3376,7 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas
continue;
break;
default:
- erl_exit(ERTS_ABORT_EXIT, "Bad node type found\n");
+ erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n");
}
}
@@ -3691,7 +3685,7 @@ erts_processes_monitoring_nodes(Process *c_p)
case ERTS_NODES_MON_OPT_TYPES: type = am_all; break;
case ERTS_NODES_MON_OPT_TYPE_VISIBLE: type = am_visible; break;
case ERTS_NODES_MON_OPT_TYPE_HIDDEN: type = am_hidden; break;
- default: erl_exit(ERTS_ABORT_EXIT, "Bad node type found\n");
+ default: erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n");
}
olist = erts_bld_cons(hpp, szp,
erts_bld_tuple(hpp, szp, 2,