aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c3822
1 files changed, 2255 insertions, 1567 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 09c83f1117..0633bff3c2 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2016. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2018. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
/* define this to get a lot of debug output */
/* #define ERTS_DIST_MSG_DBG */
+/* #define ERTS_RAW_DIST_MSG_DBG */
#ifdef HAVE_CONFIG_H
# include "config.h"
@@ -44,6 +45,7 @@
#include "erl_binary.h"
#include "erl_thr_progress.h"
#include "dtrace-wrapper.h"
+#include "erl_proc_sig_queue.h"
#define DIST_CTL_DEFAULT_SIZE 64
@@ -102,35 +104,27 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
-#define PASS_THROUGH 'p' /* This code should go */
-
int erts_is_alive; /* System must be blocked on change */
int erts_dist_buf_busy_limit;
/* distribution trap functions */
-Export* dsend2_trap = NULL;
-Export* dsend3_trap = NULL;
-/*Export* dsend_nosuspend_trap = NULL;*/
-Export* dlink_trap = NULL;
-Export* dunlink_trap = NULL;
Export* dmonitor_node_trap = NULL;
-Export* dgroup_leader_trap = NULL;
-Export* dexit_trap = NULL;
-Export* dmonitor_p_trap = NULL;
/* local variables */
-
+static Export *dist_ctrl_put_data_trap;
/* forward declarations */
-static void clear_dist_entry(DistEntry*);
static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
+static Sint abort_connection(DistEntry* dep, Uint32 conn_id);
+static ErtsDistOutputBuf* clear_de_out_queues(DistEntry*);
+static void free_de_out_queues(DistEntry*, ErtsDistOutputBuf*);
-static erts_smp_atomic_t no_caches;
-static erts_smp_atomic_t no_nodes;
+static erts_atomic_t no_caches;
+static erts_atomic_t no_nodes;
struct {
Eterm reason;
@@ -143,8 +137,8 @@ delete_cache(ErtsAtomCache *cache)
{
if (cache) {
erts_free(ERTS_ALC_T_DCACHE, (void *) cache);
- ASSERT(erts_smp_atomic_read_nob(&no_caches) > 0);
- erts_smp_atomic_dec_nob(&no_caches);
+ ASSERT(erts_atomic_read_nob(&no_caches) > 0);
+ erts_atomic_dec_nob(&no_caches);
}
}
@@ -155,14 +149,12 @@ create_cache(DistEntry *dep)
int i;
ErtsAtomCache *cp;
- ERTS_SMP_LC_ASSERT(
- is_internal_port(dep->cid)
- && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));
+ ERTS_LC_ASSERT(is_nil(dep->cid));
ASSERT(!dep->cache);
dep->cache = cp = (ErtsAtomCache*) erts_alloc(ERTS_ALC_T_DCACHE,
sizeof(ErtsAtomCache));
- erts_smp_atomic_inc_nob(&no_caches);
+ erts_atomic_inc_nob(&no_caches);
for (i = 0; i < sizeof(cp->in_arr)/sizeof(cp->in_arr[0]); i++) {
cp->in_arr[i] = THE_NON_VALUE;
cp->out_arr[i] = THE_NON_VALUE;
@@ -171,15 +163,17 @@ create_cache(DistEntry *dep)
Uint erts_dist_cache_size(void)
{
- return (Uint) erts_smp_atomic_read_mb(&no_caches)*sizeof(ErtsAtomCache);
+ return (Uint) erts_atomic_read_mb(&no_caches)*sizeof(ErtsAtomCache);
}
static ErtsProcList *
-get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs)
+get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs)
{
- ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&dep->qlock));
- dep->qflgs &= ~unset_qflgs;
- if (dep->qflgs & ERTS_DE_QFLG_EXIT) {
+ erts_aint32_t qflgs;
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
+ qflgs = erts_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs);
+ qflgs &= ~unset_qflgs;
+ if (qflgs & ERTS_DE_QFLG_EXIT) {
/* No resume when exit has been scheduled */
return NULL;
}
@@ -191,6 +185,161 @@ get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs)
}
}
+#define ERTS_MON_LNK_FIRE_LIMIT 100
+
+static void monitor_connection_down(ErtsMonitor *mon, void *unused)
+{
+ if (erts_monitor_is_origin(mon))
+ erts_proc_sig_send_demonitor(mon);
+ else
+ erts_proc_sig_send_monitor_down(mon, am_noconnection);
+}
+
+static void link_connection_down(ErtsLink *lnk, void *vdist)
+{
+ erts_proc_sig_send_link_exit(NULL, THE_NON_VALUE, lnk,
+ am_noconnection, NIL);
+}
+
+typedef enum {
+ ERTS_CML_CLEANUP_STATE_LINKS,
+ ERTS_CML_CLEANUP_STATE_MONITORS,
+ ERTS_CML_CLEANUP_STATE_ONAME_MONITORS,
+ ERTS_CML_CLEANUP_STATE_NODE_MONITORS
+} ErtsConMonLnkCleaupState;
+
+typedef struct {
+ ErtsConMonLnkCleaupState state;
+ ErtsMonLnkDist *dist;
+ void *yield_state;
+ int trigger_node_monitors;
+ Eterm nodename;
+ Eterm visability;
+ Eterm reason;
+ ErlOffHeap oh;
+ Eterm heap[1];
+} ErtsConMonLnkCleanup;
+
+static void
+con_monitor_link_cleanup(void *vcmlcp)
+{
+ ErtsConMonLnkCleanup *cmlcp = vcmlcp;
+ ErtsMonLnkDist *dist = cmlcp->dist;
+ ErtsSchedulerData *esdp;
+ int yield;
+
+ switch (cmlcp->state) {
+ case ERTS_CML_CLEANUP_STATE_LINKS:
+ yield = erts_link_list_foreach_delete_yielding(&dist->links,
+ link_connection_down,
+ NULL, &cmlcp->yield_state,
+ ERTS_MON_LNK_FIRE_LIMIT);
+ if (yield)
+ break;
+
+ ASSERT(!cmlcp->yield_state);
+ cmlcp->state = ERTS_CML_CLEANUP_STATE_MONITORS;
+ case ERTS_CML_CLEANUP_STATE_MONITORS:
+ yield = erts_monitor_list_foreach_delete_yielding(&dist->monitors,
+ monitor_connection_down,
+ NULL, &cmlcp->yield_state,
+ ERTS_MON_LNK_FIRE_LIMIT);
+ if (yield)
+ break;
+
+ ASSERT(!cmlcp->yield_state);
+ cmlcp->state = ERTS_CML_CLEANUP_STATE_ONAME_MONITORS;
+ case ERTS_CML_CLEANUP_STATE_ONAME_MONITORS:
+ yield = erts_monitor_tree_foreach_delete_yielding(&dist->orig_name_monitors,
+ monitor_connection_down,
+ NULL, &cmlcp->yield_state,
+ ERTS_MON_LNK_FIRE_LIMIT/2);
+ if (yield)
+ break;
+
+ cmlcp->dist = NULL;
+ erts_mon_link_dist_dec_refc(dist);
+
+ ASSERT(!cmlcp->yield_state);
+ cmlcp->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS;
+ case ERTS_CML_CLEANUP_STATE_NODE_MONITORS:
+ if (cmlcp->trigger_node_monitors) {
+ send_nodes_mon_msgs(NULL,
+ am_nodedown,
+ cmlcp->nodename,
+ cmlcp->visability,
+ cmlcp->reason);
+ }
+ erts_cleanup_offheap(&cmlcp->oh);
+ erts_free(ERTS_ALC_T_CML_CLEANUP, vcmlcp);
+ return; /* done */
+ }
+
+ /* yield... */
+
+ esdp = erts_get_scheduler_data();
+ ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL);
+ erts_schedule_misc_aux_work((int) esdp->no,
+ con_monitor_link_cleanup,
+ (void *) cmlcp);
+}
+
+static void
+schedule_con_monitor_link_cleanup(ErtsMonLnkDist *dist,
+ Eterm nodename,
+ Eterm visability,
+ Eterm reason)
+{
+ if (dist || is_value(nodename)) {
+ ErtsSchedulerData *esdp;
+ ErtsConMonLnkCleanup *cmlcp;
+ Uint rsz, size;
+
+ size = sizeof(ErtsConMonLnkCleanup);
+
+ if (is_non_value(reason) || is_immed(reason)) {
+ rsz = 0;
+ size -= sizeof(Eterm);
+ }
+ else {
+ rsz = size_object(reason);
+ size += sizeof(Eterm) * (rsz - 1);
+ }
+
+ cmlcp = erts_alloc(ERTS_ALC_T_CML_CLEANUP, size);
+
+ ERTS_INIT_OFF_HEAP(&cmlcp->oh);
+
+ cmlcp->yield_state = NULL;
+ cmlcp->dist = dist;
+ if (!dist)
+ cmlcp->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS;
+ else {
+ cmlcp->state = ERTS_CML_CLEANUP_STATE_LINKS;
+ erts_mtx_lock(&dist->mtx);
+ ASSERT(dist->alive);
+ dist->alive = 0;
+ erts_mtx_unlock(&dist->mtx);
+ }
+
+ cmlcp->trigger_node_monitors = is_value(nodename);
+ cmlcp->nodename = nodename;
+ cmlcp->visability = visability;
+ if (rsz == 0)
+ cmlcp->reason = reason;
+ else {
+ Eterm *hp = &cmlcp->heap[0];
+ cmlcp->reason = copy_struct(reason, rsz, &hp, &cmlcp->oh);
+ }
+
+ esdp = erts_get_scheduler_data();
+ ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL);
+ erts_schedule_misc_aux_work((int) esdp->no,
+ con_monitor_link_cleanup,
+ (void *) cmlcp);
+ }
+}
+
/*
** A full node name constists of a "n@h"
**
@@ -242,185 +391,22 @@ int is_node_name_atom(Eterm a)
return is_node_name((char*)atom_tab(i)->name, atom_tab(i)->len);
}
-typedef struct {
- DistEntry *dep;
- Eterm *lhp;
-} NetExitsContext;
-
-/*
-** This function is called when a distribution
-** port or process terminates
-*/
-static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp)
-{
- Process *rp;
- ErtsMonitor *rmon;
- DistEntry *dep = ((NetExitsContext *) vnecp)->dep;
- ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK;
-
- rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks);
- if (!rp)
- goto done;
-
- if (mon->type == MON_ORIGIN) {
- /* local pid is beeing monitored */
- rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
- /* ASSERT(rmon != NULL); nope, can happen during process exit */
- if (rmon != NULL) {
- erts_destroy_monitor(rmon);
- }
- } else {
- DeclareTmpHeapNoproc(lhp,3);
- Eterm watched;
- UseTmpHeapNoproc(3);
- ASSERT(mon->type == MON_TARGET);
- rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
- /* ASSERT(rmon != NULL); can happen during process exit */
- if (rmon != NULL) {
- ASSERT(is_atom(rmon->name) || is_nil(rmon->name));
- watched = (is_atom(rmon->name)
- ? TUPLE2(lhp, rmon->name, dep->sysname)
- : rmon->pid);
-#ifdef ERTS_SMP
- rp_locks |= ERTS_PROC_LOCKS_MSG_SEND;
- erts_smp_proc_lock(rp, ERTS_PROC_LOCKS_MSG_SEND);
-#endif
- erts_queue_monitor_message(rp, &rp_locks, mon->ref, am_process,
- watched, am_noconnection);
- erts_destroy_monitor(rmon);
- }
- UnUseTmpHeapNoproc(3);
- }
- erts_smp_proc_unlock(rp, rp_locks);
- done:
- erts_destroy_monitor(mon);
-}
-
-typedef struct {
- NetExitsContext *necp;
- ErtsLink *lnk;
-} LinkNetExitsContext;
-
-/*
-** This is the function actually doing the job of sending exit messages
-** for links in a dist entry upon net_exit (the node goes down), NB,
-** only process links, not node monitors are handled here,
-** they reside in a separate tree....
-*/
-static void doit_link_net_exits_sub(ErtsLink *sublnk, void *vlnecp)
-{
- ErtsLink *lnk = ((LinkNetExitsContext *) vlnecp)->lnk; /* the local pid */
- ErtsLink *rlnk;
- Process *rp;
-
- ASSERT(lnk->type == LINK_PID);
- if (is_internal_pid(lnk->pid)) {
- int xres;
- ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND;
-
- rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks);
- if (!rp) {
- goto done;
- }
-
- rlnk = erts_remove_link(&ERTS_P_LINKS(rp), sublnk->pid);
- xres = erts_send_exit_signal(NULL,
- sublnk->pid,
- rp,
- &rp_locks,
- am_noconnection,
- NIL,
- NULL,
- 0);
-
- if (rlnk) {
- erts_destroy_link(rlnk);
- if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
- /* We didn't exit the process and it is traced */
- trace_proc(NULL, 0, rp, am_getting_unlinked, sublnk->pid);
- }
- }
- erts_smp_proc_unlock(rp, rp_locks);
- }
- done:
- erts_destroy_link(sublnk);
-
-}
-
-
-
-
-
-/*
-** This function is called when a distribution
-** port or process terminates, once for each link on the high level,
-** it in turn traverses the link subtree for the specific link node...
-*/
-static void doit_link_net_exits(ErtsLink *lnk, void *vnecp)
-{
- LinkNetExitsContext lnec = {(NetExitsContext *) vnecp, lnk};
- ASSERT(lnk->type == LINK_PID);
- erts_sweep_links(ERTS_LINK_ROOT(lnk), &doit_link_net_exits_sub, (void *) &lnec);
-#ifdef DEBUG
- ERTS_LINK_ROOT(lnk) = NULL;
-#endif
- erts_destroy_link(lnk);
-}
-
-
-static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp)
-{
- DistEntry *dep = ((NetExitsContext *) vnecp)->dep;
- Eterm name = dep->sysname;
- Process *rp;
- ErtsLink *rlnk;
- Uint i,n;
- ASSERT(lnk->type == LINK_NODE);
- if (is_internal_pid(lnk->pid)) {
- ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK;
- ErlOffHeap *ohp;
- rp = erts_proc_lookup(lnk->pid);
- if (!rp)
- goto done;
- erts_smp_proc_lock(rp, rp_locks);
- rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name);
- if (rlnk != NULL) {
- ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE));
- erts_destroy_link(rlnk);
- }
- n = ERTS_LINK_REFC(lnk);
- for (i = 0; i < n; ++i) {
- Eterm tup;
- Eterm *hp;
- ErtsMessage *msgp;
-
- msgp = erts_alloc_message_heap(rp, &rp_locks,
- 3, &hp, &ohp);
- tup = TUPLE2(hp, am_nodedown, name);
- erts_queue_message(rp, rp_locks, msgp, tup, am_system);
- }
- erts_smp_proc_unlock(rp, rp_locks);
- }
- done:
- erts_destroy_link(lnk);
-}
-
static void
set_node_not_alive(void *unused)
{
ErlHeapFragment *bp;
Eterm nodename = erts_this_dist_entry->sysname;
- ASSERT(erts_smp_atomic_read_nob(&no_nodes) == 0);
+ ASSERT(erts_atomic_read_nob(&no_nodes) == 0);
- erts_smp_thr_progress_block();
+ erts_thr_progress_block();
erts_set_this_node(am_Noname, 0);
erts_is_alive = 0;
send_nodes_mon_msgs(NULL, am_nodedown, nodename, am_visible, nodedown.reason);
nodedown.reason = NIL;
bp = nodedown.bp;
nodedown.bp = NULL;
- erts_smp_thr_progress_unblock();
+ erts_thr_progress_unblock();
if (bp)
free_message_buffer(bp);
}
@@ -428,7 +414,7 @@ set_node_not_alive(void *unused)
static ERTS_INLINE void
dec_no_nodes(void)
{
- erts_aint_t no = erts_smp_atomic_dec_read_mb(&no_nodes);
+ erts_aint_t no = erts_atomic_dec_read_mb(&no_nodes);
ASSERT(no >= 0);
ASSERT(erts_get_scheduler_id()); /* Need to be a scheduler */
if (no == 0)
@@ -441,12 +427,33 @@ static ERTS_INLINE void
inc_no_nodes(void)
{
#ifdef DEBUG
- erts_aint_t no = erts_smp_atomic_read_nob(&no_nodes);
+ erts_aint_t no = erts_atomic_read_nob(&no_nodes);
ASSERT(erts_is_alive ? no > 0 : no == 0);
#endif
- erts_smp_atomic_inc_mb(&no_nodes);
+ erts_atomic_inc_mb(&no_nodes);
+}
+
+static void
+kill_dist_ctrl_proc(void *vpid)
+{
+ erts_proc_sig_send_exit(NULL, (Eterm) vpid, (Eterm) vpid,
+ am_kill, NIL, 0);
}
-
+
+static void
+schedule_kill_dist_ctrl_proc(Eterm pid)
+{
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ int sched_id = 1;
+ if (!esdp || ERTS_SCHEDULER_IS_DIRTY(esdp))
+ sched_id = 1;
+ else
+ sched_id = (int) esdp->no;
+ erts_schedule_misc_aux_work(sched_id,
+ kill_dist_ctrl_proc,
+ (void *) (UWord) pid);
+}
+
/*
* proc is currently running or exiting process.
*/
@@ -456,58 +463,83 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */
DistEntry *tdep;
- int no_dist_port = 0;
+ int no_dist_ctrl;
+ int no_pending;
Eterm nd_reason = (reason == am_no_network
? am_no_network
: am_net_kernel_terminated);
- erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx);
+ int i = 0;
+ Eterm *dist_ctrl;
+ DistEntry** pending;
+
+ ERTS_UNDEF(dist_ctrl, NULL);
+ ERTS_UNDEF(pending, NULL);
- for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next)
- no_dist_port++;
- for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next)
- no_dist_port++;
+ erts_rwmtx_rlock(&erts_dist_table_rwmtx);
+
+ no_dist_ctrl = (erts_no_of_hidden_dist_entries +
+ erts_no_of_visible_dist_entries);
+ no_pending = erts_no_of_pending_dist_entries;
/* KILL all port controllers */
- if (no_dist_port == 0)
- erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx);
- else {
- Eterm def_buf[128];
- int i = 0;
- Eterm *dist_port;
-
- if (no_dist_port <= sizeof(def_buf)/sizeof(def_buf[0]))
- dist_port = &def_buf[0];
- else
- dist_port = erts_alloc(ERTS_ALC_T_TMP,
- sizeof(Eterm)*no_dist_port);
+ if (no_dist_ctrl) {
+ dist_ctrl = erts_alloc(ERTS_ALC_T_TMP,
+ sizeof(Eterm)*no_dist_ctrl);
for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) {
- ASSERT(is_internal_port(tdep->cid));
- dist_port[i++] = tdep->cid;
+ ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid));
+ ASSERT(i < no_dist_ctrl);
+ dist_ctrl[i++] = tdep->cid;
}
for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) {
- ASSERT(is_internal_port(tdep->cid));
- dist_port[i++] = tdep->cid;
+ ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid));
+ ASSERT(i < no_dist_ctrl);
+ dist_ctrl[i++] = tdep->cid;
}
- erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx);
-
- for (i = 0; i < no_dist_port; i++) {
- Port *prt = erts_port_lookup(dist_port[i],
- ERTS_PORT_SFLGS_INVALID_LOOKUP);
- if (!prt)
- continue;
- ASSERT(erts_atomic32_read_nob(&prt->state)
- & ERTS_PORT_SFLG_DISTRIBUTION);
+ ASSERT(i == no_dist_ctrl);
+ }
+ if (no_pending) {
+ pending = erts_alloc(ERTS_ALC_T_TMP, sizeof(DistEntry*)*no_pending);
+ i = 0;
+ for (tdep = erts_pending_dist_entries; tdep; tdep = tdep->next) {
+ ASSERT(is_nil(tdep->cid));
+ ASSERT(i < no_pending);
+ pending[i++] = tdep;
+ erts_ref_dist_entry(tdep);
+ }
+ ASSERT(i == no_pending);
+ }
+ erts_rwmtx_runlock(&erts_dist_table_rwmtx);
+
+ if (no_dist_ctrl) {
+ for (i = 0; i < no_dist_ctrl; i++) {
+ if (is_internal_pid(dist_ctrl[i]))
+ schedule_kill_dist_ctrl_proc(dist_ctrl[i]);
+ else {
+ Port *prt = erts_port_lookup(dist_ctrl[i],
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
+ if (prt) {
+ ASSERT(erts_atomic32_read_nob(&prt->state)
+ & ERTS_PORT_SFLG_DISTRIBUTION);
+
+ erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED,
+ prt, dist_ctrl[i], nd_reason, NULL);
+ }
+ }
+ }
+ erts_free(ERTS_ALC_T_TMP, dist_ctrl);
+ }
- erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED,
- prt, dist_port[i], nd_reason, NULL);
- }
+ if (no_pending) {
+ for (i = 0; i < no_pending; i++) {
+ abort_connection(pending[i], pending[i]->connection_id);
+ erts_deref_dist_entry(pending[i]);
+ }
+ erts_free(ERTS_ALC_T_TMP, pending);
+ }
- if (dist_port != &def_buf[0])
- erts_free(ERTS_ALC_T_TMP, dist_port);
- }
/*
- * When last dist port exits, node will be taken
+ * When last dist ctrl exits, node will be taken
* from alive to not alive.
*/
ASSERT(is_nil(nodedown.reason) && !nodedown.bp);
@@ -524,65 +556,78 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
&nodedown.bp->off_heap);
}
}
- else { /* Call from distribution port */
- NetExitsContext nec = {dep};
- ErtsLink *nlinks;
- ErtsLink *node_links;
- ErtsMonitor *monitors;
+ else { /* Call from distribution controller (port/process) */
+ ErtsMonLnkDist *mld;
+ ErtsAtomCache *cache;
+ ErtsProcList *suspendees;
+ ErtsDistOutputBuf *obuf;
Uint32 flags;
- erts_smp_atomic_set_mb(&dep->dist_cmd_scheduled, 1);
- erts_smp_de_rwlock(dep);
+ erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1);
+ erts_de_rwlock(dep);
- ERTS_SMP_LC_ASSERT(is_internal_port(dep->cid)
- && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));
+ if (is_internal_port(dep->cid)) {
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));
- if (erts_port_task_is_scheduled(&dep->dist_cmd))
- erts_port_task_abort(&dep->dist_cmd);
+ if (erts_port_task_is_scheduled(&dep->dist_cmd))
+ erts_port_task_abort(&dep->dist_cmd);
+ }
- if (dep->status & ERTS_DE_SFLG_EXITING) {
+ if (dep->state == ERTS_DE_STATE_EXITING) {
#ifdef DEBUG
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT);
- erts_smp_mtx_unlock(&dep->qlock);
+ ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT);
#endif
}
else {
- dep->status |= ERTS_DE_SFLG_EXITING;
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
- dep->qflgs |= ERTS_DE_QFLG_EXIT;
- erts_smp_mtx_unlock(&dep->qlock);
+ dep->state = ERTS_DE_STATE_EXITING;
+ erts_mtx_lock(&dep->qlock);
+ ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
+ erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT);
+ erts_mtx_unlock(&dep->qlock);
}
- erts_smp_de_links_lock(dep);
- monitors = dep->monitors;
- nlinks = dep->nlinks;
- node_links = dep->node_links;
- dep->monitors = NULL;
- dep->nlinks = NULL;
- dep->node_links = NULL;
- erts_smp_de_links_unlock(dep);
+ mld = dep->mld;
+ dep->mld = NULL;
nodename = dep->sysname;
flags = dep->flags;
+ erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) NIL);
+ cache = dep->cache;
+ dep->cache = NULL;
+
+ erts_mtx_lock(&dep->qlock);
+
+ erts_atomic64_set_nob(&dep->in, 0);
+ erts_atomic64_set_nob(&dep->out, 0);
+
+ obuf = clear_de_out_queues(dep);
+ suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
+
+ erts_mtx_unlock(&dep->qlock);
+ erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
+ dep->send = NULL;
+
erts_set_dist_entry_not_connected(dep);
- erts_smp_de_rwunlock(dep);
+ erts_de_rwunlock(dep);
- erts_sweep_monitors(monitors, &doit_monitor_net_exits, (void *) &nec);
- erts_sweep_links(nlinks, &doit_link_net_exits, (void *) &nec);
- erts_sweep_links(node_links, &doit_node_link_net_exits, (void *) &nec);
+ schedule_con_monitor_link_cleanup(mld,
+ nodename,
+ (flags & DFLAG_PUBLISHED
+ ? am_visible
+ : am_hidden),
+ (reason == am_normal
+ ? am_connection_closed
+ : reason));
- send_nodes_mon_msgs(NULL,
- am_nodedown,
- nodename,
- flags & DFLAG_PUBLISHED ? am_visible : am_hidden,
- reason == am_normal ? am_connection_closed : reason);
+ erts_resume_processes(suspendees);
- clear_dist_entry(dep);
+ delete_cache(cache);
+ free_de_out_queues(dep, obuf);
+ if (dep->transcode_ctx)
+ transcode_free_ctx(dep);
}
dec_no_nodes();
@@ -596,6 +641,14 @@ trap_function(Eterm func, int arity)
return erts_export_put(am_erlang, func, arity);
}
+/*
+ * Sync with dist_util.erl:
+ *
+ * -record(erts_dflags,
+ * {default, mandatory, addable, rejectable, strict_order}).
+ */
+static Eterm erts_dflags_record;
+
void init_dist(void)
{
init_nodes_monitors();
@@ -603,19 +656,24 @@ void init_dist(void)
nodedown.reason = NIL;
nodedown.bp = NULL;
- erts_smp_atomic_init_nob(&no_nodes, 0);
- erts_smp_atomic_init_nob(&no_caches, 0);
+ erts_atomic_init_nob(&no_nodes, 0);
+ erts_atomic_init_nob(&no_caches, 0);
/* Lookup/Install all references to trap functions */
- dsend2_trap = trap_function(am_dsend,2);
- dsend3_trap = trap_function(am_dsend,3);
- /* dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/
- dlink_trap = trap_function(am_dlink,1);
- dunlink_trap = trap_function(am_dunlink,1);
dmonitor_node_trap = trap_function(am_dmonitor_node,3);
- dgroup_leader_trap = trap_function(am_dgroup_leader,2);
- dexit_trap = trap_function(am_dexit, 2);
- dmonitor_p_trap = trap_function(am_dmonitor_p, 2);
+ dist_ctrl_put_data_trap = erts_export_put(am_erts_internal,
+ am_dist_ctrl_put_data,
+ 2);
+ {
+ Eterm* hp = erts_alloc(ERTS_ALC_T_LITERAL, (1+6)*sizeof(Eterm));
+ erts_dflags_record = TUPLE6(hp, am_erts_dflags,
+ make_small(DFLAG_DIST_DEFAULT),
+ make_small(DFLAG_DIST_MANDATORY),
+ make_small(DFLAG_DIST_ADDABLE),
+ make_small(DFLAG_DIST_REJECTABLE),
+ make_small(DFLAG_DIST_STRICT_ORDER));
+ erts_set_literal_tag(&erts_dflags_record, hp, (1+6));
+ }
}
#define ErtsDistOutputBuf2Binary(OB) \
@@ -627,10 +685,10 @@ alloc_dist_obuf(Uint size)
ErtsDistOutputBuf *obuf;
Uint obuf_size = sizeof(ErtsDistOutputBuf)+sizeof(byte)*(size-1);
Binary *bin = erts_bin_drv_alloc(obuf_size);
- erts_refc_init(&bin->refc, 1);
obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0];
#ifdef DEBUG
obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN;
+ obuf->alloc_endp = obuf->data + size;
ASSERT(bin == ErtsDistOutputBuf2Binary(obuf));
#endif
return obuf;
@@ -641,8 +699,7 @@ free_dist_obuf(ErtsDistOutputBuf *obuf)
{
Binary *bin = ErtsDistOutputBuf2Binary(obuf);
ASSERT(obuf->dbg_pattern == ERTS_DIST_OUTPUT_BUF_DBG_PATTERN);
- if (erts_refc_dectest(&bin->refc, 0) == 0)
- erts_bin_free(bin);
+ erts_bin_release(bin);
}
static ERTS_INLINE Sint
@@ -652,26 +709,11 @@ size_obuf(ErtsDistOutputBuf *obuf)
return bin->orig_size;
}
-static void clear_dist_entry(DistEntry *dep)
+static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep)
{
- Sint obufsize = 0;
- ErtsAtomCache *cache;
- ErtsProcList *suspendees;
ErtsDistOutputBuf *obuf;
- erts_smp_de_rwlock(dep);
- cache = dep->cache;
- dep->cache = NULL;
-
-#ifdef DEBUG
- erts_smp_de_links_lock(dep);
- ASSERT(!dep->nlinks);
- ASSERT(!dep->node_links);
- ASSERT(!dep->monitors);
- erts_smp_de_links_unlock(dep);
-#endif
-
- erts_smp_mtx_lock(&dep->qlock);
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
if (!dep->out_queue.last)
obuf = dep->finalized_out_queue.first;
@@ -680,21 +722,24 @@ static void clear_dist_entry(DistEntry *dep)
obuf = dep->out_queue.first;
}
+ if (dep->tmp_out_queue.first) {
+ dep->tmp_out_queue.last->next = obuf;
+ obuf = dep->tmp_out_queue.first;
+ }
+
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
+ dep->tmp_out_queue.first = NULL;
+ dep->tmp_out_queue.last = NULL;
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
- dep->status = 0;
- suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
- erts_smp_mtx_unlock(&dep->qlock);
- erts_smp_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
- dep->send = NULL;
- erts_smp_de_rwunlock(dep);
-
- erts_resume_processes(suspendees);
+ return obuf;
+}
- delete_cache(cache);
+static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf)
+{
+ Sint obufsize = 0;
while (obuf) {
ErtsDistOutputBuf *fobuf;
@@ -705,14 +750,15 @@ static void clear_dist_entry(DistEntry *dep)
}
if (obufsize) {
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_lock(&dep->qlock);
+ ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize);
+ erts_atomic_add_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
+ erts_mtx_unlock(&dep->qlock);
}
}
-void erts_dsend_context_dtor(Binary* ctx_bin)
+int erts_dsend_context_dtor(Binary* ctx_bin)
{
ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin);
switch (ctx->dss.phase) {
@@ -727,8 +773,10 @@ void erts_dsend_context_dtor(Binary* ctx_bin)
if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) {
free_dist_obuf(ctx->dss.obuf);
}
- if (ctx->dep_to_deref)
- erts_deref_dist_entry(ctx->dep_to_deref);
+ if (ctx->deref_dep)
+ erts_deref_dist_entry(ctx->dep);
+
+ return 1;
}
Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx)
@@ -740,7 +788,7 @@ Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx)
Binary* ctx_bin = erts_create_magic_binary(sizeof(struct exported_ctx),
erts_dsend_context_dtor);
struct exported_ctx* dst = ERTS_MAGIC_BIN_DATA(ctx_bin);
- Eterm* hp = HAlloc(p, PROC_BIN_SIZE);
+ Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
sys_memcpy(&dst->ctx, ctx, sizeof(ErtsSendContext));
ASSERT(ctx->dss.ctl == make_tuple(ctx->ctl_heap));
@@ -749,7 +797,7 @@ Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx)
sys_memcpy(&dst->acm, ctx->dss.acmp, sizeof(ErtsAtomCacheMap));
dst->ctx.dss.acmp = &dst->acm;
}
- return erts_mk_magic_binary_term(&hp, &MSO(p), ctx_bin);
+ return erts_mk_magic_ref(&hp, &MSO(p), ctx_bin);
}
@@ -794,9 +842,8 @@ erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote)
}
-/* A local process that's beeing monitored by a remote one exits. We send:
- {DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason},
- which is rather sad as only the ref is needed, no pid's... */
+/* A local process that's being monitored by a remote one exits. We send:
+ {DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason} */
int
erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
Eterm ref, Eterm reason)
@@ -805,17 +852,18 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
DeclareTmpHeapNoproc(ctl_heap,6);
int res;
+ if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
+ /*
+ * Receiver does not support DOP_MONITOR_P_EXIT (see dsig_send_monitor)
+ */
+ return ERTS_DSIG_SEND_OK;
+ }
+
UseTmpHeapNoproc(6);
ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT),
watched, watcher, ref, reason);
-#ifdef DEBUG
- erts_smp_de_links_lock(dsdp->dep);
- ASSERT(!erts_lookup_monitor(dsdp->dep->monitors, ref));
- erts_smp_de_links_unlock(dsdp->dep);
-#endif
-
res = dsig_send_ctl(dsdp, ctl, 1);
UnUseTmpHeapNoproc(6);
return res;
@@ -832,6 +880,16 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
+ if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
+ /*
+ * Receiver does not support DOP_MONITOR_P.
+ * Just avoid sending it and by doing that reduce this monitor
+ * to only supervise the connection. This will work for simple c-nodes
+ * with a 1-to-1 relation between "Erlang process" and OS-process.
+ */
+ return ERTS_DSIG_SEND_OK;
+ }
+
UseTmpHeapNoproc(5);
ctl = TUPLE4(&ctl_heap[0],
make_small(DOP_MONITOR_P),
@@ -844,8 +902,7 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
/* A local process monitoring a remote one wants to stop monitoring, either
because of a demonitor bif call or because the local process died. We send
- {DOP_DEMONITOR_P, Local pid, Remote pid or name, ref}, which is once again
- rather redundant as only the ref will be needed on the other side... */
+ {DOP_DEMONITOR_P, Local pid, Remote pid or name, ref} */
int
erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
Eterm watched, Eterm ref, int force)
@@ -854,6 +911,13 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
+ if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
+ /*
+ * Receiver does not support DOP_DEMONITOR_P (see dsig_send_monitor)
+ */
+ return ERTS_DSIG_SEND_OK;
+ }
+
UseTmpHeapNoproc(5);
ctl = TUPLE4(&ctl_heap[0],
make_small(DOP_DEMONITOR_P),
@@ -864,6 +928,24 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
return res;
}
+static int can_send_seqtrace_token(ErtsSendContext* ctx, Eterm token) {
+ Eterm label;
+
+ if (ctx->dsd.flags & DFLAG_BIG_SEQTRACE_LABELS) {
+ /* The other end is capable of handling arbitrary seq_trace labels. */
+ return 1;
+ }
+
+ /* The other end only tolerates smalls, but since we could potentially be
+ * talking to an old 32-bit emulator from a 64-bit one, we have to check
+ * whether the label is small on any emulator. */
+ label = SEQ_TRACE_T_LABEL(token);
+
+ return is_small(label) &&
+ signed_val(label) <= (ERTS_SINT32_MAX >> _TAG_IMMED1_SIZE) &&
+ signed_val(label) >= (ERTS_SINT32_MIN >> _TAG_IMMED1_SIZE);
+}
+
int
erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
{
@@ -897,18 +979,38 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
"%T", remote);
msize = size_object(message);
if (have_seqtrace(token)) {
- tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
+ tok_label = SEQ_TRACE_T_DTRACE_LABEL(token);
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
}
}
#endif
- if (token != NIL)
- ctl = TUPLE4(&ctx->ctl_heap[0],
- make_small(DOP_SEND_TT), am_Empty, remote, token);
- else
- ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_SEND), am_Empty, remote);
+ {
+ Eterm dist_op, sender_id;
+ int send_token;
+
+ send_token = (token != NIL && can_send_seqtrace_token(ctx, token));
+
+ if (ctx->dsd.flags & DFLAG_SEND_SENDER) {
+ dist_op = make_small(send_token ?
+ DOP_SEND_SENDER_TT :
+ DOP_SEND_SENDER);
+ sender_id = sender->common.id;
+ } else {
+ dist_op = make_small(send_token ?
+ DOP_SEND_TT :
+ DOP_SEND);
+ sender_id = am_Empty;
+ }
+
+ if (send_token) {
+ ctl = TUPLE4(&ctx->ctl_heap[0], dist_op, sender_id, remote, token);
+ } else {
+ ctl = TUPLE3(&ctx->ctl_heap[0], dist_op, sender_id, remote);
+ }
+ }
+
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
@@ -954,19 +1056,20 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message,
"{%T,%s}", remote_name, node_name);
msize = size_object(message);
if (have_seqtrace(token)) {
- tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
+ tok_label = SEQ_TRACE_T_DTRACE_LABEL(token);
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
}
}
#endif
- if (token != NIL)
+ if (token != NIL && can_send_seqtrace_token(ctx, token))
ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_REG_SEND_TT),
sender->common.id, am_Empty, remote_name, token);
else
ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_REG_SEND),
sender->common.id, am_Empty, remote_name);
+
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
@@ -1018,7 +1121,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)),
"%T", reason);
if (have_seqtrace(token)) {
- tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
+ tok_label = SEQ_TRACE_T_DTRACE_LABEL(token);
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
}
@@ -1088,21 +1191,8 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
#include <valgrind/valgrind.h>
#include <valgrind/memcheck.h>
-#ifndef HAVE_VALGRIND_PRINTF_XML
-#define VALGRIND_PRINTF_XML VALGRIND_PRINTF
-#endif
-
# define PURIFY_MSG(msg) \
- do { \
- char buf__[1]; size_t bufsz__ = sizeof(buf__); \
- if (erts_sys_getenv_raw("VALGRIND_LOG_XML", buf__, &bufsz__) >= 0) { \
- VALGRIND_PRINTF_XML("<erlang_error_log>" \
- "%s, line %d: %s</erlang_error_log>\n", \
- __FILE__, __LINE__, msg); \
- } else { \
- VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg); \
- } \
- } while (0)
+ VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg)
#else
# define PURIFY_MSG(msg)
#endif
@@ -1119,13 +1209,13 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
int erts_net_message(Port *prt,
DistEntry *dep,
+ Uint32 conn_id,
byte *hbuf,
ErlDrvSizeT hlen,
byte *buf,
ErlDrvSizeT len)
{
ErtsDistExternal ede;
- byte *t;
Sint ctl_len;
Eterm arg;
Eterm from, to;
@@ -1141,8 +1231,6 @@ int erts_net_message(Port *prt,
Sint type;
Eterm token;
Eterm token_size;
- ErtsMonitor *mon;
- ErtsLink *lnk;
Uint tuple_arity;
int res;
#ifdef ERTS_DIST_MSG_DBG
@@ -1151,16 +1239,17 @@ int erts_net_message(Port *prt,
UseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(!prt || erts_lc_is_port_locked(prt));
if (!erts_is_alive) {
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
return 0;
}
- if (hlen != 0)
- goto data_error;
+
+ ASSERT(hlen == 0);
+
if (len == 0) { /* HANDLE TICK !!! */
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
return 0;
@@ -1171,38 +1260,31 @@ int erts_net_message(Port *prt,
bw(buf, len);
#endif
- if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)
- t = buf;
- else {
- /* Skip PASS_THROUGH */
- t = buf+1;
- len--;
- }
+ res = erts_prepare_dist_ext(&ede, buf, len, dep, conn_id, dep->cache);
- if (len == 0) {
- PURIFY_MSG("data error");
- goto data_error;
- }
-
- res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache);
-
- if (res >= 0)
- res = ctl_len = erts_decode_dist_ext_size(&ede);
- else {
+ switch (res) {
+ case ERTS_PREP_DIST_EXT_CLOSED:
+ return 0; /* Connection not alive; ignore signal... */
+ case ERTS_PREP_DIST_EXT_FAILED:
#ifdef ERTS_DIST_MSG_DBG
erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n");
bw(buf, orig_len);
#endif
- ctl_len = 0;
- }
-
- if (res < 0) {
+ goto data_error;
+ case ERTS_PREP_DIST_EXT_SUCCESS:
+ ctl_len = erts_decode_dist_ext_size(&ede);
+ if (ctl_len < 0) {
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n");
- bw(buf, orig_len);
+ erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n");
+ bw(buf, orig_len);
#endif
- PURIFY_MSG("data error");
- goto data_error;
+ PURIFY_MSG("data error");
+ goto data_error;
+ }
+ break;
+ default:
+ ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()");
+ break;
}
if (ctl_len > DIST_CTL_DEFAULT_SIZE) {
@@ -1220,10 +1302,9 @@ int erts_net_message(Port *prt,
PURIFY_MSG("data error");
goto decode_error;
}
- ctl_len = t - buf;
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, "<<%s CTL: %T\n", len != orig_len ? "P" : " ", arg);
+ erts_fprintf(stderr, "<< CTL: %T\n", arg);
#endif
if (is_not_tuple(arg) ||
@@ -1233,90 +1314,92 @@ int erts_net_message(Port *prt,
}
token_size = 0;
+ token = NIL;
switch (type = unsigned_val(tuple[1])) {
- case DOP_LINK:
+ case DOP_LINK: {
+ ErtsDSigData dsd;
+ int code;
+
if (tuple_arity != 3) {
goto invalid_message;
}
from = tuple[2];
to = tuple[3]; /* local proc to link to */
- if (is_not_pid(from) || is_not_pid(to)) {
- goto invalid_message;
- }
+ if (is_not_external_pid(from))
+ goto invalid_message;
- rp = erts_pid2proc_opt(NULL, 0,
- to, ERTS_PROC_LOCK_LINK,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- if (!rp) {
- /* This is tricky (we MUST force a distributed send) */
- ErtsDSigData dsd;
- int code;
- code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0);
- if (code == ERTS_DSIG_PREP_CONNECTED) {
- code = erts_dsig_send_exit(&dsd, to, from, am_noproc);
- ASSERT(code == ERTS_DSIG_SEND_OK);
- }
- break;
- }
+ if (dep != external_pid_dist_entry(from))
+ goto invalid_message;
- erts_smp_de_links_lock(dep);
- res = erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, from);
+ if (is_external_pid(to)) {
+ if (external_pid_dist_entry(to) != erts_this_dist_entry)
+ goto invalid_message;
+ /* old incarnation of node; reply noproc... */
+ }
+ else if (is_internal_pid(to)) {
+ ErtsLinkData *ldp = erts_link_create(ERTS_LNK_TYPE_DIST_PROC,
+ from, to);
+ ASSERT(ldp->a.other.item == to);
+ ASSERT(eq(ldp->b.other.item, from));
+#ifdef DEBUG
+ code =
+#endif
+ erts_link_dist_insert(&ldp->a, dep->mld);
+ ASSERT(code);
- if (res < 0) {
- /* It was already there! Lets skip the rest... */
- erts_smp_de_links_unlock(dep);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- break;
- }
- lnk = erts_add_or_lookup_link(&(dep->nlinks), LINK_PID, rp->common.id);
- erts_add_link(&(ERTS_LINK_ROOT(lnk)), LINK_PID, from);
- erts_smp_de_links_unlock(dep);
+ if (erts_proc_sig_send_link(NULL, to, &ldp->b))
+ break; /* done */
+
+ /* Failed to send signal; cleanup and reply noproc... */
+
+#ifdef DEBUG
+ code =
+#endif
+ erts_link_dist_delete(&ldp->a);
+ ASSERT(code);
+ erts_link_release_both(ldp);
+ }
- if (IS_TRACED_FL(rp, F_TRACE_PROCS))
- trace_proc(NULL, 0, rp, am_getting_linked, from);
+ code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0);
+ if (code == ERTS_DSIG_PREP_CONNECTED) {
+ code = erts_dsig_send_exit(&dsd, to, from, am_noproc);
+ ASSERT(code == ERTS_DSIG_SEND_OK);
+ }
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
break;
+ }
case DOP_UNLINK: {
- ErtsDistLinkData dld;
if (tuple_arity != 3) {
goto invalid_message;
}
from = tuple[2];
to = tuple[3];
- if (is_not_pid(from) || is_not_pid(to)) {
+ if (is_not_external_pid(from))
+ goto invalid_message;
+ if (dep != external_pid_dist_entry(from))
goto invalid_message;
- }
-
- rp = erts_pid2proc_opt(NULL, 0,
- to, ERTS_PROC_LOCK_LINK,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- if (!rp)
- break;
-
- lnk = erts_remove_link(&ERTS_P_LINKS(rp), from);
- if (IS_TRACED_FL(rp, F_TRACE_PROCS) && lnk != NULL) {
- trace_proc(NULL, 0, rp, am_getting_unlinked, from);
- }
+ if (is_external_pid(to)
+ && erts_this_dist_entry == external_pid_dist_entry(from))
+ break;
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ if (is_not_internal_pid(to))
+ goto invalid_message;
- erts_remove_dist_link(&dld, to, from, dep);
- erts_destroy_dist_link(&dld);
- if (lnk)
- erts_destroy_link(lnk);
+ erts_proc_sig_send_dist_unlink(dep, from, to);
break;
}
case DOP_MONITOR_P: {
/* A remote process wants to monitor us, we get:
{DOP_MONITOR_P, Remote pid, local pid or name, ref} */
- Eterm name;
-
+ Eterm pid, name;
+ ErtsDSigData dsd;
+ int code;
+
if (tuple_arity != 4) {
goto invalid_message;
}
@@ -1325,44 +1408,58 @@ int erts_net_message(Port *prt,
watched = tuple[3]; /* local proc to monitor */
ref = tuple[4];
- if (is_not_ref(ref)) {
+ if (is_not_external_pid(watcher))
+ goto invalid_message;
+ else if (external_pid_dist_entry(watcher) != dep)
+ goto invalid_message;
+
+ if (is_not_ref(ref))
goto invalid_message;
- }
- if (is_atom(watched)) {
- name = watched;
- rp = erts_whereis_process(NULL, 0,
- watched, ERTS_PROC_LOCK_LINK,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- }
- else {
- name = NIL;
- rp = erts_pid2proc_opt(NULL, 0,
- watched, ERTS_PROC_LOCK_LINK,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- }
+ if (is_internal_pid(watched)) {
+ name = NIL;
+ pid = watched;
+ }
+ else if (is_atom(watched)) {
+ name = watched;
+ pid = erts_whereis_name_to_id(NULL, watched);
+ /* if port or undefined; reply noproc... */
+ }
+ else if (is_external_pid(watched)
+ && external_pid_dist_entry(watched) == erts_this_dist_entry) {
+ name = NIL;
+ pid = am_undefined; /* old incarnation; reply noproc... */
+ }
+ else
+ goto invalid_message;
- if (!rp) {
- ErtsDSigData dsd;
- int code;
- code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0);
- if (code == ERTS_DSIG_PREP_CONNECTED) {
- code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref,
- am_noproc);
- ASSERT(code == ERTS_DSIG_SEND_OK);
- }
- }
- else {
- if (is_atom(watched))
- watched = rp->common.id;
- erts_smp_de_links_lock(dep);
- erts_add_monitor(&(dep->monitors), MON_ORIGIN, ref, watched, name);
- erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, ref, watcher, name);
- erts_smp_de_links_unlock(dep);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- }
+ if (is_internal_pid(pid)) {
+ ErtsMonitorData *mdp;
+ mdp = erts_monitor_create(ERTS_MON_TYPE_DIST_PROC,
+ ref, watcher, pid, name);
- break;
+ code = erts_monitor_dist_insert(&mdp->origin, dep->mld);
+ ASSERT(code); (void)code;
+
+ if (erts_proc_sig_send_monitor(&mdp->target, pid))
+ break; /* done */
+
+ /* Failed to send to local proc; cleanup reply noproc... */
+
+ code = erts_monitor_dist_delete(&mdp->origin);
+ ASSERT(code); (void)code;
+ erts_monitor_release_both(mdp);
+
+ }
+
+ code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0);
+ if (code == ERTS_DSIG_PREP_CONNECTED) {
+ code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref,
+ am_noproc);
+ ASSERT(code == ERTS_DSIG_SEND_OK);
+ }
+
+ break;
}
case DOP_DEMONITOR_P:
@@ -1373,36 +1470,43 @@ int erts_net_message(Port *prt,
if (tuple_arity != 4) {
goto invalid_message;
}
- /* watcher = tuple[2]; */
- /* watched = tuple[3]; May be an atom in case of monitor name */
+
+ watcher = tuple[2];
+ watched = tuple[3];
ref = tuple[4];
- if(is_not_ref(ref)) {
+ if (is_not_ref(ref)) {
goto invalid_message;
}
- erts_smp_de_links_lock(dep);
- mon = erts_remove_monitor(&(dep->monitors),ref);
- erts_smp_de_links_unlock(dep);
- /* ASSERT(mon != NULL); can happen in case of broken dist message */
- if (mon == NULL) {
- break;
- }
- watched = mon->pid;
- erts_destroy_monitor(mon);
- rp = erts_pid2proc_opt(NULL, 0,
- watched, ERTS_PROC_LOCK_LINK,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- if (!rp) {
- break;
- }
- mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
- ASSERT(mon != NULL);
- if (mon == NULL) {
- break;
- }
- erts_destroy_monitor(mon);
+ if (is_not_external_pid(watcher) || external_pid_dist_entry(watcher) != dep)
+ goto invalid_message;
+
+ if (is_internal_pid(watched))
+ erts_proc_sig_send_dist_demonitor(watched, ref);
+ else if (is_external_pid(watched)
+ && external_pid_dist_entry(watched) == erts_this_dist_entry) {
+ /* old incarnation; ignore it */
+ ;
+ }
+ else if (is_atom(watched)) {
+ ErtsMonLnkDist *mld = dep->mld;
+ ErtsMonitor *mon;
+
+ erts_mtx_lock(&mld->mtx);
+
+ mon = erts_monitor_tree_lookup(mld->orig_name_monitors, ref);
+ if (mon)
+ erts_monitor_tree_delete(&mld->orig_name_monitors, mon);
+
+ erts_mtx_unlock(&mld->mtx);
+
+ if (mon)
+ erts_proc_sig_send_demonitor(mon);
+ }
+ else
+ goto invalid_message;
+
break;
case DOP_REG_SEND_TT:
@@ -1458,42 +1562,56 @@ int erts_net_message(Port *prt,
erts_queue_dist_message(rp, locks, ede_copy, token, from);
if (locks)
- erts_smp_proc_unlock(rp, locks);
+ erts_proc_unlock(rp, locks);
}
break;
+ case DOP_SEND_SENDER_TT: {
+ Uint xsize;
case DOP_SEND_TT:
+
if (tuple_arity != 4) {
goto invalid_message;
}
-
- token_size = size_object(tuple[4]);
- /* Fall through ... */
+
+ token = tuple[4];
+ token_size = size_object(token);
+ xsize = ERTS_HEAP_FRAG_SIZE(token_size);
+ goto send_common;
+
+ case DOP_SEND_SENDER:
case DOP_SEND:
+
+ token = NIL;
+ xsize = 0;
+ if (tuple_arity != 3)
+ goto invalid_message;
+
+ send_common:
+
/*
- * There is intentionally no testing of the cookie (it is always '')
- * from R9B and onwards.
+ * If DOP_SEND_SENDER or DOP_SEND_SENDER_TT element 2 contains
+ * the sender pid (i.e. DFLAG_SEND_SENDER is set); otherwise,
+ * the atom '' (empty cookie).
*/
+ ASSERT((type == DOP_SEND_SENDER || type == DOP_SEND_SENDER_TT)
+ ? (is_pid(tuple[2]) && (dep->flags & DFLAG_SEND_SENDER))
+ : tuple[2] == am_Empty);
+
#ifdef ERTS_DIST_MSG_DBG
dist_msg_dbg(&ede, "MSG", buf, orig_len);
#endif
- if (type != DOP_SEND_TT && tuple_arity != 3) {
- goto invalid_message;
- }
to = tuple[3];
if (is_not_pid(to)) {
goto invalid_message;
}
rp = erts_proc_lookup(to);
if (rp) {
- Uint xsize = type == DOP_SEND ? 0 : ERTS_HEAP_FRAG_SIZE(token_size);
ErtsProcLocks locks = 0;
ErtsDistExternal *ede_copy;
ede_copy = erts_make_dist_ext_copy(&ede, xsize);
- if (type == DOP_SEND) {
- token = NIL;
- } else {
+ if (is_not_nil(token)) {
ErlHeapFragment *heap_frag;
ErlOffHeap *ohp;
ASSERT(xsize);
@@ -1501,82 +1619,50 @@ int erts_net_message(Port *prt,
ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
hp = heap_frag->mem;
ohp = &heap_frag->off_heap;
- token = tuple[4];
token = copy_struct(token, token_size, &hp, ohp);
}
- erts_queue_dist_message(rp, locks, ede_copy, token, tuple[2]);
+ erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty);
if (locks)
- erts_smp_proc_unlock(rp, locks);
+ erts_proc_unlock(rp, locks);
}
break;
+ }
case DOP_MONITOR_P_EXIT: {
/* We are monitoring a process on the remote node which dies, we get
{DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */
-
- DeclareTmpHeapNoproc(lhp,3);
- Eterm sysname;
- ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_MSG_SEND|ERTS_PROC_LOCK_LINK;
-
if (tuple_arity != 5) {
goto invalid_message;
}
- /* watched = tuple[2]; */ /* remote proc which died */
- /* watcher = tuple[3]; */
+ watched = tuple[2]; /* remote proc or name which died */
+ watcher = tuple[3];
ref = tuple[4];
reason = tuple[5];
- if(is_not_ref(ref)) {
+ if (is_not_ref(ref))
goto invalid_message;
- }
- erts_smp_de_links_lock(dep);
- sysname = dep->sysname;
- mon = erts_remove_monitor(&(dep->monitors), ref);
- /*
- * If demonitor was performed at the same time as the
- * monitored process exits, monitoring side will have
- * removed info about monitor. In this case, do nothing
- * and everything will be as it should.
- */
- erts_smp_de_links_unlock(dep);
- if (mon == NULL) {
- break;
- }
- rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks);
+ if (is_not_external_pid(watched) && is_not_atom(watched))
+ goto invalid_message;
- erts_destroy_monitor(mon);
- if (rp == NULL) {
- break;
- }
-
- mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref);
+ if (is_not_internal_pid(watcher)) {
+ if (!is_external_pid(watcher))
+ goto invalid_message;
+ if (erts_this_dist_entry == external_pid_dist_entry(watcher))
+ break;
+ goto invalid_message;
+ }
- if (mon == NULL) {
- erts_smp_proc_unlock(rp, rp_locks);
- break;
- }
- UseTmpHeapNoproc(3);
-
- watched = (is_not_nil(mon->name)
- ? TUPLE2(&lhp[0], mon->name, sysname)
- : mon->pid);
-
- erts_queue_monitor_message(rp, &rp_locks,
- ref, am_process, watched, reason);
- erts_smp_proc_unlock(rp, rp_locks);
- erts_destroy_monitor(mon);
- UnUseTmpHeapNoproc(3);
+ erts_proc_sig_send_dist_monitor_down(dep, ref, watched,
+ watcher, reason);
break;
}
case DOP_EXIT_TT:
case DOP_EXIT: {
- ErtsDistLinkData dld;
- ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND;
/* 'from', which 'to' is linked to, died */
if (type == DOP_EXIT) {
if (tuple_arity != 4) {
@@ -1596,56 +1682,19 @@ int erts_net_message(Port *prt,
token = tuple[4];
reason = tuple[5];
}
- if (is_not_pid(from) || is_not_internal_pid(to)) {
+ if (is_not_external_pid(from)
+ || dep != external_pid_dist_entry(from)
+ || is_not_internal_pid(to)) {
goto invalid_message;
}
- rp = erts_pid2proc(NULL, 0, to, rp_locks);
- if (!rp)
- lnk = NULL;
- else {
- lnk = erts_remove_link(&ERTS_P_LINKS(rp), from);
-
- /* If lnk == NULL, we have unlinked on this side, i.e.
- * ignore exit.
- */
- if (lnk) {
- int xres;
-#if 0
- /* Arndt: Maybe it should never be 'kill', but it can be,
- namely when a linked process does exit(kill). Until we know
- whether that is incorrect and what should happen instead,
- we leave the assertion out. */
- ASSERT(reason != am_kill); /* should never be kill (killed) */
-#endif
- xres = erts_send_exit_signal(NULL,
- from,
- rp,
- &rp_locks,
- reason,
- token,
- NULL,
- ERTS_XSIG_FLG_IGN_KILL);
- if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
- /* We didn't exit the process and it is traced */
- if (rp_locks & ERTS_PROC_LOCKS_XSIG_SEND) {
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND);
- rp_locks &= ~ERTS_PROC_LOCKS_XSIG_SEND;
- }
- trace_proc(NULL, 0, rp, am_getting_unlinked, from);
- }
- }
- erts_smp_proc_unlock(rp, rp_locks);
- }
- erts_remove_dist_link(&dld, to, from, dep);
- if (lnk)
- erts_destroy_link(lnk);
- erts_destroy_dist_link(&dld);
+ erts_proc_sig_send_dist_link_exit(dep,
+ from, to,
+ reason, token);
break;
}
case DOP_EXIT2_TT:
- case DOP_EXIT2: {
- ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
+ case DOP_EXIT2:
/* 'from' is send an exit signal to 'to' */
if (type == DOP_EXIT2) {
if (tuple_arity != 4) {
@@ -1667,20 +1716,10 @@ int erts_net_message(Port *prt,
if (is_not_pid(from) || is_not_internal_pid(to)) {
goto invalid_message;
}
- rp = erts_pid2proc(NULL, 0, to, rp_locks);
- if (rp) {
- (void) erts_send_exit_signal(NULL,
- from,
- rp,
- &rp_locks,
- reason,
- token,
- NULL,
- 0);
- erts_smp_proc_unlock(rp, rp_locks);
- }
+
+ erts_proc_sig_send_exit(NULL, from, to, reason, token, 0);
break;
- }
+
case DOP_GROUP_LEADER:
if (tuple_arity != 3) {
goto invalid_message;
@@ -1691,11 +1730,7 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- rp = erts_pid2proc(NULL, 0, to, ERTS_PROC_LOCK_MAIN);
- if (!rp)
- break;
- rp->group_leader = STORE_NC_IN_PROC(rp, from);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
+ (void) erts_proc_sig_send_group_leader(NULL, to, from, NIL);
break;
default:
@@ -1707,7 +1742,7 @@ int erts_net_message(Port *prt,
erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
}
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
return 0;
invalid_message:
{
@@ -1723,8 +1758,8 @@ decode_error:
}
data_error:
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
- erts_deliver_port_exit(prt, dep->cid, am_killed, 0, 1);
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ erts_kill_dist_connection(dep, conn_id);
+ ERTS_CHK_NO_PROC_LOCKS;
return -1;
}
@@ -1744,6 +1779,31 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
return ret;
}
+static ERTS_INLINE void
+notify_dist_data(Process *c_p, Eterm pid)
+{
+ Process *rp;
+ ErtsProcLocks rp_locks;
+
+ ASSERT(erts_get_scheduler_data()
+ && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data()));
+ ASSERT(is_internal_pid(pid));
+
+ if (c_p && c_p->common.id == pid) {
+ rp = c_p;
+ rp_locks = ERTS_PROC_LOCK_MAIN;
+ }
+ else {
+ rp = erts_proc_lookup(pid);
+ rp_locks = 0;
+ }
+
+ if (rp) {
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ erts_queue_message(rp, rp_locks, mp, am_dist_data, am_system);
+ }
+}
+
int
erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
{
@@ -1754,13 +1814,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
while (1) {
switch (ctx->phase) {
case ERTS_DSIG_SEND_PHASE_INIT:
- ctx->flags = dsdp->dep->flags;
+ ctx->flags = dsdp->flags;
ctx->c_p = dsdp->proc;
if (!ctx->c_p || dsdp->no_suspend)
ctx->force_busy = 1;
- ERTS_SMP_LC_ASSERT(!ctx->c_p
+ ERTS_LC_ASSERT(!ctx->c_p
|| (ERTS_PROC_LOCK_MAIN
== erts_proc_lc_my_proc_locks(ctx->c_p)));
@@ -1769,33 +1829,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) {
ctx->acmp = erts_get_atom_cache_map(ctx->c_p);
- ctx->pass_through_size = 0;
+ ctx->max_finalize_prepend = 0;
}
else {
ctx->acmp = NULL;
- ctx->pass_through_size = 1;
+ ctx->max_finalize_prepend = 3;
}
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl);
- if (is_value(ctx->msg))
- erts_fprintf(stderr, " MSG: %T\n", ctx->msg);
+ erts_fprintf(stderr, ">> CTL: %T\n", ctx->ctl);
+ if (is_value(ctx->msg))
+ erts_fprintf(stderr, " MSG: %T\n", ctx->msg);
#endif
- ctx->data_size = ctx->pass_through_size;
+ ctx->data_size = ctx->max_finalize_prepend;
erts_reset_atom_cache_map(ctx->acmp);
erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size);
- if (is_value(ctx->msg)) {
- ctx->u.sc.wstack.wstart = NULL;
- ctx->u.sc.flags = ctx->flags;
- ctx->u.sc.level = 0;
- ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE;
- } else {
- ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
- }
- break;
+ if (is_non_value(ctx->msg)) {
+ ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
+ break;
+ }
+ ctx->u.sc.wstack.wstart = NULL;
+ ctx->u.sc.flags = ctx->flags;
+ ctx->u.sc.level = 0;
+ ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE;
case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
retval = ERTS_DSIG_SEND_CONTINUE;
@@ -1810,23 +1869,25 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->data_size += ctx->dhdr_ext_size;
ctx->obuf = alloc_dist_obuf(ctx->data_size);
- ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->pass_through_size + ctx->dhdr_ext_size;
+ ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size;
/* Encode internal version of dist header */
ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp);
/* Encode control message */
erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL);
- if (is_value(ctx->msg)) {
- ctx->u.ec.flags = ctx->flags;
- ctx->u.ec.level = 0;
- ctx->u.ec.wstack.wstart = NULL;
- ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE;
- } else {
- ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
- }
- break;
-
- case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
+ if (is_non_value(ctx->msg)) {
+ ctx->obuf->msg_start = NULL;
+ ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
+ break;
+ }
+ ctx->u.ec.flags = ctx->flags;
+ ctx->u.ec.hopefull_flags = 0;
+ ctx->u.ec.level = 0;
+ ctx->u.ec.wstack.wstart = NULL;
+ ctx->obuf->msg_start = ctx->obuf->ext_endp;
+
+ ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE;
+ case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) {
retval = ERTS_DSIG_SEND_CONTINUE;
goto done;
@@ -1839,38 +1900,59 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
int resume = 0;
ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp);
- ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->pass_through_size);
+ ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend);
ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size);
ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp;
+ ctx->obuf->hopefull_flags = ctx->u.ec.hopefull_flags;
/*
* Signal encoded; now verify that the connection still exists,
* and if so enqueue the signal and schedule it for send.
*/
ctx->obuf->next = NULL;
- erts_smp_de_rlock(dep);
+ erts_de_rlock(dep);
cid = dep->cid;
- if (cid != dsdp->cid
- || dep->connection_id != dsdp->connection_id
- || dep->status & ERTS_DE_SFLG_EXITING) {
+ if (dep->state == ERTS_DE_STATE_EXITING
+ || dep->state == ERTS_DE_STATE_IDLE
+ || dep->connection_id != dsdp->connection_id) {
/* Not the same connection as when we started; drop message... */
- erts_smp_de_runlock(dep);
+ erts_de_runlock(dep);
free_dist_obuf(ctx->obuf);
}
else {
+ Sint qsize;
+ erts_aint32_t qflgs;
ErtsProcList *plp = NULL;
- erts_smp_mtx_lock(&dep->qlock);
- dep->qsize += size_obuf(ctx->obuf);
- if (dep->qsize >= erts_dist_buf_busy_limit)
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- if (!ctx->force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) {
- erts_smp_mtx_unlock(&dep->qlock);
+ Eterm notify_proc = NIL;
+ Sint obsz = size_obuf(ctx->obuf);
+
+ erts_mtx_lock(&dep->qlock);
+ qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz);
+ ASSERT(qsize >= obsz);
+ qflgs = erts_atomic32_read_nob(&dep->qflgs);
+ if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) {
+ erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY);
+ qflgs |= ERTS_DE_QFLG_BUSY;
+ }
+ if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) {
+ /* Previously empty queue and info requested... */
+ qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
+ ~ERTS_DE_QFLG_REQ_INFO);
+ if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
+ notify_proc = dep->cid;
+ ASSERT(is_internal_pid(notify_proc));
+ }
+ /* else: requester will send itself the message... */
+ qflgs &= ~ERTS_DE_QFLG_REQ_INFO;
+ }
+ if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) {
+ erts_mtx_unlock(&dep->qlock);
plp = erts_proclist_create(ctx->c_p);
erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL);
suspended = 1;
- erts_smp_mtx_lock(&dep->qlock);
+ erts_mtx_lock(&dep->qlock);
}
/* Enqueue obuf on dist entry */
@@ -1881,7 +1963,8 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
dep->out_queue.last = ctx->obuf;
if (!ctx->force_busy) {
- if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) {
+ qflgs = erts_atomic32_read_nob(&dep->qflgs);
+ if (!(qflgs & ERTS_DE_QFLG_BUSY)) {
if (suspended)
resume = 1; /* was busy when we started, but isn't now */
#ifdef USE_VM_PROBES
@@ -1905,9 +1988,17 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
}
}
- erts_smp_mtx_unlock(&dep->qlock);
- erts_schedule_dist_command(NULL, dep);
- erts_smp_de_runlock(dep);
+ erts_mtx_unlock(&dep->qlock);
+ if (dep->state != ERTS_DE_STATE_PENDING) {
+ if (is_internal_port(dep->cid))
+ erts_schedule_dist_command(NULL, dep);
+ }
+ else {
+ notify_proc = NIL;
+ }
+ erts_de_runlock(dep);
+ if (is_internal_pid(notify_proc))
+ notify_dist_data(ctx->c_p, notify_proc);
if (resume) {
erts_resume(ctx->c_p, ERTS_PROC_LOCK_MAIN);
@@ -1961,35 +2052,39 @@ static Uint
dist_port_command(Port *prt, ErtsDistOutputBuf *obuf)
{
int fpe_was_unmasked;
- Uint size = obuf->ext_endp - obuf->extp;
+ ErlDrvSizeT size;
+ char *bufp;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (size > (Uint) INT_MAX)
- erts_exit(ERTS_DUMP_EXIT,
- "Absurdly large distribution output data buffer "
- "(%beu bytes) passed.\n",
- size);
+ if (!obuf) {
+ size = 0;
+ bufp = NULL;
+ }
+ else {
+ size = obuf->ext_endp - obuf->extp;
+ bufp = (char*) obuf->extp;
+ }
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(dist_output)) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
DTRACE_CHARBUF(port_str, 64);
DTRACE_CHARBUF(remote_str, 64);
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
"%T", prt->common.id);
erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", prt->dist_entry->sysname);
+ "%T", dep->sysname);
DTRACE4(dist_output, erts_this_node_sysname, port_str,
remote_str, size);
}
#endif
+
prt->caller = NIL;
fpe_was_unmasked = erts_block_fpe();
- (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data,
- (char*) obuf->extp,
- (int) size);
+ (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, bufp, size);
erts_unblock_fpe(fpe_was_unmasked);
return size;
}
@@ -1998,44 +2093,53 @@ static Uint
dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
{
int fpe_was_unmasked;
- Uint size = obuf->ext_endp - obuf->extp;
+ ErlDrvSizeT size;
SysIOVec iov[2];
ErlDrvBinary* bv[2];
ErlIOVec eiov;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-
- if (size > (Uint) INT_MAX)
- erts_exit(ERTS_DUMP_EXIT,
- "Absurdly large distribution output data buffer "
- "(%beu bytes) passed.\n",
- size);
+ ERTS_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
iov[0].iov_base = NULL;
iov[0].iov_len = 0;
bv[0] = NULL;
- iov[1].iov_base = obuf->extp;
- iov[1].iov_len = size;
- bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf));
+ if (!obuf) {
+ size = 0;
+ eiov.vsize = 1;
+ }
+ else {
+ size = obuf->ext_endp - obuf->extp;
+ eiov.vsize = 2;
+
+ iov[1].iov_base = obuf->extp;
+ iov[1].iov_len = size;
+ bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf));
+ }
- eiov.vsize = 2;
eiov.size = size;
eiov.iov = iov;
eiov.binv = bv;
+ if (size > (Uint) INT_MAX)
+ erts_exit(ERTS_DUMP_EXIT,
+ "Absurdly large distribution output data buffer "
+ "(%beu bytes) passed.\n",
+ size);
+
ASSERT(prt->drv_ptr->outputv);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(dist_outputv)) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
DTRACE_CHARBUF(port_str, 64);
DTRACE_CHARBUF(remote_str, 64);
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
"%T", prt->common.id);
erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", prt->dist_entry->sysname);
+ "%T", dep->sysname);
DTRACE4(dist_outputv, erts_this_node_sysname, port_str,
remote_str, size);
}
@@ -2058,7 +2162,6 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
#endif
#define ERTS_PORT_REDS_DIST_CMD_START 5
-#define ERTS_PORT_REDS_DIST_CMD_FINALIZE 3
#define ERTS_PORT_REDS_DIST_CMD_EXIT 200
#define ERTS_PORT_REDS_DIST_CMD_RESUMED 5
#define ERTS_PORT_REDS_DIST_CMD_DATA(SZ) \
@@ -2067,37 +2170,36 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
: ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__)))
int
-erts_dist_command(Port *prt, int reds_limit)
+erts_dist_command(Port *prt, int initial_reds)
{
- Sint reds = ERTS_PORT_REDS_DIST_CMD_START;
- Uint32 status;
+ Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START;
+ enum dist_entry_state state;
Uint32 flags;
- Sint obufsize = 0;
+ Sint qsize, obufsize = 0;
ErtsDistOutputQueue oq, foq;
- DistEntry *dep = prt->dist_entry;
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
erts_aint32_t sched_flags;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-
- erts_refc_inc(&dep->refc, 1); /* Otherwise dist_entry might be
- removed if port command fails */
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
- erts_smp_atomic_set_mb(&dep->dist_cmd_scheduled, 0);
+ erts_atomic_set_mb(&dep->dist_cmd_scheduled, 0);
- erts_smp_de_rlock(dep);
+ erts_de_rlock(dep);
flags = dep->flags;
- status = dep->status;
+ state = dep->state;
send = dep->send;
- erts_smp_de_runlock(dep);
+ erts_de_runlock(dep);
- if (status & ERTS_DE_SFLG_EXITING) {
+ if (state == ERTS_DE_STATE_EXITING) {
erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1);
- erts_deref_dist_entry(dep);
- return reds + ERTS_PORT_REDS_DIST_CMD_EXIT;
+ reds -= ERTS_PORT_REDS_DIST_CMD_EXIT;
+ return initial_reds - reds;
}
+ ASSERT(state != ERTS_DE_STATE_PENDING);
+
ASSERT(send);
/*
@@ -2108,42 +2210,42 @@ erts_dist_command(Port *prt, int reds_limit)
* a mess.
*/
- erts_smp_mtx_lock(&dep->qlock);
+ erts_mtx_lock(&dep->qlock);
oq.first = dep->out_queue.first;
oq.last = dep->out_queue.last;
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_unlock(&dep->qlock);
foq.first = dep->finalized_out_queue.first;
foq.last = dep->finalized_out_queue.last;
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
- sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
- if (reds > reds_limit)
+ if (reds < 0)
goto preempted;
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) {
int preempt = 0;
do {
- Uint size;
- ErtsDistOutputBuf *fob;
-
- size = (*send)(prt, foq.first);
- esdp->io.out += (Uint64) size;
+ Uint size;
+ ErtsDistOutputBuf *fob;
+ size = (*send)(prt, foq.first);
+ erts_atomic64_inc_nob(&dep->out);
+ esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
- erts_fprintf(stderr, ">> ");
- bw(foq.first->extp, size);
+ erts_fprintf(stderr, ">> ");
+ bw(foq.first->extp, size);
#endif
- reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
+ reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size);
fob = foq.first;
obufsize += size_obuf(fob);
foq.first = foq.first->next;
free_dist_obuf(fob);
- sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
- preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
+ sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
+ preempt = reds < 0 || (sched_flags & ERTS_PTS_FLG_EXIT);
if (sched_flags & ERTS_PTS_FLG_BUSY_PORT)
break;
} while (foq.first && !preempt);
@@ -2156,79 +2258,71 @@ erts_dist_command(Port *prt, int reds_limit)
if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) {
if (oq.first) {
ErtsDistOutputBuf *ob;
- int preempt;
+ ErtsDistOutputBuf *last_finalized = NULL;
finalize_only:
- preempt = 0;
ob = oq.first;
ASSERT(ob);
do {
- ob->extp = erts_encode_ext_dist_header_finalize(ob->extp,
- dep->cache,
- flags);
- if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
- *--ob->extp = PASS_THROUGH; /* Old node; 'pass through'
- needed */
- ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp);
- reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
- preempt = reds > reds_limit;
- if (preempt)
- break;
- ob = ob->next;
+ reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds);
+ if (reds < 0)
+ break;
+ last_finalized = ob;
+ ob = ob->next;
} while (ob);
- /*
- * At least one buffer was finalized; if we got preempted,
- * ob points to the last buffer that we finalized.
- */
- if (foq.last)
- foq.last->next = oq.first;
- else
- foq.first = oq.first;
- if (!preempt) {
- /* All buffers finalized */
- foq.last = oq.last;
- oq.first = oq.last = NULL;
- }
- else {
- /* Not all buffers finalized; split oq. */
- foq.last = ob;
- oq.first = ob->next;
- if (oq.first)
- ob->next = NULL;
- else
- oq.last = NULL;
- }
- if (preempt)
- goto preempted;
+ if (last_finalized) {
+ /*
+ * At least one buffer was finalized; if we got preempted,
+ * ob points to the next buffer to continue finalize.
+ */
+ if (foq.last)
+ foq.last->next = oq.first;
+ else
+ foq.first = oq.first;
+ foq.last = last_finalized;
+ if (!ob) {
+ /* All buffers finalized */
+ ASSERT(foq.last == oq.last);
+ ASSERT(foq.last->next == NULL);
+ oq.first = oq.last = NULL;
+ }
+ else {
+ /* Not all buffers finalized; split oq. */
+ ASSERT(foq.last->next == ob);
+ foq.last->next = NULL;
+ oq.first = ob;
+ }
+ }
+ if (reds <= 0)
+ goto preempted;
}
}
else {
+ int de_busy;
int preempt = 0;
while (oq.first && !preempt) {
ErtsDistOutputBuf *fob;
Uint size;
- oq.first->extp
- = erts_encode_ext_dist_header_finalize(oq.first->extp,
- dep->cache,
- flags);
- reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
- if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
- *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through'
- needed */
+ reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds);
+ if (reds < 0) {
+ preempt = 1;
+ break;
+ }
ASSERT(&oq.first->data[0] <= oq.first->extp
- && oq.first->extp < oq.first->ext_endp);
+ && oq.first->extp <= oq.first->ext_endp);
size = (*send)(prt, oq.first);
+ erts_atomic64_inc_nob(&dep->out);
esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
- erts_fprintf(stderr, ">> ");
- bw(oq.first->extp, size);
+ erts_fprintf(stderr, ">> ");
+ bw(oq.first->extp, size);
#endif
- reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
+ reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size);
fob = oq.first;
obufsize += size_obuf(fob);
oq.first = oq.first->next;
free_dist_obuf(fob);
- sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
- preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
+ sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
+ preempt = reds <= 0 || (sched_flags & ERTS_PTS_FLG_EXIT);
if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt)
goto finalize_only;
}
@@ -2254,23 +2348,24 @@ erts_dist_command(Port *prt, int reds_limit)
* dist entry in a non-busy state and resume suspended
* processes.
*/
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
+ erts_mtx_lock(&dep->qlock);
+ de_busy = !!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY);
+ qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
+ ASSERT(qsize >= 0);
obufsize = 0;
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
- && (dep->qflgs & ERTS_DE_QFLG_BUSY)
- && dep->qsize < erts_dist_buf_busy_limit) {
+ && de_busy && qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
int resumed;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_unlock(&dep->qlock);
resumed = erts_resume_processes(suspendees);
- reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
+ reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
}
else
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_unlock(&dep->qlock);
}
ASSERT(!oq.first && !oq.last);
@@ -2279,14 +2374,18 @@ erts_dist_command(Port *prt, int reds_limit)
if (obufsize != 0) {
ASSERT(obufsize > 0);
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_lock(&dep->qlock);
+#ifdef DEBUG
+ qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
+ ASSERT(qsize >= 0);
+#else
+ erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize);
+#endif
+ erts_mtx_unlock(&dep->qlock);
}
- ASSERT(foq.first || !foq.last);
- ASSERT(!foq.first || foq.last);
+ ASSERT(!!foq.first == !!foq.last);
ASSERT(!dep->finalized_out_queue.first);
ASSERT(!dep->finalized_out_queue.last);
@@ -2296,12 +2395,10 @@ erts_dist_command(Port *prt, int reds_limit)
}
/* Avoid wrapping reduction counter... */
- if (reds > INT_MAX/2)
- reds = INT_MAX/2;
-
- erts_deref_dist_entry(dep);
+ if (reds < INT_MIN/2)
+ reds = INT_MIN/2;
- return reds;
+ return initial_reds - reds;
preempted:
/*
@@ -2309,8 +2406,7 @@ erts_dist_command(Port *prt, int reds_limit)
* since last call to driver.
*/
- ASSERT(oq.first || !oq.last);
- ASSERT(!oq.first || oq.last);
+ ASSERT(!!oq.first == !!oq.last);
if (sched_flags & ERTS_PTS_FLG_EXIT) {
/*
@@ -2334,12 +2430,6 @@ erts_dist_command(Port *prt, int reds_limit)
foq.first = NULL;
foq.last = NULL;
-
-#ifdef DEBUG
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize == obufsize);
- erts_smp_mtx_unlock(&dep->qlock);
-#endif
}
else {
if (oq.first) {
@@ -2347,14 +2437,14 @@ erts_dist_command(Port *prt, int reds_limit)
* Unhandle buffers need to be put back first
* in out_queue.
*/
- erts_smp_mtx_lock(&dep->qlock);
- dep->qsize -= obufsize;
+ erts_mtx_lock(&dep->qlock);
+ erts_atomic_add_nob(&dep->qsize, -obufsize);
obufsize = 0;
oq.last->next = dep->out_queue.first;
dep->out_queue.first = oq.first;
if (!dep->out_queue.last)
dep->out_queue.last = oq.last;
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_unlock(&dep->qlock);
}
erts_schedule_dist_command(prt, NULL);
@@ -2362,18 +2452,406 @@ erts_dist_command(Port *prt, int reds_limit)
goto done;
}
+#if 0
+
+int
+dist_data_finalize(Process *c_p, int reds_limit)
+{
+ int reds = 5;
+ DistEntry *dep = ;
+ ErtsDistOutputQueue oq, foq;
+ ErtsDistOutputBuf *ob;
+ int preempt;
+
+
+ erts_mtx_lock(&dep->qlock);
+ flags = dep->flags;
+ oq.first = dep->out_queue.first;
+ oq.last = dep->out_queue.last;
+ dep->out_queue.first = NULL;
+ dep->out_queue.last = NULL;
+ erts_mtx_unlock(&dep->qlock);
+
+ if (!oq.first) {
+ ASSERT(!oq.last);
+ oq.first = dep->tmp_out_queue.first;
+ oq.last = dep->tmp_out_queue.last;
+ }
+ else {
+ ErtsDistOutputBuf *f, *l;
+ ASSERT(oq.last);
+ if (dep->tmp_out_queue.last) {
+ dep->tmp_out_queue.last->next = oq.first;
+ oq.first = dep->tmp_out_queue.first;
+ }
+ }
+
+ if (!oq.first) {
+ /* Nothing to do... */
+ ASSERT(!oq.last);
+ return reds;
+ }
+
+ foq.first = dep->finalized_out_queue.first;
+ foq.last = dep->finalized_out_queue.last;
+
+ preempt = 0;
+ ob = oq.first;
+ ASSERT(ob);
+
+ do {
+ ob->extp = erts_encode_ext_dist_header_finalize(ob->extp,
+ dep->cache,
+ flags);
+ if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
+ *--ob->extp = PASS_THROUGH; /* Old node; 'pass through'
+ needed */
+ ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp);
+ reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
+ preempt = reds > reds_limit;
+ if (preempt)
+ break;
+ ob = ob->next;
+ } while (ob);
+ /*
+ * At least one buffer was finalized; if we got preempted,
+ * ob points to the last buffer that we finalized.
+ */
+ if (foq.last)
+ foq.last->next = oq.first;
+ else
+ foq.first = oq.first;
+ if (!preempt) {
+ /* All buffers finalized */
+ foq.last = oq.last;
+ oq.first = oq.last = NULL;
+ }
+ else {
+ /* Not all buffers finalized; split oq. */
+ foq.last = ob;
+ oq.first = ob->next;
+ if (oq.first)
+ ob->next = NULL;
+ else
+ oq.last = NULL;
+ }
+
+ dep->finalized_out_queue.first = foq.first;
+ dep->finalized_out_queue.last = foq.last;
+ dep->tmp_out_queue.first = oq.first;
+ dep->tmp_out_queue.last = oq.last;
+
+ return reds;
+}
+
+#endif
+
+BIF_RETTYPE
+dist_ctrl_get_data_notification_1(BIF_ALIST_1)
+{
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
+ erts_aint32_t qflgs;
+ erts_aint_t qsize;
+ Eterm receiver = NIL;
+ Uint32 conn_id;
+
+ if (!dep)
+ BIF_ERROR(BIF_P, EXC_NOTSUP);
+
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ /*
+ * Caller is the only one that can consume from this queue
+ * and the only one that can set the req-info flag...
+ */
+
+ erts_de_rlock(dep);
+
+ if (dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
+
+ ASSERT(dep->cid == BIF_P->common.id);
+
+ qflgs = erts_atomic32_read_acqb(&dep->qflgs);
+
+ if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) {
+ qsize = erts_atomic_read_acqb(&dep->qsize);
+ ASSERT(qsize >= 0);
+ if (qsize > 0)
+ receiver = BIF_P->common.id; /* Notify ourselves... */
+ else { /* Empty queue; set req-info flag... */
+ qflgs = erts_atomic32_read_bor_mb(&dep->qflgs,
+ ERTS_DE_QFLG_REQ_INFO);
+ qsize = erts_atomic_read_acqb(&dep->qsize);
+ ASSERT(qsize >= 0);
+ if (qsize > 0) {
+ qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
+ ~ERTS_DE_QFLG_REQ_INFO);
+ if (qflgs & ERTS_DE_QFLG_REQ_INFO)
+ receiver = BIF_P->common.id; /* Notify ourselves... */
+ /* else: someone else will notify us... */
+ }
+ /* else: still empty queue... */
+ }
+ }
+ /* else: Already requested... */
+
+ erts_de_runlock(dep);
+
+ if (is_internal_pid(receiver))
+ notify_dist_data(BIF_P, receiver);
+
+ BIF_RET(am_ok);
+}
+
+BIF_RETTYPE
+dist_ctrl_put_data_2(BIF_ALIST_2)
+{
+ DistEntry *dep;
+ ErlDrvSizeT size;
+ Eterm input_handler;
+ Uint32 conn_id;
+
+ if (is_binary(BIF_ARG_2))
+ size = binary_size(BIF_ARG_2);
+ else if (is_nil(BIF_ARG_2))
+ size = 0;
+ else if (is_list(BIF_ARG_2))
+ BIF_TRAP2(dist_ctrl_put_data_trap,
+ BIF_P, BIF_ARG_1, BIF_ARG_2);
+ else
+ BIF_ERROR(BIF_P, BADARG);
+
+ dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id);
+ if (!dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ input_handler = (Eterm) erts_atomic_read_nob(&dep->input_handler);
+
+ if (input_handler != BIF_P->common.id)
+ BIF_ERROR(BIF_P, EXC_NOTSUP);
+
+ erts_atomic64_inc_nob(&dep->in);
+
+ if (size != 0) {
+ byte *data, *temp_alloc = NULL;
+
+ data = (byte *) erts_get_aligned_binary_bytes(BIF_ARG_2, &temp_alloc);
+ if (!data)
+ BIF_ERROR(BIF_P, BADARG);
+
+ erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
+
+ (void) erts_net_message(NULL, dep, conn_id, NULL, 0, data, size);
+ /*
+ * We ignore any decode failures. On fatal failures the
+ * connection will be taken down by killing the
+ * distribution channel controller...
+ */
+
+ erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
+
+ BUMP_REDS(BIF_P, 5);
+
+ erts_free_aligned_binary_bytes(temp_alloc);
+
+ }
+
+ BIF_RET(am_ok);
+}
+
+BIF_RETTYPE
+dist_get_stat_1(BIF_ALIST_1)
+{
+ Sint64 read, write, pend;
+ Eterm res, *hp, **hpp;
+ Uint sz, *szp;
+ Uint32 conn_id;
+ DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id);
+
+ if (!dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ erts_de_rlock(dep);
+
+ if (dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
+ read = (Sint64) erts_atomic64_read_nob(&dep->in);
+ write = (Sint64) erts_atomic64_read_nob(&dep->out);
+ pend = (Sint64) erts_atomic_read_nob(&dep->qsize);
+
+ erts_de_runlock(dep);
+
+ sz = 0;
+ szp = &sz;
+ hpp = NULL;
+
+ while (1) {
+ res = erts_bld_tuple(hpp, szp, 4,
+ am_ok,
+ erts_bld_sint64(hpp, szp, read),
+ erts_bld_sint64(hpp, szp, write),
+ pend ? am_true : am_false);
+ if (hpp)
+ break;
+ hp = HAlloc(BIF_P, sz);
+ hpp = &hp;
+ szp = NULL;
+ }
+
+ BIF_RET(res);
+}
+
+BIF_RETTYPE
+dist_ctrl_input_handler_2(BIF_ALIST_2)
+{
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
+ Uint32 conn_id;
+
+ if (!dep)
+ BIF_ERROR(BIF_P, EXC_NOTSUP);
+
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ if (is_not_internal_pid(BIF_ARG_2))
+ BIF_ERROR(BIF_P, BADARG);
+
+ erts_de_rlock(dep);
+ if (dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
+ erts_atomic_set_nob(&dep->input_handler,
+ (erts_aint_t) BIF_ARG_2);
+ erts_de_runlock(dep);
+ BIF_RET(am_ok);
+}
+
+BIF_RETTYPE
+dist_ctrl_get_data_1(BIF_ALIST_1)
+{
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
+ const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P);
+ Sint reds = initial_reds;
+ ErtsDistOutputBuf *obuf;
+ Eterm *hp;
+ ProcBin *pb;
+ erts_aint_t qsize;
+ Uint32 conn_id;
+
+ if (!dep)
+ BIF_ERROR(BIF_P, EXC_NOTSUP);
+
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ erts_de_rlock(dep);
+
+ if (dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
+
+ if (dep->state == ERTS_DE_STATE_EXITING)
+ goto return_none;
+
+ ASSERT(dep->cid == BIF_P->common.id);
+
+#if 0
+ if (dep->finalized_out_queue.first) {
+ obuf = dep->finalized_out_queue.first;
+ dep->finalized_out_queue.first = obuf->next;
+ if (!obuf->next)
+ dep->finalized_out_queue.last = NULL;
+ }
+ else
+#endif
+ {
+ if (!dep->tmp_out_queue.first) {
+ ASSERT(!dep->tmp_out_queue.last);
+ ASSERT(!dep->transcode_ctx);
+ qsize = erts_atomic_read_acqb(&dep->qsize);
+ if (qsize > 0) {
+ erts_mtx_lock(&dep->qlock);
+ dep->tmp_out_queue.first = dep->out_queue.first;
+ dep->tmp_out_queue.last = dep->out_queue.last;
+ dep->out_queue.first = NULL;
+ dep->out_queue.last = NULL;
+ erts_mtx_unlock(&dep->qlock);
+ }
+ }
+
+ if (!dep->tmp_out_queue.first) {
+ ASSERT(!dep->tmp_out_queue.last);
+ return_none:
+ erts_de_runlock(dep);
+ BIF_RET(am_none);
+ }
+
+ obuf = dep->tmp_out_queue.first;
+ reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds);
+ if (reds < 0) {
+ erts_de_runlock(dep);
+ ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1],
+ BIF_P, BIF_ARG_1);
+ }
+
+ dep->tmp_out_queue.first = obuf->next;
+ if (!obuf->next)
+ dep->tmp_out_queue.last = NULL;
+ }
+
+ erts_atomic64_inc_nob(&dep->out);
+
+ erts_de_runlock(dep);
+
+ hp = HAlloc(BIF_P, PROC_BIN_SIZE);
+ pb = (ProcBin *) (char *) hp;
+ pb->thing_word = HEADER_PROC_BIN;
+ pb->size = obuf->ext_endp - obuf->extp;
+ pb->next = MSO(BIF_P).first;
+ MSO(BIF_P).first = (struct erl_off_heap_header*) pb;
+ pb->val = ErtsDistOutputBuf2Binary(obuf);
+ pb->bytes = (byte*) obuf->extp;
+ pb->flags = 0;
+
+ qsize = erts_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf));
+ ASSERT(qsize >= 0);
+
+ if (qsize < erts_dist_buf_busy_limit/2
+ && (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) {
+ ErtsProcList *resume_procs = NULL;
+ erts_mtx_lock(&dep->qlock);
+ resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
+ erts_mtx_unlock(&dep->qlock);
+ if (resume_procs) {
+ int resumed = erts_resume_processes(resume_procs);
+ reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
+ }
+ }
+
+ BIF_RET2(make_binary(pb), (initial_reds - reds));
+}
+
void
erts_dist_port_not_busy(Port *prt)
{
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(dist_port_not_busy)) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
DTRACE_CHARBUF(port_str, 64);
DTRACE_CHARBUF(remote_str, 64);
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
"%T", prt->common.id);
erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", prt->dist_entry->sysname);
+ "%T", dep->sysname);
DTRACE3(dist_port_not_busy, erts_this_node_sysname,
port_str, remote_str);
}
@@ -2381,86 +2859,93 @@ erts_dist_port_not_busy(Port *prt)
erts_schedule_dist_command(prt, NULL);
}
-void
-erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
+static void kill_connection(DistEntry *dep)
{
- erts_smp_de_rwlock(dep);
- if (is_internal_port(dep->cid)
- && connection_id == dep->connection_id
- && !(dep->status & ERTS_DE_SFLG_EXITING)) {
-
- dep->status |= ERTS_DE_SFLG_EXITING;
+ ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep));
+ ASSERT(dep->state == ERTS_DE_STATE_CONNECTED);
+
+ dep->state = ERTS_DE_STATE_EXITING;
+ erts_mtx_lock(&dep->qlock);
+ ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
+ erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT);
+ erts_mtx_unlock(&dep->qlock);
+
+ if (is_internal_port(dep->cid))
+ erts_schedule_dist_command(NULL, dep);
+ else if (is_internal_pid(dep->cid))
+ schedule_kill_dist_ctrl_proc(dep->cid);
+}
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
- dep->qflgs |= ERTS_DE_QFLG_EXIT;
- erts_smp_mtx_unlock(&dep->qlock);
+void
+erts_kill_dist_connection(DistEntry *dep, Uint32 conn_id)
+{
+ erts_de_rwlock(dep);
+ if (conn_id == dep->connection_id
+ && dep->state == ERTS_DE_STATE_CONNECTED) {
- erts_schedule_dist_command(NULL, dep);
+ kill_connection(dep);
}
- erts_smp_de_rwunlock(dep);
+ erts_de_rwunlock(dep);
}
struct print_to_data {
- int to;
+ fmtfn_t to;
void *arg;
};
static void doit_print_monitor_info(ErtsMonitor *mon, void *vptdp)
{
- int to = ((struct print_to_data *) vptdp)->to;
+ fmtfn_t to = ((struct print_to_data *) vptdp)->to;
void *arg = ((struct print_to_data *) vptdp)->arg;
- Process *rp;
- ErtsMonitor *rmon;
- rp = erts_proc_lookup(mon->pid);
- if (!rp || (rmon = erts_lookup_monitor(ERTS_P_MONITORS(rp), mon->ref)) == NULL) {
- erts_print(to, arg, "Warning, stray monitor for: %T\n", mon->pid);
- } else if (mon->type == MON_ORIGIN) {
- /* Local pid is being monitored */
+ ErtsMonitorDataExtended *mdep;
+
+ ASSERT(mon->flags & ERTS_ML_FLG_EXTENDED);
+
+ mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon);
+
+ ASSERT(mdep->dist);
+
+ if (erts_monitor_is_origin(mon)) {
erts_print(to, arg, "Remotely monitored by: %T %T\n",
- mon->pid, rmon->pid);
- } else {
- erts_print(to, arg, "Remote monitoring: %T ", mon->pid);
- if (is_not_atom(rmon->pid))
- erts_print(to, arg, "%T\n", rmon->pid);
- else
- erts_print(to, arg, "{%T, %T}\n",
- rmon->name,
- rmon->pid); /* which in this case is the
- remote system name... */
+ mon->other.item, mdep->md.target.other.item);
+ }
+ else {
+ erts_print(to, arg, "Remote monitoring: %T ", mon->other.item);
+ if (mon->flags & ERTS_ML_FLG_NAME)
+ erts_print(to, arg, "{%T, %T}\n", mdep->u.name, mdep->dist->nodename);
+ else
+ erts_print(to, arg, "%T\n", mdep->md.origin.other.item);
}
}
-static void print_monitor_info(int to, void *arg, ErtsMonitor *mon)
+static void print_monitor_info(fmtfn_t to, void *arg, DistEntry *dep)
{
struct print_to_data ptd = {to, arg};
- erts_doforall_monitors(mon,&doit_print_monitor_info,&ptd);
-}
-
-typedef struct {
- struct print_to_data *ptdp;
- Eterm from;
-} PrintLinkContext;
-
-static void doit_print_link_info2(ErtsLink *lnk, void *vpplc)
-{
- PrintLinkContext *pplc = (PrintLinkContext *) vpplc;
- erts_print(pplc->ptdp->to, pplc->ptdp->arg, "Remote link: %T %T\n",
- pplc->from, lnk->pid);
+ if (dep->mld) {
+ erts_monitor_list_foreach(dep->mld->monitors,
+ doit_print_monitor_info,
+ (void *) &ptd);
+ erts_monitor_tree_foreach(dep->mld->orig_name_monitors,
+ doit_print_monitor_info,
+ (void *) &ptd);
+ }
}
static void doit_print_link_info(ErtsLink *lnk, void *vptdp)
{
- if (is_internal_pid(lnk->pid) && erts_proc_lookup(lnk->pid)) {
- PrintLinkContext plc = {(struct print_to_data *) vptdp, lnk->pid};
- erts_doforall_links(ERTS_LINK_ROOT(lnk), &doit_print_link_info2, &plc);
- }
+ struct print_to_data *ptdp = vptdp;
+ ErtsLink *lnk2 = erts_link_to_other(lnk, NULL);
+ erts_print(ptdp->to, ptdp->arg, "Remote link: %T %T\n",
+ lnk2->other.item, lnk->other.item);
}
-static void print_link_info(int to, void *arg, ErtsLink *lnk)
+static void print_link_info(fmtfn_t to, void *arg, DistEntry *dep)
{
struct print_to_data ptd = {to, arg};
- erts_doforall_links(lnk, &doit_print_link_info, (void *) &ptd);
+ if (dep->mld)
+ erts_link_list_foreach(dep->mld->links,
+ doit_print_link_info,
+ (void *) &ptd);
}
typedef struct {
@@ -2468,25 +2953,8 @@ typedef struct {
Eterm sysname;
} PrintNodeLinkContext;
-
-static void doit_print_nodelink_info(ErtsLink *lnk, void *vpcontext)
-{
- PrintNodeLinkContext *pcontext = vpcontext;
-
- if (is_internal_pid(lnk->pid) && erts_proc_lookup(lnk->pid))
- erts_print(pcontext->ptd.to, pcontext->ptd.arg,
- "Remote monitoring: %T %T\n", lnk->pid, pcontext->sysname);
-}
-
-static void print_nodelink_info(int to, void *arg, ErtsLink *lnk, Eterm sysname)
-{
- PrintNodeLinkContext context = {{to, arg}, sysname};
- erts_doforall_links(lnk, &doit_print_nodelink_info, &context);
-}
-
-
static int
-info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected)
+info_dist_entry(fmtfn_t to, void *arg, DistEntry *dep, int visible, int connected)
{
if (visible && connected) {
@@ -2513,13 +2981,10 @@ info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected)
}
erts_print(to, arg, "Name: %T", dep->sysname);
-#ifdef DEBUG
- erts_print(to, arg, " (refc=%d)", erts_refc_read(&dep->refc, 0));
-#endif
erts_print(to, arg, "\n");
if (!connected && is_nil(dep->cid)) {
- if (dep->nlinks) {
- erts_print(to, arg, "Error: Got links to not connected node:%T\n",
+ if (dep->mld) {
+ erts_print(to, arg, "Error: Got links/monitors to not connected node:%T\n",
dep->sysname);
}
return 0;
@@ -2528,14 +2993,13 @@ info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected)
erts_print(to, arg, "Controller: %T\n", dep->cid, to);
erts_print_node_info(to, arg, dep->sysname, NULL, NULL);
- print_monitor_info(to, arg, dep->monitors);
- print_link_info(to, arg, dep->nlinks);
- print_nodelink_info(to, arg, dep->node_links, dep->sysname);
+ print_monitor_info(to, arg, dep);
+ print_link_info(to, arg, dep);
return 0;
}
-int distribution_info(int to, void *arg) /* Called by break handler */
+int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */
{
DistEntry *dep;
@@ -2559,6 +3023,10 @@ int distribution_info(int to, void *arg) /* Called by break handler */
info_dist_entry(to, arg, dep, 0, 1);
}
+ for (dep = erts_pending_dist_entries; dep; dep = dep->next) {
+ info_dist_entry(to, arg, dep, 0, 0);
+ }
+
for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) {
if (dep != erts_this_dist_entry) {
info_dist_entry(to, arg, dep, 0, 0);
@@ -2581,7 +3049,6 @@ int distribution_info(int to, void *arg) /* Called by break handler */
monitor_node -- turn on/off node monitoring
node controller only:
- dist_exit/3 -- send exit signals from remote to local process
dist_link/2 -- link a remote process to a local
dist_unlink/2 -- unlink a remote from a local
****************************************************************************/
@@ -2591,15 +3058,6 @@ int distribution_info(int to, void *arg) /* Called by break handler */
/**********************************************************************
** Set the node name of current node fail if node already is set.
** setnode(name@host, Creation)
- ** loads functions pointer to trap_functions from module erlang.
- ** erlang:dsend/2
- ** erlang:dlink/1
- ** erlang:dunlink/1
- ** erlang:dmonitor_node/3
- ** erlang:dgroup_leader/2
- ** erlang:dexit/2
- ** -- are these needed ?
- ** dexit/1
***********************************************************************/
BIF_RETTYPE setnode_2(BIF_ALIST_2)
@@ -2623,44 +3081,50 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
goto error;
/* Check that all trap functions are defined !! */
- if (dsend2_trap->addressv[0] == NULL ||
- dsend3_trap->addressv[0] == NULL ||
- /* dsend_nosuspend_trap->address == NULL ||*/
- dlink_trap->addressv[0] == NULL ||
- dunlink_trap->addressv[0] == NULL ||
- dmonitor_node_trap->addressv[0] == NULL ||
- dgroup_leader_trap->addressv[0] == NULL ||
- dmonitor_p_trap->addressv[0] == NULL ||
- dexit_trap->addressv[0] == NULL) {
+ if (dmonitor_node_trap->addressv[0] == NULL) {
goto error;
}
- net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN,
- am_net_kernel, ERTS_PROC_LOCK_MAIN, 0);
- if (!net_kernel)
+ net_kernel = erts_whereis_process(BIF_P,
+ ERTS_PROC_LOCK_MAIN,
+ am_net_kernel,
+ ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS,
+ 0);
+ if (!net_kernel || ERTS_PROC_GET_DIST_ENTRY(net_kernel))
goto error;
/* By setting F_DISTRIBUTION on net_kernel,
- * do_net_exist will be called when net_kernel is terminated !! */
+ * erts_do_net_exits will be called when net_kernel is terminated !! */
net_kernel->flags |= F_DISTRIBUTION;
- if (net_kernel != BIF_P)
- erts_smp_proc_unlock(net_kernel, ERTS_PROC_LOCK_MAIN);
+ erts_proc_unlock(net_kernel,
+ (ERTS_PROC_LOCK_STATUS
+ | ((net_kernel != BIF_P)
+ ? ERTS_PROC_LOCK_MAIN
+ : 0)));
#ifdef DEBUG
- erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx);
+ erts_rwmtx_rlock(&erts_dist_table_rwmtx);
ASSERT(!erts_visible_dist_entries && !erts_hidden_dist_entries);
- erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx);
+ erts_rwmtx_runlock(&erts_dist_table_rwmtx);
#endif
- erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
- erts_smp_thr_progress_block();
+ erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
+ erts_thr_progress_block();
inc_no_nodes();
erts_set_this_node(BIF_ARG_1, (Uint32) creation);
erts_is_alive = 1;
send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL);
- erts_smp_thr_progress_unblock();
- erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
+ erts_thr_progress_unblock();
+ erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
+
+ /*
+ * Note erts_this_dist_entry is changed by erts_set_this_node(),
+ * so we *need* to use the new one after erts_set_this_node()
+ * is called.
+ */
+ erts_ref_dist_entry(erts_this_dist_entry);
+ ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry);
BIF_RET(am_true);
@@ -2668,74 +3132,80 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
BIF_ERROR(BIF_P, BADARG);
}
-/**********************************************************************
- ** Allocate a dist entry, set node name install the connection handler
- ** setnode_3({name@host, Creation}, Cid, {Type, Version, Initial, IC, OC})
- ** Type = flag field, where the flags are specified in dist.h
- ** Version = distribution version, >= 1
- ** IC = in_cookie (ignored)
- ** OC = out_cookie (ignored)
- **
- ** Note that in distribution protocols above 1, the Initial parameter
- ** is always NIL and the cookies are always the atom '', cookies are not
- ** sent in the distribution messages but are only used in
- ** the handshake.
- **
- ***********************************************************************/
+/*
+ * erts_internal:create_dist_channel/4 is used by
+ * erlang:setnode/3.
+ */
-BIF_RETTYPE setnode_3(BIF_ALIST_3)
+typedef struct {
+ DistEntry *dep;
+ Uint flags;
+ Uint version;
+} ErtsSetupConnDistCtrl;
+
+static void
+setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
+ Eterm ctrlr, Uint flags,
+ Uint version);
+
+static Eterm
+setup_connection_distctrl(Process *c_p, void *arg,
+ int *redsp, ErlHeapFragment **bpp);
+
+BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
{
BIF_RETTYPE ret;
Uint flags;
- unsigned long version;
- Eterm ic, oc;
- Eterm *tp;
+ Uint version;
+ Eterm *hp, res_tag = THE_NON_VALUE, res = THE_NON_VALUE;
DistEntry *dep = NULL;
+ int de_locked = 0;
Port *pp = NULL;
- /* Prepare for success */
- ERTS_BIF_PREP_RET(ret, am_true);
-
/*
* Check and pick out arguments
*/
- if (!is_node_name_atom(BIF_ARG_1) ||
- is_not_internal_port(BIF_ARG_2) ||
- (erts_this_node->sysname == am_Noname)) {
- goto badarg;
- }
+ /* Node name... */
+ if (!is_node_name_atom(BIF_ARG_1))
+ goto badarg;
- if (!is_tuple(BIF_ARG_3))
- goto badarg;
- tp = tuple_val(BIF_ARG_3);
- if (*tp++ != make_arityval(4))
- goto badarg;
- if (!is_small(*tp))
- goto badarg;
- flags = unsigned_val(*tp++);
- if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0)
- goto badarg;
- ic = *(++tp);
- oc = *(++tp);
- if (!is_atom(ic) || !is_atom(oc))
- goto badarg;
+ /* Distribution controller... */
+ if (!is_internal_port(BIF_ARG_2) && !is_internal_pid(BIF_ARG_2))
+ goto badarg;
+
+ /* Dist flags... */
+ if (!is_small(BIF_ARG_3))
+ goto badarg;
+ flags = unsigned_val(BIF_ARG_3);
+
+ /* Version... */
+ if (!is_small(BIF_ARG_4))
+ goto badarg;
+ version = unsigned_val(BIF_ARG_4);
+
+ if (version == 0)
+ goto badarg;
- /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */
- if (!(DFLAG_EXTENDED_REFERENCES & flags)) {
+ if (~flags & DFLAG_DIST_MANDATORY) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
erts_dsprintf(dsbufp, "%T", BIF_P->common.id);
if (BIF_P->common.u.alive.reg)
erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name);
erts_dsprintf(dsbufp,
" attempted to enable connection to node %T "
- "which is not able to handle extended references.\n",
+ "which does not support all mandatory capabilities.\n",
BIF_ARG_1);
erts_send_error_to_logger(BIF_P->group_leader, dsbufp);
goto badarg;
}
/*
+ * ToDo: Should we not pass connection_id as well
+ * to make sure it's the right connection we commit.
+ */
+
+ /*
* Arguments seem to be in order.
*/
@@ -2746,90 +3216,120 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
else if (!dep)
goto system_limit; /* Should never happen!!! */
- pp = erts_id2port_sflgs(BIF_ARG_2,
- BIF_P,
- ERTS_PROC_LOCK_MAIN,
- ERTS_PORT_SFLGS_INVALID_LOOKUP);
- erts_smp_de_rwlock(dep);
+ if (is_internal_pid(BIF_ARG_2)) {
+ if (BIF_P->common.id == BIF_ARG_2) {
+ ErtsSetupConnDistCtrl scdc;
- if (!pp || (erts_atomic32_read_nob(&pp->state)
- & ERTS_PORT_SFLG_EXITING))
- goto badarg;
+ scdc.dep = dep;
+ scdc.flags = flags;
+ scdc.version = version;
- if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
- goto badarg;
+ res = setup_connection_distctrl(BIF_P, &scdc, NULL, NULL);
+ BUMP_REDS(BIF_P, 5);
+ dep = NULL;
- if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
- goto done; /* Already set */
+ if (res == am_badarg)
+ goto badarg;
- if (dep->status & ERTS_DE_SFLG_EXITING) {
- /* Suspend on dist entry waiting for the exit to finish */
- ErtsProcList *plp = erts_proclist_create(BIF_P);
- plp->next = NULL;
- erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_smp_mtx_lock(&dep->qlock);
- erts_proclist_store_last(&dep->suspended, plp);
- erts_smp_mtx_unlock(&dep->qlock);
- goto yield;
- }
+ ASSERT(is_internal_magic_ref(res));
+ res_tag = am_ok; /* Connection up */
+ }
+ else {
+ ErtsSetupConnDistCtrl *scdcp;
- ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING));
+ scdcp = erts_alloc(ERTS_ALC_T_SETUP_CONN_ARG,
+ sizeof(ErtsSetupConnDistCtrl));
- if (pp->dist_entry || is_not_nil(dep->cid))
- goto badarg;
+ scdcp->dep = dep;
+ scdcp->flags = flags;
+ scdcp->version = version;
- erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
+ res = erts_proc_sig_send_rpc_request(BIF_P,
+ BIF_ARG_2,
+ !0,
+ setup_connection_distctrl,
+ (void *) scdcp);
+ if (is_non_value(res))
+ goto badarg;
- /*
- * Dist-ports do not use the "busy port message queue" functionality, but
- * instead use "busy dist entry" functionality.
- */
- {
- ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
- erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
+ dep = NULL;
+
+ ASSERT(is_internal_ordinary_ref(res));
+
+ res_tag = am_message; /* Caller need to wait for dhandle in message */
+ }
+ hp = HAlloc(BIF_P, 3);
}
+ else {
+ Uint32 conn_id;
- pp->dist_entry = dep;
+ pp = erts_id2port_sflgs(BIF_ARG_2,
+ BIF_P,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
+ erts_de_rwlock(dep);
+ de_locked = 1;
- dep->version = version;
- dep->creation = 0;
+ if (dep->state != ERTS_DE_STATE_PENDING)
+ goto badarg;
- ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
+ if (!pp || (erts_atomic32_read_nob(&pp->state)
+ & ERTS_PORT_SFLG_EXITING))
+ goto badarg;
-#if 1
- dep->send = (pp->drv_ptr->outputv
- ? dist_port_commandv
- : dist_port_command);
-#else
- dep->send = dist_port_command;
-#endif
- ASSERT(dep->send);
+ if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
+ goto badarg;
-#ifdef DEBUG
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize == 0);
- erts_smp_mtx_unlock(&dep->qlock);
-#endif
+ if (erts_prtsd_get(pp, ERTS_PRTSD_DIST_ENTRY) != NULL
+ || is_not_nil(dep->cid))
+ goto badarg;
- erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
+ erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
- if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
- create_cache(dep);
+ erts_prtsd_set(pp, ERTS_PRTSD_DIST_ENTRY, dep);
+ erts_prtsd_set(pp, ERTS_PRTSD_CONN_ID, (void*)(UWord)dep->connection_id);
- erts_smp_de_rwunlock(dep);
- dep = NULL; /* inc of refc transferred to port (dist_entry field) */
+ ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
- inc_no_nodes();
+ dep->send = (pp->drv_ptr->outputv
+ ? dist_port_commandv
+ : dist_port_command);
+ ASSERT(dep->send);
+
+ /*
+ * Dist-ports do not use the "busy port message queue" functionality, but
+ * instead use "busy dist entry" functionality.
+ */
+ {
+ ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
+ erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
+ }
+
+ conn_id = dep->connection_id;
+ setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version);
+ de_locked = 0;
+
+ hp = HAlloc(BIF_P, 3 + ERTS_DHANDLE_SIZE);
+ res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id);
+ res_tag = am_ok; /* Connection up */
+ dep = NULL; /* inc of refc transferred to port (dist_entry field) */
+ }
+
+ ASSERT(is_value(res) && is_value(res_tag));
+
+ res = TUPLE2(hp, res_tag, res);
+
+ ERTS_BIF_PREP_RET(ret, res);
- send_nodes_mon_msgs(BIF_P,
- am_nodeup,
- BIF_ARG_1,
- flags & DFLAG_PUBLISHED ? am_visible : am_hidden,
- NIL);
done:
if (dep && dep != erts_this_dist_entry) {
- erts_smp_de_rwunlock(dep);
+ if (de_locked) {
+ if (de_locked > 0)
+ erts_de_rwunlock(dep);
+ else
+ erts_de_runlock(dep);
+ }
erts_deref_dist_entry(dep);
}
@@ -2838,97 +3338,290 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
return ret;
- yield:
- ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P,
- BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
- goto done;
-
badarg:
- ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG);
+ ERTS_BIF_PREP_RET(ret, am_badarg);
goto done;
system_limit:
- ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT);
+ ERTS_BIF_PREP_RET(ret, am_system_limit);
goto done;
}
+static void
+setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
+ Eterm ctrlr, Uint flags,
+ Uint version)
+{
+ Eterm notify_proc = NIL;
+ erts_aint32_t qflgs;
-/**********************************************************************/
-/* dist_exit(Local, Term, Remote) -> Bool */
+ dep->version = version;
+ dep->creation = 0;
+
+ ASSERT(is_internal_port(ctrlr) || is_internal_pid(ctrlr));
+ ASSERT(dep->state == ERTS_DE_STATE_PENDING);
+
+ if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
+ create_cache(dep);
+
+ erts_set_dist_entry_connected(dep, ctrlr, flags);
+
+ notify_proc = NIL;
+ if (erts_atomic_read_nob(&dep->qsize)) {
+ if (is_internal_port(dep->cid)) {
+ erts_schedule_dist_command(NULL, dep);
+ }
+ else {
+ qflgs = erts_atomic32_read_nob(&dep->qflgs);
+ if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
+ qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
+ ~ERTS_DE_QFLG_REQ_INFO);
+ if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
+ notify_proc = dep->cid;
+ ASSERT(is_internal_pid(notify_proc));
+ }
+ }
+ }
+ }
+
+ erts_de_rwunlock(dep);
+
+ if (is_internal_pid(notify_proc))
+ notify_dist_data(c_p, notify_proc);
+
+ inc_no_nodes();
+
+ send_nodes_mon_msgs(c_p,
+ am_nodeup,
+ dep->sysname,
+ flags & DFLAG_PUBLISHED ? am_visible : am_hidden,
+ NIL);
+}
+
+static Eterm
+setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp)
+{
+ ErtsSetupConnDistCtrl *scdcp = (ErtsSetupConnDistCtrl *) arg;
+ DistEntry *dep = scdcp->dep;
+ int dep_locked = 0;
+ Eterm *hp;
+ Uint32 conn_id;
+
+ if (redsp)
+ *redsp = 1;
+
+ ASSERT(!ERTS_PROC_IS_EXITING(c_p));
+
+ erts_de_rwlock(dep);
+ dep_locked = !0;
+
+ if (dep->state != ERTS_DE_STATE_PENDING)
+ goto badarg;
+
+ conn_id = dep->connection_id;
+
+ if (is_not_nil(dep->cid))
+ goto badarg;
+
+ c_p->flags |= F_DISTRIBUTION;
+ ERTS_PROC_SET_DIST_ENTRY(c_p, dep);
+
+ dep->send = NULL; /* Only for distr ports... */
+
+ if (redsp)
+ *redsp = 5;
+
+ setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id,
+ scdcp->flags, scdcp->version);
+
+ /* we take over previous inc in refc of dep */
+
+ if (!bpp) /* called directly... */
+ return erts_make_dhandle(c_p, dep, conn_id);
+
+ erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);
-BIF_RETTYPE dist_exit_3(BIF_ALIST_3)
+ *bpp = new_message_buffer(ERTS_DHANDLE_SIZE);
+ hp = (*bpp)->mem;
+ return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep, conn_id);
+
+badarg:
+
+ if (bpp) /* not called directly */
+ erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);
+
+ if (dep_locked)
+ erts_de_rwunlock(dep);
+
+ erts_deref_dist_entry(dep);
+
+ return am_badarg;
+}
+
+
+BIF_RETTYPE erts_internal_get_dflags_0(BIF_ALIST_0)
{
- Eterm local;
- Eterm remote;
- DistEntry *rdep;
+ return erts_dflags_record;
+}
- local = BIF_ARG_1;
- remote = BIF_ARG_3;
+BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1)
+{
+ DistEntry* dep;
+ Uint32 conn_id;
+ Eterm* hp;
+ Eterm dhandle;
- /* Check that remote is a remote process */
- if (is_not_external_pid(remote))
- goto error;
+ if (is_not_atom(BIF_ARG_1)) {
+ BIF_ERROR(BIF_P, BADARG);
+ }
+ dep = erts_find_or_insert_dist_entry(BIF_ARG_1);
- rdep = external_dist_entry(remote);
-
- if(rdep == erts_this_dist_entry)
- goto error;
+ if (dep == erts_this_dist_entry) {
+ erts_deref_dist_entry(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
- /* Check that local is local */
- if (is_internal_pid(local)) {
- Process *lp;
- ErtsProcLocks lp_locks;
- if (BIF_P->common.id == local) {
- lp_locks = ERTS_PROC_LOCKS_ALL;
- lp = BIF_P;
- erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR);
- }
- else {
- lp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
- lp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN,
- local, lp_locks);
- if (!lp) {
- BIF_RET(am_true); /* ignore */
- }
- }
-
- (void) erts_send_exit_signal(BIF_P,
- remote,
- lp,
- &lp_locks,
- BIF_ARG_2,
- NIL,
- NULL,
- 0);
-#ifdef ERTS_SMP
- if (lp == BIF_P)
- lp_locks &= ~ERTS_PROC_LOCK_MAIN;
-#endif
- erts_smp_proc_unlock(lp, lp_locks);
- if (lp == BIF_P) {
- erts_aint32_t state = erts_smp_atomic32_read_acqb(&BIF_P->state);
- /*
- * We may have exited current process and may have to take action.
- */
- if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) {
-#ifdef ERTS_SMP
- if (state & ERTS_PSFLG_PENDING_EXIT)
- erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN);
-#endif
- ERTS_BIF_EXITED(BIF_P);
- }
- }
+ erts_de_rwlock(dep);
+
+ switch (dep->state) {
+ case ERTS_DE_STATE_CONNECTED:
+ case ERTS_DE_STATE_EXITING:
+ case ERTS_DE_STATE_PENDING:
+ conn_id = dep->connection_id;
+ break;
+ case ERTS_DE_STATE_IDLE:
+ erts_set_dist_entry_pending(dep);
+ conn_id = dep->connection_id;
+ break;
+ default:
+ erts_exit(ERTS_ABORT_EXIT, "Invalid dep->state (%d)\n", dep->state);
+ }
+ erts_de_rwunlock(dep);
+ hp = HAlloc(BIF_P, ERTS_DHANDLE_SIZE);
+ dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id);
+ erts_deref_dist_entry(dep);
+ BIF_RET(dhandle);
+}
+
+Sint erts_abort_connection_rwunlock(DistEntry* dep)
+{
+ ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep));
+
+ if (dep->state == ERTS_DE_STATE_CONNECTED) {
+ kill_connection(dep);
}
- else if (is_external_pid(local)
- && external_dist_entry(local) == erts_this_dist_entry) {
- BIF_RET(am_true); /* ignore */
+ else if (dep->state == ERTS_DE_STATE_PENDING) {
+ ErtsAtomCache *cache;
+ ErtsDistOutputBuf *obuf;
+ ErtsProcList *resume_procs;
+ Sint reds = 0;
+ ErtsMonLnkDist *mld;
+
+ ASSERT(is_nil(dep->cid));
+
+ mld = dep->mld;
+ dep->mld = NULL;
+
+ cache = dep->cache;
+ dep->cache = NULL;
+ erts_mtx_lock(&dep->qlock);
+ obuf = dep->out_queue.first;
+ dep->out_queue.first = NULL;
+ dep->out_queue.last = NULL;
+ ASSERT(!dep->tmp_out_queue.first);
+ ASSERT(!dep->finalized_out_queue.first);
+ resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
+ erts_mtx_unlock(&dep->qlock);
+ erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
+ dep->send = NULL;
+
+ erts_set_dist_entry_not_connected(dep);
+ erts_de_rwunlock(dep);
+
+ schedule_con_monitor_link_cleanup(mld, THE_NON_VALUE,
+ THE_NON_VALUE, THE_NON_VALUE);
+
+ if (resume_procs) {
+ int resumed = erts_resume_processes(resume_procs);
+ reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
+ }
+
+ delete_cache(cache);
+ free_de_out_queues(dep, obuf);
+ return reds;
}
- else
- goto error;
+ erts_de_rwunlock(dep);
+ return 0;
+}
+
+static Sint abort_connection(DistEntry *dep, Uint32 conn_id)
+{
+ erts_de_rwlock(dep);
+ if (dep->connection_id == conn_id)
+ return erts_abort_connection_rwunlock(dep);
+ erts_de_rwunlock(dep);
+ return 0;
+}
+
+BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2)
+{
+ DistEntry* dep;
+ Uint32 conn_id;
+ Sint reds;
+
+ if (is_not_atom(BIF_ARG_1))
+ BIF_ERROR(BIF_P, BADARG);
+ dep = erts_dhandle_to_dist_entry(BIF_ARG_2, &conn_id);
+ if (!dep || dep != erts_find_dist_entry(BIF_ARG_1)
+ || dep == erts_this_dist_entry) {
+ BIF_ERROR(BIF_P, BADARG);
+ }
+
+ reds = abort_connection(dep, conn_id);
+ BUMP_REDS(BIF_P, reds);
BIF_RET(am_true);
+}
- error:
- BIF_ERROR(BIF_P, BADARG);
+int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks)
+{
+ erts_de_rwlock(dep);
+ if (dep->state != ERTS_DE_STATE_IDLE) {
+ erts_de_rwunlock(dep);
+ }
+ else {
+ Process* net_kernel;
+ ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ;
+ Eterm *hp;
+ ErlOffHeap *ohp;
+ ErtsMessage *mp;
+ Eterm msg, dhandle;
+ Uint32 conn_id;
+
+ erts_set_dist_entry_pending(dep);
+ conn_id = dep->connection_id;
+ erts_de_rwunlock(dep);
+
+ net_kernel = erts_whereis_process(proc, proc_locks,
+ am_net_kernel, nk_locks, 0);
+ if (!net_kernel) {
+ abort_connection(dep, conn_id);
+ return 0;
+ }
+
+ /*
+ * Send {auto_connect, Node, DHandle} to net_kernel
+ */
+ mp = erts_alloc_message_heap(net_kernel, &nk_locks,
+ 4 + ERTS_DHANDLE_SIZE,
+ &hp, &ohp);
+ dhandle = erts_build_dhandle(&hp, ohp, dep, conn_id);
+ msg = TUPLE3(hp, am_auto_connect, dep->sysname, dhandle);
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
+ erts_queue_proc_message(proc, net_kernel, nk_locks, mp, msg);
+ erts_proc_unlock(net_kernel, nk_locks);
+ }
+
+ return 1;
}
/**********************************************************************/
@@ -3000,13 +3693,15 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
length = 0;
- erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx);
+ erts_rwmtx_rlock(&erts_dist_table_rwmtx);
ASSERT(erts_no_of_not_connected_dist_entries > 0);
ASSERT(erts_no_of_hidden_dist_entries >= 0);
+ ASSERT(erts_no_of_pending_dist_entries >= 0);
ASSERT(erts_no_of_visible_dist_entries >= 0);
if(not_connected)
- length += (erts_no_of_not_connected_dist_entries - 1);
+ length += ((erts_no_of_not_connected_dist_entries - 1)
+ + erts_no_of_pending_dist_entries);
if(hidden)
length += erts_no_of_hidden_dist_entries;
if(visible)
@@ -3017,7 +3712,7 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
result = NIL;
if (length == 0) {
- erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx);
+ erts_rwmtx_runlock(&erts_dist_table_rwmtx);
goto done;
}
@@ -3026,13 +3721,18 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
#ifdef DEBUG
endp = hp + length*2;
#endif
- if(not_connected)
+ if(not_connected) {
for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) {
if (dep != erts_this_dist_entry) {
result = CONS(hp, dep->sysname, result);
hp += 2;
}
+ }
+ for(dep = erts_pending_dist_entries; dep; dep = dep->next) {
+ result = CONS(hp, dep->sysname, result);
+ hp += 2;
}
+ }
if(hidden)
for(dep = erts_hidden_dist_entries; dep; dep = dep->next) {
result = CONS(hp, dep->sysname, result);
@@ -3048,7 +3748,7 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
hp += 2;
}
ASSERT(endp == hp);
- erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx);
+ erts_rwmtx_runlock(&erts_dist_table_rwmtx);
done:
UnUseTmpHeap(2,BIF_P);
@@ -3074,76 +3774,179 @@ BIF_RETTYPE is_alive_0(BIF_ALIST_0)
static BIF_RETTYPE
monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options)
{
- DistEntry *dep;
- ErtsLink *lnk;
+ BIF_RETTYPE ret;
+ DistEntry *dep = NULL;
Eterm l;
+ int async_connect = 1;
for (l = Options; l != NIL && is_list(l); l = CDR(list_val(l))) {
Eterm t = CAR(list_val(l));
- /* allow_passive_connect the only available option right now */
- if (t != am_allow_passive_connect) {
+ if (t == am_allow_passive_connect) {
+ /*
+ * Handle this horrible feature by falling back on old synchronous
+ * auto-connect (if needed)
+ */
+ async_connect = 0;
+ } else {
BIF_ERROR(p, BADARG);
}
}
if (l != NIL) {
BIF_ERROR(p, BADARG);
}
+ if (l != NIL)
+ goto badarg;
- if (is_not_atom(Node) ||
- ((Bool != am_true) && (Bool != am_false)) ||
- ((erts_this_node->sysname == am_Noname)
- && (Node != erts_this_node->sysname))) {
- BIF_ERROR(p, BADARG);
- }
- dep = erts_sysname_to_connected_dist_entry(Node);
- if (!dep) {
- do_trap:
- BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options);
+ if (is_not_atom(Node))
+ goto badarg;
+
+ if (erts_this_node->sysname == am_Noname && Node != am_Noname)
+ goto badarg;
+
+ switch (Bool) {
+
+ case am_false: {
+ ErtsMonitor *mon;
+ /*
+ * Before OTP-21, monitor_node(Node, false) triggered
+ * auto-connect and a 'nodedown' message if that failed.
+ * Now it's a simple no-op which feels more reasonable.
+ */
+ mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(p), Node);
+ if (mon) {
+ ErtsMonitorDataExtended *mdep;
+ ASSERT(erts_monitor_is_origin(mon));
+
+ mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon);
+
+ ASSERT((mdep->u.refc > 0));
+ if (--mdep->u.refc == 0) {
+ if (!mdep->uptr.node_monitors)
+ erts_monitor_tree_delete(&ERTS_P_MONITORS(p), mon);
+ else {
+ ErtsMonitor *sub_mon;
+ ErtsMonitorDataExtended *sub_mdep;
+ sub_mon = erts_monitor_list_last(mdep->uptr.node_monitors);
+ erts_monitor_list_delete(&mdep->uptr.node_monitors, sub_mon);
+ sub_mon->flags &= ~ERTS_ML_FLG_IN_SUBTABLE;
+ sub_mdep = ((ErtsMonitorDataExtended *)
+ erts_monitor_to_data(sub_mon));
+ sub_mdep->uptr.node_monitors = mdep->uptr.node_monitors;
+ mdep->uptr.node_monitors = NULL;
+ erts_monitor_tree_replace(&ERTS_P_MONITORS(p), mon, sub_mon);
+ }
+ if (erts_monitor_dist_delete(&mdep->md.target))
+ erts_monitor_release_both((ErtsMonitorData *) mdep);
+ else
+ erts_monitor_release(mon);
+ }
+ }
+ break;
}
- if (dep == erts_this_dist_entry)
- goto done;
- erts_smp_proc_lock(p, ERTS_PROC_LOCK_LINK);
- erts_smp_de_rlock(dep);
- if (ERTS_DE_IS_NOT_CONNECTED(dep)) {
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK);
- erts_smp_de_runlock(dep);
- goto do_trap;
- }
- erts_smp_de_links_lock(dep);
- erts_smp_de_runlock(dep);
-
- if (Bool == am_true) {
- ASSERT(dep->cid != NIL);
- lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE,
- p->common.id);
- ++ERTS_LINK_REFC(lnk);
- lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node);
- ++ERTS_LINK_REFC(lnk);
- }
- else {
- lnk = erts_lookup_link(dep->node_links, p->common.id);
- if (lnk != NULL) {
- if ((--ERTS_LINK_REFC(lnk)) == 0) {
- erts_destroy_link(erts_remove_link(&(dep->node_links),
- p->common.id));
- }
- }
- lnk = erts_lookup_link(ERTS_P_LINKS(p), Node);
- if (lnk != NULL) {
- if ((--ERTS_LINK_REFC(lnk)) == 0) {
- erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p),
- Node));
+ case am_true: {
+ ErtsDSigData dsd;
+ dsd.node = Node;
+
+ dep = erts_find_or_insert_dist_entry(Node);
+ if (dep == erts_this_dist_entry)
+ break;
+
+ switch (erts_dsig_prepare(&dsd, dep, p,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_DSP_RLOCK, 0, async_connect)) {
+ case ERTS_DSIG_PREP_NOT_ALIVE:
+ case ERTS_DSIG_PREP_NOT_CONNECTED:
+ /* Trap to either send 'nodedown' or do passive connection attempt */
+ goto do_trap;
+ case ERTS_DSIG_PREP_PENDING:
+ if (!async_connect) {
+ /*
+ * Pending connection may fail, so we must trap
+ * to ensure passive connection attempt
+ */
+ erts_de_runlock(dep);
+ goto do_trap;
}
- }
+ /*fall through*/
+ case ERTS_DSIG_PREP_CONNECTED: {
+ ErtsMonitor *mon;
+ ErtsMonitorDataExtended *mdep;
+ int created;
+
+ mon = erts_monitor_tree_lookup_create(&ERTS_P_MONITORS(p),
+ &created,
+ ERTS_MON_TYPE_NODE,
+ p->common.id,
+ Node);
+ mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon);
+ if (created) {
+#ifdef DEBUG
+ int inserted =
+#endif
+ erts_monitor_dist_insert(&mdep->md.target, dep->mld);
+ ASSERT(inserted);
+ ASSERT(mdep->dist->connection_id == dep->connection_id);
+ }
+ else if (mdep->dist->connection_id != dep->connection_id) {
+ ErtsMonitorDataExtended *mdep2;
+ ErtsMonitor *mon2;
+#ifdef DEBUG
+ int inserted;
+#endif
+ mdep2 = ((ErtsMonitorDataExtended *)
+ erts_monitor_create(ERTS_MON_TYPE_NODE, NIL,
+ p->common.id, Node, NIL));
+ mon2 = &mdep2->md.origin;
+#ifdef DEBUG
+ inserted =
+#endif
+ erts_monitor_dist_insert(&mdep->md.target, dep->mld);
+ ASSERT(inserted);
+ ASSERT(mdep2->dist->connection_id == dep->connection_id);
+
+ mdep2->uptr.node_monitors = mdep->uptr.node_monitors;
+ mdep->uptr.node_monitors = NULL;
+ erts_monitor_tree_replace(&ERTS_P_MONITORS(p), mon, mon2);
+ erts_monitor_list_insert(&mdep2->uptr.node_monitors, mon);
+ mon->flags |= ERTS_ML_FLG_IN_SUBTABLE;
+ mdep = mdep2;
+ }
+
+ mdep->u.refc++;
+
+ break;
+ }
+
+ default:
+ ERTS_ASSERT(! "Invalid dsig prepare result");
+ }
+
+ erts_de_runlock(dep);
+
+ break;
}
- erts_smp_de_links_unlock(dep);
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK);
+ default:
+ goto badarg;
+ }
- done:
- erts_deref_dist_entry(dep);
- BIF_RET(am_true);
+ ERTS_BIF_PREP_RET(ret, am_true);
+
+do_return:
+
+ if (dep)
+ erts_deref_dist_entry(dep);
+
+ return ret;
+
+do_trap:
+ ERTS_BIF_PREP_TRAP3(ret, dmonitor_node_trap, p, Node, Bool, Options);
+ goto do_return;
+
+badarg:
+ ERTS_BIF_PREP_ERROR(ret, p, BADARG);
+ goto do_return;
}
BIF_RETTYPE monitor_node_3(BIF_ALIST_3)
@@ -3171,9 +3974,9 @@ BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1)
if (de == erts_this_dist_entry) {
BIF_RET(am_true);
}
- erts_smp_de_rlock(de);
+ erts_de_rlock(de);
f = de->flags;
- erts_smp_de_runlock(de);
+ erts_de_runlock(de);
BIF_RET(((f & DFLAG_UNICODE_IO) ? am_true : am_false));
}
@@ -3194,18 +3997,9 @@ BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1)
#define ERTS_NODES_MON_OPT_TYPES \
(ERTS_NODES_MON_OPT_TYPE_VISIBLE|ERTS_NODES_MON_OPT_TYPE_HIDDEN)
-typedef struct ErtsNodesMonitor_ ErtsNodesMonitor;
-struct ErtsNodesMonitor_ {
- ErtsNodesMonitor *prev;
- ErtsNodesMonitor *next;
- Process *proc;
- Uint16 opts;
- Uint16 no;
-};
-
-static erts_smp_mtx_t nodes_monitors_mtx;
-static ErtsNodesMonitor *nodes_monitors;
-static ErtsNodesMonitor *nodes_monitors_end;
+static erts_mtx_t nodes_monitors_mtx;
+static ErtsMonitor *nodes_monitors;
+static Uint no_nodes_monitors;
/*
* Nodes monitors are stored in a double linked list. 'nodes_monitors'
@@ -3221,111 +4015,172 @@ static ErtsNodesMonitor *nodes_monitors_end;
static void
init_nodes_monitors(void)
{
- erts_smp_mtx_init(&nodes_monitors_mtx, "nodes_monitors");
+ erts_mtx_init(&nodes_monitors_mtx, "nodes_monitors", NIL,
+ ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION);
nodes_monitors = NULL;
- nodes_monitors_end = NULL;
+ no_nodes_monitors = 0;
}
-static ERTS_INLINE Uint
-nodes_mon_msg_sz(ErtsNodesMonitor *nmp, Eterm what, Eterm reason)
+Eterm
+erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist)
{
- Uint sz;
- if (!nmp->opts) {
- sz = 3;
- }
- else {
- sz = 0;
+ Eterm key, old_value, opts_list = olist;
+ Uint opts = (Uint) 0;
- if (nmp->opts & ERTS_NODES_MON_OPT_TYPES)
- sz += 2 + 3;
+ ASSERT(c_p);
+ ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
+
+ if (on != am_true && on != am_false)
+ return THE_NON_VALUE;
+
+ if (is_not_nil(opts_list)) {
+ int all = 0, visible = 0, hidden = 0;
- if (what == am_nodedown
- && (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON)) {
- if (is_not_immed(reason))
- sz += size_object(reason);
- sz += 2 + 3;
+ while (is_list(opts_list)) {
+ Eterm *cp = list_val(opts_list);
+ Eterm opt = CAR(cp);
+ opts_list = CDR(cp);
+ if (opt == am_nodedown_reason)
+ opts |= ERTS_NODES_MON_OPT_DOWN_REASON;
+ else if (is_tuple(opt)) {
+ Eterm* tp = tuple_val(opt);
+ if (arityval(tp[0]) != 2)
+ return THE_NON_VALUE;
+ switch (tp[1]) {
+ case am_node_type:
+ switch (tp[2]) {
+ case am_visible:
+ if (hidden || all)
+ return THE_NON_VALUE;
+ opts |= ERTS_NODES_MON_OPT_TYPE_VISIBLE;
+ visible = 1;
+ break;
+ case am_hidden:
+ if (visible || all)
+ return THE_NON_VALUE;
+ opts |= ERTS_NODES_MON_OPT_TYPE_HIDDEN;
+ hidden = 1;
+ break;
+ case am_all:
+ if (visible || hidden)
+ return THE_NON_VALUE;
+ opts |= ERTS_NODES_MON_OPT_TYPES;
+ all = 1;
+ break;
+ default:
+ return THE_NON_VALUE;
+ }
+ break;
+ default:
+ return THE_NON_VALUE;
+ }
+ }
+ else {
+ return THE_NON_VALUE;
+ }
}
- sz += 4;
+ if (is_not_nil(opts_list))
+ return THE_NON_VALUE;
+ }
+
+ key = make_small(opts);
+
+ if (on == am_true) {
+ ErtsMonitorDataExtended *mdep;
+ ErtsMonitor *omon;
+ int created;
+ omon = erts_monitor_tree_lookup_create(&ERTS_P_MONITORS(c_p),
+ &created,
+ ERTS_MON_TYPE_NODES,
+ c_p->common.id,
+ key);
+ mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(omon);
+ if (created) {
+ erts_mtx_lock(&nodes_monitors_mtx);
+ no_nodes_monitors++;
+ erts_monitor_list_insert(&nodes_monitors, &mdep->md.target);
+ erts_mtx_unlock(&nodes_monitors_mtx);
+ }
+ old_value = mdep->u.refc;
+ mdep->u.refc++;
+ }
+ else {
+ ErtsMonitorDataExtended *mdep;
+ ErtsMonitor *omon;
+ omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p), key);
+ if (!omon)
+ old_value = 0;
+ else {
+ mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(omon);
+ old_value = mdep->u.refc;
+ ASSERT(mdep->u.refc > 0);
+ erts_mtx_lock(&nodes_monitors_mtx);
+ ASSERT(no_nodes_monitors > 0);
+ no_nodes_monitors--;
+ ASSERT(erts_monitor_is_in_table(&mdep->md.target));
+ erts_monitor_list_delete(&nodes_monitors, &mdep->md.target);
+ erts_mtx_unlock(&nodes_monitors_mtx);
+ erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p), omon);
+ erts_monitor_release_both((ErtsMonitorData *) mdep);
+ }
}
- return sz;
+
+ return erts_make_integer(old_value, c_p);
}
-static ERTS_INLINE void
-send_nodes_mon_msg(Process *rp,
- ErtsProcLocks *rp_locksp,
- ErtsNodesMonitor *nmp,
- Eterm node,
- Eterm what,
- Eterm type,
- Eterm reason,
- Uint sz)
+void
+erts_monitor_nodes_delete(ErtsMonitor *omon)
{
- Eterm msg;
- Eterm *hp;
- ErtsMessage *mp;
- ErlOffHeap *ohp;
-#ifdef DEBUG
- Eterm *hend;
-#endif
+ ErtsMonitorData *mdp;
- mp = erts_alloc_message_heap(rp, rp_locksp, sz, &hp, &ohp);
-#ifdef DEBUG
- hend = hp + sz;
-#endif
+ ASSERT(omon->type == ERTS_MON_TYPE_NODES);
+ ASSERT(erts_monitor_is_origin(omon));
- if (!nmp->opts) {
- msg = TUPLE2(hp, what, node);
-#ifdef DEBUG
- hp += 3;
-#endif
- }
- else {
- Eterm tup;
- Eterm info = NIL;
+ mdp = erts_monitor_to_data(omon);
- if (nmp->opts & (ERTS_NODES_MON_OPT_TYPE_VISIBLE
- | ERTS_NODES_MON_OPT_TYPE_HIDDEN)) {
+ erts_mtx_lock(&nodes_monitors_mtx);
+ ASSERT(erts_monitor_is_in_table(&mdp->target));
+ ASSERT(no_nodes_monitors > 0);
+ no_nodes_monitors--;
+ erts_monitor_list_delete(&nodes_monitors, &mdp->target);
+ erts_mtx_unlock(&nodes_monitors_mtx);
+ erts_monitor_release_both(mdp);
+}
- tup = TUPLE2(hp, am_node_type, type);
- hp += 3;
- info = CONS(hp, tup, info);
- hp += 2;
- }
+typedef struct {
+ Eterm pid;
+ Eterm options;
+} ErtsNodesMonitorData;
- if (what == am_nodedown
- && (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON)) {
- Eterm rsn_cpy;
-
- if (is_immed(reason))
- rsn_cpy = reason;
- else {
- Eterm rsn_sz = size_object(reason);
- rsn_cpy = copy_struct(reason, rsn_sz, &hp, ohp);
- }
+typedef struct {
+ ErtsNodesMonitorData *nmdp;
+ Uint i;
+} ErtsNodesMonitorContext;
- tup = TUPLE2(hp, am_nodedown_reason, rsn_cpy);
- hp += 3;
- info = CONS(hp, tup, info);
- hp += 2;
- }
+static void
+save_nodes_monitor(ErtsMonitor *mon, void *vctxt)
+{
+ ErtsNodesMonitorContext *ctxt = vctxt;
+ ErtsMonitorData *mdp = erts_monitor_to_data(mon);
- msg = TUPLE3(hp, what, node, info);
-#ifdef DEBUG
- hp += 4;
-#endif
- }
+ ASSERT(erts_monitor_is_target(mon));
+ ASSERT(mon->type == ERTS_MON_TYPE_NODES);
- ASSERT(hend == hp);
- erts_queue_message(rp, *rp_locksp, mp, msg, am_system);
+ ctxt->nmdp[ctxt->i].pid = mon->other.item;
+ ctxt->nmdp[ctxt->i].options = mdp->origin.other.item;
+
+ ctxt->i++;
}
static void
send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reason)
{
- ErtsNodesMonitor *nmp;
- ErtsProcLocks rp_locks = 0; /* Init to shut up false warning */
- Process *rp = NULL;
+ Uint opts;
+ Uint i, no, reason_size;
+ ErtsNodesMonitorData def_buf[100];
+ ErtsNodesMonitorData *nmdp = &def_buf[0];
+ ErtsNodesMonitorContext ctxt;
ASSERT(is_immed(what));
ASSERT(is_immed(node));
@@ -3346,31 +4201,44 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas
}
#endif
- ERTS_SMP_LC_ASSERT(!c_p
- || (erts_proc_lc_my_proc_locks(c_p)
- == ERTS_PROC_LOCK_MAIN));
- erts_smp_mtx_lock(&nodes_monitors_mtx);
+ ctxt.i = 0;
+
+ reason_size = is_immed(reason) ? 0 : size_object(reason);
- for (nmp = nodes_monitors; nmp; nmp = nmp->next) {
- int i;
- Uint16 no;
- Uint sz;
+ erts_mtx_lock(&nodes_monitors_mtx);
+ if (no_nodes_monitors > sizeof(def_buf)/sizeof(def_buf[0]))
+ nmdp = erts_alloc(ERTS_ALC_T_TMP,
+ no_nodes_monitors*sizeof(ErtsNodesMonitorData));
+ ctxt.nmdp = nmdp;
+ erts_monitor_list_foreach(nodes_monitors,
+ save_nodes_monitor,
+ (void *) &ctxt);
+
+ ASSERT(ctxt.i == no_nodes_monitors);
+ no = no_nodes_monitors;
+
+ erts_mtx_unlock(&nodes_monitors_mtx);
- ASSERT(nmp->proc != NULL);
+ for (i = 0; i < no; i++) {
+ Eterm tmp_heap[3+2+3+2+4 /* max need */];
+ Eterm *hp, msg;
+ Uint hsz;
- if (!nmp->opts) {
+ ASSERT(is_small(nmdp[i].options));
+ opts = (Uint) signed_val(nmdp[i].options);
+ if (!opts) {
if (type != am_visible)
continue;
}
else {
switch (type) {
case am_hidden:
- if (!(nmp->opts & ERTS_NODES_MON_OPT_TYPE_HIDDEN))
+ if (!(opts & ERTS_NODES_MON_OPT_TYPE_HIDDEN))
continue;
break;
case am_visible:
- if ((nmp->opts & ERTS_NODES_MON_OPT_TYPES)
- && !(nmp->opts & ERTS_NODES_MON_OPT_TYPE_VISIBLE))
+ if ((opts & ERTS_NODES_MON_OPT_TYPES)
+ && !(opts & ERTS_NODES_MON_OPT_TYPE_VISIBLE))
continue;
break;
default:
@@ -3378,342 +4246,162 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas
}
}
- if (rp != nmp->proc) {
- if (rp) {
- if (rp == c_p)
- rp_locks &= ~ERTS_PROC_LOCK_MAIN;
- erts_smp_proc_unlock(rp, rp_locks);
- }
+ hsz = 0;
+ hp = &tmp_heap[0];
- rp = nmp->proc;
- rp_locks = 0;
- if (rp == c_p)
- rp_locks |= ERTS_PROC_LOCK_MAIN;
- }
+ if (!opts) {
+ msg = TUPLE2(hp, what, node);
+ hp += 3;
+ }
+ else {
+ Eterm tup;
+ Eterm info = NIL;
+
+ if (opts & (ERTS_NODES_MON_OPT_TYPE_VISIBLE
+ | ERTS_NODES_MON_OPT_TYPE_HIDDEN)) {
+
+ tup = TUPLE2(hp, am_node_type, type);
+ hp += 3;
+ info = CONS(hp, tup, info);
+ hp += 2;
+ }
+
+ if (what == am_nodedown
+ && (opts & ERTS_NODES_MON_OPT_DOWN_REASON)) {
+ hsz += reason_size;
+ tup = TUPLE2(hp, am_nodedown_reason, reason);
+ hp += 3;
+ info = CONS(hp, tup, info);
+ hp += 2;
+ }
+
+ msg = TUPLE3(hp, what, node, info);
+ hp += 4;
+ }
- ASSERT(rp);
+ ASSERT(hp - &tmp_heap[0] <= sizeof(tmp_heap)/sizeof(tmp_heap[0]));
- sz = nodes_mon_msg_sz(nmp, what, reason);
+ hsz += hp - &tmp_heap[0];
- for (i = 0, no = nmp->no; i < no; i++)
- send_nodes_mon_msg(rp,
- &rp_locks,
- nmp,
- node,
- what,
- type,
- reason,
- sz);
+ erts_proc_sig_send_persistent_monitor_msg(ERTS_MON_TYPE_NODES,
+ nmdp[i].options,
+ am_system,
+ nmdp[i].pid,
+ msg,
+ hsz);
}
- if (rp) {
- if (rp == c_p)
- rp_locks &= ~ERTS_PROC_LOCK_MAIN;
- erts_smp_proc_unlock(rp, rp_locks);
- }
-
- erts_smp_mtx_unlock(&nodes_monitors_mtx);
+ if (nmdp != &def_buf[0])
+ erts_free(ERTS_ALC_T_TMP, nmdp);
}
+
-static Eterm
-insert_nodes_monitor(Process *c_p, Uint32 opts)
-{
- Uint16 no = 1;
- Eterm res = am_false;
- ErtsNodesMonitor *xnmp, *nmp;
-
- ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&nodes_monitors_mtx));
- ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN);
-
- xnmp = c_p->nodes_monitors;
- if (xnmp) {
- ASSERT(!xnmp->prev || xnmp->prev->proc != c_p);
-
- while (1) {
- ASSERT(xnmp->proc == c_p);
- if (xnmp->opts == opts)
- break;
- if (!xnmp->next || xnmp->next->proc != c_p)
- break;
- xnmp = xnmp->next;
- }
- ASSERT(xnmp);
- ASSERT(xnmp->proc == c_p);
- ASSERT(xnmp->opts == opts
- || !xnmp->next
- || xnmp->next->proc != c_p);
-
- if (xnmp->opts != opts)
- goto alloc_new;
- else {
- res = am_true;
- no = xnmp->no++;
- if (!xnmp->no) {
- /*
- * 'no' wrapped; transfer all prevous monitors to new
- * element (which will be the next element in the list)
- * and set this to one...
- */
- xnmp->no = 1;
- goto alloc_new;
- }
- }
- }
- else {
- alloc_new:
- nmp = erts_alloc(ERTS_ALC_T_NODES_MON, sizeof(ErtsNodesMonitor));
- nmp->proc = c_p;
- nmp->opts = opts;
- nmp->no = no;
-
- if (xnmp) {
- ASSERT(nodes_monitors);
- ASSERT(c_p->nodes_monitors);
- nmp->next = xnmp->next;
- nmp->prev = xnmp;
- xnmp->next = nmp;
- if (nmp->next) {
- ASSERT(nodes_monitors_end != xnmp);
- ASSERT(nmp->next->prev == xnmp);
- nmp->next->prev = nmp;
- }
- else {
- ASSERT(nodes_monitors_end == xnmp);
- nodes_monitors_end = nmp;
- }
- }
- else {
- ASSERT(!c_p->nodes_monitors);
- c_p->nodes_monitors = nmp;
- nmp->next = NULL;
- nmp->prev = nodes_monitors_end;
- if (nodes_monitors_end) {
- ASSERT(nodes_monitors);
- nodes_monitors_end->next = nmp;
- }
- else {
- ASSERT(!nodes_monitors);
- nodes_monitors = nmp;
- }
- nodes_monitors_end = nmp;
- }
- }
- return res;
-}
-
-static Eterm
-remove_nodes_monitors(Process *c_p, Uint32 opts, int all)
-{
- Eterm res = am_false;
- ErtsNodesMonitor *nmp;
-
- ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&nodes_monitors_mtx));
- ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN);
-
- nmp = c_p->nodes_monitors;
- ASSERT(!nmp || !nmp->prev || nmp->prev->proc != c_p);
-
- while (nmp && nmp->proc == c_p) {
- if (!all && nmp->opts != opts)
- nmp = nmp->next;
- else { /* if (all || nmp->opts == opts) */
- ErtsNodesMonitor *free_nmp;
- res = am_true;
- if (nmp->prev) {
- ASSERT(nodes_monitors != nmp);
- nmp->prev->next = nmp->next;
- }
- else {
- ASSERT(nodes_monitors == nmp);
- nodes_monitors = nmp->next;
- }
- if (nmp->next) {
- ASSERT(nodes_monitors_end != nmp);
- nmp->next->prev = nmp->prev;
- }
- else {
- ASSERT(nodes_monitors_end == nmp);
- nodes_monitors_end = nmp->prev;
- }
- free_nmp = nmp;
- nmp = nmp->next;
- if (c_p->nodes_monitors == free_nmp)
- c_p->nodes_monitors = nmp && nmp->proc == c_p ? nmp : NULL;
- erts_free(ERTS_ALC_T_NODES_MON, free_nmp);
- }
- }
-
- ASSERT(!all || !c_p->nodes_monitors);
- return res;
-}
-
-void
-erts_delete_nodes_monitors(Process *c_p, ErtsProcLocks locks)
-{
-#if defined(ERTS_ENABLE_LOCK_CHECK) && defined(ERTS_SMP)
- if (c_p) {
- ErtsProcLocks might_unlock = locks & ~ERTS_PROC_LOCK_MAIN;
- if (might_unlock)
- erts_proc_lc_might_unlock(c_p, might_unlock);
- }
-#endif
- if (erts_smp_mtx_trylock(&nodes_monitors_mtx) == EBUSY) {
- ErtsProcLocks unlock_locks = locks & ~ERTS_PROC_LOCK_MAIN;
- if (c_p && unlock_locks)
- erts_smp_proc_unlock(c_p, unlock_locks);
- erts_smp_mtx_lock(&nodes_monitors_mtx);
- if (c_p && unlock_locks)
- erts_smp_proc_lock(c_p, unlock_locks);
- }
- remove_nodes_monitors(c_p, 0, 1);
- erts_smp_mtx_unlock(&nodes_monitors_mtx);
-}
-
-Eterm
-erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist)
-{
+typedef struct {
+ Eterm **hpp;
+ Uint *szp;
Eterm res;
- Eterm opts_list = olist;
- Uint16 opts = (Uint16) 0;
+} ErtsNodesMonitorInfoContext;
- ASSERT(c_p);
- ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
- if (on != am_true && on != am_false)
- return THE_NON_VALUE;
-
- if (is_not_nil(opts_list)) {
- int all = 0, visible = 0, hidden = 0;
-
- while (is_list(opts_list)) {
- Eterm *cp = list_val(opts_list);
- Eterm opt = CAR(cp);
- opts_list = CDR(cp);
- if (opt == am_nodedown_reason)
- opts |= ERTS_NODES_MON_OPT_DOWN_REASON;
- else if (is_tuple(opt)) {
- Eterm* tp = tuple_val(opt);
- if (arityval(tp[0]) != 2)
- return THE_NON_VALUE;
- switch (tp[1]) {
- case am_node_type:
- switch (tp[2]) {
- case am_visible:
- if (hidden || all)
- return THE_NON_VALUE;
- opts |= ERTS_NODES_MON_OPT_TYPE_VISIBLE;
- visible = 1;
- break;
- case am_hidden:
- if (visible || all)
- return THE_NON_VALUE;
- opts |= ERTS_NODES_MON_OPT_TYPE_HIDDEN;
- hidden = 1;
- break;
- case am_all:
- if (visible || hidden)
- return THE_NON_VALUE;
- opts |= ERTS_NODES_MON_OPT_TYPES;
- all = 1;
- break;
- default:
- return THE_NON_VALUE;
- }
- break;
- default:
- return THE_NON_VALUE;
- }
- }
- else {
- return THE_NON_VALUE;
- }
- }
-
- if (is_not_nil(opts_list))
- return THE_NON_VALUE;
+static void
+nodes_monitor_info(ErtsMonitor *mon, void *vctxt)
+{
+ ErtsMonitorDataExtended *mdep;
+ ErtsNodesMonitorInfoContext *ctxt = vctxt;
+ Uint no, i, opts, *szp;
+ Eterm **hpp, res;
+
+ hpp = ctxt->hpp;
+ szp = ctxt->szp;
+ res = ctxt->res;
+
+ ASSERT(erts_monitor_is_target(mon));
+ ASSERT(mon->type == ERTS_MON_TYPE_NODES);
+ mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon);
+ no = mdep->u.refc;
+
+ ASSERT(is_small(mdep->md.origin.other.item));
+ opts = (Uint) signed_val(mdep->md.origin.other.item);
+
+ for (i = 0; i < no; i++) {
+ Eterm olist = NIL;
+ if (opts & ERTS_NODES_MON_OPT_TYPES) {
+ Eterm type;
+ switch (opts & ERTS_NODES_MON_OPT_TYPES) {
+ case ERTS_NODES_MON_OPT_TYPES: type = am_all; break;
+ case ERTS_NODES_MON_OPT_TYPE_VISIBLE: type = am_visible; break;
+ case ERTS_NODES_MON_OPT_TYPE_HIDDEN: type = am_hidden; break;
+ default: erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n");
+ }
+ olist = erts_bld_cons(hpp, szp,
+ erts_bld_tuple(hpp, szp, 2,
+ am_node_type,
+ type),
+ olist);
+ }
+ if (opts & ERTS_NODES_MON_OPT_DOWN_REASON)
+ olist = erts_bld_cons(hpp, szp, am_nodedown_reason, olist);
+ res = erts_bld_cons(hpp, szp,
+ erts_bld_tuple(hpp, szp, 2,
+ mon->other.item,
+ olist),
+ res);
}
- erts_smp_mtx_lock(&nodes_monitors_mtx);
-
- if (on == am_true)
- res = insert_nodes_monitor(c_p, opts);
- else
- res = remove_nodes_monitors(c_p, opts, 0);
-
- erts_smp_mtx_unlock(&nodes_monitors_mtx);
-
- return res;
+ ctxt->hpp = hpp;
+ ctxt->szp = szp;
+ ctxt->res = res;
}
-/*
- * Note, this function is only used for debuging.
- */
-
Eterm
erts_processes_monitoring_nodes(Process *c_p)
{
- ErtsNodesMonitor *nmp;
- Eterm res;
+ /*
+ * Note, this function is only used for debugging.
+ */
+ ErtsNodesMonitorInfoContext ctxt;
Eterm *hp;
- Eterm **hpp;
Uint sz;
- Uint *szp;
#ifdef DEBUG
Eterm *hend;
#endif
ASSERT(c_p);
- ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
- erts_smp_mtx_lock(&nodes_monitors_mtx);
+ ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
+
+ erts_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN);
+ erts_thr_progress_block();
+
+ erts_mtx_lock(&nodes_monitors_mtx);
sz = 0;
- szp = &sz;
- hpp = NULL;
+ ctxt.szp = &sz;
+ ctxt.hpp = NULL;
- bld_result:
- res = NIL;
-
- for (nmp = nodes_monitors_end; nmp; nmp = nmp->prev) {
- Uint16 i;
- for (i = 0; i < nmp->no; i++) {
- Eterm olist = NIL;
- if (nmp->opts & ERTS_NODES_MON_OPT_TYPES) {
- Eterm type;
- switch (nmp->opts & ERTS_NODES_MON_OPT_TYPES) {
- case ERTS_NODES_MON_OPT_TYPES: type = am_all; break;
- case ERTS_NODES_MON_OPT_TYPE_VISIBLE: type = am_visible; break;
- case ERTS_NODES_MON_OPT_TYPE_HIDDEN: type = am_hidden; break;
- default: erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n");
- }
- olist = erts_bld_cons(hpp, szp,
- erts_bld_tuple(hpp, szp, 2,
- am_node_type,
- type),
- olist);
- }
- if (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON)
- olist = erts_bld_cons(hpp, szp, am_nodedown_reason, olist);
- res = erts_bld_cons(hpp, szp,
- erts_bld_tuple(hpp, szp, 2,
- nmp->proc->common.id,
- olist),
- res);
- }
- }
+ while (1) {
+ ctxt.res = NIL;
+
+ erts_monitor_list_foreach(nodes_monitors,
+ nodes_monitor_info,
+ (void *) &ctxt);
+
+ if (ctxt.hpp)
+ break;
- if (!hpp) {
hp = HAlloc(c_p, sz);
#ifdef DEBUG
hend = hp + sz;
#endif
- hpp = &hp;
- szp = NULL;
- goto bld_result;
+ ctxt.hpp = &hp;
+ ctxt.szp = NULL;
}
ASSERT(hp == hend);
- erts_smp_mtx_unlock(&nodes_monitors_mtx);
+ erts_mtx_unlock(&nodes_monitors_mtx);
- return res;
+ erts_thr_progress_unblock();
+ erts_proc_lock(c_p, ERTS_PROC_LOCK_MAIN);
+
+ return ctxt.res;
}