aboutsummaryrefslogtreecommitdiffstats
path: root/erts/lib_src
diff options
context:
space:
mode:
authorRickard Green <rickard@erlang.org>2011-10-30 00:23:16 +0200
committerRickard Green <rickard@erlang.org>2011-11-13 20:40:59 +0100
commit15774d2ac5ba38dba287309f91eb7e4f58b9a636 (patch)
treeff9f7f07f0d5c2b807c1614fb2f44b31f505d672 /erts/lib_src
parentdcc7ecbf6af5420af2d5dbd0e97fc7a2e0e894a6 (diff)
downloadotp-15774d2ac5ba38dba287309f91eb7e4f58b9a636.tar.gz
otp-15774d2ac5ba38dba287309f91eb7e4f58b9a636.tar.bz2
otp-15774d2ac5ba38dba287309f91eb7e4f58b9a636.zip
Use critical sections as mutex implementation on Windows
Windows native critical sections are now used internally in the runtime system as mutex implementation. This since they perform better under extreme contention than our own implementation.
Diffstat (limited to 'erts/lib_src')
-rw-r--r--erts/lib_src/common/ethr_mutex.c494
1 files changed, 446 insertions, 48 deletions
diff --git a/erts/lib_src/common/ethr_mutex.c b/erts/lib_src/common/ethr_mutex.c
index 81fd6af80a..5688ac8b9a 100644
--- a/erts/lib_src/common/ethr_mutex.c
+++ b/erts/lib_src/common/ethr_mutex.c
@@ -223,9 +223,59 @@ rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx,
int try_write_lock);
#endif
+/* -- Utilities used by multiple implementations -- */
+
+#if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__) \
+ || defined(ETHR_WIN32_THREADS)
+
+static ETHR_INLINE void
+enqueue(ethr_ts_event **queue,
+ ethr_ts_event *tse_start,
+ ethr_ts_event *tse_end)
+{
+ if (!*queue) {
+ *queue = tse_start;
+ tse_start->prev = tse_end;
+ tse_end->next = tse_start;
+ }
+ else {
+ tse_end->next = *queue;
+ tse_start->prev = (*queue)->prev;
+ (*queue)->prev->next = tse_start;
+ (*queue)->prev = tse_end;
+ }
+}
+
+
+static ETHR_INLINE void
+dequeue(ethr_ts_event **queue,
+ ethr_ts_event *tse_start,
+ ethr_ts_event *tse_end)
+{
+ if (tse_start->prev == tse_end) {
+ ETHR_ASSERT(*queue == tse_start && tse_end->next == tse_start);
+ *queue = NULL;
+ }
+ else {
+ if (*queue == tse_start)
+ *queue = tse_end->next;
+ tse_end->next->prev = tse_start->prev;
+ tse_start->prev->next = tse_end->next;
+ }
+}
+
+#endif
+
#if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__)
-/* -- Utilities operating both on ordinary mutexes and read write mutexes -- */
+static ETHR_INLINE void
+insert(ethr_ts_event *tse_pred, ethr_ts_event *tse)
+{
+ tse->next = tse_pred->next;
+ tse->prev = tse_pred;
+ tse_pred->next->prev = tse;
+ tse_pred->next = tse;
+}
static ETHR_INLINE void
rwmutex_freqread_wtng_rdrs_inc(ethr_rwmutex *rwmtx, ethr_ts_event *tse)
@@ -355,51 +405,6 @@ rwmutex_freqread_rdrs_read(ethr_rwmutex *rwmtx, int ix)
return res;
}
-
-static ETHR_INLINE void
-enqueue(ethr_ts_event **queue,
- ethr_ts_event *tse_start,
- ethr_ts_event *tse_end)
-{
- if (!*queue) {
- *queue = tse_start;
- tse_start->prev = tse_end;
- tse_end->next = tse_start;
- }
- else {
- tse_end->next = *queue;
- tse_start->prev = (*queue)->prev;
- (*queue)->prev->next = tse_start;
- (*queue)->prev = tse_end;
- }
-}
-
-static ETHR_INLINE void
-insert(ethr_ts_event *tse_pred, ethr_ts_event *tse)
-{
- tse->next = tse_pred->next;
- tse->prev = tse_pred;
- tse_pred->next->prev = tse;
- tse_pred->next = tse;
-}
-
-static ETHR_INLINE void
-dequeue(ethr_ts_event **queue,
- ethr_ts_event *tse_start,
- ethr_ts_event *tse_end)
-{
- if (tse_start->prev == tse_end) {
- ETHR_ASSERT(*queue == tse_start && tse_end->next == tse_start);
- *queue = NULL;
- }
- else {
- if (*queue == tse_start)
- *queue = tse_end->next;
- tse_end->next->prev = tse_start->prev;
- tse_start->prev->next = tse_end->next;
- }
-}
-
static void
event_wait(struct ethr_mutex_base_ *mtxb,
ethr_ts_event *tse,
@@ -1244,7 +1249,7 @@ ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx)
return 0;
}
-#else
+#elif defined(ETHR_PTHREADS) && !defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS)
/* -- pthread mutex and condition variables -------------------------------- */
int
@@ -1261,6 +1266,12 @@ ethr_mutex_init(ethr_mutex *mtx)
}
int
+ethr_mutex_init_opt(ethr_mutex *mtx, ethr_mutex_opt *opt)
+{
+ return ethr_mutex_init(mtx);
+}
+
+int
ethr_mutex_destroy(ethr_mutex *mtx)
{
#if ETHR_XCHK
@@ -1293,6 +1304,12 @@ ethr_cond_init(ethr_cond *cnd)
}
int
+ethr_cond_init_opt(ethr_cond *cnd, ethr_cond_opt *opt)
+{
+ return ethr_cond_init(cnd);
+}
+
+int
ethr_cond_destroy(ethr_cond *cnd)
{
#if ETHR_XCHK
@@ -1354,7 +1371,388 @@ ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx)
return res;
}
-#endif /* pthread_mutex */
+#elif defined(ETHR_WIN32_THREADS) || defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS)
+
+/*
+ * As of Vista/Server, 2008 Windows has condition variables that can be
+ * used with critical sections. However, we need to be able to run on
+ * older Windows versions too, so we need to implement condition variables
+ * ourselves.
+ */
+
+#ifdef ETHR_DBG_WIN_MTX_WITH_PTHREADS
+/*
+ * For debugging of this implementation on POSIX platforms...
+ */
+
+#define ethr_win_get_errno__() EINVAL
+#if defined(__GNUC__)
+#define __forceinline __inline__
+#else
+#define __forceinline
+#endif
+
+static int
+InitializeCriticalSectionAndSpinCount(CRITICAL_SECTION *cs, int sc)
+{
+ return 0 == pthread_mutex_init((pthread_mutex_t *) cs, NULL);
+}
+
+static void DeleteCriticalSection(CRITICAL_SECTION *cs)
+{
+ int res = pthread_mutex_destroy((pthread_mutex_t *) cs);
+ if (res != 0)
+ ETHR_FATAL_ERROR__(res);
+}
+
+int TryEnterCriticalSection(CRITICAL_SECTION *cs)
+{
+ int res;
+ res = pthread_mutex_trylock((pthread_mutex_t *) cs);
+ if (res != 0 && res != EBUSY)
+ ETHR_FATAL_ERROR__(res);
+ return res == 0;
+}
+
+void EnterCriticalSection(CRITICAL_SECTION *cs)
+{
+ int res = pthread_mutex_lock((pthread_mutex_t *) cs);
+ if (res != 0)
+ ETHR_FATAL_ERROR__(res);
+}
+
+void LeaveCriticalSection(CRITICAL_SECTION *cs)
+{
+ int res = pthread_mutex_unlock((pthread_mutex_t *) cs);
+ if (res != 0)
+ ETHR_FATAL_ERROR__(res);
+}
+
+#endif
+
+#define ETHR_CND_WAIT__ ((ethr_sint32_t) 0x11dead11)
+#define ETHR_CND_WAKEUP__ ((ethr_sint32_t) 0x11beef11)
+
+static __forceinline void
+cond_wakeup(ethr_ts_event *tse)
+{
+ ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) == ETHR_CND_WAIT__);
+
+ ethr_atomic32_set_relb(&tse->uaflgs, ETHR_CND_WAKEUP__);
+ ethr_event_set(&tse->event);
+}
+
+void
+ethr_mutex_cond_wakeup__(ethr_mutex *mtx)
+{
+ /*
+ * Called by ethr_mutex_unlock() when we have
+ * cond signal/broadcast wakeups waiting to
+ * be completed.
+ */
+ ethr_ts_event *tse;
+
+ if (!mtx->posix_compliant) {
+ tse = mtx->wakeups;
+ dequeue(&mtx->wakeups, tse, tse);
+ }
+ else {
+ ethr_spin_lock(&mtx->lock);
+ tse = mtx->wakeups;
+ if (tse)
+ dequeue(&mtx->wakeups, tse, tse);
+ if (!mtx->wakeups)
+ ethr_atomic32_set_relb(&mtx->have_wakeups, 0);
+ ethr_spin_unlock(&mtx->lock);
+ }
+
+ LeaveCriticalSection(&mtx->cs);
+
+ ETHR_ASSERT(tse || mtx->posix_compliant);
+
+ /*
+ * We delay actual condition variable wakeup until
+ * this point when we have left the critical section.
+ * This in order to avoid that the other thread is
+ * woken and then right away have to go to sleep
+ * waiting for the critical section that we are in.
+ *
+ * We also only wake one thread at a time even if
+ * there are multiple threads waiting to be woken.
+ * Otherwise all but one will be woken and then right
+ * away have to go to sleep on the critical section.
+ * Since each wakeup is guaranteed to generate at
+ * least one lock/unlock sequence on this mutex, all
+ * threads will eventually be woken.
+ */
+
+ if (tse)
+ cond_wakeup(tse);
+}
+
+int
+ethr_mutex_init_opt(ethr_mutex *mtx, ethr_mutex_opt *opt)
+{
+ int spincount;
+#if ETHR_XCHK
+ if (!mtx) {
+ ETHR_ASSERT(0);
+ return EINVAL;
+ }
+ mtx->initialized = ETHR_MUTEX_INITIALIZED;
+#endif
+
+ spincount = opt ? opt->aux_spincount : 0;
+ if (spincount < 0)
+ spincount = 0;
+
+ if (!InitializeCriticalSectionAndSpinCount(&mtx->cs, spincount)) {
+#if ETHR_XCHK
+ mtx->initialized = 0;
+#endif
+ return ethr_win_get_errno__();
+ }
+
+ mtx->posix_compliant = opt ? opt->posix_compliant : 0;
+ mtx->wakeups = NULL;
+ if (mtx->posix_compliant) {
+ ethr_atomic32_init(&mtx->locked, 0);
+ ethr_atomic32_init(&mtx->have_wakeups, 0);
+ ethr_spinlock_init(&mtx->lock);
+ }
+ return 0;
+}
+
+int
+ethr_mutex_init(ethr_mutex *mtx)
+{
+ return ethr_mutex_init_opt(mtx, NULL);
+}
+
+int
+ethr_mutex_destroy(ethr_mutex *mtx)
+{
+ DeleteCriticalSection(&mtx->cs);
+ if (mtx->posix_compliant)
+ return ethr_spinlock_destroy(&mtx->lock);
+ else
+ return 0;
+}
+
+int
+ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx)
+{
+ void *udata;
+ ethr_ts_event *tse = ethr_get_ts_event();
+ int spincount;
+
+ udata = tse->udata;
+ tse->udata = (void *) mtx;
+ ethr_atomic32_set_relb(&tse->uaflgs, ETHR_CND_WAIT__);
+
+ EnterCriticalSection(&cnd->cs);
+ enqueue(&cnd->waiters, tse, tse);
+ LeaveCriticalSection(&cnd->cs);
+
+ ethr_mutex_unlock(mtx);
+
+ spincount = cnd->spincount;
+
+ while (ethr_atomic32_read_acqb(&tse->uaflgs) != ETHR_CND_WAKEUP__) {
+ ethr_event_reset(&tse->event);
+ if (ethr_atomic32_read_acqb(&tse->uaflgs) == ETHR_CND_WAKEUP__)
+ break;
+ ethr_event_swait(&tse->event, spincount);
+ spincount = 0;
+ }
+
+ tse->udata = udata;
+ ethr_leave_ts_event(tse);
+
+ ethr_mutex_lock(mtx);
+
+ return 0;
+}
+
+static __forceinline void
+posix_compliant_mtx_enqueue(ethr_mutex *mtx,
+ ethr_ts_event *tse_start,
+ ethr_ts_event *tse_end)
+{
+ ethr_ts_event *tse_wakeup = NULL; /* Avoid erroneous compiler warning... */
+ /*
+ * The associated mutex might not be locked, so we need to
+ * check if it is. If locked, enqueue for wakeup at unlock;
+ * otherwise, wakeup the first one now and enqueue the rest.
+ */
+ if (tse_start == tse_end && !ethr_atomic32_read(&mtx->locked)) {
+ tse_wakeup = tse_start;
+ wakeup:
+ cond_wakeup(tse_wakeup);
+ }
+ else {
+ int need_wakeup;
+ ethr_spin_lock(&mtx->lock);
+ if (!mtx->wakeups)
+ ethr_atomic32_set_mb(&mtx->have_wakeups, 1);
+ need_wakeup = !ethr_atomic32_read(&mtx->locked);
+ if (need_wakeup) {
+ if (tse_start == tse_end) {
+ if (!mtx->wakeups)
+ ethr_atomic32_set_relb(&mtx->have_wakeups, 0);
+ ethr_spin_unlock(&mtx->lock);
+ tse_wakeup = tse_start;
+ goto wakeup;
+ }
+ tse_wakeup = tse_start;
+ tse_start = tse_start->next;
+ }
+ enqueue(&mtx->wakeups, tse_start, tse_end);
+ ethr_spin_unlock(&mtx->lock);
+ if (need_wakeup)
+ goto wakeup;
+ }
+}
+
+static __forceinline void
+enqueue_cond_wakeups(ethr_ts_event *queue, int posix_compliant)
+{
+ if (queue) {
+ int more;
+ ethr_ts_event *q = queue;
+
+ /*
+ * Waiters may be using different mutexes...
+ */
+
+ do {
+ ethr_mutex *mtx;
+ ethr_ts_event *tse, *tse_start, *tse_end;
+
+ more = 0;
+ tse_start = q;
+ mtx = (ethr_mutex *) tse_start->udata;
+
+ ETHR_ASSERT(posix_compliant
+ ? mtx->posix_compliant
+ : !mtx->posix_compliant);
+
+ ETHR_ASSERT(ethr_atomic32_read(&tse_start->uaflgs)
+ == ETHR_CND_WAIT__);
+ ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED);
+
+ tse_end = tse_start->prev;
+
+ for (tse = tse_start->next; tse != tse_start; tse = tse->next) {
+
+ ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs)
+ == ETHR_CND_WAIT__);
+
+ if (mtx != (ethr_mutex *) tse->udata) {
+ tse_end = tse->prev;
+ dequeue(&q, tse_start, tse_end);
+ more = 1;
+ break;
+ }
+ }
+
+ if (posix_compliant)
+ posix_compliant_mtx_enqueue(mtx, tse_start, tse_end);
+ else
+ enqueue(&mtx->wakeups, tse_start, tse_end);
+
+ } while (more);
+ }
+}
+
+void
+ethr_cond_broadcast(ethr_cond *cnd)
+{
+ ethr_ts_event *waiters;
+
+ EnterCriticalSection(&cnd->cs);
+ waiters = cnd->waiters;
+ cnd->waiters = NULL;
+ LeaveCriticalSection(&cnd->cs);
+
+ if (cnd->posix_compliant)
+ enqueue_cond_wakeups(waiters, 1);
+ else
+ enqueue_cond_wakeups(waiters, 0);
+}
+
+void
+ethr_cond_signal(ethr_cond *cnd)
+{
+ ethr_mutex *mtx;
+ ethr_ts_event *tse;
+
+ EnterCriticalSection(&cnd->cs);
+ tse = cnd->waiters;
+ if (tse)
+ dequeue(&cnd->waiters, tse, tse);
+ LeaveCriticalSection(&cnd->cs);
+
+ if (tse) {
+ mtx = (ethr_mutex *) tse->udata;
+
+ ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) == ETHR_CND_WAIT__);
+ ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED);
+ ETHR_ASSERT(cnd->posix_compliant
+ ? mtx->posix_compliant
+ : !mtx->posix_compliant);
+
+ if (cnd->posix_compliant)
+ posix_compliant_mtx_enqueue(mtx, tse, tse);
+ else
+ enqueue(&mtx->wakeups, tse, tse);
+ }
+}
+
+int
+ethr_cond_init_opt(ethr_cond *cnd, ethr_cond_opt *opt)
+{
+ int spincount;
+
+#if ETHR_XCHK
+ if (!cnd) {
+ ETHR_ASSERT(0);
+ return EINVAL;
+ }
+ cnd->initialized = ETHR_COND_INITIALIZED;
+#endif
+
+ spincount = opt ? opt->aux_spincount : 0;
+ if (spincount < 0)
+ spincount = 0;
+
+ if (!InitializeCriticalSectionAndSpinCount(&cnd->cs, spincount)) {
+#if ETHR_XCHK
+ cnd->initialized = 0;
+#endif
+ return ethr_win_get_errno__();
+ }
+
+ cnd->posix_compliant = opt ? opt->posix_compliant : 0;
+ cnd->waiters = NULL;
+ cnd->spincount = spincount;
+ return 0;
+}
+
+int
+ethr_cond_init(ethr_cond *cnd)
+{
+ return ethr_cond_init_opt(cnd, NULL);
+}
+
+int
+ethr_cond_destroy(ethr_cond *cnd)
+{
+ DeleteCriticalSection(&cnd->cs);
+ return 0;
+}
+
+#endif
/* -- Exported symbols of inline functions --------------------------------- */