aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_thr_progress.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_thr_progress.c')
-rw-r--r--erts/emulator/beam/erl_thr_progress.c481
1 files changed, 422 insertions, 59 deletions
diff --git a/erts/emulator/beam/erl_thr_progress.c b/erts/emulator/beam/erl_thr_progress.c
index f96ae4b70d..9324bcde51 100644
--- a/erts/emulator/beam/erl_thr_progress.c
+++ b/erts/emulator/beam/erl_thr_progress.c
@@ -76,6 +76,7 @@
#include <stddef.h> /* offsetof() */
#include "erl_thr_progress.h"
+#include "global.h"
#ifdef ERTS_SMP
@@ -88,9 +89,14 @@
#define ERTS_THR_PRGR_PRINT_LEADER 0
#define ERTS_THR_PRGR_PRINT_VAL 0
+#define ERTS_THR_PRGR_PRINT_BLOCKERS 0
-#define ERTS_THR_PRGR_LFLG_NO_LEADER (((erts_aint32_t) 1) << 31)
-#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~ERTS_THR_PRGR_LFLG_NO_LEADER)
+#define ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL 100
+
+#define ERTS_THR_PRGR_LFLG_BLOCK (((erts_aint32_t) 1) << 31)
+#define ERTS_THR_PRGR_LFLG_NO_LEADER (((erts_aint32_t) 1) << 30)
+#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~(ERTS_THR_PRGR_LFLG_NO_LEADER \
+ | ERTS_THR_PRGR_LFLG_BLOCK))
#define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS) \
((LFLGS) & ERTS_THR_PRGR_LFLG_ACTIVE_MASK)
@@ -225,6 +231,11 @@ do { \
#endif /* ERTS_THR_PROGRESS_STATE_DEBUG */
+#define ERTS_THR_PRGR_BLCKR_INVALID (~((erts_aint32_t) 0))
+#define ERTS_THR_PRGR_BLCKR_UNMANAGED (((erts_aint32_t) 1) << 31)
+
+#define ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING (((erts_aint32_t) 1) << 31)
+
#define ERTS_THR_PRGR_BM_BITS 32
#define ERTS_THR_PRGR_BM_SHIFT 5
#define ERTS_THR_PRGR_BM_MASK 0x1f
@@ -249,11 +260,13 @@ typedef struct {
typedef struct {
erts_atomic32_t lflgs;
-
+ erts_atomic32_t block_count;
+ erts_atomic_t blocker_event;
erts_atomic32_t pref_wakeup_used;
+ erts_atomic32_t managed_count;
erts_atomic32_t managed_id;
erts_atomic32_t unmanaged_id;
-} ErtsThrPrgrMiscVolatile;
+} ErtsThrPrgrMiscData;
typedef struct {
ERTS_THR_PRGR_ATOMIC current;
@@ -269,19 +282,19 @@ typedef union {
typedef struct {
union {
- ErtsThrPrgrMiscVolatile tile;
+ ErtsThrPrgrMiscData data;
char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(
- sizeof(ErtsThrPrgrMiscVolatile))];
- } vola;
+ sizeof(ErtsThrPrgrMiscData))];
+ } misc;
ErtsThrPrgrArray *thr;
struct {
int no;
- ErtsThrPrgrWakeupCallback *callback;
+ ErtsThrPrgrCallbacks *callbacks;
ErtsThrPrgrManagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE];
} managed;
struct {
int no;
- ErtsThrPrgrWakeupCallback *callback;
+ ErtsThrPrgrCallbacks *callbacks;
ErtsThrPrgrUnmanagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE];
} unmanaged;
} ErtsThrPrgrInternalData;
@@ -294,36 +307,106 @@ erts_tsd_key_t erts_thr_prgr_data_key__;
static void handle_wakeup_requests(ErtsThrPrgrVal current);
static int got_sched_wakeups(void);
+static erts_aint32_t block_thread(ErtsThrPrgrData *tpd);
static ERTS_INLINE void
wakeup_managed(int id)
{
- ErtsThrPrgrWakeupCallback *wdp = &intrnl->managed.callback[id];
+ ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[id];
ASSERT(0 <= id && id < intrnl->managed.no);
- wdp->wakeup(wdp->arg);
+ cbp->wakeup(cbp->arg);
}
static ERTS_INLINE void
wakeup_unmanaged(int id)
{
- ErtsThrPrgrWakeupCallback *wdp = &intrnl->unmanaged.callback[id];
+ ErtsThrPrgrCallbacks *cbp = &intrnl->unmanaged.callbacks[id];
ASSERT(0 <= id && id < intrnl->unmanaged.no);
- wdp->wakeup(wdp->arg);
+ cbp->wakeup(cbp->arg);
}
static ERTS_INLINE ErtsThrPrgrData *
-thr_prgr_data(ErtsSchedulerData *esdp)
+perhaps_thr_prgr_data(ErtsSchedulerData *esdp)
{
- ErtsThrPrgrData *tpd;
if (esdp)
- tpd = &esdp->thr_progress_data;
+ return &esdp->thr_progress_data;
else
- tpd = erts_tsd_get(erts_thr_prgr_data_key__);
+ return erts_tsd_get(erts_thr_prgr_data_key__);
+}
+
+static ERTS_INLINE ErtsThrPrgrData *
+thr_prgr_data(ErtsSchedulerData *esdp)
+{
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp);
ASSERT(tpd);
return tpd;
}
+static void
+init_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
+{
+ tpd->id = -1;
+ tpd->is_managed = 0;
+ tpd->is_blocking = 0;
+ tpd->is_temporary = 1;
+
+ erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
+}
+
+static ERTS_INLINE ErtsThrPrgrData *
+tmp_thr_prgr_data(ErtsSchedulerData *esdp)
+{
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp);
+
+ if (!tpd) {
+ /*
+ * We only allocate the part up to the wakeup_request field
+ * which is the first field only used by registered threads
+ */
+ tpd = erts_alloc(ERTS_ALC_T_T_THR_PRGR_DATA,
+ offsetof(ErtsThrPrgrData, wakeup_request));
+ init_tmp_thr_prgr_data(tpd);
+ }
+
+ return tpd;
+}
+
+static ERTS_INLINE void
+return_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
+{
+ if (tpd->is_temporary) {
+ erts_tsd_set(erts_thr_prgr_data_key__, NULL);
+ erts_free(ERTS_ALC_T_T_THR_PRGR_DATA, tpd);
+ }
+}
+
+static ERTS_INLINE int
+block_count_dec(void)
+{
+ erts_aint32_t block_count;
+ block_count = erts_atomic32_dec_read_mb(&intrnl->misc.data.block_count);
+ if (block_count == 0) {
+ erts_tse_t *event;
+ event = ((erts_tse_t*)
+ erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
+ if (event)
+ erts_tse_set(event);
+ return 1;
+ }
+
+ return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0;
+}
+
+static ERTS_INLINE int
+block_count_inc(void)
+{
+ erts_aint32_t block_count;
+ block_count = erts_atomic32_inc_read_mb(&intrnl->misc.data.block_count);
+ return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0;
+}
+
+
void
erts_thr_progress_pre_init(void)
{
@@ -343,7 +426,7 @@ erts_thr_progress_init(int no_schedulers, int managed, int unmanaged)
intrnl_sz = sizeof(ErtsThrPrgrInternalData);
intrnl_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(intrnl_sz);
- cb_sz = sizeof(ErtsThrPrgrWakeupCallback)*(managed+unmanaged);
+ cb_sz = sizeof(ErtsThrPrgrCallbacks)*(managed+unmanaged);
cb_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(cb_sz);
thr_arr_sz = sizeof(ErtsThrPrgrArray)*managed;
@@ -372,31 +455,36 @@ erts_thr_progress_init(int no_schedulers, int managed, int unmanaged)
intrnl = (ErtsThrPrgrInternalData *) ptr;
ptr += intrnl_sz;
- erts_atomic32_init_nob(&intrnl->vola.tile.lflgs,
+ erts_atomic32_init_nob(&intrnl->misc.data.lflgs,
ERTS_THR_PRGR_LFLG_NO_LEADER);
- erts_atomic32_init_nob(&intrnl->vola.tile.pref_wakeup_used, 0);
- erts_atomic32_init_nob(&intrnl->vola.tile.managed_id, no_schedulers);
- erts_atomic32_init_nob(&intrnl->vola.tile.unmanaged_id, -1);
+ erts_atomic32_init_nob(&intrnl->misc.data.block_count,
+ (ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING
+ | (erts_aint32_t) managed));
+ erts_atomic_init_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL);
+ erts_atomic32_init_nob(&intrnl->misc.data.pref_wakeup_used, 0);
+ erts_atomic32_init_nob(&intrnl->misc.data.managed_count, 0);
+ erts_atomic32_init_nob(&intrnl->misc.data.managed_id, no_schedulers);
+ erts_atomic32_init_nob(&intrnl->misc.data.unmanaged_id, -1);
intrnl->thr = (ErtsThrPrgrArray *) ptr;
ptr += thr_arr_sz;
for (i = 0; i < managed; i++)
init_nob(&intrnl->thr[i].data.current, 0);
- intrnl->managed.callback = (ErtsThrPrgrWakeupCallback *) ptr;
- intrnl->unmanaged.callback = &intrnl->managed.callback[managed];
+ intrnl->managed.callbacks = (ErtsThrPrgrCallbacks *) ptr;
+ intrnl->unmanaged.callbacks = &intrnl->managed.callbacks[managed];
ptr += cb_sz;
intrnl->managed.no = managed;
for (i = 0; i < managed; i++) {
- intrnl->managed.callback[i].arg = NULL;
- intrnl->managed.callback[i].wakeup = NULL;
+ intrnl->managed.callbacks[i].arg = NULL;
+ intrnl->managed.callbacks[i].wakeup = NULL;
}
intrnl->unmanaged.no = unmanaged;
for (i = 0; i < unmanaged; i++) {
- intrnl->unmanaged.callback[i].arg = NULL;
- intrnl->unmanaged.callback[i].wakeup = NULL;
+ intrnl->unmanaged.callbacks[i].arg = NULL;
+ intrnl->unmanaged.callbacks[i].wakeup = NULL;
}
for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
@@ -439,21 +527,30 @@ init_wakeup_request_array(ErtsThrPrgrVal *w)
}
void
-erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrWakeupCallback *callback)
+erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrCallbacks *callbacks)
{
- ErtsThrPrgrData *tpd;
- if (erts_tsd_get(erts_thr_prgr_data_key__))
- erl_exit(ERTS_ABORT_EXIT,
- "%s:%d:%s(): Double register of thread\n",
- __FILE__, __LINE__, __func__);
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
+ int is_blocking = 0;
+
+ if (tpd) {
+ if (!tpd->is_temporary)
+ erl_exit(ERTS_ABORT_EXIT,
+ "%s:%d:%s(): Double register of thread\n",
+ __FILE__, __LINE__, __func__);
+ is_blocking = tpd->is_blocking;
+ return_tmp_thr_prgr_data(tpd);
+ }
+
/*
* We only allocate the part up to the leader field
* which is the first field only used by managed threads
*/
tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA,
offsetof(ErtsThrPrgrData, leader));
- tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->vola.tile.unmanaged_id);
+ tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->misc.data.unmanaged_id);
tpd->is_managed = 0;
+ tpd->is_blocking = is_blocking;
+ tpd->is_temporary = 0;
ASSERT(tpd->id >= 0);
if (tpd->id >= intrnl->unmanaged.no)
erl_exit(ERTS_ABORT_EXIT,
@@ -463,32 +560,41 @@ erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrWakeupCallback *callback)
init_wakeup_request_array(&tpd->wakeup_request[0]);
erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
- intrnl->unmanaged.callback[tpd->id] = *callback;
+ ASSERT(callbacks->wakeup);
+
+ intrnl->unmanaged.callbacks[tpd->id] = *callbacks;
}
void
erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp,
- ErtsThrPrgrWakeupCallback *callback,
+ ErtsThrPrgrCallbacks *callbacks,
int pref_wakeup)
{
- ErtsThrPrgrData *tpd;
- if (erts_tsd_get(erts_thr_prgr_data_key__))
- erl_exit(ERTS_ABORT_EXIT,
- "%s:%d:%s(): Double register of thread\n",
- __FILE__, __LINE__, __func__);
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
+ int is_blocking = 0, managed;
+
+ if (tpd) {
+ if (!tpd->is_temporary)
+ erl_exit(ERTS_ABORT_EXIT,
+ "%s:%d:%s(): Double register of thread\n",
+ __FILE__, __LINE__, __func__);
+ is_blocking = tpd->is_blocking;
+ return_tmp_thr_prgr_data(tpd);
+ }
+
if (esdp)
tpd = &esdp->thr_progress_data;
else
tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA, sizeof(ErtsThrPrgrData));
if (pref_wakeup
- && !erts_atomic32_xchg_nob(&intrnl->vola.tile.pref_wakeup_used, 1))
+ && !erts_atomic32_xchg_nob(&intrnl->misc.data.pref_wakeup_used, 1))
tpd->id = 0;
else if (esdp)
tpd->id = (int) esdp->no;
else
- tpd->id = erts_atomic32_inc_read_nob(&intrnl->vola.tile.managed_id);
+ tpd->id = erts_atomic32_inc_read_nob(&intrnl->misc.data.managed_id);
ASSERT(tpd->id >= 0);
if (tpd->id >= intrnl->managed.no)
erl_exit(ERTS_ABORT_EXIT,
@@ -496,6 +602,8 @@ erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp,
__FILE__, __LINE__, __func__);
tpd->is_managed = 1;
+ tpd->is_blocking = is_blocking;
+ tpd->is_temporary = 0;
init_wakeup_request_array(&tpd->wakeup_request[0]);
@@ -507,14 +615,46 @@ erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp,
tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING;
erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
- erts_atomic32_inc_nob(&intrnl->vola.tile.lflgs);
- intrnl->managed.callback[tpd->id] = *callback;
+ erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);
+
+ ASSERT(callbacks->wakeup);
+ ASSERT(callbacks->prepare_wait);
+ ASSERT(callbacks->wait);
+ ASSERT(callbacks->finalize_wait);
+
+ intrnl->managed.callbacks[tpd->id] = *callbacks;
+
+ callbacks->prepare_wait(callbacks->arg);
+ managed = erts_atomic32_inc_read_relb(&intrnl->misc.data.managed_count);
+ if (managed != intrnl->managed.no) {
+ /* Wait until all managed threads have registered... */
+ do {
+ callbacks->wait(callbacks->arg);
+ callbacks->prepare_wait(callbacks->arg);
+ managed = erts_atomic32_read_acqb(&intrnl->misc.data.managed_count);
+ } while (managed != intrnl->managed.no);
+ }
+ else {
+ int id;
+ /* All managed threads have registered; lets go... */
+ for (id = 0; id < managed; id++)
+ if (id != tpd->id)
+ wakeup_managed(id);
+ }
+ callbacks->finalize_wait(callbacks->arg);
}
static ERTS_INLINE int
leader_update(ErtsThrPrgrData *tpd)
{
- if (tpd->leader) {
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ erts_lc_check_exact(NULL, 0);
+#endif
+ if (!tpd->leader) {
+ /* Probably need to block... */
+ block_thread(tpd);
+ }
+ else {
erts_aint32_t lflgs;
ErtsThrPrgrVal next;
int ix, sz, make_progress;
@@ -570,16 +710,24 @@ leader_update(ErtsThrPrgrData *tpd)
handle_wakeup_requests(current);
}
- if (!tpd->active) {
+ if (tpd->active) {
+ lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
+ if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
+ (void) block_thread(tpd);
+ }
+ else {
tpd->leader = 0;
tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING;
#if ERTS_THR_PRGR_PRINT_LEADER
erts_fprintf(stderr, "L <- %d\n", tpd->id);
#endif
+
ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 0);
- lflgs = erts_atomic32_read_bor_relb(&intrnl->vola.tile.lflgs,
+ lflgs = erts_atomic32_read_bor_relb(&intrnl->misc.data.lflgs,
ERTS_THR_PRGR_LFLG_NO_LEADER);
+ if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
+ lflgs = block_thread(tpd);
if (ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0 && got_sched_wakeups())
wakeup_managed(0);
}
@@ -591,10 +739,14 @@ leader_update(ErtsThrPrgrData *tpd)
static int
update(ErtsThrPrgrData *tpd)
{
+ int res;
ErtsThrPrgrVal val;
- if (!tpd->leader) {
+ if (tpd->leader)
+ res = 1;
+ else {
erts_aint32_t lflgs;
+ res = 0;
val = read_acqb(&erts_thr_prgr__.current);
if (tpd->previous.local == val) {
val++;
@@ -604,13 +756,16 @@ update(ErtsThrPrgrData *tpd)
set_mb(&intrnl->thr[tpd->id].data.current, val);
}
- lflgs = erts_atomic32_read_nob(&intrnl->vola.tile.lflgs);
+ lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
+ if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
+ res = 1; /* Need to block in leader_update() */
+
if ((lflgs & ERTS_THR_PRGR_LFLG_NO_LEADER)
&& (tpd->active || ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0)) {
/* Try to take over leadership... */
erts_aint32_t olflgs;
olflgs = erts_atomic32_read_band_acqb(
- &intrnl->vola.tile.lflgs,
+ &intrnl->misc.data.lflgs,
~ERTS_THR_PRGR_LFLG_NO_LEADER);
if (olflgs & ERTS_THR_PRGR_LFLG_NO_LEADER) {
tpd->leader = 1;
@@ -620,8 +775,9 @@ update(ErtsThrPrgrData *tpd)
ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 1);
}
}
+ res |= tpd->leader;
}
- return tpd->leader;
+ return res;
}
int
@@ -643,10 +799,16 @@ erts_thr_progress_prepare_wait(ErtsSchedulerData *esdp)
erts_aint32_t lflgs;
ErtsThrPrgrData *tpd = thr_prgr_data(esdp);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ erts_lc_check_exact(NULL, 0);
+#endif
+
+ block_count_dec();
+
tpd->previous.local = ERTS_THR_PRGR_VAL_WAITING;
set_mb(&intrnl->thr[tpd->id].data.current, ERTS_THR_PRGR_VAL_WAITING);
- lflgs = erts_atomic32_read_nob(&intrnl->vola.tile.lflgs);
+ lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
if (ERTS_THR_PRGR_LFLGS_ALL_WAITING(lflgs) && got_sched_wakeups())
wakeup_managed(0); /* Someone need to make progress */
}
@@ -657,6 +819,10 @@ erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp)
ErtsThrPrgrData *tpd = thr_prgr_data(esdp);
ErtsThrPrgrVal current, val;
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ erts_lc_check_exact(NULL, 0);
+#endif
+
/*
* We aren't allowed to continue until our thread
* progress is past global current.
@@ -673,6 +839,8 @@ erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp)
break;
current = val;
}
+ if (block_count_inc())
+ block_thread(tpd);
if (update(tpd))
leader_update(tpd);
}
@@ -682,25 +850,28 @@ erts_thr_progress_active(ErtsSchedulerData *esdp, int on)
{
ErtsThrPrgrData *tpd = thr_prgr_data(esdp);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ erts_lc_check_exact(NULL, 0);
+#endif
+
ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(tpd->id, on);
if (on) {
ASSERT(!tpd->active);
tpd->active = 1;
- erts_atomic32_inc_nob(&intrnl->vola.tile.lflgs);
+ erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);
}
else {
ASSERT(tpd->active);
tpd->active = 0;
- erts_atomic32_dec_nob(&intrnl->vola.tile.lflgs);
-
+ erts_atomic32_dec_nob(&intrnl->misc.data.lflgs);
if (update(tpd))
leader_update(tpd);
}
#ifdef DEBUG
{
- erts_aint32_t n = erts_atomic32_read_nob(&intrnl->vola.tile.lflgs);
+ erts_aint32_t n = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
n &= ERTS_THR_PRGR_LFLG_ACTIVE_MASK;
ASSERT(tpd->active <= n && n <= intrnl->managed.no);
}
@@ -868,7 +1039,7 @@ erts_thr_progress_wakeup(ErtsSchedulerData *esdp,
ErtsThrPrgrVal value)
{
ErtsThrPrgrData *tpd = thr_prgr_data(esdp);
- ASSERT(tpd);
+ ASSERT(!tpd->is_temporary);
if (tpd->is_managed)
request_wakeup_managed(tpd, value);
else
@@ -957,6 +1128,198 @@ got_sched_wakeups(void)
return 0;
}
+static erts_aint32_t
+block_thread(ErtsThrPrgrData *tpd)
+{
+ erts_aint32_t lflgs;
+ ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[tpd->id];
+
+ do {
+ block_count_dec();
+
+ while (1) {
+ cbp->prepare_wait(cbp->arg);
+ lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
+ if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
+ cbp->wait(cbp->arg);
+ else
+ break;
+ }
+
+ } while (block_count_inc());
+
+ cbp->finalize_wait(cbp->arg);
+
+ return lflgs;
+}
+
+static erts_aint32_t
+thr_progress_block(ErtsThrPrgrData *tpd, int wait)
+{
+ erts_tse_t *event = NULL; /* Remove erroneous warning... sigh... */
+ erts_aint32_t lflgs, bc;
+
+ if (tpd->is_blocking++)
+ return (erts_aint32_t) 0;
+
+ while (1) {
+ lflgs = erts_atomic32_read_bor_nob(&intrnl->misc.data.lflgs,
+ ERTS_THR_PRGR_LFLG_BLOCK);
+ if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
+ block_thread(tpd);
+ else
+ break;
+ }
+
+#if ERTS_THR_PRGR_PRINT_BLOCKERS
+ erts_fprintf(stderr, "block(%d)\n", tpd->id);
+#endif
+
+ ASSERT(ERTS_AINT_NULL
+ == erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
+
+ if (wait) {
+ event = erts_tse_fetch();
+ erts_tse_reset(event);
+ erts_atomic_set_nob(&intrnl->misc.data.blocker_event,
+ (erts_aint_t) event);
+ }
+ if (tpd->is_managed)
+ erts_atomic32_dec_nob(&intrnl->misc.data.block_count);
+ bc = erts_atomic32_read_band_mb(&intrnl->misc.data.block_count,
+ ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
+ bc &= ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING;
+ if (wait) {
+ while (bc != 0) {
+ erts_tse_wait(event);
+ erts_tse_reset(event);
+ bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
+ }
+ }
+ return bc;
+
+}
+
+void
+erts_thr_progress_block(void)
+{
+ thr_progress_block(tmp_thr_prgr_data(NULL), 1);
+}
+
+void
+erts_thr_progress_fatal_error_block(SWord timeout)
+{
+ ErtsThrPrgrData tpd_buf;
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
+ erts_aint32_t bc;
+ SWord time_left = timeout;
+ SysTimeval to;
+
+ /*
+ * Counting poll intervals may give us a too long timeout
+ * if cpu is busy. If we got tolerant time of day we use it
+ * to prevent this.
+ */
+ if (!erts_disable_tolerant_timeofday) {
+ erts_get_timeval(&to);
+ to.tv_sec += timeout / 1000;
+ to.tv_sec += timeout % 1000;
+ }
+
+ if (!tpd) {
+ /*
+ * We stack allocate since failure to allocate memory may
+ * have caused the problem in the first place. This is ok
+ * since we never complete an unblock after a fatal error
+ * block.
+ */
+ tpd = &tpd_buf;
+ init_tmp_thr_prgr_data(tpd);
+ }
+
+ bc = thr_progress_block(tpd, 0);
+ if (bc == 0)
+ return; /* Succefully blocked all managed threads */
+
+ while (1) {
+ if (erts_milli_sleep(ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL) == 0)
+ time_left -= ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL;
+ bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
+ if (bc == 0)
+ break; /* Succefully blocked all managed threads */
+ if (time_left <= 0)
+ break; /* Timeout */
+ if (!erts_disable_tolerant_timeofday) {
+ SysTimeval now;
+ erts_get_timeval(&now);
+ if (now.tv_sec > to.tv_sec)
+ break; /* Timeout */
+ if (now.tv_sec == to.tv_sec && now.tv_usec >= to.tv_usec)
+ break; /* Timeout */
+ }
+ }
+}
+
+void
+erts_thr_progress_unblock(void)
+{
+ erts_tse_t *event;
+ int id, break_id, sz, wakeup;
+ ErtsThrPrgrData *tpd = thr_prgr_data(NULL);
+
+ ASSERT(tpd->is_blocking);
+ if (--tpd->is_blocking)
+ return;
+
+ sz = intrnl->managed.no;
+
+ wakeup = 1;
+ if (!tpd->is_managed)
+ id = break_id = tpd->id < 0 ? 0 : tpd->id % sz;
+ else {
+ break_id = tpd->id;
+ id = break_id + 1;
+ if (id >= sz)
+ id = 0;
+ if (id == break_id)
+ wakeup = 0;
+ erts_atomic32_inc_nob(&intrnl->misc.data.block_count);
+ }
+
+ event = ((erts_tse_t *)
+ erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
+ ASSERT(event);
+ erts_atomic_set_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL);
+
+ erts_atomic32_read_bor_relb(&intrnl->misc.data.block_count,
+ ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
+#if ERTS_THR_PRGR_PRINT_BLOCKERS
+ erts_fprintf(stderr, "unblock(%d)\n", tpd->id);
+#endif
+ erts_atomic32_read_band_mb(&intrnl->misc.data.lflgs,
+ ~ERTS_THR_PRGR_LFLG_BLOCK);
+
+ if (wakeup) {
+ do {
+ ErtsThrPrgrVal tmp;
+ tmp = read_nob(&intrnl->thr[id].data.current);
+ if (tmp != ERTS_THR_PRGR_VAL_WAITING)
+ wakeup_managed(id);
+ if (++id >= sz)
+ id = 0;
+ } while (id != break_id);
+ }
+
+ return_tmp_thr_prgr_data(tpd);
+ erts_tse_return(event);
+}
+
+int
+erts_thr_progress_is_blocking(void)
+{
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
+ return tpd && tpd->is_blocking;
+}
void erts_thr_progress_dbg_print_state(void)
{