/* * %CopyrightBegin% * * Copyright Ericsson AB 2010. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in * compliance with the License. You should have received a copy of the * Erlang Public License along with this software. If not, it can be * retrieved online at http://www.erlang.org/. * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. * * %CopyrightEnd% */ /* * Description: Mutex, rwmutex and condition variable implementation * Author: Rickard Green */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #define ETHR_INLINE_FUNC_NAME_(X) X ## __ #define ETHR_MUTEX_IMPL__ #include <limits.h> #include "ethread.h" #include "ethr_internal.h" #define ETHR_SPIN_WITH_WAITERS 1 #define ETHR_MTX_MAX_FLGS_SPIN 10 #ifdef ETHR_USE_OWN_RWMTX_IMPL__ static int default_rwmtx_main_spincount; static int default_rwmtx_aux_spincount; #endif #ifdef ETHR_USE_OWN_MTX_IMPL__ static int default_mtx_main_spincount; static int default_mtx_aux_spincount; static int default_cnd_main_spincount; static int default_cnd_aux_spincount; #endif static int no_spin; #ifndef ETHR_USE_OWN_RWMTX_IMPL__ static pthread_rwlockattr_t write_pref_attr_data; static pthread_rwlockattr_t *write_pref_attr; #endif #if defined(ETHR_MTX_Q_LOCK_SPINLOCK__) # define ETHR_MTX_QLOCK_INIT ethr_spinlock_init # define ETHR_MTX_QLOCK_DESTROY ethr_spinlock_destroy # define ETHR_MTX_Q_LOCK ethr_spin_lock # define ETHR_MTX_Q_UNLOCK ethr_spin_unlock #elif defined(ETHR_MTX_Q_LOCK_PTHREAD_MUTEX__) # define ETHR_MTX_QLOCK_INIT(QL) pthread_mutex_init((QL), NULL) # define ETHR_MTX_QLOCK_DESTROY pthread_mutex_destroy # define ETHR_MTX_Q_LOCK(L) \ do { \ int res__ = pthread_mutex_lock(L); \ if (res__ != 0) \ ETHR_FATAL_ERROR__(res__); \ } while (0) # define ETHR_MTX_Q_UNLOCK(L) \ do { \ int res__ = pthread_mutex_unlock(L); \ if (res__ != 0) \ ETHR_FATAL_ERROR__(res__); \ } while (0) #elif defined(ETHR_MTX_Q_LOCK_CRITICAL_SECTION__) # define ETHR_MTX_QLOCK_INIT(QL) (InitializeCriticalSection((QL)), 0) # define ETHR_MTX_QLOCK_DESTROY(QL) (DeleteCriticalSection((QL)), 0) # define ETHR_MTX_Q_LOCK(QL) EnterCriticalSection((QL)) # define ETHR_MTX_Q_UNLOCK(QL) LeaveCriticalSection((QL)) #endif int ethr_mutex_lib_init(int cpu_conf) { int res = 0; no_spin = cpu_conf == 1; #ifdef ETHR_USE_OWN_MTX_IMPL__ default_mtx_main_spincount = ETHR_MTX_DEFAULT_MAIN_SPINCOUNT_BASE; default_mtx_aux_spincount = ETHR_MTX_DEFAULT_AUX_SPINCOUNT; default_cnd_main_spincount = ETHR_CND_DEFAULT_MAIN_SPINCOUNT; default_cnd_aux_spincount = ETHR_CND_DEFAULT_AUX_SPINCOUNT; #endif #ifdef ETHR_USE_OWN_RWMTX_IMPL__ default_rwmtx_main_spincount = ETHR_RWMTX_DEFAULT_MAIN_SPINCOUNT_BASE; default_rwmtx_aux_spincount = ETHR_RWMTX_DEFAULT_AUX_SPINCOUNT; #else #if defined(ETHR_HAVE_PTHREAD_RWLOCKATTR_SETKIND_NP) \ && defined(ETHR_HAVE_PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP) res = pthread_rwlockattr_init(&write_pref_attr_data); if (res != 0) return res; res = pthread_rwlockattr_setkind_np( &write_pref_attr_data, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); write_pref_attr = &write_pref_attr_data; #else write_pref_attr = NULL; #endif #endif return res; } #ifdef ETHR_USE_OWN_RWMTX_IMPL__ #ifdef ETHR_ATOMIC_HAVE_INC_DEC_INSTRUCTIONS #if 0 /* * When inc and dec are real atomic instructions as on x86, the * ETHR_RLOCK_WITH_INC_DEC implementations performs better with * lots of read locks compared to the cmpxchg based implementation. * It, however, performs worse with lots of mixed reads and writes. * It could be used for rwlocks that are known to be read locked * much, but the readers array based implementation outperforms it * by far. Therefore, it has been disabled, and will probably be * removed some time in the future. */ # define ETHR_RLOCK_WITH_INC_DEC #endif #endif static int reader_groups_array_size = 0; static int main_threads_array_size = 0; #endif int ethr_mutex_lib_late_init(int no_reader_groups, int no_main_threads) { #ifdef ETHR_USE_OWN_MTX_IMPL__ default_mtx_main_spincount += (no_main_threads * ETHR_MTX_DEFAULT_MAIN_SPINCOUNT_INC); if (default_mtx_main_spincount > ETHR_MTX_DEFAULT_MAIN_SPINCOUNT_MAX) default_mtx_main_spincount = ETHR_MTX_DEFAULT_MAIN_SPINCOUNT_MAX; #endif #ifdef ETHR_USE_OWN_RWMTX_IMPL__ default_rwmtx_main_spincount += (no_main_threads * ETHR_RWMTX_DEFAULT_MAIN_SPINCOUNT_INC); if (default_rwmtx_main_spincount > ETHR_RWMTX_DEFAULT_MAIN_SPINCOUNT_MAX) default_rwmtx_main_spincount = ETHR_RWMTX_DEFAULT_MAIN_SPINCOUNT_MAX; reader_groups_array_size = (no_reader_groups <= 1 ? 1 : no_reader_groups + 1); main_threads_array_size = (no_main_threads <= 1 ? 1 : no_main_threads + 1); #endif return 0; } int ethr_rwmutex_set_reader_group(int ix) { #ifdef ETHR_USE_OWN_RWMTX_IMPL__ ethr_ts_event *tse; if (ix < 0 || reader_groups_array_size <= ix) return EINVAL; tse = ethr_get_ts_event(); if ((tse->iflgs & ETHR_TS_EV_ETHREAD) == 0) { ethr_leave_ts_event(tse); return EINVAL; } tse->rgix = ix; ethr_leave_ts_event(tse); #endif return 0; } #if defined(ETHR_MTX_HARD_DEBUG_Q) || defined(ETHR_MTX_HARD_DEBUG_WSQ) static void hard_debug_chk_q__(struct ethr_mutex_base_ *, int); #define ETHR_RWMTX_HARD_DEBUG_CHK_Q(RWMTX) hard_debug_chk_q__(&(RWMTX)->mtxb,1) #define ETHR_MTX_HARD_DEBUG_CHK_Q(MTX) hard_debug_chk_q__(&(MTX)->mtxb, 0) #else #define ETHR_RWMTX_HARD_DEBUG_CHK_Q(RWMTX) #define ETHR_MTX_HARD_DEBUG_CHK_Q(MTX) #endif #ifdef ETHR_USE_OWN_RWMTX_IMPL__ static void rwmutex_unlock_wake(ethr_rwmutex *rwmtx, int have_w, long initial); static int rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx, long initial, ethr_ts_event *tse, int start_next_ix, int check_before_try, int try_write_lock); #endif #if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__) /* -- Utilities operating both on ordinary mutexes and read write mutexes -- */ static ETHR_INLINE void rwmutex_freqread_wtng_rdrs_inc(ethr_rwmutex *rwmtx, ethr_ts_event *tse) { int ix = (rwmtx->type == ETHR_RWMUTEX_TYPE_FREQUENT_READ ? tse->rgix : tse->mtix); rwmtx->tdata.ra[ix].data.waiting_readers++; } static ETHR_INLINE void rwmutex_freqread_rdrs_add(ethr_rwmutex *rwmtx, ethr_rwmutex_type type, int ix, int inc) { if (type == ETHR_RWMUTEX_TYPE_FREQUENT_READ || ix == 0) ethr_atomic_add(&rwmtx->tdata.ra[ix].data.readers, inc); else { ETHR_ASSERT(type == ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ); ETHR_ASSERT(ethr_atomic_read(&rwmtx->tdata.ra[ix].data.readers) == 0); ETHR_ASSERT(inc == 1); ethr_atomic_set(&rwmtx->tdata.ra[ix].data.readers, (long) 1); } } static ETHR_INLINE void rwmutex_freqread_rdrs_inc(ethr_rwmutex *rwmtx, ethr_ts_event *tse) { int ix; if (rwmtx->type == ETHR_RWMUTEX_TYPE_FREQUENT_READ) { ix = tse->rgix; atomic_inc: ethr_atomic_inc(&rwmtx->tdata.ra[ix].data.readers); } else { ix = tse->mtix; if (ix == 0) goto atomic_inc; ETHR_ASSERT(rwmtx->type == ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ); ETHR_ASSERT(ethr_atomic_read(&rwmtx->tdata.ra[ix].data.readers) == 0); ethr_atomic_set(&rwmtx->tdata.ra[ix].data.readers, (long) 1); } } static ETHR_INLINE void rwmutex_freqread_rdrs_dec(ethr_rwmutex *rwmtx, ethr_ts_event *tse) { int ix; if (rwmtx->type == ETHR_RWMUTEX_TYPE_FREQUENT_READ) { ix = tse->rgix; atomic_dec: ethr_atomic_dec(&rwmtx->tdata.ra[ix].data.readers); } else { ix = tse->mtix; if (ix == 0) goto atomic_dec; ETHR_ASSERT(rwmtx->type == ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ); ETHR_ASSERT(ethr_atomic_read(&rwmtx->tdata.ra[ix].data.readers) == 1); ethr_atomic_set(&rwmtx->tdata.ra[ix].data.readers, (long) 0); } } static ETHR_INLINE long rwmutex_freqread_rdrs_dec_read(ethr_rwmutex *rwmtx, ethr_ts_event *tse) { int ix; if (rwmtx->type == ETHR_RWMUTEX_TYPE_FREQUENT_READ) { ix = tse->rgix; atomic_dec_read: return ethr_atomic_dec_read(&rwmtx->tdata.ra[ix].data.readers); } else { ix = tse->mtix; if (ix == 0) goto atomic_dec_read; ETHR_ASSERT(rwmtx->type == ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ); ETHR_ASSERT(ethr_atomic_read(&rwmtx->tdata.ra[ix].data.readers) == 1); ethr_atomic_set(&rwmtx->tdata.ra[ix].data.readers, (long) 0); return (long) 0; } } static ETHR_INLINE long rwmutex_freqread_rdrs_dec_read_relb(ethr_rwmutex *rwmtx, ethr_ts_event *tse) { int ix; if (rwmtx->type == ETHR_RWMUTEX_TYPE_FREQUENT_READ) { ix = tse->rgix; atomic_dec_read: return ethr_atomic_dec_read_relb(&rwmtx->tdata.ra[ix].data.readers); } else { ix = tse->mtix; if (ix == 0) goto atomic_dec_read; ETHR_ASSERT(rwmtx->type == ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ); ETHR_ASSERT(ethr_atomic_read(&rwmtx->tdata.ra[ix].data.readers) == 1); ethr_atomic_set_relb(&rwmtx->tdata.ra[ix].data.readers, (long) 0); return (long) 0; } } static ETHR_INLINE long rwmutex_freqread_rdrs_read(ethr_rwmutex *rwmtx, int ix) { long res = ethr_atomic_read(&rwmtx->tdata.ra[ix].data.readers); #ifdef ETHR_DEBUG switch (rwmtx->type) { case ETHR_RWMUTEX_TYPE_FREQUENT_READ: ETHR_ASSERT(res >= 0); break; case ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ: ETHR_ASSERT(res == 0 || res == 1); break; default: ETHR_ASSERT(0); break; } #endif return res; } static ETHR_INLINE void enqueue(ethr_ts_event **queue, ethr_ts_event *tse_start, ethr_ts_event *tse_end) { if (!*queue) { *queue = tse_start; tse_start->prev = tse_end; tse_end->next = tse_start; } else { tse_end->next = *queue; tse_start->prev = (*queue)->prev; (*queue)->prev->next = tse_start; (*queue)->prev = tse_end; } } static ETHR_INLINE void insert(ethr_ts_event *tse_pred, ethr_ts_event *tse) { tse->next = tse_pred->next; tse->prev = tse_pred; tse_pred->next->prev = tse; tse_pred->next = tse; } static ETHR_INLINE void dequeue(ethr_ts_event **queue, ethr_ts_event *tse_start, ethr_ts_event *tse_end) { if (tse_start->prev == tse_end) { ETHR_ASSERT(*queue == tse_start && tse_end->next == tse_start); *queue = NULL; } else { if (*queue == tse_start) *queue = tse_end->next; tse_end->next->prev = tse_start->prev; tse_start->prev->next = tse_end->next; } } static void event_wait(struct ethr_mutex_base_ *mtxb, ethr_ts_event *tse, int spincount, long type, int is_rwmtx, int is_freq_read) { int locked = 0; long act; int need_try_complete_runlock = 0; /* Need to enqueue and wait... */ tse->uflgs = type; ethr_atomic_set(&tse->uaflgs, type); ETHR_MTX_Q_LOCK(&mtxb->qlck); locked = 1; #ifdef ETHR_MTX_HARD_DEBUG_Q hard_debug_chk_q__(mtxb, is_rwmtx); #endif act = ethr_atomic_read(&mtxb->flgs); if (act & type) { /* Wait bit already there; enqueue... */ ETHR_ASSERT(mtxb->q); if (type == ETHR_RWMTX_W_WAIT_FLG__) { enqueue(&mtxb->q, tse, tse); #ifdef ETHR_MTX_HARD_DEBUG_WSQ mtxb->ws++; #endif } else { ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; ETHR_ASSERT(is_rwmtx); ETHR_ASSERT(rwmtx->rq_end); insert(rwmtx->rq_end, tse); rwmtx->rq_end = tse; if (is_freq_read) rwmutex_freqread_wtng_rdrs_inc(rwmtx, tse); else rwmtx->tdata.rs++; } } else { /* Set wait bit */ while (1) { long new, exp = act; int freqread_tryrlock = 0; need_try_complete_runlock = 0; if (type == ETHR_RWMTX_W_WAIT_FLG__) { if (is_freq_read && act == ETHR_RWMTX_R_FLG__) need_try_complete_runlock = 1; if (act != 0) new = act | ETHR_RWMTX_W_WAIT_FLG__; else new = ETHR_RWMTX_W_FLG__; /* Try to get it */ } else { ETHR_ASSERT(is_rwmtx); if (!is_freq_read) { if (act & (ETHR_RWMTX_W_FLG__| ETHR_RWMTX_W_WAIT_FLG__)) new = act | ETHR_RWMTX_R_WAIT_FLG__; else new = act + 1; /* Try to get it */ } else { if (act & ~ETHR_RWMTX_R_FLG__) new = act | ETHR_RWMTX_R_WAIT_FLG__; else { /* Try to get it */ ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; rwmutex_freqread_rdrs_inc(rwmtx, tse); ETHR_MEMORY_BARRIER; new = act | ETHR_RWMTX_R_FLG__; freqread_tryrlock = 1; } } } act = ethr_atomic_cmpxchg_acqb(&mtxb->flgs, new, exp); if (exp == act) { if (new & type) { act = new; break; } else { /* Got it */ goto done; } } if (freqread_tryrlock) { ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; /* We didn't set ETHR_RWMTX_R_FLG__, however someone else might have */ if (act == ETHR_RWMTX_R_FLG__) goto done; /* Got it by help from someone else */ ETHR_ASSERT((act & ETHR_RWMTX_WAIT_FLGS__) == 0); /* * We know that no waiter flags have been set, i.e., * we cannot get into a situation where we need to wake * someone up here. Just restore the readers counter * and do it over again... */ rwmutex_freqread_rdrs_dec(rwmtx, tse); } } /* Enqueue */ if (type == ETHR_RWMTX_R_WAIT_FLG__) { ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; ETHR_ASSERT(is_rwmtx); ETHR_ASSERT(!rwmtx->rq_end); rwmtx->rq_end = tse; if (is_freq_read) rwmutex_freqread_wtng_rdrs_inc(rwmtx, tse); else rwmtx->tdata.rs++; } #ifdef ETHR_MTX_HARD_DEBUG_WSQ else { mtxb->ws++; } #endif enqueue(&mtxb->q, tse, tse); } #ifdef ETHR_MTX_HARD_DEBUG_Q hard_debug_chk_q__(mtxb, is_rwmtx); #endif /* Wait */ locked = 0; ETHR_MTX_Q_UNLOCK(&mtxb->qlck); if (need_try_complete_runlock) { ETHR_ASSERT(((ethr_rwmutex *) mtxb)->type != ETHR_RWMUTEX_TYPE_NORMAL); /* * We were the only one in queue when we enqueued, and it * was seemingly read locked. We need to try to complete a * runlock otherwise we might be hanging forever. If the * runlock could be completed we will be dequeued and * woken by ourselves. */ rwmutex_try_complete_runlock((ethr_rwmutex *) mtxb, act, tse, 0, 1, 0); } while (1) { ethr_event_reset(&tse->event); act = ethr_atomic_read_acqb(&tse->uaflgs); if (!act) goto done; /* Got it */ ETHR_ASSERT(act == type); ethr_event_swait(&tse->event, spincount); /* swait result: 0 || EINTR */ act = ethr_atomic_read_acqb(&tse->uaflgs); if (!act) goto done; /* Got it */ } done: if (locked) ETHR_MTX_Q_UNLOCK(&mtxb->qlck); } static void wake_writer(struct ethr_mutex_base_ *mtxb, int is_rwmtx) { ethr_ts_event *tse; tse = mtxb->q; ETHR_ASSERT(tse); dequeue(&mtxb->q, tse, tse); ETHR_ASSERT(tse->uflgs == ETHR_RWMTX_W_WAIT_FLG__); ETHR_ASSERT(ethr_atomic_read(&tse->uaflgs) == ETHR_RWMTX_W_WAIT_FLG__); #ifdef ETHR_MTX_HARD_DEBUG_WSQ mtxb->ws--; #endif #if defined(ETHR_MTX_HARD_DEBUG_Q) || defined(ETHR_MTX_HARD_DEBUG_WSQ) hard_debug_chk_q__(mtxb, is_rwmtx); #endif ETHR_MTX_Q_UNLOCK(&mtxb->qlck); ethr_atomic_set(&tse->uaflgs, 0); ethr_event_set(&tse->event); } static ETHR_INLINE int initial_spincount(struct ethr_mutex_base_ *mtxb) { return (mtxb->aux_scnt < ETHR_MTX_MAX_FLGS_SPIN ? mtxb->aux_scnt : ETHR_MTX_MAX_FLGS_SPIN); } static ETHR_INLINE int update_spincount(struct ethr_mutex_base_ *mtxb, ethr_ts_event *tse, int *scnt_state, int *scnt) { int state = *scnt_state; if (state <= 0) { /* Here state is max spincount to do on event negated */ *scnt = -state; } else { /* Here state is initial spincount made on flags */ *scnt = ((tse->iflgs & ETHR_TS_EV_MAIN_THR) ? mtxb->main_scnt : mtxb->aux_scnt); if (*scnt <= state) *scnt = 0; else { if (*scnt <= ETHR_MTX_MAX_FLGS_SPIN) *scnt_state = 0; /* No spin on event */ else { /* Spin on event after... */ *scnt_state = -1*(*scnt - ETHR_MTX_MAX_FLGS_SPIN); /* ... we have spun on flags */ *scnt = ETHR_MTX_MAX_FLGS_SPIN; } *scnt -= state; return 0; } } return 1; } int check_readers_array(ethr_rwmutex *rwmtx, int start_rix, int length); static ETHR_INLINE void write_lock_wait(struct ethr_mutex_base_ *mtxb, long initial, int is_rwmtx, int is_freq_read) { long act = initial; int scnt, start_scnt; ethr_ts_event *tse = NULL; int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; int res; int freq_read_size = -1; int freq_read_start_ix = -1; ETHR_ASSERT(!is_freq_read || is_rwmtx); start_scnt = scnt = initial_spincount(mtxb); /* * Spin trying to write lock for a while. If unsuccessful, * wait on event. */ while (1) { long exp; while (act != 0) { if (is_freq_read && act == ETHR_RWMTX_R_FLG__) { ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; if (!tse) tse = ethr_get_ts_event(); if (freq_read_size < 0) { if (rwmtx->type == ETHR_RWMUTEX_TYPE_FREQUENT_READ) { freq_read_size = reader_groups_array_size; freq_read_start_ix = tse->rgix; } else { freq_read_size = main_threads_array_size; freq_read_start_ix = tse->mtix; } } res = check_readers_array(rwmtx, freq_read_start_ix, freq_read_size); scnt--; if (res == 0) { act = ethr_atomic_read(&mtxb->flgs); if (act & ETHR_RWMTX_R_MASK__) { res = rwmutex_try_complete_runlock(rwmtx, act, tse, 0, 0, 1); if (res != EBUSY) goto done; /* Got it */ } if (scnt <= 0) goto chk_spin; if (--until_yield == 0) { until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; ETHR_YIELD(); } continue; } } if (scnt <= 0) { chk_spin: scnt = 0; if (!tse) tse = ethr_get_ts_event(); if (update_spincount(mtxb, tse, &start_scnt, &scnt)) { event_wait(mtxb, tse, scnt, ETHR_RWMTX_W_WAIT_FLG__, is_rwmtx, is_freq_read); goto done; } } ETHR_SPIN_BODY; if (--until_yield == 0) { until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; ETHR_YIELD(); } act = ethr_atomic_read(&mtxb->flgs); scnt--; } exp = act; act = ethr_atomic_cmpxchg_acqb(&mtxb->flgs, ETHR_RWMTX_W_FLG__, exp); if (act == 0) goto done; /* Got it */ } done: if (tse) ethr_leave_ts_event(tse); } static int mtxb_init(struct ethr_mutex_base_ *mtxb, int def_main_scnt, int main_scnt, int def_aux_scnt, int aux_scnt) { ETHR_MTX_HARD_DEBUG_LFS_INIT(mtxb); #ifdef ETHR_MTX_HARD_DEBUG_WSQ mtxb->ws = 0; #endif if (no_spin) { mtxb->main_scnt = 0; mtxb->aux_scnt = 0; } else { if (main_scnt > SHRT_MAX) mtxb->main_scnt = SHRT_MAX; else if (main_scnt < 0) mtxb->main_scnt = def_main_scnt; else mtxb->main_scnt = (short) main_scnt; if (aux_scnt > SHRT_MAX) mtxb->aux_scnt = SHRT_MAX; else if (aux_scnt < 0) mtxb->aux_scnt = def_aux_scnt; else mtxb->aux_scnt = (short) aux_scnt; if (mtxb->main_scnt < mtxb->aux_scnt) mtxb->main_scnt = mtxb->aux_scnt; } mtxb->q = NULL; ethr_atomic_init(&mtxb->flgs, 0); return ETHR_MTX_QLOCK_INIT(&mtxb->qlck); } static int mtxb_destroy(struct ethr_mutex_base_ *mtxb) { long act; ETHR_MTX_Q_LOCK(&mtxb->qlck); act = ethr_atomic_read(&mtxb->flgs); ETHR_MTX_Q_UNLOCK(&mtxb->qlck); if (act != 0) return EINVAL; return ETHR_MTX_QLOCK_DESTROY(&mtxb->qlck); } #endif /* ETHR_USE_OWN_RWMTX_IMPL__ || ETHR_USE_OWN_MTX_IMPL__ */ /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\ * Mutex and condition variable implementation * \* */ #ifdef ETHR_USE_OWN_MTX_IMPL__ /* -- Mutex ---------------------------------------------------------------- */ int ethr_mutex_init_opt(ethr_mutex *mtx, ethr_mutex_opt *opt) { int res; #if ETHR_XCHK if (!mtx) { ETHR_ASSERT(0); return EINVAL; } mtx->initialized = ETHR_MUTEX_INITIALIZED; #endif ETHR_MTX_HARD_DEBUG_FENCE_INIT(mtx); res = mtxb_init(&mtx->mtxb, default_mtx_main_spincount, opt ? opt->main_spincount : -1, default_mtx_aux_spincount, opt ? opt->aux_spincount : -1); #if ETHR_XCHK if (res != 0) mtx->initialized = 0; #endif return res; } int ethr_mutex_init(ethr_mutex *mtx) { return ethr_mutex_init_opt(mtx, NULL); } int ethr_mutex_destroy(ethr_mutex *mtx) { #if ETHR_XCHK if (ethr_not_inited__) { ETHR_ASSERT(0); return EACCES; } if (!mtx) { ETHR_ASSERT(0); return EINVAL; } mtx->initialized = 0; #endif return mtxb_destroy(&mtx->mtxb); } void ethr_mutex_lock_wait__(ethr_mutex *mtx, long initial) { write_lock_wait(&mtx->mtxb, initial, 0, 0); } void ethr_mutex_unlock_wake__(ethr_mutex *mtx, long initial) { ethr_ts_event *tse; ETHR_MTX_Q_LOCK(&mtx->mtxb.qlck); tse = mtx->mtxb.q; ETHR_ASSERT(tse); ETHR_ASSERT(ethr_atomic_read(&mtx->mtxb.flgs) == (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__)); ETHR_ASSERT(initial & ETHR_RWMTX_W_WAIT_FLG__); ETHR_MTX_HARD_DEBUG_CHK_Q(mtx); /* * If we have multiple waiters, there is no need to modify * mtxb->flgs; otherwise, we need to clear the write wait bit... */ if (tse->next == mtx->mtxb.q) ethr_atomic_set(&mtx->mtxb.flgs, ETHR_RWMTX_W_FLG__); wake_writer(&mtx->mtxb, 0); } /* -- Condition variables -------------------------------------------------- */ static void enqueue_mtx(ethr_mutex *mtx, ethr_ts_event *tse_start, ethr_ts_event *tse_end) { long act; /* * `ethr_cond_signal()' and `ethr_cond_broadcast()' end up here. If `mtx' * is not currently locked by current thread, we almost certainly have a * hard to debug race condition. There might however be some (strange) * use for it. POSIX also allow a call to `pthread_cond_signal' or * `pthread_cond_broadcast' even though the the associated mutex isn't * locked by the caller. Therefore, we also allow this kind of strange * usage, but optimize for the case where the mutex is locked by the * calling thread. */ ETHR_MTX_Q_LOCK(&mtx->mtxb.qlck); ETHR_MTX_HARD_DEBUG_CHK_Q(mtx); #ifdef ETHR_MTX_HARD_DEBUG_WSQ { int dbg_nws__ = 0; ethr_ts_event *dbg_tse__; for (dbg_tse__ = tse_start; dbg_tse__ != tse_end; dbg_tse__ = dbg_tse__->next) dbg_nws__++; mtx->mtxb.ws += dbg_nws__ + 1; } #endif act = ethr_atomic_read(&mtx->mtxb.flgs); ETHR_ASSERT(act == 0 || act == ETHR_RWMTX_W_FLG__ || act == (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__)); if (act & ETHR_RWMTX_W_FLG__) { /* The normal sane case */ if (!(act & ETHR_RWMTX_W_WAIT_FLG__)) { ETHR_ASSERT(!mtx->mtxb.q); act = ethr_atomic_cmpxchg(&mtx->mtxb.flgs, (ETHR_RWMTX_W_FLG__ | ETHR_RWMTX_W_WAIT_FLG__), ETHR_RWMTX_W_FLG__); if (act != ETHR_RWMTX_W_FLG__) { /* * Sigh... this wasn't so sane after all since, the mutex was * obviously not locked by the current thread.... */ ETHR_ASSERT(act == 0); goto mtx_unlocked; } } #ifdef ETHR_DEBUG if (act & ETHR_RWMTX_W_WAIT_FLG__) ETHR_ASSERT(mtx->mtxb.q); else ETHR_ASSERT(!mtx->mtxb.q); #endif enqueue(&mtx->mtxb.q, tse_start, tse_end); ETHR_MTX_HARD_DEBUG_CHK_Q(mtx); ETHR_MTX_Q_UNLOCK(&mtx->mtxb.qlck); } else { int multi; mtx_unlocked: /* Sigh... mutex isn't locked... */ multi = tse_start != tse_end; while (1) { long new, exp = act; if (multi || (act & ETHR_RWMTX_W_FLG__)) new = ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__; else new = ETHR_RWMTX_W_FLG__; act = ethr_atomic_cmpxchg(&mtx->mtxb.flgs, new, exp); if (exp == act) { ETHR_ASSERT(!mtx->mtxb.q); if (act & ETHR_RWMTX_W_FLG__) { enqueue(&mtx->mtxb.q, tse_start, tse_end); ETHR_MTX_HARD_DEBUG_CHK_Q(mtx); ETHR_MTX_Q_UNLOCK(&mtx->mtxb.qlck); } else { ETHR_ASSERT(!mtx->mtxb.q); /* * Acquired the mutex on behalf of the * first thread in the queue; wake * it and enqueue the rest... */ #ifdef ETHR_MTX_HARD_DEBUG_WSQ mtx->mtxb.ws--; #endif if (multi) { enqueue(&mtx->mtxb.q, tse_start->next, tse_end); ETHR_ASSERT(mtx->mtxb.q); } ETHR_MTX_HARD_DEBUG_CHK_Q(mtx); ETHR_MTX_Q_UNLOCK(&mtx->mtxb.qlck); ethr_atomic_set(&tse_start->uaflgs, 0); ethr_event_set(&tse_start->event); } break; } } } } int ethr_cond_init_opt(ethr_cond *cnd, ethr_cond_opt *opt) { #if ETHR_XCHK if (!cnd) { ETHR_ASSERT(0); return EINVAL; } cnd->initialized = ETHR_COND_INITIALIZED; #endif ETHR_MTX_HARD_DEBUG_FENCE_INIT(cnd); cnd->q = NULL; if (no_spin) { cnd->main_scnt = 0; cnd->aux_scnt = 0; } else { if (!opt || opt->main_spincount < 0) cnd->main_scnt = default_cnd_main_spincount; else if (opt->main_spincount > SHRT_MAX) cnd->main_scnt = SHRT_MAX; else cnd->main_scnt = (short) opt->main_spincount; if (!opt || opt->aux_spincount < 0) cnd->aux_scnt = default_cnd_aux_spincount; else if (opt->aux_spincount > SHRT_MAX) cnd->aux_scnt = SHRT_MAX; else cnd->aux_scnt = (short) opt->aux_spincount; if (cnd->main_scnt < cnd->aux_scnt) cnd->main_scnt = cnd->aux_scnt; } ETHR_MTX_QLOCK_INIT(&cnd->qlck); return 0; } int ethr_cond_init(ethr_cond *cnd) { return ethr_cond_init_opt(cnd, NULL); } int ethr_cond_destroy(ethr_cond *cnd) { #if ETHR_XCHK if (ethr_not_inited__) { ETHR_ASSERT(0); return EACCES; } if (!cnd || cnd->initialized != ETHR_COND_INITIALIZED) { ETHR_ASSERT(0); return EINVAL; } cnd->initialized = 0; #endif return ETHR_MTX_QLOCK_DESTROY(&cnd->qlck); } void ethr_cond_signal(ethr_cond *cnd) { ethr_ts_event *tse; ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(cnd); ETHR_ASSERT(cnd->initialized == ETHR_COND_INITIALIZED); ETHR_MTX_HARD_DEBUG_FENCE_CHK(cnd); ETHR_MTX_Q_LOCK(&cnd->qlck); tse = cnd->q; if (!tse) { ETHR_MTX_HARD_DEBUG_FENCE_CHK(cnd); ETHR_MTX_Q_UNLOCK(&cnd->qlck); } else { ethr_mutex *mtx = (ethr_mutex *) tse->udata; ETHR_MTX_HARD_DEBUG_FENCE_CHK(mtx); ETHR_ASSERT(tse->uflgs == ETHR_RWMTX_W_WAIT_FLG__); ETHR_ASSERT(ethr_atomic_read(&tse->uaflgs) == ETHR_CND_WAIT_FLG__); ethr_atomic_set(&tse->uaflgs, ETHR_RWMTX_W_WAIT_FLG__); dequeue(&cnd->q, tse, tse); ETHR_MTX_Q_UNLOCK(&cnd->qlck); tse->next = tse->prev = NULL; enqueue_mtx(mtx, tse, tse); ETHR_MTX_HARD_DEBUG_FENCE_CHK(cnd); ETHR_MTX_HARD_DEBUG_FENCE_CHK(mtx); } } void ethr_cond_broadcast(ethr_cond *cnd) { int got_all; ethr_ts_event *tse; ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(cnd); ETHR_ASSERT(cnd->initialized == ETHR_COND_INITIALIZED); ETHR_MTX_HARD_DEBUG_FENCE_CHK(cnd); do { got_all = 1; ETHR_MTX_Q_LOCK(&cnd->qlck); tse = cnd->q; if (!tse) { ETHR_MTX_HARD_DEBUG_FENCE_CHK(cnd); ETHR_MTX_Q_UNLOCK(&cnd->qlck); } else { ethr_mutex *mtx = (ethr_mutex *) tse->udata; ethr_ts_event *tse_tmp, *tse_end; ETHR_MTX_HARD_DEBUG_FENCE_CHK(mtx); tse_end = cnd->q->prev; tse_tmp = tse; do { if (mtx == (ethr_mutex *) tse_tmp->udata) { /* The normal case */ ETHR_ASSERT(tse_tmp->uflgs == ETHR_RWMTX_W_WAIT_FLG__); ETHR_ASSERT(ethr_atomic_read(&tse_tmp->uaflgs) == ETHR_CND_WAIT_FLG__); ethr_atomic_set(&tse_tmp->uaflgs, ETHR_RWMTX_W_WAIT_FLG__); } else { /* Should be very unusual */ ETHR_MTX_HARD_DEBUG_FENCE_CHK(mtx); tse_end = tse_tmp->prev; got_all = 0; break; } tse_tmp = tse_tmp->next; } while (tse_tmp != cnd->q); dequeue(&cnd->q, tse, tse_end); ETHR_MTX_Q_UNLOCK(&cnd->qlck); enqueue_mtx(mtx, tse, tse_end); } } while (!got_all); ETHR_MTX_HARD_DEBUG_FENCE_CHK(cnd); } int ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx) { int woken; int scnt; void *udata = NULL; ethr_ts_event *tse; ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(cnd); ETHR_ASSERT(cnd->initialized == ETHR_COND_INITIALIZED); ETHR_ASSERT(mtx); ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED); tse = ethr_get_ts_event(); scnt = ((tse->iflgs & ETHR_TS_EV_MAIN_THR) ? cnd->main_scnt : cnd->aux_scnt); ETHR_MTX_HARD_DEBUG_FENCE_CHK(cnd); ETHR_MTX_HARD_DEBUG_FENCE_CHK(mtx); udata = tse->udata; /* Got to restore udata before returning */ tse->udata = (void *) mtx; tse->uflgs = ETHR_RWMTX_W_WAIT_FLG__; /* Prep for mutex lock op */ ethr_atomic_set(&tse->uaflgs, ETHR_CND_WAIT_FLG__); ETHR_MTX_Q_LOCK(&cnd->qlck); enqueue(&cnd->q, tse, tse); ETHR_MTX_Q_UNLOCK(&cnd->qlck); ethr_mutex_unlock(mtx); /* Wait */ woken = 0; while (1) { long act; ethr_event_reset(&tse->event); act = ethr_atomic_read_acqb(&tse->uaflgs); if (!act) break; /* Mtx locked */ /* First time, got EINTR, or spurious wakeup... */ ETHR_ASSERT(act == ETHR_CND_WAIT_FLG__ || act == ETHR_RWMTX_W_WAIT_FLG__); if (woken) { /* * If act == ETHR_RWMTX_W_WAIT_FLG__, we have already been enqueued * on the mutex; continue wait until locked... */ if (act == ETHR_CND_WAIT_FLG__) { ETHR_MTX_Q_LOCK(&cnd->qlck); act = ethr_atomic_read(&tse->uaflgs); ETHR_ASSERT(act == ETHR_CND_WAIT_FLG__ || act == ETHR_RWMTX_W_WAIT_FLG__); /* * If act == ETHR_RWMTX_W_WAIT_FLG__, we have already * enqueued on the mutex; continue wait until locked... */ if (act == ETHR_CND_WAIT_FLG__) dequeue(&cnd->q, tse, tse); ETHR_MTX_Q_UNLOCK(&cnd->qlck); if (act == ETHR_CND_WAIT_FLG__) { tse->udata = udata; ethr_leave_ts_event(tse); ETHR_MTX_HARD_DEBUG_FENCE_CHK(cnd); ethr_mutex_lock(mtx); return EINTR; } } ETHR_ASSERT(act == ETHR_RWMTX_W_WAIT_FLG__); } ethr_event_swait(&tse->event, scnt); /* swait result: 0 || EINTR */ woken = 1; } ETHR_MTX_HARD_DEBUG_LFS_RWLOCK(&mtx->mtxb); ETHR_MTX_HARD_DEBUG_FENCE_CHK(cnd); ETHR_MTX_HARD_DEBUG_FENCE_CHK(mtx); tse->udata = udata; ethr_leave_ts_event(tse); return 0; } #else /* -- pthread mutex and condition variables -------------------------------- */ int ethr_mutex_init(ethr_mutex *mtx) { #if ETHR_XCHK if (!mtx) { ETHR_ASSERT(0); return EINVAL; } mtx->initialized = ETHR_MUTEX_INITIALIZED; #endif return pthread_mutex_init(&mtx->pt_mtx, NULL); } int ethr_mutex_destroy(ethr_mutex *mtx) { #if ETHR_XCHK if (ethr_not_inited__) { ETHR_ASSERT(0); return EACCES; } if (!mtx || mtx->initialized != ETHR_MUTEX_INITIALIZED) { ETHR_ASSERT(0); return EINVAL; } #endif #if ETHR_XCHK mtx->initialized = 0; #endif return pthread_mutex_destroy(&mtx->pt_mtx); } int ethr_cond_init(ethr_cond *cnd) { #if ETHR_XCHK if (!cnd) { ETHR_ASSERT(0); return EINVAL; } cnd->initialized = ETHR_COND_INITIALIZED; #endif return pthread_cond_init(&cnd->pt_cnd, NULL); } int ethr_cond_destroy(ethr_cond *cnd) { #if ETHR_XCHK if (ethr_not_inited__) { ETHR_ASSERT(0); return EACCES; } if (!cnd || cnd->initialized != ETHR_COND_INITIALIZED) { ETHR_ASSERT(0); return EINVAL; } cnd->initialized = 0; #endif return pthread_cond_destroy(&cnd->pt_cnd); } void ethr_cond_signal(ethr_cond *cnd) { int res; ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(cnd); ETHR_ASSERT(cnd->initialized == ETHR_COND_INITIALIZED); res = pthread_cond_signal(&cnd->pt_cnd); if (res != 0) ETHR_FATAL_ERROR__(res); } void ethr_cond_broadcast(ethr_cond *cnd) { int res; ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(cnd); ETHR_ASSERT(cnd->initialized == ETHR_COND_INITIALIZED); res = pthread_cond_broadcast(&cnd->pt_cnd); if (res != 0) ETHR_FATAL_ERROR__(res); } int ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx) { int res; ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(cnd); ETHR_ASSERT(cnd->initialized == ETHR_COND_INITIALIZED); ETHR_ASSERT(mtx); ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED); res = pthread_cond_wait(&cnd->pt_cnd, &mtx->pt_mtx); if (res != 0 && res != EINTR) ETHR_FATAL_ERROR__(res); return res; } #endif /* pthread_mutex */ /* -- Exported symbols of inline functions --------------------------------- */ int ethr_mutex_trylock(ethr_mutex *mtx) { ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(mtx); ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED); return ethr_mutex_trylock__(mtx); } void ethr_mutex_lock(ethr_mutex *mtx) { ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(mtx); ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED); ethr_mutex_lock__(mtx); } void ethr_mutex_unlock(ethr_mutex *mtx) { ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(mtx); ETHR_ASSERT(mtx->initialized == ETHR_MUTEX_INITIALIZED); ethr_mutex_unlock__(mtx); } #ifdef ETHR_USE_OWN_RWMTX_IMPL__ /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\ * Read/Write Mutex * \* */ static void wake_readers(ethr_rwmutex *rwmtx, int rs) { ethr_ts_event *tse; #ifdef ETHR_DEBUG int drs = 0; #endif tse = rwmtx->mtxb.q; ETHR_ASSERT(tse); ETHR_ASSERT(rwmtx->rq_end); dequeue(&rwmtx->mtxb.q, tse, rwmtx->rq_end); rwmtx->rq_end->next = NULL; rwmtx->rq_end = NULL; ETHR_ASSERT(!rwmtx->mtxb.q || (ethr_atomic_read(&rwmtx->mtxb.q->uaflgs) == ETHR_RWMTX_W_WAIT_FLG__)); ETHR_RWMTX_HARD_DEBUG_CHK_Q(rwmtx); ETHR_MTX_Q_UNLOCK(&rwmtx->mtxb.qlck); while (tse) { ethr_ts_event *tse_next; #ifdef ETHR_DEBUG ETHR_ASSERT(tse->uflgs == ETHR_RWMTX_R_WAIT_FLG__); ETHR_ASSERT(ethr_atomic_read(&tse->uaflgs) == ETHR_RWMTX_R_WAIT_FLG__); drs++; #endif tse_next = tse->next; /* we aren't allowed to read tse->next after we have reset uaflgs */ ethr_atomic_set(&tse->uaflgs, 0); ethr_event_set(&tse->event); tse = tse_next; } ETHR_ASSERT(rs == drs); } static ETHR_INLINE int is_w_waiter(ethr_ts_event *tse) { ETHR_ASSERT(tse->uflgs == ETHR_RWMTX_W_WAIT_FLG__ || tse->uflgs == ETHR_RWMTX_R_WAIT_FLG__); return tse->uflgs == ETHR_RWMTX_W_WAIT_FLG__; } static ETHR_INLINE int multiple_w_waiters(ethr_rwmutex *rwmtx) { ETHR_ASSERT(rwmtx->mtxb.q); ETHR_ASSERT(rwmtx->mtxb.q->uflgs == ETHR_RWMTX_W_WAIT_FLG__); if (!rwmtx->rq_end) return rwmtx->mtxb.q->next != rwmtx->mtxb.q; else { ETHR_ASSERT(rwmtx->mtxb.q->next != rwmtx->mtxb.q); if (rwmtx->mtxb.q->next->uflgs == ETHR_RWMTX_W_WAIT_FLG__) return 1; ETHR_ASSERT(rwmtx->rq_end->next == rwmtx->mtxb.q || rwmtx->rq_end->next->uflgs == ETHR_RWMTX_W_WAIT_FLG__); return rwmtx->rq_end->next != rwmtx->mtxb.q; } } int check_readers_array(ethr_rwmutex *rwmtx, int start_rix, int length) { int ix = start_rix; ETHR_MEMORY_BARRIER; do { long act = rwmutex_freqread_rdrs_read(rwmtx, ix); if (act != 0) return EBUSY; ix++; if (ix == length) ix = 0; } while (ix != start_rix); return 0; } static ETHR_INLINE void rwmutex_freqread_restore_failed_tryrlock(ethr_rwmutex *rwmtx, ethr_ts_event *tse) { long act; /* * Restore failed increment */ act = rwmutex_freqread_rdrs_dec_read(rwmtx, tse); ETHR_WRITE_MEMORY_BARRIER; if (act == 0) { #ifndef ETHR_WRITE_MEMORY_BARRIER_IS_FULL ETHR_READ_MEMORY_BARRIER; #endif act = ethr_atomic_read(&rwmtx->mtxb.flgs); if ((act & ETHR_RWMTX_W_FLG__) == 0 && act & (ETHR_RWMTX_WAIT_FLGS__|ETHR_RWMTX_R_PEND_UNLCK_MASK__)) { /* * We either got waiters, or someone else trying * to read unlock which we might have to help. */ rwmutex_try_complete_runlock(rwmtx, act, tse, 1, 1, 0); } } } static int rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx, long initial, ethr_ts_event *tse, int start_next_ix, int check_before_try, int try_write_lock) { ethr_ts_event *tse_tmp; long act = initial; int six, res, length; tse_tmp = tse; if (!tse_tmp) tse_tmp = ethr_get_ts_event(); if ((act & ETHR_RWMTX_WAIT_FLGS__) && (act & ~ETHR_RWMTX_WAIT_FLGS__) == 0) goto check_waiters; if (rwmtx->type == ETHR_RWMUTEX_TYPE_FREQUENT_READ) { length = reader_groups_array_size; six = tse_tmp->rgix; } else { length = main_threads_array_size; six = tse_tmp->mtix; } if (start_next_ix) { six++; if (six >= length) six = 0; } if (!tse) ethr_leave_ts_event(tse_tmp); if (check_before_try) { res = check_readers_array(rwmtx, six, length); if (res == EBUSY) return try_write_lock ? EBUSY : 0; } while (1) { long exp = act; long new = act+1; ETHR_ASSERT((act & ETHR_RWMTX_R_PEND_UNLCK_MASK__) < ETHR_RWMTX_R_PEND_UNLCK_MASK__); act = ethr_atomic_cmpxchg(&rwmtx->mtxb.flgs, new, exp); if (exp == act) { act = new; break; } if (!try_write_lock) { if (act == ETHR_RWMTX_W_FLG__ || act == 0) return 0; if ((act & ETHR_RWMTX_WAIT_FLGS__) == 0) { if ((act & ETHR_RWMTX_R_FLG__) == 0) return 0; } else if ((act & ETHR_RWMTX_R_FLG__) == 0) { if (act & ETHR_RWMTX_R_PEND_UNLCK_MASK__) return 0; goto check_waiters; } } else { if (act == 0) goto tryrwlock; if (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_WAIT_FLGS__)) return EBUSY; } } res = check_readers_array(rwmtx, six, length); if (res == EBUSY) { act = ethr_atomic_dec_read(&rwmtx->mtxb.flgs); if (act & ETHR_RWMTX_R_MASK__) return try_write_lock ? EBUSY : 0; } else { while (1) { long exp = act; long new = act; new &= ~ETHR_RWMTX_R_FLG__; new--; ETHR_ASSERT(act & ETHR_RWMTX_R_PEND_UNLCK_MASK__); act = ethr_atomic_cmpxchg(&rwmtx->mtxb.flgs, new, exp); if (exp == act) { if (new & ETHR_RWMTX_R_PEND_UNLCK_MASK__) return try_write_lock ? EBUSY : 0; act = new; break; } } } /* * Read unlock completed, but we have to check if * threads have to be woken (or if we should try * to write lock it). */ if (act & ETHR_RWMTX_W_FLG__) return try_write_lock ? EBUSY : 0; if (act & ETHR_RWMTX_WAIT_FLGS__) { check_waiters: rwmutex_unlock_wake(rwmtx, 0, act); return try_write_lock ? EBUSY : 0; } if (!try_write_lock) return 0; tryrwlock: /* Try to write lock it */ act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, ETHR_RWMTX_W_FLG__, 0); return act == 0 ? 0 : EBUSY; } #ifdef ETHR_RLOCK_WITH_INC_DEC static ETHR_INLINE void rwmutex_incdec_restore_failed_tryrlock(ethr_rwmutex *rwmtx) { long act; /* * Restore failed increment */ act = ethr_atomic_dec_read(&rwmtx->mtxb.flgs); if ((act & ETHR_RWMTX_WAIT_FLGS__) && (act & ~ETHR_RWMTX_WAIT_FLGS__) == 0) { rwmutex_unlock_wake(rwmtx, 0, act); } } #endif static void rwmutex_normal_rlock_wait(ethr_rwmutex *rwmtx, long initial) { long act = initial, exp; int scnt, start_scnt; ethr_ts_event *tse = NULL; int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; start_scnt = scnt = initial_spincount(&rwmtx->mtxb); /* * Spin trying to read lock for a while. If unsuccessful, * wait on event. */ while (1) { #ifdef ETHR_RLOCK_WITH_INC_DEC rwmutex_incdec_restore_failed_tryrlock(rwmtx); act = ethr_atomic_read(&rwmtx->mtxb.flgs); #endif while (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__)) { if (scnt >= 0) { tse = ethr_get_ts_event(); if (update_spincount(&rwmtx->mtxb, tse, &start_scnt, &scnt)) { event_wait(&rwmtx->mtxb, tse, scnt, ETHR_RWMTX_R_WAIT_FLG__, 1, 0); goto done; } } ETHR_SPIN_BODY; if (--until_yield == 0) { until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; ETHR_YIELD(); } act = ethr_atomic_read(&rwmtx->mtxb.flgs); scnt--; } exp = act; #ifdef ETHR_RLOCK_WITH_INC_DEC act = ethr_atomic_inc_read(&rwmtx->mtxb.flgs); if ((act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__)) == 0) goto done; /* Got it */ #else act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, exp+1, exp); if (act == exp) goto done; /* Got it */ #endif } done: if (tse) ethr_leave_ts_event(tse); } static void rwmutex_freqread_rlock_wait(ethr_rwmutex *rwmtx, ethr_ts_event *tse, long initial) { long act = initial; int scnt, start_scnt; int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; start_scnt = scnt = initial_spincount(&rwmtx->mtxb); /* * Spin trying to read lock for a while. If unsuccessful, * wait on event. */ while (1) { rwmutex_freqread_restore_failed_tryrlock(rwmtx, tse); act = ethr_atomic_read(&rwmtx->mtxb.flgs); while (act & ~(ETHR_RWMTX_R_FLG__|ETHR_RWMTX_R_WAIT_FLG__)) { if (scnt >= 0) { if (update_spincount(&rwmtx->mtxb, tse, &start_scnt, &scnt)) { event_wait(&rwmtx->mtxb, tse, scnt, ETHR_RWMTX_R_WAIT_FLG__, 1, 1); return; /* Got it */ } } ETHR_SPIN_BODY; if (--until_yield == 0) { until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; ETHR_YIELD(); } act = ethr_atomic_read(&rwmtx->mtxb.flgs); scnt--; } rwmutex_freqread_rdrs_inc(rwmtx, tse); ETHR_MEMORY_BARRIER; act = ethr_atomic_read(&rwmtx->mtxb.flgs); if (act == ETHR_RWMTX_R_FLG__) return; /* Got it */ while (1) { long exp, new; if (act & ~(ETHR_RWMTX_R_FLG__|ETHR_RWMTX_R_WAIT_FLG__)) break; /* Busy (need to restore inc) */ if (act & ETHR_RWMTX_R_FLG__) return; /* Got it */ exp = act; new = act | ETHR_RWMTX_R_FLG__; act = ethr_atomic_cmpxchg(&rwmtx->mtxb.flgs, new, exp); if (act == exp) return; /* Got it */ } } } static void rwmutex_normal_rwlock_wait(ethr_rwmutex *rwmtx, long initial) { write_lock_wait(&rwmtx->mtxb, initial, 1, 0); } static void rwmutex_freqread_rwlock_wait(ethr_rwmutex *rwmtx, long initial) { write_lock_wait(&rwmtx->mtxb, initial, 1, 1); } static ETHR_INLINE void rwlock_wake_set_flags(ethr_rwmutex *rwmtx, long new_initial, int act_initial) { long act, act_mask; if (rwmtx->type != ETHR_RWMUTEX_TYPE_NORMAL) { /* r pend unlock mask may vary and must be retained */ act_mask = ETHR_RWMTX_R_PEND_UNLCK_MASK__; } else { #ifdef ETHR_RLOCK_WITH_INC_DEC /* rs mask may vary and must be retained */ act_mask = ETHR_RWMTX_RS_MASK__; #else /* rs mask always zero */ ETHR_ASSERT((act_initial & ETHR_RWMTX_RS_MASK__) == 0); ethr_atomic_set(&rwmtx->mtxb.flgs, new_initial); return; #endif } act = act_initial; while (1) { long exp = act; long new = new_initial + (act & act_mask); act = ethr_atomic_cmpxchg(&rwmtx->mtxb.flgs, new, exp); if (act == exp) break; exp = act; } } #ifdef ETHR_DEBUG static void dbg_unlock_wake(ethr_rwmutex *rwmtx, int have_w, ethr_ts_event *tse) { long exp, act, imask; exp = have_w ? ETHR_RWMTX_W_FLG__ : 0; if (rwmtx->type != ETHR_RWMUTEX_TYPE_NORMAL) imask = ETHR_RWMTX_R_PEND_UNLCK_MASK__; else { #ifdef ETHR_RLOCK_WITH_INC_DEC imask = ETHR_RWMTX_RS_MASK__; #else imask = 0; #endif } ETHR_ASSERT(tse); if (is_w_waiter(tse)) { exp |= ETHR_RWMTX_W_WAIT_FLG__; if (rwmtx->rq_end) { exp |= ETHR_RWMTX_R_WAIT_FLG__; } act = ethr_atomic_read(&rwmtx->mtxb.flgs); ETHR_ASSERT((exp & ~imask) == (act & ~imask)); ETHR_RWMTX_HARD_DEBUG_CHK_Q(rwmtx); } else { exp |= ETHR_RWMTX_R_WAIT_FLG__; if (rwmtx->rq_end->next != rwmtx->mtxb.q) exp |= ETHR_RWMTX_W_WAIT_FLG__; act = ethr_atomic_read(&rwmtx->mtxb.flgs); ETHR_ASSERT((exp & ~imask) == (act & ~imask)); ETHR_RWMTX_HARD_DEBUG_CHK_Q(rwmtx); } } #endif static void rwmutex_unlock_wake(ethr_rwmutex *rwmtx, int have_w, long initial) { long new, act = initial; ethr_ts_event *tse; if ((act & ETHR_RWMTX_WAIT_FLGS__) == 0) { if (!have_w) return; else { while ((act & ETHR_RWMTX_WAIT_FLGS__) == 0) { long exp = act; new = exp & ~ETHR_RWMTX_W_FLG__; act = ethr_atomic_cmpxchg(&rwmtx->mtxb.flgs, new, exp); if (act == exp) return; } } } ETHR_MTX_Q_LOCK(&rwmtx->mtxb.qlck); tse = rwmtx->mtxb.q; if (!have_w) { if (!tse) { #ifdef ETHR_DEBUG act = ethr_atomic_read(&rwmtx->mtxb.flgs); ETHR_ASSERT((act & ETHR_RWMTX_WAIT_FLGS__) == 0); #endif goto already_served; } act = ethr_atomic_read(&rwmtx->mtxb.flgs); if (act & ~ETHR_RWMTX_WAIT_FLGS__) { already_served: ETHR_MTX_Q_UNLOCK(&rwmtx->mtxb.qlck); return; } } #ifdef ETHR_DEBUG dbg_unlock_wake(rwmtx, have_w, tse); #endif if (is_w_waiter(tse)) { if (!have_w) { act = ethr_atomic_read_bor(&rwmtx->mtxb.flgs, ETHR_RWMTX_W_FLG__); ETHR_ASSERT((act & ~ETHR_RWMTX_WAIT_FLGS__) == 0); ETHR_ASSERT(act & ETHR_RWMTX_W_WAIT_FLG__); act |= ETHR_RWMTX_W_FLG__; } /* * If we have multiple write waiters, there * is no need to modify mtxb->flgs; otherwise, * we need to clear the write wait bit... */ if (!multiple_w_waiters(rwmtx)) { new = ETHR_RWMTX_W_FLG__; if (tse->next != rwmtx->mtxb.q) { ETHR_ASSERT(tse->next->uflgs == ETHR_RWMTX_R_WAIT_FLG__); new |= ETHR_RWMTX_R_WAIT_FLG__; } rwlock_wake_set_flags(rwmtx, new, act); } wake_writer(&rwmtx->mtxb, 1); } else { int rs; if (rwmtx->type == ETHR_RWMUTEX_TYPE_NORMAL) { rs = rwmtx->tdata.rs; new = (long) rs; rwmtx->tdata.rs = 0; } else { ethr_rwmutex_type type = rwmtx->type; int length = (type == ETHR_RWMUTEX_TYPE_FREQUENT_READ ? reader_groups_array_size : main_threads_array_size); int ix; rs = 0; for (ix = 0; ix < length; ix++) { int wrs = rwmtx->tdata.ra[ix].data.waiting_readers; rwmtx->tdata.ra[ix].data.waiting_readers = 0; ETHR_ASSERT(wrs >= 0); if (wrs) { rs += wrs; rwmutex_freqread_rdrs_add(rwmtx, type, ix, wrs); } } new = ETHR_RWMTX_R_FLG__; } if (rwmtx->rq_end->next != rwmtx->mtxb.q) new |= ETHR_RWMTX_W_WAIT_FLG__; rwlock_wake_set_flags(rwmtx, new, act); wake_readers(rwmtx, rs); } } static ethr_rwmtx_readers_array__ * alloc_readers_array(int length, ethr_rwmutex_lived lived) { ethr_rwmtx_readers_array__ *ra; size_t sz; void *mem; sz = sizeof(ethr_rwmtx_readers_array__) * (length + 1); switch (lived) { case ETHR_RWMUTEX_LONG_LIVED: mem = ethr_mem__.ll.alloc(sz); break; case ETHR_RWMUTEX_SHORT_LIVED: mem = ethr_mem__.sl.alloc(sz); break; default: mem = ethr_mem__.std.alloc(sz); break; } if (!mem) return NULL; if ((((unsigned long) mem) & ETHR_CACHE_LINE_MASK) == 0) { ra = (ethr_rwmtx_readers_array__ *) mem; ra->data.byte_offset = 0; } else { ra = ((ethr_rwmtx_readers_array__ *) ((((unsigned long) mem) & ~ETHR_CACHE_LINE_MASK) + ETHR_CACHE_LINE_SIZE)); ra->data.byte_offset = (int) ((unsigned long) ra - (unsigned long) mem); } ra->data.lived = lived; return ra; } static void free_readers_array(ethr_rwmtx_readers_array__ *ra) { void *ptr = (void *) (((char *) ra) - ra->data.byte_offset); switch (ra->data.lived) { case ETHR_RWMUTEX_LONG_LIVED: ethr_mem__.ll.free(ptr); break; case ETHR_RWMUTEX_SHORT_LIVED: ethr_mem__.sl.free(ptr); break; default: ethr_mem__.std.free(ptr); break; } } int ethr_rwmutex_init_opt(ethr_rwmutex *rwmtx, ethr_rwmutex_opt *opt) { int res; ethr_rwmtx_readers_array__ *ra = NULL; #if ETHR_XCHK if (ethr_not_completely_inited__) { ETHR_ASSERT(0); return EACCES; } if (!rwmtx) { ETHR_ASSERT(0); return EINVAL; } rwmtx->initialized = ETHR_RWMUTEX_INITIALIZED; #endif ETHR_MTX_HARD_DEBUG_FENCE_INIT(rwmtx); rwmtx->rq_end = NULL; rwmtx->type = opt ? opt->type : ETHR_RWMUTEX_TYPE_NORMAL; switch (rwmtx->type) { case ETHR_RWMUTEX_TYPE_FREQUENT_READ: if (main_threads_array_size <= reader_groups_array_size) { /* No point using reader groups... */ rwmtx->type = ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ; } /* Fall through */ case ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ: { int length; length = (rwmtx->type == ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ ? main_threads_array_size : reader_groups_array_size); if (length == 1) { /* No point using a frequent reader type... */ rwmtx->type = ETHR_RWMUTEX_TYPE_NORMAL; } else { int ix; ra = alloc_readers_array(length, (opt ? opt->lived : ETHR_RWMUTEX_UNKNOWN_LIVED)); if (!ra) { res = ENOMEM; goto error; } rwmtx->tdata.ra = ra; for (ix = 0; ix < length; ix++) { ethr_atomic_init(&rwmtx->tdata.ra[ix].data.readers, 0); rwmtx->tdata.ra[ix].data.waiting_readers = 0; } break; } } case ETHR_RWMUTEX_TYPE_NORMAL: rwmtx->tdata.rs = 0; break; default: res = EINVAL; goto error; } res = mtxb_init(&rwmtx->mtxb, default_rwmtx_main_spincount, opt ? opt->main_spincount : -1, default_rwmtx_aux_spincount, opt ? opt->aux_spincount : -1); if (res == 0) return 0; error: if (ra) free_readers_array(ra); #if ETHR_XCHK rwmtx->initialized = 0; #endif return res; } int ethr_rwmutex_init(ethr_rwmutex *rwmtx) { return ethr_rwmutex_init_opt(rwmtx, NULL); } int ethr_rwmutex_destroy(ethr_rwmutex *rwmtx) { int res; #if ETHR_XCHK if (ethr_not_inited__) { ETHR_ASSERT(0); return EACCES; } if (!rwmtx || rwmtx->initialized != ETHR_RWMUTEX_INITIALIZED) { ETHR_ASSERT(0); return EINVAL; } #endif if (rwmtx->type != ETHR_RWMUTEX_TYPE_NORMAL) { long act = ethr_atomic_read(&rwmtx->mtxb.flgs); if (act == ETHR_RWMTX_R_FLG__) rwmutex_try_complete_runlock(rwmtx, act, NULL, 0, 0, 0); } res = mtxb_destroy(&rwmtx->mtxb); if (res != 0) return res; if (rwmtx->type != ETHR_RWMUTEX_TYPE_NORMAL) free_readers_array(rwmtx->tdata.ra); #if ETHR_XCHK rwmtx->initialized = 0; #endif return 0; } #define ETHR_MAX_TRYRLOCK_TRIES 5 int ethr_rwmutex_tryrlock(ethr_rwmutex *rwmtx) { int res = 0; long act; ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); switch (rwmtx->type) { case ETHR_RWMUTEX_TYPE_NORMAL: { #ifdef ETHR_RLOCK_WITH_INC_DEC act = ethr_atomic_read(&rwmtx->mtxb.flgs); if (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__)) res = EBUSY; else { act = ethr_atomic_inc_read_acqb(&rwmtx->mtxb.flgs); if (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__)) { rwmutex_incdec_restore_failed_tryrlock(rwmtx); res = EBUSY; } } #else long exp = 0; int tries = 0; while (1) { act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, exp+1, exp); if (act == exp) { res = 0; break; } if (tries > ETHR_MAX_TRYRLOCK_TRIES || (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__))) { res = EBUSY; break; } tries++; exp = act; } #endif break; } case ETHR_RWMUTEX_TYPE_FREQUENT_READ: case ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ: { ethr_ts_event *tse = ethr_get_ts_event(); rwmutex_freqread_rdrs_inc(rwmtx, tse); ETHR_MEMORY_BARRIER; act = ethr_atomic_read_acqb(&rwmtx->mtxb.flgs); if (act != ETHR_RWMTX_R_FLG__) { while (1) { long exp, new; if (act & ~(ETHR_RWMTX_R_FLG__|ETHR_RWMTX_R_WAIT_FLG__)) { rwmutex_freqread_restore_failed_tryrlock(rwmtx, tse); res = EBUSY; break; } if (act & ETHR_RWMTX_R_FLG__) break; exp = act; new = act | ETHR_RWMTX_R_FLG__; act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, new, exp); if (act == exp) break; } } ethr_leave_ts_event(tse); break; } } ETHR_MTX_HARD_DEBUG_LFS_TRYRLOCK(&rwmtx->mtxb, res); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); return res; } void ethr_rwmutex_rlock(ethr_rwmutex *rwmtx) { long act; ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); switch (rwmtx->type) { case ETHR_RWMUTEX_TYPE_NORMAL: { #ifdef ETHR_RLOCK_WITH_INC_DEC act = ethr_atomic_inc_read_acqb(&rwmtx->mtxb.flgs); if (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__)) rwmutex_normal_rlock_wait(rwmtx, act); #else long exp = 0; while (1) { act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, exp+1, exp); if (act == exp) { break; } if (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__)) { rwmutex_normal_rlock_wait(rwmtx, act); break; } exp = act; } #endif break; } case ETHR_RWMUTEX_TYPE_FREQUENT_READ: case ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ: { ethr_ts_event *tse = ethr_get_ts_event(); rwmutex_freqread_rdrs_inc(rwmtx, tse); ETHR_MEMORY_BARRIER; act = ethr_atomic_read_acqb(&rwmtx->mtxb.flgs); if (act != ETHR_RWMTX_R_FLG__) { while (1) { long exp, new; if (act & ~(ETHR_RWMTX_R_FLG__|ETHR_RWMTX_R_WAIT_FLG__)) { rwmutex_freqread_rlock_wait(rwmtx, tse, act); break; } if (act & ETHR_RWMTX_R_FLG__) break; exp = act; new = act | ETHR_RWMTX_R_FLG__; act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, new, exp); if (act == exp) break; } } ethr_leave_ts_event(tse); break; } } ETHR_MTX_HARD_DEBUG_LFS_RLOCK(&rwmtx->mtxb); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); } void ethr_rwmutex_runlock(ethr_rwmutex *rwmtx) { long act; ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); ETHR_MTX_HARD_DEBUG_LFS_RUNLOCK(&rwmtx->mtxb); switch (rwmtx->type) { case ETHR_RWMUTEX_TYPE_NORMAL: act = ethr_atomic_dec_read_relb(&rwmtx->mtxb.flgs); if ((act & ETHR_RWMTX_WAIT_FLGS__) && (act & ~ETHR_RWMTX_WAIT_FLGS__) == 0) { ETHR_ASSERT((act & ETHR_RWMTX_W_FLG__) == 0); rwmutex_unlock_wake(rwmtx, 0, act); } break; case ETHR_RWMUTEX_TYPE_FREQUENT_READ: case ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ: { ethr_ts_event *tse = ethr_get_ts_event(); act = rwmutex_freqread_rdrs_dec_read_relb(rwmtx, tse); ETHR_ASSERT(act >= 0); ETHR_WRITE_MEMORY_BARRIER; if (act == 0) { #ifndef ETHR_WRITE_MEMORY_BARRIER_IS_FULL ETHR_READ_MEMORY_BARRIER; #endif act = ethr_atomic_read(&rwmtx->mtxb.flgs); if ((act & ETHR_RWMTX_W_FLG__) == 0 && (act & (ETHR_RWMTX_WAIT_FLGS__ | ETHR_RWMTX_R_PEND_UNLCK_MASK__))) { rwmutex_try_complete_runlock(rwmtx, act, tse, 1, 0, 0); } } ethr_leave_ts_event(tse); break; } } ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); } int ethr_rwmutex_tryrwlock(ethr_rwmutex *rwmtx) { int res = 0; long act; ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); switch (rwmtx->type) { case ETHR_RWMUTEX_TYPE_NORMAL: act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, ETHR_RWMTX_W_FLG__, 0); if (act != 0) res = EBUSY; break; case ETHR_RWMUTEX_TYPE_FREQUENT_READ: case ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ: res = 0; act = ethr_atomic_read(&rwmtx->mtxb.flgs); do { if (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_WAIT_FLGS__)) { res = EBUSY; break; } if (act & ETHR_RWMTX_R_MASK__) { res = rwmutex_try_complete_runlock(rwmtx, act, NULL, 0, 1, 1); break; } act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, ETHR_RWMTX_W_FLG__, 0); } while (act != 0); break; } ETHR_MTX_HARD_DEBUG_LFS_TRYRWLOCK(&rwmtx->mtxb, res); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); return res; } void ethr_rwmutex_rwlock(ethr_rwmutex *rwmtx) { long act; ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); switch (rwmtx->type) { case ETHR_RWMUTEX_TYPE_NORMAL: act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, ETHR_RWMTX_W_FLG__, 0); if (act != 0) rwmutex_normal_rwlock_wait(rwmtx, act); break; case ETHR_RWMUTEX_TYPE_FREQUENT_READ: case ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ: act = ethr_atomic_read(&rwmtx->mtxb.flgs); do { if (act != 0) { rwmutex_freqread_rwlock_wait(rwmtx, act); break; } act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, ETHR_RWMTX_W_FLG__, 0); } while (act != 0); break; } ETHR_MTX_HARD_DEBUG_LFS_RWLOCK(&rwmtx->mtxb); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); } void ethr_rwmutex_rwunlock(ethr_rwmutex *rwmtx) { long act; ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); ETHR_MTX_HARD_DEBUG_LFS_RWUNLOCK(&rwmtx->mtxb); switch (rwmtx->type) { case ETHR_RWMUTEX_TYPE_NORMAL: act = ethr_atomic_cmpxchg_relb(&rwmtx->mtxb.flgs, 0, ETHR_RWMTX_W_FLG__); if (act != ETHR_RWMTX_W_FLG__) rwmutex_unlock_wake(rwmtx, 1, act); break; case ETHR_RWMUTEX_TYPE_FREQUENT_READ: case ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ: act = ethr_atomic_cmpxchg_relb(&rwmtx->mtxb.flgs, 0, ETHR_RWMTX_W_FLG__); if (act != ETHR_RWMTX_W_FLG__) rwmutex_unlock_wake(rwmtx, 1, act); break; } ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); } #else /* -- pthread read/write mutex --------------------------------------------- */ int ethr_rwmutex_init(ethr_rwmutex *rwmtx) { #if ETHR_XCHK if (!rwmtx) { ETHR_ASSERT(0); return EINVAL; } rwmtx->initialized = ETHR_RWMUTEX_INITIALIZED; #endif return pthread_rwlock_init(&rwmtx->pt_rwlock, write_pref_attr); } int ethr_rwmutex_init_opt(ethr_rwmutex *rwmtx, ethr_rwmutex_opt *opt) { return ethr_rwmutex_init(rwmtx); } int ethr_rwmutex_destroy(ethr_rwmutex *rwmtx) { int res; #if ETHR_XCHK if (ethr_not_inited__) { ETHR_ASSERT(0); return EACCES; } if (!rwmtx || rwmtx->initialized != ETHR_RWMUTEX_INITIALIZED) { ETHR_ASSERT(0); return EINVAL; } #endif res = pthread_rwlock_destroy(&rwmtx->pt_rwlock); #if ETHR_XCHK rwmtx->initialized = 0; #endif return res; } /* -- Exported symbols of inline functions --------------------------------- */ int ethr_rwmutex_tryrlock(ethr_rwmutex *rwmtx) { ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); return ethr_rwmutex_tryrlock__(rwmtx); } void ethr_rwmutex_rlock(ethr_rwmutex *rwmtx) { ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); ethr_rwmutex_rlock__(rwmtx); } void ethr_rwmutex_runlock(ethr_rwmutex *rwmtx) { ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); ethr_rwmutex_runlock__(rwmtx); } int ethr_rwmutex_tryrwlock(ethr_rwmutex *rwmtx) { ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); return ethr_rwmutex_tryrwlock__(rwmtx); } void ethr_rwmutex_rwlock(ethr_rwmutex *rwmtx) { ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); return ethr_rwmutex_rwlock__(rwmtx); } void ethr_rwmutex_rwunlock(ethr_rwmutex *rwmtx) { ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); ethr_rwmutex_rwunlock__(rwmtx); } #endif /* pthread */ #if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__) #ifdef ETHR_MTX_HARD_DEBUG_Q static void hard_debug_chk_q__(struct ethr_mutex_base_ *mtxb, int is_rwmtx) { int res; long flgs = ethr_atomic_read(&mtxb->flgs); ETHR_MTX_HARD_ASSERT(res == 0); ETHR_MTX_HARD_ASSERT(!(flgs & ETHR_RWMTX_R_WAIT_FLG__) || is_rwmtx); if (!(flgs & ETHR_RWMTX_WAIT_FLGS__)) { ETHR_MTX_HARD_ASSERT(!mtxb->q); if (is_rwmtx) { ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; ETHR_MTX_HARD_ASSERT(!rwmtx->rq_end); ETHR_MTX_HARD_ASSERT(!rwmtx->rs); } } else { ethr_ts_event *tse; int ws = 0, rs = 0, rsf = 0, ref = 0; ETHR_MTX_HARD_ASSERT(mtxb->q); tse = mtxb->q; do { long type; ETHR_MTX_HARD_ASSERT(tse->next->prev == tse); ETHR_MTX_HARD_ASSERT(tse->prev->next == tse); type = ethr_atomic_read(&tse->uaflgs); ETHR_MTX_HARD_ASSERT(type == tse->uflgs); switch (type) { case ETHR_RWMTX_W_WAIT_FLG__: ws++; break; case ETHR_RWMTX_R_WAIT_FLG__: { ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; ETHR_MTX_HARD_ASSERT(is_rwmtx); if (!rsf) rsf = 1; ETHR_MTX_HARD_ASSERT(!ref); if (rwmtx->rq_end == tse) { ETHR_MTX_HARD_ASSERT( tse->next == rwmtx->mtxb.q || tse->next->uflgs == ETHR_RWMTX_W_WAIT_FLG__); ref = 1; } rs++; break; } default: ETHR_MTX_HARD_ASSERT(! "invalid wait type found"); } tse = tse->next; } while (tse != mtxb->q); if (is_rwmtx) { ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; ETHR_MTX_HARD_ASSERT(rs == rwmtx->rs); } #ifdef ETHR_MTX_HARD_DEBUG_WSQ ETHR_MTX_HARD_ASSERT(ws == mtxb->ws); #endif if (flgs & ETHR_RWMTX_W_WAIT_FLG__) ETHR_MTX_HARD_ASSERT(ws); else ETHR_MTX_HARD_ASSERT(!ws); if (flgs & ETHR_RWMTX_R_WAIT_FLG__) { ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; ETHR_MTX_HARD_ASSERT(is_rwmtx); ETHR_MTX_HARD_ASSERT(rwmtx->rq_end); ETHR_MTX_HARD_ASSERT(rsf); ETHR_MTX_HARD_ASSERT(ref); ETHR_MTX_HARD_ASSERT(rs); } else { if (is_rwmtx) { ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; ETHR_MTX_HARD_ASSERT(!rwmtx->rq_end); } ETHR_MTX_HARD_ASSERT(!rsf); ETHR_MTX_HARD_ASSERT(!ref); ETHR_MTX_HARD_ASSERT(!rs); } } } #elif defined(ETHR_MTX_HARD_DEBUG_WSQ) static void hard_debug_chk_q__(struct ethr_mutex_base_ *mtxb, int is_rwmtx) { int ws = 0; int rs = 0; if (mtxb->q) { ethr_ts_event *tse = mtxb->q; do { switch (tse->uflgs) { case ETHR_RWMTX_W_WAIT_FLG__: ws++; break; case ETHR_RWMTX_R_WAIT_FLG__: rs++; break; default: ETHR_MTX_HARD_ASSERT(0); break; } tse = tse->next; } while (tse != mtxb->q); } ETHR_MTX_HARD_ASSERT(mtxb->ws == ws); if (is_rwmtx) { ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; ETHR_MTX_HARD_ASSERT(rwmtx->rs == rs); } } #endif #endif