aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_thr_progress.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2011-10-10 21:03:10 +0200
committerRickard Green <[email protected]>2011-11-13 20:40:56 +0100
commitbc5818cfdd56e19a16357f4443d80a56426aa134 (patch)
treebfe53d3e0cf24ccd7374e0174c1a4c441ab2e097 /erts/emulator/beam/erl_thr_progress.c
parenta67e91e658bdbba24fcc3c79b06fdf10ff830bc9 (diff)
downloadotp-bc5818cfdd56e19a16357f4443d80a56426aa134.tar.gz
otp-bc5818cfdd56e19a16357f4443d80a56426aa134.tar.bz2
otp-bc5818cfdd56e19a16357f4443d80a56426aa134.zip
Replace system block with thread progress block
The ERTS internal system block functionality has been replaced by new functionality for blocking the system. The old system block functionality had contention issues and complexity issues. The new functionality piggy-backs on thread progress tracking functionality needed by newly introduced lock-free synchronization in the runtime system. When the functionality for blocking the system isn't used there is more or less no overhead at all. This since the functionality for tracking thread progress is there and needed anyway.
Diffstat (limited to 'erts/emulator/beam/erl_thr_progress.c')
-rw-r--r--erts/emulator/beam/erl_thr_progress.c481
1 files changed, 422 insertions, 59 deletions
diff --git a/erts/emulator/beam/erl_thr_progress.c b/erts/emulator/beam/erl_thr_progress.c
index f96ae4b70d..9324bcde51 100644
--- a/erts/emulator/beam/erl_thr_progress.c
+++ b/erts/emulator/beam/erl_thr_progress.c
@@ -76,6 +76,7 @@
#include <stddef.h> /* offsetof() */
#include "erl_thr_progress.h"
+#include "global.h"
#ifdef ERTS_SMP
@@ -88,9 +89,14 @@
#define ERTS_THR_PRGR_PRINT_LEADER 0
#define ERTS_THR_PRGR_PRINT_VAL 0
+#define ERTS_THR_PRGR_PRINT_BLOCKERS 0
-#define ERTS_THR_PRGR_LFLG_NO_LEADER (((erts_aint32_t) 1) << 31)
-#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~ERTS_THR_PRGR_LFLG_NO_LEADER)
+#define ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL 100
+
+#define ERTS_THR_PRGR_LFLG_BLOCK (((erts_aint32_t) 1) << 31)
+#define ERTS_THR_PRGR_LFLG_NO_LEADER (((erts_aint32_t) 1) << 30)
+#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~(ERTS_THR_PRGR_LFLG_NO_LEADER \
+ | ERTS_THR_PRGR_LFLG_BLOCK))
#define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS) \
((LFLGS) & ERTS_THR_PRGR_LFLG_ACTIVE_MASK)
@@ -225,6 +231,11 @@ do { \
#endif /* ERTS_THR_PROGRESS_STATE_DEBUG */
+#define ERTS_THR_PRGR_BLCKR_INVALID (~((erts_aint32_t) 0))
+#define ERTS_THR_PRGR_BLCKR_UNMANAGED (((erts_aint32_t) 1) << 31)
+
+#define ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING (((erts_aint32_t) 1) << 31)
+
#define ERTS_THR_PRGR_BM_BITS 32
#define ERTS_THR_PRGR_BM_SHIFT 5
#define ERTS_THR_PRGR_BM_MASK 0x1f
@@ -249,11 +260,13 @@ typedef struct {
typedef struct {
erts_atomic32_t lflgs;
-
+ erts_atomic32_t block_count;
+ erts_atomic_t blocker_event;
erts_atomic32_t pref_wakeup_used;
+ erts_atomic32_t managed_count;
erts_atomic32_t managed_id;
erts_atomic32_t unmanaged_id;
-} ErtsThrPrgrMiscVolatile;
+} ErtsThrPrgrMiscData;
typedef struct {
ERTS_THR_PRGR_ATOMIC current;
@@ -269,19 +282,19 @@ typedef union {
typedef struct {
union {
- ErtsThrPrgrMiscVolatile tile;
+ ErtsThrPrgrMiscData data;
char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(
- sizeof(ErtsThrPrgrMiscVolatile))];
- } vola;
+ sizeof(ErtsThrPrgrMiscData))];
+ } misc;
ErtsThrPrgrArray *thr;
struct {
int no;
- ErtsThrPrgrWakeupCallback *callback;
+ ErtsThrPrgrCallbacks *callbacks;
ErtsThrPrgrManagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE];
} managed;
struct {
int no;
- ErtsThrPrgrWakeupCallback *callback;
+ ErtsThrPrgrCallbacks *callbacks;
ErtsThrPrgrUnmanagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE];
} unmanaged;
} ErtsThrPrgrInternalData;
@@ -294,36 +307,106 @@ erts_tsd_key_t erts_thr_prgr_data_key__;
static void handle_wakeup_requests(ErtsThrPrgrVal current);
static int got_sched_wakeups(void);
+static erts_aint32_t block_thread(ErtsThrPrgrData *tpd);
static ERTS_INLINE void
wakeup_managed(int id)
{
- ErtsThrPrgrWakeupCallback *wdp = &intrnl->managed.callback[id];
+ ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[id];
ASSERT(0 <= id && id < intrnl->managed.no);
- wdp->wakeup(wdp->arg);
+ cbp->wakeup(cbp->arg);
}
static ERTS_INLINE void
wakeup_unmanaged(int id)
{
- ErtsThrPrgrWakeupCallback *wdp = &intrnl->unmanaged.callback[id];
+ ErtsThrPrgrCallbacks *cbp = &intrnl->unmanaged.callbacks[id];
ASSERT(0 <= id && id < intrnl->unmanaged.no);
- wdp->wakeup(wdp->arg);
+ cbp->wakeup(cbp->arg);
}
static ERTS_INLINE ErtsThrPrgrData *
-thr_prgr_data(ErtsSchedulerData *esdp)
+perhaps_thr_prgr_data(ErtsSchedulerData *esdp)
{
- ErtsThrPrgrData *tpd;
if (esdp)
- tpd = &esdp->thr_progress_data;
+ return &esdp->thr_progress_data;
else
- tpd = erts_tsd_get(erts_thr_prgr_data_key__);
+ return erts_tsd_get(erts_thr_prgr_data_key__);
+}
+
+static ERTS_INLINE ErtsThrPrgrData *
+thr_prgr_data(ErtsSchedulerData *esdp)
+{
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp);
ASSERT(tpd);
return tpd;
}
+static void
+init_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
+{
+ tpd->id = -1;
+ tpd->is_managed = 0;
+ tpd->is_blocking = 0;
+ tpd->is_temporary = 1;
+
+ erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
+}
+
+static ERTS_INLINE ErtsThrPrgrData *
+tmp_thr_prgr_data(ErtsSchedulerData *esdp)
+{
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp);
+
+ if (!tpd) {
+ /*
+ * We only allocate the part up to the wakeup_request field
+ * which is the first field only used by registered threads
+ */
+ tpd = erts_alloc(ERTS_ALC_T_T_THR_PRGR_DATA,
+ offsetof(ErtsThrPrgrData, wakeup_request));
+ init_tmp_thr_prgr_data(tpd);
+ }
+
+ return tpd;
+}
+
+static ERTS_INLINE void
+return_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
+{
+ if (tpd->is_temporary) {
+ erts_tsd_set(erts_thr_prgr_data_key__, NULL);
+ erts_free(ERTS_ALC_T_T_THR_PRGR_DATA, tpd);
+ }
+}
+
+static ERTS_INLINE int
+block_count_dec(void)
+{
+ erts_aint32_t block_count;
+ block_count = erts_atomic32_dec_read_mb(&intrnl->misc.data.block_count);
+ if (block_count == 0) {
+ erts_tse_t *event;
+ event = ((erts_tse_t*)
+ erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
+ if (event)
+ erts_tse_set(event);
+ return 1;
+ }
+
+ return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0;
+}
+
+static ERTS_INLINE int
+block_count_inc(void)
+{
+ erts_aint32_t block_count;
+ block_count = erts_atomic32_inc_read_mb(&intrnl->misc.data.block_count);
+ return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0;
+}
+
+
void
erts_thr_progress_pre_init(void)
{
@@ -343,7 +426,7 @@ erts_thr_progress_init(int no_schedulers, int managed, int unmanaged)
intrnl_sz = sizeof(ErtsThrPrgrInternalData);
intrnl_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(intrnl_sz);
- cb_sz = sizeof(ErtsThrPrgrWakeupCallback)*(managed+unmanaged);
+ cb_sz = sizeof(ErtsThrPrgrCallbacks)*(managed+unmanaged);
cb_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(cb_sz);
thr_arr_sz = sizeof(ErtsThrPrgrArray)*managed;
@@ -372,31 +455,36 @@ erts_thr_progress_init(int no_schedulers, int managed, int unmanaged)
intrnl = (ErtsThrPrgrInternalData *) ptr;
ptr += intrnl_sz;
- erts_atomic32_init_nob(&intrnl->vola.tile.lflgs,
+ erts_atomic32_init_nob(&intrnl->misc.data.lflgs,
ERTS_THR_PRGR_LFLG_NO_LEADER);
- erts_atomic32_init_nob(&intrnl->vola.tile.pref_wakeup_used, 0);
- erts_atomic32_init_nob(&intrnl->vola.tile.managed_id, no_schedulers);
- erts_atomic32_init_nob(&intrnl->vola.tile.unmanaged_id, -1);
+ erts_atomic32_init_nob(&intrnl->misc.data.block_count,
+ (ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING
+ | (erts_aint32_t) managed));
+ erts_atomic_init_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL);
+ erts_atomic32_init_nob(&intrnl->misc.data.pref_wakeup_used, 0);
+ erts_atomic32_init_nob(&intrnl->misc.data.managed_count, 0);
+ erts_atomic32_init_nob(&intrnl->misc.data.managed_id, no_schedulers);
+ erts_atomic32_init_nob(&intrnl->misc.data.unmanaged_id, -1);
intrnl->thr = (ErtsThrPrgrArray *) ptr;
ptr += thr_arr_sz;
for (i = 0; i < managed; i++)
init_nob(&intrnl->thr[i].data.current, 0);
- intrnl->managed.callback = (ErtsThrPrgrWakeupCallback *) ptr;
- intrnl->unmanaged.callback = &intrnl->managed.callback[managed];
+ intrnl->managed.callbacks = (ErtsThrPrgrCallbacks *) ptr;
+ intrnl->unmanaged.callbacks = &intrnl->managed.callbacks[managed];
ptr += cb_sz;
intrnl->managed.no = managed;
for (i = 0; i < managed; i++) {
- intrnl->managed.callback[i].arg = NULL;
- intrnl->managed.callback[i].wakeup = NULL;
+ intrnl->managed.callbacks[i].arg = NULL;
+ intrnl->managed.callbacks[i].wakeup = NULL;
}
intrnl->unmanaged.no = unmanaged;
for (i = 0; i < unmanaged; i++) {
- intrnl->unmanaged.callback[i].arg = NULL;
- intrnl->unmanaged.callback[i].wakeup = NULL;
+ intrnl->unmanaged.callbacks[i].arg = NULL;
+ intrnl->unmanaged.callbacks[i].wakeup = NULL;
}
for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
@@ -439,21 +527,30 @@ init_wakeup_request_array(ErtsThrPrgrVal *w)
}
void
-erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrWakeupCallback *callback)
+erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrCallbacks *callbacks)
{
- ErtsThrPrgrData *tpd;
- if (erts_tsd_get(erts_thr_prgr_data_key__))
- erl_exit(ERTS_ABORT_EXIT,
- "%s:%d:%s(): Double register of thread\n",
- __FILE__, __LINE__, __func__);
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
+ int is_blocking = 0;
+
+ if (tpd) {
+ if (!tpd->is_temporary)
+ erl_exit(ERTS_ABORT_EXIT,
+ "%s:%d:%s(): Double register of thread\n",
+ __FILE__, __LINE__, __func__);
+ is_blocking = tpd->is_blocking;
+ return_tmp_thr_prgr_data(tpd);
+ }
+
/*
* We only allocate the part up to the leader field
* which is the first field only used by managed threads
*/
tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA,
offsetof(ErtsThrPrgrData, leader));
- tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->vola.tile.unmanaged_id);
+ tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->misc.data.unmanaged_id);
tpd->is_managed = 0;
+ tpd->is_blocking = is_blocking;
+ tpd->is_temporary = 0;
ASSERT(tpd->id >= 0);
if (tpd->id >= intrnl->unmanaged.no)
erl_exit(ERTS_ABORT_EXIT,
@@ -463,32 +560,41 @@ erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrWakeupCallback *callback)
init_wakeup_request_array(&tpd->wakeup_request[0]);
erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
- intrnl->unmanaged.callback[tpd->id] = *callback;
+ ASSERT(callbacks->wakeup);
+
+ intrnl->unmanaged.callbacks[tpd->id] = *callbacks;
}
void
erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp,
- ErtsThrPrgrWakeupCallback *callback,
+ ErtsThrPrgrCallbacks *callbacks,
int pref_wakeup)
{
- ErtsThrPrgrData *tpd;
- if (erts_tsd_get(erts_thr_prgr_data_key__))
- erl_exit(ERTS_ABORT_EXIT,
- "%s:%d:%s(): Double register of thread\n",
- __FILE__, __LINE__, __func__);
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
+ int is_blocking = 0, managed;
+
+ if (tpd) {
+ if (!tpd->is_temporary)
+ erl_exit(ERTS_ABORT_EXIT,
+ "%s:%d:%s(): Double register of thread\n",
+ __FILE__, __LINE__, __func__);
+ is_blocking = tpd->is_blocking;
+ return_tmp_thr_prgr_data(tpd);
+ }
+
if (esdp)
tpd = &esdp->thr_progress_data;
else
tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA, sizeof(ErtsThrPrgrData));
if (pref_wakeup
- && !erts_atomic32_xchg_nob(&intrnl->vola.tile.pref_wakeup_used, 1))
+ && !erts_atomic32_xchg_nob(&intrnl->misc.data.pref_wakeup_used, 1))
tpd->id = 0;
else if (esdp)
tpd->id = (int) esdp->no;
else
- tpd->id = erts_atomic32_inc_read_nob(&intrnl->vola.tile.managed_id);
+ tpd->id = erts_atomic32_inc_read_nob(&intrnl->misc.data.managed_id);
ASSERT(tpd->id >= 0);
if (tpd->id >= intrnl->managed.no)
erl_exit(ERTS_ABORT_EXIT,
@@ -496,6 +602,8 @@ erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp,
__FILE__, __LINE__, __func__);
tpd->is_managed = 1;
+ tpd->is_blocking = is_blocking;
+ tpd->is_temporary = 0;
init_wakeup_request_array(&tpd->wakeup_request[0]);
@@ -507,14 +615,46 @@ erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp,
tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING;
erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
- erts_atomic32_inc_nob(&intrnl->vola.tile.lflgs);
- intrnl->managed.callback[tpd->id] = *callback;
+ erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);
+
+ ASSERT(callbacks->wakeup);
+ ASSERT(callbacks->prepare_wait);
+ ASSERT(callbacks->wait);
+ ASSERT(callbacks->finalize_wait);
+
+ intrnl->managed.callbacks[tpd->id] = *callbacks;
+
+ callbacks->prepare_wait(callbacks->arg);
+ managed = erts_atomic32_inc_read_relb(&intrnl->misc.data.managed_count);
+ if (managed != intrnl->managed.no) {
+ /* Wait until all managed threads have registered... */
+ do {
+ callbacks->wait(callbacks->arg);
+ callbacks->prepare_wait(callbacks->arg);
+ managed = erts_atomic32_read_acqb(&intrnl->misc.data.managed_count);
+ } while (managed != intrnl->managed.no);
+ }
+ else {
+ int id;
+ /* All managed threads have registered; lets go... */
+ for (id = 0; id < managed; id++)
+ if (id != tpd->id)
+ wakeup_managed(id);
+ }
+ callbacks->finalize_wait(callbacks->arg);
}
static ERTS_INLINE int
leader_update(ErtsThrPrgrData *tpd)
{
- if (tpd->leader) {
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ erts_lc_check_exact(NULL, 0);
+#endif
+ if (!tpd->leader) {
+ /* Probably need to block... */
+ block_thread(tpd);
+ }
+ else {
erts_aint32_t lflgs;
ErtsThrPrgrVal next;
int ix, sz, make_progress;
@@ -570,16 +710,24 @@ leader_update(ErtsThrPrgrData *tpd)
handle_wakeup_requests(current);
}
- if (!tpd->active) {
+ if (tpd->active) {
+ lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
+ if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
+ (void) block_thread(tpd);
+ }
+ else {
tpd->leader = 0;
tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING;
#if ERTS_THR_PRGR_PRINT_LEADER
erts_fprintf(stderr, "L <- %d\n", tpd->id);
#endif
+
ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 0);
- lflgs = erts_atomic32_read_bor_relb(&intrnl->vola.tile.lflgs,
+ lflgs = erts_atomic32_read_bor_relb(&intrnl->misc.data.lflgs,
ERTS_THR_PRGR_LFLG_NO_LEADER);
+ if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
+ lflgs = block_thread(tpd);
if (ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0 && got_sched_wakeups())
wakeup_managed(0);
}
@@ -591,10 +739,14 @@ leader_update(ErtsThrPrgrData *tpd)
static int
update(ErtsThrPrgrData *tpd)
{
+ int res;
ErtsThrPrgrVal val;
- if (!tpd->leader) {
+ if (tpd->leader)
+ res = 1;
+ else {
erts_aint32_t lflgs;
+ res = 0;
val = read_acqb(&erts_thr_prgr__.current);
if (tpd->previous.local == val) {
val++;
@@ -604,13 +756,16 @@ update(ErtsThrPrgrData *tpd)
set_mb(&intrnl->thr[tpd->id].data.current, val);
}
- lflgs = erts_atomic32_read_nob(&intrnl->vola.tile.lflgs);
+ lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
+ if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
+ res = 1; /* Need to block in leader_update() */
+
if ((lflgs & ERTS_THR_PRGR_LFLG_NO_LEADER)
&& (tpd->active || ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0)) {
/* Try to take over leadership... */
erts_aint32_t olflgs;
olflgs = erts_atomic32_read_band_acqb(
- &intrnl->vola.tile.lflgs,
+ &intrnl->misc.data.lflgs,
~ERTS_THR_PRGR_LFLG_NO_LEADER);
if (olflgs & ERTS_THR_PRGR_LFLG_NO_LEADER) {
tpd->leader = 1;
@@ -620,8 +775,9 @@ update(ErtsThrPrgrData *tpd)
ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 1);
}
}
+ res |= tpd->leader;
}
- return tpd->leader;
+ return res;
}
int
@@ -643,10 +799,16 @@ erts_thr_progress_prepare_wait(ErtsSchedulerData *esdp)
erts_aint32_t lflgs;
ErtsThrPrgrData *tpd = thr_prgr_data(esdp);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ erts_lc_check_exact(NULL, 0);
+#endif
+
+ block_count_dec();
+
tpd->previous.local = ERTS_THR_PRGR_VAL_WAITING;
set_mb(&intrnl->thr[tpd->id].data.current, ERTS_THR_PRGR_VAL_WAITING);
- lflgs = erts_atomic32_read_nob(&intrnl->vola.tile.lflgs);
+ lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
if (ERTS_THR_PRGR_LFLGS_ALL_WAITING(lflgs) && got_sched_wakeups())
wakeup_managed(0); /* Someone need to make progress */
}
@@ -657,6 +819,10 @@ erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp)
ErtsThrPrgrData *tpd = thr_prgr_data(esdp);
ErtsThrPrgrVal current, val;
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ erts_lc_check_exact(NULL, 0);
+#endif
+
/*
* We aren't allowed to continue until our thread
* progress is past global current.
@@ -673,6 +839,8 @@ erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp)
break;
current = val;
}
+ if (block_count_inc())
+ block_thread(tpd);
if (update(tpd))
leader_update(tpd);
}
@@ -682,25 +850,28 @@ erts_thr_progress_active(ErtsSchedulerData *esdp, int on)
{
ErtsThrPrgrData *tpd = thr_prgr_data(esdp);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ erts_lc_check_exact(NULL, 0);
+#endif
+
ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(tpd->id, on);
if (on) {
ASSERT(!tpd->active);
tpd->active = 1;
- erts_atomic32_inc_nob(&intrnl->vola.tile.lflgs);
+ erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);
}
else {
ASSERT(tpd->active);
tpd->active = 0;
- erts_atomic32_dec_nob(&intrnl->vola.tile.lflgs);
-
+ erts_atomic32_dec_nob(&intrnl->misc.data.lflgs);
if (update(tpd))
leader_update(tpd);
}
#ifdef DEBUG
{
- erts_aint32_t n = erts_atomic32_read_nob(&intrnl->vola.tile.lflgs);
+ erts_aint32_t n = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
n &= ERTS_THR_PRGR_LFLG_ACTIVE_MASK;
ASSERT(tpd->active <= n && n <= intrnl->managed.no);
}
@@ -868,7 +1039,7 @@ erts_thr_progress_wakeup(ErtsSchedulerData *esdp,
ErtsThrPrgrVal value)
{
ErtsThrPrgrData *tpd = thr_prgr_data(esdp);
- ASSERT(tpd);
+ ASSERT(!tpd->is_temporary);
if (tpd->is_managed)
request_wakeup_managed(tpd, value);
else
@@ -957,6 +1128,198 @@ got_sched_wakeups(void)
return 0;
}
+static erts_aint32_t
+block_thread(ErtsThrPrgrData *tpd)
+{
+ erts_aint32_t lflgs;
+ ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[tpd->id];
+
+ do {
+ block_count_dec();
+
+ while (1) {
+ cbp->prepare_wait(cbp->arg);
+ lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
+ if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
+ cbp->wait(cbp->arg);
+ else
+ break;
+ }
+
+ } while (block_count_inc());
+
+ cbp->finalize_wait(cbp->arg);
+
+ return lflgs;
+}
+
+static erts_aint32_t
+thr_progress_block(ErtsThrPrgrData *tpd, int wait)
+{
+ erts_tse_t *event = NULL; /* Remove erroneous warning... sigh... */
+ erts_aint32_t lflgs, bc;
+
+ if (tpd->is_blocking++)
+ return (erts_aint32_t) 0;
+
+ while (1) {
+ lflgs = erts_atomic32_read_bor_nob(&intrnl->misc.data.lflgs,
+ ERTS_THR_PRGR_LFLG_BLOCK);
+ if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
+ block_thread(tpd);
+ else
+ break;
+ }
+
+#if ERTS_THR_PRGR_PRINT_BLOCKERS
+ erts_fprintf(stderr, "block(%d)\n", tpd->id);
+#endif
+
+ ASSERT(ERTS_AINT_NULL
+ == erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
+
+ if (wait) {
+ event = erts_tse_fetch();
+ erts_tse_reset(event);
+ erts_atomic_set_nob(&intrnl->misc.data.blocker_event,
+ (erts_aint_t) event);
+ }
+ if (tpd->is_managed)
+ erts_atomic32_dec_nob(&intrnl->misc.data.block_count);
+ bc = erts_atomic32_read_band_mb(&intrnl->misc.data.block_count,
+ ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
+ bc &= ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING;
+ if (wait) {
+ while (bc != 0) {
+ erts_tse_wait(event);
+ erts_tse_reset(event);
+ bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
+ }
+ }
+ return bc;
+
+}
+
+void
+erts_thr_progress_block(void)
+{
+ thr_progress_block(tmp_thr_prgr_data(NULL), 1);
+}
+
+void
+erts_thr_progress_fatal_error_block(SWord timeout)
+{
+ ErtsThrPrgrData tpd_buf;
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
+ erts_aint32_t bc;
+ SWord time_left = timeout;
+ SysTimeval to;
+
+ /*
+ * Counting poll intervals may give us a too long timeout
+ * if cpu is busy. If we got tolerant time of day we use it
+ * to prevent this.
+ */
+ if (!erts_disable_tolerant_timeofday) {
+ erts_get_timeval(&to);
+ to.tv_sec += timeout / 1000;
+ to.tv_sec += timeout % 1000;
+ }
+
+ if (!tpd) {
+ /*
+ * We stack allocate since failure to allocate memory may
+ * have caused the problem in the first place. This is ok
+ * since we never complete an unblock after a fatal error
+ * block.
+ */
+ tpd = &tpd_buf;
+ init_tmp_thr_prgr_data(tpd);
+ }
+
+ bc = thr_progress_block(tpd, 0);
+ if (bc == 0)
+ return; /* Succefully blocked all managed threads */
+
+ while (1) {
+ if (erts_milli_sleep(ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL) == 0)
+ time_left -= ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL;
+ bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
+ if (bc == 0)
+ break; /* Succefully blocked all managed threads */
+ if (time_left <= 0)
+ break; /* Timeout */
+ if (!erts_disable_tolerant_timeofday) {
+ SysTimeval now;
+ erts_get_timeval(&now);
+ if (now.tv_sec > to.tv_sec)
+ break; /* Timeout */
+ if (now.tv_sec == to.tv_sec && now.tv_usec >= to.tv_usec)
+ break; /* Timeout */
+ }
+ }
+}
+
+void
+erts_thr_progress_unblock(void)
+{
+ erts_tse_t *event;
+ int id, break_id, sz, wakeup;
+ ErtsThrPrgrData *tpd = thr_prgr_data(NULL);
+
+ ASSERT(tpd->is_blocking);
+ if (--tpd->is_blocking)
+ return;
+
+ sz = intrnl->managed.no;
+
+ wakeup = 1;
+ if (!tpd->is_managed)
+ id = break_id = tpd->id < 0 ? 0 : tpd->id % sz;
+ else {
+ break_id = tpd->id;
+ id = break_id + 1;
+ if (id >= sz)
+ id = 0;
+ if (id == break_id)
+ wakeup = 0;
+ erts_atomic32_inc_nob(&intrnl->misc.data.block_count);
+ }
+
+ event = ((erts_tse_t *)
+ erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
+ ASSERT(event);
+ erts_atomic_set_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL);
+
+ erts_atomic32_read_bor_relb(&intrnl->misc.data.block_count,
+ ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
+#if ERTS_THR_PRGR_PRINT_BLOCKERS
+ erts_fprintf(stderr, "unblock(%d)\n", tpd->id);
+#endif
+ erts_atomic32_read_band_mb(&intrnl->misc.data.lflgs,
+ ~ERTS_THR_PRGR_LFLG_BLOCK);
+
+ if (wakeup) {
+ do {
+ ErtsThrPrgrVal tmp;
+ tmp = read_nob(&intrnl->thr[id].data.current);
+ if (tmp != ERTS_THR_PRGR_VAL_WAITING)
+ wakeup_managed(id);
+ if (++id >= sz)
+ id = 0;
+ } while (id != break_id);
+ }
+
+ return_tmp_thr_prgr_data(tpd);
+ erts_tse_return(event);
+}
+
+int
+erts_thr_progress_is_blocking(void)
+{
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
+ return tpd && tpd->is_blocking;
+}
void erts_thr_progress_dbg_print_state(void)
{