diff options
author | Rickard Green <rickard@erlang.org> | 2011-11-13 21:43:04 +0100 |
---|---|---|
committer | Rickard Green <rickard@erlang.org> | 2011-11-13 21:43:04 +0100 |
commit | 32ef224c9769cd9359496953075e0133358ccf86 (patch) | |
tree | 791517a9be6dbf7a9081192edbf96e69f086d2e1 /erts | |
parent | ca58731b5df58aa2a8b42c583d1ba7bb929e72b2 (diff) | |
parent | 15774d2ac5ba38dba287309f91eb7e4f58b9a636 (diff) | |
download | otp-32ef224c9769cd9359496953075e0133358ccf86.tar.gz otp-32ef224c9769cd9359496953075e0133358ccf86.tar.bz2 otp-32ef224c9769cd9359496953075e0133358ccf86.zip |
Merge branch 'rickard/win-cs-mutex/OTP-9671'
* rickard/win-cs-mutex/OTP-9671:
Use critical sections as mutex implementation on Windows
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/beam/erl_drv_thread.c | 8 | ||||
-rw-r--r-- | erts/emulator/beam/erl_smp.h | 10 | ||||
-rw-r--r-- | erts/emulator/beam/erl_threads.h | 12 | ||||
-rw-r--r-- | erts/emulator/sys/win32/sys.c | 4 | ||||
-rw-r--r-- | erts/include/internal/ethr_mutex.h | 115 | ||||
-rw-r--r-- | erts/include/internal/ethread.h | 1 | ||||
-rw-r--r-- | erts/lib_src/common/ethr_mutex.c | 494 |
7 files changed, 588 insertions, 56 deletions
diff --git a/erts/emulator/beam/erl_drv_thread.c b/erts/emulator/beam/erl_drv_thread.c index dc578f6d2a..a49a155701 100644 --- a/erts/emulator/beam/erl_drv_thread.c +++ b/erts/emulator/beam/erl_drv_thread.c @@ -158,7 +158,9 @@ erl_drv_mutex_create(char *name) (sizeof(ErlDrvMutex) + (name ? sys_strlen(name) + 1 : 0))); if (dmtx) { - if (ethr_mutex_init(&dmtx->mtx) != 0) { + ethr_mutex_opt opt = ETHR_MUTEX_OPT_DEFAULT_INITER; + opt.posix_compliant = 1; + if (ethr_mutex_init_opt(&dmtx->mtx, &opt) != 0) { erts_free(ERTS_ALC_T_DRV_MTX, (void *) dmtx); dmtx = NULL; } @@ -226,7 +228,9 @@ erl_drv_cond_create(char *name) (sizeof(ErlDrvCond) + (name ? sys_strlen(name) + 1 : 0))); if (dcnd) { - if (ethr_cond_init(&dcnd->cnd) != 0) { + ethr_cond_opt opt = ETHR_COND_OPT_DEFAULT_INITER; + opt.posix_compliant = 1; + if (ethr_cond_init_opt(&dcnd->cnd, &opt) != 0) { erts_free(ERTS_ALC_T_DRV_CND, (void *) dcnd); dcnd = NULL; } diff --git a/erts/emulator/beam/erl_smp.h b/erts/emulator/beam/erl_smp.h index a89ddfbcc1..63179dfad4 100644 --- a/erts/emulator/beam/erl_smp.h +++ b/erts/emulator/beam/erl_smp.h @@ -822,6 +822,16 @@ erts_smp_cnd_wait(erts_smp_cnd_t *cnd, erts_smp_mtx_t *mtx) #endif } +/* + * IMPORTANT note about erts_smp_cnd_signal() and erts_smp_cnd_broadcast() + * + * POSIX allow a call to `pthread_cond_signal' or `pthread_cond_broadcast' + * even though the associated mutex/mutexes isn't/aren't locked by the + * caller. Our implementation do not allow that in order to avoid a + * performance penalty. That is, all associated mutexes *need* to be + * locked by the caller of erts_smp_cnd_signal()/erts_smp_cnd_broadcast()! + */ + ERTS_GLB_INLINE void erts_smp_cnd_signal(erts_smp_cnd_t *cnd) { diff --git a/erts/emulator/beam/erl_threads.h b/erts/emulator/beam/erl_threads.h index b4b6d0dfd5..065e7077c0 100644 --- a/erts/emulator/beam/erl_threads.h +++ b/erts/emulator/beam/erl_threads.h @@ -92,6 +92,8 @@ typedef struct { #endif } erts_rwmtx_t; +#define ERTS_MTX_OPT_DEFAULT_INITER ETHR_MUTEX_OPT_DEFAULT_INITER +#define ERTS_CND_OPT_DEFAULT_INITER ETHR_COND_OPT_DEFAULT_INITER #define ERTS_RWMTX_OPT_DEFAULT_INITER ETHR_RWMUTEX_OPT_DEFAULT_INITER #define ERTS_RWMTX_TYPE_NORMAL ETHR_RWMUTEX_TYPE_NORMAL #define ERTS_RWMTX_TYPE_FREQUENT_READ ETHR_RWMUTEX_TYPE_FREQUENT_READ @@ -1130,6 +1132,16 @@ erts_cnd_wait(erts_cnd_t *cnd, erts_mtx_t *mtx) #endif } +/* + * IMPORTANT note about erts_cnd_signal() and erts_cnd_broadcast() + * + * POSIX allow a call to `pthread_cond_signal' or `pthread_cond_broadcast' + * even though the associated mutex/mutexes isn't/aren't locked by the + * caller. Our implementation do not allow that in order to avoid a + * performance penalty. That is, all associated mutexes *need* to be + * locked by the caller of erts_cnd_signal()/erts_cnd_broadcast()! + */ + ERTS_GLB_INLINE void erts_cnd_signal(erts_cnd_t *cnd) { diff --git a/erts/emulator/sys/win32/sys.c b/erts/emulator/sys/win32/sys.c index 02d16b83a2..6f33ef7ad6 100644 --- a/erts/emulator/sys/win32/sys.c +++ b/erts/emulator/sys/win32/sys.c @@ -216,6 +216,9 @@ void sys_tty_reset(int exit_code) void erl_sys_args(int* argc, char** argv) { char *event_name; + + erts_sys_env_init(); + nohup = get_and_remove_option(argc, argv, "-nohup"); #ifdef DEBUG @@ -3214,7 +3217,6 @@ erts_sys_pre_init(void) } #endif erts_smp_atomic_init_nob(&sys_misc_mem_sz, 0); - erts_sys_env_init(); } void noinherit_std_handle(DWORD type) diff --git a/erts/include/internal/ethr_mutex.h b/erts/include/internal/ethr_mutex.h index a0685ea3c0..86a1e9fbdf 100644 --- a/erts/include/internal/ethr_mutex.h +++ b/erts/include/internal/ethr_mutex.h @@ -22,6 +22,23 @@ * Author: Rickard Green */ +/* + * IMPORTANT note about ethr_cond_signal() and ethr_cond_broadcast() + * + * POSIX allow a call to `pthread_cond_signal' or `pthread_cond_broadcast' + * even though the associated mutex/mutexes isn't/aren't locked by the + * caller. We do not allow that by default in order to avoid a performance + * penalty on some platforms. + * + * Mutexes and condition variables can, however, be initialized as POSIX + * compliant. When initialized as such ethr_cond_signal(), and + * ethr_cond_broadcast() are allowed to be called even though the associated + * mutexes aren't locked. This will, however, incur a performance penalty on + * some platforms. + * + * POSIX compliant mutexes and condition variables *need* to be used together. + */ + #ifndef ETHR_MUTEX_H__ #define ETHR_MUTEX_H__ @@ -40,6 +57,14 @@ #endif #endif +/* #define ETHR_DBG_WIN_MTX_WITH_PTHREADS */ +#ifdef ETHR_DBG_WIN_MTX_WITH_PTHREADS +typedef pthread_mutex_t CRITICAL_SECTION; +int TryEnterCriticalSection(CRITICAL_SECTION *); +void EnterCriticalSection(CRITICAL_SECTION *); +void LeaveCriticalSection(CRITICAL_SECTION *); +#endif + #ifdef ETHR_MTX_HARD_DEBUG # ifdef __GNUC__ # warning ETHR_MTX_HARD_DEBUG @@ -140,13 +165,19 @@ struct ethr_mutex_base_ { typedef struct { int main_spincount; int aux_spincount; + int posix_compliant; } ethr_mutex_opt; +#define ETHR_MUTEX_OPT_DEFAULT_INITER {-1, -1, 0} + typedef struct { int main_spincount; int aux_spincount; + int posix_compliant; } ethr_cond_opt; +#define ETHR_COND_OPT_DEFAULT_INITER {-1, -1, 0} + #ifdef ETHR_USE_OWN_MTX_IMPL__ typedef struct ethr_mutex_ ethr_mutex; @@ -179,7 +210,7 @@ struct ethr_cond_ { #endif }; -#else /* pthread */ +#elif defined(ETHR_PTHREADS) && !defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) typedef struct ethr_mutex_ ethr_mutex; struct ethr_mutex_ { @@ -197,7 +228,36 @@ struct ethr_cond_ { #endif }; -#endif /* pthread */ +#elif defined(ETHR_WIN32_THREADS) || defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) +# define ETHR_WIN_MUTEX__ + +typedef struct ethr_mutex_ ethr_mutex; +struct ethr_mutex_ { + int posix_compliant; + CRITICAL_SECTION cs; + ethr_ts_event *wakeups; + ethr_atomic32_t have_wakeups; /* only when posix compliant */ + ethr_atomic32_t locked; /* only when posix compliant */ + ethr_spinlock_t lock; /* only when posix compliant */ +#if ETHR_XCHK + int initialized; +#endif +}; + +typedef struct ethr_cond_ ethr_cond; +struct ethr_cond_ { + int posix_compliant; + CRITICAL_SECTION cs; + ethr_ts_event *waiters; + int spincount; +#if ETHR_XCHK + int initialized; +#endif +}; + +#else +# error "no mutex implementation" +#endif int ethr_mutex_init_opt(ethr_mutex *, ethr_mutex_opt *); int ethr_mutex_init(ethr_mutex *); @@ -573,7 +633,7 @@ ETHR_INLINE_MTX_FUNC_NAME_(ethr_mutex_unlock)(ethr_mutex *mtx) #endif /* ETHR_TRY_INLINE_FUNCS */ -#else /* pthread_mutex */ +#elif defined(ETHR_PTHREADS) && !defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) #if defined(ETHR_TRY_INLINE_FUNCS) || defined(ETHR_MUTEX_IMPL__) @@ -605,7 +665,54 @@ ETHR_INLINE_MTX_FUNC_NAME_(ethr_mutex_unlock)(ethr_mutex *mtx) #endif /* ETHR_TRY_INLINE_FUNCS */ -#endif /* pthread_mutex */ +#elif defined(ETHR_WIN32_THREADS) || defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) + +#if defined(ETHR_TRY_INLINE_FUNCS) || defined(ETHR_MUTEX_IMPL__) + +static ETHR_INLINE int +ETHR_INLINE_MTX_FUNC_NAME_(ethr_mutex_trylock)(ethr_mutex *mtx) +{ + if (!TryEnterCriticalSection(&mtx->cs)) + return EBUSY; + if (mtx->posix_compliant) + ethr_atomic32_set(&mtx->locked, 1); + return 0; +} + +static ETHR_INLINE void +ETHR_INLINE_MTX_FUNC_NAME_(ethr_mutex_lock)(ethr_mutex *mtx) +{ + EnterCriticalSection(&mtx->cs); + if (mtx->posix_compliant) + ethr_atomic32_set(&mtx->locked, 1); +} + +void ethr_mutex_cond_wakeup__(ethr_mutex *mtx); + +static ETHR_INLINE void +ETHR_INLINE_MTX_FUNC_NAME_(ethr_mutex_unlock)(ethr_mutex *mtx) +{ + if (mtx->posix_compliant) { + ethr_atomic32_set_mb(&mtx->locked, 0); + if (ethr_atomic32_read_acqb(&mtx->have_wakeups)) + goto cond_wakeup; + else + goto leave_cs; + } + + if (mtx->wakeups) { + cond_wakeup: + ethr_mutex_cond_wakeup__(mtx); + } + else { + leave_cs: + LeaveCriticalSection(&mtx->cs); + } +} + +#endif /* ETHR_TRY_INLINE_FUNCS */ + +#endif #ifdef ETHR_USE_OWN_RWMTX_IMPL__ diff --git a/erts/include/internal/ethread.h b/erts/include/internal/ethread.h index 8ad0ded144..142c26c0ca 100644 --- a/erts/include/internal/ethread.h +++ b/erts/include/internal/ethread.h @@ -191,7 +191,6 @@ typedef DWORD ethr_tsd_key; #undef ETHR_HAVE_ETHR_SIG_FUNCS #define ETHR_USE_OWN_RWMTX_IMPL__ -#define ETHR_USE_OWN_MTX_IMPL__ #define ETHR_YIELD() (Sleep(0), 0) diff --git a/erts/lib_src/common/ethr_mutex.c b/erts/lib_src/common/ethr_mutex.c index 81fd6af80a..5688ac8b9a 100644 --- a/erts/lib_src/common/ethr_mutex.c +++ b/erts/lib_src/common/ethr_mutex.c @@ -223,9 +223,59 @@ rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx, int try_write_lock); #endif +/* -- Utilities used by multiple implementations -- */ + +#if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__) \ + || defined(ETHR_WIN32_THREADS) + +static ETHR_INLINE void +enqueue(ethr_ts_event **queue, + ethr_ts_event *tse_start, + ethr_ts_event *tse_end) +{ + if (!*queue) { + *queue = tse_start; + tse_start->prev = tse_end; + tse_end->next = tse_start; + } + else { + tse_end->next = *queue; + tse_start->prev = (*queue)->prev; + (*queue)->prev->next = tse_start; + (*queue)->prev = tse_end; + } +} + + +static ETHR_INLINE void +dequeue(ethr_ts_event **queue, + ethr_ts_event *tse_start, + ethr_ts_event *tse_end) +{ + if (tse_start->prev == tse_end) { + ETHR_ASSERT(*queue == tse_start && tse_end->next == tse_start); + *queue = NULL; + } + else { + if (*queue == tse_start) + *queue = tse_end->next; + tse_end->next->prev = tse_start->prev; + tse_start->prev->next = tse_end->next; + } +} + +#endif + #if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__) -/* -- Utilities operating both on ordinary mutexes and read write mutexes -- */ +static ETHR_INLINE void +insert(ethr_ts_event *tse_pred, ethr_ts_event *tse) +{ + tse->next = tse_pred->next; + tse->prev = tse_pred; + tse_pred->next->prev = tse; + tse_pred->next = tse; +} static ETHR_INLINE void rwmutex_freqread_wtng_rdrs_inc(ethr_rwmutex *rwmtx, ethr_ts_event *tse) @@ -355,51 +405,6 @@ rwmutex_freqread_rdrs_read(ethr_rwmutex *rwmtx, int ix) return res; } - -static ETHR_INLINE void -enqueue(ethr_ts_event **queue, - ethr_ts_event *tse_start, - ethr_ts_event *tse_end) -{ - if (!*queue) { - *queue = tse_start; - tse_start->prev = tse_end; - tse_end->next = tse_start; - } - else { - tse_end->next = *queue; - tse_start->prev = (*queue)->prev; - (*queue)->prev->next = tse_start; - (*queue)->prev = tse_end; - } -} - -static ETHR_INLINE void -insert(ethr_ts_event *tse_pred, ethr_ts_event *tse) -{ - tse->next = tse_pred->next; - tse->prev = tse_pred; - tse_pred->next->prev = tse; - tse_pred->next = tse; -} - -static ETHR_INLINE void -dequeue(ethr_ts_event **queue, - ethr_ts_event *tse_start, - ethr_ts_event *tse_end) -{ - if (tse_start->prev == tse_end) { - ETHR_ASSERT(*queue == tse_start && tse_end->next == tse_start); - *queue = NULL; - } - else { - if (*queue == tse_start) - *queue = tse_end->next; - tse_end->next->prev = tse_start->prev; - tse_start->prev->next = tse_end->next; - } -} - static void event_wait(struct ethr_mutex_base_ *mtxb, ethr_ts_event *tse, @@ -1244,7 +1249,7 @@ ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx) return 0; } -#else +#elif defined(ETHR_PTHREADS) && !defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) /* -- pthread mutex and condition variables -------------------------------- */ int @@ -1261,6 +1266,12 @@ ethr_mutex_init(ethr_mutex *mtx) } int +ethr_mutex_init_opt(ethr_mutex *mtx, ethr_mutex_opt *opt) +{ + return ethr_mutex_init(mtx); +} + +int ethr_mutex_destroy(ethr_mutex *mtx) { #if ETHR_XCHK @@ -1293,6 +1304,12 @@ ethr_cond_init(ethr_cond *cnd) } int +ethr_cond_init_opt(ethr_cond *cnd, ethr_cond_opt *opt) +{ + return ethr_cond_init(cnd); +} + +int ethr_cond_destroy(ethr_cond *cnd) { #if ETHR_XCHK @@ -1354,7 +1371,388 @@ ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx) return res; } -#endif /* pthread_mutex */ +#elif defined(ETHR_WIN32_THREADS) || defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS) + +/* + * As of Vista/Server, 2008 Windows has condition variables that can be + * used with critical sections. However, we need to be able to run on + * older Windows versions too, so we need to implement condition variables + * ourselves. + */ + +#ifdef ETHR_DBG_WIN_MTX_WITH_PTHREADS +/* + * For debugging of this implementation on POSIX platforms... + */ + +#define ethr_win_get_errno__() EINVAL +#if defined(__GNUC__) +#define __forceinline __inline__ +#else +#define __forceinline +#endif + +static int +InitializeCriticalSectionAndSpinCount(CRITICAL_SECTION *cs, int sc) +{ + return 0 == pthread_mutex_init((pthread_mutex_t *) cs, NULL); +} + +static void DeleteCriticalSection(CRITICAL_SECTION *cs) +{ + int res = pthread_mutex_destroy((pthread_mutex_t *) cs); + if (res != 0) + ETHR_FATAL_ERROR__(res); +} + +int TryEnterCriticalSection(CRITICAL_SECTION *cs) +{ + int res; + res = pthread_mutex_trylock((pthread_mutex_t *) cs); + if (res != 0 && res != EBUSY) + ETHR_FATAL_ERROR__(res); + return res == 0; +} + +void EnterCriticalSection(CRITICAL_SECTION *cs) +{ + int res = pthread_mutex_lock((pthread_mutex_t *) cs); + if (res != 0) + ETHR_FATAL_ERROR__(res); +} + +void LeaveCriticalSection(CRITICAL_SECTION *cs) +{ + int res = pthread_mutex_unlock((pthread_mutex_t *) cs); + if (res != 0) + ETHR_FATAL_ERROR__(res); +} + +#endif + +#define ETHR_CND_WAIT__ ((ethr_sint32_t) 0x11dead11) +#define ETHR_CND_WAKEUP__ ((ethr_sint32_t) 0x11beef11) + +static __forceinline void +cond_wakeup(ethr_ts_event *tse) +{ + ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) == ETHR_CND_WAIT__); + + ethr_atomic32_set_relb(&tse->uaflgs, ETHR_CND_WAKEUP__); + ethr_event_set(&tse->event); +} + +void +ethr_mutex_cond_wakeup__(ethr_mutex *mtx) +{ + /* + * Called by ethr_mutex_unlock() when we have + * cond signal/broadcast wakeups waiting to + * be completed. + */ + ethr_ts_event *tse; + + if (!mtx->posix_compliant) { + tse = mtx->wakeups; + dequeue(&mtx->wakeups, tse, tse); + } + else { + ethr_spin_lock(&mtx->lock); + tse = mtx->wakeups; + if (tse) + dequeue(&mtx->wakeups, tse, tse); + if (!mtx->wakeups) + ethr_atomic32_set_relb(&mtx->have_wakeups, 0); + ethr_spin_unlock(&mtx->lock); + } + + LeaveCriticalSection(&mtx->cs); + + ETHR_ASSERT(tse || mtx->posix_compliant); + + /* + * We delay actual condition variable wakeup until + * this point when we have left the critical section. + * This in order to avoid that the other thread is + * woken and then right away have to go to sleep + * waiting for the critical section that we are in. + * + * We also only wake one thread at a time even if + * there are multiple threads waiting to be woken. + * Otherwise all but one will be woken and then right + * away have to go to sleep on the critical section. + * Since each wakeup is guaranteed to generate at + * least one lock/unlock sequence on this mutex, all + * threads will eventually be woken. + */ + + if (tse) + cond_wakeup(tse); +} + +int +ethr_mutex_init_opt(ethr_mutex *mtx, ethr_mutex_opt *opt) +{ + int spincount; +#if ETHR_XCHK + if (!mtx) { + ETHR_ASSERT(0); + return EINVAL; + } + mtx->initialized = ETHR_MUTEX_INITIALIZED; +#endif + + spincount = opt ? opt->aux_spincount : 0; + if (spincount < 0) + spincount = 0; + + if (!InitializeCriticalSectionAndSpinCount(&mtx->cs, spincount)) { +#if ETHR_XCHK + mtx->initialized = 0; +#endif + return ethr_win_get_errno__(); + } + + mtx->posix_compliant = opt ? opt->posix_compliant : 0; + mtx->wakeups = NULL; + if (mtx->posix_compliant) { + ethr_atomic32_init(&mtx->locked, 0); + ethr_atomic32_init(&mtx->have_wakeups, 0); + ethr_spinlock_init(&mtx->lock); + } + return 0; +} + +int +ethr_mutex_init(ethr_mutex *mtx) +{ + return ethr_mutex_init_opt(mtx, NULL); +} + +int +ethr_mutex_destroy(ethr_mutex *mtx) +{ + DeleteCriticalSection(&mtx->cs); + if (mtx->posix_compliant) + return ethr_spinlock_destroy(&mtx->lock); + else + return 0; +} + +int +ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx) +{ + void *udata; + ethr_ts_event *tse = ethr_get_ts_event(); + int spincount; + + udata = tse->udata; + tse->udata = (void *) mtx; + ethr_atomic32_set_relb(&tse->uaflgs, ETHR_CND_WAIT__); + + EnterCriticalSection(&cnd->cs); + enqueue(&cnd->waiters, tse, tse); + LeaveCriticalSection(&cnd->cs); + + ethr_mutex_unlock(mtx); + + spincount = cnd->spincount; + + while (ethr_atomic32_read_acqb(&tse->uaflgs) != ETHR_CND_WAKEUP__) { + ethr_event_reset(&tse->event); + if (ethr_atomic32_read_acqb(&tse->uaflgs) == ETHR_CND_WAKEUP__) + break; + ethr_event_swait(&tse->event, spincount); + spincount = 0; + } + + tse->udata = udata; + ethr_leave_ts_event(tse); + + ethr_mutex_lock(mtx); + + return 0; +} + +static __forceinline void +posix_compliant_mtx_enqueue(ethr_mutex *mtx, + ethr_ts_event *tse_start, + ethr_ts_event *tse_end) +{ + ethr_ts_event *tse_wakeup = NULL; /* Avoid erroneous compiler warning... */ + /* + * The associated mutex might not be locked, so we need to + * check if it is. If locked, enqueue for wakeup at unlock; + * otherwise, wakeup the first one now and enqueue the rest. + */ + if (tse_start == tse_end && !ethr_atomic32_read(&mtx->locked)) { + tse_wakeup = tse_start; + wakeup: + cond_wakeup(tse_wakeup); + } + else { + int need_wakeup; + ethr_spin_lock(&mtx->lock); + if (!mtx->wakeups) + ethr_atomic32_set_mb(&mtx->have_wakeups, 1); + need_wakeup = !ethr_atomic32_read(&mtx->locked); + if (need_wakeup) { + if (tse_start == tse_end) { + if (!mtx->wakeups) + ethr_atomic32_set_relb(&mtx->have_wakeups, 0); + ethr_spin_unlock(&mtx->lock); + tse_wakeup = tse_start; + goto wakeup; + } + tse_wakeup = tse_start; + tse_start = tse_start->next; + } + enqueue(&mtx->wakeups, tse_start, tse_end); + ethr_spin_unlock(&mtx->lock); + if (need_wakeup) + goto wakeup; + } +} + +static __forceinline void +enqueue_cond_wakeups(ethr_ts_event *queue, int posix_compliant) +{ + if (queue) { + int more; + ethr_ts_event *q = queue; + + /* + * Waiters may be using different mutexes... + */ + + do { + ethr_mutex *mtx; + ethr_ts_event *tse, *tse_start, *tse_end; + + more = 0; + tse_start = q; + mtx = (ethr_mutex *) tse_start->udata; + + ETHR_ASSERT(posix_compliant + ? mtx->posix_compliant + : !mtx->posix_compliant); + + ETHR_ASSERT(ethr_atomic32_read(&tse_start->uaflgs) + == ETHR_CND_WAIT__); + ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED); + + tse_end = tse_start->prev; + + for (tse = tse_start->next; tse != tse_start; tse = tse->next) { + + ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) + == ETHR_CND_WAIT__); + + if (mtx != (ethr_mutex *) tse->udata) { + tse_end = tse->prev; + dequeue(&q, tse_start, tse_end); + more = 1; + break; + } + } + + if (posix_compliant) + posix_compliant_mtx_enqueue(mtx, tse_start, tse_end); + else + enqueue(&mtx->wakeups, tse_start, tse_end); + + } while (more); + } +} + +void +ethr_cond_broadcast(ethr_cond *cnd) +{ + ethr_ts_event *waiters; + + EnterCriticalSection(&cnd->cs); + waiters = cnd->waiters; + cnd->waiters = NULL; + LeaveCriticalSection(&cnd->cs); + + if (cnd->posix_compliant) + enqueue_cond_wakeups(waiters, 1); + else + enqueue_cond_wakeups(waiters, 0); +} + +void +ethr_cond_signal(ethr_cond *cnd) +{ + ethr_mutex *mtx; + ethr_ts_event *tse; + + EnterCriticalSection(&cnd->cs); + tse = cnd->waiters; + if (tse) + dequeue(&cnd->waiters, tse, tse); + LeaveCriticalSection(&cnd->cs); + + if (tse) { + mtx = (ethr_mutex *) tse->udata; + + ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) == ETHR_CND_WAIT__); + ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED); + ETHR_ASSERT(cnd->posix_compliant + ? mtx->posix_compliant + : !mtx->posix_compliant); + + if (cnd->posix_compliant) + posix_compliant_mtx_enqueue(mtx, tse, tse); + else + enqueue(&mtx->wakeups, tse, tse); + } +} + +int +ethr_cond_init_opt(ethr_cond *cnd, ethr_cond_opt *opt) +{ + int spincount; + +#if ETHR_XCHK + if (!cnd) { + ETHR_ASSERT(0); + return EINVAL; + } + cnd->initialized = ETHR_COND_INITIALIZED; +#endif + + spincount = opt ? opt->aux_spincount : 0; + if (spincount < 0) + spincount = 0; + + if (!InitializeCriticalSectionAndSpinCount(&cnd->cs, spincount)) { +#if ETHR_XCHK + cnd->initialized = 0; +#endif + return ethr_win_get_errno__(); + } + + cnd->posix_compliant = opt ? opt->posix_compliant : 0; + cnd->waiters = NULL; + cnd->spincount = spincount; + return 0; +} + +int +ethr_cond_init(ethr_cond *cnd) +{ + return ethr_cond_init_opt(cnd, NULL); +} + +int +ethr_cond_destroy(ethr_cond *cnd) +{ + DeleteCriticalSection(&cnd->cs); + return 0; +} + +#endif /* -- Exported symbols of inline functions --------------------------------- */ |