/* * %CopyrightBegin% * * Copyright Ericsson AB 1996-2017. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * %CopyrightEnd% */ /* * distribution of erlang messages to other nodes. */ /* define this to get a lot of debug output */ /* #define ERTS_DIST_MSG_DBG */ /* #define ERTS_RAW_DIST_MSG_DBG */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #define ERTS_WANT_EXTERNAL_TAGS #include #include "sys.h" #include "erl_vm.h" #include "global.h" #include "erl_process.h" #include "error.h" #include "dist.h" #include "bif.h" #include "external.h" #include "erl_binary.h" #include "erl_thr_progress.h" #include "dtrace-wrapper.h" #define DIST_CTL_DEFAULT_SIZE 64 /* Turn this on to get printouts of all distribution messages * which go on the line */ #if 0 #define ERTS_DIST_MSG_DBG #endif #if 0 #define ERTS_RAW_DIST_MSG_DBG #endif #if defined(ERTS_DIST_MSG_DBG) || defined(ERTS_RAW_DIST_MSG_DBG) static void bw(byte *buf, ErlDrvSizeT sz) { bin_write(ERTS_PRINT_STDERR, NULL, buf, sz); } #endif #ifdef ERTS_DIST_MSG_DBG static void dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) { ErtsHeapFactory factory; DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE); Eterm* ctl = ctl_default; byte *extp = edep->extp; Eterm msg; Sint ctl_len; Sint size = ctl_len = erts_decode_dist_ext_size(edep); if (size < 0) { erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(%s) failed:\n", what); bw(buf, sz); } else { ErlHeapFragment *mbuf = new_message_buffer(size); erts_factory_static_init(&factory, ctl, ctl_len, &mbuf->off_heap); msg = erts_decode_dist_ext(&factory, edep); if (is_value(msg)) erts_fprintf(stderr, " %s: %T\n", what, msg); else { erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext(%s) failed:\n", what); bw(buf, sz); } free_message_buffer(mbuf); edep->extp = extp; } } #endif int erts_is_alive; /* System must be blocked on change */ int erts_dist_buf_busy_limit; /* distribution trap functions */ Export* dmonitor_node_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ static Export *dist_ctrl_put_data_trap; /* forward declarations */ static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; struct { Eterm reason; ErlHeapFragment *bp; } nodedown; static void delete_cache(ErtsAtomCache *cache) { if (cache) { erts_free(ERTS_ALC_T_DCACHE, (void *) cache); ASSERT(erts_atomic_read_nob(&no_caches) > 0); erts_atomic_dec_nob(&no_caches); } } static void create_cache(DistEntry *dep) { int i; ErtsAtomCache *cp; ERTS_LC_ASSERT(is_nil(dep->cid)); ASSERT(!dep->cache); dep->cache = cp = (ErtsAtomCache*) erts_alloc(ERTS_ALC_T_DCACHE, sizeof(ErtsAtomCache)); erts_atomic_inc_nob(&no_caches); for (i = 0; i < sizeof(cp->in_arr)/sizeof(cp->in_arr[0]); i++) { cp->in_arr[i] = THE_NON_VALUE; cp->out_arr[i] = THE_NON_VALUE; } } Uint erts_dist_cache_size(void) { return (Uint) erts_atomic_read_mb(&no_caches)*sizeof(ErtsAtomCache); } static ErtsProcList * get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs) { erts_aint32_t qflgs; ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); qflgs = erts_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs); qflgs &= ~unset_qflgs; if (qflgs & ERTS_DE_QFLG_EXIT) { /* No resume when exit has been scheduled */ return NULL; } else { ErtsProcList *suspended = dep->suspended; dep->suspended = NULL; erts_proclist_fetch(&suspended, NULL); return suspended; } } /* ** A full node name constists of a "n@h" ** ** n must be a valid node name: string of ([a-z][A-Z][0-9]_-)+ ** ** h is not checked at all, we assume that we have a properly ** configured machine where the networking is ok for the OS ** ** We do check that there is not a second @ in the string, since ** many distributed operations are guaranteed not to work then. */ static int is_node_name(char *ptr, int len) { int c = '\0'; /* suppress use-before-set warning */ int pos = 0; while (pos < len) { c = ptr[pos++]; if (! ((c == '-') || (c == '_') || ((c >= 'a') && (c <= 'z')) || ((c >= 'A') && (c <= 'Z')) || ((c >= '0') && (c <= '9')))) break; } /* Scanned past the host name: now we want to see a '@', and there should be text both before and after it. */ if (c != '@' || pos < 2 || pos == len) return 0; while (pos < len) { c = ptr[pos++]; if (c == '@') return 0; } return 1; } int is_node_name_atom(Eterm a) { int i; if(is_not_atom(a)) return 0; i = atom_val(a); ASSERT((i > 0) && (i < atom_table_size()) && (atom_tab(i) != NULL)); return is_node_name((char*)atom_tab(i)->name, atom_tab(i)->len); } typedef struct { DistEntry *dep; Eterm *lhp; } NetExitsContext; /* ** This function is called when a distribution ** port or process terminates */ static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp) { Process *rp; ErtsMonitor *rmon; DistEntry *dep = ((NetExitsContext *) vnecp)->dep; ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; rp = erts_pid2proc(NULL, 0, mon->u.pid, rp_locks); if (!rp) goto done; if (mon->type == MON_ORIGIN) { /* local pid is being monitored */ rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref); /* ASSERT(rmon != NULL); nope, can happen during process exit */ if (rmon != NULL) { erts_destroy_monitor(rmon); } } else { DeclareTmpHeapNoproc(lhp,3); Eterm watched; UseTmpHeapNoproc(3); ASSERT(mon->type == MON_TARGET); rmon = erts_remove_monitor(&ERTS_P_MONITORS(rp), mon->ref); /* ASSERT(rmon != NULL); can happen during process exit */ if (rmon != NULL) { ASSERT(rmon->type == MON_ORIGIN); ASSERT(is_atom(rmon->name) || is_nil(rmon->name)); watched = (is_atom(rmon->name) ? TUPLE2(lhp, rmon->name, dep->sysname) : rmon->u.pid); rp_locks |= ERTS_PROC_LOCKS_MSG_SEND; erts_proc_lock(rp, ERTS_PROC_LOCKS_MSG_SEND); erts_queue_monitor_message(rp, &rp_locks, mon->ref, am_process, watched, am_noconnection); erts_destroy_monitor(rmon); } UnUseTmpHeapNoproc(3); } erts_proc_unlock(rp, rp_locks); done: erts_destroy_monitor(mon); } typedef struct { NetExitsContext *necp; ErtsLink *lnk; } LinkNetExitsContext; /* ** This is the function actually doing the job of sending exit messages ** for links in a dist entry upon net_exit (the node goes down), NB, ** only process links, not node monitors are handled here, ** they reside in a separate tree.... */ static void doit_link_net_exits_sub(ErtsLink *sublnk, void *vlnecp) { ErtsLink *lnk = ((LinkNetExitsContext *) vlnecp)->lnk; /* the local pid */ ErtsLink *rlnk; Process *rp; ASSERT(lnk->type == LINK_PID); if (is_internal_pid(lnk->pid)) { int xres; ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND; rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks); if (!rp) { goto done; } rlnk = erts_remove_link(&ERTS_P_LINKS(rp), sublnk->pid); xres = erts_send_exit_signal(NULL, sublnk->pid, rp, &rp_locks, am_noconnection, NIL, NULL, 0); if (rlnk) { erts_destroy_link(rlnk); if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) { /* We didn't exit the process and it is traced */ trace_proc(NULL, 0, rp, am_getting_unlinked, sublnk->pid); } } erts_proc_unlock(rp, rp_locks); } done: erts_destroy_link(sublnk); } /* ** This function is called when a distribution ** port or process terminates, once for each link on the high level, ** it in turn traverses the link subtree for the specific link node... */ static void doit_link_net_exits(ErtsLink *lnk, void *vnecp) { LinkNetExitsContext lnec = {(NetExitsContext *) vnecp, lnk}; ASSERT(lnk->type == LINK_PID); erts_sweep_links(ERTS_LINK_ROOT(lnk), &doit_link_net_exits_sub, (void *) &lnec); #ifdef DEBUG ERTS_LINK_ROOT(lnk) = NULL; #endif erts_destroy_link(lnk); } static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp) { DistEntry *dep = ((NetExitsContext *) vnecp)->dep; Eterm name = dep->sysname; Process *rp; ErtsLink *rlnk; Uint i,n; ASSERT(lnk->type == LINK_NODE); if (is_internal_pid(lnk->pid)) { ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; ErlOffHeap *ohp; rp = erts_proc_lookup(lnk->pid); if (!rp) goto done; erts_proc_lock(rp, rp_locks); rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name); if (rlnk != NULL) { ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE)); erts_destroy_link(rlnk); } n = ERTS_LINK_REFC(lnk); for (i = 0; i < n; ++i) { Eterm tup; Eterm *hp; ErtsMessage *msgp; msgp = erts_alloc_message_heap(rp, &rp_locks, 3, &hp, &ohp); tup = TUPLE2(hp, am_nodedown, name); erts_queue_message(rp, rp_locks, msgp, tup, am_system); } erts_proc_unlock(rp, rp_locks); } done: erts_destroy_link(lnk); } static void set_node_not_alive(void *unused) { ErlHeapFragment *bp; Eterm nodename = erts_this_dist_entry->sysname; ASSERT(erts_atomic_read_nob(&no_nodes) == 0); erts_thr_progress_block(); erts_set_this_node(am_Noname, 0); erts_is_alive = 0; send_nodes_mon_msgs(NULL, am_nodedown, nodename, am_visible, nodedown.reason); nodedown.reason = NIL; bp = nodedown.bp; nodedown.bp = NULL; erts_thr_progress_unblock(); if (bp) free_message_buffer(bp); } static ERTS_INLINE void dec_no_nodes(void) { erts_aint_t no = erts_atomic_dec_read_mb(&no_nodes); ASSERT(no >= 0); ASSERT(erts_get_scheduler_id()); /* Need to be a scheduler */ if (no == 0) erts_schedule_misc_aux_work(erts_get_scheduler_id(), set_node_not_alive, NULL); } static ERTS_INLINE void inc_no_nodes(void) { #ifdef DEBUG erts_aint_t no = erts_atomic_read_nob(&no_nodes); ASSERT(erts_is_alive ? no > 0 : no == 0); #endif erts_atomic_inc_mb(&no_nodes); } static void kill_dist_ctrl_proc(void *vpid) { Eterm pid = (Eterm) vpid; ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; Process *rp = erts_pid2proc(NULL, 0, pid, rp_locks); if (rp) { erts_send_exit_signal(NULL, rp->common.id, rp, &rp_locks, am_kill, NIL, NULL, 0); if (rp_locks) erts_proc_unlock(rp, rp_locks); } } static void schedule_kill_dist_ctrl_proc(Eterm pid) { ErtsSchedulerData *esdp = erts_get_scheduler_data(); int sched_id = 1; if (!esdp || ERTS_SCHEDULER_IS_DIRTY(esdp)) sched_id = 1; else sched_id = (int) esdp->no; erts_schedule_misc_aux_work(sched_id, kill_dist_ctrl_proc, (void *) (UWord) pid); } /* * proc is currently running or exiting process. */ int erts_do_net_exits(DistEntry *dep, Eterm reason) { Eterm nodename; if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */ DistEntry *tdep; int no_dist_ctrl = 0; Eterm nd_reason = (reason == am_no_network ? am_no_network : am_net_kernel_terminated); erts_rwmtx_rlock(&erts_dist_table_rwmtx); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) no_dist_ctrl++; for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) no_dist_ctrl++; /* KILL all port controllers */ if (no_dist_ctrl == 0) erts_rwmtx_runlock(&erts_dist_table_rwmtx); else { Eterm def_buf[128]; int i = 0; Eterm *dist_ctrl; if (no_dist_ctrl <= sizeof(def_buf)/sizeof(def_buf[0])) dist_ctrl = &def_buf[0]; else dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, sizeof(Eterm)*no_dist_ctrl); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) { ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); dist_ctrl[i++] = tdep->cid; } for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) { ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); dist_ctrl[i++] = tdep->cid; } erts_rwmtx_runlock(&erts_dist_table_rwmtx); for (i = 0; i < no_dist_ctrl; i++) { if (is_internal_pid(dist_ctrl[i])) schedule_kill_dist_ctrl_proc(dist_ctrl[i]); else { Port *prt = erts_port_lookup(dist_ctrl[i], ERTS_PORT_SFLGS_INVALID_LOOKUP); if (prt) { ASSERT(erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLG_DISTRIBUTION); erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED, prt, dist_ctrl[i], nd_reason, NULL); } } } if (dist_ctrl != &def_buf[0]) erts_free(ERTS_ALC_T_TMP, dist_ctrl); } /* * When last dist ctrl exits, node will be taken * from alive to not alive. */ ASSERT(is_nil(nodedown.reason) && !nodedown.bp); if (is_immed(nd_reason)) nodedown.reason = nd_reason; else { Eterm *hp; Uint sz = size_object(nd_reason); nodedown.bp = new_message_buffer(sz); hp = nodedown.bp->mem; nodedown.reason = copy_struct(nd_reason, sz, &hp, &nodedown.bp->off_heap); } } else { /* Call from distribution controller (port/process) */ NetExitsContext nec = {dep}; ErtsLink *nlinks; ErtsLink *node_links; ErtsMonitor *monitors; Uint32 flags; erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1); erts_de_rwlock(dep); if (is_internal_port(dep->cid)) { ERTS_LC_ASSERT(erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); if (erts_port_task_is_scheduled(&dep->dist_cmd)) erts_port_task_abort(&dep->dist_cmd); } if (dep->status & ERTS_DE_SFLG_EXITING) { #ifdef DEBUG ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT); #endif } else { dep->status |= ERTS_DE_SFLG_EXITING; erts_mtx_lock(&dep->qlock); ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT); erts_mtx_unlock(&dep->qlock); } erts_de_links_lock(dep); monitors = dep->monitors; nlinks = dep->nlinks; node_links = dep->node_links; dep->monitors = NULL; dep->nlinks = NULL; dep->node_links = NULL; erts_de_links_unlock(dep); nodename = dep->sysname; flags = dep->flags; erts_set_dist_entry_not_connected(dep); erts_de_rwunlock(dep); erts_sweep_monitors(monitors, &doit_monitor_net_exits, (void *) &nec); erts_sweep_links(nlinks, &doit_link_net_exits, (void *) &nec); erts_sweep_links(node_links, &doit_node_link_net_exits, (void *) &nec); send_nodes_mon_msgs(NULL, am_nodedown, nodename, flags & DFLAG_PUBLISHED ? am_visible : am_hidden, reason == am_normal ? am_connection_closed : reason); clear_dist_entry(dep); } dec_no_nodes(); return 1; } static Export* trap_function(Eterm func, int arity) { return erts_export_put(am_erlang, func, arity); } void init_dist(void) { init_nodes_monitors(); nodedown.reason = NIL; nodedown.bp = NULL; erts_atomic_init_nob(&no_nodes, 0); erts_atomic_init_nob(&no_caches, 0); /* Lookup/Install all references to trap functions */ dmonitor_node_trap = trap_function(am_dmonitor_node,3); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, am_dist_ctrl_put_data, 2); } #define ErtsDistOutputBuf2Binary(OB) \ ((Binary *) (((char *) (OB)) - offsetof(Binary, orig_bytes))) static ERTS_INLINE ErtsDistOutputBuf * alloc_dist_obuf(Uint size) { ErtsDistOutputBuf *obuf; Uint obuf_size = sizeof(ErtsDistOutputBuf)+sizeof(byte)*(size-1); Binary *bin = erts_bin_drv_alloc(obuf_size); obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0]; #ifdef DEBUG obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; ASSERT(bin == ErtsDistOutputBuf2Binary(obuf)); #endif return obuf; } static ERTS_INLINE void free_dist_obuf(ErtsDistOutputBuf *obuf) { Binary *bin = ErtsDistOutputBuf2Binary(obuf); ASSERT(obuf->dbg_pattern == ERTS_DIST_OUTPUT_BUF_DBG_PATTERN); erts_bin_release(bin); } static ERTS_INLINE Sint size_obuf(ErtsDistOutputBuf *obuf) { Binary *bin = ErtsDistOutputBuf2Binary(obuf); return bin->orig_size; } static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) { ErtsDistOutputBuf *obuf; ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; else { dep->out_queue.last->next = dep->finalized_out_queue.first; obuf = dep->out_queue.first; } if (dep->tmp_out_queue.first) { dep->tmp_out_queue.last->next = obuf; obuf = dep->tmp_out_queue.first; } dep->out_queue.first = NULL; dep->out_queue.last = NULL; dep->tmp_out_queue.first = NULL; dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; return obuf; } static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf) { Sint obufsize = 0; while (obuf) { ErtsDistOutputBuf *fobuf; fobuf = obuf; obuf = obuf->next; obufsize += size_obuf(fobuf); free_dist_obuf(fobuf); } if (obufsize) { erts_mtx_lock(&dep->qlock); ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); erts_mtx_unlock(&dep->qlock); } } static void clear_dist_entry(DistEntry *dep) { ErtsAtomCache *cache; ErtsProcList *suspendees; ErtsDistOutputBuf *obuf; erts_de_rwlock(dep); erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) NIL); cache = dep->cache; dep->cache = NULL; #ifdef DEBUG erts_de_links_lock(dep); ASSERT(!dep->nlinks); ASSERT(!dep->node_links); ASSERT(!dep->monitors); erts_de_links_unlock(dep); #endif erts_mtx_lock(&dep->qlock); erts_atomic64_set_nob(&dep->in, 0); erts_atomic64_set_nob(&dep->out, 0); obuf = clear_de_out_queues(dep); dep->status = 0; suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); erts_mtx_unlock(&dep->qlock); erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); dep->send = NULL; erts_de_rwunlock(dep); erts_resume_processes(suspendees); delete_cache(cache); free_de_out_queues(dep, obuf); } int erts_dsend_context_dtor(Binary* ctx_bin) { ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); switch (ctx->dss.phase) { case ERTS_DSIG_SEND_PHASE_MSG_SIZE: DESTROY_SAVED_WSTACK(&ctx->dss.u.sc.wstack); break; case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: DESTROY_SAVED_WSTACK(&ctx->dss.u.ec.wstack); break; default:; } if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) { free_dist_obuf(ctx->dss.obuf); } if (ctx->deref_dep) erts_deref_dist_entry(ctx->dep); return 1; } Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx) { struct exported_ctx { ErtsSendContext ctx; ErtsAtomCacheMap acm; }; Binary* ctx_bin = erts_create_magic_binary(sizeof(struct exported_ctx), erts_dsend_context_dtor); struct exported_ctx* dst = ERTS_MAGIC_BIN_DATA(ctx_bin); Eterm* hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE); sys_memcpy(&dst->ctx, ctx, sizeof(ErtsSendContext)); ASSERT(ctx->dss.ctl == make_tuple(ctx->ctl_heap)); dst->ctx.dss.ctl = make_tuple(dst->ctx.ctl_heap); if (ctx->dss.acmp) { sys_memcpy(&dst->acm, ctx->dss.acmp, sizeof(ErtsAtomCacheMap)); dst->ctx.dss.acmp = &dst->acm; } return erts_mk_magic_ref(&hp, &MSO(p), ctx_bin); } /* * The erts_dsig_send_*() functions implemented below, sends asynchronous * distributed signals to other Erlang nodes. Before sending a distributed * signal, you need to prepare the operation by calling erts_dsig_prepare() * (see dist.h). * * Note that the distributed signal send operation is truly asynchronous, * and the signal is not guaranteed to reach the receiver if the connection * goes down before the signal has reached the receiver. */ /* ** Send a DOP_LINK link message */ int erts_dsig_send_link(ErtsDSigData *dsdp, Eterm local, Eterm remote) { DeclareTmpHeapNoproc(ctl_heap,4); Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_LINK), local, remote); int res; UseTmpHeapNoproc(4); res = dsig_send_ctl(dsdp, ctl, 0); UnUseTmpHeapNoproc(4); return res; } int erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote) { DeclareTmpHeapNoproc(ctl_heap,4); Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_UNLINK), local, remote); int res; UseTmpHeapNoproc(4); res = dsig_send_ctl(dsdp, ctl, 0); UnUseTmpHeapNoproc(4); return res; } /* A local process that's being monitored by a remote one exits. We send: {DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason}, which is rather sad as only the ref is needed, no pid's... */ int erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, Eterm ref, Eterm reason) { Eterm ctl; DeclareTmpHeapNoproc(ctl_heap,6); int res; UseTmpHeapNoproc(6); ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT), watched, watcher, ref, reason); #ifdef DEBUG erts_de_links_lock(dsdp->dep); ASSERT(!erts_lookup_monitor(dsdp->dep->monitors, ref)); erts_de_links_unlock(dsdp->dep); #endif res = dsig_send_ctl(dsdp, ctl, 1); UnUseTmpHeapNoproc(6); return res; } /* We want to monitor a process (named or unnamed) on another node, we send: {DOP_MONITOR_P, Local pid, Remote pid or name, Ref}, which is exactly what's needed on the other side... */ int erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, Eterm ref) { Eterm ctl; DeclareTmpHeapNoproc(ctl_heap,5); int res; UseTmpHeapNoproc(5); ctl = TUPLE4(&ctl_heap[0], make_small(DOP_MONITOR_P), watcher, watched, ref); res = dsig_send_ctl(dsdp, ctl, 0); UnUseTmpHeapNoproc(5); return res; } /* A local process monitoring a remote one wants to stop monitoring, either because of a demonitor bif call or because the local process died. We send {DOP_DEMONITOR_P, Local pid, Remote pid or name, ref}, which is once again rather redundant as only the ref will be needed on the other side... */ int erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, Eterm ref, int force) { Eterm ctl; DeclareTmpHeapNoproc(ctl_heap,5); int res; UseTmpHeapNoproc(5); ctl = TUPLE4(&ctl_heap[0], make_small(DOP_DEMONITOR_P), watcher, watched, ref); res = dsig_send_ctl(dsdp, ctl, force); UnUseTmpHeapNoproc(5); return res; } int erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) { Eterm ctl; Eterm token = NIL; Process *sender = ctx->dsd.proc; int res; #ifdef USE_VM_PROBES Sint tok_label = 0; Sint tok_lastcnt = 0; Sint tok_serial = 0; Uint msize = 0; DTRACE_CHARBUF(node_name, 64); DTRACE_CHARBUF(sender_name, 64); DTRACE_CHARBUF(receiver_name, 64); #endif if (have_seqtrace(SEQ_TRACE_TOKEN(sender))) { seq_trace_update_send(sender); token = SEQ_TRACE_TOKEN(sender); seq_trace_output(token, message, SEQ_TRACE_SEND, remote, sender); } #ifdef USE_VM_PROBES *node_name = *sender_name = *receiver_name = '\0'; if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) { erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)), "%T", ctx->dsd.dep->sysname); erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), "%T", sender->common.id); erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)), "%T", remote); msize = size_object(message); if (have_seqtrace(token)) { tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); } } #endif if (token != NIL) { Eterm el1, el2; if (ctx->dep->flags & DFLAG_SEND_SENDER) { el1 = make_small(DOP_SEND_SENDER_TT); el2 = sender->common.id; } else { el1 = make_small(DOP_SEND_TT); el2 = am_Empty; } ctl = TUPLE4(&ctx->ctl_heap[0], el1, el2, remote, token); } else { Eterm el1, el2; if (ctx->dep->flags & DFLAG_SEND_SENDER) { el1 = make_small(DOP_SEND_SENDER); el2 = sender->common.id; } else { el1 = make_small(DOP_SEND); el2 = am_Empty; } ctl = TUPLE3(&ctx->ctl_heap[0], el1, el2, remote); } DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); DTRACE7(message_send_remote, sender_name, node_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); ctx->dss.ctl = ctl; ctx->dss.msg = message; ctx->dss.force_busy = 0; res = erts_dsig_send(&ctx->dsd, &ctx->dss); return res; } int erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, ErtsSendContext* ctx) { Eterm ctl; Eterm token = NIL; Process *sender = ctx->dsd.proc; int res; #ifdef USE_VM_PROBES Sint tok_label = 0; Sint tok_lastcnt = 0; Sint tok_serial = 0; Uint32 msize = 0; DTRACE_CHARBUF(node_name, 64); DTRACE_CHARBUF(sender_name, 64); DTRACE_CHARBUF(receiver_name, 128); #endif if (have_seqtrace(SEQ_TRACE_TOKEN(sender))) { seq_trace_update_send(sender); token = SEQ_TRACE_TOKEN(sender); seq_trace_output(token, message, SEQ_TRACE_SEND, remote_name, sender); } #ifdef USE_VM_PROBES *node_name = *sender_name = *receiver_name = '\0'; if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) { erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)), "%T", ctx->dsd.dep->sysname); erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), "%T", sender->common.id); erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)), "{%T,%s}", remote_name, node_name); msize = size_object(message); if (have_seqtrace(token)) { tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); } } #endif if (token != NIL) ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_REG_SEND_TT), sender->common.id, am_Empty, remote_name, token); else ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_REG_SEND), sender->common.id, am_Empty, remote_name); DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); DTRACE7(message_send_remote, sender_name, node_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); ctx->dss.ctl = ctl; ctx->dss.msg = message; ctx->dss.force_busy = 0; res = erts_dsig_send(&ctx->dsd, &ctx->dss); return res; } /* local has died, deliver the exit signal to remote */ int erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason, Eterm token) { Eterm ctl; DeclareTmpHeapNoproc(ctl_heap,6); int res; #ifdef USE_VM_PROBES Process *sender = dsdp->proc; Sint tok_label = 0; Sint tok_lastcnt = 0; Sint tok_serial = 0; DTRACE_CHARBUF(node_name, 64); DTRACE_CHARBUF(sender_name, 64); DTRACE_CHARBUF(remote_name, 128); DTRACE_CHARBUF(reason_str, 128); #endif UseTmpHeapNoproc(6); if (have_seqtrace(token)) { seq_trace_update_send(dsdp->proc); seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local); ctl = TUPLE5(&ctl_heap[0], make_small(DOP_EXIT_TT), local, remote, token, reason); } else { ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); } #ifdef USE_VM_PROBES *node_name = *sender_name = *remote_name = '\0'; if (DTRACE_ENABLED(process_exit_signal_remote)) { erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)), "%T", dsdp->dep->sysname); erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), "%T", sender->common.id); erts_snprintf(remote_name, sizeof(DTRACE_CHARBUF_NAME(remote_name)), "{%T,%s}", remote, node_name); erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)), "%T", reason); if (have_seqtrace(token)) { tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); } } #endif DTRACE7(process_exit_signal_remote, sender_name, node_name, remote_name, reason_str, tok_label, tok_lastcnt, tok_serial); /* forced, i.e ignore busy */ res = dsig_send_ctl(dsdp, ctl, 1); UnUseTmpHeapNoproc(6); return res; } int erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason) { DeclareTmpHeapNoproc(ctl_heap,5); int res; Eterm ctl; UseTmpHeapNoproc(5); ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); /* forced, i.e ignore busy */ res = dsig_send_ctl(dsdp, ctl, 1); UnUseTmpHeapNoproc(5); return res; } int erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason) { DeclareTmpHeapNoproc(ctl_heap,5); int res; Eterm ctl; UseTmpHeapNoproc(5); ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT2), local, remote, reason); res = dsig_send_ctl(dsdp, ctl, 0); UnUseTmpHeapNoproc(5); return res; } int erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote) { DeclareTmpHeapNoproc(ctl_heap,4); int res; Eterm ctl; UseTmpHeapNoproc(4); ctl = TUPLE3(&ctl_heap[0], make_small(DOP_GROUP_LEADER), leader, remote); res = dsig_send_ctl(dsdp, ctl, 0); UnUseTmpHeapNoproc(4); return res; } #if defined(PURIFY) # define PURIFY_MSG(msg) \ purify_printf("%s, line %d: %s", __FILE__, __LINE__, msg) #elif defined(VALGRIND) #include #include #ifndef HAVE_VALGRIND_PRINTF_XML #define VALGRIND_PRINTF_XML VALGRIND_PRINTF #endif # define PURIFY_MSG(msg) \ do { \ char buf__[1]; size_t bufsz__ = sizeof(buf__); \ if (erts_sys_getenv_raw("VALGRIND_LOG_XML", buf__, &bufsz__) >= 0) { \ VALGRIND_PRINTF_XML("" \ "%s, line %d: %s\n", \ __FILE__, __LINE__, msg); \ } else { \ VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg); \ } \ } while (0) #else # define PURIFY_MSG(msg) #endif /* ** Input from distribution port. ** Input follows the distribution protocol v4.5 ** ** The protocol is a 4 byte header protocol ** the DOP_DATA is stripped by driver_output ** ** assert hlen == 0 !!! */ int erts_net_message(Port *prt, DistEntry *dep, byte *hbuf, ErlDrvSizeT hlen, byte *buf, ErlDrvSizeT len) { ErtsDistExternal ede; byte *t; Sint ctl_len; Eterm arg; Eterm from, to; Eterm watcher, watched; Eterm ref; Eterm *tuple; Eterm reason; Process* rp; DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE); Eterm* ctl = ctl_default; ErtsHeapFactory factory; Eterm* hp; Sint type; Eterm token; Eterm token_size; ErtsMonitor *mon; ErtsLink *lnk; Uint tuple_arity; int res; Uint32 connection_id; #ifdef ERTS_DIST_MSG_DBG ErlDrvSizeT orig_len = len; #endif UseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); ERTS_CHK_NO_PROC_LOCKS; ERTS_LC_ASSERT(!prt || erts_lc_is_port_locked(prt)); if (!erts_is_alive) { UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); return 0; } ASSERT(hlen == 0); if (len == 0) { /* HANDLE TICK !!! */ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); return 0; } #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, "<< "); bw(buf, len); #endif if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) t = buf; else { /* Skip PASS_THROUGH */ t = buf+1; len--; } res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache, &connection_id); switch (res) { case ERTS_PREP_DIST_EXT_CLOSED: return 0; /* Connection not alive; ignore signal... */ case ERTS_PREP_DIST_EXT_FAILED: #ifdef ERTS_DIST_MSG_DBG erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n"); bw(buf, orig_len); #endif goto data_error; case ERTS_PREP_DIST_EXT_SUCCESS: ctl_len = erts_decode_dist_ext_size(&ede); if (ctl_len < 0) { #ifdef ERTS_DIST_MSG_DBG erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); bw(buf, orig_len); #endif PURIFY_MSG("data error"); goto data_error; } break; default: ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()"); break; } if (ctl_len > DIST_CTL_DEFAULT_SIZE) { ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm)); } hp = ctl; erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF); arg = erts_decode_dist_ext(&factory, &ede); if (is_non_value(arg)) { #ifdef ERTS_DIST_MSG_DBG erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext(CTL) failed:\n"); bw(buf, orig_len); #endif PURIFY_MSG("data error"); goto decode_error; } ctl_len = t - buf; #ifdef ERTS_DIST_MSG_DBG erts_fprintf(stderr, "<<%s CTL: %T\n", len != orig_len ? "P" : " ", arg); #endif if (is_not_tuple(arg) || (tuple = tuple_val(arg), (tuple_arity = arityval(*tuple)) < 1) || is_not_small(tuple[1])) { goto invalid_message; } token_size = 0; token = NIL; switch (type = unsigned_val(tuple[1])) { case DOP_LINK: if (tuple_arity != 3) { goto invalid_message; } from = tuple[2]; to = tuple[3]; /* local proc to link to */ if (is_not_pid(from) || is_not_pid(to)) { goto invalid_message; } rp = erts_pid2proc_opt(NULL, 0, to, ERTS_PROC_LOCK_LINK, ERTS_P2P_FLG_ALLOW_OTHER_X); if (!rp) { /* This is tricky (we MUST force a distributed send) */ ErtsDSigData dsd; int code; code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_exit(&dsd, to, from, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); } break; } erts_de_links_lock(dep); res = erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, from); if (res < 0) { /* It was already there! Lets skip the rest... */ erts_de_links_unlock(dep); erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); break; } lnk = erts_add_or_lookup_link(&(dep->nlinks), LINK_PID, rp->common.id); erts_add_link(&(ERTS_LINK_ROOT(lnk)), LINK_PID, from); erts_de_links_unlock(dep); if (IS_TRACED_FL(rp, F_TRACE_PROCS)) trace_proc(NULL, 0, rp, am_getting_linked, from); erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); break; case DOP_UNLINK: { ErtsDistLinkData dld; if (tuple_arity != 3) { goto invalid_message; } from = tuple[2]; to = tuple[3]; if (is_not_pid(from) || is_not_pid(to)) { goto invalid_message; } rp = erts_pid2proc_opt(NULL, 0, to, ERTS_PROC_LOCK_LINK, ERTS_P2P_FLG_ALLOW_OTHER_X); if (!rp) break; lnk = erts_remove_link(&ERTS_P_LINKS(rp), from); if (IS_TRACED_FL(rp, F_TRACE_PROCS) && lnk != NULL) { trace_proc(NULL, 0, rp, am_getting_unlinked, from); } erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); erts_remove_dist_link(&dld, to, from, dep); erts_destroy_dist_link(&dld); if (lnk) erts_destroy_link(lnk); break; } case DOP_MONITOR_P: { /* A remote process wants to monitor us, we get: {DOP_MONITOR_P, Remote pid, local pid or name, ref} */ Eterm name; if (tuple_arity != 4) { goto invalid_message; } watcher = tuple[2]; watched = tuple[3]; /* local proc to monitor */ ref = tuple[4]; if (is_not_ref(ref)) { goto invalid_message; } if (is_atom(watched)) { name = watched; rp = erts_whereis_process(NULL, 0, watched, ERTS_PROC_LOCK_LINK, ERTS_P2P_FLG_ALLOW_OTHER_X); } else { name = NIL; rp = erts_pid2proc_opt(NULL, 0, watched, ERTS_PROC_LOCK_LINK, ERTS_P2P_FLG_ALLOW_OTHER_X); } if (!rp) { ErtsDSigData dsd; int code; code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); } } else { if (is_atom(watched)) watched = rp->common.id; erts_de_links_lock(dep); erts_add_monitor(&(dep->monitors), MON_ORIGIN, ref, watched, name); erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, ref, watcher, name); erts_de_links_unlock(dep); erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); } break; } case DOP_DEMONITOR_P: /* A remote node informs us that a local pid in no longer monitored We get {DOP_DEMONITOR_P, Remote pid, Local pid or name, ref}, We need only the ref of course */ if (tuple_arity != 4) { goto invalid_message; } /* watcher = tuple[2]; */ /* watched = tuple[3]; May be an atom in case of monitor name */ ref = tuple[4]; if(is_not_ref(ref)) { goto invalid_message; } erts_de_links_lock(dep); mon = erts_remove_monitor(&(dep->monitors),ref); erts_de_links_unlock(dep); /* ASSERT(mon != NULL); can happen in case of broken dist message */ if (mon == NULL) { break; } watched = mon->u.pid; erts_destroy_monitor(mon); rp = erts_pid2proc_opt(NULL, 0, watched, ERTS_PROC_LOCK_LINK, ERTS_P2P_FLG_ALLOW_OTHER_X); if (!rp) { break; } mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); ASSERT(mon != NULL); if (mon == NULL) { break; } erts_destroy_monitor(mon); break; case DOP_REG_SEND_TT: if (tuple_arity != 5) { goto invalid_message; } token_size = size_object(tuple[5]); /* Fall through ... */ case DOP_REG_SEND: /* {DOP_REG_SEND, From, Cookie, ToName} -- Message */ /* {DOP_REG_SEND_TT, From, Cookie, ToName, TraceToken} -- Message */ /* * There is intentionally no testing of the cookie (it is always '') * from R9B and onwards. */ if (type != DOP_REG_SEND_TT && tuple_arity != 4) { goto invalid_message; } #ifdef ERTS_DIST_MSG_DBG dist_msg_dbg(&ede, "MSG", buf, orig_len); #endif from = tuple[2]; to = tuple[4]; if (is_not_pid(from) || is_not_atom(to)){ goto invalid_message; } rp = erts_whereis_process(NULL, 0, to, 0, 0); if (rp) { Uint xsize = (type == DOP_REG_SEND ? 0 : ERTS_HEAP_FRAG_SIZE(token_size)); ErtsProcLocks locks = 0; ErtsDistExternal *ede_copy; ede_copy = erts_make_dist_ext_copy(&ede, xsize); if (type == DOP_REG_SEND) { token = NIL; } else { ErlHeapFragment *heap_frag; ErlOffHeap *ohp; ASSERT(xsize); heap_frag = erts_dist_ext_trailer(ede_copy); ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); hp = heap_frag->mem; ohp = &heap_frag->off_heap; token = tuple[5]; token = copy_struct(token, token_size, &hp, ohp); } erts_queue_dist_message(rp, locks, ede_copy, token, from); if (locks) erts_proc_unlock(rp, locks); } break; case DOP_SEND_SENDER_TT: { Uint xsize; case DOP_SEND_TT: if (tuple_arity != 4) { goto invalid_message; } token = tuple[4]; token_size = size_object(token); xsize = ERTS_HEAP_FRAG_SIZE(token_size); goto send_common; case DOP_SEND_SENDER: case DOP_SEND: token = NIL; xsize = 0; if (tuple_arity != 3) goto invalid_message; send_common: /* * If DOP_SEND_SENDER or DOP_SEND_SENDER_TT element 2 contains * the sender pid (i.e. DFLAG_SEND_SENDER is set); otherwise, * the atom '' (empty cookie). */ ASSERT((type == DOP_SEND_SENDER || type == DOP_SEND_SENDER_TT) ? (is_pid(tuple[2]) && (dep->flags & DFLAG_SEND_SENDER)) : tuple[2] == am_Empty); #ifdef ERTS_DIST_MSG_DBG dist_msg_dbg(&ede, "MSG", buf, orig_len); #endif to = tuple[3]; if (is_not_pid(to)) { goto invalid_message; } rp = erts_proc_lookup(to); if (rp) { ErtsProcLocks locks = 0; ErtsDistExternal *ede_copy; ede_copy = erts_make_dist_ext_copy(&ede, xsize); if (is_not_nil(token)) { ErlHeapFragment *heap_frag; ErlOffHeap *ohp; ASSERT(xsize); heap_frag = erts_dist_ext_trailer(ede_copy); ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); hp = heap_frag->mem; ohp = &heap_frag->off_heap; token = copy_struct(token, token_size, &hp, ohp); } erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty); if (locks) erts_proc_unlock(rp, locks); } break; } case DOP_MONITOR_P_EXIT: { /* We are monitoring a process on the remote node which dies, we get {DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */ DeclareTmpHeapNoproc(lhp,3); Eterm sysname; ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_MSG_SEND|ERTS_PROC_LOCK_LINK; if (tuple_arity != 5) { goto invalid_message; } /* watched = tuple[2]; */ /* remote proc which died */ /* watcher = tuple[3]; */ ref = tuple[4]; reason = tuple[5]; if(is_not_ref(ref)) { goto invalid_message; } erts_de_links_lock(dep); sysname = dep->sysname; mon = erts_remove_monitor(&(dep->monitors), ref); /* * If demonitor was performed at the same time as the * monitored process exits, monitoring side will have * removed info about monitor. In this case, do nothing * and everything will be as it should. */ erts_de_links_unlock(dep); if (mon == NULL) { break; } rp = erts_pid2proc(NULL, 0, mon->u.pid, rp_locks); erts_destroy_monitor(mon); if (rp == NULL) { break; } mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); if (mon == NULL) { erts_proc_unlock(rp, rp_locks); break; } UseTmpHeapNoproc(3); watched = (is_not_nil(mon->name) ? TUPLE2(&lhp[0], mon->name, sysname) : mon->u.pid); erts_queue_monitor_message(rp, &rp_locks, ref, am_process, watched, reason); erts_proc_unlock(rp, rp_locks); erts_destroy_monitor(mon); UnUseTmpHeapNoproc(3); break; } case DOP_EXIT_TT: case DOP_EXIT: { ErtsDistLinkData dld; ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND; /* 'from', which 'to' is linked to, died */ if (type == DOP_EXIT) { if (tuple_arity != 4) { goto invalid_message; } from = tuple[2]; to = tuple[3]; reason = tuple[4]; token = NIL; } else { if (tuple_arity != 5) { goto invalid_message; } from = tuple[2]; to = tuple[3]; token = tuple[4]; reason = tuple[5]; } if (is_not_pid(from) || is_not_internal_pid(to)) { goto invalid_message; } rp = erts_pid2proc(NULL, 0, to, rp_locks); if (!rp) lnk = NULL; else { lnk = erts_remove_link(&ERTS_P_LINKS(rp), from); /* If lnk == NULL, we have unlinked on this side, i.e. * ignore exit. */ if (lnk) { int xres; #if 0 /* Arndt: Maybe it should never be 'kill', but it can be, namely when a linked process does exit(kill). Until we know whether that is incorrect and what should happen instead, we leave the assertion out. */ ASSERT(reason != am_kill); /* should never be kill (killed) */ #endif xres = erts_send_exit_signal(NULL, from, rp, &rp_locks, reason, token, NULL, ERTS_XSIG_FLG_IGN_KILL); if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) { /* We didn't exit the process and it is traced */ if (rp_locks & ERTS_PROC_LOCKS_XSIG_SEND) { erts_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND); rp_locks &= ~ERTS_PROC_LOCKS_XSIG_SEND; } trace_proc(NULL, 0, rp, am_getting_unlinked, from); } } erts_proc_unlock(rp, rp_locks); } erts_remove_dist_link(&dld, to, from, dep); if (lnk) erts_destroy_link(lnk); erts_destroy_dist_link(&dld); break; } case DOP_EXIT2_TT: case DOP_EXIT2: { ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; /* 'from' is send an exit signal to 'to' */ if (type == DOP_EXIT2) { if (tuple_arity != 4) { goto invalid_message; } from = tuple[2]; to = tuple[3]; reason = tuple[4]; token = NIL; } else { if (tuple_arity != 5) { goto invalid_message; } from = tuple[2]; to = tuple[3]; token = tuple[4]; reason = tuple[5]; } if (is_not_pid(from) || is_not_internal_pid(to)) { goto invalid_message; } rp = erts_pid2proc(NULL, 0, to, rp_locks); if (rp) { (void) erts_send_exit_signal(NULL, from, rp, &rp_locks, reason, token, NULL, 0); erts_proc_unlock(rp, rp_locks); } break; } case DOP_GROUP_LEADER: if (tuple_arity != 3) { goto invalid_message; } from = tuple[2]; /* Group leader */ to = tuple[3]; /* new member */ if (is_not_pid(from) || is_not_pid(to)) { goto invalid_message; } rp = erts_pid2proc(NULL, 0, to, ERTS_PROC_LOCK_MAIN); if (!rp) break; rp->group_leader = STORE_NC_IN_PROC(rp, from); erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); break; default: goto invalid_message; } erts_factory_close(&factory); if (ctl != ctl_default) { erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); } UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); ERTS_CHK_NO_PROC_LOCKS; return 0; invalid_message: { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "Invalid distribution message: %.200T", arg); erts_send_error_to_logger_nogl(dsbufp); } decode_error: PURIFY_MSG("data error"); erts_factory_close(&factory); if (ctl != ctl_default) { erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); } data_error: UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); erts_kill_dist_connection(dep, connection_id); ERTS_CHK_NO_PROC_LOCKS; return -1; } static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy) { struct erts_dsig_send_context ctx; int ret; ctx.ctl = ctl; ctx.msg = THE_NON_VALUE; ctx.force_busy = force_busy; ctx.phase = ERTS_DSIG_SEND_PHASE_INIT; #ifdef DEBUG ctx.reds = 1; /* provoke assert below (no reduction count without msg) */ #endif ret = erts_dsig_send(dsdp, &ctx); ASSERT(ret != ERTS_DSIG_SEND_CONTINUE); return ret; } static ERTS_INLINE void notify_dist_data(Process *c_p, Eterm pid) { Process *rp; ErtsProcLocks rp_locks; ASSERT(erts_get_scheduler_data() && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data())); ASSERT(is_internal_pid(pid)); if (c_p && c_p->common.id == pid) { rp = c_p; rp_locks = ERTS_PROC_LOCK_MAIN; } else { rp = erts_proc_lookup(pid); rp_locks = 0; } if (rp) { ErtsMessage *mp = erts_alloc_message(0, NULL); erts_queue_message(rp, rp_locks, mp, am_dist_data, am_system); } } int erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) { int retval; Sint initial_reds = ctx->reds; Eterm cid; while (1) { switch (ctx->phase) { case ERTS_DSIG_SEND_PHASE_INIT: ctx->flags = dsdp->dep->flags; ctx->c_p = dsdp->proc; if (!ctx->c_p || dsdp->no_suspend) ctx->force_busy = 1; ERTS_LC_ASSERT(!ctx->c_p || (ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(ctx->c_p))); if (!erts_is_alive) return ERTS_DSIG_SEND_OK; if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) { ctx->acmp = erts_get_atom_cache_map(ctx->c_p); ctx->max_finalize_prepend = 0; } else { ctx->acmp = NULL; ctx->max_finalize_prepend = 3; } #ifdef ERTS_DIST_MSG_DBG erts_fprintf(stderr, ">> CTL: %T\n", ctx->ctl); if (is_value(ctx->msg)) erts_fprintf(stderr, " MSG: %T\n", ctx->msg); #endif ctx->data_size = ctx->max_finalize_prepend; erts_reset_atom_cache_map(ctx->acmp); erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size); if (is_non_value(ctx->msg)) { ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; break; } ctx->u.sc.wstack.wstart = NULL; ctx->u.sc.flags = ctx->flags; ctx->u.sc.level = 0; ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; case ERTS_DSIG_SEND_PHASE_MSG_SIZE: if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) { retval = ERTS_DSIG_SEND_CONTINUE; goto done; } ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; case ERTS_DSIG_SEND_PHASE_ALLOC: erts_finalize_atom_cache_map(ctx->acmp, ctx->flags); ctx->dhdr_ext_size = erts_encode_ext_dist_header_size(ctx->acmp); ctx->data_size += ctx->dhdr_ext_size; ctx->obuf = alloc_dist_obuf(ctx->data_size); ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size; /* Encode internal version of dist header */ ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp); /* Encode control message */ erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL); if (is_non_value(ctx->msg)) { ctx->obuf->msg_start = NULL; ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; break; } ctx->u.ec.flags = ctx->flags; ctx->u.ec.level = 0; ctx->u.ec.wstack.wstart = NULL; ctx->obuf->msg_start = ctx->obuf->ext_endp; ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) { retval = ERTS_DSIG_SEND_CONTINUE; goto done; } ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; case ERTS_DSIG_SEND_PHASE_FIN: { DistEntry *dep = dsdp->dep; int suspended = 0; int resume = 0; ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp); ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend); ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size); ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp; /* * Signal encoded; now verify that the connection still exists, * and if so enqueue the signal and schedule it for send. */ ctx->obuf->next = NULL; erts_de_rlock(dep); cid = dep->cid; if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED)) || dep->status & ERTS_DE_SFLG_EXITING || dep->connection_id != dsdp->connection_id) { /* Not the same connection as when we started; drop message... */ erts_de_runlock(dep); free_dist_obuf(ctx->obuf); } else { Sint qsize; erts_aint32_t qflgs; ErtsProcList *plp = NULL; Eterm notify_proc = NIL; Sint obsz = size_obuf(ctx->obuf); erts_mtx_lock(&dep->qlock); qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz); ASSERT(qsize >= obsz); qflgs = erts_atomic32_read_nob(&dep->qflgs); if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) { erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY); qflgs |= ERTS_DE_QFLG_BUSY; } if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) { /* Previously empty queue and info requested... */ qflgs = erts_atomic32_read_band_mb(&dep->qflgs, ~ERTS_DE_QFLG_REQ_INFO); if (qflgs & ERTS_DE_QFLG_REQ_INFO) { notify_proc = dep->cid; ASSERT(is_internal_pid(notify_proc)); } /* else: requester will send itself the message... */ qflgs &= ~ERTS_DE_QFLG_REQ_INFO; } if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) { erts_mtx_unlock(&dep->qlock); plp = erts_proclist_create(ctx->c_p); erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL); suspended = 1; erts_mtx_lock(&dep->qlock); } /* Enqueue obuf on dist entry */ if (dep->out_queue.last) dep->out_queue.last->next = ctx->obuf; else dep->out_queue.first = ctx->obuf; dep->out_queue.last = ctx->obuf; if (!ctx->force_busy) { qflgs = erts_atomic32_read_nob(&dep->qflgs); if (!(qflgs & ERTS_DE_QFLG_BUSY)) { if (suspended) resume = 1; /* was busy when we started, but isn't now */ #ifdef USE_VM_PROBES if (resume && DTRACE_ENABLED(dist_port_not_busy)) { DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", cid); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), "%T", dep->sysname); DTRACE3(dist_port_not_busy, erts_this_node_sysname, port_str, remote_str); } #endif } else { /* Enqueue suspended process on dist entry */ ASSERT(plp); erts_proclist_store_last(&dep->suspended, plp); } } erts_mtx_unlock(&dep->qlock); if (!(dep->status & ERTS_DE_SFLG_PENDING)) { if (is_internal_port(dep->cid)) erts_schedule_dist_command(NULL, dep); } else { notify_proc = NIL; } erts_de_runlock(dep); if (is_internal_pid(notify_proc)) notify_dist_data(ctx->c_p, notify_proc); if (resume) { erts_resume(ctx->c_p, ERTS_PROC_LOCK_MAIN); erts_proclist_destroy(plp); /* * Note that the calling process still have to yield as if it * suspended. If not, the calling process could later be * erroneously scheduled when it shouldn't be. */ } } ctx->obuf = NULL; if (suspended) { #ifdef USE_VM_PROBES if (!resume && DTRACE_ENABLED(dist_port_busy)) { DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); DTRACE_CHARBUF(pid_str, 16); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", cid); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), "%T", dep->sysname); erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", ctx->c_p->common.id); DTRACE4(dist_port_busy, erts_this_node_sysname, port_str, remote_str, pid_str); } #endif if (!resume && erts_system_monitor_flags.busy_dist_port) monitor_generic(ctx->c_p, am_busy_dist_port, cid); retval = ERTS_DSIG_SEND_YIELD; } else { retval = ERTS_DSIG_SEND_OK; } goto done; } default: erts_exit(ERTS_ABORT_EXIT, "dsig_send invalid phase (%d)\n", (int)ctx->phase); } } done: if (ctx->msg && ctx->c_p) { BUMP_REDS(ctx->c_p, (initial_reds - ctx->reds) / TERM_TO_BINARY_LOOP_FACTOR); } return retval; } static Uint dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; ErlDrvSizeT size; char *bufp; ERTS_CHK_NO_PROC_LOCKS; ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); if (!obuf) { size = 0; bufp = NULL; } else { size = obuf->ext_endp - obuf->extp; bufp = (char*) obuf->extp; } #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_output)) { DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), "%T", prt->dist_entry->sysname); DTRACE4(dist_output, erts_this_node_sysname, port_str, remote_str, size); } #endif prt->caller = NIL; fpe_was_unmasked = erts_block_fpe(); (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, bufp, size); erts_unblock_fpe(fpe_was_unmasked); return size; } static Uint dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; ErlDrvSizeT size; SysIOVec iov[2]; ErlDrvBinary* bv[2]; ErlIOVec eiov; ERTS_CHK_NO_PROC_LOCKS; ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); iov[0].iov_base = NULL; iov[0].iov_len = 0; bv[0] = NULL; if (!obuf) { size = 0; eiov.vsize = 1; } else { size = obuf->ext_endp - obuf->extp; eiov.vsize = 2; iov[1].iov_base = obuf->extp; iov[1].iov_len = size; bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); } eiov.size = size; eiov.iov = iov; eiov.binv = bv; if (size > (Uint) INT_MAX) erts_exit(ERTS_DUMP_EXIT, "Absurdly large distribution output data buffer " "(%beu bytes) passed.\n", size); ASSERT(prt->drv_ptr->outputv); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_outputv)) { DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), "%T", prt->dist_entry->sysname); DTRACE4(dist_outputv, erts_this_node_sysname, port_str, remote_str, size); } #endif prt->caller = NIL; fpe_was_unmasked = erts_block_fpe(); (*prt->drv_ptr->outputv)((ErlDrvData) prt->drv_data, &eiov); erts_unblock_fpe(fpe_was_unmasked); return size; } #if defined(ARCH_64) #define ERTS_PORT_REDS_MASK__ 0x003fffffffffffffL #elif defined(ARCH_32) #define ERTS_PORT_REDS_MASK__ 0x003fffff #else # error "Ohh come on ... !?!" #endif #define ERTS_PORT_REDS_DIST_CMD_START 5 #define ERTS_PORT_REDS_DIST_CMD_FINALIZE 3 #define ERTS_PORT_REDS_DIST_CMD_EXIT 200 #define ERTS_PORT_REDS_DIST_CMD_RESUMED 5 #define ERTS_PORT_REDS_DIST_CMD_DATA(SZ) \ ((SZ) < (1 << 10) \ ? ((Sint) 1) \ : ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__))) int erts_dist_command(Port *prt, int reds_limit) { Sint reds = ERTS_PORT_REDS_DIST_CMD_START; Uint32 status; Uint32 flags; Sint qsize, obufsize = 0; ErtsDistOutputQueue oq, foq; DistEntry *dep = prt->dist_entry; Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); erts_aint32_t sched_flags; ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); erts_atomic_set_mb(&dep->dist_cmd_scheduled, 0); erts_de_rlock(dep); flags = dep->flags; status = dep->status; send = dep->send; erts_de_runlock(dep); if (status & ERTS_DE_SFLG_EXITING) { erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1); return reds + ERTS_PORT_REDS_DIST_CMD_EXIT; } ASSERT(send); /* * We need to remove both out queues from the * dist entry while passing it to port command; * otherwise, port command will free the buffers * in the queues on failure and we'll end up with * a mess. */ erts_mtx_lock(&dep->qlock); oq.first = dep->out_queue.first; oq.last = dep->out_queue.last; dep->out_queue.first = NULL; dep->out_queue.last = NULL; erts_mtx_unlock(&dep->qlock); foq.first = dep->finalized_out_queue.first; foq.last = dep->finalized_out_queue.last; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; sched_flags = erts_atomic32_read_nob(&prt->sched.flags); if (reds > reds_limit) goto preempted; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { int preempt = 0; do { Uint size; ErtsDistOutputBuf *fob; size = (*send)(prt, foq.first); erts_atomic64_inc_nob(&dep->out); esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, ">> "); bw(foq.first->extp, size); #endif reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); fob = foq.first; obufsize += size_obuf(fob); foq.first = foq.first->next; free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) break; } while (foq.first && !preempt); if (!foq.first) foq.last = NULL; if (preempt) goto preempted; } if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) { if (oq.first) { ErtsDistOutputBuf *ob; int preempt; finalize_only: preempt = 0; ob = oq.first; ASSERT(ob); do { erts_encode_ext_dist_header_finalize(ob, dep->cache, flags); ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; preempt = reds > reds_limit; if (preempt) break; ob = ob->next; } while (ob); /* * At least one buffer was finalized; if we got preempted, * ob points to the last buffer that we finalized. */ if (foq.last) foq.last->next = oq.first; else foq.first = oq.first; if (!preempt) { /* All buffers finalized */ foq.last = oq.last; oq.first = oq.last = NULL; } else { /* Not all buffers finalized; split oq. */ foq.last = ob; oq.first = ob->next; if (oq.first) ob->next = NULL; else oq.last = NULL; } if (preempt) goto preempted; } } else { int de_busy; int preempt = 0; while (oq.first && !preempt) { ErtsDistOutputBuf *fob; Uint size; erts_encode_ext_dist_header_finalize(oq.first, dep->cache, flags); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; ASSERT(&oq.first->data[0] <= oq.first->extp && oq.first->extp < oq.first->ext_endp); size = (*send)(prt, oq.first); erts_atomic64_inc_nob(&dep->out); esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); #endif reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); fob = oq.first; obufsize += size_obuf(fob); oq.first = oq.first->next; free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) goto finalize_only; } ASSERT(!oq.first || preempt); /* * Preempt if not all buffers have been handled. */ if (preempt && oq.first) goto preempted; #ifdef DEBUG oq.last = NULL; #endif ASSERT(!oq.first); ASSERT(!foq.first && !foq.last); /* * Everything that was buffered when we started have now been * written to the port. If port isn't busy but dist entry is * and we havn't got too muched queued on dist entry, set * dist entry in a non-busy state and resume suspended * processes. */ erts_mtx_lock(&dep->qlock); de_busy = !!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY); qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) -obufsize); ASSERT(qsize >= 0); obufsize = 0; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && de_busy && qsize < erts_dist_buf_busy_limit) { ErtsProcList *suspendees; int resumed; suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); erts_mtx_unlock(&dep->qlock); resumed = erts_resume_processes(suspendees); reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } else erts_mtx_unlock(&dep->qlock); } ASSERT(!oq.first && !oq.last); done: if (obufsize != 0) { ASSERT(obufsize > 0); erts_mtx_lock(&dep->qlock); #ifdef DEBUG qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) -obufsize); ASSERT(qsize >= 0); #else erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); #endif erts_mtx_unlock(&dep->qlock); } ASSERT(foq.first || !foq.last); ASSERT(!foq.first || foq.last); ASSERT(!dep->finalized_out_queue.first); ASSERT(!dep->finalized_out_queue.last); if (foq.first) { dep->finalized_out_queue.first = foq.first; dep->finalized_out_queue.last = foq.last; } /* Avoid wrapping reduction counter... */ if (reds > INT_MAX/2) reds = INT_MAX/2; return reds; preempted: /* * Here we assume that state has been read * since last call to driver. */ ASSERT(oq.first || !oq.last); ASSERT(!oq.first || oq.last); if (sched_flags & ERTS_PTS_FLG_EXIT) { /* * Port died during port command; clean up 'oq' * and 'foq'. Things buffered in dist entry after * we begun processing the queues have already been * cleaned up when port terminated. */ if (oq.first) oq.last->next = foq.first; else oq.first = foq.first; while (oq.first) { ErtsDistOutputBuf *fob = oq.first; oq.first = oq.first->next; obufsize += size_obuf(fob); free_dist_obuf(fob); } foq.first = NULL; foq.last = NULL; /* SVERK Hmmm.... #ifdef DEBUG erts_mtx_lock(&dep->qlock); ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); erts_mtx_unlock(&dep->qlock); #endif */ } else { if (oq.first) { /* * Unhandle buffers need to be put back first * in out_queue. */ erts_mtx_lock(&dep->qlock); erts_atomic_add_nob(&dep->qsize, -obufsize); obufsize = 0; oq.last->next = dep->out_queue.first; dep->out_queue.first = oq.first; if (!dep->out_queue.last) dep->out_queue.last = oq.last; erts_mtx_unlock(&dep->qlock); } erts_schedule_dist_command(prt, NULL); } goto done; } #if 0 int dist_data_finalize(Process *c_p, int reds_limit) { int reds = 5; DistEntry *dep = ; ErtsDistOutputQueue oq, foq; ErtsDistOutputBuf *ob; int preempt; erts_mtx_lock(&dep->qlock); flags = dep->flags; oq.first = dep->out_queue.first; oq.last = dep->out_queue.last; dep->out_queue.first = NULL; dep->out_queue.last = NULL; erts_mtx_unlock(&dep->qlock); if (!oq.first) { ASSERT(!oq.last); oq.first = dep->tmp_out_queue.first; oq.last = dep->tmp_out_queue.last; } else { ErtsDistOutputBuf *f, *l; ASSERT(oq.last); if (dep->tmp_out_queue.last) { dep->tmp_out_queue.last->next = oq.first; oq.first = dep->tmp_out_queue.first; } } if (!oq.first) { /* Nothing to do... */ ASSERT(!oq.last); return reds; } foq.first = dep->finalized_out_queue.first; foq.last = dep->finalized_out_queue.last; preempt = 0; ob = oq.first; ASSERT(ob); do { ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, dep->cache, flags); if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' needed */ ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; preempt = reds > reds_limit; if (preempt) break; ob = ob->next; } while (ob); /* * At least one buffer was finalized; if we got preempted, * ob points to the last buffer that we finalized. */ if (foq.last) foq.last->next = oq.first; else foq.first = oq.first; if (!preempt) { /* All buffers finalized */ foq.last = oq.last; oq.first = oq.last = NULL; } else { /* Not all buffers finalized; split oq. */ foq.last = ob; oq.first = ob->next; if (oq.first) ob->next = NULL; else oq.last = NULL; } dep->finalized_out_queue.first = foq.first; dep->finalized_out_queue.last = foq.last; dep->tmp_out_queue.first = oq.first; dep->tmp_out_queue.last = oq.last; return reds; } #endif BIF_RETTYPE dist_ctrl_get_data_notification_1(BIF_ALIST_1) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); erts_aint32_t qflgs; erts_aint_t qsize; Eterm receiver = NIL; if (!dep) BIF_ERROR(BIF_P, EXC_NOTSUP); if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) BIF_ERROR(BIF_P, BADARG); /* * Caller is the only one that can consume from this queue * and the only one that can set the req-info flag... */ erts_de_rlock(dep); ASSERT(dep->cid == BIF_P->common.id); qflgs = erts_atomic32_read_acqb(&dep->qflgs); if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) { qsize = erts_atomic_read_acqb(&dep->qsize); ASSERT(qsize >= 0); if (qsize > 0) receiver = BIF_P->common.id; /* Notify ourselves... */ else { /* Empty queue; set req-info flag... */ qflgs = erts_atomic32_read_bor_mb(&dep->qflgs, ERTS_DE_QFLG_REQ_INFO); qsize = erts_atomic_read_acqb(&dep->qsize); ASSERT(qsize >= 0); if (qsize > 0) { qflgs = erts_atomic32_read_band_mb(&dep->qflgs, ~ERTS_DE_QFLG_REQ_INFO); if (qflgs & ERTS_DE_QFLG_REQ_INFO) receiver = BIF_P->common.id; /* Notify ourselves... */ /* else: someone else will notify us... */ } /* else: still empty queue... */ } } /* else: Already requested... */ erts_de_runlock(dep); if (is_internal_pid(receiver)) notify_dist_data(BIF_P, receiver); BIF_RET(am_ok); } BIF_RETTYPE dist_ctrl_put_data_2(BIF_ALIST_2) { DistEntry *dep; ErlDrvSizeT size; Eterm input_handler; if (is_binary(BIF_ARG_2)) size = binary_size(BIF_ARG_2); else if (is_nil(BIF_ARG_2)) size = 0; else if (is_list(BIF_ARG_2)) BIF_TRAP2(dist_ctrl_put_data_trap, BIF_P, BIF_ARG_1, BIF_ARG_2); else BIF_ERROR(BIF_P, BADARG); dep = erts_dhandle_to_dist_entry(BIF_ARG_1); if (!dep) BIF_ERROR(BIF_P, BADARG); input_handler = (Eterm) erts_atomic_read_nob(&dep->input_handler); if (input_handler != BIF_P->common.id) BIF_ERROR(BIF_P, EXC_NOTSUP); erts_atomic64_inc_nob(&dep->in); if (size != 0) { byte *data, *temp_alloc = NULL; data = (byte *) erts_get_aligned_binary_bytes(BIF_ARG_2, &temp_alloc); if (!data) BIF_ERROR(BIF_P, BADARG); erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); (void) erts_net_message(NULL, dep, NULL, 0, data, size); /* * We ignore any decode failures. On fatal failures the * connection will be taken down by killing the * distribution channel controller... */ erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); BUMP_REDS(BIF_P, 5); erts_free_aligned_binary_bytes(temp_alloc); } BIF_RET(am_ok); } BIF_RETTYPE dist_get_stat_1(BIF_ALIST_1) { Sint64 read, write, pend; Eterm res, *hp, **hpp; Uint sz, *szp; DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1); if (!dep) BIF_ERROR(BIF_P, BADARG); erts_de_rlock(dep); read = (Sint64) erts_atomic64_read_nob(&dep->in); write = (Sint64) erts_atomic64_read_nob(&dep->out); pend = (Sint64) erts_atomic_read_nob(&dep->qsize); erts_de_runlock(dep); sz = 0; szp = &sz; hpp = NULL; while (1) { res = erts_bld_tuple(hpp, szp, 4, am_ok, erts_bld_sint64(hpp, szp, read), erts_bld_sint64(hpp, szp, write), pend ? am_true : am_false); if (hpp) break; hp = HAlloc(BIF_P, sz); hpp = &hp; szp = NULL; } BIF_RET(res); } BIF_RETTYPE dist_ctrl_input_handler_2(BIF_ALIST_2) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); if (!dep) BIF_ERROR(BIF_P, EXC_NOTSUP); if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) BIF_ERROR(BIF_P, BADARG); if (is_not_internal_pid(BIF_ARG_2)) BIF_ERROR(BIF_P, BADARG); erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) BIF_ARG_2); BIF_RET(am_ok); } BIF_RETTYPE dist_ctrl_get_data_1(BIF_ALIST_1) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); int reds = 1; ErtsDistOutputBuf *obuf; Eterm *hp; ProcBin *pb; erts_aint_t qsize; if (!dep) BIF_ERROR(BIF_P, EXC_NOTSUP); if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) BIF_ERROR(BIF_P, BADARG); erts_de_rlock(dep); if (dep->status & ERTS_DE_SFLG_EXITING) goto return_none; ASSERT(dep->cid == BIF_P->common.id); #if 0 if (dep->finalized_out_queue.first) { obuf = dep->finalized_out_queue.first; dep->finalized_out_queue.first = obuf->next; if (!obuf->next) dep->finalized_out_queue.last = NULL; } else #endif { if (!dep->tmp_out_queue.first) { ASSERT(!dep->tmp_out_queue.last); qsize = erts_atomic_read_acqb(&dep->qsize); if (qsize > 0) { erts_mtx_lock(&dep->qlock); dep->tmp_out_queue.first = dep->out_queue.first; dep->tmp_out_queue.last = dep->out_queue.last; dep->out_queue.first = NULL; dep->out_queue.last = NULL; erts_mtx_unlock(&dep->qlock); } } if (!dep->tmp_out_queue.first) { ASSERT(!dep->tmp_out_queue.last); return_none: erts_de_runlock(dep); BIF_RET(am_none); } else { obuf = dep->tmp_out_queue.first; dep->tmp_out_queue.first = obuf->next; if (!obuf->next) dep->tmp_out_queue.last = NULL; } obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp, dep->cache, dep->flags); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */ ASSERT(&obuf->data[0] <= obuf->extp && obuf->extp < obuf->ext_endp); } erts_atomic64_inc_nob(&dep->out); erts_de_runlock(dep); hp = HAlloc(BIF_P, PROC_BIN_SIZE); pb = (ProcBin *) (char *) hp; pb->thing_word = HEADER_PROC_BIN; pb->size = obuf->ext_endp - obuf->extp; pb->next = MSO(BIF_P).first; MSO(BIF_P).first = (struct erl_off_heap_header*) pb; pb->val = ErtsDistOutputBuf2Binary(obuf); pb->bytes = (byte*) obuf->extp; pb->flags = 0; qsize = erts_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf)); ASSERT(qsize >= 0); if (qsize < erts_dist_buf_busy_limit/2 && (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) { ErtsProcList *resume_procs = NULL; erts_mtx_lock(&dep->qlock); resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); erts_mtx_unlock(&dep->qlock); if (resume_procs) { int resumed = erts_resume_processes(resume_procs); reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } } BIF_RET2(make_binary(pb), reds); } void erts_dist_port_not_busy(Port *prt) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_port_not_busy)) { DTRACE_CHARBUF(port_str, 64); DTRACE_CHARBUF(remote_str, 64); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id); erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)), "%T", prt->dist_entry->sysname); DTRACE3(dist_port_not_busy, erts_this_node_sysname, port_str, remote_str); } #endif erts_schedule_dist_command(prt, NULL); } void erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) { erts_de_rwlock(dep); if (connection_id == dep->connection_id && !(dep->status & ERTS_DE_SFLG_EXITING)) { dep->status |= ERTS_DE_SFLG_EXITING; erts_mtx_lock(&dep->qlock); ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); erts_mtx_unlock(&dep->qlock); if (is_internal_port(dep->cid)) erts_schedule_dist_command(NULL, dep); else if (is_internal_pid(dep->cid)) schedule_kill_dist_ctrl_proc(dep->cid); } erts_de_rwunlock(dep); } struct print_to_data { fmtfn_t to; void *arg; }; static void doit_print_monitor_info(ErtsMonitor *mon, void *vptdp) { fmtfn_t to = ((struct print_to_data *) vptdp)->to; void *arg = ((struct print_to_data *) vptdp)->arg; Process *rp; ErtsMonitor *rmon; rp = erts_proc_lookup(mon->u.pid); if (!rp || (rmon = erts_lookup_monitor(ERTS_P_MONITORS(rp), mon->ref)) == NULL) { erts_print(to, arg, "Warning, stray monitor for: %T\n", mon->u.pid); } else if (mon->type == MON_ORIGIN) { /* Local pid is being monitored */ erts_print(to, arg, "Remotely monitored by: %T %T\n", mon->u.pid, rmon->u.pid); } else { erts_print(to, arg, "Remote monitoring: %T ", mon->u.pid); if (is_not_atom(rmon->u.pid)) erts_print(to, arg, "%T\n", rmon->u.pid); else erts_print(to, arg, "{%T, %T}\n", rmon->name, rmon->u.pid); /* which in this case is the remote system name... */ } } static void print_monitor_info(fmtfn_t to, void *arg, ErtsMonitor *mon) { struct print_to_data ptd = {to, arg}; erts_doforall_monitors(mon,&doit_print_monitor_info,&ptd); } typedef struct { struct print_to_data *ptdp; Eterm from; } PrintLinkContext; static void doit_print_link_info2(ErtsLink *lnk, void *vpplc) { PrintLinkContext *pplc = (PrintLinkContext *) vpplc; erts_print(pplc->ptdp->to, pplc->ptdp->arg, "Remote link: %T %T\n", pplc->from, lnk->pid); } static void doit_print_link_info(ErtsLink *lnk, void *vptdp) { if (is_internal_pid(lnk->pid) && erts_proc_lookup(lnk->pid)) { PrintLinkContext plc = {(struct print_to_data *) vptdp, lnk->pid}; erts_doforall_links(ERTS_LINK_ROOT(lnk), &doit_print_link_info2, &plc); } } static void print_link_info(fmtfn_t to, void *arg, ErtsLink *lnk) { struct print_to_data ptd = {to, arg}; erts_doforall_links(lnk, &doit_print_link_info, (void *) &ptd); } typedef struct { struct print_to_data ptd; Eterm sysname; } PrintNodeLinkContext; static void doit_print_nodelink_info(ErtsLink *lnk, void *vpcontext) { PrintNodeLinkContext *pcontext = vpcontext; if (is_internal_pid(lnk->pid) && erts_proc_lookup(lnk->pid)) erts_print(pcontext->ptd.to, pcontext->ptd.arg, "Remote monitoring: %T %T\n", lnk->pid, pcontext->sysname); } static void print_nodelink_info(fmtfn_t to, void *arg, ErtsLink *lnk, Eterm sysname) { PrintNodeLinkContext context = {{to, arg}, sysname}; erts_doforall_links(lnk, &doit_print_nodelink_info, &context); } static int info_dist_entry(fmtfn_t to, void *arg, DistEntry *dep, int visible, int connected) { if (visible && connected) { erts_print(to, arg, "=visible_node:"); } else if (connected) { erts_print(to, arg, "=hidden_node:"); } else { erts_print(to, arg, "=not_connected:"); } erts_print(to, arg, "%d\n", dist_entry_channel_no(dep)); if(connected && is_nil(dep->cid)) { erts_print(to, arg, "Error: Not connected node still registered as connected:%T\n", dep->sysname); return 0; } if(!connected && is_not_nil(dep->cid)) { erts_print(to, arg, "Error: Connected node not registered as connected:%T\n", dep->sysname); return 0; } erts_print(to, arg, "Name: %T", dep->sysname); erts_print(to, arg, "\n"); if (!connected && is_nil(dep->cid)) { if (dep->nlinks) { erts_print(to, arg, "Error: Got links to not connected node:%T\n", dep->sysname); } return 0; } erts_print(to, arg, "Controller: %T\n", dep->cid, to); erts_print_node_info(to, arg, dep->sysname, NULL, NULL); print_monitor_info(to, arg, dep->monitors); print_link_info(to, arg, dep->nlinks); print_nodelink_info(to, arg, dep->node_links, dep->sysname); return 0; } int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ { DistEntry *dep; erts_print(to, arg, "=node:%T\n", erts_this_dist_entry->sysname); if (erts_this_node->sysname == am_Noname) { erts_print(to, arg, "=no_distribution\n"); return(0); } #if 0 if (!erts_visible_dist_entries && !erts_hidden_dist_entries) erts_print(to, arg, "Alive but not holding any connections \n"); #endif for(dep = erts_visible_dist_entries; dep; dep = dep->next) { info_dist_entry(to, arg, dep, 1, 1); } for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { info_dist_entry(to, arg, dep, 0, 1); } for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { info_dist_entry(to, arg, dep, 0, 0); } } return(0); } /**************************************************************************** DISTRIBUTION BIFS: setnode/2 -- start distribution setnode/3 -- set node controller node/1 -- return objects node name node/0 -- return this node name nodes/0 -- return a list of all (non hidden) nodes is_alive -- return true if distribution is running else false monitor_node -- turn on/off node monitoring node controller only: dist_link/2 -- link a remote process to a local dist_unlink/2 -- unlink a remote from a local ****************************************************************************/ /********************************************************************** ** Set the node name of current node fail if node already is set. ** setnode(name@host, Creation) ***********************************************************************/ BIF_RETTYPE setnode_2(BIF_ALIST_2) { Process *net_kernel; Uint creation; /* valid creation ? */ if(!term_to_Uint(BIF_ARG_2, &creation)) goto error; if(creation > 3) goto error; /* valid node name ? */ if (!is_node_name_atom(BIF_ARG_1)) goto error; if (BIF_ARG_1 == am_Noname) /* cant use this name !! */ goto error; if (erts_is_alive) /* must not be alive! */ goto error; /* Check that all trap functions are defined !! */ if (dmonitor_node_trap->addressv[0] == NULL || dmonitor_p_trap->addressv[0] == NULL) { goto error; } net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN, am_net_kernel, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS, 0); if (!net_kernel || ERTS_PROC_GET_DIST_ENTRY(net_kernel)) goto error; /* By setting F_DISTRIBUTION on net_kernel, * erts_do_net_exits will be called when net_kernel is terminated !! */ net_kernel->flags |= F_DISTRIBUTION; erts_proc_unlock(net_kernel, (ERTS_PROC_LOCK_STATUS | ((net_kernel != BIF_P) ? ERTS_PROC_LOCK_MAIN : 0))); #ifdef DEBUG erts_rwmtx_rlock(&erts_dist_table_rwmtx); ASSERT(!erts_visible_dist_entries && !erts_hidden_dist_entries); erts_rwmtx_runlock(&erts_dist_table_rwmtx); #endif erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); erts_thr_progress_block(); inc_no_nodes(); erts_set_this_node(BIF_ARG_1, (Uint32) creation); erts_is_alive = 1; send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL); erts_thr_progress_unblock(); erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); /* * Note erts_this_dist_entry is changed by erts_set_this_node(), * so we *need* to use the new one after erts_set_this_node() * is called. */ erts_ref_dist_entry(erts_this_dist_entry); ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry); BIF_RET(am_true); error: BIF_ERROR(BIF_P, BADARG); } /********************************************************************** ** Allocate a dist entry, set node name install the connection handler ** setnode_3({name@host, Creation}, Cid, {Type, Version, Initial, IC, OC}) ** Type = flag field, where the flags are specified in dist.h ** Version = distribution version, >= 1 ** IC = in_cookie (ignored) ** OC = out_cookie (ignored) ** ** Note that in distribution protocols above 1, the Initial parameter ** is always NIL and the cookies are always the atom '', cookies are not ** sent in the distribution messages but are only used in ** the handshake. ** ***********************************************************************/ BIF_RETTYPE setnode_3(BIF_ALIST_3) { BIF_RETTYPE ret; Uint flags; unsigned long version; Eterm ic, oc; Eterm *tp; DistEntry *dep = NULL; ErtsProcLocks proc_unlock = 0; Process *proc; Port *pp = NULL; Eterm notify_proc; erts_aint32_t qflgs; /* * Check and pick out arguments */ if (!is_node_name_atom(BIF_ARG_1) || !(is_internal_port(BIF_ARG_2) || is_internal_pid(BIF_ARG_2)) || (erts_this_node->sysname == am_Noname)) { goto badarg; } if (!is_tuple(BIF_ARG_3)) goto badarg; tp = tuple_val(BIF_ARG_3); if (*tp++ != make_arityval(4)) goto badarg; if (!is_small(*tp)) goto badarg; flags = unsigned_val(*tp++); if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0) goto badarg; ic = *(++tp); oc = *(++tp); if (!is_atom(ic) || !is_atom(oc)) goto badarg; if (~flags & DFLAG_DIST_MANDATORY) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "%T", BIF_P->common.id); if (BIF_P->common.u.alive.reg) erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name); erts_dsprintf(dsbufp, " attempted to enable connection to node %T " "which does not support all mandatory capabilities.\n", BIF_ARG_1); erts_send_error_to_logger(BIF_P->group_leader, dsbufp); goto badarg; } /* * Arguments seem to be in order. */ /* get dist_entry */ dep = erts_find_or_insert_dist_entry(BIF_ARG_1); if (dep == erts_this_dist_entry) goto badarg; else if (!dep) goto system_limit; /* Should never happen!!! */ if (is_internal_pid(BIF_ARG_2)) { if (BIF_P->common.id == BIF_ARG_2) { proc_unlock = 0; proc = BIF_P; } else { proc_unlock = ERTS_PROC_LOCK_MAIN; proc = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN, BIF_ARG_2, proc_unlock); } erts_de_rwlock(dep); if (!proc) goto badarg; else if (proc == ERTS_PROC_LOCK_BUSY) { proc_unlock = 0; goto yield; } erts_proc_lock(proc, ERTS_PROC_LOCK_STATUS); proc_unlock |= ERTS_PROC_LOCK_STATUS; if (ERTS_PROC_GET_DIST_ENTRY(proc)) { if (dep == ERTS_PROC_GET_DIST_ENTRY(proc) && (proc->flags & F_DISTRIBUTION) && dep->cid == BIF_ARG_2) { ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); goto done; } goto badarg; } if (is_not_nil(dep->cid)) goto badarg; proc->flags |= F_DISTRIBUTION; ERTS_PROC_SET_DIST_ENTRY(proc, dep); proc_unlock &= ~ERTS_PROC_LOCK_STATUS; erts_proc_unlock(proc, ERTS_PROC_LOCK_STATUS); dep->send = NULL; /* Only for distr ports... */ } else { pp = erts_id2port_sflgs(BIF_ARG_2, BIF_P, ERTS_PROC_LOCK_MAIN, ERTS_PORT_SFLGS_INVALID_LOOKUP); erts_de_rwlock(dep); if (!pp || (erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_EXITING)) goto badarg; if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) goto badarg; if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) { ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); goto done; /* Already set */ } if (dep->status & ERTS_DE_SFLG_EXITING) { /* Suspend on dist entry waiting for the exit to finish */ ErtsProcList *plp = erts_proclist_create(BIF_P); plp->next = NULL; erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); erts_mtx_lock(&dep->qlock); erts_proclist_store_last(&dep->suspended, plp); erts_mtx_unlock(&dep->qlock); goto yield; } ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); if (pp->dist_entry || is_not_nil(dep->cid)) goto badarg; erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); pp->dist_entry = dep; ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); dep->send = (pp->drv_ptr->outputv ? dist_port_commandv : dist_port_command); ASSERT(dep->send); /* * Dist-ports do not use the "busy port message queue" functionality, but * instead use "busy dist entry" functionality. */ { ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); } } dep->version = version; dep->creation = 0; #ifdef DEBUG ASSERT(erts_atomic_read_nob(&dep->qsize) == 0 || (dep->status & ERTS_DE_SFLG_PENDING)); #endif if (flags & DFLAG_DIST_HDR_ATOM_CACHE) create_cache(dep); erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); notify_proc = NIL; if (erts_atomic_read_nob(&dep->qsize)) { if (is_internal_port(dep->cid)) { erts_schedule_dist_command(NULL, dep); } else { qflgs = erts_atomic32_read_nob(&dep->qflgs); if (qflgs & ERTS_DE_QFLG_REQ_INFO) { qflgs = erts_atomic32_read_band_mb(&dep->qflgs, ~ERTS_DE_QFLG_REQ_INFO); if (qflgs & ERTS_DE_QFLG_REQ_INFO) { notify_proc = dep->cid; ASSERT(is_internal_pid(notify_proc)); } } } } erts_de_rwunlock(dep); if (is_internal_pid(notify_proc)) notify_dist_data(BIF_P, notify_proc); ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); dep = NULL; /* inc of refc transferred to port (dist_entry field) */ inc_no_nodes(); send_nodes_mon_msgs(BIF_P, am_nodeup, BIF_ARG_1, flags & DFLAG_PUBLISHED ? am_visible : am_hidden, NIL); done: if (dep && dep != erts_this_dist_entry) { erts_de_rwunlock(dep); erts_deref_dist_entry(dep); } if (pp) erts_port_release(pp); if (proc_unlock) erts_proc_unlock(proc, proc_unlock); return ret; yield: ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3); goto done; badarg: ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG); goto done; system_limit: ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT); goto done; } BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) { DistEntry* dep; Uint32 conn_id; if (is_not_atom(BIF_ARG_1)) { BIF_ERROR(BIF_P, BADARG); } dep = erts_find_or_insert_dist_entry(BIF_ARG_1); ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ erts_de_rwlock(dep); if (ERTS_DE_IS_CONNECTED(dep) || dep->status & ERTS_DE_SFLG_PENDING) conn_id = dep->connection_id; else if (dep->status == 0) { dep->status = ERTS_DE_SFLG_PENDING; dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; dep->connection_id++; dep->connection_id &= ERTS_DIST_CON_ID_MASK; conn_id = dep->connection_id; } else { ASSERT(!"SVERK: What to do?"); conn_id = dep->connection_id; } erts_de_rwunlock(dep); BIF_RET(make_small(conn_id)); } BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) { DistEntry* dep; if (is_not_atom(BIF_ARG_1) || is_not_small(BIF_ARG_2)) { BIF_ERROR(BIF_P, BADARG); } dep = erts_find_dist_entry(BIF_ARG_1); ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ if (!dep) { BIF_RET(am_false); } erts_de_rwlock(dep); if (dep->status == ERTS_DE_SFLG_PENDING && dep->connection_id == unsigned_val(BIF_ARG_2)) { NetExitsContext nec = {dep}; ErtsLink *nlinks; ErtsLink *node_links; ErtsMonitor *monitors; ErtsAtomCache *cache; ErtsDistOutputBuf *obuf; ErtsProcList *resume_procs; Sint reds = 0; ASSERT(is_nil(dep->cid)); erts_de_links_lock(dep); monitors = dep->monitors; nlinks = dep->nlinks; node_links = dep->node_links; dep->monitors = NULL; dep->nlinks = NULL; dep->node_links = NULL; erts_de_links_unlock(dep); cache = dep->cache; dep->cache = NULL; dep->status = 0; dep->flags = 0; erts_mtx_lock(&dep->qlock); obuf = dep->out_queue.first; dep->out_queue.first = NULL; dep->out_queue.last = NULL; ASSERT(!dep->tmp_out_queue.first); ASSERT(!dep->finalized_out_queue.first); resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); erts_mtx_unlock(&dep->qlock); erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); dep->send = NULL; erts_de_rwunlock(dep); erts_sweep_monitors(monitors, &doit_monitor_net_exits, &nec); erts_sweep_links(nlinks, &doit_link_net_exits, &nec); erts_sweep_links(node_links, &doit_node_link_net_exits, &nec); if (resume_procs) { int resumed = erts_resume_processes(resume_procs); reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } delete_cache(cache); free_de_out_queues(dep, obuf); BIF_RET2(am_true, reds); } erts_de_rwunlock(dep); BIF_RET(am_false); } /**********************************************************************/ /* node(Object) -> Node */ BIF_RETTYPE node_1(BIF_ALIST_1) { if (is_not_node_container(BIF_ARG_1)) BIF_ERROR(BIF_P, BADARG); BIF_RET(node_container_node_name(BIF_ARG_1)); } /**********************************************************************/ /* node() -> Node */ BIF_RETTYPE node_0(BIF_ALIST_0) { BIF_RET(erts_this_dist_entry->sysname); } /**********************************************************************/ /* nodes() -> [ Node ] */ #if 0 /* Done in erlang.erl instead. */ BIF_RETTYPE nodes_0(BIF_ALIST_0) { return nodes_1(BIF_P, am_visible); } #endif BIF_RETTYPE nodes_1(BIF_ALIST_1) { Eterm result; int length; Eterm* hp; int not_connected = 0; int visible = 0; int hidden = 0; int this = 0; DeclareTmpHeap(buf,2,BIF_P); /* For one cons-cell */ DistEntry *dep; Eterm arg_list = BIF_ARG_1; #ifdef DEBUG Eterm* endp; #endif UseTmpHeap(2,BIF_P); if (is_atom(BIF_ARG_1)) arg_list = CONS(buf, BIF_ARG_1, NIL); while (is_list(arg_list)) { switch(CAR(list_val(arg_list))) { case am_visible: visible = 1; break; case am_hidden: hidden = 1; break; case am_known: visible = hidden = not_connected = this = 1; break; case am_this: this = 1; break; case am_connected: visible = hidden = 1; break; default: goto error; break; } arg_list = CDR(list_val(arg_list)); } if (is_not_nil(arg_list)) { goto error; } length = 0; erts_rwmtx_rlock(&erts_dist_table_rwmtx); ASSERT(erts_no_of_not_connected_dist_entries > 0); ASSERT(erts_no_of_hidden_dist_entries >= 0); ASSERT(erts_no_of_visible_dist_entries >= 0); if(not_connected) length += (erts_no_of_not_connected_dist_entries - 1); if(hidden) length += erts_no_of_hidden_dist_entries; if(visible) length += erts_no_of_visible_dist_entries; if(this) length++; result = NIL; if (length == 0) { erts_rwmtx_runlock(&erts_dist_table_rwmtx); goto done; } hp = HAlloc(BIF_P, 2*length); #ifdef DEBUG endp = hp + length*2; #endif if(not_connected) for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { result = CONS(hp, dep->sysname, result); hp += 2; } } if(hidden) for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { result = CONS(hp, dep->sysname, result); hp += 2; } if(visible) for(dep = erts_visible_dist_entries; dep; dep = dep->next) { result = CONS(hp, dep->sysname, result); hp += 2; } if(this) { result = CONS(hp, erts_this_dist_entry->sysname, result); hp += 2; } ASSERT(endp == hp); erts_rwmtx_runlock(&erts_dist_table_rwmtx); done: UnUseTmpHeap(2,BIF_P); BIF_RET(result); error: UnUseTmpHeap(2,BIF_P); BIF_ERROR(BIF_P,BADARG); } /**********************************************************************/ /* is_alive() -> Bool */ BIF_RETTYPE is_alive_0(BIF_ALIST_0) { Eterm res = erts_is_alive ? am_true : am_false; BIF_RET(res); } /**********************************************************************/ /* erlang:monitor_node(Node, Bool, Options) -> Bool */ static BIF_RETTYPE monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) { DistEntry *dep; ErtsLink *lnk; Eterm l; int async_connect = 1; for (l = Options; l != NIL && is_list(l); l = CDR(list_val(l))) { Eterm t = CAR(list_val(l)); if (t == am_allow_passive_connect) { /* * Handle this horrible feature by falling back on old synchronous * auto-connect (if needed) */ async_connect = 0; } else { BIF_ERROR(p, BADARG); } } if (l != NIL) { BIF_ERROR(p, BADARG); } if (is_not_atom(Node) || ((Bool != am_true) && (Bool != am_false)) || ((erts_this_node->sysname == am_Noname) && (Node != erts_this_node->sysname))) { BIF_ERROR(p, BADARG); } if (Bool == am_true) { ErtsDSigData dsd; dsd.node = Node; dep = erts_find_or_insert_dist_entry(Node); if (dep == erts_this_dist_entry) goto done; erts_proc_lock(p, ERTS_PROC_LOCK_LINK); switch (erts_dsig_prepare(&dsd, &dep, p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), ERTS_DSP_RLOCK, 0, async_connect)) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: /* Trap to either send 'nodedown' or do passive connection attempt */ trap: erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); erts_deref_dist_entry(dep); BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); case ERTS_DSIG_PREP_PENDING: if (!async_connect) { /* * Pending connection may fail, so we must trap * to ensure passive connection attempt */ erts_de_runlock(dep); goto trap; } /*fall through*/ case ERTS_DSIG_PREP_CONNECTED: erts_de_links_lock(dep); erts_de_runlock(dep); lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, p->common.id); ++ERTS_LINK_REFC(lnk); lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); ++ERTS_LINK_REFC(lnk); break; default: ERTS_ASSERT(! "Invalid dsig prepare result"); } } else { /* Bool == false */ dep = erts_sysname_to_connected_dist_entry(Node); if (!dep) { /* * Before OTP-21 this case triggered auto-connect * and a 'nodedown' message if that failed. * Now it's a simple no-op which feels more reasonable. */ BIF_RET(am_true); } if (dep == erts_this_dist_entry) goto done; erts_proc_lock(p, ERTS_PROC_LOCK_LINK); erts_de_rlock(dep); if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED))) { erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); erts_de_runlock(dep); goto done; } erts_de_links_lock(dep); erts_de_runlock(dep); lnk = erts_lookup_link(dep->node_links, p->common.id); if (lnk != NULL) { if ((--ERTS_LINK_REFC(lnk)) == 0) { erts_destroy_link(erts_remove_link(&(dep->node_links), p->common.id)); } } lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); if (lnk != NULL) { if ((--ERTS_LINK_REFC(lnk)) == 0) { erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), Node)); } } } erts_de_links_unlock(dep); erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); done: BIF_RET(am_true); } BIF_RETTYPE monitor_node_3(BIF_ALIST_3) { BIF_RET(monitor_node(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3)); } /* monitor_node(Node, Bool) -> Bool */ BIF_RETTYPE monitor_node_2(BIF_ALIST_2) { BIF_RET(monitor_node(BIF_P, BIF_ARG_1, BIF_ARG_2, NIL)); } BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1) { DistEntry *de; Uint32 f; if (is_not_pid(BIF_ARG_1)) { BIF_ERROR(BIF_P,BADARG); } de = pid_dist_entry(BIF_ARG_1); ASSERT(de != NULL); if (de == erts_this_dist_entry) { BIF_RET(am_true); } erts_de_rlock(de); f = de->flags; erts_de_runlock(de); BIF_RET(((f & DFLAG_UNICODE_IO) ? am_true : am_false)); } /* * The major part of the implementation of net_kernel:monitor_nodes/[1,2] * follows. * * Currently net_kernel:monitor_nodes/[1,2] calls process_flag/2 which in * turn calls erts_monitor_nodes(). If the process_flag() call fails (with * badarg), the code in net_kernel determines what type of error to return. * This in order to simplify the task of being backward compatible. */ #define ERTS_NODES_MON_OPT_TYPE_VISIBLE (((Uint16) 1) << 0) #define ERTS_NODES_MON_OPT_TYPE_HIDDEN (((Uint16) 1) << 1) #define ERTS_NODES_MON_OPT_DOWN_REASON (((Uint16) 1) << 2) #define ERTS_NODES_MON_OPT_TYPES \ (ERTS_NODES_MON_OPT_TYPE_VISIBLE|ERTS_NODES_MON_OPT_TYPE_HIDDEN) typedef struct ErtsNodesMonitor_ ErtsNodesMonitor; struct ErtsNodesMonitor_ { ErtsNodesMonitor *prev; ErtsNodesMonitor *next; Process *proc; Uint16 opts; Uint16 no; }; static erts_mtx_t nodes_monitors_mtx; static ErtsNodesMonitor *nodes_monitors; static ErtsNodesMonitor *nodes_monitors_end; /* * Nodes monitors are stored in a double linked list. 'nodes_monitors' * points to the beginning of the list and 'nodes_monitors_end' points * to the end of the list. * * There might be more than one entry per process in the list. If so, * they are located in sequence. The 'nodes_monitors' field of the * process struct refers to the first element in the sequence * corresponding to the process in question. */ static void init_nodes_monitors(void) { erts_mtx_init(&nodes_monitors_mtx, "nodes_monitors", NIL, ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION); nodes_monitors = NULL; nodes_monitors_end = NULL; } static ERTS_INLINE Uint nodes_mon_msg_sz(ErtsNodesMonitor *nmp, Eterm what, Eterm reason) { Uint sz; if (!nmp->opts) { sz = 3; } else { sz = 0; if (nmp->opts & ERTS_NODES_MON_OPT_TYPES) sz += 2 + 3; if (what == am_nodedown && (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { if (is_not_immed(reason)) sz += size_object(reason); sz += 2 + 3; } sz += 4; } return sz; } static ERTS_INLINE void send_nodes_mon_msg(Process *rp, ErtsProcLocks *rp_locksp, ErtsNodesMonitor *nmp, Eterm node, Eterm what, Eterm type, Eterm reason, Uint sz) { Eterm msg; Eterm *hp; ErtsMessage *mp; ErlOffHeap *ohp; #ifdef DEBUG Eterm *hend; #endif mp = erts_alloc_message_heap(rp, rp_locksp, sz, &hp, &ohp); #ifdef DEBUG hend = hp + sz; #endif if (!nmp->opts) { msg = TUPLE2(hp, what, node); #ifdef DEBUG hp += 3; #endif } else { Eterm tup; Eterm info = NIL; if (nmp->opts & (ERTS_NODES_MON_OPT_TYPE_VISIBLE | ERTS_NODES_MON_OPT_TYPE_HIDDEN)) { tup = TUPLE2(hp, am_node_type, type); hp += 3; info = CONS(hp, tup, info); hp += 2; } if (what == am_nodedown && (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { Eterm rsn_cpy; if (is_immed(reason)) rsn_cpy = reason; else { Eterm rsn_sz = size_object(reason); rsn_cpy = copy_struct(reason, rsn_sz, &hp, ohp); } tup = TUPLE2(hp, am_nodedown_reason, rsn_cpy); hp += 3; info = CONS(hp, tup, info); hp += 2; } msg = TUPLE3(hp, what, node, info); #ifdef DEBUG hp += 4; #endif } ASSERT(hend == hp); erts_queue_message(rp, *rp_locksp, mp, msg, am_system); } static void send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reason) { ErtsNodesMonitor *nmp; ErtsProcLocks rp_locks = 0; /* Init to shut up false warning */ Process *rp = NULL; ASSERT(is_immed(what)); ASSERT(is_immed(node)); ASSERT(is_immed(type)); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_monitor)) { DTRACE_CHARBUF(what_str, 12); DTRACE_CHARBUF(node_str, 64); DTRACE_CHARBUF(type_str, 12); DTRACE_CHARBUF(reason_str, 64); erts_snprintf(what_str, sizeof(DTRACE_CHARBUF_NAME(what_str)), "%T", what); erts_snprintf(node_str, sizeof(DTRACE_CHARBUF_NAME(node_str)), "%T", node); erts_snprintf(type_str, sizeof(DTRACE_CHARBUF_NAME(type_str)), "%T", type); erts_snprintf(reason_str, sizeof(DTRACE_CHARBUF_NAME(reason_str)), "%T", reason); DTRACE5(dist_monitor, erts_this_node_sysname, what_str, node_str, type_str, reason_str); } #endif ERTS_LC_ASSERT(!c_p || (erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN)); erts_mtx_lock(&nodes_monitors_mtx); for (nmp = nodes_monitors; nmp; nmp = nmp->next) { int i; Uint16 no; Uint sz; ASSERT(nmp->proc != NULL); if (!nmp->opts) { if (type != am_visible) continue; } else { switch (type) { case am_hidden: if (!(nmp->opts & ERTS_NODES_MON_OPT_TYPE_HIDDEN)) continue; break; case am_visible: if ((nmp->opts & ERTS_NODES_MON_OPT_TYPES) && !(nmp->opts & ERTS_NODES_MON_OPT_TYPE_VISIBLE)) continue; break; default: erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n"); } } if (rp != nmp->proc) { if (rp) { if (rp == c_p) rp_locks &= ~ERTS_PROC_LOCK_MAIN; erts_proc_unlock(rp, rp_locks); } rp = nmp->proc; rp_locks = 0; if (rp == c_p) rp_locks |= ERTS_PROC_LOCK_MAIN; } ASSERT(rp); sz = nodes_mon_msg_sz(nmp, what, reason); for (i = 0, no = nmp->no; i < no; i++) send_nodes_mon_msg(rp, &rp_locks, nmp, node, what, type, reason, sz); } if (rp) { if (rp == c_p) rp_locks &= ~ERTS_PROC_LOCK_MAIN; erts_proc_unlock(rp, rp_locks); } erts_mtx_unlock(&nodes_monitors_mtx); } static Eterm insert_nodes_monitor(Process *c_p, Uint32 opts) { Uint16 no = 1; Eterm res = am_false; ErtsNodesMonitor *xnmp, *nmp; ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&nodes_monitors_mtx)); ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN); xnmp = c_p->nodes_monitors; if (xnmp) { ASSERT(!xnmp->prev || xnmp->prev->proc != c_p); while (1) { ASSERT(xnmp->proc == c_p); if (xnmp->opts == opts) break; if (!xnmp->next || xnmp->next->proc != c_p) break; xnmp = xnmp->next; } ASSERT(xnmp); ASSERT(xnmp->proc == c_p); ASSERT(xnmp->opts == opts || !xnmp->next || xnmp->next->proc != c_p); if (xnmp->opts != opts) goto alloc_new; else { res = am_true; no = xnmp->no++; if (!xnmp->no) { /* * 'no' wrapped; transfer all prevous monitors to new * element (which will be the next element in the list) * and set this to one... */ xnmp->no = 1; goto alloc_new; } } } else { alloc_new: nmp = erts_alloc(ERTS_ALC_T_NODES_MON, sizeof(ErtsNodesMonitor)); nmp->proc = c_p; nmp->opts = opts; nmp->no = no; if (xnmp) { ASSERT(nodes_monitors); ASSERT(c_p->nodes_monitors); nmp->next = xnmp->next; nmp->prev = xnmp; xnmp->next = nmp; if (nmp->next) { ASSERT(nodes_monitors_end != xnmp); ASSERT(nmp->next->prev == xnmp); nmp->next->prev = nmp; } else { ASSERT(nodes_monitors_end == xnmp); nodes_monitors_end = nmp; } } else { ASSERT(!c_p->nodes_monitors); c_p->nodes_monitors = nmp; nmp->next = NULL; nmp->prev = nodes_monitors_end; if (nodes_monitors_end) { ASSERT(nodes_monitors); nodes_monitors_end->next = nmp; } else { ASSERT(!nodes_monitors); nodes_monitors = nmp; } nodes_monitors_end = nmp; } } return res; } static Eterm remove_nodes_monitors(Process *c_p, Uint32 opts, int all) { Eterm res = am_false; ErtsNodesMonitor *nmp; ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&nodes_monitors_mtx)); ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN); nmp = c_p->nodes_monitors; ASSERT(!nmp || !nmp->prev || nmp->prev->proc != c_p); while (nmp && nmp->proc == c_p) { if (!all && nmp->opts != opts) nmp = nmp->next; else { /* if (all || nmp->opts == opts) */ ErtsNodesMonitor *free_nmp; res = am_true; if (nmp->prev) { ASSERT(nodes_monitors != nmp); nmp->prev->next = nmp->next; } else { ASSERT(nodes_monitors == nmp); nodes_monitors = nmp->next; } if (nmp->next) { ASSERT(nodes_monitors_end != nmp); nmp->next->prev = nmp->prev; } else { ASSERT(nodes_monitors_end == nmp); nodes_monitors_end = nmp->prev; } free_nmp = nmp; nmp = nmp->next; if (c_p->nodes_monitors == free_nmp) c_p->nodes_monitors = nmp && nmp->proc == c_p ? nmp : NULL; erts_free(ERTS_ALC_T_NODES_MON, free_nmp); } } ASSERT(!all || !c_p->nodes_monitors); return res; } void erts_delete_nodes_monitors(Process *c_p, ErtsProcLocks locks) { #if defined(ERTS_ENABLE_LOCK_CHECK) if (c_p) { ErtsProcLocks might_unlock = locks & ~ERTS_PROC_LOCK_MAIN; if (might_unlock) erts_proc_lc_might_unlock(c_p, might_unlock); } #endif if (erts_mtx_trylock(&nodes_monitors_mtx) == EBUSY) { ErtsProcLocks unlock_locks = locks & ~ERTS_PROC_LOCK_MAIN; if (c_p && unlock_locks) erts_proc_unlock(c_p, unlock_locks); erts_mtx_lock(&nodes_monitors_mtx); if (c_p && unlock_locks) erts_proc_lock(c_p, unlock_locks); } remove_nodes_monitors(c_p, 0, 1); erts_mtx_unlock(&nodes_monitors_mtx); } Eterm erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist) { Eterm res; Eterm opts_list = olist; Uint16 opts = (Uint16) 0; ASSERT(c_p); ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); if (on != am_true && on != am_false) return THE_NON_VALUE; if (is_not_nil(opts_list)) { int all = 0, visible = 0, hidden = 0; while (is_list(opts_list)) { Eterm *cp = list_val(opts_list); Eterm opt = CAR(cp); opts_list = CDR(cp); if (opt == am_nodedown_reason) opts |= ERTS_NODES_MON_OPT_DOWN_REASON; else if (is_tuple(opt)) { Eterm* tp = tuple_val(opt); if (arityval(tp[0]) != 2) return THE_NON_VALUE; switch (tp[1]) { case am_node_type: switch (tp[2]) { case am_visible: if (hidden || all) return THE_NON_VALUE; opts |= ERTS_NODES_MON_OPT_TYPE_VISIBLE; visible = 1; break; case am_hidden: if (visible || all) return THE_NON_VALUE; opts |= ERTS_NODES_MON_OPT_TYPE_HIDDEN; hidden = 1; break; case am_all: if (visible || hidden) return THE_NON_VALUE; opts |= ERTS_NODES_MON_OPT_TYPES; all = 1; break; default: return THE_NON_VALUE; } break; default: return THE_NON_VALUE; } } else { return THE_NON_VALUE; } } if (is_not_nil(opts_list)) return THE_NON_VALUE; } erts_mtx_lock(&nodes_monitors_mtx); if (on == am_true) res = insert_nodes_monitor(c_p, opts); else res = remove_nodes_monitors(c_p, opts, 0); erts_mtx_unlock(&nodes_monitors_mtx); return res; } /* * Note, this function is only used for debuging. */ Eterm erts_processes_monitoring_nodes(Process *c_p) { ErtsNodesMonitor *nmp; Eterm res; Eterm *hp; Eterm **hpp; Uint sz; Uint *szp; #ifdef DEBUG Eterm *hend; #endif ASSERT(c_p); ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); erts_mtx_lock(&nodes_monitors_mtx); sz = 0; szp = &sz; hpp = NULL; bld_result: res = NIL; for (nmp = nodes_monitors_end; nmp; nmp = nmp->prev) { Uint16 i; for (i = 0; i < nmp->no; i++) { Eterm olist = NIL; if (nmp->opts & ERTS_NODES_MON_OPT_TYPES) { Eterm type; switch (nmp->opts & ERTS_NODES_MON_OPT_TYPES) { case ERTS_NODES_MON_OPT_TYPES: type = am_all; break; case ERTS_NODES_MON_OPT_TYPE_VISIBLE: type = am_visible; break; case ERTS_NODES_MON_OPT_TYPE_HIDDEN: type = am_hidden; break; default: erts_exit(ERTS_ABORT_EXIT, "Bad node type found\n"); } olist = erts_bld_cons(hpp, szp, erts_bld_tuple(hpp, szp, 2, am_node_type, type), olist); } if (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON) olist = erts_bld_cons(hpp, szp, am_nodedown_reason, olist); res = erts_bld_cons(hpp, szp, erts_bld_tuple(hpp, szp, 2, nmp->proc->common.id, olist), res); } } if (!hpp) { hp = HAlloc(c_p, sz); #ifdef DEBUG hend = hp + sz; #endif hpp = &hp; szp = NULL; goto bld_result; } ASSERT(hp == hend); erts_mtx_unlock(&nodes_monitors_mtx); return res; }