diff options
Diffstat (limited to 'erts/emulator/beam/erl_thr_queue.c')
-rw-r--r-- | erts/emulator/beam/erl_thr_queue.c | 745 |
1 files changed, 745 insertions, 0 deletions
diff --git a/erts/emulator/beam/erl_thr_queue.c b/erts/emulator/beam/erl_thr_queue.c new file mode 100644 index 0000000000..9ac4cd4b8e --- /dev/null +++ b/erts/emulator/beam/erl_thr_queue.c @@ -0,0 +1,745 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +/* + * Description: Lock-free queue for communication between threads. + * + * Currently only a many-to-one version has been, + * implemented, i.e., many threads can enqueue but + * only one thread can dequeue at a time. It doesn't + * have to be the same thread dequeuing every time, but + * synchronization so that only one thread dequeues + * at a time has to be provided by other means. + * + * When/If the need for a many-to-many queue arises, + * this implementation can relatively easy be extended + * to support that too. + * + * Usage instructions below. + * + * Author: Rickard Green + */ + +/* + * ------ Usage instructions ----------------------------------------------- + * + * Dequeuing generates garbage that needs to be cleaned up. + * erts_thr_q_dequeue() automatically cleans, but garbage may have to be + * cleaned up also when the queue is empty. This is done by calling + * erts_thr_q_clean(). In the SMP case thread progress may have to be made + * before cleaning can continue. If so, erts_thr_q_need_thr_progress() in + * combination with erts_thr_progress_wakeup() can be used in order to + * request a wakeup at appropriate time. + * + * Enqueuing implies memory allocation and dequeuing implies memory + * deallocation. Memory allocation can be moved to another more suitable + * thread using erts_thr_q_prepare_enqueue() together with + * erts_thr_q_enqueue_prepared() instead of using erts_thr_q_enqueue(). + * Memory deallocation can can be moved to another more suitable thread by + * disabling auto_finalize_dequeue when initializing the queue and then use + * erts_thr_q_get_finalize_dequeue_data() together + * erts_thr_q_finalize_dequeue() after dequeuing or cleaning. + * + * Ending the life of the queue using either erts_thr_q_destroy() + * or erts_thr_q_finalize() impies cleaning the queue. Both functions + * return the cleaning result and may have to be called multiple times + * until the queue is clean. Once one of these functions have been called + * enqueuing is not allowed. This has to be synchronized by the user. + * If auto_finalize_dequeue has been disabled, the finalize dequeue + * functionality has to be called after ending the life of the queue just + * as when dequeuing or cleaning on a queue that is alive. + * + * ------------------------------------------------------------------------- + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "erl_thr_queue.h" + +#if defined(DEBUG) +#define ERTS_THR_Q_DBG_CHK_DATA 1 +#else +#define ERTS_THR_Q_DBG_CHK_DATA 0 +#endif + +#define ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT 100 +#define ERTS_THR_Q_MAX_SCHED_CLEAN_OPS 50 +#define ERTS_THR_Q_MAX_DEQUEUE_CLEAN_OPS 3 + +#define ERTS_THR_Q_MAX_FINI_DEQ_OPS 50 + +#ifdef ERTS_SMP +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(sl_element, + ErtsThrQElement_t, + 1000, + ERTS_ALC_T_THR_Q_EL_SL) +#else + +static void +init_sl_element_alloc(void) +{ +} + +static ErtsThrQElement_t * +sl_element_alloc(void) +{ + return erts_alloc(ERTS_ALC_T_THR_Q_EL_SL, + sizeof(ErtsThrQElement_t)); +} + +static void +sl_element_free(ErtsThrQElement_t *p) +{ + erts_free(ERTS_ALC_T_THR_Q_EL_SL, p); +} + +#endif + +typedef union { + ErtsThrQ_t q; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))]; +} ErtsAlignedThrQ_t; + +void +erts_thr_q_init(void) +{ + init_sl_element_alloc(); +} + +static void noop_callback(void *arg) { } + +void +erts_thr_q_initialize(ErtsThrQ_t *q, ErtsThrQInit_t *qi) +{ +#ifndef USE_THREADS + q->init = *qi; + if (!q->init.notify) + q->init.notify = noop_callback; + q->first = NULL; + q->last = NULL; + q->q.blk = NULL; +#else + erts_atomic_init_nob(&q->tail.data.marker.next.atmc, ERTS_AINT_NULL); + q->tail.data.marker.data.ptr = NULL; + erts_atomic_init_nob(&q->tail.data.last, + (erts_aint_t) &q->tail.data.marker); + erts_atomic_init_nob(&q->tail.data.um_refc[0], 0); + erts_atomic_init_nob(&q->tail.data.um_refc[1], 0); + erts_atomic32_init_nob(&q->tail.data.um_refc_ix, 0); + q->tail.data.live = qi->live.objects; + q->tail.data.arg = qi->arg; + q->tail.data.notify = qi->notify; + if (!q->tail.data.notify) + q->tail.data.notify = noop_callback; + + q->head.head.ptr = &q->tail.data.marker; + q->head.live = qi->live.objects; + q->head.first = &q->tail.data.marker; + q->head.unref_end = &q->tail.data.marker; + q->head.clean_reached_head_count = 0; + q->head.deq_fini.automatic = qi->auto_finalize_dequeue; + q->head.deq_fini.start = NULL; + q->head.deq_fini.end = NULL; +#ifdef ERTS_SMP + q->head.next.thr_progress = erts_thr_progress_current(); + q->head.next.thr_progress_reached = 1; +#endif + q->head.next.um_refc_ix = 1; + q->head.next.unref_end = &q->tail.data.marker; + q->head.used_marker = 1; + q->head.arg = qi->arg; + q->head.notify = q->tail.data.notify; + q->q.finalizing = 0; + q->q.live = qi->live.queue; + q->q.blk = NULL; +#endif +} + +ErtsThrQCleanState_t +erts_thr_q_finalize(ErtsThrQ_t *q) +{ +#ifdef USE_THREADS + q->q.finalizing = 1; +#endif + while (erts_thr_q_dequeue(q)); + return erts_thr_q_clean(q); +} + +ErtsThrQ_t * +erts_thr_q_create(ErtsThrQInit_t *qi) +{ + ErtsAlcType_t atype; + ErtsThrQ_t *q, *qblk; + UWord qw; + + switch (qi->live.queue) { + case ERTS_THR_Q_LIVE_SHORT: + atype = ERTS_ALC_T_THR_Q_SL; + break; + case ERTS_THR_Q_LIVE_LONG: + atype = ERTS_ALC_T_THR_Q_LL; + break; + default: + atype = ERTS_ALC_T_THR_Q; + break; + } + + qw = (UWord) erts_alloc(atype, + sizeof(ErtsThrQ_t) + (ERTS_CACHE_LINE_SIZE-1)); + qblk = (ErtsThrQ_t *) qw; + if (qw & ERTS_CACHE_LINE_MASK) + qw = (qw & ~ERTS_CACHE_LINE_MASK) + ERTS_CACHE_LINE_SIZE; + ASSERT((qw & ERTS_CACHE_LINE_MASK) == 0); + q = (ErtsThrQ_t *) qw; + erts_thr_q_initialize(q, qi); + q->q.blk = qblk; + return q; +} + +ErtsThrQCleanState_t +erts_thr_q_destroy(ErtsThrQ_t *q) +{ + if (!q->q.blk) + erl_exit(ERTS_ABORT_EXIT, + "Trying to destroy not created thread queue\n"); + return erts_thr_q_finalize(q); +} + +#ifdef USE_THREADS + +static void +destroy(ErtsThrQ_t *q) +{ + ErtsAlcType_t atype; + switch (q->q.live) { + case ERTS_THR_Q_LIVE_SHORT: + atype = ERTS_ALC_T_THR_Q_SL; + break; + case ERTS_THR_Q_LIVE_LONG: + atype = ERTS_ALC_T_THR_Q_LL; + break; + default: + atype = ERTS_ALC_T_THR_Q; + break; + } + erts_free(atype, q->q.blk); +} + +#endif + +static ERTS_INLINE ErtsThrQElement_t * +element_live_alloc(ErtsThrQLive_t live) +{ + switch (live) { + case ERTS_THR_Q_LIVE_SHORT: + return sl_element_alloc(); + default: + return (ErtsThrQElement_t *) erts_alloc(ERTS_ALC_T_THR_Q_EL, + sizeof(ErtsThrQElement_t)); + } +} + +static ERTS_INLINE ErtsThrQElement_t * +element_alloc(ErtsThrQ_t *q) +{ + ErtsThrQLive_t live; +#ifdef USE_THREADS + live = q->tail.data.live; +#else + live = q->init.live.objects; +#endif + return element_live_alloc(live); +} + +static ERTS_INLINE void +element_live_free(ErtsThrQLive_t live, ErtsThrQElement_t *el) +{ + switch (live) { + case ERTS_THR_Q_LIVE_SHORT: + sl_element_free(el); + break; + default: + erts_free(ERTS_ALC_T_THR_Q_EL, el); + } +} + +static ERTS_INLINE void +element_free(ErtsThrQ_t *q, ErtsThrQElement_t *el) +{ + ErtsThrQLive_t live; +#ifdef USE_THREADS + live = q->head.live; +#else + live = q->init.live.objects; +#endif + element_live_free(live, el); +} + +#ifdef USE_THREADS + +static ERTS_INLINE ErtsThrQElement_t * +enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last) +{ + erts_aint_t ilast, itmp; + + erts_atomic_init_nob(&this->next.atmc, ERTS_AINT_NULL); + /* Enqueue at end of list... */ + + ilast = erts_atomic_read_nob(&q->tail.data.last); + while (1) { + ErtsThrQElement_t *last = (ErtsThrQElement_t *) ilast; + itmp = erts_atomic_cmpxchg_mb(&last->next.atmc, + (erts_aint_t) this, + ERTS_AINT_NULL); + if (itmp == ERTS_AINT_NULL) + break; + ilast = itmp; + } + + /* Move last pointer forward... */ + while (1) { + if (want_last) { + if (erts_atomic_read_rb(&this->next.atmc) != ERTS_AINT_NULL) { + /* Someone else will move it forward */ + ilast = erts_atomic_read_rb(&q->tail.data.last); + return (ErtsThrQElement_t *) ilast; + } + } + else { + if (erts_atomic_read_nob(&this->next.atmc) != ERTS_AINT_NULL) { + /* Someone else will move it forward */ + return NULL; + } + } + itmp = erts_atomic_cmpxchg_mb(&q->tail.data.last, + (erts_aint_t) this, + ilast); + if (ilast == itmp) + return want_last ? this : NULL; + ilast = itmp; + } +} + +static ErtsThrQCleanState_t +clean(ErtsThrQ_t *q, int max_ops, int do_notify) +{ + erts_aint_t ilast; + int um_refc_ix; + int ops; + + for (ops = 0; ops < max_ops; ops++) { + ErtsThrQElement_t *tmp; + restart: + ASSERT(q->head.first); + if (q->head.first == q->head.head.ptr) { + q->head.clean_reached_head_count++; + if (q->head.clean_reached_head_count + >= ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT) { + q->head.clean_reached_head_count = 0; + break; + } + goto inspect_head; + } + if (q->head.first == q->head.unref_end) + break; + if (q->head.first == &q->tail.data.marker) { + q->head.used_marker = 0; + q->head.first = q->head.first->next.ptr; + goto restart; + } + tmp = q->head.first; + q->head.first = q->head.first->next.ptr; + if (q->head.deq_fini.automatic) + element_free(q, tmp); + else { + tmp->data.ptr = (void *) (UWord) q->head.live; + if (!q->head.deq_fini.start) + q->head.deq_fini.start = tmp; + else if (q->head.deq_fini.end->next.ptr == &q->tail.data.marker) + q->head.deq_fini.end->next.ptr = tmp; + q->head.deq_fini.end = tmp; + } + } + + ilast = erts_atomic_read_nob(&q->tail.data.last); + if (q->head.first == ((ErtsThrQElement_t *) ilast) + && ((ErtsThrQElement_t *) ilast) == &q->tail.data.marker + && q->head.first == &q->tail.data.marker) { + /* Empty and clean queue */ + if (q->q.finalizing) + destroy(q); + return ERTS_THR_Q_CLEAN; + } + +#ifdef ERTS_SMP + if (q->head.next.thr_progress_reached + || erts_thr_progress_has_reached(q->head.next.thr_progress)) { + q->head.next.thr_progress_reached = 1; +#endif + um_refc_ix = q->head.next.um_refc_ix; + if (erts_atomic_read_acqb(&q->tail.data.um_refc[um_refc_ix]) == 0) { + /* Move unreferenced end pointer forward... */ + q->head.clean_reached_head_count = 0; + q->head.unref_end = q->head.next.unref_end; + + if (!q->head.used_marker + && q->head.unref_end == (ErtsThrQElement_t *) ilast) { + q->head.used_marker = 1; + ilast = (erts_aint_t) enqueue_managed(q, + &q->tail.data.marker, + 1); + if (q->head.head.ptr == q->head.unref_end) { + ErtsThrQElement_t *next; + next = ((ErtsThrQElement_t *) + erts_atomic_read_acqb(&q->head.head.ptr->next.atmc)); + if (next == &q->tail.data.marker) { + q->head.head.ptr->next.ptr = &q->tail.data.marker; + q->head.head.ptr = &q->tail.data.marker; + } + } + } + + if (q->head.unref_end == (ErtsThrQElement_t *) ilast) + ERTS_THR_MEMORY_BARRIER; + else { + q->head.next.unref_end = (ErtsThrQElement_t *) ilast; + ERTS_THR_MEMORY_BARRIER; +#ifdef ERTS_SMP + q->head.next.thr_progress = erts_thr_progress_later(); +#endif + erts_atomic32_set_relb(&q->tail.data.um_refc_ix, + um_refc_ix); + q->head.next.um_refc_ix = um_refc_ix == 0 ? 1 : 0; +#ifdef ERTS_SMP + q->head.next.thr_progress_reached = 0; +#endif + } + } +#ifdef ERTS_SMP + } +#endif + + if (q->head.first == q->head.head.ptr) { + inspect_head: + if (!q->head.used_marker) { + erts_aint_t inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) { + q->head.used_marker = 1; + (void) enqueue_managed(q, &q->tail.data.marker, 0); + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == (erts_aint_t) &q->tail.data.marker) { + q->head.head.ptr->next.ptr = &q->tail.data.marker; + q->head.head.ptr = &q->tail.data.marker; +#ifdef ERTS_SMP + if (!q->head.next.thr_progress_reached) + return ERTS_THR_Q_NEED_THR_PRGR; +#else + if (do_notify) + q->head.notify(q->head.arg); +#endif + return ERTS_THR_Q_DIRTY; + } + } + } + return ERTS_THR_Q_CLEAN; + } + + if (q->head.first != q->head.unref_end) { + if (do_notify) + q->head.notify(q->head.arg); + return ERTS_THR_Q_DIRTY; + } + +#ifdef ERTS_SMP + if (!q->head.next.thr_progress_reached) + return ERTS_THR_Q_NEED_THR_PRGR; +#endif + + return ERTS_THR_Q_CLEAN; /* Waiting for unmanaged threads to complete... */ +} + +#endif + +ErtsThrQCleanState_t +erts_thr_q_clean(ErtsThrQ_t *q) +{ +#ifdef USE_THREADS + return clean(q, ERTS_THR_Q_MAX_SCHED_CLEAN_OPS, 0); +#else + return ERTS_THR_Q_CLEAN; +#endif +} + +ErtsThrQCleanState_t +erts_thr_q_inspect(ErtsThrQ_t *q, int ensure_empty) +{ +#ifdef USE_THREADS + if (ensure_empty) { + erts_aint_t inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext != ERTS_AINT_NULL) { + if (&q->tail.data.marker != (ErtsThrQElement_t *) inext) + return ERTS_THR_Q_DIRTY; + else { + q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; + q->head.head.ptr = (ErtsThrQElement_t *) inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext != ERTS_AINT_NULL) + return ERTS_THR_Q_DIRTY; + } + } + } + + if (q->head.first == q->head.head.ptr) { + if (!q->head.used_marker) { + erts_aint_t inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) + return ERTS_THR_Q_DIRTY; + } + return ERTS_THR_Q_CLEAN; + } + + if (q->head.first != q->head.unref_end) + return ERTS_THR_Q_DIRTY; + +#ifdef ERTS_SMP + if (!q->head.next.thr_progress_reached) + return ERTS_THR_Q_NEED_THR_PRGR; +#endif +#endif + return ERTS_THR_Q_CLEAN; +} + +static void +enqueue(ErtsThrQ_t *q, void *data, ErtsThrQElement_t *this) +{ +#ifndef USE_THREADS + ASSERT(data); + + this->next.ptr = NULL; + this->data.ptr = data; + + if (q->last) + q->last->next.ptr = this; + else { + q->first = q->last = this; + q->init.notify(q->init.arg); + } +#else + int notify; + int um_refc_ix = 0; +#ifdef ERTS_SMP + int unmanaged_thread; +#endif + +#if ERTS_THR_Q_DBG_CHK_DATA + if (!data) + erl_exit(ERTS_ABORT_EXIT, "Missing data in enqueue\n"); +#endif + + ASSERT(!q->q.finalizing); + + this->data.ptr = data; + +#ifdef ERTS_SMP + unmanaged_thread = !erts_thr_progress_is_managed_thread(); + if (unmanaged_thread) +#endif + { + um_refc_ix = erts_atomic32_read_acqb(&q->tail.data.um_refc_ix); + while (1) { + int tmp_um_refc_ix; + erts_atomic_inc_acqb(&q->tail.data.um_refc[um_refc_ix]); + tmp_um_refc_ix = erts_atomic32_read_acqb(&q->tail.data.um_refc_ix); + if (tmp_um_refc_ix == um_refc_ix) + break; + erts_atomic_dec_relb(&q->tail.data.um_refc[um_refc_ix]); + um_refc_ix = tmp_um_refc_ix; + } + } + + notify = this == enqueue_managed(q, this, 1); + + +#ifdef ERTS_SMP + if (unmanaged_thread) +#endif + { + if (notify) + erts_atomic_dec_relb(&q->tail.data.um_refc[um_refc_ix]); + else if (erts_atomic_dec_read_relb(&q->tail.data.um_refc[um_refc_ix]) == 0) + notify = 1; + } + if (notify) + q->tail.data.notify(q->tail.data.arg); +#endif +} + +void +erts_thr_q_enqueue(ErtsThrQ_t *q, void *data) +{ + enqueue(q, data, element_alloc(q)); +} + +ErtsThrQPrepEnQ_t * +erts_thr_q_prepare_enqueue(ErtsThrQ_t *q) +{ + return (ErtsThrQPrepEnQ_t *) element_alloc(q); +} + +int +erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *q, ErtsThrQFinDeQ_t *fdp) +{ +#ifndef USE_THREADS + return 0; +#else +#ifdef DEBUG + if (!q->head.deq_fini.start) { + ASSERT(!q->head.deq_fini.end); + } + else { + ErtsThrQElement_t *e = q->head.deq_fini.start; + ErtsThrQElement_t *end = q->head.deq_fini.end; + while (e != end) { + ASSERT(q->head.head.ptr != e); + ASSERT(q->head.first != e); + ASSERT(q->head.unref_end != e); + e = e->next.ptr; + } + } +#endif + fdp->start = q->head.deq_fini.start; + fdp->end = q->head.deq_fini.end; + if (fdp->end) + fdp->end->next.ptr = NULL; + q->head.deq_fini.start = NULL; + q->head.deq_fini.end = NULL; + return fdp->start != NULL; +#endif +} + +void +erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *fdp0, + ErtsThrQFinDeQ_t *fdp1) +{ +#ifdef USE_THREADS + if (fdp1->start) { + if (fdp0->end) + fdp0->end->next.ptr = fdp1->start; + else + fdp0->start = fdp1->start; + fdp0->end = fdp1->end; + } +#endif +} + + +int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *state) +{ +#ifdef USE_THREADS + ErtsThrQElement_t *start = state->start; + if (start) { + ErtsThrQLive_t live; + int i; + for (i = 0; i < ERTS_THR_Q_MAX_FINI_DEQ_OPS; i++) { + ErtsThrQElement_t *tmp; + if (!start) + break; + tmp = start; + start = start->next.ptr; + live = (ErtsThrQLive_t) (UWord) tmp->data.ptr; + element_live_free(live, tmp); + } + state->start = start; + if (start) + return 1; /* More to do */ + state->end = NULL; + } +#endif + return 0; +} + +void +erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *state) +{ +#ifdef USE_THREADS + state->start = NULL; + state->end = NULL; +#endif +} + + +void +erts_thr_q_enqueue_prepared(ErtsThrQ_t *q, void *data, ErtsThrQPrepEnQ_t *prep) +{ + ASSERT(prep); + enqueue(q, data, (ErtsThrQElement_t *) prep); +} + +void * +erts_thr_q_dequeue(ErtsThrQ_t *q) +{ +#ifndef USE_THREADS + void *res; + ErtsThrQElement_t *tmp; + + if (!q->first) + return NULL; + tmp = q->first; + res = tmp->data.ptr; + q->first = tmp->next.ptr; + if (!q->first) + q->last = NULL; + + element_free(q, tmp); + + return res; +#else + erts_aint_t inext; + void *res; + + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) + return NULL; + q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; + q->head.head.ptr = (ErtsThrQElement_t *) inext; + if (q->head.head.ptr == &q->tail.data.marker) { + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) + return NULL; + q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; + q->head.head.ptr = (ErtsThrQElement_t *) inext; + } + res = q->head.head.ptr->data.ptr; +#if ERTS_THR_Q_DBG_CHK_DATA + q->head.head.ptr->data.ptr = NULL; + if (!res) + erl_exit(ERTS_ABORT_EXIT, "Missing data in dequeue\n"); +#endif + clean(q, + (q->head.deq_fini.automatic + ? ERTS_THR_Q_MAX_DEQUEUE_CLEAN_OPS + : ERTS_THR_Q_MAX_SCHED_CLEAN_OPS), 1); + return res; +#endif +} |