From f89fb92384280e2939414287a2ecb8f86a199318 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 14 Jul 2017 19:34:54 +0200 Subject: erts: Introduce asynchronous auto-connect --- erts/emulator/beam/dist.c | 274 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 208 insertions(+), 66 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index bc168fc58d..0fa399cf53 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -103,8 +103,6 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) -#define PASS_THROUGH 'p' /* This code should go */ - int erts_is_alive; /* System must be blocked on change */ int erts_dist_buf_busy_limit; @@ -684,31 +682,11 @@ size_obuf(ErtsDistOutputBuf *obuf) return bin->orig_size; } -static void clear_dist_entry(DistEntry *dep) +static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) { - Sint obufsize = 0; - ErtsAtomCache *cache; - ErtsProcList *suspendees; ErtsDistOutputBuf *obuf; - erts_de_rwlock(dep); - erts_atomic_set_nob(&dep->input_handler, - (erts_aint_t) NIL); - cache = dep->cache; - dep->cache = NULL; - -#ifdef DEBUG - erts_de_links_lock(dep); - ASSERT(!dep->nlinks); - ASSERT(!dep->node_links); - ASSERT(!dep->monitors); - erts_de_links_unlock(dep); -#endif - - erts_mtx_lock(&dep->qlock); - - erts_atomic64_set_nob(&dep->in, 0); - erts_atomic64_set_nob(&dep->out, 0); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; @@ -728,17 +706,12 @@ static void clear_dist_entry(DistEntry *dep) dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; - dep->status = 0; - suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); - - erts_mtx_unlock(&dep->qlock); - erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); - dep->send = NULL; - erts_de_rwunlock(dep); - - erts_resume_processes(suspendees); + return obuf; +} - delete_cache(cache); +static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf) +{ + Sint obufsize = 0; while (obuf) { ErtsDistOutputBuf *fobuf; @@ -750,13 +723,54 @@ static void clear_dist_entry(DistEntry *dep) if (obufsize) { erts_mtx_lock(&dep->qlock); - ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); + ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); erts_mtx_unlock(&dep->qlock); } } +static void clear_dist_entry(DistEntry *dep) +{ + ErtsAtomCache *cache; + ErtsProcList *suspendees; + ErtsDistOutputBuf *obuf; + + erts_de_rwlock(dep); + erts_atomic_set_nob(&dep->input_handler, + (erts_aint_t) NIL); + cache = dep->cache; + dep->cache = NULL; + +#ifdef DEBUG + erts_de_links_lock(dep); + ASSERT(!dep->nlinks); + ASSERT(!dep->node_links); + ASSERT(!dep->monitors); + erts_de_links_unlock(dep); +#endif + + erts_mtx_lock(&dep->qlock); + + erts_atomic64_set_nob(&dep->in, 0); + erts_atomic64_set_nob(&dep->out, 0); + + obuf = clear_de_out_queues(dep); + dep->status = 0; + suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_de_rwunlock(dep); + + erts_resume_processes(suspendees); + + delete_cache(cache); + + free_de_out_queues(dep, obuf); +} + int erts_dsend_context_dtor(Binary* ctx_bin) { ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); @@ -772,8 +786,8 @@ int erts_dsend_context_dtor(Binary* ctx_bin) if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) { free_dist_obuf(ctx->dss.obuf); } - if (ctx->dep_to_deref) - erts_deref_dist_entry(ctx->dep_to_deref); + if (ctx->deref_dep) + erts_deref_dist_entry(ctx->dep); return 1; } @@ -1324,7 +1338,7 @@ int erts_net_message(Port *prt, /* This is tricky (we MUST force a distributed send) */ ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_exit(&dsd, to, from, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); @@ -1416,7 +1430,7 @@ int erts_net_message(Port *prt, if (!rp) { ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, am_noproc); @@ -1883,7 +1897,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } else { ctx->acmp = NULL; - ctx->pass_through_size = 1; + ctx->pass_through_size = 3; /* SVERK rename */ } #ifdef ERTS_DIST_MSG_DBG @@ -1961,9 +1975,9 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->obuf->next = NULL; erts_de_rlock(dep); cid = dep->cid; - if (cid != dsdp->cid - || dep->connection_id != dsdp->connection_id - || dep->status & ERTS_DE_SFLG_EXITING) { + if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED)) + || dep->status & ERTS_DE_SFLG_EXITING + || dep->connection_id != dsdp->connection_id) { /* Not the same connection as when we started; drop message... */ erts_de_runlock(dep); free_dist_obuf(ctx->obuf); @@ -2037,8 +2051,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } erts_mtx_unlock(&dep->qlock); - if (is_internal_port(dep->cid)) - erts_schedule_dist_command(NULL, dep); + if (!(dep->status & ERTS_DE_SFLG_PENDING)) { + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + } + else { + notify_proc = NIL; + } erts_de_runlock(dep); if (is_internal_pid(notify_proc)) notify_dist_data(ctx->c_p, notify_proc); @@ -2306,9 +2325,6 @@ erts_dist_command(Port *prt, int reds_limit) ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, dep->cache, flags); - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; preempt = reds > reds_limit; @@ -2346,21 +2362,18 @@ erts_dist_command(Port *prt, int reds_limit) int de_busy; int preempt = 0; while (oq.first && !preempt) { - ErtsDistOutputBuf *fob; - Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&oq.first->data[0] <= oq.first->extp - && oq.first->extp < oq.first->ext_endp); - size = (*send)(prt, oq.first); + ErtsDistOutputBuf *fob; + Uint size; + oq.first->extp + = erts_encode_ext_dist_header_finalize(oq.first->extp, + dep->cache, + flags); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + ASSERT(&oq.first->data[0] <= oq.first->extp + && oq.first->extp < oq.first->ext_endp); + size = (*send)(prt, oq.first); erts_atomic64_inc_nob(&dep->out); - esdp->io.out += (Uint64) size; + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); @@ -2482,11 +2495,13 @@ erts_dist_command(Port *prt, int reds_limit) foq.first = NULL; foq.last = NULL; +/* SVERK Hmmm.... #ifdef DEBUG erts_mtx_lock(&dep->qlock); ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); erts_mtx_unlock(&dep->qlock); #endif +*/ } else { if (oq.first) { @@ -3218,6 +3233,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) ErtsProcLocks proc_unlock = 0; Process *proc; Port *pp = NULL; + Eterm notify_proc; + erts_aint32_t qflgs; /* * Check and pick out arguments @@ -3245,15 +3262,14 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) if (!is_atom(ic) || !is_atom(oc)) goto badarg; - /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */ - if (!(DFLAG_EXTENDED_REFERENCES & flags)) { + if (~flags & DFLAG_DIST_MANDATORY) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "%T", BIF_P->common.id); if (BIF_P->common.u.alive.reg) erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name); erts_dsprintf(dsbufp, " attempted to enable connection to node %T " - "which is not able to handle extended references.\n", + "which does not support all mandatory capabilities.\n", BIF_ARG_1); erts_send_error_to_logger(BIF_P->group_leader, dsbufp); goto badarg; @@ -3376,7 +3392,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) dep->creation = 0; #ifdef DEBUG - ASSERT(erts_atomic_read_nob(&dep->qsize) == 0); + ASSERT(erts_atomic_read_nob(&dep->qsize) == 0 + || (dep->status & ERTS_DE_SFLG_PENDING)); #endif if (flags & DFLAG_DIST_HDR_ATOM_CACHE) @@ -3384,7 +3401,26 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + notify_proc = NIL; + if (erts_atomic_read_nob(&dep->qsize)) { + if (is_internal_port(dep->cid)) { + erts_schedule_dist_command(NULL, dep); + } + else { + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify_proc = dep->cid; + ASSERT(is_internal_pid(notify_proc)); + } + } + } + } erts_de_rwunlock(dep); + if (is_internal_pid(notify_proc)) + notify_dist_data(BIF_P, notify_proc); ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); @@ -3426,6 +3462,112 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto done; } +BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) +{ + DistEntry* dep; + Uint32 conn_id; + + if (is_not_atom(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); + } + dep = erts_find_or_insert_dist_entry(BIF_ARG_1); + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + + erts_de_rwlock(dep); + + if (ERTS_DE_IS_CONNECTED(dep) || dep->status & ERTS_DE_SFLG_PENDING) + conn_id = dep->connection_id; + else if (dep->status == 0) { + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; + dep->connection_id++; + dep->connection_id &= ERTS_DIST_CON_ID_MASK; + conn_id = dep->connection_id; + } + else { + ASSERT(!"SVERK: What to do?"); + conn_id = dep->connection_id; + } + erts_de_rwunlock(dep); + BIF_RET(make_small(conn_id)); +} + +BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +{ + DistEntry* dep; + + if (is_not_atom(BIF_ARG_1) || is_not_small(BIF_ARG_2)) { + BIF_ERROR(BIF_P, BADARG); + } + dep = erts_find_dist_entry(BIF_ARG_1); + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + + if (!dep) { + BIF_RET(am_false); + } + + erts_de_rwlock(dep); + + if (dep->status == ERTS_DE_SFLG_PENDING + && dep->connection_id == unsigned_val(BIF_ARG_2)) { + + NetExitsContext nec = {dep}; + ErtsLink *nlinks; + ErtsLink *node_links; + ErtsMonitor *monitors; + ErtsAtomCache *cache; + ErtsDistOutputBuf *obuf; + ErtsProcList *resume_procs; + Sint reds = 0; + + ASSERT(is_nil(dep->cid)); + + erts_de_links_lock(dep); + monitors = dep->monitors; + nlinks = dep->nlinks; + node_links = dep->node_links; + dep->monitors = NULL; + dep->nlinks = NULL; + dep->node_links = NULL; + erts_de_links_unlock(dep); + + cache = dep->cache; + dep->cache = NULL; + dep->status = 0; + dep->flags = 0; + erts_mtx_lock(&dep->qlock); + obuf = dep->out_queue.first; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + ASSERT(!dep->tmp_out_queue.first); + ASSERT(!dep->finalized_out_queue.first); + resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_de_rwunlock(dep); + + erts_sweep_monitors(monitors, &doit_monitor_net_exits, &nec); + erts_sweep_links(nlinks, &doit_link_net_exits, &nec); + erts_sweep_links(node_links, &doit_node_link_net_exits, &nec); + + if (resume_procs) { + int resumed = erts_resume_processes(resume_procs); + reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + } + + delete_cache(cache); + + free_de_out_queues(dep, obuf); + + BIF_RET2(am_true, reds); + } + + erts_de_rwunlock(dep); + + BIF_RET(am_false); +} + /**********************************************************************/ /* dist_exit(Local, Term, Remote) -> Bool */ -- cgit v1.2.3 From 3dda5298322a76f2787c6c4dd1cce78416c20dc3 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 16 Aug 2017 16:56:12 +0200 Subject: erts: Async auto-connect for monitor_node Removed distribution_SUITE:applied_monitor_node as it seems to test apply of trapping BIF and monitor_node does not trap anymore. --- erts/emulator/beam/dist.c | 124 +++++++++++++++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 40 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 0fa399cf53..724cb6b24d 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3789,11 +3789,17 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) DistEntry *dep; ErtsLink *lnk; Eterm l; + int async_connect = 1; for (l = Options; l != NIL && is_list(l); l = CDR(list_val(l))) { Eterm t = CAR(list_val(l)); - /* allow_passive_connect the only available option right now */ - if (t != am_allow_passive_connect) { + if (t == am_allow_passive_connect) { + /* + * Handle this horrible feature by falling back on old synchronous + * auto-connect (if needed) + */ + async_connect = 0; + } else { BIF_ERROR(p, BADARG); } } @@ -3807,47 +3813,85 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) && (Node != erts_this_node->sysname))) { BIF_ERROR(p, BADARG); } - dep = erts_sysname_to_connected_dist_entry(Node); - if (!dep) { - do_trap: - BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); - } - if (dep == erts_this_dist_entry) - goto done; - - erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - erts_de_rlock(dep); - if (ERTS_DE_IS_NOT_CONNECTED(dep)) { - erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); - erts_de_runlock(dep); - goto do_trap; - } - erts_de_links_lock(dep); - erts_de_runlock(dep); if (Bool == am_true) { - ASSERT(dep->cid != NIL); - lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, - p->common.id); - ++ERTS_LINK_REFC(lnk); - lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); - ++ERTS_LINK_REFC(lnk); - } - else { - lnk = erts_lookup_link(dep->node_links, p->common.id); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&(dep->node_links), - p->common.id)); - } - } - lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), - Node)); + ErtsDSigData dsd; + dsd.node = Node; + dep = erts_find_or_insert_dist_entry(Node); + if (dep == erts_this_dist_entry) + goto done; + + erts_proc_lock(p, ERTS_PROC_LOCK_LINK); + + switch (erts_dsig_prepare(&dsd, &dep, p, + (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, async_connect)) { + case ERTS_DSIG_PREP_NOT_ALIVE: + case ERTS_DSIG_PREP_NOT_CONNECTED: + /* Trap to either send 'nodedown' or do passive connection attempt */ + trap: + erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); + erts_deref_dist_entry(dep); + BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); + case ERTS_DSIG_PREP_PENDING: + if (!async_connect) { + /* + * Pending connection may fail, so we must trap + * to ensure passive connection attempt + */ + erts_de_runlock(dep); + goto trap; } - } + /*fall through*/ + case ERTS_DSIG_PREP_CONNECTED: + erts_de_links_lock(dep); + erts_de_runlock(dep); + lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, + p->common.id); + ++ERTS_LINK_REFC(lnk); + lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); + ++ERTS_LINK_REFC(lnk); + break; + default: + ERTS_ASSERT(! "Invalid dsig prepare result"); + } + } + else { /* Bool == false */ + dep = erts_sysname_to_connected_dist_entry(Node); + if (!dep) { + /* + * Before OTP-21 this case triggered auto-connect + * and a 'nodedown' message if that failed. + * Now it's a simple no-op which feels more reasonable. + */ + BIF_RET(am_true); + } + if (dep == erts_this_dist_entry) + goto done; + + erts_proc_lock(p, ERTS_PROC_LOCK_LINK); + erts_de_rlock(dep); + if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED))) { + erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); + erts_de_runlock(dep); + goto done; + } + erts_de_links_lock(dep); + erts_de_runlock(dep); + lnk = erts_lookup_link(dep->node_links, p->common.id); + if (lnk != NULL) { + if ((--ERTS_LINK_REFC(lnk)) == 0) { + erts_destroy_link(erts_remove_link(&(dep->node_links), + p->common.id)); + } + } + lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); + if (lnk != NULL) { + if ((--ERTS_LINK_REFC(lnk)) == 0) { + erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), + Node)); + } + } } erts_de_links_unlock(dep); -- cgit v1.2.3 From e8df66a934045a1b0dc1edf0a695b6dbcf4cca4b Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 24 Jul 2017 16:05:36 +0200 Subject: Remove obsolete erlang:dsend --- erts/emulator/beam/dist.c | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 724cb6b24d..7c44545de3 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -108,9 +108,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ -Export* dsend2_trap = NULL; -Export* dsend3_trap = NULL; -/*Export* dsend_nosuspend_trap = NULL;*/ Export* dlink_trap = NULL; Export* dunlink_trap = NULL; Export* dmonitor_node_trap = NULL; @@ -636,9 +633,6 @@ void init_dist(void) erts_atomic_init_nob(&no_caches, 0); /* Lookup/Install all references to trap functions */ - dsend2_trap = trap_function(am_dsend,2); - dsend3_trap = trap_function(am_dsend,3); - /* dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/ dlink_trap = trap_function(am_dlink,1); dunlink_trap = trap_function(am_dunlink,1); dmonitor_node_trap = trap_function(am_dmonitor_node,3); @@ -3148,10 +3142,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; /* Check that all trap functions are defined !! */ - if (dsend2_trap->addressv[0] == NULL || - dsend3_trap->addressv[0] == NULL || - /* dsend_nosuspend_trap->address == NULL ||*/ - dlink_trap->addressv[0] == NULL || + if (dlink_trap->addressv[0] == NULL || dunlink_trap->addressv[0] == NULL || dmonitor_node_trap->addressv[0] == NULL || dgroup_leader_trap->addressv[0] == NULL || -- cgit v1.2.3 From 5ad822ccfd841400bc44cb53acc6a0889ca3f128 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 15 Aug 2017 16:26:34 +0200 Subject: Remove obsolete erlang:dlink/1, dunlink/1 and dist_exit/3 --- erts/emulator/beam/dist.c | 87 +---------------------------------------------- 1 file changed, 1 insertion(+), 86 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 7c44545de3..6e08c72289 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -108,8 +108,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ -Export* dlink_trap = NULL; -Export* dunlink_trap = NULL; Export* dmonitor_node_trap = NULL; Export* dgroup_leader_trap = NULL; Export* dexit_trap = NULL; @@ -633,8 +631,6 @@ void init_dist(void) erts_atomic_init_nob(&no_caches, 0); /* Lookup/Install all references to trap functions */ - dlink_trap = trap_function(am_dlink,1); - dunlink_trap = trap_function(am_dunlink,1); dmonitor_node_trap = trap_function(am_dmonitor_node,3); dgroup_leader_trap = trap_function(am_dgroup_leader,2); dexit_trap = trap_function(am_dexit, 2); @@ -3100,7 +3096,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ monitor_node -- turn on/off node monitoring node controller only: - dist_exit/3 -- send exit signals from remote to local process dist_link/2 -- link a remote process to a local dist_unlink/2 -- unlink a remote from a local ****************************************************************************/ @@ -3111,9 +3106,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ ** Set the node name of current node fail if node already is set. ** setnode(name@host, Creation) ** loads functions pointer to trap_functions from module erlang. - ** erlang:dsend/2 - ** erlang:dlink/1 - ** erlang:dunlink/1 ** erlang:dmonitor_node/3 ** erlang:dgroup_leader/2 ** erlang:dexit/2 @@ -3142,9 +3134,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; /* Check that all trap functions are defined !! */ - if (dlink_trap->addressv[0] == NULL || - dunlink_trap->addressv[0] == NULL || - dmonitor_node_trap->addressv[0] == NULL || + if (dmonitor_node_trap->addressv[0] == NULL || dgroup_leader_trap->addressv[0] == NULL || dmonitor_p_trap->addressv[0] == NULL || dexit_trap->addressv[0] == NULL) { @@ -3559,81 +3549,6 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) BIF_RET(am_false); } - -/**********************************************************************/ -/* dist_exit(Local, Term, Remote) -> Bool */ - -BIF_RETTYPE dist_exit_3(BIF_ALIST_3) -{ - Eterm local; - Eterm remote; - DistEntry *rdep; - - local = BIF_ARG_1; - remote = BIF_ARG_3; - - /* Check that remote is a remote process */ - if (is_not_external_pid(remote)) - goto error; - - rdep = external_dist_entry(remote); - - if(rdep == erts_this_dist_entry) - goto error; - - /* Check that local is local */ - if (is_internal_pid(local)) { - Process *lp; - ErtsProcLocks lp_locks; - if (BIF_P->common.id == local) { - lp_locks = ERTS_PROC_LOCKS_ALL; - lp = BIF_P; - erts_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR); - } - else { - lp_locks = ERTS_PROC_LOCKS_XSIG_SEND; - lp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, - local, lp_locks); - if (!lp) { - BIF_RET(am_true); /* ignore */ - } - } - - (void) erts_send_exit_signal(BIF_P, - remote, - lp, - &lp_locks, - BIF_ARG_2, - NIL, - NULL, - 0); - if (lp == BIF_P) - lp_locks &= ~ERTS_PROC_LOCK_MAIN; - erts_proc_unlock(lp, lp_locks); - if (lp == BIF_P) { - erts_aint32_t state = erts_atomic32_read_acqb(&BIF_P->state); - /* - * We may have exited current process and may have to take action. - */ - if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { - if (state & ERTS_PSFLG_PENDING_EXIT) - erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN); - ERTS_BIF_EXITED(BIF_P); - } - } - } - else if (is_external_pid(local) - && external_dist_entry(local) == erts_this_dist_entry) { - BIF_RET(am_true); /* ignore */ - } - else - goto error; - BIF_RET(am_true); - - error: - BIF_ERROR(BIF_P, BADARG); -} - /**********************************************************************/ /* node(Object) -> Node */ -- cgit v1.2.3 From da2935ce340cef5db1b5f589778eb20044796610 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 15 Aug 2017 17:12:02 +0200 Subject: Remove obsolete erlang:dexit/2 --- erts/emulator/beam/dist.c | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 6e08c72289..e36ea9ba94 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -110,7 +110,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ Export* dmonitor_node_trap = NULL; Export* dgroup_leader_trap = NULL; -Export* dexit_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ @@ -633,7 +632,6 @@ void init_dist(void) /* Lookup/Install all references to trap functions */ dmonitor_node_trap = trap_function(am_dmonitor_node,3); dgroup_leader_trap = trap_function(am_dgroup_leader,2); - dexit_trap = trap_function(am_dexit, 2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, am_dist_ctrl_put_data, @@ -3108,9 +3106,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ ** loads functions pointer to trap_functions from module erlang. ** erlang:dmonitor_node/3 ** erlang:dgroup_leader/2 - ** erlang:dexit/2 - ** -- are these needed ? - ** dexit/1 ***********************************************************************/ BIF_RETTYPE setnode_2(BIF_ALIST_2) @@ -3136,8 +3131,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) /* Check that all trap functions are defined !! */ if (dmonitor_node_trap->addressv[0] == NULL || dgroup_leader_trap->addressv[0] == NULL || - dmonitor_p_trap->addressv[0] == NULL || - dexit_trap->addressv[0] == NULL) { + dmonitor_p_trap->addressv[0] == NULL) { goto error; } -- cgit v1.2.3 From 389e11b8b8a476ca73ca03a39ad7ec298dc99e83 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 18 Aug 2017 17:08:27 +0200 Subject: Remove obsolete erlang:dgroup_leader --- erts/emulator/beam/dist.c | 6 ------ 1 file changed, 6 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index e36ea9ba94..cd4125e404 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -109,7 +109,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ Export* dmonitor_node_trap = NULL; -Export* dgroup_leader_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ @@ -631,7 +630,6 @@ void init_dist(void) /* Lookup/Install all references to trap functions */ dmonitor_node_trap = trap_function(am_dmonitor_node,3); - dgroup_leader_trap = trap_function(am_dgroup_leader,2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, am_dist_ctrl_put_data, @@ -3103,9 +3101,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ /********************************************************************** ** Set the node name of current node fail if node already is set. ** setnode(name@host, Creation) - ** loads functions pointer to trap_functions from module erlang. - ** erlang:dmonitor_node/3 - ** erlang:dgroup_leader/2 ***********************************************************************/ BIF_RETTYPE setnode_2(BIF_ALIST_2) @@ -3130,7 +3125,6 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) /* Check that all trap functions are defined !! */ if (dmonitor_node_trap->addressv[0] == NULL || - dgroup_leader_trap->addressv[0] == NULL || dmonitor_p_trap->addressv[0] == NULL) { goto error; } -- cgit v1.2.3 From 5857245406584b8bfcbdf0be450ad494a992d5c8 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 8 Sep 2017 17:11:43 +0200 Subject: erts: Fix auto-connect toward erl_interface/jinterface --- erts/emulator/beam/dist.c | 63 ++++++++++++++++++++++------------------------- 1 file changed, 29 insertions(+), 34 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index cd4125e404..6d2e907f18 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -1879,33 +1879,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) { ctx->acmp = erts_get_atom_cache_map(ctx->c_p); - ctx->pass_through_size = 0; + ctx->max_finalize_prepend = 0; } else { ctx->acmp = NULL; - ctx->pass_through_size = 3; /* SVERK rename */ + ctx->max_finalize_prepend = 3; } #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl); - if (is_value(ctx->msg)) - erts_fprintf(stderr, " MSG: %T\n", ctx->msg); + erts_fprintf(stderr, ">> CTL: %T\n", ctx->ctl); + if (is_value(ctx->msg)) + erts_fprintf(stderr, " MSG: %T\n", ctx->msg); #endif - ctx->data_size = ctx->pass_through_size; + ctx->data_size = ctx->max_finalize_prepend; erts_reset_atom_cache_map(ctx->acmp); erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size); - if (is_value(ctx->msg)) { - ctx->u.sc.wstack.wstart = NULL; - ctx->u.sc.flags = ctx->flags; - ctx->u.sc.level = 0; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; - } - break; + if (is_non_value(ctx->msg)) { + ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; + break; + } + ctx->u.sc.wstack.wstart = NULL; + ctx->u.sc.flags = ctx->flags; + ctx->u.sc.level = 0; + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; case ERTS_DSIG_SEND_PHASE_MSG_SIZE: if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) { retval = ERTS_DSIG_SEND_CONTINUE; @@ -1920,23 +1919,24 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->data_size += ctx->dhdr_ext_size; ctx->obuf = alloc_dist_obuf(ctx->data_size); - ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->pass_through_size + ctx->dhdr_ext_size; + ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size; /* Encode internal version of dist header */ ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp); /* Encode control message */ erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL); - if (is_value(ctx->msg)) { - ctx->u.ec.flags = ctx->flags; - ctx->u.ec.level = 0; - ctx->u.ec.wstack.wstart = NULL; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; - } - break; + if (is_non_value(ctx->msg)) { + ctx->obuf->msg_start = NULL; + ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; + break; + } + ctx->u.ec.flags = ctx->flags; + ctx->u.ec.level = 0; + ctx->u.ec.wstack.wstart = NULL; + ctx->obuf->msg_start = ctx->obuf->ext_endp; - case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; + case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) { retval = ERTS_DSIG_SEND_CONTINUE; goto done; @@ -1949,7 +1949,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) int resume = 0; ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp); - ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->pass_through_size); + ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend); ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size); ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp; @@ -2308,9 +2308,7 @@ erts_dist_command(Port *prt, int reds_limit) ob = oq.first; ASSERT(ob); do { - ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, - dep->cache, - flags); + erts_encode_ext_dist_header_finalize(ob, dep->cache, flags); ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; preempt = reds > reds_limit; @@ -2350,10 +2348,7 @@ erts_dist_command(Port *prt, int reds_limit) while (oq.first && !preempt) { ErtsDistOutputBuf *fob; Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); + erts_encode_ext_dist_header_finalize(oq.first, dep->cache, flags); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; ASSERT(&oq.first->data[0] <= oq.first->extp && oq.first->extp < oq.first->ext_endp); -- cgit v1.2.3 From d736e87ff94ab8191f33dca55516e6c1d440b915 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 24 Aug 2017 17:20:21 +0200 Subject: Add optimistic DFLAG_DIST_HOPEFULLY for pending connections to avoid tuple fallbacks for export funs and bitstrings. ToDo: Re-encode if receiver turn out to be erl_interface/jinterface. --- erts/emulator/beam/dist.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 6d2e907f18..7877865ad4 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3443,7 +3443,7 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) conn_id = dep->connection_id; else if (dep->status == 0) { dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; + dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); dep->connection_id++; dep->connection_id &= ERTS_DIST_CON_ID_MASK; conn_id = dep->connection_id; -- cgit v1.2.3 From e4d28f70133af9b3b8c320fe5e70e2086830874d Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 25 Aug 2017 18:46:58 +0200 Subject: erts: Transcode tuple fallbacks When finalizing outgoing distribution messages we transcode them into using tuple fallbacks if the receiver does not support bitstrings and export-funs. This can only happen if the message was first encoded toward a pending connection when the receiver was unknown. It's an optimistic approach optmimized for modern beam nodes, that expect real bitstrings and funs (since orig_bytes[0]; #ifdef DEBUG obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; + obuf->alloc_endp = obuf->data + size; ASSERT(bin == ErtsDistOutputBuf2Binary(obuf)); #endif return obuf; @@ -692,6 +693,7 @@ static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; + return obuf; } @@ -755,6 +757,8 @@ static void clear_dist_entry(DistEntry *dep) delete_cache(cache); free_de_out_queues(dep, obuf); + if (dep->transcode_ctx) + transcode_free_ctx(dep); } int erts_dsend_context_dtor(Binary* ctx_bin) @@ -2208,7 +2212,6 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) #endif #define ERTS_PORT_REDS_DIST_CMD_START 5 -#define ERTS_PORT_REDS_DIST_CMD_FINALIZE 3 #define ERTS_PORT_REDS_DIST_CMD_EXIT 200 #define ERTS_PORT_REDS_DIST_CMD_RESUMED 5 #define ERTS_PORT_REDS_DIST_CMD_DATA(SZ) \ @@ -2217,9 +2220,9 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) : ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__))) int -erts_dist_command(Port *prt, int reds_limit) +erts_dist_command(Port *prt, int initial_reds) { - Sint reds = ERTS_PORT_REDS_DIST_CMD_START; + Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START; Uint32 status; Uint32 flags; Sint qsize, obufsize = 0; @@ -2241,9 +2244,12 @@ erts_dist_command(Port *prt, int reds_limit) if (status & ERTS_DE_SFLG_EXITING) { erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1); - return reds + ERTS_PORT_REDS_DIST_CMD_EXIT; + reds -= ERTS_PORT_REDS_DIST_CMD_EXIT; + return initial_reds - reds; } + ASSERT(!(status & ERTS_DE_SFLG_PENDING)); + ASSERT(send); /* @@ -2268,7 +2274,7 @@ erts_dist_command(Port *prt, int reds_limit) sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - if (reds > reds_limit) + if (reds < 0) goto preempted; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { @@ -2283,13 +2289,13 @@ erts_dist_command(Port *prt, int reds_limit) erts_fprintf(stderr, ">> "); bw(foq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = foq.first; - obufsize += size_obuf(fob); - foq.first = foq.first->next; - free_dist_obuf(fob); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = foq.first; + obufsize += size_obuf(fob); + foq.first = foq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + preempt = reds < 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) break; } while (foq.first && !preempt); @@ -2302,44 +2308,42 @@ erts_dist_command(Port *prt, int reds_limit) if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) { if (oq.first) { ErtsDistOutputBuf *ob; - int preempt; + ErtsDistOutputBuf *last_finalized = NULL; finalize_only: - preempt = 0; ob = oq.first; ASSERT(ob); do { - erts_encode_ext_dist_header_finalize(ob, dep->cache, flags); - ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - preempt = reds > reds_limit; - if (preempt) - break; - ob = ob->next; + reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds); + if (reds >= 0) { + last_finalized = ob; + ob = ob->next; + } } while (ob); - /* - * At least one buffer was finalized; if we got preempted, - * ob points to the last buffer that we finalized. - */ - if (foq.last) - foq.last->next = oq.first; - else - foq.first = oq.first; - if (!preempt) { - /* All buffers finalized */ - foq.last = oq.last; - oq.first = oq.last = NULL; - } - else { - /* Not all buffers finalized; split oq. */ - foq.last = ob; - oq.first = ob->next; - if (oq.first) - ob->next = NULL; - else - oq.last = NULL; - } - if (preempt) - goto preempted; + if (last_finalized) { + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the next buffer to continue finalize. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + foq.last = last_finalized; + if (!ob) { + /* All buffers finalized */ + ASSERT(foq.last == oq.last); + ASSERT(foq.last->next == NULL); + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. */ + ASSERT(foq.last->next == ob); + foq.last->next = NULL; + oq.first = ob; + } + } + if (reds <= 0) + goto preempted; } } else { @@ -2348,8 +2352,11 @@ erts_dist_command(Port *prt, int reds_limit) while (oq.first && !preempt) { ErtsDistOutputBuf *fob; Uint size; - erts_encode_ext_dist_header_finalize(oq.first, dep->cache, flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds); + if (reds < 0) { + preempt = 1; + break; + } ASSERT(&oq.first->data[0] <= oq.first->extp && oq.first->extp < oq.first->ext_endp); size = (*send)(prt, oq.first); @@ -2359,13 +2366,13 @@ erts_dist_command(Port *prt, int reds_limit) erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = oq.first; - obufsize += size_obuf(fob); - oq.first = oq.first->next; - free_dist_obuf(fob); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = oq.first; + obufsize += size_obuf(fob); + oq.first = oq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + preempt = reds <= 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) goto finalize_only; } @@ -2405,7 +2412,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_mtx_unlock(&dep->qlock); resumed = erts_resume_processes(suspendees); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } else erts_mtx_unlock(&dep->qlock); @@ -2428,8 +2435,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_mtx_unlock(&dep->qlock); } - ASSERT(foq.first || !foq.last); - ASSERT(!foq.first || foq.last); + ASSERT(!!foq.first == !!foq.last); ASSERT(!dep->finalized_out_queue.first); ASSERT(!dep->finalized_out_queue.last); @@ -2439,10 +2445,10 @@ erts_dist_command(Port *prt, int reds_limit) } /* Avoid wrapping reduction counter... */ - if (reds > INT_MAX/2) - reds = INT_MAX/2; + if (reds < INT_MIN/2) + reds = INT_MIN/2; - return reds; + return initial_reds - reds; preempted: /* @@ -2450,8 +2456,7 @@ erts_dist_command(Port *prt, int reds_limit) * since last call to driver. */ - ASSERT(oq.first || !oq.last); - ASSERT(!oq.first || oq.last); + ASSERT(!!oq.first == !!oq.last); if (sched_flags & ERTS_PTS_FLG_EXIT) { /* @@ -2475,14 +2480,11 @@ erts_dist_command(Port *prt, int reds_limit) foq.first = NULL; foq.last = NULL; - -/* SVERK Hmmm.... #ifdef DEBUG erts_mtx_lock(&dep->qlock); ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); erts_mtx_unlock(&dep->qlock); #endif -*/ } else { if (oq.first) { @@ -2772,7 +2774,8 @@ BIF_RETTYPE dist_ctrl_get_data_1(BIF_ALIST_1) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); - int reds = 1; + const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P); + Sint reds = initial_reds; ErtsDistOutputBuf *obuf; Eterm *hp; ProcBin *pb; @@ -2803,6 +2806,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1) { if (!dep->tmp_out_queue.first) { ASSERT(!dep->tmp_out_queue.last); + ASSERT(!dep->transcode_ctx); qsize = erts_atomic_read_acqb(&dep->qsize); if (qsize > 0) { erts_mtx_lock(&dep->qlock); @@ -2820,21 +2824,18 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_de_runlock(dep); BIF_RET(am_none); } - else { - obuf = dep->tmp_out_queue.first; - dep->tmp_out_queue.first = obuf->next; - if (!obuf->next) - dep->tmp_out_queue.last = NULL; + + obuf = dep->tmp_out_queue.first; + reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds); + if (reds < 0) { + erts_de_runlock(dep); + ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1], + BIF_P, BIF_ARG_1); } - obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp, - dep->cache, - dep->flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */ - ASSERT(&obuf->data[0] <= obuf->extp - && obuf->extp < obuf->ext_endp); + dep->tmp_out_queue.first = obuf->next; + if (!obuf->next) + dep->tmp_out_queue.last = NULL; } erts_atomic64_inc_nob(&dep->out); @@ -2862,11 +2863,11 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_mtx_unlock(&dep->qlock); if (resume_procs) { int resumed = erts_resume_processes(resume_procs); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } } - BIF_RET2(make_binary(pb), reds); + BIF_RET2(make_binary(pb), (initial_reds - reds)); } void -- cgit v1.2.3 From 56ab31904f7157d7b4282c2afd474c7bed7b9c75 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 19 Sep 2017 17:44:47 +0200 Subject: Remove faulty assert Send may have failed, port exit with dist_entry cleaned up and then new pending connection with queued messages. --- erts/emulator/beam/dist.c | 5 ----- 1 file changed, 5 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index a5df92bf9a..474b0e8364 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -2480,11 +2480,6 @@ erts_dist_command(Port *prt, int initial_reds) foq.first = NULL; foq.last = NULL; -#ifdef DEBUG - erts_mtx_lock(&dep->qlock); - ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); - erts_mtx_unlock(&dep->qlock); -#endif } else { if (oq.first) { -- cgit v1.2.3 From 771abbd23709b5a03416278595588931889ab7c5 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 20 Sep 2017 15:06:07 +0200 Subject: erts: Keep magic ref to DistEntry in net_kernel to make sure it's kept alive. --- erts/emulator/beam/dist.c | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 474b0e8364..c16579037c 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3426,6 +3426,8 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) { DistEntry* dep; Uint32 conn_id; + Eterm* hp; + Eterm dhandle; if (is_not_atom(BIF_ARG_1)) { BIF_ERROR(BIF_P, BADARG); @@ -3449,17 +3451,25 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) conn_id = dep->connection_id; } erts_de_rwunlock(dep); - BIF_RET(make_small(conn_id)); + hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); + dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep); + erts_deref_dist_entry(dep); + BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); } BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) { DistEntry* dep; + Eterm* tp; - if (is_not_atom(BIF_ARG_1) || is_not_small(BIF_ARG_2)) { + if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { BIF_ERROR(BIF_P, BADARG); } - dep = erts_find_dist_entry(BIF_ARG_1); + tp = tuple_val(BIF_ARG_2); + dep = erts_dhandle_to_dist_entry(tp[2]); + if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); + } ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ if (!dep) { @@ -3469,7 +3479,7 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) erts_de_rwlock(dep); if (dep->status == ERTS_DE_SFLG_PENDING - && dep->connection_id == unsigned_val(BIF_ARG_2)) { + && dep->connection_id == unsigned_val(tp[1])) { NetExitsContext nec = {dep}; ErtsLink *nlinks; @@ -3736,10 +3746,12 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) ++ERTS_LINK_REFC(lnk); lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); ++ERTS_LINK_REFC(lnk); + erts_de_links_unlock(dep); break; default: ERTS_ASSERT(! "Invalid dsig prepare result"); } + erts_deref_dist_entry(dep); } else { /* Bool == false */ dep = erts_sysname_to_connected_dist_entry(Node); @@ -3777,9 +3789,9 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) Node)); } } + erts_de_links_unlock(dep); } - erts_de_links_unlock(dep); erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); done: -- cgit v1.2.3 From 25f9d0469e2b1da0aec73b55e54561be8b573634 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 26 Sep 2017 16:01:29 +0200 Subject: Refactor auto_connect into an outline function and abort_pending_connection into own utility function. --- erts/emulator/beam/dist.c | 92 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 68 insertions(+), 24 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index c16579037c..3e8c4c65b5 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -120,6 +120,7 @@ static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); +static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; @@ -3457,30 +3458,16 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); } -BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) { - DistEntry* dep; - Eterm* tp; - - if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { - BIF_ERROR(BIF_P, BADARG); - } - tp = tuple_val(BIF_ARG_2); - dep = erts_dhandle_to_dist_entry(tp[2]); - if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { - BIF_ERROR(BIF_P, BADARG); - } - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ - - if (!dep) { - BIF_RET(am_false); - } + Sint reds = 0; erts_de_rwlock(dep); - if (dep->status == ERTS_DE_SFLG_PENDING - && dep->connection_id == unsigned_val(tp[1])) { - + if (dep->status != ERTS_DE_SFLG_PENDING || dep->connection_id != conn_id) { + erts_de_rwunlock(dep); + } + else { NetExitsContext nec = {dep}; ErtsLink *nlinks; ErtsLink *node_links; @@ -3488,7 +3475,6 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) ErtsAtomCache *cache; ErtsDistOutputBuf *obuf; ErtsProcList *resume_procs; - Sint reds = 0; ASSERT(is_nil(dep->cid)); @@ -3529,13 +3515,71 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) delete_cache(cache); free_de_out_queues(dep, obuf); + } + return reds; +} - BIF_RET2(am_true, reds); +BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +{ + DistEntry* dep; + Eterm* tp; + + if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { + BIF_ERROR(BIF_P, BADARG); + } + tp = tuple_val(BIF_ARG_2); + dep = erts_dhandle_to_dist_entry(tp[2]); + if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); } + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ - erts_de_rwunlock(dep); + if (dep) { + Sint reds = abort_pending_connection(dep, unsigned_val(tp[1])); + BUMP_REDS(BIF_P, reds); + } + BIF_RET(am_true); +} - BIF_RET(am_false); +int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) +{ + erts_de_rwlock(dep); + if (dep->status != 0) { + erts_de_rwunlock(dep); + } + else { + Process* net_kernel; + ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *mp; + Eterm msg, dhandle; + Uint32 conn_id; + + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); + dep->connection_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + conn_id = dep->connection_id; + erts_de_rwunlock(dep); + + net_kernel = erts_whereis_process(proc, proc_locks, + am_net_kernel, nk_locks, 0); + if (!net_kernel) { + return 0; + } + + /* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */ + mp = erts_alloc_message_heap(net_kernel, &nk_locks, + 5 + ERTS_MAGIC_REF_THING_SIZE, + &hp, &ohp); + dhandle = erts_build_dhandle(&hp, ohp, dep); + msg = TUPLE4(hp, am_auto_connect, dep->sysname, make_small(conn_id), + dhandle); + erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); + erts_proc_unlock(net_kernel, nk_locks); + } + + return 1; } /**********************************************************************/ -- cgit v1.2.3 From 6b48ef0226084a7f471d391abd4c1c399068840a Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 27 Sep 2017 19:02:10 +0200 Subject: erts: Put pending DistrEntry in separate list --- erts/emulator/beam/dist.c | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 3e8c4c65b5..44f56ebe28 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3062,6 +3062,10 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ info_dist_entry(to, arg, dep, 0, 1); } + for (dep = erts_pending_dist_entries; dep; dep = dep->next) { + info_dist_entry(to, arg, dep, 0, 0); + } + for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { info_dist_entry(to, arg, dep, 0, 0); @@ -3441,10 +3445,7 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) if (ERTS_DE_IS_CONNECTED(dep) || dep->status & ERTS_DE_SFLG_PENDING) conn_id = dep->connection_id; else if (dep->status == 0) { - dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); - dep->connection_id++; - dep->connection_id &= ERTS_DIST_CON_ID_MASK; + erts_set_dist_entry_pending(dep); conn_id = dep->connection_id; } else { @@ -3489,8 +3490,6 @@ static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) cache = dep->cache; dep->cache = NULL; - dep->status = 0; - dep->flags = 0; erts_mtx_lock(&dep->qlock); obuf = dep->out_queue.first; dep->out_queue.first = NULL; @@ -3501,6 +3500,9 @@ static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) erts_mtx_unlock(&dep->qlock); erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); dep->send = NULL; + + erts_set_dist_entry_not_connected(dep); + erts_de_rwunlock(dep); erts_sweep_monitors(monitors, &doit_monitor_net_exits, &nec); @@ -3556,9 +3558,7 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) Eterm msg, dhandle; Uint32 conn_id; - dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); - dep->connection_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + erts_set_dist_entry_pending(dep); conn_id = dep->connection_id; erts_de_rwunlock(dep); @@ -3655,9 +3655,11 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) ASSERT(erts_no_of_not_connected_dist_entries > 0); ASSERT(erts_no_of_hidden_dist_entries >= 0); + ASSERT(erts_no_of_pending_dist_entries >= 0); ASSERT(erts_no_of_visible_dist_entries >= 0); if(not_connected) - length += (erts_no_of_not_connected_dist_entries - 1); + length += ((erts_no_of_not_connected_dist_entries - 1) + + erts_no_of_pending_dist_entries); if(hidden) length += erts_no_of_hidden_dist_entries; if(visible) @@ -3677,13 +3679,18 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) #ifdef DEBUG endp = hp + length*2; #endif - if(not_connected) + if(not_connected) { for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { result = CONS(hp, dep->sysname, result); hp += 2; } + } + for(dep = erts_pending_dist_entries; dep; dep = dep->next) { + result = CONS(hp, dep->sysname, result); + hp += 2; } + } if(hidden) for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { result = CONS(hp, dep->sysname, result); -- cgit v1.2.3 From 0a6ccd3d1fb1d9cacd7369533d6e5aa1fc2ded77 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 27 Sep 2017 19:06:23 +0200 Subject: Abort all pending connections if net_kernel terminates --- erts/emulator/beam/dist.c | 66 ++++++++++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 23 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 44f56ebe28..1b5b730764 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -476,41 +476,54 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */ DistEntry *tdep; - int no_dist_ctrl = 0; + int no_dist_ctrl; + int no_pending; Eterm nd_reason = (reason == am_no_network ? am_no_network : am_net_kernel_terminated); + int i = 0; + Eterm *dist_ctrl; + DistEntry** pending; + + ERTS_UNDEF(dist_ctrl, NULL); + ERTS_UNDEF(pending, NULL); + erts_rwmtx_rlock(&erts_dist_table_rwmtx); - for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) - no_dist_ctrl++; - for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) - no_dist_ctrl++; + no_dist_ctrl = (erts_no_of_hidden_dist_entries + + erts_no_of_visible_dist_entries); + no_pending = erts_no_of_pending_dist_entries; /* KILL all port controllers */ - if (no_dist_ctrl == 0) - erts_rwmtx_runlock(&erts_dist_table_rwmtx); - else { - Eterm def_buf[128]; - int i = 0; - Eterm *dist_ctrl; - - if (no_dist_ctrl <= sizeof(def_buf)/sizeof(def_buf[0])) - dist_ctrl = &def_buf[0]; - else - dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, - sizeof(Eterm)*no_dist_ctrl); + if (no_dist_ctrl) { + dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, + sizeof(Eterm)*no_dist_ctrl); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) { ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + ASSERT(i < no_dist_ctrl); dist_ctrl[i++] = tdep->cid; } for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) { ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + ASSERT(i < no_dist_ctrl); dist_ctrl[i++] = tdep->cid; } - erts_rwmtx_runlock(&erts_dist_table_rwmtx); + ASSERT(i == no_dist_ctrl); + } + if (no_pending) { + pending = erts_alloc(ERTS_ALC_T_TMP, sizeof(DistEntry*)*no_pending); + i = 0; + for (tdep = erts_pending_dist_entries; tdep; tdep = tdep->next) { + ASSERT(is_nil(tdep->cid)); + ASSERT(i < no_pending); + pending[i++] = tdep; + } + ASSERT(i == no_pending); + } + erts_rwmtx_runlock(&erts_dist_table_rwmtx); - for (i = 0; i < no_dist_ctrl; i++) { + if (no_dist_ctrl) { + for (i = 0; i < no_dist_ctrl; i++) { if (is_internal_pid(dist_ctrl[i])) schedule_kill_dist_ctrl_proc(dist_ctrl[i]); else { @@ -524,11 +537,17 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) prt, dist_ctrl[i], nd_reason, NULL); } } - } + } + erts_free(ERTS_ALC_T_TMP, dist_ctrl); + } + + if (no_pending) { + for (i = 0; i < no_pending; i++) { + abort_pending_connection(pending[i], pending[i]->connection_id); + } + erts_free(ERTS_ALC_T_TMP, pending); + } - if (dist_ctrl != &def_buf[0]) - erts_free(ERTS_ALC_T_TMP, dist_ctrl); - } /* * When last dist ctrl exits, node will be taken @@ -3565,6 +3584,7 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) net_kernel = erts_whereis_process(proc, proc_locks, am_net_kernel, nk_locks, 0); if (!net_kernel) { + abort_pending_connection(dep, conn_id); return 0; } -- cgit v1.2.3 From 6c4a3094d0263e46827c6b1869a6de0c033b6b64 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 27 Sep 2017 19:45:42 +0200 Subject: Improve connection aborting --- erts/emulator/beam/dist.c | 120 +++++++++++++++++++++++++++++++--------------- 1 file changed, 81 insertions(+), 39 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 1b5b730764..4a25fbdd4c 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -120,7 +120,7 @@ static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); -static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id); +static Sint abort_connection(DistEntry* dep, Uint32 conn_id); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; @@ -517,6 +517,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) ASSERT(is_nil(tdep->cid)); ASSERT(i < no_pending); pending[i++] = tdep; + erts_ref_dist_entry(tdep); } ASSERT(i == no_pending); } @@ -543,7 +544,8 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (no_pending) { for (i = 0; i < no_pending; i++) { - abort_pending_connection(pending[i], pending[i]->connection_id); + abort_connection(pending[i], pending[i]->connection_id); + erts_deref_dist_entry(pending[i]); } erts_free(ERTS_ALC_T_TMP, pending); } @@ -624,7 +626,6 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) reason == am_normal ? am_connection_closed : reason); clear_dist_entry(dep); - } dec_no_nodes(); @@ -2904,24 +2905,31 @@ erts_dist_port_not_busy(Port *prt) erts_schedule_dist_command(prt, NULL); } +static void kill_connection(DistEntry *dep) +{ + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); + ASSERT(dep->status == ERTS_DE_SFLG_CONNECTED); + + dep->status |= ERTS_DE_SFLG_EXITING; + erts_mtx_lock(&dep->qlock); + ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); + erts_mtx_unlock(&dep->qlock); + + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + else if (is_internal_pid(dep->cid)) + schedule_kill_dist_ctrl_proc(dep->cid); +} + void erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) { erts_de_rwlock(dep); if (connection_id == dep->connection_id - && !(dep->status & ERTS_DE_SFLG_EXITING)) { - - dep->status |= ERTS_DE_SFLG_EXITING; - - erts_mtx_lock(&dep->qlock); - ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); - erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); - erts_mtx_unlock(&dep->qlock); + && dep->status == ERTS_DE_SFLG_CONNECTED) { - if (is_internal_port(dep->cid)) - erts_schedule_dist_command(NULL, dep); - else if (is_internal_pid(dep->cid)) - schedule_kill_dist_ctrl_proc(dep->cid); + kill_connection(dep); } erts_de_rwunlock(dep); } @@ -3259,6 +3267,11 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto badarg; } + /* + * ToDo: Should we not pass connection_id as well + * to make sure it's the right connection we commit. + */ + /* * Arguments seem to be in order. */ @@ -3302,6 +3315,23 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto badarg; } + if (dep->status & ERTS_DE_SFLG_EXITING) { + /* Suspend on dist entry waiting for the exit to finish */ + ErtsProcList *plp = erts_proclist_create(BIF_P); + plp->next = NULL; + erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); + erts_mtx_lock(&dep->qlock); + erts_proclist_store_last(&dep->suspended, plp); + erts_mtx_unlock(&dep->qlock); + goto yield; + } + if (dep->status != ERTS_DE_SFLG_PENDING) { + if (dep->status == 0) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } + if (is_not_nil(dep->cid)) goto badarg; @@ -3344,8 +3374,12 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_mtx_unlock(&dep->qlock); goto yield; } - - ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); + if (dep->status != ERTS_DE_SFLG_PENDING) { + if (dep->status == 0) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } if (pp->dist_entry || is_not_nil(dep->cid)) goto badarg; @@ -3457,7 +3491,11 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) BIF_ERROR(BIF_P, BADARG); } dep = erts_find_or_insert_dist_entry(BIF_ARG_1); - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + + if (dep == erts_this_dist_entry) { + erts_deref_dist_entry(dep); + BIF_ERROR(BIF_P, BADARG); + } erts_de_rwlock(dep); @@ -3468,8 +3506,8 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) conn_id = dep->connection_id; } else { - ASSERT(!"SVERK: What to do?"); - conn_id = dep->connection_id; + ASSERT(dep->status & ERTS_DE_SFLG_EXITING); + conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; } erts_de_rwunlock(dep); hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); @@ -3478,23 +3516,24 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); } -static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) +static Sint abort_connection(DistEntry* dep, Uint32 conn_id) { - Sint reds = 0; - erts_de_rwlock(dep); - if (dep->status != ERTS_DE_SFLG_PENDING || dep->connection_id != conn_id) { - erts_de_rwunlock(dep); + if (dep->connection_id != conn_id) + ; + else if (dep->status == ERTS_DE_SFLG_CONNECTED) { + kill_connection(dep); } - else { - NetExitsContext nec = {dep}; - ErtsLink *nlinks; - ErtsLink *node_links; - ErtsMonitor *monitors; - ErtsAtomCache *cache; - ErtsDistOutputBuf *obuf; + else if (dep->status == ERTS_DE_SFLG_PENDING) { + NetExitsContext nec = {dep}; + ErtsLink *nlinks; + ErtsLink *node_links; + ErtsMonitor *monitors; + ErtsAtomCache *cache; + ErtsDistOutputBuf *obuf; ErtsProcList *resume_procs; + Sint reds = 0; ASSERT(is_nil(dep->cid)); @@ -3534,10 +3573,11 @@ static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) } delete_cache(cache); - free_de_out_queues(dep, obuf); + return reds; } - return reds; + erts_de_rwunlock(dep); + return 0; } BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) @@ -3550,13 +3590,13 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) } tp = tuple_val(BIF_ARG_2); dep = erts_dhandle_to_dist_entry(tp[2]); - if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { + if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1) + || dep == erts_this_dist_entry) { BIF_ERROR(BIF_P, BADARG); } - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ if (dep) { - Sint reds = abort_pending_connection(dep, unsigned_val(tp[1])); + Sint reds = abort_connection(dep, unsigned_val(tp[1])); BUMP_REDS(BIF_P, reds); } BIF_RET(am_true); @@ -3584,11 +3624,13 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) net_kernel = erts_whereis_process(proc, proc_locks, am_net_kernel, nk_locks, 0); if (!net_kernel) { - abort_pending_connection(dep, conn_id); + abort_connection(dep, conn_id); return 0; } - /* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */ + /* + * Send {auto_connect, Node, ConnId, DHandle} to net_kernel + */ mp = erts_alloc_message_heap(net_kernel, &nk_locks, 5 + ERTS_MAGIC_REF_THING_SIZE, &hp, &ohp); -- cgit v1.2.3 From 2f7a0afbbf34699ac1cc1733e3634c1cbeb94c5d Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 11 Oct 2017 20:38:33 +0200 Subject: Refactor erts_dsig_prepare argument dep(p) Don't need to be pointer-pointer --- erts/emulator/beam/dist.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 4a25fbdd4c..ae64f394c8 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -1349,7 +1349,7 @@ int erts_net_message(Port *prt, /* This is tricky (we MUST force a distributed send) */ ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_exit(&dsd, to, from, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); @@ -1441,7 +1441,7 @@ int erts_net_message(Port *prt, if (!rp) { ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, am_noproc); @@ -3831,7 +3831,7 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - switch (erts_dsig_prepare(&dsd, &dep, p, + switch (erts_dsig_prepare(&dsd, dep, p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), ERTS_DSP_RLOCK, 0, async_connect)) { case ERTS_DSIG_PREP_NOT_ALIVE: -- cgit v1.2.3 From 19c9f3a61fbef3682056bab3db4787cd763bd06e Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 15 Nov 2017 19:44:52 +0100 Subject: Move new|abort_connection_id to erts_internal and drop _id suffix. --- erts/emulator/beam/dist.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'erts/emulator/beam/dist.c') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index ae64f394c8..16a7bd9eb3 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3480,7 +3480,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto done; } -BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) +BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1) { DistEntry* dep; Uint32 conn_id; @@ -3580,7 +3580,7 @@ static Sint abort_connection(DistEntry* dep, Uint32 conn_id) return 0; } -BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2) { DistEntry* dep; Eterm* tp; -- cgit v1.2.3