diff options
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r-- | erts/emulator/beam/dist.c | 3822 |
1 files changed, 2255 insertions, 1567 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 09c83f1117..0633bff3c2 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1996-2016. All Rights Reserved. + * Copyright Ericsson AB 1996-2018. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ /* define this to get a lot of debug output */ /* #define ERTS_DIST_MSG_DBG */ +/* #define ERTS_RAW_DIST_MSG_DBG */ #ifdef HAVE_CONFIG_H # include "config.h" @@ -44,6 +45,7 @@ #include "erl_binary.h" #include "erl_thr_progress.h" #include "dtrace-wrapper.h" +#include "erl_proc_sig_queue.h" #define DIST_CTL_DEFAULT_SIZE 64 @@ -102,35 +104,27 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) -#define PASS_THROUGH 'p' /* This code should go */ - int erts_is_alive; /* System must be blocked on change */ int erts_dist_buf_busy_limit; /* distribution trap functions */ -Export* dsend2_trap = NULL; -Export* dsend3_trap = NULL; -/*Export* dsend_nosuspend_trap = NULL;*/ -Export* dlink_trap = NULL; -Export* dunlink_trap = NULL; Export* dmonitor_node_trap = NULL; -Export* dgroup_leader_trap = NULL; -Export* dexit_trap = NULL; -Export* dmonitor_p_trap = NULL; /* local variables */ - +static Export *dist_ctrl_put_data_trap; /* forward declarations */ -static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); +static Sint abort_connection(DistEntry* dep, Uint32 conn_id); +static ErtsDistOutputBuf* clear_de_out_queues(DistEntry*); +static void free_de_out_queues(DistEntry*, ErtsDistOutputBuf*); -static erts_smp_atomic_t no_caches; -static erts_smp_atomic_t no_nodes; +static erts_atomic_t no_caches; +static erts_atomic_t no_nodes; struct { Eterm reason; @@ -143,8 +137,8 @@ delete_cache(ErtsAtomCache *cache) { if (cache) { erts_free(ERTS_ALC_T_DCACHE, (void *) cache); - ASSERT(erts_smp_atomic_read_nob(&no_caches) > 0); - erts_smp_atomic_dec_nob(&no_caches); + ASSERT(erts_atomic_read_nob(&no_caches) > 0); + erts_atomic_dec_nob(&no_caches); } } @@ -155,14 +149,12 @@ create_cache(DistEntry *dep) int i; ErtsAtomCache *cp; - ERTS_SMP_LC_ASSERT( - is_internal_port(dep->cid) - && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); + ERTS_LC_ASSERT(is_nil(dep->cid)); ASSERT(!dep->cache); dep->cache = cp = (ErtsAtomCache*) erts_alloc(ERTS_ALC_T_DCACHE, sizeof(ErtsAtomCache)); - erts_smp_atomic_inc_nob(&no_caches); + erts_atomic_inc_nob(&no_caches); for (i = 0; i < sizeof(cp->in_arr)/sizeof(cp->in_arr[0]); i++) { cp->in_arr[i] = THE_NON_VALUE; cp->out_arr[i] = THE_NON_VALUE; @@ -171,15 +163,17 @@ create_cache(DistEntry *dep) Uint erts_dist_cache_size(void) { - return (Uint) erts_smp_atomic_read_mb(&no_caches)*sizeof(ErtsAtomCache); + return (Uint) erts_atomic_read_mb(&no_caches)*sizeof(ErtsAtomCache); } static ErtsProcList * -get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs) +get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs) { - ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&dep->qlock)); - dep->qflgs &= ~unset_qflgs; - if (dep->qflgs & ERTS_DE_QFLG_EXIT) { + erts_aint32_t qflgs; + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); + qflgs = erts_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs); + qflgs &= ~unset_qflgs; + if (qflgs & ERTS_DE_QFLG_EXIT) { /* No resume when exit has been scheduled */ return NULL; } @@ -191,6 +185,161 @@ get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs) } } +#define ERTS_MON_LNK_FIRE_LIMIT 100 + +static void monitor_connection_down(ErtsMonitor *mon, void *unused) +{ + if (erts_monitor_is_origin(mon)) + erts_proc_sig_send_demonitor(mon); + else + erts_proc_sig_send_monitor_down(mon, am_noconnection); +} + +static void link_connection_down(ErtsLink *lnk, void *vdist) +{ + erts_proc_sig_send_link_exit(NULL, THE_NON_VALUE, lnk, + am_noconnection, NIL); +} + +typedef enum { + ERTS_CML_CLEANUP_STATE_LINKS, + ERTS_CML_CLEANUP_STATE_MONITORS, + ERTS_CML_CLEANUP_STATE_ONAME_MONITORS, + ERTS_CML_CLEANUP_STATE_NODE_MONITORS +} ErtsConMonLnkCleaupState; + +typedef struct { + ErtsConMonLnkCleaupState state; + ErtsMonLnkDist *dist; + void *yield_state; + int trigger_node_monitors; + Eterm nodename; + Eterm visability; + Eterm reason; + ErlOffHeap oh; + Eterm heap[1]; +} ErtsConMonLnkCleanup; + +static void +con_monitor_link_cleanup(void *vcmlcp) +{ + ErtsConMonLnkCleanup *cmlcp = vcmlcp; + ErtsMonLnkDist *dist = cmlcp->dist; + ErtsSchedulerData *esdp; + int yield; + + switch (cmlcp->state) { + case ERTS_CML_CLEANUP_STATE_LINKS: + yield = erts_link_list_foreach_delete_yielding(&dist->links, + link_connection_down, + NULL, &cmlcp->yield_state, + ERTS_MON_LNK_FIRE_LIMIT); + if (yield) + break; + + ASSERT(!cmlcp->yield_state); + cmlcp->state = ERTS_CML_CLEANUP_STATE_MONITORS; + case ERTS_CML_CLEANUP_STATE_MONITORS: + yield = erts_monitor_list_foreach_delete_yielding(&dist->monitors, + monitor_connection_down, + NULL, &cmlcp->yield_state, + ERTS_MON_LNK_FIRE_LIMIT); + if (yield) + break; + + ASSERT(!cmlcp->yield_state); + cmlcp->state = ERTS_CML_CLEANUP_STATE_ONAME_MONITORS; + case ERTS_CML_CLEANUP_STATE_ONAME_MONITORS: + yield = erts_monitor_tree_foreach_delete_yielding(&dist->orig_name_monitors, + monitor_connection_down, + NULL, &cmlcp->yield_state, + ERTS_MON_LNK_FIRE_LIMIT/2); + if (yield) + break; + + cmlcp->dist = NULL; + erts_mon_link_dist_dec_refc(dist); + + ASSERT(!cmlcp->yield_state); + cmlcp->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS; + case ERTS_CML_CLEANUP_STATE_NODE_MONITORS: + if (cmlcp->trigger_node_monitors) { + send_nodes_mon_msgs(NULL, + am_nodedown, + cmlcp->nodename, + cmlcp->visability, + cmlcp->reason); + } + erts_cleanup_offheap(&cmlcp->oh); + erts_free(ERTS_ALC_T_CML_CLEANUP, vcmlcp); + return; /* done */ + } + + /* yield... */ + + esdp = erts_get_scheduler_data(); + ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL); + erts_schedule_misc_aux_work((int) esdp->no, + con_monitor_link_cleanup, + (void *) cmlcp); +} + +static void +schedule_con_monitor_link_cleanup(ErtsMonLnkDist *dist, + Eterm nodename, + Eterm visability, + Eterm reason) +{ + if (dist || is_value(nodename)) { + ErtsSchedulerData *esdp; + ErtsConMonLnkCleanup *cmlcp; + Uint rsz, size; + + size = sizeof(ErtsConMonLnkCleanup); + + if (is_non_value(reason) || is_immed(reason)) { + rsz = 0; + size -= sizeof(Eterm); + } + else { + rsz = size_object(reason); + size += sizeof(Eterm) * (rsz - 1); + } + + cmlcp = erts_alloc(ERTS_ALC_T_CML_CLEANUP, size); + + ERTS_INIT_OFF_HEAP(&cmlcp->oh); + + cmlcp->yield_state = NULL; + cmlcp->dist = dist; + if (!dist) + cmlcp->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS; + else { + cmlcp->state = ERTS_CML_CLEANUP_STATE_LINKS; + erts_mtx_lock(&dist->mtx); + ASSERT(dist->alive); + dist->alive = 0; + erts_mtx_unlock(&dist->mtx); + } + + cmlcp->trigger_node_monitors = is_value(nodename); + cmlcp->nodename = nodename; + cmlcp->visability = visability; + if (rsz == 0) + cmlcp->reason = reason; + else { + Eterm *hp = &cmlcp->heap[0]; + cmlcp->reason = copy_struct(reason, rsz, &hp, &cmlcp->oh); + } + + esdp = erts_get_scheduler_data(); + ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL); + erts_schedule_misc_aux_work((int) esdp->no, + con_monitor_link_cleanup, + (void *) cmlcp); + } +} + /* ** A full node name constists of a "n@h" ** @@ -242,185 +391,22 @@ int is_node_name_atom(Eterm a) return is_node_name((char*)atom_tab(i)->name, atom_tab(i)->len); } -typedef struct { - DistEntry *dep; - Eterm *lhp; -} NetExitsContext; - -/* -** This function is called when a distribution -** port or process terminates -*/ -static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp) -{ - Process *rp; - ErtsMonitor *rmon; - DistEntry *dep = ((NetExitsContext *) vnecp)->dep; - ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; - - rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks); - if (!rp) - goto done; - - if (mon->type == MON_ORIGIN) { - /* local pid is beeing monitored */ - rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref); - /* ASSERT(rmon != NULL); nope, can happen during process exit */ - if (rmon != NULL) { - erts_destroy_monitor(rmon); - } - } else { - DeclareTmpHeapNoproc(lhp,3); - Eterm watched; - UseTmpHeapNoproc(3); - ASSERT(mon->type == MON_TARGET); - rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref); - /* ASSERT(rmon != NULL); can happen during process exit */ - if (rmon != NULL) { - ASSERT(is_atom(rmon->name) || is_nil(rmon->name)); - watched = (is_atom(rmon->name) - ? TUPLE2(lhp, rmon->name, dep->sysname) - : rmon->pid); -#ifdef ERTS_SMP - rp_locks |= ERTS_PROC_LOCKS_MSG_SEND; - erts_smp_proc_lock(rp, ERTS_PROC_LOCKS_MSG_SEND); -#endif - erts_queue_monitor_message(rp, &rp_locks, mon->ref, am_process, - watched, am_noconnection); - erts_destroy_monitor(rmon); - } - UnUseTmpHeapNoproc(3); - } - erts_smp_proc_unlock(rp, rp_locks); - done: - erts_destroy_monitor(mon); -} - -typedef struct { - NetExitsContext *necp; - ErtsLink *lnk; -} LinkNetExitsContext; - -/* -** This is the function actually doing the job of sending exit messages -** for links in a dist entry upon net_exit (the node goes down), NB, -** only process links, not node monitors are handled here, -** they reside in a separate tree.... -*/ -static void doit_link_net_exits_sub(ErtsLink *sublnk, void *vlnecp) -{ - ErtsLink *lnk = ((LinkNetExitsContext *) vlnecp)->lnk; /* the local pid */ - ErtsLink *rlnk; - Process *rp; - - ASSERT(lnk->type == LINK_PID); - if (is_internal_pid(lnk->pid)) { - int xres; - ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND; - - rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks); - if (!rp) { - goto done; - } - - rlnk = erts_remove_link(&ERTS_P_LINKS(rp), sublnk->pid); - xres = erts_send_exit_signal(NULL, - sublnk->pid, - rp, - &rp_locks, - am_noconnection, - NIL, - NULL, - 0); - - if (rlnk) { - erts_destroy_link(rlnk); - if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) { - /* We didn't exit the process and it is traced */ - trace_proc(NULL, 0, rp, am_getting_unlinked, sublnk->pid); - } - } - erts_smp_proc_unlock(rp, rp_locks); - } - done: - erts_destroy_link(sublnk); - -} - - - - - -/* -** This function is called when a distribution -** port or process terminates, once for each link on the high level, -** it in turn traverses the link subtree for the specific link node... -*/ -static void doit_link_net_exits(ErtsLink *lnk, void *vnecp) -{ - LinkNetExitsContext lnec = {(NetExitsContext *) vnecp, lnk}; - ASSERT(lnk->type == LINK_PID); - erts_sweep_links(ERTS_LINK_ROOT(lnk), &doit_link_net_exits_sub, (void *) &lnec); -#ifdef DEBUG - ERTS_LINK_ROOT(lnk) = NULL; -#endif - erts_destroy_link(lnk); -} - - -static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp) -{ - DistEntry *dep = ((NetExitsContext *) vnecp)->dep; - Eterm name = dep->sysname; - Process *rp; - ErtsLink *rlnk; - Uint i,n; - ASSERT(lnk->type == LINK_NODE); - if (is_internal_pid(lnk->pid)) { - ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; - ErlOffHeap *ohp; - rp = erts_proc_lookup(lnk->pid); - if (!rp) - goto done; - erts_smp_proc_lock(rp, rp_locks); - rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name); - if (rlnk != NULL) { - ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE)); - erts_destroy_link(rlnk); - } - n = ERTS_LINK_REFC(lnk); - for (i = 0; i < n; ++i) { - Eterm tup; - Eterm *hp; - ErtsMessage *msgp; - - msgp = erts_alloc_message_heap(rp, &rp_locks, - 3, &hp, &ohp); - tup = TUPLE2(hp, am_nodedown, name); - erts_queue_message(rp, rp_locks, msgp, tup, am_system); - } - erts_smp_proc_unlock(rp, rp_locks); - } - done: - erts_destroy_link(lnk); -} - static void set_node_not_alive(void *unused) { ErlHeapFragment *bp; Eterm nodename = erts_this_dist_entry->sysname; - ASSERT(erts_smp_atomic_read_nob(&no_nodes) == 0); + ASSERT(erts_atomic_read_nob(&no_nodes) == 0); - erts_smp_thr_progress_block(); + erts_thr_progress_block(); erts_set_this_node(am_Noname, 0); erts_is_alive = 0; send_nodes_mon_msgs(NULL, am_nodedown, nodename, am_visible, nodedown.reason); nodedown.reason = NIL; bp = nodedown.bp; nodedown.bp = NULL; - erts_smp_thr_progress_unblock(); + erts_thr_progress_unblock(); if (bp) free_message_buffer(bp); } @@ -428,7 +414,7 @@ set_node_not_alive(void *unused) static ERTS_INLINE void dec_no_nodes(void) { - erts_aint_t no = erts_smp_atomic_dec_read_mb(&no_nodes); + erts_aint_t no = erts_atomic_dec_read_mb(&no_nodes); ASSERT(no >= 0); ASSERT(erts_get_scheduler_id()); /* Need to be a scheduler */ if (no == 0) @@ -441,12 +427,33 @@ static ERTS_INLINE void inc_no_nodes(void) { #ifdef DEBUG - erts_aint_t no = erts_smp_atomic_read_nob(&no_nodes); + erts_aint_t no = erts_atomic_read_nob(&no_nodes); ASSERT(erts_is_alive ? no > 0 : no == 0); #endif - erts_smp_atomic_inc_mb(&no_nodes); + erts_atomic_inc_mb(&no_nodes); +} + +static void +kill_dist_ctrl_proc(void *vpid) +{ + erts_proc_sig_send_exit(NULL, (Eterm) vpid, (Eterm) vpid, + am_kill, NIL, 0); } - + +static void +schedule_kill_dist_ctrl_proc(Eterm pid) +{ + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + int sched_id = 1; + if (!esdp || ERTS_SCHEDULER_IS_DIRTY(esdp)) + sched_id = 1; + else + sched_id = (int) esdp->no; + erts_schedule_misc_aux_work(sched_id, + kill_dist_ctrl_proc, + (void *) (UWord) pid); +} + /* * proc is currently running or exiting process. */ @@ -456,58 +463,83 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */ DistEntry *tdep; - int no_dist_port = 0; + int no_dist_ctrl; + int no_pending; Eterm nd_reason = (reason == am_no_network ? am_no_network : am_net_kernel_terminated); - erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx); + int i = 0; + Eterm *dist_ctrl; + DistEntry** pending; + + ERTS_UNDEF(dist_ctrl, NULL); + ERTS_UNDEF(pending, NULL); - for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) - no_dist_port++; - for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) - no_dist_port++; + erts_rwmtx_rlock(&erts_dist_table_rwmtx); + + no_dist_ctrl = (erts_no_of_hidden_dist_entries + + erts_no_of_visible_dist_entries); + no_pending = erts_no_of_pending_dist_entries; /* KILL all port controllers */ - if (no_dist_port == 0) - erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); - else { - Eterm def_buf[128]; - int i = 0; - Eterm *dist_port; - - if (no_dist_port <= sizeof(def_buf)/sizeof(def_buf[0])) - dist_port = &def_buf[0]; - else - dist_port = erts_alloc(ERTS_ALC_T_TMP, - sizeof(Eterm)*no_dist_port); + if (no_dist_ctrl) { + dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, + sizeof(Eterm)*no_dist_ctrl); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) { - ASSERT(is_internal_port(tdep->cid)); - dist_port[i++] = tdep->cid; + ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + ASSERT(i < no_dist_ctrl); + dist_ctrl[i++] = tdep->cid; } for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) { - ASSERT(is_internal_port(tdep->cid)); - dist_port[i++] = tdep->cid; + ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + ASSERT(i < no_dist_ctrl); + dist_ctrl[i++] = tdep->cid; } - erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); - - for (i = 0; i < no_dist_port; i++) { - Port *prt = erts_port_lookup(dist_port[i], - ERTS_PORT_SFLGS_INVALID_LOOKUP); - if (!prt) - continue; - ASSERT(erts_atomic32_read_nob(&prt->state) - & ERTS_PORT_SFLG_DISTRIBUTION); + ASSERT(i == no_dist_ctrl); + } + if (no_pending) { + pending = erts_alloc(ERTS_ALC_T_TMP, sizeof(DistEntry*)*no_pending); + i = 0; + for (tdep = erts_pending_dist_entries; tdep; tdep = tdep->next) { + ASSERT(is_nil(tdep->cid)); + ASSERT(i < no_pending); + pending[i++] = tdep; + erts_ref_dist_entry(tdep); + } + ASSERT(i == no_pending); + } + erts_rwmtx_runlock(&erts_dist_table_rwmtx); + + if (no_dist_ctrl) { + for (i = 0; i < no_dist_ctrl; i++) { + if (is_internal_pid(dist_ctrl[i])) + schedule_kill_dist_ctrl_proc(dist_ctrl[i]); + else { + Port *prt = erts_port_lookup(dist_ctrl[i], + ERTS_PORT_SFLGS_INVALID_LOOKUP); + if (prt) { + ASSERT(erts_atomic32_read_nob(&prt->state) + & ERTS_PORT_SFLG_DISTRIBUTION); + + erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED, + prt, dist_ctrl[i], nd_reason, NULL); + } + } + } + erts_free(ERTS_ALC_T_TMP, dist_ctrl); + } - erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED, - prt, dist_port[i], nd_reason, NULL); - } + if (no_pending) { + for (i = 0; i < no_pending; i++) { + abort_connection(pending[i], pending[i]->connection_id); + erts_deref_dist_entry(pending[i]); + } + erts_free(ERTS_ALC_T_TMP, pending); + } - if (dist_port != &def_buf[0]) - erts_free(ERTS_ALC_T_TMP, dist_port); - } /* - * When last dist port exits, node will be taken + * When last dist ctrl exits, node will be taken * from alive to not alive. */ ASSERT(is_nil(nodedown.reason) && !nodedown.bp); @@ -524,65 +556,78 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) &nodedown.bp->off_heap); } } - else { /* Call from distribution port */ - NetExitsContext nec = {dep}; - ErtsLink *nlinks; - ErtsLink *node_links; - ErtsMonitor *monitors; + else { /* Call from distribution controller (port/process) */ + ErtsMonLnkDist *mld; + ErtsAtomCache *cache; + ErtsProcList *suspendees; + ErtsDistOutputBuf *obuf; Uint32 flags; - erts_smp_atomic_set_mb(&dep->dist_cmd_scheduled, 1); - erts_smp_de_rwlock(dep); + erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1); + erts_de_rwlock(dep); - ERTS_SMP_LC_ASSERT(is_internal_port(dep->cid) - && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); + if (is_internal_port(dep->cid)) { + ERTS_LC_ASSERT(erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); - if (erts_port_task_is_scheduled(&dep->dist_cmd)) - erts_port_task_abort(&dep->dist_cmd); + if (erts_port_task_is_scheduled(&dep->dist_cmd)) + erts_port_task_abort(&dep->dist_cmd); + } - if (dep->status & ERTS_DE_SFLG_EXITING) { + if (dep->state == ERTS_DE_STATE_EXITING) { #ifdef DEBUG - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT); - erts_smp_mtx_unlock(&dep->qlock); + ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT); #endif } else { - dep->status |= ERTS_DE_SFLG_EXITING; - erts_smp_mtx_lock(&dep->qlock); - ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT)); - dep->qflgs |= ERTS_DE_QFLG_EXIT; - erts_smp_mtx_unlock(&dep->qlock); + dep->state = ERTS_DE_STATE_EXITING; + erts_mtx_lock(&dep->qlock); + ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT); + erts_mtx_unlock(&dep->qlock); } - erts_smp_de_links_lock(dep); - monitors = dep->monitors; - nlinks = dep->nlinks; - node_links = dep->node_links; - dep->monitors = NULL; - dep->nlinks = NULL; - dep->node_links = NULL; - erts_smp_de_links_unlock(dep); + mld = dep->mld; + dep->mld = NULL; nodename = dep->sysname; flags = dep->flags; + erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) NIL); + cache = dep->cache; + dep->cache = NULL; + + erts_mtx_lock(&dep->qlock); + + erts_atomic64_set_nob(&dep->in, 0); + erts_atomic64_set_nob(&dep->out, 0); + + obuf = clear_de_out_queues(dep); + suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_set_dist_entry_not_connected(dep); - erts_smp_de_rwunlock(dep); + erts_de_rwunlock(dep); - erts_sweep_monitors(monitors, &doit_monitor_net_exits, (void *) &nec); - erts_sweep_links(nlinks, &doit_link_net_exits, (void *) &nec); - erts_sweep_links(node_links, &doit_node_link_net_exits, (void *) &nec); + schedule_con_monitor_link_cleanup(mld, + nodename, + (flags & DFLAG_PUBLISHED + ? am_visible + : am_hidden), + (reason == am_normal + ? am_connection_closed + : reason)); - send_nodes_mon_msgs(NULL, - am_nodedown, - nodename, - flags & DFLAG_PUBLISHED ? am_visible : am_hidden, - reason == am_normal ? am_connection_closed : reason); + erts_resume_processes(suspendees); - clear_dist_entry(dep); + delete_cache(cache); + free_de_out_queues(dep, obuf); + if (dep->transcode_ctx) + transcode_free_ctx(dep); } dec_no_nodes(); @@ -596,6 +641,14 @@ trap_function(Eterm func, int arity) return erts_export_put(am_erlang, func, arity); } +/* + * Sync with dist_util.erl: + * + * -record(erts_dflags, + * {default, mandatory, addable, rejectable, strict_order}). + */ +static Eterm erts_dflags_record; + void init_dist(void) { init_nodes_monitors(); @@ -603,19 +656,24 @@ void init_dist(void) nodedown.reason = NIL; nodedown.bp = NULL; - erts_smp_atomic_init_nob(&no_nodes, 0); - erts_smp_atomic_init_nob(&no_caches, 0); + erts_atomic_init_nob(&no_nodes, 0); + erts_atomic_init_nob(&no_caches, 0); /* Lookup/Install all references to trap functions */ - dsend2_trap = trap_function(am_dsend,2); - dsend3_trap = trap_function(am_dsend,3); - /* dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/ - dlink_trap = trap_function(am_dlink,1); - dunlink_trap = trap_function(am_dunlink,1); dmonitor_node_trap = trap_function(am_dmonitor_node,3); - dgroup_leader_trap = trap_function(am_dgroup_leader,2); - dexit_trap = trap_function(am_dexit, 2); - dmonitor_p_trap = trap_function(am_dmonitor_p, 2); + dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, + am_dist_ctrl_put_data, + 2); + { + Eterm* hp = erts_alloc(ERTS_ALC_T_LITERAL, (1+6)*sizeof(Eterm)); + erts_dflags_record = TUPLE6(hp, am_erts_dflags, + make_small(DFLAG_DIST_DEFAULT), + make_small(DFLAG_DIST_MANDATORY), + make_small(DFLAG_DIST_ADDABLE), + make_small(DFLAG_DIST_REJECTABLE), + make_small(DFLAG_DIST_STRICT_ORDER)); + erts_set_literal_tag(&erts_dflags_record, hp, (1+6)); + } } #define ErtsDistOutputBuf2Binary(OB) \ @@ -627,10 +685,10 @@ alloc_dist_obuf(Uint size) ErtsDistOutputBuf *obuf; Uint obuf_size = sizeof(ErtsDistOutputBuf)+sizeof(byte)*(size-1); Binary *bin = erts_bin_drv_alloc(obuf_size); - erts_refc_init(&bin->refc, 1); obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0]; #ifdef DEBUG obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; + obuf->alloc_endp = obuf->data + size; ASSERT(bin == ErtsDistOutputBuf2Binary(obuf)); #endif return obuf; @@ -641,8 +699,7 @@ free_dist_obuf(ErtsDistOutputBuf *obuf) { Binary *bin = ErtsDistOutputBuf2Binary(obuf); ASSERT(obuf->dbg_pattern == ERTS_DIST_OUTPUT_BUF_DBG_PATTERN); - if (erts_refc_dectest(&bin->refc, 0) == 0) - erts_bin_free(bin); + erts_bin_release(bin); } static ERTS_INLINE Sint @@ -652,26 +709,11 @@ size_obuf(ErtsDistOutputBuf *obuf) return bin->orig_size; } -static void clear_dist_entry(DistEntry *dep) +static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) { - Sint obufsize = 0; - ErtsAtomCache *cache; - ErtsProcList *suspendees; ErtsDistOutputBuf *obuf; - erts_smp_de_rwlock(dep); - cache = dep->cache; - dep->cache = NULL; - -#ifdef DEBUG - erts_smp_de_links_lock(dep); - ASSERT(!dep->nlinks); - ASSERT(!dep->node_links); - ASSERT(!dep->monitors); - erts_smp_de_links_unlock(dep); -#endif - - erts_smp_mtx_lock(&dep->qlock); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; @@ -680,21 +722,24 @@ static void clear_dist_entry(DistEntry *dep) obuf = dep->out_queue.first; } + if (dep->tmp_out_queue.first) { + dep->tmp_out_queue.last->next = obuf; + obuf = dep->tmp_out_queue.first; + } + dep->out_queue.first = NULL; dep->out_queue.last = NULL; + dep->tmp_out_queue.first = NULL; + dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; - dep->status = 0; - suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); - erts_smp_mtx_unlock(&dep->qlock); - erts_smp_atomic_set_nob(&dep->dist_cmd_scheduled, 0); - dep->send = NULL; - erts_smp_de_rwunlock(dep); - - erts_resume_processes(suspendees); + return obuf; +} - delete_cache(cache); +static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf) +{ + Sint obufsize = 0; while (obuf) { ErtsDistOutputBuf *fobuf; @@ -705,14 +750,15 @@ static void clear_dist_entry(DistEntry *dep) } if (obufsize) { - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize >= obufsize); - dep->qsize -= obufsize; - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_lock(&dep->qlock); + ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); + erts_atomic_add_nob(&dep->qsize, + (erts_aint_t) -obufsize); + erts_mtx_unlock(&dep->qlock); } } -void erts_dsend_context_dtor(Binary* ctx_bin) +int erts_dsend_context_dtor(Binary* ctx_bin) { ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); switch (ctx->dss.phase) { @@ -727,8 +773,10 @@ void erts_dsend_context_dtor(Binary* ctx_bin) if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) { free_dist_obuf(ctx->dss.obuf); } - if (ctx->dep_to_deref) - erts_deref_dist_entry(ctx->dep_to_deref); + if (ctx->deref_dep) + erts_deref_dist_entry(ctx->dep); + + return 1; } Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx) @@ -740,7 +788,7 @@ Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx) Binary* ctx_bin = erts_create_magic_binary(sizeof(struct exported_ctx), erts_dsend_context_dtor); struct exported_ctx* dst = ERTS_MAGIC_BIN_DATA(ctx_bin); - Eterm* hp = HAlloc(p, PROC_BIN_SIZE); + Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE); sys_memcpy(&dst->ctx, ctx, sizeof(ErtsSendContext)); ASSERT(ctx->dss.ctl == make_tuple(ctx->ctl_heap)); @@ -749,7 +797,7 @@ Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx) sys_memcpy(&dst->acm, ctx->dss.acmp, sizeof(ErtsAtomCacheMap)); dst->ctx.dss.acmp = &dst->acm; } - return erts_mk_magic_binary_term(&hp, &MSO(p), ctx_bin); + return erts_mk_magic_ref(&hp, &MSO(p), ctx_bin); } @@ -794,9 +842,8 @@ erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote) } -/* A local process that's beeing monitored by a remote one exits. We send: - {DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason}, - which is rather sad as only the ref is needed, no pid's... */ +/* A local process that's being monitored by a remote one exits. We send: + {DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason} */ int erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, Eterm ref, Eterm reason) @@ -805,17 +852,18 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, DeclareTmpHeapNoproc(ctl_heap,6); int res; + if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + /* + * Receiver does not support DOP_MONITOR_P_EXIT (see dsig_send_monitor) + */ + return ERTS_DSIG_SEND_OK; + } + UseTmpHeapNoproc(6); ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT), watched, watcher, ref, reason); -#ifdef DEBUG - erts_smp_de_links_lock(dsdp->dep); - ASSERT(!erts_lookup_monitor(dsdp->dep->monitors, ref)); - erts_smp_de_links_unlock(dsdp->dep); -#endif - res = dsig_send_ctl(dsdp, ctl, 1); UnUseTmpHeapNoproc(6); return res; @@ -832,6 +880,16 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, DeclareTmpHeapNoproc(ctl_heap,5); int res; + if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + /* + * Receiver does not support DOP_MONITOR_P. + * Just avoid sending it and by doing that reduce this monitor + * to only supervise the connection. This will work for simple c-nodes + * with a 1-to-1 relation between "Erlang process" and OS-process. + */ + return ERTS_DSIG_SEND_OK; + } + UseTmpHeapNoproc(5); ctl = TUPLE4(&ctl_heap[0], make_small(DOP_MONITOR_P), @@ -844,8 +902,7 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, /* A local process monitoring a remote one wants to stop monitoring, either because of a demonitor bif call or because the local process died. We send - {DOP_DEMONITOR_P, Local pid, Remote pid or name, ref}, which is once again - rather redundant as only the ref will be needed on the other side... */ + {DOP_DEMONITOR_P, Local pid, Remote pid or name, ref} */ int erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, Eterm ref, int force) @@ -854,6 +911,13 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, DeclareTmpHeapNoproc(ctl_heap,5); int res; + if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) { + /* + * Receiver does not support DOP_DEMONITOR_P (see dsig_send_monitor) + */ + return ERTS_DSIG_SEND_OK; + } + UseTmpHeapNoproc(5); ctl = TUPLE4(&ctl_heap[0], make_small(DOP_DEMONITOR_P), @@ -864,6 +928,24 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, return res; } +static int can_send_seqtrace_token(ErtsSendContext* ctx, Eterm token) { + Eterm label; + + if (ctx->dsd.flags & DFLAG_BIG_SEQTRACE_LABELS) { + /* The other end is capable of handling arbitrary seq_trace labels. */ + return 1; + } + + /* The other end only tolerates smalls, but since we could potentially be + * talking to an old 32-bit emulator from a 64-bit one, we have to check + * whether the label is small on any emulator. */ + label = SEQ_TRACE_T_LABEL(token); + + return is_small(label) && + signed_val(label) <= (ERTS_SINT32_MAX >> _TAG_IMMED1_SIZE) && + signed_val(label) >= (ERTS_SINT32_MIN >> _TAG_IMMED1_SIZE); +} + int erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) { @@ -897,18 +979,38 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) "%T", remote); msize = size_object(message); if (have_seqtrace(token)) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); + tok_label = SEQ_TRACE_T_DTRACE_LABEL(token); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); } } #endif - if (token != NIL) - ctl = TUPLE4(&ctx->ctl_heap[0], - make_small(DOP_SEND_TT), am_Empty, remote, token); - else - ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_SEND), am_Empty, remote); + { + Eterm dist_op, sender_id; + int send_token; + + send_token = (token != NIL && can_send_seqtrace_token(ctx, token)); + + if (ctx->dsd.flags & DFLAG_SEND_SENDER) { + dist_op = make_small(send_token ? + DOP_SEND_SENDER_TT : + DOP_SEND_SENDER); + sender_id = sender->common.id; + } else { + dist_op = make_small(send_token ? + DOP_SEND_TT : + DOP_SEND); + sender_id = am_Empty; + } + + if (send_token) { + ctl = TUPLE4(&ctx->ctl_heap[0], dist_op, sender_id, remote, token); + } else { + ctl = TUPLE3(&ctx->ctl_heap[0], dist_op, sender_id, remote); + } + } + DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); DTRACE7(message_send_remote, sender_name, node_name, receiver_name, @@ -954,19 +1056,20 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, "{%T,%s}", remote_name, node_name); msize = size_object(message); if (have_seqtrace(token)) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); + tok_label = SEQ_TRACE_T_DTRACE_LABEL(token); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); } } #endif - if (token != NIL) + if (token != NIL && can_send_seqtrace_token(ctx, token)) ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_REG_SEND_TT), sender->common.id, am_Empty, remote_name, token); else ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_REG_SEND), sender->common.id, am_Empty, remote_name); + DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); DTRACE7(message_send_remote, sender_name, node_name, receiver_name, @@ -1018,7 +1121,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)), "%T", reason); if (have_seqtrace(token)) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); + tok_label = SEQ_TRACE_T_DTRACE_LABEL(token); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); } @@ -1088,21 +1191,8 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote) #include <valgrind/valgrind.h> #include <valgrind/memcheck.h> -#ifndef HAVE_VALGRIND_PRINTF_XML -#define VALGRIND_PRINTF_XML VALGRIND_PRINTF -#endif - # define PURIFY_MSG(msg) \ - do { \ - char buf__[1]; size_t bufsz__ = sizeof(buf__); \ - if (erts_sys_getenv_raw("VALGRIND_LOG_XML", buf__, &bufsz__) >= 0) { \ - VALGRIND_PRINTF_XML("<erlang_error_log>" \ - "%s, line %d: %s</erlang_error_log>\n", \ - __FILE__, __LINE__, msg); \ - } else { \ - VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg); \ - } \ - } while (0) + VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg) #else # define PURIFY_MSG(msg) #endif @@ -1119,13 +1209,13 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote) int erts_net_message(Port *prt, DistEntry *dep, + Uint32 conn_id, byte *hbuf, ErlDrvSizeT hlen, byte *buf, ErlDrvSizeT len) { ErtsDistExternal ede; - byte *t; Sint ctl_len; Eterm arg; Eterm from, to; @@ -1141,8 +1231,6 @@ int erts_net_message(Port *prt, Sint type; Eterm token; Eterm token_size; - ErtsMonitor *mon; - ErtsLink *lnk; Uint tuple_arity; int res; #ifdef ERTS_DIST_MSG_DBG @@ -1151,16 +1239,17 @@ int erts_net_message(Port *prt, UseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - ERTS_SMP_CHK_NO_PROC_LOCKS; + ERTS_CHK_NO_PROC_LOCKS; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_LC_ASSERT(!prt || erts_lc_is_port_locked(prt)); if (!erts_is_alive) { UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); return 0; } - if (hlen != 0) - goto data_error; + + ASSERT(hlen == 0); + if (len == 0) { /* HANDLE TICK !!! */ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); return 0; @@ -1171,38 +1260,31 @@ int erts_net_message(Port *prt, bw(buf, len); #endif - if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) - t = buf; - else { - /* Skip PASS_THROUGH */ - t = buf+1; - len--; - } + res = erts_prepare_dist_ext(&ede, buf, len, dep, conn_id, dep->cache); - if (len == 0) { - PURIFY_MSG("data error"); - goto data_error; - } - - res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache); - - if (res >= 0) - res = ctl_len = erts_decode_dist_ext_size(&ede); - else { + switch (res) { + case ERTS_PREP_DIST_EXT_CLOSED: + return 0; /* Connection not alive; ignore signal... */ + case ERTS_PREP_DIST_EXT_FAILED: #ifdef ERTS_DIST_MSG_DBG erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n"); bw(buf, orig_len); #endif - ctl_len = 0; - } - - if (res < 0) { + goto data_error; + case ERTS_PREP_DIST_EXT_SUCCESS: + ctl_len = erts_decode_dist_ext_size(&ede); + if (ctl_len < 0) { #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); - bw(buf, orig_len); + erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); + bw(buf, orig_len); #endif - PURIFY_MSG("data error"); - goto data_error; + PURIFY_MSG("data error"); + goto data_error; + } + break; + default: + ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()"); + break; } if (ctl_len > DIST_CTL_DEFAULT_SIZE) { @@ -1220,10 +1302,9 @@ int erts_net_message(Port *prt, PURIFY_MSG("data error"); goto decode_error; } - ctl_len = t - buf; #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "<<%s CTL: %T\n", len != orig_len ? "P" : " ", arg); + erts_fprintf(stderr, "<< CTL: %T\n", arg); #endif if (is_not_tuple(arg) || @@ -1233,90 +1314,92 @@ int erts_net_message(Port *prt, } token_size = 0; + token = NIL; switch (type = unsigned_val(tuple[1])) { - case DOP_LINK: + case DOP_LINK: { + ErtsDSigData dsd; + int code; + if (tuple_arity != 3) { goto invalid_message; } from = tuple[2]; to = tuple[3]; /* local proc to link to */ - if (is_not_pid(from) || is_not_pid(to)) { - goto invalid_message; - } + if (is_not_external_pid(from)) + goto invalid_message; - rp = erts_pid2proc_opt(NULL, 0, - to, ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - if (!rp) { - /* This is tricky (we MUST force a distributed send) */ - ErtsDSigData dsd; - int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { - code = erts_dsig_send_exit(&dsd, to, from, am_noproc); - ASSERT(code == ERTS_DSIG_SEND_OK); - } - break; - } + if (dep != external_pid_dist_entry(from)) + goto invalid_message; - erts_smp_de_links_lock(dep); - res = erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, from); + if (is_external_pid(to)) { + if (external_pid_dist_entry(to) != erts_this_dist_entry) + goto invalid_message; + /* old incarnation of node; reply noproc... */ + } + else if (is_internal_pid(to)) { + ErtsLinkData *ldp = erts_link_create(ERTS_LNK_TYPE_DIST_PROC, + from, to); + ASSERT(ldp->a.other.item == to); + ASSERT(eq(ldp->b.other.item, from)); +#ifdef DEBUG + code = +#endif + erts_link_dist_insert(&ldp->a, dep->mld); + ASSERT(code); - if (res < 0) { - /* It was already there! Lets skip the rest... */ - erts_smp_de_links_unlock(dep); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - break; - } - lnk = erts_add_or_lookup_link(&(dep->nlinks), LINK_PID, rp->common.id); - erts_add_link(&(ERTS_LINK_ROOT(lnk)), LINK_PID, from); - erts_smp_de_links_unlock(dep); + if (erts_proc_sig_send_link(NULL, to, &ldp->b)) + break; /* done */ + + /* Failed to send signal; cleanup and reply noproc... */ + +#ifdef DEBUG + code = +#endif + erts_link_dist_delete(&ldp->a); + ASSERT(code); + erts_link_release_both(ldp); + } - if (IS_TRACED_FL(rp, F_TRACE_PROCS)) - trace_proc(NULL, 0, rp, am_getting_linked, from); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED) { + code = erts_dsig_send_exit(&dsd, to, from, am_noproc); + ASSERT(code == ERTS_DSIG_SEND_OK); + } - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); break; + } case DOP_UNLINK: { - ErtsDistLinkData dld; if (tuple_arity != 3) { goto invalid_message; } from = tuple[2]; to = tuple[3]; - if (is_not_pid(from) || is_not_pid(to)) { + if (is_not_external_pid(from)) + goto invalid_message; + if (dep != external_pid_dist_entry(from)) goto invalid_message; - } - - rp = erts_pid2proc_opt(NULL, 0, - to, ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - if (!rp) - break; - - lnk = erts_remove_link(&ERTS_P_LINKS(rp), from); - if (IS_TRACED_FL(rp, F_TRACE_PROCS) && lnk != NULL) { - trace_proc(NULL, 0, rp, am_getting_unlinked, from); - } + if (is_external_pid(to) + && erts_this_dist_entry == external_pid_dist_entry(from)) + break; - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + if (is_not_internal_pid(to)) + goto invalid_message; - erts_remove_dist_link(&dld, to, from, dep); - erts_destroy_dist_link(&dld); - if (lnk) - erts_destroy_link(lnk); + erts_proc_sig_send_dist_unlink(dep, from, to); break; } case DOP_MONITOR_P: { /* A remote process wants to monitor us, we get: {DOP_MONITOR_P, Remote pid, local pid or name, ref} */ - Eterm name; - + Eterm pid, name; + ErtsDSigData dsd; + int code; + if (tuple_arity != 4) { goto invalid_message; } @@ -1325,44 +1408,58 @@ int erts_net_message(Port *prt, watched = tuple[3]; /* local proc to monitor */ ref = tuple[4]; - if (is_not_ref(ref)) { + if (is_not_external_pid(watcher)) + goto invalid_message; + else if (external_pid_dist_entry(watcher) != dep) + goto invalid_message; + + if (is_not_ref(ref)) goto invalid_message; - } - if (is_atom(watched)) { - name = watched; - rp = erts_whereis_process(NULL, 0, - watched, ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - } - else { - name = NIL; - rp = erts_pid2proc_opt(NULL, 0, - watched, ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - } + if (is_internal_pid(watched)) { + name = NIL; + pid = watched; + } + else if (is_atom(watched)) { + name = watched; + pid = erts_whereis_name_to_id(NULL, watched); + /* if port or undefined; reply noproc... */ + } + else if (is_external_pid(watched) + && external_pid_dist_entry(watched) == erts_this_dist_entry) { + name = NIL; + pid = am_undefined; /* old incarnation; reply noproc... */ + } + else + goto invalid_message; - if (!rp) { - ErtsDSigData dsd; - int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { - code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, - am_noproc); - ASSERT(code == ERTS_DSIG_SEND_OK); - } - } - else { - if (is_atom(watched)) - watched = rp->common.id; - erts_smp_de_links_lock(dep); - erts_add_monitor(&(dep->monitors), MON_ORIGIN, ref, watched, name); - erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, ref, watcher, name); - erts_smp_de_links_unlock(dep); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - } + if (is_internal_pid(pid)) { + ErtsMonitorData *mdp; + mdp = erts_monitor_create(ERTS_MON_TYPE_DIST_PROC, + ref, watcher, pid, name); - break; + code = erts_monitor_dist_insert(&mdp->origin, dep->mld); + ASSERT(code); (void)code; + + if (erts_proc_sig_send_monitor(&mdp->target, pid)) + break; /* done */ + + /* Failed to send to local proc; cleanup reply noproc... */ + + code = erts_monitor_dist_delete(&mdp->origin); + ASSERT(code); (void)code; + erts_monitor_release_both(mdp); + + } + + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED) { + code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, + am_noproc); + ASSERT(code == ERTS_DSIG_SEND_OK); + } + + break; } case DOP_DEMONITOR_P: @@ -1373,36 +1470,43 @@ int erts_net_message(Port *prt, if (tuple_arity != 4) { goto invalid_message; } - /* watcher = tuple[2]; */ - /* watched = tuple[3]; May be an atom in case of monitor name */ + + watcher = tuple[2]; + watched = tuple[3]; ref = tuple[4]; - if(is_not_ref(ref)) { + if (is_not_ref(ref)) { goto invalid_message; } - erts_smp_de_links_lock(dep); - mon = erts_remove_monitor(&(dep->monitors),ref); - erts_smp_de_links_unlock(dep); - /* ASSERT(mon != NULL); can happen in case of broken dist message */ - if (mon == NULL) { - break; - } - watched = mon->pid; - erts_destroy_monitor(mon); - rp = erts_pid2proc_opt(NULL, 0, - watched, ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - if (!rp) { - break; - } - mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - ASSERT(mon != NULL); - if (mon == NULL) { - break; - } - erts_destroy_monitor(mon); + if (is_not_external_pid(watcher) || external_pid_dist_entry(watcher) != dep) + goto invalid_message; + + if (is_internal_pid(watched)) + erts_proc_sig_send_dist_demonitor(watched, ref); + else if (is_external_pid(watched) + && external_pid_dist_entry(watched) == erts_this_dist_entry) { + /* old incarnation; ignore it */ + ; + } + else if (is_atom(watched)) { + ErtsMonLnkDist *mld = dep->mld; + ErtsMonitor *mon; + + erts_mtx_lock(&mld->mtx); + + mon = erts_monitor_tree_lookup(mld->orig_name_monitors, ref); + if (mon) + erts_monitor_tree_delete(&mld->orig_name_monitors, mon); + + erts_mtx_unlock(&mld->mtx); + + if (mon) + erts_proc_sig_send_demonitor(mon); + } + else + goto invalid_message; + break; case DOP_REG_SEND_TT: @@ -1458,42 +1562,56 @@ int erts_net_message(Port *prt, erts_queue_dist_message(rp, locks, ede_copy, token, from); if (locks) - erts_smp_proc_unlock(rp, locks); + erts_proc_unlock(rp, locks); } break; + case DOP_SEND_SENDER_TT: { + Uint xsize; case DOP_SEND_TT: + if (tuple_arity != 4) { goto invalid_message; } - - token_size = size_object(tuple[4]); - /* Fall through ... */ + + token = tuple[4]; + token_size = size_object(token); + xsize = ERTS_HEAP_FRAG_SIZE(token_size); + goto send_common; + + case DOP_SEND_SENDER: case DOP_SEND: + + token = NIL; + xsize = 0; + if (tuple_arity != 3) + goto invalid_message; + + send_common: + /* - * There is intentionally no testing of the cookie (it is always '') - * from R9B and onwards. + * If DOP_SEND_SENDER or DOP_SEND_SENDER_TT element 2 contains + * the sender pid (i.e. DFLAG_SEND_SENDER is set); otherwise, + * the atom '' (empty cookie). */ + ASSERT((type == DOP_SEND_SENDER || type == DOP_SEND_SENDER_TT) + ? (is_pid(tuple[2]) && (dep->flags & DFLAG_SEND_SENDER)) + : tuple[2] == am_Empty); + #ifdef ERTS_DIST_MSG_DBG dist_msg_dbg(&ede, "MSG", buf, orig_len); #endif - if (type != DOP_SEND_TT && tuple_arity != 3) { - goto invalid_message; - } to = tuple[3]; if (is_not_pid(to)) { goto invalid_message; } rp = erts_proc_lookup(to); if (rp) { - Uint xsize = type == DOP_SEND ? 0 : ERTS_HEAP_FRAG_SIZE(token_size); ErtsProcLocks locks = 0; ErtsDistExternal *ede_copy; ede_copy = erts_make_dist_ext_copy(&ede, xsize); - if (type == DOP_SEND) { - token = NIL; - } else { + if (is_not_nil(token)) { ErlHeapFragment *heap_frag; ErlOffHeap *ohp; ASSERT(xsize); @@ -1501,82 +1619,50 @@ int erts_net_message(Port *prt, ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); hp = heap_frag->mem; ohp = &heap_frag->off_heap; - token = tuple[4]; token = copy_struct(token, token_size, &hp, ohp); } - erts_queue_dist_message(rp, locks, ede_copy, token, tuple[2]); + erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty); if (locks) - erts_smp_proc_unlock(rp, locks); + erts_proc_unlock(rp, locks); } break; + } case DOP_MONITOR_P_EXIT: { /* We are monitoring a process on the remote node which dies, we get {DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */ - - DeclareTmpHeapNoproc(lhp,3); - Eterm sysname; - ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_MSG_SEND|ERTS_PROC_LOCK_LINK; - if (tuple_arity != 5) { goto invalid_message; } - /* watched = tuple[2]; */ /* remote proc which died */ - /* watcher = tuple[3]; */ + watched = tuple[2]; /* remote proc or name which died */ + watcher = tuple[3]; ref = tuple[4]; reason = tuple[5]; - if(is_not_ref(ref)) { + if (is_not_ref(ref)) goto invalid_message; - } - erts_smp_de_links_lock(dep); - sysname = dep->sysname; - mon = erts_remove_monitor(&(dep->monitors), ref); - /* - * If demonitor was performed at the same time as the - * monitored process exits, monitoring side will have - * removed info about monitor. In this case, do nothing - * and everything will be as it should. - */ - erts_smp_de_links_unlock(dep); - if (mon == NULL) { - break; - } - rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks); + if (is_not_external_pid(watched) && is_not_atom(watched)) + goto invalid_message; - erts_destroy_monitor(mon); - if (rp == NULL) { - break; - } - - mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); + if (is_not_internal_pid(watcher)) { + if (!is_external_pid(watcher)) + goto invalid_message; + if (erts_this_dist_entry == external_pid_dist_entry(watcher)) + break; + goto invalid_message; + } - if (mon == NULL) { - erts_smp_proc_unlock(rp, rp_locks); - break; - } - UseTmpHeapNoproc(3); - - watched = (is_not_nil(mon->name) - ? TUPLE2(&lhp[0], mon->name, sysname) - : mon->pid); - - erts_queue_monitor_message(rp, &rp_locks, - ref, am_process, watched, reason); - erts_smp_proc_unlock(rp, rp_locks); - erts_destroy_monitor(mon); - UnUseTmpHeapNoproc(3); + erts_proc_sig_send_dist_monitor_down(dep, ref, watched, + watcher, reason); break; } case DOP_EXIT_TT: case DOP_EXIT: { - ErtsDistLinkData dld; - ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND; /* 'from', which 'to' is linked to, died */ if (type == DOP_EXIT) { if (tuple_arity != 4) { @@ -1596,56 +1682,19 @@ int erts_net_message(Port *prt, token = tuple[4]; reason = tuple[5]; } - if (is_not_pid(from) || is_not_internal_pid(to)) { + if (is_not_external_pid(from) + || dep != external_pid_dist_entry(from) + || is_not_internal_pid(to)) { goto invalid_message; } - rp = erts_pid2proc(NULL, 0, to, rp_locks); - if (!rp) - lnk = NULL; - else { - lnk = erts_remove_link(&ERTS_P_LINKS(rp), from); - - /* If lnk == NULL, we have unlinked on this side, i.e. - * ignore exit. - */ - if (lnk) { - int xres; -#if 0 - /* Arndt: Maybe it should never be 'kill', but it can be, - namely when a linked process does exit(kill). Until we know - whether that is incorrect and what should happen instead, - we leave the assertion out. */ - ASSERT(reason != am_kill); /* should never be kill (killed) */ -#endif - xres = erts_send_exit_signal(NULL, - from, - rp, - &rp_locks, - reason, - token, - NULL, - ERTS_XSIG_FLG_IGN_KILL); - if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) { - /* We didn't exit the process and it is traced */ - if (rp_locks & ERTS_PROC_LOCKS_XSIG_SEND) { - erts_smp_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND); - rp_locks &= ~ERTS_PROC_LOCKS_XSIG_SEND; - } - trace_proc(NULL, 0, rp, am_getting_unlinked, from); - } - } - erts_smp_proc_unlock(rp, rp_locks); - } - erts_remove_dist_link(&dld, to, from, dep); - if (lnk) - erts_destroy_link(lnk); - erts_destroy_dist_link(&dld); + erts_proc_sig_send_dist_link_exit(dep, + from, to, + reason, token); break; } case DOP_EXIT2_TT: - case DOP_EXIT2: { - ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; + case DOP_EXIT2: /* 'from' is send an exit signal to 'to' */ if (type == DOP_EXIT2) { if (tuple_arity != 4) { @@ -1667,20 +1716,10 @@ int erts_net_message(Port *prt, if (is_not_pid(from) || is_not_internal_pid(to)) { goto invalid_message; } - rp = erts_pid2proc(NULL, 0, to, rp_locks); - if (rp) { - (void) erts_send_exit_signal(NULL, - from, - rp, - &rp_locks, - reason, - token, - NULL, - 0); - erts_smp_proc_unlock(rp, rp_locks); - } + + erts_proc_sig_send_exit(NULL, from, to, reason, token, 0); break; - } + case DOP_GROUP_LEADER: if (tuple_arity != 3) { goto invalid_message; @@ -1691,11 +1730,7 @@ int erts_net_message(Port *prt, goto invalid_message; } - rp = erts_pid2proc(NULL, 0, to, ERTS_PROC_LOCK_MAIN); - if (!rp) - break; - rp->group_leader = STORE_NC_IN_PROC(rp, from); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); + (void) erts_proc_sig_send_group_leader(NULL, to, from, NIL); break; default: @@ -1707,7 +1742,7 @@ int erts_net_message(Port *prt, erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); } UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - ERTS_SMP_CHK_NO_PROC_LOCKS; + ERTS_CHK_NO_PROC_LOCKS; return 0; invalid_message: { @@ -1723,8 +1758,8 @@ decode_error: } data_error: UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - erts_deliver_port_exit(prt, dep->cid, am_killed, 0, 1); - ERTS_SMP_CHK_NO_PROC_LOCKS; + erts_kill_dist_connection(dep, conn_id); + ERTS_CHK_NO_PROC_LOCKS; return -1; } @@ -1744,6 +1779,31 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy) return ret; } +static ERTS_INLINE void +notify_dist_data(Process *c_p, Eterm pid) +{ + Process *rp; + ErtsProcLocks rp_locks; + + ASSERT(erts_get_scheduler_data() + && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data())); + ASSERT(is_internal_pid(pid)); + + if (c_p && c_p->common.id == pid) { + rp = c_p; + rp_locks = ERTS_PROC_LOCK_MAIN; + } + else { + rp = erts_proc_lookup(pid); + rp_locks = 0; + } + + if (rp) { + ErtsMessage *mp = erts_alloc_message(0, NULL); + erts_queue_message(rp, rp_locks, mp, am_dist_data, am_system); + } +} + int erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) { @@ -1754,13 +1814,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) while (1) { switch (ctx->phase) { case ERTS_DSIG_SEND_PHASE_INIT: - ctx->flags = dsdp->dep->flags; + ctx->flags = dsdp->flags; ctx->c_p = dsdp->proc; if (!ctx->c_p || dsdp->no_suspend) ctx->force_busy = 1; - ERTS_SMP_LC_ASSERT(!ctx->c_p + ERTS_LC_ASSERT(!ctx->c_p || (ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(ctx->c_p))); @@ -1769,33 +1829,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) { ctx->acmp = erts_get_atom_cache_map(ctx->c_p); - ctx->pass_through_size = 0; + ctx->max_finalize_prepend = 0; } else { ctx->acmp = NULL; - ctx->pass_through_size = 1; + ctx->max_finalize_prepend = 3; } #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl); - if (is_value(ctx->msg)) - erts_fprintf(stderr, " MSG: %T\n", ctx->msg); + erts_fprintf(stderr, ">> CTL: %T\n", ctx->ctl); + if (is_value(ctx->msg)) + erts_fprintf(stderr, " MSG: %T\n", ctx->msg); #endif - ctx->data_size = ctx->pass_through_size; + ctx->data_size = ctx->max_finalize_prepend; erts_reset_atom_cache_map(ctx->acmp); erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size); - if (is_value(ctx->msg)) { - ctx->u.sc.wstack.wstart = NULL; - ctx->u.sc.flags = ctx->flags; - ctx->u.sc.level = 0; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; - } - break; + if (is_non_value(ctx->msg)) { + ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; + break; + } + ctx->u.sc.wstack.wstart = NULL; + ctx->u.sc.flags = ctx->flags; + ctx->u.sc.level = 0; + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; case ERTS_DSIG_SEND_PHASE_MSG_SIZE: if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) { retval = ERTS_DSIG_SEND_CONTINUE; @@ -1810,23 +1869,25 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->data_size += ctx->dhdr_ext_size; ctx->obuf = alloc_dist_obuf(ctx->data_size); - ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->pass_through_size + ctx->dhdr_ext_size; + ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size; /* Encode internal version of dist header */ ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp); /* Encode control message */ erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL); - if (is_value(ctx->msg)) { - ctx->u.ec.flags = ctx->flags; - ctx->u.ec.level = 0; - ctx->u.ec.wstack.wstart = NULL; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; - } - break; - - case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: + if (is_non_value(ctx->msg)) { + ctx->obuf->msg_start = NULL; + ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; + break; + } + ctx->u.ec.flags = ctx->flags; + ctx->u.ec.hopefull_flags = 0; + ctx->u.ec.level = 0; + ctx->u.ec.wstack.wstart = NULL; + ctx->obuf->msg_start = ctx->obuf->ext_endp; + + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; + case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) { retval = ERTS_DSIG_SEND_CONTINUE; goto done; @@ -1839,38 +1900,59 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) int resume = 0; ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp); - ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->pass_through_size); + ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend); ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size); ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp; + ctx->obuf->hopefull_flags = ctx->u.ec.hopefull_flags; /* * Signal encoded; now verify that the connection still exists, * and if so enqueue the signal and schedule it for send. */ ctx->obuf->next = NULL; - erts_smp_de_rlock(dep); + erts_de_rlock(dep); cid = dep->cid; - if (cid != dsdp->cid - || dep->connection_id != dsdp->connection_id - || dep->status & ERTS_DE_SFLG_EXITING) { + if (dep->state == ERTS_DE_STATE_EXITING + || dep->state == ERTS_DE_STATE_IDLE + || dep->connection_id != dsdp->connection_id) { /* Not the same connection as when we started; drop message... */ - erts_smp_de_runlock(dep); + erts_de_runlock(dep); free_dist_obuf(ctx->obuf); } else { + Sint qsize; + erts_aint32_t qflgs; ErtsProcList *plp = NULL; - erts_smp_mtx_lock(&dep->qlock); - dep->qsize += size_obuf(ctx->obuf); - if (dep->qsize >= erts_dist_buf_busy_limit) - dep->qflgs |= ERTS_DE_QFLG_BUSY; - if (!ctx->force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) { - erts_smp_mtx_unlock(&dep->qlock); + Eterm notify_proc = NIL; + Sint obsz = size_obuf(ctx->obuf); + + erts_mtx_lock(&dep->qlock); + qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz); + ASSERT(qsize >= obsz); + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) { + erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY); + qflgs |= ERTS_DE_QFLG_BUSY; + } + if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) { + /* Previously empty queue and info requested... */ + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify_proc = dep->cid; + ASSERT(is_internal_pid(notify_proc)); + } + /* else: requester will send itself the message... */ + qflgs &= ~ERTS_DE_QFLG_REQ_INFO; + } + if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) { + erts_mtx_unlock(&dep->qlock); plp = erts_proclist_create(ctx->c_p); erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL); suspended = 1; - erts_smp_mtx_lock(&dep->qlock); + erts_mtx_lock(&dep->qlock); } /* Enqueue obuf on dist entry */ @@ -1881,7 +1963,8 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) dep->out_queue.last = ctx->obuf; if (!ctx->force_busy) { - if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) { + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (!(qflgs & ERTS_DE_QFLG_BUSY)) { if (suspended) resume = 1; /* was busy when we started, but isn't now */ #ifdef USE_VM_PROBES @@ -1905,9 +1988,17 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } } - erts_smp_mtx_unlock(&dep->qlock); - erts_schedule_dist_command(NULL, dep); - erts_smp_de_runlock(dep); + erts_mtx_unlock(&dep->qlock); + if (dep->state != ERTS_DE_STATE_PENDING) { + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + } + else { + notify_proc = NIL; + } + erts_de_runlock(dep); + if (is_internal_pid(notify_proc)) + notify_dist_data(ctx->c_p, notify_proc); if (resume) { erts_resume(ctx->c_p, ERTS_PROC_LOCK_MAIN); @@ -1961,35 +2052,39 @@ static Uint dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; - Uint size = obuf->ext_endp - obuf->extp; + ErlDrvSizeT size; + char *bufp; - ERTS_SMP_CHK_NO_PROC_LOCKS; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_CHK_NO_PROC_LOCKS; + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (size > (Uint) INT_MAX) - erts_exit(ERTS_DUMP_EXIT, - "Absurdly large distribution output data buffer " - "(%beu bytes) passed.\n", - size); + if (!obuf) { + size = 0; + bufp = NULL; + } + else { + size = obuf->ext_endp - obuf->extp; + bufp = (char*) obuf->extp; + } #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_output)) { + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), - "%T", prt->dist_entry->sysname); + "%T", dep->sysname); DTRACE4(dist_output, erts_this_node_sysname, port_str, remote_str, size); } #endif + prt->caller = NIL; fpe_was_unmasked = erts_block_fpe(); - (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, - (char*) obuf->extp, - (int) size); + (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, bufp, size); erts_unblock_fpe(fpe_was_unmasked); return size; } @@ -1998,44 +2093,53 @@ static Uint dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; - Uint size = obuf->ext_endp - obuf->extp; + ErlDrvSizeT size; SysIOVec iov[2]; ErlDrvBinary* bv[2]; ErlIOVec eiov; - ERTS_SMP_CHK_NO_PROC_LOCKS; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - - if (size > (Uint) INT_MAX) - erts_exit(ERTS_DUMP_EXIT, - "Absurdly large distribution output data buffer " - "(%beu bytes) passed.\n", - size); + ERTS_CHK_NO_PROC_LOCKS; + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); iov[0].iov_base = NULL; iov[0].iov_len = 0; bv[0] = NULL; - iov[1].iov_base = obuf->extp; - iov[1].iov_len = size; - bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); + if (!obuf) { + size = 0; + eiov.vsize = 1; + } + else { + size = obuf->ext_endp - obuf->extp; + eiov.vsize = 2; + + iov[1].iov_base = obuf->extp; + iov[1].iov_len = size; + bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); + } - eiov.vsize = 2; eiov.size = size; eiov.iov = iov; eiov.binv = bv; + if (size > (Uint) INT_MAX) + erts_exit(ERTS_DUMP_EXIT, + "Absurdly large distribution output data buffer " + "(%beu bytes) passed.\n", + size); + ASSERT(prt->drv_ptr->outputv); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_outputv)) { + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), - "%T", prt->dist_entry->sysname); + "%T", dep->sysname); DTRACE4(dist_outputv, erts_this_node_sysname, port_str, remote_str, size); } @@ -2058,7 +2162,6 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) #endif #define ERTS_PORT_REDS_DIST_CMD_START 5 -#define ERTS_PORT_REDS_DIST_CMD_FINALIZE 3 #define ERTS_PORT_REDS_DIST_CMD_EXIT 200 #define ERTS_PORT_REDS_DIST_CMD_RESUMED 5 #define ERTS_PORT_REDS_DIST_CMD_DATA(SZ) \ @@ -2067,37 +2170,36 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) : ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__))) int -erts_dist_command(Port *prt, int reds_limit) +erts_dist_command(Port *prt, int initial_reds) { - Sint reds = ERTS_PORT_REDS_DIST_CMD_START; - Uint32 status; + Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START; + enum dist_entry_state state; Uint32 flags; - Sint obufsize = 0; + Sint qsize, obufsize = 0; ErtsDistOutputQueue oq, foq; - DistEntry *dep = prt->dist_entry; + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); erts_aint32_t sched_flags; ErtsSchedulerData *esdp = erts_get_scheduler_data(); - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - - erts_refc_inc(&dep->refc, 1); /* Otherwise dist_entry might be - removed if port command fails */ + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); - erts_smp_atomic_set_mb(&dep->dist_cmd_scheduled, 0); + erts_atomic_set_mb(&dep->dist_cmd_scheduled, 0); - erts_smp_de_rlock(dep); + erts_de_rlock(dep); flags = dep->flags; - status = dep->status; + state = dep->state; send = dep->send; - erts_smp_de_runlock(dep); + erts_de_runlock(dep); - if (status & ERTS_DE_SFLG_EXITING) { + if (state == ERTS_DE_STATE_EXITING) { erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1); - erts_deref_dist_entry(dep); - return reds + ERTS_PORT_REDS_DIST_CMD_EXIT; + reds -= ERTS_PORT_REDS_DIST_CMD_EXIT; + return initial_reds - reds; } + ASSERT(state != ERTS_DE_STATE_PENDING); + ASSERT(send); /* @@ -2108,42 +2210,42 @@ erts_dist_command(Port *prt, int reds_limit) * a mess. */ - erts_smp_mtx_lock(&dep->qlock); + erts_mtx_lock(&dep->qlock); oq.first = dep->out_queue.first; oq.last = dep->out_queue.last; dep->out_queue.first = NULL; dep->out_queue.last = NULL; - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_unlock(&dep->qlock); foq.first = dep->finalized_out_queue.first; foq.last = dep->finalized_out_queue.last; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; - sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - if (reds > reds_limit) + if (reds < 0) goto preempted; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { int preempt = 0; do { - Uint size; - ErtsDistOutputBuf *fob; - - size = (*send)(prt, foq.first); - esdp->io.out += (Uint64) size; + Uint size; + ErtsDistOutputBuf *fob; + size = (*send)(prt, foq.first); + erts_atomic64_inc_nob(&dep->out); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, ">> "); - bw(foq.first->extp, size); + erts_fprintf(stderr, ">> "); + bw(foq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); fob = foq.first; obufsize += size_obuf(fob); foq.first = foq.first->next; free_dist_obuf(fob); - sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + sched_flags = erts_atomic32_read_nob(&prt->sched.flags); + preempt = reds < 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) break; } while (foq.first && !preempt); @@ -2156,79 +2258,71 @@ erts_dist_command(Port *prt, int reds_limit) if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) { if (oq.first) { ErtsDistOutputBuf *ob; - int preempt; + ErtsDistOutputBuf *last_finalized = NULL; finalize_only: - preempt = 0; ob = oq.first; ASSERT(ob); do { - ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, - dep->cache, - flags); - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - preempt = reds > reds_limit; - if (preempt) - break; - ob = ob->next; + reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds); + if (reds < 0) + break; + last_finalized = ob; + ob = ob->next; } while (ob); - /* - * At least one buffer was finalized; if we got preempted, - * ob points to the last buffer that we finalized. - */ - if (foq.last) - foq.last->next = oq.first; - else - foq.first = oq.first; - if (!preempt) { - /* All buffers finalized */ - foq.last = oq.last; - oq.first = oq.last = NULL; - } - else { - /* Not all buffers finalized; split oq. */ - foq.last = ob; - oq.first = ob->next; - if (oq.first) - ob->next = NULL; - else - oq.last = NULL; - } - if (preempt) - goto preempted; + if (last_finalized) { + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the next buffer to continue finalize. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + foq.last = last_finalized; + if (!ob) { + /* All buffers finalized */ + ASSERT(foq.last == oq.last); + ASSERT(foq.last->next == NULL); + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. */ + ASSERT(foq.last->next == ob); + foq.last->next = NULL; + oq.first = ob; + } + } + if (reds <= 0) + goto preempted; } } else { + int de_busy; int preempt = 0; while (oq.first && !preempt) { ErtsDistOutputBuf *fob; Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ + reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds); + if (reds < 0) { + preempt = 1; + break; + } ASSERT(&oq.first->data[0] <= oq.first->extp - && oq.first->extp < oq.first->ext_endp); + && oq.first->extp <= oq.first->ext_endp); size = (*send)(prt, oq.first); + erts_atomic64_inc_nob(&dep->out); esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, ">> "); - bw(oq.first->extp, size); + erts_fprintf(stderr, ">> "); + bw(oq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); fob = oq.first; obufsize += size_obuf(fob); oq.first = oq.first->next; free_dist_obuf(fob); - sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + sched_flags = erts_atomic32_read_nob(&prt->sched.flags); + preempt = reds <= 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) goto finalize_only; } @@ -2254,23 +2348,24 @@ erts_dist_command(Port *prt, int reds_limit) * dist entry in a non-busy state and resume suspended * processes. */ - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize >= obufsize); - dep->qsize -= obufsize; + erts_mtx_lock(&dep->qlock); + de_busy = !!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY); + qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize, + (erts_aint_t) -obufsize); + ASSERT(qsize >= 0); obufsize = 0; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) - && (dep->qflgs & ERTS_DE_QFLG_BUSY) - && dep->qsize < erts_dist_buf_busy_limit) { + && de_busy && qsize < erts_dist_buf_busy_limit) { ErtsProcList *suspendees; int resumed; suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_unlock(&dep->qlock); resumed = erts_resume_processes(suspendees); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } else - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_unlock(&dep->qlock); } ASSERT(!oq.first && !oq.last); @@ -2279,14 +2374,18 @@ erts_dist_command(Port *prt, int reds_limit) if (obufsize != 0) { ASSERT(obufsize > 0); - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize >= obufsize); - dep->qsize -= obufsize; - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_lock(&dep->qlock); +#ifdef DEBUG + qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize, + (erts_aint_t) -obufsize); + ASSERT(qsize >= 0); +#else + erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); +#endif + erts_mtx_unlock(&dep->qlock); } - ASSERT(foq.first || !foq.last); - ASSERT(!foq.first || foq.last); + ASSERT(!!foq.first == !!foq.last); ASSERT(!dep->finalized_out_queue.first); ASSERT(!dep->finalized_out_queue.last); @@ -2296,12 +2395,10 @@ erts_dist_command(Port *prt, int reds_limit) } /* Avoid wrapping reduction counter... */ - if (reds > INT_MAX/2) - reds = INT_MAX/2; - - erts_deref_dist_entry(dep); + if (reds < INT_MIN/2) + reds = INT_MIN/2; - return reds; + return initial_reds - reds; preempted: /* @@ -2309,8 +2406,7 @@ erts_dist_command(Port *prt, int reds_limit) * since last call to driver. */ - ASSERT(oq.first || !oq.last); - ASSERT(!oq.first || oq.last); + ASSERT(!!oq.first == !!oq.last); if (sched_flags & ERTS_PTS_FLG_EXIT) { /* @@ -2334,12 +2430,6 @@ erts_dist_command(Port *prt, int reds_limit) foq.first = NULL; foq.last = NULL; - -#ifdef DEBUG - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize == obufsize); - erts_smp_mtx_unlock(&dep->qlock); -#endif } else { if (oq.first) { @@ -2347,14 +2437,14 @@ erts_dist_command(Port *prt, int reds_limit) * Unhandle buffers need to be put back first * in out_queue. */ - erts_smp_mtx_lock(&dep->qlock); - dep->qsize -= obufsize; + erts_mtx_lock(&dep->qlock); + erts_atomic_add_nob(&dep->qsize, -obufsize); obufsize = 0; oq.last->next = dep->out_queue.first; dep->out_queue.first = oq.first; if (!dep->out_queue.last) dep->out_queue.last = oq.last; - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_unlock(&dep->qlock); } erts_schedule_dist_command(prt, NULL); @@ -2362,18 +2452,406 @@ erts_dist_command(Port *prt, int reds_limit) goto done; } +#if 0 + +int +dist_data_finalize(Process *c_p, int reds_limit) +{ + int reds = 5; + DistEntry *dep = ; + ErtsDistOutputQueue oq, foq; + ErtsDistOutputBuf *ob; + int preempt; + + + erts_mtx_lock(&dep->qlock); + flags = dep->flags; + oq.first = dep->out_queue.first; + oq.last = dep->out_queue.last; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + erts_mtx_unlock(&dep->qlock); + + if (!oq.first) { + ASSERT(!oq.last); + oq.first = dep->tmp_out_queue.first; + oq.last = dep->tmp_out_queue.last; + } + else { + ErtsDistOutputBuf *f, *l; + ASSERT(oq.last); + if (dep->tmp_out_queue.last) { + dep->tmp_out_queue.last->next = oq.first; + oq.first = dep->tmp_out_queue.first; + } + } + + if (!oq.first) { + /* Nothing to do... */ + ASSERT(!oq.last); + return reds; + } + + foq.first = dep->finalized_out_queue.first; + foq.last = dep->finalized_out_queue.last; + + preempt = 0; + ob = oq.first; + ASSERT(ob); + + do { + ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, + dep->cache, + flags); + if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' + needed */ + ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + preempt = reds > reds_limit; + if (preempt) + break; + ob = ob->next; + } while (ob); + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the last buffer that we finalized. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + if (!preempt) { + /* All buffers finalized */ + foq.last = oq.last; + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. */ + foq.last = ob; + oq.first = ob->next; + if (oq.first) + ob->next = NULL; + else + oq.last = NULL; + } + + dep->finalized_out_queue.first = foq.first; + dep->finalized_out_queue.last = foq.last; + dep->tmp_out_queue.first = oq.first; + dep->tmp_out_queue.last = oq.last; + + return reds; +} + +#endif + +BIF_RETTYPE +dist_ctrl_get_data_notification_1(BIF_ALIST_1) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + erts_aint32_t qflgs; + erts_aint_t qsize; + Eterm receiver = NIL; + Uint32 conn_id; + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep) + BIF_ERROR(BIF_P, BADARG); + + /* + * Caller is the only one that can consume from this queue + * and the only one that can set the req-info flag... + */ + + erts_de_rlock(dep); + + if (dep->connection_id != conn_id) { + erts_de_runlock(dep); + BIF_ERROR(BIF_P, BADARG); + } + + ASSERT(dep->cid == BIF_P->common.id); + + qflgs = erts_atomic32_read_acqb(&dep->qflgs); + + if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) { + qsize = erts_atomic_read_acqb(&dep->qsize); + ASSERT(qsize >= 0); + if (qsize > 0) + receiver = BIF_P->common.id; /* Notify ourselves... */ + else { /* Empty queue; set req-info flag... */ + qflgs = erts_atomic32_read_bor_mb(&dep->qflgs, + ERTS_DE_QFLG_REQ_INFO); + qsize = erts_atomic_read_acqb(&dep->qsize); + ASSERT(qsize >= 0); + if (qsize > 0) { + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) + receiver = BIF_P->common.id; /* Notify ourselves... */ + /* else: someone else will notify us... */ + } + /* else: still empty queue... */ + } + } + /* else: Already requested... */ + + erts_de_runlock(dep); + + if (is_internal_pid(receiver)) + notify_dist_data(BIF_P, receiver); + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_ctrl_put_data_2(BIF_ALIST_2) +{ + DistEntry *dep; + ErlDrvSizeT size; + Eterm input_handler; + Uint32 conn_id; + + if (is_binary(BIF_ARG_2)) + size = binary_size(BIF_ARG_2); + else if (is_nil(BIF_ARG_2)) + size = 0; + else if (is_list(BIF_ARG_2)) + BIF_TRAP2(dist_ctrl_put_data_trap, + BIF_P, BIF_ARG_1, BIF_ARG_2); + else + BIF_ERROR(BIF_P, BADARG); + + dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id); + if (!dep) + BIF_ERROR(BIF_P, BADARG); + + input_handler = (Eterm) erts_atomic_read_nob(&dep->input_handler); + + if (input_handler != BIF_P->common.id) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + erts_atomic64_inc_nob(&dep->in); + + if (size != 0) { + byte *data, *temp_alloc = NULL; + + data = (byte *) erts_get_aligned_binary_bytes(BIF_ARG_2, &temp_alloc); + if (!data) + BIF_ERROR(BIF_P, BADARG); + + erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); + + (void) erts_net_message(NULL, dep, conn_id, NULL, 0, data, size); + /* + * We ignore any decode failures. On fatal failures the + * connection will be taken down by killing the + * distribution channel controller... + */ + + erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + + BUMP_REDS(BIF_P, 5); + + erts_free_aligned_binary_bytes(temp_alloc); + + } + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_get_stat_1(BIF_ALIST_1) +{ + Sint64 read, write, pend; + Eterm res, *hp, **hpp; + Uint sz, *szp; + Uint32 conn_id; + DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id); + + if (!dep) + BIF_ERROR(BIF_P, BADARG); + + erts_de_rlock(dep); + + if (dep->connection_id != conn_id) { + erts_de_runlock(dep); + BIF_ERROR(BIF_P, BADARG); + } + read = (Sint64) erts_atomic64_read_nob(&dep->in); + write = (Sint64) erts_atomic64_read_nob(&dep->out); + pend = (Sint64) erts_atomic_read_nob(&dep->qsize); + + erts_de_runlock(dep); + + sz = 0; + szp = &sz; + hpp = NULL; + + while (1) { + res = erts_bld_tuple(hpp, szp, 4, + am_ok, + erts_bld_sint64(hpp, szp, read), + erts_bld_sint64(hpp, szp, write), + pend ? am_true : am_false); + if (hpp) + break; + hp = HAlloc(BIF_P, sz); + hpp = &hp; + szp = NULL; + } + + BIF_RET(res); +} + +BIF_RETTYPE +dist_ctrl_input_handler_2(BIF_ALIST_2) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + Uint32 conn_id; + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep) + BIF_ERROR(BIF_P, BADARG); + + if (is_not_internal_pid(BIF_ARG_2)) + BIF_ERROR(BIF_P, BADARG); + + erts_de_rlock(dep); + if (dep->connection_id != conn_id) { + erts_de_runlock(dep); + BIF_ERROR(BIF_P, BADARG); + } + erts_atomic_set_nob(&dep->input_handler, + (erts_aint_t) BIF_ARG_2); + erts_de_runlock(dep); + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_ctrl_get_data_1(BIF_ALIST_1) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P); + Sint reds = initial_reds; + ErtsDistOutputBuf *obuf; + Eterm *hp; + ProcBin *pb; + erts_aint_t qsize; + Uint32 conn_id; + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep) + BIF_ERROR(BIF_P, BADARG); + + erts_de_rlock(dep); + + if (dep->connection_id != conn_id) { + erts_de_runlock(dep); + BIF_ERROR(BIF_P, BADARG); + } + + if (dep->state == ERTS_DE_STATE_EXITING) + goto return_none; + + ASSERT(dep->cid == BIF_P->common.id); + +#if 0 + if (dep->finalized_out_queue.first) { + obuf = dep->finalized_out_queue.first; + dep->finalized_out_queue.first = obuf->next; + if (!obuf->next) + dep->finalized_out_queue.last = NULL; + } + else +#endif + { + if (!dep->tmp_out_queue.first) { + ASSERT(!dep->tmp_out_queue.last); + ASSERT(!dep->transcode_ctx); + qsize = erts_atomic_read_acqb(&dep->qsize); + if (qsize > 0) { + erts_mtx_lock(&dep->qlock); + dep->tmp_out_queue.first = dep->out_queue.first; + dep->tmp_out_queue.last = dep->out_queue.last; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + erts_mtx_unlock(&dep->qlock); + } + } + + if (!dep->tmp_out_queue.first) { + ASSERT(!dep->tmp_out_queue.last); + return_none: + erts_de_runlock(dep); + BIF_RET(am_none); + } + + obuf = dep->tmp_out_queue.first; + reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds); + if (reds < 0) { + erts_de_runlock(dep); + ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1], + BIF_P, BIF_ARG_1); + } + + dep->tmp_out_queue.first = obuf->next; + if (!obuf->next) + dep->tmp_out_queue.last = NULL; + } + + erts_atomic64_inc_nob(&dep->out); + + erts_de_runlock(dep); + + hp = HAlloc(BIF_P, PROC_BIN_SIZE); + pb = (ProcBin *) (char *) hp; + pb->thing_word = HEADER_PROC_BIN; + pb->size = obuf->ext_endp - obuf->extp; + pb->next = MSO(BIF_P).first; + MSO(BIF_P).first = (struct erl_off_heap_header*) pb; + pb->val = ErtsDistOutputBuf2Binary(obuf); + pb->bytes = (byte*) obuf->extp; + pb->flags = 0; + + qsize = erts_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf)); + ASSERT(qsize >= 0); + + if (qsize < erts_dist_buf_busy_limit/2 + && (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) { + ErtsProcList *resume_procs = NULL; + erts_mtx_lock(&dep->qlock); + resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); + erts_mtx_unlock(&dep->qlock); + if (resume_procs) { + int resumed = erts_resume_processes(resume_procs); + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + } + } + + BIF_RET2(make_binary(pb), (initial_reds - reds)); +} + void erts_dist_port_not_busy(Port *prt) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_port_not_busy)) { + DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), - "%T", prt->dist_entry->sysname); + "%T", dep->sysname); DTRACE3(dist_port_not_busy, erts_this_node_sysname, port_str, remote_str); } @@ -2381,86 +2859,93 @@ erts_dist_port_not_busy(Port *prt) erts_schedule_dist_command(prt, NULL); } -void -erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) +static void kill_connection(DistEntry *dep) { - erts_smp_de_rwlock(dep); - if (is_internal_port(dep->cid) - && connection_id == dep->connection_id - && !(dep->status & ERTS_DE_SFLG_EXITING)) { - - dep->status |= ERTS_DE_SFLG_EXITING; + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); + ASSERT(dep->state == ERTS_DE_STATE_CONNECTED); + + dep->state = ERTS_DE_STATE_EXITING; + erts_mtx_lock(&dep->qlock); + ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); + erts_mtx_unlock(&dep->qlock); + + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + else if (is_internal_pid(dep->cid)) + schedule_kill_dist_ctrl_proc(dep->cid); +} - erts_smp_mtx_lock(&dep->qlock); - ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT)); - dep->qflgs |= ERTS_DE_QFLG_EXIT; - erts_smp_mtx_unlock(&dep->qlock); +void +erts_kill_dist_connection(DistEntry *dep, Uint32 conn_id) +{ + erts_de_rwlock(dep); + if (conn_id == dep->connection_id + && dep->state == ERTS_DE_STATE_CONNECTED) { - erts_schedule_dist_command(NULL, dep); + kill_connection(dep); } - erts_smp_de_rwunlock(dep); + erts_de_rwunlock(dep); } struct print_to_data { - int to; + fmtfn_t to; void *arg; }; static void doit_print_monitor_info(ErtsMonitor *mon, void *vptdp) { - int to = ((struct print_to_data *) vptdp)->to; + fmtfn_t to = ((struct print_to_data *) vptdp)->to; void *arg = ((struct print_to_data *) vptdp)->arg; - Process *rp; - ErtsMonitor *rmon; - rp = erts_proc_lookup(mon->pid); - if (!rp || (rmon = erts_lookup_monitor(ERTS_P_MONITORS(rp), mon->ref)) == NULL) { - erts_print(to, arg, "Warning, stray monitor for: %T\n", mon->pid); - } else if (mon->type == MON_ORIGIN) { - /* Local pid is being monitored */ + ErtsMonitorDataExtended *mdep; + + ASSERT(mon->flags & ERTS_ML_FLG_EXTENDED); + + mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon); + + ASSERT(mdep->dist); + + if (erts_monitor_is_origin(mon)) { erts_print(to, arg, "Remotely monitored by: %T %T\n", - mon->pid, rmon->pid); - } else { - erts_print(to, arg, "Remote monitoring: %T ", mon->pid); - if (is_not_atom(rmon->pid)) - erts_print(to, arg, "%T\n", rmon->pid); - else - erts_print(to, arg, "{%T, %T}\n", - rmon->name, - rmon->pid); /* which in this case is the - remote system name... */ + mon->other.item, mdep->md.target.other.item); + } + else { + erts_print(to, arg, "Remote monitoring: %T ", mon->other.item); + if (mon->flags & ERTS_ML_FLG_NAME) + erts_print(to, arg, "{%T, %T}\n", mdep->u.name, mdep->dist->nodename); + else + erts_print(to, arg, "%T\n", mdep->md.origin.other.item); } } -static void print_monitor_info(int to, void *arg, ErtsMonitor *mon) +static void print_monitor_info(fmtfn_t to, void *arg, DistEntry *dep) { struct print_to_data ptd = {to, arg}; - erts_doforall_monitors(mon,&doit_print_monitor_info,&ptd); -} - -typedef struct { - struct print_to_data *ptdp; - Eterm from; -} PrintLinkContext; - -static void doit_print_link_info2(ErtsLink *lnk, void *vpplc) -{ - PrintLinkContext *pplc = (PrintLinkContext *) vpplc; - erts_print(pplc->ptdp->to, pplc->ptdp->arg, "Remote link: %T %T\n", - pplc->from, lnk->pid); + if (dep->mld) { + erts_monitor_list_foreach(dep->mld->monitors, + doit_print_monitor_info, + (void *) &ptd); + erts_monitor_tree_foreach(dep->mld->orig_name_monitors, + doit_print_monitor_info, + (void *) &ptd); + } } static void doit_print_link_info(ErtsLink *lnk, void *vptdp) { - if (is_internal_pid(lnk->pid) && erts_proc_lookup(lnk->pid)) { - PrintLinkContext plc = {(struct print_to_data *) vptdp, lnk->pid}; - erts_doforall_links(ERTS_LINK_ROOT(lnk), &doit_print_link_info2, &plc); - } + struct print_to_data *ptdp = vptdp; + ErtsLink *lnk2 = erts_link_to_other(lnk, NULL); + erts_print(ptdp->to, ptdp->arg, "Remote link: %T %T\n", + lnk2->other.item, lnk->other.item); } -static void print_link_info(int to, void *arg, ErtsLink *lnk) +static void print_link_info(fmtfn_t to, void *arg, DistEntry *dep) { struct print_to_data ptd = {to, arg}; - erts_doforall_links(lnk, &doit_print_link_info, (void *) &ptd); + if (dep->mld) + erts_link_list_foreach(dep->mld->links, + doit_print_link_info, + (void *) &ptd); } typedef struct { @@ -2468,25 +2953,8 @@ typedef struct { Eterm sysname; } PrintNodeLinkContext; - -static void doit_print_nodelink_info(ErtsLink *lnk, void *vpcontext) -{ - PrintNodeLinkContext *pcontext = vpcontext; - - if (is_internal_pid(lnk->pid) && erts_proc_lookup(lnk->pid)) - erts_print(pcontext->ptd.to, pcontext->ptd.arg, - "Remote monitoring: %T %T\n", lnk->pid, pcontext->sysname); -} - -static void print_nodelink_info(int to, void *arg, ErtsLink *lnk, Eterm sysname) -{ - PrintNodeLinkContext context = {{to, arg}, sysname}; - erts_doforall_links(lnk, &doit_print_nodelink_info, &context); -} - - static int -info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected) +info_dist_entry(fmtfn_t to, void *arg, DistEntry *dep, int visible, int connected) { if (visible && connected) { @@ -2513,13 +2981,10 @@ info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected) } erts_print(to, arg, "Name: %T", dep->sysname); -#ifdef DEBUG - erts_print(to, arg, " (refc=%d)", erts_refc_read(&dep->refc, 0)); -#endif erts_print(to, arg, "\n"); if (!connected && is_nil(dep->cid)) { - if (dep->nlinks) { - erts_print(to, arg, "Error: Got links to not connected node:%T\n", + if (dep->mld) { + erts_print(to, arg, "Error: Got links/monitors to not connected node:%T\n", dep->sysname); } return 0; @@ -2528,14 +2993,13 @@ info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected) erts_print(to, arg, "Controller: %T\n", dep->cid, to); erts_print_node_info(to, arg, dep->sysname, NULL, NULL); - print_monitor_info(to, arg, dep->monitors); - print_link_info(to, arg, dep->nlinks); - print_nodelink_info(to, arg, dep->node_links, dep->sysname); + print_monitor_info(to, arg, dep); + print_link_info(to, arg, dep); return 0; } -int distribution_info(int to, void *arg) /* Called by break handler */ +int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ { DistEntry *dep; @@ -2559,6 +3023,10 @@ int distribution_info(int to, void *arg) /* Called by break handler */ info_dist_entry(to, arg, dep, 0, 1); } + for (dep = erts_pending_dist_entries; dep; dep = dep->next) { + info_dist_entry(to, arg, dep, 0, 0); + } + for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { info_dist_entry(to, arg, dep, 0, 0); @@ -2581,7 +3049,6 @@ int distribution_info(int to, void *arg) /* Called by break handler */ monitor_node -- turn on/off node monitoring node controller only: - dist_exit/3 -- send exit signals from remote to local process dist_link/2 -- link a remote process to a local dist_unlink/2 -- unlink a remote from a local ****************************************************************************/ @@ -2591,15 +3058,6 @@ int distribution_info(int to, void *arg) /* Called by break handler */ /********************************************************************** ** Set the node name of current node fail if node already is set. ** setnode(name@host, Creation) - ** loads functions pointer to trap_functions from module erlang. - ** erlang:dsend/2 - ** erlang:dlink/1 - ** erlang:dunlink/1 - ** erlang:dmonitor_node/3 - ** erlang:dgroup_leader/2 - ** erlang:dexit/2 - ** -- are these needed ? - ** dexit/1 ***********************************************************************/ BIF_RETTYPE setnode_2(BIF_ALIST_2) @@ -2623,44 +3081,50 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; /* Check that all trap functions are defined !! */ - if (dsend2_trap->addressv[0] == NULL || - dsend3_trap->addressv[0] == NULL || - /* dsend_nosuspend_trap->address == NULL ||*/ - dlink_trap->addressv[0] == NULL || - dunlink_trap->addressv[0] == NULL || - dmonitor_node_trap->addressv[0] == NULL || - dgroup_leader_trap->addressv[0] == NULL || - dmonitor_p_trap->addressv[0] == NULL || - dexit_trap->addressv[0] == NULL) { + if (dmonitor_node_trap->addressv[0] == NULL) { goto error; } - net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN, - am_net_kernel, ERTS_PROC_LOCK_MAIN, 0); - if (!net_kernel) + net_kernel = erts_whereis_process(BIF_P, + ERTS_PROC_LOCK_MAIN, + am_net_kernel, + ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS, + 0); + if (!net_kernel || ERTS_PROC_GET_DIST_ENTRY(net_kernel)) goto error; /* By setting F_DISTRIBUTION on net_kernel, - * do_net_exist will be called when net_kernel is terminated !! */ + * erts_do_net_exits will be called when net_kernel is terminated !! */ net_kernel->flags |= F_DISTRIBUTION; - if (net_kernel != BIF_P) - erts_smp_proc_unlock(net_kernel, ERTS_PROC_LOCK_MAIN); + erts_proc_unlock(net_kernel, + (ERTS_PROC_LOCK_STATUS + | ((net_kernel != BIF_P) + ? ERTS_PROC_LOCK_MAIN + : 0))); #ifdef DEBUG - erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx); + erts_rwmtx_rlock(&erts_dist_table_rwmtx); ASSERT(!erts_visible_dist_entries && !erts_hidden_dist_entries); - erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); + erts_rwmtx_runlock(&erts_dist_table_rwmtx); #endif - erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); - erts_smp_thr_progress_block(); + erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); + erts_thr_progress_block(); inc_no_nodes(); erts_set_this_node(BIF_ARG_1, (Uint32) creation); erts_is_alive = 1; send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL); - erts_smp_thr_progress_unblock(); - erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + erts_thr_progress_unblock(); + erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + + /* + * Note erts_this_dist_entry is changed by erts_set_this_node(), + * so we *need* to use the new one after erts_set_this_node() + * is called. + */ + erts_ref_dist_entry(erts_this_dist_entry); + ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry); BIF_RET(am_true); @@ -2668,74 +3132,80 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) BIF_ERROR(BIF_P, BADARG); } -/********************************************************************** - ** Allocate a dist entry, set node name install the connection handler - ** setnode_3({name@host, Creation}, Cid, {Type, Version, Initial, IC, OC}) - ** Type = flag field, where the flags are specified in dist.h - ** Version = distribution version, >= 1 - ** IC = in_cookie (ignored) - ** OC = out_cookie (ignored) - ** - ** Note that in distribution protocols above 1, the Initial parameter - ** is always NIL and the cookies are always the atom '', cookies are not - ** sent in the distribution messages but are only used in - ** the handshake. - ** - ***********************************************************************/ +/* + * erts_internal:create_dist_channel/4 is used by + * erlang:setnode/3. + */ -BIF_RETTYPE setnode_3(BIF_ALIST_3) +typedef struct { + DistEntry *dep; + Uint flags; + Uint version; +} ErtsSetupConnDistCtrl; + +static void +setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, + Eterm ctrlr, Uint flags, + Uint version); + +static Eterm +setup_connection_distctrl(Process *c_p, void *arg, + int *redsp, ErlHeapFragment **bpp); + +BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) { BIF_RETTYPE ret; Uint flags; - unsigned long version; - Eterm ic, oc; - Eterm *tp; + Uint version; + Eterm *hp, res_tag = THE_NON_VALUE, res = THE_NON_VALUE; DistEntry *dep = NULL; + int de_locked = 0; Port *pp = NULL; - /* Prepare for success */ - ERTS_BIF_PREP_RET(ret, am_true); - /* * Check and pick out arguments */ - if (!is_node_name_atom(BIF_ARG_1) || - is_not_internal_port(BIF_ARG_2) || - (erts_this_node->sysname == am_Noname)) { - goto badarg; - } + /* Node name... */ + if (!is_node_name_atom(BIF_ARG_1)) + goto badarg; - if (!is_tuple(BIF_ARG_3)) - goto badarg; - tp = tuple_val(BIF_ARG_3); - if (*tp++ != make_arityval(4)) - goto badarg; - if (!is_small(*tp)) - goto badarg; - flags = unsigned_val(*tp++); - if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0) - goto badarg; - ic = *(++tp); - oc = *(++tp); - if (!is_atom(ic) || !is_atom(oc)) - goto badarg; + /* Distribution controller... */ + if (!is_internal_port(BIF_ARG_2) && !is_internal_pid(BIF_ARG_2)) + goto badarg; + + /* Dist flags... */ + if (!is_small(BIF_ARG_3)) + goto badarg; + flags = unsigned_val(BIF_ARG_3); + + /* Version... */ + if (!is_small(BIF_ARG_4)) + goto badarg; + version = unsigned_val(BIF_ARG_4); + + if (version == 0) + goto badarg; - /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */ - if (!(DFLAG_EXTENDED_REFERENCES & flags)) { + if (~flags & DFLAG_DIST_MANDATORY) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "%T", BIF_P->common.id); if (BIF_P->common.u.alive.reg) erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name); erts_dsprintf(dsbufp, " attempted to enable connection to node %T " - "which is not able to handle extended references.\n", + "which does not support all mandatory capabilities.\n", BIF_ARG_1); erts_send_error_to_logger(BIF_P->group_leader, dsbufp); goto badarg; } /* + * ToDo: Should we not pass connection_id as well + * to make sure it's the right connection we commit. + */ + + /* * Arguments seem to be in order. */ @@ -2746,90 +3216,120 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) else if (!dep) goto system_limit; /* Should never happen!!! */ - pp = erts_id2port_sflgs(BIF_ARG_2, - BIF_P, - ERTS_PROC_LOCK_MAIN, - ERTS_PORT_SFLGS_INVALID_LOOKUP); - erts_smp_de_rwlock(dep); + if (is_internal_pid(BIF_ARG_2)) { + if (BIF_P->common.id == BIF_ARG_2) { + ErtsSetupConnDistCtrl scdc; - if (!pp || (erts_atomic32_read_nob(&pp->state) - & ERTS_PORT_SFLG_EXITING)) - goto badarg; + scdc.dep = dep; + scdc.flags = flags; + scdc.version = version; - if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) - goto badarg; + res = setup_connection_distctrl(BIF_P, &scdc, NULL, NULL); + BUMP_REDS(BIF_P, 5); + dep = NULL; - if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) - goto done; /* Already set */ + if (res == am_badarg) + goto badarg; - if (dep->status & ERTS_DE_SFLG_EXITING) { - /* Suspend on dist entry waiting for the exit to finish */ - ErtsProcList *plp = erts_proclist_create(BIF_P); - plp->next = NULL; - erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); - erts_smp_mtx_lock(&dep->qlock); - erts_proclist_store_last(&dep->suspended, plp); - erts_smp_mtx_unlock(&dep->qlock); - goto yield; - } + ASSERT(is_internal_magic_ref(res)); + res_tag = am_ok; /* Connection up */ + } + else { + ErtsSetupConnDistCtrl *scdcp; - ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); + scdcp = erts_alloc(ERTS_ALC_T_SETUP_CONN_ARG, + sizeof(ErtsSetupConnDistCtrl)); - if (pp->dist_entry || is_not_nil(dep->cid)) - goto badarg; + scdcp->dep = dep; + scdcp->flags = flags; + scdcp->version = version; - erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + res = erts_proc_sig_send_rpc_request(BIF_P, + BIF_ARG_2, + !0, + setup_connection_distctrl, + (void *) scdcp); + if (is_non_value(res)) + goto badarg; - /* - * Dist-ports do not use the "busy port message queue" functionality, but - * instead use "busy dist entry" functionality. - */ - { - ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; - erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); + dep = NULL; + + ASSERT(is_internal_ordinary_ref(res)); + + res_tag = am_message; /* Caller need to wait for dhandle in message */ + } + hp = HAlloc(BIF_P, 3); } + else { + Uint32 conn_id; - pp->dist_entry = dep; + pp = erts_id2port_sflgs(BIF_ARG_2, + BIF_P, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); + erts_de_rwlock(dep); + de_locked = 1; - dep->version = version; - dep->creation = 0; + if (dep->state != ERTS_DE_STATE_PENDING) + goto badarg; - ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + if (!pp || (erts_atomic32_read_nob(&pp->state) + & ERTS_PORT_SFLG_EXITING)) + goto badarg; -#if 1 - dep->send = (pp->drv_ptr->outputv - ? dist_port_commandv - : dist_port_command); -#else - dep->send = dist_port_command; -#endif - ASSERT(dep->send); + if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) + goto badarg; -#ifdef DEBUG - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize == 0); - erts_smp_mtx_unlock(&dep->qlock); -#endif + if (erts_prtsd_get(pp, ERTS_PRTSD_DIST_ENTRY) != NULL + || is_not_nil(dep->cid)) + goto badarg; - erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); - if (flags & DFLAG_DIST_HDR_ATOM_CACHE) - create_cache(dep); + erts_prtsd_set(pp, ERTS_PRTSD_DIST_ENTRY, dep); + erts_prtsd_set(pp, ERTS_PRTSD_CONN_ID, (void*)(UWord)dep->connection_id); - erts_smp_de_rwunlock(dep); - dep = NULL; /* inc of refc transferred to port (dist_entry field) */ + ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); - inc_no_nodes(); + dep->send = (pp->drv_ptr->outputv + ? dist_port_commandv + : dist_port_command); + ASSERT(dep->send); + + /* + * Dist-ports do not use the "busy port message queue" functionality, but + * instead use "busy dist entry" functionality. + */ + { + ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; + erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); + } + + conn_id = dep->connection_id; + setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version); + de_locked = 0; + + hp = HAlloc(BIF_P, 3 + ERTS_DHANDLE_SIZE); + res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id); + res_tag = am_ok; /* Connection up */ + dep = NULL; /* inc of refc transferred to port (dist_entry field) */ + } + + ASSERT(is_value(res) && is_value(res_tag)); + + res = TUPLE2(hp, res_tag, res); + + ERTS_BIF_PREP_RET(ret, res); - send_nodes_mon_msgs(BIF_P, - am_nodeup, - BIF_ARG_1, - flags & DFLAG_PUBLISHED ? am_visible : am_hidden, - NIL); done: if (dep && dep != erts_this_dist_entry) { - erts_smp_de_rwunlock(dep); + if (de_locked) { + if (de_locked > 0) + erts_de_rwunlock(dep); + else + erts_de_runlock(dep); + } erts_deref_dist_entry(dep); } @@ -2838,97 +3338,290 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) return ret; - yield: - ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P, - BIF_ARG_1, BIF_ARG_2, BIF_ARG_3); - goto done; - badarg: - ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG); + ERTS_BIF_PREP_RET(ret, am_badarg); goto done; system_limit: - ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT); + ERTS_BIF_PREP_RET(ret, am_system_limit); goto done; } +static void +setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, + Eterm ctrlr, Uint flags, + Uint version) +{ + Eterm notify_proc = NIL; + erts_aint32_t qflgs; -/**********************************************************************/ -/* dist_exit(Local, Term, Remote) -> Bool */ + dep->version = version; + dep->creation = 0; + + ASSERT(is_internal_port(ctrlr) || is_internal_pid(ctrlr)); + ASSERT(dep->state == ERTS_DE_STATE_PENDING); + + if (flags & DFLAG_DIST_HDR_ATOM_CACHE) + create_cache(dep); + + erts_set_dist_entry_connected(dep, ctrlr, flags); + + notify_proc = NIL; + if (erts_atomic_read_nob(&dep->qsize)) { + if (is_internal_port(dep->cid)) { + erts_schedule_dist_command(NULL, dep); + } + else { + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify_proc = dep->cid; + ASSERT(is_internal_pid(notify_proc)); + } + } + } + } + + erts_de_rwunlock(dep); + + if (is_internal_pid(notify_proc)) + notify_dist_data(c_p, notify_proc); + + inc_no_nodes(); + + send_nodes_mon_msgs(c_p, + am_nodeup, + dep->sysname, + flags & DFLAG_PUBLISHED ? am_visible : am_hidden, + NIL); +} + +static Eterm +setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp) +{ + ErtsSetupConnDistCtrl *scdcp = (ErtsSetupConnDistCtrl *) arg; + DistEntry *dep = scdcp->dep; + int dep_locked = 0; + Eterm *hp; + Uint32 conn_id; + + if (redsp) + *redsp = 1; + + ASSERT(!ERTS_PROC_IS_EXITING(c_p)); + + erts_de_rwlock(dep); + dep_locked = !0; + + if (dep->state != ERTS_DE_STATE_PENDING) + goto badarg; + + conn_id = dep->connection_id; + + if (is_not_nil(dep->cid)) + goto badarg; + + c_p->flags |= F_DISTRIBUTION; + ERTS_PROC_SET_DIST_ENTRY(c_p, dep); + + dep->send = NULL; /* Only for distr ports... */ + + if (redsp) + *redsp = 5; + + setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id, + scdcp->flags, scdcp->version); + + /* we take over previous inc in refc of dep */ + + if (!bpp) /* called directly... */ + return erts_make_dhandle(c_p, dep, conn_id); + + erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg); -BIF_RETTYPE dist_exit_3(BIF_ALIST_3) + *bpp = new_message_buffer(ERTS_DHANDLE_SIZE); + hp = (*bpp)->mem; + return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep, conn_id); + +badarg: + + if (bpp) /* not called directly */ + erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg); + + if (dep_locked) + erts_de_rwunlock(dep); + + erts_deref_dist_entry(dep); + + return am_badarg; +} + + +BIF_RETTYPE erts_internal_get_dflags_0(BIF_ALIST_0) { - Eterm local; - Eterm remote; - DistEntry *rdep; + return erts_dflags_record; +} - local = BIF_ARG_1; - remote = BIF_ARG_3; +BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1) +{ + DistEntry* dep; + Uint32 conn_id; + Eterm* hp; + Eterm dhandle; - /* Check that remote is a remote process */ - if (is_not_external_pid(remote)) - goto error; + if (is_not_atom(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); + } + dep = erts_find_or_insert_dist_entry(BIF_ARG_1); - rdep = external_dist_entry(remote); - - if(rdep == erts_this_dist_entry) - goto error; + if (dep == erts_this_dist_entry) { + erts_deref_dist_entry(dep); + BIF_ERROR(BIF_P, BADARG); + } - /* Check that local is local */ - if (is_internal_pid(local)) { - Process *lp; - ErtsProcLocks lp_locks; - if (BIF_P->common.id == local) { - lp_locks = ERTS_PROC_LOCKS_ALL; - lp = BIF_P; - erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR); - } - else { - lp_locks = ERTS_PROC_LOCKS_XSIG_SEND; - lp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, - local, lp_locks); - if (!lp) { - BIF_RET(am_true); /* ignore */ - } - } - - (void) erts_send_exit_signal(BIF_P, - remote, - lp, - &lp_locks, - BIF_ARG_2, - NIL, - NULL, - 0); -#ifdef ERTS_SMP - if (lp == BIF_P) - lp_locks &= ~ERTS_PROC_LOCK_MAIN; -#endif - erts_smp_proc_unlock(lp, lp_locks); - if (lp == BIF_P) { - erts_aint32_t state = erts_smp_atomic32_read_acqb(&BIF_P->state); - /* - * We may have exited current process and may have to take action. - */ - if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { -#ifdef ERTS_SMP - if (state & ERTS_PSFLG_PENDING_EXIT) - erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN); -#endif - ERTS_BIF_EXITED(BIF_P); - } - } + erts_de_rwlock(dep); + + switch (dep->state) { + case ERTS_DE_STATE_CONNECTED: + case ERTS_DE_STATE_EXITING: + case ERTS_DE_STATE_PENDING: + conn_id = dep->connection_id; + break; + case ERTS_DE_STATE_IDLE: + erts_set_dist_entry_pending(dep); + conn_id = dep->connection_id; + break; + default: + erts_exit(ERTS_ABORT_EXIT, "Invalid dep->state (%d)\n", dep->state); + } + erts_de_rwunlock(dep); + hp = HAlloc(BIF_P, ERTS_DHANDLE_SIZE); + dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id); + erts_deref_dist_entry(dep); + BIF_RET(dhandle); +} + +Sint erts_abort_connection_rwunlock(DistEntry* dep) +{ + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); + + if (dep->state == ERTS_DE_STATE_CONNECTED) { + kill_connection(dep); } - else if (is_external_pid(local) - && external_dist_entry(local) == erts_this_dist_entry) { - BIF_RET(am_true); /* ignore */ + else if (dep->state == ERTS_DE_STATE_PENDING) { + ErtsAtomCache *cache; + ErtsDistOutputBuf *obuf; + ErtsProcList *resume_procs; + Sint reds = 0; + ErtsMonLnkDist *mld; + + ASSERT(is_nil(dep->cid)); + + mld = dep->mld; + dep->mld = NULL; + + cache = dep->cache; + dep->cache = NULL; + erts_mtx_lock(&dep->qlock); + obuf = dep->out_queue.first; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + ASSERT(!dep->tmp_out_queue.first); + ASSERT(!dep->finalized_out_queue.first); + resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + + erts_set_dist_entry_not_connected(dep); + erts_de_rwunlock(dep); + + schedule_con_monitor_link_cleanup(mld, THE_NON_VALUE, + THE_NON_VALUE, THE_NON_VALUE); + + if (resume_procs) { + int resumed = erts_resume_processes(resume_procs); + reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + } + + delete_cache(cache); + free_de_out_queues(dep, obuf); + return reds; } - else - goto error; + erts_de_rwunlock(dep); + return 0; +} + +static Sint abort_connection(DistEntry *dep, Uint32 conn_id) +{ + erts_de_rwlock(dep); + if (dep->connection_id == conn_id) + return erts_abort_connection_rwunlock(dep); + erts_de_rwunlock(dep); + return 0; +} + +BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2) +{ + DistEntry* dep; + Uint32 conn_id; + Sint reds; + + if (is_not_atom(BIF_ARG_1)) + BIF_ERROR(BIF_P, BADARG); + dep = erts_dhandle_to_dist_entry(BIF_ARG_2, &conn_id); + if (!dep || dep != erts_find_dist_entry(BIF_ARG_1) + || dep == erts_this_dist_entry) { + BIF_ERROR(BIF_P, BADARG); + } + + reds = abort_connection(dep, conn_id); + BUMP_REDS(BIF_P, reds); BIF_RET(am_true); +} - error: - BIF_ERROR(BIF_P, BADARG); +int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) +{ + erts_de_rwlock(dep); + if (dep->state != ERTS_DE_STATE_IDLE) { + erts_de_rwunlock(dep); + } + else { + Process* net_kernel; + ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *mp; + Eterm msg, dhandle; + Uint32 conn_id; + + erts_set_dist_entry_pending(dep); + conn_id = dep->connection_id; + erts_de_rwunlock(dep); + + net_kernel = erts_whereis_process(proc, proc_locks, + am_net_kernel, nk_locks, 0); + if (!net_kernel) { + abort_connection(dep, conn_id); + return 0; + } + + /* + * Send {auto_connect, Node, DHandle} to net_kernel + */ + mp = erts_alloc_message_heap(net_kernel, &nk_locks, + 4 + ERTS_DHANDLE_SIZE, + &hp, &ohp); + dhandle = erts_build_dhandle(&hp, ohp, dep, conn_id); + msg = TUPLE3(hp, am_auto_connect, dep->sysname, dhandle); + ERL_MESSAGE_TOKEN(mp) = am_undefined; + erts_queue_proc_message(proc, net_kernel, nk_locks, mp, msg); + erts_proc_unlock(net_kernel, nk_locks); + } + + return 1; } /**********************************************************************/ @@ -3000,13 +3693,15 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) length = 0; - erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx); + erts_rwmtx_rlock(&erts_dist_table_rwmtx); ASSERT(erts_no_of_not_connected_dist_entries > 0); ASSERT(erts_no_of_hidden_dist_entries >= 0); + ASSERT(erts_no_of_pending_dist_entries >= 0); ASSERT(erts_no_of_visible_dist_entries >= 0); if(not_connected) - length += (erts_no_of_not_connected_dist_entries - 1); + length += ((erts_no_of_not_connected_dist_entries - 1) + + erts_no_of_pending_dist_entries); if(hidden) length += erts_no_of_hidden_dist_entries; if(visible) @@ -3017,7 +3712,7 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) result = NIL; if (length == 0) { - erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); + erts_rwmtx_runlock(&erts_dist_table_rwmtx); goto done; } @@ -3026,13 +3721,18 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) #ifdef DEBUG endp = hp + length*2; #endif - if(not_connected) + if(not_connected) { for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { result = CONS(hp, dep->sysname, result); hp += 2; } + } + for(dep = erts_pending_dist_entries; dep; dep = dep->next) { + result = CONS(hp, dep->sysname, result); + hp += 2; } + } if(hidden) for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { result = CONS(hp, dep->sysname, result); @@ -3048,7 +3748,7 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) hp += 2; } ASSERT(endp == hp); - erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); + erts_rwmtx_runlock(&erts_dist_table_rwmtx); done: UnUseTmpHeap(2,BIF_P); @@ -3074,76 +3774,179 @@ BIF_RETTYPE is_alive_0(BIF_ALIST_0) static BIF_RETTYPE monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) { - DistEntry *dep; - ErtsLink *lnk; + BIF_RETTYPE ret; + DistEntry *dep = NULL; Eterm l; + int async_connect = 1; for (l = Options; l != NIL && is_list(l); l = CDR(list_val(l))) { Eterm t = CAR(list_val(l)); - /* allow_passive_connect the only available option right now */ - if (t != am_allow_passive_connect) { + if (t == am_allow_passive_connect) { + /* + * Handle this horrible feature by falling back on old synchronous + * auto-connect (if needed) + */ + async_connect = 0; + } else { BIF_ERROR(p, BADARG); } } if (l != NIL) { BIF_ERROR(p, BADARG); } + if (l != NIL) + goto badarg; - if (is_not_atom(Node) || - ((Bool != am_true) && (Bool != am_false)) || - ((erts_this_node->sysname == am_Noname) - && (Node != erts_this_node->sysname))) { - BIF_ERROR(p, BADARG); - } - dep = erts_sysname_to_connected_dist_entry(Node); - if (!dep) { - do_trap: - BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); + if (is_not_atom(Node)) + goto badarg; + + if (erts_this_node->sysname == am_Noname && Node != am_Noname) + goto badarg; + + switch (Bool) { + + case am_false: { + ErtsMonitor *mon; + /* + * Before OTP-21, monitor_node(Node, false) triggered + * auto-connect and a 'nodedown' message if that failed. + * Now it's a simple no-op which feels more reasonable. + */ + mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(p), Node); + if (mon) { + ErtsMonitorDataExtended *mdep; + ASSERT(erts_monitor_is_origin(mon)); + + mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon); + + ASSERT((mdep->u.refc > 0)); + if (--mdep->u.refc == 0) { + if (!mdep->uptr.node_monitors) + erts_monitor_tree_delete(&ERTS_P_MONITORS(p), mon); + else { + ErtsMonitor *sub_mon; + ErtsMonitorDataExtended *sub_mdep; + sub_mon = erts_monitor_list_last(mdep->uptr.node_monitors); + erts_monitor_list_delete(&mdep->uptr.node_monitors, sub_mon); + sub_mon->flags &= ~ERTS_ML_FLG_IN_SUBTABLE; + sub_mdep = ((ErtsMonitorDataExtended *) + erts_monitor_to_data(sub_mon)); + sub_mdep->uptr.node_monitors = mdep->uptr.node_monitors; + mdep->uptr.node_monitors = NULL; + erts_monitor_tree_replace(&ERTS_P_MONITORS(p), mon, sub_mon); + } + if (erts_monitor_dist_delete(&mdep->md.target)) + erts_monitor_release_both((ErtsMonitorData *) mdep); + else + erts_monitor_release(mon); + } + } + break; } - if (dep == erts_this_dist_entry) - goto done; - erts_smp_proc_lock(p, ERTS_PROC_LOCK_LINK); - erts_smp_de_rlock(dep); - if (ERTS_DE_IS_NOT_CONNECTED(dep)) { - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK); - erts_smp_de_runlock(dep); - goto do_trap; - } - erts_smp_de_links_lock(dep); - erts_smp_de_runlock(dep); - - if (Bool == am_true) { - ASSERT(dep->cid != NIL); - lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, - p->common.id); - ++ERTS_LINK_REFC(lnk); - lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); - ++ERTS_LINK_REFC(lnk); - } - else { - lnk = erts_lookup_link(dep->node_links, p->common.id); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&(dep->node_links), - p->common.id)); - } - } - lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), - Node)); + case am_true: { + ErtsDSigData dsd; + dsd.node = Node; + + dep = erts_find_or_insert_dist_entry(Node); + if (dep == erts_this_dist_entry) + break; + + switch (erts_dsig_prepare(&dsd, dep, p, + ERTS_PROC_LOCK_MAIN, + ERTS_DSP_RLOCK, 0, async_connect)) { + case ERTS_DSIG_PREP_NOT_ALIVE: + case ERTS_DSIG_PREP_NOT_CONNECTED: + /* Trap to either send 'nodedown' or do passive connection attempt */ + goto do_trap; + case ERTS_DSIG_PREP_PENDING: + if (!async_connect) { + /* + * Pending connection may fail, so we must trap + * to ensure passive connection attempt + */ + erts_de_runlock(dep); + goto do_trap; } - } + /*fall through*/ + case ERTS_DSIG_PREP_CONNECTED: { + ErtsMonitor *mon; + ErtsMonitorDataExtended *mdep; + int created; + + mon = erts_monitor_tree_lookup_create(&ERTS_P_MONITORS(p), + &created, + ERTS_MON_TYPE_NODE, + p->common.id, + Node); + mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon); + if (created) { +#ifdef DEBUG + int inserted = +#endif + erts_monitor_dist_insert(&mdep->md.target, dep->mld); + ASSERT(inserted); + ASSERT(mdep->dist->connection_id == dep->connection_id); + } + else if (mdep->dist->connection_id != dep->connection_id) { + ErtsMonitorDataExtended *mdep2; + ErtsMonitor *mon2; +#ifdef DEBUG + int inserted; +#endif + mdep2 = ((ErtsMonitorDataExtended *) + erts_monitor_create(ERTS_MON_TYPE_NODE, NIL, + p->common.id, Node, NIL)); + mon2 = &mdep2->md.origin; +#ifdef DEBUG + inserted = +#endif + erts_monitor_dist_insert(&mdep->md.target, dep->mld); + ASSERT(inserted); + ASSERT(mdep2->dist->connection_id == dep->connection_id); + + mdep2->uptr.node_monitors = mdep->uptr.node_monitors; + mdep->uptr.node_monitors = NULL; + erts_monitor_tree_replace(&ERTS_P_MONITORS(p), mon, mon2); + erts_monitor_list_insert(&mdep2->uptr.node_monitors, mon); + mon->flags |= ERTS_ML_FLG_IN_SUBTABLE; + mdep = mdep2; + } + + mdep->u.refc++; + + break; + } + + default: + ERTS_ASSERT(! "Invalid dsig prepare result"); + } + + erts_de_runlock(dep); + + break; } - erts_smp_de_links_unlock(dep); - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK); + default: + goto badarg; + } - done: - erts_deref_dist_entry(dep); - BIF_RET(am_true); + ERTS_BIF_PREP_RET(ret, am_true); + +do_return: + + if (dep) + erts_deref_dist_entry(dep); + + return ret; + +do_trap: + ERTS_BIF_PREP_TRAP3(ret, dmonitor_node_trap, p, Node, Bool, Options); + goto do_return; + +badarg: + ERTS_BIF_PREP_ERROR(ret, p, BADARG); + goto do_return; } BIF_RETTYPE monitor_node_3(BIF_ALIST_3) @@ -3171,9 +3974,9 @@ BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1) if (de == erts_this_dist_entry) { BIF_RET(am_true); } - erts_smp_de_rlock(de); + erts_de_rlock(de); f = de->flags; - erts_smp_de_runlock(de); + erts_de_runlock(de); BIF_RET(((f & DFLAG_UNICODE_IO) ? am_true : am_false)); } @@ -3194,18 +3997,9 @@ BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1) #define ERTS_NODES_MON_OPT_TYPES \ (ERTS_NODES_MON_OPT_TYPE_VISIBLE|ERTS_NODES_MON_OPT_TYPE_HIDDEN) -typedef struct ErtsNodesMonitor_ ErtsNodesMonitor; -struct ErtsNodesMonitor_ { - ErtsNodesMonitor *prev; - ErtsNodesMonitor *next; - Process *proc; - Uint16 opts; - Uint16 no; -}; - -static erts_smp_mtx_t nodes_monitors_mtx; -static ErtsNodesMonitor *nodes_monitors; -static ErtsNodesMonitor *nodes_monitors_end; +static erts_mtx_t nodes_monitors_mtx; +static ErtsMonitor *nodes_monitors; +static Uint no_nodes_monitors; /* * Nodes monitors are stored in a double linked list. 'nodes_monitors' @@ -3221,111 +4015,172 @@ static ErtsNodesMonitor *nodes_monitors_end; static void init_nodes_monitors(void) { - erts_smp_mtx_init(&nodes_monitors_mtx, "nodes_monitors"); + erts_mtx_init(&nodes_monitors_mtx, "nodes_monitors", NIL, + ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION); nodes_monitors = NULL; - nodes_monitors_end = NULL; + no_nodes_monitors = 0; } -static ERTS_INLINE Uint -nodes_mon_msg_sz(ErtsNodesMonitor *nmp, Eterm what, Eterm reason) +Eterm +erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist) { - Uint sz; - if (!nmp->opts) { - sz = 3; - } - else { - sz = 0; + Eterm key, old_value, opts_list = olist; + Uint opts = (Uint) 0; - if (nmp->opts & ERTS_NODES_MON_OPT_TYPES) - sz += 2 + 3; + ASSERT(c_p); + ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); + + if (on != am_true && on != am_false) + return THE_NON_VALUE; + + if (is_not_nil(opts_list)) { + int all = 0, visible = 0, hidden = 0; - if (what == am_nodedown - && (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { - if (is_not_immed(reason)) - sz += size_object(reason); - sz += 2 + 3; + while (is_list(opts_list)) { + Eterm *cp = list_val(opts_list); + Eterm opt = CAR(cp); + opts_list = CDR(cp); + if (opt == am_nodedown_reason) + opts |= ERTS_NODES_MON_OPT_DOWN_REASON; + else if (is_tuple(opt)) { + Eterm* tp = tuple_val(opt); + if (arityval(tp[0]) != 2) + return THE_NON_VALUE; + switch (tp[1]) { + case am_node_type: + switch (tp[2]) { + case am_visible: + if (hidden || all) + return THE_NON_VALUE; + opts |= ERTS_NODES_MON_OPT_TYPE_VISIBLE; + visible = 1; + break; + case am_hidden: + if (visible || all) + return THE_NON_VALUE; + opts |= ERTS_NODES_MON_OPT_TYPE_HIDDEN; + hidden = 1; + break; + case am_all: + if (visible || hidden) + return THE_NON_VALUE; + opts |= ERTS_NODES_MON_OPT_TYPES; + all = 1; + break; + default: + return THE_NON_VALUE; + } + break; + default: + return THE_NON_VALUE; + } + } + else { + return THE_NON_VALUE; + } } - sz += 4; + if (is_not_nil(opts_list)) + return THE_NON_VALUE; + } + + key = make_small(opts); + + if (on == am_true) { + ErtsMonitorDataExtended *mdep; + ErtsMonitor *omon; + int created; + omon = erts_monitor_tree_lookup_create(&ERTS_P_MONITORS(c_p), + &created, + ERTS_MON_TYPE_NODES, + c_p->common.id, + key); + mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(omon); + if (created) { + erts_mtx_lock(&nodes_monitors_mtx); + no_nodes_monitors++; + erts_monitor_list_insert(&nodes_monitors, &mdep->md.target); + erts_mtx_unlock(&nodes_monitors_mtx); + } + old_value = mdep->u.refc; + mdep->u.refc++; + } + else { + ErtsMonitorDataExtended *mdep; + ErtsMonitor *omon; + omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p), key); + if (!omon) + old_value = 0; + else { + mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(omon); + old_value = mdep->u.refc; + ASSERT(mdep->u.refc > 0); + erts_mtx_lock(&nodes_monitors_mtx); + ASSERT(no_nodes_monitors > 0); + no_nodes_monitors--; + ASSERT(erts_monitor_is_in_table(&mdep->md.target)); + erts_monitor_list_delete(&nodes_monitors, &mdep->md.target); + erts_mtx_unlock(&nodes_monitors_mtx); + erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p), omon); + erts_monitor_release_both((ErtsMonitorData *) mdep); + } } - return sz; + + return erts_make_integer(old_value, c_p); } -static ERTS_INLINE void -send_nodes_mon_msg(Process *rp, - ErtsProcLocks *rp_locksp, - ErtsNodesMonitor *nmp, - Eterm node, - Eterm what, - Eterm type, - Eterm reason, - Uint sz) +void +erts_monitor_nodes_delete(ErtsMonitor *omon) { - Eterm msg; - Eterm *hp; - ErtsMessage *mp; - ErlOffHeap *ohp; -#ifdef DEBUG - Eterm *hend; -#endif + ErtsMonitorData *mdp; - mp = erts_alloc_message_heap(rp, rp_locksp, sz, &hp, &ohp); -#ifdef DEBUG - hend = hp + sz; -#endif + ASSERT(omon->type == ERTS_MON_TYPE_NODES); + ASSERT(erts_monitor_is_origin(omon)); - if (!nmp->opts) { - msg = TUPLE2(hp, what, node); -#ifdef DEBUG - hp += 3; -#endif - } - else { - Eterm tup; - Eterm info = NIL; + mdp = erts_monitor_to_data(omon); - if (nmp->opts & (ERTS_NODES_MON_OPT_TYPE_VISIBLE - | ERTS_NODES_MON_OPT_TYPE_HIDDEN)) { + erts_mtx_lock(&nodes_monitors_mtx); + ASSERT(erts_monitor_is_in_table(&mdp->target)); + ASSERT(no_nodes_monitors > 0); + no_nodes_monitors--; + erts_monitor_list_delete(&nodes_monitors, &mdp->target); + erts_mtx_unlock(&nodes_monitors_mtx); + erts_monitor_release_both(mdp); +} - tup = TUPLE2(hp, am_node_type, type); - hp += 3; - info = CONS(hp, tup, info); - hp += 2; - } +typedef struct { + Eterm pid; + Eterm options; +} ErtsNodesMonitorData; - if (what == am_nodedown - && (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { - Eterm rsn_cpy; - - if (is_immed(reason)) - rsn_cpy = reason; - else { - Eterm rsn_sz = size_object(reason); - rsn_cpy = copy_struct(reason, rsn_sz, &hp, ohp); - } +typedef struct { + ErtsNodesMonitorData *nmdp; + Uint i; +} ErtsNodesMonitorContext; - tup = TUPLE2(hp, am_nodedown_reason, rsn_cpy); - hp += 3; - info = CONS(hp, tup, info); - hp += 2; - } +static void +save_nodes_monitor(ErtsMonitor *mon, void *vctxt) +{ + ErtsNodesMonitorContext *ctxt = vctxt; + ErtsMonitorData *mdp = erts_monitor_to_data(mon); - msg = TUPLE3(hp, what, node, info); -#ifdef DEBUG - hp += 4; -#endif - } + ASSERT(erts_monitor_is_target(mon)); + ASSERT(mon->type == ERTS_MON_TYPE_NODES); - ASSERT(hend == hp); - erts_queue_message(rp, *rp_locksp, mp, msg, am_system); + ctxt->nmdp[ctxt->i].pid = mon->other.item; + ctxt->nmdp[ctxt->i].options = mdp->origin.other.item; + + ctxt->i++; } static void send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reason) { - ErtsNodesMonitor *nmp; - ErtsProcLocks rp_locks = 0; /* Init to shut up false warning */ - Process *rp = NULL; + Uint opts; + Uint i, no, reason_size; + ErtsNodesMonitorData def_buf[100]; + ErtsNodesMonitorData *nmdp = &def_buf[0]; + ErtsNodesMonitorContext ctxt; ASSERT(is_immed(what)); ASSERT(is_immed(node)); @@ -3346,31 +4201,44 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas } #endif - ERTS_SMP_LC_ASSERT(!c_p - || (erts_proc_lc_my_proc_locks(c_p) - == ERTS_PROC_LOCK_MAIN)); - erts_smp_mtx_lock(&nodes_monitors_mtx); + ctxt.i = 0; + + reason_size = is_immed(reason) ? 0 : size_object(reason); - for (nmp = nodes_monitors; nmp; nmp = nmp->next) { - int i; - Uint16 no; - Uint sz; + erts_mtx_lock(&nodes_monitors_mtx); + if (no_nodes_monitors > sizeof(def_buf)/sizeof(def_buf[0])) + nmdp = erts_alloc(ERTS_ALC_T_TMP, + no_nodes_monitors*sizeof(ErtsNodesMonitorData)); + ctxt.nmdp = nmdp; + erts_monitor_list_foreach(nodes_monitors, + save_nodes_monitor, + (void *) &ctxt); + + ASSERT(ctxt.i == no_nodes_monitors); + no = no_nodes_monitors; + + erts_mtx_unlock(&nodes_monitors_mtx); - ASSERT(nmp->proc != NULL); + for (i = 0; i < no; i++) { + Eterm tmp_heap[3+2+3+2+4 /* max need */]; + Eterm *hp, msg; + Uint hsz; - if (!nmp->opts) { + ASSERT(is_small(nmdp[i].options)); + opts = (Uint) signed_val(nmdp[i].options); + if (!opts) { if (type != am_visible) continue; } else { switch (type) { case am_hidden: - if (!(nmp->opts & ERTS_NODES_MON_OPT_TYPE_HIDDEN)) + if (!(opts & ERTS_NODES_MON_OPT_TYPE_HIDDEN)) continue; break; case am_visible: - if ((nmp->opts & ERTS_NODES_MON_OPT_TYPES) - && !(nmp->opts & ERTS_NODES_MON_OPT_TYPE_VISIBLE)) + if ((opts & ERTS_NODES_MON_OPT_TYPES) + && !(opts & ERTS_NODES_MON_OPT_TYPE_VISIBLE)) continue; break; default: @@ -3378,342 +4246,162 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas } } - if (rp != nmp->proc) { - if (rp) { - if (rp == c_p) - rp_locks &= ~ERTS_PROC_LOCK_MAIN; - erts_smp_proc_unlock(rp, rp_locks); - } + hsz = 0; + hp = &tmp_heap[0]; - rp = nmp->proc; - rp_locks = 0; - if (rp == c_p) - rp_locks |= ERTS_PROC_LOCK_MAIN; - } + if (!opts) { + msg = TUPLE2(hp, what, node); + hp += 3; + } + else { + Eterm tup; + Eterm info = NIL; + + if (opts & (ERTS_NODES_MON_OPT_TYPE_VISIBLE + | ERTS_NODES_MON_OPT_TYPE_HIDDEN)) { + + tup = TUPLE2(hp, am_node_type, type); + hp += 3; + info = CONS(hp, tup, info); + hp += 2; + } + + if (what == am_nodedown + && (opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { + hsz += reason_size; + tup = TUPLE2(hp, am_nodedown_reason, reason); + hp += 3; + info = CONS(hp, tup, info); + hp += 2; + } + + msg = TUPLE3(hp, what, node, info); + hp += 4; + } - ASSERT(rp); + ASSERT(hp - &tmp_heap[0] <= sizeof(tmp_heap)/sizeof(tmp_heap[0])); - sz = nodes_mon_msg_sz(nmp, what, reason); + hsz += hp - &tmp_heap[0]; - for (i = 0, no = nmp->no; i < no; i++) - send_nodes_mon_msg(rp, - &rp_locks, - nmp, - node, - what, - type, - reason, - sz); + erts_proc_sig_send_persistent_monitor_msg(ERTS_MON_TYPE_NODES, + nmdp[i].options, + am_system, + nmdp[i].pid, + msg, + hsz); } - if (rp) { - if (rp == c_p) - rp_locks &= ~ERTS_PROC_LOCK_MAIN; - erts_smp_proc_unlock(rp, rp_locks); - } - - erts_smp_mtx_unlock(&nodes_monitors_mtx); + if (nmdp != &def_buf[0]) + erts_free(ERTS_ALC_T_TMP, nmdp); } + -static Eterm -insert_nodes_monitor(Process *c_p, Uint32 opts) -{ - Uint16 no = 1; - Eterm res = am_false; - ErtsNodesMonitor *xnmp, *nmp; - - ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&nodes_monitors_mtx)); - ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN); - - xnmp = c_p->nodes_monitors; - if (xnmp) { - ASSERT(!xnmp->prev || xnmp->prev->proc != c_p); - - while (1) { - ASSERT(xnmp->proc == c_p); - if (xnmp->opts == opts) - break; - if (!xnmp->next || xnmp->next->proc != c_p) - break; - xnmp = xnmp->next; - } - ASSERT(xnmp); - ASSERT(xnmp->proc == c_p); - ASSERT(xnmp->opts == opts - || !xnmp->next - || xnmp->next->proc != c_p); - - if (xnmp->opts != opts) - goto alloc_new; - else { - res = am_true; - no = xnmp->no++; - if (!xnmp->no) { - /* - * 'no' wrapped; transfer all prevous monitors to new - * element (which will be the next element in the list) - * and set this to one... - */ - xnmp->no = 1; - goto alloc_new; - } - } - } - else { - alloc_new: - nmp = erts_alloc(ERTS_ALC_T_NODES_MON, sizeof(ErtsNodesMonitor)); - nmp->proc = c_p; - nmp->opts = opts; - nmp->no = no; - - if (xnmp) { - ASSERT(nodes_monitors); - ASSERT(c_p->nodes_monitors); - nmp->next = xnmp->next; - nmp->prev = xnmp; - xnmp->next = nmp; - if (nmp->next) { - ASSERT(nodes_monitors_end != xnmp); - ASSERT(nmp->next->prev == xnmp); - nmp->next->prev = nmp; - } - else { - ASSERT(nodes_monitors_end == xnmp); - nodes_monitors_end = nmp; - } - } - else { - ASSERT(!c_p->nodes_monitors); - c_p->nodes_monitors = nmp; - nmp->next = NULL; - nmp->prev = nodes_monitors_end; - if (nodes_monitors_end) { - ASSERT(nodes_monitors); - nodes_monitors_end->next = nmp; - } - else { - ASSERT(!nodes_monitors); - nodes_monitors = nmp; - } - nodes_monitors_end = nmp; - } - } - return res; -} - -static Eterm -remove_nodes_monitors(Process *c_p, Uint32 opts, int all) -{ - Eterm res = am_false; - ErtsNodesMonitor *nmp; - - ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&nodes_monitors_mtx)); - ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN); - - nmp = c_p->nodes_monitors; - ASSERT(!nmp || !nmp->prev || nmp->prev->proc != c_p); - - while (nmp && nmp->proc == c_p) { - if (!all && nmp->opts != opts) - nmp = nmp->next; - else { /* if (all || nmp->opts == opts) */ - ErtsNodesMonitor *free_nmp; - res = am_true; - if (nmp->prev) { - ASSERT(nodes_monitors != nmp); - nmp->prev->next = nmp->next; - } - else { - ASSERT(nodes_monitors == nmp); - nodes_monitors = nmp->next; - } - if (nmp->next) { - ASSERT(nodes_monitors_end != nmp); - nmp->next->prev = nmp->prev; - } - else { - ASSERT(nodes_monitors_end == nmp); - nodes_monitors_end = nmp->prev; - } - free_nmp = nmp; - nmp = nmp->next; - if (c_p->nodes_monitors == free_nmp) - c_p->nodes_monitors = nmp && nmp->proc == c_p ? nmp : NULL; - erts_free(ERTS_ALC_T_NODES_MON, free_nmp); - } - } - - ASSERT(!all || !c_p->nodes_monitors); - return res; -} - -void -erts_delete_nodes_monitors(Process *c_p, ErtsProcLocks locks) -{ -#if defined(ERTS_ENABLE_LOCK_CHECK) && defined(ERTS_SMP) - if (c_p) { - ErtsProcLocks might_unlock = locks & ~ERTS_PROC_LOCK_MAIN; - if (might_unlock) - erts_proc_lc_might_unlock(c_p, might_unlock); - } -#endif - if (erts_smp_mtx_trylock(&nodes_monitors_mtx) == EBUSY) { - ErtsProcLocks unlock_locks = locks & ~ERTS_PROC_LOCK_MAIN; - if (c_p && unlock_locks) - erts_smp_proc_unlock(c_p, unlock_locks); - erts_smp_mtx_lock(&nodes_monitors_mtx); - if (c_p && unlock_locks) - erts_smp_proc_lock(c_p, unlock_locks); - } - remove_nodes_monitors(c_p, 0, 1); - erts_smp_mtx_unlock(&nodes_monitors_mtx); -} - -Eterm -erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist) -{ +typedef struct { + Eterm **hpp; + Uint *szp; Eterm res; - Eterm opts_list = olist; - Uint16 opts = (Uint16) 0; +} ErtsNodesMonitorInfoContext; - ASSERT(c_p); - ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); - if (on != am_true && on != am_false) - return THE_NON_VALUE; - - if (is_not_nil(opts_list)) { - int all = 0, visible = 0, hidden = 0; - - while (is_list(opts_list)) { - Eterm *cp = list_val(opts_list); - Eterm opt = CAR(cp); - opts_list = CDR(cp); - if (opt == am_nodedown_reason) - opts |= ERTS_NODES_MON_OPT_DOWN_REASON; - else if (is_tuple(opt)) { - Eterm* tp = tuple_val(opt); - if (arityval(tp[0]) != 2) - return THE_NON_VALUE; - switch (tp[1]) { - case am_node_type: - switch (tp[2]) { - case am_visible: - if (hidden || all) - return THE_NON_VALUE; - opts |= ERTS_NODES_MON_OPT_TYPE_VISIBLE; - visible = 1; - break; - case am_hidden: - if (visible || all) - return THE_NON_VALUE; - opts |= ERTS_NODES_MON_OPT_TYPE_HIDDEN; - hidden = 1; - break; - case am_all: - if (visible || hidden) - return THE_NON_VALUE; - opts |= ERTS_NODES_MON_OPT_TYPES; - all = 1; - break; - default: - return THE_NON_VALUE; - } - break; - default: - return THE_NON_VALUE; - } - } - else { - return THE_NON_VALUE; - } - } - - if (is_not_nil(opts_list)) - return THE_NON_VALUE; +static void +nodes_monitor_info(ErtsMonitor *mon, void *vctxt) +{ + ErtsMonitorDataExtended *mdep; + ErtsNodesMonitorInfoContext *ctxt = vctxt; + Uint no, i, opts, *szp; + Eterm **hpp, res; + + hpp = ctxt->hpp; + szp = ctxt->szp; + res = ctxt->res; + + ASSERT(erts_monitor_is_target(mon)); + ASSERT(mon->type == ERTS_MON_TYPE_NODES); + mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon); + no = mdep->u.refc; + + ASSERT(is_small(mdep->md.origin.other.item)); + opts = (Uint) signed_val(mdep->md.origin.other.item); + + for (i = 0; i < no; i++) { + Eterm olist = NIL; + if (opts & ERTS_NODES_MON_OPT_TYPES) { + Eterm type; + switch (opts & ERTS_NODES_MON_OPT_TYPES) { + case ERTS_NODES_MON_OPT_TYPES: type = am_all; break; + case ERTS_NODES_MON_OPT_TYPE_VISIBLE: type = am_visible; break; + case ERTS_NODES_MON_OPT_TYPE_HIDDEN: type = am_hidden; break; + default: erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n"); + } + olist = erts_bld_cons(hpp, szp, + erts_bld_tuple(hpp, szp, 2, + am_node_type, + type), + olist); + } + if (opts & ERTS_NODES_MON_OPT_DOWN_REASON) + olist = erts_bld_cons(hpp, szp, am_nodedown_reason, olist); + res = erts_bld_cons(hpp, szp, + erts_bld_tuple(hpp, szp, 2, + mon->other.item, + olist), + res); } - erts_smp_mtx_lock(&nodes_monitors_mtx); - - if (on == am_true) - res = insert_nodes_monitor(c_p, opts); - else - res = remove_nodes_monitors(c_p, opts, 0); - - erts_smp_mtx_unlock(&nodes_monitors_mtx); - - return res; + ctxt->hpp = hpp; + ctxt->szp = szp; + ctxt->res = res; } -/* - * Note, this function is only used for debuging. - */ - Eterm erts_processes_monitoring_nodes(Process *c_p) { - ErtsNodesMonitor *nmp; - Eterm res; + /* + * Note, this function is only used for debugging. + */ + ErtsNodesMonitorInfoContext ctxt; Eterm *hp; - Eterm **hpp; Uint sz; - Uint *szp; #ifdef DEBUG Eterm *hend; #endif ASSERT(c_p); - ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); - erts_smp_mtx_lock(&nodes_monitors_mtx); + ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); + + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN); + erts_thr_progress_block(); + + erts_mtx_lock(&nodes_monitors_mtx); sz = 0; - szp = &sz; - hpp = NULL; + ctxt.szp = &sz; + ctxt.hpp = NULL; - bld_result: - res = NIL; - - for (nmp = nodes_monitors_end; nmp; nmp = nmp->prev) { - Uint16 i; - for (i = 0; i < nmp->no; i++) { - Eterm olist = NIL; - if (nmp->opts & ERTS_NODES_MON_OPT_TYPES) { - Eterm type; - switch (nmp->opts & ERTS_NODES_MON_OPT_TYPES) { - case ERTS_NODES_MON_OPT_TYPES: type = am_all; break; - case ERTS_NODES_MON_OPT_TYPE_VISIBLE: type = am_visible; break; - case ERTS_NODES_MON_OPT_TYPE_HIDDEN: type = am_hidden; break; - default: erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n"); - } - olist = erts_bld_cons(hpp, szp, - erts_bld_tuple(hpp, szp, 2, - am_node_type, - type), - olist); - } - if (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON) - olist = erts_bld_cons(hpp, szp, am_nodedown_reason, olist); - res = erts_bld_cons(hpp, szp, - erts_bld_tuple(hpp, szp, 2, - nmp->proc->common.id, - olist), - res); - } - } + while (1) { + ctxt.res = NIL; + + erts_monitor_list_foreach(nodes_monitors, + nodes_monitor_info, + (void *) &ctxt); + + if (ctxt.hpp) + break; - if (!hpp) { hp = HAlloc(c_p, sz); #ifdef DEBUG hend = hp + sz; #endif - hpp = &hp; - szp = NULL; - goto bld_result; + ctxt.hpp = &hp; + ctxt.szp = NULL; } ASSERT(hp == hend); - erts_smp_mtx_unlock(&nodes_monitors_mtx); + erts_mtx_unlock(&nodes_monitors_mtx); - return res; + erts_thr_progress_unblock(); + erts_proc_lock(c_p, ERTS_PROC_LOCK_MAIN); + + return ctxt.res; } |