diff options
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r-- | erts/emulator/beam/dist.c | 1012 |
1 files changed, 638 insertions, 374 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index bc168fc58d..cd799e04b8 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -103,21 +103,12 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) -#define PASS_THROUGH 'p' /* This code should go */ - int erts_is_alive; /* System must be blocked on change */ int erts_dist_buf_busy_limit; /* distribution trap functions */ -Export* dsend2_trap = NULL; -Export* dsend3_trap = NULL; -/*Export* dsend_nosuspend_trap = NULL;*/ -Export* dlink_trap = NULL; -Export* dunlink_trap = NULL; Export* dmonitor_node_trap = NULL; -Export* dgroup_leader_trap = NULL; -Export* dexit_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ @@ -129,6 +120,7 @@ static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); +static Sint abort_connection(DistEntry* dep, Uint32 conn_id); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; @@ -383,22 +375,24 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp) if (!rp) goto done; erts_proc_lock(rp, rp_locks); - rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name); - if (rlnk != NULL) { - ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE)); - erts_destroy_link(rlnk); - } - n = ERTS_LINK_REFC(lnk); - for (i = 0; i < n; ++i) { - Eterm tup; - Eterm *hp; - ErtsMessage *msgp; - - msgp = erts_alloc_message_heap(rp, &rp_locks, - 3, &hp, &ohp); - tup = TUPLE2(hp, am_nodedown, name); - erts_queue_message(rp, rp_locks, msgp, tup, am_system); - } + if (!ERTS_PROC_IS_EXITING(rp)) { + rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name); + if (rlnk != NULL) { + ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE)); + erts_destroy_link(rlnk); + } + n = ERTS_LINK_REFC(lnk); + for (i = 0; i < n; ++i) { + Eterm tup; + Eterm *hp; + ErtsMessage *msgp; + + msgp = erts_alloc_message_heap(rp, &rp_locks, + 3, &hp, &ohp); + tup = TUPLE2(hp, am_nodedown, name); + erts_queue_message(rp, rp_locks, msgp, tup, am_system); + } + } erts_proc_unlock(rp, rp_locks); } done: @@ -484,41 +478,55 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */ DistEntry *tdep; - int no_dist_ctrl = 0; + int no_dist_ctrl; + int no_pending; Eterm nd_reason = (reason == am_no_network ? am_no_network : am_net_kernel_terminated); + int i = 0; + Eterm *dist_ctrl; + DistEntry** pending; + + ERTS_UNDEF(dist_ctrl, NULL); + ERTS_UNDEF(pending, NULL); + erts_rwmtx_rlock(&erts_dist_table_rwmtx); - for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) - no_dist_ctrl++; - for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) - no_dist_ctrl++; + no_dist_ctrl = (erts_no_of_hidden_dist_entries + + erts_no_of_visible_dist_entries); + no_pending = erts_no_of_pending_dist_entries; /* KILL all port controllers */ - if (no_dist_ctrl == 0) - erts_rwmtx_runlock(&erts_dist_table_rwmtx); - else { - Eterm def_buf[128]; - int i = 0; - Eterm *dist_ctrl; - - if (no_dist_ctrl <= sizeof(def_buf)/sizeof(def_buf[0])) - dist_ctrl = &def_buf[0]; - else - dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, - sizeof(Eterm)*no_dist_ctrl); + if (no_dist_ctrl) { + dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, + sizeof(Eterm)*no_dist_ctrl); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) { ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + ASSERT(i < no_dist_ctrl); dist_ctrl[i++] = tdep->cid; } for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) { ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + ASSERT(i < no_dist_ctrl); dist_ctrl[i++] = tdep->cid; } - erts_rwmtx_runlock(&erts_dist_table_rwmtx); + ASSERT(i == no_dist_ctrl); + } + if (no_pending) { + pending = erts_alloc(ERTS_ALC_T_TMP, sizeof(DistEntry*)*no_pending); + i = 0; + for (tdep = erts_pending_dist_entries; tdep; tdep = tdep->next) { + ASSERT(is_nil(tdep->cid)); + ASSERT(i < no_pending); + pending[i++] = tdep; + erts_ref_dist_entry(tdep); + } + ASSERT(i == no_pending); + } + erts_rwmtx_runlock(&erts_dist_table_rwmtx); - for (i = 0; i < no_dist_ctrl; i++) { + if (no_dist_ctrl) { + for (i = 0; i < no_dist_ctrl; i++) { if (is_internal_pid(dist_ctrl[i])) schedule_kill_dist_ctrl_proc(dist_ctrl[i]); else { @@ -532,11 +540,18 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) prt, dist_ctrl[i], nd_reason, NULL); } } - } + } + erts_free(ERTS_ALC_T_TMP, dist_ctrl); + } + + if (no_pending) { + for (i = 0; i < no_pending; i++) { + abort_connection(pending[i], pending[i]->connection_id); + erts_deref_dist_entry(pending[i]); + } + erts_free(ERTS_ALC_T_TMP, pending); + } - if (dist_ctrl != &def_buf[0]) - erts_free(ERTS_ALC_T_TMP, dist_ctrl); - } /* * When last dist ctrl exits, node will be taken @@ -573,13 +588,13 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) erts_port_task_abort(&dep->dist_cmd); } - if (dep->status & ERTS_DE_SFLG_EXITING) { + if (dep->state == ERTS_DE_STATE_EXITING) { #ifdef DEBUG ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT); #endif } else { - dep->status |= ERTS_DE_SFLG_EXITING; + dep->state = ERTS_DE_STATE_EXITING; erts_mtx_lock(&dep->qlock); ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT); @@ -613,7 +628,6 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) reason == am_normal ? am_connection_closed : reason); clear_dist_entry(dep); - } dec_no_nodes(); @@ -627,6 +641,14 @@ trap_function(Eterm func, int arity) return erts_export_put(am_erlang, func, arity); } +/* + * Sync with dist_util.erl: + * + * -record(erts_dflags, + * {default, mandatory, addable, rejectable, strict_order}). + */ +static Eterm erts_dflags_record; + void init_dist(void) { init_nodes_monitors(); @@ -638,18 +660,21 @@ void init_dist(void) erts_atomic_init_nob(&no_caches, 0); /* Lookup/Install all references to trap functions */ - dsend2_trap = trap_function(am_dsend,2); - dsend3_trap = trap_function(am_dsend,3); - /* dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/ - dlink_trap = trap_function(am_dlink,1); - dunlink_trap = trap_function(am_dunlink,1); dmonitor_node_trap = trap_function(am_dmonitor_node,3); - dgroup_leader_trap = trap_function(am_dgroup_leader,2); - dexit_trap = trap_function(am_dexit, 2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, am_dist_ctrl_put_data, 2); + { + Eterm* hp = erts_alloc(ERTS_ALC_T_LITERAL, (1+6)*sizeof(Eterm)); + erts_dflags_record = TUPLE6(hp, am_erts_dflags, + make_small(DFLAG_DIST_DEFAULT), + make_small(DFLAG_DIST_MANDATORY), + make_small(DFLAG_DIST_ADDABLE), + make_small(DFLAG_DIST_REJECTABLE), + make_small(DFLAG_DIST_STRICT_ORDER)); + erts_set_literal_tag(&erts_dflags_record, hp, (1+6)); + } } #define ErtsDistOutputBuf2Binary(OB) \ @@ -664,6 +689,7 @@ alloc_dist_obuf(Uint size) obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0]; #ifdef DEBUG obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; + obuf->alloc_endp = obuf->data + size; ASSERT(bin == ErtsDistOutputBuf2Binary(obuf)); #endif return obuf; @@ -684,31 +710,11 @@ size_obuf(ErtsDistOutputBuf *obuf) return bin->orig_size; } -static void clear_dist_entry(DistEntry *dep) +static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) { - Sint obufsize = 0; - ErtsAtomCache *cache; - ErtsProcList *suspendees; ErtsDistOutputBuf *obuf; - erts_de_rwlock(dep); - erts_atomic_set_nob(&dep->input_handler, - (erts_aint_t) NIL); - cache = dep->cache; - dep->cache = NULL; - -#ifdef DEBUG - erts_de_links_lock(dep); - ASSERT(!dep->nlinks); - ASSERT(!dep->node_links); - ASSERT(!dep->monitors); - erts_de_links_unlock(dep); -#endif - - erts_mtx_lock(&dep->qlock); - - erts_atomic64_set_nob(&dep->in, 0); - erts_atomic64_set_nob(&dep->out, 0); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; @@ -728,17 +734,13 @@ static void clear_dist_entry(DistEntry *dep) dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; - dep->status = 0; - suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); - - erts_mtx_unlock(&dep->qlock); - erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); - dep->send = NULL; - erts_de_rwunlock(dep); - erts_resume_processes(suspendees); + return obuf; +} - delete_cache(cache); +static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf) +{ + Sint obufsize = 0; while (obuf) { ErtsDistOutputBuf *fobuf; @@ -750,13 +752,56 @@ static void clear_dist_entry(DistEntry *dep) if (obufsize) { erts_mtx_lock(&dep->qlock); - ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); + ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); erts_mtx_unlock(&dep->qlock); } } +static void clear_dist_entry(DistEntry *dep) +{ + ErtsAtomCache *cache; + ErtsProcList *suspendees; + ErtsDistOutputBuf *obuf; + + erts_de_rwlock(dep); + erts_atomic_set_nob(&dep->input_handler, + (erts_aint_t) NIL); + cache = dep->cache; + dep->cache = NULL; + +#ifdef DEBUG + erts_de_links_lock(dep); + ASSERT(!dep->nlinks); + ASSERT(!dep->node_links); + ASSERT(!dep->monitors); + erts_de_links_unlock(dep); +#endif + + erts_mtx_lock(&dep->qlock); + + erts_atomic64_set_nob(&dep->in, 0); + erts_atomic64_set_nob(&dep->out, 0); + + obuf = clear_de_out_queues(dep); + dep->state = ERTS_DE_STATE_IDLE; + suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_de_rwunlock(dep); + + erts_resume_processes(suspendees); + + delete_cache(cache); + + free_de_out_queues(dep, obuf); + if (dep->transcode_ctx) + transcode_free_ctx(dep); +} + int erts_dsend_context_dtor(Binary* ctx_bin) { ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); @@ -772,8 +817,8 @@ int erts_dsend_context_dtor(Binary* ctx_bin) if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) { free_dist_obuf(ctx->dss.obuf); } - if (ctx->dep_to_deref) - erts_deref_dist_entry(ctx->dep_to_deref); + if (ctx->deref_dep) + erts_deref_dist_entry(ctx->dep); return 1; } @@ -852,6 +897,13 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, DeclareTmpHeapNoproc(ctl_heap,6); int res; + if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + /* + * Receiver does not support DOP_MONITOR_P_EXIT (see dsig_send_monitor) + */ + return ERTS_DSIG_SEND_OK; + } + UseTmpHeapNoproc(6); ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT), @@ -879,6 +931,16 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, DeclareTmpHeapNoproc(ctl_heap,5); int res; + if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + /* + * Receiver does not support DOP_MONITOR_P. + * Just avoid sending it and by doing that reduce this monitor + * to only supervise the connection. This will work for simple c-nodes + * with a 1-to-1 relation between "Erlang process" and OS-process. + */ + return ERTS_DSIG_SEND_OK; + } + UseTmpHeapNoproc(5); ctl = TUPLE4(&ctl_heap[0], make_small(DOP_MONITOR_P), @@ -901,6 +963,13 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, DeclareTmpHeapNoproc(ctl_heap,5); int res; + if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + /* + * Receiver does not support DOP_DEMONITOR_P (see dsig_send_monitor) + */ + return ERTS_DSIG_SEND_OK; + } + UseTmpHeapNoproc(5); ctl = TUPLE4(&ctl_heap[0], make_small(DOP_DEMONITOR_P), @@ -1161,7 +1230,7 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote) # define PURIFY_MSG(msg) \ do { \ char buf__[1]; size_t bufsz__ = sizeof(buf__); \ - if (erts_sys_getenv_raw("VALGRIND_LOG_XML", buf__, &bufsz__) >= 0) { \ + if (erts_sys_explicit_8bit_getenv("VALGRIND_LOG_XML", buf__, &bufsz__) >= 0) { \ VALGRIND_PRINTF_XML("<erlang_error_log>" \ "%s, line %d: %s</erlang_error_log>\n", \ __FILE__, __LINE__, msg); \ @@ -1324,7 +1393,7 @@ int erts_net_message(Port *prt, /* This is tricky (we MUST force a distributed send) */ ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_exit(&dsd, to, from, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); @@ -1416,7 +1485,7 @@ int erts_net_message(Port *prt, if (!rp) { ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, am_noproc); @@ -1879,33 +1948,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) { ctx->acmp = erts_get_atom_cache_map(ctx->c_p); - ctx->pass_through_size = 0; + ctx->max_finalize_prepend = 0; } else { ctx->acmp = NULL; - ctx->pass_through_size = 1; + ctx->max_finalize_prepend = 3; } #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl); - if (is_value(ctx->msg)) - erts_fprintf(stderr, " MSG: %T\n", ctx->msg); + erts_fprintf(stderr, ">> CTL: %T\n", ctx->ctl); + if (is_value(ctx->msg)) + erts_fprintf(stderr, " MSG: %T\n", ctx->msg); #endif - ctx->data_size = ctx->pass_through_size; + ctx->data_size = ctx->max_finalize_prepend; erts_reset_atom_cache_map(ctx->acmp); erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size); - if (is_value(ctx->msg)) { - ctx->u.sc.wstack.wstart = NULL; - ctx->u.sc.flags = ctx->flags; - ctx->u.sc.level = 0; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; - } - break; + if (is_non_value(ctx->msg)) { + ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; + break; + } + ctx->u.sc.wstack.wstart = NULL; + ctx->u.sc.flags = ctx->flags; + ctx->u.sc.level = 0; + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; case ERTS_DSIG_SEND_PHASE_MSG_SIZE: if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) { retval = ERTS_DSIG_SEND_CONTINUE; @@ -1920,23 +1988,25 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->data_size += ctx->dhdr_ext_size; ctx->obuf = alloc_dist_obuf(ctx->data_size); - ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->pass_through_size + ctx->dhdr_ext_size; + ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size; /* Encode internal version of dist header */ ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp); /* Encode control message */ erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL); - if (is_value(ctx->msg)) { - ctx->u.ec.flags = ctx->flags; - ctx->u.ec.level = 0; - ctx->u.ec.wstack.wstart = NULL; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; - } - break; - - case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: + if (is_non_value(ctx->msg)) { + ctx->obuf->msg_start = NULL; + ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; + break; + } + ctx->u.ec.flags = ctx->flags; + ctx->u.ec.hopefull_flags = 0; + ctx->u.ec.level = 0; + ctx->u.ec.wstack.wstart = NULL; + ctx->obuf->msg_start = ctx->obuf->ext_endp; + + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; + case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) { retval = ERTS_DSIG_SEND_CONTINUE; goto done; @@ -1949,11 +2019,12 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) int resume = 0; ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp); - ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->pass_through_size); + ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend); ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size); ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp; + ctx->obuf->hopefull_flags = ctx->u.ec.hopefull_flags; /* * Signal encoded; now verify that the connection still exists, * and if so enqueue the signal and schedule it for send. @@ -1961,9 +2032,9 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->obuf->next = NULL; erts_de_rlock(dep); cid = dep->cid; - if (cid != dsdp->cid - || dep->connection_id != dsdp->connection_id - || dep->status & ERTS_DE_SFLG_EXITING) { + if (dep->state == ERTS_DE_STATE_EXITING + || dep->state == ERTS_DE_STATE_IDLE + || dep->connection_id != dsdp->connection_id) { /* Not the same connection as when we started; drop message... */ erts_de_runlock(dep); free_dist_obuf(ctx->obuf); @@ -2037,8 +2108,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } erts_mtx_unlock(&dep->qlock); - if (is_internal_port(dep->cid)) - erts_schedule_dist_command(NULL, dep); + if (dep->state != ERTS_DE_STATE_PENDING) { + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + } + else { + notify_proc = NIL; + } erts_de_runlock(dep); if (is_internal_pid(notify_proc)) notify_dist_data(ctx->c_p, notify_proc); @@ -2203,7 +2279,6 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) #endif #define ERTS_PORT_REDS_DIST_CMD_START 5 -#define ERTS_PORT_REDS_DIST_CMD_FINALIZE 3 #define ERTS_PORT_REDS_DIST_CMD_EXIT 200 #define ERTS_PORT_REDS_DIST_CMD_RESUMED 5 #define ERTS_PORT_REDS_DIST_CMD_DATA(SZ) \ @@ -2212,10 +2287,10 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) : ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__))) int -erts_dist_command(Port *prt, int reds_limit) +erts_dist_command(Port *prt, int initial_reds) { - Sint reds = ERTS_PORT_REDS_DIST_CMD_START; - Uint32 status; + Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START; + enum dist_entry_state state; Uint32 flags; Sint qsize, obufsize = 0; ErtsDistOutputQueue oq, foq; @@ -2230,15 +2305,18 @@ erts_dist_command(Port *prt, int reds_limit) erts_de_rlock(dep); flags = dep->flags; - status = dep->status; + state = dep->state; send = dep->send; erts_de_runlock(dep); - if (status & ERTS_DE_SFLG_EXITING) { + if (state == ERTS_DE_STATE_EXITING) { erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1); - return reds + ERTS_PORT_REDS_DIST_CMD_EXIT; + reds -= ERTS_PORT_REDS_DIST_CMD_EXIT; + return initial_reds - reds; } + ASSERT(state != ERTS_DE_STATE_PENDING); + ASSERT(send); /* @@ -2263,7 +2341,7 @@ erts_dist_command(Port *prt, int reds_limit) sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - if (reds > reds_limit) + if (reds < 0) goto preempted; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { @@ -2278,13 +2356,13 @@ erts_dist_command(Port *prt, int reds_limit) erts_fprintf(stderr, ">> "); bw(foq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = foq.first; - obufsize += size_obuf(fob); - foq.first = foq.first->next; - free_dist_obuf(fob); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = foq.first; + obufsize += size_obuf(fob); + foq.first = foq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + preempt = reds < 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) break; } while (foq.first && !preempt); @@ -2297,81 +2375,71 @@ erts_dist_command(Port *prt, int reds_limit) if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) { if (oq.first) { ErtsDistOutputBuf *ob; - int preempt; + ErtsDistOutputBuf *last_finalized = NULL; finalize_only: - preempt = 0; ob = oq.first; ASSERT(ob); do { - ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, - dep->cache, - flags); - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - preempt = reds > reds_limit; - if (preempt) - break; - ob = ob->next; + reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds); + if (reds < 0) + break; + last_finalized = ob; + ob = ob->next; } while (ob); - /* - * At least one buffer was finalized; if we got preempted, - * ob points to the last buffer that we finalized. - */ - if (foq.last) - foq.last->next = oq.first; - else - foq.first = oq.first; - if (!preempt) { - /* All buffers finalized */ - foq.last = oq.last; - oq.first = oq.last = NULL; - } - else { - /* Not all buffers finalized; split oq. */ - foq.last = ob; - oq.first = ob->next; - if (oq.first) - ob->next = NULL; - else - oq.last = NULL; - } - if (preempt) - goto preempted; + if (last_finalized) { + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the next buffer to continue finalize. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + foq.last = last_finalized; + if (!ob) { + /* All buffers finalized */ + ASSERT(foq.last == oq.last); + ASSERT(foq.last->next == NULL); + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. */ + ASSERT(foq.last->next == ob); + foq.last->next = NULL; + oq.first = ob; + } + } + if (reds <= 0) + goto preempted; } } else { int de_busy; int preempt = 0; while (oq.first && !preempt) { - ErtsDistOutputBuf *fob; - Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&oq.first->data[0] <= oq.first->extp - && oq.first->extp < oq.first->ext_endp); - size = (*send)(prt, oq.first); + ErtsDistOutputBuf *fob; + Uint size; + reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds); + if (reds < 0) { + preempt = 1; + break; + } + ASSERT(&oq.first->data[0] <= oq.first->extp + && oq.first->extp <= oq.first->ext_endp); + size = (*send)(prt, oq.first); erts_atomic64_inc_nob(&dep->out); - esdp->io.out += (Uint64) size; + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = oq.first; - obufsize += size_obuf(fob); - oq.first = oq.first->next; - free_dist_obuf(fob); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = oq.first; + obufsize += size_obuf(fob); + oq.first = oq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + preempt = reds <= 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) goto finalize_only; } @@ -2411,7 +2479,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_mtx_unlock(&dep->qlock); resumed = erts_resume_processes(suspendees); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } else erts_mtx_unlock(&dep->qlock); @@ -2434,8 +2502,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_mtx_unlock(&dep->qlock); } - ASSERT(foq.first || !foq.last); - ASSERT(!foq.first || foq.last); + ASSERT(!!foq.first == !!foq.last); ASSERT(!dep->finalized_out_queue.first); ASSERT(!dep->finalized_out_queue.last); @@ -2445,10 +2512,10 @@ erts_dist_command(Port *prt, int reds_limit) } /* Avoid wrapping reduction counter... */ - if (reds > INT_MAX/2) - reds = INT_MAX/2; + if (reds < INT_MIN/2) + reds = INT_MIN/2; - return reds; + return initial_reds - reds; preempted: /* @@ -2456,8 +2523,7 @@ erts_dist_command(Port *prt, int reds_limit) * since last call to driver. */ - ASSERT(oq.first || !oq.last); - ASSERT(!oq.first || oq.last); + ASSERT(!!oq.first == !!oq.last); if (sched_flags & ERTS_PTS_FLG_EXIT) { /* @@ -2481,12 +2547,6 @@ erts_dist_command(Port *prt, int reds_limit) foq.first = NULL; foq.last = NULL; - -#ifdef DEBUG - erts_mtx_lock(&dep->qlock); - ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); - erts_mtx_unlock(&dep->qlock); -#endif } else { if (oq.first) { @@ -2776,7 +2836,8 @@ BIF_RETTYPE dist_ctrl_get_data_1(BIF_ALIST_1) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); - int reds = 1; + const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P); + Sint reds = initial_reds; ErtsDistOutputBuf *obuf; Eterm *hp; ProcBin *pb; @@ -2790,7 +2851,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_de_rlock(dep); - if (dep->status & ERTS_DE_SFLG_EXITING) + if (dep->state == ERTS_DE_STATE_EXITING) goto return_none; ASSERT(dep->cid == BIF_P->common.id); @@ -2807,6 +2868,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1) { if (!dep->tmp_out_queue.first) { ASSERT(!dep->tmp_out_queue.last); + ASSERT(!dep->transcode_ctx); qsize = erts_atomic_read_acqb(&dep->qsize); if (qsize > 0) { erts_mtx_lock(&dep->qlock); @@ -2824,21 +2886,18 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_de_runlock(dep); BIF_RET(am_none); } - else { - obuf = dep->tmp_out_queue.first; - dep->tmp_out_queue.first = obuf->next; - if (!obuf->next) - dep->tmp_out_queue.last = NULL; + + obuf = dep->tmp_out_queue.first; + reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds); + if (reds < 0) { + erts_de_runlock(dep); + ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1], + BIF_P, BIF_ARG_1); } - obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp, - dep->cache, - dep->flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */ - ASSERT(&obuf->data[0] <= obuf->extp - && obuf->extp < obuf->ext_endp); + dep->tmp_out_queue.first = obuf->next; + if (!obuf->next) + dep->tmp_out_queue.last = NULL; } erts_atomic64_inc_nob(&dep->out); @@ -2866,11 +2925,11 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_mtx_unlock(&dep->qlock); if (resume_procs) { int resumed = erts_resume_processes(resume_procs); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } } - BIF_RET2(make_binary(pb), reds); + BIF_RET2(make_binary(pb), (initial_reds - reds)); } void @@ -2892,24 +2951,31 @@ erts_dist_port_not_busy(Port *prt) erts_schedule_dist_command(prt, NULL); } +static void kill_connection(DistEntry *dep) +{ + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); + ASSERT(dep->state == ERTS_DE_STATE_CONNECTED); + + dep->state = ERTS_DE_STATE_EXITING; + erts_mtx_lock(&dep->qlock); + ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); + erts_mtx_unlock(&dep->qlock); + + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + else if (is_internal_pid(dep->cid)) + schedule_kill_dist_ctrl_proc(dep->cid); +} + void erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) { erts_de_rwlock(dep); if (connection_id == dep->connection_id - && !(dep->status & ERTS_DE_SFLG_EXITING)) { - - dep->status |= ERTS_DE_SFLG_EXITING; - - erts_mtx_lock(&dep->qlock); - ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); - erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); - erts_mtx_unlock(&dep->qlock); + && dep->state == ERTS_DE_STATE_CONNECTED) { - if (is_internal_port(dep->cid)) - erts_schedule_dist_command(NULL, dep); - else if (is_internal_pid(dep->cid)) - schedule_kill_dist_ctrl_proc(dep->cid); + kill_connection(dep); } erts_de_rwunlock(dep); } @@ -3069,6 +3135,10 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ info_dist_entry(to, arg, dep, 0, 1); } + for (dep = erts_pending_dist_entries; dep; dep = dep->next) { + info_dist_entry(to, arg, dep, 0, 0); + } + for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { info_dist_entry(to, arg, dep, 0, 0); @@ -3091,7 +3161,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ monitor_node -- turn on/off node monitoring node controller only: - dist_exit/3 -- send exit signals from remote to local process dist_link/2 -- link a remote process to a local dist_unlink/2 -- unlink a remote from a local ****************************************************************************/ @@ -3101,15 +3170,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ /********************************************************************** ** Set the node name of current node fail if node already is set. ** setnode(name@host, Creation) - ** loads functions pointer to trap_functions from module erlang. - ** erlang:dsend/2 - ** erlang:dlink/1 - ** erlang:dunlink/1 - ** erlang:dmonitor_node/3 - ** erlang:dgroup_leader/2 - ** erlang:dexit/2 - ** -- are these needed ? - ** dexit/1 ***********************************************************************/ BIF_RETTYPE setnode_2(BIF_ALIST_2) @@ -3133,15 +3193,8 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; /* Check that all trap functions are defined !! */ - if (dsend2_trap->addressv[0] == NULL || - dsend3_trap->addressv[0] == NULL || - /* dsend_nosuspend_trap->address == NULL ||*/ - dlink_trap->addressv[0] == NULL || - dunlink_trap->addressv[0] == NULL || - dmonitor_node_trap->addressv[0] == NULL || - dgroup_leader_trap->addressv[0] == NULL || - dmonitor_p_trap->addressv[0] == NULL || - dexit_trap->addressv[0] == NULL) { + if (dmonitor_node_trap->addressv[0] == NULL || + dmonitor_p_trap->addressv[0] == NULL) { goto error; } @@ -3218,6 +3271,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) ErtsProcLocks proc_unlock = 0; Process *proc; Port *pp = NULL; + Eterm notify_proc; + erts_aint32_t qflgs; /* * Check and pick out arguments @@ -3245,21 +3300,25 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) if (!is_atom(ic) || !is_atom(oc)) goto badarg; - /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */ - if (!(DFLAG_EXTENDED_REFERENCES & flags)) { + if (~flags & DFLAG_DIST_MANDATORY) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "%T", BIF_P->common.id); if (BIF_P->common.u.alive.reg) erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name); erts_dsprintf(dsbufp, " attempted to enable connection to node %T " - "which is not able to handle extended references.\n", + "which does not support all mandatory capabilities.\n", BIF_ARG_1); erts_send_error_to_logger(BIF_P->group_leader, dsbufp); goto badarg; } /* + * ToDo: Should we not pass connection_id as well + * to make sure it's the right connection we commit. + */ + + /* * Arguments seem to be in order. */ @@ -3302,6 +3361,23 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto badarg; } + if (dep->state == ERTS_DE_STATE_EXITING) { + /* Suspend on dist entry waiting for the exit to finish */ + ErtsProcList *plp = erts_proclist_create(BIF_P); + plp->next = NULL; + erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); + erts_mtx_lock(&dep->qlock); + erts_proclist_store_last(&dep->suspended, plp); + erts_mtx_unlock(&dep->qlock); + goto yield; + } + if (dep->state != ERTS_DE_STATE_PENDING) { + if (dep->state == ERTS_DE_STATE_IDLE) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } + if (is_not_nil(dep->cid)) goto badarg; @@ -3334,7 +3410,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto done; /* Already set */ } - if (dep->status & ERTS_DE_SFLG_EXITING) { + if (dep->state == ERTS_DE_STATE_EXITING) { /* Suspend on dist entry waiting for the exit to finish */ ErtsProcList *plp = erts_proclist_create(BIF_P); plp->next = NULL; @@ -3344,8 +3420,12 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_mtx_unlock(&dep->qlock); goto yield; } - - ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); + if (dep->state != ERTS_DE_STATE_PENDING) { + if (dep->state == ERTS_DE_STATE_IDLE) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } if (pp->dist_entry || is_not_nil(dep->cid)) goto badarg; @@ -3376,7 +3456,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) dep->creation = 0; #ifdef DEBUG - ASSERT(erts_atomic_read_nob(&dep->qsize) == 0); + ASSERT(erts_atomic_read_nob(&dep->qsize) == 0 + || (dep->state == ERTS_DE_STATE_PENDING)); #endif if (flags & DFLAG_DIST_HDR_ATOM_CACHE) @@ -3384,7 +3465,26 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + notify_proc = NIL; + if (erts_atomic_read_nob(&dep->qsize)) { + if (is_internal_port(dep->cid)) { + erts_schedule_dist_command(NULL, dep); + } + else { + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify_proc = dep->cid; + ASSERT(is_internal_pid(notify_proc)); + } + } + } + } erts_de_rwunlock(dep); + if (is_internal_pid(notify_proc)) + notify_dist_data(BIF_P, notify_proc); ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); @@ -3426,79 +3526,189 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto done; } +BIF_RETTYPE erts_internal_get_dflags_0(BIF_ALIST_0) +{ + return erts_dflags_record; +} -/**********************************************************************/ -/* dist_exit(Local, Term, Remote) -> Bool */ +BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1) +{ + DistEntry* dep; + Uint32 conn_id; + Eterm* hp; + Eterm dhandle; + + if (is_not_atom(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); + } + dep = erts_find_or_insert_dist_entry(BIF_ARG_1); + + if (dep == erts_this_dist_entry) { + erts_deref_dist_entry(dep); + BIF_ERROR(BIF_P, BADARG); + } + + erts_de_rwlock(dep); + + switch (dep->state) { + case ERTS_DE_STATE_PENDING: + case ERTS_DE_STATE_CONNECTED: + conn_id = dep->connection_id; + break; + case ERTS_DE_STATE_IDLE: + erts_set_dist_entry_pending(dep); + conn_id = dep->connection_id; + break; + case ERTS_DE_STATE_EXITING: + conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + break; + default: + erts_exit(ERTS_ABORT_EXIT, "Invalid dep->state (%d)\n", dep->state); + } + erts_de_rwunlock(dep); + hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); + dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep); + erts_deref_dist_entry(dep); + BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); +} -BIF_RETTYPE dist_exit_3(BIF_ALIST_3) +static Sint abort_connection(DistEntry* dep, Uint32 conn_id) { - Eterm local; - Eterm remote; - DistEntry *rdep; + erts_de_rwlock(dep); - local = BIF_ARG_1; - remote = BIF_ARG_3; + if (dep->connection_id != conn_id) + ; + else if (dep->state == ERTS_DE_STATE_CONNECTED) { + kill_connection(dep); + } + else if (dep->state == ERTS_DE_STATE_PENDING) { + NetExitsContext nec = {dep}; + ErtsLink *nlinks; + ErtsLink *node_links; + ErtsMonitor *monitors; + ErtsAtomCache *cache; + ErtsDistOutputBuf *obuf; + ErtsProcList *resume_procs; + Sint reds = 0; + + ASSERT(is_nil(dep->cid)); - /* Check that remote is a remote process */ - if (is_not_external_pid(remote)) - goto error; + erts_de_links_lock(dep); + monitors = dep->monitors; + nlinks = dep->nlinks; + node_links = dep->node_links; + dep->monitors = NULL; + dep->nlinks = NULL; + dep->node_links = NULL; + erts_de_links_unlock(dep); - rdep = external_dist_entry(remote); - - if(rdep == erts_this_dist_entry) - goto error; + cache = dep->cache; + dep->cache = NULL; + erts_mtx_lock(&dep->qlock); + obuf = dep->out_queue.first; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + ASSERT(!dep->tmp_out_queue.first); + ASSERT(!dep->finalized_out_queue.first); + resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; - /* Check that local is local */ - if (is_internal_pid(local)) { - Process *lp; - ErtsProcLocks lp_locks; - if (BIF_P->common.id == local) { - lp_locks = ERTS_PROC_LOCKS_ALL; - lp = BIF_P; - erts_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR); - } - else { - lp_locks = ERTS_PROC_LOCKS_XSIG_SEND; - lp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, - local, lp_locks); - if (!lp) { - BIF_RET(am_true); /* ignore */ - } - } - - (void) erts_send_exit_signal(BIF_P, - remote, - lp, - &lp_locks, - BIF_ARG_2, - NIL, - NULL, - 0); - if (lp == BIF_P) - lp_locks &= ~ERTS_PROC_LOCK_MAIN; - erts_proc_unlock(lp, lp_locks); - if (lp == BIF_P) { - erts_aint32_t state = erts_atomic32_read_acqb(&BIF_P->state); - /* - * We may have exited current process and may have to take action. - */ - if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { - if (state & ERTS_PSFLG_PENDING_EXIT) - erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN); - ERTS_BIF_EXITED(BIF_P); - } - } + erts_set_dist_entry_not_connected(dep); + + erts_de_rwunlock(dep); + + erts_sweep_monitors(monitors, &doit_monitor_net_exits, &nec); + erts_sweep_links(nlinks, &doit_link_net_exits, &nec); + erts_sweep_links(node_links, &doit_node_link_net_exits, &nec); + + if (resume_procs) { + int resumed = erts_resume_processes(resume_procs); + reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + } + + delete_cache(cache); + free_de_out_queues(dep, obuf); + + /* + * We wait to make DistEntry idle and accept new connection attempts + * until all is cleared and deallocated. This to get some back pressure + * against repeated failing connection attempts saturating all CPUs + * with cleanup jobs. + */ + erts_de_rwlock(dep); + ASSERT(dep->state == ERTS_DE_STATE_EXITING); + dep->state = ERTS_DE_STATE_IDLE; + erts_de_rwunlock(dep); + return reds; } - else if (is_external_pid(local) - && external_dist_entry(local) == erts_this_dist_entry) { - BIF_RET(am_true); /* ignore */ + erts_de_rwunlock(dep); + return 0; +} + +BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2) +{ + DistEntry* dep; + Eterm* tp; + + if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { + BIF_ERROR(BIF_P, BADARG); + } + tp = tuple_val(BIF_ARG_2); + dep = erts_dhandle_to_dist_entry(tp[2]); + if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1) + || dep == erts_this_dist_entry) { + BIF_ERROR(BIF_P, BADARG); + } + + if (dep) { + Sint reds = abort_connection(dep, unsigned_val(tp[1])); + BUMP_REDS(BIF_P, reds); } - else - goto error; BIF_RET(am_true); +} - error: - BIF_ERROR(BIF_P, BADARG); +int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) +{ + erts_de_rwlock(dep); + if (dep->state != ERTS_DE_STATE_IDLE) { + erts_de_rwunlock(dep); + } + else { + Process* net_kernel; + ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *mp; + Eterm msg, dhandle; + Uint32 conn_id; + + erts_set_dist_entry_pending(dep); + conn_id = dep->connection_id; + erts_de_rwunlock(dep); + + net_kernel = erts_whereis_process(proc, proc_locks, + am_net_kernel, nk_locks, 0); + if (!net_kernel) { + abort_connection(dep, conn_id); + return 0; + } + + /* + * Send {auto_connect, Node, ConnId, DHandle} to net_kernel + */ + mp = erts_alloc_message_heap(net_kernel, &nk_locks, + 5 + ERTS_MAGIC_REF_THING_SIZE, + &hp, &ohp); + dhandle = erts_build_dhandle(&hp, ohp, dep); + msg = TUPLE4(hp, am_auto_connect, dep->sysname, make_small(conn_id), + dhandle); + erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); + erts_proc_unlock(net_kernel, nk_locks); + } + + return 1; } /**********************************************************************/ @@ -3574,9 +3784,11 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) ASSERT(erts_no_of_not_connected_dist_entries > 0); ASSERT(erts_no_of_hidden_dist_entries >= 0); + ASSERT(erts_no_of_pending_dist_entries >= 0); ASSERT(erts_no_of_visible_dist_entries >= 0); if(not_connected) - length += (erts_no_of_not_connected_dist_entries - 1); + length += ((erts_no_of_not_connected_dist_entries - 1) + + erts_no_of_pending_dist_entries); if(hidden) length += erts_no_of_hidden_dist_entries; if(visible) @@ -3596,13 +3808,18 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) #ifdef DEBUG endp = hp + length*2; #endif - if(not_connected) + if(not_connected) { for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { result = CONS(hp, dep->sysname, result); hp += 2; } + } + for(dep = erts_pending_dist_entries; dep; dep = dep->next) { + result = CONS(hp, dep->sysname, result); + hp += 2; } + } if(hidden) for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { result = CONS(hp, dep->sysname, result); @@ -3647,11 +3864,17 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) DistEntry *dep; ErtsLink *lnk; Eterm l; + int async_connect = 1; for (l = Options; l != NIL && is_list(l); l = CDR(list_val(l))) { Eterm t = CAR(list_val(l)); - /* allow_passive_connect the only available option right now */ - if (t != am_allow_passive_connect) { + if (t == am_allow_passive_connect) { + /* + * Handle this horrible feature by falling back on old synchronous + * auto-connect (if needed) + */ + async_connect = 0; + } else { BIF_ERROR(p, BADARG); } } @@ -3665,50 +3888,91 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) && (Node != erts_this_node->sysname))) { BIF_ERROR(p, BADARG); } - dep = erts_sysname_to_connected_dist_entry(Node); - if (!dep) { - do_trap: - BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); - } - if (dep == erts_this_dist_entry) - goto done; - - erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - erts_de_rlock(dep); - if (ERTS_DE_IS_NOT_CONNECTED(dep)) { - erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); - erts_de_runlock(dep); - goto do_trap; - } - erts_de_links_lock(dep); - erts_de_runlock(dep); if (Bool == am_true) { - ASSERT(dep->cid != NIL); - lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, - p->common.id); - ++ERTS_LINK_REFC(lnk); - lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); - ++ERTS_LINK_REFC(lnk); - } - else { - lnk = erts_lookup_link(dep->node_links, p->common.id); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&(dep->node_links), - p->common.id)); - } - } - lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), - Node)); + ErtsDSigData dsd; + dsd.node = Node; + dep = erts_find_or_insert_dist_entry(Node); + if (dep == erts_this_dist_entry) + goto done; + + erts_proc_lock(p, ERTS_PROC_LOCK_LINK); + + switch (erts_dsig_prepare(&dsd, dep, p, + (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, async_connect)) { + case ERTS_DSIG_PREP_NOT_ALIVE: + case ERTS_DSIG_PREP_NOT_CONNECTED: + /* Trap to either send 'nodedown' or do passive connection attempt */ + trap: + erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); + erts_deref_dist_entry(dep); + BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); + case ERTS_DSIG_PREP_PENDING: + if (!async_connect) { + /* + * Pending connection may fail, so we must trap + * to ensure passive connection attempt + */ + erts_de_runlock(dep); + goto trap; } - } + /*fall through*/ + case ERTS_DSIG_PREP_CONNECTED: + erts_de_links_lock(dep); + erts_de_runlock(dep); + lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, + p->common.id); + ++ERTS_LINK_REFC(lnk); + lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); + ++ERTS_LINK_REFC(lnk); + erts_de_links_unlock(dep); + break; + default: + ERTS_ASSERT(! "Invalid dsig prepare result"); + } + erts_deref_dist_entry(dep); + } + else { /* Bool == false */ + dep = erts_sysname_to_connected_dist_entry(Node); + if (!dep) { + /* + * Before OTP-21 this case triggered auto-connect + * and a 'nodedown' message if that failed. + * Now it's a simple no-op which feels more reasonable. + */ + BIF_RET(am_true); + } + if (dep == erts_this_dist_entry) + goto done; + + erts_proc_lock(p, ERTS_PROC_LOCK_LINK); + erts_de_rlock(dep); + if (dep->state == ERTS_DE_STATE_IDLE) { + ASSERT(!dep->node_links); + erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); + erts_de_runlock(dep); + goto done; + } + erts_de_links_lock(dep); + erts_de_runlock(dep); + lnk = erts_lookup_link(dep->node_links, p->common.id); + if (lnk != NULL) { + if ((--ERTS_LINK_REFC(lnk)) == 0) { + erts_destroy_link(erts_remove_link(&(dep->node_links), + p->common.id)); + } + } + lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); + if (lnk != NULL) { + if ((--ERTS_LINK_REFC(lnk)) == 0) { + erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), + Node)); + } + } + erts_de_links_unlock(dep); } - erts_de_links_unlock(dep); erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); done: |