diff options
author | Rickard Green <[email protected]> | 2017-06-30 16:32:42 +0200 |
---|---|---|
committer | Rickard Green <[email protected]> | 2017-08-28 14:40:52 +0200 |
commit | 78fad16ef7c5477239bc0b51125fabfe6567039d (patch) | |
tree | 87071be4c822f110bfcac704d7d8484b3325f0ab /erts/emulator/beam/dist.c | |
parent | 6b267b203c950db2879f254b6a9d3b7591115f9d (diff) | |
download | otp-78fad16ef7c5477239bc0b51125fabfe6567039d.tar.gz otp-78fad16ef7c5477239bc0b51125fabfe6567039d.tar.bz2 otp-78fad16ef7c5477239bc0b51125fabfe6567039d.zip |
Support for distribution controller processes
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r-- | erts/emulator/beam/dist.c | 872 |
1 files changed, 707 insertions, 165 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index d9feec4722..5d7501f234 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -121,7 +121,7 @@ Export* dexit_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ - +static Export *dist_ctrl_put_data_trap; /* forward declarations */ @@ -156,9 +156,7 @@ create_cache(DistEntry *dep) int i; ErtsAtomCache *cp; - ERTS_SMP_LC_ASSERT( - is_internal_port(dep->cid) - && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); + ERTS_SMP_LC_ASSERT(is_nil(dep->cid)); ASSERT(!dep->cache); dep->cache = cp = (ErtsAtomCache*) erts_alloc(ERTS_ALC_T_DCACHE, @@ -450,7 +448,35 @@ inc_no_nodes(void) #endif erts_smp_atomic_inc_mb(&no_nodes); } - + +static void +kill_dist_ctrl_proc(void *vpid) +{ + Eterm pid = (Eterm) vpid; + ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; + Process *rp = erts_pid2proc(NULL, 0, pid, rp_locks); + if (rp) { + erts_send_exit_signal(NULL, rp->common.id, rp, &rp_locks, + am_kill, NIL, NULL, 0); + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); + } +} + +static void +schedule_kill_dist_ctrl_proc(Eterm pid) +{ + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + int sched_id = 1; + if (!esdp || ERTS_SCHEDULER_IS_DIRTY(esdp)) + sched_id = 1; + else + sched_id = (int) esdp->no; + erts_schedule_misc_aux_work(sched_id, + kill_dist_ctrl_proc, + (void *) (UWord) pid); +} + /* * proc is currently running or exiting process. */ @@ -460,58 +486,62 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */ DistEntry *tdep; - int no_dist_port = 0; + int no_dist_ctrl = 0; Eterm nd_reason = (reason == am_no_network ? am_no_network : am_net_kernel_terminated); erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) - no_dist_port++; + no_dist_ctrl++; for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) - no_dist_port++; + no_dist_ctrl++; /* KILL all port controllers */ - if (no_dist_port == 0) + if (no_dist_ctrl == 0) erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); else { Eterm def_buf[128]; int i = 0; - Eterm *dist_port; + Eterm *dist_ctrl; - if (no_dist_port <= sizeof(def_buf)/sizeof(def_buf[0])) - dist_port = &def_buf[0]; + if (no_dist_ctrl <= sizeof(def_buf)/sizeof(def_buf[0])) + dist_ctrl = &def_buf[0]; else - dist_port = erts_alloc(ERTS_ALC_T_TMP, - sizeof(Eterm)*no_dist_port); + dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, + sizeof(Eterm)*no_dist_ctrl); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) { - ASSERT(is_internal_port(tdep->cid)); - dist_port[i++] = tdep->cid; + ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + dist_ctrl[i++] = tdep->cid; } for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) { - ASSERT(is_internal_port(tdep->cid)); - dist_port[i++] = tdep->cid; + ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + dist_ctrl[i++] = tdep->cid; } erts_smp_rwmtx_runlock(&erts_dist_table_rwmtx); - for (i = 0; i < no_dist_port; i++) { - Port *prt = erts_port_lookup(dist_port[i], - ERTS_PORT_SFLGS_INVALID_LOOKUP); - if (!prt) - continue; - ASSERT(erts_atomic32_read_nob(&prt->state) - & ERTS_PORT_SFLG_DISTRIBUTION); - - erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED, - prt, dist_port[i], nd_reason, NULL); + for (i = 0; i < no_dist_ctrl; i++) { + if (is_internal_pid(dist_ctrl[i])) + schedule_kill_dist_ctrl_proc(dist_ctrl[i]); + else { + Port *prt = erts_port_lookup(dist_ctrl[i], + ERTS_PORT_SFLGS_INVALID_LOOKUP); + if (prt) { + ASSERT(erts_atomic32_read_nob(&prt->state) + & ERTS_PORT_SFLG_DISTRIBUTION); + + erts_port_exit(NULL, ERTS_PORT_SIG_FLG_FORCE_SCHED, + prt, dist_ctrl[i], nd_reason, NULL); + } + } } - if (dist_port != &def_buf[0]) - erts_free(ERTS_ALC_T_TMP, dist_port); + if (dist_ctrl != &def_buf[0]) + erts_free(ERTS_ALC_T_TMP, dist_ctrl); } /* - * When last dist port exits, node will be taken + * When last dist ctrl exits, node will be taken * from alive to not alive. */ ASSERT(is_nil(nodedown.reason) && !nodedown.bp); @@ -528,7 +558,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) &nodedown.bp->off_heap); } } - else { /* Call from distribution port */ + else { /* Call from distribution controller (port/process) */ NetExitsContext nec = {dep}; ErtsLink *nlinks; ErtsLink *node_links; @@ -538,11 +568,12 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) erts_smp_atomic_set_mb(&dep->dist_cmd_scheduled, 1); erts_smp_de_rwlock(dep); - ERTS_SMP_LC_ASSERT(is_internal_port(dep->cid) - && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); + if (is_internal_port(dep->cid)) { + ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); - if (erts_port_task_is_scheduled(&dep->dist_cmd)) - erts_port_task_abort(&dep->dist_cmd); + if (erts_port_task_is_scheduled(&dep->dist_cmd)) + erts_port_task_abort(&dep->dist_cmd); + } if (dep->status & ERTS_DE_SFLG_EXITING) { #ifdef DEBUG @@ -618,6 +649,9 @@ void init_dist(void) dgroup_leader_trap = trap_function(am_dgroup_leader,2); dexit_trap = trap_function(am_dexit, 2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); + dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, + am_dist_ctrl_put_data, + 2); } #define ErtsDistOutputBuf2Binary(OB) \ @@ -660,6 +694,8 @@ static void clear_dist_entry(DistEntry *dep) ErtsDistOutputBuf *obuf; erts_smp_de_rwlock(dep); + erts_smp_atomic_set_nob(&dep->input_handler, + (erts_aint_t) NIL); cache = dep->cache; dep->cache = NULL; @@ -673,6 +709,9 @@ static void clear_dist_entry(DistEntry *dep) erts_smp_mtx_lock(&dep->qlock); + erts_smp_atomic64_set_nob(&dep->in, 0); + erts_smp_atomic64_set_nob(&dep->out, 0); + if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; else { @@ -680,8 +719,15 @@ static void clear_dist_entry(DistEntry *dep) obuf = dep->out_queue.first; } + if (dep->tmp_out_queue.first) { + dep->tmp_out_queue.last->next = obuf; + obuf = dep->tmp_out_queue.first; + } + dep->out_queue.first = NULL; dep->out_queue.last = NULL; + dep->tmp_out_queue.first = NULL; + dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; dep->status = 0; @@ -1148,6 +1194,7 @@ int erts_net_message(Port *prt, ErtsLink *lnk; Uint tuple_arity; int res; + Uint32 connection_id; #ifdef ERTS_DIST_MSG_DBG ErlDrvSizeT orig_len = len; #endif @@ -1156,14 +1203,17 @@ int erts_net_message(Port *prt, ERTS_SMP_CHK_NO_PROC_LOCKS; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_SMP_LC_ASSERT(!prt || erts_lc_is_port_locked(prt)); if (!erts_is_alive) { UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); return 0; } + + if (hlen != 0) goto data_error; + if (len == 0) { /* HANDLE TICK !!! */ UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); return 0; @@ -1187,25 +1237,31 @@ int erts_net_message(Port *prt, goto data_error; } - res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache); + res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache, &connection_id); - if (res >= 0) - res = ctl_len = erts_decode_dist_ext_size(&ede); - else { + switch (res) { + case ERTS_PREP_DIST_EXT_CLOSED: + return 0; /* Connection not alive; ignore signal... */ + case ERTS_PREP_DIST_EXT_FAILED: #ifdef ERTS_DIST_MSG_DBG erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n"); bw(buf, orig_len); #endif - ctl_len = 0; - } - - if (res < 0) { + goto data_error; + case ERTS_PREP_DIST_EXT_SUCCESS: + ctl_len = erts_decode_dist_ext_size(&ede); + if (ctl_len < 0) { #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); - bw(buf, orig_len); + erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); + bw(buf, orig_len); #endif - PURIFY_MSG("data error"); - goto data_error; + PURIFY_MSG("data error"); + goto data_error; + } + break; + default: + ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()"); + break; } if (ctl_len > DIST_CTL_DEFAULT_SIZE) { @@ -1726,7 +1782,7 @@ decode_error: } data_error: UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); - erts_deliver_port_exit(prt, dep->cid, am_killed, 0, 1); + erts_kill_dist_connection(dep, connection_id); ERTS_SMP_CHK_NO_PROC_LOCKS; return -1; } @@ -1747,6 +1803,31 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy) return ret; } +static ERTS_INLINE void +notify_dist_data(Process *c_p, Eterm pid) +{ + Process *rp; + ErtsProcLocks rp_locks; + + ASSERT(erts_get_scheduler_data() + && !ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data())); + ASSERT(is_internal_pid(pid)); + + if (c_p && c_p->common.id == pid) { + rp = c_p; + rp_locks = ERTS_PROC_LOCK_MAIN; + } + else { + rp = erts_proc_lookup(pid); + rp_locks = 0; + } + + if (rp) { + ErtsMessage *mp = erts_alloc_message(0, NULL); + erts_queue_message(rp, rp_locks, mp, am_dist_data, am_system); + } +} + int erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) { @@ -1865,14 +1946,28 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) Sint qsize; erts_aint32_t qflgs; ErtsProcList *plp = NULL; + Eterm notify_proc = NIL; + Sint obsz = size_obuf(ctx->obuf); + erts_smp_mtx_lock(&dep->qlock); - qsize = erts_smp_atomic_add_read_nob(&dep->qsize, - (erts_aint_t) size_obuf(ctx->obuf)); + qsize = erts_smp_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz); + ASSERT(qsize >= obsz); qflgs = erts_smp_atomic32_read_nob(&dep->qflgs); if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) { erts_smp_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY); qflgs |= ERTS_DE_QFLG_BUSY; } + if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) { + /* Previously empty queue and info requested... */ + qflgs = erts_smp_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify_proc = dep->cid; + ASSERT(is_internal_pid(notify_proc)); + } + /* else: requester will send itself the message... */ + qflgs &= ~ERTS_DE_QFLG_REQ_INFO; + } if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) { erts_smp_mtx_unlock(&dep->qlock); @@ -1916,8 +2011,11 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } erts_smp_mtx_unlock(&dep->qlock); - erts_schedule_dist_command(NULL, dep); + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); erts_smp_de_runlock(dep); + if (is_internal_pid(notify_proc)) + notify_dist_data(ctx->c_p, notify_proc); if (resume) { erts_resume(ctx->c_p, ERTS_PROC_LOCK_MAIN); @@ -1971,16 +2069,20 @@ static Uint dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; - Uint size = obuf->ext_endp - obuf->extp; + ErlDrvSizeT size; + char *bufp; ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (size > (Uint) INT_MAX) - erts_exit(ERTS_DUMP_EXIT, - "Absurdly large distribution output data buffer " - "(%beu bytes) passed.\n", - size); + if (!obuf) { + size = 0; + bufp = NULL; + } + else { + size = obuf->ext_endp - obuf->extp; + bufp = (char*) obuf->extp; + } #ifdef USE_VM_PROBES if (DTRACE_ENABLED(dist_output)) { @@ -1995,11 +2097,10 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf) remote_str, size); } #endif + prt->caller = NIL; fpe_was_unmasked = erts_block_fpe(); - (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, - (char*) obuf->extp, - (int) size); + (*prt->drv_ptr->output)((ErlDrvData) prt->drv_data, bufp, size); erts_unblock_fpe(fpe_was_unmasked); return size; } @@ -2008,7 +2109,7 @@ static Uint dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; - Uint size = obuf->ext_endp - obuf->extp; + ErlDrvSizeT size; SysIOVec iov[2]; ErlDrvBinary* bv[2]; ErlIOVec eiov; @@ -2016,25 +2117,33 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - if (size > (Uint) INT_MAX) - erts_exit(ERTS_DUMP_EXIT, - "Absurdly large distribution output data buffer " - "(%beu bytes) passed.\n", - size); - iov[0].iov_base = NULL; iov[0].iov_len = 0; bv[0] = NULL; - iov[1].iov_base = obuf->extp; - iov[1].iov_len = size; - bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); + if (!obuf) { + size = 0; + eiov.vsize = 1; + } + else { + size = obuf->ext_endp - obuf->extp; + eiov.vsize = 2; + + iov[1].iov_base = obuf->extp; + iov[1].iov_len = size; + bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); + } - eiov.vsize = 2; eiov.size = size; eiov.iov = iov; eiov.binv = bv; + if (size > (Uint) INT_MAX) + erts_exit(ERTS_DUMP_EXIT, + "Absurdly large distribution output data buffer " + "(%beu bytes) passed.\n", + size); + ASSERT(prt->drv_ptr->outputv); #ifdef USE_VM_PROBES @@ -2138,20 +2247,20 @@ erts_dist_command(Port *prt, int reds_limit) if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { int preempt = 0; do { - Uint size; - ErtsDistOutputBuf *fob; - - size = (*send)(prt, foq.first); - esdp->io.out += (Uint64) size; + Uint size; + ErtsDistOutputBuf *fob; + size = (*send)(prt, foq.first); + erts_smp_atomic64_inc_nob(&dep->out); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, ">> "); - bw(foq.first->extp, size); + erts_fprintf(stderr, ">> "); + bw(foq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = foq.first; - obufsize += size_obuf(fob); - foq.first = foq.first->next; - free_dist_obuf(fob); + reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = foq.first; + obufsize += size_obuf(fob); + foq.first = foq.first->next; + free_dist_obuf(fob); sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) @@ -2215,29 +2324,30 @@ erts_dist_command(Port *prt, int reds_limit) int de_busy; int preempt = 0; while (oq.first && !preempt) { - ErtsDistOutputBuf *fob; - Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&oq.first->data[0] <= oq.first->extp - && oq.first->extp < oq.first->ext_endp); - size = (*send)(prt, oq.first); - esdp->io.out += (Uint64) size; + ErtsDistOutputBuf *fob; + Uint size; + oq.first->extp + = erts_encode_ext_dist_header_finalize(oq.first->extp, + dep->cache, + flags); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' + needed */ + ASSERT(&oq.first->data[0] <= oq.first->extp + && oq.first->extp < oq.first->ext_endp); + size = (*send)(prt, oq.first); + erts_smp_atomic64_inc_nob(&dep->out); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, ">> "); - bw(oq.first->extp, size); + erts_fprintf(stderr, ">> "); + bw(oq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = oq.first; - obufsize += size_obuf(fob); - oq.first = oq.first->next; - free_dist_obuf(fob); + reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = oq.first; + obufsize += size_obuf(fob); + oq.first = oq.first->next; + free_dist_obuf(fob); sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags); preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) @@ -2379,6 +2489,374 @@ erts_dist_command(Port *prt, int reds_limit) goto done; } +#if 0 + +int +dist_data_finalize(Process *c_p, int reds_limit) +{ + int reds = 5; + DistEntry *dep = ; + ErtsDistOutputQueue oq, foq; + ErtsDistOutputBuf *ob; + int preempt; + + + erts_smp_mtx_lock(&dep->qlock); + flags = dep->flags; + oq.first = dep->out_queue.first; + oq.last = dep->out_queue.last; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + erts_smp_mtx_unlock(&dep->qlock); + + if (!oq.first) { + ASSERT(!oq.last); + oq.first = dep->tmp_out_queue.first; + oq.last = dep->tmp_out_queue.last; + } + else { + ErtsDistOutputBuf *f, *l; + ASSERT(oq.last); + if (dep->tmp_out_queue.last) { + dep->tmp_out_queue.last->next = oq.first; + oq.first = dep->tmp_out_queue.first; + } + } + + if (!oq.first) { + /* Nothing to do... */ + ASSERT(!oq.last); + return reds; + } + + foq.first = dep->finalized_out_queue.first; + foq.last = dep->finalized_out_queue.last; + + preempt = 0; + ob = oq.first; + ASSERT(ob); + + do { + ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, + dep->cache, + flags); + if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' + needed */ + ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + preempt = reds > reds_limit; + if (preempt) + break; + ob = ob->next; + } while (ob); + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the last buffer that we finalized. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + if (!preempt) { + /* All buffers finalized */ + foq.last = oq.last; + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. */ + foq.last = ob; + oq.first = ob->next; + if (oq.first) + ob->next = NULL; + else + oq.last = NULL; + } + + dep->finalized_out_queue.first = foq.first; + dep->finalized_out_queue.last = foq.last; + dep->tmp_out_queue.first = oq.first; + dep->tmp_out_queue.last = oq.last; + + return reds; +} + +#endif + +BIF_RETTYPE +dist_ctrl_get_data_notification_1(BIF_ALIST_1) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + erts_aint32_t qflgs; + erts_aint_t qsize; + Eterm receiver = NIL; + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (dep->sysname != BIF_ARG_1) + BIF_ERROR(BIF_P, BADARG); + + /* + * Caller is the only one that can consume from this queue + * and the only one that can set the req-info flag... + */ + + erts_smp_de_rlock(dep); + + ASSERT(dep->cid == BIF_P->common.id); + + qflgs = erts_smp_atomic32_read_acqb(&dep->qflgs); + + if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) { + qsize = erts_smp_atomic_read_acqb(&dep->qsize); + ASSERT(qsize >= 0); + if (qsize > 0) + receiver = BIF_P->common.id; /* Notify ourselves... */ + else { /* Empty queue; set req-info flag... */ + qflgs = erts_smp_atomic32_read_bor_mb(&dep->qflgs, + ERTS_DE_QFLG_REQ_INFO); + qsize = erts_smp_atomic_read_acqb(&dep->qsize); + ASSERT(qsize >= 0); + if (qsize > 0) { + qflgs = erts_smp_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) + receiver = BIF_P->common.id; /* Notify ourselves... */ + /* else: someone else will notify us... */ + } + /* else: still empty queue... */ + } + } + /* else: Already requested... */ + + erts_smp_de_runlock(dep); + + if (is_internal_pid(receiver)) + notify_dist_data(BIF_P, receiver); + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_ctrl_put_data_2(BIF_ALIST_2) +{ + DistEntry *dep; + ErlDrvSizeT size; + Eterm input_handler; + + if (is_binary(BIF_ARG_2)) + size = binary_size(BIF_ARG_2); + else if (is_nil(BIF_ARG_2)) + size = 0; + else if (is_list(BIF_ARG_2)) + BIF_TRAP2(dist_ctrl_put_data_trap, + BIF_P, BIF_ARG_1, BIF_ARG_2); + else + BIF_ERROR(BIF_P, BADARG); + + dep = erts_find_dist_entry(BIF_ARG_1); + if (!dep) + BIF_ERROR(BIF_P, BADARG); + + input_handler = (Eterm) erts_smp_atomic_read_nob(&dep->input_handler); + + if (input_handler != BIF_P->common.id) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + erts_smp_atomic64_inc_nob(&dep->in); + + if (size != 0) { + byte *data, *temp_alloc = NULL; + + data = (byte *) erts_get_aligned_binary_bytes(BIF_ARG_2, &temp_alloc); + if (!data) + BIF_ERROR(BIF_P, BADARG); + + erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); + + (void) erts_net_message(NULL, dep, NULL, 0, data, size); + /* + * We ignore any decode failures. On fatal failures the + * connection will be taken down by killing the + * distribution channel controller... + */ + + erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + + BUMP_REDS(BIF_P, 5); + + erts_free_aligned_binary_bytes(temp_alloc); + + } + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_get_stat_1(BIF_ALIST_1) +{ + Sint64 read, write, pend; + Eterm res, *hp, **hpp; + Uint sz, *szp; + DistEntry *dep = erts_find_dist_entry(BIF_ARG_1); + + if (!dep) + BIF_ERROR(BIF_P, BADARG); + + ASSERT(dep->sysname == BIF_ARG_1); + + erts_smp_de_rlock(dep); + + read = (Sint64) erts_smp_atomic64_read_nob(&dep->in); + write = (Sint64) erts_smp_atomic64_read_nob(&dep->out); + pend = (Sint64) erts_smp_atomic_read_nob(&dep->qsize); + + erts_smp_de_runlock(dep); + + erts_deref_dist_entry(dep); + + sz = 0; + szp = &sz; + hpp = NULL; + + while (1) { + res = erts_bld_tuple(hpp, szp, 4, + am_ok, + erts_bld_sint64(hpp, szp, read), + erts_bld_sint64(hpp, szp, write), + pend ? am_true : am_false); + if (hpp) + break; + hp = HAlloc(BIF_P, sz); + hpp = &hp; + szp = NULL; + } + + BIF_RET(res); +} + +BIF_RETTYPE +dist_ctrl_input_handler_2(BIF_ALIST_2) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (dep->sysname != BIF_ARG_1) + BIF_ERROR(BIF_P, BADARG); + + if (is_not_internal_pid(BIF_ARG_2)) + BIF_ERROR(BIF_P, BADARG); + + erts_smp_atomic_set_nob(&dep->input_handler, + (erts_aint_t) BIF_ARG_2); + + BIF_RET(am_ok); +} + +BIF_RETTYPE +dist_ctrl_get_data_1(BIF_ALIST_1) +{ + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); + int reds = 1; + ErtsDistOutputBuf *obuf; + Eterm *hp; + ProcBin *pb; + erts_aint_t qsize; + + if (!dep) + BIF_ERROR(BIF_P, EXC_NOTSUP); + + if (dep->sysname != BIF_ARG_1) + BIF_ERROR(BIF_P, BADARG); + + erts_smp_de_rlock(dep); + + if (dep->status & ERTS_DE_SFLG_EXITING) + goto return_none; + + ASSERT(dep->cid == BIF_P->common.id); + +#if 0 + if (dep->finalized_out_queue.first) { + obuf = dep->finalized_out_queue.first; + dep->finalized_out_queue.first = obuf->next; + if (!obuf->next) + dep->finalized_out_queue.last = NULL; + } + else +#endif + { + if (!dep->tmp_out_queue.first) { + ASSERT(!dep->tmp_out_queue.last); + qsize = erts_smp_atomic_read_acqb(&dep->qsize); + if (qsize > 0) { + erts_smp_mtx_lock(&dep->qlock); + dep->tmp_out_queue.first = dep->out_queue.first; + dep->tmp_out_queue.last = dep->out_queue.last; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + erts_smp_mtx_unlock(&dep->qlock); + } + } + + if (!dep->tmp_out_queue.first) { + ASSERT(!dep->tmp_out_queue.last); + return_none: + erts_smp_de_runlock(dep); + BIF_RET(am_none); + } + else { + obuf = dep->tmp_out_queue.first; + dep->tmp_out_queue.first = obuf->next; + if (!obuf->next) + dep->tmp_out_queue.last = NULL; + } + + obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp, + dep->cache, + dep->flags); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */ + ASSERT(&obuf->data[0] <= obuf->extp + && obuf->extp < obuf->ext_endp); + } + + erts_smp_atomic64_inc_nob(&dep->out); + + erts_smp_de_runlock(dep); + + hp = HAlloc(BIF_P, PROC_BIN_SIZE); + pb = (ProcBin *) (char *) hp; + pb->thing_word = HEADER_PROC_BIN; + pb->size = obuf->ext_endp - obuf->extp; + pb->next = MSO(BIF_P).first; + MSO(BIF_P).first = (struct erl_off_heap_header*) pb; + pb->val = ErtsDistOutputBuf2Binary(obuf); + pb->bytes = (byte*) obuf->extp; + pb->flags = 0; + + qsize = erts_smp_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf)); + ASSERT(qsize >= 0); + + if (qsize < erts_dist_buf_busy_limit/2 + && (erts_smp_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) { + ErtsProcList *resume_procs = NULL; + erts_smp_mtx_lock(&dep->qlock); + resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); + erts_smp_mtx_unlock(&dep->qlock); + if (resume_procs) { + int resumed = erts_resume_processes(resume_procs); + reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + } + } + + BIF_RET2(make_binary(pb), reds); +} + void erts_dist_port_not_busy(Port *prt) { @@ -2402,8 +2880,7 @@ void erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) { erts_smp_de_rwlock(dep); - if (is_internal_port(dep->cid) - && connection_id == dep->connection_id + if (connection_id == dep->connection_id && !(dep->status & ERTS_DE_SFLG_EXITING)) { dep->status |= ERTS_DE_SFLG_EXITING; @@ -2413,7 +2890,10 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) erts_smp_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); erts_smp_mtx_unlock(&dep->qlock); - erts_schedule_dist_command(NULL, dep); + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + else if (is_internal_pid(dep->cid)) + schedule_kill_dist_ctrl_proc(dep->cid); } erts_smp_de_rwunlock(dep); } @@ -2652,17 +3132,23 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; } - net_kernel = erts_whereis_process(BIF_P, ERTS_PROC_LOCK_MAIN, - am_net_kernel, ERTS_PROC_LOCK_MAIN, 0); - if (!net_kernel) + net_kernel = erts_whereis_process(BIF_P, + ERTS_PROC_LOCK_MAIN, + am_net_kernel, + ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS, + 0); + if (!net_kernel || ERTS_PROC_GET_DIST_ENTRY(net_kernel)) goto error; /* By setting F_DISTRIBUTION on net_kernel, - * do_net_exist will be called when net_kernel is terminated !! */ + * erts_do_net_exits will be called when net_kernel is terminated !! */ net_kernel->flags |= F_DISTRIBUTION; - if (net_kernel != BIF_P) - erts_smp_proc_unlock(net_kernel, ERTS_PROC_LOCK_MAIN); + erts_smp_proc_unlock(net_kernel, + (ERTS_PROC_LOCK_STATUS + | ((net_kernel != BIF_P) + ? ERTS_PROC_LOCK_MAIN + : 0))); #ifdef DEBUG erts_smp_rwmtx_rlock(&erts_dist_table_rwmtx); @@ -2679,6 +3165,14 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) erts_smp_thr_progress_unblock(); erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + /* + * Note erts_this_dist_entry is changed by erts_set_this_node(), + * so we *need* to use the new one after erts_set_this_node() + * is called. + */ + erts_smp_refc_inc(&erts_this_dist_entry->refc, 1); + ERTS_PROC_SET_DIST_ENTRY(net_kernel, erts_this_dist_entry); + BIF_RET(am_true); error: @@ -2708,18 +3202,21 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) Eterm ic, oc; Eterm *tp; DistEntry *dep = NULL; + ErtsProcLocks proc_unlock = 0; + Process *proc; Port *pp = NULL; /* Prepare for success */ - ERTS_BIF_PREP_RET(ret, am_true); + ERTS_BIF_PREP_RET(ret, BIF_ARG_1); /* * Check and pick out arguments */ if (!is_node_name_atom(BIF_ARG_1) || - is_not_internal_port(BIF_ARG_2) || - (erts_this_node->sysname == am_Noname)) { + !(is_internal_port(BIF_ARG_2) + || is_internal_pid(BIF_ARG_2)) + || (erts_this_node->sysname == am_Noname)) { goto badarg; } @@ -2763,74 +3260,116 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) else if (!dep) goto system_limit; /* Should never happen!!! */ - pp = erts_id2port_sflgs(BIF_ARG_2, - BIF_P, - ERTS_PROC_LOCK_MAIN, - ERTS_PORT_SFLGS_INVALID_LOOKUP); - erts_smp_de_rwlock(dep); + if (is_internal_pid(BIF_ARG_2)) { + if (BIF_P->common.id == BIF_ARG_2) { + proc_unlock = 0; + proc = BIF_P; + } + else { + proc_unlock = ERTS_PROC_LOCK_MAIN; + proc = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN, + BIF_ARG_2, proc_unlock); + } + erts_smp_de_rwlock(dep); - if (!pp || (erts_atomic32_read_nob(&pp->state) - & ERTS_PORT_SFLG_EXITING)) - goto badarg; + if (!proc) + goto badarg; + else if (proc == ERTS_PROC_LOCK_BUSY) { + proc_unlock = 0; + goto yield; + } - if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) - goto badarg; + erts_smp_proc_lock(proc, ERTS_PROC_LOCK_STATUS); + proc_unlock |= ERTS_PROC_LOCK_STATUS; - if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) - goto done; /* Already set */ + if (ERTS_PROC_GET_DIST_ENTRY(proc)) { + if (dep == ERTS_PROC_GET_DIST_ENTRY(proc) + && (proc->flags & F_DISTRIBUTION) + && dep->cid == BIF_ARG_2) + goto done; + goto badarg; + } - if (dep->status & ERTS_DE_SFLG_EXITING) { - /* Suspend on dist entry waiting for the exit to finish */ - ErtsProcList *plp = erts_proclist_create(BIF_P); - plp->next = NULL; - erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); - erts_smp_mtx_lock(&dep->qlock); - erts_proclist_store_last(&dep->suspended, plp); - erts_smp_mtx_unlock(&dep->qlock); - goto yield; - } + if (is_not_nil(dep->cid)) + goto badarg; - ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); + proc->flags |= F_DISTRIBUTION; + ERTS_PROC_SET_DIST_ENTRY(proc, dep); - if (pp->dist_entry || is_not_nil(dep->cid)) - goto badarg; + proc_unlock &= ~ERTS_PROC_LOCK_STATUS; + erts_smp_proc_unlock(proc, ERTS_PROC_LOCK_STATUS); - erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + dep->send = NULL; /* Only for distr ports... */ - /* - * Dist-ports do not use the "busy port message queue" functionality, but - * instead use "busy dist entry" functionality. - */ - { - ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; - erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); } + else { - pp->dist_entry = dep; + pp = erts_id2port_sflgs(BIF_ARG_2, + BIF_P, + ERTS_PROC_LOCK_MAIN, + ERTS_PORT_SFLGS_INVALID_LOOKUP); + erts_smp_de_rwlock(dep); + + if (!pp || (erts_atomic32_read_nob(&pp->state) + & ERTS_PORT_SFLG_EXITING)) + goto badarg; + + if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) + goto badarg; + + if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) + goto done; /* Already set */ + + if (dep->status & ERTS_DE_SFLG_EXITING) { + /* Suspend on dist entry waiting for the exit to finish */ + ErtsProcList *plp = erts_proclist_create(BIF_P); + plp->next = NULL; + erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); + erts_smp_mtx_lock(&dep->qlock); + erts_proclist_store_last(&dep->suspended, plp); + erts_smp_mtx_unlock(&dep->qlock); + goto yield; + } - dep->version = version; - dep->creation = 0; + ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); - ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + if (pp->dist_entry || is_not_nil(dep->cid)) + goto badarg; -#if 1 - dep->send = (pp->drv_ptr->outputv - ? dist_port_commandv - : dist_port_command); -#else - dep->send = dist_port_command; -#endif - ASSERT(dep->send); + erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + + pp->dist_entry = dep; + + ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + + dep->send = (pp->drv_ptr->outputv + ? dist_port_commandv + : dist_port_command); + ASSERT(dep->send); + + /* + * Dist-ports do not use the "busy port message queue" functionality, but + * instead use "busy dist entry" functionality. + */ + { + ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; + erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); + } + + } + + dep->version = version; + dep->creation = 0; #ifdef DEBUG ASSERT(erts_smp_atomic_read_nob(&dep->qsize) == 0); #endif - erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); - if (flags & DFLAG_DIST_HDR_ATOM_CACHE) create_cache(dep); + erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + erts_smp_de_rwunlock(dep); dep = NULL; /* inc of refc transferred to port (dist_entry field) */ @@ -2851,6 +3390,9 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) if (pp) erts_port_release(pp); + if (proc_unlock) + erts_smp_proc_unlock(proc, proc_unlock); + return ret; yield: |