aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c1981
1 files changed, 1314 insertions, 667 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index ec07ddcd9c..bc168fc58d 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -1,18 +1,19 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2013. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2017. All Rights Reserved.
*
- * The contents of this file are subject to the Erlang Public License,
- * Version 1.1, (the "License"); you may not use this file except in
- * compliance with the License. You should have received a copy of the
- * Erlang Public License along with this software. If not, it can be
- * retrieved online at http://www.erlang.org/.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * Software distributed under the License is distributed on an "AS IS"
- * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- * the License for the specific language governing rights and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
* %CopyrightEnd%
*/
@@ -24,6 +25,7 @@
/* define this to get a lot of debug output */
/* #define ERTS_DIST_MSG_DBG */
+/* #define ERTS_RAW_DIST_MSG_DBG */
#ifdef HAVE_CONFIG_H
# include "config.h"
@@ -44,6 +46,8 @@
#include "erl_thr_progress.h"
#include "dtrace-wrapper.h"
+#define DIST_CTL_DEFAULT_SIZE 64
+
/* Turn this on to get printouts of all distribution messages
* which go on the line
*/
@@ -65,9 +69,13 @@ static void bw(byte *buf, ErlDrvSizeT sz)
static void
dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
{
+ ErtsHeapFactory factory;
+ DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
+ Eterm* ctl = ctl_default;
byte *extp = edep->extp;
Eterm msg;
- Sint size = erts_decode_dist_ext_size(edep);
+ Sint ctl_len;
+ Sint size = ctl_len = erts_decode_dist_ext_size(edep);
if (size < 0) {
erts_fprintf(stderr,
"DIST MSG DEBUG: erts_decode_dist_ext_size(%s) failed:\n",
@@ -75,10 +83,9 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
bw(buf, sz);
}
else {
- Eterm *hp;
ErlHeapFragment *mbuf = new_message_buffer(size);
- hp = mbuf->mem;
- msg = erts_decode_dist_ext(&hp, &mbuf->off_heap, edep);
+ erts_factory_static_init(&factory, ctl, ctl_len, &mbuf->off_heap);
+ msg = erts_decode_dist_ext(&factory, edep);
if (is_value(msg))
erts_fprintf(stderr, " %s: %T\n", what, msg);
else {
@@ -114,17 +121,17 @@ Export* dexit_trap = NULL;
Export* dmonitor_p_trap = NULL;
/* local variables */
-
+static Export *dist_ctrl_put_data_trap;
/* forward declarations */
static void clear_dist_entry(DistEntry*);
-static int dsig_send(ErtsDSigData *, Eterm, Eterm, int);
+static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
-static erts_smp_atomic_t no_caches;
-static erts_smp_atomic_t no_nodes;
+static erts_atomic_t no_caches;
+static erts_atomic_t no_nodes;
struct {
Eterm reason;
@@ -137,8 +144,8 @@ delete_cache(ErtsAtomCache *cache)
{
if (cache) {
erts_free(ERTS_ALC_T_DCACHE, (void *) cache);
- ASSERT(erts_smp_atomic_read_nob(&no_caches) > 0);
- erts_smp_atomic_dec_nob(&no_caches);
+ ASSERT(erts_atomic_read_nob(&no_caches) > 0);
+ erts_atomic_dec_nob(&no_caches);
}
}
@@ -149,14 +156,12 @@ create_cache(DistEntry *dep)
int i;
ErtsAtomCache *cp;
- ERTS_SMP_LC_ASSERT(
- is_internal_port(dep->cid)
- && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));
+ ERTS_LC_ASSERT(is_nil(dep->cid));
ASSERT(!dep->cache);
dep->cache = cp = (ErtsAtomCache*) erts_alloc(ERTS_ALC_T_DCACHE,
sizeof(ErtsAtomCache));
- erts_smp_atomic_inc_nob(&no_caches);
+ erts_atomic_inc_nob(&no_caches);
for (i = 0; i < sizeof(cp->in_arr)/sizeof(cp->in_arr[0]); i++) {
cp->in_arr[i] = THE_NON_VALUE;
cp->out_arr[i] = THE_NON_VALUE;
@@ -165,15 +170,17 @@ create_cache(DistEntry *dep)
Uint erts_dist_cache_size(void)
{
- return (Uint) erts_smp_atomic_read_mb(&no_caches)*sizeof(ErtsAtomCache);
+ return (Uint) erts_atomic_read_mb(&no_caches)*sizeof(ErtsAtomCache);
}
static ErtsProcList *
-get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs)
+get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs)
{
- ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&dep->qlock));
- dep->qflgs &= ~unset_qflgs;
- if (dep->qflgs & ERTS_DE_QFLG_EXIT) {
+ erts_aint32_t qflgs;
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
+ qflgs = erts_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs);
+ qflgs &= ~unset_qflgs;
+ if (qflgs & ERTS_DE_QFLG_EXIT) {
/* No resume when exit has been scheduled */
return NULL;
}
@@ -252,12 +259,12 @@ static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp)
DistEntry *dep = ((NetExitsContext *) vnecp)->dep;
ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK;
- rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks);
+ rp = erts_pid2proc(NULL, 0, mon->u.pid, rp_locks);
if (!rp)
goto done;
if (mon->type == MON_ORIGIN) {
- /* local pid is beeing monitored */
+ /* local pid is being monitored */
rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
/* ASSERT(rmon != NULL); nope, can happen during process exit */
if (rmon != NULL) {
@@ -271,21 +278,20 @@ static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp)
rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref);
/* ASSERT(rmon != NULL); can happen during process exit */
if (rmon != NULL) {
+ ASSERT(rmon->type == MON_ORIGIN);
ASSERT(is_atom(rmon->name) || is_nil(rmon->name));
watched = (is_atom(rmon->name)
? TUPLE2(lhp, rmon->name, dep->sysname)
- : rmon->pid);
-#ifdef ERTS_SMP
+ : rmon->u.pid);
rp_locks |= ERTS_PROC_LOCKS_MSG_SEND;
- erts_smp_proc_lock(rp, ERTS_PROC_LOCKS_MSG_SEND);
-#endif
+ erts_proc_lock(rp, ERTS_PROC_LOCKS_MSG_SEND);
erts_queue_monitor_message(rp, &rp_locks, mon->ref, am_process,
watched, am_noconnection);
erts_destroy_monitor(rmon);
}
UnUseTmpHeapNoproc(3);
}
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
done:
erts_destroy_monitor(mon);
}
@@ -331,10 +337,10 @@ static void doit_link_net_exits_sub(ErtsLink *sublnk, void *vlnecp)
erts_destroy_link(rlnk);
if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
/* We didn't exit the process and it is traced */
- trace_proc(NULL, rp, am_getting_unlinked, sublnk->pid);
+ trace_proc(NULL, 0, rp, am_getting_unlinked, sublnk->pid);
}
}
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
}
done:
erts_destroy_link(sublnk);
@@ -372,10 +378,11 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp)
ASSERT(lnk->type == LINK_NODE);
if (is_internal_pid(lnk->pid)) {
ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK;
- rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks);
- if (!rp) {
+ ErlOffHeap *ohp;
+ rp = erts_proc_lookup(lnk->pid);
+ if (!rp)
goto done;
- }
+ erts_proc_lock(rp, rp_locks);
rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name);
if (rlnk != NULL) {
ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE));
@@ -383,18 +390,16 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp)
}
n = ERTS_LINK_REFC(lnk);
for (i = 0; i < n; ++i) {
- ErlHeapFragment* bp;
- ErlOffHeap *ohp;
Eterm tup;
- Eterm *hp = erts_alloc_message_heap(3,&bp,&ohp,rp,&rp_locks);
+ Eterm *hp;
+ ErtsMessage *msgp;
+
+ msgp = erts_alloc_message_heap(rp, &rp_locks,
+ 3, &hp, &ohp);
tup = TUPLE2(hp, am_nodedown, name);
- erts_queue_message(rp, &rp_locks, bp, tup, NIL
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, rp_locks, msgp, tup, am_system);
}
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
}
done:
erts_destroy_link(lnk);
@@ -406,16 +411,16 @@ set_node_not_alive(void *unused)
ErlHeapFragment *bp;
Eterm nodename = erts_this_dist_entry->sysname;
- ASSERT(erts_smp_atomic_read_nob(&no_nodes) == 0);
+ ASSERT(erts_atomic_read_nob(&no_nodes) == 0);
- erts_smp_thr_progress_block();
+ erts_thr_progress_block();
erts_set_this_node(am_Noname, 0);
erts_is_alive = 0;
send_nodes_mon_msgs(NULL, am_nodedown, nodename, am_visible, nodedown.reason);
nodedown.reason = NIL;
bp = nodedown.bp;
nodedown.bp = NULL;
- erts_smp_thr_progress_unblock();
+ erts_thr_progress_unblock();
if (bp)
free_message_buffer(bp);
}
@@ -423,7 +428,7 @@ set_node_not_alive(void *unused)
static ERTS_INLINE void
dec_no_nodes(void)
{
- erts_aint_t no = erts_smp_atomic_dec_read_mb(&no_nodes);
+ erts_aint_t no = erts_atomic_dec_read_mb(&no_nodes);
ASSERT(no >= 0);
ASSERT(erts_get_scheduler_id()); /* Need to be a scheduler */
if (no == 0)
@@ -436,12 +441,40 @@ static ERTS_INLINE void
inc_no_nodes(void)
{
#ifdef DEBUG
- erts_aint_t no = erts_smp_atomic_read_nob(&no_nodes);
+ erts_aint_t no = erts_atomic_read_nob(&no_nodes);
ASSERT(erts_is_alive ? no > 0 : no == 0);
#endif
- erts_smp_atomic_inc_mb(&no_nodes);
+ erts_atomic_inc_mb(&no_nodes);
}
-
+
+static void
+kill_dist_ctrl_proc(void *vpid)
+{
+ Eterm pid = (Eterm) vpid;
+ ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
+ Process *rp = erts_pid2proc(NULL, 0, pid, rp_locks);
+ if (rp) {
+ erts_send_exit_signal(NULL, rp->common.id, rp, &rp_locks,
+ am_kill, NIL, NULL, 0);
+ if (rp_locks)
+ erts_proc_unlock(rp, rp_locks);
+ }
+}
+
+static void
+schedule_kill_dist_ctrl_proc(Eterm pid)
+{
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ int sched_id = 1;
+ if (!esdp || ERTS_SCHEDULER_IS_DIRTY(esdp))
+ sched_id = 1;
+ else
+ sched_id = (int) esdp->no;
+ erts_schedule_misc_aux_work(sched_id,
+ kill_dist_ctrl_proc,
+ (void *) (UWord) pid);
+}
+
/*
* proc is currently running or exiting process.
*/
@@ -451,58 +484,62 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */
DistEntry *tdep;
- int no_dist_port = 0;
+ int no_dist_ctrl = 0;
Eterm nd_reason = (reason == am_no_network
? am_no_network
: am_net_kernel_terminated);
- erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx);
+ erts_rwmtx_rlock(&erts_dist_table_rwmtx);
for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next)
- no_dist_port++;
+ no_dist_ctrl++;
for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next)
- no_dist_port++;
+ no_dist_ctrl++;
/* KILL all port controllers */
- if (no_dist_port == 0)
- erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx);
+ if (no_dist_ctrl == 0)
+ erts_rwmtx_runlock(&erts_dist_table_rwmtx);
else {
Eterm def_buf[128];
int i = 0;
- Eterm *dist_port;
+ Eterm *dist_ctrl;
- if (no_dist_port <= sizeof(def_buf)/sizeof(def_buf[0]))
- dist_port = &def_buf[0];
+ if (no_dist_ctrl <= sizeof(def_buf)/sizeof(def_buf[0]))
+ dist_ctrl = &def_buf[0];
else
- dist_port = erts_alloc(ERTS_ALC_T_TMP,
- sizeof(Eterm)*no_dist_port);
+ dist_ctrl = erts_alloc(ERTS_ALC_T_TMP,
+ sizeof(Eterm)*no_dist_ctrl);
for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) {
- ASSERT(is_internal_port(tdep->cid));
- dist_port[i++] = tdep->cid;
+ ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid));
+ dist_ctrl[i++] = tdep->cid;
}
for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) {
- ASSERT(is_internal_port(tdep->cid));
- dist_port[i++] = tdep->cid;
+ ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid));
+ dist_ctrl[i++] = tdep->cid;
}
- erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx);
-
- for (i = 0; i < no_dist_port; i++) {
- Port *prt = erts_port_lookup(dist_port[i],
- ERTS_PORT_SFLGS_INVALID_LOOKUP);
- if (!prt)
- continue;
- ASSERT(erts_atomic32_read_nob(&prt->state)
- & ERTS_PORT_SFLG_DISTRIBUTION);
-
- erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED,
- prt, dist_port[i], nd_reason, NULL);
+ erts_rwmtx_runlock(&erts_dist_table_rwmtx);
+
+ for (i = 0; i < no_dist_ctrl; i++) {
+ if (is_internal_pid(dist_ctrl[i]))
+ schedule_kill_dist_ctrl_proc(dist_ctrl[i]);
+ else {
+ Port *prt = erts_port_lookup(dist_ctrl[i],
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
+ if (prt) {
+ ASSERT(erts_atomic32_read_nob(&prt->state)
+ & ERTS_PORT_SFLG_DISTRIBUTION);
+
+ erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED,
+ prt, dist_ctrl[i], nd_reason, NULL);
+ }
+ }
}
- if (dist_port != &def_buf[0])
- erts_free(ERTS_ALC_T_TMP, dist_port);
+ if (dist_ctrl != &def_buf[0])
+ erts_free(ERTS_ALC_T_TMP, dist_ctrl);
}
/*
- * When last dist port exits, node will be taken
+ * When last dist ctrl exits, node will be taken
* from alive to not alive.
*/
ASSERT(is_nil(nodedown.reason) && !nodedown.bp);
@@ -519,52 +556,51 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
&nodedown.bp->off_heap);
}
}
- else { /* Call from distribution port */
+ else { /* Call from distribution controller (port/process) */
NetExitsContext nec = {dep};
ErtsLink *nlinks;
ErtsLink *node_links;
ErtsMonitor *monitors;
Uint32 flags;
- erts_smp_atomic_set_mb(&dep->dist_cmd_scheduled, 1);
- erts_smp_de_rwlock(dep);
+ erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1);
+ erts_de_rwlock(dep);
- ERTS_SMP_LC_ASSERT(is_internal_port(dep->cid)
- && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));
+ if (is_internal_port(dep->cid)) {
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));
- if (erts_port_task_is_scheduled(&dep->dist_cmd))
- erts_port_task_abort(&dep->dist_cmd);
+ if (erts_port_task_is_scheduled(&dep->dist_cmd))
+ erts_port_task_abort(&dep->dist_cmd);
+ }
if (dep->status & ERTS_DE_SFLG_EXITING) {
#ifdef DEBUG
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT);
- erts_smp_mtx_unlock(&dep->qlock);
+ ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT);
#endif
}
else {
dep->status |= ERTS_DE_SFLG_EXITING;
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
- dep->qflgs |= ERTS_DE_QFLG_EXIT;
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_lock(&dep->qlock);
+ ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
+ erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT);
+ erts_mtx_unlock(&dep->qlock);
}
- erts_smp_de_links_lock(dep);
+ erts_de_links_lock(dep);
monitors = dep->monitors;
nlinks = dep->nlinks;
node_links = dep->node_links;
dep->monitors = NULL;
dep->nlinks = NULL;
dep->node_links = NULL;
- erts_smp_de_links_unlock(dep);
+ erts_de_links_unlock(dep);
nodename = dep->sysname;
flags = dep->flags;
erts_set_dist_entry_not_connected(dep);
- erts_smp_de_rwunlock(dep);
+ erts_de_rwunlock(dep);
erts_sweep_monitors(monitors, &doit_monitor_net_exits, (void *) &nec);
erts_sweep_links(nlinks, &doit_link_net_exits, (void *) &nec);
@@ -598,8 +634,8 @@ void init_dist(void)
nodedown.reason = NIL;
nodedown.bp = NULL;
- erts_smp_atomic_init_nob(&no_nodes, 0);
- erts_smp_atomic_init_nob(&no_caches, 0);
+ erts_atomic_init_nob(&no_nodes, 0);
+ erts_atomic_init_nob(&no_caches, 0);
/* Lookup/Install all references to trap functions */
dsend2_trap = trap_function(am_dsend,2);
@@ -611,6 +647,9 @@ void init_dist(void)
dgroup_leader_trap = trap_function(am_dgroup_leader,2);
dexit_trap = trap_function(am_dexit, 2);
dmonitor_p_trap = trap_function(am_dmonitor_p, 2);
+ dist_ctrl_put_data_trap = erts_export_put(am_erts_internal,
+ am_dist_ctrl_put_data,
+ 2);
}
#define ErtsDistOutputBuf2Binary(OB) \
@@ -622,9 +661,6 @@ alloc_dist_obuf(Uint size)
ErtsDistOutputBuf *obuf;
Uint obuf_size = sizeof(ErtsDistOutputBuf)+sizeof(byte)*(size-1);
Binary *bin = erts_bin_drv_alloc(obuf_size);
- bin->flags = BIN_FLAG_DRV;
- erts_refc_init(&bin->refc, 1);
- bin->orig_size = (SWord) obuf_size;
obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0];
#ifdef DEBUG
obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN;
@@ -638,8 +674,7 @@ free_dist_obuf(ErtsDistOutputBuf *obuf)
{
Binary *bin = ErtsDistOutputBuf2Binary(obuf);
ASSERT(obuf->dbg_pattern == ERTS_DIST_OUTPUT_BUF_DBG_PATTERN);
- if (erts_refc_dectest(&bin->refc, 0) == 0)
- erts_bin_free(bin);
+ erts_bin_release(bin);
}
static ERTS_INLINE Sint
@@ -656,19 +691,24 @@ static void clear_dist_entry(DistEntry *dep)
ErtsProcList *suspendees;
ErtsDistOutputBuf *obuf;
- erts_smp_de_rwlock(dep);
+ erts_de_rwlock(dep);
+ erts_atomic_set_nob(&dep->input_handler,
+ (erts_aint_t) NIL);
cache = dep->cache;
dep->cache = NULL;
#ifdef DEBUG
- erts_smp_de_links_lock(dep);
+ erts_de_links_lock(dep);
ASSERT(!dep->nlinks);
ASSERT(!dep->node_links);
ASSERT(!dep->monitors);
- erts_smp_de_links_unlock(dep);
+ erts_de_links_unlock(dep);
#endif
- erts_smp_mtx_lock(&dep->qlock);
+ erts_mtx_lock(&dep->qlock);
+
+ erts_atomic64_set_nob(&dep->in, 0);
+ erts_atomic64_set_nob(&dep->out, 0);
if (!dep->out_queue.last)
obuf = dep->finalized_out_queue.first;
@@ -677,17 +717,24 @@ static void clear_dist_entry(DistEntry *dep)
obuf = dep->out_queue.first;
}
+ if (dep->tmp_out_queue.first) {
+ dep->tmp_out_queue.last->next = obuf;
+ obuf = dep->tmp_out_queue.first;
+ }
+
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
+ dep->tmp_out_queue.first = NULL;
+ dep->tmp_out_queue.last = NULL;
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
dep->status = 0;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
- erts_smp_mtx_unlock(&dep->qlock);
- erts_smp_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
+ erts_mtx_unlock(&dep->qlock);
+ erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
dep->send = NULL;
- erts_smp_de_rwunlock(dep);
+ erts_de_rwunlock(dep);
erts_resume_processes(suspendees);
@@ -702,13 +749,57 @@ static void clear_dist_entry(DistEntry *dep)
}
if (obufsize) {
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_lock(&dep->qlock);
+ ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize);
+ erts_atomic_add_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
+ erts_mtx_unlock(&dep->qlock);
+ }
+}
+
+int erts_dsend_context_dtor(Binary* ctx_bin)
+{
+ ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin);
+ switch (ctx->dss.phase) {
+ case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
+ DESTROY_SAVED_WSTACK(&ctx->dss.u.sc.wstack);
+ break;
+ case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
+ DESTROY_SAVED_WSTACK(&ctx->dss.u.ec.wstack);
+ break;
+ default:;
+ }
+ if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) {
+ free_dist_obuf(ctx->dss.obuf);
+ }
+ if (ctx->dep_to_deref)
+ erts_deref_dist_entry(ctx->dep_to_deref);
+
+ return 1;
+}
+
+Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx)
+{
+ struct exported_ctx {
+ ErtsSendContext ctx;
+ ErtsAtomCacheMap acm;
+ };
+ Binary* ctx_bin = erts_create_magic_binary(sizeof(struct exported_ctx),
+ erts_dsend_context_dtor);
+ struct exported_ctx* dst = ERTS_MAGIC_BIN_DATA(ctx_bin);
+ Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
+
+ sys_memcpy(&dst->ctx, ctx, sizeof(ErtsSendContext));
+ ASSERT(ctx->dss.ctl == make_tuple(ctx->ctl_heap));
+ dst->ctx.dss.ctl = make_tuple(dst->ctx.ctl_heap);
+ if (ctx->dss.acmp) {
+ sys_memcpy(&dst->acm, ctx->dss.acmp, sizeof(ErtsAtomCacheMap));
+ dst->ctx.dss.acmp = &dst->acm;
}
+ return erts_mk_magic_ref(&hp, &MSO(p), ctx_bin);
}
+
/*
* The erts_dsig_send_*() functions implemented below, sends asynchronous
* distributed signals to other Erlang nodes. Before sending a distributed
@@ -731,7 +822,7 @@ erts_dsig_send_link(ErtsDSigData *dsdp, Eterm local, Eterm remote)
int res;
UseTmpHeapNoproc(4);
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send_ctl(dsdp, ctl, 0);
UnUseTmpHeapNoproc(4);
return res;
}
@@ -744,13 +835,13 @@ erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote)
int res;
UseTmpHeapNoproc(4);
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send_ctl(dsdp, ctl, 0);
UnUseTmpHeapNoproc(4);
return res;
}
-/* A local process that's beeing monitored by a remote one exits. We send:
+/* A local process that's being monitored by a remote one exits. We send:
{DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason},
which is rather sad as only the ref is needed, no pid's... */
int
@@ -767,12 +858,12 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
watched, watcher, ref, reason);
#ifdef DEBUG
- erts_smp_de_links_lock(dsdp->dep);
+ erts_de_links_lock(dsdp->dep);
ASSERT(!erts_lookup_monitor(dsdp->dep->monitors, ref));
- erts_smp_de_links_unlock(dsdp->dep);
+ erts_de_links_unlock(dsdp->dep);
#endif
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ res = dsig_send_ctl(dsdp, ctl, 1);
UnUseTmpHeapNoproc(6);
return res;
}
@@ -793,7 +884,7 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
make_small(DOP_MONITOR_P),
watcher, watched, ref);
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send_ctl(dsdp, ctl, 0);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -815,18 +906,17 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
make_small(DOP_DEMONITOR_P),
watcher, watched, ref);
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, force);
+ res = dsig_send_ctl(dsdp, ctl, force);
UnUseTmpHeapNoproc(5);
return res;
}
int
-erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message)
+erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
{
Eterm ctl;
- DeclareTmpHeapNoproc(ctl_heap,5);
Eterm token = NIL;
- Process *sender = dsdp->proc;
+ Process *sender = ctx->dsd.proc;
int res;
#ifdef USE_VM_PROBES
Sint tok_label = 0;
@@ -838,12 +928,7 @@ erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message)
DTRACE_CHARBUF(receiver_name, 64);
#endif
- UseTmpHeapNoproc(5);
- if (SEQ_TRACE_TOKEN(sender) != NIL
-#ifdef USE_VM_PROBES
- && SEQ_TRACE_TOKEN(sender) != am_have_dt_utag
-#endif
- ) {
+ if (have_seqtrace(SEQ_TRACE_TOKEN(sender))) {
seq_trace_update_send(sender);
token = SEQ_TRACE_TOKEN(sender);
seq_trace_output(token, message, SEQ_TRACE_SEND, remote, sender);
@@ -852,13 +937,13 @@ erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message)
*node_name = *sender_name = *receiver_name = '\0';
if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) {
erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)),
- "%T", dsdp->dep->sysname);
+ "%T", ctx->dsd.dep->sysname);
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
"%T", sender->common.id);
erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
"%T", remote);
msize = size_object(message);
- if (token != NIL && token != am_have_dt_utag) {
+ if (have_seqtrace(token)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
@@ -866,27 +951,48 @@ erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message)
}
#endif
- if (token != NIL)
- ctl = TUPLE4(&ctl_heap[0],
- make_small(DOP_SEND_TT), am_Cookie, remote, token);
- else
- ctl = TUPLE3(&ctl_heap[0], make_small(DOP_SEND), am_Cookie, remote);
+ if (token != NIL) {
+ Eterm el1, el2;
+ if (ctx->dep->flags & DFLAG_SEND_SENDER) {
+ el1 = make_small(DOP_SEND_SENDER_TT);
+ el2 = sender->common.id;
+ }
+ else {
+ el1 = make_small(DOP_SEND_TT);
+ el2 = am_Empty;
+ }
+ ctl = TUPLE4(&ctx->ctl_heap[0], el1, el2, remote, token);
+ }
+ else {
+ Eterm el1, el2;
+ if (ctx->dep->flags & DFLAG_SEND_SENDER) {
+ el1 = make_small(DOP_SEND_SENDER);
+ el2 = sender->common.id;
+ }
+ else {
+ el1 = make_small(DOP_SEND);
+ el2 = am_Empty;
+ }
+ ctl = TUPLE3(&ctx->ctl_heap[0], el1, el2, remote);
+ }
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
- res = dsig_send(dsdp, ctl, message, 0);
- UnUseTmpHeapNoproc(5);
+ ctx->dss.ctl = ctl;
+ ctx->dss.msg = message;
+ ctx->dss.force_busy = 0;
+ res = erts_dsig_send(&ctx->dsd, &ctx->dss);
return res;
}
int
-erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message)
+erts_dsig_send_reg_msg(Eterm remote_name, Eterm message,
+ ErtsSendContext* ctx)
{
Eterm ctl;
- DeclareTmpHeapNoproc(ctl_heap,6);
Eterm token = NIL;
- Process *sender = dsdp->proc;
+ Process *sender = ctx->dsd.proc;
int res;
#ifdef USE_VM_PROBES
Sint tok_label = 0;
@@ -898,12 +1004,7 @@ erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message)
DTRACE_CHARBUF(receiver_name, 128);
#endif
- UseTmpHeapNoproc(6);
- if (SEQ_TRACE_TOKEN(sender) != NIL
-#ifdef USE_VM_PROBES
- && SEQ_TRACE_TOKEN(sender) != am_have_dt_utag
-#endif
- ) {
+ if (have_seqtrace(SEQ_TRACE_TOKEN(sender))) {
seq_trace_update_send(sender);
token = SEQ_TRACE_TOKEN(sender);
seq_trace_output(token, message, SEQ_TRACE_SEND, remote_name, sender);
@@ -912,13 +1013,13 @@ erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message)
*node_name = *sender_name = *receiver_name = '\0';
if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) {
erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)),
- "%T", dsdp->dep->sysname);
+ "%T", ctx->dsd.dep->sysname);
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
"%T", sender->common.id);
erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
"{%T,%s}", remote_name, node_name);
msize = size_object(message);
- if (token != NIL && token != am_have_dt_utag) {
+ if (have_seqtrace(token)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
@@ -927,17 +1028,19 @@ erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message)
#endif
if (token != NIL)
- ctl = TUPLE5(&ctl_heap[0], make_small(DOP_REG_SEND_TT),
- sender->common.id, am_Cookie, remote_name, token);
+ ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_REG_SEND_TT),
+ sender->common.id, am_Empty, remote_name, token);
else
- ctl = TUPLE4(&ctl_heap[0], make_small(DOP_REG_SEND),
- sender->common.id, am_Cookie, remote_name);
+ ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_REG_SEND),
+ sender->common.id, am_Empty, remote_name);
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
- res = dsig_send(dsdp, ctl, message, 0);
- UnUseTmpHeapNoproc(6);
+ ctx->dss.ctl = ctl;
+ ctx->dss.msg = message;
+ ctx->dss.force_busy = 0;
+ res = erts_dsig_send(&ctx->dsd, &ctx->dss);
return res;
}
@@ -961,11 +1064,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
#endif
UseTmpHeapNoproc(6);
- if (token != NIL
-#ifdef USE_VM_PROBES
- && token != am_have_dt_utag
-#endif
- ) {
+ if (have_seqtrace(token)) {
seq_trace_update_send(dsdp->proc);
seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local);
ctl = TUPLE5(&ctl_heap[0],
@@ -984,7 +1083,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
"{%T,%s}", remote, node_name);
erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)),
"%T", reason);
- if (token != NIL && token != am_have_dt_utag) {
+ if (have_seqtrace(token)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
@@ -994,7 +1093,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
DTRACE7(process_exit_signal_remote, sender_name, node_name,
remote_name, reason_str, tok_label, tok_lastcnt, tok_serial);
/* forced, i.e ignore busy */
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ res = dsig_send_ctl(dsdp, ctl, 1);
UnUseTmpHeapNoproc(6);
return res;
}
@@ -1010,7 +1109,7 @@ erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
ctl = TUPLE4(&ctl_heap[0],
make_small(DOP_EXIT), local, remote, reason);
/* forced, i.e ignore busy */
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ res = dsig_send_ctl(dsdp, ctl, 1);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -1026,7 +1125,7 @@ erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason
ctl = TUPLE4(&ctl_heap[0],
make_small(DOP_EXIT2), local, remote, reason);
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send_ctl(dsdp, ctl, 0);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -1043,7 +1142,7 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
ctl = TUPLE3(&ctl_heap[0],
make_small(DOP_GROUP_LEADER), leader, remote);
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send_ctl(dsdp, ctl, 0);
UnUseTmpHeapNoproc(4);
return res;
}
@@ -1091,7 +1190,6 @@ int erts_net_message(Port *prt,
byte *buf,
ErlDrvSizeT len)
{
-#define DIST_CTL_DEFAULT_SIZE 64
ErtsDistExternal ede;
byte *t;
Sint ctl_len;
@@ -1104,7 +1202,7 @@ int erts_net_message(Port *prt,
Process* rp;
DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
Eterm* ctl = ctl_default;
- ErlOffHeap off_heap;
+ ErtsHeapFactory factory;
Eterm* hp;
Sint type;
Eterm token;
@@ -1113,25 +1211,25 @@ int erts_net_message(Port *prt,
ErtsLink *lnk;
Uint tuple_arity;
int res;
+ Uint32 connection_id;
#ifdef ERTS_DIST_MSG_DBG
ErlDrvSizeT orig_len = len;
#endif
UseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
- /* Thanks to Luke Gorrie */
- off_heap.first = NULL;
- off_heap.overhead = 0;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_LC_ASSERT(!prt || erts_lc_is_port_locked(prt));
if (!erts_is_alive) {
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
return 0;
}
- if (hlen != 0)
- goto data_error;
+
+
+ ASSERT(hlen == 0);
+
if (len == 0) { /* HANDLE TICK !!! */
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
return 0;
@@ -1150,30 +1248,31 @@ int erts_net_message(Port *prt,
len--;
}
- if (len == 0) {
- PURIFY_MSG("data error");
- goto data_error;
- }
-
- res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache);
+ res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache, &connection_id);
- if (res >= 0)
- res = ctl_len = erts_decode_dist_ext_size(&ede);
- else {
+ switch (res) {
+ case ERTS_PREP_DIST_EXT_CLOSED:
+ return 0; /* Connection not alive; ignore signal... */
+ case ERTS_PREP_DIST_EXT_FAILED:
#ifdef ERTS_DIST_MSG_DBG
erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n");
bw(buf, orig_len);
#endif
- ctl_len = 0;
- }
-
- if (res < 0) {
+ goto data_error;
+ case ERTS_PREP_DIST_EXT_SUCCESS:
+ ctl_len = erts_decode_dist_ext_size(&ede);
+ if (ctl_len < 0) {
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n");
- bw(buf, orig_len);
+ erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n");
+ bw(buf, orig_len);
#endif
- PURIFY_MSG("data error");
- goto data_error;
+ PURIFY_MSG("data error");
+ goto data_error;
+ }
+ break;
+ default:
+ ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()");
+ break;
}
if (ctl_len > DIST_CTL_DEFAULT_SIZE) {
@@ -1181,14 +1280,15 @@ int erts_net_message(Port *prt,
}
hp = ctl;
- arg = erts_decode_dist_ext(&hp, &off_heap, &ede);
+ erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF);
+ arg = erts_decode_dist_ext(&factory, &ede);
if (is_non_value(arg)) {
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, "DIST MSG DEBUG: erts_dist_ext_size(CTL) failed:\n");
+ erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext(CTL) failed:\n");
bw(buf, orig_len);
#endif
PURIFY_MSG("data error");
- goto data_error;
+ goto decode_error;
}
ctl_len = t - buf;
@@ -1203,6 +1303,7 @@ int erts_net_message(Port *prt,
}
token_size = 0;
+ token = NIL;
switch (type = unsigned_val(tuple[1])) {
case DOP_LINK:
@@ -1231,23 +1332,23 @@ int erts_net_message(Port *prt,
break;
}
- erts_smp_de_links_lock(dep);
+ erts_de_links_lock(dep);
res = erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, from);
if (res < 0) {
/* It was already there! Lets skip the rest... */
- erts_smp_de_links_unlock(dep);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ erts_de_links_unlock(dep);
+ erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
break;
}
lnk = erts_add_or_lookup_link(&(dep->nlinks), LINK_PID, rp->common.id);
erts_add_link(&(ERTS_LINK_ROOT(lnk)), LINK_PID, from);
- erts_smp_de_links_unlock(dep);
+ erts_de_links_unlock(dep);
if (IS_TRACED_FL(rp, F_TRACE_PROCS))
- trace_proc(NULL, rp, am_getting_linked, from);
+ trace_proc(NULL, 0, rp, am_getting_linked, from);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
break;
case DOP_UNLINK: {
@@ -1270,10 +1371,10 @@ int erts_net_message(Port *prt,
lnk = erts_remove_link(&ERTS_P_LINKS(rp), from);
if (IS_TRACED_FL(rp, F_TRACE_PROCS) && lnk != NULL) {
- trace_proc(NULL, rp, am_getting_unlinked, from);
+ trace_proc(NULL, 0, rp, am_getting_unlinked, from);
}
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
erts_remove_dist_link(&dld, to, from, dep);
erts_destroy_dist_link(&dld);
@@ -1325,11 +1426,11 @@ int erts_net_message(Port *prt,
else {
if (is_atom(watched))
watched = rp->common.id;
- erts_smp_de_links_lock(dep);
+ erts_de_links_lock(dep);
erts_add_monitor(&(dep->monitors), MON_ORIGIN, ref, watched, name);
erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, ref, watcher, name);
- erts_smp_de_links_unlock(dep);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ erts_de_links_unlock(dep);
+ erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
}
break;
@@ -1351,14 +1452,14 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- erts_smp_de_links_lock(dep);
+ erts_de_links_lock(dep);
mon = erts_remove_monitor(&(dep->monitors),ref);
- erts_smp_de_links_unlock(dep);
+ erts_de_links_unlock(dep);
/* ASSERT(mon != NULL); can happen in case of broken dist message */
if (mon == NULL) {
break;
}
- watched = mon->pid;
+ watched = mon->u.pid;
erts_destroy_monitor(mon);
rp = erts_pid2proc_opt(NULL, 0,
watched, ERTS_PROC_LOCK_LINK,
@@ -1367,7 +1468,7 @@ int erts_net_message(Port *prt,
break;
}
mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
+ erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK);
ASSERT(mon != NULL);
if (mon == NULL) {
break;
@@ -1419,67 +1520,81 @@ int erts_net_message(Port *prt,
ErlOffHeap *ohp;
ASSERT(xsize);
heap_frag = erts_dist_ext_trailer(ede_copy);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size);
+ ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
hp = heap_frag->mem;
ohp = &heap_frag->off_heap;
token = tuple[5];
token = copy_struct(token, token_size, &hp, ohp);
}
- erts_queue_dist_message(rp, &locks, ede_copy, token);
+ erts_queue_dist_message(rp, locks, ede_copy, token, from);
if (locks)
- erts_smp_proc_unlock(rp, locks);
+ erts_proc_unlock(rp, locks);
}
break;
+ case DOP_SEND_SENDER_TT: {
+ Uint xsize;
case DOP_SEND_TT:
+
if (tuple_arity != 4) {
goto invalid_message;
}
-
- token_size = size_object(tuple[4]);
- /* Fall through ... */
+
+ token = tuple[4];
+ token_size = size_object(token);
+ xsize = ERTS_HEAP_FRAG_SIZE(token_size);
+ goto send_common;
+
+ case DOP_SEND_SENDER:
case DOP_SEND:
+
+ token = NIL;
+ xsize = 0;
+ if (tuple_arity != 3)
+ goto invalid_message;
+
+ send_common:
+
/*
- * There is intentionally no testing of the cookie (it is always '')
- * from R9B and onwards.
+ * If DOP_SEND_SENDER or DOP_SEND_SENDER_TT element 2 contains
+ * the sender pid (i.e. DFLAG_SEND_SENDER is set); otherwise,
+ * the atom '' (empty cookie).
*/
+ ASSERT((type == DOP_SEND_SENDER || type == DOP_SEND_SENDER_TT)
+ ? (is_pid(tuple[2]) && (dep->flags & DFLAG_SEND_SENDER))
+ : tuple[2] == am_Empty);
+
#ifdef ERTS_DIST_MSG_DBG
dist_msg_dbg(&ede, "MSG", buf, orig_len);
#endif
- if (type != DOP_SEND_TT && tuple_arity != 3) {
- goto invalid_message;
- }
to = tuple[3];
if (is_not_pid(to)) {
goto invalid_message;
}
rp = erts_proc_lookup(to);
if (rp) {
- Uint xsize = type == DOP_SEND ? 0 : ERTS_HEAP_FRAG_SIZE(token_size);
ErtsProcLocks locks = 0;
ErtsDistExternal *ede_copy;
ede_copy = erts_make_dist_ext_copy(&ede, xsize);
- if (type == DOP_SEND) {
- token = NIL;
- } else {
+ if (is_not_nil(token)) {
ErlHeapFragment *heap_frag;
ErlOffHeap *ohp;
ASSERT(xsize);
heap_frag = erts_dist_ext_trailer(ede_copy);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size);
+ ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
hp = heap_frag->mem;
ohp = &heap_frag->off_heap;
- token = tuple[4];
token = copy_struct(token, token_size, &hp, ohp);
}
- erts_queue_dist_message(rp, &locks, ede_copy, token);
+ erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty);
if (locks)
- erts_smp_proc_unlock(rp, locks);
+ erts_proc_unlock(rp, locks);
}
break;
+ }
case DOP_MONITOR_P_EXIT: {
/* We are monitoring a process on the remote node which dies, we get
@@ -1503,7 +1618,7 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- erts_smp_de_links_lock(dep);
+ erts_de_links_lock(dep);
sysname = dep->sysname;
mon = erts_remove_monitor(&(dep->monitors), ref);
/*
@@ -1512,11 +1627,11 @@ int erts_net_message(Port *prt,
* removed info about monitor. In this case, do nothing
* and everything will be as it should.
*/
- erts_smp_de_links_unlock(dep);
+ erts_de_links_unlock(dep);
if (mon == NULL) {
break;
}
- rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks);
+ rp = erts_pid2proc(NULL, 0, mon->u.pid, rp_locks);
erts_destroy_monitor(mon);
if (rp == NULL) {
@@ -1526,18 +1641,18 @@ int erts_net_message(Port *prt,
mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref);
if (mon == NULL) {
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
break;
}
UseTmpHeapNoproc(3);
watched = (is_not_nil(mon->name)
? TUPLE2(&lhp[0], mon->name, sysname)
- : mon->pid);
+ : mon->u.pid);
erts_queue_monitor_message(rp, &rp_locks,
ref, am_process, watched, reason);
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
erts_destroy_monitor(mon);
UnUseTmpHeapNoproc(3);
break;
@@ -1598,10 +1713,14 @@ int erts_net_message(Port *prt,
ERTS_XSIG_FLG_IGN_KILL);
if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) {
/* We didn't exit the process and it is traced */
- trace_proc(NULL, rp, am_getting_unlinked, from);
+ if (rp_locks & ERTS_PROC_LOCKS_XSIG_SEND) {
+ erts_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND);
+ rp_locks &= ~ERTS_PROC_LOCKS_XSIG_SEND;
+ }
+ trace_proc(NULL, 0, rp, am_getting_unlinked, from);
}
}
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
}
erts_remove_dist_link(&dld, to, from, dep);
if (lnk)
@@ -1643,7 +1762,7 @@ int erts_net_message(Port *prt,
token,
NULL,
0);
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
}
break;
}
@@ -1661,19 +1780,19 @@ int erts_net_message(Port *prt,
if (!rp)
break;
rp->group_leader = STORE_NC_IN_PROC(rp, from);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
+ erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
break;
default:
goto invalid_message;
}
- erts_cleanup_offheap(&off_heap);
+ erts_factory_close(&factory);
if (ctl != ctl_default) {
erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
}
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_CHK_NO_PROC_LOCKS;
return 0;
invalid_message:
{
@@ -1681,220 +1800,315 @@ int erts_net_message(Port *prt,
erts_dsprintf(dsbufp, "Invalid distribution message: %.200T", arg);
erts_send_error_to_logger_nogl(dsbufp);
}
- data_error:
+decode_error:
PURIFY_MSG("data error");
- erts_cleanup_offheap(&off_heap);
+ erts_factory_close(&factory);
if (ctl != ctl_default) {
erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
}
+data_error:
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
- erts_deliver_port_exit(prt, dep->cid, am_killed, 0);
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ erts_kill_dist_connection(dep, connection_id);
+ ERTS_CHK_NO_PROC_LOCKS;
return -1;
}
-static int
-dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
+static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
{
- Eterm cid;
- int suspended = 0;
- int resume = 0;
- Uint32 pass_through_size;
- Uint data_size, dhdr_ext_size;
- ErtsAtomCacheMap *acmp;
- ErtsDistOutputBuf *obuf;
- DistEntry *dep = dsdp->dep;
- Uint32 flags = dep->flags;
- Process *c_p = dsdp->proc;
-
- if (!c_p || dsdp->no_suspend)
- force_busy = 1;
+ struct erts_dsig_send_context ctx;
+ int ret;
+ ctx.ctl = ctl;
+ ctx.msg = THE_NON_VALUE;
+ ctx.force_busy = force_busy;
+ ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
+#ifdef DEBUG
+ ctx.reds = 1; /* provoke assert below (no reduction count without msg) */
+#endif
+ ret = erts_dsig_send(dsdp, &ctx);
+ ASSERT(ret != ERTS_DSIG_SEND_CONTINUE);
+ return ret;
+}
- ERTS_SMP_LC_ASSERT(!c_p
- || (ERTS_PROC_LOCK_MAIN
- == erts_proc_lc_my_proc_locks(c_p)));
+static ERTS_INLINE void
+notify_dist_data(Process *c_p, Eterm pid)
+{
+ Process *rp;
+ ErtsProcLocks rp_locks;
- if (!erts_is_alive)
- return ERTS_DSIG_SEND_OK;
+ ASSERT(erts_get_scheduler_data()
+ && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data()));
+ ASSERT(is_internal_pid(pid));
- if (flags & DFLAG_DIST_HDR_ATOM_CACHE) {
- acmp = erts_get_atom_cache_map(c_p);
- pass_through_size = 0;
+ if (c_p && c_p->common.id == pid) {
+ rp = c_p;
+ rp_locks = ERTS_PROC_LOCK_MAIN;
}
else {
- acmp = NULL;
- pass_through_size = 1;
+ rp = erts_proc_lookup(pid);
+ rp_locks = 0;
}
-#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, ">>%s CTL: %T\n", pass_through_size ? "P" : " ", ctl);
- if (is_value(msg))
- erts_fprintf(stderr, " MSG: %T\n", msg);
-#endif
+ if (rp) {
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ erts_queue_message(rp, rp_locks, mp, am_dist_data, am_system);
+ }
+}
- data_size = pass_through_size;
- erts_reset_atom_cache_map(acmp);
- data_size += erts_encode_dist_ext_size(ctl, flags, acmp);
- if (is_value(msg))
- data_size += erts_encode_dist_ext_size(msg, flags, acmp);
- erts_finalize_atom_cache_map(acmp, flags);
+int
+erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
+{
+ int retval;
+ Sint initial_reds = ctx->reds;
+ Eterm cid;
- dhdr_ext_size = erts_encode_ext_dist_header_size(acmp);
- data_size += dhdr_ext_size;
+ while (1) {
+ switch (ctx->phase) {
+ case ERTS_DSIG_SEND_PHASE_INIT:
+ ctx->flags = dsdp->dep->flags;
+ ctx->c_p = dsdp->proc;
- obuf = alloc_dist_obuf(data_size);
- obuf->ext_endp = &obuf->data[0] + pass_through_size + dhdr_ext_size;
+ if (!ctx->c_p || dsdp->no_suspend)
+ ctx->force_busy = 1;
- /* Encode internal version of dist header */
- obuf->extp = erts_encode_ext_dist_header_setup(obuf->ext_endp, acmp);
- /* Encode control message */
- erts_encode_dist_ext(ctl, &obuf->ext_endp, flags, acmp);
- if (is_value(msg)) {
- /* Encode message */
- erts_encode_dist_ext(msg, &obuf->ext_endp, flags, acmp);
- }
+ ERTS_LC_ASSERT(!ctx->c_p
+ || (ERTS_PROC_LOCK_MAIN
+ == erts_proc_lc_my_proc_locks(ctx->c_p)));
- ASSERT(obuf->extp < obuf->ext_endp);
- ASSERT(&obuf->data[0] <= obuf->extp - pass_through_size);
- ASSERT(obuf->ext_endp <= &obuf->data[0] + data_size);
+ if (!erts_is_alive)
+ return ERTS_DSIG_SEND_OK;
- data_size = obuf->ext_endp - obuf->extp;
+ if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) {
+ ctx->acmp = erts_get_atom_cache_map(ctx->c_p);
+ ctx->pass_through_size = 0;
+ }
+ else {
+ ctx->acmp = NULL;
+ ctx->pass_through_size = 1;
+ }
- /*
- * Signal encoded; now verify that the connection still exists,
- * and if so enqueue the signal and schedule it for send.
- */
- obuf->next = NULL;
- erts_smp_de_rlock(dep);
- cid = dep->cid;
- if (cid != dsdp->cid
- || dep->connection_id != dsdp->connection_id
- || dep->status & ERTS_DE_SFLG_EXITING) {
- /* Not the same connection as when we started; drop message... */
- erts_smp_de_runlock(dep);
- free_dist_obuf(obuf);
- }
- else {
- ErtsProcList *plp = NULL;
- erts_smp_mtx_lock(&dep->qlock);
- dep->qsize += size_obuf(obuf);
- if (dep->qsize >= erts_dist_buf_busy_limit)
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- if (!force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) {
- erts_smp_mtx_unlock(&dep->qlock);
-
- plp = erts_proclist_create(c_p);
- erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
- suspended = 1;
- erts_smp_mtx_lock(&dep->qlock);
- }
-
- /* Enqueue obuf on dist entry */
- if (dep->out_queue.last)
- dep->out_queue.last->next = obuf;
- else
- dep->out_queue.first = obuf;
- dep->out_queue.last = obuf;
+ #ifdef ERTS_DIST_MSG_DBG
+ erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl);
+ if (is_value(ctx->msg))
+ erts_fprintf(stderr, " MSG: %T\n", ctx->msg);
+ #endif
+
+ ctx->data_size = ctx->pass_through_size;
+ erts_reset_atom_cache_map(ctx->acmp);
+ erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size);
+
+ if (is_value(ctx->msg)) {
+ ctx->u.sc.wstack.wstart = NULL;
+ ctx->u.sc.flags = ctx->flags;
+ ctx->u.sc.level = 0;
+ ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE;
+ } else {
+ ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
+ }
+ break;
- if (!force_busy) {
- if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) {
- if (suspended)
- resume = 1; /* was busy when we started, but isn't now */
-#ifdef USE_VM_PROBES
- if (resume && DTRACE_ENABLED(dist_port_not_busy)) {
- DTRACE_CHARBUF(port_str, 64);
- DTRACE_CHARBUF(remote_str, 64);
-
- erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
- "%T", cid);
- erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", dep->sysname);
- DTRACE3(dist_port_not_busy, erts_this_node_sysname,
- port_str, remote_str);
- }
-#endif
+ case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
+ if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ goto done;
}
- else {
- /* Enqueue suspended process on dist entry */
- ASSERT(plp);
- erts_proclist_store_last(&dep->suspended, plp);
+
+ ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
+ case ERTS_DSIG_SEND_PHASE_ALLOC:
+ erts_finalize_atom_cache_map(ctx->acmp, ctx->flags);
+
+ ctx->dhdr_ext_size = erts_encode_ext_dist_header_size(ctx->acmp);
+ ctx->data_size += ctx->dhdr_ext_size;
+
+ ctx->obuf = alloc_dist_obuf(ctx->data_size);
+ ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->pass_through_size + ctx->dhdr_ext_size;
+
+ /* Encode internal version of dist header */
+ ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp);
+ /* Encode control message */
+ erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL);
+ if (is_value(ctx->msg)) {
+ ctx->u.ec.flags = ctx->flags;
+ ctx->u.ec.level = 0;
+ ctx->u.ec.wstack.wstart = NULL;
+ ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE;
+ } else {
+ ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
}
- }
+ break;
+
+ case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
+ if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) {
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ goto done;
+ }
+
+ ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
+ case ERTS_DSIG_SEND_PHASE_FIN: {
+ DistEntry *dep = dsdp->dep;
+ int suspended = 0;
+ int resume = 0;
+
+ ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp);
+ ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->pass_through_size);
+ ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size);
+
+ ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp;
- erts_smp_mtx_unlock(&dep->qlock);
- erts_schedule_dist_command(NULL, dep);
- erts_smp_de_runlock(dep);
-
- if (resume) {
- erts_resume(c_p, ERTS_PROC_LOCK_MAIN);
- erts_proclist_destroy(plp);
/*
- * Note that the calling process still have to yield as if it
- * suspended. If not, the calling process could later be
- * erroneously scheduled when it shouldn't be.
+ * Signal encoded; now verify that the connection still exists,
+ * and if so enqueue the signal and schedule it for send.
*/
- }
- }
+ ctx->obuf->next = NULL;
+ erts_de_rlock(dep);
+ cid = dep->cid;
+ if (cid != dsdp->cid
+ || dep->connection_id != dsdp->connection_id
+ || dep->status & ERTS_DE_SFLG_EXITING) {
+ /* Not the same connection as when we started; drop message... */
+ erts_de_runlock(dep);
+ free_dist_obuf(ctx->obuf);
+ }
+ else {
+ Sint qsize;
+ erts_aint32_t qflgs;
+ ErtsProcList *plp = NULL;
+ Eterm notify_proc = NIL;
+ Sint obsz = size_obuf(ctx->obuf);
+
+ erts_mtx_lock(&dep->qlock);
+ qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz);
+ ASSERT(qsize >= obsz);
+ qflgs = erts_atomic32_read_nob(&dep->qflgs);
+ if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) {
+ erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY);
+ qflgs |= ERTS_DE_QFLG_BUSY;
+ }
+ if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) {
+ /* Previously empty queue and info requested... */
+ qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
+ ~ERTS_DE_QFLG_REQ_INFO);
+ if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
+ notify_proc = dep->cid;
+ ASSERT(is_internal_pid(notify_proc));
+ }
+ /* else: requester will send itself the message... */
+ qflgs &= ~ERTS_DE_QFLG_REQ_INFO;
+ }
+ if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) {
+ erts_mtx_unlock(&dep->qlock);
- if (c_p) {
- int reds;
- /*
- * Bump reductions on calling process.
- *
- * This is the reduction cost: Always a base cost of 8 reductions
- * plus 16 reductions per kilobyte generated external data.
- */
+ plp = erts_proclist_create(ctx->c_p);
+ erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL);
+ suspended = 1;
+ erts_mtx_lock(&dep->qlock);
+ }
- data_size >>= (10-4);
-#if defined(ARCH_64) && !HALFWORD_HEAP
- data_size &= 0x003fffffffffffff;
-#elif defined(ARCH_32) || HALFWORD_HEAP
- data_size &= 0x003fffff;
-#else
-# error "Ohh come on ... !?!"
-#endif
- reds = 8 + ((int) data_size > 1000000 ? 1000000 : (int) data_size);
- BUMP_REDS(c_p, reds);
+ /* Enqueue obuf on dist entry */
+ if (dep->out_queue.last)
+ dep->out_queue.last->next = ctx->obuf;
+ else
+ dep->out_queue.first = ctx->obuf;
+ dep->out_queue.last = ctx->obuf;
+
+ if (!ctx->force_busy) {
+ qflgs = erts_atomic32_read_nob(&dep->qflgs);
+ if (!(qflgs & ERTS_DE_QFLG_BUSY)) {
+ if (suspended)
+ resume = 1; /* was busy when we started, but isn't now */
+ #ifdef USE_VM_PROBES
+ if (resume && DTRACE_ENABLED(dist_port_not_busy)) {
+ DTRACE_CHARBUF(port_str, 64);
+ DTRACE_CHARBUF(remote_str, 64);
+
+ erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
+ "%T", cid);
+ erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
+ "%T", dep->sysname);
+ DTRACE3(dist_port_not_busy, erts_this_node_sysname,
+ port_str, remote_str);
+ }
+ #endif
+ }
+ else {
+ /* Enqueue suspended process on dist entry */
+ ASSERT(plp);
+ erts_proclist_store_last(&dep->suspended, plp);
+ }
+ }
+
+ erts_mtx_unlock(&dep->qlock);
+ if (is_internal_port(dep->cid))
+ erts_schedule_dist_command(NULL, dep);
+ erts_de_runlock(dep);
+ if (is_internal_pid(notify_proc))
+ notify_dist_data(ctx->c_p, notify_proc);
+
+ if (resume) {
+ erts_resume(ctx->c_p, ERTS_PROC_LOCK_MAIN);
+ erts_proclist_destroy(plp);
+ /*
+ * Note that the calling process still have to yield as if it
+ * suspended. If not, the calling process could later be
+ * erroneously scheduled when it shouldn't be.
+ */
+ }
+ }
+ ctx->obuf = NULL;
+
+ if (suspended) {
+ #ifdef USE_VM_PROBES
+ if (!resume && DTRACE_ENABLED(dist_port_busy)) {
+ DTRACE_CHARBUF(port_str, 64);
+ DTRACE_CHARBUF(remote_str, 64);
+ DTRACE_CHARBUF(pid_str, 16);
+
+ erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", cid);
+ erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
+ "%T", dep->sysname);
+ erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)),
+ "%T", ctx->c_p->common.id);
+ DTRACE4(dist_port_busy, erts_this_node_sysname,
+ port_str, remote_str, pid_str);
+ }
+ #endif
+ if (!resume && erts_system_monitor_flags.busy_dist_port)
+ monitor_generic(ctx->c_p, am_busy_dist_port, cid);
+ retval = ERTS_DSIG_SEND_YIELD;
+ } else {
+ retval = ERTS_DSIG_SEND_OK;
+ }
+ goto done;
+ }
+ default:
+ erts_exit(ERTS_ABORT_EXIT, "dsig_send invalid phase (%d)\n", (int)ctx->phase);
+ }
}
- if (suspended) {
-#ifdef USE_VM_PROBES
- if (!resume && DTRACE_ENABLED(dist_port_busy)) {
- DTRACE_CHARBUF(port_str, 64);
- DTRACE_CHARBUF(remote_str, 64);
- DTRACE_CHARBUF(pid_str, 16);
-
- erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", cid);
- erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", dep->sysname);
- erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)),
- "%T", c_p->common.id);
- DTRACE4(dist_port_busy, erts_this_node_sysname,
- port_str, remote_str, pid_str);
- }
-#endif
- if (!resume && erts_system_monitor_flags.busy_dist_port)
- monitor_generic(c_p, am_busy_dist_port, cid);
- return ERTS_DSIG_SEND_YIELD;
+done:
+ if (ctx->msg && ctx->c_p) {
+ BUMP_REDS(ctx->c_p, (initial_reds - ctx->reds) / TERM_TO_BINARY_LOOP_FACTOR);
}
- return ERTS_DSIG_SEND_OK;
+ return retval;
}
-
static Uint
dist_port_command(Port *prt, ErtsDistOutputBuf *obuf)
{
int fpe_was_unmasked;
- Uint size = obuf->ext_endp - obuf->extp;
+ ErlDrvSizeT size;
+ char *bufp;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
+ ERTS_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
- if (size > (Uint) INT_MAX)
- erl_exit(ERTS_ABORT_EXIT,
- "Absurdly large distribution output data buffer "
- "(%beu bytes) passed.\n",
- size);
+ if (!obuf) {
+ size = 0;
+ bufp = NULL;
+ }
+ else {
+ size = obuf->ext_endp - obuf->extp;
+ bufp = (char*) obuf->extp;
+ }
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(dist_output)) {
@@ -1909,11 +2123,10 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf)
remote_str, size);
}
#endif
+
prt->caller = NIL;
fpe_was_unmasked = erts_block_fpe();
- (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data,
- (char*) obuf->extp,
- (int) size);
+ (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, bufp, size);
erts_unblock_fpe(fpe_was_unmasked);
return size;
}
@@ -1922,33 +2135,41 @@ static Uint
dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
{
int fpe_was_unmasked;
- Uint size = obuf->ext_endp - obuf->extp;
+ ErlDrvSizeT size;
SysIOVec iov[2];
ErlDrvBinary* bv[2];
ErlIOVec eiov;
- ERTS_SMP_CHK_NO_PROC_LOCKS;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-
- if (size > (Uint) INT_MAX)
- erl_exit(ERTS_ABORT_EXIT,
- "Absurdly large distribution output data buffer "
- "(%beu bytes) passed.\n",
- size);
+ ERTS_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
iov[0].iov_base = NULL;
iov[0].iov_len = 0;
bv[0] = NULL;
- iov[1].iov_base = obuf->extp;
- iov[1].iov_len = size;
- bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf));
+ if (!obuf) {
+ size = 0;
+ eiov.vsize = 1;
+ }
+ else {
+ size = obuf->ext_endp - obuf->extp;
+ eiov.vsize = 2;
+
+ iov[1].iov_base = obuf->extp;
+ iov[1].iov_len = size;
+ bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf));
+ }
- eiov.vsize = 2;
eiov.size = size;
eiov.iov = iov;
eiov.binv = bv;
+ if (size > (Uint) INT_MAX)
+ erts_exit(ERTS_DUMP_EXIT,
+ "Absurdly large distribution output data buffer "
+ "(%beu bytes) passed.\n",
+ size);
+
ASSERT(prt->drv_ptr->outputv);
#ifdef USE_VM_PROBES
@@ -1973,9 +2194,9 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
}
-#if defined(ARCH_64) && !HALFWORD_HEAP
+#if defined(ARCH_64)
#define ERTS_PORT_REDS_MASK__ 0x003fffffffffffffL
-#elif defined(ARCH_32) || HALFWORD_HEAP
+#elif defined(ARCH_32)
#define ERTS_PORT_REDS_MASK__ 0x003fffff
#else
# error "Ohh come on ... !?!"
@@ -1996,28 +2217,25 @@ erts_dist_command(Port *prt, int reds_limit)
Sint reds = ERTS_PORT_REDS_DIST_CMD_START;
Uint32 status;
Uint32 flags;
- Sint obufsize = 0;
+ Sint qsize, obufsize = 0;
ErtsDistOutputQueue oq, foq;
DistEntry *dep = prt->dist_entry;
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
erts_aint32_t sched_flags;
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
-
- erts_refc_inc(&dep->refc, 1); /* Otherwise dist_entry might be
- removed if port command fails */
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
- erts_smp_atomic_set_mb(&dep->dist_cmd_scheduled, 0);
+ erts_atomic_set_mb(&dep->dist_cmd_scheduled, 0);
- erts_smp_de_rlock(dep);
+ erts_de_rlock(dep);
flags = dep->flags;
status = dep->status;
send = dep->send;
- erts_smp_de_runlock(dep);
+ erts_de_runlock(dep);
if (status & ERTS_DE_SFLG_EXITING) {
- erts_deliver_port_exit(prt, prt->common.id, am_killed, 0);
- erts_deref_dist_entry(dep);
+ erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1);
return reds + ERTS_PORT_REDS_DIST_CMD_EXIT;
}
@@ -2031,19 +2249,19 @@ erts_dist_command(Port *prt, int reds_limit)
* a mess.
*/
- erts_smp_mtx_lock(&dep->qlock);
+ erts_mtx_lock(&dep->qlock);
oq.first = dep->out_queue.first;
oq.last = dep->out_queue.last;
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_unlock(&dep->qlock);
foq.first = dep->finalized_out_queue.first;
foq.last = dep->finalized_out_queue.last;
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
- sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
if (reds > reds_limit)
goto preempted;
@@ -2051,21 +2269,21 @@ erts_dist_command(Port *prt, int reds_limit)
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) {
int preempt = 0;
do {
- Uint size;
- ErtsDistOutputBuf *fob;
-
- size = (*send)(prt, foq.first);
+ Uint size;
+ ErtsDistOutputBuf *fob;
+ size = (*send)(prt, foq.first);
+ erts_atomic64_inc_nob(&dep->out);
+ esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
- erts_fprintf(stderr, ">> ");
- bw(foq.first->extp, size);
+ erts_fprintf(stderr, ">> ");
+ bw(foq.first->extp, size);
#endif
- reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
- erts_smp_atomic_add_nob(&erts_bytes_out, size);
- fob = foq.first;
- obufsize += size_obuf(fob);
- foq.first = foq.first->next;
- free_dist_obuf(fob);
- sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
+ fob = foq.first;
+ obufsize += size_obuf(fob);
+ foq.first = foq.first->next;
+ free_dist_obuf(fob);
+ sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
if (sched_flags & ERTS_PTS_FLG_BUSY_PORT)
break;
@@ -2125,32 +2343,34 @@ erts_dist_command(Port *prt, int reds_limit)
}
}
else {
+ int de_busy;
int preempt = 0;
while (oq.first && !preempt) {
- ErtsDistOutputBuf *fob;
- Uint size;
- oq.first->extp
- = erts_encode_ext_dist_header_finalize(oq.first->extp,
- dep->cache,
- flags);
- reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
- if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
- *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through'
- needed */
- ASSERT(&oq.first->data[0] <= oq.first->extp
- && oq.first->extp < oq.first->ext_endp);
- size = (*send)(prt, oq.first);
+ ErtsDistOutputBuf *fob;
+ Uint size;
+ oq.first->extp
+ = erts_encode_ext_dist_header_finalize(oq.first->extp,
+ dep->cache,
+ flags);
+ reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
+ if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
+ *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through'
+ needed */
+ ASSERT(&oq.first->data[0] <= oq.first->extp
+ && oq.first->extp < oq.first->ext_endp);
+ size = (*send)(prt, oq.first);
+ erts_atomic64_inc_nob(&dep->out);
+ esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
- erts_fprintf(stderr, ">> ");
- bw(oq.first->extp, size);
+ erts_fprintf(stderr, ">> ");
+ bw(oq.first->extp, size);
#endif
- reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
- erts_smp_atomic_add_nob(&erts_bytes_out, size);
- fob = oq.first;
- obufsize += size_obuf(fob);
- oq.first = oq.first->next;
- free_dist_obuf(fob);
- sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
+ fob = oq.first;
+ obufsize += size_obuf(fob);
+ oq.first = oq.first->next;
+ free_dist_obuf(fob);
+ sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt)
goto finalize_only;
@@ -2177,23 +2397,24 @@ erts_dist_command(Port *prt, int reds_limit)
* dist entry in a non-busy state and resume suspended
* processes.
*/
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
+ erts_mtx_lock(&dep->qlock);
+ de_busy = !!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY);
+ qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
+ ASSERT(qsize >= 0);
obufsize = 0;
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
- && (dep->qflgs & ERTS_DE_QFLG_BUSY)
- && dep->qsize < erts_dist_buf_busy_limit) {
+ && de_busy && qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
int resumed;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_unlock(&dep->qlock);
resumed = erts_resume_processes(suspendees);
reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
}
else
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_unlock(&dep->qlock);
}
ASSERT(!oq.first && !oq.last);
@@ -2202,10 +2423,15 @@ erts_dist_command(Port *prt, int reds_limit)
if (obufsize != 0) {
ASSERT(obufsize > 0);
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_lock(&dep->qlock);
+#ifdef DEBUG
+ qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
+ ASSERT(qsize >= 0);
+#else
+ erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize);
+#endif
+ erts_mtx_unlock(&dep->qlock);
}
ASSERT(foq.first || !foq.last);
@@ -2222,8 +2448,6 @@ erts_dist_command(Port *prt, int reds_limit)
if (reds > INT_MAX/2)
reds = INT_MAX/2;
- erts_deref_dist_entry(dep);
-
return reds;
preempted:
@@ -2259,9 +2483,9 @@ erts_dist_command(Port *prt, int reds_limit)
foq.last = NULL;
#ifdef DEBUG
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize == obufsize);
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_lock(&dep->qlock);
+ ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize);
+ erts_mtx_unlock(&dep->qlock);
#endif
}
else {
@@ -2270,14 +2494,14 @@ erts_dist_command(Port *prt, int reds_limit)
* Unhandle buffers need to be put back first
* in out_queue.
*/
- erts_smp_mtx_lock(&dep->qlock);
- dep->qsize -= obufsize;
+ erts_mtx_lock(&dep->qlock);
+ erts_atomic_add_nob(&dep->qsize, -obufsize);
obufsize = 0;
oq.last->next = dep->out_queue.first;
dep->out_queue.first = oq.first;
if (!dep->out_queue.last)
dep->out_queue.last = oq.last;
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_unlock(&dep->qlock);
}
erts_schedule_dist_command(prt, NULL);
@@ -2285,6 +2509,370 @@ erts_dist_command(Port *prt, int reds_limit)
goto done;
}
+#if 0
+
+int
+dist_data_finalize(Process *c_p, int reds_limit)
+{
+ int reds = 5;
+ DistEntry *dep = ;
+ ErtsDistOutputQueue oq, foq;
+ ErtsDistOutputBuf *ob;
+ int preempt;
+
+
+ erts_mtx_lock(&dep->qlock);
+ flags = dep->flags;
+ oq.first = dep->out_queue.first;
+ oq.last = dep->out_queue.last;
+ dep->out_queue.first = NULL;
+ dep->out_queue.last = NULL;
+ erts_mtx_unlock(&dep->qlock);
+
+ if (!oq.first) {
+ ASSERT(!oq.last);
+ oq.first = dep->tmp_out_queue.first;
+ oq.last = dep->tmp_out_queue.last;
+ }
+ else {
+ ErtsDistOutputBuf *f, *l;
+ ASSERT(oq.last);
+ if (dep->tmp_out_queue.last) {
+ dep->tmp_out_queue.last->next = oq.first;
+ oq.first = dep->tmp_out_queue.first;
+ }
+ }
+
+ if (!oq.first) {
+ /* Nothing to do... */
+ ASSERT(!oq.last);
+ return reds;
+ }
+
+ foq.first = dep->finalized_out_queue.first;
+ foq.last = dep->finalized_out_queue.last;
+
+ preempt = 0;
+ ob = oq.first;
+ ASSERT(ob);
+
+ do {
+ ob->extp = erts_encode_ext_dist_header_finalize(ob->extp,
+ dep->cache,
+ flags);
+ if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
+ *--ob->extp = PASS_THROUGH; /* Old node; 'pass through'
+ needed */
+ ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp);
+ reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
+ preempt = reds > reds_limit;
+ if (preempt)
+ break;
+ ob = ob->next;
+ } while (ob);
+ /*
+ * At least one buffer was finalized; if we got preempted,
+ * ob points to the last buffer that we finalized.
+ */
+ if (foq.last)
+ foq.last->next = oq.first;
+ else
+ foq.first = oq.first;
+ if (!preempt) {
+ /* All buffers finalized */
+ foq.last = oq.last;
+ oq.first = oq.last = NULL;
+ }
+ else {
+ /* Not all buffers finalized; split oq. */
+ foq.last = ob;
+ oq.first = ob->next;
+ if (oq.first)
+ ob->next = NULL;
+ else
+ oq.last = NULL;
+ }
+
+ dep->finalized_out_queue.first = foq.first;
+ dep->finalized_out_queue.last = foq.last;
+ dep->tmp_out_queue.first = oq.first;
+ dep->tmp_out_queue.last = oq.last;
+
+ return reds;
+}
+
+#endif
+
+BIF_RETTYPE
+dist_ctrl_get_data_notification_1(BIF_ALIST_1)
+{
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
+ erts_aint32_t qflgs;
+ erts_aint_t qsize;
+ Eterm receiver = NIL;
+
+ if (!dep)
+ BIF_ERROR(BIF_P, EXC_NOTSUP);
+
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ /*
+ * Caller is the only one that can consume from this queue
+ * and the only one that can set the req-info flag...
+ */
+
+ erts_de_rlock(dep);
+
+ ASSERT(dep->cid == BIF_P->common.id);
+
+ qflgs = erts_atomic32_read_acqb(&dep->qflgs);
+
+ if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) {
+ qsize = erts_atomic_read_acqb(&dep->qsize);
+ ASSERT(qsize >= 0);
+ if (qsize > 0)
+ receiver = BIF_P->common.id; /* Notify ourselves... */
+ else { /* Empty queue; set req-info flag... */
+ qflgs = erts_atomic32_read_bor_mb(&dep->qflgs,
+ ERTS_DE_QFLG_REQ_INFO);
+ qsize = erts_atomic_read_acqb(&dep->qsize);
+ ASSERT(qsize >= 0);
+ if (qsize > 0) {
+ qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
+ ~ERTS_DE_QFLG_REQ_INFO);
+ if (qflgs & ERTS_DE_QFLG_REQ_INFO)
+ receiver = BIF_P->common.id; /* Notify ourselves... */
+ /* else: someone else will notify us... */
+ }
+ /* else: still empty queue... */
+ }
+ }
+ /* else: Already requested... */
+
+ erts_de_runlock(dep);
+
+ if (is_internal_pid(receiver))
+ notify_dist_data(BIF_P, receiver);
+
+ BIF_RET(am_ok);
+}
+
+BIF_RETTYPE
+dist_ctrl_put_data_2(BIF_ALIST_2)
+{
+ DistEntry *dep;
+ ErlDrvSizeT size;
+ Eterm input_handler;
+
+ if (is_binary(BIF_ARG_2))
+ size = binary_size(BIF_ARG_2);
+ else if (is_nil(BIF_ARG_2))
+ size = 0;
+ else if (is_list(BIF_ARG_2))
+ BIF_TRAP2(dist_ctrl_put_data_trap,
+ BIF_P, BIF_ARG_1, BIF_ARG_2);
+ else
+ BIF_ERROR(BIF_P, BADARG);
+
+ dep = erts_dhandle_to_dist_entry(BIF_ARG_1);
+ if (!dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ input_handler = (Eterm) erts_atomic_read_nob(&dep->input_handler);
+
+ if (input_handler != BIF_P->common.id)
+ BIF_ERROR(BIF_P, EXC_NOTSUP);
+
+ erts_atomic64_inc_nob(&dep->in);
+
+ if (size != 0) {
+ byte *data, *temp_alloc = NULL;
+
+ data = (byte *) erts_get_aligned_binary_bytes(BIF_ARG_2, &temp_alloc);
+ if (!data)
+ BIF_ERROR(BIF_P, BADARG);
+
+ erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
+
+ (void) erts_net_message(NULL, dep, NULL, 0, data, size);
+ /*
+ * We ignore any decode failures. On fatal failures the
+ * connection will be taken down by killing the
+ * distribution channel controller...
+ */
+
+ erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
+
+ BUMP_REDS(BIF_P, 5);
+
+ erts_free_aligned_binary_bytes(temp_alloc);
+
+ }
+
+ BIF_RET(am_ok);
+}
+
+BIF_RETTYPE
+dist_get_stat_1(BIF_ALIST_1)
+{
+ Sint64 read, write, pend;
+ Eterm res, *hp, **hpp;
+ Uint sz, *szp;
+ DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1);
+
+ if (!dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ erts_de_rlock(dep);
+
+ read = (Sint64) erts_atomic64_read_nob(&dep->in);
+ write = (Sint64) erts_atomic64_read_nob(&dep->out);
+ pend = (Sint64) erts_atomic_read_nob(&dep->qsize);
+
+ erts_de_runlock(dep);
+
+ sz = 0;
+ szp = &sz;
+ hpp = NULL;
+
+ while (1) {
+ res = erts_bld_tuple(hpp, szp, 4,
+ am_ok,
+ erts_bld_sint64(hpp, szp, read),
+ erts_bld_sint64(hpp, szp, write),
+ pend ? am_true : am_false);
+ if (hpp)
+ break;
+ hp = HAlloc(BIF_P, sz);
+ hpp = &hp;
+ szp = NULL;
+ }
+
+ BIF_RET(res);
+}
+
+BIF_RETTYPE
+dist_ctrl_input_handler_2(BIF_ALIST_2)
+{
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
+
+ if (!dep)
+ BIF_ERROR(BIF_P, EXC_NOTSUP);
+
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ if (is_not_internal_pid(BIF_ARG_2))
+ BIF_ERROR(BIF_P, BADARG);
+
+ erts_atomic_set_nob(&dep->input_handler,
+ (erts_aint_t) BIF_ARG_2);
+
+ BIF_RET(am_ok);
+}
+
+BIF_RETTYPE
+dist_ctrl_get_data_1(BIF_ALIST_1)
+{
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
+ int reds = 1;
+ ErtsDistOutputBuf *obuf;
+ Eterm *hp;
+ ProcBin *pb;
+ erts_aint_t qsize;
+
+ if (!dep)
+ BIF_ERROR(BIF_P, EXC_NOTSUP);
+
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep)
+ BIF_ERROR(BIF_P, BADARG);
+
+ erts_de_rlock(dep);
+
+ if (dep->status & ERTS_DE_SFLG_EXITING)
+ goto return_none;
+
+ ASSERT(dep->cid == BIF_P->common.id);
+
+#if 0
+ if (dep->finalized_out_queue.first) {
+ obuf = dep->finalized_out_queue.first;
+ dep->finalized_out_queue.first = obuf->next;
+ if (!obuf->next)
+ dep->finalized_out_queue.last = NULL;
+ }
+ else
+#endif
+ {
+ if (!dep->tmp_out_queue.first) {
+ ASSERT(!dep->tmp_out_queue.last);
+ qsize = erts_atomic_read_acqb(&dep->qsize);
+ if (qsize > 0) {
+ erts_mtx_lock(&dep->qlock);
+ dep->tmp_out_queue.first = dep->out_queue.first;
+ dep->tmp_out_queue.last = dep->out_queue.last;
+ dep->out_queue.first = NULL;
+ dep->out_queue.last = NULL;
+ erts_mtx_unlock(&dep->qlock);
+ }
+ }
+
+ if (!dep->tmp_out_queue.first) {
+ ASSERT(!dep->tmp_out_queue.last);
+ return_none:
+ erts_de_runlock(dep);
+ BIF_RET(am_none);
+ }
+ else {
+ obuf = dep->tmp_out_queue.first;
+ dep->tmp_out_queue.first = obuf->next;
+ if (!obuf->next)
+ dep->tmp_out_queue.last = NULL;
+ }
+
+ obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp,
+ dep->cache,
+ dep->flags);
+ reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
+ if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE))
+ *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */
+ ASSERT(&obuf->data[0] <= obuf->extp
+ && obuf->extp < obuf->ext_endp);
+ }
+
+ erts_atomic64_inc_nob(&dep->out);
+
+ erts_de_runlock(dep);
+
+ hp = HAlloc(BIF_P, PROC_BIN_SIZE);
+ pb = (ProcBin *) (char *) hp;
+ pb->thing_word = HEADER_PROC_BIN;
+ pb->size = obuf->ext_endp - obuf->extp;
+ pb->next = MSO(BIF_P).first;
+ MSO(BIF_P).first = (struct erl_off_heap_header*) pb;
+ pb->val = ErtsDistOutputBuf2Binary(obuf);
+ pb->bytes = (byte*) obuf->extp;
+ pb->flags = 0;
+
+ qsize = erts_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf));
+ ASSERT(qsize >= 0);
+
+ if (qsize < erts_dist_buf_busy_limit/2
+ && (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) {
+ ErtsProcList *resume_procs = NULL;
+ erts_mtx_lock(&dep->qlock);
+ resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
+ erts_mtx_unlock(&dep->qlock);
+ if (resume_procs) {
+ int resumed = erts_resume_processes(resume_procs);
+ reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
+ }
+ }
+
+ BIF_RET2(make_binary(pb), reds);
+}
+
void
erts_dist_port_not_busy(Port *prt)
{
@@ -2307,54 +2895,56 @@ erts_dist_port_not_busy(Port *prt)
void
erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
{
- erts_smp_de_rwlock(dep);
- if (is_internal_port(dep->cid)
- && connection_id == dep->connection_id
+ erts_de_rwlock(dep);
+ if (connection_id == dep->connection_id
&& !(dep->status & ERTS_DE_SFLG_EXITING)) {
dep->status |= ERTS_DE_SFLG_EXITING;
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
- dep->qflgs |= ERTS_DE_QFLG_EXIT;
- erts_smp_mtx_unlock(&dep->qlock);
+ erts_mtx_lock(&dep->qlock);
+ ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
+ erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT);
+ erts_mtx_unlock(&dep->qlock);
- erts_schedule_dist_command(NULL, dep);
+ if (is_internal_port(dep->cid))
+ erts_schedule_dist_command(NULL, dep);
+ else if (is_internal_pid(dep->cid))
+ schedule_kill_dist_ctrl_proc(dep->cid);
}
- erts_smp_de_rwunlock(dep);
+ erts_de_rwunlock(dep);
}
struct print_to_data {
- int to;
+ fmtfn_t to;
void *arg;
};
static void doit_print_monitor_info(ErtsMonitor *mon, void *vptdp)
{
- int to = ((struct print_to_data *) vptdp)->to;
+ fmtfn_t to = ((struct print_to_data *) vptdp)->to;
void *arg = ((struct print_to_data *) vptdp)->arg;
Process *rp;
ErtsMonitor *rmon;
- rp = erts_proc_lookup(mon->pid);
+ rp = erts_proc_lookup(mon->u.pid);
if (!rp || (rmon = erts_lookup_monitor(ERTS_P_MONITORS(rp), mon->ref)) == NULL) {
- erts_print(to, arg, "Warning, stray monitor for: %T\n", mon->pid);
+ erts_print(to, arg, "Warning, stray monitor for: %T\n", mon->u.pid);
} else if (mon->type == MON_ORIGIN) {
/* Local pid is being monitored */
erts_print(to, arg, "Remotely monitored by: %T %T\n",
- mon->pid, rmon->pid);
+ mon->u.pid, rmon->u.pid);
} else {
- erts_print(to, arg, "Remote monitoring: %T ", mon->pid);
- if (is_not_atom(rmon->pid))
- erts_print(to, arg, "%T\n", rmon->pid);
+ erts_print(to, arg, "Remote monitoring: %T ", mon->u.pid);
+ if (is_not_atom(rmon->u.pid))
+ erts_print(to, arg, "%T\n", rmon->u.pid);
else
erts_print(to, arg, "{%T, %T}\n",
rmon->name,
- rmon->pid); /* which in this case is the
+ rmon->u.pid); /* which in this case is the
remote system name... */
}
}
-static void print_monitor_info(int to, void *arg, ErtsMonitor *mon)
+static void print_monitor_info(fmtfn_t to, void *arg, ErtsMonitor *mon)
{
struct print_to_data ptd = {to, arg};
erts_doforall_monitors(mon,&doit_print_monitor_info,&ptd);
@@ -2380,7 +2970,7 @@ static void doit_print_link_info(ErtsLink *lnk, void *vptdp)
}
}
-static void print_link_info(int to, void *arg, ErtsLink *lnk)
+static void print_link_info(fmtfn_t to, void *arg, ErtsLink *lnk)
{
struct print_to_data ptd = {to, arg};
erts_doforall_links(lnk, &doit_print_link_info, (void *) &ptd);
@@ -2401,7 +2991,7 @@ static void doit_print_nodelink_info(ErtsLink *lnk, void *vpcontext)
"Remote monitoring: %T %T\n", lnk->pid, pcontext->sysname);
}
-static void print_nodelink_info(int to, void *arg, ErtsLink *lnk, Eterm sysname)
+static void print_nodelink_info(fmtfn_t to, void *arg, ErtsLink *lnk, Eterm sysname)
{
PrintNodeLinkContext context = {{to, arg}, sysname};
erts_doforall_links(lnk, &doit_print_nodelink_info, &context);
@@ -2409,7 +2999,7 @@ static void print_nodelink_info(int to, void *arg, ErtsLink *lnk, Eterm sysname)
static int
-info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected)
+info_dist_entry(fmtfn_t to, void *arg, DistEntry *dep, int visible, int connected)
{
if (visible && connected) {
@@ -2436,9 +3026,6 @@ info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected)
}
erts_print(to, arg, "Name: %T", dep->sysname);
-#ifdef DEBUG
- erts_print(to, arg, " (refc=%d)", erts_refc_read(&dep->refc, 1));
-#endif
erts_print(to, arg, "\n");
if (!connected && is_nil(dep->cid)) {
if (dep->nlinks) {
@@ -2458,7 +3045,7 @@ info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected)
return 0;
}
-int distribution_info(int to, void *arg) /* Called by break handler */
+int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */
{
DistEntry *dep;
@@ -2483,7 +3070,9 @@ int distribution_info(int to, void *arg) /* Called by break handler */
}
for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) {
- info_dist_entry(to, arg, dep, 0, 0);
+ if (dep != erts_this_dist_entry) {
+ info_dist_entry(to, arg, dep, 0, 0);
+ }
}
return(0);
@@ -2556,37 +3145,46 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
goto error;
}
- net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN,
- am_net_kernel, ERTS_PROC_LOCK_MAIN, 0);
- if (!net_kernel)
+ net_kernel = erts_whereis_process(BIF_P,
+ ERTS_PROC_LOCK_MAIN,
+ am_net_kernel,
+ ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS,
+ 0);
+ if (!net_kernel || ERTS_PROC_GET_DIST_ENTRY(net_kernel))
goto error;
- /* By setting dist_entry==erts_this_dist_entry and DISTRIBUTION on
- net_kernel do_net_exist will be called when net_kernel
- is terminated !! */
- (void) ERTS_PROC_SET_DIST_ENTRY(net_kernel,
- ERTS_PROC_LOCK_MAIN,
- erts_this_dist_entry);
- erts_refc_inc(&erts_this_dist_entry->refc, 2);
+ /* By setting F_DISTRIBUTION on net_kernel,
+ * erts_do_net_exits will be called when net_kernel is terminated !! */
net_kernel->flags |= F_DISTRIBUTION;
- if (net_kernel != BIF_P)
- erts_smp_proc_unlock(net_kernel, ERTS_PROC_LOCK_MAIN);
+ erts_proc_unlock(net_kernel,
+ (ERTS_PROC_LOCK_STATUS
+ | ((net_kernel != BIF_P)
+ ? ERTS_PROC_LOCK_MAIN
+ : 0)));
#ifdef DEBUG
- erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx);
+ erts_rwmtx_rlock(&erts_dist_table_rwmtx);
ASSERT(!erts_visible_dist_entries && !erts_hidden_dist_entries);
- erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx);
+ erts_rwmtx_runlock(&erts_dist_table_rwmtx);
#endif
- erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
- erts_smp_thr_progress_block();
+ erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
+ erts_thr_progress_block();
inc_no_nodes();
erts_set_this_node(BIF_ARG_1, (Uint32) creation);
erts_is_alive = 1;
send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL);
- erts_smp_thr_progress_unblock();
- erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
+ erts_thr_progress_unblock();
+ erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
+
+ /*
+ * Note erts_this_dist_entry is changed by erts_set_this_node(),
+ * so we *need* to use the new one after erts_set_this_node()
+ * is called.
+ */
+ erts_ref_dist_entry(erts_this_dist_entry);
+ ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry);
BIF_RET(am_true);
@@ -2617,18 +3215,18 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
Eterm ic, oc;
Eterm *tp;
DistEntry *dep = NULL;
+ ErtsProcLocks proc_unlock = 0;
+ Process *proc;
Port *pp = NULL;
- /* Prepare for success */
- ERTS_BIF_PREP_RET(ret, am_true);
-
/*
* Check and pick out arguments
*/
if (!is_node_name_atom(BIF_ARG_1) ||
- is_not_internal_port(BIF_ARG_2) ||
- (erts_this_node->sysname == am_Noname)) {
+ !(is_internal_port(BIF_ARG_2)
+ || is_internal_pid(BIF_ARG_2))
+ || (erts_this_node->sysname == am_Noname)) {
goto badarg;
}
@@ -2672,77 +3270,124 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
else if (!dep)
goto system_limit; /* Should never happen!!! */
- pp = erts_id2port_sflgs(BIF_ARG_2,
- BIF_P,
- ERTS_PROC_LOCK_MAIN,
- ERTS_PORT_SFLGS_INVALID_LOOKUP);
- erts_smp_de_rwlock(dep);
+ if (is_internal_pid(BIF_ARG_2)) {
+ if (BIF_P->common.id == BIF_ARG_2) {
+ proc_unlock = 0;
+ proc = BIF_P;
+ }
+ else {
+ proc_unlock = ERTS_PROC_LOCK_MAIN;
+ proc = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN,
+ BIF_ARG_2, proc_unlock);
+ }
+ erts_de_rwlock(dep);
- if (!pp || (erts_atomic32_read_nob(&pp->state)
- & ERTS_PORT_SFLG_EXITING))
- goto badarg;
+ if (!proc)
+ goto badarg;
+ else if (proc == ERTS_PROC_LOCK_BUSY) {
+ proc_unlock = 0;
+ goto yield;
+ }
- if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
- goto badarg;
+ erts_proc_lock(proc, ERTS_PROC_LOCK_STATUS);
+ proc_unlock |= ERTS_PROC_LOCK_STATUS;
+
+ if (ERTS_PROC_GET_DIST_ENTRY(proc)) {
+ if (dep == ERTS_PROC_GET_DIST_ENTRY(proc)
+ && (proc->flags & F_DISTRIBUTION)
+ && dep->cid == BIF_ARG_2) {
+ ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep));
+ goto done;
+ }
+ goto badarg;
+ }
+
+ if (is_not_nil(dep->cid))
+ goto badarg;
- if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
- goto done; /* Already set */
+ proc->flags |= F_DISTRIBUTION;
+ ERTS_PROC_SET_DIST_ENTRY(proc, dep);
+
+ proc_unlock &= ~ERTS_PROC_LOCK_STATUS;
+ erts_proc_unlock(proc, ERTS_PROC_LOCK_STATUS);
+
+ dep->send = NULL; /* Only for distr ports... */
- if (dep->status & ERTS_DE_SFLG_EXITING) {
- /* Suspend on dist entry waiting for the exit to finish */
- ErtsProcList *plp = erts_proclist_create(BIF_P);
- plp->next = NULL;
- erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_smp_mtx_lock(&dep->qlock);
- erts_proclist_store_last(&dep->suspended, plp);
- erts_smp_mtx_unlock(&dep->qlock);
- goto yield;
}
+ else {
- ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING));
+ pp = erts_id2port_sflgs(BIF_ARG_2,
+ BIF_P,
+ ERTS_PROC_LOCK_MAIN,
+ ERTS_PORT_SFLGS_INVALID_LOOKUP);
+ erts_de_rwlock(dep);
- if (pp->dist_entry || is_not_nil(dep->cid))
- goto badarg;
+ if (!pp || (erts_atomic32_read_nob(&pp->state)
+ & ERTS_PORT_SFLG_EXITING))
+ goto badarg;
- erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
+ if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
+ goto badarg;
- /*
- * Dist-ports do not use the "busy port message queue" functionality, but
- * instead use "busy dist entry" functionality.
- */
- {
- ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
- erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
- }
+ if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) {
+ ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep));
+ goto done; /* Already set */
+ }
- pp->dist_entry = dep;
+ if (dep->status & ERTS_DE_SFLG_EXITING) {
+ /* Suspend on dist entry waiting for the exit to finish */
+ ErtsProcList *plp = erts_proclist_create(BIF_P);
+ plp->next = NULL;
+ erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
+ erts_mtx_lock(&dep->qlock);
+ erts_proclist_store_last(&dep->suspended, plp);
+ erts_mtx_unlock(&dep->qlock);
+ goto yield;
+ }
- dep->version = version;
- dep->creation = 0;
+ ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING));
- ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
+ if (pp->dist_entry || is_not_nil(dep->cid))
+ goto badarg;
-#if 1
- dep->send = (pp->drv_ptr->outputv
- ? dist_port_commandv
- : dist_port_command);
-#else
- dep->send = dist_port_command;
-#endif
- ASSERT(dep->send);
+ erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
+
+ pp->dist_entry = dep;
+
+ ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
+
+ dep->send = (pp->drv_ptr->outputv
+ ? dist_port_commandv
+ : dist_port_command);
+ ASSERT(dep->send);
+
+ /*
+ * Dist-ports do not use the "busy port message queue" functionality, but
+ * instead use "busy dist entry" functionality.
+ */
+ {
+ ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
+ erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
+ }
+
+ }
+
+ dep->version = version;
+ dep->creation = 0;
#ifdef DEBUG
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize == 0);
- erts_smp_mtx_unlock(&dep->qlock);
+ ASSERT(erts_atomic_read_nob(&dep->qsize) == 0);
#endif
- erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
-
if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
create_cache(dep);
- erts_smp_de_rwunlock(dep);
+ erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
+
+ erts_de_rwunlock(dep);
+
+ ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep));
+
dep = NULL; /* inc of refc transferred to port (dist_entry field) */
inc_no_nodes();
@@ -2755,13 +3400,16 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
done:
if (dep && dep != erts_this_dist_entry) {
- erts_smp_de_rwunlock(dep);
+ erts_de_rwunlock(dep);
erts_deref_dist_entry(dep);
}
if (pp)
erts_port_release(pp);
+ if (proc_unlock)
+ erts_proc_unlock(proc, proc_unlock);
+
return ret;
yield:
@@ -2807,7 +3455,7 @@ BIF_RETTYPE dist_exit_3(BIF_ALIST_3)
if (BIF_P->common.id == local) {
lp_locks = ERTS_PROC_LOCKS_ALL;
lp = BIF_P;
- erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR);
+ erts_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR);
}
else {
lp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
@@ -2826,21 +3474,17 @@ BIF_RETTYPE dist_exit_3(BIF_ALIST_3)
NIL,
NULL,
0);
-#ifdef ERTS_SMP
if (lp == BIF_P)
lp_locks &= ~ERTS_PROC_LOCK_MAIN;
-#endif
- erts_smp_proc_unlock(lp, lp_locks);
+ erts_proc_unlock(lp, lp_locks);
if (lp == BIF_P) {
- erts_aint32_t state = erts_smp_atomic32_read_acqb(&BIF_P->state);
+ erts_aint32_t state = erts_atomic32_read_acqb(&BIF_P->state);
/*
* We may have exited current process and may have to take action.
*/
if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) {
-#ifdef ERTS_SMP
if (state & ERTS_PSFLG_PENDING_EXIT)
erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN);
-#endif
ERTS_BIF_EXITED(BIF_P);
}
}
@@ -2926,13 +3570,13 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
length = 0;
- erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx);
+ erts_rwmtx_rlock(&erts_dist_table_rwmtx);
- ASSERT(erts_no_of_not_connected_dist_entries >= 0);
+ ASSERT(erts_no_of_not_connected_dist_entries > 0);
ASSERT(erts_no_of_hidden_dist_entries >= 0);
ASSERT(erts_no_of_visible_dist_entries >= 0);
if(not_connected)
- length += erts_no_of_not_connected_dist_entries;
+ length += (erts_no_of_not_connected_dist_entries - 1);
if(hidden)
length += erts_no_of_hidden_dist_entries;
if(visible)
@@ -2943,7 +3587,7 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
result = NIL;
if (length == 0) {
- erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx);
+ erts_rwmtx_runlock(&erts_dist_table_rwmtx);
goto done;
}
@@ -2954,8 +3598,10 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
#endif
if(not_connected)
for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) {
- result = CONS(hp, dep->sysname, result);
- hp += 2;
+ if (dep != erts_this_dist_entry) {
+ result = CONS(hp, dep->sysname, result);
+ hp += 2;
+ }
}
if(hidden)
for(dep = erts_hidden_dist_entries; dep; dep = dep->next) {
@@ -2972,7 +3618,7 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1)
hp += 2;
}
ASSERT(endp == hp);
- erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx);
+ erts_rwmtx_runlock(&erts_dist_table_rwmtx);
done:
UnUseTmpHeap(2,BIF_P);
@@ -3027,15 +3673,15 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options)
if (dep == erts_this_dist_entry)
goto done;
- erts_smp_proc_lock(p, ERTS_PROC_LOCK_LINK);
- erts_smp_de_rlock(dep);
+ erts_proc_lock(p, ERTS_PROC_LOCK_LINK);
+ erts_de_rlock(dep);
if (ERTS_DE_IS_NOT_CONNECTED(dep)) {
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK);
- erts_smp_de_runlock(dep);
+ erts_proc_unlock(p, ERTS_PROC_LOCK_LINK);
+ erts_de_runlock(dep);
goto do_trap;
}
- erts_smp_de_links_lock(dep);
- erts_smp_de_runlock(dep);
+ erts_de_links_lock(dep);
+ erts_de_runlock(dep);
if (Bool == am_true) {
ASSERT(dep->cid != NIL);
@@ -3062,11 +3708,10 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options)
}
}
- erts_smp_de_links_unlock(dep);
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK);
+ erts_de_links_unlock(dep);
+ erts_proc_unlock(p, ERTS_PROC_LOCK_LINK);
done:
- erts_deref_dist_entry(dep);
BIF_RET(am_true);
}
@@ -3095,9 +3740,9 @@ BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1)
if (de == erts_this_dist_entry) {
BIF_RET(am_true);
}
- erts_smp_de_rlock(de);
+ erts_de_rlock(de);
f = de->flags;
- erts_smp_de_runlock(de);
+ erts_de_runlock(de);
BIF_RET(((f & DFLAG_UNICODE_IO) ? am_true : am_false));
}
@@ -3127,7 +3772,7 @@ struct ErtsNodesMonitor_ {
Uint16 no;
};
-static erts_smp_mtx_t nodes_monitors_mtx;
+static erts_mtx_t nodes_monitors_mtx;
static ErtsNodesMonitor *nodes_monitors;
static ErtsNodesMonitor *nodes_monitors_end;
@@ -3145,7 +3790,8 @@ static ErtsNodesMonitor *nodes_monitors_end;
static void
init_nodes_monitors(void)
{
- erts_smp_mtx_init(&nodes_monitors_mtx, "nodes_monitors");
+ erts_mtx_init(&nodes_monitors_mtx, "nodes_monitors", NIL,
+ ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION);
nodes_monitors = NULL;
nodes_monitors_end = NULL;
}
@@ -3186,11 +3832,16 @@ send_nodes_mon_msg(Process *rp,
Uint sz)
{
Eterm msg;
- ErlHeapFragment* bp;
+ Eterm *hp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
- Eterm *hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, rp_locksp);
#ifdef DEBUG
- Eterm *hend = hp + sz;
+ Eterm *hend;
+#endif
+
+ mp = erts_alloc_message_heap(rp, rp_locksp, sz, &hp, &ohp);
+#ifdef DEBUG
+ hend = hp + sz;
#endif
if (!nmp->opts) {
@@ -3236,11 +3887,7 @@ send_nodes_mon_msg(Process *rp,
}
ASSERT(hend == hp);
- erts_queue_message(rp, rp_locksp, bp, msg, NIL
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, *rp_locksp, mp, msg, am_system);
}
static void
@@ -3269,10 +3916,10 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas
}
#endif
- ERTS_SMP_LC_ASSERT(!c_p
+ ERTS_LC_ASSERT(!c_p
|| (erts_proc_lc_my_proc_locks(c_p)
== ERTS_PROC_LOCK_MAIN));
- erts_smp_mtx_lock(&nodes_monitors_mtx);
+ erts_mtx_lock(&nodes_monitors_mtx);
for (nmp = nodes_monitors; nmp; nmp = nmp->next) {
int i;
@@ -3297,7 +3944,7 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas
continue;
break;
default:
- erl_exit(ERTS_ABORT_EXIT, "Bad node type found\n");
+ erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n");
}
}
@@ -3305,7 +3952,7 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas
if (rp) {
if (rp == c_p)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
}
rp = nmp->proc;
@@ -3332,10 +3979,10 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas
if (rp) {
if (rp == c_p)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
- erts_smp_proc_unlock(rp, rp_locks);
+ erts_proc_unlock(rp, rp_locks);
}
- erts_smp_mtx_unlock(&nodes_monitors_mtx);
+ erts_mtx_unlock(&nodes_monitors_mtx);
}
static Eterm
@@ -3345,8 +3992,8 @@ insert_nodes_monitor(Process *c_p, Uint32 opts)
Eterm res = am_false;
ErtsNodesMonitor *xnmp, *nmp;
- ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&nodes_monitors_mtx));
- ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN);
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&nodes_monitors_mtx));
+ ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN);
xnmp = c_p->nodes_monitors;
if (xnmp) {
@@ -3430,8 +4077,8 @@ remove_nodes_monitors(Process *c_p, Uint32 opts, int all)
Eterm res = am_false;
ErtsNodesMonitor *nmp;
- ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&nodes_monitors_mtx));
- ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN);
+ ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&nodes_monitors_mtx));
+ ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN);
nmp = c_p->nodes_monitors;
ASSERT(!nmp || !nmp->prev || nmp->prev->proc != c_p);
@@ -3473,23 +4120,23 @@ remove_nodes_monitors(Process *c_p, Uint32 opts, int all)
void
erts_delete_nodes_monitors(Process *c_p, ErtsProcLocks locks)
{
-#if defined(ERTS_ENABLE_LOCK_CHECK) && defined(ERTS_SMP)
+#if defined(ERTS_ENABLE_LOCK_CHECK)
if (c_p) {
ErtsProcLocks might_unlock = locks & ~ERTS_PROC_LOCK_MAIN;
if (might_unlock)
erts_proc_lc_might_unlock(c_p, might_unlock);
}
#endif
- if (erts_smp_mtx_trylock(&nodes_monitors_mtx) == EBUSY) {
+ if (erts_mtx_trylock(&nodes_monitors_mtx) == EBUSY) {
ErtsProcLocks unlock_locks = locks & ~ERTS_PROC_LOCK_MAIN;
if (c_p && unlock_locks)
- erts_smp_proc_unlock(c_p, unlock_locks);
- erts_smp_mtx_lock(&nodes_monitors_mtx);
+ erts_proc_unlock(c_p, unlock_locks);
+ erts_mtx_lock(&nodes_monitors_mtx);
if (c_p && unlock_locks)
- erts_smp_proc_lock(c_p, unlock_locks);
+ erts_proc_lock(c_p, unlock_locks);
}
remove_nodes_monitors(c_p, 0, 1);
- erts_smp_mtx_unlock(&nodes_monitors_mtx);
+ erts_mtx_unlock(&nodes_monitors_mtx);
}
Eterm
@@ -3500,7 +4147,7 @@ erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist)
Uint16 opts = (Uint16) 0;
ASSERT(c_p);
- ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
+ ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
if (on != am_true && on != am_false)
return THE_NON_VALUE;
@@ -3556,14 +4203,14 @@ erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist)
return THE_NON_VALUE;
}
- erts_smp_mtx_lock(&nodes_monitors_mtx);
+ erts_mtx_lock(&nodes_monitors_mtx);
if (on == am_true)
res = insert_nodes_monitor(c_p, opts);
else
res = remove_nodes_monitors(c_p, opts, 0);
- erts_smp_mtx_unlock(&nodes_monitors_mtx);
+ erts_mtx_unlock(&nodes_monitors_mtx);
return res;
}
@@ -3586,8 +4233,8 @@ erts_processes_monitoring_nodes(Process *c_p)
#endif
ASSERT(c_p);
- ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
- erts_smp_mtx_lock(&nodes_monitors_mtx);
+ ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
+ erts_mtx_lock(&nodes_monitors_mtx);
sz = 0;
szp = &sz;
@@ -3606,7 +4253,7 @@ erts_processes_monitoring_nodes(Process *c_p)
case ERTS_NODES_MON_OPT_TYPES: type = am_all; break;
case ERTS_NODES_MON_OPT_TYPE_VISIBLE: type = am_visible; break;
case ERTS_NODES_MON_OPT_TYPE_HIDDEN: type = am_hidden; break;
- default: erl_exit(ERTS_ABORT_EXIT, "Bad node type found\n");
+ default: erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n");
}
olist = erts_bld_cons(hpp, szp,
erts_bld_tuple(hpp, szp, 2,
@@ -3636,7 +4283,7 @@ erts_processes_monitoring_nodes(Process *c_p)
ASSERT(hp == hend);
- erts_smp_mtx_unlock(&nodes_monitors_mtx);
+ erts_mtx_unlock(&nodes_monitors_mtx);
return res;
}