diff options
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r-- | erts/emulator/beam/dist.c | 1930 |
1 files changed, 901 insertions, 1029 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 30390cdb5e..f203d85ca9 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1996-2017. All Rights Reserved. + * Copyright Ericsson AB 1996-2018. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,6 +45,7 @@ #include "erl_binary.h" #include "erl_thr_progress.h" #include "dtrace-wrapper.h" +#include "erl_proc_sig_queue.h" #define DIST_CTL_DEFAULT_SIZE 64 @@ -109,7 +110,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ Export* dmonitor_node_trap = NULL; -Export* dmonitor_p_trap = NULL; /* local variables */ static Export *dist_ctrl_put_data_trap; @@ -184,6 +184,161 @@ get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs) } } +#define ERTS_MON_LNK_FIRE_LIMIT 100 + +static void monitor_connection_down(ErtsMonitor *mon, void *unused) +{ + if (erts_monitor_is_origin(mon)) + erts_proc_sig_send_demonitor(mon); + else + erts_proc_sig_send_monitor_down(mon, am_noconnection); +} + +static void link_connection_down(ErtsLink *lnk, void *vdist) +{ + erts_proc_sig_send_link_exit(NULL, THE_NON_VALUE, lnk, + am_noconnection, NIL); +} + +typedef enum { + ERTS_CML_CLEANUP_STATE_LINKS, + ERTS_CML_CLEANUP_STATE_MONITORS, + ERTS_CML_CLEANUP_STATE_ONAME_MONITORS, + ERTS_CML_CLEANUP_STATE_NODE_MONITORS +} ErtsConMonLnkCleaupState; + +typedef struct { + ErtsConMonLnkCleaupState state; + ErtsMonLnkDist *dist; + void *yield_state; + int trigger_node_monitors; + Eterm nodename; + Eterm visability; + Eterm reason; + ErlOffHeap oh; + Eterm heap[1]; +} ErtsConMonLnkCleanup; + +static void +con_monitor_link_cleanup(void *vcmlcp) +{ + ErtsConMonLnkCleanup *cmlcp = vcmlcp; + ErtsMonLnkDist *dist = cmlcp->dist; + ErtsSchedulerData *esdp; + int yield; + + switch (cmlcp->state) { + case ERTS_CML_CLEANUP_STATE_LINKS: + yield = erts_link_list_foreach_delete_yielding(&dist->links, + link_connection_down, + NULL, &cmlcp->yield_state, + ERTS_MON_LNK_FIRE_LIMIT); + if (yield) + break; + + ASSERT(!cmlcp->yield_state); + cmlcp->state = ERTS_CML_CLEANUP_STATE_MONITORS; + case ERTS_CML_CLEANUP_STATE_MONITORS: + yield = erts_monitor_list_foreach_delete_yielding(&dist->monitors, + monitor_connection_down, + NULL, &cmlcp->yield_state, + ERTS_MON_LNK_FIRE_LIMIT); + if (yield) + break; + + ASSERT(!cmlcp->yield_state); + cmlcp->state = ERTS_CML_CLEANUP_STATE_ONAME_MONITORS; + case ERTS_CML_CLEANUP_STATE_ONAME_MONITORS: + yield = erts_monitor_tree_foreach_delete_yielding(&dist->orig_name_monitors, + monitor_connection_down, + NULL, &cmlcp->yield_state, + ERTS_MON_LNK_FIRE_LIMIT/2); + if (yield) + break; + + cmlcp->dist = NULL; + erts_mon_link_dist_dec_refc(dist); + + ASSERT(!cmlcp->yield_state); + cmlcp->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS; + case ERTS_CML_CLEANUP_STATE_NODE_MONITORS: + if (cmlcp->trigger_node_monitors) { + send_nodes_mon_msgs(NULL, + am_nodedown, + cmlcp->nodename, + cmlcp->visability, + cmlcp->reason); + } + erts_cleanup_offheap(&cmlcp->oh); + erts_free(ERTS_ALC_T_CML_CLEANUP, vcmlcp); + return; /* done */ + } + + /* yield... */ + + esdp = erts_get_scheduler_data(); + ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL); + erts_schedule_misc_aux_work((int) esdp->no, + con_monitor_link_cleanup, + (void *) cmlcp); +} + +static void +schedule_con_monitor_link_cleanup(ErtsMonLnkDist *dist, + Eterm nodename, + Eterm visability, + Eterm reason) +{ + if (dist || is_value(nodename)) { + ErtsSchedulerData *esdp; + ErtsConMonLnkCleanup *cmlcp; + Uint rsz, size; + + size = sizeof(ErtsConMonLnkCleanup); + + if (is_non_value(reason) || is_immed(reason)) { + rsz = 0; + size -= sizeof(Eterm); + } + else { + rsz = size_object(reason); + size += sizeof(Eterm) * (rsz - 1); + } + + cmlcp = erts_alloc(ERTS_ALC_T_CML_CLEANUP, size); + + ERTS_INIT_OFF_HEAP(&cmlcp->oh); + + cmlcp->yield_state = NULL; + cmlcp->dist = dist; + if (!dist) + cmlcp->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS; + else { + cmlcp->state = ERTS_CML_CLEANUP_STATE_LINKS; + erts_mtx_lock(&dist->mtx); + ASSERT(dist->alive); + dist->alive = 0; + erts_mtx_unlock(&dist->mtx); + } + + cmlcp->trigger_node_monitors = is_value(nodename); + cmlcp->nodename = nodename; + cmlcp->visability = visability; + if (rsz == 0) + cmlcp->reason = reason; + else { + Eterm *hp = &cmlcp->heap[0]; + cmlcp->reason = copy_struct(reason, rsz, &hp, &cmlcp->oh); + } + + esdp = erts_get_scheduler_data(); + ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL); + erts_schedule_misc_aux_work((int) esdp->no, + con_monitor_link_cleanup, + (void *) cmlcp); + } +} + /* ** A full node name constists of a "n@h" ** @@ -235,170 +390,6 @@ int is_node_name_atom(Eterm a) return is_node_name((char*)atom_tab(i)->name, atom_tab(i)->len); } -typedef struct { - DistEntry *dep; - Eterm *lhp; -} NetExitsContext; - -/* -** This function is called when a distribution -** port or process terminates -*/ -static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp) -{ - Process *rp; - ErtsMonitor *rmon; - DistEntry *dep = ((NetExitsContext *) vnecp)->dep; - ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; - - rp = erts_pid2proc(NULL, 0, mon->u.pid, rp_locks); - if (!rp) - goto done; - - if (mon->type == MON_ORIGIN) { - /* local pid is being monitored */ - rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref); - /* ASSERT(rmon != NULL); nope, can happen during process exit */ - if (rmon != NULL) { - erts_destroy_monitor(rmon); - } - } else { - DeclareTmpHeapNoproc(lhp,3); - Eterm watched; - UseTmpHeapNoproc(3); - ASSERT(mon->type == MON_TARGET); - rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref); - /* ASSERT(rmon != NULL); can happen during process exit */ - if (rmon != NULL) { - ASSERT(rmon->type == MON_ORIGIN); - ASSERT(is_atom(rmon->name) || is_nil(rmon->name)); - watched = (is_atom(rmon->name) - ? TUPLE2(lhp, rmon->name, dep->sysname) - : rmon->u.pid); - rp_locks |= ERTS_PROC_LOCKS_MSG_SEND; - erts_proc_lock(rp, ERTS_PROC_LOCKS_MSG_SEND); - erts_queue_monitor_message(rp, &rp_locks, mon->ref, am_process, - watched, am_noconnection); - erts_destroy_monitor(rmon); - } - UnUseTmpHeapNoproc(3); - } - erts_proc_unlock(rp, rp_locks); - done: - erts_destroy_monitor(mon); -} - -typedef struct { - NetExitsContext *necp; - ErtsLink *lnk; -} LinkNetExitsContext; - -/* -** This is the function actually doing the job of sending exit messages -** for links in a dist entry upon net_exit (the node goes down), NB, -** only process links, not node monitors are handled here, -** they reside in a separate tree.... -*/ -static void doit_link_net_exits_sub(ErtsLink *sublnk, void *vlnecp) -{ - ErtsLink *lnk = ((LinkNetExitsContext *) vlnecp)->lnk; /* the local pid */ - ErtsLink *rlnk; - Process *rp; - - ASSERT(lnk->type == LINK_PID); - if (is_internal_pid(lnk->pid)) { - int xres; - ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND; - - rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks); - if (!rp) { - goto done; - } - - rlnk = erts_remove_link(&ERTS_P_LINKS(rp), sublnk->pid); - xres = erts_send_exit_signal(NULL, - sublnk->pid, - rp, - &rp_locks, - am_noconnection, - NIL, - NULL, - 0); - - if (rlnk) { - erts_destroy_link(rlnk); - if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) { - /* We didn't exit the process and it is traced */ - trace_proc(NULL, 0, rp, am_getting_unlinked, sublnk->pid); - } - } - erts_proc_unlock(rp, rp_locks); - } - done: - erts_destroy_link(sublnk); - -} - - - - - -/* -** This function is called when a distribution -** port or process terminates, once for each link on the high level, -** it in turn traverses the link subtree for the specific link node... -*/ -static void doit_link_net_exits(ErtsLink *lnk, void *vnecp) -{ - LinkNetExitsContext lnec = {(NetExitsContext *) vnecp, lnk}; - ASSERT(lnk->type == LINK_PID); - erts_sweep_links(ERTS_LINK_ROOT(lnk), &doit_link_net_exits_sub, (void *) &lnec); -#ifdef DEBUG - ERTS_LINK_ROOT(lnk) = NULL; -#endif - erts_destroy_link(lnk); -} - - -static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp) -{ - DistEntry *dep = ((NetExitsContext *) vnecp)->dep; - Eterm name = dep->sysname; - Process *rp; - ErtsLink *rlnk; - Uint i,n; - ASSERT(lnk->type == LINK_NODE); - if (is_internal_pid(lnk->pid)) { - ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; - ErlOffHeap *ohp; - rp = erts_proc_lookup(lnk->pid); - if (!rp) - goto done; - erts_proc_lock(rp, rp_locks); - if (!ERTS_PROC_IS_EXITING(rp)) { - rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name); - if (rlnk != NULL) { - ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE)); - erts_destroy_link(rlnk); - } - n = ERTS_LINK_REFC(lnk); - for (i = 0; i < n; ++i) { - Eterm tup; - Eterm *hp; - ErtsMessage *msgp; - - msgp = erts_alloc_message_heap(rp, &rp_locks, - 3, &hp, &ohp); - tup = TUPLE2(hp, am_nodedown, name); - erts_queue_message(rp, rp_locks, msgp, tup, am_system); - } - } - erts_proc_unlock(rp, rp_locks); - } - done: - erts_destroy_link(lnk); -} - static void set_node_not_alive(void *unused) { @@ -444,15 +435,8 @@ inc_no_nodes(void) static void kill_dist_ctrl_proc(void *vpid) { - Eterm pid = (Eterm) vpid; - ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; - Process *rp = erts_pid2proc(NULL, 0, pid, rp_locks); - if (rp) { - erts_send_exit_signal(NULL, rp->common.id, rp, &rp_locks, - am_kill, NIL, NULL, 0); - if (rp_locks) - erts_proc_unlock(rp, rp_locks); - } + erts_proc_sig_send_exit(NULL, (Eterm) vpid, (Eterm) vpid, + am_kill, NIL, 0); } static void @@ -572,10 +556,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) } } else { /* Call from distribution controller (port/process) */ - NetExitsContext nec = {dep}; - ErtsLink *nlinks; - ErtsLink *node_links; - ErtsMonitor *monitors; + ErtsMonLnkDist *mld; Uint32 flags; erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1); @@ -588,27 +569,21 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) erts_port_task_abort(&dep->dist_cmd); } - if (dep->status & ERTS_DE_SFLG_EXITING) { + if (dep->state == ERTS_DE_STATE_EXITING) { #ifdef DEBUG ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT); #endif } else { - dep->status |= ERTS_DE_SFLG_EXITING; + dep->state = ERTS_DE_STATE_EXITING; erts_mtx_lock(&dep->qlock); ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT); erts_mtx_unlock(&dep->qlock); } - erts_de_links_lock(dep); - monitors = dep->monitors; - nlinks = dep->nlinks; - node_links = dep->node_links; - dep->monitors = NULL; - dep->nlinks = NULL; - dep->node_links = NULL; - erts_de_links_unlock(dep); + mld = dep->mld; + dep->mld = NULL; nodename = dep->sysname; flags = dep->flags; @@ -617,15 +592,14 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) erts_de_rwunlock(dep); - erts_sweep_monitors(monitors, &doit_monitor_net_exits, (void *) &nec); - erts_sweep_links(nlinks, &doit_link_net_exits, (void *) &nec); - erts_sweep_links(node_links, &doit_node_link_net_exits, (void *) &nec); - - send_nodes_mon_msgs(NULL, - am_nodedown, - nodename, - flags & DFLAG_PUBLISHED ? am_visible : am_hidden, - reason == am_normal ? am_connection_closed : reason); + schedule_con_monitor_link_cleanup(mld, + nodename, + (flags & DFLAG_PUBLISHED + ? am_visible + : am_hidden), + (reason == am_normal + ? am_connection_closed + : reason)); clear_dist_entry(dep); } @@ -641,6 +615,14 @@ trap_function(Eterm func, int arity) return erts_export_put(am_erlang, func, arity); } +/* + * Sync with dist_util.erl: + * + * -record(erts_dflags, + * {default, mandatory, addable, rejectable, strict_order}). + */ +static Eterm erts_dflags_record; + void init_dist(void) { init_nodes_monitors(); @@ -653,10 +635,19 @@ void init_dist(void) /* Lookup/Install all references to trap functions */ dmonitor_node_trap = trap_function(am_dmonitor_node,3); - dmonitor_p_trap = trap_function(am_dmonitor_p, 2); dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, am_dist_ctrl_put_data, 2); + { + Eterm* hp = erts_alloc(ERTS_ALC_T_LITERAL, (1+6)*sizeof(Eterm)); + erts_dflags_record = TUPLE6(hp, am_erts_dflags, + make_small(DFLAG_DIST_DEFAULT), + make_small(DFLAG_DIST_MANDATORY), + make_small(DFLAG_DIST_ADDABLE), + make_small(DFLAG_DIST_REJECTABLE), + make_small(DFLAG_DIST_STRICT_ORDER)); + erts_set_literal_tag(&erts_dflags_record, hp, (1+6)); + } } #define ErtsDistOutputBuf2Binary(OB) \ @@ -753,21 +744,13 @@ static void clear_dist_entry(DistEntry *dep) cache = dep->cache; dep->cache = NULL; -#ifdef DEBUG - erts_de_links_lock(dep); - ASSERT(!dep->nlinks); - ASSERT(!dep->node_links); - ASSERT(!dep->monitors); - erts_de_links_unlock(dep); -#endif - erts_mtx_lock(&dep->qlock); erts_atomic64_set_nob(&dep->in, 0); erts_atomic64_set_nob(&dep->out, 0); obuf = clear_de_out_queues(dep); - dep->status = 0; + dep->state = ERTS_DE_STATE_IDLE; suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); erts_mtx_unlock(&dep->qlock); @@ -869,8 +852,7 @@ erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote) /* A local process that's being monitored by a remote one exits. We send: - {DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason}, - which is rather sad as only the ref is needed, no pid's... */ + {DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason} */ int erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, Eterm ref, Eterm reason) @@ -891,12 +873,6 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT), watched, watcher, ref, reason); -#ifdef DEBUG - erts_de_links_lock(dsdp->dep); - ASSERT(!erts_lookup_monitor(dsdp->dep->monitors, ref)); - erts_de_links_unlock(dsdp->dep); -#endif - res = dsig_send_ctl(dsdp, ctl, 1); UnUseTmpHeapNoproc(6); return res; @@ -935,8 +911,7 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, /* A local process monitoring a remote one wants to stop monitoring, either because of a demonitor bif call or because the local process died. We send - {DOP_DEMONITOR_P, Local pid, Remote pid or name, ref}, which is once again - rather redundant as only the ref will be needed on the other side... */ + {DOP_DEMONITOR_P, Local pid, Remote pid or name, ref} */ int erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, Eterm ref, int force) @@ -962,6 +937,24 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, return res; } +static int can_send_seqtrace_token(ErtsSendContext* ctx, Eterm token) { + Eterm label; + + if (ctx->dep->flags & DFLAG_BIG_SEQTRACE_LABELS) { + /* The other end is capable of handling arbitrary seq_trace labels. */ + return 1; + } + + /* The other end only tolerates smalls, but since we could potentially be + * talking to an old 32-bit emulator from a 64-bit one, we have to check + * whether the label is small on any emulator. */ + label = SEQ_TRACE_T_LABEL(token); + + return is_small(label) && + signed_val(label) <= (ERTS_SINT32_MAX >> _TAG_IMMED1_SIZE) && + signed_val(label) >= (ERTS_SINT32_MIN >> _TAG_IMMED1_SIZE); +} + int erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) { @@ -995,37 +988,38 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) "%T", remote); msize = size_object(message); if (have_seqtrace(token)) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); + tok_label = SEQ_TRACE_T_DTRACE_LABEL(token); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); } } #endif - if (token != NIL) { - Eterm el1, el2; - if (ctx->dep->flags & DFLAG_SEND_SENDER) { - el1 = make_small(DOP_SEND_SENDER_TT); - el2 = sender->common.id; - } - else { - el1 = make_small(DOP_SEND_TT); - el2 = am_Empty; - } - ctl = TUPLE4(&ctx->ctl_heap[0], el1, el2, remote, token); - } - else { - Eterm el1, el2; + { + Eterm dist_op, sender_id; + int send_token; + + send_token = (token != NIL && can_send_seqtrace_token(ctx, token)); + if (ctx->dep->flags & DFLAG_SEND_SENDER) { - el1 = make_small(DOP_SEND_SENDER); - el2 = sender->common.id; + dist_op = make_small(send_token ? + DOP_SEND_SENDER_TT : + DOP_SEND_SENDER); + sender_id = sender->common.id; + } else { + dist_op = make_small(send_token ? + DOP_SEND_TT : + DOP_SEND); + sender_id = am_Empty; } - else { - el1 = make_small(DOP_SEND); - el2 = am_Empty; + + if (send_token) { + ctl = TUPLE4(&ctx->ctl_heap[0], dist_op, sender_id, remote, token); + } else { + ctl = TUPLE3(&ctx->ctl_heap[0], dist_op, sender_id, remote); } - ctl = TUPLE3(&ctx->ctl_heap[0], el1, el2, remote); } + DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); DTRACE7(message_send_remote, sender_name, node_name, receiver_name, @@ -1071,19 +1065,20 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, "{%T,%s}", remote_name, node_name); msize = size_object(message); if (have_seqtrace(token)) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); + tok_label = SEQ_TRACE_T_DTRACE_LABEL(token); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); } } #endif - if (token != NIL) + if (token != NIL && can_send_seqtrace_token(ctx, token)) ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_REG_SEND_TT), sender->common.id, am_Empty, remote_name, token); else ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_REG_SEND), sender->common.id, am_Empty, remote_name); + DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); DTRACE7(message_send_remote, sender_name, node_name, receiver_name, @@ -1135,7 +1130,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)), "%T", reason); if (have_seqtrace(token)) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); + tok_label = SEQ_TRACE_T_DTRACE_LABEL(token); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); } @@ -1258,8 +1253,6 @@ int erts_net_message(Port *prt, Sint type; Eterm token; Eterm token_size; - ErtsMonitor *mon; - ErtsLink *lnk; Uint tuple_arity; int res; Uint32 connection_id; @@ -1357,88 +1350,89 @@ int erts_net_message(Port *prt, token = NIL; switch (type = unsigned_val(tuple[1])) { - case DOP_LINK: + case DOP_LINK: { + ErtsDSigData dsd; + int code; + if (tuple_arity != 3) { goto invalid_message; } from = tuple[2]; to = tuple[3]; /* local proc to link to */ - if (is_not_pid(from) || is_not_pid(to)) { - goto invalid_message; - } + if (is_not_external_pid(from)) + goto invalid_message; - rp = erts_pid2proc_opt(NULL, 0, - to, ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - if (!rp) { - /* This is tricky (we MUST force a distributed send) */ - ErtsDSigData dsd; - int code; - code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { - code = erts_dsig_send_exit(&dsd, to, from, am_noproc); - ASSERT(code == ERTS_DSIG_SEND_OK); - } - break; - } + if (dep != external_pid_dist_entry(from)) + goto invalid_message; - erts_de_links_lock(dep); - res = erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, from); + if (is_external_pid(to)) { + if (external_pid_dist_entry(to) != erts_this_dist_entry) + goto invalid_message; + /* old incarnation of node; reply noproc... */ + } + else if (is_internal_pid(to)) { + ErtsLinkData *ldp = erts_link_create(ERTS_LNK_TYPE_DIST_PROC, + from, to); + ASSERT(ldp->a.other.item == to); + ASSERT(eq(ldp->b.other.item, from)); +#ifdef DEBUG + code = +#endif + erts_link_dist_insert(&ldp->a, dep->mld); + ASSERT(code); - if (res < 0) { - /* It was already there! Lets skip the rest... */ - erts_de_links_unlock(dep); - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - break; - } - lnk = erts_add_or_lookup_link(&(dep->nlinks), LINK_PID, rp->common.id); - erts_add_link(&(ERTS_LINK_ROOT(lnk)), LINK_PID, from); - erts_de_links_unlock(dep); + if (erts_proc_sig_send_link(NULL, to, &ldp->b)) + break; /* done */ - if (IS_TRACED_FL(rp, F_TRACE_PROCS)) - trace_proc(NULL, 0, rp, am_getting_linked, from); + /* Failed to send signal; cleanup and reply noproc... */ + +#ifdef DEBUG + code = +#endif + erts_link_dist_delete(&ldp->a); + ASSERT(code); + erts_link_release_both(ldp); + } + + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED) { + code = erts_dsig_send_exit(&dsd, to, from, am_noproc); + ASSERT(code == ERTS_DSIG_SEND_OK); + } - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); break; + } case DOP_UNLINK: { - ErtsDistLinkData dld; if (tuple_arity != 3) { goto invalid_message; } from = tuple[2]; to = tuple[3]; - if (is_not_pid(from) || is_not_pid(to)) { + if (is_not_external_pid(from)) + goto invalid_message; + if (dep != external_pid_dist_entry(from)) goto invalid_message; - } - - rp = erts_pid2proc_opt(NULL, 0, - to, ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - if (!rp) - break; - - lnk = erts_remove_link(&ERTS_P_LINKS(rp), from); - if (IS_TRACED_FL(rp, F_TRACE_PROCS) && lnk != NULL) { - trace_proc(NULL, 0, rp, am_getting_unlinked, from); - } + if (is_external_pid(to) + && erts_this_dist_entry == external_pid_dist_entry(from)) + break; - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + if (is_not_internal_pid(to)) + goto invalid_message; - erts_remove_dist_link(&dld, to, from, dep); - erts_destroy_dist_link(&dld); - if (lnk) - erts_destroy_link(lnk); + erts_proc_sig_send_dist_unlink(dep, from, to); break; } case DOP_MONITOR_P: { /* A remote process wants to monitor us, we get: {DOP_MONITOR_P, Remote pid, local pid or name, ref} */ - Eterm name; - + Eterm pid, name; + ErtsDSigData dsd; + int code; + if (tuple_arity != 4) { goto invalid_message; } @@ -1447,44 +1441,64 @@ int erts_net_message(Port *prt, watched = tuple[3]; /* local proc to monitor */ ref = tuple[4]; - if (is_not_ref(ref)) { + if (is_not_external_pid(watcher)) + goto invalid_message; + else if (external_pid_dist_entry(watcher) != dep) + goto invalid_message; + + if (is_not_ref(ref)) goto invalid_message; - } - if (is_atom(watched)) { - name = watched; - rp = erts_whereis_process(NULL, 0, - watched, ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - } - else { - name = NIL; - rp = erts_pid2proc_opt(NULL, 0, - watched, ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - } + if (is_internal_pid(watched)) { + name = NIL; + pid = watched; + } + else if (is_atom(watched)) { + name = watched; + pid = erts_whereis_name_to_id(NULL, watched); + /* if port or undefined; reply noproc... */ + } + else if (is_external_pid(watched) + && external_pid_dist_entry(watched) == erts_this_dist_entry) { + name = NIL; + pid = am_undefined; /* old incarnation; reply noproc... */ + } + else + goto invalid_message; - if (!rp) { - ErtsDSigData dsd; - int code; - code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { - code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, - am_noproc); - ASSERT(code == ERTS_DSIG_SEND_OK); - } - } - else { - if (is_atom(watched)) - watched = rp->common.id; - erts_de_links_lock(dep); - erts_add_monitor(&(dep->monitors), MON_ORIGIN, ref, watched, name); - erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, ref, watcher, name); - erts_de_links_unlock(dep); - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - } + if (is_internal_pid(pid)) { + ErtsMonitorData *mdp; + mdp = erts_monitor_create(ERTS_MON_TYPE_DIST_PROC, + ref, watcher, pid, name); - break; +#ifdef DEBUG + code = +#endif + erts_monitor_dist_insert(&mdp->origin, dep->mld); + ASSERT(code); + + if (erts_proc_sig_send_monitor(&mdp->target, pid)) + break; /* done */ + + /* Failed to send to local proc; cleanup reply noproc... */ + +#ifdef DEBUG + code = +#endif + erts_monitor_dist_delete(&mdp->origin); + ASSERT(code); + erts_monitor_release_both(mdp); + + } + + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED) { + code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, + am_noproc); + ASSERT(code == ERTS_DSIG_SEND_OK); + } + + break; } case DOP_DEMONITOR_P: @@ -1495,36 +1509,43 @@ int erts_net_message(Port *prt, if (tuple_arity != 4) { goto invalid_message; } - /* watcher = tuple[2]; */ - /* watched = tuple[3]; May be an atom in case of monitor name */ + + watcher = tuple[2]; + watched = tuple[3]; ref = tuple[4]; - if(is_not_ref(ref)) { + if (is_not_ref(ref)) { goto invalid_message; } - erts_de_links_lock(dep); - mon = erts_remove_monitor(&(dep->monitors),ref); - erts_de_links_unlock(dep); - /* ASSERT(mon != NULL); can happen in case of broken dist message */ - if (mon == NULL) { - break; - } - watched = mon->u.pid; - erts_destroy_monitor(mon); - rp = erts_pid2proc_opt(NULL, 0, - watched, ERTS_PROC_LOCK_LINK, - ERTS_P2P_FLG_ALLOW_OTHER_X); - if (!rp) { - break; - } - mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); - erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); - ASSERT(mon != NULL); - if (mon == NULL) { - break; - } - erts_destroy_monitor(mon); + if (is_not_external_pid(watcher) || external_pid_dist_entry(watcher) != dep) + goto invalid_message; + + if (is_internal_pid(watched)) + erts_proc_sig_send_dist_demonitor(watched, ref); + else if (is_external_pid(watched) + && external_pid_dist_entry(watched) == erts_this_dist_entry) { + /* old incarnation; ignore it */ + ; + } + else if (is_atom(watched)) { + ErtsMonLnkDist *mld = dep->mld; + ErtsMonitor *mon; + + erts_mtx_lock(&mld->mtx); + + mon = erts_monitor_tree_lookup(mld->orig_name_monitors, ref); + if (mon) + erts_monitor_tree_delete(&mld->orig_name_monitors, mon); + + erts_mtx_unlock(&mld->mtx); + + if (mon) + erts_proc_sig_send_demonitor(mon); + } + else + goto invalid_message; + break; case DOP_REG_SEND_TT: @@ -1651,68 +1672,36 @@ int erts_net_message(Port *prt, /* We are monitoring a process on the remote node which dies, we get {DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */ - - DeclareTmpHeapNoproc(lhp,3); - Eterm sysname; - ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_MSG_SEND|ERTS_PROC_LOCK_LINK; - if (tuple_arity != 5) { goto invalid_message; } - /* watched = tuple[2]; */ /* remote proc which died */ - /* watcher = tuple[3]; */ + watched = tuple[2]; /* remote proc or name which died */ + watcher = tuple[3]; ref = tuple[4]; reason = tuple[5]; - if(is_not_ref(ref)) { + if (is_not_ref(ref)) goto invalid_message; - } - - erts_de_links_lock(dep); - sysname = dep->sysname; - mon = erts_remove_monitor(&(dep->monitors), ref); - /* - * If demonitor was performed at the same time as the - * monitored process exits, monitoring side will have - * removed info about monitor. In this case, do nothing - * and everything will be as it should. - */ - erts_de_links_unlock(dep); - if (mon == NULL) { - break; - } - rp = erts_pid2proc(NULL, 0, mon->u.pid, rp_locks); - erts_destroy_monitor(mon); - if (rp == NULL) { - break; - } + if (is_not_external_pid(watched) && is_not_atom(watched)) + goto invalid_message; - mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); + if (is_not_internal_pid(watcher)) { + if (!is_external_pid(watcher)) + goto invalid_message; + if (erts_this_dist_entry == external_pid_dist_entry(watcher)) + break; + goto invalid_message; + } - if (mon == NULL) { - erts_proc_unlock(rp, rp_locks); - break; - } - UseTmpHeapNoproc(3); - - watched = (is_not_nil(mon->name) - ? TUPLE2(&lhp[0], mon->name, sysname) - : mon->u.pid); - - erts_queue_monitor_message(rp, &rp_locks, - ref, am_process, watched, reason); - erts_proc_unlock(rp, rp_locks); - erts_destroy_monitor(mon); - UnUseTmpHeapNoproc(3); + erts_proc_sig_send_dist_monitor_down(dep, ref, watched, + watcher, reason); break; } case DOP_EXIT_TT: case DOP_EXIT: { - ErtsDistLinkData dld; - ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND; /* 'from', which 'to' is linked to, died */ if (type == DOP_EXIT) { if (tuple_arity != 4) { @@ -1732,56 +1721,19 @@ int erts_net_message(Port *prt, token = tuple[4]; reason = tuple[5]; } - if (is_not_pid(from) || is_not_internal_pid(to)) { + if (is_not_external_pid(from) + || dep != external_pid_dist_entry(from) + || is_not_internal_pid(to)) { goto invalid_message; } - rp = erts_pid2proc(NULL, 0, to, rp_locks); - if (!rp) - lnk = NULL; - else { - lnk = erts_remove_link(&ERTS_P_LINKS(rp), from); - - /* If lnk == NULL, we have unlinked on this side, i.e. - * ignore exit. - */ - if (lnk) { - int xres; -#if 0 - /* Arndt: Maybe it should never be 'kill', but it can be, - namely when a linked process does exit(kill). Until we know - whether that is incorrect and what should happen instead, - we leave the assertion out. */ - ASSERT(reason != am_kill); /* should never be kill (killed) */ -#endif - xres = erts_send_exit_signal(NULL, - from, - rp, - &rp_locks, - reason, - token, - NULL, - ERTS_XSIG_FLG_IGN_KILL); - if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) { - /* We didn't exit the process and it is traced */ - if (rp_locks & ERTS_PROC_LOCKS_XSIG_SEND) { - erts_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND); - rp_locks &= ~ERTS_PROC_LOCKS_XSIG_SEND; - } - trace_proc(NULL, 0, rp, am_getting_unlinked, from); - } - } - erts_proc_unlock(rp, rp_locks); - } - erts_remove_dist_link(&dld, to, from, dep); - if (lnk) - erts_destroy_link(lnk); - erts_destroy_dist_link(&dld); + erts_proc_sig_send_dist_link_exit(dep, + from, to, + reason, token); break; } case DOP_EXIT2_TT: - case DOP_EXIT2: { - ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; + case DOP_EXIT2: /* 'from' is send an exit signal to 'to' */ if (type == DOP_EXIT2) { if (tuple_arity != 4) { @@ -1803,20 +1755,10 @@ int erts_net_message(Port *prt, if (is_not_pid(from) || is_not_internal_pid(to)) { goto invalid_message; } - rp = erts_pid2proc(NULL, 0, to, rp_locks); - if (rp) { - (void) erts_send_exit_signal(NULL, - from, - rp, - &rp_locks, - reason, - token, - NULL, - 0); - erts_proc_unlock(rp, rp_locks); - } + + erts_proc_sig_send_exit(NULL, from, to, reason, token, 0); break; - } + case DOP_GROUP_LEADER: if (tuple_arity != 3) { goto invalid_message; @@ -1827,11 +1769,7 @@ int erts_net_message(Port *prt, goto invalid_message; } - rp = erts_pid2proc(NULL, 0, to, ERTS_PROC_LOCK_MAIN); - if (!rp) - break; - rp->group_leader = STORE_NC_IN_PROC(rp, from); - erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); + (void) erts_proc_sig_send_group_leader(NULL, to, from, NIL); break; default: @@ -1982,6 +1920,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) break; } ctx->u.ec.flags = ctx->flags; + ctx->u.ec.hopefull_flags = 0; ctx->u.ec.level = 0; ctx->u.ec.wstack.wstart = NULL; ctx->obuf->msg_start = ctx->obuf->ext_endp; @@ -2005,6 +1944,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp; + ctx->obuf->hopefull_flags = ctx->u.ec.hopefull_flags; /* * Signal encoded; now verify that the connection still exists, * and if so enqueue the signal and schedule it for send. @@ -2012,8 +1952,8 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->obuf->next = NULL; erts_de_rlock(dep); cid = dep->cid; - if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED)) - || dep->status & ERTS_DE_SFLG_EXITING + if (dep->state == ERTS_DE_STATE_EXITING + || dep->state == ERTS_DE_STATE_IDLE || dep->connection_id != dsdp->connection_id) { /* Not the same connection as when we started; drop message... */ erts_de_runlock(dep); @@ -2088,7 +2028,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } erts_mtx_unlock(&dep->qlock); - if (!(dep->status & ERTS_DE_SFLG_PENDING)) { + if (dep->state != ERTS_DE_STATE_PENDING) { if (is_internal_port(dep->cid)) erts_schedule_dist_command(NULL, dep); } @@ -2270,7 +2210,7 @@ int erts_dist_command(Port *prt, int initial_reds) { Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START; - Uint32 status; + enum dist_entry_state state; Uint32 flags; Sint qsize, obufsize = 0; ErtsDistOutputQueue oq, foq; @@ -2285,17 +2225,17 @@ erts_dist_command(Port *prt, int initial_reds) erts_de_rlock(dep); flags = dep->flags; - status = dep->status; + state = dep->state; send = dep->send; erts_de_runlock(dep); - if (status & ERTS_DE_SFLG_EXITING) { + if (state == ERTS_DE_STATE_EXITING) { erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1); reds -= ERTS_PORT_REDS_DIST_CMD_EXIT; return initial_reds - reds; } - ASSERT(!(status & ERTS_DE_SFLG_PENDING)); + ASSERT(state != ERTS_DE_STATE_PENDING); ASSERT(send); @@ -2831,7 +2771,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_de_rlock(dep); - if (dep->status & ERTS_DE_SFLG_EXITING) + if (dep->state == ERTS_DE_STATE_EXITING) goto return_none; ASSERT(dep->cid == BIF_P->common.id); @@ -2934,9 +2874,9 @@ erts_dist_port_not_busy(Port *prt) static void kill_connection(DistEntry *dep) { ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); - ASSERT(dep->status == ERTS_DE_SFLG_CONNECTED); + ASSERT(dep->state == ERTS_DE_STATE_CONNECTED); - dep->status |= ERTS_DE_SFLG_EXITING; + dep->state = ERTS_DE_STATE_EXITING; erts_mtx_lock(&dep->qlock); ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); @@ -2953,7 +2893,7 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) { erts_de_rwlock(dep); if (connection_id == dep->connection_id - && dep->status == ERTS_DE_SFLG_CONNECTED) { + && dep->state == ERTS_DE_STATE_CONNECTED) { kill_connection(dep); } @@ -2969,57 +2909,55 @@ static void doit_print_monitor_info(ErtsMonitor *mon, void *vptdp) { fmtfn_t to = ((struct print_to_data *) vptdp)->to; void *arg = ((struct print_to_data *) vptdp)->arg; - Process *rp; - ErtsMonitor *rmon; - rp = erts_proc_lookup(mon->u.pid); - if (!rp || (rmon = erts_lookup_monitor(ERTS_P_MONITORS(rp), mon->ref)) == NULL) { - erts_print(to, arg, "Warning, stray monitor for: %T\n", mon->u.pid); - } else if (mon->type == MON_ORIGIN) { - /* Local pid is being monitored */ + ErtsMonitorDataExtended *mdep; + + ASSERT(mon->flags & ERTS_ML_FLG_EXTENDED); + + mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon); + + ASSERT(mdep->dist); + + if (erts_monitor_is_origin(mon)) { erts_print(to, arg, "Remotely monitored by: %T %T\n", - mon->u.pid, rmon->u.pid); - } else { - erts_print(to, arg, "Remote monitoring: %T ", mon->u.pid); - if (is_not_atom(rmon->u.pid)) - erts_print(to, arg, "%T\n", rmon->u.pid); - else - erts_print(to, arg, "{%T, %T}\n", - rmon->name, - rmon->u.pid); /* which in this case is the - remote system name... */ + mon->other.item, mdep->md.target.other.item); + } + else { + erts_print(to, arg, "Remote monitoring: %T ", mon->other.item); + if (mon->flags & ERTS_ML_FLG_NAME) + erts_print(to, arg, "{%T, %T}\n", mdep->u.name, mdep->dist->nodename); + else + erts_print(to, arg, "%T\n", mdep->md.origin.other.item); } } -static void print_monitor_info(fmtfn_t to, void *arg, ErtsMonitor *mon) +static void print_monitor_info(fmtfn_t to, void *arg, DistEntry *dep) { struct print_to_data ptd = {to, arg}; - erts_doforall_monitors(mon,&doit_print_monitor_info,&ptd); -} - -typedef struct { - struct print_to_data *ptdp; - Eterm from; -} PrintLinkContext; - -static void doit_print_link_info2(ErtsLink *lnk, void *vpplc) -{ - PrintLinkContext *pplc = (PrintLinkContext *) vpplc; - erts_print(pplc->ptdp->to, pplc->ptdp->arg, "Remote link: %T %T\n", - pplc->from, lnk->pid); + if (dep->mld) { + erts_monitor_list_foreach(dep->mld->monitors, + doit_print_monitor_info, + (void *) &ptd); + erts_monitor_tree_foreach(dep->mld->orig_name_monitors, + doit_print_monitor_info, + (void *) &ptd); + } } static void doit_print_link_info(ErtsLink *lnk, void *vptdp) { - if (is_internal_pid(lnk->pid) && erts_proc_lookup(lnk->pid)) { - PrintLinkContext plc = {(struct print_to_data *) vptdp, lnk->pid}; - erts_doforall_links(ERTS_LINK_ROOT(lnk), &doit_print_link_info2, &plc); - } + struct print_to_data *ptdp = vptdp; + ErtsLink *lnk2 = erts_link_to_other(lnk, NULL); + erts_print(ptdp->to, ptdp->arg, "Remote link: %T %T\n", + lnk2->other.item, lnk->other.item); } -static void print_link_info(fmtfn_t to, void *arg, ErtsLink *lnk) +static void print_link_info(fmtfn_t to, void *arg, DistEntry *dep) { struct print_to_data ptd = {to, arg}; - erts_doforall_links(lnk, &doit_print_link_info, (void *) &ptd); + if (dep->mld) + erts_link_list_foreach(dep->mld->links, + doit_print_link_info, + (void *) &ptd); } typedef struct { @@ -3027,23 +2965,6 @@ typedef struct { Eterm sysname; } PrintNodeLinkContext; - -static void doit_print_nodelink_info(ErtsLink *lnk, void *vpcontext) -{ - PrintNodeLinkContext *pcontext = vpcontext; - - if (is_internal_pid(lnk->pid) && erts_proc_lookup(lnk->pid)) - erts_print(pcontext->ptd.to, pcontext->ptd.arg, - "Remote monitoring: %T %T\n", lnk->pid, pcontext->sysname); -} - -static void print_nodelink_info(fmtfn_t to, void *arg, ErtsLink *lnk, Eterm sysname) -{ - PrintNodeLinkContext context = {{to, arg}, sysname}; - erts_doforall_links(lnk, &doit_print_nodelink_info, &context); -} - - static int info_dist_entry(fmtfn_t to, void *arg, DistEntry *dep, int visible, int connected) { @@ -3074,8 +2995,8 @@ info_dist_entry(fmtfn_t to, void *arg, DistEntry *dep, int visible, int connecte erts_print(to, arg, "Name: %T", dep->sysname); erts_print(to, arg, "\n"); if (!connected && is_nil(dep->cid)) { - if (dep->nlinks) { - erts_print(to, arg, "Error: Got links to not connected node:%T\n", + if (dep->mld) { + erts_print(to, arg, "Error: Got links/monitors to not connected node:%T\n", dep->sysname); } return 0; @@ -3084,9 +3005,8 @@ info_dist_entry(fmtfn_t to, void *arg, DistEntry *dep, int visible, int connecte erts_print(to, arg, "Controller: %T\n", dep->cid, to); erts_print_node_info(to, arg, dep->sysname, NULL, NULL); - print_monitor_info(to, arg, dep->monitors); - print_link_info(to, arg, dep->nlinks); - print_nodelink_info(to, arg, dep->node_links, dep->sysname); + print_monitor_info(to, arg, dep); + print_link_info(to, arg, dep); return 0; @@ -3173,8 +3093,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; /* Check that all trap functions are defined !! */ - if (dmonitor_node_trap->addressv[0] == NULL || - dmonitor_p_trap->addressv[0] == NULL) { + if (dmonitor_node_trap->addressv[0] == NULL) { goto error; } @@ -3341,7 +3260,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto badarg; } - if (dep->status & ERTS_DE_SFLG_EXITING) { + if (dep->state == ERTS_DE_STATE_EXITING) { /* Suspend on dist entry waiting for the exit to finish */ ErtsProcList *plp = erts_proclist_create(BIF_P); plp->next = NULL; @@ -3351,8 +3270,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_mtx_unlock(&dep->qlock); goto yield; } - if (dep->status != ERTS_DE_SFLG_PENDING) { - if (dep->status == 0) + if (dep->state != ERTS_DE_STATE_PENDING) { + if (dep->state == ERTS_DE_STATE_IDLE) erts_set_dist_entry_pending(dep); else goto badarg; @@ -3390,7 +3309,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto done; /* Already set */ } - if (dep->status & ERTS_DE_SFLG_EXITING) { + if (dep->state == ERTS_DE_STATE_EXITING) { /* Suspend on dist entry waiting for the exit to finish */ ErtsProcList *plp = erts_proclist_create(BIF_P); plp->next = NULL; @@ -3400,8 +3319,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_mtx_unlock(&dep->qlock); goto yield; } - if (dep->status != ERTS_DE_SFLG_PENDING) { - if (dep->status == 0) + if (dep->state != ERTS_DE_STATE_PENDING) { + if (dep->state == ERTS_DE_STATE_IDLE) erts_set_dist_entry_pending(dep); else goto badarg; @@ -3437,7 +3356,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) #ifdef DEBUG ASSERT(erts_atomic_read_nob(&dep->qsize) == 0 - || (dep->status & ERTS_DE_SFLG_PENDING)); + || (dep->state == ERTS_DE_STATE_PENDING)); #endif if (flags & DFLAG_DIST_HDR_ATOM_CACHE) @@ -3506,6 +3425,11 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto done; } +BIF_RETTYPE erts_internal_get_dflags_0(BIF_ALIST_0) +{ + return erts_dflags_record; +} + BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1) { DistEntry* dep; @@ -3525,15 +3449,20 @@ BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1) erts_de_rwlock(dep); - if (ERTS_DE_IS_CONNECTED(dep) || dep->status & ERTS_DE_SFLG_PENDING) + switch (dep->state) { + case ERTS_DE_STATE_PENDING: + case ERTS_DE_STATE_CONNECTED: conn_id = dep->connection_id; - else if (dep->status == 0) { + break; + case ERTS_DE_STATE_IDLE: erts_set_dist_entry_pending(dep); conn_id = dep->connection_id; - } - else { - ASSERT(dep->status & ERTS_DE_SFLG_EXITING); + break; + case ERTS_DE_STATE_EXITING: conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + break; + default: + erts_exit(ERTS_ABORT_EXIT, "Invalid dep->state (%d)\n", dep->state); } erts_de_rwunlock(dep); hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); @@ -3548,29 +3477,20 @@ static Sint abort_connection(DistEntry* dep, Uint32 conn_id) if (dep->connection_id != conn_id) ; - else if (dep->status == ERTS_DE_SFLG_CONNECTED) { + else if (dep->state == ERTS_DE_STATE_CONNECTED) { kill_connection(dep); } - else if (dep->status == ERTS_DE_SFLG_PENDING) { - NetExitsContext nec = {dep}; - ErtsLink *nlinks; - ErtsLink *node_links; - ErtsMonitor *monitors; + else if (dep->state == ERTS_DE_STATE_PENDING) { ErtsAtomCache *cache; ErtsDistOutputBuf *obuf; ErtsProcList *resume_procs; Sint reds = 0; + ErtsMonLnkDist *mld; ASSERT(is_nil(dep->cid)); - erts_de_links_lock(dep); - monitors = dep->monitors; - nlinks = dep->nlinks; - node_links = dep->node_links; - dep->monitors = NULL; - dep->nlinks = NULL; - dep->node_links = NULL; - erts_de_links_unlock(dep); + mld = dep->mld; + dep->mld = NULL; cache = dep->cache; dep->cache = NULL; @@ -3589,9 +3509,8 @@ static Sint abort_connection(DistEntry* dep, Uint32 conn_id) erts_de_rwunlock(dep); - erts_sweep_monitors(monitors, &doit_monitor_net_exits, &nec); - erts_sweep_links(nlinks, &doit_link_net_exits, &nec); - erts_sweep_links(node_links, &doit_node_link_net_exits, &nec); + schedule_con_monitor_link_cleanup(mld, THE_NON_VALUE, + THE_NON_VALUE, THE_NON_VALUE); if (resume_procs) { int resumed = erts_resume_processes(resume_procs); @@ -3600,6 +3519,17 @@ static Sint abort_connection(DistEntry* dep, Uint32 conn_id) delete_cache(cache); free_de_out_queues(dep, obuf); + + /* + * We wait to make DistEntry idle and accept new connection attempts + * until all is cleared and deallocated. This to get some back pressure + * against repeated failing connection attempts saturating all CPUs + * with cleanup jobs. + */ + erts_de_rwlock(dep); + ASSERT(dep->state == ERTS_DE_STATE_EXITING); + dep->state = ERTS_DE_STATE_IDLE; + erts_de_rwunlock(dep); return reds; } erts_de_rwunlock(dep); @@ -3631,7 +3561,7 @@ BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2) int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) { erts_de_rwlock(dep); - if (dep->status != 0) { + if (dep->state != ERTS_DE_STATE_IDLE) { erts_de_rwunlock(dep); } else { @@ -3820,8 +3750,8 @@ BIF_RETTYPE is_alive_0(BIF_ALIST_0) static BIF_RETTYPE monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) { - DistEntry *dep; - ErtsLink *lnk; + BIF_RETTYPE ret; + DistEntry *dep = NULL; Eterm l; int async_connect = 1; @@ -3840,33 +3770,71 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) if (l != NIL) { BIF_ERROR(p, BADARG); } + if (l != NIL) + goto badarg; - if (is_not_atom(Node) || - ((Bool != am_true) && (Bool != am_false)) || - ((erts_this_node->sysname == am_Noname) - && (Node != erts_this_node->sysname))) { - BIF_ERROR(p, BADARG); + if (is_not_atom(Node)) + goto badarg; + + if (erts_this_node->sysname == am_Noname && Node != am_Noname) + goto badarg; + + switch (Bool) { + + case am_false: { + ErtsMonitor *mon; + /* + * Before OTP-21, monitor_node(Node, false) triggered + * auto-connect and a 'nodedown' message if that failed. + * Now it's a simple no-op which feels more reasonable. + */ + mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(p), Node); + if (mon) { + ErtsMonitorDataExtended *mdep; + ASSERT(erts_monitor_is_origin(mon)); + + mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon); + + ASSERT((mdep->u.refc > 0)); + if (--mdep->u.refc == 0) { + if (!mdep->uptr.node_monitors) + erts_monitor_tree_delete(&ERTS_P_MONITORS(p), mon); + else { + ErtsMonitor *sub_mon; + ErtsMonitorDataExtended *sub_mdep; + sub_mon = erts_monitor_list_last(mdep->uptr.node_monitors); + erts_monitor_list_delete(&mdep->uptr.node_monitors, sub_mon); + sub_mon->flags &= ~ERTS_ML_FLG_IN_SUBTABLE; + sub_mdep = ((ErtsMonitorDataExtended *) + erts_monitor_to_data(sub_mon)); + sub_mdep->uptr.node_monitors = mdep->uptr.node_monitors; + mdep->uptr.node_monitors = NULL; + erts_monitor_tree_replace(&ERTS_P_MONITORS(p), mon, sub_mon); + } + if (erts_monitor_dist_delete(&mdep->md.target)) + erts_monitor_release_both((ErtsMonitorData *) mdep); + else + erts_monitor_release(mon); + } + } + break; } - if (Bool == am_true) { + case am_true: { ErtsDSigData dsd; dsd.node = Node; + dep = erts_find_or_insert_dist_entry(Node); if (dep == erts_this_dist_entry) - goto done; - - erts_proc_lock(p, ERTS_PROC_LOCK_LINK); + break; switch (erts_dsig_prepare(&dsd, dep, p, - (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_PROC_LOCK_MAIN, ERTS_DSP_RLOCK, 0, async_connect)) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: /* Trap to either send 'nodedown' or do passive connection attempt */ - trap: - erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); - erts_deref_dist_entry(dep); - BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); + goto do_trap; case ERTS_DSIG_PREP_PENDING: if (!async_connect) { /* @@ -3874,67 +3842,87 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) * to ensure passive connection attempt */ erts_de_runlock(dep); - goto trap; + goto do_trap; } /*fall through*/ - case ERTS_DSIG_PREP_CONNECTED: - erts_de_links_lock(dep); - erts_de_runlock(dep); - lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, - p->common.id); - ++ERTS_LINK_REFC(lnk); - lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); - ++ERTS_LINK_REFC(lnk); - erts_de_links_unlock(dep); + case ERTS_DSIG_PREP_CONNECTED: { + ErtsMonitor *mon; + ErtsMonitorDataExtended *mdep; + int created; + + mon = erts_monitor_tree_lookup_create(&ERTS_P_MONITORS(p), + &created, + ERTS_MON_TYPE_NODE, + p->common.id, + Node); + mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon); + if (created) { +#ifdef DEBUG + int inserted = +#endif + erts_monitor_dist_insert(&mdep->md.target, dep->mld); + ASSERT(inserted); + ASSERT(mdep->dist->connection_id == dep->connection_id); + } + else if (mdep->dist->connection_id != dep->connection_id) { + ErtsMonitorDataExtended *mdep2; + ErtsMonitor *mon2; +#ifdef DEBUG + int inserted; +#endif + mdep2 = ((ErtsMonitorDataExtended *) + erts_monitor_create(ERTS_MON_TYPE_NODE, NIL, + p->common.id, Node, NIL)); + mon2 = &mdep2->md.origin; +#ifdef DEBUG + inserted = +#endif + erts_monitor_dist_insert(&mdep->md.target, dep->mld); + ASSERT(inserted); + ASSERT(mdep2->dist->connection_id == dep->connection_id); + + mdep2->uptr.node_monitors = mdep->uptr.node_monitors; + mdep->uptr.node_monitors = NULL; + erts_monitor_tree_replace(&ERTS_P_MONITORS(p), mon, mon2); + erts_monitor_list_insert(&mdep2->uptr.node_monitors, mon); + mon->flags |= ERTS_ML_FLG_IN_SUBTABLE; + mdep = mdep2; + } + + mdep->u.refc++; + break; + } + default: ERTS_ASSERT(! "Invalid dsig prepare result"); } - erts_deref_dist_entry(dep); - } - else { /* Bool == false */ - dep = erts_sysname_to_connected_dist_entry(Node); - if (!dep) { - /* - * Before OTP-21 this case triggered auto-connect - * and a 'nodedown' message if that failed. - * Now it's a simple no-op which feels more reasonable. - */ - BIF_RET(am_true); - } - if (dep == erts_this_dist_entry) - goto done; - erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - erts_de_rlock(dep); - if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED))) { - erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); - erts_de_runlock(dep); - goto done; - } - erts_de_links_lock(dep); erts_de_runlock(dep); - lnk = erts_lookup_link(dep->node_links, p->common.id); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&(dep->node_links), - p->common.id)); - } - } - lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), - Node)); - } - } - erts_de_links_unlock(dep); + + break; } - erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); + default: + goto badarg; + } - done: - BIF_RET(am_true); + ERTS_BIF_PREP_RET(ret, am_true); + +do_return: + + if (dep) + erts_deref_dist_entry(dep); + + return ret; + +do_trap: + ERTS_BIF_PREP_TRAP3(ret, dmonitor_node_trap, p, Node, Bool, Options); + goto do_return; + +badarg: + ERTS_BIF_PREP_ERROR(ret, p, BADARG); + goto do_return; } BIF_RETTYPE monitor_node_3(BIF_ALIST_3) @@ -3985,18 +3973,9 @@ BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1) #define ERTS_NODES_MON_OPT_TYPES \ (ERTS_NODES_MON_OPT_TYPE_VISIBLE|ERTS_NODES_MON_OPT_TYPE_HIDDEN) -typedef struct ErtsNodesMonitor_ ErtsNodesMonitor; -struct ErtsNodesMonitor_ { - ErtsNodesMonitor *prev; - ErtsNodesMonitor *next; - Process *proc; - Uint16 opts; - Uint16 no; -}; - static erts_mtx_t nodes_monitors_mtx; -static ErtsNodesMonitor *nodes_monitors; -static ErtsNodesMonitor *nodes_monitors_end; +static ErtsMonitor *nodes_monitors; +static Uint no_nodes_monitors; /* * Nodes monitors are stored in a double linked list. 'nodes_monitors' @@ -4015,109 +3994,169 @@ init_nodes_monitors(void) erts_mtx_init(&nodes_monitors_mtx, "nodes_monitors", NIL, ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION); nodes_monitors = NULL; - nodes_monitors_end = NULL; + no_nodes_monitors = 0; } -static ERTS_INLINE Uint -nodes_mon_msg_sz(ErtsNodesMonitor *nmp, Eterm what, Eterm reason) +Eterm +erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist) { - Uint sz; - if (!nmp->opts) { - sz = 3; - } - else { - sz = 0; + Eterm key, old_value, opts_list = olist; + Uint opts = (Uint) 0; + + ASSERT(c_p); + ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); - if (nmp->opts & ERTS_NODES_MON_OPT_TYPES) - sz += 2 + 3; + if (on != am_true && on != am_false) + return THE_NON_VALUE; - if (what == am_nodedown - && (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { - if (is_not_immed(reason)) - sz += size_object(reason); - sz += 2 + 3; + if (is_not_nil(opts_list)) { + int all = 0, visible = 0, hidden = 0; + + while (is_list(opts_list)) { + Eterm *cp = list_val(opts_list); + Eterm opt = CAR(cp); + opts_list = CDR(cp); + if (opt == am_nodedown_reason) + opts |= ERTS_NODES_MON_OPT_DOWN_REASON; + else if (is_tuple(opt)) { + Eterm* tp = tuple_val(opt); + if (arityval(tp[0]) != 2) + return THE_NON_VALUE; + switch (tp[1]) { + case am_node_type: + switch (tp[2]) { + case am_visible: + if (hidden || all) + return THE_NON_VALUE; + opts |= ERTS_NODES_MON_OPT_TYPE_VISIBLE; + visible = 1; + break; + case am_hidden: + if (visible || all) + return THE_NON_VALUE; + opts |= ERTS_NODES_MON_OPT_TYPE_HIDDEN; + hidden = 1; + break; + case am_all: + if (visible || hidden) + return THE_NON_VALUE; + opts |= ERTS_NODES_MON_OPT_TYPES; + all = 1; + break; + default: + return THE_NON_VALUE; + } + break; + default: + return THE_NON_VALUE; + } + } + else { + return THE_NON_VALUE; + } } - sz += 4; + if (is_not_nil(opts_list)) + return THE_NON_VALUE; + } + + key = make_small(opts); + + if (on == am_true) { + ErtsMonitorDataExtended *mdep; + ErtsMonitor *omon; + int created; + omon = erts_monitor_tree_lookup_create(&ERTS_P_MONITORS(c_p), + &created, + ERTS_MON_TYPE_NODES, + c_p->common.id, + key); + mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(omon); + if (created) { + erts_mtx_lock(&nodes_monitors_mtx); + no_nodes_monitors++; + erts_monitor_list_insert(&nodes_monitors, &mdep->md.target); + erts_mtx_unlock(&nodes_monitors_mtx); + } + old_value = mdep->u.refc; + mdep->u.refc++; + } + else { + ErtsMonitorDataExtended *mdep; + ErtsMonitor *omon; + omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p), key); + if (!omon) + old_value = 0; + else { + mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(omon); + old_value = mdep->u.refc; + ASSERT(mdep->u.refc > 0); + erts_mtx_lock(&nodes_monitors_mtx); + ASSERT(no_nodes_monitors > 0); + no_nodes_monitors--; + ASSERT(erts_monitor_is_in_table(&mdep->md.target)); + erts_monitor_list_delete(&nodes_monitors, &mdep->md.target); + erts_mtx_unlock(&nodes_monitors_mtx); + erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p), omon); + erts_monitor_release_both((ErtsMonitorData *) mdep); + } } - return sz; + + return erts_make_integer(old_value, c_p); } -static ERTS_INLINE void -send_nodes_mon_msg(Process *rp, - ErtsProcLocks *rp_locksp, - ErtsNodesMonitor *nmp, - Eterm node, - Eterm what, - Eterm type, - Eterm reason, - Uint sz) +void +erts_monitor_nodes_delete(ErtsMonitor *omon) { - Eterm msg; - Eterm *hp; - ErtsMessage *mp; - ErlOffHeap *ohp; -#ifdef DEBUG - Eterm *hend; -#endif + ErtsMonitorData *mdp; - mp = erts_alloc_message_heap(rp, rp_locksp, sz, &hp, &ohp); -#ifdef DEBUG - hend = hp + sz; -#endif + ASSERT(omon->type == ERTS_MON_TYPE_NODES); + ASSERT(erts_monitor_is_origin(omon)); - if (!nmp->opts) { - msg = TUPLE2(hp, what, node); -#ifdef DEBUG - hp += 3; -#endif - } - else { - Eterm tup; - Eterm info = NIL; + mdp = erts_monitor_to_data(omon); - if (nmp->opts & (ERTS_NODES_MON_OPT_TYPE_VISIBLE - | ERTS_NODES_MON_OPT_TYPE_HIDDEN)) { + erts_mtx_lock(&nodes_monitors_mtx); + ASSERT(erts_monitor_is_in_table(&mdp->target)); + ASSERT(no_nodes_monitors > 0); + no_nodes_monitors--; + erts_monitor_list_delete(&nodes_monitors, &mdp->target); + erts_mtx_unlock(&nodes_monitors_mtx); + erts_monitor_release_both(mdp); +} - tup = TUPLE2(hp, am_node_type, type); - hp += 3; - info = CONS(hp, tup, info); - hp += 2; - } +typedef struct { + Eterm pid; + Eterm options; +} ErtsNodesMonitorData; - if (what == am_nodedown - && (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { - Eterm rsn_cpy; - - if (is_immed(reason)) - rsn_cpy = reason; - else { - Eterm rsn_sz = size_object(reason); - rsn_cpy = copy_struct(reason, rsn_sz, &hp, ohp); - } +typedef struct { + ErtsNodesMonitorData *nmdp; + Uint i; +} ErtsNodesMonitorContext; - tup = TUPLE2(hp, am_nodedown_reason, rsn_cpy); - hp += 3; - info = CONS(hp, tup, info); - hp += 2; - } +static void +save_nodes_monitor(ErtsMonitor *mon, void *vctxt) +{ + ErtsNodesMonitorContext *ctxt = vctxt; + ErtsMonitorData *mdp = erts_monitor_to_data(mon); - msg = TUPLE3(hp, what, node, info); -#ifdef DEBUG - hp += 4; -#endif - } + ASSERT(erts_monitor_is_target(mon)); + ASSERT(mon->type == ERTS_MON_TYPE_NODES); - ASSERT(hend == hp); - erts_queue_message(rp, *rp_locksp, mp, msg, am_system); + ctxt->nmdp[ctxt->i].pid = mon->other.item; + ctxt->nmdp[ctxt->i].options = mdp->origin.other.item; + + ctxt->i++; } static void send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reason) { - ErtsNodesMonitor *nmp; - ErtsProcLocks rp_locks = 0; /* Init to shut up false warning */ - Process *rp = NULL; + Uint opts; + Uint i, no, reason_size; + ErtsNodesMonitorData def_buf[100]; + ErtsNodesMonitorData *nmdp = &def_buf[0]; + ErtsNodesMonitorContext ctxt; ASSERT(is_immed(what)); ASSERT(is_immed(node)); @@ -4138,31 +4177,44 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas } #endif - ERTS_LC_ASSERT(!c_p - || (erts_proc_lc_my_proc_locks(c_p) - == ERTS_PROC_LOCK_MAIN)); + ctxt.i = 0; + + reason_size = is_immed(reason) ? 0 : size_object(reason); + erts_mtx_lock(&nodes_monitors_mtx); + if (no_nodes_monitors > sizeof(def_buf)/sizeof(def_buf[0])) + nmdp = erts_alloc(ERTS_ALC_T_TMP, + no_nodes_monitors*sizeof(ErtsNodesMonitorData)); + ctxt.nmdp = nmdp; + erts_monitor_list_foreach(nodes_monitors, + save_nodes_monitor, + (void *) &ctxt); + + ASSERT(ctxt.i == no_nodes_monitors); + no = no_nodes_monitors; - for (nmp = nodes_monitors; nmp; nmp = nmp->next) { - int i; - Uint16 no; - Uint sz; + erts_mtx_unlock(&nodes_monitors_mtx); - ASSERT(nmp->proc != NULL); + for (i = 0; i < no; i++) { + Eterm tmp_heap[3+2+3+2+4 /* max need */]; + Eterm *hp, msg; + Uint hsz; - if (!nmp->opts) { + ASSERT(is_small(nmdp[i].options)); + opts = (Uint) signed_val(nmdp[i].options); + if (!opts) { if (type != am_visible) continue; } else { switch (type) { case am_hidden: - if (!(nmp->opts & ERTS_NODES_MON_OPT_TYPE_HIDDEN)) + if (!(opts & ERTS_NODES_MON_OPT_TYPE_HIDDEN)) continue; break; case am_visible: - if ((nmp->opts & ERTS_NODES_MON_OPT_TYPES) - && !(nmp->opts & ERTS_NODES_MON_OPT_TYPE_VISIBLE)) + if ((opts & ERTS_NODES_MON_OPT_TYPES) + && !(opts & ERTS_NODES_MON_OPT_TYPE_VISIBLE)) continue; break; default: @@ -4170,342 +4222,162 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas } } - if (rp != nmp->proc) { - if (rp) { - if (rp == c_p) - rp_locks &= ~ERTS_PROC_LOCK_MAIN; - erts_proc_unlock(rp, rp_locks); - } - - rp = nmp->proc; - rp_locks = 0; - if (rp == c_p) - rp_locks |= ERTS_PROC_LOCK_MAIN; - } - - ASSERT(rp); + hsz = 0; + hp = &tmp_heap[0]; - sz = nodes_mon_msg_sz(nmp, what, reason); + if (!opts) { + msg = TUPLE2(hp, what, node); + hp += 3; + } + else { + Eterm tup; + Eterm info = NIL; - for (i = 0, no = nmp->no; i < no; i++) - send_nodes_mon_msg(rp, - &rp_locks, - nmp, - node, - what, - type, - reason, - sz); - } + if (opts & (ERTS_NODES_MON_OPT_TYPE_VISIBLE + | ERTS_NODES_MON_OPT_TYPE_HIDDEN)) { - if (rp) { - if (rp == c_p) - rp_locks &= ~ERTS_PROC_LOCK_MAIN; - erts_proc_unlock(rp, rp_locks); - } - - erts_mtx_unlock(&nodes_monitors_mtx); -} + tup = TUPLE2(hp, am_node_type, type); + hp += 3; + info = CONS(hp, tup, info); + hp += 2; + } -static Eterm -insert_nodes_monitor(Process *c_p, Uint32 opts) -{ - Uint16 no = 1; - Eterm res = am_false; - ErtsNodesMonitor *xnmp, *nmp; + if (what == am_nodedown + && (opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { + hsz += reason_size; + tup = TUPLE2(hp, am_nodedown_reason, reason); + hp += 3; + info = CONS(hp, tup, info); + hp += 2; + } - ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&nodes_monitors_mtx)); - ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN); + msg = TUPLE3(hp, what, node, info); + hp += 4; + } - xnmp = c_p->nodes_monitors; - if (xnmp) { - ASSERT(!xnmp->prev || xnmp->prev->proc != c_p); + ASSERT(hp - &tmp_heap[0] <= sizeof(tmp_heap)/sizeof(tmp_heap[0])); - while (1) { - ASSERT(xnmp->proc == c_p); - if (xnmp->opts == opts) - break; - if (!xnmp->next || xnmp->next->proc != c_p) - break; - xnmp = xnmp->next; - } - ASSERT(xnmp); - ASSERT(xnmp->proc == c_p); - ASSERT(xnmp->opts == opts - || !xnmp->next - || xnmp->next->proc != c_p); - - if (xnmp->opts != opts) - goto alloc_new; - else { - res = am_true; - no = xnmp->no++; - if (!xnmp->no) { - /* - * 'no' wrapped; transfer all prevous monitors to new - * element (which will be the next element in the list) - * and set this to one... - */ - xnmp->no = 1; - goto alloc_new; - } - } - } - else { - alloc_new: - nmp = erts_alloc(ERTS_ALC_T_NODES_MON, sizeof(ErtsNodesMonitor)); - nmp->proc = c_p; - nmp->opts = opts; - nmp->no = no; - - if (xnmp) { - ASSERT(nodes_monitors); - ASSERT(c_p->nodes_monitors); - nmp->next = xnmp->next; - nmp->prev = xnmp; - xnmp->next = nmp; - if (nmp->next) { - ASSERT(nodes_monitors_end != xnmp); - ASSERT(nmp->next->prev == xnmp); - nmp->next->prev = nmp; - } - else { - ASSERT(nodes_monitors_end == xnmp); - nodes_monitors_end = nmp; - } - } - else { - ASSERT(!c_p->nodes_monitors); - c_p->nodes_monitors = nmp; - nmp->next = NULL; - nmp->prev = nodes_monitors_end; - if (nodes_monitors_end) { - ASSERT(nodes_monitors); - nodes_monitors_end->next = nmp; - } - else { - ASSERT(!nodes_monitors); - nodes_monitors = nmp; - } - nodes_monitors_end = nmp; - } - } - return res; -} + hsz += hp - &tmp_heap[0]; -static Eterm -remove_nodes_monitors(Process *c_p, Uint32 opts, int all) -{ - Eterm res = am_false; - ErtsNodesMonitor *nmp; - - ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&nodes_monitors_mtx)); - ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN); - - nmp = c_p->nodes_monitors; - ASSERT(!nmp || !nmp->prev || nmp->prev->proc != c_p); - - while (nmp && nmp->proc == c_p) { - if (!all && nmp->opts != opts) - nmp = nmp->next; - else { /* if (all || nmp->opts == opts) */ - ErtsNodesMonitor *free_nmp; - res = am_true; - if (nmp->prev) { - ASSERT(nodes_monitors != nmp); - nmp->prev->next = nmp->next; - } - else { - ASSERT(nodes_monitors == nmp); - nodes_monitors = nmp->next; - } - if (nmp->next) { - ASSERT(nodes_monitors_end != nmp); - nmp->next->prev = nmp->prev; - } - else { - ASSERT(nodes_monitors_end == nmp); - nodes_monitors_end = nmp->prev; - } - free_nmp = nmp; - nmp = nmp->next; - if (c_p->nodes_monitors == free_nmp) - c_p->nodes_monitors = nmp && nmp->proc == c_p ? nmp : NULL; - erts_free(ERTS_ALC_T_NODES_MON, free_nmp); - } + erts_proc_sig_send_persistent_monitor_msg(ERTS_MON_TYPE_NODES, + nmdp[i].options, + am_system, + nmdp[i].pid, + msg, + hsz); } - - ASSERT(!all || !c_p->nodes_monitors); - return res; -} -void -erts_delete_nodes_monitors(Process *c_p, ErtsProcLocks locks) -{ -#if defined(ERTS_ENABLE_LOCK_CHECK) - if (c_p) { - ErtsProcLocks might_unlock = locks & ~ERTS_PROC_LOCK_MAIN; - if (might_unlock) - erts_proc_lc_might_unlock(c_p, might_unlock); - } -#endif - if (erts_mtx_trylock(&nodes_monitors_mtx) == EBUSY) { - ErtsProcLocks unlock_locks = locks & ~ERTS_PROC_LOCK_MAIN; - if (c_p && unlock_locks) - erts_proc_unlock(c_p, unlock_locks); - erts_mtx_lock(&nodes_monitors_mtx); - if (c_p && unlock_locks) - erts_proc_lock(c_p, unlock_locks); - } - remove_nodes_monitors(c_p, 0, 1); - erts_mtx_unlock(&nodes_monitors_mtx); + if (nmdp != &def_buf[0]) + erts_free(ERTS_ALC_T_TMP, nmdp); } + -Eterm -erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist) -{ +typedef struct { + Eterm **hpp; + Uint *szp; Eterm res; - Eterm opts_list = olist; - Uint16 opts = (Uint16) 0; - - ASSERT(c_p); - ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); - - if (on != am_true && on != am_false) - return THE_NON_VALUE; - - if (is_not_nil(opts_list)) { - int all = 0, visible = 0, hidden = 0; - - while (is_list(opts_list)) { - Eterm *cp = list_val(opts_list); - Eterm opt = CAR(cp); - opts_list = CDR(cp); - if (opt == am_nodedown_reason) - opts |= ERTS_NODES_MON_OPT_DOWN_REASON; - else if (is_tuple(opt)) { - Eterm* tp = tuple_val(opt); - if (arityval(tp[0]) != 2) - return THE_NON_VALUE; - switch (tp[1]) { - case am_node_type: - switch (tp[2]) { - case am_visible: - if (hidden || all) - return THE_NON_VALUE; - opts |= ERTS_NODES_MON_OPT_TYPE_VISIBLE; - visible = 1; - break; - case am_hidden: - if (visible || all) - return THE_NON_VALUE; - opts |= ERTS_NODES_MON_OPT_TYPE_HIDDEN; - hidden = 1; - break; - case am_all: - if (visible || hidden) - return THE_NON_VALUE; - opts |= ERTS_NODES_MON_OPT_TYPES; - all = 1; - break; - default: - return THE_NON_VALUE; - } - break; - default: - return THE_NON_VALUE; - } - } - else { - return THE_NON_VALUE; - } - } - - if (is_not_nil(opts_list)) - return THE_NON_VALUE; - } +} ErtsNodesMonitorInfoContext; - erts_mtx_lock(&nodes_monitors_mtx); - - if (on == am_true) - res = insert_nodes_monitor(c_p, opts); - else - res = remove_nodes_monitors(c_p, opts, 0); - erts_mtx_unlock(&nodes_monitors_mtx); - - return res; +static void +nodes_monitor_info(ErtsMonitor *mon, void *vctxt) +{ + ErtsMonitorDataExtended *mdep; + ErtsNodesMonitorInfoContext *ctxt = vctxt; + Uint no, i, opts, *szp; + Eterm **hpp, res; + + hpp = ctxt->hpp; + szp = ctxt->szp; + res = ctxt->res; + + ASSERT(erts_monitor_is_target(mon)); + ASSERT(mon->type == ERTS_MON_TYPE_NODES); + mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon); + no = mdep->u.refc; + + ASSERT(is_small(mdep->md.origin.other.item)); + opts = (Uint) signed_val(mdep->md.origin.other.item); + + for (i = 0; i < no; i++) { + Eterm olist = NIL; + if (opts & ERTS_NODES_MON_OPT_TYPES) { + Eterm type; + switch (opts & ERTS_NODES_MON_OPT_TYPES) { + case ERTS_NODES_MON_OPT_TYPES: type = am_all; break; + case ERTS_NODES_MON_OPT_TYPE_VISIBLE: type = am_visible; break; + case ERTS_NODES_MON_OPT_TYPE_HIDDEN: type = am_hidden; break; + default: erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n"); + } + olist = erts_bld_cons(hpp, szp, + erts_bld_tuple(hpp, szp, 2, + am_node_type, + type), + olist); + } + if (opts & ERTS_NODES_MON_OPT_DOWN_REASON) + olist = erts_bld_cons(hpp, szp, am_nodedown_reason, olist); + res = erts_bld_cons(hpp, szp, + erts_bld_tuple(hpp, szp, 2, + mon->other.item, + olist), + res); + } + + ctxt->hpp = hpp; + ctxt->szp = szp; + ctxt->res = res; } -/* - * Note, this function is only used for debuging. - */ - Eterm erts_processes_monitoring_nodes(Process *c_p) { - ErtsNodesMonitor *nmp; - Eterm res; + /* + * Note, this function is only used for debugging. + */ + ErtsNodesMonitorInfoContext ctxt; Eterm *hp; - Eterm **hpp; Uint sz; - Uint *szp; #ifdef DEBUG Eterm *hend; #endif ASSERT(c_p); ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); + + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MAIN); + erts_thr_progress_block(); + erts_mtx_lock(&nodes_monitors_mtx); sz = 0; - szp = &sz; - hpp = NULL; + ctxt.szp = &sz; + ctxt.hpp = NULL; - bld_result: - res = NIL; - - for (nmp = nodes_monitors_end; nmp; nmp = nmp->prev) { - Uint16 i; - for (i = 0; i < nmp->no; i++) { - Eterm olist = NIL; - if (nmp->opts & ERTS_NODES_MON_OPT_TYPES) { - Eterm type; - switch (nmp->opts & ERTS_NODES_MON_OPT_TYPES) { - case ERTS_NODES_MON_OPT_TYPES: type = am_all; break; - case ERTS_NODES_MON_OPT_TYPE_VISIBLE: type = am_visible; break; - case ERTS_NODES_MON_OPT_TYPE_HIDDEN: type = am_hidden; break; - default: erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n"); - } - olist = erts_bld_cons(hpp, szp, - erts_bld_tuple(hpp, szp, 2, - am_node_type, - type), - olist); - } - if (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON) - olist = erts_bld_cons(hpp, szp, am_nodedown_reason, olist); - res = erts_bld_cons(hpp, szp, - erts_bld_tuple(hpp, szp, 2, - nmp->proc->common.id, - olist), - res); - } - } + while (1) { + ctxt.res = NIL; + + erts_monitor_list_foreach(nodes_monitors, + nodes_monitor_info, + (void *) &ctxt); + + if (ctxt.hpp) + break; - if (!hpp) { hp = HAlloc(c_p, sz); #ifdef DEBUG hend = hp + sz; #endif - hpp = &hp; - szp = NULL; - goto bld_result; + ctxt.hpp = &hp; + ctxt.szp = NULL; } ASSERT(hp == hend); erts_mtx_unlock(&nodes_monitors_mtx); - return res; + erts_thr_progress_unblock(); + erts_proc_lock(c_p, ERTS_PROC_LOCK_MAIN); + + return ctxt.res; } |