aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c368
1 files changed, 154 insertions, 214 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 70474898b2..bbe2fc31f3 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -116,11 +116,12 @@ static Export *dist_ctrl_put_data_trap;
/* forward declarations */
-static void clear_dist_entry(DistEntry*);
static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
static Sint abort_connection(DistEntry* dep, Uint32 conn_id);
+static ErtsDistOutputBuf* clear_de_out_queues(DistEntry*);
+static void free_de_out_queues(DistEntry*, ErtsDistOutputBuf*);
static erts_atomic_t no_caches;
static erts_atomic_t no_nodes;
@@ -556,7 +557,10 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
}
}
else { /* Call from distribution controller (port/process) */
- ErtsMonLnkDist *mld;
+ ErtsMonLnkDist *mld;
+ ErtsAtomCache *cache;
+ ErtsProcList *suspendees;
+ ErtsDistOutputBuf *obuf;
Uint32 flags;
erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1);
@@ -588,6 +592,22 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
nodename = dep->sysname;
flags = dep->flags;
+ erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) NIL);
+ cache = dep->cache;
+ dep->cache = NULL;
+
+ erts_mtx_lock(&dep->qlock);
+
+ erts_atomic64_set_nob(&dep->in, 0);
+ erts_atomic64_set_nob(&dep->out, 0);
+
+ obuf = clear_de_out_queues(dep);
+ suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
+
+ erts_mtx_unlock(&dep->qlock);
+ erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
+ dep->send = NULL;
+
erts_set_dist_entry_not_connected(dep);
erts_de_rwunlock(dep);
@@ -601,7 +621,13 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
? am_connection_closed
: reason));
- clear_dist_entry(dep);
+ erts_resume_processes(suspendees);
+
+ delete_cache(cache);
+
+ free_de_out_queues(dep, obuf);
+ if (dep->transcode_ctx)
+ transcode_free_ctx(dep);
}
dec_no_nodes();
@@ -732,41 +758,6 @@ static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf)
}
}
-static void clear_dist_entry(DistEntry *dep)
-{
- ErtsAtomCache *cache;
- ErtsProcList *suspendees;
- ErtsDistOutputBuf *obuf;
-
- erts_de_rwlock(dep);
- erts_atomic_set_nob(&dep->input_handler,
- (erts_aint_t) NIL);
- cache = dep->cache;
- dep->cache = NULL;
-
- erts_mtx_lock(&dep->qlock);
-
- erts_atomic64_set_nob(&dep->in, 0);
- erts_atomic64_set_nob(&dep->out, 0);
-
- obuf = clear_de_out_queues(dep);
- dep->state = ERTS_DE_STATE_IDLE;
- suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
-
- erts_mtx_unlock(&dep->qlock);
- erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
- dep->send = NULL;
- erts_de_rwunlock(dep);
-
- erts_resume_processes(suspendees);
-
- delete_cache(cache);
-
- free_de_out_queues(dep, obuf);
- if (dep->transcode_ctx)
- transcode_free_ctx(dep);
-}
-
int erts_dsend_context_dtor(Binary* ctx_bin)
{
ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin);
@@ -861,7 +852,7 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
DeclareTmpHeapNoproc(ctl_heap,6);
int res;
- if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
+ if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
/*
* Receiver does not support DOP_MONITOR_P_EXIT (see dsig_send_monitor)
*/
@@ -889,7 +880,7 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
- if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
+ if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
/*
* Receiver does not support DOP_MONITOR_P.
* Just avoid sending it and by doing that reduce this monitor
@@ -920,7 +911,7 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
- if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
+ if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
/*
* Receiver does not support DOP_DEMONITOR_P (see dsig_send_monitor)
*/
@@ -940,7 +931,7 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
static int can_send_seqtrace_token(ErtsSendContext* ctx, Eterm token) {
Eterm label;
- if (ctx->dep->flags & DFLAG_BIG_SEQTRACE_LABELS) {
+ if (ctx->dsd.flags & DFLAG_BIG_SEQTRACE_LABELS) {
/* The other end is capable of handling arbitrary seq_trace labels. */
return 1;
}
@@ -1001,7 +992,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
send_token = (token != NIL && can_send_seqtrace_token(ctx, token));
- if (ctx->dep->flags & DFLAG_SEND_SENDER) {
+ if (ctx->dsd.flags & DFLAG_SEND_SENDER) {
dist_op = make_small(send_token ?
DOP_SEND_SENDER_TT :
DOP_SEND_SENDER);
@@ -1200,21 +1191,8 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
#include <valgrind/valgrind.h>
#include <valgrind/memcheck.h>
-#ifndef HAVE_VALGRIND_PRINTF_XML
-#define VALGRIND_PRINTF_XML VALGRIND_PRINTF
-#endif
-
# define PURIFY_MSG(msg) \
- do { \
- char buf__[1]; size_t bufsz__ = sizeof(buf__); \
- if (erts_sys_explicit_8bit_getenv("VALGRIND_LOG_XML", buf__, &bufsz__) >= 0) { \
- VALGRIND_PRINTF_XML("<erlang_error_log>" \
- "%s, line %d: %s</erlang_error_log>\n", \
- __FILE__, __LINE__, msg); \
- } else { \
- VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg); \
- } \
- } while (0)
+ VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg)
#else
# define PURIFY_MSG(msg)
#endif
@@ -1231,13 +1209,13 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
int erts_net_message(Port *prt,
DistEntry *dep,
+ Uint32 conn_id,
byte *hbuf,
ErlDrvSizeT hlen,
byte *buf,
ErlDrvSizeT len)
{
ErtsDistExternal ede;
- byte *t;
Sint ctl_len;
Eterm arg;
Eterm from, to;
@@ -1255,7 +1233,6 @@ int erts_net_message(Port *prt,
Eterm token_size;
Uint tuple_arity;
int res;
- Uint32 connection_id;
#ifdef ERTS_DIST_MSG_DBG
ErlDrvSizeT orig_len = len;
#endif
@@ -1271,7 +1248,6 @@ int erts_net_message(Port *prt,
return 0;
}
-
ASSERT(hlen == 0);
if (len == 0) { /* HANDLE TICK !!! */
@@ -1284,15 +1260,7 @@ int erts_net_message(Port *prt,
bw(buf, len);
#endif
- if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)
- t = buf;
- else {
- /* Skip PASS_THROUGH */
- t = buf+1;
- len--;
- }
-
- res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache, &connection_id);
+ res = erts_prepare_dist_ext(&ede, buf, len, dep, conn_id, dep->cache);
switch (res) {
case ERTS_PREP_DIST_EXT_CLOSED:
@@ -1334,10 +1302,9 @@ int erts_net_message(Port *prt,
PURIFY_MSG("data error");
goto decode_error;
}
- ctl_len = t - buf;
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, "<<%s CTL: %T\n", len != orig_len ? "P" : " ", arg);
+ erts_fprintf(stderr, "<< CTL: %T\n", arg);
#endif
if (is_not_tuple(arg) ||
@@ -1746,9 +1713,17 @@ int erts_net_message(Port *prt,
token = tuple[4];
reason = tuple[5];
}
- if (is_not_pid(from) || is_not_internal_pid(to)) {
+ if (is_not_pid(from)) {
goto invalid_message;
}
+ if (is_not_internal_pid(to)) {
+ if (is_external_pid(to)) {
+ DistEntry *dep = external_pid_dist_entry(to);
+ if (dep == erts_this_dist_entry)
+ break; /* Old incarnation of this node... */
+ }
+ goto invalid_message;
+ }
erts_proc_sig_send_exit(NULL, from, to, reason, token, 0);
break;
@@ -1791,7 +1766,7 @@ decode_error:
}
data_error:
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
- erts_kill_dist_connection(dep, connection_id);
+ erts_kill_dist_connection(dep, conn_id);
ERTS_CHK_NO_PROC_LOCKS;
return -1;
}
@@ -1847,7 +1822,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
while (1) {
switch (ctx->phase) {
case ERTS_DSIG_SEND_PHASE_INIT:
- ctx->flags = dsdp->dep->flags;
+ ctx->flags = dsdp->flags;
ctx->c_p = dsdp->proc;
if (!ctx->c_p || dsdp->no_suspend)
@@ -2102,13 +2077,14 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf)
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(dist_output)) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
DTRACE_CHARBUF(port_str, 64);
DTRACE_CHARBUF(remote_str, 64);
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
"%T", prt->common.id);
erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", prt->dist_entry->sysname);
+ "%T", dep->sysname);
DTRACE4(dist_output, erts_this_node_sysname, port_str,
remote_str, size);
}
@@ -2164,13 +2140,14 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(dist_outputv)) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
DTRACE_CHARBUF(port_str, 64);
DTRACE_CHARBUF(remote_str, 64);
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
"%T", prt->common.id);
erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", prt->dist_entry->sysname);
+ "%T", dep->sysname);
DTRACE4(dist_outputv, erts_this_node_sysname, port_str,
remote_str, size);
}
@@ -2208,7 +2185,7 @@ erts_dist_command(Port *prt, int initial_reds)
Uint32 flags;
Sint qsize, obufsize = 0;
ErtsDistOutputQueue oq, foq;
- DistEntry *dep = prt->dist_entry;
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
erts_aint32_t sched_flags;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
@@ -2584,11 +2561,12 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1)
erts_aint32_t qflgs;
erts_aint_t qsize;
Eterm receiver = NIL;
+ Uint32 conn_id;
if (!dep)
BIF_ERROR(BIF_P, EXC_NOTSUP);
- if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep)
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
BIF_ERROR(BIF_P, BADARG);
/*
@@ -2598,6 +2576,11 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1)
erts_de_rlock(dep);
+ if (dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
+
ASSERT(dep->cid == BIF_P->common.id);
qflgs = erts_atomic32_read_acqb(&dep->qflgs);
@@ -2638,6 +2621,7 @@ dist_ctrl_put_data_2(BIF_ALIST_2)
DistEntry *dep;
ErlDrvSizeT size;
Eterm input_handler;
+ Uint32 conn_id;
if (is_binary(BIF_ARG_2))
size = binary_size(BIF_ARG_2);
@@ -2649,7 +2633,7 @@ dist_ctrl_put_data_2(BIF_ALIST_2)
else
BIF_ERROR(BIF_P, BADARG);
- dep = erts_dhandle_to_dist_entry(BIF_ARG_1);
+ dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id);
if (!dep)
BIF_ERROR(BIF_P, BADARG);
@@ -2669,7 +2653,7 @@ dist_ctrl_put_data_2(BIF_ALIST_2)
erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
- (void) erts_net_message(NULL, dep, NULL, 0, data, size);
+ (void) erts_net_message(NULL, dep, conn_id, NULL, 0, data, size);
/*
* We ignore any decode failures. On fatal failures the
* connection will be taken down by killing the
@@ -2693,13 +2677,18 @@ dist_get_stat_1(BIF_ALIST_1)
Sint64 read, write, pend;
Eterm res, *hp, **hpp;
Uint sz, *szp;
- DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1);
+ Uint32 conn_id;
+ DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id);
if (!dep)
BIF_ERROR(BIF_P, BADARG);
erts_de_rlock(dep);
+ if (dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
read = (Sint64) erts_atomic64_read_nob(&dep->in);
write = (Sint64) erts_atomic64_read_nob(&dep->out);
pend = (Sint64) erts_atomic_read_nob(&dep->qsize);
@@ -2730,19 +2719,25 @@ BIF_RETTYPE
dist_ctrl_input_handler_2(BIF_ALIST_2)
{
DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
+ Uint32 conn_id;
if (!dep)
BIF_ERROR(BIF_P, EXC_NOTSUP);
- if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep)
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
BIF_ERROR(BIF_P, BADARG);
if (is_not_internal_pid(BIF_ARG_2))
BIF_ERROR(BIF_P, BADARG);
+ erts_de_rlock(dep);
+ if (dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
erts_atomic_set_nob(&dep->input_handler,
(erts_aint_t) BIF_ARG_2);
-
+ erts_de_runlock(dep);
BIF_RET(am_ok);
}
@@ -2756,15 +2751,21 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
Eterm *hp;
ProcBin *pb;
erts_aint_t qsize;
+ Uint32 conn_id;
if (!dep)
BIF_ERROR(BIF_P, EXC_NOTSUP);
- if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep)
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
BIF_ERROR(BIF_P, BADARG);
erts_de_rlock(dep);
+ if (dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
+
if (dep->state == ERTS_DE_STATE_EXITING)
goto return_none;
@@ -2851,13 +2852,14 @@ erts_dist_port_not_busy(Port *prt)
{
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(dist_port_not_busy)) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
DTRACE_CHARBUF(port_str, 64);
DTRACE_CHARBUF(remote_str, 64);
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
"%T", prt->common.id);
erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", prt->dist_entry->sysname);
+ "%T", dep->sysname);
DTRACE3(dist_port_not_busy, erts_this_node_sysname,
port_str, remote_str);
}
@@ -2883,10 +2885,10 @@ static void kill_connection(DistEntry *dep)
}
void
-erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
+erts_kill_dist_connection(DistEntry *dep, Uint32 conn_id)
{
erts_de_rwlock(dep);
- if (connection_id == dep->connection_id
+ if (conn_id == dep->connection_id
&& dep->state == ERTS_DE_STATE_CONNECTED) {
kill_connection(dep);
@@ -3222,23 +3224,6 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
else if (!dep)
goto system_limit; /* Should never happen!!! */
- erts_de_rlock(dep);
- de_locked = -1;
-
- if (dep->state == ERTS_DE_STATE_EXITING) {
- /* Suspend on dist entry waiting for the exit to finish */
- ErtsProcList *plp = erts_proclist_create(BIF_P);
- plp->next = NULL;
- erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_mtx_lock(&dep->qlock);
- erts_proclist_store_last(&dep->suspended, plp);
- erts_mtx_unlock(&dep->qlock);
- goto yield;
- }
-
- erts_de_runlock(dep);
- de_locked = 0;
-
if (is_internal_pid(BIF_ARG_2)) {
if (BIF_P->common.id == BIF_ARG_2) {
ErtsSetupConnDistCtrl scdc;
@@ -3284,7 +3269,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
hp = HAlloc(BIF_P, 3);
}
else {
- int new;
+ Uint32 conn_id;
pp = erts_id2port_sflgs(BIF_ARG_2,
BIF_P,
@@ -3293,7 +3278,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
erts_de_rwlock(dep);
de_locked = 1;
- if (dep->state == ERTS_DE_STATE_EXITING)
+ if (dep->state != ERTS_DE_STATE_PENDING)
goto badarg;
if (!pp || (erts_atomic32_read_nob(&pp->state)
@@ -3303,49 +3288,39 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
goto badarg;
- if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
- new = 0;
- else {
- if (dep->state != ERTS_DE_STATE_PENDING) {
- if (dep->state == ERTS_DE_STATE_IDLE)
- erts_set_dist_entry_pending(dep);
- else
- goto badarg;
- }
-
- if (pp->dist_entry || is_not_nil(dep->cid))
- goto badarg;
-
- erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
+ if (erts_prtsd_get(pp, ERTS_PRTSD_DIST_ENTRY) != NULL
+ || is_not_nil(dep->cid))
+ goto badarg;
- pp->dist_entry = dep;
+ erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
- ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
+ erts_prtsd_set(pp, ERTS_PRTSD_DIST_ENTRY, dep);
+ erts_prtsd_set(pp, ERTS_PRTSD_CONN_ID, (void*)(UWord)dep->connection_id);
- dep->send = (pp->drv_ptr->outputv
- ? dist_port_commandv
- : dist_port_command);
- ASSERT(dep->send);
+ ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
- /*
- * Dist-ports do not use the "busy port message queue" functionality, but
- * instead use "busy dist entry" functionality.
- */
- {
- ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
- erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
- }
+ dep->send = (pp->drv_ptr->outputv
+ ? dist_port_commandv
+ : dist_port_command);
+ ASSERT(dep->send);
- setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version);
- de_locked = 0;
- new = !0;
+ /*
+ * Dist-ports do not use the "busy port message queue" functionality, but
+ * instead use "busy dist entry" functionality.
+ */
+ {
+ ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
+ erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
}
- hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE);
- res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep);
+ conn_id = dep->connection_id;
+ setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version);
+ de_locked = 0;
+
+ hp = HAlloc(BIF_P, 3 + ERTS_DHANDLE_SIZE);
+ res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id);
res_tag = am_ok; /* Connection up */
- if (new)
- dep = NULL; /* inc of refc transferred to port (dist_entry field) */
+ dep = NULL; /* inc of refc transferred to port (dist_entry field) */
}
ASSERT(is_value(res) && is_value(res_tag));
@@ -3371,12 +3346,6 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
return ret;
- yield:
- ERTS_BIF_PREP_YIELD4(ret,
- bif_export[BIF_erts_internal_create_dist_channel_4],
- BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4);
- goto done;
-
badarg:
ERTS_BIF_PREP_RET(ret, am_badarg);
goto done;
@@ -3398,8 +3367,7 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
dep->creation = 0;
ASSERT(is_internal_port(ctrlr) || is_internal_pid(ctrlr));
- ASSERT(erts_atomic_read_nob(&dep->qsize) == 0
- || (dep->state == ERTS_DE_STATE_PENDING));
+ ASSERT(dep->state == ERTS_DE_STATE_PENDING);
if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
create_cache(dep);
@@ -3445,37 +3413,20 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment *
DistEntry *dep = scdcp->dep;
int dep_locked = 0;
Eterm *hp;
- erts_aint32_t state;
+ Uint32 conn_id;
if (redsp)
*redsp = 1;
- state = erts_atomic32_read_nob(&c_p->state);
-
- if (state & ERTS_PSFLG_EXITING)
- goto badarg;
+ ASSERT(!ERTS_PROC_IS_EXITING(c_p));
erts_de_rwlock(dep);
dep_locked = !0;
- if (dep->state == ERTS_DE_STATE_EXITING)
- goto badarg;
-
- if (ERTS_PROC_GET_DIST_ENTRY(c_p)) {
- if (dep == ERTS_PROC_GET_DIST_ENTRY(c_p)
- && (c_p->flags & F_DISTRIBUTION)
- && dep->cid == c_p->common.id) {
- goto connected;
- }
+ if (dep->state != ERTS_DE_STATE_PENDING)
goto badarg;
- }
- if (dep->state != ERTS_DE_STATE_PENDING) {
- if (dep->state == ERTS_DE_STATE_IDLE)
- erts_set_dist_entry_pending(dep);
- else
- goto badarg;
- }
+ conn_id = dep->connection_id;
if (is_not_nil(dep->cid))
goto badarg;
@@ -3490,18 +3441,17 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment *
setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id,
scdcp->flags, scdcp->version);
-connected:
/* we take over previous inc in refc of dep */
if (!bpp) /* called directly... */
- return erts_make_dhandle(c_p, dep);
+ return erts_make_dhandle(c_p, dep, conn_id);
erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);
- *bpp = new_message_buffer(ERTS_MAGIC_REF_THING_SIZE);
+ *bpp = new_message_buffer(ERTS_DHANDLE_SIZE);
hp = (*bpp)->mem;
- return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep);
+ return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep, conn_id);
badarg:
@@ -3542,34 +3492,30 @@ BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1)
erts_de_rwlock(dep);
switch (dep->state) {
- case ERTS_DE_STATE_PENDING:
case ERTS_DE_STATE_CONNECTED:
+ case ERTS_DE_STATE_EXITING:
+ case ERTS_DE_STATE_PENDING:
conn_id = dep->connection_id;
break;
case ERTS_DE_STATE_IDLE:
erts_set_dist_entry_pending(dep);
conn_id = dep->connection_id;
break;
- case ERTS_DE_STATE_EXITING:
- conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK;
- break;
default:
erts_exit(ERTS_ABORT_EXIT, "Invalid dep->state (%d)\n", dep->state);
}
erts_de_rwunlock(dep);
- hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE);
- dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep);
+ hp = HAlloc(BIF_P, ERTS_DHANDLE_SIZE);
+ dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id);
erts_deref_dist_entry(dep);
- BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle));
+ BIF_RET(dhandle);
}
-static Sint abort_connection(DistEntry* dep, Uint32 conn_id)
+Sint erts_abort_connection_rwunlock(DistEntry* dep)
{
- erts_de_rwlock(dep);
+ ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep));
- if (dep->connection_id != conn_id)
- ;
- else if (dep->state == ERTS_DE_STATE_CONNECTED) {
+ if (dep->state == ERTS_DE_STATE_CONNECTED) {
kill_connection(dep);
}
else if (dep->state == ERTS_DE_STATE_PENDING) {
@@ -3598,7 +3544,6 @@ static Sint abort_connection(DistEntry* dep, Uint32 conn_id)
dep->send = NULL;
erts_set_dist_entry_not_connected(dep);
-
erts_de_rwunlock(dep);
schedule_con_monitor_link_cleanup(mld, THE_NON_VALUE,
@@ -3611,42 +3556,37 @@ static Sint abort_connection(DistEntry* dep, Uint32 conn_id)
delete_cache(cache);
free_de_out_queues(dep, obuf);
-
- /*
- * We wait to make DistEntry idle and accept new connection attempts
- * until all is cleared and deallocated. This to get some back pressure
- * against repeated failing connection attempts saturating all CPUs
- * with cleanup jobs.
- */
- erts_de_rwlock(dep);
- ASSERT(dep->state == ERTS_DE_STATE_EXITING);
- dep->state = ERTS_DE_STATE_IDLE;
- erts_de_rwunlock(dep);
return reds;
}
erts_de_rwunlock(dep);
return 0;
}
+static Sint abort_connection(DistEntry *dep, Uint32 conn_id)
+{
+ erts_de_rwlock(dep);
+ if (dep->connection_id == conn_id)
+ return erts_abort_connection_rwunlock(dep);
+ erts_de_rwunlock(dep);
+ return 0;
+}
+
BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2)
{
DistEntry* dep;
- Eterm* tp;
+ Uint32 conn_id;
+ Sint reds;
- if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) {
- BIF_ERROR(BIF_P, BADARG);
- }
- tp = tuple_val(BIF_ARG_2);
- dep = erts_dhandle_to_dist_entry(tp[2]);
- if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)
+ if (is_not_atom(BIF_ARG_1))
+ BIF_ERROR(BIF_P, BADARG);
+ dep = erts_dhandle_to_dist_entry(BIF_ARG_2, &conn_id);
+ if (!dep || dep != erts_find_dist_entry(BIF_ARG_1)
|| dep == erts_this_dist_entry) {
BIF_ERROR(BIF_P, BADARG);
}
- if (dep) {
- Sint reds = abort_connection(dep, unsigned_val(tp[1]));
- BUMP_REDS(BIF_P, reds);
- }
+ reds = abort_connection(dep, conn_id);
+ BUMP_REDS(BIF_P, reds);
BIF_RET(am_true);
}
@@ -3677,14 +3617,14 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks)
}
/*
- * Send {auto_connect, Node, ConnId, DHandle} to net_kernel
+ * Send {auto_connect, Node, DHandle} to net_kernel
*/
mp = erts_alloc_message_heap(net_kernel, &nk_locks,
- 5 + ERTS_MAGIC_REF_THING_SIZE,
+ 4 + ERTS_DHANDLE_SIZE,
&hp, &ohp);
- dhandle = erts_build_dhandle(&hp, ohp, dep);
- msg = TUPLE4(hp, am_auto_connect, dep->sysname, make_small(conn_id),
- dhandle);
+ dhandle = erts_build_dhandle(&hp, ohp, dep, conn_id);
+ msg = TUPLE3(hp, am_auto_connect, dep->sysname, dhandle);
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_proc_message(proc, net_kernel, nk_locks, mp, msg);
erts_proc_unlock(net_kernel, nk_locks);
}