diff options
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r-- | erts/emulator/beam/dist.c | 1335 |
1 files changed, 951 insertions, 384 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 09fdb897f5..bc168fc58d 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -121,7 +121,7 @@ Export* dexit_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ - +static Export *dist_ctrl_put_data_trap; /* forward declarations */ @@ -130,8 +130,8 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); -static erts_smp_atomic_t no_caches; -static erts_smp_atomic_t no_nodes; +static erts_atomic_t no_caches; +static erts_atomic_t no_nodes; struct { Eterm reason; @@ -144,8 +144,8 @@ delete_cache(ErtsAtomCache *cache) { if (cache) { erts_free(ERTS_ALC_T_DCACHE, (void *) cache); - ASSERT(erts_smp_atomic_read_nob(&no_caches) > 0); - erts_smp_atomic_dec_nob(&no_caches); + ASSERT(erts_atomic_read_nob(&no_caches) > 0); + erts_atomic_dec_nob(&no_caches); } } @@ -156,14 +156,12 @@ create_cache(DistEntry *dep) int i; ErtsAtomCache *cp; - ERTS_SMP_LC_ASSERT( - is_internal_port(dep->cid) - && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); + ERTS_LC_ASSERT(is_nil(dep->cid)); ASSERT(!dep->cache); dep->cache = cp = (ErtsAtomCache*) erts_alloc(ERTS_ALC_T_DCACHE, sizeof(ErtsAtomCache)); - erts_smp_atomic_inc_nob(&no_caches); + erts_atomic_inc_nob(&no_caches); for (i = 0; i < sizeof(cp->in_arr)/sizeof(cp->in_arr[0]); i++) { cp->in_arr[i] = THE_NON_VALUE; cp->out_arr[i] = THE_NON_VALUE; @@ -172,15 +170,17 @@ create_cache(DistEntry *dep) Uint erts_dist_cache_size(void) { - return (Uint) erts_smp_atomic_read_mb(&no_caches)*sizeof(ErtsAtomCache); + return (Uint) erts_atomic_read_mb(&no_caches)*sizeof(ErtsAtomCache); } static ErtsProcList * -get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs) +get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs) { - ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&dep->qlock)); - dep->qflgs &= ~unset_qflgs; - if (dep->qflgs & ERTS_DE_QFLG_EXIT) { + erts_aint32_t qflgs; + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); + qflgs = erts_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs); + qflgs &= ~unset_qflgs; + if (qflgs & ERTS_DE_QFLG_EXIT) { /* No resume when exit has been scheduled */ return NULL; } @@ -283,17 +283,15 @@ static void doit_monitor_net_exits(ErtsMonitor *mon, void *vnecp) watched = (is_atom(rmon->name) ? TUPLE2(lhp, rmon->name, dep->sysname) : rmon->u.pid); -#ifdef ERTS_SMP rp_locks |= ERTS_PROC_LOCKS_MSG_SEND; - erts_smp_proc_lock(rp, ERTS_PROC_LOCKS_MSG_SEND); -#endif + erts_proc_lock(rp, ERTS_PROC_LOCKS_MSG_SEND); erts_queue_monitor_message(rp, &rp_locks, mon->ref, am_process, watched, am_noconnection); erts_destroy_monitor(rmon); } UnUseTmpHeapNoproc(3); } - erts_smp_proc_unlock(rp, rp_locks); + erts_proc_unlock(rp, rp_locks); done: erts_destroy_monitor(mon); } @@ -342,7 +340,7 @@ static void doit_link_net_exits_sub(ErtsLink *sublnk, void *vlnecp) trace_proc(NULL, 0, rp, am_getting_unlinked, sublnk->pid); } } - erts_smp_proc_unlock(rp, rp_locks); + erts_proc_unlock(rp, rp_locks); } done: erts_destroy_link(sublnk); @@ -384,7 +382,7 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp) rp = erts_proc_lookup(lnk->pid); if (!rp) goto done; - erts_smp_proc_lock(rp, rp_locks); + erts_proc_lock(rp, rp_locks); rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name); if (rlnk != NULL) { ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE)); @@ -401,7 +399,7 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp) tup = TUPLE2(hp, am_nodedown, name); erts_queue_message(rp, rp_locks, msgp, tup, am_system); } - erts_smp_proc_unlock(rp, rp_locks); + erts_proc_unlock(rp, rp_locks); } done: erts_destroy_link(lnk); @@ -413,16 +411,16 @@ set_node_not_alive(void *unused) ErlHeapFragment *bp; Eterm nodename = erts_this_dist_entry->sysname; - ASSERT(erts_smp_atomic_read_nob(&no_nodes) == 0); + ASSERT(erts_atomic_read_nob(&no_nodes) == 0); - erts_smp_thr_progress_block(); + erts_thr_progress_block(); erts_set_this_node(am_Noname, 0); erts_is_alive = 0; send_nodes_mon_msgs(NULL, am_nodedown, nodename, am_visible, nodedown.reason); nodedown.reason = NIL; bp = nodedown.bp; nodedown.bp = NULL; - erts_smp_thr_progress_unblock(); + erts_thr_progress_unblock(); if (bp) free_message_buffer(bp); } @@ -430,7 +428,7 @@ set_node_not_alive(void *unused) static ERTS_INLINE void dec_no_nodes(void) { - erts_aint_t no = erts_smp_atomic_dec_read_mb(&no_nodes); + erts_aint_t no = erts_atomic_dec_read_mb(&no_nodes); ASSERT(no >= 0); ASSERT(erts_get_scheduler_id()); /* Need to be a scheduler */ if (no == 0) @@ -443,12 +441,40 @@ static ERTS_INLINE void inc_no_nodes(void) { #ifdef DEBUG - erts_aint_t no = erts_smp_atomic_read_nob(&no_nodes); + erts_aint_t no = erts_atomic_read_nob(&no_nodes); ASSERT(erts_is_alive ? no > 0 : no == 0); #endif - erts_smp_atomic_inc_mb(&no_nodes); + erts_atomic_inc_mb(&no_nodes); } - + +static void +kill_dist_ctrl_proc(void *vpid) +{ + Eterm pid = (Eterm) vpid; + ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; + Process *rp = erts_pid2proc(NULL, 0, pid, rp_locks); + if (rp) { + erts_send_exit_signal(NULL, rp->common.id, rp, &rp_locks, + am_kill, NIL, NULL, 0); + if (rp_locks) + erts_proc_unlock(rp, rp_locks); + } +} + +static void +schedule_kill_dist_ctrl_proc(Eterm pid) +{ + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + int sched_id = 1; + if (!esdp || ERTS_SCHEDULER_IS_DIRTY(esdp)) + sched_id = 1; + else + sched_id = (int) esdp->no; + erts_schedule_misc_aux_work(sched_id, + kill_dist_ctrl_proc, + (void *) (UWord) pid); +} + /* * proc is currently running or exiting process. */ @@ -458,58 +484,62 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */ DistEntry *tdep; - int no_dist_port = 0; + int no_dist_ctrl = 0; Eterm nd_reason = (reason == am_no_network ? am_no_network : am_net_kernel_terminated); - erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx); + erts_rwmtx_rlock(&erts_dist_table_rwmtx); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) - no_dist_port++; + no_dist_ctrl++; for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) - no_dist_port++; + no_dist_ctrl++; /* KILL all port controllers */ - if (no_dist_port == 0) - erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); + if (no_dist_ctrl == 0) + erts_rwmtx_runlock(&erts_dist_table_rwmtx); else { Eterm def_buf[128]; int i = 0; - Eterm *dist_port; + Eterm *dist_ctrl; - if (no_dist_port <= sizeof(def_buf)/sizeof(def_buf[0])) - dist_port = &def_buf[0]; + if (no_dist_ctrl <= sizeof(def_buf)/sizeof(def_buf[0])) + dist_ctrl = &def_buf[0]; else - dist_port = erts_alloc(ERTS_ALC_T_TMP, - sizeof(Eterm)*no_dist_port); + dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, + sizeof(Eterm)*no_dist_ctrl); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) { - ASSERT(is_internal_port(tdep->cid)); - dist_port[i++] = tdep->cid; + ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + dist_ctrl[i++] = tdep->cid; } for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) { - ASSERT(is_internal_port(tdep->cid)); - dist_port[i++] = tdep->cid; + ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + dist_ctrl[i++] = tdep->cid; } - erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); - - for (i = 0; i < no_dist_port; i++) { - Port *prt = erts_port_lookup(dist_port[i], - ERTS_PORT_SFLGS_INVALID_LOOKUP); - if (!prt) - continue; - ASSERT(erts_atomic32_read_nob(&prt->state) - & ERTS_PORT_SFLG_DISTRIBUTION); - - erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED, - prt, dist_port[i], nd_reason, NULL); + erts_rwmtx_runlock(&erts_dist_table_rwmtx); + + for (i = 0; i < no_dist_ctrl; i++) { + if (is_internal_pid(dist_ctrl[i])) + schedule_kill_dist_ctrl_proc(dist_ctrl[i]); + else { + Port *prt = erts_port_lookup(dist_ctrl[i], + ERTS_PORT_SFLGS_INVALID_LOOKUP); + if (prt) { + ASSERT(erts_atomic32_read_nob(&prt->state) + & ERTS_PORT_SFLG_DISTRIBUTION); + + erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED, + prt, dist_ctrl[i], nd_reason, NULL); + } + } } - if (dist_port != &def_buf[0]) - erts_free(ERTS_ALC_T_TMP, dist_port); + if (dist_ctrl != &def_buf[0]) + erts_free(ERTS_ALC_T_TMP, dist_ctrl); } /* - * When last dist port exits, node will be taken + * When last dist ctrl exits, node will be taken * from alive to not alive. */ ASSERT(is_nil(nodedown.reason) && !nodedown.bp); @@ -526,52 +556,51 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) &nodedown.bp->off_heap); } } - else { /* Call from distribution port */ + else { /* Call from distribution controller (port/process) */ NetExitsContext nec = {dep}; ErtsLink *nlinks; ErtsLink *node_links; ErtsMonitor *monitors; Uint32 flags; - erts_smp_atomic_set_mb(&dep->dist_cmd_scheduled, 1); - erts_smp_de_rwlock(dep); + erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1); + erts_de_rwlock(dep); - ERTS_SMP_LC_ASSERT(is_internal_port(dep->cid) - && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); + if (is_internal_port(dep->cid)) { + ERTS_LC_ASSERT(erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); - if (erts_port_task_is_scheduled(&dep->dist_cmd)) - erts_port_task_abort(&dep->dist_cmd); + if (erts_port_task_is_scheduled(&dep->dist_cmd)) + erts_port_task_abort(&dep->dist_cmd); + } if (dep->status & ERTS_DE_SFLG_EXITING) { #ifdef DEBUG - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT); - erts_smp_mtx_unlock(&dep->qlock); + ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT); #endif } else { dep->status |= ERTS_DE_SFLG_EXITING; - erts_smp_mtx_lock(&dep->qlock); - ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT)); - dep->qflgs |= ERTS_DE_QFLG_EXIT; - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_lock(&dep->qlock); + ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT); + erts_mtx_unlock(&dep->qlock); } - erts_smp_de_links_lock(dep); + erts_de_links_lock(dep); monitors = dep->monitors; nlinks = dep->nlinks; node_links = dep->node_links; dep->monitors = NULL; dep->nlinks = NULL; dep->node_links = NULL; - erts_smp_de_links_unlock(dep); + erts_de_links_unlock(dep); nodename = dep->sysname; flags = dep->flags; erts_set_dist_entry_not_connected(dep); - erts_smp_de_rwunlock(dep); + erts_de_rwunlock(dep); erts_sweep_monitors(monitors, &doit_monitor_net_exits, (void *) &nec); erts_sweep_links(nlinks, &doit_link_net_exits, (void *) &nec); @@ -605,8 +634,8 @@ void init_dist(void) nodedown.reason = NIL; nodedown.bp = NULL; - erts_smp_atomic_init_nob(&no_nodes, 0); - erts_smp_atomic_init_nob(&no_caches, 0); + erts_atomic_init_nob(&no_nodes, 0); + erts_atomic_init_nob(&no_caches, 0); /* Lookup/Install all references to trap functions */ dsend2_trap = trap_function(am_dsend,2); @@ -618,6 +647,9 @@ void init_dist(void) dgroup_leader_trap = trap_function(am_dgroup_leader,2); dexit_trap = trap_function(am_dexit, 2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); + dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, + am_dist_ctrl_put_data, + 2); } #define ErtsDistOutputBuf2Binary(OB) \ @@ -659,19 +691,24 @@ static void clear_dist_entry(DistEntry *dep) ErtsProcList *suspendees; ErtsDistOutputBuf *obuf; - erts_smp_de_rwlock(dep); + erts_de_rwlock(dep); + erts_atomic_set_nob(&dep->input_handler, + (erts_aint_t) NIL); cache = dep->cache; dep->cache = NULL; #ifdef DEBUG - erts_smp_de_links_lock(dep); + erts_de_links_lock(dep); ASSERT(!dep->nlinks); ASSERT(!dep->node_links); ASSERT(!dep->monitors); - erts_smp_de_links_unlock(dep); + erts_de_links_unlock(dep); #endif - erts_smp_mtx_lock(&dep->qlock); + erts_mtx_lock(&dep->qlock); + + erts_atomic64_set_nob(&dep->in, 0); + erts_atomic64_set_nob(&dep->out, 0); if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; @@ -680,17 +717,24 @@ static void clear_dist_entry(DistEntry *dep) obuf = dep->out_queue.first; } + if (dep->tmp_out_queue.first) { + dep->tmp_out_queue.last->next = obuf; + obuf = dep->tmp_out_queue.first; + } + dep->out_queue.first = NULL; dep->out_queue.last = NULL; + dep->tmp_out_queue.first = NULL; + dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; dep->status = 0; suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); - erts_smp_mtx_unlock(&dep->qlock); - erts_smp_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); dep->send = NULL; - erts_smp_de_rwunlock(dep); + erts_de_rwunlock(dep); erts_resume_processes(suspendees); @@ -705,10 +749,11 @@ static void clear_dist_entry(DistEntry *dep) } if (obufsize) { - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize >= obufsize); - dep->qsize -= obufsize; - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_lock(&dep->qlock); + ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); + erts_atomic_add_nob(&dep->qsize, + (erts_aint_t) -obufsize); + erts_mtx_unlock(&dep->qlock); } } @@ -813,9 +858,9 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, watched, watcher, ref, reason); #ifdef DEBUG - erts_smp_de_links_lock(dsdp->dep); + erts_de_links_lock(dsdp->dep); ASSERT(!erts_lookup_monitor(dsdp->dep->monitors, ref)); - erts_smp_de_links_unlock(dsdp->dep); + erts_de_links_unlock(dsdp->dep); #endif res = dsig_send_ctl(dsdp, ctl, 1); @@ -906,11 +951,30 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) } #endif - if (token != NIL) - ctl = TUPLE4(&ctx->ctl_heap[0], - make_small(DOP_SEND_TT), am_Empty, remote, token); - else - ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_SEND), am_Empty, remote); + if (token != NIL) { + Eterm el1, el2; + if (ctx->dep->flags & DFLAG_SEND_SENDER) { + el1 = make_small(DOP_SEND_SENDER_TT); + el2 = sender->common.id; + } + else { + el1 = make_small(DOP_SEND_TT); + el2 = am_Empty; + } + ctl = TUPLE4(&ctx->ctl_heap[0], el1, el2, remote, token); + } + else { + Eterm el1, el2; + if (ctx->dep->flags & DFLAG_SEND_SENDER) { + el1 = make_small(DOP_SEND_SENDER); + el2 = sender->common.id; + } + else { + el1 = make_small(DOP_SEND); + el2 = am_Empty; + } + ctl = TUPLE3(&ctx->ctl_heap[0], el1, el2, remote); + } DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); DTRACE7(message_send_remote, sender_name, node_name, receiver_name, @@ -1147,22 +1211,25 @@ int erts_net_message(Port *prt, ErtsLink *lnk; Uint tuple_arity; int res; + Uint32 connection_id; #ifdef ERTS_DIST_MSG_DBG ErlDrvSizeT orig_len = len; #endif UseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - ERTS_SMP_CHK_NO_PROC_LOCKS; + ERTS_CHK_NO_PROC_LOCKS; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_LC_ASSERT(!prt || erts_lc_is_port_locked(prt)); if (!erts_is_alive) { UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); return 0; } - if (hlen != 0) - goto data_error; + + + ASSERT(hlen == 0); + if (len == 0) { /* HANDLE TICK !!! */ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); return 0; @@ -1181,30 +1248,31 @@ int erts_net_message(Port *prt, len--; } - if (len == 0) { - PURIFY_MSG("data error"); - goto data_error; - } + res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache, &connection_id); - res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache); - - if (res >= 0) - res = ctl_len = erts_decode_dist_ext_size(&ede); - else { + switch (res) { + case ERTS_PREP_DIST_EXT_CLOSED: + return 0; /* Connection not alive; ignore signal... */ + case ERTS_PREP_DIST_EXT_FAILED: #ifdef ERTS_DIST_MSG_DBG erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n"); bw(buf, orig_len); #endif - ctl_len = 0; - } - - if (res < 0) { + goto data_error; + case ERTS_PREP_DIST_EXT_SUCCESS: + ctl_len = erts_decode_dist_ext_size(&ede); + if (ctl_len < 0) { #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); - bw(buf, orig_len); + erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); + bw(buf, orig_len); #endif - PURIFY_MSG("data error"); - goto data_error; + PURIFY_MSG("data error"); + goto data_error; + } + break; + default: + ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()"); + break; } if (ctl_len > DIST_CTL_DEFAULT_SIZE) { @@ -1235,6 +1303,7 @@ int erts_net_message(Port *prt, } token_size = 0; + token = NIL; switch (type = unsigned_val(tuple[1])) { case DOP_LINK: @@ -1263,23 +1332,23 @@ int erts_net_message(Port *prt, break; } - erts_smp_de_links_lock(dep); + erts_de_links_lock(dep); res = erts_add_link(&ERTS_P_LINKS(rp), LINK_PID, from); if (res < 0) { /* It was already there! Lets skip the rest... */ - erts_smp_de_links_unlock(dep); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + erts_de_links_unlock(dep); + erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); break; } lnk = erts_add_or_lookup_link(&(dep->nlinks), LINK_PID, rp->common.id); erts_add_link(&(ERTS_LINK_ROOT(lnk)), LINK_PID, from); - erts_smp_de_links_unlock(dep); + erts_de_links_unlock(dep); if (IS_TRACED_FL(rp, F_TRACE_PROCS)) trace_proc(NULL, 0, rp, am_getting_linked, from); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); break; case DOP_UNLINK: { @@ -1305,7 +1374,7 @@ int erts_net_message(Port *prt, trace_proc(NULL, 0, rp, am_getting_unlinked, from); } - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); erts_remove_dist_link(&dld, to, from, dep); erts_destroy_dist_link(&dld); @@ -1357,11 +1426,11 @@ int erts_net_message(Port *prt, else { if (is_atom(watched)) watched = rp->common.id; - erts_smp_de_links_lock(dep); + erts_de_links_lock(dep); erts_add_monitor(&(dep->monitors), MON_ORIGIN, ref, watched, name); erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, ref, watcher, name); - erts_smp_de_links_unlock(dep); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + erts_de_links_unlock(dep); + erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); } break; @@ -1383,9 +1452,9 @@ int erts_net_message(Port *prt, goto invalid_message; } - erts_smp_de_links_lock(dep); + erts_de_links_lock(dep); mon = erts_remove_monitor(&(dep->monitors),ref); - erts_smp_de_links_unlock(dep); + erts_de_links_unlock(dep); /* ASSERT(mon != NULL); can happen in case of broken dist message */ if (mon == NULL) { break; @@ -1399,7 +1468,7 @@ int erts_net_message(Port *prt, break; } mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + erts_proc_unlock(rp, ERTS_PROC_LOCK_LINK); ASSERT(mon != NULL); if (mon == NULL) { break; @@ -1460,42 +1529,56 @@ int erts_net_message(Port *prt, erts_queue_dist_message(rp, locks, ede_copy, token, from); if (locks) - erts_smp_proc_unlock(rp, locks); + erts_proc_unlock(rp, locks); } break; + case DOP_SEND_SENDER_TT: { + Uint xsize; case DOP_SEND_TT: + if (tuple_arity != 4) { goto invalid_message; } - - token_size = size_object(tuple[4]); - /* Fall through ... */ + + token = tuple[4]; + token_size = size_object(token); + xsize = ERTS_HEAP_FRAG_SIZE(token_size); + goto send_common; + + case DOP_SEND_SENDER: case DOP_SEND: + + token = NIL; + xsize = 0; + if (tuple_arity != 3) + goto invalid_message; + + send_common: + /* - * There is intentionally no testing of the cookie (it is always '') - * from R9B and onwards. + * If DOP_SEND_SENDER or DOP_SEND_SENDER_TT element 2 contains + * the sender pid (i.e. DFLAG_SEND_SENDER is set); otherwise, + * the atom '' (empty cookie). */ + ASSERT((type == DOP_SEND_SENDER || type == DOP_SEND_SENDER_TT) + ? (is_pid(tuple[2]) && (dep->flags & DFLAG_SEND_SENDER)) + : tuple[2] == am_Empty); + #ifdef ERTS_DIST_MSG_DBG dist_msg_dbg(&ede, "MSG", buf, orig_len); #endif - if (type != DOP_SEND_TT && tuple_arity != 3) { - goto invalid_message; - } to = tuple[3]; if (is_not_pid(to)) { goto invalid_message; } rp = erts_proc_lookup(to); if (rp) { - Uint xsize = type == DOP_SEND ? 0 : ERTS_HEAP_FRAG_SIZE(token_size); ErtsProcLocks locks = 0; ErtsDistExternal *ede_copy; ede_copy = erts_make_dist_ext_copy(&ede, xsize); - if (type == DOP_SEND) { - token = NIL; - } else { + if (is_not_nil(token)) { ErlHeapFragment *heap_frag; ErlOffHeap *ohp; ASSERT(xsize); @@ -1503,15 +1586,15 @@ int erts_net_message(Port *prt, ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); hp = heap_frag->mem; ohp = &heap_frag->off_heap; - token = tuple[4]; token = copy_struct(token, token_size, &hp, ohp); } - erts_queue_dist_message(rp, locks, ede_copy, token, tuple[2]); + erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty); if (locks) - erts_smp_proc_unlock(rp, locks); + erts_proc_unlock(rp, locks); } break; + } case DOP_MONITOR_P_EXIT: { /* We are monitoring a process on the remote node which dies, we get @@ -1535,7 +1618,7 @@ int erts_net_message(Port *prt, goto invalid_message; } - erts_smp_de_links_lock(dep); + erts_de_links_lock(dep); sysname = dep->sysname; mon = erts_remove_monitor(&(dep->monitors), ref); /* @@ -1544,7 +1627,7 @@ int erts_net_message(Port *prt, * removed info about monitor. In this case, do nothing * and everything will be as it should. */ - erts_smp_de_links_unlock(dep); + erts_de_links_unlock(dep); if (mon == NULL) { break; } @@ -1558,7 +1641,7 @@ int erts_net_message(Port *prt, mon = erts_remove_monitor(&ERTS_P_MONITORS(rp), ref); if (mon == NULL) { - erts_smp_proc_unlock(rp, rp_locks); + erts_proc_unlock(rp, rp_locks); break; } UseTmpHeapNoproc(3); @@ -1569,7 +1652,7 @@ int erts_net_message(Port *prt, erts_queue_monitor_message(rp, &rp_locks, ref, am_process, watched, reason); - erts_smp_proc_unlock(rp, rp_locks); + erts_proc_unlock(rp, rp_locks); erts_destroy_monitor(mon); UnUseTmpHeapNoproc(3); break; @@ -1631,13 +1714,13 @@ int erts_net_message(Port *prt, if (xres >= 0 && IS_TRACED_FL(rp, F_TRACE_PROCS)) { /* We didn't exit the process and it is traced */ if (rp_locks & ERTS_PROC_LOCKS_XSIG_SEND) { - erts_smp_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND); + erts_proc_unlock(rp, ERTS_PROC_LOCKS_XSIG_SEND); rp_locks &= ~ERTS_PROC_LOCKS_XSIG_SEND; } trace_proc(NULL, 0, rp, am_getting_unlinked, from); } } - erts_smp_proc_unlock(rp, rp_locks); + erts_proc_unlock(rp, rp_locks); } erts_remove_dist_link(&dld, to, from, dep); if (lnk) @@ -1679,7 +1762,7 @@ int erts_net_message(Port *prt, token, NULL, 0); - erts_smp_proc_unlock(rp, rp_locks); + erts_proc_unlock(rp, rp_locks); } break; } @@ -1697,7 +1780,7 @@ int erts_net_message(Port *prt, if (!rp) break; rp->group_leader = STORE_NC_IN_PROC(rp, from); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); + erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); break; default: @@ -1709,7 +1792,7 @@ int erts_net_message(Port *prt, erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); } UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - ERTS_SMP_CHK_NO_PROC_LOCKS; + ERTS_CHK_NO_PROC_LOCKS; return 0; invalid_message: { @@ -1725,8 +1808,8 @@ decode_error: } data_error: UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - erts_deliver_port_exit(prt, dep->cid, am_killed, 0, 1); - ERTS_SMP_CHK_NO_PROC_LOCKS; + erts_kill_dist_connection(dep, connection_id); + ERTS_CHK_NO_PROC_LOCKS; return -1; } @@ -1746,6 +1829,31 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy) return ret; } +static ERTS_INLINE void +notify_dist_data(Process *c_p, Eterm pid) +{ + Process *rp; + ErtsProcLocks rp_locks; + + ASSERT(erts_get_scheduler_data() + && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data())); + ASSERT(is_internal_pid(pid)); + + if (c_p && c_p->common.id == pid) { + rp = c_p; + rp_locks = ERTS_PROC_LOCK_MAIN; + } + else { + rp = erts_proc_lookup(pid); + rp_locks = 0; + } + + if (rp) { + ErtsMessage *mp = erts_alloc_message(0, NULL); + erts_queue_message(rp, rp_locks, mp, am_dist_data, am_system); + } +} + int erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) { @@ -1762,7 +1870,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) if (!ctx->c_p || dsdp->no_suspend) ctx->force_busy = 1; - ERTS_SMP_LC_ASSERT(!ctx->c_p + ERTS_LC_ASSERT(!ctx->c_p || (ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(ctx->c_p))); @@ -1851,28 +1959,48 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) * and if so enqueue the signal and schedule it for send. */ ctx->obuf->next = NULL; - erts_smp_de_rlock(dep); + erts_de_rlock(dep); cid = dep->cid; if (cid != dsdp->cid || dep->connection_id != dsdp->connection_id || dep->status & ERTS_DE_SFLG_EXITING) { /* Not the same connection as when we started; drop message... */ - erts_smp_de_runlock(dep); + erts_de_runlock(dep); free_dist_obuf(ctx->obuf); } else { + Sint qsize; + erts_aint32_t qflgs; ErtsProcList *plp = NULL; - erts_smp_mtx_lock(&dep->qlock); - dep->qsize += size_obuf(ctx->obuf); - if (dep->qsize >= erts_dist_buf_busy_limit) - dep->qflgs |= ERTS_DE_QFLG_BUSY; - if (!ctx->force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) { - erts_smp_mtx_unlock(&dep->qlock); + Eterm notify_proc = NIL; + Sint obsz = size_obuf(ctx->obuf); + + erts_mtx_lock(&dep->qlock); + qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz); + ASSERT(qsize >= obsz); + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) { + erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY); + qflgs |= ERTS_DE_QFLG_BUSY; + } + if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) { + /* Previously empty queue and info requested... */ + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify_proc = dep->cid; + ASSERT(is_internal_pid(notify_proc)); + } + /* else: requester will send itself the message... */ + qflgs &= ~ERTS_DE_QFLG_REQ_INFO; + } + if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) { + erts_mtx_unlock(&dep->qlock); plp = erts_proclist_create(ctx->c_p); erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL); suspended = 1; - erts_smp_mtx_lock(&dep->qlock); + erts_mtx_lock(&dep->qlock); } /* Enqueue obuf on dist entry */ @@ -1883,7 +2011,8 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) dep->out_queue.last = ctx->obuf; if (!ctx->force_busy) { - if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) { + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (!(qflgs & ERTS_DE_QFLG_BUSY)) { if (suspended) resume = 1; /* was busy when we started, but isn't now */ #ifdef USE_VM_PROBES @@ -1907,9 +2036,12 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } } - erts_smp_mtx_unlock(&dep->qlock); - erts_schedule_dist_command(NULL, dep); - erts_smp_de_runlock(dep); + erts_mtx_unlock(&dep->qlock); + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + erts_de_runlock(dep); + if (is_internal_pid(notify_proc)) + notify_dist_data(ctx->c_p, notify_proc); if (resume) { erts_resume(ctx->c_p, ERTS_PROC_LOCK_MAIN); @@ -1963,16 +2095,20 @@ static Uint dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; - Uint size = obuf->ext_endp - obuf->extp; + ErlDrvSizeT size; + char *bufp; - ERTS_SMP_CHK_NO_PROC_LOCKS; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_CHK_NO_PROC_LOCKS; + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (size > (Uint) INT_MAX) - erts_exit(ERTS_DUMP_EXIT, - "Absurdly large distribution output data buffer " - "(%beu bytes) passed.\n", - size); + if (!obuf) { + size = 0; + bufp = NULL; + } + else { + size = obuf->ext_endp - obuf->extp; + bufp = (char*) obuf->extp; + } #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_output)) { @@ -1987,11 +2123,10 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) remote_str, size); } #endif + prt->caller = NIL; fpe_was_unmasked = erts_block_fpe(); - (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, - (char*) obuf->extp, - (int) size); + (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, bufp, size); erts_unblock_fpe(fpe_was_unmasked); return size; } @@ -2000,33 +2135,41 @@ static Uint dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; - Uint size = obuf->ext_endp - obuf->extp; + ErlDrvSizeT size; SysIOVec iov[2]; ErlDrvBinary* bv[2]; ErlIOVec eiov; - ERTS_SMP_CHK_NO_PROC_LOCKS; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - - if (size > (Uint) INT_MAX) - erts_exit(ERTS_DUMP_EXIT, - "Absurdly large distribution output data buffer " - "(%beu bytes) passed.\n", - size); + ERTS_CHK_NO_PROC_LOCKS; + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); iov[0].iov_base = NULL; iov[0].iov_len = 0; bv[0] = NULL; - iov[1].iov_base = obuf->extp; - iov[1].iov_len = size; - bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); + if (!obuf) { + size = 0; + eiov.vsize = 1; + } + else { + size = obuf->ext_endp - obuf->extp; + eiov.vsize = 2; + + iov[1].iov_base = obuf->extp; + iov[1].iov_len = size; + bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); + } - eiov.vsize = 2; eiov.size = size; eiov.iov = iov; eiov.binv = bv; + if (size > (Uint) INT_MAX) + erts_exit(ERTS_DUMP_EXIT, + "Absurdly large distribution output data buffer " + "(%beu bytes) passed.\n", + size); + ASSERT(prt->drv_ptr->outputv); #ifdef USE_VM_PROBES @@ -2074,29 +2217,25 @@ erts_dist_command(Port *prt, int reds_limit) Sint reds = ERTS_PORT_REDS_DIST_CMD_START; Uint32 status; Uint32 flags; - Sint obufsize = 0; + Sint qsize, obufsize = 0; ErtsDistOutputQueue oq, foq; DistEntry *dep = prt->dist_entry; Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); erts_aint32_t sched_flags; ErtsSchedulerData *esdp = erts_get_scheduler_data(); - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - - erts_smp_refc_inc(&dep->refc, 1); /* Otherwise dist_entry might be - removed if port command fails */ + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); - erts_smp_atomic_set_mb(&dep->dist_cmd_scheduled, 0); + erts_atomic_set_mb(&dep->dist_cmd_scheduled, 0); - erts_smp_de_rlock(dep); + erts_de_rlock(dep); flags = dep->flags; status = dep->status; send = dep->send; - erts_smp_de_runlock(dep); + erts_de_runlock(dep); if (status & ERTS_DE_SFLG_EXITING) { erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1); - erts_deref_dist_entry(dep); return reds + ERTS_PORT_REDS_DIST_CMD_EXIT; } @@ -2110,19 +2249,19 @@ erts_dist_command(Port *prt, int reds_limit) * a mess. */ - erts_smp_mtx_lock(&dep->qlock); + erts_mtx_lock(&dep->qlock); oq.first = dep->out_queue.first; oq.last = dep->out_queue.last; dep->out_queue.first = NULL; dep->out_queue.last = NULL; - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_unlock(&dep->qlock); foq.first = dep->finalized_out_queue.first; foq.last = dep->finalized_out_queue.last; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; - sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + sched_flags = erts_atomic32_read_nob(&prt->sched.flags); if (reds > reds_limit) goto preempted; @@ -2130,21 +2269,21 @@ erts_dist_command(Port *prt, int reds_limit) if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { int preempt = 0; do { - Uint size; - ErtsDistOutputBuf *fob; - - size = (*send)(prt, foq.first); - esdp->io.out += (Uint64) size; + Uint size; + ErtsDistOutputBuf *fob; + size = (*send)(prt, foq.first); + erts_atomic64_inc_nob(&dep->out); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, ">> "); - bw(foq.first->extp, size); + erts_fprintf(stderr, ">> "); + bw(foq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = foq.first; - obufsize += size_obuf(fob); - foq.first = foq.first->next; - free_dist_obuf(fob); - sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = foq.first; + obufsize += size_obuf(fob); + foq.first = foq.first->next; + free_dist_obuf(fob); + sched_flags = erts_atomic32_read_nob(&prt->sched.flags); preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) break; @@ -2204,32 +2343,34 @@ erts_dist_command(Port *prt, int reds_limit) } } else { + int de_busy; int preempt = 0; while (oq.first && !preempt) { - ErtsDistOutputBuf *fob; - Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&oq.first->data[0] <= oq.first->extp - && oq.first->extp < oq.first->ext_endp); - size = (*send)(prt, oq.first); - esdp->io.out += (Uint64) size; + ErtsDistOutputBuf *fob; + Uint size; + oq.first->extp + = erts_encode_ext_dist_header_finalize(oq.first->extp, + dep->cache, + flags); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' + needed */ + ASSERT(&oq.first->data[0] <= oq.first->extp + && oq.first->extp < oq.first->ext_endp); + size = (*send)(prt, oq.first); + erts_atomic64_inc_nob(&dep->out); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, ">> "); - bw(oq.first->extp, size); + erts_fprintf(stderr, ">> "); + bw(oq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = oq.first; - obufsize += size_obuf(fob); - oq.first = oq.first->next; - free_dist_obuf(fob); - sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); + reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = oq.first; + obufsize += size_obuf(fob); + oq.first = oq.first->next; + free_dist_obuf(fob); + sched_flags = erts_atomic32_read_nob(&prt->sched.flags); preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) goto finalize_only; @@ -2256,23 +2397,24 @@ erts_dist_command(Port *prt, int reds_limit) * dist entry in a non-busy state and resume suspended * processes. */ - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize >= obufsize); - dep->qsize -= obufsize; + erts_mtx_lock(&dep->qlock); + de_busy = !!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY); + qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize, + (erts_aint_t) -obufsize); + ASSERT(qsize >= 0); obufsize = 0; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) - && (dep->qflgs & ERTS_DE_QFLG_BUSY) - && dep->qsize < erts_dist_buf_busy_limit) { + && de_busy && qsize < erts_dist_buf_busy_limit) { ErtsProcList *suspendees; int resumed; suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_unlock(&dep->qlock); resumed = erts_resume_processes(suspendees); reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } else - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_unlock(&dep->qlock); } ASSERT(!oq.first && !oq.last); @@ -2281,10 +2423,15 @@ erts_dist_command(Port *prt, int reds_limit) if (obufsize != 0) { ASSERT(obufsize > 0); - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize >= obufsize); - dep->qsize -= obufsize; - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_lock(&dep->qlock); +#ifdef DEBUG + qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize, + (erts_aint_t) -obufsize); + ASSERT(qsize >= 0); +#else + erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); +#endif + erts_mtx_unlock(&dep->qlock); } ASSERT(foq.first || !foq.last); @@ -2301,8 +2448,6 @@ erts_dist_command(Port *prt, int reds_limit) if (reds > INT_MAX/2) reds = INT_MAX/2; - erts_deref_dist_entry(dep); - return reds; preempted: @@ -2338,9 +2483,9 @@ erts_dist_command(Port *prt, int reds_limit) foq.last = NULL; #ifdef DEBUG - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize == obufsize); - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_lock(&dep->qlock); + ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); + erts_mtx_unlock(&dep->qlock); #endif } else { @@ -2349,14 +2494,14 @@ erts_dist_command(Port *prt, int reds_limit) * Unhandle buffers need to be put back first * in out_queue. */ - erts_smp_mtx_lock(&dep->qlock); - dep->qsize -= obufsize; + erts_mtx_lock(&dep->qlock); + erts_atomic_add_nob(&dep->qsize, -obufsize); obufsize = 0; oq.last->next = dep->out_queue.first; dep->out_queue.first = oq.first; if (!dep->out_queue.last) dep->out_queue.last = oq.last; - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_unlock(&dep->qlock); } erts_schedule_dist_command(prt, NULL); @@ -2364,6 +2509,370 @@ erts_dist_command(Port *prt, int reds_limit) goto done; } +#if 0 + +int +dist_data_finalize(Process *c_p, int reds_limit) +{ + int reds = 5; + DistEntry *dep = ; + ErtsDistOutputQueue oq, foq; + ErtsDistOutputBuf *ob; + int preempt; + + + erts_mtx_lock(&dep->qlock); + flags = dep->flags; + oq.first = dep->out_queue.first; + oq.last = dep->out_queue.last; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + erts_mtx_unlock(&dep->qlock); + + if (!oq.first) { + ASSERT(!oq.last); + oq.first = dep->tmp_out_queue.first; + oq.last = dep->tmp_out_queue.last; + } + else { + ErtsDistOutputBuf *f, *l; + ASSERT(oq.last); + if (dep->tmp_out_queue.last) { + dep->tmp_out_queue.last->next = oq.first; + oq.first = dep->tmp_out_queue.first; + } + } + + if (!oq.first) { + /* Nothing to do... */ + ASSERT(!oq.last); + return reds; + } + + foq.first = dep->finalized_out_queue.first; + foq.last = dep->finalized_out_queue.last; + + preempt = 0; + ob = oq.first; + ASSERT(ob); + + do { + ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, + dep->cache, + flags); + if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' + needed */ + ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + preempt = reds > reds_limit; + if (preempt) + break; + ob = ob->next; + } while (ob); + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the last buffer that we finalized. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + if (!preempt) { + /* All buffers finalized */ + foq.last = oq.last; + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. */ + foq.last = ob; + oq.first = ob->next; + if (oq.first) + ob->next = NULL; + else + oq.last = NULL; + } + + dep->finalized_out_queue.first = foq.first; + dep->finalized_out_queue.last = foq.last; + dep->tmp_out_queue.first = oq.first; + dep->tmp_out_queue.last = oq.last; + + return reds; +} + +#endif + +BIF_RETTYPE +dist_ctrl_get_data_notification_1(BIF_ALIST_1) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + erts_aint32_t qflgs; + erts_aint_t qsize; + Eterm receiver = NIL; + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) + BIF_ERROR(BIF_P, BADARG); + + /* + * Caller is the only one that can consume from this queue + * and the only one that can set the req-info flag... + */ + + erts_de_rlock(dep); + + ASSERT(dep->cid == BIF_P->common.id); + + qflgs = erts_atomic32_read_acqb(&dep->qflgs); + + if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) { + qsize = erts_atomic_read_acqb(&dep->qsize); + ASSERT(qsize >= 0); + if (qsize > 0) + receiver = BIF_P->common.id; /* Notify ourselves... */ + else { /* Empty queue; set req-info flag... */ + qflgs = erts_atomic32_read_bor_mb(&dep->qflgs, + ERTS_DE_QFLG_REQ_INFO); + qsize = erts_atomic_read_acqb(&dep->qsize); + ASSERT(qsize >= 0); + if (qsize > 0) { + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) + receiver = BIF_P->common.id; /* Notify ourselves... */ + /* else: someone else will notify us... */ + } + /* else: still empty queue... */ + } + } + /* else: Already requested... */ + + erts_de_runlock(dep); + + if (is_internal_pid(receiver)) + notify_dist_data(BIF_P, receiver); + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_ctrl_put_data_2(BIF_ALIST_2) +{ + DistEntry *dep; + ErlDrvSizeT size; + Eterm input_handler; + + if (is_binary(BIF_ARG_2)) + size = binary_size(BIF_ARG_2); + else if (is_nil(BIF_ARG_2)) + size = 0; + else if (is_list(BIF_ARG_2)) + BIF_TRAP2(dist_ctrl_put_data_trap, + BIF_P, BIF_ARG_1, BIF_ARG_2); + else + BIF_ERROR(BIF_P, BADARG); + + dep = erts_dhandle_to_dist_entry(BIF_ARG_1); + if (!dep) + BIF_ERROR(BIF_P, BADARG); + + input_handler = (Eterm) erts_atomic_read_nob(&dep->input_handler); + + if (input_handler != BIF_P->common.id) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + erts_atomic64_inc_nob(&dep->in); + + if (size != 0) { + byte *data, *temp_alloc = NULL; + + data = (byte *) erts_get_aligned_binary_bytes(BIF_ARG_2, &temp_alloc); + if (!data) + BIF_ERROR(BIF_P, BADARG); + + erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); + + (void) erts_net_message(NULL, dep, NULL, 0, data, size); + /* + * We ignore any decode failures. On fatal failures the + * connection will be taken down by killing the + * distribution channel controller... + */ + + erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + + BUMP_REDS(BIF_P, 5); + + erts_free_aligned_binary_bytes(temp_alloc); + + } + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_get_stat_1(BIF_ALIST_1) +{ + Sint64 read, write, pend; + Eterm res, *hp, **hpp; + Uint sz, *szp; + DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1); + + if (!dep) + BIF_ERROR(BIF_P, BADARG); + + erts_de_rlock(dep); + + read = (Sint64) erts_atomic64_read_nob(&dep->in); + write = (Sint64) erts_atomic64_read_nob(&dep->out); + pend = (Sint64) erts_atomic_read_nob(&dep->qsize); + + erts_de_runlock(dep); + + sz = 0; + szp = &sz; + hpp = NULL; + + while (1) { + res = erts_bld_tuple(hpp, szp, 4, + am_ok, + erts_bld_sint64(hpp, szp, read), + erts_bld_sint64(hpp, szp, write), + pend ? am_true : am_false); + if (hpp) + break; + hp = HAlloc(BIF_P, sz); + hpp = &hp; + szp = NULL; + } + + BIF_RET(res); +} + +BIF_RETTYPE +dist_ctrl_input_handler_2(BIF_ALIST_2) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) + BIF_ERROR(BIF_P, BADARG); + + if (is_not_internal_pid(BIF_ARG_2)) + BIF_ERROR(BIF_P, BADARG); + + erts_atomic_set_nob(&dep->input_handler, + (erts_aint_t) BIF_ARG_2); + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_ctrl_get_data_1(BIF_ALIST_1) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + int reds = 1; + ErtsDistOutputBuf *obuf; + Eterm *hp; + ProcBin *pb; + erts_aint_t qsize; + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep) + BIF_ERROR(BIF_P, BADARG); + + erts_de_rlock(dep); + + if (dep->status & ERTS_DE_SFLG_EXITING) + goto return_none; + + ASSERT(dep->cid == BIF_P->common.id); + +#if 0 + if (dep->finalized_out_queue.first) { + obuf = dep->finalized_out_queue.first; + dep->finalized_out_queue.first = obuf->next; + if (!obuf->next) + dep->finalized_out_queue.last = NULL; + } + else +#endif + { + if (!dep->tmp_out_queue.first) { + ASSERT(!dep->tmp_out_queue.last); + qsize = erts_atomic_read_acqb(&dep->qsize); + if (qsize > 0) { + erts_mtx_lock(&dep->qlock); + dep->tmp_out_queue.first = dep->out_queue.first; + dep->tmp_out_queue.last = dep->out_queue.last; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + erts_mtx_unlock(&dep->qlock); + } + } + + if (!dep->tmp_out_queue.first) { + ASSERT(!dep->tmp_out_queue.last); + return_none: + erts_de_runlock(dep); + BIF_RET(am_none); + } + else { + obuf = dep->tmp_out_queue.first; + dep->tmp_out_queue.first = obuf->next; + if (!obuf->next) + dep->tmp_out_queue.last = NULL; + } + + obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp, + dep->cache, + dep->flags); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */ + ASSERT(&obuf->data[0] <= obuf->extp + && obuf->extp < obuf->ext_endp); + } + + erts_atomic64_inc_nob(&dep->out); + + erts_de_runlock(dep); + + hp = HAlloc(BIF_P, PROC_BIN_SIZE); + pb = (ProcBin *) (char *) hp; + pb->thing_word = HEADER_PROC_BIN; + pb->size = obuf->ext_endp - obuf->extp; + pb->next = MSO(BIF_P).first; + MSO(BIF_P).first = (struct erl_off_heap_header*) pb; + pb->val = ErtsDistOutputBuf2Binary(obuf); + pb->bytes = (byte*) obuf->extp; + pb->flags = 0; + + qsize = erts_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf)); + ASSERT(qsize >= 0); + + if (qsize < erts_dist_buf_busy_limit/2 + && (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) { + ErtsProcList *resume_procs = NULL; + erts_mtx_lock(&dep->qlock); + resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); + erts_mtx_unlock(&dep->qlock); + if (resume_procs) { + int resumed = erts_resume_processes(resume_procs); + reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + } + } + + BIF_RET2(make_binary(pb), reds); +} + void erts_dist_port_not_busy(Port *prt) { @@ -2386,21 +2895,23 @@ erts_dist_port_not_busy(Port *prt) void erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) { - erts_smp_de_rwlock(dep); - if (is_internal_port(dep->cid) - && connection_id == dep->connection_id + erts_de_rwlock(dep); + if (connection_id == dep->connection_id && !(dep->status & ERTS_DE_SFLG_EXITING)) { dep->status |= ERTS_DE_SFLG_EXITING; - erts_smp_mtx_lock(&dep->qlock); - ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT)); - dep->qflgs |= ERTS_DE_QFLG_EXIT; - erts_smp_mtx_unlock(&dep->qlock); + erts_mtx_lock(&dep->qlock); + ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); + erts_mtx_unlock(&dep->qlock); - erts_schedule_dist_command(NULL, dep); + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + else if (is_internal_pid(dep->cid)) + schedule_kill_dist_ctrl_proc(dep->cid); } - erts_smp_de_rwunlock(dep); + erts_de_rwunlock(dep); } struct print_to_data { @@ -2515,9 +3026,6 @@ info_dist_entry(fmtfn_t to, void *arg, DistEntry *dep, int visible, int connecte } erts_print(to, arg, "Name: %T", dep->sysname); -#ifdef DEBUG - erts_print(to, arg, " (refc=%d)", erts_smp_refc_read(&dep->refc, 0)); -#endif erts_print(to, arg, "\n"); if (!connected && is_nil(dep->cid)) { if (dep->nlinks) { @@ -2637,32 +3145,46 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; } - net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN, - am_net_kernel, ERTS_PROC_LOCK_MAIN, 0); - if (!net_kernel) + net_kernel = erts_whereis_process(BIF_P, + ERTS_PROC_LOCK_MAIN, + am_net_kernel, + ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS, + 0); + if (!net_kernel || ERTS_PROC_GET_DIST_ENTRY(net_kernel)) goto error; /* By setting F_DISTRIBUTION on net_kernel, - * do_net_exist will be called when net_kernel is terminated !! */ + * erts_do_net_exits will be called when net_kernel is terminated !! */ net_kernel->flags |= F_DISTRIBUTION; - if (net_kernel != BIF_P) - erts_smp_proc_unlock(net_kernel, ERTS_PROC_LOCK_MAIN); + erts_proc_unlock(net_kernel, + (ERTS_PROC_LOCK_STATUS + | ((net_kernel != BIF_P) + ? ERTS_PROC_LOCK_MAIN + : 0))); #ifdef DEBUG - erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx); + erts_rwmtx_rlock(&erts_dist_table_rwmtx); ASSERT(!erts_visible_dist_entries && !erts_hidden_dist_entries); - erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); + erts_rwmtx_runlock(&erts_dist_table_rwmtx); #endif - erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); - erts_smp_thr_progress_block(); + erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); + erts_thr_progress_block(); inc_no_nodes(); erts_set_this_node(BIF_ARG_1, (Uint32) creation); erts_is_alive = 1; send_nodes_mon_msgs(NULL, am_nodeup, BIF_ARG_1, am_visible, NIL); - erts_smp_thr_progress_unblock(); - erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + erts_thr_progress_unblock(); + erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + + /* + * Note erts_this_dist_entry is changed by erts_set_this_node(), + * so we *need* to use the new one after erts_set_this_node() + * is called. + */ + erts_ref_dist_entry(erts_this_dist_entry); + ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry); BIF_RET(am_true); @@ -2693,18 +3215,18 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) Eterm ic, oc; Eterm *tp; DistEntry *dep = NULL; + ErtsProcLocks proc_unlock = 0; + Process *proc; Port *pp = NULL; - /* Prepare for success */ - ERTS_BIF_PREP_RET(ret, am_true); - /* * Check and pick out arguments */ if (!is_node_name_atom(BIF_ARG_1) || - is_not_internal_port(BIF_ARG_2) || - (erts_this_node->sysname == am_Noname)) { + !(is_internal_port(BIF_ARG_2) + || is_internal_pid(BIF_ARG_2)) + || (erts_this_node->sysname == am_Noname)) { goto badarg; } @@ -2748,77 +3270,124 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) else if (!dep) goto system_limit; /* Should never happen!!! */ - pp = erts_id2port_sflgs(BIF_ARG_2, - BIF_P, - ERTS_PROC_LOCK_MAIN, - ERTS_PORT_SFLGS_INVALID_LOOKUP); - erts_smp_de_rwlock(dep); + if (is_internal_pid(BIF_ARG_2)) { + if (BIF_P->common.id == BIF_ARG_2) { + proc_unlock = 0; + proc = BIF_P; + } + else { + proc_unlock = ERTS_PROC_LOCK_MAIN; + proc = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN, + BIF_ARG_2, proc_unlock); + } + erts_de_rwlock(dep); - if (!pp || (erts_atomic32_read_nob(&pp->state) - & ERTS_PORT_SFLG_EXITING)) - goto badarg; + if (!proc) + goto badarg; + else if (proc == ERTS_PROC_LOCK_BUSY) { + proc_unlock = 0; + goto yield; + } - if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) - goto badarg; + erts_proc_lock(proc, ERTS_PROC_LOCK_STATUS); + proc_unlock |= ERTS_PROC_LOCK_STATUS; + + if (ERTS_PROC_GET_DIST_ENTRY(proc)) { + if (dep == ERTS_PROC_GET_DIST_ENTRY(proc) + && (proc->flags & F_DISTRIBUTION) + && dep->cid == BIF_ARG_2) { + ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); + goto done; + } + goto badarg; + } + + if (is_not_nil(dep->cid)) + goto badarg; + + proc->flags |= F_DISTRIBUTION; + ERTS_PROC_SET_DIST_ENTRY(proc, dep); - if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) - goto done; /* Already set */ + proc_unlock &= ~ERTS_PROC_LOCK_STATUS; + erts_proc_unlock(proc, ERTS_PROC_LOCK_STATUS); + + dep->send = NULL; /* Only for distr ports... */ - if (dep->status & ERTS_DE_SFLG_EXITING) { - /* Suspend on dist entry waiting for the exit to finish */ - ErtsProcList *plp = erts_proclist_create(BIF_P); - plp->next = NULL; - erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); - erts_smp_mtx_lock(&dep->qlock); - erts_proclist_store_last(&dep->suspended, plp); - erts_smp_mtx_unlock(&dep->qlock); - goto yield; } + else { - ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); + pp = erts_id2port_sflgs(BIF_ARG_2, + BIF_P, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); + erts_de_rwlock(dep); - if (pp->dist_entry || is_not_nil(dep->cid)) - goto badarg; + if (!pp || (erts_atomic32_read_nob(&pp->state) + & ERTS_PORT_SFLG_EXITING)) + goto badarg; - erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) + goto badarg; - /* - * Dist-ports do not use the "busy port message queue" functionality, but - * instead use "busy dist entry" functionality. - */ - { - ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; - erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); - } + if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) { + ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); + goto done; /* Already set */ + } - pp->dist_entry = dep; + if (dep->status & ERTS_DE_SFLG_EXITING) { + /* Suspend on dist entry waiting for the exit to finish */ + ErtsProcList *plp = erts_proclist_create(BIF_P); + plp->next = NULL; + erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); + erts_mtx_lock(&dep->qlock); + erts_proclist_store_last(&dep->suspended, plp); + erts_mtx_unlock(&dep->qlock); + goto yield; + } - dep->version = version; - dep->creation = 0; + ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); - ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + if (pp->dist_entry || is_not_nil(dep->cid)) + goto badarg; -#if 1 - dep->send = (pp->drv_ptr->outputv - ? dist_port_commandv - : dist_port_command); -#else - dep->send = dist_port_command; -#endif - ASSERT(dep->send); + erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + + pp->dist_entry = dep; + + ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + + dep->send = (pp->drv_ptr->outputv + ? dist_port_commandv + : dist_port_command); + ASSERT(dep->send); + + /* + * Dist-ports do not use the "busy port message queue" functionality, but + * instead use "busy dist entry" functionality. + */ + { + ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; + erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); + } + + } + + dep->version = version; + dep->creation = 0; #ifdef DEBUG - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize == 0); - erts_smp_mtx_unlock(&dep->qlock); + ASSERT(erts_atomic_read_nob(&dep->qsize) == 0); #endif - erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); - if (flags & DFLAG_DIST_HDR_ATOM_CACHE) create_cache(dep); - erts_smp_de_rwunlock(dep); + erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + + erts_de_rwunlock(dep); + + ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); + dep = NULL; /* inc of refc transferred to port (dist_entry field) */ inc_no_nodes(); @@ -2831,13 +3400,16 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) done: if (dep && dep != erts_this_dist_entry) { - erts_smp_de_rwunlock(dep); + erts_de_rwunlock(dep); erts_deref_dist_entry(dep); } if (pp) erts_port_release(pp); + if (proc_unlock) + erts_proc_unlock(proc, proc_unlock); + return ret; yield: @@ -2883,7 +3455,7 @@ BIF_RETTYPE dist_exit_3(BIF_ALIST_3) if (BIF_P->common.id == local) { lp_locks = ERTS_PROC_LOCKS_ALL; lp = BIF_P; - erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR); + erts_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR); } else { lp_locks = ERTS_PROC_LOCKS_XSIG_SEND; @@ -2902,21 +3474,17 @@ BIF_RETTYPE dist_exit_3(BIF_ALIST_3) NIL, NULL, 0); -#ifdef ERTS_SMP if (lp == BIF_P) lp_locks &= ~ERTS_PROC_LOCK_MAIN; -#endif - erts_smp_proc_unlock(lp, lp_locks); + erts_proc_unlock(lp, lp_locks); if (lp == BIF_P) { - erts_aint32_t state = erts_smp_atomic32_read_acqb(&BIF_P->state); + erts_aint32_t state = erts_atomic32_read_acqb(&BIF_P->state); /* * We may have exited current process and may have to take action. */ if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { -#ifdef ERTS_SMP if (state & ERTS_PSFLG_PENDING_EXIT) erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN); -#endif ERTS_BIF_EXITED(BIF_P); } } @@ -3002,7 +3570,7 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) length = 0; - erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx); + erts_rwmtx_rlock(&erts_dist_table_rwmtx); ASSERT(erts_no_of_not_connected_dist_entries > 0); ASSERT(erts_no_of_hidden_dist_entries >= 0); @@ -3019,7 +3587,7 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) result = NIL; if (length == 0) { - erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); + erts_rwmtx_runlock(&erts_dist_table_rwmtx); goto done; } @@ -3050,7 +3618,7 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) hp += 2; } ASSERT(endp == hp); - erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); + erts_rwmtx_runlock(&erts_dist_table_rwmtx); done: UnUseTmpHeap(2,BIF_P); @@ -3105,15 +3673,15 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) if (dep == erts_this_dist_entry) goto done; - erts_smp_proc_lock(p, ERTS_PROC_LOCK_LINK); - erts_smp_de_rlock(dep); + erts_proc_lock(p, ERTS_PROC_LOCK_LINK); + erts_de_rlock(dep); if (ERTS_DE_IS_NOT_CONNECTED(dep)) { - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK); - erts_smp_de_runlock(dep); + erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); + erts_de_runlock(dep); goto do_trap; } - erts_smp_de_links_lock(dep); - erts_smp_de_runlock(dep); + erts_de_links_lock(dep); + erts_de_runlock(dep); if (Bool == am_true) { ASSERT(dep->cid != NIL); @@ -3140,11 +3708,10 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) } } - erts_smp_de_links_unlock(dep); - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK); + erts_de_links_unlock(dep); + erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); done: - erts_deref_dist_entry(dep); BIF_RET(am_true); } @@ -3173,9 +3740,9 @@ BIF_RETTYPE net_kernel_dflag_unicode_io_1(BIF_ALIST_1) if (de == erts_this_dist_entry) { BIF_RET(am_true); } - erts_smp_de_rlock(de); + erts_de_rlock(de); f = de->flags; - erts_smp_de_runlock(de); + erts_de_runlock(de); BIF_RET(((f & DFLAG_UNICODE_IO) ? am_true : am_false)); } @@ -3205,7 +3772,7 @@ struct ErtsNodesMonitor_ { Uint16 no; }; -static erts_smp_mtx_t nodes_monitors_mtx; +static erts_mtx_t nodes_monitors_mtx; static ErtsNodesMonitor *nodes_monitors; static ErtsNodesMonitor *nodes_monitors_end; @@ -3223,7 +3790,7 @@ static ErtsNodesMonitor *nodes_monitors_end; static void init_nodes_monitors(void) { - erts_smp_mtx_init(&nodes_monitors_mtx, "nodes_monitors", NIL, + erts_mtx_init(&nodes_monitors_mtx, "nodes_monitors", NIL, ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION); nodes_monitors = NULL; nodes_monitors_end = NULL; @@ -3349,10 +3916,10 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas } #endif - ERTS_SMP_LC_ASSERT(!c_p + ERTS_LC_ASSERT(!c_p || (erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN)); - erts_smp_mtx_lock(&nodes_monitors_mtx); + erts_mtx_lock(&nodes_monitors_mtx); for (nmp = nodes_monitors; nmp; nmp = nmp->next) { int i; @@ -3385,7 +3952,7 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas if (rp) { if (rp == c_p) rp_locks &= ~ERTS_PROC_LOCK_MAIN; - erts_smp_proc_unlock(rp, rp_locks); + erts_proc_unlock(rp, rp_locks); } rp = nmp->proc; @@ -3412,10 +3979,10 @@ send_nodes_mon_msgs(Process *c_p, Eterm what, Eterm node, Eterm type, Eterm reas if (rp) { if (rp == c_p) rp_locks &= ~ERTS_PROC_LOCK_MAIN; - erts_smp_proc_unlock(rp, rp_locks); + erts_proc_unlock(rp, rp_locks); } - erts_smp_mtx_unlock(&nodes_monitors_mtx); + erts_mtx_unlock(&nodes_monitors_mtx); } static Eterm @@ -3425,8 +3992,8 @@ insert_nodes_monitor(Process *c_p, Uint32 opts) Eterm res = am_false; ErtsNodesMonitor *xnmp, *nmp; - ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&nodes_monitors_mtx)); - ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&nodes_monitors_mtx)); + ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN); xnmp = c_p->nodes_monitors; if (xnmp) { @@ -3510,8 +4077,8 @@ remove_nodes_monitors(Process *c_p, Uint32 opts, int all) Eterm res = am_false; ErtsNodesMonitor *nmp; - ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&nodes_monitors_mtx)); - ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&nodes_monitors_mtx)); + ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) & ERTS_PROC_LOCK_MAIN); nmp = c_p->nodes_monitors; ASSERT(!nmp || !nmp->prev || nmp->prev->proc != c_p); @@ -3553,23 +4120,23 @@ remove_nodes_monitors(Process *c_p, Uint32 opts, int all) void erts_delete_nodes_monitors(Process *c_p, ErtsProcLocks locks) { -#if defined(ERTS_ENABLE_LOCK_CHECK) && defined(ERTS_SMP) +#if defined(ERTS_ENABLE_LOCK_CHECK) if (c_p) { ErtsProcLocks might_unlock = locks & ~ERTS_PROC_LOCK_MAIN; if (might_unlock) erts_proc_lc_might_unlock(c_p, might_unlock); } #endif - if (erts_smp_mtx_trylock(&nodes_monitors_mtx) == EBUSY) { + if (erts_mtx_trylock(&nodes_monitors_mtx) == EBUSY) { ErtsProcLocks unlock_locks = locks & ~ERTS_PROC_LOCK_MAIN; if (c_p && unlock_locks) - erts_smp_proc_unlock(c_p, unlock_locks); - erts_smp_mtx_lock(&nodes_monitors_mtx); + erts_proc_unlock(c_p, unlock_locks); + erts_mtx_lock(&nodes_monitors_mtx); if (c_p && unlock_locks) - erts_smp_proc_lock(c_p, unlock_locks); + erts_proc_lock(c_p, unlock_locks); } remove_nodes_monitors(c_p, 0, 1); - erts_smp_mtx_unlock(&nodes_monitors_mtx); + erts_mtx_unlock(&nodes_monitors_mtx); } Eterm @@ -3580,7 +4147,7 @@ erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist) Uint16 opts = (Uint16) 0; ASSERT(c_p); - ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); + ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); if (on != am_true && on != am_false) return THE_NON_VALUE; @@ -3636,14 +4203,14 @@ erts_monitor_nodes(Process *c_p, Eterm on, Eterm olist) return THE_NON_VALUE; } - erts_smp_mtx_lock(&nodes_monitors_mtx); + erts_mtx_lock(&nodes_monitors_mtx); if (on == am_true) res = insert_nodes_monitor(c_p, opts); else res = remove_nodes_monitors(c_p, opts, 0); - erts_smp_mtx_unlock(&nodes_monitors_mtx); + erts_mtx_unlock(&nodes_monitors_mtx); return res; } @@ -3666,8 +4233,8 @@ erts_processes_monitoring_nodes(Process *c_p) #endif ASSERT(c_p); - ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); - erts_smp_mtx_lock(&nodes_monitors_mtx); + ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); + erts_mtx_lock(&nodes_monitors_mtx); sz = 0; szp = &sz; @@ -3716,7 +4283,7 @@ erts_processes_monitoring_nodes(Process *c_p) ASSERT(hp == hend); - erts_smp_mtx_unlock(&nodes_monitors_mtx); + erts_mtx_unlock(&nodes_monitors_mtx); return res; } |