aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2017-07-14 19:34:54 +0200
committerSverker Eriksson <[email protected]>2017-11-15 20:10:33 +0100
commitf89fb92384280e2939414287a2ecb8f86a199318 (patch)
treefacf1d7b31b56cc25575d6cb9a01a69e2e4317d0
parentfe720f6b2051c9bf8ff303f857c3db0a84b1c050 (diff)
downloadotp-f89fb92384280e2939414287a2ecb8f86a199318.tar.gz
otp-f89fb92384280e2939414287a2ecb8f86a199318.tar.bz2
otp-f89fb92384280e2939414287a2ecb8f86a199318.zip
erts: Introduce asynchronous auto-connect
-rw-r--r--erts/emulator/beam/atom.names1
-rw-r--r--erts/emulator/beam/bif.c121
-rw-r--r--erts/emulator/beam/bif.tab8
-rw-r--r--erts/emulator/beam/dist.c274
-rw-r--r--erts/emulator/beam/dist.h122
-rw-r--r--erts/emulator/beam/erl_node_tables.c20
-rw-r--r--erts/emulator/beam/erl_node_tables.h7
-rw-r--r--erts/emulator/beam/erl_process.c40
-rw-r--r--erts/emulator/beam/external.c41
-rw-r--r--erts/emulator/beam/external.h2
-rw-r--r--erts/emulator/beam/io.c3
-rw-r--r--erts/preloaded/src/erlang.erl15
-rw-r--r--lib/kernel/src/net_kernel.erl210
13 files changed, 648 insertions, 216 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index fc55b687d4..a534dd44fb 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -108,6 +108,7 @@ atom asynchronous
atom atom
atom atom_used
atom attributes
+atom auto_connect
atom await_microstate_accounting_modifications
atom await_port_send_result
atom await_proc_exit
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 4b11884f38..34f5afae78 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -227,17 +227,40 @@ BIF_RETTYPE link_1(BIF_ALIST_1)
goto res_no_proc;
}
- code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_RLOCK, 0);
+ code = erts_dsig_prepare(&dsd, &dep, BIF_P,
+ (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK),
+ ERTS_DSP_RLOCK, 0, 1);
switch (code) {
case ERTS_DSIG_PREP_NOT_ALIVE:
- /* Let the dlink trap handle it */
- case ERTS_DSIG_PREP_NOT_CONNECTED:
- erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK);
- BIF_TRAP1(dlink_trap, BIF_P, BIF_ARG_1);
-
+ case ERTS_DSIG_PREP_NOT_CONNECTED: {
+ ErtsProcLocks locks = ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK;
+ erts_aint32_t state;
+ erts_proc_lock(BIF_P, (ERTS_PROC_LOCKS_ALL & ~locks));
+ locks = ERTS_PROC_LOCKS_ALL;
+ erts_send_exit_signal(BIF_P, BIF_ARG_1, BIF_P, &locks,
+ am_noconnection, NIL, NULL, 0);
+ erts_proc_unlock(BIF_P, locks & ERTS_PROC_LOCKS_ALL_MINOR);
+
+ /*
+ * Copy-paste from old dist_exit_3, not sure if we really
+ * need erts_handle_pending_exit when exit_2 does not.
+ */
+ state = erts_atomic32_read_acqb(&BIF_P->state);
+ if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) {
+#ifdef ERTS_SMP
+ if (state & ERTS_PSFLG_PENDING_EXIT)
+ erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN);
+#endif
+ ERTS_BIF_EXITED(BIF_P);
+ }
+ BIF_RET(am_true);
+ }
+ case ERTS_DSIG_PREP_PENDING:
case ERTS_DSIG_PREP_CONNECTED:
- /* We are connected. Setup link and send link signal */
-
+ /*
+ * We have (pending) connection.
+ * Setup link and enqueue link signal.
+ */
erts_de_links_lock(dep);
erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, BIF_ARG_1);
@@ -256,8 +279,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1)
ERTS_BIF_YIELD_RETURN(BIF_P, am_true);
BIF_RET(am_true);
default:
- ASSERT(! "Invalid dsig prepare result");
- BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR);
+ ERTS_ASSERT(! "Invalid dsig prepare result");
}
}
}
@@ -292,7 +314,8 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to)
ERTS_LC_ASSERT((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK)
== erts_proc_lc_my_proc_locks(c_p));
- code = erts_dsig_prepare(&dsd, dep, c_p, ERTS_DSP_RLOCK, 0);
+ code = erts_dsig_prepare(&dsd, &dep, c_p, ERTS_PROC_LOCK_MAIN,
+ ERTS_DSP_RLOCK, 0, 0);
switch (code) {
case ERTS_DSIG_PREP_NOT_ALIVE:
case ERTS_DSIG_PREP_NOT_CONNECTED:
@@ -313,6 +336,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to)
res = am_true;
break;
+ case ERTS_DSIG_PREP_PENDING:
case ERTS_DSIG_PREP_CONNECTED:
erts_de_links_lock(dep);
@@ -347,8 +371,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to)
}
break;
default:
- ASSERT(! "Invalid dsig prepare result");
- return am_internal_error;
+ ERTS_ASSERT(! "Invalid dsig prepare result");
}
@@ -768,10 +791,18 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2,
int code;
erts_proc_lock(p, ERTS_PROC_LOCK_LINK);
- code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_RLOCK, 0);
+ code = erts_dsig_prepare(&dsd, &dep,
+ p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK),
+ ERTS_DSP_RLOCK, 0, 0);
switch (code) {
+ case ERTS_DSIG_PREP_PENDING:
+ /*
+ * Must wait for connection to know if node supports monitor.
+ * Damn these synchronous errors.
+ */
+ erts_smp_de_runlock(dep);
+ /* fall through */
case ERTS_DSIG_PREP_NOT_ALIVE:
- /* Let the dmonitor_p trap handle it */
case ERTS_DSIG_PREP_NOT_CONNECTED:
erts_proc_unlock(p, ERTS_PROC_LOCK_LINK);
ERTS_BIF_PREP_TRAP2(ret, dmonitor_p_trap, p, bifarg1, bifarg2);
@@ -818,9 +849,7 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2,
}
break;
default:
- ASSERT(! "Invalid dsig prepare result");
- ERTS_BIF_PREP_ERROR(ret, p, EXC_INTERNAL_ERROR);
- break;
+ ERTS_ASSERT(! "Invalid dsig prepare result");
}
BIF_RET(ret);
@@ -1158,7 +1187,8 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1)
BIF_RET(am_true);
}
- code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0);
+ code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN,
+ ERTS_DSP_NO_LOCK, 0, 0);
switch (code) {
case ERTS_DSIG_PREP_NOT_ALIVE:
case ERTS_DSIG_PREP_NOT_CONNECTED:
@@ -1173,6 +1203,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1)
BIF_TRAP1(dunlink_trap, BIF_P, BIF_ARG_1);
#endif
+ case ERTS_DSIG_PREP_PENDING:
case ERTS_DSIG_PREP_CONNECTED:
erts_remove_dist_link(&dld, BIF_P->common.id, BIF_ARG_1, dep);
code = erts_dsig_send_unlink(&dsd, BIF_P->common.id, BIF_ARG_1);
@@ -1545,22 +1576,24 @@ BIF_RETTYPE exit_2(BIF_ALIST_2)
DistEntry *dep;
dep = external_pid_dist_entry(BIF_ARG_1);
+ ERTS_ASSERT(dep);
if(dep == erts_this_dist_entry)
BIF_RET(am_true);
- code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0);
+ code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN,
+ ERTS_DSP_NO_LOCK, 0, 1);
switch (code) {
case ERTS_DSIG_PREP_NOT_ALIVE:
case ERTS_DSIG_PREP_NOT_CONNECTED:
- BIF_TRAP2(dexit_trap, BIF_P, BIF_ARG_1, BIF_ARG_2);
+ BIF_RET(am_true);
+ case ERTS_DSIG_PREP_PENDING:
case ERTS_DSIG_PREP_CONNECTED:
code = erts_dsig_send_exit2(&dsd, BIF_P->common.id, BIF_ARG_1, BIF_ARG_2);
if (code == ERTS_DSIG_SEND_YIELD)
ERTS_BIF_YIELD_RETURN(BIF_P, am_true);
BIF_RET(am_true);
default:
- ASSERT(! "Invalid dsig prepare result");
- BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR);
+ ERTS_ASSERT(! "Invalid dsig prepare result");
}
}
else if (is_not_internal_pid(BIF_ARG_1)) {
@@ -1964,7 +1997,7 @@ ebif_bang_2(BIF_ALIST_2)
* Send a message to Process, Port or Registered Process.
* Returns non-negative reduction bump or negative result code.
*/
-#define SEND_TRAP (-1)
+#define SEND_NOCONNECT (-1)
#define SEND_YIELD (-2)
#define SEND_YIELD_RETURN (-3)
#define SEND_BADARG (-4)
@@ -1980,20 +2013,22 @@ static Sint remote_send(Process *p, DistEntry *dep,
{
Sint res;
int code;
-
ASSERT(is_atom(to) || is_external_pid(to));
ctx->dep = dep;
- code = erts_dsig_prepare(&ctx->dsd, dep, p, ERTS_DSP_NO_LOCK, !ctx->suspend);
+ code = erts_dsig_prepare(&ctx->dsd, &dep, p, ERTS_PROC_LOCK_MAIN,
+ ERTS_DSP_NO_LOCK,
+ !ctx->suspend, ctx->connect);
switch (code) {
case ERTS_DSIG_PREP_NOT_ALIVE:
case ERTS_DSIG_PREP_NOT_CONNECTED:
- res = SEND_TRAP;
+ res = SEND_NOCONNECT;
break;
case ERTS_DSIG_PREP_WOULD_SUSPEND:
ASSERT(!ctx->suspend);
res = SEND_YIELD;
break;
+ case ERTS_DSIG_PREP_PENDING:
case ERTS_DSIG_PREP_CONNECTED: {
if (is_atom(to))
@@ -2205,12 +2240,14 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx)
}
return 0;
}
+ ctx->dsd.node = tp[2];
ret = remote_send(p, dep, tp[1], to, msg, ctx);
if (ret == SEND_YIELD_CONTINUE) {
- if (dep)
+ if (dep) {
erts_ref_dist_entry(dep);
- ctx->dep_to_deref = dep;
+ ctx->deref_dep = 1;
+ }
}
return ret;
} else {
@@ -2251,7 +2288,6 @@ BIF_RETTYPE send_3(BIF_ALIST_3)
Eterm msg = BIF_ARG_2;
Eterm opts = BIF_ARG_3;
- int connect = !0;
Eterm l = opts;
Sint result;
@@ -2262,14 +2298,15 @@ BIF_RETTYPE send_3(BIF_ALIST_3)
UseTmpHeap(sizeof(ErtsSendContext)/sizeof(Eterm), BIF_P);
ctx->suspend = !0;
- ctx->dep_to_deref = NULL;
+ ctx->connect = !0;
+ ctx->deref_dep = 0;
ctx->return_term = am_ok;
ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR);
ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT;
while (is_list(l)) {
if (CAR(list_val(l)) == am_noconnect) {
- connect = 0;
+ ctx->connect = 0;
} else if (CAR(list_val(l)) == am_nosuspend) {
ctx->suspend = 0;
} else {
@@ -2306,9 +2343,9 @@ BIF_RETTYPE send_3(BIF_ALIST_3)
goto yield_return;
ERTS_BIF_PREP_RET(retval, am_ok);
break;
- case SEND_TRAP:
- if (connect) {
- ERTS_BIF_PREP_TRAP3(retval, dsend3_trap, p, to, msg, opts);
+ case SEND_NOCONNECT:
+ if (ctx->connect) {
+ ERTS_BIF_PREP_RET(retval, am_ok);
} else {
ERTS_BIF_PREP_RET(retval, am_noconnect);
}
@@ -2412,7 +2449,8 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg)
ref = NIL;
#endif
ctx->suspend = !0;
- ctx->dep_to_deref = NULL;
+ ctx->connect = !0;
+ ctx->deref_dep = 0;
ctx->return_term = msg;
ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR);
ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT;
@@ -2436,8 +2474,8 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg)
goto yield_return;
ERTS_BIF_PREP_RET(retval, msg);
break;
- case SEND_TRAP:
- ERTS_BIF_PREP_TRAP2(retval, dsend2_trap, p, to, msg);
+ case SEND_NOCONNECT:
+ ERTS_BIF_PREP_RET(retval, msg);
break;
case SEND_YIELD:
ERTS_BIF_PREP_YIELD2(retval, bif_export[BIF_send_2], p, to, msg);
@@ -4387,20 +4425,21 @@ BIF_RETTYPE group_leader_2(BIF_ALIST_2)
if(dep == erts_this_dist_entry)
BIF_ERROR(BIF_P, BADARG);
- code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0);
+ code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN,
+ ERTS_DSP_NO_LOCK, 0, 0);
switch (code) {
case ERTS_DSIG_PREP_NOT_ALIVE:
BIF_RET(am_true);
case ERTS_DSIG_PREP_NOT_CONNECTED:
BIF_TRAP2(dgroup_leader_trap, BIF_P, BIF_ARG_1, BIF_ARG_2);
+ case ERTS_DSIG_PREP_PENDING:
case ERTS_DSIG_PREP_CONNECTED:
code = erts_dsig_send_group_leader(&dsd, BIF_ARG_1, BIF_ARG_2);
if (code == ERTS_DSIG_SEND_YIELD)
ERTS_BIF_YIELD_RETURN(BIF_P, am_true);
BIF_RET(am_true);
default:
- ASSERT(! "Invalid dsig prepare result");
- BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR);
+ ERTS_ASSERT(! "Invalid dsig prepare result");
}
}
else if (is_internal_pid(BIF_ARG_2)) {
diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab
index f7b4451890..66469a011d 100644
--- a/erts/emulator/beam/bif.tab
+++ b/erts/emulator/beam/bif.tab
@@ -691,3 +691,11 @@ bif erts_internal:maps_to_list/2
#
bif erlang:iolist_to_iovec/1
+
+#
+# New in 21.0
+#
+
+bif erlang:new_connection_id/1
+bif erlang:abort_connection_id/2
+
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index bc168fc58d..0fa399cf53 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -103,8 +103,6 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
-#define PASS_THROUGH 'p' /* This code should go */
-
int erts_is_alive; /* System must be blocked on change */
int erts_dist_buf_busy_limit;
@@ -684,31 +682,11 @@ size_obuf(ErtsDistOutputBuf *obuf)
return bin->orig_size;
}
-static void clear_dist_entry(DistEntry *dep)
+static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep)
{
- Sint obufsize = 0;
- ErtsAtomCache *cache;
- ErtsProcList *suspendees;
ErtsDistOutputBuf *obuf;
- erts_de_rwlock(dep);
- erts_atomic_set_nob(&dep->input_handler,
- (erts_aint_t) NIL);
- cache = dep->cache;
- dep->cache = NULL;
-
-#ifdef DEBUG
- erts_de_links_lock(dep);
- ASSERT(!dep->nlinks);
- ASSERT(!dep->node_links);
- ASSERT(!dep->monitors);
- erts_de_links_unlock(dep);
-#endif
-
- erts_mtx_lock(&dep->qlock);
-
- erts_atomic64_set_nob(&dep->in, 0);
- erts_atomic64_set_nob(&dep->out, 0);
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
if (!dep->out_queue.last)
obuf = dep->finalized_out_queue.first;
@@ -728,17 +706,12 @@ static void clear_dist_entry(DistEntry *dep)
dep->tmp_out_queue.last = NULL;
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
- dep->status = 0;
- suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
-
- erts_mtx_unlock(&dep->qlock);
- erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
- dep->send = NULL;
- erts_de_rwunlock(dep);
-
- erts_resume_processes(suspendees);
+ return obuf;
+}
- delete_cache(cache);
+static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf)
+{
+ Sint obufsize = 0;
while (obuf) {
ErtsDistOutputBuf *fobuf;
@@ -750,13 +723,54 @@ static void clear_dist_entry(DistEntry *dep)
if (obufsize) {
erts_mtx_lock(&dep->qlock);
- ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize);
+ ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize);
erts_atomic_add_nob(&dep->qsize,
(erts_aint_t) -obufsize);
erts_mtx_unlock(&dep->qlock);
}
}
+static void clear_dist_entry(DistEntry *dep)
+{
+ ErtsAtomCache *cache;
+ ErtsProcList *suspendees;
+ ErtsDistOutputBuf *obuf;
+
+ erts_de_rwlock(dep);
+ erts_atomic_set_nob(&dep->input_handler,
+ (erts_aint_t) NIL);
+ cache = dep->cache;
+ dep->cache = NULL;
+
+#ifdef DEBUG
+ erts_de_links_lock(dep);
+ ASSERT(!dep->nlinks);
+ ASSERT(!dep->node_links);
+ ASSERT(!dep->monitors);
+ erts_de_links_unlock(dep);
+#endif
+
+ erts_mtx_lock(&dep->qlock);
+
+ erts_atomic64_set_nob(&dep->in, 0);
+ erts_atomic64_set_nob(&dep->out, 0);
+
+ obuf = clear_de_out_queues(dep);
+ dep->status = 0;
+ suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
+
+ erts_mtx_unlock(&dep->qlock);
+ erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
+ dep->send = NULL;
+ erts_de_rwunlock(dep);
+
+ erts_resume_processes(suspendees);
+
+ delete_cache(cache);
+
+ free_de_out_queues(dep, obuf);
+}
+
int erts_dsend_context_dtor(Binary* ctx_bin)
{
ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin);
@@ -772,8 +786,8 @@ int erts_dsend_context_dtor(Binary* ctx_bin)
if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) {
free_dist_obuf(ctx->dss.obuf);
}
- if (ctx->dep_to_deref)
- erts_deref_dist_entry(ctx->dep_to_deref);
+ if (ctx->deref_dep)
+ erts_deref_dist_entry(ctx->dep);
return 1;
}
@@ -1324,7 +1338,7 @@ int erts_net_message(Port *prt,
/* This is tricky (we MUST force a distributed send) */
ErtsDSigData dsd;
int code;
- code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0);
+ code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0);
if (code == ERTS_DSIG_PREP_CONNECTED) {
code = erts_dsig_send_exit(&dsd, to, from, am_noproc);
ASSERT(code == ERTS_DSIG_SEND_OK);
@@ -1416,7 +1430,7 @@ int erts_net_message(Port *prt,
if (!rp) {
ErtsDSigData dsd;
int code;
- code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0);
+ code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0);
if (code == ERTS_DSIG_PREP_CONNECTED) {
code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref,
am_noproc);
@@ -1883,7 +1897,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
}
else {
ctx->acmp = NULL;
- ctx->pass_through_size = 1;
+ ctx->pass_through_size = 3; /* SVERK rename */
}
#ifdef ERTS_DIST_MSG_DBG
@@ -1961,9 +1975,9 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->obuf->next = NULL;
erts_de_rlock(dep);
cid = dep->cid;
- if (cid != dsdp->cid
- || dep->connection_id != dsdp->connection_id
- || dep->status & ERTS_DE_SFLG_EXITING) {
+ if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED))
+ || dep->status & ERTS_DE_SFLG_EXITING
+ || dep->connection_id != dsdp->connection_id) {
/* Not the same connection as when we started; drop message... */
erts_de_runlock(dep);
free_dist_obuf(ctx->obuf);
@@ -2037,8 +2051,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
}
erts_mtx_unlock(&dep->qlock);
- if (is_internal_port(dep->cid))
- erts_schedule_dist_command(NULL, dep);
+ if (!(dep->status & ERTS_DE_SFLG_PENDING)) {
+ if (is_internal_port(dep->cid))
+ erts_schedule_dist_command(NULL, dep);
+ }
+ else {
+ notify_proc = NIL;
+ }
erts_de_runlock(dep);
if (is_internal_pid(notify_proc))
notify_dist_data(ctx->c_p, notify_proc);
@@ -2306,9 +2325,6 @@ erts_dist_command(Port *prt, int reds_limit)
ob->extp = erts_encode_ext_dist_header_finalize(ob->extp,
dep->cache,
flags);
- if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
- *--ob->extp = PASS_THROUGH; /* Old node; 'pass through'
- needed */
ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp);
reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
preempt = reds > reds_limit;
@@ -2346,21 +2362,18 @@ erts_dist_command(Port *prt, int reds_limit)
int de_busy;
int preempt = 0;
while (oq.first && !preempt) {
- ErtsDistOutputBuf *fob;
- Uint size;
- oq.first->extp
- = erts_encode_ext_dist_header_finalize(oq.first->extp,
- dep->cache,
- flags);
- reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
- if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
- *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through'
- needed */
- ASSERT(&oq.first->data[0] <= oq.first->extp
- && oq.first->extp < oq.first->ext_endp);
- size = (*send)(prt, oq.first);
+ ErtsDistOutputBuf *fob;
+ Uint size;
+ oq.first->extp
+ = erts_encode_ext_dist_header_finalize(oq.first->extp,
+ dep->cache,
+ flags);
+ reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
+ ASSERT(&oq.first->data[0] <= oq.first->extp
+ && oq.first->extp < oq.first->ext_endp);
+ size = (*send)(prt, oq.first);
erts_atomic64_inc_nob(&dep->out);
- esdp->io.out += (Uint64) size;
+ esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
erts_fprintf(stderr, ">> ");
bw(oq.first->extp, size);
@@ -2482,11 +2495,13 @@ erts_dist_command(Port *prt, int reds_limit)
foq.first = NULL;
foq.last = NULL;
+/* SVERK Hmmm....
#ifdef DEBUG
erts_mtx_lock(&dep->qlock);
ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize);
erts_mtx_unlock(&dep->qlock);
#endif
+*/
}
else {
if (oq.first) {
@@ -3218,6 +3233,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
ErtsProcLocks proc_unlock = 0;
Process *proc;
Port *pp = NULL;
+ Eterm notify_proc;
+ erts_aint32_t qflgs;
/*
* Check and pick out arguments
@@ -3245,15 +3262,14 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
if (!is_atom(ic) || !is_atom(oc))
goto badarg;
- /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */
- if (!(DFLAG_EXTENDED_REFERENCES & flags)) {
+ if (~flags & DFLAG_DIST_MANDATORY) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
erts_dsprintf(dsbufp, "%T", BIF_P->common.id);
if (BIF_P->common.u.alive.reg)
erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name);
erts_dsprintf(dsbufp,
" attempted to enable connection to node %T "
- "which is not able to handle extended references.\n",
+ "which does not support all mandatory capabilities.\n",
BIF_ARG_1);
erts_send_error_to_logger(BIF_P->group_leader, dsbufp);
goto badarg;
@@ -3376,7 +3392,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
dep->creation = 0;
#ifdef DEBUG
- ASSERT(erts_atomic_read_nob(&dep->qsize) == 0);
+ ASSERT(erts_atomic_read_nob(&dep->qsize) == 0
+ || (dep->status & ERTS_DE_SFLG_PENDING));
#endif
if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
@@ -3384,7 +3401,26 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
+ notify_proc = NIL;
+ if (erts_atomic_read_nob(&dep->qsize)) {
+ if (is_internal_port(dep->cid)) {
+ erts_schedule_dist_command(NULL, dep);
+ }
+ else {
+ qflgs = erts_atomic32_read_nob(&dep->qflgs);
+ if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
+ qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
+ ~ERTS_DE_QFLG_REQ_INFO);
+ if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
+ notify_proc = dep->cid;
+ ASSERT(is_internal_pid(notify_proc));
+ }
+ }
+ }
+ }
erts_de_rwunlock(dep);
+ if (is_internal_pid(notify_proc))
+ notify_dist_data(BIF_P, notify_proc);
ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep));
@@ -3426,6 +3462,112 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
goto done;
}
+BIF_RETTYPE new_connection_id_1(BIF_ALIST_1)
+{
+ DistEntry* dep;
+ Uint32 conn_id;
+
+ if (is_not_atom(BIF_ARG_1)) {
+ BIF_ERROR(BIF_P, BADARG);
+ }
+ dep = erts_find_or_insert_dist_entry(BIF_ARG_1);
+ ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */
+
+ erts_de_rwlock(dep);
+
+ if (ERTS_DE_IS_CONNECTED(dep) || dep->status & ERTS_DE_SFLG_PENDING)
+ conn_id = dep->connection_id;
+ else if (dep->status == 0) {
+ dep->status = ERTS_DE_SFLG_PENDING;
+ dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION;
+ dep->connection_id++;
+ dep->connection_id &= ERTS_DIST_CON_ID_MASK;
+ conn_id = dep->connection_id;
+ }
+ else {
+ ASSERT(!"SVERK: What to do?");
+ conn_id = dep->connection_id;
+ }
+ erts_de_rwunlock(dep);
+ BIF_RET(make_small(conn_id));
+}
+
+BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2)
+{
+ DistEntry* dep;
+
+ if (is_not_atom(BIF_ARG_1) || is_not_small(BIF_ARG_2)) {
+ BIF_ERROR(BIF_P, BADARG);
+ }
+ dep = erts_find_dist_entry(BIF_ARG_1);
+ ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */
+
+ if (!dep) {
+ BIF_RET(am_false);
+ }
+
+ erts_de_rwlock(dep);
+
+ if (dep->status == ERTS_DE_SFLG_PENDING
+ && dep->connection_id == unsigned_val(BIF_ARG_2)) {
+
+ NetExitsContext nec = {dep};
+ ErtsLink *nlinks;
+ ErtsLink *node_links;
+ ErtsMonitor *monitors;
+ ErtsAtomCache *cache;
+ ErtsDistOutputBuf *obuf;
+ ErtsProcList *resume_procs;
+ Sint reds = 0;
+
+ ASSERT(is_nil(dep->cid));
+
+ erts_de_links_lock(dep);
+ monitors = dep->monitors;
+ nlinks = dep->nlinks;
+ node_links = dep->node_links;
+ dep->monitors = NULL;
+ dep->nlinks = NULL;
+ dep->node_links = NULL;
+ erts_de_links_unlock(dep);
+
+ cache = dep->cache;
+ dep->cache = NULL;
+ dep->status = 0;
+ dep->flags = 0;
+ erts_mtx_lock(&dep->qlock);
+ obuf = dep->out_queue.first;
+ dep->out_queue.first = NULL;
+ dep->out_queue.last = NULL;
+ ASSERT(!dep->tmp_out_queue.first);
+ ASSERT(!dep->finalized_out_queue.first);
+ resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
+ erts_mtx_unlock(&dep->qlock);
+ erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
+ dep->send = NULL;
+ erts_de_rwunlock(dep);
+
+ erts_sweep_monitors(monitors, &doit_monitor_net_exits, &nec);
+ erts_sweep_links(nlinks, &doit_link_net_exits, &nec);
+ erts_sweep_links(node_links, &doit_node_link_net_exits, &nec);
+
+ if (resume_procs) {
+ int resumed = erts_resume_processes(resume_procs);
+ reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
+ }
+
+ delete_cache(cache);
+
+ free_de_out_queues(dep, obuf);
+
+ BIF_RET2(am_true, reds);
+ }
+
+ erts_de_rwunlock(dep);
+
+ BIF_RET(am_false);
+}
+
/**********************************************************************/
/* dist_exit(Local, Term, Remote) -> Bool */
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index d4765c50b8..26432b21e9 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -45,6 +45,13 @@
#define DFLAG_MAP_TAG 0x20000
#define DFLAG_BIG_CREATION 0x40000
#define DFLAG_SEND_SENDER 0x80000
+#define DFLAG_PENDING_CONNECTION 0x100000
+
+/* Mandatory flags for distribution (sync with dist_util.erl) */
+#define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \
+ | DFLAG_EXTENDED_PIDS_PORTS \
+ | DFLAG_UTF8_ATOMS \
+ | DFLAG_NEW_FUN_TAGS)
/* All flags that should be enabled when term_to_binary/1 is used. */
#define TERM_TO_BINARY_DFLAGS (DFLAG_EXTENDED_REFERENCES \
@@ -98,6 +105,7 @@ typedef enum {
typedef struct {
Process *proc;
DistEntry *dep;
+ Eterm node; /* used if dep == NULL */
Eterm cid;
Eterm connection_id;
int no_suspend;
@@ -117,14 +125,10 @@ extern int erts_is_alive;
/*
* erts_dsig_prepare() prepares a send of a distributed signal.
- * One of the values defined below are returned. If the returned
- * value is another than ERTS_DSIG_PREP_CONNECTED, the
- * distributed signal cannot be sent before appropriate actions
- * have been taken. Appropriate actions would typically be setting
- * up the connection.
+ * One of the values defined below are returned.
*/
-/* Connected; signal can be sent. */
+/* Connected; signals can be enqueued and sent. */
#define ERTS_DSIG_PREP_CONNECTED 0
/* Not connected; connection needs to be set up. */
#define ERTS_DSIG_PREP_NOT_CONNECTED 1
@@ -132,11 +136,15 @@ extern int erts_is_alive;
#define ERTS_DSIG_PREP_WOULD_SUSPEND 2
/* System not alive (distributed) */
#define ERTS_DSIG_PREP_NOT_ALIVE 3
+/* Pending connection; signals can be enqueued */
+#define ERTS_DSIG_PREP_PENDING 4
ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *,
- DistEntry *,
+ DistEntry **,
Process *,
+ ErtsProcLocks,
ErtsDSigPrepLock,
+ int,
int);
ERTS_GLB_INLINE
@@ -146,29 +154,100 @@ void erts_schedule_dist_command(Port *, DistEntry *);
ERTS_GLB_INLINE int
erts_dsig_prepare(ErtsDSigData *dsdp,
- DistEntry *dep,
+ DistEntry **depp,
Process *proc,
+ ErtsProcLocks proc_locks,
ErtsDSigPrepLock dspl,
- int no_suspend)
+ int no_suspend,
+ int connect)
{
- int failure;
+ DistEntry* dep = *depp;
+ int res;
+
if (!erts_is_alive)
return ERTS_DSIG_PREP_NOT_ALIVE;
- if (!dep)
- return ERTS_DSIG_PREP_NOT_CONNECTED;
+ if (!dep) {
+ if (!connect)
+ return ERTS_DSIG_PREP_NOT_CONNECTED;
+
+ dep = erts_find_or_insert_dist_entry(dsdp->node);
+ ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */
+ }
+
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ if (connect) {
+ erts_proc_lc_might_unlock(proc, proc_locks);
+ }
+#endif
+
+retry:
if (dspl == ERTS_DSP_RWLOCK)
erts_de_rwlock(dep);
else
erts_de_rlock(dep);
- if (ERTS_DE_IS_NOT_CONNECTED(dep)) {
- failure = ERTS_DSIG_PREP_NOT_CONNECTED;
+
+ if (ERTS_DE_IS_CONNECTED(dep)) {
+ res = ERTS_DSIG_PREP_CONNECTED;
+ }
+ else if (dep->status & ERTS_DE_SFLG_PENDING) {
+ res = ERTS_DSIG_PREP_PENDING;
+ }
+ else if (dep->status & ERTS_DE_SFLG_EXITING) {
+ /* SVERK is this ok, or should we trigger another connection setup */
+ res = ERTS_DSIG_PREP_NOT_CONNECTED;
+ goto fail;
+ }
+ else if (connect) {
+ ASSERT(dep->status == 0);
+ if (dspl != ERTS_DSP_RWLOCK) {
+ erts_de_runlock(dep);
+ erts_de_rwlock(dep);
+ }
+ if (dep->status == 0) {
+ Process* net_kernel;
+ ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ;
+ Eterm *hp;
+ ErlOffHeap *ohp;
+ ErtsMessage *mp;
+ Eterm msg, conn_id;
+
+ dep->status = ERTS_DE_SFLG_PENDING;
+ dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION;
+ dep->connection_id++;
+ dep->connection_id &= ERTS_DIST_CON_ID_MASK;
+ conn_id = make_small(dep->connection_id);
+ erts_de_rwunlock(dep);
+
+ net_kernel = erts_whereis_process(proc, proc_locks,
+ am_net_kernel, nk_locks, 0);
+ if (!net_kernel) {
+ if (!*depp) {
+ erts_deref_dist_entry(dep);
+ }
+ return ERTS_DSIG_PREP_NOT_ALIVE;
+ }
+
+ /* Send {auto_connect, Node, ConnId} to net_kernel */
+ mp = erts_alloc_message_heap(net_kernel, &nk_locks, 4, &hp, &ohp);
+ msg = TUPLE3(hp, am_auto_connect, dep->sysname, conn_id);
+ erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id);
+ erts_proc_unlock(net_kernel, nk_locks);
+ }
+ else
+ erts_de_rwunlock(dep);
+ goto retry;
+ }
+ else {
+ ASSERT(dep->status == 0);
+ res = ERTS_DSIG_PREP_NOT_CONNECTED;
goto fail;
}
+
if (no_suspend) {
- if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) {
- failure = ERTS_DSIG_PREP_WOULD_SUSPEND;
+ if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) {
+ res = ERTS_DSIG_PREP_WOULD_SUSPEND;
goto fail;
- }
+ }
}
dsdp->proc = proc;
dsdp->dep = dep;
@@ -177,15 +256,15 @@ erts_dsig_prepare(ErtsDSigData *dsdp,
dsdp->no_suspend = no_suspend;
if (dspl == ERTS_DSP_NO_LOCK)
erts_de_runlock(dep);
- return ERTS_DSIG_PREP_CONNECTED;
+ *depp = dep;
+ return res;
fail:
if (dspl == ERTS_DSP_RWLOCK)
erts_de_rwunlock(dep);
else
erts_de_runlock(dep);
- return failure;
-
+ return res;
}
ERTS_GLB_INLINE
@@ -346,11 +425,12 @@ struct erts_dsig_send_context {
typedef struct {
int suspend;
+ int connect;
Eterm ctl_heap[6];
ErtsDSigData dsd;
- DistEntry* dep_to_deref;
DistEntry *dep;
+ int deref_dep;
struct erts_dsig_send_context dss;
Eterm return_term;
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index 9931686cbe..dd607f438e 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -597,6 +597,8 @@ erts_set_dist_entry_not_connected(DistEntry *dep)
void
erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags)
{
+ erts_aint32_t set_qflgs;
+
ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep));
erts_rwmtx_rwlock(&erts_dist_table_rwmtx);
@@ -619,22 +621,26 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags)
ASSERT(erts_no_of_not_connected_dist_entries > 0);
erts_no_of_not_connected_dist_entries--;
+ if (dep->status & ERTS_DE_SFLG_PENDING) {
+ dep->status &= ~ERTS_DE_SFLG_PENDING;
+ } else {
+ dep->connection_id++;
+ dep->connection_id &= ERTS_DIST_CON_ID_MASK;
+ }
dep->status |= ERTS_DE_SFLG_CONNECTED;
- dep->flags = flags;
+ dep->flags = flags & ~DFLAG_PENDING_CONNECTION;
dep->cid = cid;
erts_atomic_set_nob(&dep->input_handler,
(erts_aint_t) cid);
- dep->connection_id++;
- dep->connection_id &= ERTS_DIST_EXT_CON_ID_MASK;
dep->prev = NULL;
erts_atomic64_set_nob(&dep->in, 0);
erts_atomic64_set_nob(&dep->out, 0);
- erts_atomic32_set_nob(&dep->qflgs,
- (is_internal_port(cid)
- ? ERTS_DE_QFLG_PORT_CTRL
- : ERTS_DE_QFLG_PROC_CTRL));
+ set_qflgs = (is_internal_port(cid) ?
+ ERTS_DE_QFLG_PORT_CTRL : ERTS_DE_QFLG_PROC_CTRL);
+ erts_atomic32_read_bor_nob(&dep->qflgs, set_qflgs);
+
if(flags & DFLAG_PUBLISHED) {
dep->next = erts_visible_dist_entries;
if(erts_visible_dist_entries) {
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index 2cfd18f22f..d012f4b2cf 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -57,8 +57,9 @@
#define ERST_INTERNAL_CHANNEL_NO 0
-#define ERTS_DE_SFLG_CONNECTED (((Uint32) 1) << 0)
-#define ERTS_DE_SFLG_EXITING (((Uint32) 1) << 1)
+#define ERTS_DE_SFLG_PENDING (((Uint32) 1) << 0)
+#define ERTS_DE_SFLG_CONNECTED (((Uint32) 1) << 1)
+#define ERTS_DE_SFLG_EXITING (((Uint32) 1) << 2)
#define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0)
#define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1)
@@ -106,8 +107,6 @@ struct ErtsProcList_;
* unlock mutexes with higher numbers before mutexes with higher numbers.
*/
-struct erl_link;
-
typedef struct dist_entry_ {
HashBucket hash_bucket; /* Hash bucket */
struct dist_entry_ *next; /* Next entry in dist_table (not sorted) */
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 3c0a126fe2..2d13cd92b2 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -12659,9 +12659,11 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext)
erts_de_links_unlock(dep);
if (rmon) {
ErtsDSigData dsd;
- int code = erts_dsig_prepare(&dsd, dep, NULL,
- ERTS_DSP_NO_LOCK, 0);
- if (code == ERTS_DSIG_PREP_CONNECTED) {
+ int code = erts_dsig_prepare(&dsd, &dep, NULL, 0,
+ ERTS_DSP_NO_LOCK, 0, 0);
+ if (code == ERTS_DSIG_PREP_CONNECTED ||
+ code == ERTS_DSIG_PREP_PENDING) {
+
code = erts_dsig_send_demonitor(&dsd,
rmon->u.pid,
mon->name,
@@ -12705,9 +12707,11 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext)
erts_de_links_unlock(dep);
if (rmon) {
ErtsDSigData dsd;
- int code = erts_dsig_prepare(&dsd, dep, NULL,
- ERTS_DSP_NO_LOCK, 0);
- if (code == ERTS_DSIG_PREP_CONNECTED) {
+ int code = erts_dsig_prepare(&dsd, &dep, NULL, 0,
+ ERTS_DSP_NO_LOCK, 0, 0);
+ if (code == ERTS_DSIG_PREP_CONNECTED ||
+ code == ERTS_DSIG_PREP_PENDING) {
+
code = erts_dsig_send_demonitor(&dsd,
rmon->u.pid,
mon->u.pid,
@@ -12764,8 +12768,8 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext)
erts_de_links_unlock(dep);
if (rmon) {
ErtsDSigData dsd;
- int code = erts_dsig_prepare(&dsd, dep, NULL,
- ERTS_DSP_NO_LOCK, 0);
+ int code = erts_dsig_prepare(&dsd, &dep, NULL, 0,
+ ERTS_DSP_NO_LOCK, 0, 0);
if (code == ERTS_DSIG_PREP_CONNECTED) {
code = erts_dsig_send_m_exit(&dsd,
mon->u.pid,
@@ -12887,14 +12891,18 @@ static void doit_exit_link(ErtsLink *lnk, void *vpcontext)
int code;
ErtsDistLinkData dld;
erts_remove_dist_link(&dld, p->common.id, item, dep);
- erts_proc_lock(p, ERTS_PROC_LOCK_MAIN);
- code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_NO_LOCK, 0);
- if (code == ERTS_DSIG_PREP_CONNECTED) {
- code = erts_dsig_send_exit_tt(&dsd, p->common.id, item,
- reason, SEQ_TRACE_TOKEN(p));
- ASSERT(code == ERTS_DSIG_SEND_OK);
- }
- erts_proc_unlock(p, ERTS_PROC_LOCK_MAIN);
+ if (dld.d_lnk) {
+ erts_proc_lock(p, ERTS_PROC_LOCK_MAIN);
+ code = erts_dsig_prepare(&dsd, &dep, p, 0, ERTS_DSP_NO_LOCK, 0, 0);
+ if (code == ERTS_DSIG_PREP_CONNECTED ||
+ code == ERTS_DSIG_PREP_PENDING) {
+
+ code = erts_dsig_send_exit_tt(&dsd, p->common.id, item,
+ reason, SEQ_TRACE_TOKEN(p));
+ ASSERT(code == ERTS_DSIG_SEND_OK);
+ }
+ erts_proc_unlock(p, ERTS_PROC_LOCK_MAIN);
+ }
erts_destroy_dist_link(&dld);
}
}
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c
index 3b851087d1..c49e2f617a 100644
--- a/erts/emulator/beam/external.c
+++ b/erts/emulator/beam/external.c
@@ -349,6 +349,8 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp)
}
}
+#define PASS_THROUGH 'p' /* This code should go */
+
byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint32 dflags)
{
byte *ip;
@@ -358,9 +360,33 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint
int long_atoms;
register byte *ep = ext;
ASSERT(dflags & DFLAG_UTF8_ATOMS);
- ASSERT(ep[0] == VERSION_MAGIC);
- if (ep[1] != DIST_HEADER)
- return ext;
+
+ if (ep[0] != VERSION_MAGIC) {
+ ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT);
+ if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) {
+ /*
+ * Encoded without atom cache (toward pending connection)
+ * but receiver wants dist header. Let's prepend an empty one.
+ */
+ *--ep = 0; /* NumberOfAtomCacheRefs */
+ *--ep = DIST_HEADER;
+ *--ep = VERSION_MAGIC;
+ }
+ else {
+ /* Node without atom cache, 'pass through' needed */
+
+ ASSERT(!"SVERK: Must insert VERSION_MAGIC's");
+ *--ep = PASS_THROUGH;
+ }
+ return ep;
+ }
+ else if (ep[1] != DIST_HEADER) {
+ ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT);
+ ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE));
+ /* Node without atom cache, 'pass through' needed */
+ *--ep = PASS_THROUGH;
+ return ep;
+ }
dist_hdr_flags = ep[2];
long_atoms = ERTS_DIST_HDR_LONG_ATOMS_FLG & ((int) dist_hdr_flags);
@@ -511,7 +537,7 @@ int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx
return -1;
} else {
#ifndef ERTS_DEBUG_USE_DIST_SEP
- if (!(ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE))
+ if (!(ctx->flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_PENDING_CONNECTION)))
#endif
sz++ /* VERSION_MAGIC */;
@@ -543,7 +569,7 @@ int erts_encode_dist_ext(Eterm term, byte **ext, Uint32 flags, ErtsAtomCacheMap
{
if (!ctx || !ctx->wstack.wstart) {
#ifndef ERTS_DEBUG_USE_DIST_SEP
- if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
+ if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_PENDING_CONNECTION)))
#endif
*(*ext)++ = VERSION_MAGIC;
}
@@ -644,12 +670,11 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
erts_de_rlock(dep);
- if ((dep->status & (ERTS_DE_SFLG_EXITING|ERTS_DE_SFLG_CONNECTED))
- != ERTS_DE_SFLG_CONNECTED) {
+ if (dep->status != ERTS_DE_SFLG_CONNECTED &&
+ dep->status != ERTS_DE_SFLG_PENDING) {
erts_de_runlock(dep);
return ERTS_PREP_DIST_EXT_CLOSED;
}
-
if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)
edep->flags |= ERTS_DIST_EXT_DFLAG_HDR;
diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h
index d6416edbc3..49950c4aad 100644
--- a/erts/emulator/beam/external.h
+++ b/erts/emulator/beam/external.h
@@ -117,7 +117,7 @@ typedef struct {
#define ERTS_DIST_EXT_ATOM_TRANS_TAB ((Uint32) 0x2)
#define ERTS_DIST_EXT_BTT_SAFE ((Uint32) 0x4)
-#define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff)
+#define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff) /* also in net_kernel.erl */
typedef struct {
DistEntry *dep;
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 85013af3ad..6cd1aa5e79 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -3829,11 +3829,12 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc)
ErtsDistLinkData dld;
ErtsDSigData dsd;
int code;
- code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0);
+ code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0);
switch (code) {
case ERTS_DSIG_PREP_NOT_ALIVE:
case ERTS_DSIG_PREP_NOT_CONNECTED:
break;
+ case ERTS_DSIG_PREP_PENDING:
case ERTS_DSIG_PREP_CONNECTED:
erts_remove_dist_link(&dld, port_id, lnk->pid, dep);
erts_destroy_dist_link(&dld);
diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl
index 80bceae506..28625c400f 100644
--- a/erts/preloaded/src/erlang.erl
+++ b/erts/preloaded/src/erlang.erl
@@ -165,6 +165,8 @@
-export([registered/0, resume_process/1, round/1, self/0]).
-export([seq_trace/2, seq_trace_print/1, seq_trace_print/2, setnode/2]).
-export([setnode/3, size/1, spawn/3, spawn_link/3, split_binary/2]).
+-export([new_connection_id/1]).
+-export([abort_connection_id/2]).
-export([suspend_process/2, system_monitor/0]).
-export([system_monitor/1, system_monitor/2, system_profile/0]).
-export([system_profile/2, throw/1, time/0, trace/3, trace_delivered/1]).
@@ -1670,6 +1672,19 @@ setnode(_P1, _P2) ->
setnode(_P1, _P2, _P3) ->
erlang:nif_error(undefined).
+%% new_connection_id/1
+-spec erlang:new_connection_id(Node) -> integer() when
+ Node :: atom().
+new_connection_id(_Node) ->
+ erlang:nif_error(undefined).
+
+%% abort_connection_id/2
+-spec erlang:abort_connection_id(Node, ConnId) -> integer() when
+ Node :: atom(),
+ ConnId :: integer().
+abort_connection_id(_Node, _ConnId) ->
+ erlang:nif_error(undefined).
+
%% size/1
%% Shadowed by erl_bif_types: erlang:size/1
-spec size(Item) -> non_neg_integer() when
diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl
index c68036a291..f929e4bf11 100644
--- a/lib/kernel/src/net_kernel.erl
+++ b/lib/kernel/src/net_kernel.erl
@@ -122,6 +122,7 @@
-record(connection, {
node, %% remote node name
+ conn_id, %% Connection identity
state, %% pending | up | up_pending
owner, %% owner pid
pending_owner, %% possible new owner
@@ -362,54 +363,33 @@ init({Name, LongOrShortNames, TickT, CleanHalt}) ->
end.
-handle_connect([Conn], _, _, From, State) when Conn#connection.state =:= up ->
- async_reply({reply, true, State}, From);
-handle_connect([Conn], _, _, From, State) when Conn#connection.state =:= pending;
- Conn#connection.state =:= up_pending ->
- Waiting = Conn#connection.waiting,
- ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
- {noreply, State};
-handle_connect(_, Type, Node, From , State) ->
- case setup(Node,Type,From,State) of
- {ok, SetupPid} ->
- Owners = [{SetupPid, Node} | State#state.conn_owners],
- {noreply,State#state{conn_owners=Owners}};
- _Error ->
- ?connect_failure(Node, {setup_call, failed, _Error}),
- async_reply({reply, false, State}, From)
- end.
-
-%% ------------------------------------------------------------
-%% handle_call.
-%% ------------------------------------------------------------
-
-%%
-%% Auto-connect to Node.
-%% The response is delayed until the connection is up and
-%% running.
-%%
-handle_call({auto_connect, _, Node, _}, From, State) when Node =:= node() ->
- async_reply({reply, true, State}, From);
-handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) ->
- verbose({auto_connect, Type, Node, WaitForBarred}, 1, State),
-
+handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State) ->
ConnLookup = ets:lookup(sys_dist, Node),
case ConnLookup of
[#barred_connection{}] ->
case WaitForBarred of
false ->
- async_reply({reply, false, State}, From);
+ {reply, false, State};
true ->
spawn(?MODULE,passive_connect_monitor,[From,Node]),
{noreply, State}
end;
+ [#connection{conn_id=ConnId, state = up}] ->
+ {reply, true, State};
+ [#connection{conn_id=ConnId, waiting=Waiting}=Conn] ->
+ case From of
+ noreply -> ok;
+ _ -> ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]})
+ end,
+ {noreply, State};
+
_ ->
case application:get_env(kernel, dist_auto_connect) of
{ok, never} ->
?connect_failure(Node,{dist_auto_connect,never}),
- async_reply({reply, false, State}, From);
+ {reply, false, State};
%% This might happen due to connection close
%% not beeing propagated to user space yet.
@@ -418,11 +398,78 @@ handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) ->
(hd(ConnLookup))#connection.state =:= up ->
?connect_failure(Node,{barred_connection,
ets:lookup(sys_dist, Node)}),
- async_reply({reply, false, State}, From);
+ {reply, false, State};
_ ->
- handle_connect(ConnLookup, Type, Node, From, State)
+ case setup(ConnLookup, Node,ConnId,Type,From,State) of
+ {ok, SetupPid} ->
+ Owners = [{SetupPid, Node} | State#state.conn_owners],
+ {noreply,State#state{conn_owners=Owners}};
+ _Error ->
+ ?connect_failure(Node, {setup_call, failed, _Error}),
+ {reply, false, State}
+ end
end
- end;
+ end.
+
+
+handle_connect([#connection{conn_id = ConnId, state = up}], _, _, ConnId, _From, State) ->
+ {reply, true, State};
+handle_connect([#connection{conn_id = ConnId}=Conn], _, _, ConnId, From, State)
+ when Conn#connection.state =:= pending;
+ Conn#connection.state =:= up_pending ->
+ Waiting = Conn#connection.waiting,
+ ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}),
+ {noreply, State};
+handle_connect([#barred_connection{}], Type, Node, ConnId, From , State) ->
+ %% Barred connection only affects auto_connect, ignore it.
+ handle_connect([], Type, Node, ConnId, From , State);
+handle_connect(ConnLookup, Type, Node, ConnId, From , State) ->
+ case setup(ConnLookup, Node,ConnId,Type,From,State) of
+ {ok, SetupPid} ->
+ Owners = [{SetupPid, Node} | State#state.conn_owners],
+ {noreply,State#state{conn_owners=Owners}};
+ _Error ->
+ ?connect_failure(Node, {setup_call, failed, _Error}),
+ {reply, false, State}
+ end.
+
+-define(ERTS_DIST_CON_ID_MASK, 16#ffffff). % also in external.h
+
+verify_new_conn_id([], ConnId)
+ when (ConnId band (bnot ?ERTS_DIST_CON_ID_MASK)) =:= 0 ->
+ true;
+verify_new_conn_id([#connection{conn_id = Old}], New)
+ when New =:= ((Old+1) band ?ERTS_DIST_CON_ID_MASK) ->
+ true;
+verify_new_conn_id(_, _) ->
+ false.
+
+
+
+%% ------------------------------------------------------------
+%% handle_call.
+%% ------------------------------------------------------------
+
+%%
+%% Auto-connect to Node.
+%% The response is delayed until the connection is up and running.
+%%
+handle_call({auto_connect, _, Node, _}, From, State) when Node =:= node() ->
+ async_reply({reply, true, State}, From);
+handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) ->
+ verbose({auto_connect, Type, Node, WaitForBarred}, 1, State),
+
+ R = case (catch erlang:new_connection_id(Node)) of
+ ConnId when is_integer(ConnId) ->
+ handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State);
+
+ _Error ->
+ error_logger:error_msg("~n** Cannot get connection id for node ~w~n",
+ [Node]),
+ {reply, false, State}
+ end,
+
+ return_call(R, From);
%%
%% Explicit connect
@@ -433,7 +480,17 @@ handle_call({connect, _, Node, _, _}, From, State) when Node =:= node() ->
handle_call({connect, Type, Node}, From, State) ->
verbose({connect, Type, Node}, 1, State),
ConnLookup = ets:lookup(sys_dist, Node),
- handle_connect(ConnLookup, Type, Node, From, State);
+ R = case (catch erlang:new_connection_id(Node)) of
+ ConnId when is_integer(ConnId) ->
+ handle_connect(ConnLookup, Type, Node, ConnId, From, State);
+
+ _Error ->
+ error_logger:error_msg("~n** Cannot get connection id for node ~w~n",
+ [Node]),
+ {reply, false, State}
+ end,
+ return_call(R, From);
+
%%
%% Close the connection to Node.
@@ -640,6 +697,25 @@ terminate(_Reason, State) ->
%% ------------------------------------------------------------
%%
+%% Asynchronous auto connect request
+%%
+handle_info({auto_connect,Node,ConnId}, State) ->
+ verbose({auto_connect, Node, ConnId}, 1, State),
+ NewState =
+ case handle_auto_connect(normal, Node, ConnId, false, noreply, State) of
+ {noreply, S} -> %% Pending connection
+ S;
+
+ {reply, true, S} -> %% Already connected
+ S;
+
+ {reply, false, S} -> %% Connection refused
+ erlang:abort_connection_id(Node, ConnId),
+ S
+ end,
+ {noreply, NewState};
+
+%%
%% accept a new connection.
%%
handle_info({accept,AcceptPid,Socket,Family,Proto}, State) ->
@@ -719,7 +795,12 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) ->
AcceptPid ! {self(), {accept_pending, already_pending}},
{noreply, State};
_ ->
+ ConnId = case (catch erlang:new_connection_id(Node)) of
+ CI when is_integer(CI) -> CI
+ %% SVERK What to do?
+ end,
ets:insert(sys_dist, #connection{node = Node,
+ conn_id = ConnId,
state = pending,
owner = AcceptPid,
address = Address,
@@ -912,6 +993,7 @@ pending_nodedown(Conn, Node, Type, State) ->
% Don't bar connections that have never been alive
%mark_sys_dist_nodedown(Node),
% - instead just delete the node:
+ erlang:abort_connection_id(Node, Conn#connection.conn_id),
ets:delete(sys_dist, Node),
reply_waiting(Node,Conn#connection.waiting, false),
case Type of
@@ -934,15 +1016,16 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) ->
State#state{conn_owners = [{AcceptPid,Node}|Owners], pend_owners = Pend}.
-up_nodedown(_Conn, Node, _Reason, Type, State) ->
- mark_sys_dist_nodedown(Node),
+up_nodedown(Conn, Node, _Reason, Type, State) ->
+ mark_sys_dist_nodedown(Conn, Node),
case Type of
normal -> ?nodedown(Node, State);
_ -> ok
end,
State.
-mark_sys_dist_nodedown(Node) ->
+mark_sys_dist_nodedown(Conn, Node) ->
+ erlang:abort_connection_id(Node, Conn#connection.conn_id),
case application:get_env(kernel, dist_auto_connect) of
{ok, once} ->
ets:insert(sys_dist, #barred_connection{node = Node});
@@ -1185,15 +1268,8 @@ spawn_func(_,{From,Tag},M,F,A,Gleader) ->
%% Set up connection to a new node.
%% -----------------------------------------------------------
-setup(Node,Type,From,State) ->
- Allowed = State#state.allowed,
- case lists:member(Node, Allowed) of
- false when Allowed =/= [] ->
- error_msg("** Connection attempt with "
- "disallowed node ~w ** ~n", [Node]),
- {error, bad_node};
- _ ->
- case select_mod(Node, State#state.listen) of
+setup(ConnLookup, Node,ConnId,Type,From,State) ->
+ case setup_check(ConnLookup, Node, ConnId, State) of
{ok, L} ->
Mod = L#listen.module,
LAddr = L#listen.address,
@@ -1206,18 +1282,45 @@ setup(Node,Type,From,State) ->
Addr = LAddr#net_address {
address = undefined,
host = undefined },
+ Waiting = case From of
+ noreply -> [];
+ _ -> [From]
+ end,
ets:insert(sys_dist, #connection{node = Node,
+ conn_id = ConnId,
state = pending,
owner = Pid,
- waiting = [From],
+ waiting = Waiting,
address = Addr,
type = normal}),
{ok, Pid};
Error ->
Error
- end
end.
+setup_check(ConnLookup, Node, ConnId, State) ->
+ Allowed = State#state.allowed,
+ case lists:member(Node, Allowed) of
+ false when Allowed =/= [] ->
+ error_msg("** Connection attempt with "
+ "disallowed node ~w ** ~n", [Node]),
+ {error, bad_node};
+ _ ->
+ case verify_new_conn_id(ConnLookup, ConnId) of
+ false ->
+ error_msg("** Connection attempt to ~w with "
+ "bad connection id ~w ** ~n", [Node, ConnId]),
+ {error, bad_conn_id};
+ true ->
+ case select_mod(Node, State#state.listen) of
+ {ok, _L}=OK -> OK;
+ Error -> Error
+ end
+ end
+ end.
+
+
+
%%
%% Find a module that is willing to handle connection setup to Node
%%
@@ -1658,6 +1761,11 @@ verbose(_, _, _) ->
getnode(P) when is_pid(P) -> node(P);
getnode(P) -> P.
+return_call({noreply, _State}=R, _From) ->
+ R;
+return_call(R, From) ->
+ async_reply(R, From).
+
async_reply({reply, Msg, State}, From) ->
async_gen_server_reply(From, Msg),
{noreply, State}.