From 6e24f2dcb8095aeed157d66204a2e5bace90863b Mon Sep 17 00:00:00 2001
From: Rickard Green <rickard@erlang.org>
Date: Wed, 25 Jul 2012 22:27:26 +0200
Subject: Use static allocation of process lock queues

By using statically allocated lock queues there is no longer any
need to lock the corresponding pix lock when process locks have
been transferred after a wait. This costs us three extra words in
the process structure, but improves performance under contention.
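
Illustrative sketch of the new layout, using simplified stand-in
types ('waiter' and 'proc_lock' here correspond to erts_tse_t and
erts_proc_lock_t in erl_process_lock.[ch]; fields and assertions
unrelated to the wait queues are omitted):

    #include <stddef.h>

    /* Four process locks at the time: main, link, msgq, status. */
    #define ERTS_PROC_LOCK_MAX_BIT 3

    typedef struct waiter {            /* stands in for erts_tse_t */
        struct waiter *next, *prev;
    } waiter;

    typedef struct proc_lock {         /* stands in for erts_proc_lock_t */
        waiter *queue[ERTS_PROC_LOCK_MAX_BIT + 1]; /* one head per lock bit */
    } proc_lock;

    /* Append a waiter to the circular doubly linked queue for lock
     * bit 'ix'; queue[ix] is the first waiter in the queue and
     * queue[ix]->prev the last. */
    static void enqueue_waiter(proc_lock *lck, int ix, waiter *wtr)
    {
        if (!lck->queue[ix]) {
            lck->queue[ix] = wtr;
            wtr->next = wtr->prev = wtr;
        }
        else {
            wtr->next = lck->queue[ix];
            wtr->prev = lck->queue[ix]->prev;
            wtr->prev->next = wtr;
            lck->queue[ix]->prev = wtr;
        }
    }

    /* Pop the first waiter for lock bit 'ix'; assumes the queue is
     * non-empty, as the real code asserts. */
    static waiter *dequeue_waiter(proc_lock *lck, int ix)
    {
        waiter *wtr = lck->queue[ix];
        if (wtr->next == wtr)
            lck->queue[ix] = NULL;     /* queue is now empty */
        else {
            wtr->next->prev = wtr->prev;
            wtr->prev->next = wtr->next;
            lck->queue[ix] = wtr->next;
        }
        return wtr;
    }

Since the queue heads are embedded in the process lock, a waiter
woken after transfer_locks() has no separately allocated queue
structure to hand back, which is why wait_for_locks() no longer
re-takes the pix lock after the wait.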
---
 erts/emulator/beam/erl_alloc.types    |   1 -
 erts/emulator/beam/erl_lock_check.c   |   3 -
 erts/emulator/beam/erl_process_lock.c | 168 ++++++++--------------------------
 erts/emulator/beam/erl_process_lock.h |   4 +-
 4 files changed, 39 insertions(+), 137 deletions(-)

diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types
index bba6b83ac6..d3c0e54946 100644
--- a/erts/emulator/beam/erl_alloc.types
+++ b/erts/emulator/beam/erl_alloc.types
@@ -291,7 +291,6 @@ type	PORT_LOCK	STANDARD	SYSTEM		port_lock
 type	DRIVER_LOCK	STANDARD	SYSTEM		driver_lock
 type	XPORTS_LIST	SHORT_LIVED	SYSTEM		extra_port_list
 type	PROC_LCK_WTR	LONG_LIVED	SYSTEM		proc_lock_waiter
-type	PROC_LCK_QS	LONG_LIVED	SYSTEM		proc_lock_queues
 type	RUNQ_BLNS	LONG_LIVED	SYSTEM		run_queue_balancing
 type	THR_PRGR_IDATA	LONG_LIVED	SYSTEM		thr_prgr_internal_data
 type	THR_PRGR_DATA	LONG_LIVED	SYSTEM		thr_prgr_data
diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c
index a0f744be9d..b545ec07c0 100644
--- a/erts/emulator/beam/erl_lock_check.c
+++ b/erts/emulator/beam/erl_lock_check.c
@@ -175,9 +175,6 @@ static erts_lc_lock_order_t erts_lock_order[] = {
     {	"sched_stat",				NULL			},
 #endif
     {	"async_init_mtx",			NULL			},
-#ifdef ERTS_SMP
-    {	"proc_lck_qs_alloc",			NULL 			},
-#endif
 #ifdef __WIN32__
 #ifdef DEBUG
     {   "save_ops_lock",                        NULL                    },
diff --git a/erts/emulator/beam/erl_process_lock.c b/erts/emulator/beam/erl_process_lock.c
index a5a753b798..dde2e7bce6 100644
--- a/erts/emulator/beam/erl_process_lock.c
+++ b/erts/emulator/beam/erl_process_lock.c
@@ -90,16 +90,6 @@ static void check_queue(erts_proc_lock_t *lck);
 #error "The size of the 'uflgs' field of the erts_tse_t type is too small"
 #endif
 
-struct erts_proc_lock_queues_t_ {
-    erts_proc_lock_queues_t *next;
-    erts_tse_t *queue[ERTS_PROC_LOCK_MAX_BIT+1];
-};
-
-static erts_proc_lock_queues_t zeroqs = {0};
-
-static erts_smp_spinlock_t qs_lock;
-static erts_proc_lock_queues_t *queue_free_list;
-
 #ifdef ERTS_ENABLE_LOCK_CHECK
 static struct {
     Sint16 proc_lock_main;
@@ -120,7 +110,6 @@ void
 erts_init_proc_lock(int cpus)
 {
     int i;
-    erts_smp_spinlock_init(&qs_lock, "proc_lck_qs_alloc");
     for (i = 0; i < ERTS_NO_OF_PIX_LOCKS; i++) {
 #ifdef ERTS_ENABLE_LOCK_COUNT
 	erts_mtx_init_x(&erts_pix_locks[i].u.mtx,
@@ -129,7 +118,6 @@ erts_init_proc_lock(int cpus)
 	erts_mtx_init(&erts_pix_locks[i].u.mtx, "pix_lock");
 #endif
     }
-    queue_free_list = NULL;
     erts_thr_install_exit_handler(cleanup_tse);
 #ifdef ERTS_ENABLE_LOCK_CHECK
     lc_id.proc_lock_main	= erts_lc_get_lock_order_id("proc_main");
@@ -156,16 +144,7 @@ erts_init_proc_lock(int cpus)
 }
 
 #ifdef ERTS_ENABLE_LOCK_CHECK
-static void
-check_unused_tse(erts_tse_t *wtr)
-{
-    int i;
-    erts_proc_lock_queues_t *queues = wtr->udata;
-    ERTS_LC_ASSERT(wtr->uflgs == 0);
-    for (i = 0; i <= ERTS_PROC_LOCK_MAX_BIT; i++)
-	ERTS_LC_ASSERT(!queues->queue[i]);
-}
-#define CHECK_UNUSED_TSE(W) check_unused_tse((W))
+#define CHECK_UNUSED_TSE(W) ERTS_LC_ASSERT((W)->uflgs == 0)
 #else
 #define CHECK_UNUSED_TSE(W)
 #endif
@@ -174,56 +153,21 @@ static ERTS_INLINE erts_tse_t *
 tse_fetch(erts_pix_lock_t *pix_lock)
 {
     erts_tse_t *tse = erts_tse_fetch();
-    if (!tse->udata) {
-	erts_proc_lock_queues_t *qs;
-#if ERTS_PROC_LOCK_SPINLOCK_IMPL && !ERTS_PROC_LOCK_ATOMIC_IMPL
-	if (pix_lock)
-	    erts_pix_unlock(pix_lock);
-#endif
-	erts_smp_spin_lock(&qs_lock);
-	qs = queue_free_list;
-	if (qs) {
-	    queue_free_list = queue_free_list->next;
-	    erts_smp_spin_unlock(&qs_lock);
-	}
-	else {
-	    erts_smp_spin_unlock(&qs_lock);
-	    qs = erts_alloc(ERTS_ALC_T_PROC_LCK_QS,
-			    sizeof(erts_proc_lock_queues_t));
-	    sys_memcpy((void *) qs,
-		       (void *) &zeroqs,
-		       sizeof(erts_proc_lock_queues_t));
-	}
-	tse->udata = qs;
-#if ERTS_PROC_LOCK_SPINLOCK_IMPL && !ERTS_PROC_LOCK_ATOMIC_IMPL
-	if (pix_lock)
-	    erts_pix_lock(pix_lock);
-#endif
-    }
     tse->uflgs = 0;
     return tse;
 }
 
 static ERTS_INLINE void
-tse_return(erts_tse_t *tse, int force_free_q)
+tse_return(erts_tse_t *tse)
 {
     CHECK_UNUSED_TSE(tse);
-    if (force_free_q || erts_tse_is_tmp(tse)) {
-	erts_proc_lock_queues_t *qs = tse->udata;
-	ASSERT(qs);
-	erts_smp_spin_lock(&qs_lock);
-	qs->next = queue_free_list;
-	queue_free_list = qs;
-	erts_smp_spin_unlock(&qs_lock);
-	tse->udata = NULL;
-    }
     erts_tse_return(tse);
 }
 
 void
 erts_proc_lock_prepare_proc_lock_waiter(void)
 {
-    tse_return(tse_fetch(NULL), 0);
+    tse_return(tse_fetch(NULL));
 }
 
 
@@ -231,55 +175,49 @@ static void
 cleanup_tse(void)
 {
     erts_tse_t *tse = erts_tse_fetch();
-    if (tse) {
-	if (tse->udata)
-	    tse_return(tse, 1);
-	else
-	    erts_tse_return(tse);
-    }
+    if (tse)
+	erts_tse_return(tse);
 }
 
 
 /*
  * Waiters are queued in a circular double linked list;
- * where qs->queue[lock_ix] is the first waiter in queue, and
- * qs->queue[lock_ix]->prev is the last waiter in queue.
+ * where lck->queue[lock_ix] is the first waiter in queue, and
+ * lck->queue[lock_ix]->prev is the last waiter in queue.
  */
 
 static ERTS_INLINE void
-enqueue_waiter(erts_proc_lock_queues_t *qs,
-	       int ix,
-	       erts_tse_t *wtr)
+enqueue_waiter(erts_proc_lock_t *lck, int ix, erts_tse_t *wtr)
 {
-    if (!qs->queue[ix]) {
-	qs->queue[ix] = wtr;
+    if (!lck->queue[ix]) {
+	lck->queue[ix] = wtr;
 	wtr->next = wtr;
 	wtr->prev = wtr;
     }
     else {
-	ERTS_LC_ASSERT(qs->queue[ix]->next && qs->queue[ix]->prev);
-	wtr->next = qs->queue[ix];
-	wtr->prev = qs->queue[ix]->prev;
+	ERTS_LC_ASSERT(lck->queue[ix]->next && lck->queue[ix]->prev);
+	wtr->next = lck->queue[ix];
+	wtr->prev = lck->queue[ix]->prev;
 	wtr->prev->next = wtr;
-	qs->queue[ix]->prev = wtr;
+	lck->queue[ix]->prev = wtr;
     }
 }
 
 static erts_tse_t *
-dequeue_waiter(erts_proc_lock_queues_t *qs, int ix)
+dequeue_waiter(erts_proc_lock_t *lck, int ix)
 {
-    erts_tse_t *wtr = qs->queue[ix];
-    ERTS_LC_ASSERT(qs->queue[ix]);
+    erts_tse_t *wtr = lck->queue[ix];
+    ERTS_LC_ASSERT(lck->queue[ix]);
     if (wtr->next == wtr) {
-	ERTS_LC_ASSERT(qs->queue[ix]->prev == wtr);
-	qs->queue[ix] = NULL;
+	ERTS_LC_ASSERT(lck->queue[ix]->prev == wtr);
+	lck->queue[ix] = NULL;
     }
     else {
 	ERTS_LC_ASSERT(wtr->next != wtr);
 	ERTS_LC_ASSERT(wtr->prev != wtr);
 	wtr->next->prev = wtr->prev;
 	wtr->prev->next = wtr->next;
-	qs->queue[ix] = wtr->next;
+	lck->queue[ix] = wtr->next;
     }
     return wtr;
 }
@@ -300,19 +238,18 @@ try_aquire(erts_proc_lock_t *lck, erts_tse_t *wtr)
     ErtsProcLocks locks = wtr->uflgs;
     int lock_no;
 
-    ERTS_LC_ASSERT(lck->queues);
     ERTS_LC_ASSERT(got_locks != locks);
 
     for (lock_no = 0; lock_no <= ERTS_PROC_LOCK_MAX_BIT; lock_no++) {
 	ErtsProcLocks lock = ((ErtsProcLocks) 1) << lock_no;
 	if (locks & lock) {
 	    ErtsProcLocks wflg, old_lflgs;
-	    if (lck->queues->queue[lock_no]) {
+	    if (lck->queue[lock_no]) {
 		/* Others already waiting */
 	    enqueue:
 		ERTS_LC_ASSERT(ERTS_PROC_LOCK_FLGS_READ_(lck)
 			       & (lock << ERTS_PROC_LOCK_WAITER_SHIFT));
-		enqueue_waiter(lck->queues, lock_no, wtr);
+		enqueue_waiter(lck, lock_no, wtr);
 		break;
 	    }
 	    wflg = lock << ERTS_PROC_LOCK_WAITER_SHIFT;
@@ -364,7 +301,6 @@ transfer_locks(Process *p,
     for (lock_no = 0; tlocks && lock_no <= ERTS_PROC_LOCK_MAX_BIT; lock_no++) {
 	ErtsProcLocks lock = ((ErtsProcLocks) 1) << lock_no;
 	if (tlocks & lock) {
-	    erts_proc_lock_queues_t *qs = p->lock.queues;
 	    /* Transfer lock */
 #ifdef ERTS_ENABLE_LOCK_CHECK
 	    tlocks &= ~lock;
@@ -372,9 +308,9 @@ transfer_locks(Process *p,
 	    ERTS_LC_ASSERT(ERTS_PROC_LOCK_FLGS_READ_(&p->lock)
 			   & (lock << ERTS_PROC_LOCK_WAITER_SHIFT));
 	    transferred++;
-	    wtr = dequeue_waiter(qs, lock_no);
+	    wtr = dequeue_waiter(&p->lock, lock_no);
 	    ERTS_LC_ASSERT(wtr);
-	    if (!qs->queue[lock_no])
+	    if (!p->lock.queue[lock_no])
 		unset_waiter |= lock;
 	    ERTS_LC_ASSERT(wtr->uflgs & lock);
 	    wtr->uflgs &= ~lock;
@@ -463,7 +399,6 @@ wait_for_locks(Process *p,
 {
     erts_pix_lock_t *pix_lock = pixlck ? pixlck : ERTS_PID2PIXLOCK(p->id);
     erts_tse_t *wtr;
-    erts_proc_lock_queues_t *qs;
 
     /* Acquire a waiter object on which this thread can wait. */
     wtr = tse_fetch(pix_lock);
@@ -479,18 +414,6 @@ wait_for_locks(Process *p,
 
     ERTS_LC_ASSERT(erts_lc_pix_lock_is_locked(pix_lock));
 
-    qs = wtr->udata;
-    ASSERT(qs);
-    /* Provide the process with waiter queues, if it doesn't have one. */
-    if (!p->lock.queues) {
-	qs->next = NULL;
-	p->lock.queues = qs;
-    }
-    else {
-	qs->next = p->lock.queues->next;
-	p->lock.queues->next = qs;
-    }
-
 #ifdef ERTS_PROC_LOCK_HARD_DEBUG
     check_queue(&p->lock);
 #endif
@@ -504,7 +427,9 @@ wait_for_locks(Process *p,
     check_queue(&p->lock);
 #endif
 
-    if (wtr->uflgs) {
+    if (wtr->uflgs == 0)
+	erts_pix_unlock(pix_lock);
+    else {
 	/* We didn't get them all; need to wait... */
 
 	ASSERT((wtr->uflgs & ~ERTS_PROC_LOCKS_ALL) == 0);
@@ -529,28 +454,12 @@ wait_for_locks(Process *p,
 	    } while (res != 0);
 	}
 
-	erts_pix_lock(pix_lock);
-
 	ASSERT(wtr->uflgs == 0);
     }
 
-    /* Recover some queues to store in the waiter. */
-    ERTS_LC_ASSERT(p->lock.queues);
-    if (p->lock.queues->next) {
-	qs = p->lock.queues->next;
-	p->lock.queues->next = qs->next;
-    }
-    else {
-	qs = p->lock.queues;
-	p->lock.queues = NULL;
-    }
-    wtr->udata = qs;
-
-    erts_pix_unlock(pix_lock);
-
     ERTS_LC_ASSERT(locks == (ERTS_PROC_LOCK_FLGS_READ_(&p->lock) & locks));
 
-    tse_return(wtr, 0);
+    tse_return(wtr);
 }
 
 /*
@@ -971,6 +880,7 @@ erts_pid2proc_safelock(Process *c_p,
 void
 erts_proc_lock_init(Process *p)
 {
+    int i;
     /* We always start with all locks locked */
 #if ERTS_PROC_LOCK_ATOMIC_IMPL
     erts_smp_atomic32_init_nob(&p->lock.flags,
@@ -978,7 +888,8 @@ erts_proc_lock_init(Process *p)
 #else
     p->lock.flags = ERTS_PROC_LOCKS_ALL;
 #endif
-    p->lock.queues = NULL;
+    for (i = 0; i <= ERTS_PROC_LOCK_MAX_BIT; i++)
+	p->lock.queue[i] = NULL;
     p->lock.refc = 1;
 #ifdef ERTS_ENABLE_LOCK_COUNT
     erts_lcnt_proc_lock_init(p);
@@ -990,11 +901,8 @@ erts_proc_lock_init(Process *p)
     erts_proc_lc_trylock(p, ERTS_PROC_LOCKS_ALL, 1);
 #endif
 #ifdef ERTS_PROC_LOCK_DEBUG
-    {
-	int i;
-	for (i = 0; i <= ERTS_PROC_LOCK_MAX_BIT; i++)
-	    erts_smp_atomic32_init_nob(&p->lock.locked[i], (erts_aint32_t) 1);
-    }
+    for (i = 0; i <= ERTS_PROC_LOCK_MAX_BIT; i++)
+	erts_smp_atomic32_init_nob(&p->lock.locked[i], (erts_aint32_t) 1);
 #endif
 }
 
@@ -1437,21 +1345,21 @@ check_queue(erts_proc_lock_t *lck)
 	if (lflgs & wtr) {
 	    int n;
 	    erts_tse_t *wtr;
-	    ERTS_LC_ASSERT(lck->queues && lck->queues->queue[lock_no]);
-	    wtr = lck->queues->queue[lock_no];
+	    ERTS_LC_ASSERT(lck->queue[lock_no]);
+	    wtr = lck->queue[lock_no];
 	    n = 0;
 	    do {
 		wtr = wtr->next;
 		n++;
-	    } while (wtr != lck->queues->queue[lock_no]);
+	    } while (wtr != lck->queue[lock_no]);
 	    do {
 		wtr = wtr->prev;
 		n--;
-	    } while (wtr != lck->queues->queue[lock_no]);
+	    } while (wtr != lck->queue[lock_no]);
 	    ERTS_LC_ASSERT(n == 0);
 	}
 	else {
-	    ERTS_LC_ASSERT(!lck->queues || !lck->queues->queue[lock_no]);
+	    ERTS_LC_ASSERT(!lck->queue[lock_no]);
 	}
     }
 }
diff --git a/erts/emulator/beam/erl_process_lock.h b/erts/emulator/beam/erl_process_lock.h
index 8dbdaccc68..51282ed872 100644
--- a/erts/emulator/beam/erl_process_lock.h
+++ b/erts/emulator/beam/erl_process_lock.h
@@ -56,15 +56,13 @@
 
 typedef erts_aint32_t ErtsProcLocks;
 
-typedef struct erts_proc_lock_queues_t_ erts_proc_lock_queues_t;
-
 typedef struct erts_proc_lock_t_ {
 #if ERTS_PROC_LOCK_ATOMIC_IMPL
     erts_smp_atomic32_t flags;
 #else
     ErtsProcLocks flags;
 #endif
-    erts_proc_lock_queues_t *queues;
+    erts_tse_t *queue[ERTS_PROC_LOCK_MAX_BIT+1];
     Sint32 refc;
 #ifdef ERTS_PROC_LOCK_DEBUG
     erts_smp_atomic32_t locked[ERTS_PROC_LOCK_MAX_BIT+1];
-- 
cgit v1.2.3