diff options
author | Sverker Eriksson <[email protected]> | 2017-07-14 19:34:54 +0200 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2017-11-15 20:10:33 +0100 |
commit | f89fb92384280e2939414287a2ecb8f86a199318 (patch) | |
tree | facf1d7b31b56cc25575d6cb9a01a69e2e4317d0 | |
parent | fe720f6b2051c9bf8ff303f857c3db0a84b1c050 (diff) | |
download | otp-f89fb92384280e2939414287a2ecb8f86a199318.tar.gz otp-f89fb92384280e2939414287a2ecb8f86a199318.tar.bz2 otp-f89fb92384280e2939414287a2ecb8f86a199318.zip |
erts: Introduce asynchronous auto-connect
-rw-r--r-- | erts/emulator/beam/atom.names | 1 | ||||
-rw-r--r-- | erts/emulator/beam/bif.c | 121 | ||||
-rw-r--r-- | erts/emulator/beam/bif.tab | 8 | ||||
-rw-r--r-- | erts/emulator/beam/dist.c | 274 | ||||
-rw-r--r-- | erts/emulator/beam/dist.h | 122 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.c | 20 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.h | 7 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 40 | ||||
-rw-r--r-- | erts/emulator/beam/external.c | 41 | ||||
-rw-r--r-- | erts/emulator/beam/external.h | 2 | ||||
-rw-r--r-- | erts/emulator/beam/io.c | 3 | ||||
-rw-r--r-- | erts/preloaded/src/erlang.erl | 15 | ||||
-rw-r--r-- | lib/kernel/src/net_kernel.erl | 210 |
13 files changed, 648 insertions, 216 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index fc55b687d4..a534dd44fb 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -108,6 +108,7 @@ atom asynchronous atom atom atom atom_used atom attributes +atom auto_connect atom await_microstate_accounting_modifications atom await_port_send_result atom await_proc_exit diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 4b11884f38..34f5afae78 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -227,17 +227,40 @@ BIF_RETTYPE link_1(BIF_ALIST_1) goto res_no_proc; } - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_RLOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, BIF_P, + (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: - /* Let the dlink trap handle it */ - case ERTS_DSIG_PREP_NOT_CONNECTED: - erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK); - BIF_TRAP1(dlink_trap, BIF_P, BIF_ARG_1); - + case ERTS_DSIG_PREP_NOT_CONNECTED: { + ErtsProcLocks locks = ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK; + erts_aint32_t state; + erts_proc_lock(BIF_P, (ERTS_PROC_LOCKS_ALL & ~locks)); + locks = ERTS_PROC_LOCKS_ALL; + erts_send_exit_signal(BIF_P, BIF_ARG_1, BIF_P, &locks, + am_noconnection, NIL, NULL, 0); + erts_proc_unlock(BIF_P, locks & ERTS_PROC_LOCKS_ALL_MINOR); + + /* + * Copy-paste from old dist_exit_3, not sure if we really + * need erts_handle_pending_exit when exit_2 does not. + */ + state = erts_atomic32_read_acqb(&BIF_P->state); + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { +#ifdef ERTS_SMP + if (state & ERTS_PSFLG_PENDING_EXIT) + erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN); +#endif + ERTS_BIF_EXITED(BIF_P); + } + BIF_RET(am_true); + } + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: - /* We are connected. Setup link and send link signal */ - + /* + * We have (pending) connection. + * Setup link and enqueue link signal. + */ erts_de_links_lock(dep); erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, BIF_ARG_1); @@ -256,8 +279,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); default: - ASSERT(! "Invalid dsig prepare result"); - BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); + ERTS_ASSERT(! "Invalid dsig prepare result"); } } } @@ -292,7 +314,8 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) ERTS_LC_ASSERT((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK) == erts_proc_lc_my_proc_locks(c_p)); - code = erts_dsig_prepare(&dsd, dep, c_p, ERTS_DSP_RLOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, c_p, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_RLOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: @@ -313,6 +336,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) res = am_true; break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_de_links_lock(dep); @@ -347,8 +371,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) } break; default: - ASSERT(! "Invalid dsig prepare result"); - return am_internal_error; + ERTS_ASSERT(! "Invalid dsig prepare result"); } @@ -768,10 +791,18 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, int code; erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_RLOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, + p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, 0); switch (code) { + case ERTS_DSIG_PREP_PENDING: + /* + * Must wait for connection to know if node supports monitor. + * Damn these synchronous errors. + */ + erts_smp_de_runlock(dep); + /* fall through */ case ERTS_DSIG_PREP_NOT_ALIVE: - /* Let the dmonitor_p trap handle it */ case ERTS_DSIG_PREP_NOT_CONNECTED: erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); ERTS_BIF_PREP_TRAP2(ret, dmonitor_p_trap, p, bifarg1, bifarg2); @@ -818,9 +849,7 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, } break; default: - ASSERT(! "Invalid dsig prepare result"); - ERTS_BIF_PREP_ERROR(ret, p, EXC_INTERNAL_ERROR); - break; + ERTS_ASSERT(! "Invalid dsig prepare result"); } BIF_RET(ret); @@ -1158,7 +1187,8 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) BIF_RET(am_true); } - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: @@ -1173,6 +1203,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) BIF_TRAP1(dunlink_trap, BIF_P, BIF_ARG_1); #endif + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_remove_dist_link(&dld, BIF_P->common.id, BIF_ARG_1, dep); code = erts_dsig_send_unlink(&dsd, BIF_P->common.id, BIF_ARG_1); @@ -1545,22 +1576,24 @@ BIF_RETTYPE exit_2(BIF_ALIST_2) DistEntry *dep; dep = external_pid_dist_entry(BIF_ARG_1); + ERTS_ASSERT(dep); if(dep == erts_this_dist_entry) BIF_RET(am_true); - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: - BIF_TRAP2(dexit_trap, BIF_P, BIF_ARG_1, BIF_ARG_2); + BIF_RET(am_true); + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: code = erts_dsig_send_exit2(&dsd, BIF_P->common.id, BIF_ARG_1, BIF_ARG_2); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); default: - ASSERT(! "Invalid dsig prepare result"); - BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); + ERTS_ASSERT(! "Invalid dsig prepare result"); } } else if (is_not_internal_pid(BIF_ARG_1)) { @@ -1964,7 +1997,7 @@ ebif_bang_2(BIF_ALIST_2) * Send a message to Process, Port or Registered Process. * Returns non-negative reduction bump or negative result code. */ -#define SEND_TRAP (-1) +#define SEND_NOCONNECT (-1) #define SEND_YIELD (-2) #define SEND_YIELD_RETURN (-3) #define SEND_BADARG (-4) @@ -1980,20 +2013,22 @@ static Sint remote_send(Process *p, DistEntry *dep, { Sint res; int code; - ASSERT(is_atom(to) || is_external_pid(to)); ctx->dep = dep; - code = erts_dsig_prepare(&ctx->dsd, dep, p, ERTS_DSP_NO_LOCK, !ctx->suspend); + code = erts_dsig_prepare(&ctx->dsd, &dep, p, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, + !ctx->suspend, ctx->connect); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: - res = SEND_TRAP; + res = SEND_NOCONNECT; break; case ERTS_DSIG_PREP_WOULD_SUSPEND: ASSERT(!ctx->suspend); res = SEND_YIELD; break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: { if (is_atom(to)) @@ -2205,12 +2240,14 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) } return 0; } + ctx->dsd.node = tp[2]; ret = remote_send(p, dep, tp[1], to, msg, ctx); if (ret == SEND_YIELD_CONTINUE) { - if (dep) + if (dep) { erts_ref_dist_entry(dep); - ctx->dep_to_deref = dep; + ctx->deref_dep = 1; + } } return ret; } else { @@ -2251,7 +2288,6 @@ BIF_RETTYPE send_3(BIF_ALIST_3) Eterm msg = BIF_ARG_2; Eterm opts = BIF_ARG_3; - int connect = !0; Eterm l = opts; Sint result; @@ -2262,14 +2298,15 @@ BIF_RETTYPE send_3(BIF_ALIST_3) UseTmpHeap(sizeof(ErtsSendContext)/sizeof(Eterm), BIF_P); ctx->suspend = !0; - ctx->dep_to_deref = NULL; + ctx->connect = !0; + ctx->deref_dep = 0; ctx->return_term = am_ok; ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR); ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT; while (is_list(l)) { if (CAR(list_val(l)) == am_noconnect) { - connect = 0; + ctx->connect = 0; } else if (CAR(list_val(l)) == am_nosuspend) { ctx->suspend = 0; } else { @@ -2306,9 +2343,9 @@ BIF_RETTYPE send_3(BIF_ALIST_3) goto yield_return; ERTS_BIF_PREP_RET(retval, am_ok); break; - case SEND_TRAP: - if (connect) { - ERTS_BIF_PREP_TRAP3(retval, dsend3_trap, p, to, msg, opts); + case SEND_NOCONNECT: + if (ctx->connect) { + ERTS_BIF_PREP_RET(retval, am_ok); } else { ERTS_BIF_PREP_RET(retval, am_noconnect); } @@ -2412,7 +2449,8 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) ref = NIL; #endif ctx->suspend = !0; - ctx->dep_to_deref = NULL; + ctx->connect = !0; + ctx->deref_dep = 0; ctx->return_term = msg; ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR); ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT; @@ -2436,8 +2474,8 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) goto yield_return; ERTS_BIF_PREP_RET(retval, msg); break; - case SEND_TRAP: - ERTS_BIF_PREP_TRAP2(retval, dsend2_trap, p, to, msg); + case SEND_NOCONNECT: + ERTS_BIF_PREP_RET(retval, msg); break; case SEND_YIELD: ERTS_BIF_PREP_YIELD2(retval, bif_export[BIF_send_2], p, to, msg); @@ -4387,20 +4425,21 @@ BIF_RETTYPE group_leader_2(BIF_ALIST_2) if(dep == erts_this_dist_entry) BIF_ERROR(BIF_P, BADARG); - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: BIF_RET(am_true); case ERTS_DSIG_PREP_NOT_CONNECTED: BIF_TRAP2(dgroup_leader_trap, BIF_P, BIF_ARG_1, BIF_ARG_2); + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: code = erts_dsig_send_group_leader(&dsd, BIF_ARG_1, BIF_ARG_2); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); default: - ASSERT(! "Invalid dsig prepare result"); - BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); + ERTS_ASSERT(! "Invalid dsig prepare result"); } } else if (is_internal_pid(BIF_ARG_2)) { diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index f7b4451890..66469a011d 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -691,3 +691,11 @@ bif erts_internal:maps_to_list/2 # bif erlang:iolist_to_iovec/1 + +# +# New in 21.0 +# + +bif erlang:new_connection_id/1 +bif erlang:abort_connection_id/2 + diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index bc168fc58d..0fa399cf53 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -103,8 +103,6 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) -#define PASS_THROUGH 'p' /* This code should go */ - int erts_is_alive; /* System must be blocked on change */ int erts_dist_buf_busy_limit; @@ -684,31 +682,11 @@ size_obuf(ErtsDistOutputBuf *obuf) return bin->orig_size; } -static void clear_dist_entry(DistEntry *dep) +static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) { - Sint obufsize = 0; - ErtsAtomCache *cache; - ErtsProcList *suspendees; ErtsDistOutputBuf *obuf; - erts_de_rwlock(dep); - erts_atomic_set_nob(&dep->input_handler, - (erts_aint_t) NIL); - cache = dep->cache; - dep->cache = NULL; - -#ifdef DEBUG - erts_de_links_lock(dep); - ASSERT(!dep->nlinks); - ASSERT(!dep->node_links); - ASSERT(!dep->monitors); - erts_de_links_unlock(dep); -#endif - - erts_mtx_lock(&dep->qlock); - - erts_atomic64_set_nob(&dep->in, 0); - erts_atomic64_set_nob(&dep->out, 0); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; @@ -728,17 +706,12 @@ static void clear_dist_entry(DistEntry *dep) dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; - dep->status = 0; - suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); - - erts_mtx_unlock(&dep->qlock); - erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); - dep->send = NULL; - erts_de_rwunlock(dep); - - erts_resume_processes(suspendees); + return obuf; +} - delete_cache(cache); +static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf) +{ + Sint obufsize = 0; while (obuf) { ErtsDistOutputBuf *fobuf; @@ -750,13 +723,54 @@ static void clear_dist_entry(DistEntry *dep) if (obufsize) { erts_mtx_lock(&dep->qlock); - ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); + ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); erts_mtx_unlock(&dep->qlock); } } +static void clear_dist_entry(DistEntry *dep) +{ + ErtsAtomCache *cache; + ErtsProcList *suspendees; + ErtsDistOutputBuf *obuf; + + erts_de_rwlock(dep); + erts_atomic_set_nob(&dep->input_handler, + (erts_aint_t) NIL); + cache = dep->cache; + dep->cache = NULL; + +#ifdef DEBUG + erts_de_links_lock(dep); + ASSERT(!dep->nlinks); + ASSERT(!dep->node_links); + ASSERT(!dep->monitors); + erts_de_links_unlock(dep); +#endif + + erts_mtx_lock(&dep->qlock); + + erts_atomic64_set_nob(&dep->in, 0); + erts_atomic64_set_nob(&dep->out, 0); + + obuf = clear_de_out_queues(dep); + dep->status = 0; + suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_de_rwunlock(dep); + + erts_resume_processes(suspendees); + + delete_cache(cache); + + free_de_out_queues(dep, obuf); +} + int erts_dsend_context_dtor(Binary* ctx_bin) { ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); @@ -772,8 +786,8 @@ int erts_dsend_context_dtor(Binary* ctx_bin) if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) { free_dist_obuf(ctx->dss.obuf); } - if (ctx->dep_to_deref) - erts_deref_dist_entry(ctx->dep_to_deref); + if (ctx->deref_dep) + erts_deref_dist_entry(ctx->dep); return 1; } @@ -1324,7 +1338,7 @@ int erts_net_message(Port *prt, /* This is tricky (we MUST force a distributed send) */ ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_exit(&dsd, to, from, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); @@ -1416,7 +1430,7 @@ int erts_net_message(Port *prt, if (!rp) { ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, am_noproc); @@ -1883,7 +1897,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } else { ctx->acmp = NULL; - ctx->pass_through_size = 1; + ctx->pass_through_size = 3; /* SVERK rename */ } #ifdef ERTS_DIST_MSG_DBG @@ -1961,9 +1975,9 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->obuf->next = NULL; erts_de_rlock(dep); cid = dep->cid; - if (cid != dsdp->cid - || dep->connection_id != dsdp->connection_id - || dep->status & ERTS_DE_SFLG_EXITING) { + if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED)) + || dep->status & ERTS_DE_SFLG_EXITING + || dep->connection_id != dsdp->connection_id) { /* Not the same connection as when we started; drop message... */ erts_de_runlock(dep); free_dist_obuf(ctx->obuf); @@ -2037,8 +2051,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } erts_mtx_unlock(&dep->qlock); - if (is_internal_port(dep->cid)) - erts_schedule_dist_command(NULL, dep); + if (!(dep->status & ERTS_DE_SFLG_PENDING)) { + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + } + else { + notify_proc = NIL; + } erts_de_runlock(dep); if (is_internal_pid(notify_proc)) notify_dist_data(ctx->c_p, notify_proc); @@ -2306,9 +2325,6 @@ erts_dist_command(Port *prt, int reds_limit) ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, dep->cache, flags); - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; preempt = reds > reds_limit; @@ -2346,21 +2362,18 @@ erts_dist_command(Port *prt, int reds_limit) int de_busy; int preempt = 0; while (oq.first && !preempt) { - ErtsDistOutputBuf *fob; - Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&oq.first->data[0] <= oq.first->extp - && oq.first->extp < oq.first->ext_endp); - size = (*send)(prt, oq.first); + ErtsDistOutputBuf *fob; + Uint size; + oq.first->extp + = erts_encode_ext_dist_header_finalize(oq.first->extp, + dep->cache, + flags); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + ASSERT(&oq.first->data[0] <= oq.first->extp + && oq.first->extp < oq.first->ext_endp); + size = (*send)(prt, oq.first); erts_atomic64_inc_nob(&dep->out); - esdp->io.out += (Uint64) size; + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); @@ -2482,11 +2495,13 @@ erts_dist_command(Port *prt, int reds_limit) foq.first = NULL; foq.last = NULL; +/* SVERK Hmmm.... #ifdef DEBUG erts_mtx_lock(&dep->qlock); ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); erts_mtx_unlock(&dep->qlock); #endif +*/ } else { if (oq.first) { @@ -3218,6 +3233,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) ErtsProcLocks proc_unlock = 0; Process *proc; Port *pp = NULL; + Eterm notify_proc; + erts_aint32_t qflgs; /* * Check and pick out arguments @@ -3245,15 +3262,14 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) if (!is_atom(ic) || !is_atom(oc)) goto badarg; - /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */ - if (!(DFLAG_EXTENDED_REFERENCES & flags)) { + if (~flags & DFLAG_DIST_MANDATORY) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "%T", BIF_P->common.id); if (BIF_P->common.u.alive.reg) erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name); erts_dsprintf(dsbufp, " attempted to enable connection to node %T " - "which is not able to handle extended references.\n", + "which does not support all mandatory capabilities.\n", BIF_ARG_1); erts_send_error_to_logger(BIF_P->group_leader, dsbufp); goto badarg; @@ -3376,7 +3392,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) dep->creation = 0; #ifdef DEBUG - ASSERT(erts_atomic_read_nob(&dep->qsize) == 0); + ASSERT(erts_atomic_read_nob(&dep->qsize) == 0 + || (dep->status & ERTS_DE_SFLG_PENDING)); #endif if (flags & DFLAG_DIST_HDR_ATOM_CACHE) @@ -3384,7 +3401,26 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + notify_proc = NIL; + if (erts_atomic_read_nob(&dep->qsize)) { + if (is_internal_port(dep->cid)) { + erts_schedule_dist_command(NULL, dep); + } + else { + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify_proc = dep->cid; + ASSERT(is_internal_pid(notify_proc)); + } + } + } + } erts_de_rwunlock(dep); + if (is_internal_pid(notify_proc)) + notify_dist_data(BIF_P, notify_proc); ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); @@ -3426,6 +3462,112 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto done; } +BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) +{ + DistEntry* dep; + Uint32 conn_id; + + if (is_not_atom(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); + } + dep = erts_find_or_insert_dist_entry(BIF_ARG_1); + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + + erts_de_rwlock(dep); + + if (ERTS_DE_IS_CONNECTED(dep) || dep->status & ERTS_DE_SFLG_PENDING) + conn_id = dep->connection_id; + else if (dep->status == 0) { + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; + dep->connection_id++; + dep->connection_id &= ERTS_DIST_CON_ID_MASK; + conn_id = dep->connection_id; + } + else { + ASSERT(!"SVERK: What to do?"); + conn_id = dep->connection_id; + } + erts_de_rwunlock(dep); + BIF_RET(make_small(conn_id)); +} + +BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +{ + DistEntry* dep; + + if (is_not_atom(BIF_ARG_1) || is_not_small(BIF_ARG_2)) { + BIF_ERROR(BIF_P, BADARG); + } + dep = erts_find_dist_entry(BIF_ARG_1); + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + + if (!dep) { + BIF_RET(am_false); + } + + erts_de_rwlock(dep); + + if (dep->status == ERTS_DE_SFLG_PENDING + && dep->connection_id == unsigned_val(BIF_ARG_2)) { + + NetExitsContext nec = {dep}; + ErtsLink *nlinks; + ErtsLink *node_links; + ErtsMonitor *monitors; + ErtsAtomCache *cache; + ErtsDistOutputBuf *obuf; + ErtsProcList *resume_procs; + Sint reds = 0; + + ASSERT(is_nil(dep->cid)); + + erts_de_links_lock(dep); + monitors = dep->monitors; + nlinks = dep->nlinks; + node_links = dep->node_links; + dep->monitors = NULL; + dep->nlinks = NULL; + dep->node_links = NULL; + erts_de_links_unlock(dep); + + cache = dep->cache; + dep->cache = NULL; + dep->status = 0; + dep->flags = 0; + erts_mtx_lock(&dep->qlock); + obuf = dep->out_queue.first; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + ASSERT(!dep->tmp_out_queue.first); + ASSERT(!dep->finalized_out_queue.first); + resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_de_rwunlock(dep); + + erts_sweep_monitors(monitors, &doit_monitor_net_exits, &nec); + erts_sweep_links(nlinks, &doit_link_net_exits, &nec); + erts_sweep_links(node_links, &doit_node_link_net_exits, &nec); + + if (resume_procs) { + int resumed = erts_resume_processes(resume_procs); + reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + } + + delete_cache(cache); + + free_de_out_queues(dep, obuf); + + BIF_RET2(am_true, reds); + } + + erts_de_rwunlock(dep); + + BIF_RET(am_false); +} + /**********************************************************************/ /* dist_exit(Local, Term, Remote) -> Bool */ diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index d4765c50b8..26432b21e9 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -45,6 +45,13 @@ #define DFLAG_MAP_TAG 0x20000 #define DFLAG_BIG_CREATION 0x40000 #define DFLAG_SEND_SENDER 0x80000 +#define DFLAG_PENDING_CONNECTION 0x100000 + +/* Mandatory flags for distribution (sync with dist_util.erl) */ +#define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \ + | DFLAG_EXTENDED_PIDS_PORTS \ + | DFLAG_UTF8_ATOMS \ + | DFLAG_NEW_FUN_TAGS) /* All flags that should be enabled when term_to_binary/1 is used. */ #define TERM_TO_BINARY_DFLAGS (DFLAG_EXTENDED_REFERENCES \ @@ -98,6 +105,7 @@ typedef enum { typedef struct { Process *proc; DistEntry *dep; + Eterm node; /* used if dep == NULL */ Eterm cid; Eterm connection_id; int no_suspend; @@ -117,14 +125,10 @@ extern int erts_is_alive; /* * erts_dsig_prepare() prepares a send of a distributed signal. - * One of the values defined below are returned. If the returned - * value is another than ERTS_DSIG_PREP_CONNECTED, the - * distributed signal cannot be sent before appropriate actions - * have been taken. Appropriate actions would typically be setting - * up the connection. + * One of the values defined below are returned. */ -/* Connected; signal can be sent. */ +/* Connected; signals can be enqueued and sent. */ #define ERTS_DSIG_PREP_CONNECTED 0 /* Not connected; connection needs to be set up. */ #define ERTS_DSIG_PREP_NOT_CONNECTED 1 @@ -132,11 +136,15 @@ extern int erts_is_alive; #define ERTS_DSIG_PREP_WOULD_SUSPEND 2 /* System not alive (distributed) */ #define ERTS_DSIG_PREP_NOT_ALIVE 3 +/* Pending connection; signals can be enqueued */ +#define ERTS_DSIG_PREP_PENDING 4 ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *, - DistEntry *, + DistEntry **, Process *, + ErtsProcLocks, ErtsDSigPrepLock, + int, int); ERTS_GLB_INLINE @@ -146,29 +154,100 @@ void erts_schedule_dist_command(Port *, DistEntry *); ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *dsdp, - DistEntry *dep, + DistEntry **depp, Process *proc, + ErtsProcLocks proc_locks, ErtsDSigPrepLock dspl, - int no_suspend) + int no_suspend, + int connect) { - int failure; + DistEntry* dep = *depp; + int res; + if (!erts_is_alive) return ERTS_DSIG_PREP_NOT_ALIVE; - if (!dep) - return ERTS_DSIG_PREP_NOT_CONNECTED; + if (!dep) { + if (!connect) + return ERTS_DSIG_PREP_NOT_CONNECTED; + + dep = erts_find_or_insert_dist_entry(dsdp->node); + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + } + +#ifdef ERTS_ENABLE_LOCK_CHECK + if (connect) { + erts_proc_lc_might_unlock(proc, proc_locks); + } +#endif + +retry: if (dspl == ERTS_DSP_RWLOCK) erts_de_rwlock(dep); else erts_de_rlock(dep); - if (ERTS_DE_IS_NOT_CONNECTED(dep)) { - failure = ERTS_DSIG_PREP_NOT_CONNECTED; + + if (ERTS_DE_IS_CONNECTED(dep)) { + res = ERTS_DSIG_PREP_CONNECTED; + } + else if (dep->status & ERTS_DE_SFLG_PENDING) { + res = ERTS_DSIG_PREP_PENDING; + } + else if (dep->status & ERTS_DE_SFLG_EXITING) { + /* SVERK is this ok, or should we trigger another connection setup */ + res = ERTS_DSIG_PREP_NOT_CONNECTED; + goto fail; + } + else if (connect) { + ASSERT(dep->status == 0); + if (dspl != ERTS_DSP_RWLOCK) { + erts_de_runlock(dep); + erts_de_rwlock(dep); + } + if (dep->status == 0) { + Process* net_kernel; + ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *mp; + Eterm msg, conn_id; + + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; + dep->connection_id++; + dep->connection_id &= ERTS_DIST_CON_ID_MASK; + conn_id = make_small(dep->connection_id); + erts_de_rwunlock(dep); + + net_kernel = erts_whereis_process(proc, proc_locks, + am_net_kernel, nk_locks, 0); + if (!net_kernel) { + if (!*depp) { + erts_deref_dist_entry(dep); + } + return ERTS_DSIG_PREP_NOT_ALIVE; + } + + /* Send {auto_connect, Node, ConnId} to net_kernel */ + mp = erts_alloc_message_heap(net_kernel, &nk_locks, 4, &hp, &ohp); + msg = TUPLE3(hp, am_auto_connect, dep->sysname, conn_id); + erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); + erts_proc_unlock(net_kernel, nk_locks); + } + else + erts_de_rwunlock(dep); + goto retry; + } + else { + ASSERT(dep->status == 0); + res = ERTS_DSIG_PREP_NOT_CONNECTED; goto fail; } + if (no_suspend) { - if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { - failure = ERTS_DSIG_PREP_WOULD_SUSPEND; + if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { + res = ERTS_DSIG_PREP_WOULD_SUSPEND; goto fail; - } + } } dsdp->proc = proc; dsdp->dep = dep; @@ -177,15 +256,15 @@ erts_dsig_prepare(ErtsDSigData *dsdp, dsdp->no_suspend = no_suspend; if (dspl == ERTS_DSP_NO_LOCK) erts_de_runlock(dep); - return ERTS_DSIG_PREP_CONNECTED; + *depp = dep; + return res; fail: if (dspl == ERTS_DSP_RWLOCK) erts_de_rwunlock(dep); else erts_de_runlock(dep); - return failure; - + return res; } ERTS_GLB_INLINE @@ -346,11 +425,12 @@ struct erts_dsig_send_context { typedef struct { int suspend; + int connect; Eterm ctl_heap[6]; ErtsDSigData dsd; - DistEntry* dep_to_deref; DistEntry *dep; + int deref_dep; struct erts_dsig_send_context dss; Eterm return_term; diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 9931686cbe..dd607f438e 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -597,6 +597,8 @@ erts_set_dist_entry_not_connected(DistEntry *dep) void erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) { + erts_aint32_t set_qflgs; + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); erts_rwmtx_rwlock(&erts_dist_table_rwmtx); @@ -619,22 +621,26 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) ASSERT(erts_no_of_not_connected_dist_entries > 0); erts_no_of_not_connected_dist_entries--; + if (dep->status & ERTS_DE_SFLG_PENDING) { + dep->status &= ~ERTS_DE_SFLG_PENDING; + } else { + dep->connection_id++; + dep->connection_id &= ERTS_DIST_CON_ID_MASK; + } dep->status |= ERTS_DE_SFLG_CONNECTED; - dep->flags = flags; + dep->flags = flags & ~DFLAG_PENDING_CONNECTION; dep->cid = cid; erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) cid); - dep->connection_id++; - dep->connection_id &= ERTS_DIST_EXT_CON_ID_MASK; dep->prev = NULL; erts_atomic64_set_nob(&dep->in, 0); erts_atomic64_set_nob(&dep->out, 0); - erts_atomic32_set_nob(&dep->qflgs, - (is_internal_port(cid) - ? ERTS_DE_QFLG_PORT_CTRL - : ERTS_DE_QFLG_PROC_CTRL)); + set_qflgs = (is_internal_port(cid) ? + ERTS_DE_QFLG_PORT_CTRL : ERTS_DE_QFLG_PROC_CTRL); + erts_atomic32_read_bor_nob(&dep->qflgs, set_qflgs); + if(flags & DFLAG_PUBLISHED) { dep->next = erts_visible_dist_entries; if(erts_visible_dist_entries) { diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 2cfd18f22f..d012f4b2cf 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -57,8 +57,9 @@ #define ERST_INTERNAL_CHANNEL_NO 0 -#define ERTS_DE_SFLG_CONNECTED (((Uint32) 1) << 0) -#define ERTS_DE_SFLG_EXITING (((Uint32) 1) << 1) +#define ERTS_DE_SFLG_PENDING (((Uint32) 1) << 0) +#define ERTS_DE_SFLG_CONNECTED (((Uint32) 1) << 1) +#define ERTS_DE_SFLG_EXITING (((Uint32) 1) << 2) #define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0) #define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1) @@ -106,8 +107,6 @@ struct ErtsProcList_; * unlock mutexes with higher numbers before mutexes with higher numbers. */ -struct erl_link; - typedef struct dist_entry_ { HashBucket hash_bucket; /* Hash bucket */ struct dist_entry_ *next; /* Next entry in dist_table (not sorted) */ diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 3c0a126fe2..2d13cd92b2 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -12659,9 +12659,11 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, dep, NULL, - ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { + int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED || + code == ERTS_DSIG_PREP_PENDING) { + code = erts_dsig_send_demonitor(&dsd, rmon->u.pid, mon->name, @@ -12705,9 +12707,11 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, dep, NULL, - ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { + int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED || + code == ERTS_DSIG_PREP_PENDING) { + code = erts_dsig_send_demonitor(&dsd, rmon->u.pid, mon->u.pid, @@ -12764,8 +12768,8 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, dep, NULL, - ERTS_DSP_NO_LOCK, 0); + int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, mon->u.pid, @@ -12887,14 +12891,18 @@ static void doit_exit_link(ErtsLink *lnk, void *vpcontext) int code; ErtsDistLinkData dld; erts_remove_dist_link(&dld, p->common.id, item, dep); - erts_proc_lock(p, ERTS_PROC_LOCK_MAIN); - code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { - code = erts_dsig_send_exit_tt(&dsd, p->common.id, item, - reason, SEQ_TRACE_TOKEN(p)); - ASSERT(code == ERTS_DSIG_SEND_OK); - } - erts_proc_unlock(p, ERTS_PROC_LOCK_MAIN); + if (dld.d_lnk) { + erts_proc_lock(p, ERTS_PROC_LOCK_MAIN); + code = erts_dsig_prepare(&dsd, &dep, p, 0, ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED || + code == ERTS_DSIG_PREP_PENDING) { + + code = erts_dsig_send_exit_tt(&dsd, p->common.id, item, + reason, SEQ_TRACE_TOKEN(p)); + ASSERT(code == ERTS_DSIG_SEND_OK); + } + erts_proc_unlock(p, ERTS_PROC_LOCK_MAIN); + } erts_destroy_dist_link(&dld); } } diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 3b851087d1..c49e2f617a 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -349,6 +349,8 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) } } +#define PASS_THROUGH 'p' /* This code should go */ + byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint32 dflags) { byte *ip; @@ -358,9 +360,33 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint int long_atoms; register byte *ep = ext; ASSERT(dflags & DFLAG_UTF8_ATOMS); - ASSERT(ep[0] == VERSION_MAGIC); - if (ep[1] != DIST_HEADER) - return ext; + + if (ep[0] != VERSION_MAGIC) { + ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT); + if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) { + /* + * Encoded without atom cache (toward pending connection) + * but receiver wants dist header. Let's prepend an empty one. + */ + *--ep = 0; /* NumberOfAtomCacheRefs */ + *--ep = DIST_HEADER; + *--ep = VERSION_MAGIC; + } + else { + /* Node without atom cache, 'pass through' needed */ + + ASSERT(!"SVERK: Must insert VERSION_MAGIC's"); + *--ep = PASS_THROUGH; + } + return ep; + } + else if (ep[1] != DIST_HEADER) { + ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT); + ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)); + /* Node without atom cache, 'pass through' needed */ + *--ep = PASS_THROUGH; + return ep; + } dist_hdr_flags = ep[2]; long_atoms = ERTS_DIST_HDR_LONG_ATOMS_FLG & ((int) dist_hdr_flags); @@ -511,7 +537,7 @@ int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx return -1; } else { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(ctx->flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_PENDING_CONNECTION))) #endif sz++ /* VERSION_MAGIC */; @@ -543,7 +569,7 @@ int erts_encode_dist_ext(Eterm term, byte **ext, Uint32 flags, ErtsAtomCacheMap { if (!ctx || !ctx->wstack.wstart) { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_PENDING_CONNECTION))) #endif *(*ext)++ = VERSION_MAGIC; } @@ -644,12 +670,11 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, erts_de_rlock(dep); - if ((dep->status & (ERTS_DE_SFLG_EXITING|ERTS_DE_SFLG_CONNECTED)) - != ERTS_DE_SFLG_CONNECTED) { + if (dep->status != ERTS_DE_SFLG_CONNECTED && + dep->status != ERTS_DE_SFLG_PENDING) { erts_de_runlock(dep); return ERTS_PREP_DIST_EXT_CLOSED; } - if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index d6416edbc3..49950c4aad 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -117,7 +117,7 @@ typedef struct { #define ERTS_DIST_EXT_ATOM_TRANS_TAB ((Uint32) 0x2) #define ERTS_DIST_EXT_BTT_SAFE ((Uint32) 0x4) -#define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff) +#define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff) /* also in net_kernel.erl */ typedef struct { DistEntry *dep; diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 85013af3ad..6cd1aa5e79 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -3829,11 +3829,12 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc) ErtsDistLinkData dld; ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_remove_dist_link(&dld, port_id, lnk->pid, dep); erts_destroy_dist_link(&dld); diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 80bceae506..28625c400f 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -165,6 +165,8 @@ -export([registered/0, resume_process/1, round/1, self/0]). -export([seq_trace/2, seq_trace_print/1, seq_trace_print/2, setnode/2]). -export([setnode/3, size/1, spawn/3, spawn_link/3, split_binary/2]). +-export([new_connection_id/1]). +-export([abort_connection_id/2]). -export([suspend_process/2, system_monitor/0]). -export([system_monitor/1, system_monitor/2, system_profile/0]). -export([system_profile/2, throw/1, time/0, trace/3, trace_delivered/1]). @@ -1670,6 +1672,19 @@ setnode(_P1, _P2) -> setnode(_P1, _P2, _P3) -> erlang:nif_error(undefined). +%% new_connection_id/1 +-spec erlang:new_connection_id(Node) -> integer() when + Node :: atom(). +new_connection_id(_Node) -> + erlang:nif_error(undefined). + +%% abort_connection_id/2 +-spec erlang:abort_connection_id(Node, ConnId) -> integer() when + Node :: atom(), + ConnId :: integer(). +abort_connection_id(_Node, _ConnId) -> + erlang:nif_error(undefined). + %% size/1 %% Shadowed by erl_bif_types: erlang:size/1 -spec size(Item) -> non_neg_integer() when diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index c68036a291..f929e4bf11 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -122,6 +122,7 @@ -record(connection, { node, %% remote node name + conn_id, %% Connection identity state, %% pending | up | up_pending owner, %% owner pid pending_owner, %% possible new owner @@ -362,54 +363,33 @@ init({Name, LongOrShortNames, TickT, CleanHalt}) -> end. -handle_connect([Conn], _, _, From, State) when Conn#connection.state =:= up -> - async_reply({reply, true, State}, From); -handle_connect([Conn], _, _, From, State) when Conn#connection.state =:= pending; - Conn#connection.state =:= up_pending -> - Waiting = Conn#connection.waiting, - ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), - {noreply, State}; -handle_connect(_, Type, Node, From , State) -> - case setup(Node,Type,From,State) of - {ok, SetupPid} -> - Owners = [{SetupPid, Node} | State#state.conn_owners], - {noreply,State#state{conn_owners=Owners}}; - _Error -> - ?connect_failure(Node, {setup_call, failed, _Error}), - async_reply({reply, false, State}, From) - end. - -%% ------------------------------------------------------------ -%% handle_call. -%% ------------------------------------------------------------ - -%% -%% Auto-connect to Node. -%% The response is delayed until the connection is up and -%% running. -%% -handle_call({auto_connect, _, Node, _}, From, State) when Node =:= node() -> - async_reply({reply, true, State}, From); -handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) -> - verbose({auto_connect, Type, Node, WaitForBarred}, 1, State), - +handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State) -> ConnLookup = ets:lookup(sys_dist, Node), case ConnLookup of [#barred_connection{}] -> case WaitForBarred of false -> - async_reply({reply, false, State}, From); + {reply, false, State}; true -> spawn(?MODULE,passive_connect_monitor,[From,Node]), {noreply, State} end; + [#connection{conn_id=ConnId, state = up}] -> + {reply, true, State}; + [#connection{conn_id=ConnId, waiting=Waiting}=Conn] -> + case From of + noreply -> ok; + _ -> ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}) + end, + {noreply, State}; + _ -> case application:get_env(kernel, dist_auto_connect) of {ok, never} -> ?connect_failure(Node,{dist_auto_connect,never}), - async_reply({reply, false, State}, From); + {reply, false, State}; %% This might happen due to connection close %% not beeing propagated to user space yet. @@ -418,11 +398,78 @@ handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) -> (hd(ConnLookup))#connection.state =:= up -> ?connect_failure(Node,{barred_connection, ets:lookup(sys_dist, Node)}), - async_reply({reply, false, State}, From); + {reply, false, State}; _ -> - handle_connect(ConnLookup, Type, Node, From, State) + case setup(ConnLookup, Node,ConnId,Type,From,State) of + {ok, SetupPid} -> + Owners = [{SetupPid, Node} | State#state.conn_owners], + {noreply,State#state{conn_owners=Owners}}; + _Error -> + ?connect_failure(Node, {setup_call, failed, _Error}), + {reply, false, State} + end end - end; + end. + + +handle_connect([#connection{conn_id = ConnId, state = up}], _, _, ConnId, _From, State) -> + {reply, true, State}; +handle_connect([#connection{conn_id = ConnId}=Conn], _, _, ConnId, From, State) + when Conn#connection.state =:= pending; + Conn#connection.state =:= up_pending -> + Waiting = Conn#connection.waiting, + ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), + {noreply, State}; +handle_connect([#barred_connection{}], Type, Node, ConnId, From , State) -> + %% Barred connection only affects auto_connect, ignore it. + handle_connect([], Type, Node, ConnId, From , State); +handle_connect(ConnLookup, Type, Node, ConnId, From , State) -> + case setup(ConnLookup, Node,ConnId,Type,From,State) of + {ok, SetupPid} -> + Owners = [{SetupPid, Node} | State#state.conn_owners], + {noreply,State#state{conn_owners=Owners}}; + _Error -> + ?connect_failure(Node, {setup_call, failed, _Error}), + {reply, false, State} + end. + +-define(ERTS_DIST_CON_ID_MASK, 16#ffffff). % also in external.h + +verify_new_conn_id([], ConnId) + when (ConnId band (bnot ?ERTS_DIST_CON_ID_MASK)) =:= 0 -> + true; +verify_new_conn_id([#connection{conn_id = Old}], New) + when New =:= ((Old+1) band ?ERTS_DIST_CON_ID_MASK) -> + true; +verify_new_conn_id(_, _) -> + false. + + + +%% ------------------------------------------------------------ +%% handle_call. +%% ------------------------------------------------------------ + +%% +%% Auto-connect to Node. +%% The response is delayed until the connection is up and running. +%% +handle_call({auto_connect, _, Node, _}, From, State) when Node =:= node() -> + async_reply({reply, true, State}, From); +handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) -> + verbose({auto_connect, Type, Node, WaitForBarred}, 1, State), + + R = case (catch erlang:new_connection_id(Node)) of + ConnId when is_integer(ConnId) -> + handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State); + + _Error -> + error_logger:error_msg("~n** Cannot get connection id for node ~w~n", + [Node]), + {reply, false, State} + end, + + return_call(R, From); %% %% Explicit connect @@ -433,7 +480,17 @@ handle_call({connect, _, Node, _, _}, From, State) when Node =:= node() -> handle_call({connect, Type, Node}, From, State) -> verbose({connect, Type, Node}, 1, State), ConnLookup = ets:lookup(sys_dist, Node), - handle_connect(ConnLookup, Type, Node, From, State); + R = case (catch erlang:new_connection_id(Node)) of + ConnId when is_integer(ConnId) -> + handle_connect(ConnLookup, Type, Node, ConnId, From, State); + + _Error -> + error_logger:error_msg("~n** Cannot get connection id for node ~w~n", + [Node]), + {reply, false, State} + end, + return_call(R, From); + %% %% Close the connection to Node. @@ -640,6 +697,25 @@ terminate(_Reason, State) -> %% ------------------------------------------------------------ %% +%% Asynchronous auto connect request +%% +handle_info({auto_connect,Node,ConnId}, State) -> + verbose({auto_connect, Node, ConnId}, 1, State), + NewState = + case handle_auto_connect(normal, Node, ConnId, false, noreply, State) of + {noreply, S} -> %% Pending connection + S; + + {reply, true, S} -> %% Already connected + S; + + {reply, false, S} -> %% Connection refused + erlang:abort_connection_id(Node, ConnId), + S + end, + {noreply, NewState}; + +%% %% accept a new connection. %% handle_info({accept,AcceptPid,Socket,Family,Proto}, State) -> @@ -719,7 +795,12 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) -> AcceptPid ! {self(), {accept_pending, already_pending}}, {noreply, State}; _ -> + ConnId = case (catch erlang:new_connection_id(Node)) of + CI when is_integer(CI) -> CI + %% SVERK What to do? + end, ets:insert(sys_dist, #connection{node = Node, + conn_id = ConnId, state = pending, owner = AcceptPid, address = Address, @@ -912,6 +993,7 @@ pending_nodedown(Conn, Node, Type, State) -> % Don't bar connections that have never been alive %mark_sys_dist_nodedown(Node), % - instead just delete the node: + erlang:abort_connection_id(Node, Conn#connection.conn_id), ets:delete(sys_dist, Node), reply_waiting(Node,Conn#connection.waiting, false), case Type of @@ -934,15 +1016,16 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) -> State#state{conn_owners = [{AcceptPid,Node}|Owners], pend_owners = Pend}. -up_nodedown(_Conn, Node, _Reason, Type, State) -> - mark_sys_dist_nodedown(Node), +up_nodedown(Conn, Node, _Reason, Type, State) -> + mark_sys_dist_nodedown(Conn, Node), case Type of normal -> ?nodedown(Node, State); _ -> ok end, State. -mark_sys_dist_nodedown(Node) -> +mark_sys_dist_nodedown(Conn, Node) -> + erlang:abort_connection_id(Node, Conn#connection.conn_id), case application:get_env(kernel, dist_auto_connect) of {ok, once} -> ets:insert(sys_dist, #barred_connection{node = Node}); @@ -1185,15 +1268,8 @@ spawn_func(_,{From,Tag},M,F,A,Gleader) -> %% Set up connection to a new node. %% ----------------------------------------------------------- -setup(Node,Type,From,State) -> - Allowed = State#state.allowed, - case lists:member(Node, Allowed) of - false when Allowed =/= [] -> - error_msg("** Connection attempt with " - "disallowed node ~w ** ~n", [Node]), - {error, bad_node}; - _ -> - case select_mod(Node, State#state.listen) of +setup(ConnLookup, Node,ConnId,Type,From,State) -> + case setup_check(ConnLookup, Node, ConnId, State) of {ok, L} -> Mod = L#listen.module, LAddr = L#listen.address, @@ -1206,18 +1282,45 @@ setup(Node,Type,From,State) -> Addr = LAddr#net_address { address = undefined, host = undefined }, + Waiting = case From of + noreply -> []; + _ -> [From] + end, ets:insert(sys_dist, #connection{node = Node, + conn_id = ConnId, state = pending, owner = Pid, - waiting = [From], + waiting = Waiting, address = Addr, type = normal}), {ok, Pid}; Error -> Error - end end. +setup_check(ConnLookup, Node, ConnId, State) -> + Allowed = State#state.allowed, + case lists:member(Node, Allowed) of + false when Allowed =/= [] -> + error_msg("** Connection attempt with " + "disallowed node ~w ** ~n", [Node]), + {error, bad_node}; + _ -> + case verify_new_conn_id(ConnLookup, ConnId) of + false -> + error_msg("** Connection attempt to ~w with " + "bad connection id ~w ** ~n", [Node, ConnId]), + {error, bad_conn_id}; + true -> + case select_mod(Node, State#state.listen) of + {ok, _L}=OK -> OK; + Error -> Error + end + end + end. + + + %% %% Find a module that is willing to handle connection setup to Node %% @@ -1658,6 +1761,11 @@ verbose(_, _, _) -> getnode(P) when is_pid(P) -> node(P); getnode(P) -> P. +return_call({noreply, _State}=R, _From) -> + R; +return_call(R, From) -> + async_reply(R, From). + async_reply({reply, Msg, State}, From) -> async_gen_server_reply(From, Msg), {noreply, State}. |