diff options
author | Rickard Green <[email protected]> | 2011-10-10 21:03:10 +0200 |
---|---|---|
committer | Rickard Green <[email protected]> | 2011-11-13 20:40:56 +0100 |
commit | bc5818cfdd56e19a16357f4443d80a56426aa134 (patch) | |
tree | bfe53d3e0cf24ccd7374e0174c1a4c441ab2e097 /erts/emulator/beam/erl_thr_progress.c | |
parent | a67e91e658bdbba24fcc3c79b06fdf10ff830bc9 (diff) | |
download | otp-bc5818cfdd56e19a16357f4443d80a56426aa134.tar.gz otp-bc5818cfdd56e19a16357f4443d80a56426aa134.tar.bz2 otp-bc5818cfdd56e19a16357f4443d80a56426aa134.zip |
Replace system block with thread progress block
The ERTS internal system block functionality has been replaced by
new functionality for blocking the system. The old system block
functionality had contention issues and complexity issues. The
new functionality piggy-backs on thread progress tracking functionality
needed by newly introduced lock-free synchronization in the runtime
system. When the functionality for blocking the system isn't used
there is more or less no overhead at all. This since the functionality
for tracking thread progress is there and needed anyway.
Diffstat (limited to 'erts/emulator/beam/erl_thr_progress.c')
-rw-r--r-- | erts/emulator/beam/erl_thr_progress.c | 481 |
1 files changed, 422 insertions, 59 deletions
diff --git a/erts/emulator/beam/erl_thr_progress.c b/erts/emulator/beam/erl_thr_progress.c index f96ae4b70d..9324bcde51 100644 --- a/erts/emulator/beam/erl_thr_progress.c +++ b/erts/emulator/beam/erl_thr_progress.c @@ -76,6 +76,7 @@ #include <stddef.h> /* offsetof() */ #include "erl_thr_progress.h" +#include "global.h" #ifdef ERTS_SMP @@ -88,9 +89,14 @@ #define ERTS_THR_PRGR_PRINT_LEADER 0 #define ERTS_THR_PRGR_PRINT_VAL 0 +#define ERTS_THR_PRGR_PRINT_BLOCKERS 0 -#define ERTS_THR_PRGR_LFLG_NO_LEADER (((erts_aint32_t) 1) << 31) -#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~ERTS_THR_PRGR_LFLG_NO_LEADER) +#define ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL 100 + +#define ERTS_THR_PRGR_LFLG_BLOCK (((erts_aint32_t) 1) << 31) +#define ERTS_THR_PRGR_LFLG_NO_LEADER (((erts_aint32_t) 1) << 30) +#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~(ERTS_THR_PRGR_LFLG_NO_LEADER \ + | ERTS_THR_PRGR_LFLG_BLOCK)) #define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS) \ ((LFLGS) & ERTS_THR_PRGR_LFLG_ACTIVE_MASK) @@ -225,6 +231,11 @@ do { \ #endif /* ERTS_THR_PROGRESS_STATE_DEBUG */ +#define ERTS_THR_PRGR_BLCKR_INVALID (~((erts_aint32_t) 0)) +#define ERTS_THR_PRGR_BLCKR_UNMANAGED (((erts_aint32_t) 1) << 31) + +#define ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING (((erts_aint32_t) 1) << 31) + #define ERTS_THR_PRGR_BM_BITS 32 #define ERTS_THR_PRGR_BM_SHIFT 5 #define ERTS_THR_PRGR_BM_MASK 0x1f @@ -249,11 +260,13 @@ typedef struct { typedef struct { erts_atomic32_t lflgs; - + erts_atomic32_t block_count; + erts_atomic_t blocker_event; erts_atomic32_t pref_wakeup_used; + erts_atomic32_t managed_count; erts_atomic32_t managed_id; erts_atomic32_t unmanaged_id; -} ErtsThrPrgrMiscVolatile; +} ErtsThrPrgrMiscData; typedef struct { ERTS_THR_PRGR_ATOMIC current; @@ -269,19 +282,19 @@ typedef union { typedef struct { union { - ErtsThrPrgrMiscVolatile tile; + ErtsThrPrgrMiscData data; char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE( - sizeof(ErtsThrPrgrMiscVolatile))]; - } vola; + sizeof(ErtsThrPrgrMiscData))]; + } misc; ErtsThrPrgrArray *thr; struct { int no; - ErtsThrPrgrWakeupCallback *callback; + ErtsThrPrgrCallbacks *callbacks; ErtsThrPrgrManagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE]; } managed; struct { int no; - ErtsThrPrgrWakeupCallback *callback; + ErtsThrPrgrCallbacks *callbacks; ErtsThrPrgrUnmanagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE]; } unmanaged; } ErtsThrPrgrInternalData; @@ -294,36 +307,106 @@ erts_tsd_key_t erts_thr_prgr_data_key__; static void handle_wakeup_requests(ErtsThrPrgrVal current); static int got_sched_wakeups(void); +static erts_aint32_t block_thread(ErtsThrPrgrData *tpd); static ERTS_INLINE void wakeup_managed(int id) { - ErtsThrPrgrWakeupCallback *wdp = &intrnl->managed.callback[id]; + ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[id]; ASSERT(0 <= id && id < intrnl->managed.no); - wdp->wakeup(wdp->arg); + cbp->wakeup(cbp->arg); } static ERTS_INLINE void wakeup_unmanaged(int id) { - ErtsThrPrgrWakeupCallback *wdp = &intrnl->unmanaged.callback[id]; + ErtsThrPrgrCallbacks *cbp = &intrnl->unmanaged.callbacks[id]; ASSERT(0 <= id && id < intrnl->unmanaged.no); - wdp->wakeup(wdp->arg); + cbp->wakeup(cbp->arg); } static ERTS_INLINE ErtsThrPrgrData * -thr_prgr_data(ErtsSchedulerData *esdp) +perhaps_thr_prgr_data(ErtsSchedulerData *esdp) { - ErtsThrPrgrData *tpd; if (esdp) - tpd = &esdp->thr_progress_data; + return &esdp->thr_progress_data; else - tpd = erts_tsd_get(erts_thr_prgr_data_key__); + return erts_tsd_get(erts_thr_prgr_data_key__); +} + +static ERTS_INLINE ErtsThrPrgrData * +thr_prgr_data(ErtsSchedulerData *esdp) +{ + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp); ASSERT(tpd); return tpd; } +static void +init_tmp_thr_prgr_data(ErtsThrPrgrData *tpd) +{ + tpd->id = -1; + tpd->is_managed = 0; + tpd->is_blocking = 0; + tpd->is_temporary = 1; + + erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd); +} + +static ERTS_INLINE ErtsThrPrgrData * +tmp_thr_prgr_data(ErtsSchedulerData *esdp) +{ + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp); + + if (!tpd) { + /* + * We only allocate the part up to the wakeup_request field + * which is the first field only used by registered threads + */ + tpd = erts_alloc(ERTS_ALC_T_T_THR_PRGR_DATA, + offsetof(ErtsThrPrgrData, wakeup_request)); + init_tmp_thr_prgr_data(tpd); + } + + return tpd; +} + +static ERTS_INLINE void +return_tmp_thr_prgr_data(ErtsThrPrgrData *tpd) +{ + if (tpd->is_temporary) { + erts_tsd_set(erts_thr_prgr_data_key__, NULL); + erts_free(ERTS_ALC_T_T_THR_PRGR_DATA, tpd); + } +} + +static ERTS_INLINE int +block_count_dec(void) +{ + erts_aint32_t block_count; + block_count = erts_atomic32_dec_read_mb(&intrnl->misc.data.block_count); + if (block_count == 0) { + erts_tse_t *event; + event = ((erts_tse_t*) + erts_atomic_read_nob(&intrnl->misc.data.blocker_event)); + if (event) + erts_tse_set(event); + return 1; + } + + return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0; +} + +static ERTS_INLINE int +block_count_inc(void) +{ + erts_aint32_t block_count; + block_count = erts_atomic32_inc_read_mb(&intrnl->misc.data.block_count); + return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0; +} + + void erts_thr_progress_pre_init(void) { @@ -343,7 +426,7 @@ erts_thr_progress_init(int no_schedulers, int managed, int unmanaged) intrnl_sz = sizeof(ErtsThrPrgrInternalData); intrnl_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(intrnl_sz); - cb_sz = sizeof(ErtsThrPrgrWakeupCallback)*(managed+unmanaged); + cb_sz = sizeof(ErtsThrPrgrCallbacks)*(managed+unmanaged); cb_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(cb_sz); thr_arr_sz = sizeof(ErtsThrPrgrArray)*managed; @@ -372,31 +455,36 @@ erts_thr_progress_init(int no_schedulers, int managed, int unmanaged) intrnl = (ErtsThrPrgrInternalData *) ptr; ptr += intrnl_sz; - erts_atomic32_init_nob(&intrnl->vola.tile.lflgs, + erts_atomic32_init_nob(&intrnl->misc.data.lflgs, ERTS_THR_PRGR_LFLG_NO_LEADER); - erts_atomic32_init_nob(&intrnl->vola.tile.pref_wakeup_used, 0); - erts_atomic32_init_nob(&intrnl->vola.tile.managed_id, no_schedulers); - erts_atomic32_init_nob(&intrnl->vola.tile.unmanaged_id, -1); + erts_atomic32_init_nob(&intrnl->misc.data.block_count, + (ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING + | (erts_aint32_t) managed)); + erts_atomic_init_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL); + erts_atomic32_init_nob(&intrnl->misc.data.pref_wakeup_used, 0); + erts_atomic32_init_nob(&intrnl->misc.data.managed_count, 0); + erts_atomic32_init_nob(&intrnl->misc.data.managed_id, no_schedulers); + erts_atomic32_init_nob(&intrnl->misc.data.unmanaged_id, -1); intrnl->thr = (ErtsThrPrgrArray *) ptr; ptr += thr_arr_sz; for (i = 0; i < managed; i++) init_nob(&intrnl->thr[i].data.current, 0); - intrnl->managed.callback = (ErtsThrPrgrWakeupCallback *) ptr; - intrnl->unmanaged.callback = &intrnl->managed.callback[managed]; + intrnl->managed.callbacks = (ErtsThrPrgrCallbacks *) ptr; + intrnl->unmanaged.callbacks = &intrnl->managed.callbacks[managed]; ptr += cb_sz; intrnl->managed.no = managed; for (i = 0; i < managed; i++) { - intrnl->managed.callback[i].arg = NULL; - intrnl->managed.callback[i].wakeup = NULL; + intrnl->managed.callbacks[i].arg = NULL; + intrnl->managed.callbacks[i].wakeup = NULL; } intrnl->unmanaged.no = unmanaged; for (i = 0; i < unmanaged; i++) { - intrnl->unmanaged.callback[i].arg = NULL; - intrnl->unmanaged.callback[i].wakeup = NULL; + intrnl->unmanaged.callbacks[i].arg = NULL; + intrnl->unmanaged.callbacks[i].wakeup = NULL; } for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) { @@ -439,21 +527,30 @@ init_wakeup_request_array(ErtsThrPrgrVal *w) } void -erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrWakeupCallback *callback) +erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrCallbacks *callbacks) { - ErtsThrPrgrData *tpd; - if (erts_tsd_get(erts_thr_prgr_data_key__)) - erl_exit(ERTS_ABORT_EXIT, - "%s:%d:%s(): Double register of thread\n", - __FILE__, __LINE__, __func__); + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL); + int is_blocking = 0; + + if (tpd) { + if (!tpd->is_temporary) + erl_exit(ERTS_ABORT_EXIT, + "%s:%d:%s(): Double register of thread\n", + __FILE__, __LINE__, __func__); + is_blocking = tpd->is_blocking; + return_tmp_thr_prgr_data(tpd); + } + /* * We only allocate the part up to the leader field * which is the first field only used by managed threads */ tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA, offsetof(ErtsThrPrgrData, leader)); - tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->vola.tile.unmanaged_id); + tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->misc.data.unmanaged_id); tpd->is_managed = 0; + tpd->is_blocking = is_blocking; + tpd->is_temporary = 0; ASSERT(tpd->id >= 0); if (tpd->id >= intrnl->unmanaged.no) erl_exit(ERTS_ABORT_EXIT, @@ -463,32 +560,41 @@ erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrWakeupCallback *callback) init_wakeup_request_array(&tpd->wakeup_request[0]); erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd); - intrnl->unmanaged.callback[tpd->id] = *callback; + ASSERT(callbacks->wakeup); + + intrnl->unmanaged.callbacks[tpd->id] = *callbacks; } void erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp, - ErtsThrPrgrWakeupCallback *callback, + ErtsThrPrgrCallbacks *callbacks, int pref_wakeup) { - ErtsThrPrgrData *tpd; - if (erts_tsd_get(erts_thr_prgr_data_key__)) - erl_exit(ERTS_ABORT_EXIT, - "%s:%d:%s(): Double register of thread\n", - __FILE__, __LINE__, __func__); + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL); + int is_blocking = 0, managed; + + if (tpd) { + if (!tpd->is_temporary) + erl_exit(ERTS_ABORT_EXIT, + "%s:%d:%s(): Double register of thread\n", + __FILE__, __LINE__, __func__); + is_blocking = tpd->is_blocking; + return_tmp_thr_prgr_data(tpd); + } + if (esdp) tpd = &esdp->thr_progress_data; else tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA, sizeof(ErtsThrPrgrData)); if (pref_wakeup - && !erts_atomic32_xchg_nob(&intrnl->vola.tile.pref_wakeup_used, 1)) + && !erts_atomic32_xchg_nob(&intrnl->misc.data.pref_wakeup_used, 1)) tpd->id = 0; else if (esdp) tpd->id = (int) esdp->no; else - tpd->id = erts_atomic32_inc_read_nob(&intrnl->vola.tile.managed_id); + tpd->id = erts_atomic32_inc_read_nob(&intrnl->misc.data.managed_id); ASSERT(tpd->id >= 0); if (tpd->id >= intrnl->managed.no) erl_exit(ERTS_ABORT_EXIT, @@ -496,6 +602,8 @@ erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp, __FILE__, __LINE__, __func__); tpd->is_managed = 1; + tpd->is_blocking = is_blocking; + tpd->is_temporary = 0; init_wakeup_request_array(&tpd->wakeup_request[0]); @@ -507,14 +615,46 @@ erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp, tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING; erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd); - erts_atomic32_inc_nob(&intrnl->vola.tile.lflgs); - intrnl->managed.callback[tpd->id] = *callback; + erts_atomic32_inc_nob(&intrnl->misc.data.lflgs); + + ASSERT(callbacks->wakeup); + ASSERT(callbacks->prepare_wait); + ASSERT(callbacks->wait); + ASSERT(callbacks->finalize_wait); + + intrnl->managed.callbacks[tpd->id] = *callbacks; + + callbacks->prepare_wait(callbacks->arg); + managed = erts_atomic32_inc_read_relb(&intrnl->misc.data.managed_count); + if (managed != intrnl->managed.no) { + /* Wait until all managed threads have registered... */ + do { + callbacks->wait(callbacks->arg); + callbacks->prepare_wait(callbacks->arg); + managed = erts_atomic32_read_acqb(&intrnl->misc.data.managed_count); + } while (managed != intrnl->managed.no); + } + else { + int id; + /* All managed threads have registered; lets go... */ + for (id = 0; id < managed; id++) + if (id != tpd->id) + wakeup_managed(id); + } + callbacks->finalize_wait(callbacks->arg); } static ERTS_INLINE int leader_update(ErtsThrPrgrData *tpd) { - if (tpd->leader) { +#ifdef ERTS_ENABLE_LOCK_CHECK + erts_lc_check_exact(NULL, 0); +#endif + if (!tpd->leader) { + /* Probably need to block... */ + block_thread(tpd); + } + else { erts_aint32_t lflgs; ErtsThrPrgrVal next; int ix, sz, make_progress; @@ -570,16 +710,24 @@ leader_update(ErtsThrPrgrData *tpd) handle_wakeup_requests(current); } - if (!tpd->active) { + if (tpd->active) { + lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); + if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) + (void) block_thread(tpd); + } + else { tpd->leader = 0; tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING; #if ERTS_THR_PRGR_PRINT_LEADER erts_fprintf(stderr, "L <- %d\n", tpd->id); #endif + ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 0); - lflgs = erts_atomic32_read_bor_relb(&intrnl->vola.tile.lflgs, + lflgs = erts_atomic32_read_bor_relb(&intrnl->misc.data.lflgs, ERTS_THR_PRGR_LFLG_NO_LEADER); + if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) + lflgs = block_thread(tpd); if (ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0 && got_sched_wakeups()) wakeup_managed(0); } @@ -591,10 +739,14 @@ leader_update(ErtsThrPrgrData *tpd) static int update(ErtsThrPrgrData *tpd) { + int res; ErtsThrPrgrVal val; - if (!tpd->leader) { + if (tpd->leader) + res = 1; + else { erts_aint32_t lflgs; + res = 0; val = read_acqb(&erts_thr_prgr__.current); if (tpd->previous.local == val) { val++; @@ -604,13 +756,16 @@ update(ErtsThrPrgrData *tpd) set_mb(&intrnl->thr[tpd->id].data.current, val); } - lflgs = erts_atomic32_read_nob(&intrnl->vola.tile.lflgs); + lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); + if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) + res = 1; /* Need to block in leader_update() */ + if ((lflgs & ERTS_THR_PRGR_LFLG_NO_LEADER) && (tpd->active || ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0)) { /* Try to take over leadership... */ erts_aint32_t olflgs; olflgs = erts_atomic32_read_band_acqb( - &intrnl->vola.tile.lflgs, + &intrnl->misc.data.lflgs, ~ERTS_THR_PRGR_LFLG_NO_LEADER); if (olflgs & ERTS_THR_PRGR_LFLG_NO_LEADER) { tpd->leader = 1; @@ -620,8 +775,9 @@ update(ErtsThrPrgrData *tpd) ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 1); } } + res |= tpd->leader; } - return tpd->leader; + return res; } int @@ -643,10 +799,16 @@ erts_thr_progress_prepare_wait(ErtsSchedulerData *esdp) erts_aint32_t lflgs; ErtsThrPrgrData *tpd = thr_prgr_data(esdp); +#ifdef ERTS_ENABLE_LOCK_CHECK + erts_lc_check_exact(NULL, 0); +#endif + + block_count_dec(); + tpd->previous.local = ERTS_THR_PRGR_VAL_WAITING; set_mb(&intrnl->thr[tpd->id].data.current, ERTS_THR_PRGR_VAL_WAITING); - lflgs = erts_atomic32_read_nob(&intrnl->vola.tile.lflgs); + lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); if (ERTS_THR_PRGR_LFLGS_ALL_WAITING(lflgs) && got_sched_wakeups()) wakeup_managed(0); /* Someone need to make progress */ } @@ -657,6 +819,10 @@ erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp) ErtsThrPrgrData *tpd = thr_prgr_data(esdp); ErtsThrPrgrVal current, val; +#ifdef ERTS_ENABLE_LOCK_CHECK + erts_lc_check_exact(NULL, 0); +#endif + /* * We aren't allowed to continue until our thread * progress is past global current. @@ -673,6 +839,8 @@ erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp) break; current = val; } + if (block_count_inc()) + block_thread(tpd); if (update(tpd)) leader_update(tpd); } @@ -682,25 +850,28 @@ erts_thr_progress_active(ErtsSchedulerData *esdp, int on) { ErtsThrPrgrData *tpd = thr_prgr_data(esdp); +#ifdef ERTS_ENABLE_LOCK_CHECK + erts_lc_check_exact(NULL, 0); +#endif + ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(tpd->id, on); if (on) { ASSERT(!tpd->active); tpd->active = 1; - erts_atomic32_inc_nob(&intrnl->vola.tile.lflgs); + erts_atomic32_inc_nob(&intrnl->misc.data.lflgs); } else { ASSERT(tpd->active); tpd->active = 0; - erts_atomic32_dec_nob(&intrnl->vola.tile.lflgs); - + erts_atomic32_dec_nob(&intrnl->misc.data.lflgs); if (update(tpd)) leader_update(tpd); } #ifdef DEBUG { - erts_aint32_t n = erts_atomic32_read_nob(&intrnl->vola.tile.lflgs); + erts_aint32_t n = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); n &= ERTS_THR_PRGR_LFLG_ACTIVE_MASK; ASSERT(tpd->active <= n && n <= intrnl->managed.no); } @@ -868,7 +1039,7 @@ erts_thr_progress_wakeup(ErtsSchedulerData *esdp, ErtsThrPrgrVal value) { ErtsThrPrgrData *tpd = thr_prgr_data(esdp); - ASSERT(tpd); + ASSERT(!tpd->is_temporary); if (tpd->is_managed) request_wakeup_managed(tpd, value); else @@ -957,6 +1128,198 @@ got_sched_wakeups(void) return 0; } +static erts_aint32_t +block_thread(ErtsThrPrgrData *tpd) +{ + erts_aint32_t lflgs; + ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[tpd->id]; + + do { + block_count_dec(); + + while (1) { + cbp->prepare_wait(cbp->arg); + lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); + if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) + cbp->wait(cbp->arg); + else + break; + } + + } while (block_count_inc()); + + cbp->finalize_wait(cbp->arg); + + return lflgs; +} + +static erts_aint32_t +thr_progress_block(ErtsThrPrgrData *tpd, int wait) +{ + erts_tse_t *event = NULL; /* Remove erroneous warning... sigh... */ + erts_aint32_t lflgs, bc; + + if (tpd->is_blocking++) + return (erts_aint32_t) 0; + + while (1) { + lflgs = erts_atomic32_read_bor_nob(&intrnl->misc.data.lflgs, + ERTS_THR_PRGR_LFLG_BLOCK); + if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) + block_thread(tpd); + else + break; + } + +#if ERTS_THR_PRGR_PRINT_BLOCKERS + erts_fprintf(stderr, "block(%d)\n", tpd->id); +#endif + + ASSERT(ERTS_AINT_NULL + == erts_atomic_read_nob(&intrnl->misc.data.blocker_event)); + + if (wait) { + event = erts_tse_fetch(); + erts_tse_reset(event); + erts_atomic_set_nob(&intrnl->misc.data.blocker_event, + (erts_aint_t) event); + } + if (tpd->is_managed) + erts_atomic32_dec_nob(&intrnl->misc.data.block_count); + bc = erts_atomic32_read_band_mb(&intrnl->misc.data.block_count, + ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING); + bc &= ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING; + if (wait) { + while (bc != 0) { + erts_tse_wait(event); + erts_tse_reset(event); + bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count); + } + } + return bc; + +} + +void +erts_thr_progress_block(void) +{ + thr_progress_block(tmp_thr_prgr_data(NULL), 1); +} + +void +erts_thr_progress_fatal_error_block(SWord timeout) +{ + ErtsThrPrgrData tpd_buf; + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL); + erts_aint32_t bc; + SWord time_left = timeout; + SysTimeval to; + + /* + * Counting poll intervals may give us a too long timeout + * if cpu is busy. If we got tolerant time of day we use it + * to prevent this. + */ + if (!erts_disable_tolerant_timeofday) { + erts_get_timeval(&to); + to.tv_sec += timeout / 1000; + to.tv_sec += timeout % 1000; + } + + if (!tpd) { + /* + * We stack allocate since failure to allocate memory may + * have caused the problem in the first place. This is ok + * since we never complete an unblock after a fatal error + * block. + */ + tpd = &tpd_buf; + init_tmp_thr_prgr_data(tpd); + } + + bc = thr_progress_block(tpd, 0); + if (bc == 0) + return; /* Succefully blocked all managed threads */ + + while (1) { + if (erts_milli_sleep(ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL) == 0) + time_left -= ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL; + bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count); + if (bc == 0) + break; /* Succefully blocked all managed threads */ + if (time_left <= 0) + break; /* Timeout */ + if (!erts_disable_tolerant_timeofday) { + SysTimeval now; + erts_get_timeval(&now); + if (now.tv_sec > to.tv_sec) + break; /* Timeout */ + if (now.tv_sec == to.tv_sec && now.tv_usec >= to.tv_usec) + break; /* Timeout */ + } + } +} + +void +erts_thr_progress_unblock(void) +{ + erts_tse_t *event; + int id, break_id, sz, wakeup; + ErtsThrPrgrData *tpd = thr_prgr_data(NULL); + + ASSERT(tpd->is_blocking); + if (--tpd->is_blocking) + return; + + sz = intrnl->managed.no; + + wakeup = 1; + if (!tpd->is_managed) + id = break_id = tpd->id < 0 ? 0 : tpd->id % sz; + else { + break_id = tpd->id; + id = break_id + 1; + if (id >= sz) + id = 0; + if (id == break_id) + wakeup = 0; + erts_atomic32_inc_nob(&intrnl->misc.data.block_count); + } + + event = ((erts_tse_t *) + erts_atomic_read_nob(&intrnl->misc.data.blocker_event)); + ASSERT(event); + erts_atomic_set_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL); + + erts_atomic32_read_bor_relb(&intrnl->misc.data.block_count, + ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING); +#if ERTS_THR_PRGR_PRINT_BLOCKERS + erts_fprintf(stderr, "unblock(%d)\n", tpd->id); +#endif + erts_atomic32_read_band_mb(&intrnl->misc.data.lflgs, + ~ERTS_THR_PRGR_LFLG_BLOCK); + + if (wakeup) { + do { + ErtsThrPrgrVal tmp; + tmp = read_nob(&intrnl->thr[id].data.current); + if (tmp != ERTS_THR_PRGR_VAL_WAITING) + wakeup_managed(id); + if (++id >= sz) + id = 0; + } while (id != break_id); + } + + return_tmp_thr_prgr_data(tpd); + erts_tse_return(event); +} + +int +erts_thr_progress_is_blocking(void) +{ + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL); + return tpd && tpd->is_blocking; +} void erts_thr_progress_dbg_print_state(void) { |