diff options
author | Rickard Green <[email protected]> | 2011-11-13 21:42:11 +0100 |
---|---|---|
committer | Rickard Green <[email protected]> | 2011-11-13 21:42:11 +0100 |
commit | 73ee2e00fe0389d0362e89a74d1909510da9e0fd (patch) | |
tree | b75c2a8b4b77885c6864ffd4f2d4825cb781c103 /erts/emulator/beam/erl_thr_progress.c | |
parent | c12befbdc957f7f166598c6d5143ce27a0d10fa8 (diff) | |
parent | bc5818cfdd56e19a16357f4443d80a56426aa134 (diff) | |
download | otp-73ee2e00fe0389d0362e89a74d1909510da9e0fd.tar.gz otp-73ee2e00fe0389d0362e89a74d1909510da9e0fd.tar.bz2 otp-73ee2e00fe0389d0362e89a74d1909510da9e0fd.zip |
Merge branch 'rickard/thr-progress-block/OTP-9631'
* rickard/thr-progress-block/OTP-9631:
Replace system block with thread progress block
Diffstat (limited to 'erts/emulator/beam/erl_thr_progress.c')
-rw-r--r-- | erts/emulator/beam/erl_thr_progress.c | 481 |
1 files changed, 422 insertions, 59 deletions
diff --git a/erts/emulator/beam/erl_thr_progress.c b/erts/emulator/beam/erl_thr_progress.c index f96ae4b70d..9324bcde51 100644 --- a/erts/emulator/beam/erl_thr_progress.c +++ b/erts/emulator/beam/erl_thr_progress.c @@ -76,6 +76,7 @@ #include <stddef.h> /* offsetof() */ #include "erl_thr_progress.h" +#include "global.h" #ifdef ERTS_SMP @@ -88,9 +89,14 @@ #define ERTS_THR_PRGR_PRINT_LEADER 0 #define ERTS_THR_PRGR_PRINT_VAL 0 +#define ERTS_THR_PRGR_PRINT_BLOCKERS 0 -#define ERTS_THR_PRGR_LFLG_NO_LEADER (((erts_aint32_t) 1) << 31) -#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~ERTS_THR_PRGR_LFLG_NO_LEADER) +#define ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL 100 + +#define ERTS_THR_PRGR_LFLG_BLOCK (((erts_aint32_t) 1) << 31) +#define ERTS_THR_PRGR_LFLG_NO_LEADER (((erts_aint32_t) 1) << 30) +#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~(ERTS_THR_PRGR_LFLG_NO_LEADER \ + | ERTS_THR_PRGR_LFLG_BLOCK)) #define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS) \ ((LFLGS) & ERTS_THR_PRGR_LFLG_ACTIVE_MASK) @@ -225,6 +231,11 @@ do { \ #endif /* ERTS_THR_PROGRESS_STATE_DEBUG */ +#define ERTS_THR_PRGR_BLCKR_INVALID (~((erts_aint32_t) 0)) +#define ERTS_THR_PRGR_BLCKR_UNMANAGED (((erts_aint32_t) 1) << 31) + +#define ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING (((erts_aint32_t) 1) << 31) + #define ERTS_THR_PRGR_BM_BITS 32 #define ERTS_THR_PRGR_BM_SHIFT 5 #define ERTS_THR_PRGR_BM_MASK 0x1f @@ -249,11 +260,13 @@ typedef struct { typedef struct { erts_atomic32_t lflgs; - + erts_atomic32_t block_count; + erts_atomic_t blocker_event; erts_atomic32_t pref_wakeup_used; + erts_atomic32_t managed_count; erts_atomic32_t managed_id; erts_atomic32_t unmanaged_id; -} ErtsThrPrgrMiscVolatile; +} ErtsThrPrgrMiscData; typedef struct { ERTS_THR_PRGR_ATOMIC current; @@ -269,19 +282,19 @@ typedef union { typedef struct { union { - ErtsThrPrgrMiscVolatile tile; + ErtsThrPrgrMiscData data; char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE( - sizeof(ErtsThrPrgrMiscVolatile))]; - } vola; + sizeof(ErtsThrPrgrMiscData))]; + } misc; ErtsThrPrgrArray *thr; struct { int no; - ErtsThrPrgrWakeupCallback *callback; + ErtsThrPrgrCallbacks *callbacks; ErtsThrPrgrManagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE]; } managed; struct { int no; - ErtsThrPrgrWakeupCallback *callback; + ErtsThrPrgrCallbacks *callbacks; ErtsThrPrgrUnmanagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE]; } unmanaged; } ErtsThrPrgrInternalData; @@ -294,36 +307,106 @@ erts_tsd_key_t erts_thr_prgr_data_key__; static void handle_wakeup_requests(ErtsThrPrgrVal current); static int got_sched_wakeups(void); +static erts_aint32_t block_thread(ErtsThrPrgrData *tpd); static ERTS_INLINE void wakeup_managed(int id) { - ErtsThrPrgrWakeupCallback *wdp = &intrnl->managed.callback[id]; + ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[id]; ASSERT(0 <= id && id < intrnl->managed.no); - wdp->wakeup(wdp->arg); + cbp->wakeup(cbp->arg); } static ERTS_INLINE void wakeup_unmanaged(int id) { - ErtsThrPrgrWakeupCallback *wdp = &intrnl->unmanaged.callback[id]; + ErtsThrPrgrCallbacks *cbp = &intrnl->unmanaged.callbacks[id]; ASSERT(0 <= id && id < intrnl->unmanaged.no); - wdp->wakeup(wdp->arg); + cbp->wakeup(cbp->arg); } static ERTS_INLINE ErtsThrPrgrData * -thr_prgr_data(ErtsSchedulerData *esdp) +perhaps_thr_prgr_data(ErtsSchedulerData *esdp) { - ErtsThrPrgrData *tpd; if (esdp) - tpd = &esdp->thr_progress_data; + return &esdp->thr_progress_data; else - tpd = erts_tsd_get(erts_thr_prgr_data_key__); + return erts_tsd_get(erts_thr_prgr_data_key__); +} + +static ERTS_INLINE ErtsThrPrgrData * +thr_prgr_data(ErtsSchedulerData *esdp) +{ + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp); ASSERT(tpd); return tpd; } +static void +init_tmp_thr_prgr_data(ErtsThrPrgrData *tpd) +{ + tpd->id = -1; + tpd->is_managed = 0; + tpd->is_blocking = 0; + tpd->is_temporary = 1; + + erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd); +} + +static ERTS_INLINE ErtsThrPrgrData * +tmp_thr_prgr_data(ErtsSchedulerData *esdp) +{ + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp); + + if (!tpd) { + /* + * We only allocate the part up to the wakeup_request field + * which is the first field only used by registered threads + */ + tpd = erts_alloc(ERTS_ALC_T_T_THR_PRGR_DATA, + offsetof(ErtsThrPrgrData, wakeup_request)); + init_tmp_thr_prgr_data(tpd); + } + + return tpd; +} + +static ERTS_INLINE void +return_tmp_thr_prgr_data(ErtsThrPrgrData *tpd) +{ + if (tpd->is_temporary) { + erts_tsd_set(erts_thr_prgr_data_key__, NULL); + erts_free(ERTS_ALC_T_T_THR_PRGR_DATA, tpd); + } +} + +static ERTS_INLINE int +block_count_dec(void) +{ + erts_aint32_t block_count; + block_count = erts_atomic32_dec_read_mb(&intrnl->misc.data.block_count); + if (block_count == 0) { + erts_tse_t *event; + event = ((erts_tse_t*) + erts_atomic_read_nob(&intrnl->misc.data.blocker_event)); + if (event) + erts_tse_set(event); + return 1; + } + + return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0; +} + +static ERTS_INLINE int +block_count_inc(void) +{ + erts_aint32_t block_count; + block_count = erts_atomic32_inc_read_mb(&intrnl->misc.data.block_count); + return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0; +} + + void erts_thr_progress_pre_init(void) { @@ -343,7 +426,7 @@ erts_thr_progress_init(int no_schedulers, int managed, int unmanaged) intrnl_sz = sizeof(ErtsThrPrgrInternalData); intrnl_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(intrnl_sz); - cb_sz = sizeof(ErtsThrPrgrWakeupCallback)*(managed+unmanaged); + cb_sz = sizeof(ErtsThrPrgrCallbacks)*(managed+unmanaged); cb_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(cb_sz); thr_arr_sz = sizeof(ErtsThrPrgrArray)*managed; @@ -372,31 +455,36 @@ erts_thr_progress_init(int no_schedulers, int managed, int unmanaged) intrnl = (ErtsThrPrgrInternalData *) ptr; ptr += intrnl_sz; - erts_atomic32_init_nob(&intrnl->vola.tile.lflgs, + erts_atomic32_init_nob(&intrnl->misc.data.lflgs, ERTS_THR_PRGR_LFLG_NO_LEADER); - erts_atomic32_init_nob(&intrnl->vola.tile.pref_wakeup_used, 0); - erts_atomic32_init_nob(&intrnl->vola.tile.managed_id, no_schedulers); - erts_atomic32_init_nob(&intrnl->vola.tile.unmanaged_id, -1); + erts_atomic32_init_nob(&intrnl->misc.data.block_count, + (ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING + | (erts_aint32_t) managed)); + erts_atomic_init_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL); + erts_atomic32_init_nob(&intrnl->misc.data.pref_wakeup_used, 0); + erts_atomic32_init_nob(&intrnl->misc.data.managed_count, 0); + erts_atomic32_init_nob(&intrnl->misc.data.managed_id, no_schedulers); + erts_atomic32_init_nob(&intrnl->misc.data.unmanaged_id, -1); intrnl->thr = (ErtsThrPrgrArray *) ptr; ptr += thr_arr_sz; for (i = 0; i < managed; i++) init_nob(&intrnl->thr[i].data.current, 0); - intrnl->managed.callback = (ErtsThrPrgrWakeupCallback *) ptr; - intrnl->unmanaged.callback = &intrnl->managed.callback[managed]; + intrnl->managed.callbacks = (ErtsThrPrgrCallbacks *) ptr; + intrnl->unmanaged.callbacks = &intrnl->managed.callbacks[managed]; ptr += cb_sz; intrnl->managed.no = managed; for (i = 0; i < managed; i++) { - intrnl->managed.callback[i].arg = NULL; - intrnl->managed.callback[i].wakeup = NULL; + intrnl->managed.callbacks[i].arg = NULL; + intrnl->managed.callbacks[i].wakeup = NULL; } intrnl->unmanaged.no = unmanaged; for (i = 0; i < unmanaged; i++) { - intrnl->unmanaged.callback[i].arg = NULL; - intrnl->unmanaged.callback[i].wakeup = NULL; + intrnl->unmanaged.callbacks[i].arg = NULL; + intrnl->unmanaged.callbacks[i].wakeup = NULL; } for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) { @@ -439,21 +527,30 @@ init_wakeup_request_array(ErtsThrPrgrVal *w) } void -erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrWakeupCallback *callback) +erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrCallbacks *callbacks) { - ErtsThrPrgrData *tpd; - if (erts_tsd_get(erts_thr_prgr_data_key__)) - erl_exit(ERTS_ABORT_EXIT, - "%s:%d:%s(): Double register of thread\n", - __FILE__, __LINE__, __func__); + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL); + int is_blocking = 0; + + if (tpd) { + if (!tpd->is_temporary) + erl_exit(ERTS_ABORT_EXIT, + "%s:%d:%s(): Double register of thread\n", + __FILE__, __LINE__, __func__); + is_blocking = tpd->is_blocking; + return_tmp_thr_prgr_data(tpd); + } + /* * We only allocate the part up to the leader field * which is the first field only used by managed threads */ tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA, offsetof(ErtsThrPrgrData, leader)); - tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->vola.tile.unmanaged_id); + tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->misc.data.unmanaged_id); tpd->is_managed = 0; + tpd->is_blocking = is_blocking; + tpd->is_temporary = 0; ASSERT(tpd->id >= 0); if (tpd->id >= intrnl->unmanaged.no) erl_exit(ERTS_ABORT_EXIT, @@ -463,32 +560,41 @@ erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrWakeupCallback *callback) init_wakeup_request_array(&tpd->wakeup_request[0]); erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd); - intrnl->unmanaged.callback[tpd->id] = *callback; + ASSERT(callbacks->wakeup); + + intrnl->unmanaged.callbacks[tpd->id] = *callbacks; } void erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp, - ErtsThrPrgrWakeupCallback *callback, + ErtsThrPrgrCallbacks *callbacks, int pref_wakeup) { - ErtsThrPrgrData *tpd; - if (erts_tsd_get(erts_thr_prgr_data_key__)) - erl_exit(ERTS_ABORT_EXIT, - "%s:%d:%s(): Double register of thread\n", - __FILE__, __LINE__, __func__); + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL); + int is_blocking = 0, managed; + + if (tpd) { + if (!tpd->is_temporary) + erl_exit(ERTS_ABORT_EXIT, + "%s:%d:%s(): Double register of thread\n", + __FILE__, __LINE__, __func__); + is_blocking = tpd->is_blocking; + return_tmp_thr_prgr_data(tpd); + } + if (esdp) tpd = &esdp->thr_progress_data; else tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA, sizeof(ErtsThrPrgrData)); if (pref_wakeup - && !erts_atomic32_xchg_nob(&intrnl->vola.tile.pref_wakeup_used, 1)) + && !erts_atomic32_xchg_nob(&intrnl->misc.data.pref_wakeup_used, 1)) tpd->id = 0; else if (esdp) tpd->id = (int) esdp->no; else - tpd->id = erts_atomic32_inc_read_nob(&intrnl->vola.tile.managed_id); + tpd->id = erts_atomic32_inc_read_nob(&intrnl->misc.data.managed_id); ASSERT(tpd->id >= 0); if (tpd->id >= intrnl->managed.no) erl_exit(ERTS_ABORT_EXIT, @@ -496,6 +602,8 @@ erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp, __FILE__, __LINE__, __func__); tpd->is_managed = 1; + tpd->is_blocking = is_blocking; + tpd->is_temporary = 0; init_wakeup_request_array(&tpd->wakeup_request[0]); @@ -507,14 +615,46 @@ erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp, tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING; erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd); - erts_atomic32_inc_nob(&intrnl->vola.tile.lflgs); - intrnl->managed.callback[tpd->id] = *callback; + erts_atomic32_inc_nob(&intrnl->misc.data.lflgs); + + ASSERT(callbacks->wakeup); + ASSERT(callbacks->prepare_wait); + ASSERT(callbacks->wait); + ASSERT(callbacks->finalize_wait); + + intrnl->managed.callbacks[tpd->id] = *callbacks; + + callbacks->prepare_wait(callbacks->arg); + managed = erts_atomic32_inc_read_relb(&intrnl->misc.data.managed_count); + if (managed != intrnl->managed.no) { + /* Wait until all managed threads have registered... */ + do { + callbacks->wait(callbacks->arg); + callbacks->prepare_wait(callbacks->arg); + managed = erts_atomic32_read_acqb(&intrnl->misc.data.managed_count); + } while (managed != intrnl->managed.no); + } + else { + int id; + /* All managed threads have registered; lets go... */ + for (id = 0; id < managed; id++) + if (id != tpd->id) + wakeup_managed(id); + } + callbacks->finalize_wait(callbacks->arg); } static ERTS_INLINE int leader_update(ErtsThrPrgrData *tpd) { - if (tpd->leader) { +#ifdef ERTS_ENABLE_LOCK_CHECK + erts_lc_check_exact(NULL, 0); +#endif + if (!tpd->leader) { + /* Probably need to block... */ + block_thread(tpd); + } + else { erts_aint32_t lflgs; ErtsThrPrgrVal next; int ix, sz, make_progress; @@ -570,16 +710,24 @@ leader_update(ErtsThrPrgrData *tpd) handle_wakeup_requests(current); } - if (!tpd->active) { + if (tpd->active) { + lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); + if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) + (void) block_thread(tpd); + } + else { tpd->leader = 0; tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING; #if ERTS_THR_PRGR_PRINT_LEADER erts_fprintf(stderr, "L <- %d\n", tpd->id); #endif + ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 0); - lflgs = erts_atomic32_read_bor_relb(&intrnl->vola.tile.lflgs, + lflgs = erts_atomic32_read_bor_relb(&intrnl->misc.data.lflgs, ERTS_THR_PRGR_LFLG_NO_LEADER); + if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) + lflgs = block_thread(tpd); if (ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0 && got_sched_wakeups()) wakeup_managed(0); } @@ -591,10 +739,14 @@ leader_update(ErtsThrPrgrData *tpd) static int update(ErtsThrPrgrData *tpd) { + int res; ErtsThrPrgrVal val; - if (!tpd->leader) { + if (tpd->leader) + res = 1; + else { erts_aint32_t lflgs; + res = 0; val = read_acqb(&erts_thr_prgr__.current); if (tpd->previous.local == val) { val++; @@ -604,13 +756,16 @@ update(ErtsThrPrgrData *tpd) set_mb(&intrnl->thr[tpd->id].data.current, val); } - lflgs = erts_atomic32_read_nob(&intrnl->vola.tile.lflgs); + lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); + if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) + res = 1; /* Need to block in leader_update() */ + if ((lflgs & ERTS_THR_PRGR_LFLG_NO_LEADER) && (tpd->active || ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0)) { /* Try to take over leadership... */ erts_aint32_t olflgs; olflgs = erts_atomic32_read_band_acqb( - &intrnl->vola.tile.lflgs, + &intrnl->misc.data.lflgs, ~ERTS_THR_PRGR_LFLG_NO_LEADER); if (olflgs & ERTS_THR_PRGR_LFLG_NO_LEADER) { tpd->leader = 1; @@ -620,8 +775,9 @@ update(ErtsThrPrgrData *tpd) ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 1); } } + res |= tpd->leader; } - return tpd->leader; + return res; } int @@ -643,10 +799,16 @@ erts_thr_progress_prepare_wait(ErtsSchedulerData *esdp) erts_aint32_t lflgs; ErtsThrPrgrData *tpd = thr_prgr_data(esdp); +#ifdef ERTS_ENABLE_LOCK_CHECK + erts_lc_check_exact(NULL, 0); +#endif + + block_count_dec(); + tpd->previous.local = ERTS_THR_PRGR_VAL_WAITING; set_mb(&intrnl->thr[tpd->id].data.current, ERTS_THR_PRGR_VAL_WAITING); - lflgs = erts_atomic32_read_nob(&intrnl->vola.tile.lflgs); + lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); if (ERTS_THR_PRGR_LFLGS_ALL_WAITING(lflgs) && got_sched_wakeups()) wakeup_managed(0); /* Someone need to make progress */ } @@ -657,6 +819,10 @@ erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp) ErtsThrPrgrData *tpd = thr_prgr_data(esdp); ErtsThrPrgrVal current, val; +#ifdef ERTS_ENABLE_LOCK_CHECK + erts_lc_check_exact(NULL, 0); +#endif + /* * We aren't allowed to continue until our thread * progress is past global current. @@ -673,6 +839,8 @@ erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp) break; current = val; } + if (block_count_inc()) + block_thread(tpd); if (update(tpd)) leader_update(tpd); } @@ -682,25 +850,28 @@ erts_thr_progress_active(ErtsSchedulerData *esdp, int on) { ErtsThrPrgrData *tpd = thr_prgr_data(esdp); +#ifdef ERTS_ENABLE_LOCK_CHECK + erts_lc_check_exact(NULL, 0); +#endif + ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(tpd->id, on); if (on) { ASSERT(!tpd->active); tpd->active = 1; - erts_atomic32_inc_nob(&intrnl->vola.tile.lflgs); + erts_atomic32_inc_nob(&intrnl->misc.data.lflgs); } else { ASSERT(tpd->active); tpd->active = 0; - erts_atomic32_dec_nob(&intrnl->vola.tile.lflgs); - + erts_atomic32_dec_nob(&intrnl->misc.data.lflgs); if (update(tpd)) leader_update(tpd); } #ifdef DEBUG { - erts_aint32_t n = erts_atomic32_read_nob(&intrnl->vola.tile.lflgs); + erts_aint32_t n = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); n &= ERTS_THR_PRGR_LFLG_ACTIVE_MASK; ASSERT(tpd->active <= n && n <= intrnl->managed.no); } @@ -868,7 +1039,7 @@ erts_thr_progress_wakeup(ErtsSchedulerData *esdp, ErtsThrPrgrVal value) { ErtsThrPrgrData *tpd = thr_prgr_data(esdp); - ASSERT(tpd); + ASSERT(!tpd->is_temporary); if (tpd->is_managed) request_wakeup_managed(tpd, value); else @@ -957,6 +1128,198 @@ got_sched_wakeups(void) return 0; } +static erts_aint32_t +block_thread(ErtsThrPrgrData *tpd) +{ + erts_aint32_t lflgs; + ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[tpd->id]; + + do { + block_count_dec(); + + while (1) { + cbp->prepare_wait(cbp->arg); + lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); + if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) + cbp->wait(cbp->arg); + else + break; + } + + } while (block_count_inc()); + + cbp->finalize_wait(cbp->arg); + + return lflgs; +} + +static erts_aint32_t +thr_progress_block(ErtsThrPrgrData *tpd, int wait) +{ + erts_tse_t *event = NULL; /* Remove erroneous warning... sigh... */ + erts_aint32_t lflgs, bc; + + if (tpd->is_blocking++) + return (erts_aint32_t) 0; + + while (1) { + lflgs = erts_atomic32_read_bor_nob(&intrnl->misc.data.lflgs, + ERTS_THR_PRGR_LFLG_BLOCK); + if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) + block_thread(tpd); + else + break; + } + +#if ERTS_THR_PRGR_PRINT_BLOCKERS + erts_fprintf(stderr, "block(%d)\n", tpd->id); +#endif + + ASSERT(ERTS_AINT_NULL + == erts_atomic_read_nob(&intrnl->misc.data.blocker_event)); + + if (wait) { + event = erts_tse_fetch(); + erts_tse_reset(event); + erts_atomic_set_nob(&intrnl->misc.data.blocker_event, + (erts_aint_t) event); + } + if (tpd->is_managed) + erts_atomic32_dec_nob(&intrnl->misc.data.block_count); + bc = erts_atomic32_read_band_mb(&intrnl->misc.data.block_count, + ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING); + bc &= ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING; + if (wait) { + while (bc != 0) { + erts_tse_wait(event); + erts_tse_reset(event); + bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count); + } + } + return bc; + +} + +void +erts_thr_progress_block(void) +{ + thr_progress_block(tmp_thr_prgr_data(NULL), 1); +} + +void +erts_thr_progress_fatal_error_block(SWord timeout) +{ + ErtsThrPrgrData tpd_buf; + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL); + erts_aint32_t bc; + SWord time_left = timeout; + SysTimeval to; + + /* + * Counting poll intervals may give us a too long timeout + * if cpu is busy. If we got tolerant time of day we use it + * to prevent this. + */ + if (!erts_disable_tolerant_timeofday) { + erts_get_timeval(&to); + to.tv_sec += timeout / 1000; + to.tv_sec += timeout % 1000; + } + + if (!tpd) { + /* + * We stack allocate since failure to allocate memory may + * have caused the problem in the first place. This is ok + * since we never complete an unblock after a fatal error + * block. + */ + tpd = &tpd_buf; + init_tmp_thr_prgr_data(tpd); + } + + bc = thr_progress_block(tpd, 0); + if (bc == 0) + return; /* Succefully blocked all managed threads */ + + while (1) { + if (erts_milli_sleep(ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL) == 0) + time_left -= ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL; + bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count); + if (bc == 0) + break; /* Succefully blocked all managed threads */ + if (time_left <= 0) + break; /* Timeout */ + if (!erts_disable_tolerant_timeofday) { + SysTimeval now; + erts_get_timeval(&now); + if (now.tv_sec > to.tv_sec) + break; /* Timeout */ + if (now.tv_sec == to.tv_sec && now.tv_usec >= to.tv_usec) + break; /* Timeout */ + } + } +} + +void +erts_thr_progress_unblock(void) +{ + erts_tse_t *event; + int id, break_id, sz, wakeup; + ErtsThrPrgrData *tpd = thr_prgr_data(NULL); + + ASSERT(tpd->is_blocking); + if (--tpd->is_blocking) + return; + + sz = intrnl->managed.no; + + wakeup = 1; + if (!tpd->is_managed) + id = break_id = tpd->id < 0 ? 0 : tpd->id % sz; + else { + break_id = tpd->id; + id = break_id + 1; + if (id >= sz) + id = 0; + if (id == break_id) + wakeup = 0; + erts_atomic32_inc_nob(&intrnl->misc.data.block_count); + } + + event = ((erts_tse_t *) + erts_atomic_read_nob(&intrnl->misc.data.blocker_event)); + ASSERT(event); + erts_atomic_set_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL); + + erts_atomic32_read_bor_relb(&intrnl->misc.data.block_count, + ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING); +#if ERTS_THR_PRGR_PRINT_BLOCKERS + erts_fprintf(stderr, "unblock(%d)\n", tpd->id); +#endif + erts_atomic32_read_band_mb(&intrnl->misc.data.lflgs, + ~ERTS_THR_PRGR_LFLG_BLOCK); + + if (wakeup) { + do { + ErtsThrPrgrVal tmp; + tmp = read_nob(&intrnl->thr[id].data.current); + if (tmp != ERTS_THR_PRGR_VAL_WAITING) + wakeup_managed(id); + if (++id >= sz) + id = 0; + } while (id != break_id); + } + + return_tmp_thr_prgr_data(tpd); + erts_tse_return(event); +} + +int +erts_thr_progress_is_blocking(void) +{ + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL); + return tpd && tpd->is_blocking; +} void erts_thr_progress_dbg_print_state(void) { |