diff options
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/beam/dist.c | 71 | ||||
-rw-r--r-- | erts/emulator/beam/dist.h | 7 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.c | 4 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.h | 4 |
4 files changed, 48 insertions, 38 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 982f1066df..d9feec4722 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -176,11 +176,13 @@ Uint erts_dist_cache_size(void) } static ErtsProcList * -get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs) +get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs) { + erts_aint32_t qflgs; ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&dep->qlock)); - dep->qflgs &= ~unset_qflgs; - if (dep->qflgs & ERTS_DE_QFLG_EXIT) { + qflgs = erts_smp_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs); + qflgs &= ~unset_qflgs; + if (qflgs & ERTS_DE_QFLG_EXIT) { /* No resume when exit has been scheduled */ return NULL; } @@ -544,16 +546,14 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (dep->status & ERTS_DE_SFLG_EXITING) { #ifdef DEBUG - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT); - erts_smp_mtx_unlock(&dep->qlock); + ASSERT(erts_smp_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT); #endif } else { dep->status |= ERTS_DE_SFLG_EXITING; erts_smp_mtx_lock(&dep->qlock); - ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT)); - dep->qflgs |= ERTS_DE_QFLG_EXIT; + ASSERT(!(erts_smp_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_smp_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT); erts_smp_mtx_unlock(&dep->qlock); } @@ -706,8 +706,9 @@ static void clear_dist_entry(DistEntry *dep) if (obufsize) { erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize >= obufsize); - dep->qsize -= obufsize; + ASSERT(erts_smp_atomic_read_nob(&dep->qsize) >= obufsize); + erts_smp_atomic_add_nob(&dep->qsize, + (erts_aint_t) -obufsize); erts_smp_mtx_unlock(&dep->qlock); } } @@ -1861,12 +1862,18 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) free_dist_obuf(ctx->obuf); } else { + Sint qsize; + erts_aint32_t qflgs; ErtsProcList *plp = NULL; erts_smp_mtx_lock(&dep->qlock); - dep->qsize += size_obuf(ctx->obuf); - if (dep->qsize >= erts_dist_buf_busy_limit) - dep->qflgs |= ERTS_DE_QFLG_BUSY; - if (!ctx->force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) { + qsize = erts_smp_atomic_add_read_nob(&dep->qsize, + (erts_aint_t) size_obuf(ctx->obuf)); + qflgs = erts_smp_atomic32_read_nob(&dep->qflgs); + if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) { + erts_smp_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY); + qflgs |= ERTS_DE_QFLG_BUSY; + } + if (!ctx->force_busy && (qflgs & ERTS_DE_QFLG_BUSY)) { erts_smp_mtx_unlock(&dep->qlock); plp = erts_proclist_create(ctx->c_p); @@ -1883,7 +1890,8 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) dep->out_queue.last = ctx->obuf; if (!ctx->force_busy) { - if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) { + qflgs = erts_smp_atomic32_read_nob(&dep->qflgs); + if (!(qflgs & ERTS_DE_QFLG_BUSY)) { if (suspended) resume = 1; /* was busy when we started, but isn't now */ #ifdef USE_VM_PROBES @@ -2074,7 +2082,7 @@ erts_dist_command(Port *prt, int reds_limit) Sint reds = ERTS_PORT_REDS_DIST_CMD_START; Uint32 status; Uint32 flags; - Sint obufsize = 0; + Sint qsize, obufsize = 0; ErtsDistOutputQueue oq, foq; DistEntry *dep = prt->dist_entry; Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); @@ -2204,6 +2212,7 @@ erts_dist_command(Port *prt, int reds_limit) } } else { + int de_busy; int preempt = 0; while (oq.first && !preempt) { ErtsDistOutputBuf *fob; @@ -2257,12 +2266,13 @@ erts_dist_command(Port *prt, int reds_limit) * processes. */ erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize >= obufsize); - dep->qsize -= obufsize; + de_busy = !!(erts_smp_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY); + qsize = (Sint) erts_smp_atomic_add_read_nob(&dep->qsize, + (erts_aint_t) -obufsize); + ASSERT(qsize >= 0); obufsize = 0; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) - && (dep->qflgs & ERTS_DE_QFLG_BUSY) - && dep->qsize < erts_dist_buf_busy_limit) { + && de_busy && qsize < erts_dist_buf_busy_limit) { ErtsProcList *suspendees; int resumed; suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); @@ -2282,8 +2292,13 @@ erts_dist_command(Port *prt, int reds_limit) if (obufsize != 0) { ASSERT(obufsize > 0); erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize >= obufsize); - dep->qsize -= obufsize; +#ifdef DEBUG + qsize = (Sint) erts_smp_atomic_add_read_nob(&dep->qsize, + (erts_aint_t) -obufsize); + ASSERT(qsize >= 0); +#else + erts_smp_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); +#endif erts_smp_mtx_unlock(&dep->qlock); } @@ -2339,7 +2354,7 @@ erts_dist_command(Port *prt, int reds_limit) #ifdef DEBUG erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize == obufsize); + ASSERT(erts_smp_atomic_read_nob(&dep->qsize) == obufsize); erts_smp_mtx_unlock(&dep->qlock); #endif } @@ -2350,7 +2365,7 @@ erts_dist_command(Port *prt, int reds_limit) * in out_queue. */ erts_smp_mtx_lock(&dep->qlock); - dep->qsize -= obufsize; + erts_smp_atomic_add_nob(&dep->qsize, -obufsize); obufsize = 0; oq.last->next = dep->out_queue.first; dep->out_queue.first = oq.first; @@ -2394,8 +2409,8 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) dep->status |= ERTS_DE_SFLG_EXITING; erts_smp_mtx_lock(&dep->qlock); - ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT)); - dep->qflgs |= ERTS_DE_QFLG_EXIT; + ASSERT(!(erts_smp_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_smp_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); erts_smp_mtx_unlock(&dep->qlock); erts_schedule_dist_command(NULL, dep); @@ -2808,9 +2823,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) ASSERT(dep->send); #ifdef DEBUG - erts_smp_mtx_lock(&dep->qlock); - ASSERT(dep->qsize == 0); - erts_smp_mtx_unlock(&dep->qlock); + ASSERT(erts_smp_atomic_read_nob(&dep->qsize) == 0); #endif erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 3e17645997..dcd5846ca5 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -161,13 +161,10 @@ erts_dsig_prepare(ErtsDSigData *dsdp, goto fail; } if (no_suspend) { - failure = ERTS_DSIG_PREP_CONNECTED; - erts_smp_mtx_lock(&dep->qlock); - if (dep->qflgs & ERTS_DE_QFLG_BUSY) + if (erts_smp_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { failure = ERTS_DSIG_PREP_WOULD_SUSPEND; - erts_smp_mtx_unlock(&dep->qlock); - if (failure == ERTS_DSIG_PREP_WOULD_SUSPEND) goto fail; + } } dsdp->proc = proc; dsdp->dep = dep; diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 3c5945d48d..c1796a8894 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -113,8 +113,8 @@ dist_table_alloc(void *dep_tmpl) dep->monitors = NULL; erts_smp_mtx_init_x(&dep->qlock, "dist_entry_out_queue", chnl_nr); - dep->qflgs = 0; - dep->qsize = 0; + erts_smp_atomic32_init_nob(&dep->qflgs, 0); + erts_smp_atomic_init_nob(&dep->qsize, 0); dep->out_queue.first = NULL; dep->out_queue.last = NULL; dep->suspended = NULL; diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 489da1ba17..04acfe41b1 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -133,8 +133,8 @@ typedef struct dist_entry_ { ErtsMonitor *monitors; /* Monitor tree */ erts_smp_mtx_t qlock; /* Protects qflgs and out_queue */ - Uint32 qflgs; - Sint qsize; + erts_smp_atomic32_t qflgs; + erts_smp_atomic_t qsize; ErtsDistOutputQueue out_queue; struct ErtsProcList_ *suspended; |