Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r-- | erts/emulator/beam/dist.c | 3256
1 file changed, 3256 insertions, 0 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c new file mode 100644 index 0000000000..e3094404e2 --- /dev/null +++ b/erts/emulator/beam/dist.c @@ -0,0 +1,3256 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 1996-2009. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +/* + * distribution of erlang messages to other nodes. + */ + + +/* define this to get a lot of debug output */ +/* #define ERTS_DIST_MSG_DBG */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#define ERTS_WANT_EXTERNAL_TAGS + +#include <stddef.h> +#include "sys.h" +#include "erl_vm.h" +#include "global.h" +#include "erl_process.h" +#include "error.h" +#include "dist.h" +#include "bif.h" +#include "external.h" +#include "erl_binary.h" + +/* Turn this on to get printouts of all distribution messages + * which go on the line + */ +#if 0 +#define ERTS_DIST_MSG_DBG +#endif +#if 0 +#define ERTS_RAW_DIST_MSG_DBG +#endif + +#if defined(ERTS_DIST_MSG_DBG) || defined(ERTS_RAW_DIST_MSG_DBG) +static void bw(byte *buf, int sz) +{ + bin_write(ERTS_PRINT_STDERR,NULL,buf,sz); +} +#endif + +#ifdef ERTS_DIST_MSG_DBG +static void +dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) +{ + byte *extp = edep->extp; + Eterm msg; + Sint size = erts_decode_dist_ext_size(edep, 0); + if (size < 0) { + erts_fprintf(stderr, + "DIST MSG DEBUG: erts_decode_dist_ext_size(%s) failed:\n", + what); + bw(buf, sz); + } + else { + Eterm *hp; + ErlHeapFragment *mbuf = new_message_buffer(size); + hp = mbuf->mem; + msg = erts_decode_dist_ext(&hp, &mbuf->off_heap, edep); + if (is_value(msg)) + erts_fprintf(stderr, " %s: %T\n", what, msg); + else { + erts_fprintf(stderr, + "DIST MSG DEBUG: erts_decode_dist_ext(%s) failed:\n", + what); + bw(buf, sz); + } + free_message_buffer(mbuf); + edep->extp = extp; + } +} + +#endif + + + +#define PASS_THROUGH 'p' /* This code should go */ + +int erts_is_alive; /* System must be blocked on change */ + +/* distribution trap functions */ +Export* dsend2_trap = NULL; +Export* dsend3_trap = NULL; +/*Export* dsend_nosuspend_trap = NULL;*/ +Export* dlink_trap = NULL; +Export* dunlink_trap = NULL; +Export* dmonitor_node_trap = NULL; +Export* dgroup_leader_trap = NULL; +Export* dexit_trap = NULL; +Export* dmonitor_p_trap = NULL; + +/* local variables */ + + +/* forward declarations */ + +static void clear_dist_entry(DistEntry*); +static int dsig_send(ErtsDSigData *, Eterm, Eterm, int); +static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); +static void init_nodes_monitors(void); + +static erts_smp_atomic_t no_caches; + +static void +delete_cache(ErtsAtomCache *cache) +{ + if (cache) { + erts_free(ERTS_ALC_T_DCACHE, (void *) cache); + ASSERT(erts_smp_atomic_read(&no_caches) > 0); + erts_smp_atomic_dec(&no_caches); + } +} + + +static void +create_cache(DistEntry *dep) +{ + int i; + ErtsAtomCache *cp; + + ERTS_SMP_LC_ASSERT( + is_internal_port(dep->cid) + && 
erts_lc_is_port_locked(&erts_port[internal_port_index(dep->cid)])); + ASSERT(!dep->cache); + + dep->cache = cp = (ErtsAtomCache*) erts_alloc(ERTS_ALC_T_DCACHE, + sizeof(ErtsAtomCache)); + erts_smp_atomic_inc(&no_caches); + for (i = 0; i < sizeof(cp->in_arr)/sizeof(cp->in_arr[0]); i++) { + cp->in_arr[i] = THE_NON_VALUE; + cp->out_arr[i] = THE_NON_VALUE; + } +} + +Uint erts_dist_cache_size(void) +{ + return (Uint) erts_smp_atomic_read(&no_caches)*sizeof(ErtsAtomCache); +} + +static ErtsProcList * +get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs) +{ + ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&dep->qlock)); + dep->qflgs &= ~unset_qflgs; + if (dep->qflgs & ERTS_DE_QFLG_EXIT) { + /* No resume when exit has been scheduled */ + return NULL; + } + else { + ErtsProcList *plp; + plp = dep->suspended.first; + dep->suspended.first = NULL; + dep->suspended.last = NULL; + return plp; + } +} + +/* +** A full node name constists of a "n@h" +** +** n must be a valid node name: string of ([a-z][A-Z][0-9]_-)+ +** +** h is not checked at all, we assume that we have a properly +** configured machine where the networking is ok for the OS +** +** We do check that there is not a second @ in the string, since +** many distributed operations are guaranteed not to work then. +*/ + + +static int is_node_name(char *ptr, int len) +{ + int c = '\0'; /* suppress use-before-set warning */ + int pos = 0; + + while (pos < len) { + c = ptr[pos++]; + if (! ((c == '-') || (c == '_') || + ((c >= 'a') && (c <= 'z')) || + ((c >= 'A') && (c <= 'Z')) || + ((c >= '0') && (c <= '9')))) + break; + } + + /* Scanned past the host name: now we want to see a '@', and there + should be text both before and after it. */ + if (c != '@' || pos < 2 || pos == len) + return 0; + + while (pos < len) { + c = ptr[pos++]; + if (c == '@') + return 0; + } + + return 1; +} + +int is_node_name_atom(Eterm a) +{ + int i; + if(is_not_atom(a)) + return 0; + i = atom_val(a); + ASSERT((i > 0) && (i < atom_table_size()) && (atom_tab(i) != NULL)); + return is_node_name((char*)atom_tab(i)->name, atom_tab(i)->len); +} + +typedef struct { + DistEntry *dep; +} NetExitsContext; + +/* +** This function is called when a distribution +** port or process terminates +*/ +static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp) +{ + Process *rp; + ErtsMonitor *rmon; + DistEntry *dep = ((NetExitsContext *) vnecp)->dep; + ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; + + rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks); + if (!rp) + goto done; + + if (mon->type == MON_ORIGIN) { + /* local pid is beeing monitored */ + rmon = erts_remove_monitor(&(rp->monitors),mon->ref); + /* ASSERT(rmon != NULL); nope, can happen during process exit */ + if (rmon != NULL) { + erts_destroy_monitor(rmon); + } + } else { + Eterm lhp[3]; + Eterm watched; + ASSERT(mon->type == MON_TARGET); + rmon = erts_remove_monitor(&(rp->monitors),mon->ref); + /* ASSERT(rmon != NULL); can happen during process exit */ + if (rmon != NULL) { + ASSERT(is_atom(rmon->name) || is_nil(rmon->name)); + watched = (is_atom(rmon->name) + ? 
TUPLE2(lhp, rmon->name, dep->sysname) + : rmon->pid); +#ifdef ERTS_SMP + rp_locks |= ERTS_PROC_LOCKS_MSG_SEND; + erts_smp_proc_lock(rp, ERTS_PROC_LOCKS_MSG_SEND); +#endif + erts_queue_monitor_message(rp, &rp_locks, mon->ref, am_process, + watched, am_noconnection); + erts_destroy_monitor(rmon); + } + } + erts_smp_proc_unlock(rp, rp_locks); + done: + erts_destroy_monitor(mon); +} + +typedef struct { + NetExitsContext *necp; + ErtsLink *lnk; +} LinkNetExitsContext; + +/* +** This is the function actually doing the job of sending exit messages +** for links in a dist entry upon net_exit (the node goes down), NB, +** only process links, not node monitors are handled here, +** they reside in a separate tree.... +*/ +static void doit_link_net_exits_sub(ErtsLink *sublnk, void *vlnecp) +{ + ErtsLink *lnk = ((LinkNetExitsContext *) vlnecp)->lnk; /* the local pid */ + ErtsLink *rlnk; + Process *rp; + + ASSERT(lnk->type == LINK_PID); + if (is_internal_pid(lnk->pid)) { + int xres; + ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND; + + rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks); + if (!rp) { + goto done; + } + + rlnk = erts_remove_link(&(rp->nlinks), sublnk->pid); + xres = erts_send_exit_signal(NULL, + sublnk->pid, + rp, + &rp_locks, + am_noconnection, + NIL, + NULL, + 0); + + if (rlnk) { + erts_destroy_link(rlnk); + if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) { + /* We didn't exit the process and it is traced */ + trace_proc(NULL, rp, am_getting_unlinked, sublnk->pid); + } + } + erts_smp_proc_unlock(rp, rp_locks); + } + done: + erts_destroy_link(sublnk); + +} + + + + + +/* +** This function is called when a distribution +** port or process terminates, once for each link on the high level, +** it in turn traverses the link subtree for the specific link node... +*/ +static void doit_link_net_exits(ErtsLink *lnk, void *vnecp) +{ + LinkNetExitsContext lnec = {(NetExitsContext *) vnecp, lnk}; + ASSERT(lnk->type == LINK_PID) + erts_sweep_links(ERTS_LINK_ROOT(lnk), &doit_link_net_exits_sub, (void *) &lnec); +#ifdef DEBUG + ERTS_LINK_ROOT(lnk) = NULL; +#endif + erts_destroy_link(lnk); +} + + +static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp) +{ + DistEntry *dep = ((NetExitsContext *) vnecp)->dep; + Eterm name = dep->sysname; + Process *rp; + ErtsLink *rlnk; + Uint i,n; + ASSERT(lnk->type == LINK_NODE) + if (is_internal_pid(lnk->pid)) { + ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; + rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks); + if (!rp) { + goto done; + } + rlnk = erts_remove_link(&(rp->nlinks), name); + if (rlnk != NULL) { + ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE)); + erts_destroy_link(rlnk); + } + n = ERTS_LINK_REFC(lnk); + for (i = 0; i < n; ++i) { + ErlHeapFragment* bp; + ErlOffHeap *ohp; + Eterm tup; + Eterm *hp = erts_alloc_message_heap(3,&bp,&ohp,rp,&rp_locks); + tup = TUPLE2(hp, am_nodedown, name); + erts_queue_message(rp, &rp_locks, bp, tup, NIL); + } + erts_smp_proc_unlock(rp, rp_locks); + } + done: + erts_destroy_link(lnk); +} + + +/* + * proc is currently running or exiting process. + */ +int erts_do_net_exits(DistEntry *dep, Eterm reason) +{ + Eterm nodename; + + if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */ + Eterm nd_reason = (reason == am_no_network + ? 
am_no_network + : am_net_kernel_terminated); + erts_smp_rwmtx_rwlock(&erts_dist_table_rwmtx); + + /* KILL all port controllers */ + while(erts_visible_dist_entries || erts_hidden_dist_entries) { + DistEntry *tdep; + Eterm prt_id; + Port *prt; + if(erts_hidden_dist_entries) + tdep = erts_hidden_dist_entries; + else + tdep = erts_visible_dist_entries; + prt_id = tdep->cid; + ASSERT(is_internal_port(prt_id)); + erts_smp_rwmtx_rwunlock(&erts_dist_table_rwmtx); + + prt = erts_id2port(prt_id, NULL, 0); + if (prt) { + ASSERT(prt->status & ERTS_PORT_SFLG_DISTRIBUTION); + ASSERT(prt->dist_entry); + /* will call do_net_exists !!! */ + erts_do_exit_port(prt, prt_id, nd_reason); + erts_port_release(prt); + } + + erts_smp_rwmtx_rwlock(&erts_dist_table_rwmtx); + } + + erts_smp_rwmtx_rwunlock(&erts_dist_table_rwmtx); + + nodename = erts_this_dist_entry->sysname; + erts_smp_block_system(ERTS_BS_FLG_ALLOW_GC); + erts_set_this_node(am_Noname, 0); + erts_is_alive = 0; + send_nodes_mon_msgs(NULL, am_nodedown, nodename, am_visible, nd_reason); + erts_smp_release_system(); + + } + else { /* recursive call via erts_do_exit_port() will end up here */ + NetExitsContext nec = {dep}; + ErtsLink *nlinks; + ErtsLink *node_links; + ErtsMonitor *monitors; + Uint32 flags; + + erts_smp_atomic_set(&dep->dist_cmd_scheduled, 1); + erts_smp_de_rwlock(dep); + + ERTS_SMP_LC_ASSERT(is_internal_port(dep->cid) + && erts_lc_is_port_locked(&erts_port[internal_port_index(dep->cid)])); + + if (erts_port_task_is_scheduled(&dep->dist_cmd)) + erts_port_task_abort(dep->cid, &dep->dist_cmd); + + if (dep->status & ERTS_DE_SFLG_EXITING) { +#ifdef DEBUG + erts_smp_spin_lock(&dep->qlock); + ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT); + erts_smp_spin_unlock(&dep->qlock); +#endif + } + else { + dep->status |= ERTS_DE_SFLG_EXITING; + erts_smp_spin_lock(&dep->qlock); + ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT)); + dep->qflgs |= ERTS_DE_QFLG_EXIT; + erts_smp_spin_unlock(&dep->qlock); + } + + erts_smp_de_links_lock(dep); + monitors = dep->monitors; + nlinks = dep->nlinks; + node_links = dep->node_links; + dep->monitors = NULL; + dep->nlinks = NULL; + dep->node_links = NULL; + erts_smp_de_links_unlock(dep); + + nodename = dep->sysname; + flags = dep->flags; + + erts_set_dist_entry_not_connected(dep); + + erts_smp_de_rwunlock(dep); + + erts_sweep_monitors(monitors, &doit_monitor_net_exits, (void *) &nec); + erts_sweep_links(nlinks, &doit_link_net_exits, (void *) &nec); + erts_sweep_links(node_links, &doit_node_link_net_exits, (void *) &nec); + + send_nodes_mon_msgs(NULL, + am_nodedown, + nodename, + flags & DFLAG_PUBLISHED ? am_visible : am_hidden, + reason == am_normal ? 
am_connection_closed : reason); + + clear_dist_entry(dep); + + } + return 1; +} + +static Export* +trap_function(Eterm func, int arity) +{ + return erts_export_put(am_erlang, func, arity); +} + +void init_dist(void) +{ + init_nodes_monitors(); + + erts_smp_atomic_init(&no_caches, 0); + + /* Lookup/Install all references to trap functions */ + dsend2_trap = trap_function(am_dsend,2); + dsend3_trap = trap_function(am_dsend,3); + /* dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/ + dlink_trap = trap_function(am_dlink,1); + dunlink_trap = trap_function(am_dunlink,1); + dmonitor_node_trap = trap_function(am_dmonitor_node,3); + dgroup_leader_trap = trap_function(am_dgroup_leader,2); + dexit_trap = trap_function(am_dexit, 2); + dmonitor_p_trap = trap_function(am_dmonitor_p, 2); +} + +#define ErtsDistOutputBuf2Binary(OB) \ + ((Binary *) (((char *) (OB)) - offsetof(Binary, orig_bytes))) + +static ERTS_INLINE ErtsDistOutputBuf * +alloc_dist_obuf(Uint size) +{ + ErtsDistOutputBuf *obuf; + Uint obuf_size = sizeof(ErtsDistOutputBuf)+sizeof(byte)*(size-1); + Binary *bin = erts_bin_drv_alloc(obuf_size); + bin->flags = BIN_FLAG_DRV; + erts_refc_init(&bin->refc, 1); + bin->orig_size = (long) obuf_size; + obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0]; +#ifdef DEBUG + obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; + ASSERT(bin == ErtsDistOutputBuf2Binary(obuf)); +#endif + return obuf; +} + +static ERTS_INLINE void +free_dist_obuf(ErtsDistOutputBuf *obuf) +{ + Binary *bin = ErtsDistOutputBuf2Binary(obuf); + ASSERT(obuf->dbg_pattern == ERTS_DIST_OUTPUT_BUF_DBG_PATTERN); + if (erts_refc_dectest(&bin->refc, 0) == 0) + erts_bin_free(bin); +} + +static ERTS_INLINE Sint +size_obuf(ErtsDistOutputBuf *obuf) +{ + Binary *bin = ErtsDistOutputBuf2Binary(obuf); + return bin->orig_size; +} + +static void clear_dist_entry(DistEntry *dep) +{ + Sint obufsize = 0; + ErtsAtomCache *cache; + ErtsProcList *suspendees; + ErtsDistOutputBuf *obuf; + + erts_smp_de_rwlock(dep); + cache = dep->cache; + dep->cache = NULL; + +#ifdef DEBUG + erts_smp_de_links_lock(dep); + ASSERT(!dep->nlinks); + ASSERT(!dep->node_links); + ASSERT(!dep->monitors); + erts_smp_de_links_unlock(dep); +#endif + + erts_smp_spin_lock(&dep->qlock); + + if (!dep->out_queue.last) + obuf = dep->finalized_out_queue.first; + else { + dep->out_queue.last->next = dep->finalized_out_queue.first; + obuf = dep->out_queue.first; + } + + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + dep->finalized_out_queue.first = NULL; + dep->finalized_out_queue.last = NULL; + dep->status = 0; + suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + + erts_smp_spin_unlock(&dep->qlock); + erts_smp_atomic_set(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_smp_de_rwunlock(dep); + + erts_resume_processes(suspendees); + + delete_cache(cache); + + while (obuf) { + ErtsDistOutputBuf *fobuf; + fobuf = obuf; + obuf = obuf->next; + obufsize += size_obuf(fobuf); + free_dist_obuf(fobuf); + } + + if (obufsize) { + erts_smp_spin_lock(&dep->qlock); + ASSERT(dep->qsize >= obufsize); + dep->qsize -= obufsize; + erts_smp_spin_unlock(&dep->qlock); + } +} + +/* + * The erts_dsig_send_*() functions implemented below, sends asynchronous + * distributed signals to other Erlang nodes. Before sending a distributed + * signal, you need to prepare the operation by calling erts_dsig_prepare() + * (see dist.h). 
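+ *
+ * A minimal usage sketch (hedged; it mirrors the prepare/send pattern
+ * used by the DOP_LINK handler in erts_net_message() below, and the
+ * pid variables are illustrative only):
+ *
+ *   ErtsDSigData dsd;
+ *   int code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0);
+ *   if (code == ERTS_DSIG_PREP_CONNECTED)
+ *       code = erts_dsig_send_link(&dsd, local_pid, remote_pid);
+ *
+ * On success the send functions return ERTS_DSIG_SEND_OK, or
+ * ERTS_DSIG_SEND_YIELD when the caller is expected to yield (see
+ * dsig_send() below).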
+ * + * Note that the distributed signal send operation is truly asynchronous, + * and the signal is not guaranteed to reach the receiver if the connection + * goes down before the signal has reached the receiver. + */ + +/* +** Send a DOP_LINK link message +*/ +int +erts_dsig_send_link(ErtsDSigData *dsdp, Eterm local, Eterm remote) +{ + Eterm ctl_heap[4]; + Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_LINK), local, remote); + + return dsig_send(dsdp, ctl, THE_NON_VALUE, 0); +} + +int +erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote) +{ + Eterm ctl_heap[4]; + Eterm ctl = TUPLE3(&ctl_heap[0], make_small(DOP_UNLINK), local, remote); + + return dsig_send(dsdp, ctl, THE_NON_VALUE, 0); +} + + +/* A local process that's beeing monitored by a remote one exits. We send: + {DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason}, + which is rather sad as only the ref is needed, no pid's... */ +int +erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, + Eterm ref, Eterm reason) +{ + Eterm ctl; + Eterm ctl_heap[6]; + + ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT), + watched, watcher, ref, reason); + +#ifdef DEBUG + erts_smp_de_links_lock(dsdp->dep); + ASSERT(!erts_lookup_monitor(dsdp->dep->monitors, ref)); + erts_smp_de_links_unlock(dsdp->dep); +#endif + + return dsig_send(dsdp, ctl, THE_NON_VALUE, 1); +} + +/* We want to monitor a process (named or unnamed) on another node, we send: + {DOP_MONITOR_P, Local pid, Remote pid or name, Ref}, which is exactly what's + needed on the other side... */ +int +erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, + Eterm ref) +{ + Eterm ctl; + Eterm ctl_heap[5]; + + ctl = TUPLE4(&ctl_heap[0], + make_small(DOP_MONITOR_P), + watcher, watched, ref); + + return dsig_send(dsdp, ctl, THE_NON_VALUE, 0); +} + +/* A local process monitoring a remote one wants to stop monitoring, either + because of a demonitor bif call or because the local process died. We send + {DOP_DEMONITOR_P, Local pid, Remote pid or name, ref}, which is once again + rather redundant as only the ref will be needed on the other side... 
*/ +int +erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher, + Eterm watched, Eterm ref, int force) +{ + Eterm ctl; + Eterm ctl_heap[5]; + + ctl = TUPLE4(&ctl_heap[0], + make_small(DOP_DEMONITOR_P), + watcher, watched, ref); + + return dsig_send(dsdp, ctl, THE_NON_VALUE, force); +} + +int +erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message) +{ + Eterm ctl; + Eterm ctl_heap[5]; + Eterm token = NIL; + Process *sender = dsdp->proc; + + if (SEQ_TRACE_TOKEN(sender) != NIL) { + seq_trace_update_send(sender); + token = SEQ_TRACE_TOKEN(sender); + seq_trace_output(token, message, SEQ_TRACE_SEND, remote, sender); + } + + if (token != NIL) + ctl = TUPLE4(&ctl_heap[0], + make_small(DOP_SEND_TT), am_Cookie, remote, token); + else + ctl = TUPLE3(&ctl_heap[0], make_small(DOP_SEND), am_Cookie, remote); + return dsig_send(dsdp, ctl, message, 0); +} + +int +erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message) +{ + Eterm ctl; + Eterm ctl_heap[6]; + Eterm token = NIL; + Process *sender = dsdp->proc; + + if (SEQ_TRACE_TOKEN(sender) != NIL) { + seq_trace_update_send(sender); + token = SEQ_TRACE_TOKEN(sender); + seq_trace_output(token, message, SEQ_TRACE_SEND, remote_name, sender); + } + + if (token != NIL) + ctl = TUPLE5(&ctl_heap[0], make_small(DOP_REG_SEND_TT), + sender->id, am_Cookie, remote_name, token); + else + ctl = TUPLE4(&ctl_heap[0], make_small(DOP_REG_SEND), + sender->id, am_Cookie, remote_name); + return dsig_send(dsdp, ctl, message, 0); +} + +/* local has died, deliver the exit signal to remote */ +int +erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, + Eterm reason, Eterm token) +{ + Eterm ctl; + Eterm ctl_heap[6]; + + if (token != NIL) { + seq_trace_update_send(dsdp->proc); + seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local); + ctl = TUPLE5(&ctl_heap[0], + make_small(DOP_EXIT_TT), local, remote, token, reason); + } else { + ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); + } + /* forced, i.e ignore busy */ + return dsig_send(dsdp, ctl, THE_NON_VALUE, 1); +} + +int +erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason) +{ + Eterm ctl_heap[5]; + Eterm ctl = TUPLE4(&ctl_heap[0], + make_small(DOP_EXIT), local, remote, reason); + /* forced, i.e ignore busy */ + return dsig_send(dsdp, ctl, THE_NON_VALUE, 1); +} + +int +erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason) +{ + Eterm ctl_heap[5]; + Eterm ctl = TUPLE4(&ctl_heap[0], + make_small(DOP_EXIT2), local, remote, reason); + + return dsig_send(dsdp, ctl, THE_NON_VALUE, 0); +} + + +int +erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote) +{ + Eterm ctl_heap[4]; + Eterm ctl = TUPLE3(&ctl_heap[0], + make_small(DOP_GROUP_LEADER), leader, remote); + + return dsig_send(dsdp, ctl, THE_NON_VALUE, 0); +} + +#if defined(PURIFY) +# define PURIFY_MSG(msg) \ + purify_printf("%s, line %d: %s", __FILE__, __LINE__, msg) +#elif defined(VALGRIND) +#include <valgrind/valgrind.h> +#include <valgrind/memcheck.h> + +# define PURIFY_MSG(msg) \ + do { \ + char buf__[1]; size_t bufsz__ = sizeof(buf__); \ + if (erts_sys_getenv("VALGRIND_LOG_XML", buf__, &bufsz__) >= 0) { \ + VALGRIND_PRINTF("<erlang_error_log>" \ + "%s, line %d: %s</erlang_error_log>\n", \ + __FILE__, __LINE__, msg); \ + } else { \ + VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg); \ + } \ + } while (0) +#else +# define PURIFY_MSG(msg) +#endif + +/* +** Input from distribution port. 
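+** (In the code below this means: with DFLAG_DIST_HDR_ATOM_CACHE the
+**  buffer starts with a distribution header, otherwise with a single
+**  PASS_THROUGH byte that is skipped; then follows the externally
+**  encoded control tuple and, for send operations, the message.)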
+** Input follows the distribution protocol v4.5 +** +** The protocol is a 4 byte header protocol +** the DOP_DATA is stripped by driver_output +** +** assert hlen == 0 !!! +*/ +int erts_net_message(Port *prt, + DistEntry *dep, + byte *hbuf, + int hlen, + byte *buf, + int len) +{ + ErtsDistExternal ede; + byte *t; + Sint ctl_len; + int orig_ctl_len; + Eterm arg; + Eterm from, to; + Eterm watcher, watched; + Eterm ref; + Eterm *tuple; + Eterm reason; + Process* rp; + Eterm ctl_default[64]; + Eterm* ctl = ctl_default; + ErlOffHeap off_heap; + Eterm* hp; + Sint type; + Eterm token; + Eterm token_size; + ErtsMonitor *mon; + ErtsLink *lnk; + int res; +#ifdef ERTS_DIST_MSG_DBG + int orig_len = len; +#endif + + /* Thanks to Luke Gorrie */ + off_heap.mso = NULL; +#ifndef HYBRID /* FIND ME! */ + off_heap.funs = NULL; +#endif + off_heap.overhead = 0; + off_heap.externals = NULL; + + ERTS_SMP_CHK_NO_PROC_LOCKS; + + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + + if (!erts_is_alive) + return 0; + if (hlen > 0) + goto data_error; + if (len == 0) /* HANDLE TICK !!! */ + return 0; + +#ifdef ERTS_RAW_DIST_MSG_DBG + erts_fprintf(stderr, "<< "); + bw(buf, len); +#endif + + if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) + t = buf; + else { + /* Skip PASS_THROUGH */ + t = buf+1; + len--; + } + + if (len == 0) { + PURIFY_MSG("data error"); + goto data_error; + } + + res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache); + + if (res >= 0) + res = ctl_len = erts_decode_dist_ext_size(&ede, 0); + else { +#ifdef ERTS_DIST_MSG_DBG + erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n"); + bw(buf, orig_len); +#endif + ctl_len = 0; + } + + if (res < 0) { +#ifdef ERTS_DIST_MSG_DBG + erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); + bw(buf, orig_len); +#endif + PURIFY_MSG("data error"); + goto data_error; + } + orig_ctl_len = ctl_len; + if (ctl_len > sizeof(ctl_default)/sizeof(ctl_default[0])) { + ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm)); + } + hp = ctl; + + arg = erts_decode_dist_ext(&hp, &off_heap, &ede); + if (is_non_value(arg)) { +#ifdef ERTS_DIST_MSG_DBG + erts_fprintf(stderr, "DIST MSG DEBUG: erts_dist_ext_size(CTL) failed:\n"); + bw(buf, orig_len); +#endif + PURIFY_MSG("data error"); + goto data_error; + } + ctl_len = t - buf; + +#ifdef ERTS_DIST_MSG_DBG + erts_fprintf(stderr, "<<%s CTL: %T\n", len != orig_len ? 
"P" : " ", arg); +#endif + + if (is_not_tuple(arg) || + (tuple = tuple_val(arg), arityval(*tuple) < 1) || + is_not_small(tuple[1])) { + erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); + erts_dsprintf(dsbufp, "Invalid distribution message: %.200T", arg); + erts_send_error_to_logger_nogl(dsbufp); + goto data_error; + } + + token_size = 0; + + switch (type = unsigned_val(tuple[1])) { + case DOP_LINK: + from = tuple[2]; + to = tuple[3]; /* local proc to link to */ + + if (is_not_pid(from) || is_not_pid(to)) { + erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); + PURIFY_MSG("data error"); + erts_dsprintf(dsbufp, + "Invalid DOP_LINK distribution message: %.200T", + arg); + erts_send_error_to_logger_nogl(dsbufp); + goto data_error; + } + + rp = erts_pid2proc_opt(NULL, 0, + to, ERTS_PROC_LOCK_LINK, + ERTS_P2P_FLG_ALLOW_OTHER_X); + if (!rp) { + /* This is tricky (we MUST force a distributed send) */ + ErtsDSigData dsd; + int code; + code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + if (code == ERTS_DSIG_PREP_CONNECTED) { + code = erts_dsig_send_exit(&dsd, to, from, am_noproc); + ASSERT(code == ERTS_DSIG_SEND_OK); + } + break; + } + + erts_smp_de_links_lock(dep); + res = erts_add_link(&(rp->nlinks), LINK_PID, from); + + if (res < 0) { + /* It was already there! Lets skip the rest... */ + erts_smp_de_links_unlock(dep); + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + break; + } + lnk = erts_add_or_lookup_link(&(dep->nlinks), LINK_PID, rp->id); + erts_add_link(&(ERTS_LINK_ROOT(lnk)), LINK_PID, from); + erts_smp_de_links_unlock(dep); + + if (IS_TRACED_FL(rp, F_TRACE_PROCS)) + trace_proc(NULL, rp, am_getting_linked, from); + + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + break; + + case DOP_UNLINK: { + ErtsDistLinkData dld; + from = tuple[2]; + to = tuple[3]; + + rp = erts_pid2proc_opt(NULL, 0, + to, ERTS_PROC_LOCK_LINK, + ERTS_P2P_FLG_ALLOW_OTHER_X); + if (!rp) + break; + + lnk = erts_remove_link(&(rp->nlinks), from); + + if (IS_TRACED_FL(rp, F_TRACE_PROCS) && lnk != NULL) { + trace_proc(NULL, rp, am_getting_unlinked, from); + } + + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + + erts_remove_dist_link(&dld, to, from, dep); + erts_destroy_dist_link(&dld); + if (lnk) + erts_destroy_link(lnk); + break; + } + + case DOP_MONITOR_P: { + /* A remote process wants to monitor us, we get: + {DOP_MONITOR_P, Remote pid, local pid or name, ref} */ + Eterm name; + + watcher = tuple[2]; + watched = tuple[3]; /* local proc to monitor */ + ref = tuple[4]; + + if (is_atom(watched)) { + name = watched; + rp = erts_whereis_process(NULL, 0, + watched, ERTS_PROC_LOCK_LINK, + ERTS_P2P_FLG_ALLOW_OTHER_X); + } + else { + name = NIL; + rp = erts_pid2proc_opt(NULL, 0, + watched, ERTS_PROC_LOCK_LINK, + ERTS_P2P_FLG_ALLOW_OTHER_X); + } + + if (!rp) { + ErtsDSigData dsd; + int code; + code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + if (code == ERTS_DSIG_PREP_CONNECTED) { + code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, + am_noproc); + ASSERT(code == ERTS_DSIG_SEND_OK); + } + } + else { + if (is_atom(watched)) + watched = rp->id; + erts_smp_de_links_lock(dep); + erts_add_monitor(&(dep->monitors), MON_ORIGIN, ref, watched, name); + erts_add_monitor(&(rp->monitors), MON_TARGET, ref, watcher, name); + erts_smp_de_links_unlock(dep); + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + } + + break; + } + + case DOP_DEMONITOR_P: + /* A remote node informs us that a local pid in no longer monitored + We get {DOP_DEMONITOR_P, Remote pid, Local pid or name, ref}, + We need 
only the ref of course */ + + /* watcher = tuple[2]; */ + /* watched = tuple[3]; May be an atom in case of monitor name */ + ref = tuple[4]; + + erts_smp_de_links_lock(dep); + mon = erts_remove_monitor(&(dep->monitors),ref); + erts_smp_de_links_unlock(dep); + /* ASSERT(mon != NULL); can happen in case of broken dist message */ + if (mon == NULL) { + break; + } + watched = mon->pid; + erts_destroy_monitor(mon); + rp = erts_pid2proc_opt(NULL, 0, + watched, ERTS_PROC_LOCK_LINK, + ERTS_P2P_FLG_ALLOW_OTHER_X); + if (!rp) { + break; + } + mon = erts_remove_monitor(&(rp->monitors),ref); + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + ASSERT(mon != NULL); + if (mon == NULL) { + break; + } + erts_destroy_monitor(mon); + break; + + case DOP_NODE_LINK: /* XXX never sent ?? */ + break; + + case DOP_REG_SEND_TT: + token_size = size_object(tuple[5]); + /* Fall through ... */ + case DOP_REG_SEND: + /* {DOP_REG_SEND, From, Cookie, ToName} -- Message */ + /* {DOP_REG_SEND_TT, From, Cookie, ToName, TraceToken} -- Message */ + + /* + * There is intentionally no testing of the cookie (it is always '') + * from R9B and onwards. + */ +#ifdef ERTS_DIST_MSG_DBG + dist_msg_dbg(&ede, "MSG", buf, orig_len); +#endif + + from = tuple[2]; + to = tuple[4]; + rp = erts_whereis_process(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC); + if (rp) { + Uint xsize = (type == DOP_REG_SEND + ? 0 + : ERTS_HEAP_FRAG_SIZE(token_size)); + ErtsProcLocks locks = 0; + ErtsDistExternal *ede_copy; + + ede_copy = erts_make_dist_ext_copy(&ede, xsize); + if (type == DOP_REG_SEND) { + token = NIL; + } else { + ErlHeapFragment *heap_frag; + ErlOffHeap *ohp; + ASSERT(xsize); + heap_frag = erts_dist_ext_trailer(ede_copy); + ERTS_INIT_HEAP_FRAG(heap_frag, token_size); + hp = heap_frag->mem; + ohp = &heap_frag->off_heap; + token = tuple[5]; + token = copy_struct(token, token_size, &hp, ohp); + } + + erts_queue_dist_message(rp, &locks, ede_copy, token); + if (locks) + erts_smp_proc_unlock(rp, locks); + erts_smp_proc_dec_refc(rp); + } + break; + + case DOP_SEND_TT: + token_size = size_object(tuple[4]); + /* Fall through ... */ + case DOP_SEND: + /* + * There is intentionally no testing of the cookie (it is always '') + * from R9B and onwards. + */ +#ifdef ERTS_DIST_MSG_DBG + dist_msg_dbg(&ede, "MSG", buf, orig_len); +#endif + + to = tuple[3]; + rp = erts_pid2proc_opt(NULL, 0, to, 0, ERTS_P2P_FLG_SMP_INC_REFC); + if (rp) { + Uint xsize = type == DOP_SEND ? 
0 : ERTS_HEAP_FRAG_SIZE(token_size); + ErtsProcLocks locks = 0; + ErtsDistExternal *ede_copy; + + ede_copy = erts_make_dist_ext_copy(&ede, xsize); + if (type == DOP_SEND) { + token = NIL; + } else { + ErlHeapFragment *heap_frag; + ErlOffHeap *ohp; + ASSERT(xsize); + heap_frag = erts_dist_ext_trailer(ede_copy); + ERTS_INIT_HEAP_FRAG(heap_frag, token_size); + hp = heap_frag->mem; + ohp = &heap_frag->off_heap; + token = tuple[4]; + token = copy_struct(token, token_size, &hp, ohp); + } + + erts_queue_dist_message(rp, &locks, ede_copy, token); + if (locks) + erts_smp_proc_unlock(rp, locks); + erts_smp_proc_dec_refc(rp); + } + break; + + case DOP_MONITOR_P_EXIT: { + /* We are monitoring a process on the remote node which dies, we get + {DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */ + + + Eterm lhp[3]; + Eterm sysname; + ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_MSG_SEND|ERTS_PROC_LOCK_LINK; + + /* watched = tuple[2]; */ /* remote proc which died */ + /* watcher = tuple[3]; */ + ref = tuple[4]; + reason = tuple[5]; + + erts_smp_de_links_lock(dep); + sysname = dep->sysname; + mon = erts_remove_monitor(&(dep->monitors), ref); + /* + * If demonitor was performed at the same time as the + * monitored process exits, monitoring side will have + * removed info about monitor. In this case, do nothing + * and everything will be as it should. + */ + erts_smp_de_links_unlock(dep); + if (mon == NULL) { + break; + } + rp = erts_pid2proc(NULL, 0, mon->pid, rp_locks); + if (rp == NULL) { + break; + } + + erts_destroy_monitor(mon); + + mon = erts_remove_monitor(&(rp->monitors),ref); + + if (mon == NULL) { + erts_smp_proc_unlock(rp, rp_locks); + break; + } + + watched = (is_not_nil(mon->name) + ? TUPLE2(&lhp[0], mon->name, sysname) + : mon->pid); + + erts_queue_monitor_message(rp, &rp_locks, + ref, am_process, watched, reason); + erts_smp_proc_unlock(rp, rp_locks); + erts_destroy_monitor(mon); + break; + } + + case DOP_EXIT_TT: + case DOP_EXIT: { + ErtsDistLinkData dld; + ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK|ERTS_PROC_LOCKS_XSIG_SEND; + /* 'from', which 'to' is linked to, died */ + if (type == DOP_EXIT) { + from = tuple[2]; + to = tuple[3]; + reason = tuple[4]; + token = NIL; + } else { + from = tuple[2]; + to = tuple[3]; + token = tuple[4]; + reason = tuple[5]; + } + if (is_not_internal_pid(to)) { + erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); + PURIFY_MSG("data error"); + erts_dsprintf(dsbufp, + "Invalid DOP_EXIT distribution message: %.200T", + arg); + erts_send_error_to_logger_nogl(dsbufp); + goto data_error; + } + + rp = erts_pid2proc(NULL, 0, to, rp_locks); + if (!rp) + lnk = NULL; + else { + lnk = erts_remove_link(&(rp->nlinks), from); + + /* If lnk == NULL, we have unlinked on this side, i.e. + * ignore exit. + */ + if (lnk) { + int xres; +#if 0 + /* Arndt: Maybe it should never be 'kill', but it can be, + namely when a linked process does exit(kill). Until we know + whether that is incorrect and what should happen instead, + we leave the assertion out. 
*/ + ASSERT(reason != am_kill); /* should never be kill (killed) */ +#endif + xres = erts_send_exit_signal(NULL, + from, + rp, + &rp_locks, + reason, + token, + NULL, + ERTS_XSIG_FLG_IGN_KILL); + if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) { + /* We didn't exit the process and it is traced */ + trace_proc(NULL, rp, am_getting_unlinked, from); + } + } + erts_smp_proc_unlock(rp, rp_locks); + } + erts_remove_dist_link(&dld, to, from, dep); + if (lnk) + erts_destroy_link(lnk); + erts_destroy_dist_link(&dld); + break; + } + case DOP_EXIT2_TT: + case DOP_EXIT2: { + ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; + /* 'from' is send an exit signal to 'to' */ + if (type == DOP_EXIT2) { + from = tuple[2]; + to = tuple[3]; + reason = tuple[4]; + token = NIL; + } else { + from = tuple[2]; + to = tuple[3]; + token = tuple[4]; + reason = tuple[5]; + } + rp = erts_pid2proc_opt(NULL, 0, to, rp_locks, + ERTS_P2P_FLG_SMP_INC_REFC); + if (rp) { + (void) erts_send_exit_signal(NULL, + from, + rp, + &rp_locks, + reason, + token, + NULL, + 0); + erts_smp_proc_unlock(rp, rp_locks); + erts_smp_proc_dec_refc(rp); + } + break; + } + case DOP_GROUP_LEADER: + from = tuple[2]; /* Group leader */ + to = tuple[3]; /* new member */ + if (is_not_pid(from)) + break; + + rp = erts_pid2proc(NULL, 0, to, ERTS_PROC_LOCK_MAIN); + if (!rp) + break; + rp->group_leader = STORE_NC_IN_PROC(rp, from); + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); + break; + + default: { + erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); + erts_dsprintf(dsbufp, + "Illegal value in distribution dispatch switch: " + "%.200T", + arg); + erts_send_error_to_logger_nogl(dsbufp); + PURIFY_MSG("data error"); + goto data_error; + } + } + + if (off_heap.mso) { + erts_cleanup_mso(off_heap.mso); + } + if (off_heap.externals) { + erts_cleanup_externals(off_heap.externals); + } +#ifndef HYBRID /* FIND ME! */ + if (off_heap.funs) { + erts_cleanup_funs(off_heap.funs); + } + if (ctl != ctl_default) { + erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); + } +#endif + ERTS_SMP_CHK_NO_PROC_LOCKS; + return 0; + + data_error: + if (off_heap.mso) { + erts_cleanup_mso(off_heap.mso); + } + if (off_heap.externals) { + erts_cleanup_externals(off_heap.externals); + } +#ifndef HYBRID /* FIND ME! */ + if (off_heap.funs) { + erts_cleanup_funs(off_heap.funs); + } + if (ctl != ctl_default) { + erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); + } +#endif + erts_do_exit_port(prt, dep->cid, am_killed); + ERTS_SMP_CHK_NO_PROC_LOCKS; + return -1; +} + +#define ERTS_DE_BUSY_LIMIT (128*1024) + +static int +dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy) +{ + Eterm cid; + int suspended = 0; + int resume = 0; + Uint32 pass_through_size; + Uint data_size, dhdr_ext_size; + ErtsAtomCacheMap *acmp; + ErtsDistOutputBuf *obuf; + DistEntry *dep = dsdp->dep; + Uint32 flags = dep->flags; + Process *c_p = dsdp->proc; + + if (!c_p || dsdp->no_suspend) + force_busy = 1; + + ERTS_SMP_LC_ASSERT(!c_p + || (ERTS_PROC_LOCK_MAIN + == erts_proc_lc_my_proc_locks(c_p))); + + if (!erts_is_alive) + return ERTS_DSIG_SEND_OK; + + if (flags & DFLAG_DIST_HDR_ATOM_CACHE) { + acmp = erts_get_atom_cache_map(c_p); + pass_through_size = 0; + } + else { + acmp = NULL; + pass_through_size = 1; + } + +#ifdef ERTS_DIST_MSG_DBG + erts_fprintf(stderr, ">>%s CTL: %T\n", pass_through_size ? 
"P" : " ", ctl); + if (is_value(msg)) + erts_fprintf(stderr, " MSG: %T\n", msg); +#endif + + data_size = pass_through_size; + erts_reset_atom_cache_map(acmp); + data_size += erts_encode_dist_ext_size(ctl, flags, acmp); + if (is_value(msg)) + data_size += erts_encode_dist_ext_size(msg, flags, acmp); + erts_finalize_atom_cache_map(acmp); + + dhdr_ext_size = erts_encode_ext_dist_header_size(acmp); + data_size += dhdr_ext_size; + + obuf = alloc_dist_obuf(data_size); + obuf->ext_endp = &obuf->data[0] + pass_through_size + dhdr_ext_size; + + /* Encode internal version of dist header */ + obuf->extp = erts_encode_ext_dist_header_setup(obuf->ext_endp, acmp); + /* Encode control message */ + erts_encode_dist_ext(ctl, &obuf->ext_endp, flags, acmp); + if (is_value(msg)) { + /* Encode message */ + erts_encode_dist_ext(msg, &obuf->ext_endp, flags, acmp); + } + + ASSERT(obuf->extp < obuf->ext_endp); + ASSERT(&obuf->data[0] <= obuf->extp - pass_through_size); + ASSERT(obuf->ext_endp <= &obuf->data[0] + data_size); + + data_size = obuf->ext_endp - obuf->extp; + + /* + * Signal encoded; now verify that the connection still exists, + * and if so enqueue the signal and schedule it for send. + */ + obuf->next = NULL; + erts_smp_de_rlock(dep); + cid = dep->cid; + if (cid != dsdp->cid + || dep->connection_id != dsdp->connection_id + || dep->status & ERTS_DE_SFLG_EXITING) { + /* Not the same connection as when we started; drop message... */ + erts_smp_de_runlock(dep); + free_dist_obuf(obuf); + } + else { + ErtsProcList *plp = NULL; + erts_smp_spin_lock(&dep->qlock); + dep->qsize += size_obuf(obuf); + if (dep->qsize >= ERTS_DE_BUSY_LIMIT) + dep->qflgs |= ERTS_DE_QFLG_BUSY; + if (!force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) { + erts_smp_spin_unlock(&dep->qlock); + + plp = erts_proclist_create(c_p); + plp->next = NULL; + erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL); + suspended = 1; + erts_smp_spin_lock(&dep->qlock); + } + + /* Enqueue obuf on dist entry */ + if (dep->out_queue.last) + dep->out_queue.last->next = obuf; + else + dep->out_queue.first = obuf; + dep->out_queue.last = obuf; + + if (!force_busy) { + if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) { + if (suspended) + resume = 1; /* was busy when we started, but isn't now */ + } + else { + /* Enqueue suspended process on dist entry */ + ASSERT(plp); + if (dep->suspended.last) + dep->suspended.last->next = plp; + else + dep->suspended.first = plp; + dep->suspended.last = plp; + } + } + + erts_smp_spin_unlock(&dep->qlock); + erts_schedule_dist_command(NULL, dep); + erts_smp_de_runlock(dep); + + if (resume) { + erts_resume(c_p, ERTS_PROC_LOCK_MAIN); + erts_proclist_destroy(plp); + /* + * Note that the calling process still have to yield as if it + * suspended. If not, the calling process could later be + * erroneously scheduled when it shouldn't be. + */ + } + } + + if (c_p) { + int reds; + /* + * Bump reductions on calling process. + * + * This is the reduction cost: Always a base cost of 8 reductions + * plus 16 reductions per kilobyte generated external data. + */ + + data_size >>= (10-4); +#if defined(ARCH_64) + data_size &= 0x003fffffffffffff; +#elif defined(ARCH_32) + data_size &= 0x003fffff; +#else +# error "Ohh come on ... !?!" +#endif + reds = 8 + ((int) data_size > 1000000 ? 
1000000 : (int) data_size); + BUMP_REDS(c_p, reds); + } + + if (suspended) { + if (!resume && erts_system_monitor_flags.busy_dist_port) + monitor_generic(c_p, am_busy_dist_port, cid); + return ERTS_DSIG_SEND_YIELD; + } + return ERTS_DSIG_SEND_OK; +} + + +static Uint +dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) +{ + int fpe_was_unmasked; + Uint size = obuf->ext_endp - obuf->extp; + + ERTS_SMP_CHK_NO_PROC_LOCKS; + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + + if (size > (Uint) INT_MAX) + erl_exit(ERTS_ABORT_EXIT, + "Absurdly large distribution output data buffer " + "(%bpu bytes) passed.\n", + size); + + prt->caller = NIL; + fpe_was_unmasked = erts_block_fpe(); + (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, + (char*) obuf->extp, + (int) size); + erts_unblock_fpe(fpe_was_unmasked); + return size; +} + +static Uint +dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) +{ + int fpe_was_unmasked; + Uint size = obuf->ext_endp - obuf->extp; + SysIOVec iov[2]; + ErlDrvBinary* bv[2]; + ErlIOVec eiov; + + ERTS_SMP_CHK_NO_PROC_LOCKS; + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + + if (size > (Uint) INT_MAX) + erl_exit(ERTS_ABORT_EXIT, + "Absurdly large distribution output data buffer " + "(%bpu bytes) passed.\n", + size); + + iov[0].iov_base = NULL; + iov[0].iov_len = 0; + bv[0] = NULL; + + iov[1].iov_base = obuf->extp; + iov[1].iov_len = size; + bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); + + eiov.vsize = 2; + eiov.size = size; + eiov.iov = iov; + eiov.binv = bv; + + ASSERT(prt->drv_ptr->outputv); + + prt->caller = NIL; + fpe_was_unmasked = erts_block_fpe(); + (*prt->drv_ptr->outputv)((ErlDrvData) prt->drv_data, &eiov); + erts_unblock_fpe(fpe_was_unmasked); + + return size; +} + + +#if defined(ARCH_64) +#define ERTS_PORT_REDS_MASK__ 0x003fffffffffffffL +#elif defined(ARCH_32) +#define ERTS_PORT_REDS_MASK__ 0x003fffff +#else +# error "Ohh come on ... !?!" +#endif + +#define ERTS_PORT_REDS_DIST_CMD_START 5 +#define ERTS_PORT_REDS_DIST_CMD_FINALIZE 3 +#define ERTS_PORT_REDS_DIST_CMD_EXIT 200 +#define ERTS_PORT_REDS_DIST_CMD_RESUMED 5 +#define ERTS_PORT_REDS_DIST_CMD_DATA(SZ) \ + ((SZ) < (1 << 10) \ + ? ((Sint) 1) \ + : ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__))) + +int +erts_dist_command(Port *prt, int reds_limit) +{ + Sint reds = ERTS_PORT_REDS_DIST_CMD_START; + int prt_busy; + int de_busy; + Uint32 status; + Uint32 flags; + Uint32 qflgs; + Sint obufsize = 0; + ErtsDistOutputQueue oq, foq; + DistEntry *dep = prt->dist_entry; + Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); + + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + + erts_refc_inc(&dep->refc, 1); /* Otherwise dist_entry might be + removed if port command fails */ + + erts_smp_atomic_xchg(&dep->dist_cmd_scheduled, 0); + + erts_smp_de_rlock(dep); + flags = dep->flags; + status = dep->status; + send = dep->send; + erts_smp_de_runlock(dep); + + if (status & ERTS_DE_SFLG_EXITING) { + erts_do_exit_port(prt, prt->id, am_killed); + erts_deref_dist_entry(dep); + return reds + ERTS_PORT_REDS_DIST_CMD_EXIT; + } + + ASSERT(send); + + /* + * We need to remove both out queues from the + * dist entry while passing it to port command; + * otherwise, port command will free the buffers + * in the queues on failure and we'll end up with + * a mess. 
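+ * Below, 'oq' takes over dep->out_queue (buffers whose distribution
+ * headers still have to be finalized) and 'foq' takes over
+ * dep->finalized_out_queue (buffers that are ready to be handed to
+ * the driver).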
+ */ + + erts_smp_spin_lock(&dep->qlock); + oq.first = dep->out_queue.first; + oq.last = dep->out_queue.last; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + qflgs = dep->qflgs; + erts_smp_spin_unlock(&dep->qlock); + + foq.first = dep->finalized_out_queue.first; + foq.last = dep->finalized_out_queue.last; + dep->finalized_out_queue.first = NULL; + dep->finalized_out_queue.last = NULL; + + if (reds > reds_limit) + goto preempted; + + prt_busy = (int) (prt->status & ERTS_PORT_SFLG_PORT_BUSY); + de_busy = (int) (qflgs & ERTS_DE_QFLG_BUSY); + + if (prt_busy) { + if (!de_busy) { + erts_smp_spin_lock(&dep->qlock); + dep->qflgs |= ERTS_DE_QFLG_BUSY; + erts_smp_spin_unlock(&dep->qlock); + de_busy = 1; + } + } + else if (foq.first) { + int preempt = 0; + do { + Uint size; + ErtsDistOutputBuf *fob; + + size = (*send)(prt, foq.first); +#ifdef ERTS_RAW_DIST_MSG_DBG + erts_fprintf(stderr, ">> "); + bw(foq.first->extp, size); +#endif + reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = foq.first; + obufsize += size_obuf(fob); + foq.first = foq.first->next; + free_dist_obuf(fob); + preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD); + if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) { + erts_smp_spin_lock(&dep->qlock); + dep->qflgs |= ERTS_DE_QFLG_BUSY; + erts_smp_spin_unlock(&dep->qlock); + de_busy = prt_busy = 1; + break; + } + } while (foq.first && !preempt); + if (!foq.first) + foq.last = NULL; + if (preempt) + goto preempted; + } + + if (prt_busy) { + if (oq.first) { + ErtsDistOutputBuf *ob; + int preempt; + finalize_only: + preempt = 0; + ob = oq.first; + ASSERT(ob); + do { + ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, + dep->cache); + if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' + needed */ + ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + preempt = reds > reds_limit; + if (preempt) + break; + ob = ob->next; + } while (ob); + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the last buffer that we finalized. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + if (!preempt) { + /* All buffers finalized */ + foq.last = oq.last; + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. 
*/ + foq.last = ob; + oq.first = ob->next; + if (oq.first) + ob->next = NULL; + else + oq.last = NULL; + } + if (preempt) + goto preempted; + } + } + else { + int preempt = 0; + while (oq.first && !preempt) { + ErtsDistOutputBuf *fob; + Uint size; + oq.first->extp + = erts_encode_ext_dist_header_finalize(oq.first->extp, + dep->cache); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' + needed */ + ASSERT(&oq.first->data[0] <= oq.first->extp + && oq.first->extp < oq.first->ext_endp); + size = (*send)(prt, oq.first); +#ifdef ERTS_RAW_DIST_MSG_DBG + erts_fprintf(stderr, ">> "); + bw(oq.first->extp, size); +#endif + reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = oq.first; + obufsize += size_obuf(fob); + oq.first = oq.first->next; + free_dist_obuf(fob); + preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD); + if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) { + erts_smp_spin_lock(&dep->qlock); + dep->qflgs |= ERTS_DE_QFLG_BUSY; + erts_smp_spin_unlock(&dep->qlock); + de_busy = prt_busy = 1; + if (oq.first && !preempt) + goto finalize_only; + } + } + + ASSERT(!oq.first || preempt); + + /* + * Preempt if not all buffers have been handled. + */ + if (preempt && oq.first) + goto preempted; + +#ifdef DEBUG + oq.last = NULL; +#endif + ASSERT(!oq.first); + ASSERT(!foq.first && !foq.last); + + /* + * Everything that was buffered when we started have now been + * written to the port. If port isn't busy but dist entry is + * and we havn't got too muched queued on dist entry, set + * dist entry in a non-busy state and resume suspended + * processes. + */ + erts_smp_spin_lock(&dep->qlock); + ASSERT(dep->qsize >= obufsize); + dep->qsize -= obufsize; + obufsize = 0; + if (de_busy && !prt_busy && dep->qsize < ERTS_DE_BUSY_LIMIT) { + ErtsProcList *suspendees; + int resumed; + suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); + erts_smp_spin_unlock(&dep->qlock); + + resumed = erts_resume_processes(suspendees); + reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + de_busy = 0; + } + else + erts_smp_spin_unlock(&dep->qlock); + } + + ASSERT(!oq.first && !oq.last); + + done: + + if (obufsize != 0) { + ASSERT(obufsize > 0); + erts_smp_spin_lock(&dep->qlock); + ASSERT(dep->qsize >= obufsize); + dep->qsize -= obufsize; + erts_smp_spin_unlock(&dep->qlock); + } + + ASSERT(foq.first || !foq.last); + ASSERT(!foq.first || foq.last); + ASSERT(!dep->finalized_out_queue.first); + ASSERT(!dep->finalized_out_queue.last); + + if (foq.first) { + dep->finalized_out_queue.first = foq.first; + dep->finalized_out_queue.last = foq.last; + } + + /* Avoid wrapping reduction counter... */ + if (reds > INT_MAX/2) + reds = INT_MAX/2; + + erts_deref_dist_entry(dep); + + return reds; + + preempted: + + ASSERT(oq.first || !oq.last); + ASSERT(!oq.first || oq.last); + + if (prt->status & ERTS_PORT_SFLGS_DEAD) { + /* + * Port died during port command; clean up 'oq' + * and 'foq'. Things buffered in dist entry after + * we begun processing the queues have already been + * cleaned up when port terminated. 
+ */ + + if (oq.first) + oq.last->next = foq.first; + else + oq.first = foq.first; + + while (oq.first) { + ErtsDistOutputBuf *fob = oq.first; + oq.first = oq.first->next; + obufsize += size_obuf(fob); + free_dist_obuf(fob); + } + + foq.first = NULL; + foq.last = NULL; + +#ifdef DEBUG + erts_smp_spin_lock(&dep->qlock); + ASSERT(dep->qsize == obufsize); + erts_smp_spin_unlock(&dep->qlock); +#endif + } + else { + if (oq.first) { + /* + * Unhandle buffers need to be put back first + * in out_queue. + */ + erts_smp_spin_lock(&dep->qlock); + dep->qsize -= obufsize; + obufsize = 0; + oq.last->next = dep->out_queue.first; + dep->out_queue.first = oq.first; + if (!dep->out_queue.last) + dep->out_queue.last = oq.last; + erts_smp_spin_unlock(&dep->qlock); + } + + erts_schedule_dist_command(prt, NULL); + } + goto done; +} + +void +erts_dist_port_not_busy(Port *prt) +{ + erts_schedule_dist_command(prt, NULL); +} + +void +erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) +{ + erts_smp_de_rwlock(dep); + if (is_internal_port(dep->cid) + && connection_id == dep->connection_id + && !(dep->status & ERTS_DE_SFLG_EXITING)) { + + dep->status |= ERTS_DE_SFLG_EXITING; + + erts_smp_spin_lock(&dep->qlock); + ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT)); + dep->qflgs |= ERTS_DE_QFLG_EXIT; + erts_smp_spin_unlock(&dep->qlock); + + erts_schedule_dist_command(NULL, dep); + } + erts_smp_de_rwunlock(dep); +} + +struct print_to_data { + int to; + void *arg; +}; + +static void doit_print_monitor_info(ErtsMonitor *mon, void *vptdp) +{ + int to = ((struct print_to_data *) vptdp)->to; + void *arg = ((struct print_to_data *) vptdp)->arg; + Process *rp; + ErtsMonitor *rmon; + rp = erts_pid2proc_unlocked(mon->pid); + if (!rp || (rmon = erts_lookup_monitor(rp->monitors, mon->ref)) == NULL) { + erts_print(to, arg, "Warning, stray monitor for: %T\n", mon->pid); + } else if (mon->type == MON_ORIGIN) { + /* Local pid is being monitored */ + erts_print(to, arg, "Remotely monitored by: %T %T\n", + mon->pid, rmon->pid); + } else { + erts_print(to, arg, "Remote monitoring: %T ", mon->pid); + if (is_not_atom(rmon->pid)) + erts_print(to, arg, "%T\n", rmon->pid); + else + erts_print(to, arg, "{%T, %T}\n", + rmon->name, + rmon->pid); /* which in this case is the + remote system name... 
*/ + } +} + +static void print_monitor_info(int to, void *arg, ErtsMonitor *mon) +{ + struct print_to_data ptd = {to, arg}; + erts_doforall_monitors(mon,&doit_print_monitor_info,&ptd); +} + +typedef struct { + struct print_to_data *ptdp; + Eterm from; +} PrintLinkContext; + +static void doit_print_link_info2(ErtsLink *lnk, void *vpplc) +{ + PrintLinkContext *pplc = (PrintLinkContext *) vpplc; + erts_print(pplc->ptdp->to, pplc->ptdp->arg, "Remote link: %T %T\n", + pplc->from, lnk->pid); +} + +static void doit_print_link_info(ErtsLink *lnk, void *vptdp) +{ + if (is_internal_pid(lnk->pid) && erts_pid2proc_unlocked(lnk->pid)) { + PrintLinkContext plc = {(struct print_to_data *) vptdp, lnk->pid}; + erts_doforall_links(ERTS_LINK_ROOT(lnk), &doit_print_link_info2, &plc); + } +} + +static void print_link_info(int to, void *arg, ErtsLink *lnk) +{ + struct print_to_data ptd = {to, arg}; + erts_doforall_links(lnk, &doit_print_link_info, (void *) &ptd); +} + +typedef struct { + struct print_to_data ptd; + Eterm sysname; +} PrintNodeLinkContext; + + +static void doit_print_nodelink_info(ErtsLink *lnk, void *vpcontext) +{ + PrintNodeLinkContext *pcontext = vpcontext; + + if (is_internal_pid(lnk->pid) && erts_pid2proc_unlocked(lnk->pid)) + erts_print(pcontext->ptd.to, pcontext->ptd.arg, + "Remote monitoring: %T %T\n", lnk->pid, pcontext->sysname); +} + +static void print_nodelink_info(int to, void *arg, ErtsLink *lnk, Eterm sysname) +{ + PrintNodeLinkContext context = {{to, arg}, sysname}; + erts_doforall_links(lnk, &doit_print_nodelink_info, &context); +} + + +static int +info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected) +{ + + if (visible && connected) { + erts_print(to, arg, "=visible_node:"); + } else if (connected) { + erts_print(to, arg, "=hidden_node:"); + } else { + erts_print(to, arg, "=not_connected:"); + } + erts_print(to, arg, "%d\n", dist_entry_channel_no(dep)); + + if(connected && is_nil(dep->cid)) { + erts_print(to, arg, + "Error: Not connected node still registered as connected:%T\n", + dep->sysname); + return 0; + } + + if(!connected && is_not_nil(dep->cid)) { + erts_print(to, arg, + "Error: Connected node not registered as connected:%T\n", + dep->sysname); + return 0; + } + + erts_print(to, arg, "Name: %T", dep->sysname); +#ifdef DEBUG + erts_print(to, arg, " (refc=%d)", erts_refc_read(&dep->refc, 1)); +#endif + erts_print(to, arg, "\n"); + if (!connected && is_nil(dep->cid)) { + if (dep->nlinks) { + erts_print(to, arg, "Error: Got links to not connected node:%T\n", + dep->sysname); + } + return 0; + } + + erts_print(to, arg, "Controller: %T\n", dep->cid, to); + + erts_print_node_info(to, arg, dep->sysname, NULL, NULL); + print_monitor_info(to, arg, dep->monitors); + print_link_info(to, arg, dep->nlinks); + print_nodelink_info(to, arg, dep->node_links, dep->sysname); + + return 0; + +} +int distribution_info(int to, void *arg) /* Called by break handler */ +{ + DistEntry *dep; + + erts_print(to, arg, "=node:%T\n", erts_this_dist_entry->sysname); + + if (erts_this_node->sysname == am_Noname) { + erts_print(to, arg, "=no_distribution\n"); + return(0); + } + +#if 0 + if (!erts_visible_dist_entries && !erts_hidden_dist_entries) + erts_print(to, arg, "Alive but not holding any connections \n"); +#endif + + for(dep = erts_visible_dist_entries; dep; dep = dep->next) { + info_dist_entry(to, arg, dep, 1, 1); + } + + for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { + info_dist_entry(to, arg, dep, 0, 1); + } + + for (dep = erts_not_connected_dist_entries; 
dep; dep = dep->next) { + info_dist_entry(to, arg, dep, 0, 0); + } + + return(0); +} + +/**************************************************************************** + DISTRIBUTION BIFS: + + setnode/2 -- start distribution + setnode/3 -- set node controller + + node/1 -- return objects node name + node/0 -- return this node name + nodes/0 -- return a list of all (non hidden) nodes + is_alive -- return true if distribution is running else false + monitor_node -- turn on/off node monitoring + + node controller only: + dist_exit/3 -- send exit signals from remote to local process + dist_link/2 -- link a remote process to a local + dist_unlink/2 -- unlink a remote from a local +****************************************************************************/ + + + +/********************************************************************** + ** Set the node name of current node fail if node already is set. + ** setnode(name@host, Creation) + ** loads functions pointer to trap_functions from module erlang. + ** erlang:dsend/2 + ** erlang:dlink/1 + ** erlang:dunlink/1 + ** erlang:dmonitor_node/3 + ** erlang:dgroup_leader/2 + ** erlang:dexit/2 + ** -- are these needed ? + ** dexit/1 + ***********************************************************************/ + +BIF_RETTYPE setnode_2(BIF_ALIST_2) +{ + Process *net_kernel; + Uint creation; + + /* valid creation ? */ + if(!term_to_Uint(BIF_ARG_2, &creation)) + goto error; + if(creation > 3) + goto error; + + /* valid node name ? */ + if (!is_node_name_atom(BIF_ARG_1)) + goto error; + + if (BIF_ARG_1 == am_Noname) /* cant use this name !! */ + goto error; + if (erts_is_alive) /* must not be alive! */ + goto error; + + /* Check that all trap functions are defined !! */ + if (dsend2_trap->address == NULL || + dsend3_trap->address == NULL || + /* dsend_nosuspend_trap->address == NULL ||*/ + dlink_trap->address == NULL || + dunlink_trap->address == NULL || + dmonitor_node_trap->address == NULL || + dgroup_leader_trap->address == NULL || + dmonitor_p_trap->address == NULL || + dexit_trap->address == NULL) { + goto error; + } + + net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN, + am_net_kernel, ERTS_PROC_LOCK_MAIN, 0); + if (!net_kernel) + goto error; + + /* By setting dist_entry==erts_this_dist_entry and DISTRIBUTION on + net_kernel do_net_exist will be called when net_kernel + is terminated !! 
*/ + (void *) ERTS_PROC_SET_DIST_ENTRY(net_kernel, + ERTS_PROC_LOCK_MAIN, + erts_this_dist_entry); + erts_refc_inc(&erts_this_dist_entry->refc, 2); + net_kernel->flags |= F_DISTRIBUTION; + + if (net_kernel != BIF_P) + erts_smp_proc_unlock(net_kernel, ERTS_PROC_LOCK_MAIN); + +#ifdef DEBUG + erts_smp_rwmtx_rwlock(&erts_dist_table_rwmtx); + ASSERT(!erts_visible_dist_entries && !erts_hidden_dist_entries); + erts_smp_rwmtx_rwunlock(&erts_dist_table_rwmtx); +#endif + + erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); + erts_smp_block_system(ERTS_BS_FLG_ALLOW_GC); + erts_set_this_node(BIF_ARG_1, (Uint32) creation); + erts_is_alive = 1; + send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL); + erts_smp_release_system(); + erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + + BIF_RET(am_true); + + error: + BIF_ERROR(BIF_P, BADARG); +} + +/********************************************************************** + ** Allocate a dist entry, set node name install the connection handler + ** setnode_3({name@host, Creation}, Cid, {Type, Version, Initial, IC, OC}) + ** Type = flag field, where the flags are specified in dist.h + ** Version = distribution version, >= 1 + ** IC = in_cookie (ignored) + ** OC = out_cookie (ignored) + ** + ** Note that in distribution protocols above 1, the Initial parameter + ** is always NIL and the cookies are always the atom '', cookies are not + ** sent in the distribution messages but are only used in + ** the handshake. + ** + ***********************************************************************/ + +BIF_RETTYPE setnode_3(BIF_ALIST_3) +{ + BIF_RETTYPE ret; + Uint flags; + unsigned long version; + Eterm ic, oc; + Eterm *tp; + DistEntry *dep = NULL; + Port *pp = NULL; + + /* Prepare for success */ + ERTS_BIF_PREP_RET(ret, am_true); + + /* + * Check and pick out arguments + */ + + if (!is_node_name_atom(BIF_ARG_1) || + is_not_internal_port(BIF_ARG_2) || + (erts_this_node->sysname == am_Noname)) { + goto badarg; + } + + if (!is_tuple(BIF_ARG_3)) + goto badarg; + tp = tuple_val(BIF_ARG_3); + if (*tp++ != make_arityval(4)) + goto badarg; + if (!is_small(*tp)) + goto badarg; + flags = unsigned_val(*tp++); + if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0) + goto badarg; + ic = *(++tp); + oc = *(++tp); + if (!is_atom(ic) || !is_atom(oc)) + goto badarg; + + /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */ + if (!(DFLAG_EXTENDED_REFERENCES & flags)) { + erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); + erts_dsprintf(dsbufp, "%T", BIF_P->id); + if (BIF_P->reg) + erts_dsprintf(dsbufp, " (%T)", BIF_P->reg->name); + erts_dsprintf(dsbufp, + " attempted to enable connection to node %T " + "which is not able to handle extended references.\n", + BIF_ARG_1); + erts_send_error_to_logger(BIF_P->group_leader, dsbufp); + goto badarg; + } + + /* + * Arguments seem to be in order. + */ + + /* get dist_entry */ + dep = erts_find_or_insert_dist_entry(BIF_ARG_1); + if (dep == erts_this_dist_entry) + goto badarg; + else if (!dep) + goto system_limit; /* Should never happen!!! 
*/ + + pp = erts_id2port(BIF_ARG_2, BIF_P, ERTS_PROC_LOCK_MAIN); + erts_smp_de_rwlock(dep); + + if (!pp || (pp->status & ERTS_PORT_SFLG_EXITING)) + goto badarg; + + if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) + goto badarg; + + if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) + goto done; /* Already set */ + + if (dep->status & ERTS_DE_SFLG_EXITING) { + /* Suspend on dist entry waiting for the exit to finish */ + ErtsProcList *plp = erts_proclist_create(BIF_P); + plp->next = NULL; + erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); + erts_smp_spin_lock(&dep->qlock); + if (dep->suspended.last) + dep->suspended.last->next = plp; + else + dep->suspended.first = plp; + dep->suspended.last = plp; + erts_smp_spin_unlock(&dep->qlock); + goto yield; + } + + ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); + + if (pp->dist_entry || is_not_nil(dep->cid)) + goto badarg; + + erts_port_status_bor_set(pp, ERTS_PORT_SFLG_DISTRIBUTION); + + pp->dist_entry = dep; + + dep->version = version; + dep->creation = 0; + + ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + +#if 1 + dep->send = (pp->drv_ptr->outputv + ? dist_port_commandv + : dist_port_command); +#else + dep->send = dist_port_command; +#endif + ASSERT(dep->send); + +#ifdef DEBUG + erts_smp_spin_lock(&dep->qlock); + ASSERT(dep->qsize == 0); + erts_smp_spin_unlock(&dep->qlock); +#endif + + erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + + if (flags & DFLAG_DIST_HDR_ATOM_CACHE) + create_cache(dep); + + erts_smp_de_rwunlock(dep); + dep = NULL; /* inc of refc transferred to port (dist_entry field) */ + + send_nodes_mon_msgs(BIF_P, + am_nodeup, + BIF_ARG_1, + flags & DFLAG_PUBLISHED ? am_visible : am_hidden, + NIL); + done: + + if (dep && dep != erts_this_dist_entry) { + erts_smp_de_rwunlock(dep); + erts_deref_dist_entry(dep); + } + + if (pp) + erts_smp_port_unlock(pp); + + return ret; + + yield: + ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P, + BIF_ARG_1, BIF_ARG_2, BIF_ARG_3); + goto done; + + badarg: + ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG); + goto done; + + system_limit: + ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT); + goto done; +} + + +/**********************************************************************/ +/* dist_exit(Local, Term, Remote) -> Bool */ + +BIF_RETTYPE dist_exit_3(BIF_ALIST_3) +{ + Eterm local; + Eterm remote; + DistEntry *rdep; + + local = BIF_ARG_1; + remote = BIF_ARG_3; + + /* Check that remote is a remote process */ + if (is_not_external_pid(remote)) + goto error; + + rdep = external_dist_entry(remote); + + if(rdep == erts_this_dist_entry) + goto error; + + /* Check that local is local */ + if (is_internal_pid(local)) { + Process *lp; + ErtsProcLocks lp_locks; + if (BIF_P->id == local) { + lp_locks = ERTS_PROC_LOCKS_ALL; + lp = BIF_P; + erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR); + } + else { + lp_locks = ERTS_PROC_LOCKS_XSIG_SEND; + lp = erts_pid2proc_opt(BIF_P, ERTS_PROC_LOCK_MAIN, + local, lp_locks, + ERTS_P2P_FLG_SMP_INC_REFC); + if (!lp) { + BIF_RET(am_true); /* ignore */ + } + } + + (void) erts_send_exit_signal(BIF_P, + remote, + lp, + &lp_locks, + BIF_ARG_2, + NIL, + NULL, + 0); +#ifdef ERTS_SMP + if (lp == BIF_P) + lp_locks &= ~ERTS_PROC_LOCK_MAIN; +#endif + erts_smp_proc_unlock(lp, lp_locks); + if (lp != BIF_P) + erts_smp_proc_dec_refc(lp); + else { + /* + * We may have exited current process and may have to take action. 
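+	     * That is, the exit signal sent above may have been directed at
+	     * the calling process itself; the checks below bail out of the
+	     * BIF if BIF_P has exited or has a pending exit.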
+ */ + ERTS_BIF_CHK_EXITED(BIF_P); + ERTS_SMP_BIF_CHK_PENDING_EXIT(BIF_P, ERTS_PROC_LOCK_MAIN); + } + } + else if (is_external_pid(local) + && external_dist_entry(local) == erts_this_dist_entry) { + BIF_RET(am_true); /* ignore */ + } + else + goto error; + BIF_RET(am_true); + + error: + BIF_ERROR(BIF_P, BADARG); +} + +/**********************************************************************/ +/* node(Object) -> Node */ + +BIF_RETTYPE node_1(BIF_ALIST_1) +{ + if (is_not_node_container(BIF_ARG_1)) + BIF_ERROR(BIF_P, BADARG); + BIF_RET(node_container_node_name(BIF_ARG_1)); +} + +/**********************************************************************/ +/* node() -> Node */ + +BIF_RETTYPE node_0(BIF_ALIST_0) +{ + BIF_RET(erts_this_dist_entry->sysname); +} + + +/**********************************************************************/ +/* nodes() -> [ Node ] */ + +#if 0 /* Done in erlang.erl instead. */ +BIF_RETTYPE nodes_0(BIF_ALIST_0) +{ + return nodes_1(BIF_P, am_visible); +} +#endif + + +BIF_RETTYPE nodes_1(BIF_ALIST_1) +{ + Eterm result; + int length; + Eterm* hp; + int not_connected = 0; + int visible = 0; + int hidden = 0; + int this = 0; + Uint buf[2]; /* For one cons-cell */ + DistEntry *dep; + Eterm arg_list = BIF_ARG_1; +#ifdef DEBUG + Eterm* endp; +#endif + if (is_atom(BIF_ARG_1)) + arg_list = CONS(buf, BIF_ARG_1, NIL); + + while (is_list(arg_list)) { + switch(CAR(list_val(arg_list))) { + case am_visible: visible = 1; break; + case am_hidden: hidden = 1; break; + case am_known: visible = hidden = not_connected = this = 1; break; + case am_this: this = 1; break; + case am_connected: visible = hidden = 1; break; + default: BIF_ERROR(BIF_P, BADARG); break; + } + arg_list = CDR(list_val(arg_list)); + } + + if (is_not_nil(arg_list)) + BIF_ERROR(BIF_P, BADARG); + + length = 0; + + erts_smp_rwmtx_rwlock(&erts_dist_table_rwmtx); + + ASSERT(erts_no_of_not_connected_dist_entries >= 0); + ASSERT(erts_no_of_hidden_dist_entries >= 0); + ASSERT(erts_no_of_visible_dist_entries >= 0); + if(not_connected) + length += erts_no_of_not_connected_dist_entries; + if(hidden) + length += erts_no_of_hidden_dist_entries; + if(visible) + length += erts_no_of_visible_dist_entries; + if(this) + length++; + + result = NIL; + + if (length == 0) { + erts_smp_rwmtx_rwunlock(&erts_dist_table_rwmtx); + BIF_RET(result); + } + + hp = HAlloc(BIF_P, 2*length); + +#ifdef DEBUG + endp = hp + length*2; +#endif + if(not_connected) + for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { + result = CONS(hp, dep->sysname, result); + hp += 2; + } + if(hidden) + for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { + result = CONS(hp, dep->sysname, result); + hp += 2; + } + if(visible) + for(dep = erts_visible_dist_entries; dep; dep = dep->next) { + result = CONS(hp, dep->sysname, result); + hp += 2; + } + if(this) { + result = CONS(hp, erts_this_dist_entry->sysname, result); + hp += 2; + } + ASSERT(endp == hp); + erts_smp_rwmtx_rwunlock(&erts_dist_table_rwmtx); + BIF_RET(result); +} + +/**********************************************************************/ +/* is_alive() -> Bool */ + +BIF_RETTYPE is_alive_0(BIF_ALIST_0) +{ + Eterm res = erts_is_alive ? 
am_true : am_false; + BIF_RET(res); +} + +/**********************************************************************/ +/* erlang:monitor_node(Node, Bool, Options) -> Bool */ + +BIF_RETTYPE monitor_node_3(BIF_ALIST_3) +{ + DistEntry *dep; + ErtsLink *lnk; + Eterm l; + + for (l = BIF_ARG_3; l != NIL && is_list(l); l = CDR(list_val(l))) { + Eterm t = CAR(list_val(l)); + /* allow_passive_connect the only available option right now */ + if (t != am_allow_passive_connect) { + BIF_ERROR(BIF_P, BADARG); + } + } + if (l != NIL) { + BIF_ERROR(BIF_P, BADARG); + } + + if (is_not_atom(BIF_ARG_1) || + ((BIF_ARG_2 != am_true) && (BIF_ARG_2 != am_false)) || + ((erts_this_node->sysname == am_Noname) + && (BIF_ARG_1 != erts_this_node->sysname))) { + BIF_ERROR(BIF_P, BADARG); + } + dep = erts_sysname_to_connected_dist_entry(BIF_ARG_1); + if (!dep) { + do_trap: + BIF_TRAP3(dmonitor_node_trap, BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3); + } + if (dep == erts_this_dist_entry) + goto done; + + erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_LINK); + erts_smp_de_rlock(dep); + if (ERTS_DE_IS_NOT_CONNECTED(dep)) { + erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK); + erts_smp_de_runlock(dep); + goto do_trap; + } + erts_smp_de_links_lock(dep); + erts_smp_de_runlock(dep); + + if (BIF_ARG_2 == am_true) { + ASSERT(dep->cid != NIL); + lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, + BIF_P->id); + ++ERTS_LINK_REFC(lnk); + lnk = erts_add_or_lookup_link(&(BIF_P->nlinks), LINK_NODE, BIF_ARG_1); + ++ERTS_LINK_REFC(lnk); + } + else { + lnk = erts_lookup_link(dep->node_links, BIF_P->id); + if (lnk != NULL) { + if ((--ERTS_LINK_REFC(lnk)) == 0) { + erts_destroy_link(erts_remove_link(&(dep->node_links), + BIF_P->id)); + } + } + lnk = erts_lookup_link(BIF_P->nlinks, BIF_ARG_1); + if (lnk != NULL) { + if ((--ERTS_LINK_REFC(lnk)) == 0) { + erts_destroy_link(erts_remove_link(&(BIF_P->nlinks), + BIF_ARG_1)); + } + } + } + + erts_smp_de_links_unlock(dep); + erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK); + + done: + erts_deref_dist_entry(dep); + BIF_RET(am_true); +} + +/* monitor_node(Node, Bool) -> Bool */ + +BIF_RETTYPE monitor_node_2(BIF_ALIST_2) +{ + BIF_RET(monitor_node_3(BIF_P,BIF_ARG_1,BIF_ARG_2,NIL)); +} + +BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1) +{ + DistEntry *de; + Uint32 f; + if (is_not_pid(BIF_ARG_1)) { + BIF_ERROR(BIF_P,BADARG); + } + de = pid_dist_entry(BIF_ARG_1); + ASSERT(de != NULL); + if (de == erts_this_dist_entry) { + BIF_RET(am_true); + } + erts_smp_de_rlock(de); + f = de->flags; + erts_smp_de_runlock(de); + BIF_RET(((f & DFLAG_UNICODE_IO) ? am_true : am_false)); +} + +/* + * The major part of the implementation of net_kernel:monitor_nodes/[1,2] + * follows. + * + * Currently net_kernel:monitor_nodes/[1,2] calls process_flag/2 which in + * turn calls erts_monitor_nodes(). If the process_flag() call fails (with + * badarg), the code in net_kernel determines what type of error to return. + * This in order to simplify the task of being backward compatible. 
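+ *
+ * For reference, a rough sketch of the Erlang side of this (option atoms
+ * and message shapes as handled by erts_monitor_nodes() and
+ * send_nodes_mon_msg() below; an illustration rather than a specification,
+ * and handle_up/2 and handle_down/2 are just placeholders):
+ *
+ *     net_kernel:monitor_nodes(true, [nodedown_reason,
+ *                                     {node_type, all}]),
+ *     receive
+ *         {nodeup, Node, InfoList} ->
+ *             %% InfoList may contain {node_type, visible | hidden}
+ *             handle_up(Node, InfoList);
+ *         {nodedown, Node, InfoList} ->
+ *             %% InfoList may also contain {nodedown_reason, Reason}
+ *             handle_down(Node, InfoList)
+ *     end
+ *
+ * With an empty option list the messages are plain {nodeup, Node} and
+ * {nodedown, Node}, and only visible nodes are reported.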
+ */ + +#define ERTS_NODES_MON_OPT_TYPE_VISIBLE (((Uint16) 1) << 0) +#define ERTS_NODES_MON_OPT_TYPE_HIDDEN (((Uint16) 1) << 1) +#define ERTS_NODES_MON_OPT_DOWN_REASON (((Uint16) 1) << 2) + +#define ERTS_NODES_MON_OPT_TYPES \ + (ERTS_NODES_MON_OPT_TYPE_VISIBLE|ERTS_NODES_MON_OPT_TYPE_HIDDEN) + +typedef struct ErtsNodesMonitor_ ErtsNodesMonitor; +struct ErtsNodesMonitor_ { + ErtsNodesMonitor *prev; + ErtsNodesMonitor *next; + Process *proc; + Uint16 opts; + Uint16 no; +}; + +static erts_smp_mtx_t nodes_monitors_mtx; +static ErtsNodesMonitor *nodes_monitors; +static ErtsNodesMonitor *nodes_monitors_end; + +/* + * Nodes monitors are stored in a double linked list. 'nodes_monitors' + * points to the beginning of the list and 'nodes_monitors_end' points + * to the end of the list. + * + * There might be more than one entry per process in the list. If so, + * they are located in sequence. The 'nodes_monitors' field of the + * process struct refers to the first element in the sequence + * corresponding to the process in question. + */ + +static void +init_nodes_monitors(void) +{ + erts_smp_mtx_init(&nodes_monitors_mtx, "nodes_monitors"); + nodes_monitors = NULL; + nodes_monitors_end = NULL; +} + +static ERTS_INLINE Uint +nodes_mon_msg_sz(ErtsNodesMonitor *nmp, Eterm what, Eterm reason) +{ + Uint sz; + if (!nmp->opts) { + sz = 3; + } + else { + sz = 0; + + if (nmp->opts & ERTS_NODES_MON_OPT_TYPES) + sz += 2 + 3; + + if (what == am_nodedown + && (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { + if (is_not_immed(reason)) + sz += size_object(reason); + sz += 2 + 3; + } + + sz += 4; + } + return sz; +} + +static ERTS_INLINE void +send_nodes_mon_msg(Process *rp, + ErtsProcLocks *rp_locksp, + ErtsNodesMonitor *nmp, + Eterm node, + Eterm what, + Eterm type, + Eterm reason, + Uint sz) +{ + Eterm msg; + ErlHeapFragment* bp; + ErlOffHeap *ohp; + Eterm *hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, rp_locksp); +#ifdef DEBUG + Eterm *hend = hp + sz; +#endif + + if (!nmp->opts) { + msg = TUPLE2(hp, what, node); +#ifdef DEBUG + hp += 3; +#endif + } + else { + Eterm tup; + Eterm info = NIL; + + if (nmp->opts & (ERTS_NODES_MON_OPT_TYPE_VISIBLE + | ERTS_NODES_MON_OPT_TYPE_HIDDEN)) { + + tup = TUPLE2(hp, am_node_type, type); + hp += 3; + info = CONS(hp, tup, info); + hp += 2; + } + + if (what == am_nodedown + && (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON)) { + Eterm rsn_cpy; + + if (is_immed(reason)) + rsn_cpy = reason; + else { + Eterm rsn_sz = size_object(reason); + rsn_cpy = copy_struct(reason, rsn_sz, &hp, ohp); + } + + tup = TUPLE2(hp, am_nodedown_reason, rsn_cpy); + hp += 3; + info = CONS(hp, tup, info); + hp += 2; + } + + msg = TUPLE3(hp, what, node, info); +#ifdef DEBUG + hp += 4; +#endif + } + + ASSERT(hend == hp); + erts_queue_message(rp, rp_locksp, bp, msg, NIL); +} + +static void +send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reason) +{ + ErtsNodesMonitor *nmp; + ErtsProcLocks rp_locks = 0; /* Init to shut up false warning */ + Process *rp = NULL; + + ASSERT(is_immed(what)); + ASSERT(is_immed(node)); + ASSERT(is_immed(type)); + + ERTS_SMP_LC_ASSERT(!c_p + || (erts_proc_lc_my_proc_locks(c_p) + == ERTS_PROC_LOCK_MAIN)); + erts_smp_mtx_lock(&nodes_monitors_mtx); + + for (nmp = nodes_monitors; nmp; nmp = nmp->next) { + int i; + Uint16 no; + Uint sz; + + ASSERT(nmp->proc != NULL); + + if (!nmp->opts) { + if (type != am_visible) + continue; + } + else { + switch (type) { + case am_hidden: + if (!(nmp->opts & ERTS_NODES_MON_OPT_TYPE_HIDDEN)) + continue; + break; + case 
am_visible: + if ((nmp->opts & ERTS_NODES_MON_OPT_TYPES) + && !(nmp->opts & ERTS_NODES_MON_OPT_TYPE_VISIBLE)) + continue; + break; + default: + erl_exit(ERTS_ABORT_EXIT, "Bad node type found\n"); + } + } + + if (rp != nmp->proc) { + if (rp) { + if (rp == c_p) + rp_locks &= ~ERTS_PROC_LOCK_MAIN; + erts_smp_proc_unlock(rp, rp_locks); + } + + rp = nmp->proc; + rp_locks = 0; + if (rp == c_p) + rp_locks |= ERTS_PROC_LOCK_MAIN; + } + + ASSERT(rp); + + sz = nodes_mon_msg_sz(nmp, what, reason); + + for (i = 0, no = nmp->no; i < no; i++) + send_nodes_mon_msg(rp, + &rp_locks, + nmp, + node, + what, + type, + reason, + sz); + } + + if (rp) { + if (rp == c_p) + rp_locks &= ~ERTS_PROC_LOCK_MAIN; + erts_smp_proc_unlock(rp, rp_locks); + } + + erts_smp_mtx_unlock(&nodes_monitors_mtx); +} + +static Eterm +insert_nodes_monitor(Process *c_p, Uint32 opts) +{ + Uint16 no = 1; + Eterm res = am_false; + ErtsNodesMonitor *xnmp, *nmp; + + ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&nodes_monitors_mtx)); + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN); + + xnmp = c_p->nodes_monitors; + if (xnmp) { + ASSERT(!xnmp->prev || xnmp->prev->proc != c_p); + + while (1) { + ASSERT(xnmp->proc == c_p); + if (xnmp->opts == opts) + break; + if (!xnmp->next || xnmp->next->proc != c_p) + break; + xnmp = xnmp->next; + } + ASSERT(xnmp); + ASSERT(xnmp->proc == c_p); + ASSERT(xnmp->opts == opts + || !xnmp->next + || xnmp->next->proc != c_p); + + if (xnmp->opts != opts) + goto alloc_new; + else { + res = am_true; + no = xnmp->no++; + if (!xnmp->no) { + /* + * 'no' wrapped; transfer all prevous monitors to new + * element (which will be the next element in the list) + * and set this to one... + */ + xnmp->no = 1; + goto alloc_new; + } + } + } + else { + alloc_new: + nmp = erts_alloc(ERTS_ALC_T_NODES_MON, sizeof(ErtsNodesMonitor)); + nmp->proc = c_p; + nmp->opts = opts; + nmp->no = no; + + if (xnmp) { + ASSERT(nodes_monitors); + ASSERT(c_p->nodes_monitors); + nmp->next = xnmp->next; + nmp->prev = xnmp; + xnmp->next = nmp; + if (nmp->next) { + ASSERT(nodes_monitors_end != xnmp); + ASSERT(nmp->next->prev == xnmp); + nmp->next->prev = nmp; + } + else { + ASSERT(nodes_monitors_end == xnmp); + nodes_monitors_end = nmp; + } + } + else { + ASSERT(!c_p->nodes_monitors); + c_p->nodes_monitors = nmp; + nmp->next = NULL; + nmp->prev = nodes_monitors_end; + if (nodes_monitors_end) { + ASSERT(nodes_monitors); + nodes_monitors_end->next = nmp; + } + else { + ASSERT(!nodes_monitors); + nodes_monitors = nmp; + } + nodes_monitors_end = nmp; + } + } + return res; +} + +static Eterm +remove_nodes_monitors(Process *c_p, Uint32 opts, int all) +{ + Eterm res = am_false; + ErtsNodesMonitor *nmp; + + ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&nodes_monitors_mtx)); + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN); + + nmp = c_p->nodes_monitors; + ASSERT(!nmp || !nmp->prev || nmp->prev->proc != c_p); + + while (nmp && nmp->proc == c_p) { + if (!all && nmp->opts != opts) + nmp = nmp->next; + else { /* if (all || nmp->opts == opts) */ + ErtsNodesMonitor *free_nmp; + res = am_true; + if (nmp->prev) { + ASSERT(nodes_monitors != nmp); + nmp->prev->next = nmp->next; + } + else { + ASSERT(nodes_monitors == nmp); + nodes_monitors = nmp->next; + } + if (nmp->next) { + ASSERT(nodes_monitors_end != nmp); + nmp->next->prev = nmp->prev; + } + else { + ASSERT(nodes_monitors_end == nmp); + nodes_monitors_end = nmp->prev; + } + free_nmp = nmp; + nmp = nmp->next; + if (c_p->nodes_monitors == free_nmp) + 
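+		/* Advance the head pointer of c_p past the entry being
+		   freed, or clear it if this was c_p's last entry. */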
c_p->nodes_monitors = nmp && nmp->proc == c_p ? nmp : NULL; + erts_free(ERTS_ALC_T_NODES_MON, free_nmp); + } + } + + ASSERT(!all || !c_p->nodes_monitors); + return res; +} + +void +erts_delete_nodes_monitors(Process *c_p, ErtsProcLocks locks) +{ +#if defined(ERTS_ENABLE_LOCK_CHECK) && defined(ERTS_SMP) + if (c_p) { + ErtsProcLocks might_unlock = locks & ~ERTS_PROC_LOCK_MAIN; + if (might_unlock) + erts_proc_lc_might_unlock(c_p, might_unlock); + } +#endif + if (erts_smp_mtx_trylock(&nodes_monitors_mtx) == EBUSY) { + ErtsProcLocks unlock_locks = locks & ~ERTS_PROC_LOCK_MAIN; + if (c_p && unlock_locks) + erts_smp_proc_unlock(c_p, unlock_locks); + erts_smp_mtx_lock(&nodes_monitors_mtx); + if (c_p && unlock_locks) + erts_smp_proc_lock(c_p, unlock_locks); + } + remove_nodes_monitors(c_p, 0, 1); + erts_smp_mtx_unlock(&nodes_monitors_mtx); +} + +Eterm +erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist) +{ + Eterm res; + Eterm opts_list = olist; + Uint16 opts = (Uint16) 0; + + ASSERT(c_p); + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); + + if (on != am_true && on != am_false) + return THE_NON_VALUE; + + if (is_not_nil(opts_list)) { + int all = 0, visible = 0, hidden = 0; + + while (is_list(opts_list)) { + Eterm *cp = list_val(opts_list); + Eterm opt = CAR(cp); + opts_list = CDR(cp); + if (opt == am_nodedown_reason) + opts |= ERTS_NODES_MON_OPT_DOWN_REASON; + else if (is_tuple(opt)) { + Eterm* tp = tuple_val(opt); + if (arityval(tp[0]) != 2) + return THE_NON_VALUE; + switch (tp[1]) { + case am_node_type: + switch (tp[2]) { + case am_visible: + if (hidden || all) + return THE_NON_VALUE; + opts |= ERTS_NODES_MON_OPT_TYPE_VISIBLE; + visible = 1; + break; + case am_hidden: + if (visible || all) + return THE_NON_VALUE; + opts |= ERTS_NODES_MON_OPT_TYPE_HIDDEN; + hidden = 1; + break; + case am_all: + if (visible || hidden) + return THE_NON_VALUE; + opts |= ERTS_NODES_MON_OPT_TYPES; + all = 1; + break; + default: + return THE_NON_VALUE; + } + break; + default: + return THE_NON_VALUE; + } + } + else { + return THE_NON_VALUE; + } + } + + if (is_not_nil(opts_list)) + return THE_NON_VALUE; + } + + erts_smp_mtx_lock(&nodes_monitors_mtx); + + if (on == am_true) + res = insert_nodes_monitor(c_p, opts); + else + res = remove_nodes_monitors(c_p, opts, 0); + + erts_smp_mtx_unlock(&nodes_monitors_mtx); + + return res; +} + +/* + * Note, this function is only used for debuging. 
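+ * It returns, built in two passes (a sizing pass followed by a heap
+ * construction pass, see the hpp/szp pattern below), a list of
+ * {Pid, MonitorOptionList} tuples for all active nodes monitors, e.g.
+ * [{Pid, [nodedown_reason, {node_type, all}]}, ...].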
+ */
+
+Eterm
+erts_processes_monitoring_nodes(Process *c_p)
+{
+    ErtsNodesMonitor *nmp;
+    Eterm res;
+    Eterm *hp;
+    Eterm **hpp;
+    Uint sz;
+    Uint *szp;
+#ifdef DEBUG
+    Eterm *hend;
+#endif
+
+    ASSERT(c_p);
+    ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
+    erts_smp_mtx_lock(&nodes_monitors_mtx);
+
+    sz = 0;
+    szp = &sz;
+    hpp = NULL;
+
+ bld_result:
+    res = NIL;
+
+    for (nmp = nodes_monitors_end; nmp; nmp = nmp->prev) {
+	Uint16 i;
+	for (i = 0; i < nmp->no; i++) {
+	    Eterm olist = NIL;
+	    if (nmp->opts & ERTS_NODES_MON_OPT_TYPES) {
+		Eterm type;
+		switch (nmp->opts & ERTS_NODES_MON_OPT_TYPES) {
+		case ERTS_NODES_MON_OPT_TYPES: type = am_all; break;
+		case ERTS_NODES_MON_OPT_TYPE_VISIBLE: type = am_visible; break;
+		case ERTS_NODES_MON_OPT_TYPE_HIDDEN: type = am_hidden; break;
+		default: erl_exit(ERTS_ABORT_EXIT, "Bad node type found\n");
+		}
+		olist = erts_bld_cons(hpp, szp,
+				      erts_bld_tuple(hpp, szp, 2,
+						     am_node_type,
+						     type),
+				      olist);
+	    }
+	    if (nmp->opts & ERTS_NODES_MON_OPT_DOWN_REASON)
+		olist = erts_bld_cons(hpp, szp, am_nodedown_reason, olist);
+	    res = erts_bld_cons(hpp, szp,
+				erts_bld_tuple(hpp, szp, 2,
+					       nmp->proc->id,
+					       olist),
+				res);
+	}
+    }
+
+    if (!hpp) {
+	hp = HAlloc(c_p, sz);
+#ifdef DEBUG
+	hend = hp + sz;
+#endif
+	hpp = &hp;
+	szp = NULL;
+	goto bld_result;
+    }
+
+    ASSERT(hp == hend);
+
+    erts_smp_mtx_unlock(&nodes_monitors_mtx);
+
+    return res;
+}