aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-09-14 21:42:11 +0200
committerRickard Green <[email protected]>2012-12-03 21:18:09 +0100
commit34fc6f243f8a462f4b2370a9fe5376df1ca08f1d (patch)
tree1a7377203f573c40269f12b59611a68e0859c627 /erts/emulator/beam/dist.c
parent620c8c5bfe4c2b306a7bc0a7d41749bddea4ee62 (diff)
downloadotp-34fc6f243f8a462f4b2370a9fe5376df1ca08f1d.tar.gz
otp-34fc6f243f8a462f4b2370a9fe5376df1ca08f1d.tar.bz2
otp-34fc6f243f8a462f4b2370a9fe5376df1ca08f1d.zip
Move busy port flag
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c46
1 files changed, 18 insertions, 28 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 432b842848..f28d9dff09 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -171,11 +171,10 @@ get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs)
return NULL;
}
else {
- ErtsProcList *plp;
- plp = dep->suspended.first;
- dep->suspended.first = NULL;
- dep->suspended.last = NULL;
- return plp;
+ ErtsProcList *suspended = dep->suspended;
+ dep->suspended = NULL;
+ erts_proclist_fetch(&suspended, NULL);
+ return suspended;
}
}
@@ -1698,7 +1697,6 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
erts_smp_mtx_unlock(&dep->qlock);
plp = erts_proclist_create(c_p);
- plp->next = NULL;
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
suspended = 1;
erts_smp_mtx_lock(&dep->qlock);
@@ -1731,11 +1729,7 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
else {
/* Enqueue suspended process on dist entry */
ASSERT(plp);
- if (dep->suspended.last)
- dep->suspended.last->next = plp;
- else
- dep->suspended.first = plp;
- dep->suspended.last = plp;
+ erts_proclist_store_last(&dep->suspended, plp);
}
}
@@ -1914,7 +1908,7 @@ erts_dist_command(Port *prt, int reds_limit)
ErtsDistOutputQueue oq, foq;
DistEntry *dep = prt->dist_entry;
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
- erts_aint32_t state;
+ erts_aint32_t sched_flags;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -1957,12 +1951,12 @@ erts_dist_command(Port *prt, int reds_limit)
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
- state = erts_atomic32_read_nob(&prt->state);
+ sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
if (reds > reds_limit)
goto preempted;
- if (!(state & ERTS_PORT_SFLG_PORT_BUSY) && foq.first) {
+ if (!(sched_flags & ERTS_PTS_FLG_BUSY) && foq.first) {
int preempt = 0;
do {
Uint size;
@@ -1978,9 +1972,9 @@ erts_dist_command(Port *prt, int reds_limit)
obufsize += size_obuf(fob);
foq.first = foq.first->next;
free_dist_obuf(fob);
- state = erts_atomic32_read_nob(&prt->state);
- preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD);
- if (state & ERTS_PORT_SFLG_PORT_BUSY)
+ sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
+ if (sched_flags & ERTS_PTS_FLG_BUSY)
break;
} while (foq.first && !preempt);
if (!foq.first)
@@ -1989,7 +1983,7 @@ erts_dist_command(Port *prt, int reds_limit)
goto preempted;
}
- if (state & ERTS_PORT_SFLG_PORT_BUSY) {
+ if (sched_flags & ERTS_PTS_FLG_BUSY) {
if (oq.first) {
ErtsDistOutputBuf *ob;
int preempt;
@@ -2060,9 +2054,9 @@ erts_dist_command(Port *prt, int reds_limit)
obufsize += size_obuf(fob);
oq.first = oq.first->next;
free_dist_obuf(fob);
- state = erts_atomic32_read_nob(&prt->state);
- preempt = reds > reds_limit || (state & ERTS_PORT_SFLGS_DEAD);
- if ((state & ERTS_PORT_SFLG_PORT_BUSY) && oq.first && !preempt)
+ sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
+ preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
+ if ((sched_flags & ERTS_PTS_FLG_BUSY) && oq.first && !preempt)
goto finalize_only;
}
@@ -2091,7 +2085,7 @@ erts_dist_command(Port *prt, int reds_limit)
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
obufsize = 0;
- if (!(state & ERTS_PORT_SFLG_PORT_BUSY)
+ if (!(sched_flags & ERTS_PTS_FLG_BUSY)
&& (dep->qflgs & ERTS_DE_QFLG_BUSY)
&& dep->qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
@@ -2145,7 +2139,7 @@ erts_dist_command(Port *prt, int reds_limit)
ASSERT(oq.first || !oq.last);
ASSERT(!oq.first || oq.last);
- if (state & ERTS_PORT_SFLGS_DEAD) {
+ if (sched_flags & ERTS_PTS_FLG_EXIT) {
/*
* Port died during port command; clean up 'oq'
* and 'foq'. Things buffered in dist entry after
@@ -2602,11 +2596,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
plp->next = NULL;
erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
erts_smp_mtx_lock(&dep->qlock);
- if (dep->suspended.last)
- dep->suspended.last->next = plp;
- else
- dep->suspended.first = plp;
- dep->suspended.last = plp;
+ erts_proclist_store_last(&dep->suspended, plp);
erts_smp_mtx_unlock(&dep->qlock);
goto yield;
}