diff options
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r-- | erts/emulator/beam/dist.c | 705 |
1 files changed, 398 insertions, 307 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 44f4eb9d43..09c83f1117 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -1,18 +1,19 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1996-2013. All Rights Reserved. + * Copyright Ericsson AB 1996-2016. All Rights Reserved. * - * The contents of this file are subject to the Erlang Public License, - * Version 1.1, (the "License"); you may not use this file except in - * compliance with the License. You should have received a copy of the - * Erlang Public License along with this software. If not, it can be - * retrieved online at http://www.erlang.org/. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * * %CopyrightEnd% */ @@ -44,6 +45,8 @@ #include "erl_thr_progress.h" #include "dtrace-wrapper.h" +#define DIST_CTL_DEFAULT_SIZE 64 + /* Turn this on to get printouts of all distribution messages * which go on the line */ @@ -65,9 +68,13 @@ static void bw(byte *buf, ErlDrvSizeT sz) static void dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) { + ErtsHeapFactory factory; + DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE); + Eterm* ctl = ctl_default; byte *extp = edep->extp; Eterm msg; - Sint size = erts_decode_dist_ext_size(edep); + Sint ctl_len; + Sint size = ctl_len = erts_decode_dist_ext_size(edep); if (size < 0) { erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(%s) failed:\n", @@ -75,10 +82,9 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) bw(buf, sz); } else { - Eterm *hp; ErlHeapFragment *mbuf = new_message_buffer(size); - hp = mbuf->mem; - msg = erts_decode_dist_ext(&hp, &mbuf->off_heap, edep); + erts_factory_static_init(&factory, ctl, ctl_len, &mbuf->off_heap); + msg = erts_decode_dist_ext(&factory, edep); if (is_value(msg)) erts_fprintf(stderr, " %s: %T\n", what, msg); else { @@ -119,7 +125,7 @@ Export* dmonitor_p_trap = NULL; /* forward declarations */ static void clear_dist_entry(DistEntry*); -static int dsig_send(ErtsDSigData *, Eterm, Eterm, int); +static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); @@ -331,7 +337,7 @@ static void doit_link_net_exits_sub(ErtsLink *sublnk, void *vlnecp) erts_destroy_link(rlnk); if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) { /* We didn't exit the process and it is traced */ - trace_proc(NULL, rp, am_getting_unlinked, sublnk->pid); + trace_proc(NULL, 0, rp, am_getting_unlinked, sublnk->pid); } } erts_smp_proc_unlock(rp, rp_locks); @@ -353,7 +359,7 @@ static void doit_link_net_exits_sub(ErtsLink *sublnk, void *vlnecp) static void doit_link_net_exits(ErtsLink *lnk, void *vnecp) { LinkNetExitsContext lnec = {(NetExitsContext *) vnecp, lnk}; - ASSERT(lnk->type == LINK_PID) + ASSERT(lnk->type == LINK_PID); erts_sweep_links(ERTS_LINK_ROOT(lnk), &doit_link_net_exits_sub, (void *) &lnec); #ifdef DEBUG ERTS_LINK_ROOT(lnk) = NULL; @@ -369,13 +375,14 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp) Process *rp; ErtsLink *rlnk; Uint i,n; - ASSERT(lnk->type == LINK_NODE) + ASSERT(lnk->type == LINK_NODE); if (is_internal_pid(lnk->pid)) { ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; - rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks); - if (!rp) { + ErlOffHeap *ohp; + rp = erts_proc_lookup(lnk->pid); + if (!rp) goto done; - } + erts_smp_proc_lock(rp, rp_locks); rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name); if (rlnk != NULL) { ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE)); @@ -383,16 +390,14 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp) } n = ERTS_LINK_REFC(lnk); for (i = 0; i < n; ++i) { - ErlHeapFragment* bp; - ErlOffHeap *ohp; Eterm tup; - Eterm *hp = erts_alloc_message_heap(3,&bp,&ohp,rp,&rp_locks); + Eterm *hp; + ErtsMessage *msgp; + + msgp = erts_alloc_message_heap(rp, &rp_locks, + 3, &hp, &ohp); tup = TUPLE2(hp, am_nodedown, name); - erts_queue_message(rp, &rp_locks, bp, tup, NIL -#ifdef USE_VM_PROBES - , NIL -#endif - ); + erts_queue_message(rp, rp_locks, msgp, tup, am_system); } erts_smp_proc_unlock(rp, rp_locks); } @@ -622,9 +627,7 @@ alloc_dist_obuf(Uint size) ErtsDistOutputBuf *obuf; Uint obuf_size = sizeof(ErtsDistOutputBuf)+sizeof(byte)*(size-1); Binary *bin = erts_bin_drv_alloc(obuf_size); - bin->flags = BIN_FLAG_DRV; erts_refc_init(&bin->refc, 1); - bin->orig_size = (SWord) obuf_size; obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0]; #ifdef DEBUG obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; @@ -709,6 +712,47 @@ static void clear_dist_entry(DistEntry *dep) } } +void erts_dsend_context_dtor(Binary* ctx_bin) +{ + ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); + switch (ctx->dss.phase) { + case ERTS_DSIG_SEND_PHASE_MSG_SIZE: + DESTROY_SAVED_WSTACK(&ctx->dss.u.sc.wstack); + break; + case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: + DESTROY_SAVED_WSTACK(&ctx->dss.u.ec.wstack); + break; + default:; + } + if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) { + free_dist_obuf(ctx->dss.obuf); + } + if (ctx->dep_to_deref) + erts_deref_dist_entry(ctx->dep_to_deref); +} + +Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx) +{ + struct exported_ctx { + ErtsSendContext ctx; + ErtsAtomCacheMap acm; + }; + Binary* ctx_bin = erts_create_magic_binary(sizeof(struct exported_ctx), + erts_dsend_context_dtor); + struct exported_ctx* dst = ERTS_MAGIC_BIN_DATA(ctx_bin); + Eterm* hp = HAlloc(p, PROC_BIN_SIZE); + + sys_memcpy(&dst->ctx, ctx, sizeof(ErtsSendContext)); + ASSERT(ctx->dss.ctl == make_tuple(ctx->ctl_heap)); + dst->ctx.dss.ctl = make_tuple(dst->ctx.ctl_heap); + if (ctx->dss.acmp) { + sys_memcpy(&dst->acm, ctx->dss.acmp, sizeof(ErtsAtomCacheMap)); + dst->ctx.dss.acmp = &dst->acm; + } + return erts_mk_magic_binary_term(&hp, &MSO(p), ctx_bin); +} + + /* * The erts_dsig_send_*() functions implemented below, sends asynchronous * distributed signals to other Erlang nodes. Before sending a distributed @@ -731,7 +775,7 @@ erts_dsig_send_link(ErtsDSigData *dsdp, Eterm local, Eterm remote) int res; UseTmpHeapNoproc(4); - res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0); + res = dsig_send_ctl(dsdp, ctl, 0); UnUseTmpHeapNoproc(4); return res; } @@ -744,7 +788,7 @@ erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote) int res; UseTmpHeapNoproc(4); - res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0); + res = dsig_send_ctl(dsdp, ctl, 0); UnUseTmpHeapNoproc(4); return res; } @@ -772,7 +816,7 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, erts_smp_de_links_unlock(dsdp->dep); #endif - res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1); + res = dsig_send_ctl(dsdp, ctl, 1); UnUseTmpHeapNoproc(6); return res; } @@ -793,7 +837,7 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, make_small(DOP_MONITOR_P), watcher, watched, ref); - res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0); + res = dsig_send_ctl(dsdp, ctl, 0); UnUseTmpHeapNoproc(5); return res; } @@ -815,18 +859,17 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, make_small(DOP_DEMONITOR_P), watcher, watched, ref); - res = dsig_send(dsdp, ctl, THE_NON_VALUE, force); + res = dsig_send_ctl(dsdp, ctl, force); UnUseTmpHeapNoproc(5); return res; } int -erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message) +erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) { Eterm ctl; - DeclareTmpHeapNoproc(ctl_heap,5); Eterm token = NIL; - Process *sender = dsdp->proc; + Process *sender = ctx->dsd.proc; int res; #ifdef USE_VM_PROBES Sint tok_label = 0; @@ -838,12 +881,7 @@ erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message) DTRACE_CHARBUF(receiver_name, 64); #endif - UseTmpHeapNoproc(5); - if (SEQ_TRACE_TOKEN(sender) != NIL -#ifdef USE_VM_PROBES - && SEQ_TRACE_TOKEN(sender) != am_have_dt_utag -#endif - ) { + if (have_seqtrace(SEQ_TRACE_TOKEN(sender))) { seq_trace_update_send(sender); token = SEQ_TRACE_TOKEN(sender); seq_trace_output(token, message, SEQ_TRACE_SEND, remote, sender); @@ -851,11 +889,14 @@ erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message) #ifdef USE_VM_PROBES *node_name = *sender_name = *receiver_name = '\0'; if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) { - erts_snprintf(node_name, sizeof(node_name), "%T", dsdp->dep->sysname); - erts_snprintf(sender_name, sizeof(sender_name), "%T", sender->common.id); - erts_snprintf(receiver_name, sizeof(receiver_name), "%T", remote); + erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)), + "%T", ctx->dsd.dep->sysname); + erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), + "%T", sender->common.id); + erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)), + "%T", remote); msize = size_object(message); - if (token != NIL && token != am_have_dt_utag) { + if (have_seqtrace(token)) { tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); @@ -864,26 +905,28 @@ erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message) #endif if (token != NIL) - ctl = TUPLE4(&ctl_heap[0], - make_small(DOP_SEND_TT), am_Cookie, remote, token); + ctl = TUPLE4(&ctx->ctl_heap[0], + make_small(DOP_SEND_TT), am_Empty, remote, token); else - ctl = TUPLE3(&ctl_heap[0], make_small(DOP_SEND), am_Cookie, remote); + ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_SEND), am_Empty, remote); DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); DTRACE7(message_send_remote, sender_name, node_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); - res = dsig_send(dsdp, ctl, message, 0); - UnUseTmpHeapNoproc(5); + ctx->dss.ctl = ctl; + ctx->dss.msg = message; + ctx->dss.force_busy = 0; + res = erts_dsig_send(&ctx->dsd, &ctx->dss); return res; } int -erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message) +erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, + ErtsSendContext* ctx) { Eterm ctl; - DeclareTmpHeapNoproc(ctl_heap,6); Eterm token = NIL; - Process *sender = dsdp->proc; + Process *sender = ctx->dsd.proc; int res; #ifdef USE_VM_PROBES Sint tok_label = 0; @@ -895,12 +938,7 @@ erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message) DTRACE_CHARBUF(receiver_name, 128); #endif - UseTmpHeapNoproc(6); - if (SEQ_TRACE_TOKEN(sender) != NIL -#ifdef USE_VM_PROBES - && SEQ_TRACE_TOKEN(sender) != am_have_dt_utag -#endif - ) { + if (have_seqtrace(SEQ_TRACE_TOKEN(sender))) { seq_trace_update_send(sender); token = SEQ_TRACE_TOKEN(sender); seq_trace_output(token, message, SEQ_TRACE_SEND, remote_name, sender); @@ -908,12 +946,14 @@ erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message) #ifdef USE_VM_PROBES *node_name = *sender_name = *receiver_name = '\0'; if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) { - erts_snprintf(node_name, sizeof(node_name), "%T", dsdp->dep->sysname); - erts_snprintf(sender_name, sizeof(sender_name), "%T", sender->common.id); - erts_snprintf(receiver_name, sizeof(receiver_name), + erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)), + "%T", ctx->dsd.dep->sysname); + erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), + "%T", sender->common.id); + erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)), "{%T,%s}", remote_name, node_name); msize = size_object(message); - if (token != NIL && token != am_have_dt_utag) { + if (have_seqtrace(token)) { tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); @@ -922,17 +962,19 @@ erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message) #endif if (token != NIL) - ctl = TUPLE5(&ctl_heap[0], make_small(DOP_REG_SEND_TT), - sender->common.id, am_Cookie, remote_name, token); + ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_REG_SEND_TT), + sender->common.id, am_Empty, remote_name, token); else - ctl = TUPLE4(&ctl_heap[0], make_small(DOP_REG_SEND), - sender->common.id, am_Cookie, remote_name); + ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_REG_SEND), + sender->common.id, am_Empty, remote_name); DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); DTRACE7(message_send_remote, sender_name, node_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); - res = dsig_send(dsdp, ctl, message, 0); - UnUseTmpHeapNoproc(6); + ctx->dss.ctl = ctl; + ctx->dss.msg = message; + ctx->dss.force_busy = 0; + res = erts_dsig_send(&ctx->dsd, &ctx->dss); return res; } @@ -956,11 +998,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, #endif UseTmpHeapNoproc(6); - if (token != NIL -#ifdef USE_VM_PROBES - && token != am_have_dt_utag -#endif - ) { + if (have_seqtrace(token)) { seq_trace_update_send(dsdp->proc); seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local); ctl = TUPLE5(&ctl_heap[0], @@ -971,12 +1009,15 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, #ifdef USE_VM_PROBES *node_name = *sender_name = *remote_name = '\0'; if (DTRACE_ENABLED(process_exit_signal_remote)) { - erts_snprintf(node_name, sizeof(node_name), "%T", dsdp->dep->sysname); - erts_snprintf(sender_name, sizeof(sender_name), "%T", sender->common.id); - erts_snprintf(remote_name, sizeof(remote_name), + erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)), + "%T", dsdp->dep->sysname); + erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), + "%T", sender->common.id); + erts_snprintf(remote_name, sizeof(DTRACE_CHARBUF_NAME(remote_name)), "{%T,%s}", remote, node_name); - erts_snprintf(reason_str, sizeof(reason), "%T", reason); - if (token != NIL && token != am_have_dt_utag) { + erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)), + "%T", reason); + if (have_seqtrace(token)) { tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); @@ -986,7 +1027,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, DTRACE7(process_exit_signal_remote, sender_name, node_name, remote_name, reason_str, tok_label, tok_lastcnt, tok_serial); /* forced, i.e ignore busy */ - res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1); + res = dsig_send_ctl(dsdp, ctl, 1); UnUseTmpHeapNoproc(6); return res; } @@ -1002,7 +1043,7 @@ erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason) ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); /* forced, i.e ignore busy */ - res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1); + res = dsig_send_ctl(dsdp, ctl, 1); UnUseTmpHeapNoproc(5); return res; } @@ -1018,7 +1059,7 @@ erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT2), local, remote, reason); - res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0); + res = dsig_send_ctl(dsdp, ctl, 0); UnUseTmpHeapNoproc(5); return res; } @@ -1035,7 +1076,7 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote) ctl = TUPLE3(&ctl_heap[0], make_small(DOP_GROUP_LEADER), leader, remote); - res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0); + res = dsig_send_ctl(dsdp, ctl, 0); UnUseTmpHeapNoproc(4); return res; } @@ -1083,7 +1124,6 @@ int erts_net_message(Port *prt, byte *buf, ErlDrvSizeT len) { -#define DIST_CTL_DEFAULT_SIZE 64 ErtsDistExternal ede; byte *t; Sint ctl_len; @@ -1096,7 +1136,7 @@ int erts_net_message(Port *prt, Process* rp; DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE); Eterm* ctl = ctl_default; - ErlOffHeap off_heap; + ErtsHeapFactory factory; Eterm* hp; Sint type; Eterm token; @@ -1110,9 +1150,6 @@ int erts_net_message(Port *prt, #endif UseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - /* Thanks to Luke Gorrie */ - off_heap.first = NULL; - off_heap.overhead = 0; ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -1173,14 +1210,15 @@ int erts_net_message(Port *prt, } hp = ctl; - arg = erts_decode_dist_ext(&hp, &off_heap, &ede); + erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF); + arg = erts_decode_dist_ext(&factory, &ede); if (is_non_value(arg)) { #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "DIST MSG DEBUG: erts_dist_ext_size(CTL) failed:\n"); + erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext(CTL) failed:\n"); bw(buf, orig_len); #endif PURIFY_MSG("data error"); - goto data_error; + goto decode_error; } ctl_len = t - buf; @@ -1237,7 +1275,7 @@ int erts_net_message(Port *prt, erts_smp_de_links_unlock(dep); if (IS_TRACED_FL(rp, F_TRACE_PROCS)) - trace_proc(NULL, rp, am_getting_linked, from); + trace_proc(NULL, 0, rp, am_getting_linked, from); erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); break; @@ -1262,7 +1300,7 @@ int erts_net_message(Port *prt, lnk = erts_remove_link(&ERTS_P_LINKS(rp), from); if (IS_TRACED_FL(rp, F_TRACE_PROCS) && lnk != NULL) { - trace_proc(NULL, rp, am_getting_unlinked, from); + trace_proc(NULL, 0, rp, am_getting_unlinked, from); } erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); @@ -1411,14 +1449,14 @@ int erts_net_message(Port *prt, ErlOffHeap *ohp; ASSERT(xsize); heap_frag = erts_dist_ext_trailer(ede_copy); - ERTS_INIT_HEAP_FRAG(heap_frag, token_size); + ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); hp = heap_frag->mem; ohp = &heap_frag->off_heap; token = tuple[5]; token = copy_struct(token, token_size, &hp, ohp); } - erts_queue_dist_message(rp, &locks, ede_copy, token); + erts_queue_dist_message(rp, locks, ede_copy, token, from); if (locks) erts_smp_proc_unlock(rp, locks); } @@ -1460,14 +1498,14 @@ int erts_net_message(Port *prt, ErlOffHeap *ohp; ASSERT(xsize); heap_frag = erts_dist_ext_trailer(ede_copy); - ERTS_INIT_HEAP_FRAG(heap_frag, token_size); + ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); hp = heap_frag->mem; ohp = &heap_frag->off_heap; token = tuple[4]; token = copy_struct(token, token_size, &hp, ohp); } - erts_queue_dist_message(rp, &locks, ede_copy, token); + erts_queue_dist_message(rp, locks, ede_copy, token, tuple[2]); if (locks) erts_smp_proc_unlock(rp, locks); } @@ -1509,12 +1547,12 @@ int erts_net_message(Port *prt, break; } rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks); + + erts_destroy_monitor(mon); if (rp == NULL) { break; } - erts_destroy_monitor(mon); - mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); if (mon == NULL) { @@ -1590,7 +1628,11 @@ int erts_net_message(Port *prt, ERTS_XSIG_FLG_IGN_KILL); if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) { /* We didn't exit the process and it is traced */ - trace_proc(NULL, rp, am_getting_unlinked, from); + if (rp_locks & ERTS_PROC_LOCKS_XSIG_SEND) { + erts_smp_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND); + rp_locks &= ~ERTS_PROC_LOCKS_XSIG_SEND; + } + trace_proc(NULL, 0, rp, am_getting_unlinked, from); } } erts_smp_proc_unlock(rp, rp_locks); @@ -1660,7 +1702,7 @@ int erts_net_message(Port *prt, goto invalid_message; } - erts_cleanup_offheap(&off_heap); + erts_factory_close(&factory); if (ctl != ctl_default) { erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); } @@ -1673,203 +1715,248 @@ int erts_net_message(Port *prt, erts_dsprintf(dsbufp, "Invalid distribution message: %.200T", arg); erts_send_error_to_logger_nogl(dsbufp); } - data_error: +decode_error: PURIFY_MSG("data error"); - erts_cleanup_offheap(&off_heap); + erts_factory_close(&factory); if (ctl != ctl_default) { erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); } +data_error: UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - erts_deliver_port_exit(prt, dep->cid, am_killed, 0); + erts_deliver_port_exit(prt, dep->cid, am_killed, 0, 1); ERTS_SMP_CHK_NO_PROC_LOCKS; return -1; } -static int -dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy) +static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy) { + struct erts_dsig_send_context ctx; + int ret; + ctx.ctl = ctl; + ctx.msg = THE_NON_VALUE; + ctx.force_busy = force_busy; + ctx.phase = ERTS_DSIG_SEND_PHASE_INIT; +#ifdef DEBUG + ctx.reds = 1; /* provoke assert below (no reduction count without msg) */ +#endif + ret = erts_dsig_send(dsdp, &ctx); + ASSERT(ret != ERTS_DSIG_SEND_CONTINUE); + return ret; +} + +int +erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) +{ + int retval; + Sint initial_reds = ctx->reds; Eterm cid; - int suspended = 0; - int resume = 0; - Uint32 pass_through_size; - Uint data_size, dhdr_ext_size; - ErtsAtomCacheMap *acmp; - ErtsDistOutputBuf *obuf; - DistEntry *dep = dsdp->dep; - Uint32 flags = dep->flags; - Process *c_p = dsdp->proc; - if (!c_p || dsdp->no_suspend) - force_busy = 1; + while (1) { + switch (ctx->phase) { + case ERTS_DSIG_SEND_PHASE_INIT: + ctx->flags = dsdp->dep->flags; + ctx->c_p = dsdp->proc; - ERTS_SMP_LC_ASSERT(!c_p - || (ERTS_PROC_LOCK_MAIN - == erts_proc_lc_my_proc_locks(c_p))); + if (!ctx->c_p || dsdp->no_suspend) + ctx->force_busy = 1; - if (!erts_is_alive) - return ERTS_DSIG_SEND_OK; + ERTS_SMP_LC_ASSERT(!ctx->c_p + || (ERTS_PROC_LOCK_MAIN + == erts_proc_lc_my_proc_locks(ctx->c_p))); - if (flags & DFLAG_DIST_HDR_ATOM_CACHE) { - acmp = erts_get_atom_cache_map(c_p); - pass_through_size = 0; - } - else { - acmp = NULL; - pass_through_size = 1; - } + if (!erts_is_alive) + return ERTS_DSIG_SEND_OK; -#ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, ">>%s CTL: %T\n", pass_through_size ? "P" : " ", ctl); - if (is_value(msg)) - erts_fprintf(stderr, " MSG: %T\n", msg); -#endif + if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) { + ctx->acmp = erts_get_atom_cache_map(ctx->c_p); + ctx->pass_through_size = 0; + } + else { + ctx->acmp = NULL; + ctx->pass_through_size = 1; + } - data_size = pass_through_size; - erts_reset_atom_cache_map(acmp); - data_size += erts_encode_dist_ext_size(ctl, flags, acmp); - if (is_value(msg)) - data_size += erts_encode_dist_ext_size(msg, flags, acmp); - erts_finalize_atom_cache_map(acmp, flags); + #ifdef ERTS_DIST_MSG_DBG + erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl); + if (is_value(ctx->msg)) + erts_fprintf(stderr, " MSG: %T\n", ctx->msg); + #endif + + ctx->data_size = ctx->pass_through_size; + erts_reset_atom_cache_map(ctx->acmp); + erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size); + + if (is_value(ctx->msg)) { + ctx->u.sc.wstack.wstart = NULL; + ctx->u.sc.flags = ctx->flags; + ctx->u.sc.level = 0; + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; + } else { + ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; + } + break; - dhdr_ext_size = erts_encode_ext_dist_header_size(acmp); - data_size += dhdr_ext_size; + case ERTS_DSIG_SEND_PHASE_MSG_SIZE: + if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) { + retval = ERTS_DSIG_SEND_CONTINUE; + goto done; + } - obuf = alloc_dist_obuf(data_size); - obuf->ext_endp = &obuf->data[0] + pass_through_size + dhdr_ext_size; + ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; + case ERTS_DSIG_SEND_PHASE_ALLOC: + erts_finalize_atom_cache_map(ctx->acmp, ctx->flags); + + ctx->dhdr_ext_size = erts_encode_ext_dist_header_size(ctx->acmp); + ctx->data_size += ctx->dhdr_ext_size; + + ctx->obuf = alloc_dist_obuf(ctx->data_size); + ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->pass_through_size + ctx->dhdr_ext_size; + + /* Encode internal version of dist header */ + ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp); + /* Encode control message */ + erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL); + if (is_value(ctx->msg)) { + ctx->u.ec.flags = ctx->flags; + ctx->u.ec.level = 0; + ctx->u.ec.wstack.wstart = NULL; + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; + } else { + ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; + } + break; - /* Encode internal version of dist header */ - obuf->extp = erts_encode_ext_dist_header_setup(obuf->ext_endp, acmp); - /* Encode control message */ - erts_encode_dist_ext(ctl, &obuf->ext_endp, flags, acmp); - if (is_value(msg)) { - /* Encode message */ - erts_encode_dist_ext(msg, &obuf->ext_endp, flags, acmp); - } + case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: + if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) { + retval = ERTS_DSIG_SEND_CONTINUE; + goto done; + } - ASSERT(obuf->extp < obuf->ext_endp); - ASSERT(&obuf->data[0] <= obuf->extp - pass_through_size); - ASSERT(obuf->ext_endp <= &obuf->data[0] + data_size); + ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; + case ERTS_DSIG_SEND_PHASE_FIN: { + DistEntry *dep = dsdp->dep; + int suspended = 0; + int resume = 0; - data_size = obuf->ext_endp - obuf->extp; + ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp); + ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->pass_through_size); + ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size); - /* - * Signal encoded; now verify that the connection still exists, - * and if so enqueue the signal and schedule it for send. - */ - obuf->next = NULL; - erts_smp_de_rlock(dep); - cid = dep->cid; - if (cid != dsdp->cid - || dep->connection_id != dsdp->connection_id - || dep->status & ERTS_DE_SFLG_EXITING) { - /* Not the same connection as when we started; drop message... */ - erts_smp_de_runlock(dep); - free_dist_obuf(obuf); - } - else { - ErtsProcList *plp = NULL; - erts_smp_mtx_lock(&dep->qlock); - dep->qsize += size_obuf(obuf); - if (dep->qsize >= erts_dist_buf_busy_limit) - dep->qflgs |= ERTS_DE_QFLG_BUSY; - if (!force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) { - erts_smp_mtx_unlock(&dep->qlock); + ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp; - plp = erts_proclist_create(c_p); - erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL); - suspended = 1; - erts_smp_mtx_lock(&dep->qlock); - } + /* + * Signal encoded; now verify that the connection still exists, + * and if so enqueue the signal and schedule it for send. + */ + ctx->obuf->next = NULL; + erts_smp_de_rlock(dep); + cid = dep->cid; + if (cid != dsdp->cid + || dep->connection_id != dsdp->connection_id + || dep->status & ERTS_DE_SFLG_EXITING) { + /* Not the same connection as when we started; drop message... */ + erts_smp_de_runlock(dep); + free_dist_obuf(ctx->obuf); + } + else { + ErtsProcList *plp = NULL; + erts_smp_mtx_lock(&dep->qlock); + dep->qsize += size_obuf(ctx->obuf); + if (dep->qsize >= erts_dist_buf_busy_limit) + dep->qflgs |= ERTS_DE_QFLG_BUSY; + if (!ctx->force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) { + erts_smp_mtx_unlock(&dep->qlock); + + plp = erts_proclist_create(ctx->c_p); + erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL); + suspended = 1; + erts_smp_mtx_lock(&dep->qlock); + } - /* Enqueue obuf on dist entry */ - if (dep->out_queue.last) - dep->out_queue.last->next = obuf; - else - dep->out_queue.first = obuf; - dep->out_queue.last = obuf; + /* Enqueue obuf on dist entry */ + if (dep->out_queue.last) + dep->out_queue.last->next = ctx->obuf; + else + dep->out_queue.first = ctx->obuf; + dep->out_queue.last = ctx->obuf; + + if (!ctx->force_busy) { + if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) { + if (suspended) + resume = 1; /* was busy when we started, but isn't now */ + #ifdef USE_VM_PROBES + if (resume && DTRACE_ENABLED(dist_port_not_busy)) { + DTRACE_CHARBUF(port_str, 64); + DTRACE_CHARBUF(remote_str, 64); + + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), + "%T", cid); + erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), + "%T", dep->sysname); + DTRACE3(dist_port_not_busy, erts_this_node_sysname, + port_str, remote_str); + } + #endif + } + else { + /* Enqueue suspended process on dist entry */ + ASSERT(plp); + erts_proclist_store_last(&dep->suspended, plp); + } + } - if (!force_busy) { - if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) { - if (suspended) - resume = 1; /* was busy when we started, but isn't now */ -#ifdef USE_VM_PROBES - if (resume && DTRACE_ENABLED(dist_port_not_busy)) { - DTRACE_CHARBUF(port_str, 64); - DTRACE_CHARBUF(remote_str, 64); - - erts_snprintf(port_str, sizeof(port_str), "%T", cid); - erts_snprintf(remote_str, sizeof(remote_str), - "%T", dep->sysname); - DTRACE3(dist_port_not_busy, erts_this_node_sysname, - port_str, remote_str); - } -#endif + erts_smp_mtx_unlock(&dep->qlock); + erts_schedule_dist_command(NULL, dep); + erts_smp_de_runlock(dep); + + if (resume) { + erts_resume(ctx->c_p, ERTS_PROC_LOCK_MAIN); + erts_proclist_destroy(plp); + /* + * Note that the calling process still have to yield as if it + * suspended. If not, the calling process could later be + * erroneously scheduled when it shouldn't be. + */ + } } - else { - /* Enqueue suspended process on dist entry */ - ASSERT(plp); - erts_proclist_store_last(&dep->suspended, plp); + ctx->obuf = NULL; + + if (suspended) { + #ifdef USE_VM_PROBES + if (!resume && DTRACE_ENABLED(dist_port_busy)) { + DTRACE_CHARBUF(port_str, 64); + DTRACE_CHARBUF(remote_str, 64); + DTRACE_CHARBUF(pid_str, 16); + + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", cid); + erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), + "%T", dep->sysname); + erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), + "%T", ctx->c_p->common.id); + DTRACE4(dist_port_busy, erts_this_node_sysname, + port_str, remote_str, pid_str); + } + #endif + if (!resume && erts_system_monitor_flags.busy_dist_port) + monitor_generic(ctx->c_p, am_busy_dist_port, cid); + retval = ERTS_DSIG_SEND_YIELD; + } else { + retval = ERTS_DSIG_SEND_OK; } + goto done; } - - erts_smp_mtx_unlock(&dep->qlock); - erts_schedule_dist_command(NULL, dep); - erts_smp_de_runlock(dep); - - if (resume) { - erts_resume(c_p, ERTS_PROC_LOCK_MAIN); - erts_proclist_destroy(plp); - /* - * Note that the calling process still have to yield as if it - * suspended. If not, the calling process could later be - * erroneously scheduled when it shouldn't be. - */ + default: + erts_exit(ERTS_ABORT_EXIT, "dsig_send invalid phase (%d)\n", (int)ctx->phase); } } - if (c_p) { - int reds; - /* - * Bump reductions on calling process. - * - * This is the reduction cost: Always a base cost of 8 reductions - * plus 16 reductions per kilobyte generated external data. - */ - - data_size >>= (10-4); -#if defined(ARCH_64) && !HALFWORD_HEAP - data_size &= 0x003fffffffffffff; -#elif defined(ARCH_32) || HALFWORD_HEAP - data_size &= 0x003fffff; -#else -# error "Ohh come on ... !?!" -#endif - reds = 8 + ((int) data_size > 1000000 ? 1000000 : (int) data_size); - BUMP_REDS(c_p, reds); - } - - if (suspended) { -#ifdef USE_VM_PROBES - if (!resume && DTRACE_ENABLED(dist_port_busy)) { - DTRACE_CHARBUF(port_str, 64); - DTRACE_CHARBUF(remote_str, 64); - DTRACE_CHARBUF(pid_str, 16); - - erts_snprintf(port_str, sizeof(port_str), "%T", cid); - erts_snprintf(remote_str, sizeof(remote_str), "%T", dep->sysname); - erts_snprintf(pid_str, sizeof(pid_str), "%T", c_p->common.id); - DTRACE4(dist_port_busy, erts_this_node_sysname, - port_str, remote_str, pid_str); - } -#endif - if (!resume && erts_system_monitor_flags.busy_dist_port) - monitor_generic(c_p, am_busy_dist_port, cid); - return ERTS_DSIG_SEND_YIELD; +done: + if (ctx->msg && ctx->c_p) { + BUMP_REDS(ctx->c_p, (initial_reds - ctx->reds) / TERM_TO_BINARY_LOOP_FACTOR); } - return ERTS_DSIG_SEND_OK; + return retval; } - static Uint dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) { @@ -1880,7 +1967,7 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); if (size > (Uint) INT_MAX) - erl_exit(ERTS_ABORT_EXIT, + erts_exit(ERTS_DUMP_EXIT, "Absurdly large distribution output data buffer " "(%beu bytes) passed.\n", size); @@ -1890,8 +1977,9 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); - erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id); - erts_snprintf(remote_str, sizeof(remote_str), + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), + "%T", prt->common.id); + erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), "%T", prt->dist_entry->sysname); DTRACE4(dist_output, erts_this_node_sysname, port_str, remote_str, size); @@ -1919,7 +2007,7 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); if (size > (Uint) INT_MAX) - erl_exit(ERTS_ABORT_EXIT, + erts_exit(ERTS_DUMP_EXIT, "Absurdly large distribution output data buffer " "(%beu bytes) passed.\n", size); @@ -1944,8 +2032,9 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); - erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id); - erts_snprintf(remote_str, sizeof(remote_str), + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), + "%T", prt->common.id); + erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), "%T", prt->dist_entry->sysname); DTRACE4(dist_outputv, erts_this_node_sysname, port_str, remote_str, size); @@ -1960,9 +2049,9 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) } -#if defined(ARCH_64) && !HALFWORD_HEAP +#if defined(ARCH_64) #define ERTS_PORT_REDS_MASK__ 0x003fffffffffffffL -#elif defined(ARCH_32) || HALFWORD_HEAP +#elif defined(ARCH_32) #define ERTS_PORT_REDS_MASK__ 0x003fffff #else # error "Ohh come on ... !?!" @@ -1988,6 +2077,7 @@ erts_dist_command(Port *prt, int reds_limit) DistEntry *dep = prt->dist_entry; Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); erts_aint32_t sched_flags; + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -2003,7 +2093,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_smp_de_runlock(dep); if (status & ERTS_DE_SFLG_EXITING) { - erts_deliver_port_exit(prt, prt->common.id, am_killed, 0); + erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1); erts_deref_dist_entry(dep); return reds + ERTS_PORT_REDS_DIST_CMD_EXIT; } @@ -2042,12 +2132,12 @@ erts_dist_command(Port *prt, int reds_limit) ErtsDistOutputBuf *fob; size = (*send)(prt, foq.first); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, ">> "); bw(foq.first->extp, size); #endif reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - erts_smp_atomic_add_nob(&erts_bytes_out, size); fob = foq.first; obufsize += size_obuf(fob); foq.first = foq.first->next; @@ -2127,12 +2217,12 @@ erts_dist_command(Port *prt, int reds_limit) ASSERT(&oq.first->data[0] <= oq.first->extp && oq.first->extp < oq.first->ext_endp); size = (*send)(prt, oq.first); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); #endif reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - erts_smp_atomic_add_nob(&erts_bytes_out, size); fob = oq.first; obufsize += size_obuf(fob); oq.first = oq.first->next; @@ -2280,8 +2370,9 @@ erts_dist_port_not_busy(Port *prt) DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); - erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id); - erts_snprintf(remote_str, sizeof(remote_str), + erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), + "%T", prt->common.id); + erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), "%T", prt->dist_entry->sysname); DTRACE3(dist_port_not_busy, erts_this_node_sysname, port_str, remote_str); @@ -2423,7 +2514,7 @@ info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected) erts_print(to, arg, "Name: %T", dep->sysname); #ifdef DEBUG - erts_print(to, arg, " (refc=%d)", erts_refc_read(&dep->refc, 1)); + erts_print(to, arg, " (refc=%d)", erts_refc_read(&dep->refc, 0)); #endif erts_print(to, arg, "\n"); if (!connected && is_nil(dep->cid)) { @@ -2469,7 +2560,9 @@ int distribution_info(int to, void *arg) /* Called by break handler */ } for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) { - info_dist_entry(to, arg, dep, 0, 0); + if (dep != erts_this_dist_entry) { + info_dist_entry(to, arg, dep, 0, 0); + } } return(0); @@ -2547,13 +2640,8 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) if (!net_kernel) goto error; - /* By setting dist_entry==erts_this_dist_entry and DISTRIBUTION on - net_kernel do_net_exist will be called when net_kernel - is terminated !! */ - (void) ERTS_PROC_SET_DIST_ENTRY(net_kernel, - ERTS_PROC_LOCK_MAIN, - erts_this_dist_entry); - erts_refc_inc(&erts_this_dist_entry->refc, 2); + /* By setting F_DISTRIBUTION on net_kernel, + * do_net_exist will be called when net_kernel is terminated !! */ net_kernel->flags |= F_DISTRIBUTION; if (net_kernel != BIF_P) @@ -2914,11 +3002,11 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx); - ASSERT(erts_no_of_not_connected_dist_entries >= 0); + ASSERT(erts_no_of_not_connected_dist_entries > 0); ASSERT(erts_no_of_hidden_dist_entries >= 0); ASSERT(erts_no_of_visible_dist_entries >= 0); if(not_connected) - length += erts_no_of_not_connected_dist_entries; + length += (erts_no_of_not_connected_dist_entries - 1); if(hidden) length += erts_no_of_hidden_dist_entries; if(visible) @@ -2940,8 +3028,10 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) #endif if(not_connected) for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { - result = CONS(hp, dep->sysname, result); - hp += 2; + if (dep != erts_this_dist_entry) { + result = CONS(hp, dep->sysname, result); + hp += 2; + } } if(hidden) for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { @@ -3172,11 +3262,16 @@ send_nodes_mon_msg(Process *rp, Uint sz) { Eterm msg; - ErlHeapFragment* bp; + Eterm *hp; + ErtsMessage *mp; ErlOffHeap *ohp; - Eterm *hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, rp_locksp); #ifdef DEBUG - Eterm *hend = hp + sz; + Eterm *hend; +#endif + + mp = erts_alloc_message_heap(rp, rp_locksp, sz, &hp, &ohp); +#ifdef DEBUG + hend = hp + sz; #endif if (!nmp->opts) { @@ -3222,11 +3317,7 @@ send_nodes_mon_msg(Process *rp, } ASSERT(hend == hp); - erts_queue_message(rp, rp_locksp, bp, msg, NIL -#ifdef USE_VM_PROBES - , NIL -#endif - ); + erts_queue_message(rp, *rp_locksp, mp, msg, am_system); } static void @@ -3246,10 +3337,10 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas DTRACE_CHARBUF(type_str, 12); DTRACE_CHARBUF(reason_str, 64); - erts_snprintf(what_str, sizeof(what_str), "%T", what); - erts_snprintf(node_str, sizeof(node_str), "%T", node); - erts_snprintf(type_str, sizeof(type_str), "%T", type); - erts_snprintf(reason_str, sizeof(reason_str), "%T", reason); + erts_snprintf(what_str, sizeof(DTRACE_CHARBUF_NAME(what_str)), "%T", what); + erts_snprintf(node_str, sizeof(DTRACE_CHARBUF_NAME(node_str)), "%T", node); + erts_snprintf(type_str, sizeof(DTRACE_CHARBUF_NAME(type_str)), "%T", type); + erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)), "%T", reason); DTRACE5(dist_monitor, erts_this_node_sysname, what_str, node_str, type_str, reason_str); } @@ -3283,7 +3374,7 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas continue; break; default: - erl_exit(ERTS_ABORT_EXIT, "Bad node type found\n"); + erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n"); } } @@ -3592,7 +3683,7 @@ erts_processes_monitoring_nodes(Process *c_p) case ERTS_NODES_MON_OPT_TYPES: type = am_all; break; case ERTS_NODES_MON_OPT_TYPE_VISIBLE: type = am_visible; break; case ERTS_NODES_MON_OPT_TYPE_HIDDEN: type = am_hidden; break; - default: erl_exit(ERTS_ABORT_EXIT, "Bad node type found\n"); + default: erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n"); } olist = erts_bld_cons(hpp, szp, erts_bld_tuple(hpp, szp, 2, |