aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2017-06-19 17:06:25 +0200
committerRickard Green <[email protected]>2017-07-04 17:12:18 +0200
commit6b267b203c950db2879f254b6a9d3b7591115f9d (patch)
treec31b98bb30185a990754e3ab046be7246d22a829 /erts
parenta01de6873844ba510084090abec734c4166d71fa (diff)
downloadotp-6b267b203c950db2879f254b6a9d3b7591115f9d.tar.gz
otp-6b267b203c950db2879f254b6a9d3b7591115f9d.tar.bz2
otp-6b267b203c950db2879f254b6a9d3b7591115f9d.zip
Change some dist-entry types
Diffstat (limited to 'erts')
-rw-r--r--erts/emulator/beam/dist.c71
-rw-r--r--erts/emulator/beam/dist.h7
-rw-r--r--erts/emulator/beam/erl_node_tables.c4
-rw-r--r--erts/emulator/beam/erl_node_tables.h4
4 files changed, 48 insertions, 38 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 982f1066df..d9feec4722 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -176,11 +176,13 @@ Uint erts_dist_cache_size(void)
}
static ErtsProcList *
-get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs)
+get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs)
{
+ erts_aint32_t qflgs;
ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&dep->qlock));
- dep->qflgs &= ~unset_qflgs;
- if (dep->qflgs & ERTS_DE_QFLG_EXIT) {
+ qflgs = erts_smp_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs);
+ qflgs &= ~unset_qflgs;
+ if (qflgs & ERTS_DE_QFLG_EXIT) {
/* No resume when exit has been scheduled */
return NULL;
}
@@ -544,16 +546,14 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
if (dep->status & ERTS_DE_SFLG_EXITING) {
#ifdef DEBUG
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT);
- erts_smp_mtx_unlock(&dep->qlock);
+ ASSERT(erts_smp_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT);
#endif
}
else {
dep->status |= ERTS_DE_SFLG_EXITING;
erts_smp_mtx_lock(&dep->qlock);
- ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
- dep->qflgs |= ERTS_DE_QFLG_EXIT;
+ ASSERT(!(erts_smp_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
+ erts_smp_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT);
erts_smp_mtx_unlock(&dep->qlock);
}
@@ -706,8 +706,9 @@ static void clear_dist_entry(DistEntry *dep)
if (obufsize) {
erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
+ ASSERT(erts_smp_atomic_read_nob(&dep->qsize) >= obufsize);
+ erts_smp_atomic_add_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
erts_smp_mtx_unlock(&dep->qlock);
}
}
@@ -1861,12 +1862,18 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
free_dist_obuf(ctx->obuf);
}
else {
+ Sint qsize;
+ erts_aint32_t qflgs;
ErtsProcList *plp = NULL;
erts_smp_mtx_lock(&dep->qlock);
- dep->qsize += size_obuf(ctx->obuf);
- if (dep->qsize >= erts_dist_buf_busy_limit)
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- if (!ctx->force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) {
+ qsize = erts_smp_atomic_add_read_nob(&dep->qsize,
+ (erts_aint_t) size_obuf(ctx->obuf));
+ qflgs = erts_smp_atomic32_read_nob(&dep->qflgs);
+ if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) {
+ erts_smp_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY);
+ qflgs |= ERTS_DE_QFLG_BUSY;
+ }
+ if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) {
erts_smp_mtx_unlock(&dep->qlock);
plp = erts_proclist_create(ctx->c_p);
@@ -1883,7 +1890,8 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
dep->out_queue.last = ctx->obuf;
if (!ctx->force_busy) {
- if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) {
+ qflgs = erts_smp_atomic32_read_nob(&dep->qflgs);
+ if (!(qflgs & ERTS_DE_QFLG_BUSY)) {
if (suspended)
resume = 1; /* was busy when we started, but isn't now */
#ifdef USE_VM_PROBES
@@ -2074,7 +2082,7 @@ erts_dist_command(Port *prt, int reds_limit)
Sint reds = ERTS_PORT_REDS_DIST_CMD_START;
Uint32 status;
Uint32 flags;
- Sint obufsize = 0;
+ Sint qsize, obufsize = 0;
ErtsDistOutputQueue oq, foq;
DistEntry *dep = prt->dist_entry;
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
@@ -2204,6 +2212,7 @@ erts_dist_command(Port *prt, int reds_limit)
}
}
else {
+ int de_busy;
int preempt = 0;
while (oq.first && !preempt) {
ErtsDistOutputBuf *fob;
@@ -2257,12 +2266,13 @@ erts_dist_command(Port *prt, int reds_limit)
* processes.
*/
erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
+ de_busy = !!(erts_smp_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY);
+ qsize = (Sint) erts_smp_atomic_add_read_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
+ ASSERT(qsize >= 0);
obufsize = 0;
if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
- && (dep->qflgs & ERTS_DE_QFLG_BUSY)
- && dep->qsize < erts_dist_buf_busy_limit) {
+ && de_busy && qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
int resumed;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
@@ -2282,8 +2292,13 @@ erts_dist_command(Port *prt, int reds_limit)
if (obufsize != 0) {
ASSERT(obufsize > 0);
erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize >= obufsize);
- dep->qsize -= obufsize;
+#ifdef DEBUG
+ qsize = (Sint) erts_smp_atomic_add_read_nob(&dep->qsize,
+ (erts_aint_t) -obufsize);
+ ASSERT(qsize >= 0);
+#else
+ erts_smp_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize);
+#endif
erts_smp_mtx_unlock(&dep->qlock);
}
@@ -2339,7 +2354,7 @@ erts_dist_command(Port *prt, int reds_limit)
#ifdef DEBUG
erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize == obufsize);
+ ASSERT(erts_smp_atomic_read_nob(&dep->qsize) == obufsize);
erts_smp_mtx_unlock(&dep->qlock);
#endif
}
@@ -2350,7 +2365,7 @@ erts_dist_command(Port *prt, int reds_limit)
* in out_queue.
*/
erts_smp_mtx_lock(&dep->qlock);
- dep->qsize -= obufsize;
+ erts_smp_atomic_add_nob(&dep->qsize, -obufsize);
obufsize = 0;
oq.last->next = dep->out_queue.first;
dep->out_queue.first = oq.first;
@@ -2394,8 +2409,8 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
dep->status |= ERTS_DE_SFLG_EXITING;
erts_smp_mtx_lock(&dep->qlock);
- ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
- dep->qflgs |= ERTS_DE_QFLG_EXIT;
+ ASSERT(!(erts_smp_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
+ erts_smp_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT);
erts_smp_mtx_unlock(&dep->qlock);
erts_schedule_dist_command(NULL, dep);
@@ -2808,9 +2823,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
ASSERT(dep->send);
#ifdef DEBUG
- erts_smp_mtx_lock(&dep->qlock);
- ASSERT(dep->qsize == 0);
- erts_smp_mtx_unlock(&dep->qlock);
+ ASSERT(erts_smp_atomic_read_nob(&dep->qsize) == 0);
#endif
erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index 3e17645997..dcd5846ca5 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -161,13 +161,10 @@ erts_dsig_prepare(ErtsDSigData *dsdp,
goto fail;
}
if (no_suspend) {
- failure = ERTS_DSIG_PREP_CONNECTED;
- erts_smp_mtx_lock(&dep->qlock);
- if (dep->qflgs & ERTS_DE_QFLG_BUSY)
+ if (erts_smp_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) {
failure = ERTS_DSIG_PREP_WOULD_SUSPEND;
- erts_smp_mtx_unlock(&dep->qlock);
- if (failure == ERTS_DSIG_PREP_WOULD_SUSPEND)
goto fail;
+ }
}
dsdp->proc = proc;
dsdp->dep = dep;
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index 3c5945d48d..c1796a8894 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -113,8 +113,8 @@ dist_table_alloc(void *dep_tmpl)
dep->monitors = NULL;
erts_smp_mtx_init_x(&dep->qlock, "dist_entry_out_queue", chnl_nr);
- dep->qflgs = 0;
- dep->qsize = 0;
+ erts_smp_atomic32_init_nob(&dep->qflgs, 0);
+ erts_smp_atomic_init_nob(&dep->qsize, 0);
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
dep->suspended = NULL;
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index 489da1ba17..04acfe41b1 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -133,8 +133,8 @@ typedef struct dist_entry_ {
ErtsMonitor *monitors; /* Monitor tree */
erts_smp_mtx_t qlock; /* Protects qflgs and out_queue */
- Uint32 qflgs;
- Sint qsize;
+ erts_smp_atomic32_t qflgs;
+ erts_smp_atomic_t qsize;
ErtsDistOutputQueue out_queue;
struct ErtsProcList_ *suspended;