aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/Makefile.in2
-rw-r--r--erts/emulator/beam/erl_alloc.c5
-rw-r--r--erts/emulator/beam/erl_alloc.types22
-rw-r--r--erts/emulator/beam/erl_init.c2
-rw-r--r--erts/emulator/beam/erl_thr_queue.c745
-rw-r--r--erts/emulator/beam/erl_thr_queue.h211
-rw-r--r--erts/emulator/beam/utils.c1
7 files changed, 987 insertions, 1 deletions
diff --git a/erts/emulator/Makefile.in b/erts/emulator/Makefile.in
index 6ccad081e5..15244a6589 100644
--- a/erts/emulator/Makefile.in
+++ b/erts/emulator/Makefile.in
@@ -743,7 +743,7 @@ RUN_OBJS = \
$(OBJDIR)/packet_parser.o $(OBJDIR)/safe_hash.o \
$(OBJDIR)/erl_zlib.o $(OBJDIR)/erl_nif.o \
$(OBJDIR)/erl_bif_binary.o $(OBJDIR)/erl_ao_firstfit_alloc.o \
- $(OBJDIR)/erl_sched_spec_pre_alloc.o
+ $(OBJDIR)/erl_thr_queue.o $(OBJDIR)/erl_sched_spec_pre_alloc.o
ifeq ($(TARGET),win32)
DRV_OBJS = \
diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c
index 705ace26fa..cce4b4adf0 100644
--- a/erts/emulator/beam/erl_alloc.c
+++ b/erts/emulator/beam/erl_alloc.c
@@ -41,6 +41,7 @@
#include "erl_monitors.h"
#include "erl_bif_timer.h"
#include "erl_cpu_topology.h"
+#include "erl_thr_queue.h"
#if defined(ERTS_ALC_T_DRV_SEL_D_STATE) || defined(ERTS_ALC_T_DRV_EV_D_STATE)
#include "erl_check_io.h"
#endif
@@ -524,6 +525,10 @@ erts_alloc_init(int *argc, char **argv, ErtsAllocInitOpts *eaiop)
= sizeof(ErtsDrvSelectDataState);
fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_MSG_REF)]
= sizeof(ErlMessage);
+#ifdef ERTS_SMP
+ fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_THR_Q_EL_SL)]
+ = sizeof(ErtsThrQElement_t);
+#endif
#ifdef HARD_DEBUG
hdbg_init();
#endif
diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types
index 79d3433fc0..4efad0197b 100644
--- a/erts/emulator/beam/erl_alloc.types
+++ b/erts/emulator/beam/erl_alloc.types
@@ -50,6 +50,15 @@
# command line argument to make_alloc_types. The variable X is false
# after a "+disable X" statement or if it has never been mentioned.
++if smp
++disable threads_no_smp
++else
++if threads
++enable threads_no_smp
++else
++disable threads_no_smp
++endif
++endif
# --- Allocator declarations -------------------------------------------------
#
@@ -254,6 +263,19 @@ type ZLIB STANDARD SYSTEM zlib
type CPU_GRPS_MAP LONG_LIVED SYSTEM cpu_groups_map
type AUX_WORK_TMO LONG_LIVED SYSTEM aux_work_timeouts
++if threads_no_smp
+# Need thread safe allocs, but std_alloc and fix_alloc are not;
+# use driver_alloc which is...
+type THR_Q_EL DRIVER SYSTEM thr_q_element
+type THR_Q_EL_SL DRIVER SYSTEM sl_thr_q_element
++else
+type THR_Q_EL STANDARD SYSTEM thr_q_element
+type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element
++endif
+type THR_Q STANDARD SYSTEM thr_queue
+type THR_Q_SL SHORT_LIVED SYSTEM short_lived_thr_queue
+type THR_Q_LL LONG_LIVED SYSTEM long_lived_thr_queue
+
+if smp
type ASYNC SHORT_LIVED SYSTEM async
+else
diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c
index f4f2a4d011..5fe44afdce 100644
--- a/erts/emulator/beam/erl_init.c
+++ b/erts/emulator/beam/erl_init.c
@@ -43,6 +43,7 @@
#include "packet_parser.h"
#include "erl_cpu_topology.h"
#include "erl_thr_progress.h"
+#include "erl_thr_queue.h"
#ifdef HIPE
#include "hipe_mode_switch.h" /* for hipe_mode_switch_init() */
@@ -786,6 +787,7 @@ early_init(int *argc, char **argv) /*
*/
erts_thr_progress_init(no_schedulers, no_schedulers+1, 0);
#endif
+ erts_thr_q_init();
erts_init_utils();
erts_early_init_cpu_topology(no_schedulers,
&max_main_threads,
diff --git a/erts/emulator/beam/erl_thr_queue.c b/erts/emulator/beam/erl_thr_queue.c
new file mode 100644
index 0000000000..9ac4cd4b8e
--- /dev/null
+++ b/erts/emulator/beam/erl_thr_queue.c
@@ -0,0 +1,745 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2011. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+/*
+ * Description: Lock-free queue for communication between threads.
+ *
+ * Currently only a many-to-one version has been,
+ * implemented, i.e., many threads can enqueue but
+ * only one thread can dequeue at a time. It doesn't
+ * have to be the same thread dequeuing every time, but
+ * synchronization so that only one thread dequeues
+ * at a time has to be provided by other means.
+ *
+ * When/If the need for a many-to-many queue arises,
+ * this implementation can relatively easy be extended
+ * to support that too.
+ *
+ * Usage instructions below.
+ *
+ * Author: Rickard Green
+ */
+
+/*
+ * ------ Usage instructions -----------------------------------------------
+ *
+ * Dequeuing generates garbage that needs to be cleaned up.
+ * erts_thr_q_dequeue() automatically cleans, but garbage may have to be
+ * cleaned up also when the queue is empty. This is done by calling
+ * erts_thr_q_clean(). In the SMP case thread progress may have to be made
+ * before cleaning can continue. If so, erts_thr_q_need_thr_progress() in
+ * combination with erts_thr_progress_wakeup() can be used in order to
+ * request a wakeup at appropriate time.
+ *
+ * Enqueuing implies memory allocation and dequeuing implies memory
+ * deallocation. Memory allocation can be moved to another more suitable
+ * thread using erts_thr_q_prepare_enqueue() together with
+ * erts_thr_q_enqueue_prepared() instead of using erts_thr_q_enqueue().
+ * Memory deallocation can can be moved to another more suitable thread by
+ * disabling auto_finalize_dequeue when initializing the queue and then use
+ * erts_thr_q_get_finalize_dequeue_data() together
+ * erts_thr_q_finalize_dequeue() after dequeuing or cleaning.
+ *
+ * Ending the life of the queue using either erts_thr_q_destroy()
+ * or erts_thr_q_finalize() impies cleaning the queue. Both functions
+ * return the cleaning result and may have to be called multiple times
+ * until the queue is clean. Once one of these functions have been called
+ * enqueuing is not allowed. This has to be synchronized by the user.
+ * If auto_finalize_dequeue has been disabled, the finalize dequeue
+ * functionality has to be called after ending the life of the queue just
+ * as when dequeuing or cleaning on a queue that is alive.
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "erl_thr_queue.h"
+
+#if defined(DEBUG)
+#define ERTS_THR_Q_DBG_CHK_DATA 1
+#else
+#define ERTS_THR_Q_DBG_CHK_DATA 0
+#endif
+
+#define ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT 100
+#define ERTS_THR_Q_MAX_SCHED_CLEAN_OPS 50
+#define ERTS_THR_Q_MAX_DEQUEUE_CLEAN_OPS 3
+
+#define ERTS_THR_Q_MAX_FINI_DEQ_OPS 50
+
+#ifdef ERTS_SMP
+ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(sl_element,
+ ErtsThrQElement_t,
+ 1000,
+ ERTS_ALC_T_THR_Q_EL_SL)
+#else
+
+static void
+init_sl_element_alloc(void)
+{
+}
+
+static ErtsThrQElement_t *
+sl_element_alloc(void)
+{
+ return erts_alloc(ERTS_ALC_T_THR_Q_EL_SL,
+ sizeof(ErtsThrQElement_t));
+}
+
+static void
+sl_element_free(ErtsThrQElement_t *p)
+{
+ erts_free(ERTS_ALC_T_THR_Q_EL_SL, p);
+}
+
+#endif
+
+typedef union {
+ ErtsThrQ_t q;
+ char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))];
+} ErtsAlignedThrQ_t;
+
+void
+erts_thr_q_init(void)
+{
+ init_sl_element_alloc();
+}
+
+static void noop_callback(void *arg) { }
+
+void
+erts_thr_q_initialize(ErtsThrQ_t *q, ErtsThrQInit_t *qi)
+{
+#ifndef USE_THREADS
+ q->init = *qi;
+ if (!q->init.notify)
+ q->init.notify = noop_callback;
+ q->first = NULL;
+ q->last = NULL;
+ q->q.blk = NULL;
+#else
+ erts_atomic_init_nob(&q->tail.data.marker.next.atmc, ERTS_AINT_NULL);
+ q->tail.data.marker.data.ptr = NULL;
+ erts_atomic_init_nob(&q->tail.data.last,
+ (erts_aint_t) &q->tail.data.marker);
+ erts_atomic_init_nob(&q->tail.data.um_refc[0], 0);
+ erts_atomic_init_nob(&q->tail.data.um_refc[1], 0);
+ erts_atomic32_init_nob(&q->tail.data.um_refc_ix, 0);
+ q->tail.data.live = qi->live.objects;
+ q->tail.data.arg = qi->arg;
+ q->tail.data.notify = qi->notify;
+ if (!q->tail.data.notify)
+ q->tail.data.notify = noop_callback;
+
+ q->head.head.ptr = &q->tail.data.marker;
+ q->head.live = qi->live.objects;
+ q->head.first = &q->tail.data.marker;
+ q->head.unref_end = &q->tail.data.marker;
+ q->head.clean_reached_head_count = 0;
+ q->head.deq_fini.automatic = qi->auto_finalize_dequeue;
+ q->head.deq_fini.start = NULL;
+ q->head.deq_fini.end = NULL;
+#ifdef ERTS_SMP
+ q->head.next.thr_progress = erts_thr_progress_current();
+ q->head.next.thr_progress_reached = 1;
+#endif
+ q->head.next.um_refc_ix = 1;
+ q->head.next.unref_end = &q->tail.data.marker;
+ q->head.used_marker = 1;
+ q->head.arg = qi->arg;
+ q->head.notify = q->tail.data.notify;
+ q->q.finalizing = 0;
+ q->q.live = qi->live.queue;
+ q->q.blk = NULL;
+#endif
+}
+
+ErtsThrQCleanState_t
+erts_thr_q_finalize(ErtsThrQ_t *q)
+{
+#ifdef USE_THREADS
+ q->q.finalizing = 1;
+#endif
+ while (erts_thr_q_dequeue(q));
+ return erts_thr_q_clean(q);
+}
+
+ErtsThrQ_t *
+erts_thr_q_create(ErtsThrQInit_t *qi)
+{
+ ErtsAlcType_t atype;
+ ErtsThrQ_t *q, *qblk;
+ UWord qw;
+
+ switch (qi->live.queue) {
+ case ERTS_THR_Q_LIVE_SHORT:
+ atype = ERTS_ALC_T_THR_Q_SL;
+ break;
+ case ERTS_THR_Q_LIVE_LONG:
+ atype = ERTS_ALC_T_THR_Q_LL;
+ break;
+ default:
+ atype = ERTS_ALC_T_THR_Q;
+ break;
+ }
+
+ qw = (UWord) erts_alloc(atype,
+ sizeof(ErtsThrQ_t) + (ERTS_CACHE_LINE_SIZE-1));
+ qblk = (ErtsThrQ_t *) qw;
+ if (qw & ERTS_CACHE_LINE_MASK)
+ qw = (qw & ~ERTS_CACHE_LINE_MASK) + ERTS_CACHE_LINE_SIZE;
+ ASSERT((qw & ERTS_CACHE_LINE_MASK) == 0);
+ q = (ErtsThrQ_t *) qw;
+ erts_thr_q_initialize(q, qi);
+ q->q.blk = qblk;
+ return q;
+}
+
+ErtsThrQCleanState_t
+erts_thr_q_destroy(ErtsThrQ_t *q)
+{
+ if (!q->q.blk)
+ erl_exit(ERTS_ABORT_EXIT,
+ "Trying to destroy not created thread queue\n");
+ return erts_thr_q_finalize(q);
+}
+
+#ifdef USE_THREADS
+
+static void
+destroy(ErtsThrQ_t *q)
+{
+ ErtsAlcType_t atype;
+ switch (q->q.live) {
+ case ERTS_THR_Q_LIVE_SHORT:
+ atype = ERTS_ALC_T_THR_Q_SL;
+ break;
+ case ERTS_THR_Q_LIVE_LONG:
+ atype = ERTS_ALC_T_THR_Q_LL;
+ break;
+ default:
+ atype = ERTS_ALC_T_THR_Q;
+ break;
+ }
+ erts_free(atype, q->q.blk);
+}
+
+#endif
+
+static ERTS_INLINE ErtsThrQElement_t *
+element_live_alloc(ErtsThrQLive_t live)
+{
+ switch (live) {
+ case ERTS_THR_Q_LIVE_SHORT:
+ return sl_element_alloc();
+ default:
+ return (ErtsThrQElement_t *) erts_alloc(ERTS_ALC_T_THR_Q_EL,
+ sizeof(ErtsThrQElement_t));
+ }
+}
+
+static ERTS_INLINE ErtsThrQElement_t *
+element_alloc(ErtsThrQ_t *q)
+{
+ ErtsThrQLive_t live;
+#ifdef USE_THREADS
+ live = q->tail.data.live;
+#else
+ live = q->init.live.objects;
+#endif
+ return element_live_alloc(live);
+}
+
+static ERTS_INLINE void
+element_live_free(ErtsThrQLive_t live, ErtsThrQElement_t *el)
+{
+ switch (live) {
+ case ERTS_THR_Q_LIVE_SHORT:
+ sl_element_free(el);
+ break;
+ default:
+ erts_free(ERTS_ALC_T_THR_Q_EL, el);
+ }
+}
+
+static ERTS_INLINE void
+element_free(ErtsThrQ_t *q, ErtsThrQElement_t *el)
+{
+ ErtsThrQLive_t live;
+#ifdef USE_THREADS
+ live = q->head.live;
+#else
+ live = q->init.live.objects;
+#endif
+ element_live_free(live, el);
+}
+
+#ifdef USE_THREADS
+
+static ERTS_INLINE ErtsThrQElement_t *
+enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last)
+{
+ erts_aint_t ilast, itmp;
+
+ erts_atomic_init_nob(&this->next.atmc, ERTS_AINT_NULL);
+ /* Enqueue at end of list... */
+
+ ilast = erts_atomic_read_nob(&q->tail.data.last);
+ while (1) {
+ ErtsThrQElement_t *last = (ErtsThrQElement_t *) ilast;
+ itmp = erts_atomic_cmpxchg_mb(&last->next.atmc,
+ (erts_aint_t) this,
+ ERTS_AINT_NULL);
+ if (itmp == ERTS_AINT_NULL)
+ break;
+ ilast = itmp;
+ }
+
+ /* Move last pointer forward... */
+ while (1) {
+ if (want_last) {
+ if (erts_atomic_read_rb(&this->next.atmc) != ERTS_AINT_NULL) {
+ /* Someone else will move it forward */
+ ilast = erts_atomic_read_rb(&q->tail.data.last);
+ return (ErtsThrQElement_t *) ilast;
+ }
+ }
+ else {
+ if (erts_atomic_read_nob(&this->next.atmc) != ERTS_AINT_NULL) {
+ /* Someone else will move it forward */
+ return NULL;
+ }
+ }
+ itmp = erts_atomic_cmpxchg_mb(&q->tail.data.last,
+ (erts_aint_t) this,
+ ilast);
+ if (ilast == itmp)
+ return want_last ? this : NULL;
+ ilast = itmp;
+ }
+}
+
+static ErtsThrQCleanState_t
+clean(ErtsThrQ_t *q, int max_ops, int do_notify)
+{
+ erts_aint_t ilast;
+ int um_refc_ix;
+ int ops;
+
+ for (ops = 0; ops < max_ops; ops++) {
+ ErtsThrQElement_t *tmp;
+ restart:
+ ASSERT(q->head.first);
+ if (q->head.first == q->head.head.ptr) {
+ q->head.clean_reached_head_count++;
+ if (q->head.clean_reached_head_count
+ >= ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT) {
+ q->head.clean_reached_head_count = 0;
+ break;
+ }
+ goto inspect_head;
+ }
+ if (q->head.first == q->head.unref_end)
+ break;
+ if (q->head.first == &q->tail.data.marker) {
+ q->head.used_marker = 0;
+ q->head.first = q->head.first->next.ptr;
+ goto restart;
+ }
+ tmp = q->head.first;
+ q->head.first = q->head.first->next.ptr;
+ if (q->head.deq_fini.automatic)
+ element_free(q, tmp);
+ else {
+ tmp->data.ptr = (void *) (UWord) q->head.live;
+ if (!q->head.deq_fini.start)
+ q->head.deq_fini.start = tmp;
+ else if (q->head.deq_fini.end->next.ptr == &q->tail.data.marker)
+ q->head.deq_fini.end->next.ptr = tmp;
+ q->head.deq_fini.end = tmp;
+ }
+ }
+
+ ilast = erts_atomic_read_nob(&q->tail.data.last);
+ if (q->head.first == ((ErtsThrQElement_t *) ilast)
+ && ((ErtsThrQElement_t *) ilast) == &q->tail.data.marker
+ && q->head.first == &q->tail.data.marker) {
+ /* Empty and clean queue */
+ if (q->q.finalizing)
+ destroy(q);
+ return ERTS_THR_Q_CLEAN;
+ }
+
+#ifdef ERTS_SMP
+ if (q->head.next.thr_progress_reached
+ || erts_thr_progress_has_reached(q->head.next.thr_progress)) {
+ q->head.next.thr_progress_reached = 1;
+#endif
+ um_refc_ix = q->head.next.um_refc_ix;
+ if (erts_atomic_read_acqb(&q->tail.data.um_refc[um_refc_ix]) == 0) {
+ /* Move unreferenced end pointer forward... */
+ q->head.clean_reached_head_count = 0;
+ q->head.unref_end = q->head.next.unref_end;
+
+ if (!q->head.used_marker
+ && q->head.unref_end == (ErtsThrQElement_t *) ilast) {
+ q->head.used_marker = 1;
+ ilast = (erts_aint_t) enqueue_managed(q,
+ &q->tail.data.marker,
+ 1);
+ if (q->head.head.ptr == q->head.unref_end) {
+ ErtsThrQElement_t *next;
+ next = ((ErtsThrQElement_t *)
+ erts_atomic_read_acqb(&q->head.head.ptr->next.atmc));
+ if (next == &q->tail.data.marker) {
+ q->head.head.ptr->next.ptr = &q->tail.data.marker;
+ q->head.head.ptr = &q->tail.data.marker;
+ }
+ }
+ }
+
+ if (q->head.unref_end == (ErtsThrQElement_t *) ilast)
+ ERTS_THR_MEMORY_BARRIER;
+ else {
+ q->head.next.unref_end = (ErtsThrQElement_t *) ilast;
+ ERTS_THR_MEMORY_BARRIER;
+#ifdef ERTS_SMP
+ q->head.next.thr_progress = erts_thr_progress_later();
+#endif
+ erts_atomic32_set_relb(&q->tail.data.um_refc_ix,
+ um_refc_ix);
+ q->head.next.um_refc_ix = um_refc_ix == 0 ? 1 : 0;
+#ifdef ERTS_SMP
+ q->head.next.thr_progress_reached = 0;
+#endif
+ }
+ }
+#ifdef ERTS_SMP
+ }
+#endif
+
+ if (q->head.first == q->head.head.ptr) {
+ inspect_head:
+ if (!q->head.used_marker) {
+ erts_aint_t inext;
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext == ERTS_AINT_NULL) {
+ q->head.used_marker = 1;
+ (void) enqueue_managed(q, &q->tail.data.marker, 0);
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext == (erts_aint_t) &q->tail.data.marker) {
+ q->head.head.ptr->next.ptr = &q->tail.data.marker;
+ q->head.head.ptr = &q->tail.data.marker;
+#ifdef ERTS_SMP
+ if (!q->head.next.thr_progress_reached)
+ return ERTS_THR_Q_NEED_THR_PRGR;
+#else
+ if (do_notify)
+ q->head.notify(q->head.arg);
+#endif
+ return ERTS_THR_Q_DIRTY;
+ }
+ }
+ }
+ return ERTS_THR_Q_CLEAN;
+ }
+
+ if (q->head.first != q->head.unref_end) {
+ if (do_notify)
+ q->head.notify(q->head.arg);
+ return ERTS_THR_Q_DIRTY;
+ }
+
+#ifdef ERTS_SMP
+ if (!q->head.next.thr_progress_reached)
+ return ERTS_THR_Q_NEED_THR_PRGR;
+#endif
+
+ return ERTS_THR_Q_CLEAN; /* Waiting for unmanaged threads to complete... */
+}
+
+#endif
+
+ErtsThrQCleanState_t
+erts_thr_q_clean(ErtsThrQ_t *q)
+{
+#ifdef USE_THREADS
+ return clean(q, ERTS_THR_Q_MAX_SCHED_CLEAN_OPS, 0);
+#else
+ return ERTS_THR_Q_CLEAN;
+#endif
+}
+
+ErtsThrQCleanState_t
+erts_thr_q_inspect(ErtsThrQ_t *q, int ensure_empty)
+{
+#ifdef USE_THREADS
+ if (ensure_empty) {
+ erts_aint_t inext;
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext != ERTS_AINT_NULL) {
+ if (&q->tail.data.marker != (ErtsThrQElement_t *) inext)
+ return ERTS_THR_Q_DIRTY;
+ else {
+ q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext;
+ q->head.head.ptr = (ErtsThrQElement_t *) inext;
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext != ERTS_AINT_NULL)
+ return ERTS_THR_Q_DIRTY;
+ }
+ }
+ }
+
+ if (q->head.first == q->head.head.ptr) {
+ if (!q->head.used_marker) {
+ erts_aint_t inext;
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext == ERTS_AINT_NULL)
+ return ERTS_THR_Q_DIRTY;
+ }
+ return ERTS_THR_Q_CLEAN;
+ }
+
+ if (q->head.first != q->head.unref_end)
+ return ERTS_THR_Q_DIRTY;
+
+#ifdef ERTS_SMP
+ if (!q->head.next.thr_progress_reached)
+ return ERTS_THR_Q_NEED_THR_PRGR;
+#endif
+#endif
+ return ERTS_THR_Q_CLEAN;
+}
+
+static void
+enqueue(ErtsThrQ_t *q, void *data, ErtsThrQElement_t *this)
+{
+#ifndef USE_THREADS
+ ASSERT(data);
+
+ this->next.ptr = NULL;
+ this->data.ptr = data;
+
+ if (q->last)
+ q->last->next.ptr = this;
+ else {
+ q->first = q->last = this;
+ q->init.notify(q->init.arg);
+ }
+#else
+ int notify;
+ int um_refc_ix = 0;
+#ifdef ERTS_SMP
+ int unmanaged_thread;
+#endif
+
+#if ERTS_THR_Q_DBG_CHK_DATA
+ if (!data)
+ erl_exit(ERTS_ABORT_EXIT, "Missing data in enqueue\n");
+#endif
+
+ ASSERT(!q->q.finalizing);
+
+ this->data.ptr = data;
+
+#ifdef ERTS_SMP
+ unmanaged_thread = !erts_thr_progress_is_managed_thread();
+ if (unmanaged_thread)
+#endif
+ {
+ um_refc_ix = erts_atomic32_read_acqb(&q->tail.data.um_refc_ix);
+ while (1) {
+ int tmp_um_refc_ix;
+ erts_atomic_inc_acqb(&q->tail.data.um_refc[um_refc_ix]);
+ tmp_um_refc_ix = erts_atomic32_read_acqb(&q->tail.data.um_refc_ix);
+ if (tmp_um_refc_ix == um_refc_ix)
+ break;
+ erts_atomic_dec_relb(&q->tail.data.um_refc[um_refc_ix]);
+ um_refc_ix = tmp_um_refc_ix;
+ }
+ }
+
+ notify = this == enqueue_managed(q, this, 1);
+
+
+#ifdef ERTS_SMP
+ if (unmanaged_thread)
+#endif
+ {
+ if (notify)
+ erts_atomic_dec_relb(&q->tail.data.um_refc[um_refc_ix]);
+ else if (erts_atomic_dec_read_relb(&q->tail.data.um_refc[um_refc_ix]) == 0)
+ notify = 1;
+ }
+ if (notify)
+ q->tail.data.notify(q->tail.data.arg);
+#endif
+}
+
+void
+erts_thr_q_enqueue(ErtsThrQ_t *q, void *data)
+{
+ enqueue(q, data, element_alloc(q));
+}
+
+ErtsThrQPrepEnQ_t *
+erts_thr_q_prepare_enqueue(ErtsThrQ_t *q)
+{
+ return (ErtsThrQPrepEnQ_t *) element_alloc(q);
+}
+
+int
+erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *q, ErtsThrQFinDeQ_t *fdp)
+{
+#ifndef USE_THREADS
+ return 0;
+#else
+#ifdef DEBUG
+ if (!q->head.deq_fini.start) {
+ ASSERT(!q->head.deq_fini.end);
+ }
+ else {
+ ErtsThrQElement_t *e = q->head.deq_fini.start;
+ ErtsThrQElement_t *end = q->head.deq_fini.end;
+ while (e != end) {
+ ASSERT(q->head.head.ptr != e);
+ ASSERT(q->head.first != e);
+ ASSERT(q->head.unref_end != e);
+ e = e->next.ptr;
+ }
+ }
+#endif
+ fdp->start = q->head.deq_fini.start;
+ fdp->end = q->head.deq_fini.end;
+ if (fdp->end)
+ fdp->end->next.ptr = NULL;
+ q->head.deq_fini.start = NULL;
+ q->head.deq_fini.end = NULL;
+ return fdp->start != NULL;
+#endif
+}
+
+void
+erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *fdp0,
+ ErtsThrQFinDeQ_t *fdp1)
+{
+#ifdef USE_THREADS
+ if (fdp1->start) {
+ if (fdp0->end)
+ fdp0->end->next.ptr = fdp1->start;
+ else
+ fdp0->start = fdp1->start;
+ fdp0->end = fdp1->end;
+ }
+#endif
+}
+
+
+int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *state)
+{
+#ifdef USE_THREADS
+ ErtsThrQElement_t *start = state->start;
+ if (start) {
+ ErtsThrQLive_t live;
+ int i;
+ for (i = 0; i < ERTS_THR_Q_MAX_FINI_DEQ_OPS; i++) {
+ ErtsThrQElement_t *tmp;
+ if (!start)
+ break;
+ tmp = start;
+ start = start->next.ptr;
+ live = (ErtsThrQLive_t) (UWord) tmp->data.ptr;
+ element_live_free(live, tmp);
+ }
+ state->start = start;
+ if (start)
+ return 1; /* More to do */
+ state->end = NULL;
+ }
+#endif
+ return 0;
+}
+
+void
+erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *state)
+{
+#ifdef USE_THREADS
+ state->start = NULL;
+ state->end = NULL;
+#endif
+}
+
+
+void
+erts_thr_q_enqueue_prepared(ErtsThrQ_t *q, void *data, ErtsThrQPrepEnQ_t *prep)
+{
+ ASSERT(prep);
+ enqueue(q, data, (ErtsThrQElement_t *) prep);
+}
+
+void *
+erts_thr_q_dequeue(ErtsThrQ_t *q)
+{
+#ifndef USE_THREADS
+ void *res;
+ ErtsThrQElement_t *tmp;
+
+ if (!q->first)
+ return NULL;
+ tmp = q->first;
+ res = tmp->data.ptr;
+ q->first = tmp->next.ptr;
+ if (!q->first)
+ q->last = NULL;
+
+ element_free(q, tmp);
+
+ return res;
+#else
+ erts_aint_t inext;
+ void *res;
+
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext == ERTS_AINT_NULL)
+ return NULL;
+ q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext;
+ q->head.head.ptr = (ErtsThrQElement_t *) inext;
+ if (q->head.head.ptr == &q->tail.data.marker) {
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext == ERTS_AINT_NULL)
+ return NULL;
+ q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext;
+ q->head.head.ptr = (ErtsThrQElement_t *) inext;
+ }
+ res = q->head.head.ptr->data.ptr;
+#if ERTS_THR_Q_DBG_CHK_DATA
+ q->head.head.ptr->data.ptr = NULL;
+ if (!res)
+ erl_exit(ERTS_ABORT_EXIT, "Missing data in dequeue\n");
+#endif
+ clean(q,
+ (q->head.deq_fini.automatic
+ ? ERTS_THR_Q_MAX_DEQUEUE_CLEAN_OPS
+ : ERTS_THR_Q_MAX_SCHED_CLEAN_OPS), 1);
+ return res;
+#endif
+}
diff --git a/erts/emulator/beam/erl_thr_queue.h b/erts/emulator/beam/erl_thr_queue.h
new file mode 100644
index 0000000000..407c23f5eb
--- /dev/null
+++ b/erts/emulator/beam/erl_thr_queue.h
@@ -0,0 +1,211 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2011. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+/*
+ * Description: Lock-free queue for communication between threads.
+ *
+ * Currently only a many-to-one version has been,
+ * implemented, i.e., many threads can enqueue but
+ * only one thread can dequeue at a time. It doesn't
+ * have to be the same thread dequeuing every time, but
+ * synchronization so that only one thread dequeues
+ * at a time has to be provided by other means.
+ *
+ * When/If the need for a many-to-many queue arises,
+ * this implementation can relatively easy be extended
+ * to support that too.
+ *
+ * Usage instructions can be found in erts_thr_queue.c
+ *
+ * Author: Rickard Green
+ */
+
+#ifndef ERL_THR_QUEUE_H__
+#define ERL_THR_QUEUE_H__
+
+#include "sys.h"
+#include "erl_threads.h"
+#include "erl_alloc.h"
+#include "erl_thr_progress.h"
+
+typedef enum {
+ ERTS_THR_Q_LIVE_UNDEF,
+ ERTS_THR_Q_LIVE_SHORT,
+ ERTS_THR_Q_LIVE_LONG
+} ErtsThrQLive_t;
+
+#define ERTS_THR_Q_INIT_DEFAULT \
+{ \
+ { \
+ ERTS_THR_Q_LIVE_UNDEF, \
+ ERTS_THR_Q_LIVE_SHORT \
+ }, \
+ NULL, \
+ NULL, \
+ 1 \
+}
+
+typedef struct ErtsThrQ_t_ ErtsThrQ_t;
+
+typedef struct {
+ struct {
+ ErtsThrQLive_t queue;
+ ErtsThrQLive_t objects;
+ } live;
+ void *arg;
+ void (*notify)(void *);
+ int auto_finalize_dequeue;
+} ErtsThrQInit_t;
+
+typedef struct ErtsThrQElement_t_ ErtsThrQElement_t;
+typedef struct ErtsThrQElement_t ErtsThrQPrepEnQ_t;
+
+typedef union {
+ erts_atomic_t atmc;
+ ErtsThrQElement_t *ptr;
+} ErtsThrQPtr_t;
+
+struct ErtsThrQElement_t_ {
+ ErtsThrQPtr_t next;
+ union {
+ erts_atomic_t atmc;
+ void *ptr;
+ } data;
+};
+
+typedef struct {
+ ErtsThrQElement_t *start;
+ ErtsThrQElement_t *end;
+} ErtsThrQFinDeQ_t;
+
+typedef enum {
+ ERTS_THR_Q_CLEAN,
+#ifdef ERTS_SMP
+ ERTS_THR_Q_NEED_THR_PRGR,
+#endif
+ ERTS_THR_Q_DIRTY,
+} ErtsThrQCleanState_t;
+
+#ifdef USE_THREADS
+
+typedef struct {
+ ErtsThrQElement_t marker;
+ erts_atomic_t last;
+ erts_atomic_t um_refc[2];
+ erts_atomic32_t um_refc_ix;
+ ErtsThrQLive_t live;
+#ifdef ERTS_SMP
+ erts_atomic32_t thr_prgr_clean_scheduled;
+#endif
+ void *arg;
+ void (*notify)(void *);
+} ErtsThrQTail_t;
+
+struct ErtsThrQ_t_ {
+ /*
+ * This structure needs to be cache line aligned for best
+ * performance.
+ */
+ union {
+ /* Modified by threads enqueuing */
+ ErtsThrQTail_t data;
+ char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQTail_t))];
+ } tail;
+ /*
+ * Everything below this point is *only* accessed by the
+ * thread dequeuing.
+ */
+ struct {
+ ErtsThrQPtr_t head;
+ ErtsThrQLive_t live;
+ ErtsThrQElement_t *first;
+ ErtsThrQElement_t *unref_end;
+ int clean_reached_head_count;
+ struct {
+ int automatic;
+ ErtsThrQElement_t *start;
+ ErtsThrQElement_t *end;
+ } deq_fini;
+ struct {
+#ifdef ERTS_SMP
+ ErtsThrPrgrVal thr_progress;
+ int thr_progress_reached;
+#endif
+ int um_refc_ix;
+ ErtsThrQElement_t *unref_end;
+ } next;
+ int used_marker;
+ void *arg;
+ void (*notify)(void *);
+ } head;
+ struct {
+ int finalizing;
+ ErtsThrQLive_t live;
+ void *blk;
+ } q;
+};
+
+#else /* !USE_THREADS */
+
+struct ErtsThrQ_t_ {
+ ErtsThrQInit_t init;
+ ErtsThrQElement_t *first;
+ ErtsThrQElement_t *last;
+ struct {
+ void *blk;
+ } q;
+};
+
+#endif
+
+void erts_thr_q_init(void);
+void erts_thr_q_initialize(ErtsThrQ_t *, ErtsThrQInit_t *);
+ErtsThrQCleanState_t erts_thr_q_finalize(ErtsThrQ_t *);
+ErtsThrQ_t *erts_thr_q_create(ErtsThrQInit_t *);
+ErtsThrQCleanState_t erts_thr_q_destroy(ErtsThrQ_t *);
+ErtsThrQCleanState_t erts_thr_q_clean(ErtsThrQ_t *);
+ErtsThrQCleanState_t erts_thr_q_inspect(ErtsThrQ_t *, int);
+ErtsThrQPrepEnQ_t *erts_thr_q_prepare_enqueue(ErtsThrQ_t *);
+void erts_thr_q_enqueue_prepared(ErtsThrQ_t *, void *, ErtsThrQPrepEnQ_t *);
+void erts_thr_q_enqueue(ErtsThrQ_t *, void *);
+void * erts_thr_q_dequeue(ErtsThrQ_t *);
+int erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *,
+ ErtsThrQFinDeQ_t *);
+void erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *,
+ ErtsThrQFinDeQ_t *);
+int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *);
+void erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *);
+
+#ifdef ERTS_SMP
+ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_q_need_thr_progress(ErtsThrQ_t *q);
+#endif
+
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+#ifdef ERTS_SMP
+ERTS_GLB_INLINE ErtsThrPrgrVal
+erts_thr_q_need_thr_progress(ErtsThrQ_t *q)
+{
+ return q->head.next.thr_progress;
+}
+#endif
+
+#endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */
+
+#endif /* ERL_THR_QUEUE_H__ */
diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c
index cc377b543d..aa86f4590d 100644
--- a/erts/emulator/beam/utils.c
+++ b/erts/emulator/beam/utils.c
@@ -43,6 +43,7 @@
#include "erl_smp.h"
#include "erl_time.h"
#include "erl_thr_progress.h"
+#include "erl_thr_queue.h"
#include "erl_sched_spec_pre_alloc.h"
#undef M_TRIM_THRESHOLD