diff options
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r-- | erts/emulator/beam/erl_alloc.c | 5 | ||||
-rw-r--r-- | erts/emulator/beam/erl_alloc.types | 22 | ||||
-rw-r--r-- | erts/emulator/beam/erl_init.c | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_thr_queue.c | 745 | ||||
-rw-r--r-- | erts/emulator/beam/erl_thr_queue.h | 211 | ||||
-rw-r--r-- | erts/emulator/beam/utils.c | 1 |
6 files changed, 986 insertions, 0 deletions
diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c index 705ace26fa..cce4b4adf0 100644 --- a/erts/emulator/beam/erl_alloc.c +++ b/erts/emulator/beam/erl_alloc.c @@ -41,6 +41,7 @@ #include "erl_monitors.h" #include "erl_bif_timer.h" #include "erl_cpu_topology.h" +#include "erl_thr_queue.h" #if defined(ERTS_ALC_T_DRV_SEL_D_STATE) || defined(ERTS_ALC_T_DRV_EV_D_STATE) #include "erl_check_io.h" #endif @@ -524,6 +525,10 @@ erts_alloc_init(int *argc, char **argv, ErtsAllocInitOpts *eaiop) = sizeof(ErtsDrvSelectDataState); fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_MSG_REF)] = sizeof(ErlMessage); +#ifdef ERTS_SMP + fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_THR_Q_EL_SL)] + = sizeof(ErtsThrQElement_t); +#endif #ifdef HARD_DEBUG hdbg_init(); #endif diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 79d3433fc0..4efad0197b 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -50,6 +50,15 @@ # command line argument to make_alloc_types. The variable X is false # after a "+disable X" statement or if it has never been mentioned. ++if smp ++disable threads_no_smp ++else ++if threads ++enable threads_no_smp ++else ++disable threads_no_smp ++endif ++endif # --- Allocator declarations ------------------------------------------------- # @@ -254,6 +263,19 @@ type ZLIB STANDARD SYSTEM zlib type CPU_GRPS_MAP LONG_LIVED SYSTEM cpu_groups_map type AUX_WORK_TMO LONG_LIVED SYSTEM aux_work_timeouts ++if threads_no_smp +# Need thread safe allocs, but std_alloc and fix_alloc are not; +# use driver_alloc which is... +type THR_Q_EL DRIVER SYSTEM thr_q_element +type THR_Q_EL_SL DRIVER SYSTEM sl_thr_q_element ++else +type THR_Q_EL STANDARD SYSTEM thr_q_element +type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element ++endif +type THR_Q STANDARD SYSTEM thr_queue +type THR_Q_SL SHORT_LIVED SYSTEM short_lived_thr_queue +type THR_Q_LL LONG_LIVED SYSTEM long_lived_thr_queue + +if smp type ASYNC SHORT_LIVED SYSTEM async +else diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index f4f2a4d011..5fe44afdce 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -43,6 +43,7 @@ #include "packet_parser.h" #include "erl_cpu_topology.h" #include "erl_thr_progress.h" +#include "erl_thr_queue.h" #ifdef HIPE #include "hipe_mode_switch.h" /* for hipe_mode_switch_init() */ @@ -786,6 +787,7 @@ early_init(int *argc, char **argv) /* */ erts_thr_progress_init(no_schedulers, no_schedulers+1, 0); #endif + erts_thr_q_init(); erts_init_utils(); erts_early_init_cpu_topology(no_schedulers, &max_main_threads, diff --git a/erts/emulator/beam/erl_thr_queue.c b/erts/emulator/beam/erl_thr_queue.c new file mode 100644 index 0000000000..9ac4cd4b8e --- /dev/null +++ b/erts/emulator/beam/erl_thr_queue.c @@ -0,0 +1,745 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +/* + * Description: Lock-free queue for communication between threads. + * + * Currently only a many-to-one version has been, + * implemented, i.e., many threads can enqueue but + * only one thread can dequeue at a time. It doesn't + * have to be the same thread dequeuing every time, but + * synchronization so that only one thread dequeues + * at a time has to be provided by other means. + * + * When/If the need for a many-to-many queue arises, + * this implementation can relatively easy be extended + * to support that too. + * + * Usage instructions below. + * + * Author: Rickard Green + */ + +/* + * ------ Usage instructions ----------------------------------------------- + * + * Dequeuing generates garbage that needs to be cleaned up. + * erts_thr_q_dequeue() automatically cleans, but garbage may have to be + * cleaned up also when the queue is empty. This is done by calling + * erts_thr_q_clean(). In the SMP case thread progress may have to be made + * before cleaning can continue. If so, erts_thr_q_need_thr_progress() in + * combination with erts_thr_progress_wakeup() can be used in order to + * request a wakeup at appropriate time. + * + * Enqueuing implies memory allocation and dequeuing implies memory + * deallocation. Memory allocation can be moved to another more suitable + * thread using erts_thr_q_prepare_enqueue() together with + * erts_thr_q_enqueue_prepared() instead of using erts_thr_q_enqueue(). + * Memory deallocation can can be moved to another more suitable thread by + * disabling auto_finalize_dequeue when initializing the queue and then use + * erts_thr_q_get_finalize_dequeue_data() together + * erts_thr_q_finalize_dequeue() after dequeuing or cleaning. + * + * Ending the life of the queue using either erts_thr_q_destroy() + * or erts_thr_q_finalize() impies cleaning the queue. Both functions + * return the cleaning result and may have to be called multiple times + * until the queue is clean. Once one of these functions have been called + * enqueuing is not allowed. This has to be synchronized by the user. + * If auto_finalize_dequeue has been disabled, the finalize dequeue + * functionality has to be called after ending the life of the queue just + * as when dequeuing or cleaning on a queue that is alive. + * + * ------------------------------------------------------------------------- + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "erl_thr_queue.h" + +#if defined(DEBUG) +#define ERTS_THR_Q_DBG_CHK_DATA 1 +#else +#define ERTS_THR_Q_DBG_CHK_DATA 0 +#endif + +#define ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT 100 +#define ERTS_THR_Q_MAX_SCHED_CLEAN_OPS 50 +#define ERTS_THR_Q_MAX_DEQUEUE_CLEAN_OPS 3 + +#define ERTS_THR_Q_MAX_FINI_DEQ_OPS 50 + +#ifdef ERTS_SMP +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(sl_element, + ErtsThrQElement_t, + 1000, + ERTS_ALC_T_THR_Q_EL_SL) +#else + +static void +init_sl_element_alloc(void) +{ +} + +static ErtsThrQElement_t * +sl_element_alloc(void) +{ + return erts_alloc(ERTS_ALC_T_THR_Q_EL_SL, + sizeof(ErtsThrQElement_t)); +} + +static void +sl_element_free(ErtsThrQElement_t *p) +{ + erts_free(ERTS_ALC_T_THR_Q_EL_SL, p); +} + +#endif + +typedef union { + ErtsThrQ_t q; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))]; +} ErtsAlignedThrQ_t; + +void +erts_thr_q_init(void) +{ + init_sl_element_alloc(); +} + +static void noop_callback(void *arg) { } + +void +erts_thr_q_initialize(ErtsThrQ_t *q, ErtsThrQInit_t *qi) +{ +#ifndef USE_THREADS + q->init = *qi; + if (!q->init.notify) + q->init.notify = noop_callback; + q->first = NULL; + q->last = NULL; + q->q.blk = NULL; +#else + erts_atomic_init_nob(&q->tail.data.marker.next.atmc, ERTS_AINT_NULL); + q->tail.data.marker.data.ptr = NULL; + erts_atomic_init_nob(&q->tail.data.last, + (erts_aint_t) &q->tail.data.marker); + erts_atomic_init_nob(&q->tail.data.um_refc[0], 0); + erts_atomic_init_nob(&q->tail.data.um_refc[1], 0); + erts_atomic32_init_nob(&q->tail.data.um_refc_ix, 0); + q->tail.data.live = qi->live.objects; + q->tail.data.arg = qi->arg; + q->tail.data.notify = qi->notify; + if (!q->tail.data.notify) + q->tail.data.notify = noop_callback; + + q->head.head.ptr = &q->tail.data.marker; + q->head.live = qi->live.objects; + q->head.first = &q->tail.data.marker; + q->head.unref_end = &q->tail.data.marker; + q->head.clean_reached_head_count = 0; + q->head.deq_fini.automatic = qi->auto_finalize_dequeue; + q->head.deq_fini.start = NULL; + q->head.deq_fini.end = NULL; +#ifdef ERTS_SMP + q->head.next.thr_progress = erts_thr_progress_current(); + q->head.next.thr_progress_reached = 1; +#endif + q->head.next.um_refc_ix = 1; + q->head.next.unref_end = &q->tail.data.marker; + q->head.used_marker = 1; + q->head.arg = qi->arg; + q->head.notify = q->tail.data.notify; + q->q.finalizing = 0; + q->q.live = qi->live.queue; + q->q.blk = NULL; +#endif +} + +ErtsThrQCleanState_t +erts_thr_q_finalize(ErtsThrQ_t *q) +{ +#ifdef USE_THREADS + q->q.finalizing = 1; +#endif + while (erts_thr_q_dequeue(q)); + return erts_thr_q_clean(q); +} + +ErtsThrQ_t * +erts_thr_q_create(ErtsThrQInit_t *qi) +{ + ErtsAlcType_t atype; + ErtsThrQ_t *q, *qblk; + UWord qw; + + switch (qi->live.queue) { + case ERTS_THR_Q_LIVE_SHORT: + atype = ERTS_ALC_T_THR_Q_SL; + break; + case ERTS_THR_Q_LIVE_LONG: + atype = ERTS_ALC_T_THR_Q_LL; + break; + default: + atype = ERTS_ALC_T_THR_Q; + break; + } + + qw = (UWord) erts_alloc(atype, + sizeof(ErtsThrQ_t) + (ERTS_CACHE_LINE_SIZE-1)); + qblk = (ErtsThrQ_t *) qw; + if (qw & ERTS_CACHE_LINE_MASK) + qw = (qw & ~ERTS_CACHE_LINE_MASK) + ERTS_CACHE_LINE_SIZE; + ASSERT((qw & ERTS_CACHE_LINE_MASK) == 0); + q = (ErtsThrQ_t *) qw; + erts_thr_q_initialize(q, qi); + q->q.blk = qblk; + return q; +} + +ErtsThrQCleanState_t +erts_thr_q_destroy(ErtsThrQ_t *q) +{ + if (!q->q.blk) + erl_exit(ERTS_ABORT_EXIT, + "Trying to destroy not created thread queue\n"); + return erts_thr_q_finalize(q); +} + +#ifdef USE_THREADS + +static void +destroy(ErtsThrQ_t *q) +{ + ErtsAlcType_t atype; + switch (q->q.live) { + case ERTS_THR_Q_LIVE_SHORT: + atype = ERTS_ALC_T_THR_Q_SL; + break; + case ERTS_THR_Q_LIVE_LONG: + atype = ERTS_ALC_T_THR_Q_LL; + break; + default: + atype = ERTS_ALC_T_THR_Q; + break; + } + erts_free(atype, q->q.blk); +} + +#endif + +static ERTS_INLINE ErtsThrQElement_t * +element_live_alloc(ErtsThrQLive_t live) +{ + switch (live) { + case ERTS_THR_Q_LIVE_SHORT: + return sl_element_alloc(); + default: + return (ErtsThrQElement_t *) erts_alloc(ERTS_ALC_T_THR_Q_EL, + sizeof(ErtsThrQElement_t)); + } +} + +static ERTS_INLINE ErtsThrQElement_t * +element_alloc(ErtsThrQ_t *q) +{ + ErtsThrQLive_t live; +#ifdef USE_THREADS + live = q->tail.data.live; +#else + live = q->init.live.objects; +#endif + return element_live_alloc(live); +} + +static ERTS_INLINE void +element_live_free(ErtsThrQLive_t live, ErtsThrQElement_t *el) +{ + switch (live) { + case ERTS_THR_Q_LIVE_SHORT: + sl_element_free(el); + break; + default: + erts_free(ERTS_ALC_T_THR_Q_EL, el); + } +} + +static ERTS_INLINE void +element_free(ErtsThrQ_t *q, ErtsThrQElement_t *el) +{ + ErtsThrQLive_t live; +#ifdef USE_THREADS + live = q->head.live; +#else + live = q->init.live.objects; +#endif + element_live_free(live, el); +} + +#ifdef USE_THREADS + +static ERTS_INLINE ErtsThrQElement_t * +enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last) +{ + erts_aint_t ilast, itmp; + + erts_atomic_init_nob(&this->next.atmc, ERTS_AINT_NULL); + /* Enqueue at end of list... */ + + ilast = erts_atomic_read_nob(&q->tail.data.last); + while (1) { + ErtsThrQElement_t *last = (ErtsThrQElement_t *) ilast; + itmp = erts_atomic_cmpxchg_mb(&last->next.atmc, + (erts_aint_t) this, + ERTS_AINT_NULL); + if (itmp == ERTS_AINT_NULL) + break; + ilast = itmp; + } + + /* Move last pointer forward... */ + while (1) { + if (want_last) { + if (erts_atomic_read_rb(&this->next.atmc) != ERTS_AINT_NULL) { + /* Someone else will move it forward */ + ilast = erts_atomic_read_rb(&q->tail.data.last); + return (ErtsThrQElement_t *) ilast; + } + } + else { + if (erts_atomic_read_nob(&this->next.atmc) != ERTS_AINT_NULL) { + /* Someone else will move it forward */ + return NULL; + } + } + itmp = erts_atomic_cmpxchg_mb(&q->tail.data.last, + (erts_aint_t) this, + ilast); + if (ilast == itmp) + return want_last ? this : NULL; + ilast = itmp; + } +} + +static ErtsThrQCleanState_t +clean(ErtsThrQ_t *q, int max_ops, int do_notify) +{ + erts_aint_t ilast; + int um_refc_ix; + int ops; + + for (ops = 0; ops < max_ops; ops++) { + ErtsThrQElement_t *tmp; + restart: + ASSERT(q->head.first); + if (q->head.first == q->head.head.ptr) { + q->head.clean_reached_head_count++; + if (q->head.clean_reached_head_count + >= ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT) { + q->head.clean_reached_head_count = 0; + break; + } + goto inspect_head; + } + if (q->head.first == q->head.unref_end) + break; + if (q->head.first == &q->tail.data.marker) { + q->head.used_marker = 0; + q->head.first = q->head.first->next.ptr; + goto restart; + } + tmp = q->head.first; + q->head.first = q->head.first->next.ptr; + if (q->head.deq_fini.automatic) + element_free(q, tmp); + else { + tmp->data.ptr = (void *) (UWord) q->head.live; + if (!q->head.deq_fini.start) + q->head.deq_fini.start = tmp; + else if (q->head.deq_fini.end->next.ptr == &q->tail.data.marker) + q->head.deq_fini.end->next.ptr = tmp; + q->head.deq_fini.end = tmp; + } + } + + ilast = erts_atomic_read_nob(&q->tail.data.last); + if (q->head.first == ((ErtsThrQElement_t *) ilast) + && ((ErtsThrQElement_t *) ilast) == &q->tail.data.marker + && q->head.first == &q->tail.data.marker) { + /* Empty and clean queue */ + if (q->q.finalizing) + destroy(q); + return ERTS_THR_Q_CLEAN; + } + +#ifdef ERTS_SMP + if (q->head.next.thr_progress_reached + || erts_thr_progress_has_reached(q->head.next.thr_progress)) { + q->head.next.thr_progress_reached = 1; +#endif + um_refc_ix = q->head.next.um_refc_ix; + if (erts_atomic_read_acqb(&q->tail.data.um_refc[um_refc_ix]) == 0) { + /* Move unreferenced end pointer forward... */ + q->head.clean_reached_head_count = 0; + q->head.unref_end = q->head.next.unref_end; + + if (!q->head.used_marker + && q->head.unref_end == (ErtsThrQElement_t *) ilast) { + q->head.used_marker = 1; + ilast = (erts_aint_t) enqueue_managed(q, + &q->tail.data.marker, + 1); + if (q->head.head.ptr == q->head.unref_end) { + ErtsThrQElement_t *next; + next = ((ErtsThrQElement_t *) + erts_atomic_read_acqb(&q->head.head.ptr->next.atmc)); + if (next == &q->tail.data.marker) { + q->head.head.ptr->next.ptr = &q->tail.data.marker; + q->head.head.ptr = &q->tail.data.marker; + } + } + } + + if (q->head.unref_end == (ErtsThrQElement_t *) ilast) + ERTS_THR_MEMORY_BARRIER; + else { + q->head.next.unref_end = (ErtsThrQElement_t *) ilast; + ERTS_THR_MEMORY_BARRIER; +#ifdef ERTS_SMP + q->head.next.thr_progress = erts_thr_progress_later(); +#endif + erts_atomic32_set_relb(&q->tail.data.um_refc_ix, + um_refc_ix); + q->head.next.um_refc_ix = um_refc_ix == 0 ? 1 : 0; +#ifdef ERTS_SMP + q->head.next.thr_progress_reached = 0; +#endif + } + } +#ifdef ERTS_SMP + } +#endif + + if (q->head.first == q->head.head.ptr) { + inspect_head: + if (!q->head.used_marker) { + erts_aint_t inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) { + q->head.used_marker = 1; + (void) enqueue_managed(q, &q->tail.data.marker, 0); + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == (erts_aint_t) &q->tail.data.marker) { + q->head.head.ptr->next.ptr = &q->tail.data.marker; + q->head.head.ptr = &q->tail.data.marker; +#ifdef ERTS_SMP + if (!q->head.next.thr_progress_reached) + return ERTS_THR_Q_NEED_THR_PRGR; +#else + if (do_notify) + q->head.notify(q->head.arg); +#endif + return ERTS_THR_Q_DIRTY; + } + } + } + return ERTS_THR_Q_CLEAN; + } + + if (q->head.first != q->head.unref_end) { + if (do_notify) + q->head.notify(q->head.arg); + return ERTS_THR_Q_DIRTY; + } + +#ifdef ERTS_SMP + if (!q->head.next.thr_progress_reached) + return ERTS_THR_Q_NEED_THR_PRGR; +#endif + + return ERTS_THR_Q_CLEAN; /* Waiting for unmanaged threads to complete... */ +} + +#endif + +ErtsThrQCleanState_t +erts_thr_q_clean(ErtsThrQ_t *q) +{ +#ifdef USE_THREADS + return clean(q, ERTS_THR_Q_MAX_SCHED_CLEAN_OPS, 0); +#else + return ERTS_THR_Q_CLEAN; +#endif +} + +ErtsThrQCleanState_t +erts_thr_q_inspect(ErtsThrQ_t *q, int ensure_empty) +{ +#ifdef USE_THREADS + if (ensure_empty) { + erts_aint_t inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext != ERTS_AINT_NULL) { + if (&q->tail.data.marker != (ErtsThrQElement_t *) inext) + return ERTS_THR_Q_DIRTY; + else { + q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; + q->head.head.ptr = (ErtsThrQElement_t *) inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext != ERTS_AINT_NULL) + return ERTS_THR_Q_DIRTY; + } + } + } + + if (q->head.first == q->head.head.ptr) { + if (!q->head.used_marker) { + erts_aint_t inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) + return ERTS_THR_Q_DIRTY; + } + return ERTS_THR_Q_CLEAN; + } + + if (q->head.first != q->head.unref_end) + return ERTS_THR_Q_DIRTY; + +#ifdef ERTS_SMP + if (!q->head.next.thr_progress_reached) + return ERTS_THR_Q_NEED_THR_PRGR; +#endif +#endif + return ERTS_THR_Q_CLEAN; +} + +static void +enqueue(ErtsThrQ_t *q, void *data, ErtsThrQElement_t *this) +{ +#ifndef USE_THREADS + ASSERT(data); + + this->next.ptr = NULL; + this->data.ptr = data; + + if (q->last) + q->last->next.ptr = this; + else { + q->first = q->last = this; + q->init.notify(q->init.arg); + } +#else + int notify; + int um_refc_ix = 0; +#ifdef ERTS_SMP + int unmanaged_thread; +#endif + +#if ERTS_THR_Q_DBG_CHK_DATA + if (!data) + erl_exit(ERTS_ABORT_EXIT, "Missing data in enqueue\n"); +#endif + + ASSERT(!q->q.finalizing); + + this->data.ptr = data; + +#ifdef ERTS_SMP + unmanaged_thread = !erts_thr_progress_is_managed_thread(); + if (unmanaged_thread) +#endif + { + um_refc_ix = erts_atomic32_read_acqb(&q->tail.data.um_refc_ix); + while (1) { + int tmp_um_refc_ix; + erts_atomic_inc_acqb(&q->tail.data.um_refc[um_refc_ix]); + tmp_um_refc_ix = erts_atomic32_read_acqb(&q->tail.data.um_refc_ix); + if (tmp_um_refc_ix == um_refc_ix) + break; + erts_atomic_dec_relb(&q->tail.data.um_refc[um_refc_ix]); + um_refc_ix = tmp_um_refc_ix; + } + } + + notify = this == enqueue_managed(q, this, 1); + + +#ifdef ERTS_SMP + if (unmanaged_thread) +#endif + { + if (notify) + erts_atomic_dec_relb(&q->tail.data.um_refc[um_refc_ix]); + else if (erts_atomic_dec_read_relb(&q->tail.data.um_refc[um_refc_ix]) == 0) + notify = 1; + } + if (notify) + q->tail.data.notify(q->tail.data.arg); +#endif +} + +void +erts_thr_q_enqueue(ErtsThrQ_t *q, void *data) +{ + enqueue(q, data, element_alloc(q)); +} + +ErtsThrQPrepEnQ_t * +erts_thr_q_prepare_enqueue(ErtsThrQ_t *q) +{ + return (ErtsThrQPrepEnQ_t *) element_alloc(q); +} + +int +erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *q, ErtsThrQFinDeQ_t *fdp) +{ +#ifndef USE_THREADS + return 0; +#else +#ifdef DEBUG + if (!q->head.deq_fini.start) { + ASSERT(!q->head.deq_fini.end); + } + else { + ErtsThrQElement_t *e = q->head.deq_fini.start; + ErtsThrQElement_t *end = q->head.deq_fini.end; + while (e != end) { + ASSERT(q->head.head.ptr != e); + ASSERT(q->head.first != e); + ASSERT(q->head.unref_end != e); + e = e->next.ptr; + } + } +#endif + fdp->start = q->head.deq_fini.start; + fdp->end = q->head.deq_fini.end; + if (fdp->end) + fdp->end->next.ptr = NULL; + q->head.deq_fini.start = NULL; + q->head.deq_fini.end = NULL; + return fdp->start != NULL; +#endif +} + +void +erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *fdp0, + ErtsThrQFinDeQ_t *fdp1) +{ +#ifdef USE_THREADS + if (fdp1->start) { + if (fdp0->end) + fdp0->end->next.ptr = fdp1->start; + else + fdp0->start = fdp1->start; + fdp0->end = fdp1->end; + } +#endif +} + + +int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *state) +{ +#ifdef USE_THREADS + ErtsThrQElement_t *start = state->start; + if (start) { + ErtsThrQLive_t live; + int i; + for (i = 0; i < ERTS_THR_Q_MAX_FINI_DEQ_OPS; i++) { + ErtsThrQElement_t *tmp; + if (!start) + break; + tmp = start; + start = start->next.ptr; + live = (ErtsThrQLive_t) (UWord) tmp->data.ptr; + element_live_free(live, tmp); + } + state->start = start; + if (start) + return 1; /* More to do */ + state->end = NULL; + } +#endif + return 0; +} + +void +erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *state) +{ +#ifdef USE_THREADS + state->start = NULL; + state->end = NULL; +#endif +} + + +void +erts_thr_q_enqueue_prepared(ErtsThrQ_t *q, void *data, ErtsThrQPrepEnQ_t *prep) +{ + ASSERT(prep); + enqueue(q, data, (ErtsThrQElement_t *) prep); +} + +void * +erts_thr_q_dequeue(ErtsThrQ_t *q) +{ +#ifndef USE_THREADS + void *res; + ErtsThrQElement_t *tmp; + + if (!q->first) + return NULL; + tmp = q->first; + res = tmp->data.ptr; + q->first = tmp->next.ptr; + if (!q->first) + q->last = NULL; + + element_free(q, tmp); + + return res; +#else + erts_aint_t inext; + void *res; + + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) + return NULL; + q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; + q->head.head.ptr = (ErtsThrQElement_t *) inext; + if (q->head.head.ptr == &q->tail.data.marker) { + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) + return NULL; + q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; + q->head.head.ptr = (ErtsThrQElement_t *) inext; + } + res = q->head.head.ptr->data.ptr; +#if ERTS_THR_Q_DBG_CHK_DATA + q->head.head.ptr->data.ptr = NULL; + if (!res) + erl_exit(ERTS_ABORT_EXIT, "Missing data in dequeue\n"); +#endif + clean(q, + (q->head.deq_fini.automatic + ? ERTS_THR_Q_MAX_DEQUEUE_CLEAN_OPS + : ERTS_THR_Q_MAX_SCHED_CLEAN_OPS), 1); + return res; +#endif +} diff --git a/erts/emulator/beam/erl_thr_queue.h b/erts/emulator/beam/erl_thr_queue.h new file mode 100644 index 0000000000..407c23f5eb --- /dev/null +++ b/erts/emulator/beam/erl_thr_queue.h @@ -0,0 +1,211 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +/* + * Description: Lock-free queue for communication between threads. + * + * Currently only a many-to-one version has been, + * implemented, i.e., many threads can enqueue but + * only one thread can dequeue at a time. It doesn't + * have to be the same thread dequeuing every time, but + * synchronization so that only one thread dequeues + * at a time has to be provided by other means. + * + * When/If the need for a many-to-many queue arises, + * this implementation can relatively easy be extended + * to support that too. + * + * Usage instructions can be found in erts_thr_queue.c + * + * Author: Rickard Green + */ + +#ifndef ERL_THR_QUEUE_H__ +#define ERL_THR_QUEUE_H__ + +#include "sys.h" +#include "erl_threads.h" +#include "erl_alloc.h" +#include "erl_thr_progress.h" + +typedef enum { + ERTS_THR_Q_LIVE_UNDEF, + ERTS_THR_Q_LIVE_SHORT, + ERTS_THR_Q_LIVE_LONG +} ErtsThrQLive_t; + +#define ERTS_THR_Q_INIT_DEFAULT \ +{ \ + { \ + ERTS_THR_Q_LIVE_UNDEF, \ + ERTS_THR_Q_LIVE_SHORT \ + }, \ + NULL, \ + NULL, \ + 1 \ +} + +typedef struct ErtsThrQ_t_ ErtsThrQ_t; + +typedef struct { + struct { + ErtsThrQLive_t queue; + ErtsThrQLive_t objects; + } live; + void *arg; + void (*notify)(void *); + int auto_finalize_dequeue; +} ErtsThrQInit_t; + +typedef struct ErtsThrQElement_t_ ErtsThrQElement_t; +typedef struct ErtsThrQElement_t ErtsThrQPrepEnQ_t; + +typedef union { + erts_atomic_t atmc; + ErtsThrQElement_t *ptr; +} ErtsThrQPtr_t; + +struct ErtsThrQElement_t_ { + ErtsThrQPtr_t next; + union { + erts_atomic_t atmc; + void *ptr; + } data; +}; + +typedef struct { + ErtsThrQElement_t *start; + ErtsThrQElement_t *end; +} ErtsThrQFinDeQ_t; + +typedef enum { + ERTS_THR_Q_CLEAN, +#ifdef ERTS_SMP + ERTS_THR_Q_NEED_THR_PRGR, +#endif + ERTS_THR_Q_DIRTY, +} ErtsThrQCleanState_t; + +#ifdef USE_THREADS + +typedef struct { + ErtsThrQElement_t marker; + erts_atomic_t last; + erts_atomic_t um_refc[2]; + erts_atomic32_t um_refc_ix; + ErtsThrQLive_t live; +#ifdef ERTS_SMP + erts_atomic32_t thr_prgr_clean_scheduled; +#endif + void *arg; + void (*notify)(void *); +} ErtsThrQTail_t; + +struct ErtsThrQ_t_ { + /* + * This structure needs to be cache line aligned for best + * performance. + */ + union { + /* Modified by threads enqueuing */ + ErtsThrQTail_t data; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQTail_t))]; + } tail; + /* + * Everything below this point is *only* accessed by the + * thread dequeuing. + */ + struct { + ErtsThrQPtr_t head; + ErtsThrQLive_t live; + ErtsThrQElement_t *first; + ErtsThrQElement_t *unref_end; + int clean_reached_head_count; + struct { + int automatic; + ErtsThrQElement_t *start; + ErtsThrQElement_t *end; + } deq_fini; + struct { +#ifdef ERTS_SMP + ErtsThrPrgrVal thr_progress; + int thr_progress_reached; +#endif + int um_refc_ix; + ErtsThrQElement_t *unref_end; + } next; + int used_marker; + void *arg; + void (*notify)(void *); + } head; + struct { + int finalizing; + ErtsThrQLive_t live; + void *blk; + } q; +}; + +#else /* !USE_THREADS */ + +struct ErtsThrQ_t_ { + ErtsThrQInit_t init; + ErtsThrQElement_t *first; + ErtsThrQElement_t *last; + struct { + void *blk; + } q; +}; + +#endif + +void erts_thr_q_init(void); +void erts_thr_q_initialize(ErtsThrQ_t *, ErtsThrQInit_t *); +ErtsThrQCleanState_t erts_thr_q_finalize(ErtsThrQ_t *); +ErtsThrQ_t *erts_thr_q_create(ErtsThrQInit_t *); +ErtsThrQCleanState_t erts_thr_q_destroy(ErtsThrQ_t *); +ErtsThrQCleanState_t erts_thr_q_clean(ErtsThrQ_t *); +ErtsThrQCleanState_t erts_thr_q_inspect(ErtsThrQ_t *, int); +ErtsThrQPrepEnQ_t *erts_thr_q_prepare_enqueue(ErtsThrQ_t *); +void erts_thr_q_enqueue_prepared(ErtsThrQ_t *, void *, ErtsThrQPrepEnQ_t *); +void erts_thr_q_enqueue(ErtsThrQ_t *, void *); +void * erts_thr_q_dequeue(ErtsThrQ_t *); +int erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *, + ErtsThrQFinDeQ_t *); +void erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *, + ErtsThrQFinDeQ_t *); +int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *); +void erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *); + +#ifdef ERTS_SMP +ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_q_need_thr_progress(ErtsThrQ_t *q); +#endif + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +#ifdef ERTS_SMP +ERTS_GLB_INLINE ErtsThrPrgrVal +erts_thr_q_need_thr_progress(ErtsThrQ_t *q) +{ + return q->head.next.thr_progress; +} +#endif + +#endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */ + +#endif /* ERL_THR_QUEUE_H__ */ diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c index cc377b543d..aa86f4590d 100644 --- a/erts/emulator/beam/utils.c +++ b/erts/emulator/beam/utils.c @@ -43,6 +43,7 @@ #include "erl_smp.h" #include "erl_time.h" #include "erl_thr_progress.h" +#include "erl_thr_queue.h" #include "erl_sched_spec_pre_alloc.h" #undef M_TRIM_THRESHOLD |