/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2011-2012. All Rights Reserved.
 *
 * The contents of this file are subject to the Erlang Public License,
 * Version 1.1, (the "License"); you may not use this file except in
 * compliance with the License. You should have received a copy of the
 * Erlang Public License along with this software. If not, it can be
 * retrieved online at http://www.erlang.org/.
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Description: Thread progress information. Used by lock free algorithms
 *              to determine when all involved threads are guaranteed to
 *              have passed a specific point of execution.
 *
 *              Usage instructions below.
 *
 * Author: Rickard Green
 */

/*
 * ------ Usage instructions -----------------------------------------------
 *
 * This module keeps track of the progress of a set of managed threads. Only
 * threads that behave well can be allowed to be managed. A managed thread
 * should update its thread progress frequently. Currently only scheduler
 * threads, the system-message-dispatcher threads, and the aux-thread are
 * managed threads. We typically do not want async threads as managed
 * threads, since they cannot guarantee frequent thread progress updates:
 * they execute user implemented driver code that is assumed to be time
 * consuming.
 *
 * erts_thr_progress_current() returns the global current thread progress
 * value of managed threads, i.e., the latest progress value that all
 * managed threads have reached. Thread progress values are opaque.
 *
 * erts_thr_progress_has_reached(VAL) returns a value != 0 if the current
 * global thread progress has reached or passed VAL.
 *
 * erts_thr_progress_later() returns a thread progress value in the future
 * which no managed thread has yet reached.
 *
 * All threads issue a full memory barrier when reaching a new thread
 * progress value. They only reach new thread progress values in specific
 * controlled states when calling erts_thr_progress_update(). Schedulers
 * call erts_thr_progress_update() in between execution of processes,
 * when going to sleep and when waking up.
 *
 * Sleeping managed threads are considered to have reached the next thread
 * progress value immediately. They are not woken, and therefore do not
 * issue any memory barriers when reaching a new thread progress value.
 * A sleeping thread does, however, issue a memory barrier immediately
 * upon wakeup.
 *
 * Both managed and registered unmanaged threads may request a wakeup,
 * using erts_thr_progress_wakeup(), when the global thread progress
 * reaches a certain value.
 *
 * Note that thread progress values are opaque, and that you are only
 * allowed to use thread progress values retrieved from this API!
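 *
 * As an illustrative sketch only (the names blk, free_when and
 * my_defer_free() are made up; this module does not mandate any such
 * layout), deferred deallocation of a block that has just been unlinked
 * from a lock free structure could look like:
 *
 *     blk->free_when = erts_thr_progress_later();
 *     my_defer_free(blk);          -- publish blk on a deferred free list
 *
 * and later, when the deferred free list is inspected:
 *
 *     if (erts_thr_progress_has_reached(blk->free_when))
 *         -- free blk; all managed threads have passed the unlink
 *     else
 *         erts_thr_progress_wakeup(esdp, blk->free_when);
 *         -- we will be woken some time after free_when has been reached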
* * ------------------------------------------------------------------------- */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include <stddef.h> /* offsetof() */ #include "erl_thr_progress.h" #include "global.h" #ifdef ERTS_SMP #define ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE 0 #ifdef DEBUG #undef ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE #define ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE 1 #endif #define ERTS_THR_PRGR_PRINT_LEADER 0 #define ERTS_THR_PRGR_PRINT_VAL 0 #define ERTS_THR_PRGR_PRINT_BLOCKERS 0 #define ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL 100 #define ERTS_THR_PRGR_LFLG_BLOCK (((erts_aint32_t) 1) << 31) #define ERTS_THR_PRGR_LFLG_NO_LEADER (((erts_aint32_t) 1) << 30) #define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~(ERTS_THR_PRGR_LFLG_NO_LEADER \ | ERTS_THR_PRGR_LFLG_BLOCK)) #define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS) \ ((LFLGS) & ERTS_THR_PRGR_LFLG_ACTIVE_MASK) #define ERTS_THR_PRGR_LFLGS_ALL_WAITING(LFLGS) \ (((LFLGS) & (ERTS_THR_PRGR_LFLG_NO_LEADER \ |ERTS_THR_PRGR_LFLG_ACTIVE_MASK)) \ == ERTS_THR_PRGR_LFLG_NO_LEADER) /* * We use a 64-bit value for thread progress. By this wrapping of * the thread progress will more or less never occur. * * On 32-bit systems we therefore need a double word atomic. */ #undef read_acqb #define read_acqb erts_thr_prgr_read_acqb__ #undef read_nob #define read_nob erts_thr_prgr_read_nob__ #ifdef ARCH_64 static ERTS_INLINE void set_mb(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val) { erts_atomic_set_mb(atmc, val); } static ERTS_INLINE void set_nob(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val) { erts_atomic_set_nob(atmc, val); } static ERTS_INLINE void init_nob(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val) { erts_atomic_init_nob(atmc, val); } #else #undef dw_aint_to_val #define dw_aint_to_val erts_thr_prgr_dw_aint_to_val__ static void val_to_dw_aint(erts_dw_aint_t *dw_aint, ErtsThrPrgrVal val) { #ifdef ETHR_SU_DW_NAINT_T__ dw_aint->dw_sint = (ETHR_SU_DW_NAINT_T__) val; #else dw_aint->sint[ERTS_DW_AINT_LOW_WORD] = (erts_aint_t) (val & 0xffffffff); dw_aint->sint[ERTS_DW_AINT_HIGH_WORD] = (erts_aint_t) ((val >> 32) & 0xffffffff); #endif } static ERTS_INLINE void set_mb(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val) { erts_dw_aint_t dw_aint; val_to_dw_aint(&dw_aint, val); erts_dw_atomic_set_mb(atmc, &dw_aint); } static ERTS_INLINE void set_nob(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val) { erts_dw_aint_t dw_aint; val_to_dw_aint(&dw_aint, val); erts_dw_atomic_set_nob(atmc, &dw_aint); } static ERTS_INLINE void init_nob(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val) { erts_dw_aint_t dw_aint; val_to_dw_aint(&dw_aint, val); erts_dw_atomic_init_nob(atmc, &dw_aint); } #endif /* #define ERTS_THR_PROGRESS_STATE_DEBUG */ #ifdef ERTS_THR_PROGRESS_STATE_DEBUG #ifdef __GNUC__ #warning "Thread progress state debug is on" #endif #define ERTS_THR_PROGRESS_STATE_DEBUG_LEADER (((erts_aint32_t) 1) << 0) #define ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE (((erts_aint32_t) 1) << 1) #define ERTS_THR_PROGRESS_STATE_DEBUG_INIT(ID) \ erts_atomic32_init_nob(&intrnl->thr[(ID)].data.state_debug, \ ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE) #define ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(ID, ON) \ do { \ erts_aint32_t state_debug__; \ state_debug__ = erts_atomic32_read_nob(&intrnl->thr[(ID)].data.state_debug); \ if ((ON)) \ state_debug__ |= ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE; \ else \ state_debug__ &= ~ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE; \ erts_atomic32_set_nob(&intrnl->thr[(ID)].data.state_debug, state_debug__); \ } while (0) #define 
ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(ID, ON) \ do { \ erts_aint32_t state_debug__; \ state_debug__ = erts_atomic32_read_nob(&intrnl->thr[(ID)].data.state_debug); \ if ((ON)) \ state_debug__ |= ERTS_THR_PROGRESS_STATE_DEBUG_LEADER; \ else \ state_debug__ &= ~ERTS_THR_PROGRESS_STATE_DEBUG_LEADER; \ erts_atomic32_set_nob(&intrnl->thr[(ID)].data.state_debug, state_debug__); \ } while (0) #else #define ERTS_THR_PROGRESS_STATE_DEBUG_INIT(ID) #define ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(ID, ON) #define ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(ID, ON) #endif /* ERTS_THR_PROGRESS_STATE_DEBUG */ #define ERTS_THR_PRGR_BLCKR_INVALID (~((erts_aint32_t) 0)) #define ERTS_THR_PRGR_BLCKR_UNMANAGED (((erts_aint32_t) 1) << 31) #define ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING (((erts_aint32_t) 1) << 31) #define ERTS_THR_PRGR_BM_BITS 32 #define ERTS_THR_PRGR_BM_SHIFT 5 #define ERTS_THR_PRGR_BM_MASK 0x1f #define ERTS_THR_PRGR_WAKEUP_DATA_MASK (ERTS_THR_PRGR_WAKEUP_DATA_SIZE - 1) #define ERTS_THR_PRGR_WAKEUP_IX(V) \ ((int) ((V) & ERTS_THR_PRGR_WAKEUP_DATA_MASK)) typedef struct { erts_atomic32_t len; int id[1]; } ErtsThrPrgrManagedWakeupData; typedef struct { erts_atomic32_t len; int high_sz; int low_sz; erts_atomic32_t *high; erts_atomic32_t *low; } ErtsThrPrgrUnmanagedWakeupData; typedef struct { erts_atomic32_t lflgs; erts_atomic32_t block_count; erts_atomic_t blocker_event; erts_atomic32_t pref_wakeup_used; erts_atomic32_t managed_count; erts_atomic32_t managed_id; erts_atomic32_t unmanaged_id; } ErtsThrPrgrMiscData; typedef struct { ERTS_THR_PRGR_ATOMIC current; #ifdef ERTS_THR_PROGRESS_STATE_DEBUG erts_atomic32_t state_debug; #endif } ErtsThrPrgrElement; typedef union { ErtsThrPrgrElement data; char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrPrgrElement))]; } ErtsThrPrgrArray; typedef struct { union { ErtsThrPrgrMiscData data; char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE( sizeof(ErtsThrPrgrMiscData))]; } misc; ErtsThrPrgrArray *thr; struct { int no; ErtsThrPrgrCallbacks *callbacks; ErtsThrPrgrManagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE]; } managed; struct { int no; ErtsThrPrgrCallbacks *callbacks; ErtsThrPrgrUnmanagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE]; } unmanaged; } ErtsThrPrgrInternalData; static ErtsThrPrgrInternalData *intrnl; ErtsThrPrgr erts_thr_prgr__; erts_tsd_key_t erts_thr_prgr_data_key__; static void handle_wakeup_requests(ErtsThrPrgrVal current); static int got_sched_wakeups(void); static erts_aint32_t block_thread(ErtsThrPrgrData *tpd); static ERTS_INLINE void wakeup_managed(int id) { ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[id]; ASSERT(0 <= id && id < intrnl->managed.no); cbp->wakeup(cbp->arg); } static ERTS_INLINE void wakeup_unmanaged(int id) { ErtsThrPrgrCallbacks *cbp = &intrnl->unmanaged.callbacks[id]; ASSERT(0 <= id && id < intrnl->unmanaged.no); cbp->wakeup(cbp->arg); } static ERTS_INLINE ErtsThrPrgrData * perhaps_thr_prgr_data(ErtsSchedulerData *esdp) { if (esdp) return &esdp->thr_progress_data; else return erts_tsd_get(erts_thr_prgr_data_key__); } static ERTS_INLINE ErtsThrPrgrData * thr_prgr_data(ErtsSchedulerData *esdp) { ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp); ASSERT(tpd); return tpd; } static void init_tmp_thr_prgr_data(ErtsThrPrgrData *tpd) { tpd->id = -1; tpd->is_managed = 0; tpd->is_blocking = 0; tpd->is_temporary = 1; erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd); } static ERTS_INLINE ErtsThrPrgrData * tmp_thr_prgr_data(ErtsSchedulerData *esdp) { ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp); if (!tpd) { 
/* * We only allocate the part up to the wakeup_request field * which is the first field only used by registered threads */ tpd = erts_alloc(ERTS_ALC_T_T_THR_PRGR_DATA, offsetof(ErtsThrPrgrData, wakeup_request)); init_tmp_thr_prgr_data(tpd); } return tpd; } static ERTS_INLINE void return_tmp_thr_prgr_data(ErtsThrPrgrData *tpd) { if (tpd->is_temporary) { erts_tsd_set(erts_thr_prgr_data_key__, NULL); erts_free(ERTS_ALC_T_T_THR_PRGR_DATA, tpd); } } static ERTS_INLINE int block_count_dec(void) { erts_aint32_t block_count; block_count = erts_atomic32_dec_read_mb(&intrnl->misc.data.block_count); if (block_count == 0) { erts_tse_t *event; event = ((erts_tse_t*) erts_atomic_read_nob(&intrnl->misc.data.blocker_event)); if (event) erts_tse_set(event); return 1; } return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0; } static ERTS_INLINE int block_count_inc(void) { erts_aint32_t block_count; block_count = erts_atomic32_inc_read_mb(&intrnl->misc.data.block_count); return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0; } void erts_thr_progress_pre_init(void) { intrnl = NULL; erts_tsd_key_create(&erts_thr_prgr_data_key__); init_nob(&erts_thr_prgr__.current, 0); } void erts_thr_progress_init(int no_schedulers, int managed, int unmanaged) { int i, j, um_low, um_high; char *ptr; size_t cb_sz, intrnl_sz, thr_arr_sz, m_wakeup_size, um_wakeup_size, tot_size; intrnl_sz = sizeof(ErtsThrPrgrInternalData); intrnl_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(intrnl_sz); cb_sz = sizeof(ErtsThrPrgrCallbacks)*(managed+unmanaged); cb_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(cb_sz); thr_arr_sz = sizeof(ErtsThrPrgrArray)*managed; ASSERT(thr_arr_sz == ERTS_ALC_CACHE_LINE_ALIGN_SIZE(thr_arr_sz)); m_wakeup_size = sizeof(ErtsThrPrgrManagedWakeupData); m_wakeup_size += (managed - 1)*sizeof(int); m_wakeup_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(m_wakeup_size); um_low = (unmanaged - 1)/ERTS_THR_PRGR_BM_BITS + 1; um_high = (um_low - 1)/ERTS_THR_PRGR_BM_BITS + 1; um_wakeup_size = sizeof(ErtsThrPrgrUnmanagedWakeupData); um_wakeup_size += (um_high + um_low)*sizeof(erts_atomic32_t); um_wakeup_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(um_wakeup_size); tot_size = intrnl_sz; tot_size += cb_sz; tot_size += thr_arr_sz; tot_size += m_wakeup_size*ERTS_THR_PRGR_WAKEUP_DATA_SIZE; tot_size += um_wakeup_size*ERTS_THR_PRGR_WAKEUP_DATA_SIZE; ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_THR_PRGR_IDATA, tot_size); intrnl = (ErtsThrPrgrInternalData *) ptr; ptr += intrnl_sz; erts_atomic32_init_nob(&intrnl->misc.data.lflgs, ERTS_THR_PRGR_LFLG_NO_LEADER); erts_atomic32_init_nob(&intrnl->misc.data.block_count, (ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING | (erts_aint32_t) managed)); erts_atomic_init_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL); erts_atomic32_init_nob(&intrnl->misc.data.pref_wakeup_used, 0); erts_atomic32_init_nob(&intrnl->misc.data.managed_count, 0); erts_atomic32_init_nob(&intrnl->misc.data.managed_id, no_schedulers); erts_atomic32_init_nob(&intrnl->misc.data.unmanaged_id, -1); intrnl->thr = (ErtsThrPrgrArray *) ptr; ptr += thr_arr_sz; for (i = 0; i < managed; i++) init_nob(&intrnl->thr[i].data.current, 0); intrnl->managed.callbacks = (ErtsThrPrgrCallbacks *) ptr; intrnl->unmanaged.callbacks = &intrnl->managed.callbacks[managed]; ptr += cb_sz; intrnl->managed.no = managed; for (i = 0; i < managed; i++) { intrnl->managed.callbacks[i].arg = NULL; intrnl->managed.callbacks[i].wakeup = NULL; } intrnl->unmanaged.no = unmanaged; for (i = 0; i < unmanaged; i++) { intrnl->unmanaged.callbacks[i].arg = NULL; 
intrnl->unmanaged.callbacks[i].wakeup = NULL; } for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) { intrnl->managed.data[i] = (ErtsThrPrgrManagedWakeupData *) ptr; erts_atomic32_init_nob(&intrnl->managed.data[i]->len, 0); ptr += m_wakeup_size; } for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) { erts_atomic32_t *bm; intrnl->unmanaged.data[i] = (ErtsThrPrgrUnmanagedWakeupData *) ptr; erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->len, 0); bm = (erts_atomic32_t *) (ptr + sizeof(ErtsThrPrgrUnmanagedWakeupData)); intrnl->unmanaged.data[i]->high = bm; intrnl->unmanaged.data[i]->high_sz = um_high; for (j = 0; j < um_high; j++) erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->high[j], 0); intrnl->unmanaged.data[i]->low = &intrnl->unmanaged.data[i]->high[um_high]; intrnl->unmanaged.data[i]->low_sz = um_low; for (j = 0; j < um_low; j++) erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->low[j], 0); ptr += um_wakeup_size; } ERTS_THR_MEMORY_BARRIER; } static void init_wakeup_request_array(ErtsThrPrgrVal *w) { int i; ErtsThrPrgrVal current; current = read_acqb(&erts_thr_prgr__.current); for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) { w[i] = current - ((ErtsThrPrgrVal) (ERTS_THR_PRGR_WAKEUP_DATA_SIZE + i)); if (w[i] > current) w[i]--; } } void erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrCallbacks *callbacks) { ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL); int is_blocking = 0; if (tpd) { if (!tpd->is_temporary) erl_exit(ERTS_ABORT_EXIT, "%s:%d:%s(): Double register of thread\n", __FILE__, __LINE__, __func__); is_blocking = tpd->is_blocking; return_tmp_thr_prgr_data(tpd); } /* * We only allocate the part up to the leader field * which is the first field only used by managed threads */ tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA, offsetof(ErtsThrPrgrData, leader)); tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->misc.data.unmanaged_id); tpd->is_managed = 0; tpd->is_blocking = is_blocking; tpd->is_temporary = 0; ASSERT(tpd->id >= 0); if (tpd->id >= intrnl->unmanaged.no) erl_exit(ERTS_ABORT_EXIT, "%s:%d:%s(): Too many unmanaged registered threads\n", __FILE__, __LINE__, __func__); init_wakeup_request_array(&tpd->wakeup_request[0]); erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd); ASSERT(callbacks->wakeup); intrnl->unmanaged.callbacks[tpd->id] = *callbacks; } void erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp, ErtsThrPrgrCallbacks *callbacks, int pref_wakeup) { ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL); int is_blocking = 0, managed; if (tpd) { if (!tpd->is_temporary) erl_exit(ERTS_ABORT_EXIT, "%s:%d:%s(): Double register of thread\n", __FILE__, __LINE__, __func__); is_blocking = tpd->is_blocking; return_tmp_thr_prgr_data(tpd); } if (esdp) tpd = &esdp->thr_progress_data; else tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA, sizeof(ErtsThrPrgrData)); if (pref_wakeup && !erts_atomic32_xchg_nob(&intrnl->misc.data.pref_wakeup_used, 1)) tpd->id = 0; else if (esdp) tpd->id = (int) esdp->no; else tpd->id = erts_atomic32_inc_read_nob(&intrnl->misc.data.managed_id); ASSERT(tpd->id >= 0); if (tpd->id >= intrnl->managed.no) erl_exit(ERTS_ABORT_EXIT, "%s:%d:%s(): Too many managed registered threads\n", __FILE__, __LINE__, __func__); tpd->is_managed = 1; tpd->is_blocking = is_blocking; tpd->is_temporary = 0; init_wakeup_request_array(&tpd->wakeup_request[0]); ERTS_THR_PROGRESS_STATE_DEBUG_INIT(tpd->id); tpd->leader = 0; tpd->active = 1; tpd->previous.local = 0; tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING; erts_tsd_set(erts_thr_prgr_data_key__, (void 
*) tpd); erts_atomic32_inc_nob(&intrnl->misc.data.lflgs); ASSERT(callbacks->wakeup); ASSERT(callbacks->prepare_wait); ASSERT(callbacks->wait); ASSERT(callbacks->finalize_wait); intrnl->managed.callbacks[tpd->id] = *callbacks; callbacks->prepare_wait(callbacks->arg); managed = erts_atomic32_inc_read_relb(&intrnl->misc.data.managed_count); if (managed != intrnl->managed.no) { /* Wait until all managed threads have registered... */ do { callbacks->wait(callbacks->arg); callbacks->prepare_wait(callbacks->arg); managed = erts_atomic32_read_acqb(&intrnl->misc.data.managed_count); } while (managed != intrnl->managed.no); } else { int id; /* All managed threads have registered; lets go... */ for (id = 0; id < managed; id++) if (id != tpd->id) wakeup_managed(id); } callbacks->finalize_wait(callbacks->arg); } static ERTS_INLINE int leader_update(ErtsThrPrgrData *tpd) { #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_check_exact(NULL, 0); #endif if (!tpd->leader) { /* Probably need to block... */ block_thread(tpd); } else { erts_aint32_t lflgs; ErtsThrPrgrVal next; int ix, sz, make_progress; if (tpd->previous.current == ERTS_THR_PRGR_VAL_WAITING) { /* Took over as leader from another thread */ tpd->previous.current = read_acqb(&erts_thr_prgr__.current); tpd->previous.next = tpd->previous.current; tpd->previous.next++; if (tpd->previous.next == ERTS_THR_PRGR_VAL_WAITING) tpd->previous.next = 0; } if (tpd->previous.local == tpd->previous.current) { ErtsThrPrgrVal val = tpd->previous.current + 1; if (val == ERTS_THR_PRGR_VAL_WAITING) val = 0; tpd->previous.local = val; set_mb(&intrnl->thr[tpd->id].data.current, val); } next = tpd->previous.next; make_progress = 1; sz = intrnl->managed.no; for (ix = 0; ix < sz; ix++) { ErtsThrPrgrVal tmp; tmp = read_nob(&intrnl->thr[ix].data.current); if (tmp != next && tmp != ERTS_THR_PRGR_VAL_WAITING) { make_progress = 0; ASSERT(erts_thr_progress_has_passed__(next, tmp)); break; } } if (make_progress) { ErtsThrPrgrVal current = next; next++; if (next == ERTS_THR_PRGR_VAL_WAITING) next = 0; set_nob(&intrnl->thr[tpd->id].data.current, next); set_mb(&erts_thr_prgr__.current, current); tpd->previous.local = next; tpd->previous.next = next; tpd->previous.current = current; #if ERTS_THR_PRGR_PRINT_VAL if (current % 1000 == 0) erts_fprintf(stderr, "%b64u\n", current); #endif handle_wakeup_requests(current); } if (tpd->active) { lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) (void) block_thread(tpd); } else { tpd->leader = 0; tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING; #if ERTS_THR_PRGR_PRINT_LEADER erts_fprintf(stderr, "L <- %d\n", tpd->id); #endif ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 0); lflgs = erts_atomic32_read_bor_relb(&intrnl->misc.data.lflgs, ERTS_THR_PRGR_LFLG_NO_LEADER); if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) lflgs = block_thread(tpd); if (ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0 && got_sched_wakeups()) wakeup_managed(0); } } return tpd->leader; } static int update(ErtsThrPrgrData *tpd) { int res; ErtsThrPrgrVal val; if (tpd->leader) res = 1; else { erts_aint32_t lflgs; res = 0; val = read_acqb(&erts_thr_prgr__.current); if (tpd->previous.local == val) { val++; if (val == ERTS_THR_PRGR_VAL_WAITING) val = 0; tpd->previous.local = val; set_mb(&intrnl->thr[tpd->id].data.current, val); } lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) res = 1; /* Need to block in leader_update() */ if ((lflgs & ERTS_THR_PRGR_LFLG_NO_LEADER) && (tpd->active || 
ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0)) { /* Try to take over leadership... */ erts_aint32_t olflgs; olflgs = erts_atomic32_read_band_acqb( &intrnl->misc.data.lflgs, ~ERTS_THR_PRGR_LFLG_NO_LEADER); if (olflgs & ERTS_THR_PRGR_LFLG_NO_LEADER) { tpd->leader = 1; #if ERTS_THR_PRGR_PRINT_LEADER erts_fprintf(stderr, "L -> %d\n", tpd->id); #endif ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 1); } } res |= tpd->leader; } return res; } int erts_thr_progress_update(ErtsSchedulerData *esdp) { return update(thr_prgr_data(esdp)); } int erts_thr_progress_leader_update(ErtsSchedulerData *esdp) { return leader_update(thr_prgr_data(esdp)); } void erts_thr_progress_prepare_wait(ErtsSchedulerData *esdp) { erts_aint32_t lflgs; ErtsThrPrgrData *tpd = thr_prgr_data(esdp); #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_check_exact(NULL, 0); #endif block_count_dec(); tpd->previous.local = ERTS_THR_PRGR_VAL_WAITING; set_mb(&intrnl->thr[tpd->id].data.current, ERTS_THR_PRGR_VAL_WAITING); lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); if (ERTS_THR_PRGR_LFLGS_ALL_WAITING(lflgs) && got_sched_wakeups()) wakeup_managed(0); /* Someone need to make progress */ } void erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp) { ErtsThrPrgrData *tpd = thr_prgr_data(esdp); ErtsThrPrgrVal current, val; #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_check_exact(NULL, 0); #endif /* * We aren't allowed to continue until our thread * progress is past global current. */ val = current = read_acqb(&erts_thr_prgr__.current); while (1) { val++; if (val == ERTS_THR_PRGR_VAL_WAITING) val = 0; tpd->previous.local = val; set_mb(&intrnl->thr[tpd->id].data.current, val); val = read_acqb(&erts_thr_prgr__.current); if (current == val) break; current = val; } if (block_count_inc()) block_thread(tpd); if (update(tpd)) leader_update(tpd); } void erts_thr_progress_active(ErtsSchedulerData *esdp, int on) { ErtsThrPrgrData *tpd = thr_prgr_data(esdp); #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_check_exact(NULL, 0); #endif ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(tpd->id, on); if (on) { ASSERT(!tpd->active); tpd->active = 1; erts_atomic32_inc_nob(&intrnl->misc.data.lflgs); } else { ASSERT(tpd->active); tpd->active = 0; erts_atomic32_dec_nob(&intrnl->misc.data.lflgs); if (update(tpd)) leader_update(tpd); } #ifdef DEBUG { erts_aint32_t n = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); n &= ERTS_THR_PRGR_LFLG_ACTIVE_MASK; ASSERT(tpd->active <= n && n <= intrnl->managed.no); } #endif } static ERTS_INLINE int has_reached_wakeup(ErtsThrPrgrVal wakeup) { /* * Exactly the same as erts_thr_progress_has_reached(), but * also verify valid wakeup requests in debug mode. */ ErtsThrPrgrVal current; current = read_acqb(&erts_thr_prgr__.current); #if ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE { ErtsThrPrgrVal limit; /* * erts_thr_progress_later() returns values which are * equal to 'current + 2', or 'current + 3'. That is, users * should never get a hold of values larger than that. * * That is, valid values are values less than 'current + 4'. * * Values larger than this won't work with the wakeup * algorithm. 
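 *
 * As a concrete (illustrative) example: if 'current' is 10, then
 * erts_thr_progress_later() hands out 12 or 13, so every valid wakeup
 * request value is less than 14, i.e., less than 'current + 4'.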
*/ limit = current + 4; if (limit == ERTS_THR_PRGR_VAL_WAITING) limit = 0; else if (limit < current) /* Wrapped */ limit += 1; if (!erts_thr_progress_has_passed__(limit, wakeup)) erl_exit(ERTS_ABORT_EXIT, "Invalid wakeup request value found:" " current=%b64u, wakeup=%b64u, limit=%b64u", current, wakeup, limit); } #endif if (current == wakeup) return 1; return erts_thr_progress_has_passed__(current, wakeup); } static void request_wakeup_managed(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value) { ErtsThrPrgrManagedWakeupData *mwd; int ix, wix; /* * Only managed threads that aren't in waiting state * are allowed to call this function. */ ASSERT(tpd->is_managed); ASSERT(tpd->previous.local != ERTS_THR_PRGR_VAL_WAITING); if (has_reached_wakeup(value)) { wakeup_managed(tpd->id); return; } wix = ERTS_THR_PRGR_WAKEUP_IX(value); if (tpd->wakeup_request[wix] == value) return; /* Already got a request registered */ ASSERT(erts_thr_progress_has_passed__(value, tpd->wakeup_request[wix])); if (tpd->previous.local == value) { /* * We have already confirmed this value. We need to request * wakeup for a value later than our latest confirmed value in * order to prevent progress from reaching the requested value * while we are writing the request. * * It is ok to move the wakeup request forward since the only * guarantee we make (and can make) is that the thread will be * woken some time *after* the requested value has been reached. */ value++; if (value == ERTS_THR_PRGR_VAL_WAITING) value = 0; wix = ERTS_THR_PRGR_WAKEUP_IX(value); if (tpd->wakeup_request[wix] == value) return; /* Already got a request registered */ ASSERT(erts_thr_progress_has_passed__(value, tpd->wakeup_request[wix])); } tpd->wakeup_request[wix] = value; mwd = intrnl->managed.data[wix]; ix = erts_atomic32_inc_read_nob(&mwd->len) - 1; #if ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE if (ix >= intrnl->managed.no) erl_exit(ERTS_ABORT_EXIT, "Internal error: Too many wakeup requests\n"); #endif mwd->id[ix] = tpd->id; ASSERT(!erts_thr_progress_has_reached(value)); /* * This thread is guarranteed to issue a full memory barrier: * - after the request has been written, but * - before the global thread progress reach the (possibly * increased) requested wakeup value. */ } static void request_wakeup_unmanaged(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value) { int wix, ix, id, bit; ErtsThrPrgrUnmanagedWakeupData *umwd; ASSERT(!tpd->is_managed); /* * Thread progress *can* reach and pass our requested value while * we are writing the request. 
*/ if (has_reached_wakeup(value)) { wakeup_unmanaged(tpd->id); return; } wix = ERTS_THR_PRGR_WAKEUP_IX(value); if (tpd->wakeup_request[wix] == value) return; /* Already got a request registered */ ASSERT(erts_thr_progress_has_passed__(value, tpd->wakeup_request[wix])); umwd = intrnl->unmanaged.data[wix]; id = tpd->id; bit = id & ERTS_THR_PRGR_BM_MASK; ix = id >> ERTS_THR_PRGR_BM_SHIFT; ASSERT(0 <= ix && ix < umwd->low_sz); erts_atomic32_read_bor_nob(&umwd->low[ix], 1 << bit); bit = ix & ERTS_THR_PRGR_BM_MASK; ix >>= ERTS_THR_PRGR_BM_SHIFT; ASSERT(0 <= ix && ix < umwd->high_sz); erts_atomic32_read_bor_nob(&umwd->high[ix], 1 << bit); erts_atomic32_inc_mb(&umwd->len); if (erts_thr_progress_has_reached(value)) wakeup_unmanaged(tpd->id); else tpd->wakeup_request[wix] = value; } void erts_thr_progress_wakeup(ErtsSchedulerData *esdp, ErtsThrPrgrVal value) { ErtsThrPrgrData *tpd = thr_prgr_data(esdp); ASSERT(!tpd->is_temporary); if (tpd->is_managed) request_wakeup_managed(tpd, value); else request_wakeup_unmanaged(tpd, value); } static void wakeup_unmanaged_threads(ErtsThrPrgrUnmanagedWakeupData *umwd) { int hix; for (hix = 0; hix < umwd->high_sz; hix++) { erts_aint32_t hmask = erts_atomic32_read_nob(&umwd->high[hix]); if (hmask) { int hbase = hix << ERTS_THR_PRGR_BM_SHIFT; int hbit; for (hbit = 0; hbit < ERTS_THR_PRGR_BM_BITS; hbit++) { if (hmask & (1 << hbit)) { erts_aint_t lmask; int lix = hbase + hbit; ASSERT(0 <= lix && lix < umwd->low_sz); lmask = erts_atomic32_read_nob(&umwd->low[lix]); if (lmask) { int lbase = lix << ERTS_THR_PRGR_BM_SHIFT; int lbit; for (lbit = 0; lbit < ERTS_THR_PRGR_BM_BITS; lbit++) { if (lmask & (1 << lbit)) { int id = lbase + lbit; wakeup_unmanaged(id); } } erts_atomic32_set_nob(&umwd->low[lix], 0); } } } erts_atomic32_set_nob(&umwd->high[hix], 0); } } } static void handle_wakeup_requests(ErtsThrPrgrVal current) { ErtsThrPrgrManagedWakeupData *mwd; ErtsThrPrgrUnmanagedWakeupData *umwd; int wix, len, i; wix = ERTS_THR_PRGR_WAKEUP_IX(current); mwd = intrnl->managed.data[wix]; len = erts_atomic32_read_nob(&mwd->len); ASSERT(len >= 0); if (len) { for (i = 0; i < len; i++) wakeup_managed(mwd->id[i]); erts_atomic32_set_nob(&mwd->len, 0); } umwd = intrnl->unmanaged.data[wix]; len = erts_atomic32_read_nob(&umwd->len); ASSERT(len >= 0); if (len) { wakeup_unmanaged_threads(umwd); erts_atomic32_set_nob(&umwd->len, 0); } } static int got_sched_wakeups(void) { int wix; ERTS_THR_MEMORY_BARRIER; for (wix = 0; wix < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; wix++) { ErtsThrPrgrManagedWakeupData **mwd = intrnl->managed.data; if (erts_atomic32_read_nob(&mwd[wix]->len)) return 1; } for (wix = 0; wix < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; wix++) { ErtsThrPrgrUnmanagedWakeupData **umwd = intrnl->unmanaged.data; if (erts_atomic32_read_nob(&umwd[wix]->len)) return 1; } return 0; } static erts_aint32_t block_thread(ErtsThrPrgrData *tpd) { erts_aint32_t lflgs; ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[tpd->id]; do { block_count_dec(); while (1) { cbp->prepare_wait(cbp->arg); lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) cbp->wait(cbp->arg); else break; } } while (block_count_inc()); cbp->finalize_wait(cbp->arg); return lflgs; } static erts_aint32_t thr_progress_block(ErtsThrPrgrData *tpd, int wait) { erts_tse_t *event = NULL; /* Remove erroneous warning... sigh... 
 */
    erts_aint32_t lflgs, bc;

    if (tpd->is_blocking++)
	return (erts_aint32_t) 0;

    while (1) {
	lflgs = erts_atomic32_read_bor_nob(&intrnl->misc.data.lflgs,
					   ERTS_THR_PRGR_LFLG_BLOCK);
	if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
	    block_thread(tpd);
	else
	    break;
    }

#if ERTS_THR_PRGR_PRINT_BLOCKERS
    erts_fprintf(stderr, "block(%d)\n", tpd->id);
#endif

    ASSERT(ERTS_AINT_NULL
	   == erts_atomic_read_nob(&intrnl->misc.data.blocker_event));

    if (wait) {
	event = erts_tse_fetch();
	erts_tse_reset(event);
	erts_atomic_set_nob(&intrnl->misc.data.blocker_event,
			    (erts_aint_t) event);
    }
    if (tpd->is_managed)
	erts_atomic32_dec_nob(&intrnl->misc.data.block_count);
    bc = erts_atomic32_read_band_mb(&intrnl->misc.data.block_count,
				    ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
    bc &= ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING;
    if (wait) {
	while (bc != 0) {
	    erts_tse_wait(event);
	    erts_tse_reset(event);
	    bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
	}
    }
    return bc;
}

void
erts_thr_progress_block(void)
{
    thr_progress_block(tmp_thr_prgr_data(NULL), 1);
}

void
erts_thr_progress_fatal_error_block(SWord timeout,
				    ErtsThrPrgrData *tmp_tpd_bufp)
{
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
    erts_aint32_t bc;
    SWord time_left = timeout;
    SysTimeval to;

    /*
     * Counting poll intervals may give us too long a timeout
     * if the cpu is busy. If we have tolerant time of day, we use it
     * to prevent this.
     */
    if (!erts_disable_tolerant_timeofday) {
	erts_get_timeval(&to);
	to.tv_sec += timeout / 1000;
	to.tv_usec += (timeout % 1000)*1000; /* 'timeout' is in milliseconds */
	if (to.tv_usec >= 1000000) {
	    to.tv_usec -= 1000000;
	    to.tv_sec++;
	}
    }

    if (!tpd) {
	/*
	 * We stack allocate since failure to allocate memory may
	 * have caused the problem in the first place. This is ok
	 * since we never complete an unblock after a fatal error
	 * block.
	 */
	tpd = tmp_tpd_bufp;
	init_tmp_thr_prgr_data(tpd);
    }

    bc = thr_progress_block(tpd, 0);
    if (bc == 0)
	return; /* Successfully blocked all managed threads */

    while (1) {
	if (erts_milli_sleep(ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL) == 0)
	    time_left -= ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL;
	bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
	if (bc == 0)
	    break; /* Successfully blocked all managed threads */
	if (time_left <= 0)
	    break; /* Timeout */
	if (!erts_disable_tolerant_timeofday) {
	    SysTimeval now;
	    erts_get_timeval(&now);
	    if (now.tv_sec > to.tv_sec)
		break; /* Timeout */
	    if (now.tv_sec == to.tv_sec && now.tv_usec >= to.tv_usec)
		break; /* Timeout */
	}
    }
}

void
erts_thr_progress_unblock(void)
{
    erts_tse_t *event;
    int id, break_id, sz, wakeup;
    ErtsThrPrgrData *tpd = thr_prgr_data(NULL);

    ASSERT(tpd->is_blocking);
    if (--tpd->is_blocking)
	return;

    sz = intrnl->managed.no;

    wakeup = 1;
    if (!tpd->is_managed)
	id = break_id = tpd->id < 0 ?
0 : tpd->id % sz; else { break_id = tpd->id; id = break_id + 1; if (id >= sz) id = 0; if (id == break_id) wakeup = 0; erts_atomic32_inc_nob(&intrnl->misc.data.block_count); } event = ((erts_tse_t *) erts_atomic_read_nob(&intrnl->misc.data.blocker_event)); ASSERT(event); erts_atomic_set_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL); erts_atomic32_read_bor_relb(&intrnl->misc.data.block_count, ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING); #if ERTS_THR_PRGR_PRINT_BLOCKERS erts_fprintf(stderr, "unblock(%d)\n", tpd->id); #endif erts_atomic32_read_band_mb(&intrnl->misc.data.lflgs, ~ERTS_THR_PRGR_LFLG_BLOCK); if (wakeup) { do { ErtsThrPrgrVal tmp; tmp = read_nob(&intrnl->thr[id].data.current); if (tmp != ERTS_THR_PRGR_VAL_WAITING) wakeup_managed(id); if (++id >= sz) id = 0; } while (id != break_id); } return_tmp_thr_prgr_data(tpd); erts_tse_return(event); } int erts_thr_progress_is_blocking(void) { ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL); return tpd && tpd->is_blocking; } void erts_thr_progress_dbg_print_state(void) { int id; int sz = intrnl->managed.no; erts_fprintf(stderr, "--- thread progress ---\n"); erts_fprintf(stderr,"current=%b64u\n", erts_thr_progress_current()); for (id = 0; id < sz; id++) { ErtsThrPrgrVal current = read_nob(&intrnl->thr[id].data.current); #ifdef ERTS_THR_PROGRESS_STATE_DEBUG erts_aint32_t state_debug; char *active, *leader; state_debug = erts_atomic32_read_nob(&intrnl->thr[id].data.state_debug); active = (state_debug & ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE ? "true" : "false"); leader = (state_debug & ERTS_THR_PROGRESS_STATE_DEBUG_LEADER ? "true" : "false"); #endif if (current == ERTS_THR_PRGR_VAL_WAITING) erts_fprintf(stderr, " id=%d, current=WAITING" #ifdef ERTS_THR_PROGRESS_STATE_DEBUG ", active=%s, leader=%s" #endif "\n", id #ifdef ERTS_THR_PROGRESS_STATE_DEBUG , active, leader #endif ); else erts_fprintf(stderr, " id=%d, current=%b64u" #ifdef ERTS_THR_PROGRESS_STATE_DEBUG ", active=%s, leader=%s" #endif "\n", id, current #ifdef ERTS_THR_PROGRESS_STATE_DEBUG , active, leader #endif ); } erts_fprintf(stderr, "-----------------------\n"); } #endif
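
/*
 * Illustrative sketch (not part of this module): how an unmanaged thread
 * might register itself and wait for a thread progress value. The names
 * my_event, my_wakeup() and my_wait() are made up; only the
 * erts_thr_progress_* calls and the ErtsThrPrgrCallbacks fields used
 * below are provided here.
 *
 *     static void my_wakeup(void *arg) { ... signal my_event ... }
 *
 *     ErtsThrPrgrCallbacks cbs;
 *     cbs.arg = NULL;
 *     cbs.wakeup = my_wakeup;
 *     erts_thr_progress_register_unmanaged_thread(&cbs);
 *
 *     val = erts_thr_progress_later();
 *     erts_thr_progress_wakeup(NULL, val);
 *     my_wait(my_event);   -- my_wakeup() is called some time after the
 *                             global thread progress has reached 'val'
 */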