aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c71
1 files changed, 42 insertions, 29 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 982f1066df..d9feec4722 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -176,11 +176,13 @@ Uint erts_dist_cache_size(void)
}
static ErtsProcList *
-get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs)
+get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs)
{
+ erts_aint32_t qflgs;
ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&dep->qlock));
- dep->qflgs &= ~unset_qflgs;
- if (dep->qflgs & ERTS_DE_QFLG_EXIT) {
+ qflgs = erts_smp_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs);
+ qflgs &= ~unset_qflgs;
+ if (qflgs & ERTS_DE_QFLG_EXIT) {
/* No resume when exit has been scheduled */
return NULL;
}
@@ -544,16 +546,14 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
if (dep->status & ERTS_DE_SFLG_EXITING) {
#ifdef DEBUG
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT);
- erts_smp_mtx_unlock(&dep->qlock);
+ ASSERT(erts_smp_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT);
#endif
}
else {
dep->status |= ERTS_DE_SFLG_EXITING;
erts_smp_mtx_lock(&dep->qlock);
- ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
- dep->qflgs |= ERTS_DE_QFLG_EXIT;
+ ASSERT(!(erts_smp_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
+ erts_smp_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT);
erts_smp_mtx_unlock(&dep->qlock);
}
@@ -706,8 +706,9 @@ static void clear_dist_entry(DistEntry *dep)
if (obufsize) {
erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
+ ASSERT(erts_smp_atomic_read_nob(&dep->qsize) >= obufsize);
+ erts_smp_atomic_add_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
erts_smp_mtx_unlock(&dep->qlock);
}
}
@@ -1861,12 +1862,18 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
free_dist_obuf(ctx->obuf);
}
else {
+ Sint qsize;
+ erts_aint32_t qflgs;
ErtsProcList *plp = NULL;
erts_smp_mtx_lock(&dep->qlock);
- dep->qsize += size_obuf(ctx->obuf);
- if (dep->qsize >= erts_dist_buf_busy_limit)
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- if (!ctx->force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) {
+ qsize = erts_smp_atomic_add_read_nob(&dep->qsize,
+ (erts_aint_t) size_obuf(ctx->obuf));
+ qflgs = erts_smp_atomic32_read_nob(&dep->qflgs);
+ if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) {
+ erts_smp_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY);
+ qflgs |= ERTS_DE_QFLG_BUSY;
+ }
+ if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) {
erts_smp_mtx_unlock(&dep->qlock);
plp = erts_proclist_create(ctx->c_p);
@@ -1883,7 +1890,8 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
dep->out_queue.last = ctx->obuf;
if (!ctx->force_busy) {
- if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) {
+ qflgs = erts_smp_atomic32_read_nob(&dep->qflgs);
+ if (!(qflgs & ERTS_DE_QFLG_BUSY)) {
if (suspended)
resume = 1; /* was busy when we started, but isn't now */
#ifdef USE_VM_PROBES
@@ -2074,7 +2082,7 @@ erts_dist_command(Port *prt, int reds_limit)
Sint reds = ERTS_PORT_REDS_DIST_CMD_START;
Uint32 status;
Uint32 flags;
- Sint obufsize = 0;
+ Sint qsize, obufsize = 0;
ErtsDistOutputQueue oq, foq;
DistEntry *dep = prt->dist_entry;
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
@@ -2204,6 +2212,7 @@ erts_dist_command(Port *prt, int reds_limit)
}
}
else {
+ int de_busy;
int preempt = 0;
while (oq.first && !preempt) {
ErtsDistOutputBuf *fob;
@@ -2257,12 +2266,13 @@ erts_dist_command(Port *prt, int reds_limit)
* processes.
*/
erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
+ de_busy = !!(erts_smp_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY);
+ qsize = (Sint) erts_smp_atomic_add_read_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
+ ASSERT(qsize >= 0);
obufsize = 0;
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
- && (dep->qflgs & ERTS_DE_QFLG_BUSY)
- && dep->qsize < erts_dist_buf_busy_limit) {
+ && de_busy && qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
int resumed;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
@@ -2282,8 +2292,13 @@ erts_dist_command(Port *prt, int reds_limit)
if (obufsize != 0) {
ASSERT(obufsize > 0);
erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
+#ifdef DEBUG
+ qsize = (Sint) erts_smp_atomic_add_read_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
+ ASSERT(qsize >= 0);
+#else
+ erts_smp_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize);
+#endif
erts_smp_mtx_unlock(&dep->qlock);
}
@@ -2339,7 +2354,7 @@ erts_dist_command(Port *prt, int reds_limit)
#ifdef DEBUG
erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize == obufsize);
+ ASSERT(erts_smp_atomic_read_nob(&dep->qsize) == obufsize);
erts_smp_mtx_unlock(&dep->qlock);
#endif
}
@@ -2350,7 +2365,7 @@ erts_dist_command(Port *prt, int reds_limit)
* in out_queue.
*/
erts_smp_mtx_lock(&dep->qlock);
- dep->qsize -= obufsize;
+ erts_smp_atomic_add_nob(&dep->qsize, -obufsize);
obufsize = 0;
oq.last->next = dep->out_queue.first;
dep->out_queue.first = oq.first;
@@ -2394,8 +2409,8 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
dep->status |= ERTS_DE_SFLG_EXITING;
erts_smp_mtx_lock(&dep->qlock);
- ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
- dep->qflgs |= ERTS_DE_QFLG_EXIT;
+ ASSERT(!(erts_smp_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
+ erts_smp_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT);
erts_smp_mtx_unlock(&dep->qlock);
erts_schedule_dist_command(NULL, dep);
@@ -2808,9 +2823,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
ASSERT(dep->send);
#ifdef DEBUG
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize == 0);
- erts_smp_mtx_unlock(&dep->qlock);
+ ASSERT(erts_smp_atomic_read_nob(&dep->qsize) == 0);
#endif
erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);