From 9bed74c6b44f691c7c6572ec2c9f57219d8894a6 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Sun, 9 Oct 2011 01:00:51 +0200 Subject: Implement generic lock-free queue The implementation of an ERTS internal, generic, many to one, lock-free queue for communication between threads. The many to one scenario is very common in ERTS, so it can be used in a lot of places in the future. Changing to this queue from a lock based queue, however, often requires some redesigning. This since we have often used the lock of the queue to protect other information too. --- erts/emulator/Makefile.in | 2 +- erts/emulator/beam/erl_alloc.c | 5 + erts/emulator/beam/erl_alloc.types | 22 ++ erts/emulator/beam/erl_init.c | 2 + erts/emulator/beam/erl_thr_queue.c | 745 +++++++++++++++++++++++++++++++++++++ erts/emulator/beam/erl_thr_queue.h | 211 +++++++++++ erts/emulator/beam/utils.c | 1 + 7 files changed, 987 insertions(+), 1 deletion(-) create mode 100644 erts/emulator/beam/erl_thr_queue.c create mode 100644 erts/emulator/beam/erl_thr_queue.h diff --git a/erts/emulator/Makefile.in b/erts/emulator/Makefile.in index 6ccad081e5..15244a6589 100644 --- a/erts/emulator/Makefile.in +++ b/erts/emulator/Makefile.in @@ -743,7 +743,7 @@ RUN_OBJS = \ $(OBJDIR)/packet_parser.o $(OBJDIR)/safe_hash.o \ $(OBJDIR)/erl_zlib.o $(OBJDIR)/erl_nif.o \ $(OBJDIR)/erl_bif_binary.o $(OBJDIR)/erl_ao_firstfit_alloc.o \ - $(OBJDIR)/erl_sched_spec_pre_alloc.o + $(OBJDIR)/erl_thr_queue.o $(OBJDIR)/erl_sched_spec_pre_alloc.o ifeq ($(TARGET),win32) DRV_OBJS = \ diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c index 705ace26fa..cce4b4adf0 100644 --- a/erts/emulator/beam/erl_alloc.c +++ b/erts/emulator/beam/erl_alloc.c @@ -41,6 +41,7 @@ #include "erl_monitors.h" #include "erl_bif_timer.h" #include "erl_cpu_topology.h" +#include "erl_thr_queue.h" #if defined(ERTS_ALC_T_DRV_SEL_D_STATE) || defined(ERTS_ALC_T_DRV_EV_D_STATE) #include "erl_check_io.h" #endif @@ -524,6 +525,10 @@ erts_alloc_init(int *argc, char **argv, ErtsAllocInitOpts *eaiop) = sizeof(ErtsDrvSelectDataState); fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_MSG_REF)] = sizeof(ErlMessage); +#ifdef ERTS_SMP + fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_THR_Q_EL_SL)] + = sizeof(ErtsThrQElement_t); +#endif #ifdef HARD_DEBUG hdbg_init(); #endif diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 79d3433fc0..4efad0197b 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -50,6 +50,15 @@ # command line argument to make_alloc_types. The variable X is false # after a "+disable X" statement or if it has never been mentioned. ++if smp ++disable threads_no_smp ++else ++if threads ++enable threads_no_smp ++else ++disable threads_no_smp ++endif ++endif # --- Allocator declarations ------------------------------------------------- # @@ -254,6 +263,19 @@ type ZLIB STANDARD SYSTEM zlib type CPU_GRPS_MAP LONG_LIVED SYSTEM cpu_groups_map type AUX_WORK_TMO LONG_LIVED SYSTEM aux_work_timeouts ++if threads_no_smp +# Need thread safe allocs, but std_alloc and fix_alloc are not; +# use driver_alloc which is... +type THR_Q_EL DRIVER SYSTEM thr_q_element +type THR_Q_EL_SL DRIVER SYSTEM sl_thr_q_element ++else +type THR_Q_EL STANDARD SYSTEM thr_q_element +type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element ++endif +type THR_Q STANDARD SYSTEM thr_queue +type THR_Q_SL SHORT_LIVED SYSTEM short_lived_thr_queue +type THR_Q_LL LONG_LIVED SYSTEM long_lived_thr_queue + +if smp type ASYNC SHORT_LIVED SYSTEM async +else diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index f4f2a4d011..5fe44afdce 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -43,6 +43,7 @@ #include "packet_parser.h" #include "erl_cpu_topology.h" #include "erl_thr_progress.h" +#include "erl_thr_queue.h" #ifdef HIPE #include "hipe_mode_switch.h" /* for hipe_mode_switch_init() */ @@ -786,6 +787,7 @@ early_init(int *argc, char **argv) /* */ erts_thr_progress_init(no_schedulers, no_schedulers+1, 0); #endif + erts_thr_q_init(); erts_init_utils(); erts_early_init_cpu_topology(no_schedulers, &max_main_threads, diff --git a/erts/emulator/beam/erl_thr_queue.c b/erts/emulator/beam/erl_thr_queue.c new file mode 100644 index 0000000000..9ac4cd4b8e --- /dev/null +++ b/erts/emulator/beam/erl_thr_queue.c @@ -0,0 +1,745 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +/* + * Description: Lock-free queue for communication between threads. + * + * Currently only a many-to-one version has been, + * implemented, i.e., many threads can enqueue but + * only one thread can dequeue at a time. It doesn't + * have to be the same thread dequeuing every time, but + * synchronization so that only one thread dequeues + * at a time has to be provided by other means. + * + * When/If the need for a many-to-many queue arises, + * this implementation can relatively easy be extended + * to support that too. + * + * Usage instructions below. + * + * Author: Rickard Green + */ + +/* + * ------ Usage instructions ----------------------------------------------- + * + * Dequeuing generates garbage that needs to be cleaned up. + * erts_thr_q_dequeue() automatically cleans, but garbage may have to be + * cleaned up also when the queue is empty. This is done by calling + * erts_thr_q_clean(). In the SMP case thread progress may have to be made + * before cleaning can continue. If so, erts_thr_q_need_thr_progress() in + * combination with erts_thr_progress_wakeup() can be used in order to + * request a wakeup at appropriate time. + * + * Enqueuing implies memory allocation and dequeuing implies memory + * deallocation. Memory allocation can be moved to another more suitable + * thread using erts_thr_q_prepare_enqueue() together with + * erts_thr_q_enqueue_prepared() instead of using erts_thr_q_enqueue(). + * Memory deallocation can can be moved to another more suitable thread by + * disabling auto_finalize_dequeue when initializing the queue and then use + * erts_thr_q_get_finalize_dequeue_data() together + * erts_thr_q_finalize_dequeue() after dequeuing or cleaning. + * + * Ending the life of the queue using either erts_thr_q_destroy() + * or erts_thr_q_finalize() impies cleaning the queue. Both functions + * return the cleaning result and may have to be called multiple times + * until the queue is clean. Once one of these functions have been called + * enqueuing is not allowed. This has to be synchronized by the user. + * If auto_finalize_dequeue has been disabled, the finalize dequeue + * functionality has to be called after ending the life of the queue just + * as when dequeuing or cleaning on a queue that is alive. + * + * ------------------------------------------------------------------------- + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "erl_thr_queue.h" + +#if defined(DEBUG) +#define ERTS_THR_Q_DBG_CHK_DATA 1 +#else +#define ERTS_THR_Q_DBG_CHK_DATA 0 +#endif + +#define ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT 100 +#define ERTS_THR_Q_MAX_SCHED_CLEAN_OPS 50 +#define ERTS_THR_Q_MAX_DEQUEUE_CLEAN_OPS 3 + +#define ERTS_THR_Q_MAX_FINI_DEQ_OPS 50 + +#ifdef ERTS_SMP +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(sl_element, + ErtsThrQElement_t, + 1000, + ERTS_ALC_T_THR_Q_EL_SL) +#else + +static void +init_sl_element_alloc(void) +{ +} + +static ErtsThrQElement_t * +sl_element_alloc(void) +{ + return erts_alloc(ERTS_ALC_T_THR_Q_EL_SL, + sizeof(ErtsThrQElement_t)); +} + +static void +sl_element_free(ErtsThrQElement_t *p) +{ + erts_free(ERTS_ALC_T_THR_Q_EL_SL, p); +} + +#endif + +typedef union { + ErtsThrQ_t q; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))]; +} ErtsAlignedThrQ_t; + +void +erts_thr_q_init(void) +{ + init_sl_element_alloc(); +} + +static void noop_callback(void *arg) { } + +void +erts_thr_q_initialize(ErtsThrQ_t *q, ErtsThrQInit_t *qi) +{ +#ifndef USE_THREADS + q->init = *qi; + if (!q->init.notify) + q->init.notify = noop_callback; + q->first = NULL; + q->last = NULL; + q->q.blk = NULL; +#else + erts_atomic_init_nob(&q->tail.data.marker.next.atmc, ERTS_AINT_NULL); + q->tail.data.marker.data.ptr = NULL; + erts_atomic_init_nob(&q->tail.data.last, + (erts_aint_t) &q->tail.data.marker); + erts_atomic_init_nob(&q->tail.data.um_refc[0], 0); + erts_atomic_init_nob(&q->tail.data.um_refc[1], 0); + erts_atomic32_init_nob(&q->tail.data.um_refc_ix, 0); + q->tail.data.live = qi->live.objects; + q->tail.data.arg = qi->arg; + q->tail.data.notify = qi->notify; + if (!q->tail.data.notify) + q->tail.data.notify = noop_callback; + + q->head.head.ptr = &q->tail.data.marker; + q->head.live = qi->live.objects; + q->head.first = &q->tail.data.marker; + q->head.unref_end = &q->tail.data.marker; + q->head.clean_reached_head_count = 0; + q->head.deq_fini.automatic = qi->auto_finalize_dequeue; + q->head.deq_fini.start = NULL; + q->head.deq_fini.end = NULL; +#ifdef ERTS_SMP + q->head.next.thr_progress = erts_thr_progress_current(); + q->head.next.thr_progress_reached = 1; +#endif + q->head.next.um_refc_ix = 1; + q->head.next.unref_end = &q->tail.data.marker; + q->head.used_marker = 1; + q->head.arg = qi->arg; + q->head.notify = q->tail.data.notify; + q->q.finalizing = 0; + q->q.live = qi->live.queue; + q->q.blk = NULL; +#endif +} + +ErtsThrQCleanState_t +erts_thr_q_finalize(ErtsThrQ_t *q) +{ +#ifdef USE_THREADS + q->q.finalizing = 1; +#endif + while (erts_thr_q_dequeue(q)); + return erts_thr_q_clean(q); +} + +ErtsThrQ_t * +erts_thr_q_create(ErtsThrQInit_t *qi) +{ + ErtsAlcType_t atype; + ErtsThrQ_t *q, *qblk; + UWord qw; + + switch (qi->live.queue) { + case ERTS_THR_Q_LIVE_SHORT: + atype = ERTS_ALC_T_THR_Q_SL; + break; + case ERTS_THR_Q_LIVE_LONG: + atype = ERTS_ALC_T_THR_Q_LL; + break; + default: + atype = ERTS_ALC_T_THR_Q; + break; + } + + qw = (UWord) erts_alloc(atype, + sizeof(ErtsThrQ_t) + (ERTS_CACHE_LINE_SIZE-1)); + qblk = (ErtsThrQ_t *) qw; + if (qw & ERTS_CACHE_LINE_MASK) + qw = (qw & ~ERTS_CACHE_LINE_MASK) + ERTS_CACHE_LINE_SIZE; + ASSERT((qw & ERTS_CACHE_LINE_MASK) == 0); + q = (ErtsThrQ_t *) qw; + erts_thr_q_initialize(q, qi); + q->q.blk = qblk; + return q; +} + +ErtsThrQCleanState_t +erts_thr_q_destroy(ErtsThrQ_t *q) +{ + if (!q->q.blk) + erl_exit(ERTS_ABORT_EXIT, + "Trying to destroy not created thread queue\n"); + return erts_thr_q_finalize(q); +} + +#ifdef USE_THREADS + +static void +destroy(ErtsThrQ_t *q) +{ + ErtsAlcType_t atype; + switch (q->q.live) { + case ERTS_THR_Q_LIVE_SHORT: + atype = ERTS_ALC_T_THR_Q_SL; + break; + case ERTS_THR_Q_LIVE_LONG: + atype = ERTS_ALC_T_THR_Q_LL; + break; + default: + atype = ERTS_ALC_T_THR_Q; + break; + } + erts_free(atype, q->q.blk); +} + +#endif + +static ERTS_INLINE ErtsThrQElement_t * +element_live_alloc(ErtsThrQLive_t live) +{ + switch (live) { + case ERTS_THR_Q_LIVE_SHORT: + return sl_element_alloc(); + default: + return (ErtsThrQElement_t *) erts_alloc(ERTS_ALC_T_THR_Q_EL, + sizeof(ErtsThrQElement_t)); + } +} + +static ERTS_INLINE ErtsThrQElement_t * +element_alloc(ErtsThrQ_t *q) +{ + ErtsThrQLive_t live; +#ifdef USE_THREADS + live = q->tail.data.live; +#else + live = q->init.live.objects; +#endif + return element_live_alloc(live); +} + +static ERTS_INLINE void +element_live_free(ErtsThrQLive_t live, ErtsThrQElement_t *el) +{ + switch (live) { + case ERTS_THR_Q_LIVE_SHORT: + sl_element_free(el); + break; + default: + erts_free(ERTS_ALC_T_THR_Q_EL, el); + } +} + +static ERTS_INLINE void +element_free(ErtsThrQ_t *q, ErtsThrQElement_t *el) +{ + ErtsThrQLive_t live; +#ifdef USE_THREADS + live = q->head.live; +#else + live = q->init.live.objects; +#endif + element_live_free(live, el); +} + +#ifdef USE_THREADS + +static ERTS_INLINE ErtsThrQElement_t * +enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last) +{ + erts_aint_t ilast, itmp; + + erts_atomic_init_nob(&this->next.atmc, ERTS_AINT_NULL); + /* Enqueue at end of list... */ + + ilast = erts_atomic_read_nob(&q->tail.data.last); + while (1) { + ErtsThrQElement_t *last = (ErtsThrQElement_t *) ilast; + itmp = erts_atomic_cmpxchg_mb(&last->next.atmc, + (erts_aint_t) this, + ERTS_AINT_NULL); + if (itmp == ERTS_AINT_NULL) + break; + ilast = itmp; + } + + /* Move last pointer forward... */ + while (1) { + if (want_last) { + if (erts_atomic_read_rb(&this->next.atmc) != ERTS_AINT_NULL) { + /* Someone else will move it forward */ + ilast = erts_atomic_read_rb(&q->tail.data.last); + return (ErtsThrQElement_t *) ilast; + } + } + else { + if (erts_atomic_read_nob(&this->next.atmc) != ERTS_AINT_NULL) { + /* Someone else will move it forward */ + return NULL; + } + } + itmp = erts_atomic_cmpxchg_mb(&q->tail.data.last, + (erts_aint_t) this, + ilast); + if (ilast == itmp) + return want_last ? this : NULL; + ilast = itmp; + } +} + +static ErtsThrQCleanState_t +clean(ErtsThrQ_t *q, int max_ops, int do_notify) +{ + erts_aint_t ilast; + int um_refc_ix; + int ops; + + for (ops = 0; ops < max_ops; ops++) { + ErtsThrQElement_t *tmp; + restart: + ASSERT(q->head.first); + if (q->head.first == q->head.head.ptr) { + q->head.clean_reached_head_count++; + if (q->head.clean_reached_head_count + >= ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT) { + q->head.clean_reached_head_count = 0; + break; + } + goto inspect_head; + } + if (q->head.first == q->head.unref_end) + break; + if (q->head.first == &q->tail.data.marker) { + q->head.used_marker = 0; + q->head.first = q->head.first->next.ptr; + goto restart; + } + tmp = q->head.first; + q->head.first = q->head.first->next.ptr; + if (q->head.deq_fini.automatic) + element_free(q, tmp); + else { + tmp->data.ptr = (void *) (UWord) q->head.live; + if (!q->head.deq_fini.start) + q->head.deq_fini.start = tmp; + else if (q->head.deq_fini.end->next.ptr == &q->tail.data.marker) + q->head.deq_fini.end->next.ptr = tmp; + q->head.deq_fini.end = tmp; + } + } + + ilast = erts_atomic_read_nob(&q->tail.data.last); + if (q->head.first == ((ErtsThrQElement_t *) ilast) + && ((ErtsThrQElement_t *) ilast) == &q->tail.data.marker + && q->head.first == &q->tail.data.marker) { + /* Empty and clean queue */ + if (q->q.finalizing) + destroy(q); + return ERTS_THR_Q_CLEAN; + } + +#ifdef ERTS_SMP + if (q->head.next.thr_progress_reached + || erts_thr_progress_has_reached(q->head.next.thr_progress)) { + q->head.next.thr_progress_reached = 1; +#endif + um_refc_ix = q->head.next.um_refc_ix; + if (erts_atomic_read_acqb(&q->tail.data.um_refc[um_refc_ix]) == 0) { + /* Move unreferenced end pointer forward... */ + q->head.clean_reached_head_count = 0; + q->head.unref_end = q->head.next.unref_end; + + if (!q->head.used_marker + && q->head.unref_end == (ErtsThrQElement_t *) ilast) { + q->head.used_marker = 1; + ilast = (erts_aint_t) enqueue_managed(q, + &q->tail.data.marker, + 1); + if (q->head.head.ptr == q->head.unref_end) { + ErtsThrQElement_t *next; + next = ((ErtsThrQElement_t *) + erts_atomic_read_acqb(&q->head.head.ptr->next.atmc)); + if (next == &q->tail.data.marker) { + q->head.head.ptr->next.ptr = &q->tail.data.marker; + q->head.head.ptr = &q->tail.data.marker; + } + } + } + + if (q->head.unref_end == (ErtsThrQElement_t *) ilast) + ERTS_THR_MEMORY_BARRIER; + else { + q->head.next.unref_end = (ErtsThrQElement_t *) ilast; + ERTS_THR_MEMORY_BARRIER; +#ifdef ERTS_SMP + q->head.next.thr_progress = erts_thr_progress_later(); +#endif + erts_atomic32_set_relb(&q->tail.data.um_refc_ix, + um_refc_ix); + q->head.next.um_refc_ix = um_refc_ix == 0 ? 1 : 0; +#ifdef ERTS_SMP + q->head.next.thr_progress_reached = 0; +#endif + } + } +#ifdef ERTS_SMP + } +#endif + + if (q->head.first == q->head.head.ptr) { + inspect_head: + if (!q->head.used_marker) { + erts_aint_t inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) { + q->head.used_marker = 1; + (void) enqueue_managed(q, &q->tail.data.marker, 0); + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == (erts_aint_t) &q->tail.data.marker) { + q->head.head.ptr->next.ptr = &q->tail.data.marker; + q->head.head.ptr = &q->tail.data.marker; +#ifdef ERTS_SMP + if (!q->head.next.thr_progress_reached) + return ERTS_THR_Q_NEED_THR_PRGR; +#else + if (do_notify) + q->head.notify(q->head.arg); +#endif + return ERTS_THR_Q_DIRTY; + } + } + } + return ERTS_THR_Q_CLEAN; + } + + if (q->head.first != q->head.unref_end) { + if (do_notify) + q->head.notify(q->head.arg); + return ERTS_THR_Q_DIRTY; + } + +#ifdef ERTS_SMP + if (!q->head.next.thr_progress_reached) + return ERTS_THR_Q_NEED_THR_PRGR; +#endif + + return ERTS_THR_Q_CLEAN; /* Waiting for unmanaged threads to complete... */ +} + +#endif + +ErtsThrQCleanState_t +erts_thr_q_clean(ErtsThrQ_t *q) +{ +#ifdef USE_THREADS + return clean(q, ERTS_THR_Q_MAX_SCHED_CLEAN_OPS, 0); +#else + return ERTS_THR_Q_CLEAN; +#endif +} + +ErtsThrQCleanState_t +erts_thr_q_inspect(ErtsThrQ_t *q, int ensure_empty) +{ +#ifdef USE_THREADS + if (ensure_empty) { + erts_aint_t inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext != ERTS_AINT_NULL) { + if (&q->tail.data.marker != (ErtsThrQElement_t *) inext) + return ERTS_THR_Q_DIRTY; + else { + q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; + q->head.head.ptr = (ErtsThrQElement_t *) inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext != ERTS_AINT_NULL) + return ERTS_THR_Q_DIRTY; + } + } + } + + if (q->head.first == q->head.head.ptr) { + if (!q->head.used_marker) { + erts_aint_t inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) + return ERTS_THR_Q_DIRTY; + } + return ERTS_THR_Q_CLEAN; + } + + if (q->head.first != q->head.unref_end) + return ERTS_THR_Q_DIRTY; + +#ifdef ERTS_SMP + if (!q->head.next.thr_progress_reached) + return ERTS_THR_Q_NEED_THR_PRGR; +#endif +#endif + return ERTS_THR_Q_CLEAN; +} + +static void +enqueue(ErtsThrQ_t *q, void *data, ErtsThrQElement_t *this) +{ +#ifndef USE_THREADS + ASSERT(data); + + this->next.ptr = NULL; + this->data.ptr = data; + + if (q->last) + q->last->next.ptr = this; + else { + q->first = q->last = this; + q->init.notify(q->init.arg); + } +#else + int notify; + int um_refc_ix = 0; +#ifdef ERTS_SMP + int unmanaged_thread; +#endif + +#if ERTS_THR_Q_DBG_CHK_DATA + if (!data) + erl_exit(ERTS_ABORT_EXIT, "Missing data in enqueue\n"); +#endif + + ASSERT(!q->q.finalizing); + + this->data.ptr = data; + +#ifdef ERTS_SMP + unmanaged_thread = !erts_thr_progress_is_managed_thread(); + if (unmanaged_thread) +#endif + { + um_refc_ix = erts_atomic32_read_acqb(&q->tail.data.um_refc_ix); + while (1) { + int tmp_um_refc_ix; + erts_atomic_inc_acqb(&q->tail.data.um_refc[um_refc_ix]); + tmp_um_refc_ix = erts_atomic32_read_acqb(&q->tail.data.um_refc_ix); + if (tmp_um_refc_ix == um_refc_ix) + break; + erts_atomic_dec_relb(&q->tail.data.um_refc[um_refc_ix]); + um_refc_ix = tmp_um_refc_ix; + } + } + + notify = this == enqueue_managed(q, this, 1); + + +#ifdef ERTS_SMP + if (unmanaged_thread) +#endif + { + if (notify) + erts_atomic_dec_relb(&q->tail.data.um_refc[um_refc_ix]); + else if (erts_atomic_dec_read_relb(&q->tail.data.um_refc[um_refc_ix]) == 0) + notify = 1; + } + if (notify) + q->tail.data.notify(q->tail.data.arg); +#endif +} + +void +erts_thr_q_enqueue(ErtsThrQ_t *q, void *data) +{ + enqueue(q, data, element_alloc(q)); +} + +ErtsThrQPrepEnQ_t * +erts_thr_q_prepare_enqueue(ErtsThrQ_t *q) +{ + return (ErtsThrQPrepEnQ_t *) element_alloc(q); +} + +int +erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *q, ErtsThrQFinDeQ_t *fdp) +{ +#ifndef USE_THREADS + return 0; +#else +#ifdef DEBUG + if (!q->head.deq_fini.start) { + ASSERT(!q->head.deq_fini.end); + } + else { + ErtsThrQElement_t *e = q->head.deq_fini.start; + ErtsThrQElement_t *end = q->head.deq_fini.end; + while (e != end) { + ASSERT(q->head.head.ptr != e); + ASSERT(q->head.first != e); + ASSERT(q->head.unref_end != e); + e = e->next.ptr; + } + } +#endif + fdp->start = q->head.deq_fini.start; + fdp->end = q->head.deq_fini.end; + if (fdp->end) + fdp->end->next.ptr = NULL; + q->head.deq_fini.start = NULL; + q->head.deq_fini.end = NULL; + return fdp->start != NULL; +#endif +} + +void +erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *fdp0, + ErtsThrQFinDeQ_t *fdp1) +{ +#ifdef USE_THREADS + if (fdp1->start) { + if (fdp0->end) + fdp0->end->next.ptr = fdp1->start; + else + fdp0->start = fdp1->start; + fdp0->end = fdp1->end; + } +#endif +} + + +int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *state) +{ +#ifdef USE_THREADS + ErtsThrQElement_t *start = state->start; + if (start) { + ErtsThrQLive_t live; + int i; + for (i = 0; i < ERTS_THR_Q_MAX_FINI_DEQ_OPS; i++) { + ErtsThrQElement_t *tmp; + if (!start) + break; + tmp = start; + start = start->next.ptr; + live = (ErtsThrQLive_t) (UWord) tmp->data.ptr; + element_live_free(live, tmp); + } + state->start = start; + if (start) + return 1; /* More to do */ + state->end = NULL; + } +#endif + return 0; +} + +void +erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *state) +{ +#ifdef USE_THREADS + state->start = NULL; + state->end = NULL; +#endif +} + + +void +erts_thr_q_enqueue_prepared(ErtsThrQ_t *q, void *data, ErtsThrQPrepEnQ_t *prep) +{ + ASSERT(prep); + enqueue(q, data, (ErtsThrQElement_t *) prep); +} + +void * +erts_thr_q_dequeue(ErtsThrQ_t *q) +{ +#ifndef USE_THREADS + void *res; + ErtsThrQElement_t *tmp; + + if (!q->first) + return NULL; + tmp = q->first; + res = tmp->data.ptr; + q->first = tmp->next.ptr; + if (!q->first) + q->last = NULL; + + element_free(q, tmp); + + return res; +#else + erts_aint_t inext; + void *res; + + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) + return NULL; + q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; + q->head.head.ptr = (ErtsThrQElement_t *) inext; + if (q->head.head.ptr == &q->tail.data.marker) { + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) + return NULL; + q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; + q->head.head.ptr = (ErtsThrQElement_t *) inext; + } + res = q->head.head.ptr->data.ptr; +#if ERTS_THR_Q_DBG_CHK_DATA + q->head.head.ptr->data.ptr = NULL; + if (!res) + erl_exit(ERTS_ABORT_EXIT, "Missing data in dequeue\n"); +#endif + clean(q, + (q->head.deq_fini.automatic + ? ERTS_THR_Q_MAX_DEQUEUE_CLEAN_OPS + : ERTS_THR_Q_MAX_SCHED_CLEAN_OPS), 1); + return res; +#endif +} diff --git a/erts/emulator/beam/erl_thr_queue.h b/erts/emulator/beam/erl_thr_queue.h new file mode 100644 index 0000000000..407c23f5eb --- /dev/null +++ b/erts/emulator/beam/erl_thr_queue.h @@ -0,0 +1,211 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +/* + * Description: Lock-free queue for communication between threads. + * + * Currently only a many-to-one version has been, + * implemented, i.e., many threads can enqueue but + * only one thread can dequeue at a time. It doesn't + * have to be the same thread dequeuing every time, but + * synchronization so that only one thread dequeues + * at a time has to be provided by other means. + * + * When/If the need for a many-to-many queue arises, + * this implementation can relatively easy be extended + * to support that too. + * + * Usage instructions can be found in erts_thr_queue.c + * + * Author: Rickard Green + */ + +#ifndef ERL_THR_QUEUE_H__ +#define ERL_THR_QUEUE_H__ + +#include "sys.h" +#include "erl_threads.h" +#include "erl_alloc.h" +#include "erl_thr_progress.h" + +typedef enum { + ERTS_THR_Q_LIVE_UNDEF, + ERTS_THR_Q_LIVE_SHORT, + ERTS_THR_Q_LIVE_LONG +} ErtsThrQLive_t; + +#define ERTS_THR_Q_INIT_DEFAULT \ +{ \ + { \ + ERTS_THR_Q_LIVE_UNDEF, \ + ERTS_THR_Q_LIVE_SHORT \ + }, \ + NULL, \ + NULL, \ + 1 \ +} + +typedef struct ErtsThrQ_t_ ErtsThrQ_t; + +typedef struct { + struct { + ErtsThrQLive_t queue; + ErtsThrQLive_t objects; + } live; + void *arg; + void (*notify)(void *); + int auto_finalize_dequeue; +} ErtsThrQInit_t; + +typedef struct ErtsThrQElement_t_ ErtsThrQElement_t; +typedef struct ErtsThrQElement_t ErtsThrQPrepEnQ_t; + +typedef union { + erts_atomic_t atmc; + ErtsThrQElement_t *ptr; +} ErtsThrQPtr_t; + +struct ErtsThrQElement_t_ { + ErtsThrQPtr_t next; + union { + erts_atomic_t atmc; + void *ptr; + } data; +}; + +typedef struct { + ErtsThrQElement_t *start; + ErtsThrQElement_t *end; +} ErtsThrQFinDeQ_t; + +typedef enum { + ERTS_THR_Q_CLEAN, +#ifdef ERTS_SMP + ERTS_THR_Q_NEED_THR_PRGR, +#endif + ERTS_THR_Q_DIRTY, +} ErtsThrQCleanState_t; + +#ifdef USE_THREADS + +typedef struct { + ErtsThrQElement_t marker; + erts_atomic_t last; + erts_atomic_t um_refc[2]; + erts_atomic32_t um_refc_ix; + ErtsThrQLive_t live; +#ifdef ERTS_SMP + erts_atomic32_t thr_prgr_clean_scheduled; +#endif + void *arg; + void (*notify)(void *); +} ErtsThrQTail_t; + +struct ErtsThrQ_t_ { + /* + * This structure needs to be cache line aligned for best + * performance. + */ + union { + /* Modified by threads enqueuing */ + ErtsThrQTail_t data; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQTail_t))]; + } tail; + /* + * Everything below this point is *only* accessed by the + * thread dequeuing. + */ + struct { + ErtsThrQPtr_t head; + ErtsThrQLive_t live; + ErtsThrQElement_t *first; + ErtsThrQElement_t *unref_end; + int clean_reached_head_count; + struct { + int automatic; + ErtsThrQElement_t *start; + ErtsThrQElement_t *end; + } deq_fini; + struct { +#ifdef ERTS_SMP + ErtsThrPrgrVal thr_progress; + int thr_progress_reached; +#endif + int um_refc_ix; + ErtsThrQElement_t *unref_end; + } next; + int used_marker; + void *arg; + void (*notify)(void *); + } head; + struct { + int finalizing; + ErtsThrQLive_t live; + void *blk; + } q; +}; + +#else /* !USE_THREADS */ + +struct ErtsThrQ_t_ { + ErtsThrQInit_t init; + ErtsThrQElement_t *first; + ErtsThrQElement_t *last; + struct { + void *blk; + } q; +}; + +#endif + +void erts_thr_q_init(void); +void erts_thr_q_initialize(ErtsThrQ_t *, ErtsThrQInit_t *); +ErtsThrQCleanState_t erts_thr_q_finalize(ErtsThrQ_t *); +ErtsThrQ_t *erts_thr_q_create(ErtsThrQInit_t *); +ErtsThrQCleanState_t erts_thr_q_destroy(ErtsThrQ_t *); +ErtsThrQCleanState_t erts_thr_q_clean(ErtsThrQ_t *); +ErtsThrQCleanState_t erts_thr_q_inspect(ErtsThrQ_t *, int); +ErtsThrQPrepEnQ_t *erts_thr_q_prepare_enqueue(ErtsThrQ_t *); +void erts_thr_q_enqueue_prepared(ErtsThrQ_t *, void *, ErtsThrQPrepEnQ_t *); +void erts_thr_q_enqueue(ErtsThrQ_t *, void *); +void * erts_thr_q_dequeue(ErtsThrQ_t *); +int erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *, + ErtsThrQFinDeQ_t *); +void erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *, + ErtsThrQFinDeQ_t *); +int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *); +void erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *); + +#ifdef ERTS_SMP +ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_q_need_thr_progress(ErtsThrQ_t *q); +#endif + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +#ifdef ERTS_SMP +ERTS_GLB_INLINE ErtsThrPrgrVal +erts_thr_q_need_thr_progress(ErtsThrQ_t *q) +{ + return q->head.next.thr_progress; +} +#endif + +#endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */ + +#endif /* ERL_THR_QUEUE_H__ */ diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c index cc377b543d..aa86f4590d 100644 --- a/erts/emulator/beam/utils.c +++ b/erts/emulator/beam/utils.c @@ -43,6 +43,7 @@ #include "erl_smp.h" #include "erl_time.h" #include "erl_thr_progress.h" +#include "erl_thr_queue.h" #include "erl_sched_spec_pre_alloc.h" #undef M_TRIM_THRESHOLD -- cgit v1.2.3 From 933790021e5fa95e4e6242e3f2eb2fcf64666a57 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Sun, 9 Oct 2011 01:03:06 +0200 Subject: Use generic lock-free queue for misc aux work --- erts/emulator/beam/erl_alloc.c | 8 +- erts/emulator/beam/erl_alloc.types | 5 +- erts/emulator/beam/erl_db.c | 3 +- erts/emulator/beam/erl_lock_check.c | 2 - erts/emulator/beam/erl_process.c | 207 ++++++++++++++++++++++-------------- erts/emulator/beam/erl_process.h | 28 +++-- 6 files changed, 151 insertions(+), 102 deletions(-) diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c index cce4b4adf0..33d6cf5f2f 100644 --- a/erts/emulator/beam/erl_alloc.c +++ b/erts/emulator/beam/erl_alloc.c @@ -3075,10 +3075,10 @@ erts_request_alloc_info(struct process *c_p, #ifdef ERTS_SMP if (erts_no_schedulers > 1) - erts_smp_schedule_misc_aux_work(1, - erts_no_schedulers, - reply_alloc_info, - (void *) air); + erts_schedule_multi_misc_aux_work(1, + erts_no_schedulers, + reply_alloc_info, + (void *) air); #endif reply_alloc_info((void *) air); diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 4efad0197b..9f0cb681c0 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -262,15 +262,18 @@ type EXT_TERM_DATA SHORT_LIVED PROCESSES external_term_data type ZLIB STANDARD SYSTEM zlib type CPU_GRPS_MAP LONG_LIVED SYSTEM cpu_groups_map type AUX_WORK_TMO LONG_LIVED SYSTEM aux_work_timeouts +type MISC_AUX_WORK_Q LONG_LIVED SYSTEM misc_aux_work_q +if threads_no_smp # Need thread safe allocs, but std_alloc and fix_alloc are not; # use driver_alloc which is... type THR_Q_EL DRIVER SYSTEM thr_q_element type THR_Q_EL_SL DRIVER SYSTEM sl_thr_q_element +type MISC_AUX_WORK DRIVER SYSTEM misc_aux_work +else type THR_Q_EL STANDARD SYSTEM thr_q_element type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element +type MISC_AUX_WORK SHORT_LIVED SYSTEM misc_aux_work +endif type THR_Q STANDARD SYSTEM thr_queue type THR_Q_SL SHORT_LIVED SYSTEM short_lived_thr_queue @@ -290,8 +293,6 @@ type XPORTS_LIST SHORT_LIVED SYSTEM extra_port_list type PROC_LCK_WTR LONG_LIVED SYSTEM proc_lock_waiter type PROC_LCK_QS LONG_LIVED SYSTEM proc_lock_queues type RUNQ_BLNS LONG_LIVED SYSTEM run_queue_balancing -type MISC_AUX_WORK_Q LONG_LIVED SYSTEM misc_aux_work_q -type MISC_AUX_WORK SHORT_LIVED SYSTEM misc_aux_work type THR_PRGR_IDATA LONG_LIVED SYSTEM thr_prgr_internal_data type THR_PRGR_DATA LONG_LIVED SYSTEM thr_prgr_data type T_THR_PRGR_DATA SHORT_LIVED SYSTEM temp_thr_prgr_data diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c index 0327850cb9..259ebd838e 100644 --- a/erts/emulator/beam/erl_db.c +++ b/erts/emulator/beam/erl_db.c @@ -277,8 +277,7 @@ static void schedule_free_dbtable(DbTable* tb) ASSERT(scheds >= 1); ASSERT(erts_refc_read(&tb->common.ref, 0) == 0); erts_refc_init(&tb->common.ref, scheds); - ERTS_THR_MEMORY_BARRIER; - erts_smp_schedule_misc_aux_work(0, scheds, chk_free_dbtable, tb); + erts_schedule_multi_misc_aux_work(0, scheds, chk_free_dbtable, tb); #else free_dbtable(tb); #endif diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c index 02d1407a2d..633be0ef58 100644 --- a/erts/emulator/beam/erl_lock_check.c +++ b/erts/emulator/beam/erl_lock_check.c @@ -176,8 +176,6 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "async_id", NULL }, { "pix_lock", "address" }, { "run_queues_lists", NULL }, - { "misc_aux_work_queue", "index" }, - { "misc_aux_work_pre_alloc_lock", "address" }, { "sched_stat", NULL }, { "run_queue_sleep_list", "address" }, #endif diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index f14a35bafd..4292522fba 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -40,6 +40,7 @@ #include "beam_bp.h" #include "erl_cpu_topology.h" #include "erl_thr_progress.h" +#include "erl_thr_queue.h" #define ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED (2000*CONTEXT_REDS) #define ERTS_RUNQ_CALL_CHECK_BALANCE_REDS \ @@ -125,7 +126,6 @@ ErtsLcPSDLocks erts_psd_required_locks[ERTS_PSD_SIZE]; #endif #ifdef ERTS_SMP - int erts_disable_proc_not_running_opt; static ErtsAuxWorkData *aux_thread_aux_work_data; @@ -361,6 +361,9 @@ dbg_chk_aux_work_val(erts_aint32_t value) #ifdef ERTS_SSI_AUX_WORK_MISC valid |= ERTS_SSI_AUX_WORK_MISC; #endif +#ifdef ERTS_SSI_AUX_WORK_MISC_THR_PRGR + valid |= ERTS_SSI_AUX_WORK_MISC_THR_PRGR; +#endif #ifdef ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM valid |= ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM; @@ -707,37 +710,37 @@ unset_aux_work_flags(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flgs) return erts_atomic32_read_band_nob(&ssi->aux_work, ~flgs); } -#ifdef ERTS_SMP - typedef struct erts_misc_aux_work_t_ erts_misc_aux_work_t; struct erts_misc_aux_work_t_ { - erts_misc_aux_work_t *next; void (*func)(void *); void *arg; }; -typedef struct { - erts_smp_mtx_t mtx; - erts_misc_aux_work_t *first; - erts_misc_aux_work_t *last; -} erts_misc_aux_work_q_t; +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_aux_work, + erts_misc_aux_work_t, + 200, + ERTS_ALC_T_MISC_AUX_WORK) typedef union { - erts_misc_aux_work_q_t data; - char align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_misc_aux_work_q_t))]; + ErtsThrQ_t q; + char align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))]; } erts_algnd_misc_aux_work_q_t; static erts_algnd_misc_aux_work_q_t *misc_aux_work_queues; -ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_aux_work, - erts_misc_aux_work_t, - 200, - ERTS_ALC_T_MISC_AUX_WORK) +static void +notify_aux_work(void *vssi) +{ + set_aux_work_flags_wakeup_nob((ErtsSchedulerSleepInfo *) vssi, + ERTS_SSI_AUX_WORK_MISC); +} static void init_misc_aux_work(void) { int ix; + ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT; + qinit.notify = notify_aux_work; init_misc_aux_work_alloc(); @@ -746,93 +749,127 @@ init_misc_aux_work(void) sizeof(erts_algnd_misc_aux_work_q_t) * (erts_no_schedulers+1)); - for (ix = 0; ix <= erts_no_schedulers; ix++) { - erts_smp_mtx_init_x(&misc_aux_work_queues[ix].data.mtx, - "misc_aux_work_queue", - make_small(ix)); - misc_aux_work_queues[ix].data.first = NULL; - misc_aux_work_queues[ix].data.last = NULL; +#ifdef ERTS_SMP + ix = 0; /* aux_thread + schedulers */ +#else + ix = 1; /* scheduler only */ +#endif + + for (; ix <= erts_no_schedulers; ix++) { + qinit.arg = (void *) ERTS_SCHED_SLEEP_INFO_IX(ix-1); + erts_thr_q_initialize(&misc_aux_work_queues[ix].q, &qinit); + } +} + +static erts_aint32_t +misc_aux_work_clean(ErtsThrQ_t *q, + ErtsAuxWorkData *awdp, + erts_aint32_t aux_work) +{ + switch (erts_thr_q_clean(q)) { + case ERTS_THR_Q_DIRTY: + set_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC); + return aux_work | ERTS_SSI_AUX_WORK_MISC; +#ifdef ERTS_SMP + case ERTS_THR_Q_NEED_THR_PRGR: + set_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC_THR_PRGR); + erts_thr_progress_wakeup(awdp->esdp, + erts_thr_q_need_thr_progress(q)); +#endif + case ERTS_THR_Q_CLEAN: + break; } + return aux_work; } static erts_aint32_t handle_misc_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t aux_work) { - int ix = (int) awdp->sched_id; - erts_misc_aux_work_t *mawp; + ErtsThrQ_t *q = &misc_aux_work_queues[awdp->sched_id].q; unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC); - - erts_smp_mtx_lock(&misc_aux_work_queues[ix].data.mtx); - mawp = misc_aux_work_queues[ix].data.first; - misc_aux_work_queues[ix].data.first = NULL; - misc_aux_work_queues[ix].data.last = NULL; - erts_smp_mtx_unlock(&misc_aux_work_queues[ix].data.mtx); - - while (mawp) { - erts_misc_aux_work_t *free_mawp; + while (1) { + erts_misc_aux_work_t *mawp = erts_thr_q_dequeue(q); + if (!mawp) + break; mawp->func(mawp->arg); - free_mawp = mawp; - mawp = mawp->next; - misc_aux_work_free(free_mawp); + misc_aux_work_free(mawp); } - return aux_work & ~ERTS_SSI_AUX_WORK_MISC; + return misc_aux_work_clean(q, awdp, aux_work & ~ERTS_SSI_AUX_WORK_MISC); } -static void -smp_schedule_misc_aux_work(int ix, - void (*func)(void *), - void *arg) +#ifdef ERTS_SMP + +static erts_aint32_t +handle_misc_aux_work_thr_prgr(ErtsAuxWorkData *awdp, + erts_aint32_t aux_work) { - erts_aint32_t aux_work; + if (!erts_thr_progress_has_reached(awdp->misc.thr_prgr)) + return aux_work; + + unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC_THR_PRGR); + + return misc_aux_work_clean(&misc_aux_work_queues[awdp->sched_id].q, + awdp, + aux_work & ~ERTS_SSI_AUX_WORK_MISC_THR_PRGR); +} + +#endif + +static ERTS_INLINE void +schedule_misc_aux_work(int sched_id, + void (*func)(void *), + void *arg) +{ + ErtsThrQ_t *q; erts_misc_aux_work_t *mawp; - ErtsSchedulerSleepInfo *ssi; - mawp = misc_aux_work_alloc(); +#ifdef ERTS_SMP + ASSERT(0 <= sched_id && sched_id <= erts_no_schedulers); +#else + ASSERT(sched_id == 1); +#endif + q = &misc_aux_work_queues[sched_id].q; + mawp = misc_aux_work_alloc(); mawp->func = func; mawp->arg = arg; - mawp->next = NULL; - - erts_smp_mtx_lock(&misc_aux_work_queues[ix].data.mtx); - if (!misc_aux_work_queues[ix].data.last) - misc_aux_work_queues[ix].data.first = mawp; - else - misc_aux_work_queues[ix].data.last->next = mawp; - misc_aux_work_queues[ix].data.last = mawp; - erts_smp_mtx_unlock(&misc_aux_work_queues[ix].data.mtx); + erts_thr_q_enqueue(q, mawp); +} - set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(ix-1), - ERTS_SSI_AUX_WORK_MISC); +void +erts_schedule_misc_aux_work(int sched_id, + void (*func)(void *), + void *arg) +{ + schedule_misc_aux_work(sched_id, func, arg); } void -erts_smp_schedule_misc_aux_work(int ignore_self, - int max_sched, - void (*func)(void *), - void *arg) +erts_schedule_multi_misc_aux_work(int ignore_self, + int max_sched, + void (*func)(void *), + void *arg) { - int ix, ignore_ix = -1; + int id, self = 0; if (ignore_self) { ErtsSchedulerData *esdp = erts_get_scheduler_data(); if (esdp) - ignore_ix = (int) esdp->no; + self = (int) esdp->no; } ASSERT(0 < max_sched && max_sched <= erts_no_schedulers); - for (ix = 1; ix <= max_sched; ix++) { - if (ix == ignore_ix) + for (id = 1; id <= max_sched; id++) { + if (id == self) continue; - smp_schedule_misc_aux_work(ix, func, arg); - } + schedule_misc_aux_work(id, func, arg); + } } -#endif - static erts_aint32_t handle_fix_alloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work) { @@ -964,14 +1001,14 @@ prep_setup_completed_dealloc(void *vproc) erts_aint32_t count = (erts_aint32_t) (erts_no_schedulers+1); if (erts_atomic32_dec_read_mb(&completed_dealloc_count) == count) { /* scheduler threads */ - erts_smp_schedule_misc_aux_work(0, - erts_no_schedulers, - setup_completed_dealloc, - vproc); + erts_schedule_multi_misc_aux_work(0, + erts_no_schedulers, + setup_completed_dealloc, + vproc); /* aux_thread */ - smp_schedule_misc_aux_work(0, - setup_completed_dealloc, - vproc); + erts_schedule_misc_aux_work(0, + setup_completed_dealloc, + vproc); } } @@ -992,14 +1029,14 @@ erts_debug_wait_deallocations(Process *c_p) erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL); erts_smp_proc_inc_refc(c_p); /* scheduler threads */ - erts_smp_schedule_misc_aux_work(0, - erts_no_schedulers, - prep_setup_completed_dealloc, - (void *) c_p); + erts_schedule_multi_misc_aux_work(0, + erts_no_schedulers, + prep_setup_completed_dealloc, + (void *) c_p); /* aux_thread */ - smp_schedule_misc_aux_work(0, - prep_setup_completed_dealloc, - (void *) c_p); + erts_schedule_misc_aux_work(0, + prep_setup_completed_dealloc, + (void *) c_p); return 1; } return 0; @@ -1062,11 +1099,15 @@ handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t aux_work) ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); } #ifdef ERTS_SMP + if (aux_work & ERTS_SSI_AUX_WORK_MISC_THR_PRGR) { + aux_work = handle_misc_aux_work_thr_prgr(awdp, aux_work); + ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); + } +#endif if (aux_work & ERTS_SSI_AUX_WORK_MISC) { aux_work = handle_misc_aux_work(awdp, aux_work); ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); } -#endif #ifdef ERTS_SMP_SCHEDULERS_NEED_TO_CHECK_CHILDREN if (aux_work & ERTS_SSI_AUX_WORK_CHECK_CHILDREN) { aux_work = handle_check_children(awdp, aux_work); @@ -3191,6 +3232,7 @@ init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp) awdp->esdp = esdp; awdp->ssi = esdp ? esdp->ssi : NULL; #ifdef ERTS_SMP + awdp->misc.thr_prgr = ERTS_THR_PRGR_VAL_WAITING; awdp->dd.thr_prgr = ERTS_THR_PRGR_VAL_WAITING; awdp->dd.completed_callback = NULL; awdp->dd.completed_arg = NULL; @@ -3382,9 +3424,10 @@ erts_init_scheduling(int mrq, int no_schedulers, int no_schedulers_online) init_aux_work_data(&esdp->aux_work_data, esdp); } -#ifdef ERTS_SMP init_misc_aux_work(); +#ifdef ERTS_SMP + erts_atomic32_init_nob(&completed_dealloc_count, 0); /* debug only */ aux_thread_aux_work_data = diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 337990a0db..895f5ae3c0 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -251,13 +251,16 @@ typedef enum { #define ERTS_SSI_AUX_WORK_SET_TMO (((erts_aint32_t) 1) << 0) #define ERTS_SSI_AUX_WORK_CHECK_CHILDREN (((erts_aint32_t) 1) << 1) #define ERTS_SSI_AUX_WORK_MISC (((erts_aint32_t) 1) << 2) -#define ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM (((erts_aint32_t) 1) << 3) -#define ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC (((erts_aint32_t) 1) << 4) #ifdef ERTS_SMP -#define ERTS_SSI_AUX_WORK_DD (((erts_aint32_t) 1) << 5) -#define ERTS_SSI_AUX_WORK_DD_THR_PRGR (((erts_aint32_t) 1) << 6) +#define ERTS_SSI_AUX_WORK_MISC_THR_PRGR (((erts_aint32_t) 1) << 3) #endif -#define ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK (((erts_aint32_t) 1) << 7) +#define ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM (((erts_aint32_t) 1) << 4) +#define ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC (((erts_aint32_t) 1) << 5) +#ifdef ERTS_SMP +#define ERTS_SSI_AUX_WORK_DD (((erts_aint32_t) 1) << 6) +#define ERTS_SSI_AUX_WORK_DD_THR_PRGR (((erts_aint32_t) 1) << 7) +#endif +#define ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK (((erts_aint32_t) 1) << 8) #if !HAVE_ERTS_MSEG # undef ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK @@ -404,6 +407,9 @@ typedef struct { ErtsSchedulerSleepInfo *ssi; struct { int ix; +#ifdef ERTS_SMP + ErtsThrPrgrVal thr_prgr; +#endif } misc; #ifdef ERTS_SMP struct { @@ -1094,12 +1100,14 @@ Eterm erts_multi_scheduling_blockers(Process *); void erts_start_schedulers(void); void erts_alloc_notify_delayed_dealloc(int); void erts_smp_notify_check_children_needed(void); -void -erts_smp_schedule_misc_aux_work(int ignore_self, - int max_sched, - void (*func)(void *), - void *arg); #endif +void erts_schedule_misc_aux_work(int sched_id, + void (*func)(void *), + void *arg); +void erts_schedule_multi_misc_aux_work(int ignore_self, + int max_sched, + void (*func)(void *), + void *arg); erts_aint32_t erts_set_aux_work_timeout(int, erts_aint32_t, int); void erts_sched_notify_check_cpu_bind(void); Uint erts_active_schedulers(void); -- cgit v1.2.3 From dcc7ecbf6af5420af2d5dbd0e97fc7a2e0e894a6 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Sun, 9 Oct 2011 00:03:14 +0200 Subject: Use generic lock-free queue for async threads Queues used for communication between async threads and scheduler threads have been replaced with lock-free queues. Drivers using the driver_async functionality are not automatically locked to the system anymore, and can be unloaded as any dynamically linked in driver. Scheduling of ready async jobs is now also interleaved in between other jobs. Previously all ready async jobs was performed at once. --- erts/doc/src/erl_driver.xml | 19 +- erts/emulator/beam/erl_alloc.types | 8 +- erts/emulator/beam/erl_async.c | 737 +++++++++++++-------- erts/emulator/beam/erl_async.h | 66 ++ erts/emulator/beam/erl_bif_info.c | 1 + erts/emulator/beam/erl_driver.h | 15 +- erts/emulator/beam/erl_init.c | 73 +- erts/emulator/beam/erl_lock_check.c | 7 +- erts/emulator/beam/erl_process.c | 94 +++ erts/emulator/beam/erl_process.h | 21 +- erts/emulator/beam/global.h | 6 - erts/emulator/beam/io.c | 6 +- erts/emulator/beam/sys.h | 11 +- erts/emulator/sys/unix/sys.c | 171 +---- erts/emulator/sys/vxworks/sys.c | 6 + erts/emulator/sys/win32/sys.c | 137 ---- erts/emulator/sys/win32/sys_env.c | 14 +- erts/emulator/test/driver_SUITE.erl | 77 ++- erts/emulator/test/driver_SUITE_data/Makefile.src | 3 +- .../test/driver_SUITE_data/async_blast_drv.c | 124 ++++ 20 files changed, 939 insertions(+), 657 deletions(-) create mode 100644 erts/emulator/beam/erl_async.h create mode 100644 erts/emulator/test/driver_SUITE_data/async_blast_drv.c diff --git a/erts/doc/src/erl_driver.xml b/erts/doc/src/erl_driver.xml index 2fb03954b6..8e18dd6657 100644 --- a/erts/doc/src/erl_driver.xml +++ b/erts/doc/src/erl_driver.xml @@ -1638,12 +1638,19 @@ ERL_DRV_EXT2TERM char *buf, ErlDrvUInt len Cancel an asynchronous call -

This function cancels an asynchronous operation, by removing - it from the queue. Only functions in the queue can be - cancelled; if a function is executing, it's too late to - cancel it. The async_free function is also called.

-

The return value is 1 if the operation was removed from the - queue, otherwise 0.

+

This function used to cancel a scheduled asynchronous operation, + if it was still in the queue. It returned 1 if it succeeded, and + 0 if it failed.

+

Since it could not guarantee success, it was more or less useless. + The user had to implement synchronization of cancellation anyway. + It also unnecessarily complicated the implementation. Therefore, + as of OTP-R15B driver_async_cancel() is deprecated, and + scheduled for removal in OTP-R16. It will currently always fail, + and return 0.

+

driver_async_cancel() is deferred and will + be removed in the OTP-R16 release.

+
+
diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 9f0cb681c0..962db8b831 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -201,7 +201,7 @@ type LINEBUF STANDARD SYSTEM line_buf type IOQ STANDARD SYSTEM io_queue type BITS_BUF STANDARD SYSTEM bits_buf type TMP_DIST_BUF TEMPORARY SYSTEM tmp_dist_buf -type ASYNC_Q LONG_LIVED SYSTEM async_queue +type ASYNC_DATA LONG_LIVED SYSTEM internal_async_data type ESTACK TEMPORARY SYSTEM estack type PORT_CALL_BUF TEMPORARY SYSTEM port_call_buf type DB_TABLE ETS ETS db_tab @@ -308,12 +308,6 @@ type ETHR_STD STANDARD SYSTEM ethread_standard type ETHR_SL SHORT_LIVED SYSTEM ethread_short_lived type ETHR_LL LONG_LIVED SYSTEM ethread_long_lived -+ifnot smp - -type ARCALLBACK LONG_LIVED SYSTEM async_ready_callback - -+endif - +endif +if shared_heap diff --git a/erts/emulator/beam/erl_async.c b/erts/emulator/beam/erl_async.c index 91b64411d4..2dc7237f7c 100644 --- a/erts/emulator/beam/erl_async.c +++ b/erts/emulator/beam/erl_async.c @@ -24,10 +24,18 @@ #include "erl_sys_driver.h" #include "global.h" #include "erl_threads.h" +#include "erl_thr_queue.h" +#include "erl_async.h" + +#define ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ 20 + +#define ERTS_ASYNC_PRINT_JOB 0 + +#if !defined(ERTS_SMP) && defined(USE_THREADS) && !ERTS_USE_ASYNC_READY_Q +# error "Need async ready queue in non-smp case" +#endif typedef struct _erl_async { - struct _erl_async* next; - struct _erl_async* prev; DE_Handle* hndl; /* The DE_Handle is needed when port is gone */ Eterm port; long async_id; @@ -35,345 +43,498 @@ typedef struct _erl_async { ErlDrvPDL pdl; void (*async_invoke)(void*); void (*async_free)(void*); -} ErlAsync; +#if ERTS_USE_ASYNC_READY_Q + Uint sched_id; + union { + ErtsThrQPrepEnQ_t *prep_enq; + ErtsThrQFinDeQ_t fin_deq; + } q; +#endif +} ErtsAsync; + +#if ERTS_USE_ASYNC_READY_Q + +/* + * We can do without the enqueue mutex since it isn't needed for + * thread safety. Its only purpose is to put async threads to sleep + * during a blast of ready async jobs. This in order to reduce + * contention on the enqueue end of the async ready queues. During + * such a blast without the enqueue mutex much cpu time is consumed + * by the async threads without them doing much progress which in turn + * slow down progress of scheduler threads. + */ +#define ERTS_USE_ASYNC_READY_ENQ_MTX 1 + +#if ERTS_USE_ASYNC_READY_ENQ_MTX typedef struct { - erts_mtx_t mtx; - erts_cnd_t cv; - erts_tid_t thr; - int len; -#ifndef ERTS_SMP - int hndl; + erts_mtx_t enq_mtx; +} ErtsAsyncReadyQXData; + #endif - ErlAsync* head; - ErlAsync* tail; -#ifdef ERTS_ENABLE_LOCK_CHECK - int no; + +typedef struct { +#if ERTS_USE_ASYNC_READY_ENQ_MTX + union { + ErtsAsyncReadyQXData data; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE( + sizeof(ErtsAsyncReadyQXData))]; + } x; #endif -} AsyncQueue; + ErtsThrQ_t thr_q; + ErtsThrQFinDeQ_t fin_deq; +} ErtsAsyncReadyQ; -static erts_smp_spinlock_t async_id_lock; -static long async_id = 0; +typedef union { + ErtsAsyncReadyQ arq; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncReadyQ))]; +} ErtsAlgndAsyncReadyQ; -#ifndef ERTS_SMP +#endif /* ERTS_USE_ASYNC_READY_Q */ -erts_mtx_t async_ready_mtx; -static ErlAsync* async_ready_list = NULL; +typedef struct { + ErtsThrQ_t thr_q; + erts_tid_t thr_id; +} ErtsAsyncQ; + +typedef union { + ErtsAsyncQ aq; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncQ))]; +} ErtsAlgndAsyncQ; +typedef struct { + int no_initialized; + erts_mtx_t mtx; + erts_cnd_t cnd; + erts_atomic_t id; +} ErtsAsyncInit; + +typedef struct { + union { + ErtsAsyncInit data; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncInit))]; + } init; + ErtsAlgndAsyncQ *queue; +#if ERTS_USE_ASYNC_READY_Q + ErtsAlgndAsyncReadyQ *ready_queue; #endif +} ErtsAsyncData; -/* -** Initialize worker threads (if supported) -*/ +int erts_async_max_threads; /* Initialized by erl_init.c */ +int erts_async_thread_suggested_stack_size; /* Initialized by erl_init.c */ -/* Detach from driver */ -static void async_detach(DE_Handle* dh) -{ - return; -} +static ErtsAsyncData *async; +#ifndef USE_THREADS -#ifdef USE_THREADS +void +erts_init_async(void) +{ -static AsyncQueue* async_q; +} -static void* async_main(void*); -static void async_add(ErlAsync*, AsyncQueue*); +#else -#ifndef ERTS_SMP -typedef struct ErtsAsyncReadyCallback_ ErtsAsyncReadyCallback; -struct ErtsAsyncReadyCallback_ { - struct ErtsAsyncReadyCallback_ *next; - void (*callback)(void); -}; +static void *async_main(void *); -static ErtsAsyncReadyCallback *callbacks; -static int async_handle; +static ERTS_INLINE ErtsAsyncQ * +async_q(int i) +{ + return &async->queue[i].aq; +} + +#if ERTS_USE_ASYNC_READY_Q -int erts_register_async_ready_callback(void (*funcp)(void)) +static ERTS_INLINE ErtsAsyncReadyQ * +async_ready_q(Uint sched_id) { - ErtsAsyncReadyCallback *cb = erts_alloc(ERTS_ALC_T_ARCALLBACK, - sizeof(ErtsAsyncReadyCallback)); - cb->next = callbacks; - cb->callback = funcp; - erts_mtx_lock(&async_ready_mtx); - callbacks = cb; - erts_mtx_unlock(&async_ready_mtx); - return async_handle; + return &async->ready_queue[((int)sched_id)-1].arq; } + #endif -int init_async(int hndl) +void +erts_init_async(void) { - erts_thr_opts_t thr_opts = ERTS_THR_OPTS_DEFAULT_INITER; - AsyncQueue* q; - int i; + async = NULL; + if (erts_async_max_threads > 0) { +#if ERTS_USE_ASYNC_READY_Q + ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT; +#endif + erts_thr_opts_t thr_opts = ERTS_THR_OPTS_DEFAULT_INITER; + char *ptr; + size_t tot_size = 0; + int i; + + tot_size += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncData)); + tot_size += sizeof(ErtsAlgndAsyncQ)*erts_async_max_threads; +#if ERTS_USE_ASYNC_READY_Q + tot_size += sizeof(ErtsAlgndAsyncReadyQ)*erts_no_schedulers; +#endif - thr_opts.detached = 0; - thr_opts.suggested_stack_size = erts_async_thread_suggested_stack_size; - -#ifndef ERTS_SMP - callbacks = NULL; - async_handle = hndl; - erts_mtx_init(&async_ready_mtx, "async_ready"); - async_ready_list = NULL; -#endif - - async_id = 0; - erts_smp_spinlock_init(&async_id_lock, "async_id"); - - async_q = q = (AsyncQueue*) - (erts_async_max_threads - ? erts_alloc(ERTS_ALC_T_ASYNC_Q, - erts_async_max_threads * sizeof(AsyncQueue)) - : NULL); - for (i = 0; i < erts_async_max_threads; i++) { - q->head = NULL; - q->tail = NULL; - q->len = 0; -#ifndef ERTS_SMP - q->hndl = hndl; -#endif -#ifdef ERTS_ENABLE_LOCK_CHECK - q->no = i; -#endif - erts_mtx_init(&q->mtx, "asyncq"); - erts_cnd_init(&q->cv); - erts_thr_create(&q->thr, async_main, (void*)q, &thr_opts); - q++; - } - return 0; -} + ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_ASYNC_DATA, + tot_size); + async = (ErtsAsyncData *) ptr; + ptr += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncData)); -int exit_async() -{ - int i; + async->init.data.no_initialized = 0; + erts_mtx_init(&async->init.data.mtx, "async_init_mtx"); + erts_cnd_init(&async->init.data.cnd); + erts_atomic_init_nob(&async->init.data.id, 0); - /* terminate threads */ - for (i = 0; i < erts_async_max_threads; i++) { - ErlAsync* a = (ErlAsync*) erts_alloc(ERTS_ALC_T_ASYNC, - sizeof(ErlAsync)); - a->port = NIL; - async_add(a, &async_q[i]); - } + async->queue = (ErtsAlgndAsyncQ *) ptr; + ptr += sizeof(ErtsAlgndAsyncQ)*erts_async_max_threads; - for (i = 0; i < erts_async_max_threads; i++) { - erts_thr_join(async_q[i].thr, NULL); - erts_mtx_destroy(&async_q[i].mtx); - erts_cnd_destroy(&async_q[i].cv); - } -#ifndef ERTS_SMP - erts_mtx_destroy(&async_ready_mtx); +#if ERTS_USE_ASYNC_READY_Q + + qinit.live.queue = ERTS_THR_Q_LIVE_LONG; + qinit.live.objects = ERTS_THR_Q_LIVE_SHORT; + qinit.notify = erts_notify_check_async_ready_queue; + + async->ready_queue = (ErtsAlgndAsyncReadyQ *) ptr; + ptr += sizeof(ErtsAlgndAsyncReadyQ)*erts_no_schedulers; + + for (i = 1; i <= erts_no_schedulers; i++) { + ErtsAsyncReadyQ *arq = async_ready_q(i); +#if ERTS_USE_ASYNC_READY_ENQ_MTX + erts_mtx_init(&arq->x.data.enq_mtx, "async_enq_mtx"); #endif - if (async_q) - erts_free(ERTS_ALC_T_ASYNC_Q, (void *) async_q); - return 0; + erts_thr_q_finalize_dequeue_state_init(&arq->fin_deq); + qinit.arg = (void *) (SWord) i; + erts_thr_q_initialize(&arq->thr_q, &qinit); + } + +#endif + + /* Create async threads... */ + + thr_opts.detached = 0; + thr_opts.suggested_stack_size + = erts_async_thread_suggested_stack_size; + + for (i = 0; i < erts_async_max_threads; i++) { + ErtsAsyncQ *aq = async_q(i); + erts_thr_create(&aq->thr_id, async_main, (void*) aq, &thr_opts); + } + + /* Wait for async threads to initialize... */ + + erts_mtx_lock(&async->init.data.mtx); + while (async->init.data.no_initialized != erts_async_max_threads) + erts_cnd_wait(&async->init.data.cnd, &async->init.data.mtx); + erts_mtx_unlock(&async->init.data.mtx); + + erts_mtx_destroy(&async->init.data.mtx); + erts_cnd_destroy(&async->init.data.cnd); + + } } +#if ERTS_USE_ASYNC_READY_Q -static void async_add(ErlAsync* a, AsyncQueue* q) +void * +erts_get_async_ready_queue(Uint sched_id) +{ + return (void *) async ? async_ready_q(sched_id) : NULL; +} + +#endif + +static ERTS_INLINE void async_add(ErtsAsync *a, ErtsAsyncQ* q) { if (is_internal_port(a->port)) { - ERTS_LC_ASSERT(erts_drvportid2port(a->port)); +#if ERTS_USE_ASYNC_READY_Q + ErtsAsyncReadyQ *arq = async_ready_q(a->sched_id); + a->q.prep_enq = erts_thr_q_prepare_enqueue(&arq->thr_q); +#endif /* make sure the driver will stay around */ - driver_lock_driver(internal_port_index(a->port)); + if (a->hndl) + erts_ddll_reference_referenced_driver(a->hndl); } - erts_mtx_lock(&q->mtx); +#if ERTS_ASYNC_PRINT_JOB + erts_fprintf(stderr, "-> %ld\n", a->async_id); +#endif - if (q->len == 0) { - q->head = a; - q->tail = a; - q->len = 1; - erts_cnd_signal(&q->cv); - } - else { /* no need to signal (since the worker is working) */ - a->next = q->head; - q->head->prev = a; - q->head = a; - q->len++; - } - erts_mtx_unlock(&q->mtx); + erts_thr_q_enqueue(&q->thr_q, a); } -static ErlAsync* async_get(AsyncQueue* q) +static ERTS_INLINE ErtsAsync *async_get(ErtsThrQ_t *q, + erts_tse_t *tse, + ErtsThrQPrepEnQ_t **prep_enq) { - ErlAsync* a; +#if ERTS_USE_ASYNC_READY_Q + int saved_fin_deq = 0; + ErtsThrQFinDeQ_t fin_deq; +#endif - erts_mtx_lock(&q->mtx); - while((a = q->tail) == NULL) { - erts_cnd_wait(&q->cv, &q->mtx); - } + while (1) { + ErtsAsync *a = (ErtsAsync *) erts_thr_q_dequeue(q); + if (a) { + +#if ERTS_USE_ASYNC_READY_Q + *prep_enq = a->q.prep_enq; + erts_thr_q_get_finalize_dequeue_data(q, &a->q.fin_deq); + if (saved_fin_deq) + erts_thr_q_append_finalize_dequeue_data(&a->q.fin_deq, &fin_deq); +#endif + + return a; + } + + if (ERTS_THR_Q_DIRTY != erts_thr_q_clean(q)) { + ErtsThrQFinDeQ_t tmp_fin_deq; + + erts_tse_reset(tse); + +#if ERTS_USE_ASYNC_READY_Q + chk_fin_deq: + if (erts_thr_q_get_finalize_dequeue_data(q, &tmp_fin_deq)) { + if (!saved_fin_deq) { + erts_thr_q_finalize_dequeue_state_init(&fin_deq); + saved_fin_deq = 1; + } + erts_thr_q_append_finalize_dequeue_data(&fin_deq, + &tmp_fin_deq); + } +#endif + + switch (erts_thr_q_inspect(q, 1)) { + case ERTS_THR_Q_DIRTY: + break; #ifdef ERTS_SMP - ASSERT(a && q->tail == a); + case ERTS_THR_Q_NEED_THR_PRGR: { + ErtsThrPrgrVal prgr = erts_thr_q_need_thr_progress(q); + erts_thr_progress_wakeup(NULL, prgr); + /* + * We do no dequeue finalizing in hope that a new async + * job will arrive before we are woken due to thread + * progress... + */ + erts_tse_wait(tse); + break; + } #endif - if (q->head == q->tail) { - q->head = q->tail = NULL; - q->len = 0; - } - else { - q->tail->prev->next = NULL; - q->tail = q->tail->prev; - q->len--; + case ERTS_THR_Q_CLEAN: + +#if ERTS_USE_ASYNC_READY_Q + if (saved_fin_deq) { + if (erts_thr_q_finalize_dequeue(&fin_deq)) + goto chk_fin_deq; + else + saved_fin_deq = 0; + } +#endif + + erts_tse_wait(tse); + break; + + default: + ASSERT(0); + break; + } + + } } - erts_mtx_unlock(&q->mtx); - return a; } - -static int async_del(long id) +static ERTS_INLINE void call_async_ready(ErtsAsync *a) { - int i; - /* scan all queue for an entry with async_id == 'id' */ - - for (i = 0; i < erts_async_max_threads; i++) { - ErlAsync* a; - erts_mtx_lock(&async_q[i].mtx); - - a = async_q[i].head; - while(a != NULL) { - if (a->async_id == id) { - if (a->prev != NULL) - a->prev->next = a->next; - else - async_q[i].head = a->next; - if (a->next != NULL) - a->next->prev = a->prev; - else - async_q[i].tail = a->prev; - async_q[i].len--; - erts_mtx_unlock(&async_q[i].mtx); - if (a->async_free != NULL) - a->async_free(a->async_data); - async_detach(a->hndl); - erts_free(ERTS_ALC_T_ASYNC, a); - return 1; - } - a = a->next; + Port *p = erts_id2port_sflgs(a->port, + NULL, + 0, + ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); + if (!p) { + if (a->async_free) + a->async_free(a->async_data); + } + else { + if (async_ready(p, a->async_data)) { + if (a->async_free) + a->async_free(a->async_data); } - erts_mtx_unlock(&async_q[i].mtx); + erts_port_release(p); } - return 0; + if (a->hndl) + erts_ddll_dereference_driver(a->hndl); } -static void* async_main(void* arg) +static ERTS_INLINE void async_reply(ErtsAsync *a, ErtsThrQPrepEnQ_t *prep_enq) { - AsyncQueue* q = (AsyncQueue*) arg; +#if ERTS_USE_ASYNC_READY_Q + ErtsAsyncReadyQ *arq; -#ifdef ERTS_ENABLE_LOCK_CHECK - { - char buf[27]; - erts_snprintf(&buf[0], 27, "async %d", q->no); - erts_lc_set_thread_name(&buf[0]); - } + if (a->pdl) + driver_pdl_dec_refc(a->pdl); + +#if ERTS_ASYNC_PRINT_JOB + erts_fprintf(stderr, "=>> %ld\n", a->async_id); #endif - while(1) { - ErlAsync* a = async_get(q); + arq = async_ready_q(a->sched_id); - if (a->port == NIL) { /* TIME TO DIE SIGNAL */ - erts_free(ERTS_ALC_T_ASYNC, (void *) a); - break; - } - else { - (*a->async_invoke)(a->async_data); - /* Major problem if the code for async_invoke - or async_free is removed during a blocking operation */ +#if ERTS_USE_ASYNC_READY_ENQ_MTX + erts_mtx_lock(&arq->x.data.enq_mtx); +#endif + + erts_thr_q_enqueue_prepared(&arq->thr_q, (void *) a, prep_enq); + +#if ERTS_USE_ASYNC_READY_ENQ_MTX + erts_mtx_unlock(&arq->x.data.enq_mtx); +#endif + +#else /* ERTS_USE_ASYNC_READY_Q */ + + call_async_ready(a); + if (a->pdl) + driver_pdl_dec_refc(a->pdl); + erts_free(ERTS_ALC_T_ASYNC, (void *) a); + +#endif /* ERTS_USE_ASYNC_READY_Q */ +} + + +static void +async_wakeup(void *vtse) +{ + erts_tse_set((erts_tse_t *) vtse); +} + +static erts_tse_t *async_thread_init(ErtsAsyncQ *aq) +{ + ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT; + erts_tse_t *tse = erts_tse_fetch(); #ifdef ERTS_SMP - { - Port *p; - p = erts_id2port_sflgs(a->port, - NULL, - 0, - ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); - if (!p) { - if (a->async_free) - (*a->async_free)(a->async_data); - } - else { - if (async_ready(p, a->async_data)) { - if (a->async_free) - (*a->async_free)(a->async_data); - } - async_detach(a->hndl); - erts_port_release(p); - } - if (a->pdl) { - driver_pdl_dec_refc(a->pdl); - } - erts_free(ERTS_ALC_T_ASYNC, (void *) a); - } -#else - if (a->pdl) { - driver_pdl_dec_refc(a->pdl); - } - erts_mtx_lock(&async_ready_mtx); - a->next = async_ready_list; - async_ready_list = a; - erts_mtx_unlock(&async_ready_mtx); - sys_async_ready(q->hndl); + ErtsThrPrgrCallbacks callbacks; + + callbacks.arg = (void *) tse; + callbacks.wakeup = async_wakeup; + callbacks.prepare_wait = NULL; + callbacks.wait = NULL; + + erts_thr_progress_register_unmanaged_thread(&callbacks); #endif - } - } - return NULL; + qinit.live.queue = ERTS_THR_Q_LIVE_LONG; + qinit.live.objects = ERTS_THR_Q_LIVE_SHORT; + qinit.arg = (void *) tse; + qinit.notify = async_wakeup; +#if ERTS_USE_ASYNC_READY_Q + qinit.auto_finalize_dequeue = 0; +#endif + + erts_thr_q_initialize(&aq->thr_q, &qinit); + + /* Inform main thread that we are done initializing... */ + erts_mtx_lock(&async->init.data.mtx); + async->init.data.no_initialized++; + erts_cnd_signal(&async->init.data.cnd); + erts_mtx_unlock(&async->init.data.mtx); + + return tse; } +static void *async_main(void* arg) +{ + ErtsAsyncQ *aq = (ErtsAsyncQ *) arg; + erts_tse_t *tse = async_thread_init(aq); + + while (1) { + ErtsThrQPrepEnQ_t *prep_enq; + ErtsAsync *a = async_get(&aq->thr_q, tse, &prep_enq); + if (is_nil(a->port)) + break; /* Time to die */ +#if ERTS_ASYNC_PRINT_JOB + erts_fprintf(stderr, "<- %ld\n", a->async_id); #endif -#ifndef ERTS_SMP + a->async_invoke(a->async_data); + + async_reply(a, prep_enq); + } + + return NULL; +} + +#endif /* USE_THREADS */ -int check_async_ready(void) +void +erts_exit_flush_async(void) { #ifdef USE_THREADS - ErtsAsyncReadyCallback *cbs; + int i; + ErtsAsync a; + a.port = NIL; + /* + * Terminate threads in order to flush queues. We do not + * bother to clean everything up since we are about to + * terminate the runtime system and a cleanup would only + * delay the termination. + */ + for (i = 0; i < erts_async_max_threads; i++) + async_add(&a, async_q(i)); + for (i = 0; i < erts_async_max_threads; i++) + erts_thr_join(async->queue[i].aq.thr_id, NULL); #endif - ErlAsync* a; - int count = 0; +} - erts_mtx_lock(&async_ready_mtx); - a = async_ready_list; - async_ready_list = NULL; -#ifdef USE_THREADS - cbs = callbacks; -#endif - erts_mtx_unlock(&async_ready_mtx); - - while(a != NULL) { - ErlAsync* a_next = a->next; - /* Every port not dead */ - Port *p = erts_id2port_sflgs(a->port, - NULL, - 0, - ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); - if (!p) { - if (a->async_free) - (*a->async_free)(a->async_data); - } - else { - count++; - if (async_ready(p, a->async_data)) { - if (a->async_free != NULL) - (*a->async_free)(a->async_data); - } - async_detach(a->hndl); - erts_port_release(p); +#if defined(USE_THREADS) && ERTS_USE_ASYNC_READY_Q + +int erts_check_async_ready(void *varq) +{ + ErtsAsyncReadyQ *arq = (ErtsAsyncReadyQ *) varq; + int res = 1; + int i; + + for (i = 0; i < ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ; i++) { + ErtsAsync *a = (ErtsAsync *) erts_thr_q_dequeue(&arq->thr_q); + if (!a) { + res = 0; + break; } + +#if ERTS_ASYNC_PRINT_JOB + erts_fprintf(stderr, "<<= %ld\n", a->async_id); +#endif + erts_thr_q_append_finalize_dequeue_data(&arq->fin_deq, &a->q.fin_deq); + call_async_ready(a); erts_free(ERTS_ALC_T_ASYNC, (void *) a); - a = a_next; } -#ifdef USE_THREADS - for (; cbs; cbs = cbs->next) - (*cbs->callback)(); -#endif - return count; + + erts_thr_q_finalize_dequeue(&arq->fin_deq); + + return res; } +int erts_async_ready_clean(void *varq, void *val) +{ + ErtsAsyncReadyQ *arq = (ErtsAsyncReadyQ *) varq; + ErtsThrQCleanState_t cstate; + + cstate = erts_thr_q_clean(&arq->thr_q); + + if (erts_thr_q_finalize_dequeue(&arq->fin_deq)) + return ERTS_ASYNC_READY_DIRTY; + + switch (cstate) { + case ERTS_THR_Q_DIRTY: + return ERTS_ASYNC_READY_DIRTY; +#ifdef ERTS_SMP + case ERTS_THR_Q_NEED_THR_PRGR: + *((ErtsThrPrgrVal *) val) + = erts_thr_q_need_thr_progress(&arq->thr_q); + return ERTS_ASYNC_READY_NEED_THR_PRGR; #endif + case ERTS_THR_Q_CLEAN: + break; + } + return ERTS_ASYNC_READY_CLEAN; +} +#endif /* ** Schedule async_invoke on a worker thread @@ -393,19 +554,29 @@ long driver_async(ErlDrvPort ix, unsigned int* key, void (*async_invoke)(void*), void* async_data, void (*async_free)(void*)) { - ErlAsync* a = (ErlAsync*) erts_alloc(ERTS_ALC_T_ASYNC, sizeof(ErlAsync)); - Port* prt = erts_drvport2port(ix); + ErtsAsync* a; + Port* prt; long id; unsigned int qix; +#if ERTS_USE_ASYNC_READY_Q + Uint sched_id; + sched_id = erts_get_scheduler_id(); + if (!sched_id) + sched_id = 1; +#endif + prt = erts_drvport2port(ix); if (!prt) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - a->next = NULL; - a->prev = NULL; + a = (ErtsAsync*) erts_alloc(ERTS_ALC_T_ASYNC, sizeof(ErtsAsync)); + +#if ERTS_USE_ASYNC_READY_Q + a->sched_id = sched_id; +#endif a->hndl = (DE_Handle*)prt->drv_ptr->handle; a->port = prt->id; a->pdl = NULL; @@ -413,12 +584,16 @@ long driver_async(ErlDrvPort ix, unsigned int* key, a->async_invoke = async_invoke; a->async_free = async_free; - erts_smp_spin_lock(&async_id_lock); - async_id = (async_id + 1) & 0x7fffffff; - if (async_id == 0) - async_id++; - id = async_id; - erts_smp_spin_unlock(&async_id_lock); + if (!async) + id = 0; + else { + do { + id = erts_atomic_inc_read_nob(&async->init.data.id); + } while (id == 0); + if (id < 0) + id *= -1; + ASSERT(id > 0); + } a->async_id = id; @@ -437,7 +612,7 @@ long driver_async(ErlDrvPort ix, unsigned int* key, driver_pdl_inc_refc(prt->port_data_lock); a->pdl = prt->port_data_lock; } - async_add(a, &async_q[qix]); + async_add(a, async_q(qix)); return id; } #endif @@ -455,10 +630,16 @@ long driver_async(ErlDrvPort ix, unsigned int* key, int driver_async_cancel(unsigned int id) { -#ifdef USE_THREADS - if (erts_async_max_threads > 0) - return async_del(id); -#endif + /* + * Not supported anymore. Always fail (which is backward + * compatible). + * + * This functionality could be implemented again. However, + * it is (and always has been) completely useless since + * it doesn't give you any guarantees whatsoever. The user + * needs to (and always have had to) synchronize in his/her + * own code in order to get any guarantees. + */ return 0; } diff --git a/erts/emulator/beam/erl_async.h b/erts/emulator/beam/erl_async.h new file mode 100644 index 0000000000..95374a8fc9 --- /dev/null +++ b/erts/emulator/beam/erl_async.h @@ -0,0 +1,66 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +#ifndef ERL_ASYNC_H__ +#define ERL_ASYNC_H__ + +#define ERTS_MAX_NO_OF_ASYNC_THREADS 1024 +extern int erts_async_max_threads; +#define ERTS_ASYNC_THREAD_MIN_STACK_SIZE 16 /* Kilo words */ +#define ERTS_ASYNC_THREAD_MAX_STACK_SIZE 8192 /* Kilo words */ +extern int erts_async_thread_suggested_stack_size; + +#ifdef USE_THREADS + +#ifdef ERTS_SMP +/* + * With smp support we can choose to have, or not to + * have an async ready queue. + */ +#define ERTS_USE_ASYNC_READY_Q 1 +#endif + +#ifndef ERTS_SMP +/* In non-smp case we *need* the async ready queue */ +# undef ERTS_USE_ASYNC_READY_Q +# define ERTS_USE_ASYNC_READY_Q 1 +#endif + +#ifndef ERTS_USE_ASYNC_READY_Q +# define ERTS_USE_ASYNC_READY_Q 0 +#endif + +#if ERTS_USE_ASYNC_READY_Q +int erts_check_async_ready(void *); +int erts_async_ready_clean(void *, void *); +void *erts_get_async_ready_queue(Uint sched_id); +#define ERTS_ASYNC_READY_CLEAN 0 +#define ERTS_ASYNC_READY_DIRTY 1 +#ifdef ERTS_SMP +#define ERTS_ASYNC_READY_NEED_THR_PRGR 2 +#endif +#endif /* ERTS_USE_ASYNC_READY_Q */ + +#endif /* USE_THREADS */ + +void erts_init_async(void); +void erts_exit_flush_async(void); + + +#endif /* ERL_ASYNC_H__ */ diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 58eb58d1dc..d4d5691f62 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -39,6 +39,7 @@ #include "dist.h" #include "erl_gc.h" #include "erl_cpu_topology.h" +#include "erl_async.h" #include "erl_thr_progress.h" #ifdef HIPE #include "hipe_arch.h" diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h index 401967a8de..ae0c9def90 100644 --- a/erts/emulator/beam/erl_driver.h +++ b/erts/emulator/beam/erl_driver.h @@ -28,6 +28,14 @@ # include "config.h" #endif +#define ERL_DRV_DEPRECATED_FUNC +#ifdef __GNUC__ +# if __GNUC__ >= 3 +# undef ERL_DRV_DEPRECATED_FUNC +# define ERL_DRV_DEPRECATED_FUNC __attribute__((deprecated)) +# endif +#endif + #ifdef SIZEOF_CHAR # define SIZEOF_CHAR_SAVED__ SIZEOF_CHAR # undef SIZEOF_CHAR @@ -582,8 +590,11 @@ EXTERN long driver_async(ErlDrvPort ix, void* async_data, void (*async_free)(void*)); - -EXTERN int driver_async_cancel(unsigned int key); +/* + * driver_async_cancel() is deprecated. It is scheduled for removal + * in OTP-R16. For more information see the erl_driver(3) documentation. + */ +EXTERN int driver_async_cancel(unsigned int key) ERL_DRV_DEPRECATED_FUNC; /* Locks the driver in the machine "forever", there is no unlock function. Note that this is almost never useful, as an open diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index 5fe44afdce..98b997583c 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -44,6 +44,7 @@ #include "erl_cpu_topology.h" #include "erl_thr_progress.h" #include "erl_thr_queue.h" +#include "erl_async.h" #ifdef HIPE #include "hipe_mode_switch.h" /* for hipe_mode_switch_init() */ @@ -101,8 +102,6 @@ int erts_backtrace_depth; /* How many functions to show in a backtrace * in error codes. */ -int erts_async_max_threads; /* number of threads for async support */ -int erts_async_thread_suggested_stack_size; erts_smp_atomic32_t erts_max_gen_gcs; Eterm erts_error_logger_warnings; /* What to map warning logs to, am_error, @@ -279,6 +278,7 @@ erl_init(int ncpu) erts_init_node_tables(); init_dist(); erl_drv_thr_init(); + erts_init_async(); init_io(); init_copy(); init_load(); @@ -605,6 +605,8 @@ early_init(int *argc, char **argv) /* int max_main_threads; int max_reader_groups; int reader_groups; + char envbuf[21]; /* enough for any 64-bit integer */ + size_t envbufsz; use_multi_run_queue = 1; erts_printf_eterm_func = erts_printf_term; @@ -676,6 +678,16 @@ early_init(int *argc, char **argv) /* schdlrs = no_schedulers; schdlrs_onln = no_schedulers_online; + envbufsz = sizeof(envbuf); + + /* erts_sys_getenv() not initialized yet; need erts_sys_getenv__() */ + if (erts_sys_getenv__("ERL_THREAD_POOL_SIZE", envbuf, &envbufsz) == 0) + erts_async_max_threads = atoi(envbuf); + else + erts_async_max_threads = 0; + if (erts_async_max_threads > ERTS_MAX_NO_OF_ASYNC_THREADS) + erts_async_max_threads = ERTS_MAX_NO_OF_ASYNC_THREADS; + if (argc && argv) { int i = 1; while (i < *argc) { @@ -703,6 +715,20 @@ early_init(int *argc, char **argv) /* } break; } + case 'A': { + /* set number of threads in thread pool */ + char *arg = get_arg(argv[i]+2, argv[i+1], &i); + if (((erts_async_max_threads = atoi(arg)) < 0) || + (erts_async_max_threads > ERTS_MAX_NO_OF_ASYNC_THREADS)) { + erts_fprintf(stderr, + "bad number of async threads %s\n", + arg); + erts_usage(); + VERBOSE(DEBUG_SYSTEM, ("using %d async-threads\n", + erts_async_max_threads)); + } + break; + } case 'S' : { int tot, onln; char *arg = get_arg(argv[i]+2, argv[i+1], &i); @@ -783,9 +809,12 @@ early_init(int *argc, char **argv) /* * ** Aux thread (see erl_process.c) * ** Sys message dispatcher thread (see erl_trace.c) * - * * No unmanaged threads that need to register. + * * Unmanaged threads that need to register: + * ** Async threads (see erl_async.c) */ - erts_thr_progress_init(no_schedulers, no_schedulers+1, 0); + erts_thr_progress_init(no_schedulers, + no_schedulers+2, + erts_async_max_threads); #endif erts_thr_q_init(); erts_init_utils(); @@ -867,7 +896,6 @@ erl_start(int argc, char **argv) int have_break_handler = 1; char envbuf[21]; /* enough for any 64-bit integer */ size_t envbufsz; - int async_max_threads = erts_async_max_threads; int ncpu = early_init(&argc, argv); envbufsz = sizeof(envbuf); @@ -883,11 +911,6 @@ erl_start(int argc, char **argv) (erts_aint32_t) max_gen_gcs); } - envbufsz = sizeof(envbuf); - if (erts_sys_getenv("ERL_THREAD_POOL_SIZE", envbuf, &envbufsz) == 0) { - async_max_threads = atoi(envbuf); - } - #if (defined(__APPLE__) && defined(__MACH__)) || defined(__DARWIN__) /* * The default stack size on MacOS X is too small for pcre. @@ -1315,17 +1338,8 @@ erl_start(int argc, char **argv) break; } - case 'A': - /* set number of threads in thread pool */ - arg = get_arg(argv[i]+2, argv[i+1], &i); - if (((async_max_threads = atoi(arg)) < 0) || - (async_max_threads > ERTS_MAX_NO_OF_ASYNC_THREADS)) { - erts_fprintf(stderr, "bad number of async threads %s\n", arg); - erts_usage(); - } - - VERBOSE(DEBUG_SYSTEM, ("using %d async-threads\n", - async_max_threads)); + case 'A': /* Was handled in early init just read past it */ + (void) get_arg(argv[i]+2, argv[i+1], &i); break; case 'a': @@ -1414,10 +1428,6 @@ erl_start(int argc, char **argv) i++; } -#ifdef USE_THREADS - erts_async_max_threads = async_max_threads; -#endif - /* Delayed check of +P flag */ if (erts_max_processes < ERTS_MIN_PROCESSES || erts_max_processes > ERTS_MAX_PROCESSES @@ -1463,6 +1473,10 @@ erl_start(int argc, char **argv) erts_sys_main_thread(); /* May or may not return! */ #else erts_thr_set_main_status(1, 1); +#if ERTS_USE_ASYNC_READY_Q + erts_get_scheduler_data()->aux_work_data.async_ready.queue + = erts_get_async_ready_queue(1); +#endif set_main_stack_size(); process_main(); #endif @@ -1535,14 +1549,7 @@ system_cleanup(int exit_code) erts_cleanup_incgc(); #endif -#if defined(USE_THREADS) - exit_async(); -#endif - - /* - * A lot more cleaning could/should have been done... - */ - + erts_exit_flush_async(); } /* diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c index 633be0ef58..44da6b6c51 100644 --- a/erts/emulator/beam/erl_lock_check.c +++ b/erts/emulator/beam/erl_lock_check.c @@ -109,10 +109,6 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "export_tab", NULL }, { "fun_tab", NULL }, { "environ", NULL }, -#endif - { "asyncq", "address" }, -#ifndef ERTS_SMP - { "async_ready", NULL }, #endif { "efile_drv", "address" }, #if defined(ENABLE_CHILD_WAITER_THREAD) || defined(ERTS_SMP) @@ -138,6 +134,7 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "alcu_init_atoms", NULL }, { "mseg_init_atoms", NULL }, { "drv_tsd", NULL }, + { "async_enq_mtx", NULL }, #ifdef ERTS_SMP { "sys_msg_q", NULL }, { "atom_tab", NULL }, @@ -173,12 +170,12 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "timeofday", NULL }, { "breakpoints", NULL }, { "pollsets_lock", NULL }, - { "async_id", NULL }, { "pix_lock", "address" }, { "run_queues_lists", NULL }, { "sched_stat", NULL }, { "run_queue_sleep_list", "address" }, #endif + { "async_init_mtx", NULL }, #ifdef ERTS_SMP { "proc_lck_qs_alloc", NULL }, #endif diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 4292522fba..9db2e874c6 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -41,6 +41,7 @@ #include "erl_cpu_topology.h" #include "erl_thr_progress.h" #include "erl_thr_queue.h" +#include "erl_async.h" #define ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED (2000*CONTEXT_REDS) #define ERTS_RUNQ_CALL_CHECK_BALANCE_REDS \ @@ -364,6 +365,12 @@ dbg_chk_aux_work_val(erts_aint32_t value) #ifdef ERTS_SSI_AUX_WORK_MISC_THR_PRGR valid |= ERTS_SSI_AUX_WORK_MISC_THR_PRGR; #endif +#ifdef ERTS_SSI_AUX_WORK_ASYNC_READY + valid |= ERTS_SSI_AUX_WORK_ASYNC_READY; +#endif +#ifdef ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN + valid |= ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN; +#endif #ifdef ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM valid |= ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM; @@ -870,6 +877,73 @@ erts_schedule_multi_misc_aux_work(int ignore_self, } } +#if ERTS_USE_ASYNC_READY_Q + +void +erts_notify_check_async_ready_queue(void *vno) +{ + int ix = ((int) (SWord) vno) -1; + set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(ix), + ERTS_SSI_AUX_WORK_ASYNC_READY); +} + +static erts_aint32_t +handle_async_ready(ErtsAuxWorkData *awdp, + erts_aint32_t aux_work) +{ + ErtsSchedulerSleepInfo *ssi = awdp->ssi; + unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY); + if (erts_check_async_ready(awdp->async_ready.queue)) { + if (set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY) + & ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN) { + unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN); + aux_work &= ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN; + } + return aux_work; + } +#ifdef ERTS_SMP + awdp->async_ready.need_thr_prgr = 0; +#endif + set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN); + return ((aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY) + | ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN); +} + +static erts_aint32_t +handle_async_ready_clean(ErtsAuxWorkData *awdp, + erts_aint32_t aux_work) +{ + void *thr_prgr_p; + +#ifdef ERTS_SMP + if (awdp->async_ready.need_thr_prgr + && !erts_thr_progress_has_reached(awdp->misc.thr_prgr)) { + return aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN; + } + + awdp->async_ready.need_thr_prgr = 0; + thr_prgr_p = (void *) &awdp->async_ready.thr_prgr; +#else + thr_prgr_p = NULL; +#endif + + switch (erts_async_ready_clean(awdp->async_ready.queue, thr_prgr_p)) { + case ERTS_ASYNC_READY_CLEAN: + unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN); + return aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN; +#ifdef ERTS_SMP + case ERTS_ASYNC_READY_NEED_THR_PRGR: + erts_thr_progress_wakeup(awdp->esdp, + awdp->async_ready.thr_prgr); + awdp->async_ready.need_thr_prgr = 1; +#endif + default: + return aux_work; + } +} + +#endif + static erts_aint32_t handle_fix_alloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work) { @@ -1108,6 +1182,16 @@ handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t aux_work) aux_work = handle_misc_aux_work(awdp, aux_work); ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); } +#if ERTS_USE_ASYNC_READY_Q + if (aux_work & ERTS_SSI_AUX_WORK_ASYNC_READY) { + aux_work = handle_async_ready(awdp, aux_work); + ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); + } + if (aux_work & ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN) { + aux_work = handle_async_ready_clean(awdp, aux_work); + ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); + } +#endif #ifdef ERTS_SMP_SCHEDULERS_NEED_TO_CHECK_CHILDREN if (aux_work & ERTS_SSI_AUX_WORK_CHECK_CHILDREN) { aux_work = handle_check_children(awdp, aux_work); @@ -3237,6 +3321,13 @@ init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp) awdp->dd.completed_callback = NULL; awdp->dd.completed_arg = NULL; #endif +#ifdef ERTS_USE_ASYNC_READY_Q +#ifdef ERTS_SMP + awdp->async_ready.need_thr_prgr = 0; + awdp->async_ready.thr_prgr = ERTS_THR_PRGR_VAL_WAITING; +#endif + awdp->async_ready.queue = NULL; +#endif } void @@ -4448,6 +4539,9 @@ sched_thread_func(void *vesdp) #if HAVE_ERTS_MSEG erts_mseg_late_init(); #endif +#if ERTS_USE_ASYNC_READY_Q + esdp->aux_work_data.async_ready.queue = erts_get_async_ready_queue(no); +#endif erts_sched_init_check_cpu_bind(esdp); diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 895f5ae3c0..0c8204b4ce 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -54,6 +54,7 @@ typedef struct process Process; #include "erl_atom_table.h" #include "external.h" #include "erl_mseg.h" +#include "erl_async.h" #ifdef HIPE #include "hipe_process.h" @@ -256,11 +257,13 @@ typedef enum { #endif #define ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM (((erts_aint32_t) 1) << 4) #define ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC (((erts_aint32_t) 1) << 5) +#define ERTS_SSI_AUX_WORK_ASYNC_READY (((erts_aint32_t) 1) << 6) +#define ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN (((erts_aint32_t) 1) << 7) #ifdef ERTS_SMP -#define ERTS_SSI_AUX_WORK_DD (((erts_aint32_t) 1) << 6) -#define ERTS_SSI_AUX_WORK_DD_THR_PRGR (((erts_aint32_t) 1) << 7) +#define ERTS_SSI_AUX_WORK_DD (((erts_aint32_t) 1) << 8) +#define ERTS_SSI_AUX_WORK_DD_THR_PRGR (((erts_aint32_t) 1) << 9) #endif -#define ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK (((erts_aint32_t) 1) << 8) +#define ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK (((erts_aint32_t) 1) << 10) #if !HAVE_ERTS_MSEG # undef ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK @@ -418,6 +421,15 @@ typedef struct { void (*completed_arg)(void *); } dd; #endif +#ifdef ERTS_USE_ASYNC_READY_Q + struct { +#ifdef ERTS_SMP + int need_thr_prgr; + ErtsThrPrgrVal thr_prgr; +#endif + void *queue; + } async_ready; +#endif } ErtsAuxWorkData; struct ErtsSchedulerData_ { @@ -1101,6 +1113,9 @@ void erts_start_schedulers(void); void erts_alloc_notify_delayed_dealloc(int); void erts_smp_notify_check_children_needed(void); #endif +#if ERTS_USE_ASYNC_READY_Q +void erts_notify_check_async_ready_queue(void *); +#endif void erts_schedule_misc_aux_work(int sched_id, void (*func)(void *), void *arg); diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index 6687e02485..ba0b96870e 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -41,12 +41,6 @@ typedef struct port Port; #include "erl_port_task.h" -#define ERTS_MAX_NO_OF_ASYNC_THREADS 1024 -extern int erts_async_max_threads; -#define ERTS_ASYNC_THREAD_MIN_STACK_SIZE 16 /* Kilo words */ -#define ERTS_ASYNC_THREAD_MAX_STACK_SIZE 8192 /* Kilo words */ -extern int erts_async_thread_suggested_stack_size; - typedef struct erts_driver_t_ erts_driver_t; #define SMALL_IO_QUEUE 5 /* Number of fixed elements */ diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 151c776a3d..fff720634d 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -42,6 +42,7 @@ #include "erl_bits.h" #include "erl_version.h" #include "error.h" +#include "erl_async.h" extern ErlDrvEntry fd_driver_entry; extern ErlDrvEntry vanilla_driver_entry; @@ -4579,7 +4580,10 @@ int driver_lock_driver(ErlDrvPort ix) erts_smp_mtx_lock(&erts_driver_list_lock); - if (prt == NULL) return -1; + if (prt == NULL) { + erts_smp_mtx_unlock(&erts_driver_list_lock); + return -1; + } ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); if ((dh = (DE_Handle*)prt->drv_ptr->handle ) == NULL) { diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h index b63fe98f27..f9cbcc5892 100644 --- a/erts/emulator/beam/sys.h +++ b/erts/emulator/beam/sys.h @@ -475,15 +475,6 @@ __decl_noreturn void __noreturn erl_exit(int n, char*, ...); #define ERTS_ABORT_EXIT (INT_MIN + 1) /* no crash dump; only abort() */ #define ERTS_DUMP_EXIT (127) /* crash dump; then exit() */ - -#ifndef ERTS_SMP -int check_async_ready(void); -#ifdef USE_THREADS -void sys_async_ready(int hndl); -int erts_register_async_ready_callback(void (*funcp)(void)); -#endif -#endif - Eterm erts_check_io_info(void *p); /* Size of misc memory allocated from system dependent code */ @@ -671,6 +662,8 @@ int erts_sys_putenv(char *key_value, int sep_ix); *size), a value > 0 if value buffer is too small (*size is set to needed size), and a value < 0 on failure. */ int erts_sys_getenv(char *key, char *value, size_t *size); +/* erts_sys_getenv__() is only allowed to be used in early init phase */ +int erts_sys_getenv__(char *key, char *value, size_t *size); /* Easier to use, but not as efficient, environment functions */ char *erts_read_env(char *key); diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c index 8e8d4cce61..7f851e6007 100644 --- a/erts/emulator/sys/unix/sys.c +++ b/erts/emulator/sys/unix/sys.c @@ -128,7 +128,6 @@ static ErtsSysReportExit *report_exit_list; static ErtsSysReportExit *report_exit_transit_list; #endif -extern int check_async_ready(void); extern int driver_interrupt(int, int); extern void do_break(void); @@ -1125,31 +1124,6 @@ struct erl_drv_entry vanilla_driver_entry = { stop_select }; -#if defined(USE_THREADS) && !defined(ERTS_SMP) -static int async_drv_init(void); -static ErlDrvData async_drv_start(ErlDrvPort, char*, SysDriverOpts*); -static void async_drv_stop(ErlDrvData); -static void async_drv_input(ErlDrvData, ErlDrvEvent); - -/* INTERNAL use only */ - -struct erl_drv_entry async_driver_entry = { - async_drv_init, - async_drv_start, - async_drv_stop, - NULL, - async_drv_input, - NULL, - "async", - NULL, - NULL, - NULL, - NULL, - NULL, - NULL -}; -#endif - /* Handle SIGCHLD signals. */ #if (defined(SIG_SIGSET) || defined(SIG_SIGNAL)) static RETSIGTYPE onchld(void) @@ -2334,87 +2308,6 @@ static void stop_select(ErlDrvEvent fd, void* _) close((int)fd); } -/* -** Async opertation support -*/ -#if defined(USE_THREADS) && !defined(ERTS_SMP) -static void -sys_async_ready_failed(int fd, int r, int err) -{ - char buf[120]; - sprintf(buf, "sys_async_ready(): Fatal error: fd=%d, r=%d, errno=%d\n", - fd, r, err); - erts_silence_warn_unused_result(write(2, buf, strlen(buf))); - abort(); -} - -/* called from threads !! */ -void sys_async_ready(int fd) -{ - int r; - while (1) { - r = write(fd, "0", 1); /* signal main thread fd MUST be async_fd[1] */ - if (r == 1) { - DEBUGF(("sys_async_ready(): r = 1\r\n")); - break; - } - if (r < 0 && errno == EINTR) { - DEBUGF(("sys_async_ready(): r = %d\r\n", r)); - continue; - } - sys_async_ready_failed(fd, r, errno); - } -} - -static int async_drv_init(void) -{ - async_fd[0] = -1; - async_fd[1] = -1; - return 0; -} - -static ErlDrvData async_drv_start(ErlDrvPort port_num, - char* name, SysDriverOpts* opts) -{ - if (async_fd[0] != -1) - return ERL_DRV_ERROR_GENERAL; - if (pipe(async_fd) < 0) - return ERL_DRV_ERROR_GENERAL; - - DEBUGF(("async_drv_start: %d\r\n", port_num)); - - SET_NONBLOCKING(async_fd[0]); - driver_select(port_num, async_fd[0], ERL_DRV_READ, 1); - - if (init_async(async_fd[1]) < 0) - return ERL_DRV_ERROR_GENERAL; - return (ErlDrvData)port_num; -} - -static void async_drv_stop(ErlDrvData e) -{ - int port_num = (int)(long)e; - - DEBUGF(("async_drv_stop: %d\r\n", port_num)); - - exit_async(); - - driver_select(port_num, async_fd[0], ERL_DRV_READ, 0); - - close(async_fd[0]); - close(async_fd[1]); - async_fd[0] = async_fd[1] = -1; -} - - -static void async_drv_input(ErlDrvData e, ErlDrvEvent fd) -{ - char *buf[32]; - DEBUGF(("async_drv_input\r\n")); - while (read((int) fd, (void *) buf, 32) > 0); /* fd MUST be async_fd[0] */ - check_async_ready(); /* invoke all async_ready */ -} -#endif void erts_do_break_handling(void) { @@ -2488,12 +2381,10 @@ erts_sys_putenv(char *buffer, int sep_ix) } int -erts_sys_getenv(char *key, char *value, size_t *size) +erts_sys_getenv__(char *key, char *value, size_t *size) { - char *orig_value; int res; - erts_smp_rwmtx_rlock(&environ_rwmtx); - orig_value = getenv(key); + char *orig_value = getenv(key); if (!orig_value) res = -1; else { @@ -2508,6 +2399,15 @@ erts_sys_getenv(char *key, char *value, size_t *size) res = 0; } } + return res; +} + +int +erts_sys_getenv(char *key, char *value, size_t *size) +{ + int res; + erts_smp_rwmtx_rlock(&environ_rwmtx); + res = erts_sys_getenv__(key, value, size); erts_smp_rwmtx_runlock(&environ_rwmtx); return res; } @@ -2519,31 +2419,6 @@ sys_init_io(void) erts_alloc(ERTS_ALC_T_FD_TAB, max_files * sizeof(struct fd_data)); erts_smp_atomic_add_nob(&sys_misc_mem_sz, max_files * sizeof(struct fd_data)); - -#ifdef USE_THREADS -#ifdef ERTS_SMP - if (init_async(-1) < 0) - erl_exit(1, "Failed to initialize async-threads\n"); -#else - { - /* This is speical stuff, starting a driver from the - * system routines, but is a nice way of handling stuff - * the erlang way - */ - SysDriverOpts dopts; - int ret; - - sys_memset((void*)&dopts, 0, sizeof(SysDriverOpts)); - add_driver_entry(&async_driver_entry); - ret = erts_open_driver(NULL, NIL, "async", &dopts, NULL); - DEBUGF(("open_driver = %d\n", ret)); - if (ret < 0) - erl_exit(1, "Failed to open async driver\n"); - erts_port[ret].status |= ERTS_PORT_SFLG_IMMORTAL; - } -#endif -#endif - } #if (0) /* unused? */ @@ -2770,15 +2645,7 @@ initiate_report_exit_status(ErtsSysReportExit *rep, int status) rep->next = report_exit_transit_list; rep->status = status; report_exit_transit_list = rep; - /* - * We need the scheduler thread to call check_children(). - * If the scheduler thread is sleeping in a poll with a - * timeout, we need to wake the scheduler thread. We use the - * functionality of the async driver to do this, instead of - * implementing yet another driver doing the same thing. A - * little bit ugly, but it works... - */ - sys_async_ready(async_fd[1]); + erts_sys_schedule_interrupt(1); } static int check_children(void) @@ -2865,19 +2732,11 @@ erl_sys_schedule(int runnable) { #ifdef ERTS_SMP ERTS_CHK_IO(!runnable); - ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); #else - if (runnable) { - ERTS_CHK_IO(0); /* Poll for I/O */ - check_async_ready(); /* Check async completions */ - } else { - int wait_for_io = !check_async_ready(); - if (wait_for_io) - wait_for_io = !check_children(); - ERTS_CHK_IO(wait_for_io); - } - (void) check_children(); + ERTS_CHK_IO(runnable ? 0 : !check_children()); #endif + ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); + (void) check_children(); } diff --git a/erts/emulator/sys/vxworks/sys.c b/erts/emulator/sys/vxworks/sys.c index 08c4f3f4e5..fc7e6cec08 100644 --- a/erts/emulator/sys/vxworks/sys.c +++ b/erts/emulator/sys/vxworks/sys.c @@ -1520,6 +1520,12 @@ erts_sys_getenv(char *key, char *value, size_t *size) return res; } +int +erts_sys_getenv__(char *key, char *value, size_t *size) +{ + return erts_sys_getenv(key, value, size); +} + void sys_init_io(void) { diff --git a/erts/emulator/sys/win32/sys.c b/erts/emulator/sys/win32/sys.c index ace1e1fca0..02d16b83a2 100644 --- a/erts/emulator/sys/win32/sys.c +++ b/erts/emulator/sys/win32/sys.c @@ -566,51 +566,6 @@ struct erl_drv_entry vanilla_driver_entry = { stop_select }; -#if defined(USE_THREADS) && !defined(ERTS_SMP) - -static int async_drv_init(void); -static ErlDrvData async_drv_start(ErlDrvPort, char*, SysDriverOpts*); -static void async_drv_stop(ErlDrvData); -static void async_drv_input(ErlDrvData, ErlDrvEvent); - -/* INTERNAL use only */ - -void null_output(ErlDrvData drv_data, char* buf, int len) -{ -} - -void null_ready_output(ErlDrvData drv_data, ErlDrvEvent event) -{ -} - -struct erl_drv_entry async_driver_entry = { - async_drv_init, - async_drv_start, - async_drv_stop, - null_output, - async_drv_input, - null_ready_output, - "async", - NULL, /* finish */ - NULL, /* handle */ - NULL, /* control */ - NULL, /* timeout */ - NULL, /* outputv */ - NULL, /* ready_async */ - NULL, /* flush */ - NULL, /* call */ - NULL, /* event */ - ERL_DRV_EXTENDED_MARKER, - ERL_DRV_EXTENDED_MAJOR_VERSION, - ERL_DRV_EXTENDED_MINOR_VERSION, - 0, /* ERL_DRV_FLAGs */ - NULL, - NULL, /* process_exit */ - stop_select -}; - -#endif - /* * Initialises a DriverData structure. * @@ -2825,30 +2780,6 @@ sys_init_io(void) We estimate the number to twice the amount of ports. We really dont know on windows, do we? */ max_files = 2*erts_max_ports; - -#ifdef USE_THREADS -#ifdef ERTS_SMP - if (init_async(-1) < 0) - erl_exit(1, "Failed to initialize async-threads\n"); -#else - { - /* This is special stuff, starting a driver from the - * system routines, but is a nice way of handling stuff - * the erlang way - */ - SysDriverOpts dopts; - int ret; - - sys_memset((void*)&dopts, 0, sizeof(SysDriverOpts)); - add_driver_entry(&async_driver_entry); - ret = erts_open_driver(NULL, NIL, "async", &dopts, NULL); - DEBUGF(("open_driver = %d\n", ret)); - if (ret < 0) - erl_exit(1, "Failed to open async driver\n"); - erts_port[ret].status |= ERTS_PORT_SFLG_IMMORTAL; - } -#endif -#endif } #ifdef ERTS_SMP @@ -3382,75 +3313,7 @@ erts_sys_schedule_interrupt_timed(int set, long msec) void erl_sys_schedule(int runnable) { -#ifdef ERTS_SMP erts_check_io(!runnable); ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); -#else - if (runnable) { - erts_check_io(0); /* Poll for I/O */ - check_async_ready(); /* Check async completions */ - } else { - erts_check_io(check_async_ready() ? 0 : 1); - } -#endif -} - -#if defined(USE_THREADS) && !defined(ERTS_SMP) -/* - * Async operation support. - */ - -static ErlDrvEvent async_drv_event; - -void -sys_async_ready(int fd) -{ - SetEvent((HANDLE)async_drv_event); } -static int -async_drv_init(void) -{ - async_drv_event = (ErlDrvEvent) NULL; - return 0; -} - -static ErlDrvData -async_drv_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts) -{ - if (async_drv_event != (ErlDrvEvent) NULL) { - return ERL_DRV_ERROR_GENERAL; - } - if ((async_drv_event = (ErlDrvEvent)CreateAutoEvent(FALSE)) == (ErlDrvEvent) NULL) { - return ERL_DRV_ERROR_GENERAL; - } - - driver_select(port_num, async_drv_event, ERL_DRV_READ|ERL_DRV_USE, 1); - if (init_async(async_drv_event) < 0) { - return ERL_DRV_ERROR_GENERAL; - } - return (ErlDrvData)port_num; -} - -static void -async_drv_stop(ErlDrvData port_num) -{ - exit_async(); - driver_select((ErlDrvPort)port_num, async_drv_event, ERL_DRV_READ|ERL_DRV_USE, 0); - /*CloseHandle((HANDLE)async_drv_event);*/ - async_drv_event = (ErlDrvEvent) NULL; -} - - -static void -async_drv_input(ErlDrvData port_num, ErlDrvEvent e) -{ - check_async_ready(); - - /* - * Our event is auto-resetting. - */ -} - -#endif - diff --git a/erts/emulator/sys/win32/sys_env.c b/erts/emulator/sys/win32/sys_env.c index 02c8433a10..7acc7f07ee 100644 --- a/erts/emulator/sys/win32/sys_env.c +++ b/erts/emulator/sys/win32/sys_env.c @@ -55,19 +55,17 @@ erts_sys_putenv(char *key_value, int sep_ix) } int -erts_sys_getenv(char *key, char *value, size_t *size) +erts_sys_getenv__(char *key, char *value, size_t *size) { size_t req_size = 0; int res = 0; DWORD new_size; - erts_smp_rwmtx_rlock(&environ_rwmtx); SetLastError(0); new_size = GetEnvironmentVariable((LPCTSTR) key, (LPTSTR) value, (DWORD) *size); res = !new_size && GetLastError() == ERROR_ENVVAR_NOT_FOUND ? -1 : 0; - erts_smp_rwmtx_runlock(&environ_rwmtx); if (res < 0) return res; res = new_size > *size ? 1 : 0; @@ -75,6 +73,16 @@ erts_sys_getenv(char *key, char *value, size_t *size) return res; } +int +erts_sys_getenv(char *key, char *value, size_t *size) +{ + int res; + erts_smp_rwmtx_rlock(&environ_rwmtx); + res = erts_sys_getenv__(key, value, size); + erts_smp_rwmtx_runlock(&environ_rwmtx); + return res; +} + struct win32_getenv_state { char *env; char *next; diff --git a/erts/emulator/test/driver_SUITE.erl b/erts/emulator/test/driver_SUITE.erl index bcb0257ed1..c07dbc5871 100644 --- a/erts/emulator/test/driver_SUITE.erl +++ b/erts/emulator/test/driver_SUITE.erl @@ -76,7 +76,8 @@ driver_select_use/1, thread_mseg_alloc_cache_clean/1, otp_9302/1, - thr_free_drv/1]). + thr_free_drv/1, + async_blast/1]). -export([bin_prefix/2]). @@ -145,7 +146,8 @@ all() -> smp_select, driver_select_use, thread_mseg_alloc_cache_clean, otp_9302, - thr_free_drv]. + thr_free_drv, + async_blast]. groups() -> [{timer, [], @@ -1911,17 +1913,30 @@ otp_9302(Config) when is_list(Config) -> ?line port_command(Port, ""), ?line {msg, block} = get_port_msg(Port, infinity), ?line {msg, job} = get_port_msg(Port, infinity), - ?line case erlang:system_info(thread_pool_size) of - 0 -> - {msg, cancel} = get_port_msg(Port, infinity); - _ -> - ok - end, - ?line {msg, job} = get_port_msg(Port, infinity), + ?line C = case erlang:system_info(thread_pool_size) of + 0 -> + ?line {msg, cancel} = get_port_msg(Port, infinity), + ?line {msg, job} = get_port_msg(Port, infinity), + ?line false; + _ -> + case get_port_msg(Port, infinity) of + {msg, cancel} -> %% Cancel always fail in Rel >= 15 + ?line {msg, job} = get_port_msg(Port, infinity), + ?line false; + {msg, job} -> + ?line ok, + ?line true + end + end, ?line {msg, end_of_jobs} = get_port_msg(Port, infinity), ?line no_msg = get_port_msg(Port, 2000), ?line port_close(Port), - ?line ok. + ?line case C of + true -> + ?line {comment, "Async job cancelled"}; + false -> + ?line {comment, "Async job not cancelled"} + end. thr_free_drv(Config) when is_list(Config) -> ?line Path = ?config(data_dir, Config), @@ -1954,6 +1969,48 @@ thr_free_drv_control(Port, N) -> % io:format("N=~p, SID=~p", [N, erlang:system_info(scheduler_id)]), thr_free_drv_control(Port, N+1) end. + +async_blast(Config) when is_list(Config) -> + ?line Path = ?config(data_dir, Config), + ?line erl_ddll:start(), + ?line ok = load_driver(Path, async_blast_drv), + ?line SchedOnln = erlang:system_info(schedulers_online), + ?line MemBefore = driver_alloc_size(), + ?line Start = os:timestamp(), + ?line Blast = fun () -> + Port = open_port({spawn, async_blast_drv}, []), + true = is_port(Port), + port_command(Port, ""), + receive + {Port, done} -> + ok + end, + port_close(Port) + end, + ?line Ps = lists:map(fun (N) -> + spawn_opt(Blast, + [{scheduler, + (N rem SchedOnln)+ 1}, + monitor]) + end, + lists:seq(1, 100)), + ?line MemMid = driver_alloc_size(), + ?line lists:foreach(fun ({Pid, Mon}) -> + receive + {'DOWN',Mon,process,Pid,_} -> ok + end + end, Ps), + ?line End = os:timestamp(), + ?line MemAfter = driver_alloc_size(), + ?line io:format("MemBefore=~p, MemMid=~p, MemAfter=~p~n", + [MemBefore, MemMid, MemAfter]), + ?line AsyncBlastTime = timer:now_diff(End,Start)/1000000, + ?line io:format("AsyncBlastTime=~p~n", [AsyncBlastTime]), + ?line MemBefore = MemAfter, + ?line erlang:display({async_blast_time, AsyncBlastTime}), + ?line ok. + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Utilities diff --git a/erts/emulator/test/driver_SUITE_data/Makefile.src b/erts/emulator/test/driver_SUITE_data/Makefile.src index 62ab5169c0..dd48f6a0f7 100644 --- a/erts/emulator/test/driver_SUITE_data/Makefile.src +++ b/erts/emulator/test/driver_SUITE_data/Makefile.src @@ -13,7 +13,8 @@ MISC_DRVS = outputv_drv@dll@ \ missing_callback_drv@dll@ \ thr_alloc_drv@dll@ \ otp_9302_drv@dll@ \ - thr_free_drv@dll@ + thr_free_drv@dll@ \ + async_blast_drv@dll@ SYS_INFO_DRVS = sys_info_1_0_drv@dll@ \ sys_info_1_1_drv@dll@ \ diff --git a/erts/emulator/test/driver_SUITE_data/async_blast_drv.c b/erts/emulator/test/driver_SUITE_data/async_blast_drv.c new file mode 100644 index 0000000000..3821f7e3dc --- /dev/null +++ b/erts/emulator/test/driver_SUITE_data/async_blast_drv.c @@ -0,0 +1,124 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +#include "erl_driver.h" + +#define NO_ASYNC_JOBS 10000 + +static void stop(ErlDrvData drv_data); +static ErlDrvData start(ErlDrvPort port, + char *command); +static void output(ErlDrvData drv_data, + char *buf, int len); +static void ready_async(ErlDrvData drv_data, + ErlDrvThreadData thread_data); + +static ErlDrvEntry async_blast_drv_entry = { + NULL /* init */, + start, + stop, + output, + NULL /* ready_input */, + NULL /* ready_output */, + "async_blast_drv", + NULL /* finish */, + NULL /* handle */, + NULL /* control */, + NULL /* timeout */, + NULL /* outputv */, + ready_async, + NULL /* flush */, + NULL /* call */, + NULL /* event */, + ERL_DRV_EXTENDED_MARKER, + ERL_DRV_EXTENDED_MAJOR_VERSION, + ERL_DRV_EXTENDED_MINOR_VERSION, + ERL_DRV_FLAG_USE_PORT_LOCKING, + NULL /* handle2 */, + NULL /* handle_monitor */ +}; + +typedef struct { + ErlDrvPort port; + ErlDrvTermData caller; + int counter; +} async_blast_data_t; + + +DRIVER_INIT(async_blast_drv) +{ + return &async_blast_drv_entry; +} + +static void stop(ErlDrvData drv_data) +{ + driver_free((void *) drv_data); +} + +static ErlDrvData start(ErlDrvPort port, + char *command) +{ + async_blast_data_t *abd; + + abd = driver_alloc(sizeof(async_blast_data_t)); + if (!abd) + return ERL_DRV_ERROR_GENERAL; + + abd->port = port; + abd->counter = 0; + return (ErlDrvData) abd; +} + +static void async_invoke(void *data) +{ + +} +#include + +static void ready_async(ErlDrvData drv_data, + ErlDrvThreadData thread_data) +{ + async_blast_data_t *abd = (async_blast_data_t *) drv_data; + if (--abd->counter == 0) { + ErlDrvTermData spec[] = { + ERL_DRV_PORT, driver_mk_port(abd->port), + ERL_DRV_ATOM, driver_mk_atom("done"), + ERL_DRV_TUPLE, 2 + }; + driver_send_term(abd->port, abd->caller, + spec, sizeof(spec)/sizeof(spec[0])); + } +} + +static void output(ErlDrvData drv_data, + char *buf, int len) +{ + async_blast_data_t *abd = (async_blast_data_t *) drv_data; + if (abd->counter == 0) { + int i; + abd->caller = driver_caller(abd->port); + abd->counter = NO_ASYNC_JOBS; + for (i = 0; i < NO_ASYNC_JOBS; i++) { + if (0 > driver_async(abd->port, NULL, async_invoke, NULL, NULL)) { + driver_failure_atom(abd->port, "driver_async_failed"); + break; + } + } + } +} -- cgit v1.2.3