/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 2007-2012. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* %CopyrightEnd%
*/
/*
* Description: Impementation of Erlang process locks.
*
* Author: Rickard Green
*/
/*
* A short explanation of the process lock implementation:
* Each process has a lock bitfield and a number of lock wait
* queues.
* The bit field contains of a number of lock flags (L1, L2, ...)
* and a number of wait flags (W1, W2, ...). Each lock flag has a
* corresponding wait flag. The bit field isn't guarranteed to be
* larger than 32-bits which sets a maximum of 16 different locks
* per process. Currently, only 4 locks per process are used. The
* bit field is operated on by use of atomic operations (custom
* made bitwise atomic operations). When a lock is locked the
* corresponding lock bit is set. When a thread is waiting on a
* lock the wait flag for the lock is set.
* The process table is protected by pix (process index) locks
* which is spinlocks that protects a number of process indices in
* the process table. The pix locks also protects the lock queues
* and modifications of wait flags.
* When acquiring a process lock we first try to set the lock
* flag. If we are able to set the lock flag and the wait flag
* isn't set we are done. If the lock flag was already set we
* have to acquire the pix lock, set the wait flag, and put
* ourselves in the wait queue.
* Process locks will always be acquired in fifo order.
* When releasing a process lock we first unset all lock flags
* whose corresponding wait flag is clear (which will succeed).
* If wait flags were set for the locks being released, we acquire
* the pix lock, and transfer the lock to the first thread
* in the wait queue.
* Note that wait flags may be read without the pix lock, but
* it is important that wait flags only are modified when the pix
* lock is held.
* This implementation assumes that erts_smp_atomic_or_retold()
* provides necessary memorybarriers for a lock operation, and that
* erts_smp_atomic_and_retold() provides necessary memorybarriers
* for an unlock operation.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "erl_process.h"
const Process erts_proc_lock_busy;
#ifdef ERTS_SMP
#define ERTS_PROC_LOCK_SPIN_COUNT_MAX 2000
#define ERTS_PROC_LOCK_SPIN_COUNT_SCHED_INC 32
#define ERTS_PROC_LOCK_SPIN_COUNT_BASE 1000
#define ERTS_PROC_LOCK_AUX_SPIN_COUNT 50
#define ERTS_PROC_LOCK_SPIN_UNTIL_YIELD 25
#ifdef ERTS_PROC_LOCK_DEBUG
#define ERTS_PROC_LOCK_HARD_DEBUG
#endif
#ifdef ERTS_PROC_LOCK_HARD_DEBUG
static void check_queue(erts_proc_lock_t *lck);
#endif
#if SIZEOF_INT < 4
#error "The size of the 'uflgs' field of the erts_tse_t type is too small"
#endif
#ifdef ERTS_ENABLE_LOCK_CHECK
static struct {
Sint16 proc_lock_main;
Sint16 proc_lock_link;
Sint16 proc_lock_msgq;
Sint16 proc_lock_status;
} lc_id;
#endif
erts_pix_lock_t erts_pix_locks[ERTS_NO_OF_PIX_LOCKS];
static int proc_lock_spin_count;
static int aux_thr_proc_lock_spin_count;
static void cleanup_tse(void);
void
erts_init_proc_lock(int cpus)
{
int i;
for (i = 0; i < ERTS_NO_OF_PIX_LOCKS; i++) {
#ifdef ERTS_ENABLE_LOCK_COUNT
erts_mtx_init_x(&erts_pix_locks[i].u.mtx,
"pix_lock", make_small(i));
#else
erts_mtx_init(&erts_pix_locks[i].u.mtx, "pix_lock");
#endif
}
erts_thr_install_exit_handler(cleanup_tse);
#ifdef ERTS_ENABLE_LOCK_CHECK
lc_id.proc_lock_main = erts_lc_get_lock_order_id("proc_main");
lc_id.proc_lock_link = erts_lc_get_lock_order_id("proc_link");
lc_id.proc_lock_msgq = erts_lc_get_lock_order_id("proc_msgq");
lc_id.proc_lock_status = erts_lc_get_lock_order_id("proc_status");
#endif
if (cpus > 1) {
proc_lock_spin_count = ERTS_PROC_LOCK_SPIN_COUNT_BASE;
proc_lock_spin_count += (ERTS_PROC_LOCK_SPIN_COUNT_SCHED_INC
* ((int) erts_no_schedulers));
aux_thr_proc_lock_spin_count = ERTS_PROC_LOCK_AUX_SPIN_COUNT;
}
else if (cpus == 1) {
proc_lock_spin_count = 0;
aux_thr_proc_lock_spin_count = 0;
}
else { /* No of cpus unknown. Assume multi proc, but be conservative. */
proc_lock_spin_count = ERTS_PROC_LOCK_SPIN_COUNT_BASE/2;
aux_thr_proc_lock_spin_count = ERTS_PROC_LOCK_AUX_SPIN_COUNT/2;
}
if (proc_lock_spin_count > ERTS_PROC_LOCK_SPIN_COUNT_MAX)
proc_lock_spin_count = ERTS_PROC_LOCK_SPIN_COUNT_MAX;
}
#ifdef ERTS_ENABLE_LOCK_CHECK
#define CHECK_UNUSED_TSE(W) ERTS_LC_ASSERT((W)->uflgs == 0)
#else
#define CHECK_UNUSED_TSE(W)
#endif
static ERTS_INLINE erts_tse_t *
tse_fetch(erts_pix_lock_t *pix_lock)
{
erts_tse_t *tse = erts_tse_fetch();
tse->uflgs = 0;
return tse;
}
static ERTS_INLINE void
tse_return(erts_tse_t *tse)
{
CHECK_UNUSED_TSE(tse);
erts_tse_return(tse);
}
void
erts_proc_lock_prepare_proc_lock_waiter(void)
{
tse_return(tse_fetch(NULL));
}
static void
cleanup_tse(void)
{
erts_tse_t *tse = erts_tse_fetch();
if (tse)
erts_tse_return(tse);
}
/*
* Waiters are queued in a circular double linked list;
* where lck->queue[lock_ix] is the first waiter in queue, and
* lck->queue[lock_ix]->prev is the last waiter in queue.
*/
static ERTS_INLINE void
enqueue_waiter(erts_proc_lock_t *lck, int ix, erts_tse_t *wtr)
{
if (!lck->queue[ix]) {
lck->queue[ix] = wtr;
wtr->next = wtr;
wtr->prev = wtr;
}
else {
ERTS_LC_ASSERT(lck->queue[ix]->next && lck->queue[ix]->prev);
wtr->next = lck->queue[ix];
wtr->prev = lck->queue[ix]->prev;
wtr->prev->next = wtr;
lck->queue[ix]->prev = wtr;
}
}
static erts_tse_t *
dequeue_waiter(erts_proc_lock_t *lck, int ix)
{
erts_tse_t *wtr = lck->queue[ix];
ERTS_LC_ASSERT(lck->queue[ix]);
if (wtr->next == wtr) {
ERTS_LC_ASSERT(lck->queue[ix]->prev == wtr);
lck->queue[ix] = NULL;
}
else {
ERTS_LC_ASSERT(wtr->next != wtr);
ERTS_LC_ASSERT(wtr->prev != wtr);
wtr->next->prev = wtr->prev;
wtr->prev->next = wtr->next;
lck->queue[ix] = wtr->next;
}
return wtr;
}
/*
* Tries to aquire as many locks as possible in lock order,
* and sets the wait flag on the first lock not possible to
* aquire.
*
* Note: We need the pix lock during this operation. Wait
* flags are only allowed to be manipulated under pix
* lock.
*/
static ERTS_INLINE void
try_aquire(erts_proc_lock_t *lck, erts_tse_t *wtr)
{
ErtsProcLocks got_locks = (ErtsProcLocks) 0;
ErtsProcLocks locks = wtr->uflgs;
int lock_no;
ERTS_LC_ASSERT(got_locks != locks);
for (lock_no = 0; lock_no <= ERTS_PROC_LOCK_MAX_BIT; lock_no++) {
ErtsProcLocks lock = ((ErtsProcLocks) 1) << lock_no;
if (locks & lock) {
ErtsProcLocks wflg, old_lflgs;
if (lck->queue[lock_no]) {
/* Others already waiting */
enqueue:
ERTS_LC_ASSERT(ERTS_PROC_LOCK_FLGS_READ_(lck)
& (lock << ERTS_PROC_LOCK_WAITER_SHIFT));
enqueue_waiter(lck, lock_no, wtr);
break;
}
wflg = lock << ERTS_PROC_LOCK_WAITER_SHIFT;
old_lflgs = ERTS_PROC_LOCK_FLGS_BOR_ACQB_(lck, wflg | lock);
if (old_lflgs & lock) {
/* Didn't get the lock */
goto enqueue;
}
else {
/* Got the lock */
got_locks |= lock;
ERTS_LC_ASSERT(!(old_lflgs & wflg));
/* No one else can be waiting for the lock; remove wait flag */
(void) ERTS_PROC_LOCK_FLGS_BAND_(lck, ~wflg);
if (got_locks == locks)
break;
}
}
}
wtr->uflgs &= ~got_locks;
}
/*
* Transfer 'trnsfr_lcks' held by this executing thread to other
* threads waiting for the locks. When a lock has been transferred
* we also have to try to aquire as many lock as possible for the
* other thread.
*/
static int
transfer_locks(Process *p,
ErtsProcLocks trnsfr_lcks,
erts_pix_lock_t *pix_lock,
int unlock)
{
int transferred = 0;
erts_tse_t *wake = NULL;
erts_tse_t *wtr;
ErtsProcLocks unset_waiter = 0;
ErtsProcLocks tlocks = trnsfr_lcks;
int lock_no;
ERTS_LC_ASSERT(erts_lc_pix_lock_is_locked(pix_lock));
#ifdef ERTS_PROC_LOCK_HARD_DEBUG
check_queue(&p->lock);
#endif
for (lock_no = 0; tlocks && lock_no <= ERTS_PROC_LOCK_MAX_BIT; lock_no++) {
ErtsProcLocks lock = ((ErtsProcLocks) 1) << lock_no;
if (tlocks & lock) {
/* Transfer lock */
#ifdef ERTS_ENABLE_LOCK_CHECK
tlocks &= ~lock;
#endif
ERTS_LC_ASSERT(ERTS_PROC_LOCK_FLGS_READ_(&p->lock)
& (lock << ERTS_PROC_LOCK_WAITER_SHIFT));
transferred++;
wtr = dequeue_waiter(&p->lock, lock_no);
ERTS_LC_ASSERT(wtr);
if (!p->lock.queue[lock_no])
unset_waiter |= lock;
ERTS_LC_ASSERT(wtr->uflgs & lock);
wtr->uflgs &= ~lock;
if (wtr->uflgs)
try_aquire(&p->lock, wtr);
if (!wtr->uflgs) {
/*
* The other thread got all locks it needs;
* need to wake it up.
*/
wtr->next = wake;
wake = wtr;
}
}
}
if (unset_waiter) {
unset_waiter <<= ERTS_PROC_LOCK_WAITER_SHIFT;
(void) ERTS_PROC_LOCK_FLGS_BAND_(&p->lock, ~unset_waiter);
}
#ifdef ERTS_PROC_LOCK_HARD_DEBUG
check_queue(&p->lock);
#endif
ERTS_LC_ASSERT(tlocks == 0); /* We should have transferred all of them */
if (!wake) {
if (unlock)
erts_pix_unlock(pix_lock);
}
else {
erts_pix_unlock(pix_lock);
do {
erts_tse_t *tmp = wake;
wake = wake->next;
erts_atomic32_set_nob(&tmp->uaflgs, 0);
erts_tse_set(tmp);
} while (wake);
if (!unlock)
erts_pix_lock(pix_lock);
}
return transferred;
}
/*
* Determine which locks in 'need_locks' are not currently locked in
* 'in_use', but do not return any locks "above" some lock we need,
* so we do not attempt to grab locks out of order.
*
* For example, if we want to lock 10111, and 00100 was already locked, this
* would return 00011, indicating we should not try for 10000 yet because
* that would be a lock-ordering violation.
*/
static ERTS_INLINE ErtsProcLocks
in_order_locks(ErtsProcLocks in_use, ErtsProcLocks need_locks)
{
/* All locks we want that are already locked by someone else. */
ErtsProcLocks busy = in_use & need_locks;
/* Just the lowest numbered lock we want that's in use; 0 if none. */
ErtsProcLocks lowest_busy = busy & -busy;
/* All locks below the lowest one we want that's in use already. */
return need_locks & (lowest_busy - 1);
}
/*
* Try to grab locks one at a time in lock order and wait on the lowest
* lock we fail to grab, if any.
*
* If successful, this returns 0 and all locks in 'need_locks' are held.
*
* On entry, the pix lock is held iff !ERTS_PROC_LOCK_ATOMIC_IMPL.
* On exit it is not held.
*/
static void
wait_for_locks(Process *p,
erts_pix_lock_t *pixlck,
ErtsProcLocks locks,
ErtsProcLocks need_locks,
ErtsProcLocks olflgs)
{
erts_pix_lock_t *pix_lock = pixlck ? pixlck : ERTS_PID2PIXLOCK(p->id);
erts_tse_t *wtr;
/* Acquire a waiter object on which this thread can wait. */
wtr = tse_fetch(pix_lock);
/* Record which locks this waiter needs. */
wtr->uflgs = need_locks;
ASSERT((wtr->uflgs & ~ERTS_PROC_LOCKS_ALL) == 0);
#if ERTS_PROC_LOCK_ATOMIC_IMPL
erts_pix_lock(pix_lock);
#endif
ERTS_LC_ASSERT(erts_lc_pix_lock_is_locked(pix_lock));
#ifdef ERTS_PROC_LOCK_HARD_DEBUG
check_queue(&p->lock);
#endif
/* Try to aquire locks one at a time in lock order and set wait flag */
try_aquire(&p->lock, wtr);
ASSERT((wtr->uflgs & ~ERTS_PROC_LOCKS_ALL) == 0);
#ifdef ERTS_PROC_LOCK_HARD_DEBUG
check_queue(&p->lock);
#endif
if (wtr->uflgs == 0)
erts_pix_unlock(pix_lock);
else {
/* We didn't get them all; need to wait... */
ASSERT((wtr->uflgs & ~ERTS_PROC_LOCKS_ALL) == 0);
erts_atomic32_set_nob(&wtr->uaflgs, 1);
erts_pix_unlock(pix_lock);
while (1) {
int res;
erts_tse_reset(wtr);
if (erts_atomic32_read_nob(&wtr->uaflgs) == 0)
break;
/*
* Wait for needed locks. When we are woken all needed locks have
* have been acquired by other threads and transfered to us.
* However, we need to be prepared for spurious wakeups.
*/
do {
res = erts_tse_wait(wtr); /* might return EINTR */
} while (res != 0);
}
ASSERT(wtr->uflgs == 0);
}
ERTS_LC_ASSERT(locks == (ERTS_PROC_LOCK_FLGS_READ_(&p->lock) & locks));
tse_return(wtr);
}
/*
* erts_proc_lock_failed() is called when erts_smp_proc_lock()
* wasn't able to lock all locks. We may need to transfer locks
* to waiters and wait for our turn on locks.
*
* Iff !ERTS_PROC_LOCK_ATOMIC_IMPL, the pix lock is locked on entry.
*
* This always returns with the pix lock unlocked.
*/
void
erts_proc_lock_failed(Process *p,
erts_pix_lock_t *pixlck,
ErtsProcLocks locks,
ErtsProcLocks old_lflgs)
{
int until_yield = ERTS_PROC_LOCK_SPIN_UNTIL_YIELD;
int thr_spin_count;
int spin_count;
ErtsProcLocks need_locks = locks;
ErtsProcLocks olflgs = old_lflgs;
if (erts_thr_get_main_status())
thr_spin_count = proc_lock_spin_count;
else
thr_spin_count = aux_thr_proc_lock_spin_count;
spin_count = thr_spin_count;
while (need_locks != 0) {
ErtsProcLocks can_grab;
can_grab = in_order_locks(olflgs, need_locks);
if (can_grab == 0) {
/* Someone already has the lowest-numbered lock we want. */
if (spin_count-- <= 0) {
/* Too many retries, give up and sleep for the lock. */
wait_for_locks(p, pixlck, locks, need_locks, olflgs);
return;
}
ERTS_SPIN_BODY;
if (--until_yield == 0) {
until_yield = ERTS_PROC_LOCK_SPIN_UNTIL_YIELD;
erts_thr_yield();
}
olflgs = ERTS_PROC_LOCK_FLGS_READ_(&p->lock);
}
else {
/* Try to grab all of the grabbable locks at once with cmpxchg. */
ErtsProcLocks grabbed = olflgs | can_grab;
ErtsProcLocks nflgs =
ERTS_PROC_LOCK_FLGS_CMPXCHG_ACQB_(&p->lock, grabbed, olflgs);
if (nflgs == olflgs) {
/* Success! We grabbed the 'can_grab' locks. */
olflgs = grabbed;
need_locks &= ~can_grab;
/* Since we made progress, reset the spin count. */
spin_count = thr_spin_count;
}
else {
/* Compare-and-exchange failed, try again. */
olflgs = nflgs;
}
}
}
/* Now we have all of the locks we wanted. */
#if !ERTS_PROC_LOCK_ATOMIC_IMPL
erts_pix_unlock(pixlck);
#endif
}
/*
* erts_proc_unlock_failed() is called when erts_smp_proc_unlock()
* wasn't able to unlock all locks. We may need to transfer locks
* to waiters.
*/
void
erts_proc_unlock_failed(Process *p,
erts_pix_lock_t *pixlck,
ErtsProcLocks wait_locks)
{
erts_pix_lock_t *pix_lock = pixlck ? pixlck : ERTS_PID2PIXLOCK(p->id);
#if ERTS_PROC_LOCK_ATOMIC_IMPL
erts_pix_lock(pix_lock);
#endif
transfer_locks(p, wait_locks, pix_lock, 1); /* unlocks pix_lock */
}
/*
* proc_safelock() locks process locks on two processes. In order
* to avoid a deadlock, proc_safelock() unlocks those locks that
* needs to be unlocked, and then acquires locks in lock order
* (including the previously unlocked ones).
*/
static void
proc_safelock(Process *a_proc,
erts_pix_lock_t *a_pix_lck,
ErtsProcLocks a_have_locks,
ErtsProcLocks a_need_locks,
Process *b_proc,
erts_pix_lock_t *b_pix_lck,
ErtsProcLocks b_have_locks,
ErtsProcLocks b_need_locks)
{
Process *p1, *p2;
#ifdef ERTS_ENABLE_LOCK_CHECK
Eterm pid1, pid2;
#endif
erts_pix_lock_t *pix_lck1, *pix_lck2;
ErtsProcLocks need_locks1, have_locks1, need_locks2, have_locks2;
ErtsProcLocks unlock_mask;
int lock_no, refc1 = 0, refc2 = 0;
ERTS_LC_ASSERT(b_proc);
/* Determine inter process lock order...
* Locks with the same lock order should be locked on p1 before p2.
*/
if (a_proc) {
if (a_proc->id < b_proc->id) {
p1 = a_proc;
#ifdef ERTS_ENABLE_LOCK_CHECK
pid1 = a_proc->id;
#endif
pix_lck1 = a_pix_lck;
need_locks1 = a_need_locks;
have_locks1 = a_have_locks;
p2 = b_proc;
#ifdef ERTS_ENABLE_LOCK_CHECK
pid2 = b_proc->id;
#endif
pix_lck2 = b_pix_lck;
need_locks2 = b_need_locks;
have_locks2 = b_have_locks;
}
else if (a_proc->id > b_proc->id) {
p1 = b_proc;
#ifdef ERTS_ENABLE_LOCK_CHECK
pid1 = b_proc->id;
#endif
pix_lck1 = b_pix_lck;
need_locks1 = b_need_locks;
have_locks1 = b_have_locks;
p2 = a_proc;
#ifdef ERTS_ENABLE_LOCK_CHECK
pid2 = a_proc->id;
#endif
pix_lck2 = a_pix_lck;
need_locks2 = a_need_locks;
have_locks2 = a_have_locks;
}
else {
ERTS_LC_ASSERT(a_proc == b_proc);
ERTS_LC_ASSERT(a_proc->id == b_proc->id);
p1 = a_proc;
#ifdef ERTS_ENABLE_LOCK_CHECK
pid1 = a_proc->id;
#endif
pix_lck1 = a_pix_lck;
need_locks1 = a_need_locks | b_need_locks;
have_locks1 = a_have_locks | b_have_locks;
p2 = NULL;
#ifdef ERTS_ENABLE_LOCK_CHECK
pid2 = 0;
#endif
pix_lck2 = NULL;
need_locks2 = 0;
have_locks2 = 0;
}
}
else {
p1 = b_proc;
#ifdef ERTS_ENABLE_LOCK_CHECK
pid1 = b_proc->id;
#endif
pix_lck1 = b_pix_lck;
need_locks1 = b_need_locks;
have_locks1 = b_have_locks;
p2 = NULL;
#ifdef ERTS_ENABLE_LOCK_CHECK
pid2 = 0;
#endif
pix_lck2 = NULL;
need_locks2 = 0;
have_locks2 = 0;
#ifdef ERTS_ENABLE_LOCK_CHECK
a_need_locks = 0;
a_have_locks = 0;
#endif
}
#ifdef ERTS_ENABLE_LOCK_CHECK
if (p1)
erts_proc_lc_chk_proc_locks(p1, have_locks1);
if (p2)
erts_proc_lc_chk_proc_locks(p2, have_locks2);
if ((need_locks1 & have_locks1) != have_locks1)
erts_lc_fail("Thread tries to release process lock(s) "
"on %T via erts_proc_safelock().", pid1);
if ((need_locks2 & have_locks2) != have_locks2)
erts_lc_fail("Thread tries to release process lock(s) "
"on %T via erts_proc_safelock().",
pid2);
#endif
need_locks1 &= ~have_locks1;
need_locks2 &= ~have_locks2;
/* Figure out the range of locks that needs to be unlocked... */
unlock_mask = ERTS_PROC_LOCKS_ALL;
for (lock_no = 0;
lock_no <= ERTS_PROC_LOCK_MAX_BIT;
lock_no++) {
ErtsProcLocks lock = (1 << lock_no);
if (lock & need_locks1)
break;
unlock_mask &= ~lock;
if (lock & need_locks2)
break;
}
/* ... and unlock locks in that range... */
if (have_locks1 || have_locks2) {
ErtsProcLocks unlock_locks;
unlock_locks = unlock_mask & have_locks1;
if (unlock_locks) {
have_locks1 &= ~unlock_locks;
need_locks1 |= unlock_locks;
if (!have_locks1) {
refc1 = 1;
erts_smp_proc_inc_refc(p1);
}
erts_smp_proc_unlock__(p1, pix_lck1, unlock_locks);
}
unlock_locks = unlock_mask & have_locks2;
if (unlock_locks) {
have_locks2 &= ~unlock_locks;
need_locks2 |= unlock_locks;
if (!have_locks2) {
refc2 = 1;
erts_smp_proc_inc_refc(p2);
}
erts_smp_proc_unlock__(p2, pix_lck2, unlock_locks);
}
}
/*
* lock_no equals the number of the first lock to lock on
* either p1 *or* p2.
*/
#ifdef ERTS_ENABLE_LOCK_CHECK
if (p1)
erts_proc_lc_chk_proc_locks(p1, have_locks1);
if (p2)
erts_proc_lc_chk_proc_locks(p2, have_locks2);
#endif
/* Lock locks in lock order... */
while (lock_no <= ERTS_PROC_LOCK_MAX_BIT) {
ErtsProcLocks locks;
ErtsProcLocks lock = (1 << lock_no);
ErtsProcLocks lock_mask = 0;
if (need_locks1 & lock) {
do {
lock = (1 << lock_no++);
lock_mask |= lock;
} while (lock_no <= ERTS_PROC_LOCK_MAX_BIT
&& !(need_locks2 & lock));
if (need_locks2 & lock)
lock_no--;
locks = need_locks1 & lock_mask;
erts_smp_proc_lock__(p1, pix_lck1, locks);
have_locks1 |= locks;
need_locks1 &= ~locks;
}
else if (need_locks2 & lock) {
while (lock_no <= ERTS_PROC_LOCK_MAX_BIT
&& !(need_locks1 & lock)) {
lock_mask |= lock;
lock = (1 << ++lock_no);
}
locks = need_locks2 & lock_mask;
erts_smp_proc_lock__(p2, pix_lck2, locks);
have_locks2 |= locks;
need_locks2 &= ~locks;
}
else
lock_no++;
}
#ifdef ERTS_ENABLE_LOCK_CHECK
if (p1)
erts_proc_lc_chk_proc_locks(p1, have_locks1);
if (p2)
erts_proc_lc_chk_proc_locks(p2, have_locks2);
if (p1 && p2) {
if (p1 == a_proc) {
ERTS_LC_ASSERT(a_need_locks == have_locks1);
ERTS_LC_ASSERT(b_need_locks == have_locks2);
}
else {
ERTS_LC_ASSERT(a_need_locks == have_locks2);
ERTS_LC_ASSERT(b_need_locks == have_locks1);
}
}
else {
ERTS_LC_ASSERT(p1);
if (a_proc) {
ERTS_LC_ASSERT(have_locks1 == (a_need_locks | b_need_locks));
}
else {
ERTS_LC_ASSERT(have_locks1 == b_need_locks);
}
}
#endif
if (refc1)
erts_smp_proc_dec_refc(p1);
if (refc2)
erts_smp_proc_dec_refc(p2);
}
void
erts_proc_safelock(Process *a_proc,
ErtsProcLocks a_have_locks,
ErtsProcLocks a_need_locks,
Process *b_proc,
ErtsProcLocks b_have_locks,
ErtsProcLocks b_need_locks)
{
proc_safelock(a_proc,
a_proc ? ERTS_PID2PIXLOCK(a_proc->id) : NULL,
a_have_locks,
a_need_locks,
b_proc,
b_proc ? ERTS_PID2PIXLOCK(b_proc->id) : NULL,
b_have_locks,
b_need_locks);
}
/*
* erts_pid2proc_safelock() is called from erts_pid2proc_opt() when
* it wasn't possible to trylock all locks needed.
* c_p - current process
* c_p_have_locks - locks held on c_p
* pid - process id of process we are looking up
* proc - process struct of process we are looking
* up (both in and out argument)
* need_locks - all locks we need (including have_locks)
* pix_lock - pix lock for process we are looking up
* flags - option flags
*/
void
erts_pid2proc_safelock(Process *c_p,
ErtsProcLocks c_p_have_locks,
Process **proc,
ErtsProcLocks need_locks,
erts_pix_lock_t *pix_lock,
int flags)
{
Process *p = *proc;
ERTS_LC_ASSERT(p->lock.refc > 0);
ERTS_LC_ASSERT(process_tab[internal_pid_index(p->id)] == p);
p->lock.refc++;
erts_pix_unlock(pix_lock);
proc_safelock(c_p,
c_p ? ERTS_PID2PIXLOCK(c_p->id) : NULL,
c_p_have_locks,
c_p_have_locks,
p,
pix_lock,
0,
need_locks);
erts_pix_lock(pix_lock);
if (!p->is_exiting
|| ((flags & ERTS_P2P_FLG_ALLOW_OTHER_X)
&& process_tab[internal_pid_index(p->id)] == p)) {
ERTS_LC_ASSERT(p->lock.refc > 1);
p->lock.refc--;
}
else {
/* No proc. Note, we need to keep refc until after process unlock */
erts_pix_unlock(pix_lock);
erts_smp_proc_unlock__(p, pix_lock, need_locks);
*proc = NULL;
erts_pix_lock(pix_lock);
ERTS_LC_ASSERT(p->lock.refc > 0);
if (--p->lock.refc == 0) {
erts_pix_unlock(pix_lock);
erts_free_proc(p);
erts_pix_lock(pix_lock);
}
}
}
void
erts_proc_lock_init(Process *p)
{
int i;
/* We always start with all locks locked */
#if ERTS_PROC_LOCK_ATOMIC_IMPL
erts_smp_atomic32_init_nob(&p->lock.flags,
(erts_aint32_t) ERTS_PROC_LOCKS_ALL);
#else
p->lock.flags = ERTS_PROC_LOCKS_ALL;
#endif
for (i = 0; i <= ERTS_PROC_LOCK_MAX_BIT; i++)
p->lock.queue[i] = NULL;
p->lock.refc = 1;
#ifdef ERTS_ENABLE_LOCK_COUNT
erts_lcnt_proc_lock_init(p);
erts_lcnt_proc_lock(&(p->lock), ERTS_PROC_LOCKS_ALL);
erts_lcnt_proc_lock_post_x(&(p->lock), ERTS_PROC_LOCKS_ALL, __FILE__, __LINE__);
#endif
#ifdef ERTS_ENABLE_LOCK_CHECK
erts_proc_lc_trylock(p, ERTS_PROC_LOCKS_ALL, 1);
#endif
#ifdef ERTS_PROC_LOCK_DEBUG
for (i = 0; i <= ERTS_PROC_LOCK_MAX_BIT; i++)
erts_smp_atomic32_init_nob(&p->lock.locked[i], (erts_aint32_t) 1);
#endif
}
/* --- Process lock counting ----------------------------------------------- */
#ifdef ERTS_ENABLE_LOCK_COUNT
void erts_lcnt_proc_lock_init(Process *p) {
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) {
if (p->id != ERTS_INVALID_PID) {
erts_lcnt_init_lock_x(&(p->lock.lcnt_main), "proc_main", ERTS_LCNT_LT_PROCLOCK, p->id);
erts_lcnt_init_lock_x(&(p->lock.lcnt_msgq), "proc_msgq", ERTS_LCNT_LT_PROCLOCK, p->id);
erts_lcnt_init_lock_x(&(p->lock.lcnt_link), "proc_link", ERTS_LCNT_LT_PROCLOCK, p->id);
erts_lcnt_init_lock_x(&(p->lock.lcnt_status), "proc_status", ERTS_LCNT_LT_PROCLOCK, p->id);
} else {
erts_lcnt_init_lock(&(p->lock.lcnt_main), "proc_main", ERTS_LCNT_LT_PROCLOCK);
erts_lcnt_init_lock(&(p->lock.lcnt_msgq), "proc_msgq", ERTS_LCNT_LT_PROCLOCK);
erts_lcnt_init_lock(&(p->lock.lcnt_link), "proc_link", ERTS_LCNT_LT_PROCLOCK);
erts_lcnt_init_lock(&(p->lock.lcnt_status), "proc_status", ERTS_LCNT_LT_PROCLOCK);
}
} else {
sys_memzero(&(p->lock.lcnt_main), sizeof(p->lock.lcnt_main));
sys_memzero(&(p->lock.lcnt_msgq), sizeof(p->lock.lcnt_msgq));
sys_memzero(&(p->lock.lcnt_link), sizeof(p->lock.lcnt_link));
sys_memzero(&(p->lock.lcnt_status), sizeof(p->lock.lcnt_status));
}
}
void erts_lcnt_proc_lock_destroy(Process *p) {
erts_lcnt_destroy_lock(&(p->lock.lcnt_main));
erts_lcnt_destroy_lock(&(p->lock.lcnt_msgq));
erts_lcnt_destroy_lock(&(p->lock.lcnt_link));
erts_lcnt_destroy_lock(&(p->lock.lcnt_status));
}
void erts_lcnt_proc_lock(erts_proc_lock_t *lock, ErtsProcLocks locks) {
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) {
if (locks & ERTS_PROC_LOCK_MAIN) {
erts_lcnt_lock(&(lock->lcnt_main));
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
erts_lcnt_lock(&(lock->lcnt_msgq));
}
if (locks & ERTS_PROC_LOCK_LINK) {
erts_lcnt_lock(&(lock->lcnt_link));
}
if (locks & ERTS_PROC_LOCK_STATUS) {
erts_lcnt_lock(&(lock->lcnt_status));
}
}
}
void erts_lcnt_proc_lock_post_x(erts_proc_lock_t *lock, ErtsProcLocks locks, char *file, unsigned int line) {
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) {
if (locks & ERTS_PROC_LOCK_MAIN) {
erts_lcnt_lock_post_x(&(lock->lcnt_main), file, line);
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
erts_lcnt_lock_post_x(&(lock->lcnt_msgq), file, line);
}
if (locks & ERTS_PROC_LOCK_LINK) {
erts_lcnt_lock_post_x(&(lock->lcnt_link), file, line);
}
if (locks & ERTS_PROC_LOCK_STATUS) {
erts_lcnt_lock_post_x(&(lock->lcnt_status), file, line);
}
}
}
void erts_lcnt_proc_lock_unaquire(erts_proc_lock_t *lock, ErtsProcLocks locks) {
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) {
if (locks & ERTS_PROC_LOCK_MAIN) {
erts_lcnt_lock_unaquire(&(lock->lcnt_main));
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
erts_lcnt_lock_unaquire(&(lock->lcnt_msgq));
}
if (locks & ERTS_PROC_LOCK_LINK) {
erts_lcnt_lock_unaquire(&(lock->lcnt_link));
}
if (locks & ERTS_PROC_LOCK_STATUS) {
erts_lcnt_lock_unaquire(&(lock->lcnt_status));
}
}
}
void erts_lcnt_proc_unlock(erts_proc_lock_t *lock, ErtsProcLocks locks) {
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) {
if (locks & ERTS_PROC_LOCK_MAIN) {
erts_lcnt_unlock(&(lock->lcnt_main));
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
erts_lcnt_unlock(&(lock->lcnt_msgq));
}
if (locks & ERTS_PROC_LOCK_LINK) {
erts_lcnt_unlock(&(lock->lcnt_link));
}
if (locks & ERTS_PROC_LOCK_STATUS) {
erts_lcnt_unlock(&(lock->lcnt_status));
}
}
}
void erts_lcnt_proc_trylock(erts_proc_lock_t *lock, ErtsProcLocks locks, int res) {
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) {
if (locks & ERTS_PROC_LOCK_MAIN) {
erts_lcnt_trylock(&(lock->lcnt_main), res);
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
erts_lcnt_trylock(&(lock->lcnt_msgq), res);
}
if (locks & ERTS_PROC_LOCK_LINK) {
erts_lcnt_trylock(&(lock->lcnt_link), res);
}
if (locks & ERTS_PROC_LOCK_STATUS) {
erts_lcnt_trylock(&(lock->lcnt_status), res);
}
}
}
void erts_lcnt_enable_proc_lock_count(int enable) {
int i;
for (i = 0; i < erts_max_processes; ++i) {
Process* p = process_tab[i];
if (p) {
if (enable) {
if (!ERTS_LCNT_LOCK_TYPE(&(p->lock.lcnt_main))) {
erts_lcnt_proc_lock_init(p);
}
} else {
if (ERTS_LCNT_LOCK_TYPE(&(p->lock.lcnt_main))) {
erts_lcnt_proc_lock_destroy(p);
}
}
}
}
}
#endif /* ifdef ERTS_ENABLE_LOCK_COUNT */
/* --- Process lock checking ----------------------------------------------- */
#ifdef ERTS_ENABLE_LOCK_CHECK
void
erts_proc_lc_lock(Process *p, ErtsProcLocks locks)
{
erts_lc_lock_t lck = ERTS_LC_LOCK_INIT(-1,
p->id,
ERTS_LC_FLG_LT_PROCLOCK);
if (locks & ERTS_PROC_LOCK_MAIN) {
lck.id = lc_id.proc_lock_main;
erts_lc_lock(&lck);
}
if (locks & ERTS_PROC_LOCK_LINK) {
lck.id = lc_id.proc_lock_link;
erts_lc_lock(&lck);
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
lck.id = lc_id.proc_lock_msgq;
erts_lc_lock(&lck);
}
if (locks & ERTS_PROC_LOCK_STATUS) {
lck.id = lc_id.proc_lock_status;
erts_lc_lock(&lck);
}
}
void
erts_proc_lc_trylock(Process *p, ErtsProcLocks locks, int locked)
{
erts_lc_lock_t lck = ERTS_LC_LOCK_INIT(-1,
p->id,
ERTS_LC_FLG_LT_PROCLOCK);
if (locks & ERTS_PROC_LOCK_MAIN) {
lck.id = lc_id.proc_lock_main;
erts_lc_trylock(locked, &lck);
}
if (locks & ERTS_PROC_LOCK_LINK) {
lck.id = lc_id.proc_lock_link;
erts_lc_trylock(locked, &lck);
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
lck.id = lc_id.proc_lock_msgq;
erts_lc_trylock(locked, &lck);
}
if (locks & ERTS_PROC_LOCK_STATUS) {
lck.id = lc_id.proc_lock_status;
erts_lc_trylock(locked, &lck);
}
}
void
erts_proc_lc_unlock(Process *p, ErtsProcLocks locks)
{
erts_lc_lock_t lck = ERTS_LC_LOCK_INIT(-1,
p->id,
ERTS_LC_FLG_LT_PROCLOCK);
if (locks & ERTS_PROC_LOCK_STATUS) {
lck.id = lc_id.proc_lock_status;
erts_lc_unlock(&lck);
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
lck.id = lc_id.proc_lock_msgq;
erts_lc_unlock(&lck);
}
if (locks & ERTS_PROC_LOCK_LINK) {
lck.id = lc_id.proc_lock_link;
erts_lc_unlock(&lck);
}
if (locks & ERTS_PROC_LOCK_MAIN) {
lck.id = lc_id.proc_lock_main;
erts_lc_unlock(&lck);
}
}
void
erts_proc_lc_might_unlock(Process *p, ErtsProcLocks locks)
{
erts_lc_lock_t lck = ERTS_LC_LOCK_INIT(-1,
p->id,
ERTS_LC_FLG_LT_PROCLOCK);
if (locks & ERTS_PROC_LOCK_STATUS) {
lck.id = lc_id.proc_lock_status;
erts_lc_might_unlock(&lck);
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
lck.id = lc_id.proc_lock_msgq;
erts_lc_might_unlock(&lck);
}
if (locks & ERTS_PROC_LOCK_LINK) {
lck.id = lc_id.proc_lock_link;
erts_lc_might_unlock(&lck);
}
if (locks & ERTS_PROC_LOCK_MAIN) {
lck.id = lc_id.proc_lock_main;
erts_lc_might_unlock(&lck);
}
}
void
erts_proc_lc_require_lock(Process *p, ErtsProcLocks locks)
{
erts_lc_lock_t lck = ERTS_LC_LOCK_INIT(-1,
p->id,
ERTS_LC_FLG_LT_PROCLOCK);
if (locks & ERTS_PROC_LOCK_MAIN) {
lck.id = lc_id.proc_lock_main;
erts_lc_require_lock(&lck);
}
if (locks & ERTS_PROC_LOCK_LINK) {
lck.id = lc_id.proc_lock_link;
erts_lc_require_lock(&lck);
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
lck.id = lc_id.proc_lock_msgq;
erts_lc_require_lock(&lck);
}
if (locks & ERTS_PROC_LOCK_STATUS) {
lck.id = lc_id.proc_lock_status;
erts_lc_require_lock(&lck);
}
}
void
erts_proc_lc_unrequire_lock(Process *p, ErtsProcLocks locks)
{
erts_lc_lock_t lck = ERTS_LC_LOCK_INIT(-1,
p->id,
ERTS_LC_FLG_LT_PROCLOCK);
if (locks & ERTS_PROC_LOCK_STATUS) {
lck.id = lc_id.proc_lock_status;
erts_lc_unrequire_lock(&lck);
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
lck.id = lc_id.proc_lock_msgq;
erts_lc_unrequire_lock(&lck);
}
if (locks & ERTS_PROC_LOCK_LINK) {
lck.id = lc_id.proc_lock_link;
erts_lc_unrequire_lock(&lck);
}
if (locks & ERTS_PROC_LOCK_MAIN) {
lck.id = lc_id.proc_lock_main;
erts_lc_unrequire_lock(&lck);
}
}
int
erts_proc_lc_trylock_force_busy(Process *p, ErtsProcLocks locks)
{
if (locks & ERTS_PROC_LOCKS_ALL) {
erts_lc_lock_t lck = ERTS_LC_LOCK_INIT(-1,
p->id,
ERTS_LC_FLG_LT_PROCLOCK);
if (locks & ERTS_PROC_LOCK_MAIN)
lck.id = lc_id.proc_lock_main;
else if (locks & ERTS_PROC_LOCK_LINK)
lck.id = lc_id.proc_lock_link;
else if (locks & ERTS_PROC_LOCK_MSGQ)
lck.id = lc_id.proc_lock_msgq;
else if (locks & ERTS_PROC_LOCK_STATUS)
lck.id = lc_id.proc_lock_status;
else
erts_lc_fail("Unknown proc lock found");
return erts_lc_trylock_force_busy(&lck);
}
return 0;
}
void erts_proc_lc_chk_only_proc_main(Process *p)
{
erts_lc_lock_t proc_main = ERTS_LC_LOCK_INIT(lc_id.proc_lock_main,
p->id,
ERTS_LC_FLG_LT_PROCLOCK);
erts_lc_check_exact(&proc_main, 1);
}
#define ERTS_PROC_LC_EMPTY_LOCK_INIT \
ERTS_LC_LOCK_INIT(-1, THE_NON_VALUE, ERTS_LC_FLG_LT_PROCLOCK)
void
erts_proc_lc_chk_have_proc_locks(Process *p, ErtsProcLocks locks)
{
int have_locks_len = 0;
erts_lc_lock_t have_locks[4] = {ERTS_PROC_LC_EMPTY_LOCK_INIT,
ERTS_PROC_LC_EMPTY_LOCK_INIT,
ERTS_PROC_LC_EMPTY_LOCK_INIT,
ERTS_PROC_LC_EMPTY_LOCK_INIT};
if (locks & ERTS_PROC_LOCK_MAIN) {
have_locks[have_locks_len].id = lc_id.proc_lock_main;
have_locks[have_locks_len++].extra = p->id;
}
if (locks & ERTS_PROC_LOCK_LINK) {
have_locks[have_locks_len].id = lc_id.proc_lock_link;
have_locks[have_locks_len++].extra = p->id;
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
have_locks[have_locks_len].id = lc_id.proc_lock_msgq;
have_locks[have_locks_len++].extra = p->id;
}
if (locks & ERTS_PROC_LOCK_STATUS) {
have_locks[have_locks_len].id = lc_id.proc_lock_status;
have_locks[have_locks_len++].extra = p->id;
}
erts_lc_check(have_locks, have_locks_len, NULL, 0);
}
void
erts_proc_lc_chk_proc_locks(Process *p, ErtsProcLocks locks)
{
int have_locks_len = 0;
int have_not_locks_len = 0;
erts_lc_lock_t have_locks[4] = {ERTS_PROC_LC_EMPTY_LOCK_INIT,
ERTS_PROC_LC_EMPTY_LOCK_INIT,
ERTS_PROC_LC_EMPTY_LOCK_INIT,
ERTS_PROC_LC_EMPTY_LOCK_INIT};
erts_lc_lock_t have_not_locks[4] = {ERTS_PROC_LC_EMPTY_LOCK_INIT,
ERTS_PROC_LC_EMPTY_LOCK_INIT,
ERTS_PROC_LC_EMPTY_LOCK_INIT,
ERTS_PROC_LC_EMPTY_LOCK_INIT};
if (locks & ERTS_PROC_LOCK_MAIN) {
have_locks[have_locks_len].id = lc_id.proc_lock_main;
have_locks[have_locks_len++].extra = p->id;
}
else {
have_not_locks[have_not_locks_len].id = lc_id.proc_lock_main;
have_not_locks[have_not_locks_len++].extra = p->id;
}
if (locks & ERTS_PROC_LOCK_LINK) {
have_locks[have_locks_len].id = lc_id.proc_lock_link;
have_locks[have_locks_len++].extra = p->id;
}
else {
have_not_locks[have_not_locks_len].id = lc_id.proc_lock_link;
have_not_locks[have_not_locks_len++].extra = p->id;
}
if (locks & ERTS_PROC_LOCK_MSGQ) {
have_locks[have_locks_len].id = lc_id.proc_lock_msgq;
have_locks[have_locks_len++].extra = p->id;
}
else {
have_not_locks[have_not_locks_len].id = lc_id.proc_lock_msgq;
have_not_locks[have_not_locks_len++].extra = p->id;
}
if (locks & ERTS_PROC_LOCK_STATUS) {
have_locks[have_locks_len].id = lc_id.proc_lock_status;
have_locks[have_locks_len++].extra = p->id;
}
else {
have_not_locks[have_not_locks_len].id = lc_id.proc_lock_status;
have_not_locks[have_not_locks_len++].extra = p->id;
}
erts_lc_check(have_locks, have_locks_len,
have_not_locks, have_not_locks_len);
}
ErtsProcLocks
erts_proc_lc_my_proc_locks(Process *p)
{
int resv[4];
erts_lc_lock_t locks[4] = {ERTS_LC_LOCK_INIT(lc_id.proc_lock_main,
p->id,
ERTS_LC_FLG_LT_PROCLOCK),
ERTS_LC_LOCK_INIT(lc_id.proc_lock_link,
p->id,
ERTS_LC_FLG_LT_PROCLOCK),
ERTS_LC_LOCK_INIT(lc_id.proc_lock_msgq,
p->id,
ERTS_LC_FLG_LT_PROCLOCK),
ERTS_LC_LOCK_INIT(lc_id.proc_lock_status,
p->id,
ERTS_LC_FLG_LT_PROCLOCK)};
ErtsProcLocks res = 0;
erts_lc_have_locks(resv, locks, 4);
if (resv[0])
res |= ERTS_PROC_LOCK_MAIN;
if (resv[1])
res |= ERTS_PROC_LOCK_LINK;
if (resv[2])
res |= ERTS_PROC_LOCK_MSGQ;
if (resv[3])
res |= ERTS_PROC_LOCK_STATUS;
return res;
}
void
erts_proc_lc_chk_no_proc_locks(char *file, int line)
{
int resv[4];
int ids[4] = {lc_id.proc_lock_main,
lc_id.proc_lock_link,
lc_id.proc_lock_msgq,
lc_id.proc_lock_status};
erts_lc_have_lock_ids(resv, ids, 4);
if (resv[0] || resv[1] || resv[2] || resv[3]) {
erts_lc_fail("%s:%d: Thread has process locks locked when expected "
"not to have any process locks locked",
file, line);
}
}
#endif /* #ifdef ERTS_ENABLE_LOCK_CHECK */
#ifdef ERTS_PROC_LOCK_HARD_DEBUG
void
check_queue(erts_proc_lock_t *lck)
{
int lock_no;
ErtsProcLocks lflgs = ERTS_PROC_LOCK_FLGS_READ_(lck);
for (lock_no = 0; lock_no <= ERTS_PROC_LOCK_MAX_BIT; lock_no++) {
ErtsProcLocks wtr;
wtr = (((ErtsProcLocks) 1) << lock_no) << ERTS_PROC_LOCK_WAITER_SHIFT;
if (lflgs & wtr) {
int n;
erts_tse_t *wtr;
ERTS_LC_ASSERT(lck->queue[lock_no]);
wtr = lck->queue[lock_no];
n = 0;
do {
wtr = wtr->next;
n++;
} while (wtr != lck->queue[lock_no]);
do {
wtr = wtr->prev;
n--;
} while (wtr != lck->queue[lock_no]);
ERTS_LC_ASSERT(n == 0);
}
else {
ERTS_LC_ASSERT(!lck->queue[lock_no]);
}
}
}
#endif
#endif /* ERTS_SMP (the whole file) */