diff options
author | Rickard Green <[email protected]> | 2011-10-30 00:23:16 +0200 |
---|---|---|
committer | Rickard Green <[email protected]> | 2011-11-13 20:40:59 +0100 |
commit | 15774d2ac5ba38dba287309f91eb7e4f58b9a636 (patch) | |
tree | ff9f7f07f0d5c2b807c1614fb2f44b31f505d672 /erts | |
parent | dcc7ecbf6af5420af2d5dbd0e97fc7a2e0e894a6 (diff) | |
download | otp-15774d2ac5ba38dba287309f91eb7e4f58b9a636.tar.gz otp-15774d2ac5ba38dba287309f91eb7e4f58b9a636.tar.bz2 otp-15774d2ac5ba38dba287309f91eb7e4f58b9a636.zip |
Use critical sections as mutex implementation on Windows
Windows native critical sections are now used internally in the
runtime system as mutex implementation. This since they perform
better under extreme contention than our own implementation.
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/beam/erl_drv_thread.c | 8 | ||||
-rw-r--r-- | erts/emulator/beam/erl_smp.h | 10 | ||||
-rw-r--r-- | erts/emulator/beam/erl_threads.h | 12 | ||||
-rw-r--r-- | erts/emulator/sys/win32/sys.c | 4 | ||||
-rw-r--r-- | erts/include/internal/ethr_mutex.h | 115 | ||||
-rw-r--r-- | erts/include/internal/ethread.h | 1 | ||||
-rw-r--r-- | erts/lib_src/common/ethr_mutex.c | 494 |
7 files changed, 588 insertions, 56 deletions
diff --git a/erts/emulator/beam/erl_drv_thread.c b/erts/emulator/beam/erl_drv_thread.c index dc578f6d2a..a49a155701 100644 --- a/erts/emulator/beam/erl_drv_thread.c +++ b/erts/emulator/beam/erl_drv_thread.c @@ -158,7 +158,9 @@ erl_drv_mutex_create(char *name) (sizeof(ErlDrvMutex) + (name ? sys_strlen(name) + 1 : 0))); if (dmtx) { - if (ethr_mutex_init(&dmtx->mtx) != 0) { + ethr_mutex_opt opt = ETHR_MUTEX_OPT_DEFAULT_INITER; + opt.posix_compliant = 1; + if (ethr_mutex_init_opt(&dmtx->mtx, &opt) != 0) { erts_free(ERTS_ALC_T_DRV_MTX, (void *) dmtx); dmtx = NULL; } @@ -226,7 +228,9 @@ erl_drv_cond_create(char *name) (sizeof(ErlDrvCond) + (name ? sys_strlen(name) + 1 : 0))); if (dcnd) { - if (ethr_cond_init(&dcnd->cnd) != 0) { + ethr_cond_opt opt = ETHR_COND_OPT_DEFAULT_INITER; + opt.posix_compliant = 1; + if (ethr_cond_init_opt(&dcnd->cnd, &opt) != 0) { erts_free(ERTS_ALC_T_DRV_CND, (void *) dcnd); dcnd = NULL; } diff --git a/erts/emulator/beam/erl_smp.h b/erts/emulator/beam/erl_smp.h index a89ddfbcc1..63179dfad4 100644 --- a/erts/emulator/beam/erl_smp.h +++ b/erts/emulator/beam/erl_smp.h @@ -822,6 +822,16 @@ erts_smp_cnd_wait(erts_smp_cnd_t *cnd, erts_smp_mtx_t *mtx) #endif } +/* + * IMPORTANT note about erts_smp_cnd_signal() and erts_smp_cnd_broadcast() + * + * POSIX allow a call to `pthread_cond_signal' or `pthread_cond_broadcast' + * even though the associated mutex/mutexes isn't/aren't locked by the + * caller. Our implementation do not allow that in order to avoid a + * performance penalty. That is, all associated mutexes *need* to be + * locked by the caller of erts_smp_cnd_signal()/erts_smp_cnd_broadcast()! + */ + ERTS_GLB_INLINE void erts_smp_cnd_signal(erts_smp_cnd_t *cnd) { diff --git a/erts/emulator/beam/erl_threads.h b/erts/emulator/beam/erl_threads.h index b4b6d0dfd5..065e7077c0 100644 --- a/erts/emulator/beam/erl_threads.h +++ b/erts/emulator/beam/erl_threads.h @@ -92,6 +92,8 @@ typedef struct { #endif } erts_rwmtx_t; +#define ERTS_MTX_OPT_DEFAULT_INITER ETHR_MUTEX_OPT_DEFAULT_INITER +#define ERTS_CND_OPT_DEFAULT_INITER ETHR_COND_OPT_DEFAULT_INITER #define ERTS_RWMTX_OPT_DEFAULT_INITER ETHR_RWMUTEX_OPT_DEFAULT_INITER #define ERTS_RWMTX_TYPE_NORMAL ETHR_RWMUTEX_TYPE_NORMAL #define ERTS_RWMTX_TYPE_FREQUENT_READ ETHR_RWMUTEX_TYPE_FREQUENT_READ @@ -1130,6 +1132,16 @@ erts_cnd_wait(erts_cnd_t *cnd, erts_mtx_t *mtx) #endif } +/* + * IMPORTANT note about erts_cnd_signal() and erts_cnd_broadcast() + * + * POSIX allow a call to `pthread_cond_signal' or `pthread_cond_broadcast' + * even though the associated mutex/mutexes isn't/aren't locked by the + * caller. Our implementation do not allow that in order to avoid a + * performance penalty. That is, all associated mutexes *need* to be + * locked by the caller of erts_cnd_signal()/erts_cnd_broadcast()! + */ + ERTS_GLB_INLINE void erts_cnd_signal(erts_cnd_t *cnd) { diff --git a/erts/emulator/sys/win32/sys.c b/erts/emulator/sys/win32/sys.c index 02d16b83a2..6f33ef7ad6 100644 --- a/erts/emulator/sys/win32/sys.c +++ b/erts/emulator/sys/win32/sys.c @@ -216,6 +216,9 @@ void sys_tty_reset(int exit_code) void erl_sys_args(int* argc, char** argv) { char *event_name; + + erts_sys_env_init(); + nohup = get_and_remove_option(argc, argv, "-nohup"); #ifdef DEBUG @@ -3214,7 +3217,6 @@ erts_sys_pre_init(void) } #endif erts_smp_atomic_init_nob(&sys_misc_mem_sz, 0); - erts_sys_env_init(); } void noinherit_std_handle(DWORD type) diff --git a/erts/include/internal/ethr_mutex.h b/erts/include/internal/ethr_mutex.h index a0685ea3c0..86a1e9fbdf 100644 --- a/erts/include/internal/ethr_mutex.h +++ b/erts/include/internal/ethr_mutex.h @@ -22,6 +22,23 @@ * Author: Rickard Green */ +/* + * IMPORTANT note about ethr_cond_signal() and ethr_cond_broadcast() + * + * POSIX allow a call to `pthread_cond_signal' or `pthread_cond_broadcast' + * even though the associated mutex/mutexes isn't/aren't locked by the + * caller. We do not allow that by default in order to avoid a performance + * penalty on some platforms. + * + * Mutexes and condition variables can, however, be initialized as POSIX + * compliant. When initialized as such ethr_cond_signal(), and + * ethr_cond_broadcast() are allowed to be called even though the associated + * mutexes aren't locked. This will, however, incur a performance penalty on + * some platforms. + * + * POSIX compliant mutexes and condition variables *need* to be used together. + */ + #ifndef ETHR_MUTEX_H__ #define ETHR_MUTEX_H__ @@ -40,6 +57,14 @@ #endif #endif +/* #define ETHR_DBG_WIN_MTX_WITH_PTHREADS */ +#ifdef ETHR_DBG_WIN_MTX_WITH_PTHREADS +typedef pthread_mutex_t CRITICAL_SECTION; +int TryEnterCriticalSection(CRITICAL_SECTION *); +void EnterCriticalSection(CRITICAL_SECTION *); +void LeaveCriticalSection(CRITICAL_SECTION *); +#endif + #ifdef ETHR_MTX_HARD_DEBUG # ifdef __GNUC__ # warning ETHR_MTX_HARD_DEBUG @@ -140,13 +165,19 @@ struct ethr_mutex_base_ { typedef struct { int main_spincount; int aux_spincount; + int posix_compliant; } ethr_mutex_opt; +#define ETHR_MUTEX_OPT_DEFAULT_INITER {-1, -1, 0} + typedef struct { int main_spincount; int aux_spincount; + int posix_compliant; } ethr_cond_opt; +#define ETHR_COND_OPT_DEFAULT_INITER {-1, -1, 0} + #ifdef ETHR_USE_OWN_MTX_IMPL__ typedef struct ethr_mutex_ ethr_mutex; @@ -179,7 +210,7 @@ struct ethr_cond_ { #endif }; -#else /* pthread */ +#elif defined(ETHR_PTHREADS) && !defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) typedef struct ethr_mutex_ ethr_mutex; struct ethr_mutex_ { @@ -197,7 +228,36 @@ struct ethr_cond_ { #endif }; -#endif /* pthread */ +#elif defined(ETHR_WIN32_THREADS) || defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) +# define ETHR_WIN_MUTEX__ + +typedef struct ethr_mutex_ ethr_mutex; +struct ethr_mutex_ { + int posix_compliant; + CRITICAL_SECTION cs; + ethr_ts_event *wakeups; + ethr_atomic32_t have_wakeups; /* only when posix compliant */ + ethr_atomic32_t locked; /* only when posix compliant */ + ethr_spinlock_t lock; /* only when posix compliant */ +#if ETHR_XCHK + int initialized; +#endif +}; + +typedef struct ethr_cond_ ethr_cond; +struct ethr_cond_ { + int posix_compliant; + CRITICAL_SECTION cs; + ethr_ts_event *waiters; + int spincount; +#if ETHR_XCHK + int initialized; +#endif +}; + +#else +# error "no mutex implementation" +#endif int ethr_mutex_init_opt(ethr_mutex *, ethr_mutex_opt *); int ethr_mutex_init(ethr_mutex *); @@ -573,7 +633,7 @@ ETHR_INLINE_MTX_FUNC_NAME_(ethr_mutex_unlock)(ethr_mutex *mtx) #endif /* ETHR_TRY_INLINE_FUNCS */ -#else /* pthread_mutex */ +#elif defined(ETHR_PTHREADS) && !defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) #if defined(ETHR_TRY_INLINE_FUNCS) || defined(ETHR_MUTEX_IMPL__) @@ -605,7 +665,54 @@ ETHR_INLINE_MTX_FUNC_NAME_(ethr_mutex_unlock)(ethr_mutex *mtx) #endif /* ETHR_TRY_INLINE_FUNCS */ -#endif /* pthread_mutex */ +#elif defined(ETHR_WIN32_THREADS) || defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) + +#if defined(ETHR_TRY_INLINE_FUNCS) || defined(ETHR_MUTEX_IMPL__) + +static ETHR_INLINE int +ETHR_INLINE_MTX_FUNC_NAME_(ethr_mutex_trylock)(ethr_mutex *mtx) +{ + if (!TryEnterCriticalSection(&mtx->cs)) + return EBUSY; + if (mtx->posix_compliant) + ethr_atomic32_set(&mtx->locked, 1); + return 0; +} + +static ETHR_INLINE void +ETHR_INLINE_MTX_FUNC_NAME_(ethr_mutex_lock)(ethr_mutex *mtx) +{ + EnterCriticalSection(&mtx->cs); + if (mtx->posix_compliant) + ethr_atomic32_set(&mtx->locked, 1); +} + +void ethr_mutex_cond_wakeup__(ethr_mutex *mtx); + +static ETHR_INLINE void +ETHR_INLINE_MTX_FUNC_NAME_(ethr_mutex_unlock)(ethr_mutex *mtx) +{ + if (mtx->posix_compliant) { + ethr_atomic32_set_mb(&mtx->locked, 0); + if (ethr_atomic32_read_acqb(&mtx->have_wakeups)) + goto cond_wakeup; + else + goto leave_cs; + } + + if (mtx->wakeups) { + cond_wakeup: + ethr_mutex_cond_wakeup__(mtx); + } + else { + leave_cs: + LeaveCriticalSection(&mtx->cs); + } +} + +#endif /* ETHR_TRY_INLINE_FUNCS */ + +#endif #ifdef ETHR_USE_OWN_RWMTX_IMPL__ diff --git a/erts/include/internal/ethread.h b/erts/include/internal/ethread.h index 8ad0ded144..142c26c0ca 100644 --- a/erts/include/internal/ethread.h +++ b/erts/include/internal/ethread.h @@ -191,7 +191,6 @@ typedef DWORD ethr_tsd_key; #undef ETHR_HAVE_ETHR_SIG_FUNCS #define ETHR_USE_OWN_RWMTX_IMPL__ -#define ETHR_USE_OWN_MTX_IMPL__ #define ETHR_YIELD() (Sleep(0), 0) diff --git a/erts/lib_src/common/ethr_mutex.c b/erts/lib_src/common/ethr_mutex.c index 81fd6af80a..5688ac8b9a 100644 --- a/erts/lib_src/common/ethr_mutex.c +++ b/erts/lib_src/common/ethr_mutex.c @@ -223,9 +223,59 @@ rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx, int try_write_lock); #endif +/* -- Utilities used by multiple implementations -- */ + +#if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__) \ + || defined(ETHR_WIN32_THREADS) + +static ETHR_INLINE void +enqueue(ethr_ts_event **queue, + ethr_ts_event *tse_start, + ethr_ts_event *tse_end) +{ + if (!*queue) { + *queue = tse_start; + tse_start->prev = tse_end; + tse_end->next = tse_start; + } + else { + tse_end->next = *queue; + tse_start->prev = (*queue)->prev; + (*queue)->prev->next = tse_start; + (*queue)->prev = tse_end; + } +} + + +static ETHR_INLINE void +dequeue(ethr_ts_event **queue, + ethr_ts_event *tse_start, + ethr_ts_event *tse_end) +{ + if (tse_start->prev == tse_end) { + ETHR_ASSERT(*queue == tse_start && tse_end->next == tse_start); + *queue = NULL; + } + else { + if (*queue == tse_start) + *queue = tse_end->next; + tse_end->next->prev = tse_start->prev; + tse_start->prev->next = tse_end->next; + } +} + +#endif + #if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__) -/* -- Utilities operating both on ordinary mutexes and read write mutexes -- */ +static ETHR_INLINE void +insert(ethr_ts_event *tse_pred, ethr_ts_event *tse) +{ + tse->next = tse_pred->next; + tse->prev = tse_pred; + tse_pred->next->prev = tse; + tse_pred->next = tse; +} static ETHR_INLINE void rwmutex_freqread_wtng_rdrs_inc(ethr_rwmutex *rwmtx, ethr_ts_event *tse) @@ -355,51 +405,6 @@ rwmutex_freqread_rdrs_read(ethr_rwmutex *rwmtx, int ix) return res; } - -static ETHR_INLINE void -enqueue(ethr_ts_event **queue, - ethr_ts_event *tse_start, - ethr_ts_event *tse_end) -{ - if (!*queue) { - *queue = tse_start; - tse_start->prev = tse_end; - tse_end->next = tse_start; - } - else { - tse_end->next = *queue; - tse_start->prev = (*queue)->prev; - (*queue)->prev->next = tse_start; - (*queue)->prev = tse_end; - } -} - -static ETHR_INLINE void -insert(ethr_ts_event *tse_pred, ethr_ts_event *tse) -{ - tse->next = tse_pred->next; - tse->prev = tse_pred; - tse_pred->next->prev = tse; - tse_pred->next = tse; -} - -static ETHR_INLINE void -dequeue(ethr_ts_event **queue, - ethr_ts_event *tse_start, - ethr_ts_event *tse_end) -{ - if (tse_start->prev == tse_end) { - ETHR_ASSERT(*queue == tse_start && tse_end->next == tse_start); - *queue = NULL; - } - else { - if (*queue == tse_start) - *queue = tse_end->next; - tse_end->next->prev = tse_start->prev; - tse_start->prev->next = tse_end->next; - } -} - static void event_wait(struct ethr_mutex_base_ *mtxb, ethr_ts_event *tse, @@ -1244,7 +1249,7 @@ ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx) return 0; } -#else +#elif defined(ETHR_PTHREADS) && !defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) /* -- pthread mutex and condition variables -------------------------------- */ int @@ -1261,6 +1266,12 @@ ethr_mutex_init(ethr_mutex *mtx) } int +ethr_mutex_init_opt(ethr_mutex *mtx, ethr_mutex_opt *opt) +{ + return ethr_mutex_init(mtx); +} + +int ethr_mutex_destroy(ethr_mutex *mtx) { #if ETHR_XCHK @@ -1293,6 +1304,12 @@ ethr_cond_init(ethr_cond *cnd) } int +ethr_cond_init_opt(ethr_cond *cnd, ethr_cond_opt *opt) +{ + return ethr_cond_init(cnd); +} + +int ethr_cond_destroy(ethr_cond *cnd) { #if ETHR_XCHK @@ -1354,7 +1371,388 @@ ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx) return res; } -#endif /* pthread_mutex */ +#elif defined(ETHR_WIN32_THREADS) || defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) + +/* + * As of Vista/Server, 2008 Windows has condition variables that can be + * used with critical sections. However, we need to be able to run on + * older Windows versions too, so we need to implement condition variables + * ourselves. + */ + +#ifdef ETHR_DBG_WIN_MTX_WITH_PTHREADS +/* + * For debugging of this implementation on POSIX platforms... + */ + +#define ethr_win_get_errno__() EINVAL +#if defined(__GNUC__) +#define __forceinline __inline__ +#else +#define __forceinline +#endif + +static int +InitializeCriticalSectionAndSpinCount(CRITICAL_SECTION *cs, int sc) +{ + return 0 == pthread_mutex_init((pthread_mutex_t *) cs, NULL); +} + +static void DeleteCriticalSection(CRITICAL_SECTION *cs) +{ + int res = pthread_mutex_destroy((pthread_mutex_t *) cs); + if (res != 0) + ETHR_FATAL_ERROR__(res); +} + +int TryEnterCriticalSection(CRITICAL_SECTION *cs) +{ + int res; + res = pthread_mutex_trylock((pthread_mutex_t *) cs); + if (res != 0 && res != EBUSY) + ETHR_FATAL_ERROR__(res); + return res == 0; +} + +void EnterCriticalSection(CRITICAL_SECTION *cs) +{ + int res = pthread_mutex_lock((pthread_mutex_t *) cs); + if (res != 0) + ETHR_FATAL_ERROR__(res); +} + +void LeaveCriticalSection(CRITICAL_SECTION *cs) +{ + int res = pthread_mutex_unlock((pthread_mutex_t *) cs); + if (res != 0) + ETHR_FATAL_ERROR__(res); +} + +#endif + +#define ETHR_CND_WAIT__ ((ethr_sint32_t) 0x11dead11) +#define ETHR_CND_WAKEUP__ ((ethr_sint32_t) 0x11beef11) + +static __forceinline void +cond_wakeup(ethr_ts_event *tse) +{ + ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) == ETHR_CND_WAIT__); + + ethr_atomic32_set_relb(&tse->uaflgs, ETHR_CND_WAKEUP__); + ethr_event_set(&tse->event); +} + +void +ethr_mutex_cond_wakeup__(ethr_mutex *mtx) +{ + /* + * Called by ethr_mutex_unlock() when we have + * cond signal/broadcast wakeups waiting to + * be completed. + */ + ethr_ts_event *tse; + + if (!mtx->posix_compliant) { + tse = mtx->wakeups; + dequeue(&mtx->wakeups, tse, tse); + } + else { + ethr_spin_lock(&mtx->lock); + tse = mtx->wakeups; + if (tse) + dequeue(&mtx->wakeups, tse, tse); + if (!mtx->wakeups) + ethr_atomic32_set_relb(&mtx->have_wakeups, 0); + ethr_spin_unlock(&mtx->lock); + } + + LeaveCriticalSection(&mtx->cs); + + ETHR_ASSERT(tse || mtx->posix_compliant); + + /* + * We delay actual condition variable wakeup until + * this point when we have left the critical section. + * This in order to avoid that the other thread is + * woken and then right away have to go to sleep + * waiting for the critical section that we are in. + * + * We also only wake one thread at a time even if + * there are multiple threads waiting to be woken. + * Otherwise all but one will be woken and then right + * away have to go to sleep on the critical section. + * Since each wakeup is guaranteed to generate at + * least one lock/unlock sequence on this mutex, all + * threads will eventually be woken. + */ + + if (tse) + cond_wakeup(tse); +} + +int +ethr_mutex_init_opt(ethr_mutex *mtx, ethr_mutex_opt *opt) +{ + int spincount; +#if ETHR_XCHK + if (!mtx) { + ETHR_ASSERT(0); + return EINVAL; + } + mtx->initialized = ETHR_MUTEX_INITIALIZED; +#endif + + spincount = opt ? opt->aux_spincount : 0; + if (spincount < 0) + spincount = 0; + + if (!InitializeCriticalSectionAndSpinCount(&mtx->cs, spincount)) { +#if ETHR_XCHK + mtx->initialized = 0; +#endif + return ethr_win_get_errno__(); + } + + mtx->posix_compliant = opt ? opt->posix_compliant : 0; + mtx->wakeups = NULL; + if (mtx->posix_compliant) { + ethr_atomic32_init(&mtx->locked, 0); + ethr_atomic32_init(&mtx->have_wakeups, 0); + ethr_spinlock_init(&mtx->lock); + } + return 0; +} + +int +ethr_mutex_init(ethr_mutex *mtx) +{ + return ethr_mutex_init_opt(mtx, NULL); +} + +int +ethr_mutex_destroy(ethr_mutex *mtx) +{ + DeleteCriticalSection(&mtx->cs); + if (mtx->posix_compliant) + return ethr_spinlock_destroy(&mtx->lock); + else + return 0; +} + +int +ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx) +{ + void *udata; + ethr_ts_event *tse = ethr_get_ts_event(); + int spincount; + + udata = tse->udata; + tse->udata = (void *) mtx; + ethr_atomic32_set_relb(&tse->uaflgs, ETHR_CND_WAIT__); + + EnterCriticalSection(&cnd->cs); + enqueue(&cnd->waiters, tse, tse); + LeaveCriticalSection(&cnd->cs); + + ethr_mutex_unlock(mtx); + + spincount = cnd->spincount; + + while (ethr_atomic32_read_acqb(&tse->uaflgs) != ETHR_CND_WAKEUP__) { + ethr_event_reset(&tse->event); + if (ethr_atomic32_read_acqb(&tse->uaflgs) == ETHR_CND_WAKEUP__) + break; + ethr_event_swait(&tse->event, spincount); + spincount = 0; + } + + tse->udata = udata; + ethr_leave_ts_event(tse); + + ethr_mutex_lock(mtx); + + return 0; +} + +static __forceinline void +posix_compliant_mtx_enqueue(ethr_mutex *mtx, + ethr_ts_event *tse_start, + ethr_ts_event *tse_end) +{ + ethr_ts_event *tse_wakeup = NULL; /* Avoid erroneous compiler warning... */ + /* + * The associated mutex might not be locked, so we need to + * check if it is. If locked, enqueue for wakeup at unlock; + * otherwise, wakeup the first one now and enqueue the rest. + */ + if (tse_start == tse_end && !ethr_atomic32_read(&mtx->locked)) { + tse_wakeup = tse_start; + wakeup: + cond_wakeup(tse_wakeup); + } + else { + int need_wakeup; + ethr_spin_lock(&mtx->lock); + if (!mtx->wakeups) + ethr_atomic32_set_mb(&mtx->have_wakeups, 1); + need_wakeup = !ethr_atomic32_read(&mtx->locked); + if (need_wakeup) { + if (tse_start == tse_end) { + if (!mtx->wakeups) + ethr_atomic32_set_relb(&mtx->have_wakeups, 0); + ethr_spin_unlock(&mtx->lock); + tse_wakeup = tse_start; + goto wakeup; + } + tse_wakeup = tse_start; + tse_start = tse_start->next; + } + enqueue(&mtx->wakeups, tse_start, tse_end); + ethr_spin_unlock(&mtx->lock); + if (need_wakeup) + goto wakeup; + } +} + +static __forceinline void +enqueue_cond_wakeups(ethr_ts_event *queue, int posix_compliant) +{ + if (queue) { + int more; + ethr_ts_event *q = queue; + + /* + * Waiters may be using different mutexes... + */ + + do { + ethr_mutex *mtx; + ethr_ts_event *tse, *tse_start, *tse_end; + + more = 0; + tse_start = q; + mtx = (ethr_mutex *) tse_start->udata; + + ETHR_ASSERT(posix_compliant + ? mtx->posix_compliant + : !mtx->posix_compliant); + + ETHR_ASSERT(ethr_atomic32_read(&tse_start->uaflgs) + == ETHR_CND_WAIT__); + ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED); + + tse_end = tse_start->prev; + + for (tse = tse_start->next; tse != tse_start; tse = tse->next) { + + ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) + == ETHR_CND_WAIT__); + + if (mtx != (ethr_mutex *) tse->udata) { + tse_end = tse->prev; + dequeue(&q, tse_start, tse_end); + more = 1; + break; + } + } + + if (posix_compliant) + posix_compliant_mtx_enqueue(mtx, tse_start, tse_end); + else + enqueue(&mtx->wakeups, tse_start, tse_end); + + } while (more); + } +} + +void +ethr_cond_broadcast(ethr_cond *cnd) +{ + ethr_ts_event *waiters; + + EnterCriticalSection(&cnd->cs); + waiters = cnd->waiters; + cnd->waiters = NULL; + LeaveCriticalSection(&cnd->cs); + + if (cnd->posix_compliant) + enqueue_cond_wakeups(waiters, 1); + else + enqueue_cond_wakeups(waiters, 0); +} + +void +ethr_cond_signal(ethr_cond *cnd) +{ + ethr_mutex *mtx; + ethr_ts_event *tse; + + EnterCriticalSection(&cnd->cs); + tse = cnd->waiters; + if (tse) + dequeue(&cnd->waiters, tse, tse); + LeaveCriticalSection(&cnd->cs); + + if (tse) { + mtx = (ethr_mutex *) tse->udata; + + ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) == ETHR_CND_WAIT__); + ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED); + ETHR_ASSERT(cnd->posix_compliant + ? mtx->posix_compliant + : !mtx->posix_compliant); + + if (cnd->posix_compliant) + posix_compliant_mtx_enqueue(mtx, tse, tse); + else + enqueue(&mtx->wakeups, tse, tse); + } +} + +int +ethr_cond_init_opt(ethr_cond *cnd, ethr_cond_opt *opt) +{ + int spincount; + +#if ETHR_XCHK + if (!cnd) { + ETHR_ASSERT(0); + return EINVAL; + } + cnd->initialized = ETHR_COND_INITIALIZED; +#endif + + spincount = opt ? opt->aux_spincount : 0; + if (spincount < 0) + spincount = 0; + + if (!InitializeCriticalSectionAndSpinCount(&cnd->cs, spincount)) { +#if ETHR_XCHK + cnd->initialized = 0; +#endif + return ethr_win_get_errno__(); + } + + cnd->posix_compliant = opt ? opt->posix_compliant : 0; + cnd->waiters = NULL; + cnd->spincount = spincount; + return 0; +} + +int +ethr_cond_init(ethr_cond *cnd) +{ + return ethr_cond_init_opt(cnd, NULL); +} + +int +ethr_cond_destroy(ethr_cond *cnd) +{ + DeleteCriticalSection(&cnd->cs); + return 0; +} + +#endif /* -- Exported symbols of inline functions --------------------------------- */ |