diff options
Diffstat (limited to 'erts/lib_src/common/ethr_mutex.c')
-rw-r--r-- | erts/lib_src/common/ethr_mutex.c | 494 |
1 files changed, 446 insertions, 48 deletions
diff --git a/erts/lib_src/common/ethr_mutex.c b/erts/lib_src/common/ethr_mutex.c index 81fd6af80a..5688ac8b9a 100644 --- a/erts/lib_src/common/ethr_mutex.c +++ b/erts/lib_src/common/ethr_mutex.c @@ -223,9 +223,59 @@ rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx, int try_write_lock); #endif +/* -- Utilities used by multiple implementations -- */ + +#if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__) \ + || defined(ETHR_WIN32_THREADS) + +static ETHR_INLINE void +enqueue(ethr_ts_event **queue, + ethr_ts_event *tse_start, + ethr_ts_event *tse_end) +{ + if (!*queue) { + *queue = tse_start; + tse_start->prev = tse_end; + tse_end->next = tse_start; + } + else { + tse_end->next = *queue; + tse_start->prev = (*queue)->prev; + (*queue)->prev->next = tse_start; + (*queue)->prev = tse_end; + } +} + + +static ETHR_INLINE void +dequeue(ethr_ts_event **queue, + ethr_ts_event *tse_start, + ethr_ts_event *tse_end) +{ + if (tse_start->prev == tse_end) { + ETHR_ASSERT(*queue == tse_start && tse_end->next == tse_start); + *queue = NULL; + } + else { + if (*queue == tse_start) + *queue = tse_end->next; + tse_end->next->prev = tse_start->prev; + tse_start->prev->next = tse_end->next; + } +} + +#endif + #if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__) -/* -- Utilities operating both on ordinary mutexes and read write mutexes -- */ +static ETHR_INLINE void +insert(ethr_ts_event *tse_pred, ethr_ts_event *tse) +{ + tse->next = tse_pred->next; + tse->prev = tse_pred; + tse_pred->next->prev = tse; + tse_pred->next = tse; +} static ETHR_INLINE void rwmutex_freqread_wtng_rdrs_inc(ethr_rwmutex *rwmtx, ethr_ts_event *tse) @@ -355,51 +405,6 @@ rwmutex_freqread_rdrs_read(ethr_rwmutex *rwmtx, int ix) return res; } - -static ETHR_INLINE void -enqueue(ethr_ts_event **queue, - ethr_ts_event *tse_start, - ethr_ts_event *tse_end) -{ - if (!*queue) { - *queue = tse_start; - tse_start->prev = tse_end; - tse_end->next = tse_start; - } - else { - tse_end->next = *queue; - tse_start->prev = (*queue)->prev; - (*queue)->prev->next = tse_start; - (*queue)->prev = tse_end; - } -} - -static ETHR_INLINE void -insert(ethr_ts_event *tse_pred, ethr_ts_event *tse) -{ - tse->next = tse_pred->next; - tse->prev = tse_pred; - tse_pred->next->prev = tse; - tse_pred->next = tse; -} - -static ETHR_INLINE void -dequeue(ethr_ts_event **queue, - ethr_ts_event *tse_start, - ethr_ts_event *tse_end) -{ - if (tse_start->prev == tse_end) { - ETHR_ASSERT(*queue == tse_start && tse_end->next == tse_start); - *queue = NULL; - } - else { - if (*queue == tse_start) - *queue = tse_end->next; - tse_end->next->prev = tse_start->prev; - tse_start->prev->next = tse_end->next; - } -} - static void event_wait(struct ethr_mutex_base_ *mtxb, ethr_ts_event *tse, @@ -1244,7 +1249,7 @@ ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx) return 0; } -#else +#elif defined(ETHR_PTHREADS) && !defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) /* -- pthread mutex and condition variables -------------------------------- */ int @@ -1261,6 +1266,12 @@ ethr_mutex_init(ethr_mutex *mtx) } int +ethr_mutex_init_opt(ethr_mutex *mtx, ethr_mutex_opt *opt) +{ + return ethr_mutex_init(mtx); +} + +int ethr_mutex_destroy(ethr_mutex *mtx) { #if ETHR_XCHK @@ -1293,6 +1304,12 @@ ethr_cond_init(ethr_cond *cnd) } int +ethr_cond_init_opt(ethr_cond *cnd, ethr_cond_opt *opt) +{ + return ethr_cond_init(cnd); +} + +int ethr_cond_destroy(ethr_cond *cnd) { #if ETHR_XCHK @@ -1354,7 +1371,388 @@ ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx) return res; } -#endif /* pthread_mutex */ +#elif defined(ETHR_WIN32_THREADS) || defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) + +/* + * As of Vista/Server, 2008 Windows has condition variables that can be + * used with critical sections. However, we need to be able to run on + * older Windows versions too, so we need to implement condition variables + * ourselves. + */ + +#ifdef ETHR_DBG_WIN_MTX_WITH_PTHREADS +/* + * For debugging of this implementation on POSIX platforms... + */ + +#define ethr_win_get_errno__() EINVAL +#if defined(__GNUC__) +#define __forceinline __inline__ +#else +#define __forceinline +#endif + +static int +InitializeCriticalSectionAndSpinCount(CRITICAL_SECTION *cs, int sc) +{ + return 0 == pthread_mutex_init((pthread_mutex_t *) cs, NULL); +} + +static void DeleteCriticalSection(CRITICAL_SECTION *cs) +{ + int res = pthread_mutex_destroy((pthread_mutex_t *) cs); + if (res != 0) + ETHR_FATAL_ERROR__(res); +} + +int TryEnterCriticalSection(CRITICAL_SECTION *cs) +{ + int res; + res = pthread_mutex_trylock((pthread_mutex_t *) cs); + if (res != 0 && res != EBUSY) + ETHR_FATAL_ERROR__(res); + return res == 0; +} + +void EnterCriticalSection(CRITICAL_SECTION *cs) +{ + int res = pthread_mutex_lock((pthread_mutex_t *) cs); + if (res != 0) + ETHR_FATAL_ERROR__(res); +} + +void LeaveCriticalSection(CRITICAL_SECTION *cs) +{ + int res = pthread_mutex_unlock((pthread_mutex_t *) cs); + if (res != 0) + ETHR_FATAL_ERROR__(res); +} + +#endif + +#define ETHR_CND_WAIT__ ((ethr_sint32_t) 0x11dead11) +#define ETHR_CND_WAKEUP__ ((ethr_sint32_t) 0x11beef11) + +static __forceinline void +cond_wakeup(ethr_ts_event *tse) +{ + ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) == ETHR_CND_WAIT__); + + ethr_atomic32_set_relb(&tse->uaflgs, ETHR_CND_WAKEUP__); + ethr_event_set(&tse->event); +} + +void +ethr_mutex_cond_wakeup__(ethr_mutex *mtx) +{ + /* + * Called by ethr_mutex_unlock() when we have + * cond signal/broadcast wakeups waiting to + * be completed. + */ + ethr_ts_event *tse; + + if (!mtx->posix_compliant) { + tse = mtx->wakeups; + dequeue(&mtx->wakeups, tse, tse); + } + else { + ethr_spin_lock(&mtx->lock); + tse = mtx->wakeups; + if (tse) + dequeue(&mtx->wakeups, tse, tse); + if (!mtx->wakeups) + ethr_atomic32_set_relb(&mtx->have_wakeups, 0); + ethr_spin_unlock(&mtx->lock); + } + + LeaveCriticalSection(&mtx->cs); + + ETHR_ASSERT(tse || mtx->posix_compliant); + + /* + * We delay actual condition variable wakeup until + * this point when we have left the critical section. + * This in order to avoid that the other thread is + * woken and then right away have to go to sleep + * waiting for the critical section that we are in. + * + * We also only wake one thread at a time even if + * there are multiple threads waiting to be woken. + * Otherwise all but one will be woken and then right + * away have to go to sleep on the critical section. + * Since each wakeup is guaranteed to generate at + * least one lock/unlock sequence on this mutex, all + * threads will eventually be woken. + */ + + if (tse) + cond_wakeup(tse); +} + +int +ethr_mutex_init_opt(ethr_mutex *mtx, ethr_mutex_opt *opt) +{ + int spincount; +#if ETHR_XCHK + if (!mtx) { + ETHR_ASSERT(0); + return EINVAL; + } + mtx->initialized = ETHR_MUTEX_INITIALIZED; +#endif + + spincount = opt ? opt->aux_spincount : 0; + if (spincount < 0) + spincount = 0; + + if (!InitializeCriticalSectionAndSpinCount(&mtx->cs, spincount)) { +#if ETHR_XCHK + mtx->initialized = 0; +#endif + return ethr_win_get_errno__(); + } + + mtx->posix_compliant = opt ? opt->posix_compliant : 0; + mtx->wakeups = NULL; + if (mtx->posix_compliant) { + ethr_atomic32_init(&mtx->locked, 0); + ethr_atomic32_init(&mtx->have_wakeups, 0); + ethr_spinlock_init(&mtx->lock); + } + return 0; +} + +int +ethr_mutex_init(ethr_mutex *mtx) +{ + return ethr_mutex_init_opt(mtx, NULL); +} + +int +ethr_mutex_destroy(ethr_mutex *mtx) +{ + DeleteCriticalSection(&mtx->cs); + if (mtx->posix_compliant) + return ethr_spinlock_destroy(&mtx->lock); + else + return 0; +} + +int +ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx) +{ + void *udata; + ethr_ts_event *tse = ethr_get_ts_event(); + int spincount; + + udata = tse->udata; + tse->udata = (void *) mtx; + ethr_atomic32_set_relb(&tse->uaflgs, ETHR_CND_WAIT__); + + EnterCriticalSection(&cnd->cs); + enqueue(&cnd->waiters, tse, tse); + LeaveCriticalSection(&cnd->cs); + + ethr_mutex_unlock(mtx); + + spincount = cnd->spincount; + + while (ethr_atomic32_read_acqb(&tse->uaflgs) != ETHR_CND_WAKEUP__) { + ethr_event_reset(&tse->event); + if (ethr_atomic32_read_acqb(&tse->uaflgs) == ETHR_CND_WAKEUP__) + break; + ethr_event_swait(&tse->event, spincount); + spincount = 0; + } + + tse->udata = udata; + ethr_leave_ts_event(tse); + + ethr_mutex_lock(mtx); + + return 0; +} + +static __forceinline void +posix_compliant_mtx_enqueue(ethr_mutex *mtx, + ethr_ts_event *tse_start, + ethr_ts_event *tse_end) +{ + ethr_ts_event *tse_wakeup = NULL; /* Avoid erroneous compiler warning... */ + /* + * The associated mutex might not be locked, so we need to + * check if it is. If locked, enqueue for wakeup at unlock; + * otherwise, wakeup the first one now and enqueue the rest. + */ + if (tse_start == tse_end && !ethr_atomic32_read(&mtx->locked)) { + tse_wakeup = tse_start; + wakeup: + cond_wakeup(tse_wakeup); + } + else { + int need_wakeup; + ethr_spin_lock(&mtx->lock); + if (!mtx->wakeups) + ethr_atomic32_set_mb(&mtx->have_wakeups, 1); + need_wakeup = !ethr_atomic32_read(&mtx->locked); + if (need_wakeup) { + if (tse_start == tse_end) { + if (!mtx->wakeups) + ethr_atomic32_set_relb(&mtx->have_wakeups, 0); + ethr_spin_unlock(&mtx->lock); + tse_wakeup = tse_start; + goto wakeup; + } + tse_wakeup = tse_start; + tse_start = tse_start->next; + } + enqueue(&mtx->wakeups, tse_start, tse_end); + ethr_spin_unlock(&mtx->lock); + if (need_wakeup) + goto wakeup; + } +} + +static __forceinline void +enqueue_cond_wakeups(ethr_ts_event *queue, int posix_compliant) +{ + if (queue) { + int more; + ethr_ts_event *q = queue; + + /* + * Waiters may be using different mutexes... + */ + + do { + ethr_mutex *mtx; + ethr_ts_event *tse, *tse_start, *tse_end; + + more = 0; + tse_start = q; + mtx = (ethr_mutex *) tse_start->udata; + + ETHR_ASSERT(posix_compliant + ? mtx->posix_compliant + : !mtx->posix_compliant); + + ETHR_ASSERT(ethr_atomic32_read(&tse_start->uaflgs) + == ETHR_CND_WAIT__); + ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED); + + tse_end = tse_start->prev; + + for (tse = tse_start->next; tse != tse_start; tse = tse->next) { + + ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) + == ETHR_CND_WAIT__); + + if (mtx != (ethr_mutex *) tse->udata) { + tse_end = tse->prev; + dequeue(&q, tse_start, tse_end); + more = 1; + break; + } + } + + if (posix_compliant) + posix_compliant_mtx_enqueue(mtx, tse_start, tse_end); + else + enqueue(&mtx->wakeups, tse_start, tse_end); + + } while (more); + } +} + +void +ethr_cond_broadcast(ethr_cond *cnd) +{ + ethr_ts_event *waiters; + + EnterCriticalSection(&cnd->cs); + waiters = cnd->waiters; + cnd->waiters = NULL; + LeaveCriticalSection(&cnd->cs); + + if (cnd->posix_compliant) + enqueue_cond_wakeups(waiters, 1); + else + enqueue_cond_wakeups(waiters, 0); +} + +void +ethr_cond_signal(ethr_cond *cnd) +{ + ethr_mutex *mtx; + ethr_ts_event *tse; + + EnterCriticalSection(&cnd->cs); + tse = cnd->waiters; + if (tse) + dequeue(&cnd->waiters, tse, tse); + LeaveCriticalSection(&cnd->cs); + + if (tse) { + mtx = (ethr_mutex *) tse->udata; + + ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) == ETHR_CND_WAIT__); + ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED); + ETHR_ASSERT(cnd->posix_compliant + ? mtx->posix_compliant + : !mtx->posix_compliant); + + if (cnd->posix_compliant) + posix_compliant_mtx_enqueue(mtx, tse, tse); + else + enqueue(&mtx->wakeups, tse, tse); + } +} + +int +ethr_cond_init_opt(ethr_cond *cnd, ethr_cond_opt *opt) +{ + int spincount; + +#if ETHR_XCHK + if (!cnd) { + ETHR_ASSERT(0); + return EINVAL; + } + cnd->initialized = ETHR_COND_INITIALIZED; +#endif + + spincount = opt ? opt->aux_spincount : 0; + if (spincount < 0) + spincount = 0; + + if (!InitializeCriticalSectionAndSpinCount(&cnd->cs, spincount)) { +#if ETHR_XCHK + cnd->initialized = 0; +#endif + return ethr_win_get_errno__(); + } + + cnd->posix_compliant = opt ? opt->posix_compliant : 0; + cnd->waiters = NULL; + cnd->spincount = spincount; + return 0; +} + +int +ethr_cond_init(ethr_cond *cnd) +{ + return ethr_cond_init_opt(cnd, NULL); +} + +int +ethr_cond_destroy(ethr_cond *cnd) +{ + DeleteCriticalSection(&cnd->cs); + return 0; +} + +#endif /* -- Exported symbols of inline functions --------------------------------- */ |