diff options
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r-- | erts/emulator/beam/dist.c | 43 |
1 files changed, 23 insertions, 20 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 5de8236a7e..eccef44a11 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -423,7 +423,8 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) prt = erts_id2port(prt_id, NULL, 0); if (prt) { - ASSERT(prt->status & ERTS_PORT_SFLG_DISTRIBUTION); + ASSERT(erts_smp_atomic32_read_nob(&prt->state) + & ERTS_PORT_SFLG_DISTRIBUTION); ASSERT(prt->dist_entry); /* will call do_net_exists !!! */ erts_do_exit_port(prt, prt_id, nd_reason); @@ -1907,13 +1908,13 @@ int erts_dist_command(Port *prt, int reds_limit) { Sint reds = ERTS_PORT_REDS_DIST_CMD_START; - int prt_busy; Uint32 status; Uint32 flags; Sint obufsize = 0; ErtsDistOutputQueue oq, foq; DistEntry *dep = prt->dist_entry; Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); + erts_aint32_t state; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -1956,12 +1957,12 @@ erts_dist_command(Port *prt, int reds_limit) dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; + state = erts_smp_atomic32_read_nob(&prt->state); + if (reds > reds_limit) goto preempted; - prt_busy = (int) (prt->status & ERTS_PORT_SFLG_PORT_BUSY); - - if (!prt_busy && foq.first) { + if (!(state & ERTS_PORT_SFLG_PORT_BUSY) && foq.first) { int preempt = 0; do { Uint size; @@ -1977,11 +1978,10 @@ erts_dist_command(Port *prt, int reds_limit) obufsize += size_obuf(fob); foq.first = foq.first->next; free_dist_obuf(fob); - preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD); - if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) { - prt_busy = 1; + state = erts_smp_atomic32_read_nob(&prt->state); + preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD); + if (state & ERTS_PORT_SFLG_PORT_BUSY) break; - } } while (foq.first && !preempt); if (!foq.first) foq.last = NULL; @@ -1989,7 +1989,7 @@ erts_dist_command(Port *prt, int reds_limit) goto preempted; } - if (prt_busy) { + if (state & ERTS_PORT_SFLG_PORT_BUSY) { if (oq.first) { ErtsDistOutputBuf *ob; int preempt; @@ -2060,12 +2060,10 @@ erts_dist_command(Port *prt, int reds_limit) obufsize += size_obuf(fob); oq.first = oq.first->next; free_dist_obuf(fob); - preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD); - if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) { - prt_busy = 1; - if (oq.first && !preempt) - goto finalize_only; - } + state = erts_smp_atomic32_read_nob(&prt->state); + preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD); + if ((state & ERTS_PORT_SFLG_PORT_BUSY) && oq.first && !preempt) + goto finalize_only; } ASSERT(!oq.first || preempt); @@ -2093,7 +2091,7 @@ erts_dist_command(Port *prt, int reds_limit) ASSERT(dep->qsize >= obufsize); dep->qsize -= obufsize; obufsize = 0; - if (!prt_busy + if (!(state & ERTS_PORT_SFLG_PORT_BUSY) && (dep->qflgs & ERTS_DE_QFLG_BUSY) && dep->qsize < erts_dist_buf_busy_limit) { ErtsProcList *suspendees; @@ -2139,11 +2137,15 @@ erts_dist_command(Port *prt, int reds_limit) return reds; preempted: + /* + * Here we assume that state has been read + * since last call to driver. + */ ASSERT(oq.first || !oq.last); ASSERT(!oq.first || oq.last); - if (prt->status & ERTS_PORT_SFLGS_DEAD) { + if (state & ERTS_PORT_SFLGS_DEAD) { /* * Port died during port command; clean up 'oq' * and 'foq'. Things buffered in dist entry after @@ -2581,7 +2583,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) pp = erts_id2port(BIF_ARG_2, BIF_P, ERTS_PROC_LOCK_MAIN); erts_smp_de_rwlock(dep); - if (!pp || (pp->status & ERTS_PORT_SFLG_EXITING)) + if (!pp || (erts_smp_atomic32_read_nob(&pp->state) + & ERTS_PORT_SFLG_EXITING)) goto badarg; if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) @@ -2610,7 +2613,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) if (pp->dist_entry || is_not_nil(dep->cid)) goto badarg; - erts_port_status_bor_set(pp, ERTS_PORT_SFLG_DISTRIBUTION); + erts_smp_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); pp->dist_entry = dep; |