diff options
author | Sverker Eriksson <[email protected]> | 2018-09-18 14:38:58 +0200 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2018-09-18 14:38:58 +0200 |
commit | 990c25dc1265c0450ecb47ca73116ccb72a246c8 (patch) | |
tree | 4cec06b76454f0bb9bbe44e7f5cf3778d1abaf86 /erts/emulator/beam/dist.c | |
parent | 59b49c6a5bc8053db97a27029ecf5245784b45fc (diff) | |
parent | 06a5b038c1fb5f722b7f691488aaf18981f1344f (diff) | |
download | otp-990c25dc1265c0450ecb47ca73116ccb72a246c8.tar.gz otp-990c25dc1265c0450ecb47ca73116ccb72a246c8.tar.bz2 otp-990c25dc1265c0450ecb47ca73116ccb72a246c8.zip |
Merge branch 'sverker/erts/robustify-dist-entry-states/OTP-15297' into maint
* sverker/erts/robustify-dist-entry-states/OTP-15297:
erts: Refactor port dist_entry & conn_id into PRTSD
Remove ugly fail case macros
Consolidate distribution entry state transitions
erts: Fix bug in undocumented system_flag(scheduling_statistics)
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r-- | erts/emulator/beam/dist.c | 329 |
1 files changed, 134 insertions, 195 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index f041e92375..0633bff3c2 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -116,11 +116,12 @@ static Export *dist_ctrl_put_data_trap; /* forward declarations */ -static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); static Sint abort_connection(DistEntry* dep, Uint32 conn_id); +static ErtsDistOutputBuf* clear_de_out_queues(DistEntry*); +static void free_de_out_queues(DistEntry*, ErtsDistOutputBuf*); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; @@ -556,7 +557,10 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) } } else { /* Call from distribution controller (port/process) */ - ErtsMonLnkDist *mld; + ErtsMonLnkDist *mld; + ErtsAtomCache *cache; + ErtsProcList *suspendees; + ErtsDistOutputBuf *obuf; Uint32 flags; erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1); @@ -588,6 +592,22 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) nodename = dep->sysname; flags = dep->flags; + erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) NIL); + cache = dep->cache; + dep->cache = NULL; + + erts_mtx_lock(&dep->qlock); + + erts_atomic64_set_nob(&dep->in, 0); + erts_atomic64_set_nob(&dep->out, 0); + + obuf = clear_de_out_queues(dep); + suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_set_dist_entry_not_connected(dep); erts_de_rwunlock(dep); @@ -601,7 +621,13 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) ? am_connection_closed : reason)); - clear_dist_entry(dep); + erts_resume_processes(suspendees); + + delete_cache(cache); + + free_de_out_queues(dep, obuf); + if (dep->transcode_ctx) + transcode_free_ctx(dep); } dec_no_nodes(); @@ -732,41 +758,6 @@ static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf) } } -static void clear_dist_entry(DistEntry *dep) -{ - ErtsAtomCache *cache; - ErtsProcList *suspendees; - ErtsDistOutputBuf *obuf; - - erts_de_rwlock(dep); - erts_atomic_set_nob(&dep->input_handler, - (erts_aint_t) NIL); - cache = dep->cache; - dep->cache = NULL; - - erts_mtx_lock(&dep->qlock); - - erts_atomic64_set_nob(&dep->in, 0); - erts_atomic64_set_nob(&dep->out, 0); - - obuf = clear_de_out_queues(dep); - dep->state = ERTS_DE_STATE_IDLE; - suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); - - erts_mtx_unlock(&dep->qlock); - erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); - dep->send = NULL; - erts_de_rwunlock(dep); - - erts_resume_processes(suspendees); - - delete_cache(cache); - - free_de_out_queues(dep, obuf); - if (dep->transcode_ctx) - transcode_free_ctx(dep); -} - int erts_dsend_context_dtor(Binary* ctx_bin) { ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); @@ -861,7 +852,7 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, DeclareTmpHeapNoproc(ctl_heap,6); int res; - if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { /* * Receiver does not support DOP_MONITOR_P_EXIT (see dsig_send_monitor) */ @@ -889,7 +880,7 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, DeclareTmpHeapNoproc(ctl_heap,5); int res; - if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { /* * Receiver does not support DOP_MONITOR_P. * Just avoid sending it and by doing that reduce this monitor @@ -920,7 +911,7 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, DeclareTmpHeapNoproc(ctl_heap,5); int res; - if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { /* * Receiver does not support DOP_DEMONITOR_P (see dsig_send_monitor) */ @@ -940,7 +931,7 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, static int can_send_seqtrace_token(ErtsSendContext* ctx, Eterm token) { Eterm label; - if (ctx->dep->flags & DFLAG_BIG_SEQTRACE_LABELS) { + if (ctx->dsd.flags & DFLAG_BIG_SEQTRACE_LABELS) { /* The other end is capable of handling arbitrary seq_trace labels. */ return 1; } @@ -1001,7 +992,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) send_token = (token != NIL && can_send_seqtrace_token(ctx, token)); - if (ctx->dep->flags & DFLAG_SEND_SENDER) { + if (ctx->dsd.flags & DFLAG_SEND_SENDER) { dist_op = make_small(send_token ? DOP_SEND_SENDER_TT : DOP_SEND_SENDER); @@ -1218,13 +1209,13 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote) int erts_net_message(Port *prt, DistEntry *dep, + Uint32 conn_id, byte *hbuf, ErlDrvSizeT hlen, byte *buf, ErlDrvSizeT len) { ErtsDistExternal ede; - byte *t; Sint ctl_len; Eterm arg; Eterm from, to; @@ -1242,7 +1233,6 @@ int erts_net_message(Port *prt, Eterm token_size; Uint tuple_arity; int res; - Uint32 connection_id; #ifdef ERTS_DIST_MSG_DBG ErlDrvSizeT orig_len = len; #endif @@ -1258,7 +1248,6 @@ int erts_net_message(Port *prt, return 0; } - ASSERT(hlen == 0); if (len == 0) { /* HANDLE TICK !!! */ @@ -1271,15 +1260,7 @@ int erts_net_message(Port *prt, bw(buf, len); #endif - if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) - t = buf; - else { - /* Skip PASS_THROUGH */ - t = buf+1; - len--; - } - - res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache, &connection_id); + res = erts_prepare_dist_ext(&ede, buf, len, dep, conn_id, dep->cache); switch (res) { case ERTS_PREP_DIST_EXT_CLOSED: @@ -1321,10 +1302,9 @@ int erts_net_message(Port *prt, PURIFY_MSG("data error"); goto decode_error; } - ctl_len = t - buf; #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "<<%s CTL: %T\n", len != orig_len ? "P" : " ", arg); + erts_fprintf(stderr, "<< CTL: %T\n", arg); #endif if (is_not_tuple(arg) || @@ -1778,7 +1758,7 @@ decode_error: } data_error: UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - erts_kill_dist_connection(dep, connection_id); + erts_kill_dist_connection(dep, conn_id); ERTS_CHK_NO_PROC_LOCKS; return -1; } @@ -1834,7 +1814,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) while (1) { switch (ctx->phase) { case ERTS_DSIG_SEND_PHASE_INIT: - ctx->flags = dsdp->dep->flags; + ctx->flags = dsdp->flags; ctx->c_p = dsdp->proc; if (!ctx->c_p || dsdp->no_suspend) @@ -2089,13 +2069,14 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_output)) { + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), - "%T", prt->dist_entry->sysname); + "%T", dep->sysname); DTRACE4(dist_output, erts_this_node_sysname, port_str, remote_str, size); } @@ -2151,13 +2132,14 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_outputv)) { + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), - "%T", prt->dist_entry->sysname); + "%T", dep->sysname); DTRACE4(dist_outputv, erts_this_node_sysname, port_str, remote_str, size); } @@ -2195,7 +2177,7 @@ erts_dist_command(Port *prt, int initial_reds) Uint32 flags; Sint qsize, obufsize = 0; ErtsDistOutputQueue oq, foq; - DistEntry *dep = prt->dist_entry; + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); erts_aint32_t sched_flags; ErtsSchedulerData *esdp = erts_get_scheduler_data(); @@ -2571,11 +2553,12 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1) erts_aint32_t qflgs; erts_aint_t qsize; Eterm receiver = NIL; + Uint32 conn_id; if (!dep) BIF_ERROR(BIF_P, EXC_NOTSUP); - if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) + if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep) BIF_ERROR(BIF_P, BADARG); /* @@ -2585,6 +2568,11 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1) erts_de_rlock(dep); + if (dep->connection_id != conn_id) { + erts_de_runlock(dep); + BIF_ERROR(BIF_P, BADARG); + } + ASSERT(dep->cid == BIF_P->common.id); qflgs = erts_atomic32_read_acqb(&dep->qflgs); @@ -2625,6 +2613,7 @@ dist_ctrl_put_data_2(BIF_ALIST_2) DistEntry *dep; ErlDrvSizeT size; Eterm input_handler; + Uint32 conn_id; if (is_binary(BIF_ARG_2)) size = binary_size(BIF_ARG_2); @@ -2636,7 +2625,7 @@ dist_ctrl_put_data_2(BIF_ALIST_2) else BIF_ERROR(BIF_P, BADARG); - dep = erts_dhandle_to_dist_entry(BIF_ARG_1); + dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id); if (!dep) BIF_ERROR(BIF_P, BADARG); @@ -2656,7 +2645,7 @@ dist_ctrl_put_data_2(BIF_ALIST_2) erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); - (void) erts_net_message(NULL, dep, NULL, 0, data, size); + (void) erts_net_message(NULL, dep, conn_id, NULL, 0, data, size); /* * We ignore any decode failures. On fatal failures the * connection will be taken down by killing the @@ -2680,13 +2669,18 @@ dist_get_stat_1(BIF_ALIST_1) Sint64 read, write, pend; Eterm res, *hp, **hpp; Uint sz, *szp; - DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1); + Uint32 conn_id; + DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id); if (!dep) BIF_ERROR(BIF_P, BADARG); erts_de_rlock(dep); + if (dep->connection_id != conn_id) { + erts_de_runlock(dep); + BIF_ERROR(BIF_P, BADARG); + } read = (Sint64) erts_atomic64_read_nob(&dep->in); write = (Sint64) erts_atomic64_read_nob(&dep->out); pend = (Sint64) erts_atomic_read_nob(&dep->qsize); @@ -2717,19 +2711,25 @@ BIF_RETTYPE dist_ctrl_input_handler_2(BIF_ALIST_2) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + Uint32 conn_id; if (!dep) BIF_ERROR(BIF_P, EXC_NOTSUP); - if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) + if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep) BIF_ERROR(BIF_P, BADARG); if (is_not_internal_pid(BIF_ARG_2)) BIF_ERROR(BIF_P, BADARG); + erts_de_rlock(dep); + if (dep->connection_id != conn_id) { + erts_de_runlock(dep); + BIF_ERROR(BIF_P, BADARG); + } erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) BIF_ARG_2); - + erts_de_runlock(dep); BIF_RET(am_ok); } @@ -2743,15 +2743,21 @@ dist_ctrl_get_data_1(BIF_ALIST_1) Eterm *hp; ProcBin *pb; erts_aint_t qsize; + Uint32 conn_id; if (!dep) BIF_ERROR(BIF_P, EXC_NOTSUP); - if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) + if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep) BIF_ERROR(BIF_P, BADARG); erts_de_rlock(dep); + if (dep->connection_id != conn_id) { + erts_de_runlock(dep); + BIF_ERROR(BIF_P, BADARG); + } + if (dep->state == ERTS_DE_STATE_EXITING) goto return_none; @@ -2838,13 +2844,14 @@ erts_dist_port_not_busy(Port *prt) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_port_not_busy)) { + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), - "%T", prt->dist_entry->sysname); + "%T", dep->sysname); DTRACE3(dist_port_not_busy, erts_this_node_sysname, port_str, remote_str); } @@ -2870,10 +2877,10 @@ static void kill_connection(DistEntry *dep) } void -erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) +erts_kill_dist_connection(DistEntry *dep, Uint32 conn_id) { erts_de_rwlock(dep); - if (connection_id == dep->connection_id + if (conn_id == dep->connection_id && dep->state == ERTS_DE_STATE_CONNECTED) { kill_connection(dep); @@ -3209,23 +3216,6 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) else if (!dep) goto system_limit; /* Should never happen!!! */ - erts_de_rlock(dep); - de_locked = -1; - - if (dep->state == ERTS_DE_STATE_EXITING) { - /* Suspend on dist entry waiting for the exit to finish */ - ErtsProcList *plp = erts_proclist_create(BIF_P); - plp->next = NULL; - erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); - erts_mtx_lock(&dep->qlock); - erts_proclist_store_last(&dep->suspended, plp); - erts_mtx_unlock(&dep->qlock); - goto yield; - } - - erts_de_runlock(dep); - de_locked = 0; - if (is_internal_pid(BIF_ARG_2)) { if (BIF_P->common.id == BIF_ARG_2) { ErtsSetupConnDistCtrl scdc; @@ -3271,7 +3261,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) hp = HAlloc(BIF_P, 3); } else { - int new; + Uint32 conn_id; pp = erts_id2port_sflgs(BIF_ARG_2, BIF_P, @@ -3280,7 +3270,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) erts_de_rwlock(dep); de_locked = 1; - if (dep->state == ERTS_DE_STATE_EXITING) + if (dep->state != ERTS_DE_STATE_PENDING) goto badarg; if (!pp || (erts_atomic32_read_nob(&pp->state) @@ -3290,49 +3280,39 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) goto badarg; - if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) - new = 0; - else { - if (dep->state != ERTS_DE_STATE_PENDING) { - if (dep->state == ERTS_DE_STATE_IDLE) - erts_set_dist_entry_pending(dep); - else - goto badarg; - } - - if (pp->dist_entry || is_not_nil(dep->cid)) - goto badarg; + if (erts_prtsd_get(pp, ERTS_PRTSD_DIST_ENTRY) != NULL + || is_not_nil(dep->cid)) + goto badarg; - erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); - pp->dist_entry = dep; + erts_prtsd_set(pp, ERTS_PRTSD_DIST_ENTRY, dep); + erts_prtsd_set(pp, ERTS_PRTSD_CONN_ID, (void*)(UWord)dep->connection_id); - ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); - dep->send = (pp->drv_ptr->outputv - ? dist_port_commandv - : dist_port_command); - ASSERT(dep->send); + dep->send = (pp->drv_ptr->outputv + ? dist_port_commandv + : dist_port_command); + ASSERT(dep->send); - /* - * Dist-ports do not use the "busy port message queue" functionality, but - * instead use "busy dist entry" functionality. - */ - { - ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; - erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); - } - - setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version); - de_locked = 0; - new = !0; + /* + * Dist-ports do not use the "busy port message queue" functionality, but + * instead use "busy dist entry" functionality. + */ + { + ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; + erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); } - hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); - res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep); + conn_id = dep->connection_id; + setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version); + de_locked = 0; + + hp = HAlloc(BIF_P, 3 + ERTS_DHANDLE_SIZE); + res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id); res_tag = am_ok; /* Connection up */ - if (new) - dep = NULL; /* inc of refc transferred to port (dist_entry field) */ + dep = NULL; /* inc of refc transferred to port (dist_entry field) */ } ASSERT(is_value(res) && is_value(res_tag)); @@ -3358,12 +3338,6 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) return ret; - yield: - ERTS_BIF_PREP_YIELD4(ret, - bif_export[BIF_erts_internal_create_dist_channel_4], - BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4); - goto done; - badarg: ERTS_BIF_PREP_RET(ret, am_badarg); goto done; @@ -3385,8 +3359,7 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, dep->creation = 0; ASSERT(is_internal_port(ctrlr) || is_internal_pid(ctrlr)); - ASSERT(erts_atomic_read_nob(&dep->qsize) == 0 - || (dep->state == ERTS_DE_STATE_PENDING)); + ASSERT(dep->state == ERTS_DE_STATE_PENDING); if (flags & DFLAG_DIST_HDR_ATOM_CACHE) create_cache(dep); @@ -3432,37 +3405,20 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment * DistEntry *dep = scdcp->dep; int dep_locked = 0; Eterm *hp; - erts_aint32_t state; + Uint32 conn_id; if (redsp) *redsp = 1; - state = erts_atomic32_read_nob(&c_p->state); - - if (state & ERTS_PSFLG_EXITING) - goto badarg; + ASSERT(!ERTS_PROC_IS_EXITING(c_p)); erts_de_rwlock(dep); dep_locked = !0; - if (dep->state == ERTS_DE_STATE_EXITING) + if (dep->state != ERTS_DE_STATE_PENDING) goto badarg; - if (ERTS_PROC_GET_DIST_ENTRY(c_p)) { - if (dep == ERTS_PROC_GET_DIST_ENTRY(c_p) - && (c_p->flags & F_DISTRIBUTION) - && dep->cid == c_p->common.id) { - goto connected; - } - goto badarg; - } - - if (dep->state != ERTS_DE_STATE_PENDING) { - if (dep->state == ERTS_DE_STATE_IDLE) - erts_set_dist_entry_pending(dep); - else - goto badarg; - } + conn_id = dep->connection_id; if (is_not_nil(dep->cid)) goto badarg; @@ -3477,18 +3433,17 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment * setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id, scdcp->flags, scdcp->version); -connected: /* we take over previous inc in refc of dep */ if (!bpp) /* called directly... */ - return erts_make_dhandle(c_p, dep); + return erts_make_dhandle(c_p, dep, conn_id); erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg); - *bpp = new_message_buffer(ERTS_MAGIC_REF_THING_SIZE); + *bpp = new_message_buffer(ERTS_DHANDLE_SIZE); hp = (*bpp)->mem; - return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep); + return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep, conn_id); badarg: @@ -3529,30 +3484,27 @@ BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1) erts_de_rwlock(dep); switch (dep->state) { - case ERTS_DE_STATE_PENDING: case ERTS_DE_STATE_CONNECTED: + case ERTS_DE_STATE_EXITING: + case ERTS_DE_STATE_PENDING: conn_id = dep->connection_id; break; case ERTS_DE_STATE_IDLE: erts_set_dist_entry_pending(dep); conn_id = dep->connection_id; break; - case ERTS_DE_STATE_EXITING: - conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; - break; default: erts_exit(ERTS_ABORT_EXIT, "Invalid dep->state (%d)\n", dep->state); } erts_de_rwunlock(dep); - hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); - dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep); + hp = HAlloc(BIF_P, ERTS_DHANDLE_SIZE); + dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id); erts_deref_dist_entry(dep); - BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); + BIF_RET(dhandle); } Sint erts_abort_connection_rwunlock(DistEntry* dep) { - Sint reds = 0; ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); if (dep->state == ERTS_DE_STATE_CONNECTED) { @@ -3562,6 +3514,7 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep) ErtsAtomCache *cache; ErtsDistOutputBuf *obuf; ErtsProcList *resume_procs; + Sint reds = 0; ErtsMonLnkDist *mld; ASSERT(is_nil(dep->cid)); @@ -3583,7 +3536,6 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep) dep->send = NULL; erts_set_dist_entry_not_connected(dep); - erts_de_rwunlock(dep); schedule_con_monitor_link_cleanup(mld, THE_NON_VALUE, @@ -3596,19 +3548,10 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep) delete_cache(cache); free_de_out_queues(dep, obuf); - - /* - * We wait to make DistEntry idle and accept new connection attempts - * until all is cleared and deallocated. This to get some back pressure - * against repeated failing connection attempts saturating all CPUs - * with cleanup jobs. - */ - erts_de_rwlock(dep); - ASSERT(dep->state == ERTS_DE_STATE_EXITING); - dep->state = ERTS_DE_STATE_IDLE; + return reds; } erts_de_rwunlock(dep); - return reds; + return 0; } static Sint abort_connection(DistEntry *dep, Uint32 conn_id) @@ -3623,22 +3566,19 @@ static Sint abort_connection(DistEntry *dep, Uint32 conn_id) BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2) { DistEntry* dep; - Eterm* tp; + Uint32 conn_id; + Sint reds; - if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { - BIF_ERROR(BIF_P, BADARG); - } - tp = tuple_val(BIF_ARG_2); - dep = erts_dhandle_to_dist_entry(tp[2]); - if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1) + if (is_not_atom(BIF_ARG_1)) + BIF_ERROR(BIF_P, BADARG); + dep = erts_dhandle_to_dist_entry(BIF_ARG_2, &conn_id); + if (!dep || dep != erts_find_dist_entry(BIF_ARG_1) || dep == erts_this_dist_entry) { BIF_ERROR(BIF_P, BADARG); } - if (dep) { - Sint reds = abort_connection(dep, unsigned_val(tp[1])); - BUMP_REDS(BIF_P, reds); - } + reds = abort_connection(dep, conn_id); + BUMP_REDS(BIF_P, reds); BIF_RET(am_true); } @@ -3669,14 +3609,13 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) } /* - * Send {auto_connect, Node, ConnId, DHandle} to net_kernel + * Send {auto_connect, Node, DHandle} to net_kernel */ mp = erts_alloc_message_heap(net_kernel, &nk_locks, - 5 + ERTS_MAGIC_REF_THING_SIZE, + 4 + ERTS_DHANDLE_SIZE, &hp, &ohp); - dhandle = erts_build_dhandle(&hp, ohp, dep); - msg = TUPLE4(hp, am_auto_connect, dep->sysname, make_small(conn_id), - dhandle); + dhandle = erts_build_dhandle(&hp, ohp, dep, conn_id); + msg = TUPLE3(hp, am_auto_connect, dep->sysname, dhandle); ERL_MESSAGE_TOKEN(mp) = am_undefined; erts_queue_proc_message(proc, net_kernel, nk_locks, mp, msg); erts_proc_unlock(net_kernel, nk_locks); |