diff options
Diffstat (limited to 'erts/emulator/beam')
24 files changed, 355 insertions, 384 deletions
diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index a0dbd9ec7b..d221e6aea6 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -1753,6 +1753,7 @@ BIF_RETTYPE erts_internal_purge_module_2(BIF_ALIST_2) if (literals) { ErtsLiteralAreaRef *ref; + ErtsMessage *mp; ref = erts_alloc(ERTS_ALC_T_LITERAL_REF, sizeof(ErtsLiteralAreaRef)); ref->literal_area = literals; @@ -1767,10 +1768,12 @@ BIF_RETTYPE erts_internal_purge_module_2(BIF_ALIST_2) release_literal_areas.last = ref; } erts_mtx_unlock(&release_literal_areas.mtx); + mp = erts_alloc_message(0, NULL); + ERL_MESSAGE_TOKEN(mp) = am_undefined; erts_queue_proc_message(BIF_P, erts_literal_area_collector, 0, - erts_alloc_message(0, NULL), + mp, am_copy_literals); } diff --git a/erts/emulator/beam/beam_debug.c b/erts/emulator/beam/beam_debug.c index 6d3b99c43e..9633de2021 100644 --- a/erts/emulator/beam/beam_debug.c +++ b/erts/emulator/beam/beam_debug.c @@ -667,7 +667,7 @@ print_op(fmtfn_t to, void *to_arg, int op, int size, BeamInstr* addr) ap++; break; case 'l': /* fr(N) */ - erts_print(to, to_arg, "fr(%d)", loader_reg_index(ap[0])); + erts_print(to, to_arg, "fr(%d)", ap[0] / sizeof(FloatDef)); ap++; break; default: @@ -1191,6 +1191,7 @@ dirty_send_message(Process *c_p, Eterm to, Eterm tag) mp = erts_alloc_message_heap(rp, &rp_locks, 3, &hp, &ohp); msg = TUPLE2(hp, tag, c_p->common.id); + ERL_MESSAGE_TOKEN(mp) = am_undefined; erts_queue_proc_message(c_p, rp, rp_locks, mp, msg); if (rp == real_c_p) diff --git a/erts/emulator/beam/beam_ranges.c b/erts/emulator/beam/beam_ranges.c index f0c9496341..9f3153724a 100644 --- a/erts/emulator/beam/beam_ranges.c +++ b/erts/emulator/beam/beam_ranges.c @@ -35,10 +35,8 @@ typedef struct { /* * Used for crash dumping of literals. The size of erts_dump_lit_areas is - * always twice the number of active ranges (to allow for literals in both - * current and old code). + * always at least the number of active ranges. */ - ErtsLiteralArea** erts_dump_lit_areas; Uint erts_dump_num_lit_areas; @@ -180,8 +178,8 @@ erts_end_staging_ranges(int commit) (erts_aint_t) (r[dst].modules + r[dst].n / 2)); - if (r[dst].allocated * 2 > erts_dump_num_lit_areas) { - erts_dump_num_lit_areas *= 2; + if (r[dst].allocated > erts_dump_num_lit_areas) { + erts_dump_num_lit_areas = r[dst].allocated * 2; erts_dump_lit_areas = (ErtsLiteralArea **) erts_realloc(ERTS_ALC_T_CRASH_DUMP, (void *) erts_dump_lit_areas, diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 97e1ee1286..f18af8bcd7 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -2063,7 +2063,7 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) if (p == rp) rp_locks |= ERTS_PROC_LOCK_MAIN; /* send to local process */ - erts_send_message(p, rp, &rp_locks, msg, 0); + erts_send_message(p, rp, &rp_locks, msg); erts_proc_unlock(rp, p == rp ? (rp_locks & ~ERTS_PROC_LOCK_MAIN) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 1e822d5c7b..0633bff3c2 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -116,11 +116,12 @@ static Export *dist_ctrl_put_data_trap; /* forward declarations */ -static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); static Sint abort_connection(DistEntry* dep, Uint32 conn_id); +static ErtsDistOutputBuf* clear_de_out_queues(DistEntry*); +static void free_de_out_queues(DistEntry*, ErtsDistOutputBuf*); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; @@ -556,7 +557,10 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) } } else { /* Call from distribution controller (port/process) */ - ErtsMonLnkDist *mld; + ErtsMonLnkDist *mld; + ErtsAtomCache *cache; + ErtsProcList *suspendees; + ErtsDistOutputBuf *obuf; Uint32 flags; erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1); @@ -588,6 +592,22 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) nodename = dep->sysname; flags = dep->flags; + erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) NIL); + cache = dep->cache; + dep->cache = NULL; + + erts_mtx_lock(&dep->qlock); + + erts_atomic64_set_nob(&dep->in, 0); + erts_atomic64_set_nob(&dep->out, 0); + + obuf = clear_de_out_queues(dep); + suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_set_dist_entry_not_connected(dep); erts_de_rwunlock(dep); @@ -601,7 +621,13 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) ? am_connection_closed : reason)); - clear_dist_entry(dep); + erts_resume_processes(suspendees); + + delete_cache(cache); + + free_de_out_queues(dep, obuf); + if (dep->transcode_ctx) + transcode_free_ctx(dep); } dec_no_nodes(); @@ -732,41 +758,6 @@ static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf) } } -static void clear_dist_entry(DistEntry *dep) -{ - ErtsAtomCache *cache; - ErtsProcList *suspendees; - ErtsDistOutputBuf *obuf; - - erts_de_rwlock(dep); - erts_atomic_set_nob(&dep->input_handler, - (erts_aint_t) NIL); - cache = dep->cache; - dep->cache = NULL; - - erts_mtx_lock(&dep->qlock); - - erts_atomic64_set_nob(&dep->in, 0); - erts_atomic64_set_nob(&dep->out, 0); - - obuf = clear_de_out_queues(dep); - dep->state = ERTS_DE_STATE_IDLE; - suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); - - erts_mtx_unlock(&dep->qlock); - erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); - dep->send = NULL; - erts_de_rwunlock(dep); - - erts_resume_processes(suspendees); - - delete_cache(cache); - - free_de_out_queues(dep, obuf); - if (dep->transcode_ctx) - transcode_free_ctx(dep); -} - int erts_dsend_context_dtor(Binary* ctx_bin) { ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); @@ -861,7 +852,7 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, DeclareTmpHeapNoproc(ctl_heap,6); int res; - if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { /* * Receiver does not support DOP_MONITOR_P_EXIT (see dsig_send_monitor) */ @@ -889,7 +880,7 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, DeclareTmpHeapNoproc(ctl_heap,5); int res; - if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { /* * Receiver does not support DOP_MONITOR_P. * Just avoid sending it and by doing that reduce this monitor @@ -920,7 +911,7 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, DeclareTmpHeapNoproc(ctl_heap,5); int res; - if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { /* * Receiver does not support DOP_DEMONITOR_P (see dsig_send_monitor) */ @@ -940,7 +931,7 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, static int can_send_seqtrace_token(ErtsSendContext* ctx, Eterm token) { Eterm label; - if (ctx->dep->flags & DFLAG_BIG_SEQTRACE_LABELS) { + if (ctx->dsd.flags & DFLAG_BIG_SEQTRACE_LABELS) { /* The other end is capable of handling arbitrary seq_trace labels. */ return 1; } @@ -1001,7 +992,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) send_token = (token != NIL && can_send_seqtrace_token(ctx, token)); - if (ctx->dep->flags & DFLAG_SEND_SENDER) { + if (ctx->dsd.flags & DFLAG_SEND_SENDER) { dist_op = make_small(send_token ? DOP_SEND_SENDER_TT : DOP_SEND_SENDER); @@ -1200,21 +1191,8 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote) #include <valgrind/valgrind.h> #include <valgrind/memcheck.h> -#ifndef HAVE_VALGRIND_PRINTF_XML -#define VALGRIND_PRINTF_XML VALGRIND_PRINTF -#endif - # define PURIFY_MSG(msg) \ - do { \ - char buf__[1]; size_t bufsz__ = sizeof(buf__); \ - if (erts_sys_explicit_8bit_getenv("VALGRIND_LOG_XML", buf__, &bufsz__) >= 0) { \ - VALGRIND_PRINTF_XML("<erlang_error_log>" \ - "%s, line %d: %s</erlang_error_log>\n", \ - __FILE__, __LINE__, msg); \ - } else { \ - VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg); \ - } \ - } while (0) + VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg) #else # define PURIFY_MSG(msg) #endif @@ -1231,13 +1209,13 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote) int erts_net_message(Port *prt, DistEntry *dep, + Uint32 conn_id, byte *hbuf, ErlDrvSizeT hlen, byte *buf, ErlDrvSizeT len) { ErtsDistExternal ede; - byte *t; Sint ctl_len; Eterm arg; Eterm from, to; @@ -1255,7 +1233,6 @@ int erts_net_message(Port *prt, Eterm token_size; Uint tuple_arity; int res; - Uint32 connection_id; #ifdef ERTS_DIST_MSG_DBG ErlDrvSizeT orig_len = len; #endif @@ -1271,7 +1248,6 @@ int erts_net_message(Port *prt, return 0; } - ASSERT(hlen == 0); if (len == 0) { /* HANDLE TICK !!! */ @@ -1284,15 +1260,7 @@ int erts_net_message(Port *prt, bw(buf, len); #endif - if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) - t = buf; - else { - /* Skip PASS_THROUGH */ - t = buf+1; - len--; - } - - res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache, &connection_id); + res = erts_prepare_dist_ext(&ede, buf, len, dep, conn_id, dep->cache); switch (res) { case ERTS_PREP_DIST_EXT_CLOSED: @@ -1334,10 +1302,9 @@ int erts_net_message(Port *prt, PURIFY_MSG("data error"); goto decode_error; } - ctl_len = t - buf; #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "<<%s CTL: %T\n", len != orig_len ? "P" : " ", arg); + erts_fprintf(stderr, "<< CTL: %T\n", arg); #endif if (is_not_tuple(arg) || @@ -1791,7 +1758,7 @@ decode_error: } data_error: UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - erts_kill_dist_connection(dep, connection_id); + erts_kill_dist_connection(dep, conn_id); ERTS_CHK_NO_PROC_LOCKS; return -1; } @@ -1847,7 +1814,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) while (1) { switch (ctx->phase) { case ERTS_DSIG_SEND_PHASE_INIT: - ctx->flags = dsdp->dep->flags; + ctx->flags = dsdp->flags; ctx->c_p = dsdp->proc; if (!ctx->c_p || dsdp->no_suspend) @@ -2102,13 +2069,14 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_output)) { + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), - "%T", prt->dist_entry->sysname); + "%T", dep->sysname); DTRACE4(dist_output, erts_this_node_sysname, port_str, remote_str, size); } @@ -2164,13 +2132,14 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_outputv)) { + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), - "%T", prt->dist_entry->sysname); + "%T", dep->sysname); DTRACE4(dist_outputv, erts_this_node_sysname, port_str, remote_str, size); } @@ -2208,7 +2177,7 @@ erts_dist_command(Port *prt, int initial_reds) Uint32 flags; Sint qsize, obufsize = 0; ErtsDistOutputQueue oq, foq; - DistEntry *dep = prt->dist_entry; + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); erts_aint32_t sched_flags; ErtsSchedulerData *esdp = erts_get_scheduler_data(); @@ -2584,11 +2553,12 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1) erts_aint32_t qflgs; erts_aint_t qsize; Eterm receiver = NIL; + Uint32 conn_id; if (!dep) BIF_ERROR(BIF_P, EXC_NOTSUP); - if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) + if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep) BIF_ERROR(BIF_P, BADARG); /* @@ -2598,6 +2568,11 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1) erts_de_rlock(dep); + if (dep->connection_id != conn_id) { + erts_de_runlock(dep); + BIF_ERROR(BIF_P, BADARG); + } + ASSERT(dep->cid == BIF_P->common.id); qflgs = erts_atomic32_read_acqb(&dep->qflgs); @@ -2638,6 +2613,7 @@ dist_ctrl_put_data_2(BIF_ALIST_2) DistEntry *dep; ErlDrvSizeT size; Eterm input_handler; + Uint32 conn_id; if (is_binary(BIF_ARG_2)) size = binary_size(BIF_ARG_2); @@ -2649,7 +2625,7 @@ dist_ctrl_put_data_2(BIF_ALIST_2) else BIF_ERROR(BIF_P, BADARG); - dep = erts_dhandle_to_dist_entry(BIF_ARG_1); + dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id); if (!dep) BIF_ERROR(BIF_P, BADARG); @@ -2669,7 +2645,7 @@ dist_ctrl_put_data_2(BIF_ALIST_2) erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); - (void) erts_net_message(NULL, dep, NULL, 0, data, size); + (void) erts_net_message(NULL, dep, conn_id, NULL, 0, data, size); /* * We ignore any decode failures. On fatal failures the * connection will be taken down by killing the @@ -2693,13 +2669,18 @@ dist_get_stat_1(BIF_ALIST_1) Sint64 read, write, pend; Eterm res, *hp, **hpp; Uint sz, *szp; - DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1); + Uint32 conn_id; + DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id); if (!dep) BIF_ERROR(BIF_P, BADARG); erts_de_rlock(dep); + if (dep->connection_id != conn_id) { + erts_de_runlock(dep); + BIF_ERROR(BIF_P, BADARG); + } read = (Sint64) erts_atomic64_read_nob(&dep->in); write = (Sint64) erts_atomic64_read_nob(&dep->out); pend = (Sint64) erts_atomic_read_nob(&dep->qsize); @@ -2730,19 +2711,25 @@ BIF_RETTYPE dist_ctrl_input_handler_2(BIF_ALIST_2) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + Uint32 conn_id; if (!dep) BIF_ERROR(BIF_P, EXC_NOTSUP); - if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) + if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep) BIF_ERROR(BIF_P, BADARG); if (is_not_internal_pid(BIF_ARG_2)) BIF_ERROR(BIF_P, BADARG); + erts_de_rlock(dep); + if (dep->connection_id != conn_id) { + erts_de_runlock(dep); + BIF_ERROR(BIF_P, BADARG); + } erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) BIF_ARG_2); - + erts_de_runlock(dep); BIF_RET(am_ok); } @@ -2756,15 +2743,21 @@ dist_ctrl_get_data_1(BIF_ALIST_1) Eterm *hp; ProcBin *pb; erts_aint_t qsize; + Uint32 conn_id; if (!dep) BIF_ERROR(BIF_P, EXC_NOTSUP); - if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) + if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep) BIF_ERROR(BIF_P, BADARG); erts_de_rlock(dep); + if (dep->connection_id != conn_id) { + erts_de_runlock(dep); + BIF_ERROR(BIF_P, BADARG); + } + if (dep->state == ERTS_DE_STATE_EXITING) goto return_none; @@ -2851,13 +2844,14 @@ erts_dist_port_not_busy(Port *prt) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_port_not_busy)) { + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), - "%T", prt->dist_entry->sysname); + "%T", dep->sysname); DTRACE3(dist_port_not_busy, erts_this_node_sysname, port_str, remote_str); } @@ -2883,10 +2877,10 @@ static void kill_connection(DistEntry *dep) } void -erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) +erts_kill_dist_connection(DistEntry *dep, Uint32 conn_id) { erts_de_rwlock(dep); - if (connection_id == dep->connection_id + if (conn_id == dep->connection_id && dep->state == ERTS_DE_STATE_CONNECTED) { kill_connection(dep); @@ -3222,23 +3216,6 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) else if (!dep) goto system_limit; /* Should never happen!!! */ - erts_de_rlock(dep); - de_locked = -1; - - if (dep->state == ERTS_DE_STATE_EXITING) { - /* Suspend on dist entry waiting for the exit to finish */ - ErtsProcList *plp = erts_proclist_create(BIF_P); - plp->next = NULL; - erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); - erts_mtx_lock(&dep->qlock); - erts_proclist_store_last(&dep->suspended, plp); - erts_mtx_unlock(&dep->qlock); - goto yield; - } - - erts_de_runlock(dep); - de_locked = 0; - if (is_internal_pid(BIF_ARG_2)) { if (BIF_P->common.id == BIF_ARG_2) { ErtsSetupConnDistCtrl scdc; @@ -3284,7 +3261,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) hp = HAlloc(BIF_P, 3); } else { - int new; + Uint32 conn_id; pp = erts_id2port_sflgs(BIF_ARG_2, BIF_P, @@ -3293,7 +3270,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) erts_de_rwlock(dep); de_locked = 1; - if (dep->state == ERTS_DE_STATE_EXITING) + if (dep->state != ERTS_DE_STATE_PENDING) goto badarg; if (!pp || (erts_atomic32_read_nob(&pp->state) @@ -3303,49 +3280,39 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) goto badarg; - if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) - new = 0; - else { - if (dep->state != ERTS_DE_STATE_PENDING) { - if (dep->state == ERTS_DE_STATE_IDLE) - erts_set_dist_entry_pending(dep); - else - goto badarg; - } - - if (pp->dist_entry || is_not_nil(dep->cid)) - goto badarg; - - erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + if (erts_prtsd_get(pp, ERTS_PRTSD_DIST_ENTRY) != NULL + || is_not_nil(dep->cid)) + goto badarg; - pp->dist_entry = dep; + erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); - ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + erts_prtsd_set(pp, ERTS_PRTSD_DIST_ENTRY, dep); + erts_prtsd_set(pp, ERTS_PRTSD_CONN_ID, (void*)(UWord)dep->connection_id); - dep->send = (pp->drv_ptr->outputv - ? dist_port_commandv - : dist_port_command); - ASSERT(dep->send); + ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); - /* - * Dist-ports do not use the "busy port message queue" functionality, but - * instead use "busy dist entry" functionality. - */ - { - ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; - erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); - } + dep->send = (pp->drv_ptr->outputv + ? dist_port_commandv + : dist_port_command); + ASSERT(dep->send); - setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version); - de_locked = 0; - new = !0; + /* + * Dist-ports do not use the "busy port message queue" functionality, but + * instead use "busy dist entry" functionality. + */ + { + ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; + erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); } - hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); - res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep); + conn_id = dep->connection_id; + setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version); + de_locked = 0; + + hp = HAlloc(BIF_P, 3 + ERTS_DHANDLE_SIZE); + res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id); res_tag = am_ok; /* Connection up */ - if (new) - dep = NULL; /* inc of refc transferred to port (dist_entry field) */ + dep = NULL; /* inc of refc transferred to port (dist_entry field) */ } ASSERT(is_value(res) && is_value(res_tag)); @@ -3371,12 +3338,6 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) return ret; - yield: - ERTS_BIF_PREP_YIELD4(ret, - bif_export[BIF_erts_internal_create_dist_channel_4], - BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4); - goto done; - badarg: ERTS_BIF_PREP_RET(ret, am_badarg); goto done; @@ -3398,8 +3359,7 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, dep->creation = 0; ASSERT(is_internal_port(ctrlr) || is_internal_pid(ctrlr)); - ASSERT(erts_atomic_read_nob(&dep->qsize) == 0 - || (dep->state == ERTS_DE_STATE_PENDING)); + ASSERT(dep->state == ERTS_DE_STATE_PENDING); if (flags & DFLAG_DIST_HDR_ATOM_CACHE) create_cache(dep); @@ -3445,37 +3405,20 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment * DistEntry *dep = scdcp->dep; int dep_locked = 0; Eterm *hp; - erts_aint32_t state; + Uint32 conn_id; if (redsp) *redsp = 1; - state = erts_atomic32_read_nob(&c_p->state); - - if (state & ERTS_PSFLG_EXITING) - goto badarg; + ASSERT(!ERTS_PROC_IS_EXITING(c_p)); erts_de_rwlock(dep); dep_locked = !0; - if (dep->state == ERTS_DE_STATE_EXITING) + if (dep->state != ERTS_DE_STATE_PENDING) goto badarg; - if (ERTS_PROC_GET_DIST_ENTRY(c_p)) { - if (dep == ERTS_PROC_GET_DIST_ENTRY(c_p) - && (c_p->flags & F_DISTRIBUTION) - && dep->cid == c_p->common.id) { - goto connected; - } - goto badarg; - } - - if (dep->state != ERTS_DE_STATE_PENDING) { - if (dep->state == ERTS_DE_STATE_IDLE) - erts_set_dist_entry_pending(dep); - else - goto badarg; - } + conn_id = dep->connection_id; if (is_not_nil(dep->cid)) goto badarg; @@ -3490,18 +3433,17 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment * setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id, scdcp->flags, scdcp->version); -connected: /* we take over previous inc in refc of dep */ if (!bpp) /* called directly... */ - return erts_make_dhandle(c_p, dep); + return erts_make_dhandle(c_p, dep, conn_id); erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg); - *bpp = new_message_buffer(ERTS_MAGIC_REF_THING_SIZE); + *bpp = new_message_buffer(ERTS_DHANDLE_SIZE); hp = (*bpp)->mem; - return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep); + return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep, conn_id); badarg: @@ -3542,30 +3484,27 @@ BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1) erts_de_rwlock(dep); switch (dep->state) { - case ERTS_DE_STATE_PENDING: case ERTS_DE_STATE_CONNECTED: + case ERTS_DE_STATE_EXITING: + case ERTS_DE_STATE_PENDING: conn_id = dep->connection_id; break; case ERTS_DE_STATE_IDLE: erts_set_dist_entry_pending(dep); conn_id = dep->connection_id; break; - case ERTS_DE_STATE_EXITING: - conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; - break; default: erts_exit(ERTS_ABORT_EXIT, "Invalid dep->state (%d)\n", dep->state); } erts_de_rwunlock(dep); - hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); - dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep); + hp = HAlloc(BIF_P, ERTS_DHANDLE_SIZE); + dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id); erts_deref_dist_entry(dep); - BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); + BIF_RET(dhandle); } Sint erts_abort_connection_rwunlock(DistEntry* dep) { - Sint reds = 0; ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); if (dep->state == ERTS_DE_STATE_CONNECTED) { @@ -3575,6 +3514,7 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep) ErtsAtomCache *cache; ErtsDistOutputBuf *obuf; ErtsProcList *resume_procs; + Sint reds = 0; ErtsMonLnkDist *mld; ASSERT(is_nil(dep->cid)); @@ -3596,7 +3536,6 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep) dep->send = NULL; erts_set_dist_entry_not_connected(dep); - erts_de_rwunlock(dep); schedule_con_monitor_link_cleanup(mld, THE_NON_VALUE, @@ -3609,19 +3548,10 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep) delete_cache(cache); free_de_out_queues(dep, obuf); - - /* - * We wait to make DistEntry idle and accept new connection attempts - * until all is cleared and deallocated. This to get some back pressure - * against repeated failing connection attempts saturating all CPUs - * with cleanup jobs. - */ - erts_de_rwlock(dep); - ASSERT(dep->state == ERTS_DE_STATE_EXITING); - dep->state = ERTS_DE_STATE_IDLE; + return reds; } erts_de_rwunlock(dep); - return reds; + return 0; } static Sint abort_connection(DistEntry *dep, Uint32 conn_id) @@ -3636,22 +3566,19 @@ static Sint abort_connection(DistEntry *dep, Uint32 conn_id) BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2) { DistEntry* dep; - Eterm* tp; + Uint32 conn_id; + Sint reds; - if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { - BIF_ERROR(BIF_P, BADARG); - } - tp = tuple_val(BIF_ARG_2); - dep = erts_dhandle_to_dist_entry(tp[2]); - if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1) + if (is_not_atom(BIF_ARG_1)) + BIF_ERROR(BIF_P, BADARG); + dep = erts_dhandle_to_dist_entry(BIF_ARG_2, &conn_id); + if (!dep || dep != erts_find_dist_entry(BIF_ARG_1) || dep == erts_this_dist_entry) { BIF_ERROR(BIF_P, BADARG); } - if (dep) { - Sint reds = abort_connection(dep, unsigned_val(tp[1])); - BUMP_REDS(BIF_P, reds); - } + reds = abort_connection(dep, conn_id); + BUMP_REDS(BIF_P, reds); BIF_RET(am_true); } @@ -3682,14 +3609,14 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) } /* - * Send {auto_connect, Node, ConnId, DHandle} to net_kernel + * Send {auto_connect, Node, DHandle} to net_kernel */ mp = erts_alloc_message_heap(net_kernel, &nk_locks, - 5 + ERTS_MAGIC_REF_THING_SIZE, + 4 + ERTS_DHANDLE_SIZE, &hp, &ohp); - dhandle = erts_build_dhandle(&hp, ohp, dep); - msg = TUPLE4(hp, am_auto_connect, dep->sysname, make_small(conn_id), - dhandle); + dhandle = erts_build_dhandle(&hp, ohp, dep, conn_id); + msg = TUPLE3(hp, am_auto_connect, dep->sysname, dhandle); + ERL_MESSAGE_TOKEN(mp) = am_undefined; erts_queue_proc_message(proc, net_kernel, nk_locks, mp, msg); erts_proc_unlock(net_kernel, nk_locks); } diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 75cb865390..d4d7874a70 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -136,6 +136,7 @@ typedef struct { Eterm cid; Eterm connection_id; int no_suspend; + Uint32 flags; } ErtsDSigData; #define ERTS_DE_BUSY_LIMIT (1024*1024) @@ -235,6 +236,7 @@ retry: dsdp->cid = dep->cid; dsdp->connection_id = dep->connection_id; dsdp->no_suspend = no_suspend; + dsdp->flags = dep->flags; if (dspl == ERTS_DSP_NO_LOCK) erts_de_runlock(dep); return res; @@ -254,9 +256,9 @@ void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry) ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); ASSERT((erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD) == 0); - ASSERT(prt->dist_entry); - dep = prt->dist_entry; + dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); + ASSERT(dep); id = prt->common.id; } else { diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c index 575d6ca867..8fe1ccb758 100644 --- a/erts/emulator/beam/erl_alloc.c +++ b/erts/emulator/beam/erl_alloc.c @@ -114,7 +114,7 @@ typedef union { char align_afa[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(AFAllctr_t))]; AOFFAllctr_t aoffa; char align_aoffa[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(AOFFAllctr_t))]; -} ErtsAllocatorState_t; +} ErtsAllocatorState_t erts_align_attribute(ERTS_CACHE_LINE_SIZE); static ErtsAllocatorState_t std_alloc_state; static ErtsAllocatorState_t ll_alloc_state; diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 5789fa8e71..7fada0d548 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -2093,17 +2093,6 @@ current_stacktrace(ErtsHeapFactory *hfact, Process* rp, return res; } -#if defined(VALGRIND) -static int check_if_xml(void) -{ - char buf[1]; - size_t bufsz = sizeof(buf); - return erts_sys_explicit_8bit_getenv("VALGRIND_LOG_XML", buf, &bufsz) >= 0; -} -#else -#define check_if_xml() 0 -#endif - /* * This function takes care of calls to erlang:system_info/1 when the argument * is a tuple. @@ -2200,15 +2189,9 @@ info_1_tuple(Process* BIF_P, /* Pointer to current process. */ #endif } else if (is_list(*tp)) { #if defined(PURIFY) -#define ERTS_ERROR_CHECKER_PRINTF purify_printf -#define ERTS_ERROR_CHECKER_PRINTF_XML purify_printf +# define ERTS_ERROR_CHECKER_PRINTF purify_printf #elif defined(VALGRIND) -#define ERTS_ERROR_CHECKER_PRINTF VALGRIND_PRINTF -# ifndef HAVE_VALGRIND_PRINTF_XML -# define ERTS_ERROR_CHECKER_PRINTF_XML VALGRIND_PRINTF -# else -# define ERTS_ERROR_CHECKER_PRINTF_XML VALGRIND_PRINTF_XML -# endif +# define ERTS_ERROR_CHECKER_PRINTF VALGRIND_PRINTF #endif ErlDrvSizeT buf_size = 8*1024; /* Try with 8KB first */ char *buf = erts_alloc(ERTS_ALC_T_TMP, buf_size); @@ -2224,12 +2207,7 @@ info_1_tuple(Process* BIF_P, /* Pointer to current process. */ ASSERT(r == buf_size - 1); } buf[buf_size - 1 - r] = '\0'; - if (check_if_xml()) { - ERTS_ERROR_CHECKER_PRINTF_XML("<erlang_info_log>" - "%s</erlang_info_log>\n", buf); - } else { - ERTS_ERROR_CHECKER_PRINTF("%s\n", buf); - } + ERTS_ERROR_CHECKER_PRINTF("%s\n", buf); erts_free(ERTS_ALC_T_TMP, (void *) buf); BIF_RET(am_true); #undef ERTS_ERROR_CHECKER_PRINTF diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c index 36d83d93f4..c009a3bde8 100644 --- a/erts/emulator/beam/erl_db.c +++ b/erts/emulator/beam/erl_db.c @@ -3647,6 +3647,7 @@ send_ets_transfer_message(Process *c_p, Process *proc, hd_copy = copy_struct(heir_data, hd_sz, &hp, ohp); sender = c_p->common.id; msg = TUPLE4(hp, am_ETS_TRANSFER, tid, sender, hd_copy); + ERL_MESSAGE_TOKEN(mp) = am_undefined; erts_queue_proc_message(c_p, proc, *locks, mp, msg); } diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c index b988a19cf4..752d3ae3a8 100644 --- a/erts/emulator/beam/erl_db_hash.c +++ b/erts/emulator/beam/erl_db_hash.c @@ -150,6 +150,22 @@ static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval) } +static ERTS_INLINE FixedDeletion* alloc_fixdel(DbTableHash* tb) +{ + FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL, + (DbTable *) tb, + sizeof(FixedDeletion)); + ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion)); + return fixd; +} + +static ERTS_INLINE void free_fixdel(DbTableHash* tb, FixedDeletion* fixd) +{ + erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable*)tb, + fixd, sizeof(FixedDeletion)); + ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion)); +} + static ERTS_INLINE int link_fixdel(DbTableHash* tb, FixedDeletion* fixd, erts_aint_t fixated_by_me) @@ -160,8 +176,7 @@ static ERTS_INLINE int link_fixdel(DbTableHash* tb, was_next = erts_atomic_read_acqb(&tb->fixdel); do { /* Lockless atomic insertion in linked list: */ if (NFIXED(tb) <= fixated_by_me) { - erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable*)tb, - fixd, sizeof(FixedDeletion)); + free_fixdel(tb, fixd); return 0; /* raced by unfixer */ } exp_next = was_next; @@ -180,10 +195,7 @@ static ERTS_INLINE int link_fixdel(DbTableHash* tb, static int add_fixed_deletion(DbTableHash* tb, int ix, erts_aint_t fixated_by_me) { - FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL, - (DbTable *) tb, - sizeof(FixedDeletion)); - ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion)); + FixedDeletion* fixd = alloc_fixdel(tb); fixd->slot = ix; fixd->all = 0; return link_fixdel(tb, fixd, fixated_by_me); @@ -637,11 +649,7 @@ restart: free_me = fixdel; fixdel = fixdel->next; - erts_db_free(ERTS_ALC_T_DB_FIX_DEL, - (DbTable *) tb, - (void *) free_me, - sizeof(FixedDeletion)); - ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion)); + free_fixdel(tb, free_me); work++; } @@ -2338,11 +2346,10 @@ static SWord db_mark_all_deleted_hash(DbTable *tbl, SWord reds) } else { /* First call */ - fixdel = erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL, - (DbTable *) tb, - sizeof(FixedDeletion)); - ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion)); - link_fixdel(tb, fixdel, 0); + int ok; + fixdel = alloc_fixdel(tb); + ok = link_fixdel(tb, fixdel, 0); + ASSERT(ok); (void)ok; i = 0; } @@ -2444,11 +2451,7 @@ static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds) FixedDeletion *fx = fixdel; fixdel = fx->next; - erts_db_free(ERTS_ALC_T_DB_FIX_DEL, - (DbTable *) tb, - (void *) fx, - sizeof(FixedDeletion)); - ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion)); + free_fixdel(tb, fx); if (--reds < 0) { erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel); return reds; /* Not done */ diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 507cc989d2..a3274d7443 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -409,6 +409,11 @@ ErtsMessage* prepend_pending_sig_maybe(Process* sender, Process* receiver, * * @brief Send one message from *NOT* a local process. * + * seq_trace does not work with this type of messages + * to it is set to am_undefined which means that the + * receiving process will not remove the seq_trace token + * when it gets this message. + * */ void erts_queue_message(Process* receiver, ErtsProcLocks receiver_locks, @@ -417,11 +422,19 @@ erts_queue_message(Process* receiver, ErtsProcLocks receiver_locks, ASSERT(is_not_internal_pid(from)); ERL_MESSAGE_TERM(mp) = msg; ERL_MESSAGE_FROM(mp) = from; + ERL_MESSAGE_TOKEN(mp) = am_undefined; queue_messages(receiver, receiver_locks, mp, &mp->next, 1); } /** * @brief Send one message from a local process. + * + * It is up to the caller of this function to set the + * correct seq_trace. The general rule of thumb is that + * it should be set to am_undefined if the message + * cannot be traced using seq_trace, if it can be + * traced it should be set to the trace token. It should + * very rarely be explicitly set to NIL! */ void erts_queue_proc_message(Process* sender, @@ -584,8 +597,7 @@ void erts_send_message(Process* sender, Process* receiver, ErtsProcLocks *receiver_locks, - Eterm message, - unsigned flags) + Eterm message) { Uint msize; ErtsMessage* mp; @@ -619,7 +631,7 @@ erts_send_message(Process* sender, receiver_state = erts_atomic32_read_nob(&receiver->state); - if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) { + if (SEQ_TRACE_TOKEN(sender) != NIL) { Eterm* hp; Eterm stoken = SEQ_TRACE_TOKEN(sender); Uint seq_trace_size = 0; diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index d120111634..b2550814fd 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -405,8 +405,6 @@ typedef struct erl_trace_message_queue__ { #define SAVE_MESSAGE(p) \ (p)->sig_qs.save = &(*(p)->sig_qs.save)->next -#define ERTS_SND_FLG_NO_SEQ_TRACE (((unsigned) 1) << 0) - #define ERTS_HEAP_FRAG_SIZE(DATA_WORDS) \ (sizeof(ErlHeapFragment) - sizeof(Eterm) + (DATA_WORDS)*sizeof(Eterm)) @@ -429,7 +427,7 @@ typedef struct erl_trace_message_queue__ { do { \ (MP)->next = NULL; \ ERL_MESSAGE_TERM(MP) = THE_NON_VALUE; \ - ERL_MESSAGE_TOKEN(MP) = NIL; \ + ERL_MESSAGE_TOKEN(MP) = THE_NON_VALUE; \ ERL_MESSAGE_FROM(MP) = NIL; \ ERL_MESSAGE_DT_UTAG_INIT(MP); \ MP->data.attached = NULL; \ @@ -446,7 +444,7 @@ void erts_queue_proc_message(Process* from,Process* to, ErtsProcLocks,ErtsMessag void erts_queue_proc_messages(Process* from, Process* to, ErtsProcLocks, ErtsMessage*, ErtsMessage**, Uint); void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm); -void erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned); +void erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm); void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp); Uint erts_msg_attached_data_size_aux(ErtsMessage *msg); diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c index ef010ff476..ee6e6085b6 100644 --- a/erts/emulator/beam/erl_nif.c +++ b/erts/emulator/beam/erl_nif.c @@ -754,8 +754,58 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid, rp_locks = ERTS_PROC_LOCK_MAIN; if (menv) { + Eterm token = c_p ? SEQ_TRACE_TOKEN(c_p) : am_undefined; + if (token != NIL && token != am_undefined) { + /* This code is copied from erts_send_message */ + Eterm stoken = SEQ_TRACE_TOKEN(c_p); +#ifdef USE_VM_PROBES + DTRACE_CHARBUF(sender_name, 64); + DTRACE_CHARBUF(receiver_name, 64); + Sint tok_label = 0; + Sint tok_lastcnt = 0; + Sint tok_serial = 0; + Eterm utag = NIL; + *sender_name = *receiver_name = '\0'; + if (DTRACE_ENABLED(message_send)) { + erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), + "%T", c_p->common.id); + erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)), + "%T", rp->common.id); + } +#endif + if (have_seqtrace(stoken)) { + seq_trace_update_send(c_p); + seq_trace_output(stoken, msg, SEQ_TRACE_SEND, + rp->common.id, c_p); + } +#ifdef USE_VM_PROBES + if (!(DT_UTAG_FLAGS(c_p) & DT_UTAG_SPREADING)) { + stoken = NIL; + } +#endif + token = enif_make_copy(msg_env, stoken); + +#ifdef USE_VM_PROBES + if (DT_UTAG_FLAGS(c_p) & DT_UTAG_SPREADING) { + if (is_immed(DT_UTAG(c_p))) + utag = DT_UTAG(c_p); + else + utag = enif_make_copy(msg_env, DT_UTAG(c_p)); + } + if (DTRACE_ENABLED(message_send)) { + if (have_seqtrace(stoken)) { + tok_label = SEQ_TRACE_T_DTRACE_LABEL(stoken); + tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(stoken)); + tok_serial = signed_val(SEQ_TRACE_T_SERIAL(stoken)); + } + DTRACE6(message_send, sender_name, receiver_name, + size_object(msg), tok_label, tok_lastcnt, tok_serial); + } +#endif + } flush_env(msg_env); mp = erts_alloc_message(0, NULL); + ERL_MESSAGE_TOKEN(mp) = token; mp->data.heap_frag = menv->env.heap_frag; ASSERT(mp->data.heap_frag == MBUF(&menv->phony_proc)); if (mp->data.heap_frag != NULL) { @@ -793,6 +843,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid, ohp = &bp->off_heap; } } + ERL_MESSAGE_TOKEN(mp) = am_undefined; msg = copy_struct_litopt(msg, sz, &hp, ohp, &litarea); } @@ -836,6 +887,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid, ERL_MESSAGE_TERM(mp) = msg; ERL_MESSAGE_FROM(mp) = from; + ERL_MESSAGE_TOKEN(mp) = am_undefined; if (!msgq) { msgq = erts_alloc(ERTS_ALC_T_TRACE_MSG_QUEUE, diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index f4a36d124a..f4dc60941a 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -370,31 +370,43 @@ DistEntry *erts_find_dist_entry(Eterm sysname) } DistEntry * -erts_dhandle_to_dist_entry(Eterm dhandle) +erts_dhandle_to_dist_entry(Eterm dhandle, Uint32 *conn_id) { + Eterm *tpl; Binary *bin; - if (!is_internal_magic_ref(dhandle)) + + if (!is_boxed(dhandle)) + return NULL; + tpl = boxed_val(dhandle); + if (tpl[0] != make_arityval(2) || !is_small(tpl[1]) + || !is_internal_magic_ref(tpl[2])) return NULL; - bin = erts_magic_ref2bin(dhandle); + *conn_id = unsigned_val(tpl[1]); + bin = erts_magic_ref2bin(tpl[2]); if (ERTS_MAGIC_BIN_DESTRUCTOR(bin) != erts_dist_entry_destructor) return NULL; return ErtsBin2DistEntry(bin); } Eterm -erts_build_dhandle(Eterm **hpp, ErlOffHeap* ohp, DistEntry *dep) +erts_build_dhandle(Eterm **hpp, ErlOffHeap* ohp, + DistEntry *dep, Uint32 conn_id) { Binary *bin = ErtsDistEntry2Bin(dep); + Eterm mref, dhandle; ASSERT(bin); ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == erts_dist_entry_destructor); - return erts_mk_magic_ref(hpp, ohp, bin); + mref = erts_mk_magic_ref(hpp, ohp, bin); + dhandle = TUPLE2(*hpp, make_small(conn_id), mref); + *hpp += 3; + return dhandle; } Eterm -erts_make_dhandle(Process *c_p, DistEntry *dep) +erts_make_dhandle(Process *c_p, DistEntry *dep, Uint32 conn_id) { - Eterm *hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE); - return erts_build_dhandle(&hp, &c_p->off_heap, dep); + Eterm *hp = HAlloc(c_p, ERTS_DHANDLE_SIZE); + return erts_build_dhandle(&hp, &c_p->off_heap, dep, conn_id); } static void start_timer_delete_dist_entry(void *vdep); @@ -627,7 +639,7 @@ erts_set_dist_entry_not_connected(DistEntry *dep) if(dep->next) dep->next->prev = dep->prev; - dep->state = ERTS_DE_STATE_EXITING; + dep->state = ERTS_DE_STATE_IDLE; dep->flags = 0; dep->prev = NULL; dep->cid = NIL; @@ -1908,8 +1920,9 @@ setup_reference_table(void) if (ohp) insert_offheap(ohp, HEAP_REF, prt->common.id); /* Insert controller */ - if (prt->dist_entry) - insert_dist_entry(prt->dist_entry, + dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); + if (dep) + insert_dist_entry(dep, CTRL_REF, prt->common.id, 0); diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 9a792b10b1..c44f1f8991 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -214,9 +214,10 @@ int erts_lc_is_de_rwlocked(DistEntry *); int erts_lc_is_de_rlocked(DistEntry *); #endif int erts_dist_entry_destructor(Binary *bin); -DistEntry *erts_dhandle_to_dist_entry(Eterm dhandle); -Eterm erts_build_dhandle(Eterm **hpp, ErlOffHeap*, DistEntry*); -Eterm erts_make_dhandle(Process *c_p, DistEntry *dep); +DistEntry *erts_dhandle_to_dist_entry(Eterm dhandle, Uint32* connection_id); +#define ERTS_DHANDLE_SIZE (3+ERTS_MAGIC_REF_THING_SIZE) +Eterm erts_build_dhandle(Eterm **hpp, ErlOffHeap*, DistEntry*, Uint32 conn_id); +Eterm erts_make_dhandle(Process *c_p, DistEntry*, Uint32 conn_id); ERTS_GLB_INLINE void erts_deref_node_entry(ErlNode *np); ERTS_GLB_INLINE void erts_de_rlock(DistEntry *dep); diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index 9b52b648e5..2be0a5bf74 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -112,8 +112,10 @@ typedef struct line_buf { /* Buffer used in line oriented I/O */ */ #define ERTS_PRTSD_SCHED_ID 0 +#define ERTS_PRTSD_DIST_ENTRY 1 +#define ERTS_PRTSD_CONN_ID 2 -#define ERTS_PRTSD_SIZE 1 +#define ERTS_PRTSD_SIZE 3 typedef struct { void *data[ERTS_PRTSD_SIZE]; @@ -154,7 +156,6 @@ struct _erl_drv_port { Uint bytes_out; /* Number of bytes written */ ErlPortIOQueue ioq; /* driver accessible i/o queue */ - DistEntry *dist_entry; /* Dist entry used in DISTRIBUTION */ char *name; /* String used in the open */ erts_driver_t* drv_ptr; UWord drv_data; @@ -257,6 +258,8 @@ ERTS_GLB_INLINE void * erts_prtsd_get(Port *prt, int ix) { ErtsPrtSD *psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd); + + ASSERT((unsigned)ix < ERTS_PRTSD_SIZE); if (!psd) return NULL; ERTS_THR_DATA_DEPENDENCY_READ_MEMORY_BARRIER; @@ -272,6 +275,7 @@ erts_prtsd_set(Port *prt, int ix, void *data) psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd); + ASSERT((unsigned)ix < ERTS_PRTSD_SIZE); if (psd) { #ifdef ETHR_ORDERED_READ_DEPEND ETHR_MEMBAR(ETHR_LoadStore|ETHR_StoreStore); @@ -459,7 +463,7 @@ erts_port_unlock(Port *prt) ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP) #define ERTS_PORT_SCHED_ID(P, ID) \ - ((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID))) + ((Uint) (UWord) erts_prtsd_set((P), ERTS_PRTSD_SCHED_ID, (void *) (UWord) (ID))) extern const Port erts_invalid_port; #define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port) diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 6af1236145..f343e984f7 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -52,8 +52,8 @@ #define ERTS_SIG_Q_OP_MAX 13 -#define ERTS_SIG_Q_OP_EXIT 0 -#define ERTS_SIG_Q_OP_EXIT_LINKED 1 +#define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */ +#define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break*/ #define ERTS_SIG_Q_OP_MONITOR_DOWN 2 #define ERTS_SIG_Q_OP_MONITOR 3 #define ERTS_SIG_Q_OP_DEMONITOR 4 @@ -1167,10 +1167,7 @@ erts_proc_sig_send_persistent_monitor_msg(Uint16 type, Eterm key, ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_PERSISTENT_MON_MSG, type, 0); ERL_MESSAGE_FROM(mp) = from; - ERL_MESSAGE_TOKEN(mp) = NIL; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; -#endif + ERL_MESSAGE_TOKEN(mp) = am_undefined; if (!proc_queue_signal(NULL, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_PERSISTENT_MON_MSG)) { @@ -1564,11 +1561,6 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref) ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_IS_ALIVE, ERTS_SIG_Q_TYPE_UNDEFINED, 0); - ERL_MESSAGE_TOKEN(mp) = NIL; - ERL_MESSAGE_FROM(mp) = am_system; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; -#endif if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_IS_ALIVE)) (void) maybe_elevate_sig_handling_prio(c_p, to); @@ -1672,11 +1664,6 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, Eterm tag, Eterm reply) ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_SYNC_SUSPEND, ERTS_SIG_Q_TYPE_UNDEFINED, 0); - ERL_MESSAGE_TOKEN(mp) = NIL; - ERL_MESSAGE_FROM(mp) = am_system; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; -#endif if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_SYNC_SUSPEND)) (void) maybe_elevate_sig_handling_prio(c_p, to); @@ -1790,7 +1777,7 @@ handle_rpc(Process *c_p, ErtsProcSigRPC *rpc, int cnt, int limit, int *yieldp) msg = TUPLE2(hp, ref, res); mp->hfrag.next = bp; - + ERL_MESSAGE_TOKEN(mp) = am_undefined; erts_queue_proc_message(c_p, rp, 0, mp, msg); } @@ -2155,10 +2142,7 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing, pid = STORE_NC(&hp, ohp, from); ERL_MESSAGE_TERM(mp) = TUPLE3(hp, am_EXIT, pid, reason); - ERL_MESSAGE_TOKEN(mp) = NIL; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; -#endif + ERL_MESSAGE_TOKEN(mp) = am_undefined; if (is_immed(pid)) ERL_MESSAGE_FROM(mp) = pid; else { @@ -2346,10 +2330,7 @@ convert_to_down_message(Process *c_p, type, from, reason); hp += 6; - ERL_MESSAGE_TOKEN(mp) = NIL; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; -#endif + ERL_MESSAGE_TOKEN(mp) = am_undefined; /* Replace original signal with the exit message... */ convert_to_msg(c_p, sig, mp, next_nm_sig); @@ -2397,10 +2378,7 @@ convert_to_nodedown_messages(Process *c_p, ERL_MESSAGE_TERM(mp) = TUPLE2(hp, am_nodedown, node); ERL_MESSAGE_FROM(mp) = am_system; - ERL_MESSAGE_TOKEN(mp) = NIL; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; -#endif + ERL_MESSAGE_TOKEN(mp) = am_undefined; mp->next = nd_first; nd_first = mp; if (!nd_last) @@ -2830,6 +2808,7 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing, if (is_alive) erts_factory_trim_and_close(&hfact, &msg, 1); + ERL_MESSAGE_TOKEN(mp) = am_undefined; erts_queue_proc_message(c_p, rp, locks, mp, msg); if (!is_alive && locks) @@ -2931,6 +2910,7 @@ sync_suspend_reply(Process *c_p, ErtsMessage *mp, erts_aint32_t state) tp[2] = ssusp->async ? am_not_suspended : am_internal_error; } } + ERL_MESSAGE_TOKEN(mp) = am_undefined; erts_queue_proc_message(c_p, rp, 0, mp, ssusp->message); } } diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index d5bd17ff3e..0f7f1598fd 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -9897,6 +9897,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, ASSERT(hp_start + hsz == hp); #endif + ERL_MESSAGE_TOKEN(mp) = am_undefined; erts_queue_proc_message(c_p, rp, rp_locks, mp, msg); if (c_p == rp) diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index a60e117bab..8d20ccdf90 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -293,7 +293,7 @@ typedef enum { * highest index... * * Remember to update description in erts_pre_init_process() - * when adding new flags... + * and etp-commands when adding new flags... */ typedef enum { diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 904993ceb6..621ba108ba 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -671,30 +671,37 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, byte *ext, Uint size, DistEntry *dep, - ErtsAtomCache *cache, - Uint32 *connection_id) + Uint32 conn_id, + ErtsAtomCache *cache) { -#undef ERTS_EXT_FAIL -#undef ERTS_EXT_HDR_FAIL -#if 1 -#define ERTS_EXT_FAIL goto fail -#define ERTS_EXT_HDR_FAIL goto bad_hdr -#else -#define ERTS_EXT_FAIL abort() -#define ERTS_EXT_HDR_FAIL abort() -#endif + register byte *ep; + + edep->heap_size = -1; + edep->flags = 0; + edep->dep = dep; + + ASSERT(dep); + erts_de_rlock(dep); - register byte *ep = ext; ASSERT(dep->flags & DFLAG_UTF8_ATOMS); - edep->heap_size = -1; - edep->ext_endp = ext+size; + if ((dep->state != ERTS_DE_STATE_CONNECTED && + dep->state != ERTS_DE_STATE_PENDING) + || dep->connection_id != conn_id) { + erts_de_runlock(dep); + return ERTS_PREP_DIST_EXT_CLOSED; + } - if (size < 2) - ERTS_EXT_FAIL; + if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) { + /* Skip PASS_THROUGH */ + ext++; + size--; + } + edep->ext_endp = ext + size; + ep = ext; - if (!dep) - ERTS_INTERNAL_ERROR("Invalid use"); + if (size < 2) + goto fail; if (ep[0] != VERSION_MAGIC) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); @@ -703,28 +710,17 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, "channel %d\n", dist_entry_channel_no(dep)); erts_send_error_to_logger_nogl(dsbufp); - ERTS_EXT_FAIL; + goto fail; } - edep->flags = 0; - edep->dep = dep; - - erts_de_rlock(dep); - - if (dep->state != ERTS_DE_STATE_CONNECTED && - dep->state != ERTS_DE_STATE_PENDING) { - erts_de_runlock(dep); - return ERTS_PREP_DIST_EXT_CLOSED; - } if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; - *connection_id = dep->connection_id; edep->connection_id = dep->connection_id; if (ep[1] != DIST_HEADER) { if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) - ERTS_EXT_HDR_FAIL; + goto bad_hdr; edep->attab.size = 0; edep->extp = ext; } @@ -733,17 +729,17 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, int no_atoms; if (!(edep->flags & ERTS_DIST_EXT_DFLAG_HDR)) - ERTS_EXT_HDR_FAIL; + goto bad_hdr; #undef CHKSIZE #define CHKSIZE(SZ) \ - do { if ((SZ) > edep->ext_endp - ep) ERTS_EXT_HDR_FAIL; } while(0) + do { if ((SZ) > edep->ext_endp - ep) goto bad_hdr; } while(0) CHKSIZE(1+1+1); ep += 2; no_atoms = (int) get_int8(ep); if (no_atoms < 0 || ERTS_ATOM_CACHE_SIZE < no_atoms) - ERTS_EXT_HDR_FAIL; + goto bad_hdr; ep++; if (no_atoms) { int long_atoms = 0; @@ -821,18 +817,18 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, /* atom already cached */ cix += (int) get_int8(ep); if (cix >= ERTS_ATOM_CACHE_SIZE) - ERTS_EXT_HDR_FAIL; + goto bad_hdr; ep++; atom = cache->in_arr[cix]; if (!is_atom(atom)) - ERTS_EXT_HDR_FAIL; + goto bad_hdr; edep->attab.atom[tix] = atom; } else { /* new cached atom */ cix += (int) get_int8(ep); if (cix >= ERTS_ATOM_CACHE_SIZE) - ERTS_EXT_HDR_FAIL; + goto bad_hdr; ep++; if (long_atoms) { CHKSIZE(2); @@ -850,7 +846,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, ERTS_ATOM_ENC_UTF8, 0); if (is_non_value(atom)) - ERTS_EXT_HDR_FAIL; + goto bad_hdr; ep += len; cache->in_arr[cix] = atom; edep->attab.atom[tix] = atom; @@ -870,12 +866,12 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, edep->extp = ep; #ifdef ERTS_DEBUG_USE_DIST_SEP if (*ep != VERSION_MAGIC) - ERTS_EXT_HDR_FAIL; + goto bad_hdr; #endif } #ifdef ERTS_DEBUG_USE_DIST_SEP if (*ep != VERSION_MAGIC) - ERTS_EXT_FAIL; + goto fail; #endif erts_de_runlock(dep); @@ -883,8 +879,6 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, return ERTS_PREP_DIST_EXT_SUCCESS; #undef CHKSIZE -#undef ERTS_EXT_FAIL -#undef ERTS_EXT_HDR_FAIL bad_hdr: { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); @@ -901,7 +895,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, } fail: { erts_de_runlock(dep); - erts_kill_dist_connection(dep, *connection_id); + erts_kill_dist_connection(dep, conn_id); } return ERTS_PREP_DIST_EXT_FAILED; } diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index bbd9b4bad2..edac177cc6 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -182,7 +182,7 @@ void erts_destroy_dist_ext_copy(ErtsDistExternal *); #define ERTS_PREP_DIST_EXT_CLOSED (1) int erts_prepare_dist_ext(ErtsDistExternal *, byte *, Uint, - DistEntry *, ErtsAtomCache *, Uint32 *); + DistEntry *, Uint32 conn_id, ErtsAtomCache *); Sint erts_decode_dist_ext_size(ErtsDistExternal *); Eterm erts_decode_dist_ext(ErtsHeapFactory* factory, ErtsDistExternal *); diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index 2cf268162d..21ae205237 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -1083,7 +1083,7 @@ extern int erts_do_net_exits(DistEntry*, Eterm); extern int distribution_info(fmtfn_t, void *); extern int is_node_name_atom(Eterm a); -extern int erts_net_message(Port *, DistEntry *, +extern int erts_net_message(Port *, DistEntry *, Uint32 conn_id, byte *, ErlDrvSizeT, byte *, ErlDrvSizeT); extern void init_dist(void); diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 2446b3c074..5325480901 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -375,7 +375,6 @@ static Port *create_port(char *name, prt->control_flags = 0; prt->bytes_in = 0; prt->bytes_out = 0; - prt->dist_entry = NULL; ERTS_PORT_INIT_CONNECTED(prt, pid); prt->common.u.alive.reg = NULL; ERTS_PTMR_INIT(prt); @@ -3287,7 +3286,6 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, if (trace_send) trace_port_send(prt, to, tuple, 1); - ERL_MESSAGE_TOKEN(mp) = am_undefined; erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id); if (rp_locks) erts_proc_unlock(rp, rp_locks); @@ -3459,7 +3457,6 @@ deliver_vec_message(Port* prt, /* Port */ if (IS_TRACED_FL(prt, F_TRACE_SEND)) trace_port_send(prt, to, tuple, 1); - ERL_MESSAGE_TOKEN(mp) = am_undefined; erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id); erts_proc_unlock(rp, rp_locks); if (!scheduler) @@ -3615,12 +3612,12 @@ terminate_port(Port *prt) erts_cleanup_port_data(prt); + ASSERT(erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY) == NULL); + psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd); if (psd) erts_free(ERTS_ALC_T_PRTSD, psd); - ASSERT(prt->dist_entry == NULL); - kill_port(prt); /* @@ -3761,10 +3758,12 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed, DRV_MONITOR_UNLOCK_PDL(prt); } - if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && prt->dist_entry) { - erts_do_net_exits(prt->dist_entry, modified_reason); - erts_deref_dist_entry(prt->dist_entry); - prt->dist_entry = NULL; + if (state & ERTS_PORT_SFLG_DISTRIBUTION) { + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); + ASSERT(dep); + erts_do_net_exits(dep, modified_reason); + erts_deref_dist_entry(dep); + erts_prtsd_set(prt, ERTS_PRTSD_DIST_ENTRY, NULL); erts_atomic32_read_band_relb(&prt->state, ~ERTS_PORT_SFLG_DISTRIBUTION); } @@ -5052,7 +5051,7 @@ set_busy_port(ErlDrvPort dprt, int on) DTRACE1(port_not_busy, port_str); } #endif - if (prt->dist_entry) { + if (erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY) != NULL) { /* * Processes suspended on distribution ports are * normally queued on the dist entry. @@ -5382,7 +5381,6 @@ void driver_report_exit(ErlDrvPort ix, int status) if (IS_TRACED_FL(prt, F_TRACE_SEND)) trace_port_send(prt, pid, tuple, 1); - ERL_MESSAGE_TOKEN(mp) = am_undefined; erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id); erts_proc_unlock(rp, rp_locks); @@ -5988,8 +5986,6 @@ driver_deliver_term(Port *prt, Eterm to, ErlDrvTermData* data, int len) from = prt->common.id; } - /* send message */ - ERL_MESSAGE_TOKEN(factory.message) = am_undefined; erts_queue_message(rp, rp_locks, factory.message, mess, from); } else if (res == -2) { @@ -6174,9 +6170,12 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, else erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len)); if (state & ERTS_PORT_SFLG_DISTRIBUTION) { - erts_atomic64_inc_nob(&prt->dist_entry->in); + DistEntry* dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); + Uint32 conn_id = (Uint32)(UWord) erts_prtsd_get(prt, ERTS_PRTSD_CONN_ID); + erts_atomic64_inc_nob(&dep->in); return erts_net_message(prt, - prt->dist_entry, + dep, + conn_id, (byte*) hbuf, hlen, (byte*) (bin->orig_bytes+offs), len); } @@ -6215,15 +6214,19 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, else erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len)); if (state & ERTS_PORT_SFLG_DISTRIBUTION) { - erts_atomic64_inc_nob(&prt->dist_entry->in); + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); + Uint32 conn_id = (Uint32)(UWord) erts_prtsd_get(prt, ERTS_PRTSD_CONN_ID); + erts_atomic64_inc_nob(&dep->in); if (len == 0) return erts_net_message(prt, - prt->dist_entry, + dep, + conn_id, NULL, 0, (byte*) hbuf, hlen); else return erts_net_message(prt, - prt->dist_entry, + dep, + conn_id, (byte*) hbuf, hlen, (byte*) buf, len); } diff --git a/erts/emulator/beam/ops.tab b/erts/emulator/beam/ops.tab index c51e4ef784..e76d896ffc 100644 --- a/erts/emulator/beam/ops.tab +++ b/erts/emulator/beam/ops.tab @@ -244,7 +244,7 @@ if_end # Optimize for that case. raise x==2 x==1 => i_raise raise Trace=y Value=y => move Trace x=2 | move Value x=1 | i_raise -raise Trace Value => move Trace x=3 | move Value x=1 | move x=3 x=2 | i_raise +raise Trace Value => move Trace x | move Value x=1 | move x x=2 | i_raise i_raise |