Diffstat (limited to 'erts/lib_src/common')
-rw-r--r--   erts/lib_src/common/erl_misc_utils.c    |   8
-rw-r--r--   erts/lib_src/common/erl_printf.c        |  14
-rw-r--r--   erts/lib_src/common/erl_printf_format.c |   4
-rw-r--r--   erts/lib_src/common/ethr_mutex.c        | 496
4 files changed, 464 insertions, 58 deletions
diff --git a/erts/lib_src/common/erl_misc_utils.c b/erts/lib_src/common/erl_misc_utils.c
index 5dbf98c7d1..5e94ff19db 100644
--- a/erts/lib_src/common/erl_misc_utils.c
+++ b/erts/lib_src/common/erl_misc_utils.c
@@ -55,6 +55,12 @@
 #  ifdef HAVE_UNISTD_H
 #    include <unistd.h>
 #  endif
+#  if defined(_SC_NPROC_CONF) && !defined(_SC_NPROCESSORS_CONF)
+#    define _SC_NPROCESSORS_CONF _SC_NPROC_CONF
+#  endif
+#  if defined(_SC_NPROC_ONLN) && !defined(_SC_NPROCESSORS_ONLN)
+#    define _SC_NPROCESSORS_ONLN _SC_NPROC_ONLN
+#  endif
 #  if (defined(NO_SYSCONF) || !defined(_SC_NPROCESSORS_CONF))
 #    ifdef HAVE_SYS_SYSCTL_H
 #      include <sys/sysctl.h>
@@ -1511,7 +1517,7 @@ const char* parse_topology_spec_group(erts_cpu_info_t *cpuinfo, const char* xml,
 	}
     }
 
-    if (cacheLevel == 0) {
+    if (parentCacheLevel == 0) {
 	*core_p = 0;
 	*processor_p = (*processor_p)++;
     } else {
diff --git a/erts/lib_src/common/erl_printf.c b/erts/lib_src/common/erl_printf.c
index 72d18ab6f1..108a8bb531 100644
--- a/erts/lib_src/common/erl_printf.c
+++ b/erts/lib_src/common/erl_printf.c
@@ -1,7 +1,7 @@
 /*
  * %CopyrightBegin%
  *
- * Copyright Ericsson AB 2005-2009. All Rights Reserved.
+ * Copyright Ericsson AB 2005-2011. All Rights Reserved.
  *
  * The contents of this file are subject to the Erlang Public License,
  * Version 1.1, (the "License"); you may not use this file except in
@@ -108,7 +108,7 @@ write_f_add_cr(void *vfp, char* buf, size_t len)
 	if (PUTC(buf[i], (FILE *) vfp) == EOF)
 	    return get_error_result();
     }
-    return 0;
+    return len;
 }
 
 static int
@@ -126,13 +126,14 @@ write_f(void *vfp, char* buf, size_t len)
 #endif
     if (FWRITE((void *) buf, sizeof(char), len, (FILE *) vfp) != len)
 	return get_error_result();
-    return 0;
+    return len;
 }
 
 static int
 write_fd(void *vfdp, char* buf, size_t len)
 {
     ssize_t size;
+    size_t res = len;
     ASSERT(vfdp);
 
     while (len) {
@@ -149,7 +150,7 @@ write_fd(void *vfdp, char* buf, size_t len)
 	len -= size;
     }
 
-    return 0;
+    return res;
 }
 
 static int
@@ -160,7 +161,7 @@ write_s(void *vwbufpp, char* bufp, size_t len)
     ASSERT(len > 0);
     memcpy((void *) *wbufpp, (void *) bufp, len);
     *wbufpp += len;
-    return 0;
+    return len;
 }
 
 
@@ -182,6 +183,7 @@ write_sn(void *vwsnap, char* buf, size_t len)
 	memcpy((void *) wsnap->buf, (void *) buf, sz);
 	wsnap->buf += sz;
 	wsnap->len -= sz;
+	return sz;
     }
     return 0;
 }
@@ -201,7 +203,7 @@ write_ds(void *vdsbufp, char* buf, size_t len)
     }
     memcpy((void *) (dsbufp->str + dsbufp->str_len), (void *) buf, len);
     dsbufp->str_len += len;
-    return 0;
+    return len;
 }
 
 int
diff --git a/erts/lib_src/common/erl_printf_format.c b/erts/lib_src/common/erl_printf_format.c
index fba3fd723c..473791dce4 100644
--- a/erts/lib_src/common/erl_printf_format.c
+++ b/erts/lib_src/common/erl_printf_format.c
@@ -388,7 +388,7 @@ static int fmt_double(fmtfn_t fn,void*arg,double val,
 	max_size++;
 	if (precision)
 	    max_size += precision;
-	else if (fmt && FMTF_alt)
+	else if (fmt & FMTF_alt)
 	    max_size++;
 	break;
     case FMTC_E:
@@ -402,7 +402,7 @@ static int fmt_double(fmtfn_t fn,void*arg,double val,
 	max_size += 4;
 	if (precision)
 	    max_size += precision;
-	else if (fmt && FMTF_alt)
+	else if (fmt & FMTF_alt)
 	    max_size++;
 	aexp = exp >= 0 ? exp : -exp;
 	if (aexp < 100)
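Note on the erl_printf.c hunks above: each write_* callback previously returned 0 on success; after this change it returns the number of characters it consumed (write_sn returns the possibly truncated count sz). A minimal sketch of the resulting contract, assuming errors keep coming back as negative values from get_error_result(); the write_fn typedef and emit() helper are illustrative, not part of the patch:

#include <stddef.h>

/* Illustrative callback type matching the write_* signatures above. */
typedef int (*write_fn)(void *arg, char *buf, size_t len);

/* A caller can now accumulate the number of characters emitted while
   still detecting failures. */
int emit(write_fn fn, void *arg, char *buf, size_t len, size_t *count)
{
    int res = fn(arg, buf, len);
    if (res < 0)
        return res;             /* error propagated from the callback */
    *count += (size_t) res;     /* on full success, res == len */
    return 0;
}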
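The erl_printf_format.c hunks fix a logical-vs-bitwise mix-up: fmt is a flag word, so fmt && FMTF_alt is true whenever any flag at all is set, while the intent is to test the alternate-form ('#') bit that reserves room for a forced decimal point. A tiny self-contained illustration (the bit value chosen for FMTF_alt here is hypothetical):

#include <assert.h>

#define FMTF_alt (1 << 2)           /* hypothetical bit for the '#' flag */

int main(void)
{
    unsigned long fmt = (1 << 0);   /* some other flag set, '#' bit clear */
    assert(fmt && FMTF_alt);        /* logical AND: nonzero, wrongly "true" */
    assert(!(fmt & FMTF_alt));      /* bitwise AND: 0, correctly false */
    return 0;
}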
diff --git a/erts/lib_src/common/ethr_mutex.c b/erts/lib_src/common/ethr_mutex.c
index 81fd6af80a..e363279f2e 100644
--- a/erts/lib_src/common/ethr_mutex.c
+++ b/erts/lib_src/common/ethr_mutex.c
@@ -223,9 +223,59 @@ rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx,
 			     int try_write_lock);
 #endif
 
+/* -- Utilities used by multiple implementations -- */
+
+#if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__) \
+    || defined(ETHR_WIN32_THREADS)
+
+static ETHR_INLINE void
+enqueue(ethr_ts_event **queue,
+	ethr_ts_event *tse_start,
+	ethr_ts_event *tse_end)
+{
+    if (!*queue) {
+	*queue = tse_start;
+	tse_start->prev = tse_end;
+	tse_end->next = tse_start;
+    }
+    else {
+	tse_end->next = *queue;
+	tse_start->prev = (*queue)->prev;
+	(*queue)->prev->next = tse_start;
+	(*queue)->prev = tse_end;
+    }
+}
+
+
+static ETHR_INLINE void
+dequeue(ethr_ts_event **queue,
+	ethr_ts_event *tse_start,
+	ethr_ts_event *tse_end)
+{
+    if (tse_start->prev == tse_end) {
+	ETHR_ASSERT(*queue == tse_start && tse_end->next == tse_start);
+	*queue = NULL;
+    }
+    else {
+	if (*queue == tse_start)
+	    *queue = tse_end->next;
+	tse_end->next->prev = tse_start->prev;
+	tse_start->prev->next = tse_end->next;
+    }
+}
+
+#endif
+
 #if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__)
 
-/* -- Utilities operating both on ordinary mutexes and read write mutexes -- */
+static ETHR_INLINE void
+insert(ethr_ts_event *tse_pred, ethr_ts_event *tse)
+{
+    tse->next = tse_pred->next;
+    tse->prev = tse_pred;
+    tse_pred->next->prev = tse;
+    tse_pred->next = tse;
+}
 
 static ETHR_INLINE void
 rwmutex_freqread_wtng_rdrs_inc(ethr_rwmutex *rwmtx, ethr_ts_event *tse)
@@ -355,51 +405,6 @@ rwmutex_freqread_rdrs_read(ethr_rwmutex *rwmtx, int ix)
     return res;
 }
 
-
-static ETHR_INLINE void
-enqueue(ethr_ts_event **queue,
-	ethr_ts_event *tse_start,
-	ethr_ts_event *tse_end)
-{
-    if (!*queue) {
-	*queue = tse_start;
-	tse_start->prev = tse_end;
-	tse_end->next = tse_start;
-    }
-    else {
-	tse_end->next = *queue;
-	tse_start->prev = (*queue)->prev;
-	(*queue)->prev->next = tse_start;
-	(*queue)->prev = tse_end;
-    }
-}
-
-static ETHR_INLINE void
-insert(ethr_ts_event *tse_pred, ethr_ts_event *tse)
-{
-    tse->next = tse_pred->next;
-    tse->prev = tse_pred;
-    tse_pred->next->prev = tse;
-    tse_pred->next = tse;
-}
-
-static ETHR_INLINE void
-dequeue(ethr_ts_event **queue,
-	ethr_ts_event *tse_start,
-	ethr_ts_event *tse_end)
-{
-    if (tse_start->prev == tse_end) {
-	ETHR_ASSERT(*queue == tse_start && tse_end->next == tse_start);
-	*queue = NULL;
-    }
-    else {
-	if (*queue == tse_start)
-	    *queue = tse_end->next;
-	tse_end->next->prev = tse_start->prev;
-	tse_start->prev->next = tse_end->next;
-    }
-}
-
 static void
 event_wait(struct ethr_mutex_base_ *mtxb,
 	   ethr_ts_event *tse,
@@ -1244,7 +1249,7 @@ ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx)
     return 0;
 }
 
-#else
+#elif defined(ETHR_PTHREADS) && !defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS)
 
 /* -- pthread mutex and condition variables -------------------------------- */
 
 int
@@ -1261,6 +1266,12 @@ ethr_mutex_init(ethr_mutex *mtx)
 }
 
 int
+ethr_mutex_init_opt(ethr_mutex *mtx, ethr_mutex_opt *opt)
+{
+    return ethr_mutex_init(mtx);
+}
+
+int
 ethr_mutex_destroy(ethr_mutex *mtx)
 {
 #if ETHR_XCHK
@@ -1293,6 +1304,12 @@ ethr_cond_init(ethr_cond *cnd)
 }
 
 int
+ethr_cond_init_opt(ethr_cond *cnd, ethr_cond_opt *opt)
+{
+    return ethr_cond_init(cnd);
+}
+
+int
 ethr_cond_destroy(ethr_cond *cnd)
 {
 #if ETHR_XCHK
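The enqueue()/dequeue() pair above maintain a circular doubly linked list in which *queue points at the head and head->prev is the tail, and both operations act on a whole sublist [tse_start, tse_end] at once. A self-contained model with a stand-in node type (not the real ethr_ts_event) that exercises the invariants:

#include <assert.h>
#include <stddef.h>

struct node { struct node *next, *prev; };

/* Same logic as enqueue()/dequeue() above, on the stand-in type. */
static void enq(struct node **queue, struct node *s, struct node *e)
{
    if (!*queue) {
        *queue = s;
        s->prev = e;
        e->next = s;
    }
    else {
        e->next = *queue;
        s->prev = (*queue)->prev;
        (*queue)->prev->next = s;
        (*queue)->prev = e;
    }
}

static void deq(struct node **queue, struct node *s, struct node *e)
{
    if (s->prev == e) {         /* [s..e] is the whole list */
        assert(*queue == s && e->next == s);
        *queue = NULL;
    }
    else {
        if (*queue == s)
            *queue = e->next;
        e->next->prev = s->prev;
        s->prev->next = e->next;
    }
}

int main(void)
{
    struct node a, b, c;
    struct node *q = NULL;
    enq(&q, &a, &a);                        /* q: [a]     */
    enq(&q, &b, &b);                        /* q: [a b]   */
    enq(&q, &c, &c);                        /* q: [a b c] */
    assert(q == &a && q->prev == &c && c.next == &a);
    deq(&q, &b, &b);                        /* unlink the middle element */
    assert(a.next == &c && c.prev == &a);
    deq(&q, &a, &c);                        /* remove remaining sublist [a..c] */
    assert(q == NULL);
    return 0;
}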
@@ -1354,7 +1371,388 @@ ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx)
     return res;
 }
 
-#endif /* pthread_mutex */
+#elif defined(ETHR_WIN32_THREADS) || defined(ETHR_DBG_WIN_MTX_WITH_PTHREADS)
+
+/*
+ * As of Vista/Server 2008, Windows has condition variables that can be
+ * used with critical sections. However, we need to be able to run on
+ * older Windows versions too, so we need to implement condition variables
+ * ourselves.
+ */
+
+#ifdef ETHR_DBG_WIN_MTX_WITH_PTHREADS
+/*
+ * For debugging of this implementation on POSIX platforms...
+ */
+
+#define ethr_win_get_errno__() EINVAL
+#if defined(__GNUC__)
+#define __forceinline __inline__
+#else
+#define __forceinline
+#endif
+
+static int
+InitializeCriticalSectionAndSpinCount(CRITICAL_SECTION *cs, int sc)
+{
+    return 0 == pthread_mutex_init((pthread_mutex_t *) cs, NULL);
+}
+
+static void DeleteCriticalSection(CRITICAL_SECTION *cs)
+{
+    int res = pthread_mutex_destroy((pthread_mutex_t *) cs);
+    if (res != 0)
+	ETHR_FATAL_ERROR__(res);
+}
+
+int TryEnterCriticalSection(CRITICAL_SECTION *cs)
+{
+    int res;
+    res = pthread_mutex_trylock((pthread_mutex_t *) cs);
+    if (res != 0 && res != EBUSY)
+	ETHR_FATAL_ERROR__(res);
+    return res == 0;
+}
+
+void EnterCriticalSection(CRITICAL_SECTION *cs)
+{
+    int res = pthread_mutex_lock((pthread_mutex_t *) cs);
+    if (res != 0)
+	ETHR_FATAL_ERROR__(res);
+}
+
+void LeaveCriticalSection(CRITICAL_SECTION *cs)
+{
+    int res = pthread_mutex_unlock((pthread_mutex_t *) cs);
+    if (res != 0)
+	ETHR_FATAL_ERROR__(res);
+}
+
+#endif
+
+#define ETHR_CND_WAIT__ ((ethr_sint32_t) 0x11dead11)
+#define ETHR_CND_WAKEUP__ ((ethr_sint32_t) 0x11beef11)
+
+static __forceinline void
+cond_wakeup(ethr_ts_event *tse)
+{
+    ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) == ETHR_CND_WAIT__);
+
+    ethr_atomic32_set_relb(&tse->uaflgs, ETHR_CND_WAKEUP__);
+    ethr_event_set(&tse->event);
+}
+
+void
+ethr_mutex_cond_wakeup__(ethr_mutex *mtx)
+{
+    /*
+     * Called by ethr_mutex_unlock() when we have
+     * cond signal/broadcast wakeups waiting to
+     * be completed.
+     */
+    ethr_ts_event *tse;
+
+    if (!mtx->posix_compliant) {
+	tse = mtx->wakeups;
+	dequeue(&mtx->wakeups, tse, tse);
+    }
+    else {
+	ethr_spin_lock(&mtx->lock);
+	tse = mtx->wakeups;
+	if (tse)
+	    dequeue(&mtx->wakeups, tse, tse);
+	if (!mtx->wakeups)
+	    ethr_atomic32_set_relb(&mtx->have_wakeups, 0);
+	ethr_spin_unlock(&mtx->lock);
+    }
+
+    LeaveCriticalSection(&mtx->cs);
+
+    ETHR_ASSERT(tse || mtx->posix_compliant);
+
+    /*
+     * We delay the actual condition variable wakeup until
+     * this point, when we have left the critical section.
+     * This is in order to avoid the woken thread having to
+     * go right back to sleep waiting for the critical
+     * section that we are in.
+     *
+     * We also only wake one thread at a time, even if
+     * there are multiple threads waiting to be woken.
+     * Otherwise, all but one would be woken only to go
+     * right back to sleep on the critical section.
+     * Since each wakeup is guaranteed to generate at
+     * least one lock/unlock sequence on this mutex, all
+     * threads will eventually be woken.
+     */
+
+    if (tse)
+	cond_wakeup(tse);
+}
+
+int
+ethr_mutex_init_opt(ethr_mutex *mtx, ethr_mutex_opt *opt)
+{
+    int spincount;
+#if ETHR_XCHK
+    if (!mtx) {
+	ETHR_ASSERT(0);
+	return EINVAL;
+    }
+    mtx->initialized = ETHR_MUTEX_INITIALIZED;
+#endif
+
+    spincount = opt ? opt->aux_spincount : 0;
+    if (spincount < 0)
+	spincount = 0;
+
+    if (!InitializeCriticalSectionAndSpinCount(&mtx->cs, spincount)) {
+#if ETHR_XCHK
+	mtx->initialized = 0;
+#endif
+	return ethr_win_get_errno__();
+    }
+
+    mtx->posix_compliant = opt ? opt->posix_compliant : 0;
+    mtx->wakeups = NULL;
+    if (mtx->posix_compliant) {
+	ethr_atomic32_init(&mtx->locked, 0);
+	ethr_atomic32_init(&mtx->have_wakeups, 0);
+	ethr_spinlock_init(&mtx->lock);
+    }
+    return 0;
+}
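ethr_mutex_init_opt() above reads two option fields: aux_spincount, forwarded to InitializeCriticalSectionAndSpinCount(), and posix_compliant, which enables the spinlock-protected wakeup queue so that signaling without holding the mutex works. A hedged usage sketch, assuming ethr_mutex_opt can be zero-initialized and that these two fields are all a caller needs to set:

#include "ethread.h"   /* erts-internal header declaring ethr_mutex et al. */

int init_shared_mutex(ethr_mutex *mtx)
{
    ethr_mutex_opt opt = {0};
    opt.aux_spincount = 4000;   /* spin on the critical section before blocking */
    opt.posix_compliant = 1;    /* wakeups work even if the signaling
                                   thread does not hold the mutex */
    return ethr_mutex_init_opt(mtx, &opt);  /* 0 on success, errno value on error */
}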
+
+int
+ethr_mutex_init(ethr_mutex *mtx)
+{
+    return ethr_mutex_init_opt(mtx, NULL);
+}
+
+int
+ethr_mutex_destroy(ethr_mutex *mtx)
+{
+    DeleteCriticalSection(&mtx->cs);
+    if (mtx->posix_compliant)
+	return ethr_spinlock_destroy(&mtx->lock);
+    else
+	return 0;
+}
+
+int
+ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx)
+{
+    void *udata;
+    ethr_ts_event *tse = ethr_get_ts_event();
+    int spincount;
+
+    udata = tse->udata;
+    tse->udata = (void *) mtx;
+    ethr_atomic32_set_relb(&tse->uaflgs, ETHR_CND_WAIT__);
+
+    EnterCriticalSection(&cnd->cs);
+    enqueue(&cnd->waiters, tse, tse);
+    LeaveCriticalSection(&cnd->cs);
+
+    ethr_mutex_unlock(mtx);
+
+    spincount = cnd->spincount;
+
+    while (ethr_atomic32_read_acqb(&tse->uaflgs) != ETHR_CND_WAKEUP__) {
+	ethr_event_reset(&tse->event);
+	if (ethr_atomic32_read_acqb(&tse->uaflgs) == ETHR_CND_WAKEUP__)
+	    break;
+	ethr_event_swait(&tse->event, spincount);
+	spincount = 0;
+    }
+
+    tse->udata = udata;
+    ethr_leave_ts_event(tse);
+
+    ethr_mutex_lock(mtx);
+
+    return 0;
+}
+
+static __forceinline void
+posix_compliant_mtx_enqueue(ethr_mutex *mtx,
+			    ethr_ts_event *tse_start,
+			    ethr_ts_event *tse_end)
+{
+    ethr_ts_event *tse_wakeup = NULL; /* Avoid erroneous compiler warning... */
+    /*
+     * The associated mutex might not be locked, so we need to
+     * check if it is. If locked, enqueue for wakeup at unlock;
+     * otherwise, wake up the first one now and enqueue the rest.
+     */
+    if (tse_start == tse_end && !ethr_atomic32_read(&mtx->locked)) {
+	tse_wakeup = tse_start;
+    wakeup:
+	cond_wakeup(tse_wakeup);
+    }
+    else {
+	int need_wakeup;
+	ethr_spin_lock(&mtx->lock);
+	if (!mtx->wakeups)
+	    ethr_atomic32_set_mb(&mtx->have_wakeups, 1);
+	need_wakeup = !ethr_atomic32_read(&mtx->locked);
+	if (need_wakeup) {
+	    if (tse_start == tse_end) {
+		if (!mtx->wakeups)
+		    ethr_atomic32_set_relb(&mtx->have_wakeups, 0);
+		ethr_spin_unlock(&mtx->lock);
+		tse_wakeup = tse_start;
+		goto wakeup;
+	    }
+	    tse_wakeup = tse_start;
+	    tse_start = tse_start->next;
+	}
+	enqueue(&mtx->wakeups, tse_start, tse_end);
+	ethr_spin_unlock(&mtx->lock);
+	if (need_wakeup)
+	    goto wakeup;
+    }
+}
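The wait loop in ethr_cond_wait() above uses a reset/re-check/sleep sequence: the event is re-armed before the flag is checked a final time, so a wakeup that lands between the check and the wait cannot be lost. The same pattern in portable C11, with a toy event that merely spins and yields where the real ethr_event blocks in the kernel:

#include <stdatomic.h>
#include <sched.h>

/* Toy stand-in for ethr_event: set() makes wait() return; reset() re-arms. */
typedef struct { atomic_int is_set; } event_t;

void evt_reset(event_t *e) { atomic_store(&e->is_set, 0); }
void evt_set(event_t *e)   { atomic_store(&e->is_set, 1); }
void evt_wait(event_t *e)  { while (!atomic_load(&e->is_set)) sched_yield(); }

/* Mirrors the loop in ethr_cond_wait(): arm the event, then re-check the
   wakeup flag before sleeping, so an evt_set() after the check is never
   missed. */
void wait_for_wakeup(atomic_int *flag, int wakeup_val, event_t *evt)
{
    while (atomic_load_explicit(flag, memory_order_acquire) != wakeup_val) {
        evt_reset(evt);
        if (atomic_load_explicit(flag, memory_order_acquire) == wakeup_val)
            break;
        evt_wait(evt);
    }
}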
+
+static __forceinline void
+enqueue_cond_wakeups(ethr_ts_event *queue, int posix_compliant)
+{
+    if (queue) {
+	int more;
+	ethr_ts_event *q = queue;
+
+	/*
+	 * Waiters may be using different mutexes...
+	 */
+
+	do {
+	    ethr_mutex *mtx;
+	    ethr_ts_event *tse, *tse_start, *tse_end;
+
+	    more = 0;
+	    tse_start = q;
+	    mtx = (ethr_mutex *) tse_start->udata;
+
+	    ETHR_ASSERT(posix_compliant
+			? mtx->posix_compliant
+			: !mtx->posix_compliant);
+
+	    ETHR_ASSERT(ethr_atomic32_read(&tse_start->uaflgs)
+			== ETHR_CND_WAIT__);
+	    ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED);
+
+	    tse_end = tse_start->prev;
+
+	    for (tse = tse_start->next; tse != tse_start; tse = tse->next) {
+
+		ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs)
+			    == ETHR_CND_WAIT__);
+
+		if (mtx != (ethr_mutex *) tse->udata) {
+		    tse_end = tse->prev;
+		    dequeue(&q, tse_start, tse_end);
+		    more = 1;
+		    break;
+		}
+	    }
+
+	    if (posix_compliant)
+		posix_compliant_mtx_enqueue(mtx, tse_start, tse_end);
+	    else
+		enqueue(&mtx->wakeups, tse_start, tse_end);
+
+	} while (more);
+    }
+}
+
+void
+ethr_cond_broadcast(ethr_cond *cnd)
+{
+    ethr_ts_event *waiters;
+
+    EnterCriticalSection(&cnd->cs);
+    waiters = cnd->waiters;
+    cnd->waiters = NULL;
+    LeaveCriticalSection(&cnd->cs);
+
+    if (cnd->posix_compliant)
+	enqueue_cond_wakeups(waiters, 1);
+    else
+	enqueue_cond_wakeups(waiters, 0);
+}
+
+void
+ethr_cond_signal(ethr_cond *cnd)
+{
+    ethr_mutex *mtx;
+    ethr_ts_event *tse;
+
+    EnterCriticalSection(&cnd->cs);
+    tse = cnd->waiters;
+    if (tse)
+	dequeue(&cnd->waiters, tse, tse);
+    LeaveCriticalSection(&cnd->cs);
+
+    if (tse) {
+	mtx = (ethr_mutex *) tse->udata;
+
+	ETHR_ASSERT(ethr_atomic32_read(&tse->uaflgs) == ETHR_CND_WAIT__);
+	ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED);
+	ETHR_ASSERT(cnd->posix_compliant
+		    ? mtx->posix_compliant
+		    : !mtx->posix_compliant);
+
+	if (cnd->posix_compliant)
+	    posix_compliant_mtx_enqueue(mtx, tse, tse);
+	else
+	    enqueue(&mtx->wakeups, tse, tse);
+    }
+}
+
+int
+ethr_cond_init_opt(ethr_cond *cnd, ethr_cond_opt *opt)
+{
+    int spincount;
+
+#if ETHR_XCHK
+    if (!cnd) {
+	ETHR_ASSERT(0);
+	return EINVAL;
+    }
+    cnd->initialized = ETHR_COND_INITIALIZED;
+#endif
+
+    spincount = opt ? opt->aux_spincount : 0;
+    if (spincount < 0)
+	spincount = 0;
+
+    if (!InitializeCriticalSectionAndSpinCount(&cnd->cs, spincount)) {
+#if ETHR_XCHK
+	cnd->initialized = 0;
+#endif
+	return ethr_win_get_errno__();
+    }
+
+    cnd->posix_compliant = opt ? opt->posix_compliant : 0;
+    cnd->waiters = NULL;
+    cnd->spincount = spincount;
+    return 0;
+}
+
+int
+ethr_cond_init(ethr_cond *cnd)
+{
+    return ethr_cond_init_opt(cnd, NULL);
+}
+
+int
+ethr_cond_destroy(ethr_cond *cnd)
+{
+    DeleteCriticalSection(&cnd->cs);
+    return 0;
+}
+
+#endif
 
 /* -- Exported symbols of inline functions --------------------------------- */
 
@@ -1969,7 +2367,7 @@ dbg_unlock_wake(ethr_rwmutex *rwmtx,
 	exp = have_w ? ETHR_RWMTX_W_FLG__ : 0;
 
     if (rwmtx->type != ETHR_RWMUTEX_TYPE_NORMAL)
-	imask = ETHR_RWMTX_R_PEND_UNLCK_MASK__;
+	imask = ETHR_RWMTX_R_PEND_UNLCK_MASK__|ETHR_RWMTX_R_ABRT_UNLCK_FLG__;
     else {
 #ifdef ETHR_RLOCK_WITH_INC_DEC
 	imask = ETHR_RWMTX_RS_MASK__;
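Taken together, the Windows implementation keeps the usual condition-variable usage pattern. A hedged usage sketch of the API touched by this patch: a one-slot mailbox guarded by an ethr_mutex/ethr_cond pair. The mailbox type and functions are illustrative, and error handling is elided:

#include "ethread.h"  /* erts-internal header declaring ethr_mutex/ethr_cond */

typedef struct {
    ethr_mutex mtx;
    ethr_cond cnd;
    void *msg;                      /* NULL means empty */
} mailbox_t;

void mailbox_put(mailbox_t *mb, void *msg)
{
    ethr_mutex_lock(&mb->mtx);
    mb->msg = msg;
    ethr_cond_signal(&mb->cnd);     /* wakeup completes at unlock via
                                       ethr_mutex_cond_wakeup__() above */
    ethr_mutex_unlock(&mb->mtx);
}

void *mailbox_get(mailbox_t *mb)
{
    void *msg;
    ethr_mutex_lock(&mb->mtx);
    while (!mb->msg)                /* always re-check: wakeups may be spurious */
        ethr_cond_wait(&mb->cnd, &mb->mtx);
    msg = mb->msg;
    mb->msg = NULL;
    ethr_mutex_unlock(&mb->mtx);
    return msg;
}

Note that mailbox_put() signals while holding the mutex; in the default (non-posix_compliant) mode that appears to be required, since only posix_compliant mutexes take the extra spinlock-guarded path for signaling without the mutex held.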