about summary refs log tree commit diff stats
path: root/erts/emulator/beam
diff options
context:
space:
mode:
author Rickard Green <[email protected]> 2017-08-28 15:00:48 +0200
committer Rickard Green <[email protected]> 2017-08-28 15:00:48 +0200
commit 3aa559f584559af3a76ecd8c2367f59e062dbe0e (patch)
tree 8cdd89cabdb2b6592f03627826a7b7041441d178 /erts/emulator/beam
parent 7d8beec59350c1b3e92cee50c222f8e2ccdd30d5 (diff)
parent ffd59fbd9ac262b7aba4b86e7da4992a3e668e24 (diff)
download otp-3aa559f584559af3a76ecd8c2367f59e062dbe0e.tar.gz
otp-3aa559f584559af3a76ecd8c2367f59e062dbe0e.tar.bz2
otp-3aa559f584559af3a76ecd8c2367f59e062dbe0e.zip
Merge branch 'rickard/dist/OTP-14459' into rickard/dist/master/OTP-14459
Conflicts:
	erts/emulator/beam/bif.c
	erts/emulator/beam/dist.c
	erts/emulator/beam/dist.h
	erts/emulator/beam/erl_bif_info.c
	erts/emulator/beam/erl_node_tables.c
	erts/emulator/beam/erl_node_tables.h
	erts/emulator/beam/external.c
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r-- erts/emulator/beam/atom.names 2
-rw-r--r-- erts/emulator/beam/bif.c 42
-rw-r--r-- erts/emulator/beam/bif.tab 6
-rw-r--r-- erts/emulator/beam/dist.c 1021
-rw-r--r-- erts/emulator/beam/dist.h 12
-rw-r--r-- erts/emulator/beam/erl_bif_info.c 9
-rw-r--r-- erts/emulator/beam/erl_monitors.c 2
-rw-r--r-- erts/emulator/beam/erl_node_tables.c 346
-rw-r--r-- erts/emulator/beam/erl_node_tables.h 53
-rw-r--r-- erts/emulator/beam/erl_process.c 39
-rw-r--r-- erts/emulator/beam/erl_process.h 20
-rw-r--r-- erts/emulator/beam/external.c 58
-rw-r--r-- erts/emulator/beam/external.h 7
-rw-r--r-- erts/emulator/beam/io.c 2
14 files changed, 1199 insertions, 420 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index a44d23b181..fc55b687d4 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -217,6 +217,8 @@ atom discard
atom display_items
atom dist
atom dist_cmd
+atom dist_ctrl_put_data
+atom dist_data
atom Div='/'
atom div
atom dlink
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 7e26af5abd..cbbc2e88a1 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -437,7 +437,6 @@ BIF_RETTYPE demonitor(Process *c_p, Eterm ref, Eterm *multip)
ErtsMonitor *mon = NULL; /* The monitor entry to delete */
Eterm to = NIL; /* Monitor link traget */
DistEntry *dep = NULL; /* Target's distribution entry */
- int deref_de = 0;
BIF_RETTYPE res = am_false;
int unlock_link = 1;
@@ -467,8 +466,6 @@ BIF_RETTYPE demonitor(Process *c_p, Eterm ref, Eterm *multip)
ASSERT(is_node_name_atom(to));
dep = erts_sysname_to_connected_dist_entry(to);
ASSERT(dep != erts_this_dist_entry);
- if (dep)
- deref_de = 1;
} else if (is_port(to)) {
if (port_dist_entry(to) != erts_this_dist_entry) {
goto badarg;
@@ -486,11 +483,6 @@ BIF_RETTYPE demonitor(Process *c_p, Eterm ref, Eterm *multip)
unlock_link = 0;
}
else { /* Local monitor */
- if (deref_de) {
- deref_de = 0;
- erts_deref_dist_entry(dep);
- }
- dep = NULL;
demonitor_local_process(c_p, ref, to, &res);
}
break;
@@ -505,11 +497,6 @@ done:
if (unlock_link)
erts_proc_unlock(c_p, ERTS_PROC_LOCK_LINK);
- if (deref_de) {
- ASSERT(dep);
- erts_deref_dist_entry(dep);
- }
-
ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
BIF_RET(res);
}
@@ -844,7 +831,6 @@ BIF_RETTYPE monitor_2(BIF_ALIST_2)
Eterm target = BIF_ARG_2;
BIF_RETTYPE ret;
DistEntry *dep = NULL;
- int deref_de = 0;
/* Only process monitors are implemented */
switch (BIF_ARG_1) {
@@ -904,21 +890,14 @@ local_port:
}
dep = erts_sysname_to_connected_dist_entry(remote_node);
if (dep == erts_this_dist_entry) {
- deref_de = 1;
ret = local_name_monitor(BIF_P, BIF_ARG_1, name);
} else {
- if (dep)
- deref_de = 1;
ret = remote_monitor(BIF_P, BIF_ARG_1, BIF_ARG_2, dep, name, 1);
}
} else {
badarg:
ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG);
}
- if (deref_de) {
- deref_de = 0;
- erts_deref_dist_entry(dep);
- }
return ret;
}
@@ -2000,6 +1979,7 @@ static Sint remote_send(Process *p, DistEntry *dep,
ASSERT(is_atom(to) || is_external_pid(to));
+ ctx->dep = dep;
code = erts_dsig_prepare(&ctx->dsd, dep, p, ERTS_DSP_NO_LOCK, !ctx->suspend);
switch (code) {
case ERTS_DSIG_PREP_NOT_ALIVE:
@@ -2201,7 +2181,6 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx)
if (dep == erts_this_dist_entry) {
Eterm id;
- erts_deref_dist_entry(dep);
if (IS_TRACED_FL(p, F_TRACE_SEND))
trace_send(p, to, msg);
if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
@@ -2224,11 +2203,9 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx)
}
ret = remote_send(p, dep, tp[1], to, msg, ctx);
- if (ret != SEND_YIELD_CONTINUE) {
- if (dep) {
- erts_deref_dist_entry(dep);
- }
- } else {
+ if (ret == SEND_YIELD_CONTINUE) {
+ if (dep)
+ erts_ref_dist_entry(dep);
ctx->dep_to_deref = dep;
}
return ret;
@@ -4164,7 +4141,6 @@ BIF_RETTYPE list_to_pid_1(BIF_ALIST_1)
goto bad;
if(dep == erts_this_dist_entry) {
- erts_deref_dist_entry(dep);
BIF_RET(make_internal_pid(make_pid_data(c, b)));
}
else {
@@ -4184,13 +4160,10 @@ BIF_RETTYPE list_to_pid_1(BIF_ALIST_1)
etp->data.ui[0] = make_pid_data(c, b);
MSO(BIF_P).first = (struct erl_off_heap_header*) etp;
- erts_deref_dist_entry(dep);
BIF_RET(make_external_pid(etp));
}
bad:
- if (dep)
- erts_deref_dist_entry(dep);
if (buf)
erts_free(ERTS_ALC_T_TMP, (void *) buf);
BIF_ERROR(BIF_P, BADARG);
@@ -4235,7 +4208,6 @@ BIF_RETTYPE list_to_port_1(BIF_ALIST_1)
goto bad;
if(dep == erts_this_dist_entry) {
- erts_deref_dist_entry(dep);
BIF_RET(make_internal_port(p));
}
else {
@@ -4255,13 +4227,10 @@ BIF_RETTYPE list_to_port_1(BIF_ALIST_1)
etp->data.ui[0] = p;
MSO(BIF_P).first = (struct erl_off_heap_header*) etp;
- erts_deref_dist_entry(dep);
BIF_RET(make_external_port(etp));
}
bad:
- if (dep)
- erts_deref_dist_entry(dep);
BIF_ERROR(BIF_P, BADARG);
}
@@ -4381,12 +4350,9 @@ BIF_RETTYPE list_to_ref_1(BIF_ALIST_1)
res = make_external_ref(etp);
}
- erts_deref_dist_entry(dep);
BIF_RET(res);
bad:
- if (dep)
- erts_deref_dist_entry(dep);
BIF_ERROR(BIF_P, BADARG);
}
diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab
index 962b00ae7b..2f7927d685 100644
--- a/erts/emulator/beam/bif.tab
+++ b/erts/emulator/beam/bif.tab
@@ -154,6 +154,12 @@ bif erlang:spawn_opt/1
bif erlang:setnode/2
bif erlang:setnode/3
bif erlang:dist_exit/3
+bif erlang:dist_get_stat/1
+bif erlang:dist_ctrl_input_handler/2
+bif erlang:dist_ctrl_put_data/2
+bif erlang:dist_ctrl_get_data/1
+bif erlang:dist_ctrl_get_data_notification/1
+
# Static native functions in erts_internal
bif erts_internal:port_info/1
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 491c4d378e..10ec275922 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -121,7 +121,7 @@ Export* dexit_trap = NULL;
Export* dmonitor_p_trap = NULL;
/* local variables */
-
+static Export *dist_ctrl_put_data_trap;
/* forward declarations */
@@ -156,9 +156,7 @@ create_cache(DistEntry *dep)
int i;
ErtsAtomCache *cp;
- ERTS_LC_ASSERT(
- is_internal_port(dep->cid)
- && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));
+ ERTS_LC_ASSERT(is_nil(dep->cid));
ASSERT(!dep->cache);
dep->cache = cp = (ErtsAtomCache*) erts_alloc(ERTS_ALC_T_DCACHE,
@@ -176,11 +174,13 @@ Uint erts_dist_cache_size(void)
}
static ErtsProcList *
-get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs)
+get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs)
{
+ erts_aint32_t qflgs;
ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
- dep->qflgs &= ~unset_qflgs;
- if (dep->qflgs & ERTS_DE_QFLG_EXIT) {
+ qflgs = erts_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs);
+ qflgs &= ~unset_qflgs;
+ if (qflgs & ERTS_DE_QFLG_EXIT) {
/* No resume when exit has been scheduled */
return NULL;
}
@@ -446,7 +446,35 @@ inc_no_nodes(void)
#endif
erts_atomic_inc_mb(&no_nodes);
}
-
+/*
+ * Aux-work callback that sends a 'kill' exit signal to a distribution
+ * controller process. It is the function handed to
+ * erts_schedule_misc_aux_work() by schedule_kill_dist_ctrl_proc().
+ *
+ * vpid - the target pid (an Eterm) carried through the void* callback
+ *        argument; it is cast straight back to Eterm here.
+ *
+ * The process is looked up with the ERTS_PROC_LOCKS_XSIG_SEND locks
+ * requested. If the lookup yields no process, nothing is done.
+ */
+static void
+kill_dist_ctrl_proc(void *vpid)
+{
+ Eterm pid = (Eterm) vpid;
+ ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
+ Process *rp = erts_pid2proc(NULL, 0, pid, rp_locks);
+ if (rp) {
+ erts_send_exit_signal(NULL, rp->common.id, rp, &rp_locks,
+ am_kill, NIL, NULL, 0);
+ if (rp_locks) /* rp_locks was passed by address above, so only unlock what is still recorded as held */
+ erts_proc_unlock(rp, rp_locks);
+ }
+}
+/*
+ * Schedule kill_dist_ctrl_proc() to run for pid as misc aux work.
+ * In this patch it is used by erts_do_net_exits() for distribution
+ * controllers that are processes rather than ports.
+ *
+ * pid - pid of the distribution controller process to kill; it is
+ *       packed into the void* callback argument via a UWord cast.
+ *
+ * The work is scheduled on scheduler id 1 when the calling thread has
+ * no scheduler data or is a dirty scheduler; otherwise it is scheduled
+ * on the calling scheduler's own id (esdp->no).
+ */
+static void
+schedule_kill_dist_ctrl_proc(Eterm pid)
+{
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ int sched_id = 1; /* NOTE(review): initializer is redundant; both branches below assign sched_id */
+ if (!esdp || ERTS_SCHEDULER_IS_DIRTY(esdp))
+ sched_id = 1;
+ else
+ sched_id = (int) esdp->no;
+ erts_schedule_misc_aux_work(sched_id,
+ kill_dist_ctrl_proc,
+ (void *) (UWord) pid);
+}
+
/*
* proc is currently running or exiting process.
*/
@@ -456,58 +484,62 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */
DistEntry *tdep;
- int no_dist_port = 0;
+ int no_dist_ctrl = 0;
Eterm nd_reason = (reason == am_no_network
? am_no_network
: am_net_kernel_terminated);
erts_rwmtx_rlock(&erts_dist_table_rwmtx);
for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next)
- no_dist_port++;
+ no_dist_ctrl++;
for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next)
- no_dist_port++;
+ no_dist_ctrl++;
/* KILL all port controllers */
- if (no_dist_port == 0)
+ if (no_dist_ctrl == 0)
erts_rwmtx_runlock(&erts_dist_table_rwmtx);
else {
Eterm def_buf[128];
int i = 0;
- Eterm *dist_port;
+ Eterm *dist_ctrl;
- if (no_dist_port <= sizeof(def_buf)/sizeof(def_buf[0]))
- dist_port = &def_buf[0];
+ if (no_dist_ctrl <= sizeof(def_buf)/sizeof(def_buf[0]))
+ dist_ctrl = &def_buf[0];
else
- dist_port = erts_alloc(ERTS_ALC_T_TMP,
- sizeof(Eterm)*no_dist_port);
+ dist_ctrl = erts_alloc(ERTS_ALC_T_TMP,
+ sizeof(Eterm)*no_dist_ctrl);
for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) {
- ASSERT(is_internal_port(tdep->cid));
- dist_port[i++] = tdep->cid;
+ ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid));
+ dist_ctrl[i++] = tdep->cid;
}
for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) {
- ASSERT(is_internal_port(tdep->cid));
- dist_port[i++] = tdep->cid;
+ ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid));
+ dist_ctrl[i++] = tdep->cid;
}
erts_rwmtx_runlock(&erts_dist_table_rwmtx);
- for (i = 0; i < no_dist_port; i++) {
- Port *prt = erts_port_lookup(dist_port[i],
- ERTS_PORT_SFLGS_INVALID_LOOKUP);
- if (!prt)
- continue;
- ASSERT(erts_atomic32_read_nob(&prt->state)
- & ERTS_PORT_SFLG_DISTRIBUTION);
-
- erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED,
- prt, dist_port[i], nd_reason, NULL);
+ for (i = 0; i < no_dist_ctrl; i++) {
+ if (is_internal_pid(dist_ctrl[i]))
+ schedule_kill_dist_ctrl_proc(dist_ctrl[i]);
+ else {
+ Port *prt = erts_port_lookup(dist_ctrl[i],
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
+ if (prt) {
+ ASSERT(erts_atomic32_read_nob(&prt->state)
+ & ERTS_PORT_SFLG_DISTRIBUTION);
+
+ erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED,
+ prt, dist_ctrl[i], nd_reason, NULL);
+ }
+ }
}
- if (dist_port != &def_buf[0])
- erts_free(ERTS_ALC_T_TMP, dist_port);
+ if (dist_ctrl != &def_buf[0])
+ erts_free(ERTS_ALC_T_TMP, dist_ctrl);
}
/*
- * When last dist port exits, node will be taken
+ * When last dist ctrl exits, node will be taken
* from alive to not alive.
*/
ASSERT(is_nil(nodedown.reason) && !nodedown.bp);
@@ -524,7 +556,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
&nodedown.bp->off_heap);
}
}
- else { /* Call from distribution port */
+ else { /* Call from distribution controller (port/process) */
NetExitsContext nec = {dep};
ErtsLink *nlinks;
ErtsLink *node_links;
@@ -534,24 +566,23 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1);
erts_de_rwlock(dep);
- ERTS_LC_ASSERT(is_internal_port(dep->cid)
- && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));
+ if (is_internal_port(dep->cid)) {
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));
- if (erts_port_task_is_scheduled(&dep->dist_cmd))
- erts_port_task_abort(&dep->dist_cmd);
+ if (erts_port_task_is_scheduled(&dep->dist_cmd))
+ erts_port_task_abort(&dep->dist_cmd);
+ }
if (dep->status & ERTS_DE_SFLG_EXITING) {
#ifdef DEBUG
- erts_mtx_lock(&dep->qlock);
- ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT);
- erts_mtx_unlock(&dep->qlock);
+ ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT);
#endif
}
else {
dep->status |= ERTS_DE_SFLG_EXITING;
erts_mtx_lock(&dep->qlock);
- ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
- dep->qflgs |= ERTS_DE_QFLG_EXIT;
+ ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
+ erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT);
erts_mtx_unlock(&dep->qlock);
}
@@ -616,6 +647,9 @@ void init_dist(void)
dgroup_leader_trap = trap_function(am_dgroup_leader,2);
dexit_trap = trap_function(am_dexit, 2);
dmonitor_p_trap = trap_function(am_dmonitor_p, 2);
+ dist_ctrl_put_data_trap = erts_export_put(am_erts_internal,
+ am_dist_ctrl_put_data,
+ 2);
}
#define ErtsDistOutputBuf2Binary(OB) \
@@ -658,6 +692,8 @@ static void clear_dist_entry(DistEntry *dep)
ErtsDistOutputBuf *obuf;
erts_de_rwlock(dep);
+ erts_atomic_set_nob(&dep->input_handler,
+ (erts_aint_t) NIL);
cache = dep->cache;
dep->cache = NULL;
@@ -671,6 +707,9 @@ static void clear_dist_entry(DistEntry *dep)
erts_mtx_lock(&dep->qlock);
+ erts_atomic64_set_nob(&dep->in, 0);
+ erts_atomic64_set_nob(&dep->out, 0);
+
if (!dep->out_queue.last)
obuf = dep->finalized_out_queue.first;
else {
@@ -678,8 +717,15 @@ static void clear_dist_entry(DistEntry *dep)
obuf = dep->out_queue.first;
}
+ if (dep->tmp_out_queue.first) {
+ dep->tmp_out_queue.last->next = obuf;
+ obuf = dep->tmp_out_queue.first;
+ }
+
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
+ dep->tmp_out_queue.first = NULL;
+ dep->tmp_out_queue.last = NULL;
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
dep->status = 0;
@@ -704,8 +750,9 @@ static void clear_dist_entry(DistEntry *dep)
if (obufsize) {
erts_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
+ ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize);
+ erts_atomic_add_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
erts_mtx_unlock(&dep->qlock);
}
}
@@ -904,11 +951,30 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
}
#endif
- if (token != NIL)
- ctl = TUPLE4(&ctx->ctl_heap[0],
- make_small(DOP_SEND_TT), am_Empty, remote, token);
- else
- ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_SEND), am_Empty, remote);
+ if (token != NIL) {
+ Eterm el1, el2;
+ if (ctx->dep->flags & DFLAG_SEND_SENDER) {
+ el1 = make_small(DOP_SEND_SENDER_TT);
+ el2 = sender->common.id;
+ }
+ else {
+ el1 = make_small(DOP_SEND_TT);
+ el2 = am_Empty;
+ }
+ ctl = TUPLE4(&ctx->ctl_heap[0], el1, el2, remote, token);
+ }
+ else {
+ Eterm el1, el2;
+ if (ctx->dep->flags & DFLAG_SEND_SENDER) {
+ el1 = make_small(DOP_SEND_SENDER);
+ el2 = sender->common.id;
+ }
+ else {
+ el1 = make_small(DOP_SEND);
+ el2 = am_Empty;
+ }
+ ctl = TUPLE3(&ctx->ctl_heap[0], el1, el2, remote);
+ }
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
@@ -1145,6 +1211,7 @@ int erts_net_message(Port *prt,
ErtsLink *lnk;
Uint tuple_arity;
int res;
+ Uint32 connection_id;
#ifdef ERTS_DIST_MSG_DBG
ErlDrvSizeT orig_len = len;
#endif
@@ -1153,14 +1220,17 @@ int erts_net_message(Port *prt,
ERTS_CHK_NO_PROC_LOCKS;
- ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(!prt || erts_lc_is_port_locked(prt));
if (!erts_is_alive) {
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
return 0;
}
+
+
if (hlen != 0)
goto data_error;
+
if (len == 0) { /* HANDLE TICK !!! */
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
return 0;
@@ -1184,25 +1254,31 @@ int erts_net_message(Port *prt,
goto data_error;
}
- res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache);
+ res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache, &connection_id);
- if (res >= 0)
- res = ctl_len = erts_decode_dist_ext_size(&ede);
- else {
+ switch (res) {
+ case ERTS_PREP_DIST_EXT_CLOSED:
+ return 0; /* Connection not alive; ignore signal... */
+ case ERTS_PREP_DIST_EXT_FAILED:
#ifdef ERTS_DIST_MSG_DBG
erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n");
bw(buf, orig_len);
#endif
- ctl_len = 0;
- }
-
- if (res < 0) {
+ goto data_error;
+ case ERTS_PREP_DIST_EXT_SUCCESS:
+ ctl_len = erts_decode_dist_ext_size(&ede);
+ if (ctl_len < 0) {
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n");
- bw(buf, orig_len);
+ erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n");
+ bw(buf, orig_len);
#endif
- PURIFY_MSG("data error");
- goto data_error;
+ PURIFY_MSG("data error");
+ goto data_error;
+ }
+ break;
+ default:
+ ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()");
+ break;
}
if (ctl_len > DIST_CTL_DEFAULT_SIZE) {
@@ -1233,6 +1309,7 @@ int erts_net_message(Port *prt,
}
token_size = 0;
+ token = NIL;
switch (type = unsigned_val(tuple[1])) {
case DOP_LINK:
@@ -1462,38 +1539,52 @@ int erts_net_message(Port *prt,
}
break;
+ case DOP_SEND_SENDER_TT: {
+ Uint xsize;
case DOP_SEND_TT:
+
if (tuple_arity != 4) {
goto invalid_message;
}
-
- token_size = size_object(tuple[4]);
- /* Fall through ... */
+
+ token = tuple[4];
+ token_size = size_object(token);
+ xsize = ERTS_HEAP_FRAG_SIZE(token_size);
+ goto send_common;
+
+ case DOP_SEND_SENDER:
case DOP_SEND:
+
+ token = NIL;
+ xsize = 0;
+ if (tuple_arity != 3)
+ goto invalid_message;
+
+ send_common:
+
/*
- * There is intentionally no testing of the cookie (it is always '')
- * from R9B and onwards.
+ * If DOP_SEND_SENDER or DOP_SEND_SENDER_TT element 2 contains
+ * the sender pid (i.e. DFLAG_SEND_SENDER is set); otherwise,
+ * the atom '' (empty cookie).
*/
+ ASSERT((type == DOP_SEND_SENDER || type == DOP_SEND_SENDER_TT)
+ ? (is_pid(tuple[2]) && (dep->flags & DFLAG_SEND_SENDER))
+ : tuple[2] == am_Empty);
+
#ifdef ERTS_DIST_MSG_DBG
dist_msg_dbg(&ede, "MSG", buf, orig_len);
#endif
- if (type != DOP_SEND_TT && tuple_arity != 3) {
- goto invalid_message;
- }
to = tuple[3];
if (is_not_pid(to)) {
goto invalid_message;
}
rp = erts_proc_lookup(to);
if (rp) {
- Uint xsize = type == DOP_SEND ? 0 : ERTS_HEAP_FRAG_SIZE(token_size);
ErtsProcLocks locks = 0;
ErtsDistExternal *ede_copy;
ede_copy = erts_make_dist_ext_copy(&ede, xsize);
- if (type == DOP_SEND) {
- token = NIL;
- } else {
+ if (is_not_nil(token)) {
ErlHeapFragment *heap_frag;
ErlOffHeap *ohp;
ASSERT(xsize);
@@ -1501,15 +1592,15 @@ int erts_net_message(Port *prt,
ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
hp = heap_frag->mem;
ohp = &heap_frag->off_heap;
- token = tuple[4];
token = copy_struct(token, token_size, &hp, ohp);
}
- erts_queue_dist_message(rp, locks, ede_copy, token, tuple[2]);
+ erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty);
if (locks)
erts_proc_unlock(rp, locks);
}
break;
+ }
case DOP_MONITOR_P_EXIT: {
/* We are monitoring a process on the remote node which dies, we get
@@ -1723,7 +1814,7 @@ decode_error:
}
data_error:
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
- erts_deliver_port_exit(prt, dep->cid, am_killed, 0, 1);
+ erts_kill_dist_connection(dep, connection_id);
ERTS_CHK_NO_PROC_LOCKS;
return -1;
}
@@ -1744,6 +1835,31 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
return ret;
}
+static ERTS_INLINE void
+notify_dist_data(Process *c_p, Eterm pid)
+{ /*
+ * Queue a 'dist_data' message (am_dist_data) to the process
+ * identified by pid. In this patch erts_dsig_send() calls it with
+ * the dist entry's controller pid after releasing the dist entry
+ * lock.
+ *
+ * c_p - calling process, or NULL. When it is the target itself it
+ *       is used directly instead of doing a lookup.
+ * pid - internal pid of the process to notify (asserted below).
+ *
+ * Nothing is queued if the target process cannot be found.
+ */
+ Process *rp;
+ ErtsProcLocks rp_locks;
+ /* Debug checks: must run on a scheduler thread that is not dirty, and pid must be an internal pid. */
+ ASSERT(erts_get_scheduler_data()
+ && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data()));
+ ASSERT(is_internal_pid(pid));
+ /* Pick the receiver and the lock set reported for it (presumably the locks the caller already holds - TODO confirm). */
+ if (c_p && c_p->common.id == pid) {
+ rp = c_p;
+ rp_locks = ERTS_PROC_LOCK_MAIN;
+ }
+ else {
+ rp = erts_proc_lookup(pid);
+ rp_locks = 0;
+ }
+ /* The message term is the atom itself; the message is allocated with size argument 0. */
+ if (rp) {
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ erts_queue_message(rp, rp_locks, mp, am_dist_data, am_system); /* am_system is presumably the sender - verify against erts_queue_message() */
+ }
+}
+
int
erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
{
@@ -1859,12 +1975,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
free_dist_obuf(ctx->obuf);
}
else {
+ Sint qsize;
+ erts_aint32_t qflgs;
ErtsProcList *plp = NULL;
+ Eterm notify_proc = NIL;
+ Sint obsz = size_obuf(ctx->obuf);
+
erts_mtx_lock(&dep->qlock);
- dep->qsize += size_obuf(ctx->obuf);
- if (dep->qsize >= erts_dist_buf_busy_limit)
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- if (!ctx->force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) {
+ qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz);
+ ASSERT(qsize >= obsz);
+ qflgs = erts_atomic32_read_nob(&dep->qflgs);
+ if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) {
+ erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY);
+ qflgs |= ERTS_DE_QFLG_BUSY;
+ }
+ if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) {
+ /* Previously empty queue and info requested... */
+ qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
+ ~ERTS_DE_QFLG_REQ_INFO);
+ if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
+ notify_proc = dep->cid;
+ ASSERT(is_internal_pid(notify_proc));
+ }
+ /* else: requester will send itself the message... */
+ qflgs &= ~ERTS_DE_QFLG_REQ_INFO;
+ }
+ if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) {
erts_mtx_unlock(&dep->qlock);
plp = erts_proclist_create(ctx->c_p);
@@ -1881,7 +2017,8 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
dep->out_queue.last = ctx->obuf;
if (!ctx->force_busy) {
- if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) {
+ qflgs = erts_atomic32_read_nob(&dep->qflgs);
+ if (!(qflgs & ERTS_DE_QFLG_BUSY)) {
if (suspended)
resume = 1; /* was busy when we started, but isn't now */
#ifdef USE_VM_PROBES
@@ -1906,8 +2043,11 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
}
erts_mtx_unlock(&dep->qlock);
- erts_schedule_dist_command(NULL, dep);
+ if (is_internal_port(dep->cid))
+ erts_schedule_dist_command(NULL, dep);
erts_de_runlock(dep);
+ if (is_internal_pid(notify_proc))
+ notify_dist_data(ctx->c_p, notify_proc);
if (resume) {
erts_resume(ctx->c_p, ERTS_PROC_LOCK_MAIN);
@@ -1961,16 +2101,20 @@ static Uint
dist_port_command(Port *prt, ErtsDistOutputBuf *obuf)
{
int fpe_was_unmasked;
- Uint size = obuf->ext_endp - obuf->extp;
+ ErlDrvSizeT size;
+ char *bufp;
ERTS_CHK_NO_PROC_LOCKS;
ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (size > (Uint) INT_MAX)
- erts_exit(ERTS_DUMP_EXIT,
- "Absurdly large distribution output data buffer "
- "(%beu bytes) passed.\n",
- size);
+ if (!obuf) {
+ size = 0;
+ bufp = NULL;
+ }
+ else {
+ size = obuf->ext_endp - obuf->extp;
+ bufp = (char*) obuf->extp;
+ }
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(dist_output)) {
@@ -1985,11 +2129,10 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf)
remote_str, size);
}
#endif
+
prt->caller = NIL;
fpe_was_unmasked = erts_block_fpe();
- (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data,
- (char*) obuf->extp,
- (int) size);
+ (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, bufp, size);
erts_unblock_fpe(fpe_was_unmasked);
return size;
}
@@ -1998,7 +2141,7 @@ static Uint
dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
{
int fpe_was_unmasked;
- Uint size = obuf->ext_endp - obuf->extp;
+ ErlDrvSizeT size;
SysIOVec iov[2];
ErlDrvBinary* bv[2];
ErlIOVec eiov;
@@ -2006,25 +2149,33 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
ERTS_CHK_NO_PROC_LOCKS;
ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (size > (Uint) INT_MAX)
- erts_exit(ERTS_DUMP_EXIT,
- "Absurdly large distribution output data buffer "
- "(%beu bytes) passed.\n",
- size);
-
iov[0].iov_base = NULL;
iov[0].iov_len = 0;
bv[0] = NULL;
- iov[1].iov_base = obuf->extp;
- iov[1].iov_len = size;
- bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf));
+ if (!obuf) {
+ size = 0;
+ eiov.vsize = 1;
+ }
+ else {
+ size = obuf->ext_endp - obuf->extp;
+ eiov.vsize = 2;
+
+ iov[1].iov_base = obuf->extp;
+ iov[1].iov_len = size;
+ bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf));
+ }
- eiov.vsize = 2;
eiov.size = size;
eiov.iov = iov;
eiov.binv = bv;
+ if (size > (Uint) INT_MAX)
+ erts_exit(ERTS_DUMP_EXIT,
+ "Absurdly large distribution output data buffer "
+ "(%beu bytes) passed.\n",
+ size);
+
ASSERT(prt->drv_ptr->outputv);
#ifdef USE_VM_PROBES
@@ -2072,7 +2223,7 @@ erts_dist_command(Port *prt, int reds_limit)
Sint reds = ERTS_PORT_REDS_DIST_CMD_START;
Uint32 status;
Uint32 flags;
- Sint obufsize = 0;
+ Sint qsize, obufsize = 0;
ErtsDistOutputQueue oq, foq;
DistEntry *dep = prt->dist_entry;
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
@@ -2081,9 +2232,6 @@ erts_dist_command(Port *prt, int reds_limit)
ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
- erts_refc_inc(&dep->refc, 1); /* Otherwise dist_entry might be
- removed if port command fails */
-
erts_atomic_set_mb(&dep->dist_cmd_scheduled, 0);
erts_de_rlock(dep);
@@ -2094,7 +2242,6 @@ erts_dist_command(Port *prt, int reds_limit)
if (status & ERTS_DE_SFLG_EXITING) {
erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1);
- erts_deref_dist_entry(dep);
return reds + ERTS_PORT_REDS_DIST_CMD_EXIT;
}
@@ -2128,20 +2275,20 @@ erts_dist_command(Port *prt, int reds_limit)
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) {
int preempt = 0;
do {
- Uint size;
- ErtsDistOutputBuf *fob;
-
- size = (*send)(prt, foq.first);
- esdp->io.out += (Uint64) size;
+ Uint size;
+ ErtsDistOutputBuf *fob;
+ size = (*send)(prt, foq.first);
+ erts_atomic64_inc_nob(&dep->out);
+ esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
- erts_fprintf(stderr, ">> ");
- bw(foq.first->extp, size);
+ erts_fprintf(stderr, ">> ");
+ bw(foq.first->extp, size);
#endif
- reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
- fob = foq.first;
- obufsize += size_obuf(fob);
- foq.first = foq.first->next;
- free_dist_obuf(fob);
+ reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
+ fob = foq.first;
+ obufsize += size_obuf(fob);
+ foq.first = foq.first->next;
+ free_dist_obuf(fob);
sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
if (sched_flags & ERTS_PTS_FLG_BUSY_PORT)
@@ -2202,31 +2349,33 @@ erts_dist_command(Port *prt, int reds_limit)
}
}
else {
+ int de_busy;
int preempt = 0;
while (oq.first && !preempt) {
- ErtsDistOutputBuf *fob;
- Uint size;
- oq.first->extp
- = erts_encode_ext_dist_header_finalize(oq.first->extp,
- dep->cache,
- flags);
- reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
- if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
- *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through'
- needed */
- ASSERT(&oq.first->data[0] <= oq.first->extp
- && oq.first->extp < oq.first->ext_endp);
- size = (*send)(prt, oq.first);
- esdp->io.out += (Uint64) size;
+ ErtsDistOutputBuf *fob;
+ Uint size;
+ oq.first->extp
+ = erts_encode_ext_dist_header_finalize(oq.first->extp,
+ dep->cache,
+ flags);
+ reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
+ if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
+ *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through'
+ needed */
+ ASSERT(&oq.first->data[0] <= oq.first->extp
+ && oq.first->extp < oq.first->ext_endp);
+ size = (*send)(prt, oq.first);
+ erts_atomic64_inc_nob(&dep->out);
+ esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
- erts_fprintf(stderr, ">> ");
- bw(oq.first->extp, size);
+ erts_fprintf(stderr, ">> ");
+ bw(oq.first->extp, size);
#endif
- reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
- fob = oq.first;
- obufsize += size_obuf(fob);
- oq.first = oq.first->next;
- free_dist_obuf(fob);
+ reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
+ fob = oq.first;
+ obufsize += size_obuf(fob);
+ oq.first = oq.first->next;
+ free_dist_obuf(fob);
sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt)
@@ -2255,12 +2404,13 @@ erts_dist_command(Port *prt, int reds_limit)
* processes.
*/
erts_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
+ de_busy = !!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY);
+ qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
+ ASSERT(qsize >= 0);
obufsize = 0;
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
- && (dep->qflgs & ERTS_DE_QFLG_BUSY)
- && dep->qsize < erts_dist_buf_busy_limit) {
+ && de_busy && qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
int resumed;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
@@ -2280,8 +2430,13 @@ erts_dist_command(Port *prt, int reds_limit)
if (obufsize != 0) {
ASSERT(obufsize > 0);
erts_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
+#ifdef DEBUG
+ qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
+ ASSERT(qsize >= 0);
+#else
+ erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize);
+#endif
erts_mtx_unlock(&dep->qlock);
}
@@ -2299,8 +2454,6 @@ erts_dist_command(Port *prt, int reds_limit)
if (reds > INT_MAX/2)
reds = INT_MAX/2;
- erts_deref_dist_entry(dep);
-
return reds;
preempted:
@@ -2337,7 +2490,7 @@ erts_dist_command(Port *prt, int reds_limit)
#ifdef DEBUG
erts_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize == obufsize);
+ ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize);
erts_mtx_unlock(&dep->qlock);
#endif
}
@@ -2348,7 +2501,7 @@ erts_dist_command(Port *prt, int reds_limit)
* in out_queue.
*/
erts_mtx_lock(&dep->qlock);
- dep->qsize -= obufsize;
+ erts_atomic_add_nob(&dep->qsize, -obufsize);
obufsize = 0;
oq.last->next = dep->out_queue.first;
dep->out_queue.first = oq.first;
@@ -2362,6 +2515,370 @@ erts_dist_command(Port *prt, int reds_limit)
goto done;
}
+#if 0
+/*
+ * dist_data_finalize() is compiled out by the enclosing '#if 0' and is
+ * not compilable as written: 'dep' has no initializer expression and
+ * 'flags' is never declared.
+ *
+ * Visible intent: detach dep->out_queue under dep->qlock and chain it
+ * after dep->tmp_out_queue, then finalize the dist header of one buffer
+ * at a time, charging ERTS_PORT_REDS_DIST_CMD_FINALIZE per buffer, until
+ * the list ends or 'reds' exceeds reds_limit. Finalized buffers are
+ * appended to dep->finalized_out_queue; any remainder is stored back in
+ * dep->tmp_out_queue. Returns the accumulated 'reds' (initially 5).
+ */
+int
+dist_data_finalize(Process *c_p, int reds_limit)
+{
+ int reds = 5;
+ DistEntry *dep = ;
+ ErtsDistOutputQueue oq, foq;
+ ErtsDistOutputBuf *ob;
+ int preempt;
+
+
+ erts_mtx_lock(&dep->qlock);
+ flags = dep->flags;
+ oq.first = dep->out_queue.first;
+ oq.last = dep->out_queue.last;
+ dep->out_queue.first = NULL;
+ dep->out_queue.last = NULL;
+ erts_mtx_unlock(&dep->qlock);
+
+ if (!oq.first) {
+ ASSERT(!oq.last);
+ oq.first = dep->tmp_out_queue.first;
+ oq.last = dep->tmp_out_queue.last;
+ }
+ else {
+ ErtsDistOutputBuf *f, *l; /* neither is referenced in this function */
+ ASSERT(oq.last);
+ if (dep->tmp_out_queue.last) {
+ dep->tmp_out_queue.last->next = oq.first; /* old leftovers go first */
+ oq.first = dep->tmp_out_queue.first;
+ }
+ }
+
+ if (!oq.first) {
+ /* Nothing to do... */
+ ASSERT(!oq.last);
+ return reds;
+ }
+
+ foq.first = dep->finalized_out_queue.first;
+ foq.last = dep->finalized_out_queue.last;
+
+ preempt = 0;
+ ob = oq.first;
+ ASSERT(ob);
+
+ do {
+ ob->extp = erts_encode_ext_dist_header_finalize(ob->extp,
+ dep->cache,
+ flags);
+ if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
+ *--ob->extp = PASS_THROUGH; /* Old node; 'pass through'
+ needed */
+ ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp);
+ reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
+ preempt = reds > reds_limit;
+ if (preempt)
+ break; /* leaves ob at the buffer just finalized */
+ ob = ob->next;
+ } while (ob);
+ /*
+ * At least one buffer was finalized; if we got preempted,
+ * ob points to the last buffer that we finalized.
+ */
+ if (foq.last)
+ foq.last->next = oq.first;
+ else
+ foq.first = oq.first;
+ if (!preempt) {
+ /* All buffers finalized */
+ foq.last = oq.last;
+ oq.first = oq.last = NULL;
+ }
+ else {
+ /* Not all buffers finalized; split oq. */
+ foq.last = ob;
+ oq.first = ob->next;
+ if (oq.first)
+ ob->next = NULL;
+ else
+ oq.last = NULL;
+ }
+
+ dep->finalized_out_queue.first = foq.first;
+ dep->finalized_out_queue.last = foq.last;
+ dep->tmp_out_queue.first = oq.first;
+ dep->tmp_out_queue.last = oq.last;
+
+ return reds;
+}
+
+#endif
+/*
+ * dist_ctrl_get_data_notification_1(DHandle)
+ *
+ * Requests a notification for when the output queue of the caller's
+ * dist entry holds data.
+ *
+ * Fails with EXC_NOTSUP if the calling process has no dist entry, and
+ * with BADARG if DHandle does not refer to that same entry. If
+ * ERTS_DE_QFLG_REQ_INFO is already set nothing more is done. If dep->qsize
+ * is already positive, notify_dist_data() is called at once with the
+ * caller as receiver; otherwise ERTS_DE_QFLG_REQ_INFO is set in
+ * dep->qflgs. Returns 'ok'.
+ */
+BIF_RETTYPE
+dist_ctrl_get_data_notification_1(BIF_ALIST_1)
+{
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
+ erts_aint32_t qflgs;
+ erts_aint_t qsize;
+ Eterm receiver = NIL; /* stays NIL unless we must notify ourselves */
+
+ if (!dep)
+ BIF_ERROR(BIF_P, EXC_NOTSUP);
+
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ /*
+ * Caller is the only one that can consume from this queue
+ * and the only one that can set the req-info flag...
+ */
+
+ erts_de_rlock(dep);
+
+ ASSERT(dep->cid == BIF_P->common.id);
+
+ qflgs = erts_atomic32_read_acqb(&dep->qflgs);
+
+ if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) {
+ qsize = erts_atomic_read_acqb(&dep->qsize);
+ ASSERT(qsize >= 0);
+ if (qsize > 0)
+ receiver = BIF_P->common.id; /* Notify ourselves... */
+ else { /* Empty queue; set req-info flag... */
+ qflgs = erts_atomic32_read_bor_mb(&dep->qflgs,
+ ERTS_DE_QFLG_REQ_INFO);
+ qsize = erts_atomic_read_acqb(&dep->qsize); /* re-read after setting the flag */
+ ASSERT(qsize >= 0);
+ if (qsize > 0) {
+ qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
+ ~ERTS_DE_QFLG_REQ_INFO); /* withdraw the request again */
+ if (qflgs & ERTS_DE_QFLG_REQ_INFO)
+ receiver = BIF_P->common.id; /* Notify ourselves... */
+ /* else: someone else will notify us... */
+ }
+ /* else: still empty queue... */
+ }
+ }
+ /* else: Already requested... */
+
+ erts_de_runlock(dep);
+
+ if (is_internal_pid(receiver)) /* false while receiver is still NIL */
+ notify_dist_data(BIF_P, receiver);
+
+ BIF_RET(am_ok);
+}
+/*
+ * dist_ctrl_put_data_2(DHandle, Data)
+ *
+ * Hands Data to erts_net_message() for the dist entry that DHandle
+ * refers to.
+ *
+ * Data must be a binary or the empty list; a non-empty list makes the
+ * BIF trap to dist_ctrl_put_data_trap with the same two arguments.
+ * Fails with BADARG for any other Data or when DHandle is not a dist
+ * handle, and with EXC_NOTSUP when the caller is not the value stored in
+ * dep->input_handler. Returns 'ok'.
+ */
+BIF_RETTYPE
+dist_ctrl_put_data_2(BIF_ALIST_2)
+{
+ DistEntry *dep;
+ ErlDrvSizeT size;
+ Eterm input_handler;
+
+ if (is_binary(BIF_ARG_2))
+ size = binary_size(BIF_ARG_2);
+ else if (is_nil(BIF_ARG_2))
+ size = 0;
+ else if (is_list(BIF_ARG_2))
+ BIF_TRAP2(dist_ctrl_put_data_trap,
+ BIF_P, BIF_ARG_1, BIF_ARG_2);
+ else
+ BIF_ERROR(BIF_P, BADARG);
+
+ dep = erts_dhandle_to_dist_entry(BIF_ARG_1);
+ if (!dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ input_handler = (Eterm) erts_atomic_read_nob(&dep->input_handler);
+
+ if (input_handler != BIF_P->common.id)
+ BIF_ERROR(BIF_P, EXC_NOTSUP);
+
+ erts_atomic64_inc_nob(&dep->in); /* incremented even when size == 0 */
+
+ if (size != 0) {
+ byte *data, *temp_alloc = NULL;
+
+ data = (byte *) erts_get_aligned_binary_bytes(BIF_ARG_2, &temp_alloc);
+ if (!data)
+ BIF_ERROR(BIF_P, BADARG);
+
+ erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); /* main lock is not held across erts_net_message() */
+
+ (void) erts_net_message(NULL, dep, NULL, 0, data, size);
+ /*
+ * We ignore any decode failures. On fatal failures the
+ * connection will be taken down by killing the
+ * distribution channel controller...
+ */
+
+ erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
+
+ BUMP_REDS(BIF_P, 5);
+
+ erts_free_aligned_binary_bytes(temp_alloc);
+
+ }
+
+ BIF_RET(am_ok);
+}
+/*
+ * dist_get_stat_1(DHandle)
+ *
+ * Returns {ok, In, Out, Pending} for the dist entry that DHandle refers
+ * to: In and Out are the values of the dep->in and dep->out counters, and
+ * Pending is 'true' when dep->qsize is non-zero, otherwise 'false'.
+ * Fails with BADARG when DHandle is not a dist handle.
+ */
+BIF_RETTYPE
+dist_get_stat_1(BIF_ALIST_1)
+{
+ Sint64 read, write, pend;
+ Eterm res, *hp, **hpp;
+ Uint sz, *szp;
+ DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1);
+
+ if (!dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ erts_de_rlock(dep);
+
+ read = (Sint64) erts_atomic64_read_nob(&dep->in);
+ write = (Sint64) erts_atomic64_read_nob(&dep->out);
+ pend = (Sint64) erts_atomic_read_nob(&dep->qsize);
+
+ erts_de_runlock(dep);
+
+ sz = 0;
+ szp = &sz; /* first pass: hpp == NULL, only 'sz' is accumulated */
+ hpp = NULL;
+
+ while (1) {
+ res = erts_bld_tuple(hpp, szp, 4,
+ am_ok,
+ erts_bld_sint64(hpp, szp, read),
+ erts_bld_sint64(hpp, szp, write),
+ pend ? am_true : am_false);
+ if (hpp)
+ break; /* second pass done: 'res' was built on the heap */
+ hp = HAlloc(BIF_P, sz);
+ hpp = &hp;
+ szp = NULL;
+ }
+
+ BIF_RET(res);
+}
+/*
+ * dist_ctrl_input_handler_2(DHandle, Pid)
+ *
+ * Stores Pid in dep->input_handler of the caller's dist entry.
+ *
+ * Fails with EXC_NOTSUP if the calling process has no dist entry, and
+ * with BADARG if DHandle does not refer to that entry or Pid is not an
+ * internal pid. Returns 'ok'.
+ */
+BIF_RETTYPE
+dist_ctrl_input_handler_2(BIF_ALIST_2)
+{
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
+
+ if (!dep)
+ BIF_ERROR(BIF_P, EXC_NOTSUP);
+
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ if (is_not_internal_pid(BIF_ARG_2))
+ BIF_ERROR(BIF_P, BADARG);
+
+ erts_atomic_set_nob(&dep->input_handler,
+ (erts_aint_t) BIF_ARG_2);
+
+ BIF_RET(am_ok);
+}
+/*
+ * dist_ctrl_get_data_1(DHandle)
+ *
+ * Dequeues one output buffer from the caller's dist entry and returns it
+ * as a binary, or returns 'none' if the entry is exiting or no buffer is
+ * queued.
+ *
+ * Fails with EXC_NOTSUP if the calling process has no dist entry, and
+ * with BADARG if DHandle does not refer to that entry. When
+ * dep->tmp_out_queue is empty and dep->qsize is positive, the whole
+ * dep->out_queue is first moved to dep->tmp_out_queue under dep->qlock.
+ * The dist header of the dequeued buffer is finalized before the buffer
+ * is wrapped in a ProcBin on the caller's heap. Afterwards dep->qsize is
+ * reduced by the buffer size; if it is then below half of
+ * erts_dist_buf_busy_limit while ERTS_DE_QFLG_BUSY is set, the processes
+ * returned by get_suspended_on_de() are resumed.
+ */
+BIF_RETTYPE
+dist_ctrl_get_data_1(BIF_ALIST_1)
+{
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
+ int reds = 1;
+ ErtsDistOutputBuf *obuf;
+ Eterm *hp;
+ ProcBin *pb;
+ erts_aint_t qsize;
+
+ if (!dep)
+ BIF_ERROR(BIF_P, EXC_NOTSUP);
+
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ erts_de_rlock(dep);
+
+ if (dep->status & ERTS_DE_SFLG_EXITING)
+ goto return_none;
+
+ ASSERT(dep->cid == BIF_P->common.id);
+
+#if 0
+ if (dep->finalized_out_queue.first) {
+ obuf = dep->finalized_out_queue.first;
+ dep->finalized_out_queue.first = obuf->next;
+ if (!obuf->next)
+ dep->finalized_out_queue.last = NULL;
+ }
+ else
+#endif
+ {
+ if (!dep->tmp_out_queue.first) {
+ ASSERT(!dep->tmp_out_queue.last);
+ qsize = erts_atomic_read_acqb(&dep->qsize);
+ if (qsize > 0) { /* qlock is only taken when something is queued */
+ erts_mtx_lock(&dep->qlock);
+ dep->tmp_out_queue.first = dep->out_queue.first;
+ dep->tmp_out_queue.last = dep->out_queue.last;
+ dep->out_queue.first = NULL;
+ dep->out_queue.last = NULL;
+ erts_mtx_unlock(&dep->qlock);
+ }
+ }
+
+ if (!dep->tmp_out_queue.first) {
+ ASSERT(!dep->tmp_out_queue.last);
+ return_none: /* also the target of the goto in the EXITING check above */
+ erts_de_runlock(dep);
+ BIF_RET(am_none);
+ }
+ else {
+ obuf = dep->tmp_out_queue.first;
+ dep->tmp_out_queue.first = obuf->next;
+ if (!obuf->next)
+ dep->tmp_out_queue.last = NULL;
+ }
+
+ obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp,
+ dep->cache,
+ dep->flags);
+ reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
+ if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE))
+ *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */
+ ASSERT(&obuf->data[0] <= obuf->extp
+ && obuf->extp < obuf->ext_endp);
+ }
+
+ erts_atomic64_inc_nob(&dep->out);
+
+ erts_de_runlock(dep);
+
+ hp = HAlloc(BIF_P, PROC_BIN_SIZE);
+ pb = (ProcBin *) (char *) hp;
+ pb->thing_word = HEADER_PROC_BIN;
+ pb->size = obuf->ext_endp - obuf->extp;
+ pb->next = MSO(BIF_P).first;
+ MSO(BIF_P).first = (struct erl_off_heap_header*) pb;
+ pb->val = ErtsDistOutputBuf2Binary(obuf);
+ pb->bytes = (byte*) obuf->extp; /* points into obuf; nothing is copied here */
+ pb->flags = 0;
+
+ qsize = erts_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf));
+ ASSERT(qsize >= 0);
+
+ if (qsize < erts_dist_buf_busy_limit/2
+ && (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) {
+ ErtsProcList *resume_procs = NULL;
+ erts_mtx_lock(&dep->qlock);
+ resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
+ erts_mtx_unlock(&dep->qlock);
+ if (resume_procs) {
+ int resumed = erts_resume_processes(resume_procs);
+ reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
+ }
+ }
+
+ BIF_RET2(make_binary(pb), reds);
+}
+
void
erts_dist_port_not_busy(Port *prt)
{
@@ -2385,18 +2902,20 @@ void
erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
{
erts_de_rwlock(dep);
- if (is_internal_port(dep->cid)
- && connection_id == dep->connection_id
+ if (connection_id == dep->connection_id
&& !(dep->status & ERTS_DE_SFLG_EXITING)) {
dep->status |= ERTS_DE_SFLG_EXITING;
erts_mtx_lock(&dep->qlock);
- ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
- dep->qflgs |= ERTS_DE_QFLG_EXIT;
+ ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
+ erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT);
erts_mtx_unlock(&dep->qlock);
- erts_schedule_dist_command(NULL, dep);
+ if (is_internal_port(dep->cid))
+ erts_schedule_dist_command(NULL, dep);
+ else if (is_internal_pid(dep->cid))
+ schedule_kill_dist_ctrl_proc(dep->cid);
}
erts_de_rwunlock(dep);
}
@@ -2513,9 +3032,6 @@ info_dist_entry(fmtfn_t to, void *arg, DistEntry *dep, int visible, int connecte
}
erts_print(to, arg, "Name: %T", dep->sysname);
-#ifdef DEBUG
- erts_print(to, arg, " (refc=%d)", erts_refc_read(&dep->refc, 0));
-#endif
erts_print(to, arg, "\n");
if (!connected && is_nil(dep->cid)) {
if (dep->nlinks) {
@@ -2635,17 +3151,23 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
goto error;
}
- net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN,
- am_net_kernel, ERTS_PROC_LOCK_MAIN, 0);
- if (!net_kernel)
+ net_kernel = erts_whereis_process(BIF_P,
+ ERTS_PROC_LOCK_MAIN,
+ am_net_kernel,
+ ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS,
+ 0);
+ if (!net_kernel || ERTS_PROC_GET_DIST_ENTRY(net_kernel))
goto error;
/* By setting F_DISTRIBUTION on net_kernel,
- * do_net_exist will be called when net_kernel is terminated !! */
+ * erts_do_net_exits will be called when net_kernel is terminated !! */
net_kernel->flags |= F_DISTRIBUTION;
- if (net_kernel != BIF_P)
- erts_proc_unlock(net_kernel, ERTS_PROC_LOCK_MAIN);
+ erts_proc_unlock(net_kernel,
+ (ERTS_PROC_LOCK_STATUS
+ | ((net_kernel != BIF_P)
+ ? ERTS_PROC_LOCK_MAIN
+ : 0)));
#ifdef DEBUG
erts_rwmtx_rlock(&erts_dist_table_rwmtx);
@@ -2662,6 +3184,14 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
erts_thr_progress_unblock();
erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
+ /*
+ * Note erts_this_dist_entry is changed by erts_set_this_node(),
+ * so we *need* to use the new one after erts_set_this_node()
+ * is called.
+ */
+ erts_ref_dist_entry(erts_this_dist_entry);
+ ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry);
+
BIF_RET(am_true);
error:
@@ -2691,18 +3221,18 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
Eterm ic, oc;
Eterm *tp;
DistEntry *dep = NULL;
+ ErtsProcLocks proc_unlock = 0;
+ Process *proc;
Port *pp = NULL;
- /* Prepare for success */
- ERTS_BIF_PREP_RET(ret, am_true);
-
/*
* Check and pick out arguments
*/
if (!is_node_name_atom(BIF_ARG_1) ||
- is_not_internal_port(BIF_ARG_2) ||
- (erts_this_node->sysname == am_Noname)) {
+ !(is_internal_port(BIF_ARG_2)
+ || is_internal_pid(BIF_ARG_2))
+ || (erts_this_node->sysname == am_Noname)) {
goto badarg;
}
@@ -2746,77 +3276,120 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
else if (!dep)
goto system_limit; /* Should never happen!!! */
- pp = erts_id2port_sflgs(BIF_ARG_2,
- BIF_P,
- ERTS_PROC_LOCK_MAIN,
- ERTS_PORT_SFLGS_INVALID_LOOKUP);
- erts_de_rwlock(dep);
+ if (is_internal_pid(BIF_ARG_2)) {
+ if (BIF_P->common.id == BIF_ARG_2) {
+ proc_unlock = 0;
+ proc = BIF_P;
+ }
+ else {
+ proc_unlock = ERTS_PROC_LOCK_MAIN;
+ proc = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN,
+ BIF_ARG_2, proc_unlock);
+ }
+ erts_de_rwlock(dep);
- if (!pp || (erts_atomic32_read_nob(&pp->state)
- & ERTS_PORT_SFLG_EXITING))
- goto badarg;
+ if (!proc)
+ goto badarg;
+ else if (proc == ERTS_PROC_LOCK_BUSY) {
+ proc_unlock = 0;
+ goto yield;
+ }
- if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
- goto badarg;
+ erts_proc_lock(proc, ERTS_PROC_LOCK_STATUS);
+ proc_unlock |= ERTS_PROC_LOCK_STATUS;
- if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
- goto done; /* Already set */
+ if (ERTS_PROC_GET_DIST_ENTRY(proc)) {
+ if (dep == ERTS_PROC_GET_DIST_ENTRY(proc)
+ && (proc->flags & F_DISTRIBUTION)
+ && dep->cid == BIF_ARG_2)
+ goto done;
+ goto badarg;
+ }
- if (dep->status & ERTS_DE_SFLG_EXITING) {
- /* Suspend on dist entry waiting for the exit to finish */
- ErtsProcList *plp = erts_proclist_create(BIF_P);
- plp->next = NULL;
- erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_mtx_lock(&dep->qlock);
- erts_proclist_store_last(&dep->suspended, plp);
- erts_mtx_unlock(&dep->qlock);
- goto yield;
- }
+ if (is_not_nil(dep->cid))
+ goto badarg;
- ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING));
+ proc->flags |= F_DISTRIBUTION;
+ ERTS_PROC_SET_DIST_ENTRY(proc, dep);
- if (pp->dist_entry || is_not_nil(dep->cid))
- goto badarg;
+ proc_unlock &= ~ERTS_PROC_LOCK_STATUS;
+ erts_proc_unlock(proc, ERTS_PROC_LOCK_STATUS);
- erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
+ dep->send = NULL; /* Only for distr ports... */
- /*
- * Dist-ports do not use the "busy port message queue" functionality, but
- * instead use "busy dist entry" functionality.
- */
- {
- ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
- erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
}
+ else {
- pp->dist_entry = dep;
+ pp = erts_id2port_sflgs(BIF_ARG_2,
+ BIF_P,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
+ erts_de_rwlock(dep);
+
+ if (!pp || (erts_atomic32_read_nob(&pp->state)
+ & ERTS_PORT_SFLG_EXITING))
+ goto badarg;
+
+ if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
+ goto badarg;
+
+ if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
+ goto done; /* Already set */
+
+ if (dep->status & ERTS_DE_SFLG_EXITING) {
+ /* Suspend on dist entry waiting for the exit to finish */
+ ErtsProcList *plp = erts_proclist_create(BIF_P);
+ plp->next = NULL;
+ erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
+ erts_mtx_lock(&dep->qlock);
+ erts_proclist_store_last(&dep->suspended, plp);
+ erts_mtx_unlock(&dep->qlock);
+ goto yield;
+ }
- dep->version = version;
- dep->creation = 0;
+ ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING));
- ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
+ if (pp->dist_entry || is_not_nil(dep->cid))
+ goto badarg;
-#if 1
- dep->send = (pp->drv_ptr->outputv
- ? dist_port_commandv
- : dist_port_command);
-#else
- dep->send = dist_port_command;
-#endif
- ASSERT(dep->send);
+ erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
+
+ pp->dist_entry = dep;
+
+ ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
+
+ dep->send = (pp->drv_ptr->outputv
+ ? dist_port_commandv
+ : dist_port_command);
+ ASSERT(dep->send);
+
+ /*
+ * Dist-ports do not use the "busy port message queue" functionality, but
+ * instead use "busy dist entry" functionality.
+ */
+ {
+ ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
+ erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
+ }
+
+ }
+
+ dep->version = version;
+ dep->creation = 0;
#ifdef DEBUG
- erts_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize == 0);
- erts_mtx_unlock(&dep->qlock);
+ ASSERT(erts_atomic_read_nob(&dep->qsize) == 0);
#endif
- erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
-
if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
create_cache(dep);
+ erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
+
erts_de_rwunlock(dep);
+
+ ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep));
+
dep = NULL; /* inc of refc transferred to port (dist_entry field) */
inc_no_nodes();
@@ -2836,6 +3409,9 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
if (pp)
erts_port_release(pp);
+ if (proc_unlock)
+ erts_proc_unlock(proc, proc_unlock);
+
return ret;
yield:
@@ -3138,7 +3714,6 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options)
erts_proc_unlock(p, ERTS_PROC_LOCK_LINK);
done:
- erts_deref_dist_entry(dep);
BIF_RET(am_true);
}
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index 05016cafc5..d4765c50b8 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -44,6 +44,7 @@
#define DFLAG_UTF8_ATOMS 0x10000
#define DFLAG_MAP_TAG 0x20000
#define DFLAG_BIG_CREATION 0x40000
+#define DFLAG_SEND_SENDER 0x80000
/* All flags that should be enabled when term_to_binary/1 is used. */
#define TERM_TO_BINARY_DFLAGS (DFLAG_EXTENDED_REFERENCES \
@@ -74,6 +75,9 @@
#define DOP_DEMONITOR_P 20
#define DOP_MONITOR_P_EXIT 21
+#define DOP_SEND_SENDER 22
+#define DOP_SEND_SENDER_TT 23
+
/* distribution trap functions */
extern Export* dsend2_trap;
extern Export* dsend3_trap;
@@ -161,13 +165,10 @@ erts_dsig_prepare(ErtsDSigData *dsdp,
goto fail;
}
if (no_suspend) {
- failure = ERTS_DSIG_PREP_CONNECTED;
- erts_mtx_lock(&dep->qlock);
- if (dep->qflgs & ERTS_DE_QFLG_BUSY)
+ if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) {
failure = ERTS_DSIG_PREP_WOULD_SUSPEND;
- erts_mtx_unlock(&dep->qlock);
- if (failure == ERTS_DSIG_PREP_WOULD_SUSPEND)
goto fail;
+ }
}
dsdp->proc = proc;
dsdp->dep = dep;
@@ -349,6 +350,7 @@ typedef struct {
Eterm ctl_heap[6];
ErtsDSigData dsd;
DistEntry* dep_to_deref;
+ DistEntry *dep;
struct erts_dsig_send_context dss;
Eterm return_term;
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 2ff95a3338..d0942ccef8 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -3750,7 +3750,6 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
subres = make_link_list(BIF_P, dep->nlinks, NIL);
subres = make_link_list(BIF_P, dep->node_links, subres);
erts_de_links_unlock(dep);
- erts_deref_dist_entry(dep);
BIF_RET(subres);
} else {
BIF_RET(am_undefined);
@@ -3781,7 +3780,6 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
erts_de_links_lock(dep);
ml = make_monitor_list(BIF_P, dep->monitors);
erts_de_links_unlock(dep);
- erts_deref_dist_entry(dep);
BIF_RET(ml);
} else {
BIF_RET(am_undefined);
@@ -3796,7 +3794,6 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
else {
Uint cno = dist_entry_channel_no(dep);
res = make_small(cno);
- erts_deref_dist_entry(dep);
}
BIF_RET(res);
}
@@ -3858,15 +3855,14 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
DFLAG_BIT_BINARIES);
BIF_RET(erts_term_to_binary(BIF_P, tp[2], 0, dflags));
}
- else if (ERTS_IS_ATOM_STR("dist_port", tp[1])) {
+ else if (ERTS_IS_ATOM_STR("dist_ctrl", tp[1])) {
Eterm res = am_undefined;
DistEntry *dep = erts_sysname_to_connected_dist_entry(tp[2]);
if (dep) {
erts_de_rlock(dep);
- if (is_internal_port(dep->cid))
+ if (is_internal_port(dep->cid) || is_internal_pid(dep->cid))
res = dep->cid;
erts_de_runlock(dep);
- erts_deref_dist_entry(dep);
}
BIF_RET(res);
}
@@ -4275,7 +4271,6 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2)
con_id = dep->connection_id;
erts_de_runlock(dep);
erts_kill_dist_connection(dep, con_id);
- erts_deref_dist_entry(dep);
BIF_RET(am_true);
}
}
diff --git a/erts/emulator/beam/erl_monitors.c b/erts/emulator/beam/erl_monitors.c
index 67c552b364..1c840d89f6 100644
--- a/erts/emulator/beam/erl_monitors.c
+++ b/erts/emulator/beam/erl_monitors.c
@@ -993,7 +993,6 @@ Eterm erts_debug_dump_monitors_1(BIF_ALIST_1)
erts_dump_monitors(dep->monitors,0);
erts_de_links_unlock(dep);
erts_printf("Monitors dumped-------------------------\n");
- erts_deref_dist_entry(dep);
BIF_RET(am_true);
} else {
BIF_ERROR(p,BADARG);
@@ -1038,7 +1037,6 @@ Eterm erts_debug_dump_links_1(BIF_ALIST_1)
erts_dump_links(dep->nlinks,0);
erts_de_links_unlock(dep);
erts_printf("Links dumped----------------------------\n");
- erts_deref_dist_entry(dep);
BIF_RET(am_true);
} else {
BIF_ERROR(p,BADARG);
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index f8e9fec27a..0f3dfa797c 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -29,6 +29,8 @@
#include "error.h"
#include "erl_thr_progress.h"
#include "dtrace-wrapper.h"
+#include "erl_binary.h"
+#include "erl_bif_unique.h"
Hash erts_dist_table;
Hash erts_node_table;
@@ -57,6 +59,58 @@ static ErtsMonotonicTime node_tab_delete_delay;
/* -- The distribution table ---------------------------------------------- */
+#define ErtsBin2DistEntry(B) /* magic Binary -> the DistEntry stored as its data */ \
+ ((DistEntry *) ERTS_MAGIC_BIN_DATA((B)))
+#define ErtsDistEntry2Bin(DEP) /* DistEntry -> the magic Binary that contains it */ \
+ ((Binary *) ERTS_MAGIC_BIN_FROM_DATA((DEP)))
+/*
+ * Returns the reference count kept in the magic binary that contains
+ * 'dep'. 'min' is passed through to erts_refc_read().
+ */
+static ERTS_INLINE erts_aint_t
+de_refc_read(DistEntry *dep, erts_aint_t min)
+{
+ return erts_refc_read(&ErtsDistEntry2Bin(dep)->intern.refc, min);
+}
+/*
+ * Increments the reference count of the binary containing 'dep' via
+ * erts_refc_inctest() and returns that call's result. 'min' is passed
+ * through unchanged.
+ */
+static ERTS_INLINE erts_aint_t
+de_refc_inc_read(DistEntry *dep, erts_aint_t min)
+{
+ return erts_refc_inctest(&ErtsDistEntry2Bin(dep)->intern.refc, min);
+}
+/*
+ * Increments the reference count of the binary containing 'dep' via
+ * erts_refc_inc(); no value is returned. 'min' is passed through.
+ */
+static ERTS_INLINE void
+de_refc_inc(DistEntry *dep, erts_aint_t min)
+{
+ erts_refc_inc(&ErtsDistEntry2Bin(dep)->intern.refc, min);
+}
+/*
+ * Drops one reference to 'dep' by calling erts_bin_release() on the
+ * binary that contains it, rather than decrementing the counter directly.
+ * In DEBUG builds the count is first read with a minimum of min+1.
+ * NOTE(review): presumably a sanity check that the decrement cannot go
+ * below 'min' -- confirm against erts_refc_read().
+ */
+static ERTS_INLINE void
+de_refc_dec(DistEntry *dep, erts_aint_t min)
+{
+#ifdef DEBUG
+ (void) erts_refc_read(&ErtsDistEntry2Bin(dep)->intern.refc, min+1);
+#endif
+ erts_bin_release(ErtsDistEntry2Bin(dep));
+}
+/*
+ * Decrements the reference count of the binary containing 'dep' via
+ * erts_refc_dectest() and returns that call's result. Unlike
+ * de_refc_dec() this does not go through erts_bin_release().
+ */
+static ERTS_INLINE erts_aint_t
+de_refc_dec_read(DistEntry *dep, erts_aint_t min)
+{
+ return erts_refc_dectest(&ErtsDistEntry2Bin(dep)->intern.refc, min);
+}
+/*
+ * Takes one reference on 'dep', which must be non-NULL (asserted).
+ */
+void
+erts_ref_dist_entry(DistEntry *dep)
+{
+ ASSERT(dep);
+ de_refc_inc(dep, 1);
+}
+/*
+ * Releases one reference on 'dep', which must be non-NULL (asserted).
+ * Delegates to de_refc_dec().
+ */
+void
+erts_deref_dist_entry(DistEntry *dep)
+{
+ ASSERT(dep);
+ de_refc_dec(dep, 0);
+}
+
#ifdef DEBUG
static int
is_in_de_list(DistEntry *dep, DistEntry *dep_list)
@@ -85,22 +139,39 @@ dist_table_cmp(void *dep1, void *dep2)
static void*
dist_table_alloc(void *dep_tmpl)
{
+#ifdef DEBUG
+ erts_aint_t refc;
+#endif
Eterm sysname;
+ Binary *bin;
DistEntry *dep;
erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
sysname = ((DistEntry *) dep_tmpl)->sysname;
- dep = (DistEntry *) erts_alloc(ERTS_ALC_T_DIST_ENTRY, sizeof(DistEntry));
+
+ bin = erts_create_magic_binary_x(sizeof(DistEntry),
+ erts_dist_entry_destructor,
+ ERTS_ALC_T_DIST_ENTRY,
+ 0);
+ dep = ErtsBin2DistEntry(bin);
dist_entries++;
+#ifdef DEBUG
+ refc =
+#else
+ (void)
+#endif
+ de_refc_dec_read(dep, -1);
+ ASSERT(refc == -1);
+
dep->prev = NULL;
- erts_refc_init(&dep->refc, -1);
erts_rwmtx_init_opt(&dep->rwmtx, &rwmtx_opt, "dist_entry", sysname,
ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION);
dep->sysname = sysname;
dep->cid = NIL;
+ erts_atomic_init_nob(&dep->input_handler, (erts_aint_t) NIL);
dep->connection_id = 0;
dep->status = 0;
dep->flags = 0;
@@ -114,12 +185,16 @@ dist_table_alloc(void *dep_tmpl)
erts_mtx_init(&dep->qlock, "dist_entry_out_queue", sysname,
ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION);
- dep->qflgs = 0;
- dep->qsize = 0;
+ erts_atomic32_init_nob(&dep->qflgs, 0);
+ erts_atomic_init_nob(&dep->qsize, 0);
+ erts_atomic64_init_nob(&dep->in, 0);
+ erts_atomic64_init_nob(&dep->out, 0);
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
dep->suspended = NULL;
+ dep->tmp_out_queue.first = NULL;
+ dep->tmp_out_queue.last = NULL;
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
@@ -181,7 +256,7 @@ dist_table_free(void *vdep)
#ifdef DEBUG
sys_memset(vdep, 0x77, sizeof(DistEntry));
#endif
- erts_free(ERTS_ALC_T_DIST_ENTRY, (void *) dep);
+ erts_bin_free(ErtsDistEntry2Bin(dep));
ASSERT(dist_entries > 0);
dist_entries--;
@@ -199,19 +274,52 @@ erts_dist_table_info(fmtfn_t to, void *to_arg)
erts_rwmtx_runlock(&erts_dist_table_rwmtx);
}
+static ERTS_INLINE DistEntry *find_dist_entry(Eterm sysname,
+ int inc_refc,
+ int connected_only)
+{ /*
+ * Looks up 'sysname' in erts_dist_table while holding
+ * erts_dist_table_rwmtx for reading.
+ *
+ * Returns NULL when there is no entry, or when 'connected_only' is
+ * non-zero and the entry's cid is NIL. When 'inc_refc' is non-zero the
+ * reference count is incremented for the caller. In both modes, a count
+ * that indicates a pending delete (below 2 after the increment, or
+ * below 1 when only read) gets one additional increment, so the count
+ * can change even when 'inc_refc' is zero.
+ */
+ DistEntry *res;
+ DistEntry de; /* only 'sysname' is filled in; used as the lookup key */
+ de.sysname = sysname;
+ erts_rwmtx_rlock(&erts_dist_table_rwmtx);
+ res = hash_get(&erts_dist_table, (void *) &de);
+ if (res) {
+ if (connected_only && is_nil(res->cid))
+ res = NULL;
+ else {
+ int pend_delete;
+ erts_aint_t refc;
+ if (inc_refc) {
+ refc = de_refc_inc_read(res, 1);
+ pend_delete = refc < 2;
+ }
+ else {
+ refc = de_refc_read(res, 0);
+ pend_delete = refc < 1;
+ }
+ if (pend_delete) /* Pending delete */
+ de_refc_inc(res, 1);
+ }
+ }
+ erts_rwmtx_runlock(&erts_dist_table_rwmtx);
+ return res;
+}
+
DistEntry *
erts_channel_no_to_dist_entry(Uint cno)
{
+ /*
+ * Does NOT increase reference count!
+ */
+
/*
* For this node (and previous incarnations of this node),
* ERST_INTERNAL_CHANNEL_NO (will always be 0 I guess) is used as
* channel no. For other nodes, the atom index of the atom corresponding
* to the node name is used as channel no.
*/
- if(cno == ERST_INTERNAL_CHANNEL_NO) {
- erts_refc_inc(&erts_this_dist_entry->refc, 2);
+ if (cno == ERST_INTERNAL_CHANNEL_NO)
return erts_this_dist_entry;
- }
if((cno > MAX_ATOM_INDEX)
|| (cno >= atom_table_size())
@@ -220,80 +328,97 @@ erts_channel_no_to_dist_entry(Uint cno)
/* cno is a valid atom index; find corresponding dist entry (if there
is one) */
- return erts_find_dist_entry(make_atom(cno));
+ return find_dist_entry(make_atom(cno), 0, 0);
}
-
DistEntry *
erts_sysname_to_connected_dist_entry(Eterm sysname)
{
- DistEntry de;
- DistEntry *res_dep;
- de.sysname = sysname;
-
- if(erts_this_dist_entry->sysname == sysname) {
- erts_refc_inc(&erts_this_dist_entry->refc, 2);
+ /*
+ * Does NOT increase reference count!
+ */
+ if(erts_this_dist_entry->sysname == sysname)
return erts_this_dist_entry;
- }
-
- erts_rwmtx_rlock(&erts_dist_table_rwmtx);
- res_dep = (DistEntry *) hash_get(&erts_dist_table, (void *) &de);
- if (res_dep) {
- erts_aint_t refc = erts_refc_inctest(&res_dep->refc, 1);
- if (refc < 2) /* Pending delete */
- erts_refc_inc(&res_dep->refc, 1);
- }
- erts_rwmtx_runlock(&erts_dist_table_rwmtx);
- if (res_dep) {
- int deref;
- erts_rwmtx_rlock(&res_dep->rwmtx);
- deref = is_nil(res_dep->cid);
- erts_rwmtx_runlock(&res_dep->rwmtx);
- if (deref) {
- erts_deref_dist_entry(res_dep);
- res_dep = NULL;
- }
- }
- return res_dep;
+ return find_dist_entry(sysname, 0, 1);
}
DistEntry *erts_find_or_insert_dist_entry(Eterm sysname)
{
+ /*
+ * This function DOES increase reference count!
+ */
DistEntry *res;
DistEntry de;
erts_aint_t refc;
- res = erts_find_dist_entry(sysname);
+ res = find_dist_entry(sysname, 1, 0);
if (res)
return res;
de.sysname = sysname;
erts_rwmtx_rwlock(&erts_dist_table_rwmtx);
res = hash_put(&erts_dist_table, (void *) &de);
- refc = erts_refc_inctest(&res->refc, 0);
+ refc = de_refc_inc_read(res, 0);
if (refc < 2) /* New or pending delete */
- erts_refc_inc(&res->refc, 1);
+ de_refc_inc(res, 1);
erts_rwmtx_rwunlock(&erts_dist_table_rwmtx);
return res;
}
DistEntry *erts_find_dist_entry(Eterm sysname)
{
- DistEntry *res;
- DistEntry de;
- de.sysname = sysname;
- erts_rwmtx_rlock(&erts_dist_table_rwmtx);
- res = hash_get(&erts_dist_table, (void *) &de);
- if (res) {
- erts_aint_t refc = erts_refc_inctest(&res->refc, 1);
- if (refc < 2) /* Pending delete */
- erts_refc_inc(&res->refc, 1);
- }
- erts_rwmtx_runlock(&erts_dist_table_rwmtx);
- return res;
+ /*
+ * Does NOT increase reference count!
+ */
+ return find_dist_entry(sysname, 0, 0);
}
-static void try_delete_dist_entry(void *vdep)
+DistEntry *
+erts_dhandle_to_dist_entry(Eterm dhandle) /* dist handle term -> DistEntry, or NULL if not a dist handle */
 {
- DistEntry *dep = (DistEntry *) vdep;
+ Binary *bin;
+ if (!is_internal_magic_ref(dhandle)) /* not a magic reference at all */
+ return NULL;
+ bin = erts_magic_ref2bin(dhandle);
+ if (ERTS_MAGIC_BIN_DESTRUCTOR(bin) != erts_dist_entry_destructor) /* magic binary of some other kind */
+ return NULL;
+ return ErtsBin2DistEntry(bin); /* no reference count is taken here */
+}
+/*
+ * Builds a dist handle term for 'dep' on the heap of process 'c_p'.
+ *
+ * Allocates ERTS_MAGIC_REF_THING_SIZE words with HAlloc() and creates a
+ * magic reference to the binary that contains 'dep', registered in
+ * c_p->off_heap. The binary must have erts_dist_entry_destructor as its
+ * destructor (asserted).
+ */
+Eterm
+erts_make_dhandle(Process *c_p, DistEntry *dep)
+{
+ Binary *bin;
+ Eterm *hp;
+
+ bin = ErtsDistEntry2Bin(dep);
+ ASSERT(bin);
+ ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == erts_dist_entry_destructor);
+ hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE);
+ return erts_mk_magic_ref(&hp, &c_p->off_heap, bin);
+}
+
+static void try_delete_dist_entry(void *vbin);
+/*
+ * Callback taking the dist entry's magic Binary as 'vbin'.
+ *
+ * Does nothing if the entry's reference count is above zero. Otherwise
+ * schedules try_delete_dist_entry(vbin) through
+ * erts_schedule_thr_prgr_later_cleanup_op(), using dep->later_op and the
+ * magic binary size for a DistEntry.
+ */
+static void
+prepare_try_delete_dist_entry(void *vbin)
+{
+ Binary *bin = (Binary *) vbin;
+ DistEntry *dep = ErtsBin2DistEntry(bin);
+ Uint size;
+ erts_aint_t refc;
+
+ refc = de_refc_read(dep, 0);
+ if (refc > 0) /* referenced again; keep the entry */
+ return;
+
+ size = ERTS_MAGIC_BIN_SIZE(sizeof(DistEntry));
+ erts_schedule_thr_prgr_later_cleanup_op(try_delete_dist_entry,
+ vbin, &dep->later_op, size);
+}
+
+static void try_delete_dist_entry(void *vbin)
+{
+ Binary *bin = (Binary *) vbin;
+ DistEntry *dep = ErtsBin2DistEntry(bin);
erts_aint_t refc;
erts_rwmtx_rwlock(&erts_dist_table_rwmtx);
@@ -312,26 +437,39 @@ static void try_delete_dist_entry(void *vdep)
*
* If refc > 0, the entry is in use. Keep the entry.
*/
- refc = erts_refc_dectest(&dep->refc, -1);
+ refc = de_refc_dec_read(dep, -1);
if (refc == -1)
(void) hash_erase(&erts_dist_table, (void *) dep);
erts_rwmtx_rwunlock(&erts_dist_table_rwmtx);
- if (refc == 0)
- erts_schedule_delete_dist_entry(dep);
+ if (refc == 0) {
+ if (node_tab_delete_delay == 0)
+ prepare_try_delete_dist_entry(vbin);
+ else if (node_tab_delete_delay > 0)
+ erts_start_timer_callback(node_tab_delete_delay,
+ prepare_try_delete_dist_entry,
+ vbin);
+ }
}
-void erts_schedule_delete_dist_entry(DistEntry *dep)
+int erts_dist_entry_destructor(Binary *bin)
{
- ASSERT(dep != erts_this_dist_entry);
- if (dep != erts_this_dist_entry) {
- if (node_tab_delete_delay == 0)
- try_delete_dist_entry((void *) dep);
- else if (node_tab_delete_delay > 0)
- erts_start_timer_callback(node_tab_delete_delay,
- try_delete_dist_entry,
- (void *) dep);
- }
+ DistEntry *dep = ErtsBin2DistEntry(bin);
+ erts_aint_t refc;
+
+ refc = de_refc_read(dep, -1);
+
+ if (refc == -1)
+ return 1; /* Allow deallocation of structure... */
+
+ if (node_tab_delete_delay == 0)
+ prepare_try_delete_dist_entry((void *) bin);
+ else if (node_tab_delete_delay > 0)
+ erts_start_timer_callback(node_tab_delete_delay,
+ prepare_try_delete_dist_entry,
+ (void *) bin);
+
+ return 0;
}
Uint
@@ -384,7 +522,7 @@ erts_set_dist_entry_not_connected(DistEntry *dep)
erts_rwmtx_rwlock(&erts_dist_table_rwmtx);
ASSERT(dep != erts_this_dist_entry);
- ASSERT(is_internal_port(dep->cid));
+ ASSERT(is_internal_port(dep->cid) || is_internal_pid(dep->cid));
if(dep->flags & DFLAG_PUBLISHED) {
if(dep->prev) {
@@ -439,7 +577,7 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags)
ASSERT(dep != erts_this_dist_entry);
ASSERT(is_nil(dep->cid));
- ASSERT(is_internal_port(cid));
+ ASSERT(is_internal_port(cid) || is_internal_pid(cid));
if(dep->prev) {
ASSERT(is_in_de_list(dep, erts_not_connected_dist_entries));
@@ -459,10 +597,19 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags)
dep->status |= ERTS_DE_SFLG_CONNECTED;
dep->flags = flags;
dep->cid = cid;
+ erts_atomic_set_nob(&dep->input_handler,
+ (erts_aint_t) cid);
+
dep->connection_id++;
dep->connection_id &= ERTS_DIST_EXT_CON_ID_MASK;
dep->prev = NULL;
+ erts_atomic64_set_nob(&dep->in, 0);
+ erts_atomic64_set_nob(&dep->out, 0);
+ erts_atomic32_set_nob(&dep->qflgs,
+ (is_internal_port(cid)
+ ? ERTS_DE_QFLG_PORT_CTRL
+ : ERTS_DE_QFLG_PROC_CTRL));
if(flags & DFLAG_PUBLISHED) {
dep->next = erts_visible_dist_entries;
if(erts_visible_dist_entries) {
@@ -716,19 +863,18 @@ void
erts_set_this_node(Eterm sysname, Uint creation)
{
ERTS_LC_ASSERT(erts_thr_progress_is_blocking());
- ASSERT(erts_refc_read(&erts_this_dist_entry->refc, 2));
+ ASSERT(2 <= de_refc_read(erts_this_dist_entry, 2));
if (erts_refc_dectest(&erts_this_node->refc, 0) == 0)
try_delete_node(erts_this_node);
- if (erts_refc_dectest(&erts_this_dist_entry->refc, 0) == 0)
- try_delete_dist_entry(erts_this_dist_entry);
+ erts_deref_dist_entry(erts_this_dist_entry);
erts_this_node = NULL; /* to make sure refc is bumped for this node */
erts_this_node = erts_find_or_insert_node(sysname, creation);
erts_this_dist_entry = erts_this_node->dist_entry;
- erts_refc_inc(&erts_this_dist_entry->refc, 2);
+ erts_ref_dist_entry(erts_this_dist_entry);
erts_this_node_sysname = erts_this_node_sysname_BUFFER;
erts_snprintf(erts_this_node_sysname, sizeof(erts_this_node_sysname_BUFFER),
@@ -797,9 +943,9 @@ void erts_init_node_tables(int dd_sec)
ASSERT(erts_this_node->dist_entry != NULL);
erts_this_dist_entry = erts_this_node->dist_entry;
/* +1 for erts_this_dist_entry */
- /* +1 for erts_this_node->dist_entry */
- erts_refc_init(&erts_this_dist_entry->refc, 2);
+ erts_ref_dist_entry(erts_this_dist_entry);
+ ASSERT(2 == de_refc_read(erts_this_dist_entry, 2));
erts_this_node_sysname = erts_this_node_sysname_BUFFER;
erts_snprintf(erts_this_node_sysname, sizeof(erts_this_node_sysname_BUFFER),
@@ -876,6 +1022,7 @@ static Eterm AM_node_references;
static Eterm AM_system;
static Eterm AM_timer;
static Eterm AM_delayed_delete_timer;
+static Eterm AM_thread_progress_delete_timer;
static void setup_reference_table(void);
static Eterm reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp);
@@ -965,6 +1112,7 @@ erts_get_node_and_dist_references(struct process *proc)
INIT_AM(timer);
INIT_AM(system);
INIT_AM(delayed_delete_timer);
+ INIT_AM(thread_progress_delete_timer);
references_atoms_need_init = 0;
}
@@ -1148,6 +1296,10 @@ insert_offheap2(ErlOffHeap *oh, void *arg)
insert_offheap(oh, a->type, a->id);
}
+/*
+ * True when Bin has BIN_FLAG_MAGIC set in its intern.flags and its
+ * magic-binary destructor is erts_dist_entry_destructor.
+ * Note: Bin is expanded (and thus evaluated) more than once.
+ */
+#define ErtsIsDistEntryBinary(Bin) \
+ (((Bin)->intern.flags & BIN_FLAG_MAGIC) \
+ && ERTS_MAGIC_BIN_DESTRUCTOR((Bin)) == erts_dist_entry_destructor)
+
static void
insert_offheap(ErlOffHeap *oh, int type, Eterm id)
{
@@ -1158,7 +1310,10 @@ insert_offheap(ErlOffHeap *oh, int type, Eterm id)
for (u.hdr = oh->first; u.hdr; u.hdr = u.hdr->next) {
switch (thing_subtag(u.hdr->thing_word)) {
case REF_SUBTAG:
- if(IsMatchProgBinary(u.mref->mb)) {
+ if (ErtsIsDistEntryBinary(u.mref->mb))
+ insert_dist_entry(ErtsBin2DistEntry(u.mref->mb),
+ type, id, 0);
+ else if(IsMatchProgBinary(u.mref->mb)) {
InsertedBin *ib;
int insert_bin = 1;
for (ib = inserted_bins; ib; ib = ib->next)
@@ -1301,26 +1456,34 @@ insert_delayed_delete_node(void *state,
ErtsMonotonicTime timeout_pos,
void *vnp)
{
- DeclareTmpHeapNoproc(heap,3);
- UseTmpHeapNoproc(3);
+ Eterm heap[3];
insert_node((ErlNode *) vnp,
SYSTEM_REF,
TUPLE2(&heap[0], AM_system, AM_delayed_delete_timer));
- UnUseTmpHeapNoproc(3);
+}
+
+/*
+ * Visitor handed to erts_debug_later_op_foreach() by
+ * setup_reference_table() for later-ops whose function is
+ * try_delete_dist_entry.
+ *
+ * vbin is converted to a DistEntry with ErtsBin2DistEntry() and passed
+ * to insert_dist_entry() as a SYSTEM_REF whose id is the 2-tuple
+ * {AM_system, AM_thread_progress_delete_timer}.
+ *
+ * arg and thr_prgr are not used.
+ */
+static void
+insert_thr_prgr_delete_dist_entry(void *arg, ErtsThrPrgrVal thr_prgr, void *vbin)
+{
+ DistEntry *dep = ErtsBin2DistEntry(vbin);
+ Eterm heap[3]; /* stack storage handed to TUPLE2() for the id tuple */
+ insert_dist_entry(dep,
+ SYSTEM_REF,
+ TUPLE2(&heap[0], AM_system, AM_thread_progress_delete_timer),
+ 0);
}
+/*
+ * Visitor handed to erts_debug_callback_timer_foreach() by
+ * setup_reference_table().
+ *
+ * The last argument is now a binary (vbin) that is converted with
+ * ErtsBin2DistEntry(), instead of being cast directly to DistEntry *.
+ * The entry is passed to insert_dist_entry() as a SYSTEM_REF whose id
+ * is the 2-tuple {AM_system, AM_delayed_delete_timer}.
+ *
+ * state and timeout_pos are not used.
+ */
static void
insert_delayed_delete_dist_entry(void *state,
ErtsMonotonicTime timeout_pos,
- void *vdep)
+ void *vbin)
{
- DeclareTmpHeapNoproc(heap,3);
- UseTmpHeapNoproc(3);
- insert_dist_entry((DistEntry *) vdep,
+ DistEntry *dep = ErtsBin2DistEntry(vbin);
+ Eterm heap[3]; /* stack storage handed to TUPLE2() for the id tuple */
+ insert_dist_entry(dep,
SYSTEM_REF,
TUPLE2(&heap[0], AM_system, AM_delayed_delete_timer),
0);
- UnUseTmpHeapNoproc(3);
}
static void
@@ -1354,9 +1517,12 @@ setup_reference_table(void)
erts_debug_callback_timer_foreach(try_delete_node,
insert_delayed_delete_node,
NULL);
- erts_debug_callback_timer_foreach(try_delete_dist_entry,
+ erts_debug_callback_timer_foreach(prepare_try_delete_dist_entry,
insert_delayed_delete_dist_entry,
NULL);
+ erts_debug_later_op_foreach(try_delete_dist_entry,
+ insert_thr_prgr_delete_dist_entry,
+ NULL);
UseTmpHeapNoproc(3);
insert_node(erts_this_node,
@@ -1421,6 +1587,14 @@ setup_reference_table(void)
insert_links(ERTS_P_LINKS(proc), proc->common.id);
if (ERTS_P_MONITORS(proc))
insert_monitors(ERTS_P_MONITORS(proc), proc->common.id);
+ {
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc);
+ if (dep)
+ insert_dist_entry(dep,
+ CTRL_REF,
+ proc->common.id,
+ 0);
+ }
}
}
@@ -1719,7 +1893,7 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp)
/* DistList = [{Dist, Refc, ReferenceIdList}] */
tup = MK_3TUP(referred_dists[i].dist->sysname,
- MK_UINT(erts_refc_read(&referred_dists[i].dist->refc, 0)),
+ MK_UINT(de_refc_read(referred_dists[i].dist, 0)),
dril);
dl = MK_CONS(tup, dl);
}
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index 7974b25444..3bba673435 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -47,6 +47,9 @@
#define ERTS_PORT_TASK_ONLY_BASIC_TYPES__
#include "erl_port_task.h"
#undef ERTS_PORT_TASK_ONLY_BASIC_TYPES__
+#define ERTS_BINARY_TYPES_ONLY__
+#include "erl_binary.h"
+#undef ERTS_BINARY_TYPES_ONLY__
#define ERTS_NODE_TAB_DELAY_GC_DEFAULT (60)
#define ERTS_NODE_TAB_DELAY_GC_MAX (100*1000*1000)
@@ -60,11 +63,17 @@
#define ERTS_DE_SFLGS_ALL (ERTS_DE_SFLG_CONNECTED \
| ERTS_DE_SFLG_EXITING)
-#define ERTS_DE_QFLG_BUSY (((Uint32) 1) << 0)
-#define ERTS_DE_QFLG_EXIT (((Uint32) 1) << 1)
+#define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0)
+#define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1)
+#define ERTS_DE_QFLG_REQ_INFO (((erts_aint32_t) 1) << 2)
+#define ERTS_DE_QFLG_PORT_CTRL (((erts_aint32_t) 1) << 3)
+#define ERTS_DE_QFLG_PROC_CTRL (((erts_aint32_t) 1) << 4)
#define ERTS_DE_QFLGS_ALL (ERTS_DE_QFLG_BUSY \
- | ERTS_DE_QFLG_EXIT)
+ | ERTS_DE_QFLG_EXIT \
+ | ERTS_DE_QFLG_REQ_INFO \
+ | ERTS_DE_QFLG_PORT_CTRL \
+ | ERTS_DE_QFLG_PROC_CTRL)
#if defined(ARCH_64)
#define ERTS_DIST_OUTPUT_BUF_DBG_PATTERN ((Uint) 0xf713f713f713f713UL)
@@ -106,12 +115,13 @@ typedef struct dist_entry_ {
HashBucket hash_bucket; /* Hash bucket */
struct dist_entry_ *next; /* Next entry in dist_table (not sorted) */
struct dist_entry_ *prev; /* Previous entry in dist_table (not sorted) */
- erts_refc_t refc; /* Reference count */
- erts_rwmtx_t rwmtx; /* Protects all fields below until lck_mtx. */
+ erts_rwmtx_t rwmtx; /* Protects all fields below until lck_mtx. */
Eterm sysname; /* name@host atom for efficiency */
Uint32 creation; /* creation of connected node */
- Eterm cid; /* connection handler (pid or port), NIL == free */
+ erts_atomic_t input_handler; /* Input handler */
+ Eterm cid; /* connection handler (pid or port),
+ NIL == free */
Uint32 connection_id; /* Connection id incremented on connect */
Uint32 status; /* Slot status, like exiting reserved etc */
Uint32 flags; /* Distribution flags, like hidden,
@@ -119,7 +129,7 @@ typedef struct dist_entry_ {
unsigned long version; /* Protocol version */
- erts_mtx_t lnk_mtx; /* Protects node_links, nlinks, and
+ erts_mtx_t lnk_mtx; /* Protects node_links, nlinks, and
monitors. */
ErtsLink *node_links; /* In a dist entry, node links are kept
in a separate tree, while they are
@@ -131,12 +141,15 @@ typedef struct dist_entry_ {
ErtsLink *nlinks; /* Link tree with subtrees */
ErtsMonitor *monitors; /* Monitor tree */
- erts_mtx_t qlock; /* Protects qflgs and out_queue */
- Uint32 qflgs;
- Sint qsize;
+ erts_mtx_t qlock; /* Protects qflgs and out_queue */
+ erts_atomic32_t qflgs;
+ erts_atomic_t qsize;
+ erts_atomic64_t in;
+ erts_atomic64_t out;
ErtsDistOutputQueue out_queue;
struct ErtsProcList_ *suspended;
+ ErtsDistOutputQueue tmp_out_queue;
ErtsDistOutputQueue finalized_out_queue;
erts_atomic_t dist_cmd_scheduled;
ErtsPortTaskHandle dist_cmd;
@@ -144,6 +157,8 @@ typedef struct dist_entry_ {
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
struct cache* cache; /* The atom cache */
+
+ ErtsThrPrgrLaterOp later_op;
} DistEntry;
typedef struct erl_node_ {
@@ -193,12 +208,12 @@ Eterm erts_get_node_and_dist_references(struct process *);
int erts_lc_is_de_rwlocked(DistEntry *);
int erts_lc_is_de_rlocked(DistEntry *);
#endif
+int erts_dist_entry_destructor(Binary *bin);
+DistEntry *erts_dhandle_to_dist_entry(Eterm dhandle);
+Eterm erts_make_dhandle(Process *c_p, DistEntry *dep);
+void erts_ref_dist_entry(DistEntry *dep);
+void erts_deref_dist_entry(DistEntry *dep);
-#ifdef ERTS_ENABLE_LOCK_COUNT
-void erts_lcnt_update_distribution_locks(int enable);
-#endif
-
-ERTS_GLB_INLINE void erts_deref_dist_entry(DistEntry *dep);
ERTS_GLB_INLINE void erts_deref_node_entry(ErlNode *np);
ERTS_GLB_INLINE void erts_de_rlock(DistEntry *dep);
ERTS_GLB_INLINE void erts_de_runlock(DistEntry *dep);
@@ -210,14 +225,6 @@ ERTS_GLB_INLINE void erts_de_links_unlock(DistEntry *dep);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE void
-erts_deref_dist_entry(DistEntry *dep)
-{
- ASSERT(dep);
- if (erts_refc_dectest(&dep->refc, 0) == 0)
- erts_schedule_delete_dist_entry(dep);
-}
-
-ERTS_GLB_INLINE void
erts_deref_node_entry(ErlNode *np)
{
ASSERT(np);
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index c29b13c6c1..ab5030e5b9 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -776,6 +776,11 @@ erts_pre_init_process(void)
= ERTS_PSD_ETS_FIXED_TABLES_GET_LOCKS;
erts_psd_required_locks[ERTS_PSD_ETS_FIXED_TABLES].set_locks
= ERTS_PSD_ETS_FIXED_TABLES_SET_LOCKS;
+
+ erts_psd_required_locks[ERTS_PSD_DIST_ENTRY].get_locks
+ = ERTS_PSD_DIST_ENTRY_GET_LOCKS;
+ erts_psd_required_locks[ERTS_PSD_DIST_ENTRY].set_locks
+ = ERTS_PSD_DIST_ENTRY_SET_LOCKS;
#endif
}
@@ -13082,7 +13087,6 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext)
}
erts_destroy_monitor(rmon);
}
- erts_deref_dist_entry(dep);
}
} else {
ASSERT(is_pid(mon->u.pid) || is_port(mon->u.pid));
@@ -13322,7 +13326,6 @@ static void doit_exit_link(ErtsLink *lnk, void *vpcontext)
erts_de_links_unlock(dep);
if (rlnk)
erts_destroy_link(rlnk);
- erts_deref_dist_entry(dep);
}
break;
@@ -13430,7 +13433,7 @@ erts_continue_exit_process(Process *p)
ErtsMonitor *mon;
ErtsProcLocks curr_locks = ERTS_PROC_LOCK_MAIN;
Eterm reason = p->fvalue;
- DistEntry *dep;
+ DistEntry *dep = NULL;
erts_aint32_t state;
int delay_del_proc = 0;
@@ -13631,13 +13634,16 @@ erts_continue_exit_process(Process *p)
if (refc_inced && !(n & ERTS_PSFLG_IN_RUNQ))
erts_proc_dec_refc(p);
}
-
- dep = (p->flags & F_DISTRIBUTION) ? erts_this_dist_entry : NULL;
+
+ dep = ((p->flags & F_DISTRIBUTION)
+ ? ERTS_PROC_SET_DIST_ENTRY(p, NULL)
+ : NULL);
erts_proc_unlock(p, ERTS_PROC_LOCKS_ALL);
if (dep) {
- erts_do_net_exits(dep, reason);
+ erts_do_net_exits(dep, (reason == am_kill) ? am_killed : reason);
+ erts_deref_dist_entry(dep);
}
/*
@@ -14045,3 +14051,24 @@ erts_dbg_check_halloc_lock(Process *p)
return 0;
}
#endif
+
+/*
+ * Debug helper: visit queued thread-progress later-ops.
+ *
+ * For each of the erts_no_schedulers entries in
+ * erts_aligned_scheduler_data, walks the list that starts at
+ * aux_work_data.later_op.first and, for every operation whose own
+ * function pointer equals `callback`, calls
+ * func(arg, <op's `later` value>, <op's `data` pointer>).
+ *
+ *   callback - selects which later-ops to visit (compared by address)
+ *   func     - visitor invoked for each matching later-op
+ *   arg      - passed unchanged as the first argument of func
+ *
+ * The caller must be blocking thread progress; otherwise
+ * ERTS_INTERNAL_ERROR() is raised.
+ */
+void
+erts_debug_later_op_foreach(void (*callback)(void*),
+ void (*func)(void *, ErtsThrPrgrVal, void *),
+ void *arg)
+{
+ int six; /* scheduler index */
+ if (!erts_thr_progress_is_blocking())
+ ERTS_INTERNAL_ERROR("Not blocking thread progress");
+
+ for (six = 0; six < erts_no_schedulers; six++) {
+ ErtsSchedulerData *esdp = &erts_aligned_scheduler_data[six].esd;
+ ErtsThrPrgrLaterOp *lop = esdp->aux_work_data.later_op.first;
+
+ while (lop) {
+ /* lop->func is the queued operation; `func` is the visitor */
+ if (lop->func == callback)
+ func(arg, lop->later, lop->data);
+ lop = lop->next;
+ }
+ }
+}
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 7ca37882c2..150f22556c 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -697,6 +697,11 @@ extern ErtsAlignedSchedulerData *erts_aligned_dirty_io_scheduler_data;
int erts_lc_runq_is_locked(ErtsRunQueue *);
#endif
+void
+erts_debug_later_op_foreach(void (*callback)(void*),
+ void (*func)(void *, ErtsThrPrgrVal, void *),
+ void *arg);
+
#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
void erts_empty_runq(ErtsRunQueue *rq);
@@ -803,14 +808,15 @@ erts_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi)
#define ERTS_PSD_NIF_TRAP_EXPORT 5
#define ERTS_PSD_ETS_OWNED_TABLES 6
#define ERTS_PSD_ETS_FIXED_TABLES 7
-#define ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF 8
+#define ERTS_PSD_DIST_ENTRY 8
+#define ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF 9 /* keep last... */
-#define ERTS_PSD_SIZE 9
+#define ERTS_PSD_SIZE 10
#if !defined(HIPE)
# undef ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF
# undef ERTS_PSD_SIZE
-# define ERTS_PSD_SIZE 8
+# define ERTS_PSD_SIZE 9
#endif
typedef struct {
@@ -844,6 +850,9 @@ typedef struct {
#define ERTS_PSD_ETS_FIXED_TABLES_GET_LOCKS ERTS_PROC_LOCK_MAIN
#define ERTS_PSD_ETS_FIXED_TABLES_SET_LOCKS ERTS_PROC_LOCK_MAIN
+#define ERTS_PSD_DIST_ENTRY_GET_LOCKS ERTS_PROC_LOCK_MAIN
+#define ERTS_PSD_DIST_ENTRY_SET_LOCKS ERTS_PROC_LOCK_MAIN
+
typedef struct {
ErtsProcLocks get_locks;
ErtsProcLocks set_locks;
@@ -2041,6 +2050,11 @@ erts_psd_set(Process *p, int ix, void *data)
#define ERTS_PROC_SET_NIF_TRAP_EXPORT(P, NTE) \
erts_psd_set((P), ERTS_PSD_NIF_TRAP_EXPORT, (void *) (NTE))
+/*
+ * Accessors for the ERTS_PSD_DIST_ENTRY slot of process P, going
+ * through erts_psd_get()/erts_psd_set() and casting to DistEntry *.
+ * ERTS_PROC_SET_DIST_ENTRY yields whatever erts_psd_set() returns
+ * (presumably the value previously stored -- confirm against
+ * erts_psd_set()).
+ */
+#define ERTS_PROC_GET_DIST_ENTRY(P) \
+ ((DistEntry *) erts_psd_get((P), ERTS_PSD_DIST_ENTRY))
+#define ERTS_PROC_SET_DIST_ENTRY(P, DE) \
+ ((DistEntry *) erts_psd_set((P), ERTS_PSD_DIST_ENTRY, (void *) (DE)))
+
#ifdef HIPE
#define ERTS_PROC_GET_SUSPENDED_SAVED_CALLS_BUF(P) \
((struct saved_calls *) erts_psd_get((P), ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF))
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c
index 0874be7250..1c2f8f9843 100644
--- a/erts/emulator/beam/external.c
+++ b/erts/emulator/beam/external.c
@@ -616,7 +616,7 @@ erts_make_dist_ext_copy(ErtsDistExternal *edep, Uint xsize)
sys_memcpy((void *) ep, (void *) edep, dist_ext_sz);
ep += dist_ext_sz;
if (new_edep->dep)
- erts_refc_inc(&new_edep->dep->refc, 1);
+ erts_ref_dist_entry(new_edep->dep);
new_edep->extp = ep;
new_edep->ext_endp = ep + ext_sz;
new_edep->heap_size = -1;
@@ -629,7 +629,8 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
byte *ext,
Uint size,
DistEntry *dep,
- ErtsAtomCache *cache)
+ ErtsAtomCache *cache,
+ Uint32 *connection_id)
{
#undef ERTS_EXT_FAIL
#undef ERTS_EXT_HDR_FAIL
@@ -650,33 +651,36 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
if (size < 2)
ERTS_EXT_FAIL;
+ if (!dep)
+ ERTS_INTERNAL_ERROR("Invalid use");
+
if (ep[0] != VERSION_MAGIC) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- if (dep)
- erts_dsprintf(dsbufp,
- "** Got message from incompatible erlang on "
- "channel %d\n",
- dist_entry_channel_no(dep));
- else
- erts_dsprintf(dsbufp,
- "** Attempt to convert old incompatible "
- "binary %d\n",
- *ep);
+ erts_dsprintf(dsbufp,
+ "** Got message from incompatible erlang on "
+ "channel %d\n",
+ dist_entry_channel_no(dep));
erts_send_error_to_logger_nogl(dsbufp);
ERTS_EXT_FAIL;
}
edep->flags = 0;
edep->dep = dep;
- if (dep) {
- erts_de_rlock(dep);
- if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)
- edep->flags |= ERTS_DIST_EXT_DFLAG_HDR;
-
- edep->flags |= (dep->connection_id & ERTS_DIST_EXT_CON_ID_MASK);
- erts_de_runlock(dep);
+
+ erts_de_rlock(dep);
+
+ if ((dep->status & (ERTS_DE_SFLG_EXITING|ERTS_DE_SFLG_CONNECTED))
+ != ERTS_DE_SFLG_CONNECTED) {
+ erts_de_runlock(dep);
+ return ERTS_PREP_DIST_EXT_CLOSED;
}
+ if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)
+ edep->flags |= ERTS_DIST_EXT_DFLAG_HDR;
+
+ *connection_id = dep->connection_id;
+ edep->flags |= (dep->connection_id & ERTS_DIST_EXT_CON_ID_MASK);
+
if (ep[1] != DIST_HEADER) {
if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR)
ERTS_EXT_HDR_FAIL;
@@ -835,14 +839,15 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
ERTS_EXT_FAIL;
#endif
- return 0;
+ erts_de_runlock(dep);
+
+ return ERTS_PREP_DIST_EXT_SUCCESS;
#undef CHKSIZE
#undef ERTS_EXT_FAIL
#undef ERTS_EXT_HDR_FAIL
- bad_hdr:
- if (dep) {
+ bad_hdr: {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
erts_dsprintf(dsbufp,
"%T got a corrupted distribution header from %T "
@@ -855,10 +860,11 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
erts_dsprintf(dsbufp, ">>");
erts_send_warning_to_logger_nogl(dsbufp);
}
- fail:
- if (dep)
- erts_kill_dist_connection(dep, dep->connection_id);
- return -1;
+ fail: {
+ erts_de_runlock(dep);
+ erts_kill_dist_connection(dep, *connection_id);
+ }
+ return ERTS_PREP_DIST_EXT_FAILED;
}
static void
diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h
index f00426cc16..3c61d013da 100644
--- a/erts/emulator/beam/external.h
+++ b/erts/emulator/beam/external.h
@@ -185,8 +185,13 @@ ERTS_GLB_INLINE void *erts_dist_ext_trailer(ErtsDistExternal *);
ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Uint);
void *erts_dist_ext_trailer(ErtsDistExternal *);
void erts_destroy_dist_ext_copy(ErtsDistExternal *);
+
+#define ERTS_PREP_DIST_EXT_FAILED (-1)
+#define ERTS_PREP_DIST_EXT_SUCCESS (0)
+#define ERTS_PREP_DIST_EXT_CLOSED (1)
+
int erts_prepare_dist_ext(ErtsDistExternal *, byte *, Uint,
- DistEntry *, ErtsAtomCache *);
+ DistEntry *, ErtsAtomCache *, Uint32 *);
Sint erts_decode_dist_ext_size(ErtsDistExternal *);
Eterm erts_decode_dist_ext(ErtsHeapFactory* factory, ErtsDistExternal *);
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index c8925e159e..04108e5f20 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -6682,6 +6682,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
else
erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len));
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
+ erts_atomic64_inc_nob(&prt->dist_entry->in);
return erts_net_message(prt,
prt->dist_entry,
(byte*) hbuf, hlen,
@@ -6722,6 +6723,7 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
else
erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len));
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
+ erts_atomic64_inc_nob(&prt->dist_entry->in);
if (len == 0)
return erts_net_message(prt,
prt->dist_entry,