diff options
author | Rickard Green <[email protected]> | 2010-08-10 13:42:42 +0200 |
---|---|---|
committer | Rickard Green <[email protected]> | 2010-08-10 13:42:42 +0200 |
commit | 59ee2a593090e7d53c97ceba63cbd300d1b9657e (patch) | |
tree | b60c1078eebb64955bba181cfc118ee2f3b845d4 /erts/emulator/beam/erl_process_lock.c | |
parent | 0d553b45b5c3ae8287340887f271bc70f1f1370c (diff) | |
parent | 300b419486c1ca88e33938f182d5d5a8b90fb73f (diff) | |
download | otp-59ee2a593090e7d53c97ceba63cbd300d1b9657e.tar.gz otp-59ee2a593090e7d53c97ceba63cbd300d1b9657e.tar.bz2 otp-59ee2a593090e7d53c97ceba63cbd300d1b9657e.zip |
Merge branch 'rickard/ethread-rewrite/OTP-8544' into dev
* rickard/ethread-rewrite/OTP-8544:
Rewrite ethread library
Diffstat (limited to 'erts/emulator/beam/erl_process_lock.c')
-rw-r--r-- | erts/emulator/beam/erl_process_lock.c | 350 |
1 files changed, 181 insertions, 169 deletions
diff --git a/erts/emulator/beam/erl_process_lock.c b/erts/emulator/beam/erl_process_lock.c index 52440fb635..a4d12139e9 100644 --- a/erts/emulator/beam/erl_process_lock.c +++ b/erts/emulator/beam/erl_process_lock.c @@ -1,19 +1,19 @@ /* * %CopyrightBegin% - * - * Copyright Ericsson AB 2007-2009. All Rights Reserved. - * + * + * Copyright Ericsson AB 2007-2010. All Rights Reserved. + * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in * compliance with the License. You should have received a copy of the * Erlang Public License along with this software. If not, it can be * retrieved online at http://www.erlang.org/. - * + * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. - * + * * %CopyrightEnd% */ @@ -71,9 +71,12 @@ const Process erts_proc_lock_busy; #ifdef ERTS_SMP -/*#define ERTS_PROC_LOCK_SPIN_ON_GATE*/ -#define ERTS_PROC_LOCK_SPIN_COUNT_MAX 16000 +#define ERTS_PROC_LOCK_SPIN_COUNT_MAX 2000 +#define ERTS_PROC_LOCK_SPIN_COUNT_SCHED_INC 32 #define ERTS_PROC_LOCK_SPIN_COUNT_BASE 1000 +#define ERTS_PROC_LOCK_AUX_SPIN_COUNT 50 + +#define ERTS_PROC_LOCK_SPIN_UNTIL_YIELD 25 #ifdef ERTS_PROC_LOCK_DEBUG #define ERTS_PROC_LOCK_HARD_DEBUG @@ -83,32 +86,19 @@ const Process erts_proc_lock_busy; static void check_queue(erts_proc_lock_t *lck); #endif - -typedef struct erts_proc_lock_waiter_t_ erts_proc_lock_waiter_t; -struct erts_proc_lock_waiter_t_ { - erts_proc_lock_waiter_t *next; - erts_proc_lock_waiter_t *prev; - ErtsProcLocks wait_locks; - erts_smp_gate_t gate; - erts_proc_lock_queues_t *queues; -}; +#if SIZEOF_INT < 4 +#error "The size of the 'uflgs' field of the erts_tse_t type is too small" +#endif struct erts_proc_lock_queues_t_ { erts_proc_lock_queues_t *next; - erts_proc_lock_waiter_t *queue[ERTS_PROC_LOCK_MAX_BIT+1]; -}; - -struct erts_proc_lock_thr_spec_data_t_ { - erts_proc_lock_queues_t *qs; - erts_proc_lock_waiter_t *wtr; + erts_tse_t *queue[ERTS_PROC_LOCK_MAX_BIT+1]; }; static erts_proc_lock_queues_t zeroqs = {0}; -static erts_smp_spinlock_t wtr_lock; -static erts_proc_lock_waiter_t *waiter_free_list; +static erts_smp_spinlock_t qs_lock; static erts_proc_lock_queues_t *queue_free_list; -static erts_tsd_key_t waiter_key; #ifdef ERTS_ENABLE_LOCK_CHECK static struct { @@ -122,35 +112,26 @@ static struct { erts_pix_lock_t erts_pix_locks[ERTS_NO_OF_PIX_LOCKS]; static int proc_lock_spin_count; -static int proc_lock_trans_spin_cost; +static int aux_thr_proc_lock_spin_count; -static void cleanup_waiter(void); +static void cleanup_tse(void); void erts_init_proc_lock(void) { int i; int cpus; - erts_smp_spinlock_init(&wtr_lock, "proc_lck_wtr_alloc"); + erts_smp_spinlock_init(&qs_lock, "proc_lck_qs_alloc"); for (i = 0; i < ERTS_NO_OF_PIX_LOCKS; i++) { -#if ERTS_PROC_LOCK_MUTEX_IMPL -#ifdef ERTS_ENABLE_LOCK_COUNT - erts_smp_mtx_init_x(&erts_pix_locks[i].u.mtx, "pix_lock", make_small(i)); -#else - erts_smp_mtx_init(&erts_pix_locks[i].u.mtx, "pix_lock"); -#endif -#else #ifdef ERTS_ENABLE_LOCK_COUNT - erts_smp_spinlock_init_x(&erts_pix_locks[i].u.spnlck, "pix_lock", make_small(i)); + erts_smp_spinlock_init_x(&erts_pix_locks[i].u.spnlck, + "pix_lock", make_small(i)); #else erts_smp_spinlock_init(&erts_pix_locks[i].u.spnlck, "pix_lock"); #endif -#endif } - waiter_free_list = NULL; queue_free_list = NULL; - erts_tsd_key_create(&waiter_key); - erts_thr_install_exit_handler(cleanup_waiter); + erts_thr_install_exit_handler(cleanup_tse); #ifdef ERTS_ENABLE_LOCK_CHECK lc_id.proc_lock_main = erts_lc_get_lock_order_id("proc_main"); lc_id.proc_lock_link = erts_lc_get_lock_order_id("proc_link"); @@ -158,86 +139,106 @@ erts_init_proc_lock(void) lc_id.proc_lock_status = erts_lc_get_lock_order_id("proc_status"); #endif cpus = erts_get_cpu_configured(erts_cpuinfo); - if (cpus > 1) - proc_lock_spin_count = (ERTS_PROC_LOCK_SPIN_COUNT_BASE - * ((int) erts_no_schedulers)); - else if (cpus == 1) - proc_lock_spin_count = 0; - else /* No of cpus unknown. Assume multi proc, but be conservative. */ + if (cpus > 1) { proc_lock_spin_count = ERTS_PROC_LOCK_SPIN_COUNT_BASE; - if (proc_lock_spin_count > ERTS_PROC_LOCK_SPIN_COUNT_MAX) - proc_lock_spin_count = ERTS_PROC_LOCK_SPIN_COUNT_MAX; - proc_lock_trans_spin_cost = proc_lock_spin_count/20; -} - -static ERTS_INLINE erts_proc_lock_waiter_t * -alloc_wtr(void) -{ - erts_proc_lock_waiter_t *wtr; - erts_smp_spin_lock(&wtr_lock); - wtr = waiter_free_list; - if (wtr) { - waiter_free_list = wtr->next; - ERTS_LC_ASSERT(queue_free_list); - wtr->queues = queue_free_list; - queue_free_list = wtr->queues->next; - erts_smp_spin_unlock(&wtr_lock); + proc_lock_spin_count += (ERTS_PROC_LOCK_SPIN_COUNT_SCHED_INC + * ((int) erts_no_schedulers)); + aux_thr_proc_lock_spin_count = ERTS_PROC_LOCK_AUX_SPIN_COUNT; } - else { - erts_smp_spin_unlock(&wtr_lock); - wtr = erts_alloc(ERTS_ALC_T_PROC_LCK_WTR, - sizeof(erts_proc_lock_waiter_t)); - erts_smp_gate_init(&wtr->gate); - wtr->wait_locks = (ErtsProcLocks) 0; - wtr->queues = erts_alloc(ERTS_ALC_T_PROC_LCK_QS, - sizeof(erts_proc_lock_queues_t)); - sys_memcpy((void *) wtr->queues, - (void *) &zeroqs, - sizeof(erts_proc_lock_queues_t)); + else if (cpus == 1) { + proc_lock_spin_count = 0; + aux_thr_proc_lock_spin_count = 0; } - return wtr; + else { /* No of cpus unknown. Assume multi proc, but be conservative. */ + proc_lock_spin_count = ERTS_PROC_LOCK_SPIN_COUNT_BASE/2; + aux_thr_proc_lock_spin_count = ERTS_PROC_LOCK_AUX_SPIN_COUNT/2; + } + if (proc_lock_spin_count > ERTS_PROC_LOCK_SPIN_COUNT_MAX) + proc_lock_spin_count = ERTS_PROC_LOCK_SPIN_COUNT_MAX; } #ifdef ERTS_ENABLE_LOCK_CHECK static void -check_unused_waiter(erts_proc_lock_waiter_t *wtr) +check_unused_tse(erts_tse_t *wtr) { int i; - ERTS_LC_ASSERT(wtr->wait_locks == 0); + erts_proc_lock_queues_t *queues = wtr->udata; + ERTS_LC_ASSERT(wtr->uflgs == 0); for (i = 0; i <= ERTS_PROC_LOCK_MAX_BIT; i++) - ERTS_LC_ASSERT(!wtr->queues->queue[i]); + ERTS_LC_ASSERT(!queues->queue[i]); } -#define CHECK_UNUSED_WAITER(W) check_unused_waiter((W)) +#define CHECK_UNUSED_TSE(W) check_unused_tse((W)) #else -#define CHECK_UNUSED_WAITER(W) +#define CHECK_UNUSED_TSE(W) #endif +static ERTS_INLINE erts_tse_t * +tse_fetch(erts_pix_lock_t *pix_lock) +{ + erts_tse_t *tse = erts_tse_fetch(); + if (!tse->udata) { + erts_proc_lock_queues_t *qs; +#if ERTS_PROC_LOCK_SPINLOCK_IMPL && !ERTS_PROC_LOCK_ATOMIC_IMPL + if (pix_lock) + erts_pix_unlock(pix_lock); +#endif + erts_smp_spin_lock(&qs_lock); + qs = queue_free_list; + if (qs) { + queue_free_list = queue_free_list->next; + erts_smp_spin_unlock(&qs_lock); + } + else { + erts_smp_spin_unlock(&qs_lock); + qs = erts_alloc(ERTS_ALC_T_PROC_LCK_QS, + sizeof(erts_proc_lock_queues_t)); + sys_memcpy((void *) qs, + (void *) &zeroqs, + sizeof(erts_proc_lock_queues_t)); + } + tse->udata = qs; +#if ERTS_PROC_LOCK_SPINLOCK_IMPL && !ERTS_PROC_LOCK_ATOMIC_IMPL + if (pix_lock) + erts_pix_lock(pix_lock); +#endif + } + tse->uflgs = 0; + return tse; +} static ERTS_INLINE void -free_wtr(erts_proc_lock_waiter_t *wtr) +tse_return(erts_tse_t *tse, int force_free_q) { - CHECK_UNUSED_WAITER(wtr); - erts_smp_spin_lock(&wtr_lock); - wtr->next = waiter_free_list; - waiter_free_list = wtr; - wtr->queues->next = queue_free_list; - queue_free_list = wtr->queues; - erts_smp_spin_unlock(&wtr_lock); + CHECK_UNUSED_TSE(tse); + if (force_free_q || erts_tse_is_tmp(tse)) { + erts_proc_lock_queues_t *qs = tse->udata; + ASSERT(qs); + erts_smp_spin_lock(&qs_lock); + qs->next = queue_free_list; + queue_free_list = qs; + erts_smp_spin_unlock(&qs_lock); + tse->udata = NULL; + } + erts_tse_return(tse); } void erts_proc_lock_prepare_proc_lock_waiter(void) { - erts_tsd_set(waiter_key, (void *) alloc_wtr()); + tse_return(tse_fetch(NULL), 0); } static void -cleanup_waiter(void) +cleanup_tse(void) { - erts_proc_lock_waiter_t *wtr = erts_tsd_get(waiter_key); - if (wtr) - free_wtr(wtr); + erts_tse_t *tse = erts_tse_fetch(); + if (tse) { + if (tse->udata) + tse_return(tse, 1); + else + erts_tse_return(tse); + } } @@ -250,7 +251,7 @@ cleanup_waiter(void) static ERTS_INLINE void enqueue_waiter(erts_proc_lock_queues_t *qs, int ix, - erts_proc_lock_waiter_t *wtr) + erts_tse_t *wtr) { if (!qs->queue[ix]) { qs->queue[ix] = wtr; @@ -266,10 +267,10 @@ enqueue_waiter(erts_proc_lock_queues_t *qs, } } -static erts_proc_lock_waiter_t * +static erts_tse_t * dequeue_waiter(erts_proc_lock_queues_t *qs, int ix) { - erts_proc_lock_waiter_t *wtr = qs->queue[ix]; + erts_tse_t *wtr = qs->queue[ix]; ERTS_LC_ASSERT(qs->queue[ix]); if (wtr->next == wtr) { ERTS_LC_ASSERT(qs->queue[ix]->prev == wtr); @@ -295,10 +296,10 @@ dequeue_waiter(erts_proc_lock_queues_t *qs, int ix) * lock. */ static ERTS_INLINE void -try_aquire(erts_proc_lock_t *lck, erts_proc_lock_waiter_t *wtr) +try_aquire(erts_proc_lock_t *lck, erts_tse_t *wtr) { ErtsProcLocks got_locks = (ErtsProcLocks) 0; - ErtsProcLocks locks = wtr->wait_locks; + ErtsProcLocks locks = wtr->uflgs; int lock_no; ERTS_LC_ASSERT(lck->queues); @@ -334,7 +335,7 @@ try_aquire(erts_proc_lock_t *lck, erts_proc_lock_waiter_t *wtr) } } - wtr->wait_locks &= ~got_locks; + wtr->uflgs &= ~got_locks; } /* @@ -350,8 +351,8 @@ transfer_locks(Process *p, int unlock) { int transferred = 0; - erts_proc_lock_waiter_t *wake = NULL; - erts_proc_lock_waiter_t *wtr; + erts_tse_t *wake = NULL; + erts_tse_t *wtr; ErtsProcLocks unset_waiter = 0; ErtsProcLocks tlocks = trnsfr_lcks; int lock_no; @@ -377,11 +378,11 @@ transfer_locks(Process *p, ERTS_LC_ASSERT(wtr); if (!qs->queue[lock_no]) unset_waiter |= lock; - ERTS_LC_ASSERT(wtr->wait_locks & lock); - wtr->wait_locks &= ~lock; - if (wtr->wait_locks) + ERTS_LC_ASSERT(wtr->uflgs & lock); + wtr->uflgs &= ~lock; + if (wtr->uflgs) try_aquire(&p->lock, wtr); - if (!wtr->wait_locks) { + if (!wtr->uflgs) { /* * The other thread got all locks it needs; * need to wake it up. @@ -412,9 +413,10 @@ transfer_locks(Process *p, erts_pix_unlock(pix_lock); do { - erts_proc_lock_waiter_t *tmp = wake; + erts_tse_t *tmp = wake; wake = wake->next; - erts_smp_gate_let_through(&tmp->gate, 1); + erts_atomic_set(&tmp->uaflgs, 0); + erts_tse_set(tmp); } while (wake); if (!unlock) @@ -462,26 +464,16 @@ wait_for_locks(Process *p, ErtsProcLocks olflgs) { erts_pix_lock_t *pix_lock = pixlck ? pixlck : ERTS_PID2PIXLOCK(p->id); - int tsd; - erts_proc_lock_waiter_t *wtr; + erts_tse_t *wtr; + erts_proc_lock_queues_t *qs; /* Acquire a waiter object on which this thread can wait. */ - wtr = erts_tsd_get(waiter_key); - if (wtr) - tsd = 1; - else { -#if ERTS_PROC_LOCK_SPINLOCK_IMPL && !ERTS_PROC_LOCK_ATOMIC_IMPL - erts_pix_unlock(pix_lock); -#endif - wtr = alloc_wtr(); - tsd = 0; -#if ERTS_PROC_LOCK_SPINLOCK_IMPL && !ERTS_PROC_LOCK_ATOMIC_IMPL - erts_pix_lock(pix_lock); -#endif - } + wtr = tse_fetch(pix_lock); /* Record which locks this waiter needs. */ - wtr->wait_locks = need_locks; + wtr->uflgs = need_locks; + + ASSERT((wtr->uflgs & ~ERTS_PROC_LOCKS_ALL) == 0); #if ERTS_PROC_LOCK_ATOMIC_IMPL erts_pix_lock(pix_lock); @@ -489,14 +481,16 @@ wait_for_locks(Process *p, ERTS_LC_ASSERT(erts_lc_pix_lock_is_locked(pix_lock)); + qs = wtr->udata; + ASSERT(qs); /* Provide the process with waiter queues, if it doesn't have one. */ if (!p->lock.queues) { - wtr->queues->next = NULL; - p->lock.queues = wtr->queues; + qs->next = NULL; + p->lock.queues = qs; } else { - wtr->queues->next = p->lock.queues->next; - p->lock.queues->next = wtr->queues; + qs->next = p->lock.queues->next; + p->lock.queues->next = qs; } #ifdef ERTS_PROC_LOCK_HARD_DEBUG @@ -506,46 +500,59 @@ wait_for_locks(Process *p, /* Try to aquire locks one at a time in lock order and set wait flag */ try_aquire(&p->lock, wtr); + ASSERT((wtr->uflgs & ~ERTS_PROC_LOCKS_ALL) == 0); + #ifdef ERTS_PROC_LOCK_HARD_DEBUG check_queue(&p->lock); #endif - if (wtr->wait_locks) { /* We didn't get them all; need to wait... */ - /* Got to wait for locks... */ + if (wtr->uflgs) { + /* We didn't get them all; need to wait... */ + + ASSERT((wtr->uflgs & ~ERTS_PROC_LOCKS_ALL) == 0); + + erts_atomic_set(&wtr->uaflgs, 1); erts_pix_unlock(pix_lock); - /* - * Wait for needed locks. When we return all needed locks have - * have been acquired by other threads and transfered to us. - */ -#ifdef ERTS_PROC_LOCK_SPIN_ON_GATE - erts_smp_gate_swait(&wtr->gate, proc_lock_spin_count); -#else - erts_smp_gate_wait(&wtr->gate); -#endif + while (1) { + int res; + erts_tse_reset(wtr); + + if (erts_atomic_read(&wtr->uaflgs) == 0) + break; + + /* + * Wait for needed locks. When we are woken all needed locks have + * have been acquired by other threads and transfered to us. + * However, we need to be prepared for spurious wakeups. + */ + do { + res = erts_tse_wait(wtr); /* might return EINTR */ + } while (res != 0); + } erts_pix_lock(pix_lock); + + ASSERT(wtr->uflgs == 0); } /* Recover some queues to store in the waiter. */ ERTS_LC_ASSERT(p->lock.queues); if (p->lock.queues->next) { - wtr->queues = p->lock.queues->next; - p->lock.queues->next = wtr->queues->next; + qs = p->lock.queues->next; + p->lock.queues->next = qs->next; } else { - wtr->queues = p->lock.queues; + qs = p->lock.queues; p->lock.queues = NULL; } + wtr->udata = qs; erts_pix_unlock(pix_lock); ERTS_LC_ASSERT(locks == (ERTS_PROC_LOCK_FLGS_READ_(&p->lock) & locks)); - if (tsd) - CHECK_UNUSED_WAITER(wtr); - else - free_wtr(wtr); + tse_return(wtr, 0); } /* @@ -563,52 +570,57 @@ erts_proc_lock_failed(Process *p, ErtsProcLocks locks, ErtsProcLocks old_lflgs) { -#ifdef ERTS_PROC_LOCK_SPIN_ON_GATE - int spin_count = 0; -#else - int spin_count = proc_lock_spin_count; -#endif - + int until_yield = ERTS_PROC_LOCK_SPIN_UNTIL_YIELD; + int thr_spin_count; + int spin_count; ErtsProcLocks need_locks = locks; ErtsProcLocks olflgs = old_lflgs; - while (need_locks != 0) - { - ErtsProcLocks can_grab = in_order_locks(olflgs, need_locks); + if (erts_thr_get_main_status()) + thr_spin_count = proc_lock_spin_count; + else + thr_spin_count = aux_thr_proc_lock_spin_count; + + spin_count = thr_spin_count; + + while (need_locks != 0) { + ErtsProcLocks can_grab; + + can_grab = in_order_locks(olflgs, need_locks); - if (can_grab == 0) - { + if (can_grab == 0) { /* Someone already has the lowest-numbered lock we want. */ - if (spin_count-- <= 0) - { + if (spin_count-- <= 0) { /* Too many retries, give up and sleep for the lock. */ wait_for_locks(p, pixlck, locks, need_locks, olflgs); return; } + ERTS_SPIN_BODY; + + if (--until_yield == 0) { + until_yield = ERTS_PROC_LOCK_SPIN_UNTIL_YIELD; + erts_thr_yield(); + } + olflgs = ERTS_PROC_LOCK_FLGS_READ_(&p->lock); } - else - { + else { /* Try to grab all of the grabbable locks at once with cmpxchg. */ ErtsProcLocks grabbed = olflgs | can_grab; ErtsProcLocks nflgs = - ERTS_PROC_LOCK_FLGS_CMPXCHG_(&p->lock, grabbed, olflgs); + ERTS_PROC_LOCK_FLGS_CMPXCHG_ACQB_(&p->lock, grabbed, olflgs); - if (nflgs == olflgs) - { + if (nflgs == olflgs) { /* Success! We grabbed the 'can_grab' locks. */ olflgs = grabbed; need_locks &= ~can_grab; -#ifndef ERTS_PROC_LOCK_SPIN_ON_GATE /* Since we made progress, reset the spin count. */ - spin_count = proc_lock_spin_count; -#endif + spin_count = thr_spin_count; } - else - { + else { /* Compare-and-exchange failed, try again. */ olflgs = nflgs; } @@ -1407,7 +1419,7 @@ check_queue(erts_proc_lock_t *lck) wtr = (((ErtsProcLocks) 1) << lock_no) << ERTS_PROC_LOCK_WAITER_SHIFT; if (lflgs & wtr) { int n; - erts_proc_lock_waiter_t *wtr; + erts_tse_t *wtr; ERTS_LC_ASSERT(lck->queues && lck->queues->queue[lock_no]); wtr = lck->queues->queue[lock_no]; n = 0; |