aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c43
1 files changed, 23 insertions, 20 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 5de8236a7e..eccef44a11 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -423,7 +423,8 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
prt = erts_id2port(prt_id, NULL, 0);
if (prt) {
- ASSERT(prt->status & ERTS_PORT_SFLG_DISTRIBUTION);
+ ASSERT(erts_smp_atomic32_read_nob(&prt->state)
+ & ERTS_PORT_SFLG_DISTRIBUTION);
ASSERT(prt->dist_entry);
/* will call do_net_exists !!! */
erts_do_exit_port(prt, prt_id, nd_reason);
@@ -1907,13 +1908,13 @@ int
erts_dist_command(Port *prt, int reds_limit)
{
Sint reds = ERTS_PORT_REDS_DIST_CMD_START;
- int prt_busy;
Uint32 status;
Uint32 flags;
Sint obufsize = 0;
ErtsDistOutputQueue oq, foq;
DistEntry *dep = prt->dist_entry;
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
+ erts_aint32_t state;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -1956,12 +1957,12 @@ erts_dist_command(Port *prt, int reds_limit)
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
+ state = erts_smp_atomic32_read_nob(&prt->state);
+
if (reds > reds_limit)
goto preempted;
- prt_busy = (int) (prt->status & ERTS_PORT_SFLG_PORT_BUSY);
-
- if (!prt_busy && foq.first) {
+ if (!(state & ERTS_PORT_SFLG_PORT_BUSY) && foq.first) {
int preempt = 0;
do {
Uint size;
@@ -1977,11 +1978,10 @@ erts_dist_command(Port *prt, int reds_limit)
obufsize += size_obuf(fob);
foq.first = foq.first->next;
free_dist_obuf(fob);
- preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD);
- if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) {
- prt_busy = 1;
+ state = erts_smp_atomic32_read_nob(&prt->state);
+ preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD);
+ if (state & ERTS_PORT_SFLG_PORT_BUSY)
break;
- }
} while (foq.first && !preempt);
if (!foq.first)
foq.last = NULL;
@@ -1989,7 +1989,7 @@ erts_dist_command(Port *prt, int reds_limit)
goto preempted;
}
- if (prt_busy) {
+ if (state & ERTS_PORT_SFLG_PORT_BUSY) {
if (oq.first) {
ErtsDistOutputBuf *ob;
int preempt;
@@ -2060,12 +2060,10 @@ erts_dist_command(Port *prt, int reds_limit)
obufsize += size_obuf(fob);
oq.first = oq.first->next;
free_dist_obuf(fob);
- preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD);
- if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) {
- prt_busy = 1;
- if (oq.first && !preempt)
- goto finalize_only;
- }
+ state = erts_smp_atomic32_read_nob(&prt->state);
+ preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD);
+ if ((state & ERTS_PORT_SFLG_PORT_BUSY) && oq.first && !preempt)
+ goto finalize_only;
}
ASSERT(!oq.first || preempt);
@@ -2093,7 +2091,7 @@ erts_dist_command(Port *prt, int reds_limit)
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
obufsize = 0;
- if (!prt_busy
+ if (!(state & ERTS_PORT_SFLG_PORT_BUSY)
&& (dep->qflgs & ERTS_DE_QFLG_BUSY)
&& dep->qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
@@ -2139,11 +2137,15 @@ erts_dist_command(Port *prt, int reds_limit)
return reds;
preempted:
+ /*
+ * Here we assume that state has been read
+ * since last call to driver.
+ */
ASSERT(oq.first || !oq.last);
ASSERT(!oq.first || oq.last);
- if (prt->status & ERTS_PORT_SFLGS_DEAD) {
+ if (state & ERTS_PORT_SFLGS_DEAD) {
/*
* Port died during port command; clean up 'oq'
* and 'foq'. Things buffered in dist entry after
@@ -2581,7 +2583,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
pp = erts_id2port(BIF_ARG_2, BIF_P, ERTS_PROC_LOCK_MAIN);
erts_smp_de_rwlock(dep);
- if (!pp || (pp->status & ERTS_PORT_SFLG_EXITING))
+ if (!pp || (erts_smp_atomic32_read_nob(&pp->state)
+ & ERTS_PORT_SFLG_EXITING))
goto badarg;
if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
@@ -2610,7 +2613,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
if (pp->dist_entry || is_not_nil(dep->cid))
goto badarg;
- erts_port_status_bor_set(pp, ERTS_PORT_SFLG_DISTRIBUTION);
+ erts_smp_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
pp->dist_entry = dep;