aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator')
-rw-r--r--erts/emulator/Makefile.in2
-rw-r--r--erts/emulator/beam/erl_alloc.c13
-rw-r--r--erts/emulator/beam/erl_alloc.types35
-rw-r--r--erts/emulator/beam/erl_async.c737
-rw-r--r--erts/emulator/beam/erl_async.h66
-rw-r--r--erts/emulator/beam/erl_bif_info.c1
-rw-r--r--erts/emulator/beam/erl_db.c3
-rw-r--r--erts/emulator/beam/erl_driver.h15
-rw-r--r--erts/emulator/beam/erl_init.c75
-rw-r--r--erts/emulator/beam/erl_lock_check.c9
-rw-r--r--erts/emulator/beam/erl_process.c293
-rw-r--r--erts/emulator/beam/erl_process.h43
-rw-r--r--erts/emulator/beam/erl_thr_queue.c745
-rw-r--r--erts/emulator/beam/erl_thr_queue.h211
-rw-r--r--erts/emulator/beam/global.h6
-rw-r--r--erts/emulator/beam/io.c6
-rw-r--r--erts/emulator/beam/sys.h11
-rw-r--r--erts/emulator/beam/utils.c1
-rw-r--r--erts/emulator/sys/unix/sys.c171
-rw-r--r--erts/emulator/sys/vxworks/sys.c6
-rw-r--r--erts/emulator/sys/win32/sys.c137
-rw-r--r--erts/emulator/sys/win32/sys_env.c14
-rw-r--r--erts/emulator/test/driver_SUITE.erl77
-rw-r--r--erts/emulator/test/driver_SUITE_data/Makefile.src3
-rw-r--r--erts/emulator/test/driver_SUITE_data/async_blast_drv.c124
25 files changed, 2057 insertions, 747 deletions
diff --git a/erts/emulator/Makefile.in b/erts/emulator/Makefile.in
index afca3b85df..708d4ca0a3 100644
--- a/erts/emulator/Makefile.in
+++ b/erts/emulator/Makefile.in
@@ -739,7 +739,7 @@ RUN_OBJS = \
$(OBJDIR)/packet_parser.o $(OBJDIR)/safe_hash.o \
$(OBJDIR)/erl_zlib.o $(OBJDIR)/erl_nif.o \
$(OBJDIR)/erl_bif_binary.o $(OBJDIR)/erl_ao_firstfit_alloc.o \
- $(OBJDIR)/erl_sched_spec_pre_alloc.o
+ $(OBJDIR)/erl_thr_queue.o $(OBJDIR)/erl_sched_spec_pre_alloc.o
ifeq ($(TARGET),win32)
DRV_OBJS = \
diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c
index 705ace26fa..33d6cf5f2f 100644
--- a/erts/emulator/beam/erl_alloc.c
+++ b/erts/emulator/beam/erl_alloc.c
@@ -41,6 +41,7 @@
#include "erl_monitors.h"
#include "erl_bif_timer.h"
#include "erl_cpu_topology.h"
+#include "erl_thr_queue.h"
#if defined(ERTS_ALC_T_DRV_SEL_D_STATE) || defined(ERTS_ALC_T_DRV_EV_D_STATE)
#include "erl_check_io.h"
#endif
@@ -524,6 +525,10 @@ erts_alloc_init(int *argc, char **argv, ErtsAllocInitOpts *eaiop)
= sizeof(ErtsDrvSelectDataState);
fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_MSG_REF)]
= sizeof(ErlMessage);
+#ifdef ERTS_SMP
+ fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_THR_Q_EL_SL)]
+ = sizeof(ErtsThrQElement_t);
+#endif
#ifdef HARD_DEBUG
hdbg_init();
#endif
@@ -3070,10 +3075,10 @@ erts_request_alloc_info(struct process *c_p,
#ifdef ERTS_SMP
if (erts_no_schedulers > 1)
- erts_smp_schedule_misc_aux_work(1,
- erts_no_schedulers,
- reply_alloc_info,
- (void *) air);
+ erts_schedule_multi_misc_aux_work(1,
+ erts_no_schedulers,
+ reply_alloc_info,
+ (void *) air);
#endif
reply_alloc_info((void *) air);
diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types
index 79d3433fc0..962db8b831 100644
--- a/erts/emulator/beam/erl_alloc.types
+++ b/erts/emulator/beam/erl_alloc.types
@@ -50,6 +50,15 @@
# command line argument to make_alloc_types. The variable X is false
# after a "+disable X" statement or if it has never been mentioned.
++if smp
++disable threads_no_smp
++else
++if threads
++enable threads_no_smp
++else
++disable threads_no_smp
++endif
++endif
# --- Allocator declarations -------------------------------------------------
#
@@ -192,7 +201,7 @@ type LINEBUF STANDARD SYSTEM line_buf
type IOQ STANDARD SYSTEM io_queue
type BITS_BUF STANDARD SYSTEM bits_buf
type TMP_DIST_BUF TEMPORARY SYSTEM tmp_dist_buf
-type ASYNC_Q LONG_LIVED SYSTEM async_queue
+type ASYNC_DATA LONG_LIVED SYSTEM internal_async_data
type ESTACK TEMPORARY SYSTEM estack
type PORT_CALL_BUF TEMPORARY SYSTEM port_call_buf
type DB_TABLE ETS ETS db_tab
@@ -253,6 +262,22 @@ type EXT_TERM_DATA SHORT_LIVED PROCESSES external_term_data
type ZLIB STANDARD SYSTEM zlib
type CPU_GRPS_MAP LONG_LIVED SYSTEM cpu_groups_map
type AUX_WORK_TMO LONG_LIVED SYSTEM aux_work_timeouts
+type MISC_AUX_WORK_Q LONG_LIVED SYSTEM misc_aux_work_q
+
++if threads_no_smp
+# Need thread safe allocs, but std_alloc and fix_alloc are not;
+# use driver_alloc which is...
+type THR_Q_EL DRIVER SYSTEM thr_q_element
+type THR_Q_EL_SL DRIVER SYSTEM sl_thr_q_element
+type MISC_AUX_WORK DRIVER SYSTEM misc_aux_work
++else
+type THR_Q_EL STANDARD SYSTEM thr_q_element
+type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element
+type MISC_AUX_WORK SHORT_LIVED SYSTEM misc_aux_work
++endif
+type THR_Q STANDARD SYSTEM thr_queue
+type THR_Q_SL SHORT_LIVED SYSTEM short_lived_thr_queue
+type THR_Q_LL LONG_LIVED SYSTEM long_lived_thr_queue
+if smp
type ASYNC SHORT_LIVED SYSTEM async
@@ -268,8 +293,6 @@ type XPORTS_LIST SHORT_LIVED SYSTEM extra_port_list
type PROC_LCK_WTR LONG_LIVED SYSTEM proc_lock_waiter
type PROC_LCK_QS LONG_LIVED SYSTEM proc_lock_queues
type RUNQ_BLNS LONG_LIVED SYSTEM run_queue_balancing
-type MISC_AUX_WORK_Q LONG_LIVED SYSTEM misc_aux_work_q
-type MISC_AUX_WORK SHORT_LIVED SYSTEM misc_aux_work
type THR_PRGR_IDATA LONG_LIVED SYSTEM thr_prgr_internal_data
type THR_PRGR_DATA LONG_LIVED SYSTEM thr_prgr_data
type T_THR_PRGR_DATA SHORT_LIVED SYSTEM temp_thr_prgr_data
@@ -285,12 +308,6 @@ type ETHR_STD STANDARD SYSTEM ethread_standard
type ETHR_SL SHORT_LIVED SYSTEM ethread_short_lived
type ETHR_LL LONG_LIVED SYSTEM ethread_long_lived
-+ifnot smp
-
-type ARCALLBACK LONG_LIVED SYSTEM async_ready_callback
-
-+endif
-
+endif
+if shared_heap
diff --git a/erts/emulator/beam/erl_async.c b/erts/emulator/beam/erl_async.c
index 91b64411d4..2dc7237f7c 100644
--- a/erts/emulator/beam/erl_async.c
+++ b/erts/emulator/beam/erl_async.c
@@ -24,10 +24,18 @@
#include "erl_sys_driver.h"
#include "global.h"
#include "erl_threads.h"
+#include "erl_thr_queue.h"
+#include "erl_async.h"
+
+#define ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ 20
+
+#define ERTS_ASYNC_PRINT_JOB 0
+
+#if !defined(ERTS_SMP) && defined(USE_THREADS) && !ERTS_USE_ASYNC_READY_Q
+# error "Need async ready queue in non-smp case"
+#endif
typedef struct _erl_async {
- struct _erl_async* next;
- struct _erl_async* prev;
DE_Handle* hndl; /* The DE_Handle is needed when port is gone */
Eterm port;
long async_id;
@@ -35,345 +43,498 @@ typedef struct _erl_async {
ErlDrvPDL pdl;
void (*async_invoke)(void*);
void (*async_free)(void*);
-} ErlAsync;
+#if ERTS_USE_ASYNC_READY_Q
+ Uint sched_id;
+ union {
+ ErtsThrQPrepEnQ_t *prep_enq;
+ ErtsThrQFinDeQ_t fin_deq;
+ } q;
+#endif
+} ErtsAsync;
+
+#if ERTS_USE_ASYNC_READY_Q
+
+/*
+ * We can do without the enqueue mutex since it isn't needed for
+ * thread safety. Its only purpose is to put async threads to sleep
+ * during a blast of ready async jobs. This in order to reduce
+ * contention on the enqueue end of the async ready queues. During
+ * such a blast without the enqueue mutex much cpu time is consumed
+ * by the async threads without them doing much progress which in turn
+ * slow down progress of scheduler threads.
+ */
+#define ERTS_USE_ASYNC_READY_ENQ_MTX 1
+
+#if ERTS_USE_ASYNC_READY_ENQ_MTX
typedef struct {
- erts_mtx_t mtx;
- erts_cnd_t cv;
- erts_tid_t thr;
- int len;
-#ifndef ERTS_SMP
- int hndl;
+ erts_mtx_t enq_mtx;
+} ErtsAsyncReadyQXData;
+
#endif
- ErlAsync* head;
- ErlAsync* tail;
-#ifdef ERTS_ENABLE_LOCK_CHECK
- int no;
+
+typedef struct {
+#if ERTS_USE_ASYNC_READY_ENQ_MTX
+ union {
+ ErtsAsyncReadyQXData data;
+ char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(
+ sizeof(ErtsAsyncReadyQXData))];
+ } x;
#endif
-} AsyncQueue;
+ ErtsThrQ_t thr_q;
+ ErtsThrQFinDeQ_t fin_deq;
+} ErtsAsyncReadyQ;
-static erts_smp_spinlock_t async_id_lock;
-static long async_id = 0;
+typedef union {
+ ErtsAsyncReadyQ arq;
+ char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncReadyQ))];
+} ErtsAlgndAsyncReadyQ;
-#ifndef ERTS_SMP
+#endif /* ERTS_USE_ASYNC_READY_Q */
-erts_mtx_t async_ready_mtx;
-static ErlAsync* async_ready_list = NULL;
+typedef struct {
+ ErtsThrQ_t thr_q;
+ erts_tid_t thr_id;
+} ErtsAsyncQ;
+
+typedef union {
+ ErtsAsyncQ aq;
+ char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncQ))];
+} ErtsAlgndAsyncQ;
+typedef struct {
+ int no_initialized;
+ erts_mtx_t mtx;
+ erts_cnd_t cnd;
+ erts_atomic_t id;
+} ErtsAsyncInit;
+
+typedef struct {
+ union {
+ ErtsAsyncInit data;
+ char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncInit))];
+ } init;
+ ErtsAlgndAsyncQ *queue;
+#if ERTS_USE_ASYNC_READY_Q
+ ErtsAlgndAsyncReadyQ *ready_queue;
#endif
+} ErtsAsyncData;
-/*
-** Initialize worker threads (if supported)
-*/
+int erts_async_max_threads; /* Initialized by erl_init.c */
+int erts_async_thread_suggested_stack_size; /* Initialized by erl_init.c */
-/* Detach from driver */
-static void async_detach(DE_Handle* dh)
-{
- return;
-}
+static ErtsAsyncData *async;
+#ifndef USE_THREADS
-#ifdef USE_THREADS
+void
+erts_init_async(void)
+{
-static AsyncQueue* async_q;
+}
-static void* async_main(void*);
-static void async_add(ErlAsync*, AsyncQueue*);
+#else
-#ifndef ERTS_SMP
-typedef struct ErtsAsyncReadyCallback_ ErtsAsyncReadyCallback;
-struct ErtsAsyncReadyCallback_ {
- struct ErtsAsyncReadyCallback_ *next;
- void (*callback)(void);
-};
+static void *async_main(void *);
-static ErtsAsyncReadyCallback *callbacks;
-static int async_handle;
+static ERTS_INLINE ErtsAsyncQ *
+async_q(int i)
+{
+ return &async->queue[i].aq;
+}
+
+#if ERTS_USE_ASYNC_READY_Q
-int erts_register_async_ready_callback(void (*funcp)(void))
+static ERTS_INLINE ErtsAsyncReadyQ *
+async_ready_q(Uint sched_id)
{
- ErtsAsyncReadyCallback *cb = erts_alloc(ERTS_ALC_T_ARCALLBACK,
- sizeof(ErtsAsyncReadyCallback));
- cb->next = callbacks;
- cb->callback = funcp;
- erts_mtx_lock(&async_ready_mtx);
- callbacks = cb;
- erts_mtx_unlock(&async_ready_mtx);
- return async_handle;
+ return &async->ready_queue[((int)sched_id)-1].arq;
}
+
#endif
-int init_async(int hndl)
+void
+erts_init_async(void)
{
- erts_thr_opts_t thr_opts = ERTS_THR_OPTS_DEFAULT_INITER;
- AsyncQueue* q;
- int i;
+ async = NULL;
+ if (erts_async_max_threads > 0) {
+#if ERTS_USE_ASYNC_READY_Q
+ ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT;
+#endif
+ erts_thr_opts_t thr_opts = ERTS_THR_OPTS_DEFAULT_INITER;
+ char *ptr;
+ size_t tot_size = 0;
+ int i;
+
+ tot_size += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncData));
+ tot_size += sizeof(ErtsAlgndAsyncQ)*erts_async_max_threads;
+#if ERTS_USE_ASYNC_READY_Q
+ tot_size += sizeof(ErtsAlgndAsyncReadyQ)*erts_no_schedulers;
+#endif
- thr_opts.detached = 0;
- thr_opts.suggested_stack_size = erts_async_thread_suggested_stack_size;
-
-#ifndef ERTS_SMP
- callbacks = NULL;
- async_handle = hndl;
- erts_mtx_init(&async_ready_mtx, "async_ready");
- async_ready_list = NULL;
-#endif
-
- async_id = 0;
- erts_smp_spinlock_init(&async_id_lock, "async_id");
-
- async_q = q = (AsyncQueue*)
- (erts_async_max_threads
- ? erts_alloc(ERTS_ALC_T_ASYNC_Q,
- erts_async_max_threads * sizeof(AsyncQueue))
- : NULL);
- for (i = 0; i < erts_async_max_threads; i++) {
- q->head = NULL;
- q->tail = NULL;
- q->len = 0;
-#ifndef ERTS_SMP
- q->hndl = hndl;
-#endif
-#ifdef ERTS_ENABLE_LOCK_CHECK
- q->no = i;
-#endif
- erts_mtx_init(&q->mtx, "asyncq");
- erts_cnd_init(&q->cv);
- erts_thr_create(&q->thr, async_main, (void*)q, &thr_opts);
- q++;
- }
- return 0;
-}
+ ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_ASYNC_DATA,
+ tot_size);
+ async = (ErtsAsyncData *) ptr;
+ ptr += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncData));
-int exit_async()
-{
- int i;
+ async->init.data.no_initialized = 0;
+ erts_mtx_init(&async->init.data.mtx, "async_init_mtx");
+ erts_cnd_init(&async->init.data.cnd);
+ erts_atomic_init_nob(&async->init.data.id, 0);
- /* terminate threads */
- for (i = 0; i < erts_async_max_threads; i++) {
- ErlAsync* a = (ErlAsync*) erts_alloc(ERTS_ALC_T_ASYNC,
- sizeof(ErlAsync));
- a->port = NIL;
- async_add(a, &async_q[i]);
- }
+ async->queue = (ErtsAlgndAsyncQ *) ptr;
+ ptr += sizeof(ErtsAlgndAsyncQ)*erts_async_max_threads;
- for (i = 0; i < erts_async_max_threads; i++) {
- erts_thr_join(async_q[i].thr, NULL);
- erts_mtx_destroy(&async_q[i].mtx);
- erts_cnd_destroy(&async_q[i].cv);
- }
-#ifndef ERTS_SMP
- erts_mtx_destroy(&async_ready_mtx);
+#if ERTS_USE_ASYNC_READY_Q
+
+ qinit.live.queue = ERTS_THR_Q_LIVE_LONG;
+ qinit.live.objects = ERTS_THR_Q_LIVE_SHORT;
+ qinit.notify = erts_notify_check_async_ready_queue;
+
+ async->ready_queue = (ErtsAlgndAsyncReadyQ *) ptr;
+ ptr += sizeof(ErtsAlgndAsyncReadyQ)*erts_no_schedulers;
+
+ for (i = 1; i <= erts_no_schedulers; i++) {
+ ErtsAsyncReadyQ *arq = async_ready_q(i);
+#if ERTS_USE_ASYNC_READY_ENQ_MTX
+ erts_mtx_init(&arq->x.data.enq_mtx, "async_enq_mtx");
#endif
- if (async_q)
- erts_free(ERTS_ALC_T_ASYNC_Q, (void *) async_q);
- return 0;
+ erts_thr_q_finalize_dequeue_state_init(&arq->fin_deq);
+ qinit.arg = (void *) (SWord) i;
+ erts_thr_q_initialize(&arq->thr_q, &qinit);
+ }
+
+#endif
+
+ /* Create async threads... */
+
+ thr_opts.detached = 0;
+ thr_opts.suggested_stack_size
+ = erts_async_thread_suggested_stack_size;
+
+ for (i = 0; i < erts_async_max_threads; i++) {
+ ErtsAsyncQ *aq = async_q(i);
+ erts_thr_create(&aq->thr_id, async_main, (void*) aq, &thr_opts);
+ }
+
+ /* Wait for async threads to initialize... */
+
+ erts_mtx_lock(&async->init.data.mtx);
+ while (async->init.data.no_initialized != erts_async_max_threads)
+ erts_cnd_wait(&async->init.data.cnd, &async->init.data.mtx);
+ erts_mtx_unlock(&async->init.data.mtx);
+
+ erts_mtx_destroy(&async->init.data.mtx);
+ erts_cnd_destroy(&async->init.data.cnd);
+
+ }
}
+#if ERTS_USE_ASYNC_READY_Q
-static void async_add(ErlAsync* a, AsyncQueue* q)
+void *
+erts_get_async_ready_queue(Uint sched_id)
+{
+ return (void *) async ? async_ready_q(sched_id) : NULL;
+}
+
+#endif
+
+static ERTS_INLINE void async_add(ErtsAsync *a, ErtsAsyncQ* q)
{
if (is_internal_port(a->port)) {
- ERTS_LC_ASSERT(erts_drvportid2port(a->port));
+#if ERTS_USE_ASYNC_READY_Q
+ ErtsAsyncReadyQ *arq = async_ready_q(a->sched_id);
+ a->q.prep_enq = erts_thr_q_prepare_enqueue(&arq->thr_q);
+#endif
/* make sure the driver will stay around */
- driver_lock_driver(internal_port_index(a->port));
+ if (a->hndl)
+ erts_ddll_reference_referenced_driver(a->hndl);
}
- erts_mtx_lock(&q->mtx);
+#if ERTS_ASYNC_PRINT_JOB
+ erts_fprintf(stderr, "-> %ld\n", a->async_id);
+#endif
- if (q->len == 0) {
- q->head = a;
- q->tail = a;
- q->len = 1;
- erts_cnd_signal(&q->cv);
- }
- else { /* no need to signal (since the worker is working) */
- a->next = q->head;
- q->head->prev = a;
- q->head = a;
- q->len++;
- }
- erts_mtx_unlock(&q->mtx);
+ erts_thr_q_enqueue(&q->thr_q, a);
}
-static ErlAsync* async_get(AsyncQueue* q)
+static ERTS_INLINE ErtsAsync *async_get(ErtsThrQ_t *q,
+ erts_tse_t *tse,
+ ErtsThrQPrepEnQ_t **prep_enq)
{
- ErlAsync* a;
+#if ERTS_USE_ASYNC_READY_Q
+ int saved_fin_deq = 0;
+ ErtsThrQFinDeQ_t fin_deq;
+#endif
- erts_mtx_lock(&q->mtx);
- while((a = q->tail) == NULL) {
- erts_cnd_wait(&q->cv, &q->mtx);
- }
+ while (1) {
+ ErtsAsync *a = (ErtsAsync *) erts_thr_q_dequeue(q);
+ if (a) {
+
+#if ERTS_USE_ASYNC_READY_Q
+ *prep_enq = a->q.prep_enq;
+ erts_thr_q_get_finalize_dequeue_data(q, &a->q.fin_deq);
+ if (saved_fin_deq)
+ erts_thr_q_append_finalize_dequeue_data(&a->q.fin_deq, &fin_deq);
+#endif
+
+ return a;
+ }
+
+ if (ERTS_THR_Q_DIRTY != erts_thr_q_clean(q)) {
+ ErtsThrQFinDeQ_t tmp_fin_deq;
+
+ erts_tse_reset(tse);
+
+#if ERTS_USE_ASYNC_READY_Q
+ chk_fin_deq:
+ if (erts_thr_q_get_finalize_dequeue_data(q, &tmp_fin_deq)) {
+ if (!saved_fin_deq) {
+ erts_thr_q_finalize_dequeue_state_init(&fin_deq);
+ saved_fin_deq = 1;
+ }
+ erts_thr_q_append_finalize_dequeue_data(&fin_deq,
+ &tmp_fin_deq);
+ }
+#endif
+
+ switch (erts_thr_q_inspect(q, 1)) {
+ case ERTS_THR_Q_DIRTY:
+ break;
#ifdef ERTS_SMP
- ASSERT(a && q->tail == a);
+ case ERTS_THR_Q_NEED_THR_PRGR: {
+ ErtsThrPrgrVal prgr = erts_thr_q_need_thr_progress(q);
+ erts_thr_progress_wakeup(NULL, prgr);
+ /*
+ * We do no dequeue finalizing in hope that a new async
+ * job will arrive before we are woken due to thread
+ * progress...
+ */
+ erts_tse_wait(tse);
+ break;
+ }
#endif
- if (q->head == q->tail) {
- q->head = q->tail = NULL;
- q->len = 0;
- }
- else {
- q->tail->prev->next = NULL;
- q->tail = q->tail->prev;
- q->len--;
+ case ERTS_THR_Q_CLEAN:
+
+#if ERTS_USE_ASYNC_READY_Q
+ if (saved_fin_deq) {
+ if (erts_thr_q_finalize_dequeue(&fin_deq))
+ goto chk_fin_deq;
+ else
+ saved_fin_deq = 0;
+ }
+#endif
+
+ erts_tse_wait(tse);
+ break;
+
+ default:
+ ASSERT(0);
+ break;
+ }
+
+ }
}
- erts_mtx_unlock(&q->mtx);
- return a;
}
-
-static int async_del(long id)
+static ERTS_INLINE void call_async_ready(ErtsAsync *a)
{
- int i;
- /* scan all queue for an entry with async_id == 'id' */
-
- for (i = 0; i < erts_async_max_threads; i++) {
- ErlAsync* a;
- erts_mtx_lock(&async_q[i].mtx);
-
- a = async_q[i].head;
- while(a != NULL) {
- if (a->async_id == id) {
- if (a->prev != NULL)
- a->prev->next = a->next;
- else
- async_q[i].head = a->next;
- if (a->next != NULL)
- a->next->prev = a->prev;
- else
- async_q[i].tail = a->prev;
- async_q[i].len--;
- erts_mtx_unlock(&async_q[i].mtx);
- if (a->async_free != NULL)
- a->async_free(a->async_data);
- async_detach(a->hndl);
- erts_free(ERTS_ALC_T_ASYNC, a);
- return 1;
- }
- a = a->next;
+ Port *p = erts_id2port_sflgs(a->port,
+ NULL,
+ 0,
+ ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
+ if (!p) {
+ if (a->async_free)
+ a->async_free(a->async_data);
+ }
+ else {
+ if (async_ready(p, a->async_data)) {
+ if (a->async_free)
+ a->async_free(a->async_data);
}
- erts_mtx_unlock(&async_q[i].mtx);
+ erts_port_release(p);
}
- return 0;
+ if (a->hndl)
+ erts_ddll_dereference_driver(a->hndl);
}
-static void* async_main(void* arg)
+static ERTS_INLINE void async_reply(ErtsAsync *a, ErtsThrQPrepEnQ_t *prep_enq)
{
- AsyncQueue* q = (AsyncQueue*) arg;
+#if ERTS_USE_ASYNC_READY_Q
+ ErtsAsyncReadyQ *arq;
-#ifdef ERTS_ENABLE_LOCK_CHECK
- {
- char buf[27];
- erts_snprintf(&buf[0], 27, "async %d", q->no);
- erts_lc_set_thread_name(&buf[0]);
- }
+ if (a->pdl)
+ driver_pdl_dec_refc(a->pdl);
+
+#if ERTS_ASYNC_PRINT_JOB
+ erts_fprintf(stderr, "=>> %ld\n", a->async_id);
#endif
- while(1) {
- ErlAsync* a = async_get(q);
+ arq = async_ready_q(a->sched_id);
- if (a->port == NIL) { /* TIME TO DIE SIGNAL */
- erts_free(ERTS_ALC_T_ASYNC, (void *) a);
- break;
- }
- else {
- (*a->async_invoke)(a->async_data);
- /* Major problem if the code for async_invoke
- or async_free is removed during a blocking operation */
+#if ERTS_USE_ASYNC_READY_ENQ_MTX
+ erts_mtx_lock(&arq->x.data.enq_mtx);
+#endif
+
+ erts_thr_q_enqueue_prepared(&arq->thr_q, (void *) a, prep_enq);
+
+#if ERTS_USE_ASYNC_READY_ENQ_MTX
+ erts_mtx_unlock(&arq->x.data.enq_mtx);
+#endif
+
+#else /* ERTS_USE_ASYNC_READY_Q */
+
+ call_async_ready(a);
+ if (a->pdl)
+ driver_pdl_dec_refc(a->pdl);
+ erts_free(ERTS_ALC_T_ASYNC, (void *) a);
+
+#endif /* ERTS_USE_ASYNC_READY_Q */
+}
+
+
+static void
+async_wakeup(void *vtse)
+{
+ erts_tse_set((erts_tse_t *) vtse);
+}
+
+static erts_tse_t *async_thread_init(ErtsAsyncQ *aq)
+{
+ ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT;
+ erts_tse_t *tse = erts_tse_fetch();
#ifdef ERTS_SMP
- {
- Port *p;
- p = erts_id2port_sflgs(a->port,
- NULL,
- 0,
- ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
- if (!p) {
- if (a->async_free)
- (*a->async_free)(a->async_data);
- }
- else {
- if (async_ready(p, a->async_data)) {
- if (a->async_free)
- (*a->async_free)(a->async_data);
- }
- async_detach(a->hndl);
- erts_port_release(p);
- }
- if (a->pdl) {
- driver_pdl_dec_refc(a->pdl);
- }
- erts_free(ERTS_ALC_T_ASYNC, (void *) a);
- }
-#else
- if (a->pdl) {
- driver_pdl_dec_refc(a->pdl);
- }
- erts_mtx_lock(&async_ready_mtx);
- a->next = async_ready_list;
- async_ready_list = a;
- erts_mtx_unlock(&async_ready_mtx);
- sys_async_ready(q->hndl);
+ ErtsThrPrgrCallbacks callbacks;
+
+ callbacks.arg = (void *) tse;
+ callbacks.wakeup = async_wakeup;
+ callbacks.prepare_wait = NULL;
+ callbacks.wait = NULL;
+
+ erts_thr_progress_register_unmanaged_thread(&callbacks);
#endif
- }
- }
- return NULL;
+ qinit.live.queue = ERTS_THR_Q_LIVE_LONG;
+ qinit.live.objects = ERTS_THR_Q_LIVE_SHORT;
+ qinit.arg = (void *) tse;
+ qinit.notify = async_wakeup;
+#if ERTS_USE_ASYNC_READY_Q
+ qinit.auto_finalize_dequeue = 0;
+#endif
+
+ erts_thr_q_initialize(&aq->thr_q, &qinit);
+
+ /* Inform main thread that we are done initializing... */
+ erts_mtx_lock(&async->init.data.mtx);
+ async->init.data.no_initialized++;
+ erts_cnd_signal(&async->init.data.cnd);
+ erts_mtx_unlock(&async->init.data.mtx);
+
+ return tse;
}
+static void *async_main(void* arg)
+{
+ ErtsAsyncQ *aq = (ErtsAsyncQ *) arg;
+ erts_tse_t *tse = async_thread_init(aq);
+
+ while (1) {
+ ErtsThrQPrepEnQ_t *prep_enq;
+ ErtsAsync *a = async_get(&aq->thr_q, tse, &prep_enq);
+ if (is_nil(a->port))
+ break; /* Time to die */
+#if ERTS_ASYNC_PRINT_JOB
+ erts_fprintf(stderr, "<- %ld\n", a->async_id);
#endif
-#ifndef ERTS_SMP
+ a->async_invoke(a->async_data);
+
+ async_reply(a, prep_enq);
+ }
+
+ return NULL;
+}
+
+#endif /* USE_THREADS */
-int check_async_ready(void)
+void
+erts_exit_flush_async(void)
{
#ifdef USE_THREADS
- ErtsAsyncReadyCallback *cbs;
+ int i;
+ ErtsAsync a;
+ a.port = NIL;
+ /*
+ * Terminate threads in order to flush queues. We do not
+ * bother to clean everything up since we are about to
+ * terminate the runtime system and a cleanup would only
+ * delay the termination.
+ */
+ for (i = 0; i < erts_async_max_threads; i++)
+ async_add(&a, async_q(i));
+ for (i = 0; i < erts_async_max_threads; i++)
+ erts_thr_join(async->queue[i].aq.thr_id, NULL);
#endif
- ErlAsync* a;
- int count = 0;
+}
- erts_mtx_lock(&async_ready_mtx);
- a = async_ready_list;
- async_ready_list = NULL;
-#ifdef USE_THREADS
- cbs = callbacks;
-#endif
- erts_mtx_unlock(&async_ready_mtx);
-
- while(a != NULL) {
- ErlAsync* a_next = a->next;
- /* Every port not dead */
- Port *p = erts_id2port_sflgs(a->port,
- NULL,
- 0,
- ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
- if (!p) {
- if (a->async_free)
- (*a->async_free)(a->async_data);
- }
- else {
- count++;
- if (async_ready(p, a->async_data)) {
- if (a->async_free != NULL)
- (*a->async_free)(a->async_data);
- }
- async_detach(a->hndl);
- erts_port_release(p);
+#if defined(USE_THREADS) && ERTS_USE_ASYNC_READY_Q
+
+int erts_check_async_ready(void *varq)
+{
+ ErtsAsyncReadyQ *arq = (ErtsAsyncReadyQ *) varq;
+ int res = 1;
+ int i;
+
+ for (i = 0; i < ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ; i++) {
+ ErtsAsync *a = (ErtsAsync *) erts_thr_q_dequeue(&arq->thr_q);
+ if (!a) {
+ res = 0;
+ break;
}
+
+#if ERTS_ASYNC_PRINT_JOB
+ erts_fprintf(stderr, "<<= %ld\n", a->async_id);
+#endif
+ erts_thr_q_append_finalize_dequeue_data(&arq->fin_deq, &a->q.fin_deq);
+ call_async_ready(a);
erts_free(ERTS_ALC_T_ASYNC, (void *) a);
- a = a_next;
}
-#ifdef USE_THREADS
- for (; cbs; cbs = cbs->next)
- (*cbs->callback)();
-#endif
- return count;
+
+ erts_thr_q_finalize_dequeue(&arq->fin_deq);
+
+ return res;
}
+int erts_async_ready_clean(void *varq, void *val)
+{
+ ErtsAsyncReadyQ *arq = (ErtsAsyncReadyQ *) varq;
+ ErtsThrQCleanState_t cstate;
+
+ cstate = erts_thr_q_clean(&arq->thr_q);
+
+ if (erts_thr_q_finalize_dequeue(&arq->fin_deq))
+ return ERTS_ASYNC_READY_DIRTY;
+
+ switch (cstate) {
+ case ERTS_THR_Q_DIRTY:
+ return ERTS_ASYNC_READY_DIRTY;
+#ifdef ERTS_SMP
+ case ERTS_THR_Q_NEED_THR_PRGR:
+ *((ErtsThrPrgrVal *) val)
+ = erts_thr_q_need_thr_progress(&arq->thr_q);
+ return ERTS_ASYNC_READY_NEED_THR_PRGR;
#endif
+ case ERTS_THR_Q_CLEAN:
+ break;
+ }
+ return ERTS_ASYNC_READY_CLEAN;
+}
+#endif
/*
** Schedule async_invoke on a worker thread
@@ -393,19 +554,29 @@ long driver_async(ErlDrvPort ix, unsigned int* key,
void (*async_invoke)(void*), void* async_data,
void (*async_free)(void*))
{
- ErlAsync* a = (ErlAsync*) erts_alloc(ERTS_ALC_T_ASYNC, sizeof(ErlAsync));
- Port* prt = erts_drvport2port(ix);
+ ErtsAsync* a;
+ Port* prt;
long id;
unsigned int qix;
+#if ERTS_USE_ASYNC_READY_Q
+ Uint sched_id;
+ sched_id = erts_get_scheduler_id();
+ if (!sched_id)
+ sched_id = 1;
+#endif
+ prt = erts_drvport2port(ix);
if (!prt)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
- a->next = NULL;
- a->prev = NULL;
+ a = (ErtsAsync*) erts_alloc(ERTS_ALC_T_ASYNC, sizeof(ErtsAsync));
+
+#if ERTS_USE_ASYNC_READY_Q
+ a->sched_id = sched_id;
+#endif
a->hndl = (DE_Handle*)prt->drv_ptr->handle;
a->port = prt->id;
a->pdl = NULL;
@@ -413,12 +584,16 @@ long driver_async(ErlDrvPort ix, unsigned int* key,
a->async_invoke = async_invoke;
a->async_free = async_free;
- erts_smp_spin_lock(&async_id_lock);
- async_id = (async_id + 1) & 0x7fffffff;
- if (async_id == 0)
- async_id++;
- id = async_id;
- erts_smp_spin_unlock(&async_id_lock);
+ if (!async)
+ id = 0;
+ else {
+ do {
+ id = erts_atomic_inc_read_nob(&async->init.data.id);
+ } while (id == 0);
+ if (id < 0)
+ id *= -1;
+ ASSERT(id > 0);
+ }
a->async_id = id;
@@ -437,7 +612,7 @@ long driver_async(ErlDrvPort ix, unsigned int* key,
driver_pdl_inc_refc(prt->port_data_lock);
a->pdl = prt->port_data_lock;
}
- async_add(a, &async_q[qix]);
+ async_add(a, async_q(qix));
return id;
}
#endif
@@ -455,10 +630,16 @@ long driver_async(ErlDrvPort ix, unsigned int* key,
int driver_async_cancel(unsigned int id)
{
-#ifdef USE_THREADS
- if (erts_async_max_threads > 0)
- return async_del(id);
-#endif
+ /*
+ * Not supported anymore. Always fail (which is backward
+ * compatible).
+ *
+ * This functionality could be implemented again. However,
+ * it is (and always has been) completely useless since
+ * it doesn't give you any guarantees whatsoever. The user
+ * needs to (and always have had to) synchronize in his/her
+ * own code in order to get any guarantees.
+ */
return 0;
}
diff --git a/erts/emulator/beam/erl_async.h b/erts/emulator/beam/erl_async.h
new file mode 100644
index 0000000000..95374a8fc9
--- /dev/null
+++ b/erts/emulator/beam/erl_async.h
@@ -0,0 +1,66 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2011. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+#ifndef ERL_ASYNC_H__
+#define ERL_ASYNC_H__
+
+#define ERTS_MAX_NO_OF_ASYNC_THREADS 1024
+extern int erts_async_max_threads;
+#define ERTS_ASYNC_THREAD_MIN_STACK_SIZE 16 /* Kilo words */
+#define ERTS_ASYNC_THREAD_MAX_STACK_SIZE 8192 /* Kilo words */
+extern int erts_async_thread_suggested_stack_size;
+
+#ifdef USE_THREADS
+
+#ifdef ERTS_SMP
+/*
+ * With smp support we can choose to have, or not to
+ * have an async ready queue.
+ */
+#define ERTS_USE_ASYNC_READY_Q 1
+#endif
+
+#ifndef ERTS_SMP
+/* In non-smp case we *need* the async ready queue */
+# undef ERTS_USE_ASYNC_READY_Q
+# define ERTS_USE_ASYNC_READY_Q 1
+#endif
+
+#ifndef ERTS_USE_ASYNC_READY_Q
+# define ERTS_USE_ASYNC_READY_Q 0
+#endif
+
+#if ERTS_USE_ASYNC_READY_Q
+int erts_check_async_ready(void *);
+int erts_async_ready_clean(void *, void *);
+void *erts_get_async_ready_queue(Uint sched_id);
+#define ERTS_ASYNC_READY_CLEAN 0
+#define ERTS_ASYNC_READY_DIRTY 1
+#ifdef ERTS_SMP
+#define ERTS_ASYNC_READY_NEED_THR_PRGR 2
+#endif
+#endif /* ERTS_USE_ASYNC_READY_Q */
+
+#endif /* USE_THREADS */
+
+void erts_init_async(void);
+void erts_exit_flush_async(void);
+
+
+#endif /* ERL_ASYNC_H__ */
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 7119306a52..a79feaebdb 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -39,6 +39,7 @@
#include "dist.h"
#include "erl_gc.h"
#include "erl_cpu_topology.h"
+#include "erl_async.h"
#include "erl_thr_progress.h"
#ifdef HIPE
#include "hipe_arch.h"
diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c
index d8b4294a30..0079c13287 100644
--- a/erts/emulator/beam/erl_db.c
+++ b/erts/emulator/beam/erl_db.c
@@ -280,8 +280,7 @@ static void schedule_free_dbtable(DbTable* tb)
ASSERT(scheds >= 1);
ASSERT(erts_refc_read(&tb->common.ref, 0) == 0);
erts_refc_init(&tb->common.ref, scheds);
- ERTS_THR_MEMORY_BARRIER;
- erts_smp_schedule_misc_aux_work(0, scheds, chk_free_dbtable, tb);
+ erts_schedule_multi_misc_aux_work(0, scheds, chk_free_dbtable, tb);
#else
free_dbtable(tb);
#endif
diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h
index 401967a8de..ae0c9def90 100644
--- a/erts/emulator/beam/erl_driver.h
+++ b/erts/emulator/beam/erl_driver.h
@@ -28,6 +28,14 @@
# include "config.h"
#endif
+#define ERL_DRV_DEPRECATED_FUNC
+#ifdef __GNUC__
+# if __GNUC__ >= 3
+# undef ERL_DRV_DEPRECATED_FUNC
+# define ERL_DRV_DEPRECATED_FUNC __attribute__((deprecated))
+# endif
+#endif
+
#ifdef SIZEOF_CHAR
# define SIZEOF_CHAR_SAVED__ SIZEOF_CHAR
# undef SIZEOF_CHAR
@@ -582,8 +590,11 @@ EXTERN long driver_async(ErlDrvPort ix,
void* async_data,
void (*async_free)(void*));
-
-EXTERN int driver_async_cancel(unsigned int key);
+/*
+ * driver_async_cancel() is deprecated. It is scheduled for removal
+ * in OTP-R16. For more information see the erl_driver(3) documentation.
+ */
+EXTERN int driver_async_cancel(unsigned int key) ERL_DRV_DEPRECATED_FUNC;
/* Locks the driver in the machine "forever", there is
no unlock function. Note that this is almost never useful, as an open
diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c
index 647074a47f..9a09f08618 100644
--- a/erts/emulator/beam/erl_init.c
+++ b/erts/emulator/beam/erl_init.c
@@ -43,6 +43,8 @@
#include "packet_parser.h"
#include "erl_cpu_topology.h"
#include "erl_thr_progress.h"
+#include "erl_thr_queue.h"
+#include "erl_async.h"
#ifdef HIPE
#include "hipe_mode_switch.h" /* for hipe_mode_switch_init() */
@@ -100,8 +102,6 @@ int erts_backtrace_depth; /* How many functions to show in a backtrace
* in error codes.
*/
-int erts_async_max_threads; /* number of threads for async support */
-int erts_async_thread_suggested_stack_size;
erts_smp_atomic32_t erts_max_gen_gcs;
Eterm erts_error_logger_warnings; /* What to map warning logs to, am_error,
@@ -280,6 +280,7 @@ erl_init(int ncpu)
erts_init_node_tables();
init_dist();
erl_drv_thr_init();
+ erts_init_async();
init_io();
init_copy();
init_load();
@@ -606,6 +607,8 @@ early_init(int *argc, char **argv) /*
int max_main_threads;
int max_reader_groups;
int reader_groups;
+ char envbuf[21]; /* enough for any 64-bit integer */
+ size_t envbufsz;
use_multi_run_queue = 1;
erts_printf_eterm_func = erts_printf_term;
@@ -677,6 +680,16 @@ early_init(int *argc, char **argv) /*
schdlrs = no_schedulers;
schdlrs_onln = no_schedulers_online;
+ envbufsz = sizeof(envbuf);
+
+ /* erts_sys_getenv() not initialized yet; need erts_sys_getenv__() */
+ if (erts_sys_getenv__("ERL_THREAD_POOL_SIZE", envbuf, &envbufsz) == 0)
+ erts_async_max_threads = atoi(envbuf);
+ else
+ erts_async_max_threads = 0;
+ if (erts_async_max_threads > ERTS_MAX_NO_OF_ASYNC_THREADS)
+ erts_async_max_threads = ERTS_MAX_NO_OF_ASYNC_THREADS;
+
if (argc && argv) {
int i = 1;
while (i < *argc) {
@@ -704,6 +717,20 @@ early_init(int *argc, char **argv) /*
}
break;
}
+ case 'A': {
+ /* set number of threads in thread pool */
+ char *arg = get_arg(argv[i]+2, argv[i+1], &i);
+ if (((erts_async_max_threads = atoi(arg)) < 0) ||
+ (erts_async_max_threads > ERTS_MAX_NO_OF_ASYNC_THREADS)) {
+ erts_fprintf(stderr,
+ "bad number of async threads %s\n",
+ arg);
+ erts_usage();
+ VERBOSE(DEBUG_SYSTEM, ("using %d async-threads\n",
+ erts_async_max_threads));
+ }
+ break;
+ }
case 'S' : {
int tot, onln;
char *arg = get_arg(argv[i]+2, argv[i+1], &i);
@@ -784,10 +811,14 @@ early_init(int *argc, char **argv) /*
* ** Aux thread (see erl_process.c)
* ** Sys message dispatcher thread (see erl_trace.c)
*
- * * No unmanaged threads that need to register.
+ * * Unmanaged threads that need to register:
+ * ** Async threads (see erl_async.c)
*/
- erts_thr_progress_init(no_schedulers, no_schedulers+1, 0);
+ erts_thr_progress_init(no_schedulers,
+ no_schedulers+2,
+ erts_async_max_threads);
#endif
+ erts_thr_q_init();
erts_init_utils();
erts_early_init_cpu_topology(no_schedulers,
&max_main_threads,
@@ -867,7 +898,6 @@ erl_start(int argc, char **argv)
int have_break_handler = 1;
char envbuf[21]; /* enough for any 64-bit integer */
size_t envbufsz;
- int async_max_threads = erts_async_max_threads;
int ncpu = early_init(&argc, argv);
envbufsz = sizeof(envbuf);
@@ -883,11 +913,6 @@ erl_start(int argc, char **argv)
(erts_aint32_t) max_gen_gcs);
}
- envbufsz = sizeof(envbuf);
- if (erts_sys_getenv("ERL_THREAD_POOL_SIZE", envbuf, &envbufsz) == 0) {
- async_max_threads = atoi(envbuf);
- }
-
#if (defined(__APPLE__) && defined(__MACH__)) || defined(__DARWIN__)
/*
* The default stack size on MacOS X is too small for pcre.
@@ -1317,17 +1342,8 @@ erl_start(int argc, char **argv)
break;
}
- case 'A':
- /* set number of threads in thread pool */
- arg = get_arg(argv[i]+2, argv[i+1], &i);
- if (((async_max_threads = atoi(arg)) < 0) ||
- (async_max_threads > ERTS_MAX_NO_OF_ASYNC_THREADS)) {
- erts_fprintf(stderr, "bad number of async threads %s\n", arg);
- erts_usage();
- }
-
- VERBOSE(DEBUG_SYSTEM, ("using %d async-threads\n",
- async_max_threads));
+ case 'A': /* Was handled in early init just read past it */
+ (void) get_arg(argv[i]+2, argv[i+1], &i);
break;
case 'a':
@@ -1416,10 +1432,6 @@ erl_start(int argc, char **argv)
i++;
}
-#ifdef USE_THREADS
- erts_async_max_threads = async_max_threads;
-#endif
-
/* Delayed check of +P flag */
if (erts_max_processes < ERTS_MIN_PROCESSES
|| erts_max_processes > ERTS_MAX_PROCESSES
@@ -1465,6 +1477,10 @@ erl_start(int argc, char **argv)
erts_sys_main_thread(); /* May or may not return! */
#else
erts_thr_set_main_status(1, 1);
+#if ERTS_USE_ASYNC_READY_Q
+ erts_get_scheduler_data()->aux_work_data.async_ready.queue
+ = erts_get_async_ready_queue(1);
+#endif
set_main_stack_size();
process_main();
#endif
@@ -1537,14 +1553,7 @@ system_cleanup(int exit_code)
erts_cleanup_incgc();
#endif
-#if defined(USE_THREADS)
- exit_async();
-#endif
-
- /*
- * A lot more cleaning could/should have been done...
- */
-
+ erts_exit_flush_async();
}
/*
diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c
index 02d1407a2d..44da6b6c51 100644
--- a/erts/emulator/beam/erl_lock_check.c
+++ b/erts/emulator/beam/erl_lock_check.c
@@ -110,10 +110,6 @@ static erts_lc_lock_order_t erts_lock_order[] = {
{ "fun_tab", NULL },
{ "environ", NULL },
#endif
- { "asyncq", "address" },
-#ifndef ERTS_SMP
- { "async_ready", NULL },
-#endif
{ "efile_drv", "address" },
#if defined(ENABLE_CHILD_WAITER_THREAD) || defined(ERTS_SMP)
{ "child_status", NULL },
@@ -138,6 +134,7 @@ static erts_lc_lock_order_t erts_lock_order[] = {
{ "alcu_init_atoms", NULL },
{ "mseg_init_atoms", NULL },
{ "drv_tsd", NULL },
+ { "async_enq_mtx", NULL },
#ifdef ERTS_SMP
{ "sys_msg_q", NULL },
{ "atom_tab", NULL },
@@ -173,14 +170,12 @@ static erts_lc_lock_order_t erts_lock_order[] = {
{ "timeofday", NULL },
{ "breakpoints", NULL },
{ "pollsets_lock", NULL },
- { "async_id", NULL },
{ "pix_lock", "address" },
{ "run_queues_lists", NULL },
- { "misc_aux_work_queue", "index" },
- { "misc_aux_work_pre_alloc_lock", "address" },
{ "sched_stat", NULL },
{ "run_queue_sleep_list", "address" },
#endif
+ { "async_init_mtx", NULL },
#ifdef ERTS_SMP
{ "proc_lck_qs_alloc", NULL },
#endif
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 6adeef2e69..a3c1c9577b 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -40,6 +40,8 @@
#include "beam_bp.h"
#include "erl_cpu_topology.h"
#include "erl_thr_progress.h"
+#include "erl_thr_queue.h"
+#include "erl_async.h"
#define ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED (2000*CONTEXT_REDS)
#define ERTS_RUNQ_CALL_CHECK_BALANCE_REDS \
@@ -125,7 +127,6 @@ ErtsLcPSDLocks erts_psd_required_locks[ERTS_PSD_SIZE];
#endif
#ifdef ERTS_SMP
-
int erts_disable_proc_not_running_opt;
static ErtsAuxWorkData *aux_thread_aux_work_data;
@@ -361,6 +362,15 @@ dbg_chk_aux_work_val(erts_aint32_t value)
#ifdef ERTS_SSI_AUX_WORK_MISC
valid |= ERTS_SSI_AUX_WORK_MISC;
#endif
+#ifdef ERTS_SSI_AUX_WORK_MISC_THR_PRGR
+ valid |= ERTS_SSI_AUX_WORK_MISC_THR_PRGR;
+#endif
+#ifdef ERTS_SSI_AUX_WORK_ASYNC_READY
+ valid |= ERTS_SSI_AUX_WORK_ASYNC_READY;
+#endif
+#ifdef ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN
+ valid |= ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
+#endif
#ifdef ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM
valid |= ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM;
@@ -707,37 +717,37 @@ unset_aux_work_flags(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flgs)
return erts_atomic32_read_band_nob(&ssi->aux_work, ~flgs);
}
-#ifdef ERTS_SMP
-
typedef struct erts_misc_aux_work_t_ erts_misc_aux_work_t;
struct erts_misc_aux_work_t_ {
- erts_misc_aux_work_t *next;
void (*func)(void *);
void *arg;
};
-typedef struct {
- erts_smp_mtx_t mtx;
- erts_misc_aux_work_t *first;
- erts_misc_aux_work_t *last;
-} erts_misc_aux_work_q_t;
+ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_aux_work,
+ erts_misc_aux_work_t,
+ 200,
+ ERTS_ALC_T_MISC_AUX_WORK)
typedef union {
- erts_misc_aux_work_q_t data;
- char align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_misc_aux_work_q_t))];
+ ErtsThrQ_t q;
+ char align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))];
} erts_algnd_misc_aux_work_q_t;
static erts_algnd_misc_aux_work_q_t *misc_aux_work_queues;
-ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_aux_work,
- erts_misc_aux_work_t,
- 200,
- ERTS_ALC_T_MISC_AUX_WORK)
+static void
+notify_aux_work(void *vssi)
+{
+ set_aux_work_flags_wakeup_nob((ErtsSchedulerSleepInfo *) vssi,
+ ERTS_SSI_AUX_WORK_MISC);
+}
static void
init_misc_aux_work(void)
{
int ix;
+ ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT;
+ qinit.notify = notify_aux_work;
init_misc_aux_work_alloc();
@@ -746,88 +756,189 @@ init_misc_aux_work(void)
sizeof(erts_algnd_misc_aux_work_q_t)
* (erts_no_schedulers+1));
- for (ix = 0; ix <= erts_no_schedulers; ix++) {
- erts_smp_mtx_init_x(&misc_aux_work_queues[ix].data.mtx,
- "misc_aux_work_queue",
- make_small(ix));
- misc_aux_work_queues[ix].data.first = NULL;
- misc_aux_work_queues[ix].data.last = NULL;
+#ifdef ERTS_SMP
+ ix = 0; /* aux_thread + schedulers */
+#else
+ ix = 1; /* scheduler only */
+#endif
+
+ for (; ix <= erts_no_schedulers; ix++) {
+ qinit.arg = (void *) ERTS_SCHED_SLEEP_INFO_IX(ix-1);
+ erts_thr_q_initialize(&misc_aux_work_queues[ix].q, &qinit);
+ }
+}
+
+static erts_aint32_t
+misc_aux_work_clean(ErtsThrQ_t *q,
+ ErtsAuxWorkData *awdp,
+ erts_aint32_t aux_work)
+{
+ switch (erts_thr_q_clean(q)) {
+ case ERTS_THR_Q_DIRTY:
+ set_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC);
+ return aux_work | ERTS_SSI_AUX_WORK_MISC;
+#ifdef ERTS_SMP
+ case ERTS_THR_Q_NEED_THR_PRGR:
+ set_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC_THR_PRGR);
+ erts_thr_progress_wakeup(awdp->esdp,
+ erts_thr_q_need_thr_progress(q));
+#endif
+ case ERTS_THR_Q_CLEAN:
+ break;
}
+ return aux_work;
}
static erts_aint32_t
handle_misc_aux_work(ErtsAuxWorkData *awdp,
erts_aint32_t aux_work)
{
- int ix = (int) awdp->sched_id;
- erts_misc_aux_work_t *mawp;
+ ErtsThrQ_t *q = &misc_aux_work_queues[awdp->sched_id].q;
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC);
-
- erts_smp_mtx_lock(&misc_aux_work_queues[ix].data.mtx);
- mawp = misc_aux_work_queues[ix].data.first;
- misc_aux_work_queues[ix].data.first = NULL;
- misc_aux_work_queues[ix].data.last = NULL;
- erts_smp_mtx_unlock(&misc_aux_work_queues[ix].data.mtx);
-
- while (mawp) {
- erts_misc_aux_work_t *free_mawp;
+ while (1) {
+ erts_misc_aux_work_t *mawp = erts_thr_q_dequeue(q);
+ if (!mawp)
+ break;
mawp->func(mawp->arg);
- free_mawp = mawp;
- mawp = mawp->next;
- misc_aux_work_free(free_mawp);
+ misc_aux_work_free(mawp);
}
- return aux_work & ~ERTS_SSI_AUX_WORK_MISC;
+ return misc_aux_work_clean(q, awdp, aux_work & ~ERTS_SSI_AUX_WORK_MISC);
}
-static void
-smp_schedule_misc_aux_work(int ix,
- void (*func)(void *),
- void *arg)
+#ifdef ERTS_SMP
+
+static erts_aint32_t
+handle_misc_aux_work_thr_prgr(ErtsAuxWorkData *awdp,
+ erts_aint32_t aux_work)
{
- erts_aint32_t aux_work;
+ if (!erts_thr_progress_has_reached(awdp->misc.thr_prgr))
+ return aux_work;
+
+ unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC_THR_PRGR);
+
+ return misc_aux_work_clean(&misc_aux_work_queues[awdp->sched_id].q,
+ awdp,
+ aux_work & ~ERTS_SSI_AUX_WORK_MISC_THR_PRGR);
+}
+
+#endif
+
+static ERTS_INLINE void
+schedule_misc_aux_work(int sched_id,
+ void (*func)(void *),
+ void *arg)
+{
+ ErtsThrQ_t *q;
erts_misc_aux_work_t *mawp;
- ErtsSchedulerSleepInfo *ssi;
- mawp = misc_aux_work_alloc();
+#ifdef ERTS_SMP
+ ASSERT(0 <= sched_id && sched_id <= erts_no_schedulers);
+#else
+ ASSERT(sched_id == 1);
+#endif
+ q = &misc_aux_work_queues[sched_id].q;
+ mawp = misc_aux_work_alloc();
mawp->func = func;
mawp->arg = arg;
- mawp->next = NULL;
-
- erts_smp_mtx_lock(&misc_aux_work_queues[ix].data.mtx);
- if (!misc_aux_work_queues[ix].data.last)
- misc_aux_work_queues[ix].data.first = mawp;
- else
- misc_aux_work_queues[ix].data.last->next = mawp;
- misc_aux_work_queues[ix].data.last = mawp;
- erts_smp_mtx_unlock(&misc_aux_work_queues[ix].data.mtx);
+ erts_thr_q_enqueue(q, mawp);
+}
- set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(ix-1),
- ERTS_SSI_AUX_WORK_MISC);
+void
+erts_schedule_misc_aux_work(int sched_id,
+ void (*func)(void *),
+ void *arg)
+{
+ schedule_misc_aux_work(sched_id, func, arg);
}
void
-erts_smp_schedule_misc_aux_work(int ignore_self,
- int max_sched,
- void (*func)(void *),
- void *arg)
+erts_schedule_multi_misc_aux_work(int ignore_self,
+ int max_sched,
+ void (*func)(void *),
+ void *arg)
{
- int ix, ignore_ix = -1;
+ int id, self = 0;
if (ignore_self) {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
if (esdp)
- ignore_ix = (int) esdp->no;
+ self = (int) esdp->no;
}
ASSERT(0 < max_sched && max_sched <= erts_no_schedulers);
- for (ix = 1; ix <= max_sched; ix++) {
- if (ix == ignore_ix)
+ for (id = 1; id <= max_sched; id++) {
+ if (id == self)
continue;
- smp_schedule_misc_aux_work(ix, func, arg);
+ schedule_misc_aux_work(id, func, arg);
+ }
+}
+
+#if ERTS_USE_ASYNC_READY_Q
+
+void
+erts_notify_check_async_ready_queue(void *vno)
+{
+ int ix = ((int) (SWord) vno) -1;
+ set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(ix),
+ ERTS_SSI_AUX_WORK_ASYNC_READY);
+}
+
+static erts_aint32_t
+handle_async_ready(ErtsAuxWorkData *awdp,
+ erts_aint32_t aux_work)
+{
+ ErtsSchedulerSleepInfo *ssi = awdp->ssi;
+ unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY);
+ if (erts_check_async_ready(awdp->async_ready.queue)) {
+ if (set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY)
+ & ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN) {
+ unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN);
+ aux_work &= ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
+ }
+ return aux_work;
+ }
+#ifdef ERTS_SMP
+ awdp->async_ready.need_thr_prgr = 0;
+#endif
+ set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN);
+ return ((aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY)
+ | ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN);
+}
+
+static erts_aint32_t
+handle_async_ready_clean(ErtsAuxWorkData *awdp,
+ erts_aint32_t aux_work)
+{
+ void *thr_prgr_p;
+
+#ifdef ERTS_SMP
+ if (awdp->async_ready.need_thr_prgr
+ && !erts_thr_progress_has_reached(awdp->misc.thr_prgr)) {
+ return aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
+ }
+
+ awdp->async_ready.need_thr_prgr = 0;
+ thr_prgr_p = (void *) &awdp->async_ready.thr_prgr;
+#else
+ thr_prgr_p = NULL;
+#endif
+
+ switch (erts_async_ready_clean(awdp->async_ready.queue, thr_prgr_p)) {
+ case ERTS_ASYNC_READY_CLEAN:
+ unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN);
+ return aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
+#ifdef ERTS_SMP
+ case ERTS_ASYNC_READY_NEED_THR_PRGR:
+ erts_thr_progress_wakeup(awdp->esdp,
+ awdp->async_ready.thr_prgr);
+ awdp->async_ready.need_thr_prgr = 1;
+#endif
+ default:
+ return aux_work;
}
}
@@ -964,14 +1075,14 @@ prep_setup_completed_dealloc(void *vproc)
erts_aint32_t count = (erts_aint32_t) (erts_no_schedulers+1);
if (erts_atomic32_dec_read_mb(&completed_dealloc_count) == count) {
/* scheduler threads */
- erts_smp_schedule_misc_aux_work(0,
- erts_no_schedulers,
- setup_completed_dealloc,
- vproc);
+ erts_schedule_multi_misc_aux_work(0,
+ erts_no_schedulers,
+ setup_completed_dealloc,
+ vproc);
/* aux_thread */
- smp_schedule_misc_aux_work(0,
- setup_completed_dealloc,
- vproc);
+ erts_schedule_misc_aux_work(0,
+ setup_completed_dealloc,
+ vproc);
}
}
@@ -992,14 +1103,14 @@ erts_debug_wait_deallocations(Process *c_p)
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
erts_smp_proc_inc_refc(c_p);
/* scheduler threads */
- erts_smp_schedule_misc_aux_work(0,
- erts_no_schedulers,
- prep_setup_completed_dealloc,
- (void *) c_p);
+ erts_schedule_multi_misc_aux_work(0,
+ erts_no_schedulers,
+ prep_setup_completed_dealloc,
+ (void *) c_p);
/* aux_thread */
- smp_schedule_misc_aux_work(0,
- prep_setup_completed_dealloc,
- (void *) c_p);
+ erts_schedule_misc_aux_work(0,
+ prep_setup_completed_dealloc,
+ (void *) c_p);
return 1;
}
return 0;
@@ -1062,10 +1173,24 @@ handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
}
#ifdef ERTS_SMP
+ if (aux_work & ERTS_SSI_AUX_WORK_MISC_THR_PRGR) {
+ aux_work = handle_misc_aux_work_thr_prgr(awdp, aux_work);
+ ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
+ }
+#endif
if (aux_work & ERTS_SSI_AUX_WORK_MISC) {
aux_work = handle_misc_aux_work(awdp, aux_work);
ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
}
+#if ERTS_USE_ASYNC_READY_Q
+ if (aux_work & ERTS_SSI_AUX_WORK_ASYNC_READY) {
+ aux_work = handle_async_ready(awdp, aux_work);
+ ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
+ }
+ if (aux_work & ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN) {
+ aux_work = handle_async_ready_clean(awdp, aux_work);
+ ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
+ }
#endif
#ifdef ERTS_SMP_SCHEDULERS_NEED_TO_CHECK_CHILDREN
if (aux_work & ERTS_SSI_AUX_WORK_CHECK_CHILDREN) {
@@ -3191,10 +3316,18 @@ init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp)
awdp->esdp = esdp;
awdp->ssi = esdp ? esdp->ssi : NULL;
#ifdef ERTS_SMP
+ awdp->misc.thr_prgr = ERTS_THR_PRGR_VAL_WAITING;
awdp->dd.thr_prgr = ERTS_THR_PRGR_VAL_WAITING;
awdp->dd.completed_callback = NULL;
awdp->dd.completed_arg = NULL;
#endif
+#ifdef ERTS_USE_ASYNC_READY_Q
+#ifdef ERTS_SMP
+ awdp->async_ready.need_thr_prgr = 0;
+ awdp->async_ready.thr_prgr = ERTS_THR_PRGR_VAL_WAITING;
+#endif
+ awdp->async_ready.queue = NULL;
+#endif
}
void
@@ -3385,9 +3518,10 @@ erts_init_scheduling(int mrq, int no_schedulers, int no_schedulers_online)
init_aux_work_data(&esdp->aux_work_data, esdp);
}
-#ifdef ERTS_SMP
init_misc_aux_work();
+#ifdef ERTS_SMP
+
erts_atomic32_init_nob(&completed_dealloc_count, 0); /* debug only */
aux_thread_aux_work_data =
@@ -4408,6 +4542,9 @@ sched_thread_func(void *vesdp)
#if HAVE_ERTS_MSEG
erts_mseg_late_init();
#endif
+#if ERTS_USE_ASYNC_READY_Q
+ esdp->aux_work_data.async_ready.queue = erts_get_async_ready_queue(no);
+#endif
erts_sched_init_check_cpu_bind(esdp);
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 8a0944236c..4027fade35 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -54,6 +54,7 @@ typedef struct process Process;
#include "erl_atom_table.h"
#include "external.h"
#include "erl_mseg.h"
+#include "erl_async.h"
#ifdef HIPE
#include "hipe_process.h"
@@ -251,13 +252,18 @@ typedef enum {
#define ERTS_SSI_AUX_WORK_SET_TMO (((erts_aint32_t) 1) << 0)
#define ERTS_SSI_AUX_WORK_CHECK_CHILDREN (((erts_aint32_t) 1) << 1)
#define ERTS_SSI_AUX_WORK_MISC (((erts_aint32_t) 1) << 2)
-#define ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM (((erts_aint32_t) 1) << 3)
-#define ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC (((erts_aint32_t) 1) << 4)
#ifdef ERTS_SMP
-#define ERTS_SSI_AUX_WORK_DD (((erts_aint32_t) 1) << 5)
-#define ERTS_SSI_AUX_WORK_DD_THR_PRGR (((erts_aint32_t) 1) << 6)
+#define ERTS_SSI_AUX_WORK_MISC_THR_PRGR (((erts_aint32_t) 1) << 3)
#endif
-#define ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK (((erts_aint32_t) 1) << 7)
+#define ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM (((erts_aint32_t) 1) << 4)
+#define ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC (((erts_aint32_t) 1) << 5)
+#define ERTS_SSI_AUX_WORK_ASYNC_READY (((erts_aint32_t) 1) << 6)
+#define ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN (((erts_aint32_t) 1) << 7)
+#ifdef ERTS_SMP
+#define ERTS_SSI_AUX_WORK_DD (((erts_aint32_t) 1) << 8)
+#define ERTS_SSI_AUX_WORK_DD_THR_PRGR (((erts_aint32_t) 1) << 9)
+#endif
+#define ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK (((erts_aint32_t) 1) << 10)
#if !HAVE_ERTS_MSEG
# undef ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK
@@ -404,6 +410,9 @@ typedef struct {
ErtsSchedulerSleepInfo *ssi;
struct {
int ix;
+#ifdef ERTS_SMP
+ ErtsThrPrgrVal thr_prgr;
+#endif
} misc;
#ifdef ERTS_SMP
struct {
@@ -412,6 +421,15 @@ typedef struct {
void (*completed_arg)(void *);
} dd;
#endif
+#ifdef ERTS_USE_ASYNC_READY_Q
+ struct {
+#ifdef ERTS_SMP
+ int need_thr_prgr;
+ ErtsThrPrgrVal thr_prgr;
+#endif
+ void *queue;
+ } async_ready;
+#endif
} ErtsAuxWorkData;
struct ErtsSchedulerData_ {
@@ -1090,12 +1108,17 @@ Eterm erts_multi_scheduling_blockers(Process *);
void erts_start_schedulers(void);
void erts_alloc_notify_delayed_dealloc(int);
void erts_smp_notify_check_children_needed(void);
-void
-erts_smp_schedule_misc_aux_work(int ignore_self,
- int max_sched,
- void (*func)(void *),
- void *arg);
#endif
+#if ERTS_USE_ASYNC_READY_Q
+void erts_notify_check_async_ready_queue(void *);
+#endif
+void erts_schedule_misc_aux_work(int sched_id,
+ void (*func)(void *),
+ void *arg);
+void erts_schedule_multi_misc_aux_work(int ignore_self,
+ int max_sched,
+ void (*func)(void *),
+ void *arg);
erts_aint32_t erts_set_aux_work_timeout(int, erts_aint32_t, int);
void erts_sched_notify_check_cpu_bind(void);
Uint erts_active_schedulers(void);
diff --git a/erts/emulator/beam/erl_thr_queue.c b/erts/emulator/beam/erl_thr_queue.c
new file mode 100644
index 0000000000..9ac4cd4b8e
--- /dev/null
+++ b/erts/emulator/beam/erl_thr_queue.c
@@ -0,0 +1,745 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2011. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+/*
+ * Description: Lock-free queue for communication between threads.
+ *
+ * Currently only a many-to-one version has been,
+ * implemented, i.e., many threads can enqueue but
+ * only one thread can dequeue at a time. It doesn't
+ * have to be the same thread dequeuing every time, but
+ * synchronization so that only one thread dequeues
+ * at a time has to be provided by other means.
+ *
+ * When/If the need for a many-to-many queue arises,
+ * this implementation can relatively easy be extended
+ * to support that too.
+ *
+ * Usage instructions below.
+ *
+ * Author: Rickard Green
+ */
+
+/*
+ * ------ Usage instructions -----------------------------------------------
+ *
+ * Dequeuing generates garbage that needs to be cleaned up.
+ * erts_thr_q_dequeue() automatically cleans, but garbage may have to be
+ * cleaned up also when the queue is empty. This is done by calling
+ * erts_thr_q_clean(). In the SMP case thread progress may have to be made
+ * before cleaning can continue. If so, erts_thr_q_need_thr_progress() in
+ * combination with erts_thr_progress_wakeup() can be used in order to
+ * request a wakeup at appropriate time.
+ *
+ * Enqueuing implies memory allocation and dequeuing implies memory
+ * deallocation. Memory allocation can be moved to another more suitable
+ * thread using erts_thr_q_prepare_enqueue() together with
+ * erts_thr_q_enqueue_prepared() instead of using erts_thr_q_enqueue().
+ * Memory deallocation can can be moved to another more suitable thread by
+ * disabling auto_finalize_dequeue when initializing the queue and then use
+ * erts_thr_q_get_finalize_dequeue_data() together
+ * erts_thr_q_finalize_dequeue() after dequeuing or cleaning.
+ *
+ * Ending the life of the queue using either erts_thr_q_destroy()
+ * or erts_thr_q_finalize() impies cleaning the queue. Both functions
+ * return the cleaning result and may have to be called multiple times
+ * until the queue is clean. Once one of these functions have been called
+ * enqueuing is not allowed. This has to be synchronized by the user.
+ * If auto_finalize_dequeue has been disabled, the finalize dequeue
+ * functionality has to be called after ending the life of the queue just
+ * as when dequeuing or cleaning on a queue that is alive.
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "erl_thr_queue.h"
+
+#if defined(DEBUG)
+#define ERTS_THR_Q_DBG_CHK_DATA 1
+#else
+#define ERTS_THR_Q_DBG_CHK_DATA 0
+#endif
+
+#define ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT 100
+#define ERTS_THR_Q_MAX_SCHED_CLEAN_OPS 50
+#define ERTS_THR_Q_MAX_DEQUEUE_CLEAN_OPS 3
+
+#define ERTS_THR_Q_MAX_FINI_DEQ_OPS 50
+
+#ifdef ERTS_SMP
+ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(sl_element,
+ ErtsThrQElement_t,
+ 1000,
+ ERTS_ALC_T_THR_Q_EL_SL)
+#else
+
+static void
+init_sl_element_alloc(void)
+{
+}
+
+static ErtsThrQElement_t *
+sl_element_alloc(void)
+{
+ return erts_alloc(ERTS_ALC_T_THR_Q_EL_SL,
+ sizeof(ErtsThrQElement_t));
+}
+
+static void
+sl_element_free(ErtsThrQElement_t *p)
+{
+ erts_free(ERTS_ALC_T_THR_Q_EL_SL, p);
+}
+
+#endif
+
+typedef union {
+ ErtsThrQ_t q;
+ char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))];
+} ErtsAlignedThrQ_t;
+
+void
+erts_thr_q_init(void)
+{
+ init_sl_element_alloc();
+}
+
+static void noop_callback(void *arg) { }
+
+void
+erts_thr_q_initialize(ErtsThrQ_t *q, ErtsThrQInit_t *qi)
+{
+#ifndef USE_THREADS
+ q->init = *qi;
+ if (!q->init.notify)
+ q->init.notify = noop_callback;
+ q->first = NULL;
+ q->last = NULL;
+ q->q.blk = NULL;
+#else
+ erts_atomic_init_nob(&q->tail.data.marker.next.atmc, ERTS_AINT_NULL);
+ q->tail.data.marker.data.ptr = NULL;
+ erts_atomic_init_nob(&q->tail.data.last,
+ (erts_aint_t) &q->tail.data.marker);
+ erts_atomic_init_nob(&q->tail.data.um_refc[0], 0);
+ erts_atomic_init_nob(&q->tail.data.um_refc[1], 0);
+ erts_atomic32_init_nob(&q->tail.data.um_refc_ix, 0);
+ q->tail.data.live = qi->live.objects;
+ q->tail.data.arg = qi->arg;
+ q->tail.data.notify = qi->notify;
+ if (!q->tail.data.notify)
+ q->tail.data.notify = noop_callback;
+
+ q->head.head.ptr = &q->tail.data.marker;
+ q->head.live = qi->live.objects;
+ q->head.first = &q->tail.data.marker;
+ q->head.unref_end = &q->tail.data.marker;
+ q->head.clean_reached_head_count = 0;
+ q->head.deq_fini.automatic = qi->auto_finalize_dequeue;
+ q->head.deq_fini.start = NULL;
+ q->head.deq_fini.end = NULL;
+#ifdef ERTS_SMP
+ q->head.next.thr_progress = erts_thr_progress_current();
+ q->head.next.thr_progress_reached = 1;
+#endif
+ q->head.next.um_refc_ix = 1;
+ q->head.next.unref_end = &q->tail.data.marker;
+ q->head.used_marker = 1;
+ q->head.arg = qi->arg;
+ q->head.notify = q->tail.data.notify;
+ q->q.finalizing = 0;
+ q->q.live = qi->live.queue;
+ q->q.blk = NULL;
+#endif
+}
+
+ErtsThrQCleanState_t
+erts_thr_q_finalize(ErtsThrQ_t *q)
+{
+#ifdef USE_THREADS
+ q->q.finalizing = 1;
+#endif
+ while (erts_thr_q_dequeue(q));
+ return erts_thr_q_clean(q);
+}
+
+ErtsThrQ_t *
+erts_thr_q_create(ErtsThrQInit_t *qi)
+{
+ ErtsAlcType_t atype;
+ ErtsThrQ_t *q, *qblk;
+ UWord qw;
+
+ switch (qi->live.queue) {
+ case ERTS_THR_Q_LIVE_SHORT:
+ atype = ERTS_ALC_T_THR_Q_SL;
+ break;
+ case ERTS_THR_Q_LIVE_LONG:
+ atype = ERTS_ALC_T_THR_Q_LL;
+ break;
+ default:
+ atype = ERTS_ALC_T_THR_Q;
+ break;
+ }
+
+ qw = (UWord) erts_alloc(atype,
+ sizeof(ErtsThrQ_t) + (ERTS_CACHE_LINE_SIZE-1));
+ qblk = (ErtsThrQ_t *) qw;
+ if (qw & ERTS_CACHE_LINE_MASK)
+ qw = (qw & ~ERTS_CACHE_LINE_MASK) + ERTS_CACHE_LINE_SIZE;
+ ASSERT((qw & ERTS_CACHE_LINE_MASK) == 0);
+ q = (ErtsThrQ_t *) qw;
+ erts_thr_q_initialize(q, qi);
+ q->q.blk = qblk;
+ return q;
+}
+
+ErtsThrQCleanState_t
+erts_thr_q_destroy(ErtsThrQ_t *q)
+{
+ if (!q->q.blk)
+ erl_exit(ERTS_ABORT_EXIT,
+ "Trying to destroy not created thread queue\n");
+ return erts_thr_q_finalize(q);
+}
+
+#ifdef USE_THREADS
+
+static void
+destroy(ErtsThrQ_t *q)
+{
+ ErtsAlcType_t atype;
+ switch (q->q.live) {
+ case ERTS_THR_Q_LIVE_SHORT:
+ atype = ERTS_ALC_T_THR_Q_SL;
+ break;
+ case ERTS_THR_Q_LIVE_LONG:
+ atype = ERTS_ALC_T_THR_Q_LL;
+ break;
+ default:
+ atype = ERTS_ALC_T_THR_Q;
+ break;
+ }
+ erts_free(atype, q->q.blk);
+}
+
+#endif
+
+static ERTS_INLINE ErtsThrQElement_t *
+element_live_alloc(ErtsThrQLive_t live)
+{
+ switch (live) {
+ case ERTS_THR_Q_LIVE_SHORT:
+ return sl_element_alloc();
+ default:
+ return (ErtsThrQElement_t *) erts_alloc(ERTS_ALC_T_THR_Q_EL,
+ sizeof(ErtsThrQElement_t));
+ }
+}
+
+static ERTS_INLINE ErtsThrQElement_t *
+element_alloc(ErtsThrQ_t *q)
+{
+ ErtsThrQLive_t live;
+#ifdef USE_THREADS
+ live = q->tail.data.live;
+#else
+ live = q->init.live.objects;
+#endif
+ return element_live_alloc(live);
+}
+
+static ERTS_INLINE void
+element_live_free(ErtsThrQLive_t live, ErtsThrQElement_t *el)
+{
+ switch (live) {
+ case ERTS_THR_Q_LIVE_SHORT:
+ sl_element_free(el);
+ break;
+ default:
+ erts_free(ERTS_ALC_T_THR_Q_EL, el);
+ }
+}
+
+static ERTS_INLINE void
+element_free(ErtsThrQ_t *q, ErtsThrQElement_t *el)
+{
+ ErtsThrQLive_t live;
+#ifdef USE_THREADS
+ live = q->head.live;
+#else
+ live = q->init.live.objects;
+#endif
+ element_live_free(live, el);
+}
+
+#ifdef USE_THREADS
+
+static ERTS_INLINE ErtsThrQElement_t *
+enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last)
+{
+ erts_aint_t ilast, itmp;
+
+ erts_atomic_init_nob(&this->next.atmc, ERTS_AINT_NULL);
+ /* Enqueue at end of list... */
+
+ ilast = erts_atomic_read_nob(&q->tail.data.last);
+ while (1) {
+ ErtsThrQElement_t *last = (ErtsThrQElement_t *) ilast;
+ itmp = erts_atomic_cmpxchg_mb(&last->next.atmc,
+ (erts_aint_t) this,
+ ERTS_AINT_NULL);
+ if (itmp == ERTS_AINT_NULL)
+ break;
+ ilast = itmp;
+ }
+
+ /* Move last pointer forward... */
+ while (1) {
+ if (want_last) {
+ if (erts_atomic_read_rb(&this->next.atmc) != ERTS_AINT_NULL) {
+ /* Someone else will move it forward */
+ ilast = erts_atomic_read_rb(&q->tail.data.last);
+ return (ErtsThrQElement_t *) ilast;
+ }
+ }
+ else {
+ if (erts_atomic_read_nob(&this->next.atmc) != ERTS_AINT_NULL) {
+ /* Someone else will move it forward */
+ return NULL;
+ }
+ }
+ itmp = erts_atomic_cmpxchg_mb(&q->tail.data.last,
+ (erts_aint_t) this,
+ ilast);
+ if (ilast == itmp)
+ return want_last ? this : NULL;
+ ilast = itmp;
+ }
+}
+
+static ErtsThrQCleanState_t
+clean(ErtsThrQ_t *q, int max_ops, int do_notify)
+{
+ erts_aint_t ilast;
+ int um_refc_ix;
+ int ops;
+
+ for (ops = 0; ops < max_ops; ops++) {
+ ErtsThrQElement_t *tmp;
+ restart:
+ ASSERT(q->head.first);
+ if (q->head.first == q->head.head.ptr) {
+ q->head.clean_reached_head_count++;
+ if (q->head.clean_reached_head_count
+ >= ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT) {
+ q->head.clean_reached_head_count = 0;
+ break;
+ }
+ goto inspect_head;
+ }
+ if (q->head.first == q->head.unref_end)
+ break;
+ if (q->head.first == &q->tail.data.marker) {
+ q->head.used_marker = 0;
+ q->head.first = q->head.first->next.ptr;
+ goto restart;
+ }
+ tmp = q->head.first;
+ q->head.first = q->head.first->next.ptr;
+ if (q->head.deq_fini.automatic)
+ element_free(q, tmp);
+ else {
+ tmp->data.ptr = (void *) (UWord) q->head.live;
+ if (!q->head.deq_fini.start)
+ q->head.deq_fini.start = tmp;
+ else if (q->head.deq_fini.end->next.ptr == &q->tail.data.marker)
+ q->head.deq_fini.end->next.ptr = tmp;
+ q->head.deq_fini.end = tmp;
+ }
+ }
+
+ ilast = erts_atomic_read_nob(&q->tail.data.last);
+ if (q->head.first == ((ErtsThrQElement_t *) ilast)
+ && ((ErtsThrQElement_t *) ilast) == &q->tail.data.marker
+ && q->head.first == &q->tail.data.marker) {
+ /* Empty and clean queue */
+ if (q->q.finalizing)
+ destroy(q);
+ return ERTS_THR_Q_CLEAN;
+ }
+
+#ifdef ERTS_SMP
+ if (q->head.next.thr_progress_reached
+ || erts_thr_progress_has_reached(q->head.next.thr_progress)) {
+ q->head.next.thr_progress_reached = 1;
+#endif
+ um_refc_ix = q->head.next.um_refc_ix;
+ if (erts_atomic_read_acqb(&q->tail.data.um_refc[um_refc_ix]) == 0) {
+ /* Move unreferenced end pointer forward... */
+ q->head.clean_reached_head_count = 0;
+ q->head.unref_end = q->head.next.unref_end;
+
+ if (!q->head.used_marker
+ && q->head.unref_end == (ErtsThrQElement_t *) ilast) {
+ q->head.used_marker = 1;
+ ilast = (erts_aint_t) enqueue_managed(q,
+ &q->tail.data.marker,
+ 1);
+ if (q->head.head.ptr == q->head.unref_end) {
+ ErtsThrQElement_t *next;
+ next = ((ErtsThrQElement_t *)
+ erts_atomic_read_acqb(&q->head.head.ptr->next.atmc));
+ if (next == &q->tail.data.marker) {
+ q->head.head.ptr->next.ptr = &q->tail.data.marker;
+ q->head.head.ptr = &q->tail.data.marker;
+ }
+ }
+ }
+
+ if (q->head.unref_end == (ErtsThrQElement_t *) ilast)
+ ERTS_THR_MEMORY_BARRIER;
+ else {
+ q->head.next.unref_end = (ErtsThrQElement_t *) ilast;
+ ERTS_THR_MEMORY_BARRIER;
+#ifdef ERTS_SMP
+ q->head.next.thr_progress = erts_thr_progress_later();
+#endif
+ erts_atomic32_set_relb(&q->tail.data.um_refc_ix,
+ um_refc_ix);
+ q->head.next.um_refc_ix = um_refc_ix == 0 ? 1 : 0;
+#ifdef ERTS_SMP
+ q->head.next.thr_progress_reached = 0;
+#endif
+ }
+ }
+#ifdef ERTS_SMP
+ }
+#endif
+
+ if (q->head.first == q->head.head.ptr) {
+ inspect_head:
+ if (!q->head.used_marker) {
+ erts_aint_t inext;
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext == ERTS_AINT_NULL) {
+ q->head.used_marker = 1;
+ (void) enqueue_managed(q, &q->tail.data.marker, 0);
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext == (erts_aint_t) &q->tail.data.marker) {
+ q->head.head.ptr->next.ptr = &q->tail.data.marker;
+ q->head.head.ptr = &q->tail.data.marker;
+#ifdef ERTS_SMP
+ if (!q->head.next.thr_progress_reached)
+ return ERTS_THR_Q_NEED_THR_PRGR;
+#else
+ if (do_notify)
+ q->head.notify(q->head.arg);
+#endif
+ return ERTS_THR_Q_DIRTY;
+ }
+ }
+ }
+ return ERTS_THR_Q_CLEAN;
+ }
+
+ if (q->head.first != q->head.unref_end) {
+ if (do_notify)
+ q->head.notify(q->head.arg);
+ return ERTS_THR_Q_DIRTY;
+ }
+
+#ifdef ERTS_SMP
+ if (!q->head.next.thr_progress_reached)
+ return ERTS_THR_Q_NEED_THR_PRGR;
+#endif
+
+ return ERTS_THR_Q_CLEAN; /* Waiting for unmanaged threads to complete... */
+}
+
+#endif
+
+ErtsThrQCleanState_t
+erts_thr_q_clean(ErtsThrQ_t *q)
+{
+#ifdef USE_THREADS
+ return clean(q, ERTS_THR_Q_MAX_SCHED_CLEAN_OPS, 0);
+#else
+ return ERTS_THR_Q_CLEAN;
+#endif
+}
+
+ErtsThrQCleanState_t
+erts_thr_q_inspect(ErtsThrQ_t *q, int ensure_empty)
+{
+#ifdef USE_THREADS
+ if (ensure_empty) {
+ erts_aint_t inext;
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext != ERTS_AINT_NULL) {
+ if (&q->tail.data.marker != (ErtsThrQElement_t *) inext)
+ return ERTS_THR_Q_DIRTY;
+ else {
+ q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext;
+ q->head.head.ptr = (ErtsThrQElement_t *) inext;
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext != ERTS_AINT_NULL)
+ return ERTS_THR_Q_DIRTY;
+ }
+ }
+ }
+
+ if (q->head.first == q->head.head.ptr) {
+ if (!q->head.used_marker) {
+ erts_aint_t inext;
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext == ERTS_AINT_NULL)
+ return ERTS_THR_Q_DIRTY;
+ }
+ return ERTS_THR_Q_CLEAN;
+ }
+
+ if (q->head.first != q->head.unref_end)
+ return ERTS_THR_Q_DIRTY;
+
+#ifdef ERTS_SMP
+ if (!q->head.next.thr_progress_reached)
+ return ERTS_THR_Q_NEED_THR_PRGR;
+#endif
+#endif
+ return ERTS_THR_Q_CLEAN;
+}
+
+static void
+enqueue(ErtsThrQ_t *q, void *data, ErtsThrQElement_t *this)
+{
+#ifndef USE_THREADS
+ ASSERT(data);
+
+ this->next.ptr = NULL;
+ this->data.ptr = data;
+
+ if (q->last)
+ q->last->next.ptr = this;
+ else {
+ q->first = q->last = this;
+ q->init.notify(q->init.arg);
+ }
+#else
+ int notify;
+ int um_refc_ix = 0;
+#ifdef ERTS_SMP
+ int unmanaged_thread;
+#endif
+
+#if ERTS_THR_Q_DBG_CHK_DATA
+ if (!data)
+ erl_exit(ERTS_ABORT_EXIT, "Missing data in enqueue\n");
+#endif
+
+ ASSERT(!q->q.finalizing);
+
+ this->data.ptr = data;
+
+#ifdef ERTS_SMP
+ unmanaged_thread = !erts_thr_progress_is_managed_thread();
+ if (unmanaged_thread)
+#endif
+ {
+ um_refc_ix = erts_atomic32_read_acqb(&q->tail.data.um_refc_ix);
+ while (1) {
+ int tmp_um_refc_ix;
+ erts_atomic_inc_acqb(&q->tail.data.um_refc[um_refc_ix]);
+ tmp_um_refc_ix = erts_atomic32_read_acqb(&q->tail.data.um_refc_ix);
+ if (tmp_um_refc_ix == um_refc_ix)
+ break;
+ erts_atomic_dec_relb(&q->tail.data.um_refc[um_refc_ix]);
+ um_refc_ix = tmp_um_refc_ix;
+ }
+ }
+
+ notify = this == enqueue_managed(q, this, 1);
+
+
+#ifdef ERTS_SMP
+ if (unmanaged_thread)
+#endif
+ {
+ if (notify)
+ erts_atomic_dec_relb(&q->tail.data.um_refc[um_refc_ix]);
+ else if (erts_atomic_dec_read_relb(&q->tail.data.um_refc[um_refc_ix]) == 0)
+ notify = 1;
+ }
+ if (notify)
+ q->tail.data.notify(q->tail.data.arg);
+#endif
+}
+
+void
+erts_thr_q_enqueue(ErtsThrQ_t *q, void *data)
+{
+ enqueue(q, data, element_alloc(q));
+}
+
+ErtsThrQPrepEnQ_t *
+erts_thr_q_prepare_enqueue(ErtsThrQ_t *q)
+{
+ return (ErtsThrQPrepEnQ_t *) element_alloc(q);
+}
+
+int
+erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *q, ErtsThrQFinDeQ_t *fdp)
+{
+#ifndef USE_THREADS
+ return 0;
+#else
+#ifdef DEBUG
+ if (!q->head.deq_fini.start) {
+ ASSERT(!q->head.deq_fini.end);
+ }
+ else {
+ ErtsThrQElement_t *e = q->head.deq_fini.start;
+ ErtsThrQElement_t *end = q->head.deq_fini.end;
+ while (e != end) {
+ ASSERT(q->head.head.ptr != e);
+ ASSERT(q->head.first != e);
+ ASSERT(q->head.unref_end != e);
+ e = e->next.ptr;
+ }
+ }
+#endif
+ fdp->start = q->head.deq_fini.start;
+ fdp->end = q->head.deq_fini.end;
+ if (fdp->end)
+ fdp->end->next.ptr = NULL;
+ q->head.deq_fini.start = NULL;
+ q->head.deq_fini.end = NULL;
+ return fdp->start != NULL;
+#endif
+}
+
+void
+erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *fdp0,
+ ErtsThrQFinDeQ_t *fdp1)
+{
+#ifdef USE_THREADS
+ if (fdp1->start) {
+ if (fdp0->end)
+ fdp0->end->next.ptr = fdp1->start;
+ else
+ fdp0->start = fdp1->start;
+ fdp0->end = fdp1->end;
+ }
+#endif
+}
+
+
+int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *state)
+{
+#ifdef USE_THREADS
+ ErtsThrQElement_t *start = state->start;
+ if (start) {
+ ErtsThrQLive_t live;
+ int i;
+ for (i = 0; i < ERTS_THR_Q_MAX_FINI_DEQ_OPS; i++) {
+ ErtsThrQElement_t *tmp;
+ if (!start)
+ break;
+ tmp = start;
+ start = start->next.ptr;
+ live = (ErtsThrQLive_t) (UWord) tmp->data.ptr;
+ element_live_free(live, tmp);
+ }
+ state->start = start;
+ if (start)
+ return 1; /* More to do */
+ state->end = NULL;
+ }
+#endif
+ return 0;
+}
+
+void
+erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *state)
+{
+#ifdef USE_THREADS
+ state->start = NULL;
+ state->end = NULL;
+#endif
+}
+
+
+void
+erts_thr_q_enqueue_prepared(ErtsThrQ_t *q, void *data, ErtsThrQPrepEnQ_t *prep)
+{
+ ASSERT(prep);
+ enqueue(q, data, (ErtsThrQElement_t *) prep);
+}
+
+void *
+erts_thr_q_dequeue(ErtsThrQ_t *q)
+{
+#ifndef USE_THREADS
+ void *res;
+ ErtsThrQElement_t *tmp;
+
+ if (!q->first)
+ return NULL;
+ tmp = q->first;
+ res = tmp->data.ptr;
+ q->first = tmp->next.ptr;
+ if (!q->first)
+ q->last = NULL;
+
+ element_free(q, tmp);
+
+ return res;
+#else
+ erts_aint_t inext;
+ void *res;
+
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext == ERTS_AINT_NULL)
+ return NULL;
+ q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext;
+ q->head.head.ptr = (ErtsThrQElement_t *) inext;
+ if (q->head.head.ptr == &q->tail.data.marker) {
+ inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+ if (inext == ERTS_AINT_NULL)
+ return NULL;
+ q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext;
+ q->head.head.ptr = (ErtsThrQElement_t *) inext;
+ }
+ res = q->head.head.ptr->data.ptr;
+#if ERTS_THR_Q_DBG_CHK_DATA
+ q->head.head.ptr->data.ptr = NULL;
+ if (!res)
+ erl_exit(ERTS_ABORT_EXIT, "Missing data in dequeue\n");
+#endif
+ clean(q,
+ (q->head.deq_fini.automatic
+ ? ERTS_THR_Q_MAX_DEQUEUE_CLEAN_OPS
+ : ERTS_THR_Q_MAX_SCHED_CLEAN_OPS), 1);
+ return res;
+#endif
+}
diff --git a/erts/emulator/beam/erl_thr_queue.h b/erts/emulator/beam/erl_thr_queue.h
new file mode 100644
index 0000000000..407c23f5eb
--- /dev/null
+++ b/erts/emulator/beam/erl_thr_queue.h
@@ -0,0 +1,211 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2011. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+/*
+ * Description: Lock-free queue for communication between threads.
+ *
+ * Currently only a many-to-one version has been,
+ * implemented, i.e., many threads can enqueue but
+ * only one thread can dequeue at a time. It doesn't
+ * have to be the same thread dequeuing every time, but
+ * synchronization so that only one thread dequeues
+ * at a time has to be provided by other means.
+ *
+ * When/If the need for a many-to-many queue arises,
+ * this implementation can relatively easy be extended
+ * to support that too.
+ *
+ * Usage instructions can be found in erts_thr_queue.c
+ *
+ * Author: Rickard Green
+ */
+
+#ifndef ERL_THR_QUEUE_H__
+#define ERL_THR_QUEUE_H__
+
+#include "sys.h"
+#include "erl_threads.h"
+#include "erl_alloc.h"
+#include "erl_thr_progress.h"
+
+typedef enum {
+ ERTS_THR_Q_LIVE_UNDEF,
+ ERTS_THR_Q_LIVE_SHORT,
+ ERTS_THR_Q_LIVE_LONG
+} ErtsThrQLive_t;
+
+#define ERTS_THR_Q_INIT_DEFAULT \
+{ \
+ { \
+ ERTS_THR_Q_LIVE_UNDEF, \
+ ERTS_THR_Q_LIVE_SHORT \
+ }, \
+ NULL, \
+ NULL, \
+ 1 \
+}
+
+typedef struct ErtsThrQ_t_ ErtsThrQ_t;
+
+typedef struct {
+ struct {
+ ErtsThrQLive_t queue;
+ ErtsThrQLive_t objects;
+ } live;
+ void *arg;
+ void (*notify)(void *);
+ int auto_finalize_dequeue;
+} ErtsThrQInit_t;
+
+typedef struct ErtsThrQElement_t_ ErtsThrQElement_t;
+typedef struct ErtsThrQElement_t ErtsThrQPrepEnQ_t;
+
+typedef union {
+ erts_atomic_t atmc;
+ ErtsThrQElement_t *ptr;
+} ErtsThrQPtr_t;
+
+struct ErtsThrQElement_t_ {
+ ErtsThrQPtr_t next;
+ union {
+ erts_atomic_t atmc;
+ void *ptr;
+ } data;
+};
+
+typedef struct {
+ ErtsThrQElement_t *start;
+ ErtsThrQElement_t *end;
+} ErtsThrQFinDeQ_t;
+
+typedef enum {
+ ERTS_THR_Q_CLEAN,
+#ifdef ERTS_SMP
+ ERTS_THR_Q_NEED_THR_PRGR,
+#endif
+ ERTS_THR_Q_DIRTY,
+} ErtsThrQCleanState_t;
+
+#ifdef USE_THREADS
+
+typedef struct {
+ ErtsThrQElement_t marker;
+ erts_atomic_t last;
+ erts_atomic_t um_refc[2];
+ erts_atomic32_t um_refc_ix;
+ ErtsThrQLive_t live;
+#ifdef ERTS_SMP
+ erts_atomic32_t thr_prgr_clean_scheduled;
+#endif
+ void *arg;
+ void (*notify)(void *);
+} ErtsThrQTail_t;
+
+struct ErtsThrQ_t_ {
+ /*
+ * This structure needs to be cache line aligned for best
+ * performance.
+ */
+ union {
+ /* Modified by threads enqueuing */
+ ErtsThrQTail_t data;
+ char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQTail_t))];
+ } tail;
+ /*
+ * Everything below this point is *only* accessed by the
+ * thread dequeuing.
+ */
+ struct {
+ ErtsThrQPtr_t head;
+ ErtsThrQLive_t live;
+ ErtsThrQElement_t *first;
+ ErtsThrQElement_t *unref_end;
+ int clean_reached_head_count;
+ struct {
+ int automatic;
+ ErtsThrQElement_t *start;
+ ErtsThrQElement_t *end;
+ } deq_fini;
+ struct {
+#ifdef ERTS_SMP
+ ErtsThrPrgrVal thr_progress;
+ int thr_progress_reached;
+#endif
+ int um_refc_ix;
+ ErtsThrQElement_t *unref_end;
+ } next;
+ int used_marker;
+ void *arg;
+ void (*notify)(void *);
+ } head;
+ struct {
+ int finalizing;
+ ErtsThrQLive_t live;
+ void *blk;
+ } q;
+};
+
+#else /* !USE_THREADS */
+
+struct ErtsThrQ_t_ {
+ ErtsThrQInit_t init;
+ ErtsThrQElement_t *first;
+ ErtsThrQElement_t *last;
+ struct {
+ void *blk;
+ } q;
+};
+
+#endif
+
+void erts_thr_q_init(void);
+void erts_thr_q_initialize(ErtsThrQ_t *, ErtsThrQInit_t *);
+ErtsThrQCleanState_t erts_thr_q_finalize(ErtsThrQ_t *);
+ErtsThrQ_t *erts_thr_q_create(ErtsThrQInit_t *);
+ErtsThrQCleanState_t erts_thr_q_destroy(ErtsThrQ_t *);
+ErtsThrQCleanState_t erts_thr_q_clean(ErtsThrQ_t *);
+ErtsThrQCleanState_t erts_thr_q_inspect(ErtsThrQ_t *, int);
+ErtsThrQPrepEnQ_t *erts_thr_q_prepare_enqueue(ErtsThrQ_t *);
+void erts_thr_q_enqueue_prepared(ErtsThrQ_t *, void *, ErtsThrQPrepEnQ_t *);
+void erts_thr_q_enqueue(ErtsThrQ_t *, void *);
+void * erts_thr_q_dequeue(ErtsThrQ_t *);
+int erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *,
+ ErtsThrQFinDeQ_t *);
+void erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *,
+ ErtsThrQFinDeQ_t *);
+int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *);
+void erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *);
+
+#ifdef ERTS_SMP
+ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_q_need_thr_progress(ErtsThrQ_t *q);
+#endif
+
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+#ifdef ERTS_SMP
+ERTS_GLB_INLINE ErtsThrPrgrVal
+erts_thr_q_need_thr_progress(ErtsThrQ_t *q)
+{
+ return q->head.next.thr_progress;
+}
+#endif
+
+#endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */
+
+#endif /* ERL_THR_QUEUE_H__ */
diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h
index 684e910fc3..4a4973baab 100644
--- a/erts/emulator/beam/global.h
+++ b/erts/emulator/beam/global.h
@@ -42,12 +42,6 @@
typedef struct port Port;
#include "erl_port_task.h"
-#define ERTS_MAX_NO_OF_ASYNC_THREADS 1024
-extern int erts_async_max_threads;
-#define ERTS_ASYNC_THREAD_MIN_STACK_SIZE 16 /* Kilo words */
-#define ERTS_ASYNC_THREAD_MAX_STACK_SIZE 8192 /* Kilo words */
-extern int erts_async_thread_suggested_stack_size;
-
typedef struct erts_driver_t_ erts_driver_t;
#define SMALL_IO_QUEUE 5 /* Number of fixed elements */
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 151c776a3d..fff720634d 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -42,6 +42,7 @@
#include "erl_bits.h"
#include "erl_version.h"
#include "error.h"
+#include "erl_async.h"
extern ErlDrvEntry fd_driver_entry;
extern ErlDrvEntry vanilla_driver_entry;
@@ -4579,7 +4580,10 @@ int driver_lock_driver(ErlDrvPort ix)
erts_smp_mtx_lock(&erts_driver_list_lock);
- if (prt == NULL) return -1;
+ if (prt == NULL) {
+ erts_smp_mtx_unlock(&erts_driver_list_lock);
+ return -1;
+ }
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
if ((dh = (DE_Handle*)prt->drv_ptr->handle ) == NULL) {
diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h
index b63fe98f27..f9cbcc5892 100644
--- a/erts/emulator/beam/sys.h
+++ b/erts/emulator/beam/sys.h
@@ -475,15 +475,6 @@ __decl_noreturn void __noreturn erl_exit(int n, char*, ...);
#define ERTS_ABORT_EXIT (INT_MIN + 1) /* no crash dump; only abort() */
#define ERTS_DUMP_EXIT (127) /* crash dump; then exit() */
-
-#ifndef ERTS_SMP
-int check_async_ready(void);
-#ifdef USE_THREADS
-void sys_async_ready(int hndl);
-int erts_register_async_ready_callback(void (*funcp)(void));
-#endif
-#endif
-
Eterm erts_check_io_info(void *p);
/* Size of misc memory allocated from system dependent code */
@@ -671,6 +662,8 @@ int erts_sys_putenv(char *key_value, int sep_ix);
*size), a value > 0 if value buffer is too small (*size is set to needed
size), and a value < 0 on failure. */
int erts_sys_getenv(char *key, char *value, size_t *size);
+/* erts_sys_getenv__() is only allowed to be used in early init phase */
+int erts_sys_getenv__(char *key, char *value, size_t *size);
/* Easier to use, but not as efficient, environment functions */
char *erts_read_env(char *key);
diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c
index 65485241aa..1bd178f280 100644
--- a/erts/emulator/beam/utils.c
+++ b/erts/emulator/beam/utils.c
@@ -43,6 +43,7 @@
#include "erl_smp.h"
#include "erl_time.h"
#include "erl_thr_progress.h"
+#include "erl_thr_queue.h"
#include "erl_sched_spec_pre_alloc.h"
#undef M_TRIM_THRESHOLD
diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c
index d7c4812dad..c6b63350e5 100644
--- a/erts/emulator/sys/unix/sys.c
+++ b/erts/emulator/sys/unix/sys.c
@@ -128,7 +128,6 @@ static ErtsSysReportExit *report_exit_list;
static ErtsSysReportExit *report_exit_transit_list;
#endif
-extern int check_async_ready(void);
extern int driver_interrupt(int, int);
extern void do_break(void);
@@ -1120,31 +1119,6 @@ struct erl_drv_entry vanilla_driver_entry = {
stop_select
};
-#if defined(USE_THREADS) && !defined(ERTS_SMP)
-static int async_drv_init(void);
-static ErlDrvData async_drv_start(ErlDrvPort, char*, SysDriverOpts*);
-static void async_drv_stop(ErlDrvData);
-static void async_drv_input(ErlDrvData, ErlDrvEvent);
-
-/* INTERNAL use only */
-
-struct erl_drv_entry async_driver_entry = {
- async_drv_init,
- async_drv_start,
- async_drv_stop,
- NULL,
- async_drv_input,
- NULL,
- "async",
- NULL,
- NULL,
- NULL,
- NULL,
- NULL,
- NULL
-};
-#endif
-
/* Handle SIGCHLD signals. */
#if (defined(SIG_SIGSET) || defined(SIG_SIGNAL))
static RETSIGTYPE onchld(void)
@@ -2329,87 +2303,6 @@ static void stop_select(ErlDrvEvent fd, void* _)
close((int)fd);
}
-/*
-** Async opertation support
-*/
-#if defined(USE_THREADS) && !defined(ERTS_SMP)
-static void
-sys_async_ready_failed(int fd, int r, int err)
-{
- char buf[120];
- sprintf(buf, "sys_async_ready(): Fatal error: fd=%d, r=%d, errno=%d\n",
- fd, r, err);
- erts_silence_warn_unused_result(write(2, buf, strlen(buf)));
- abort();
-}
-
-/* called from threads !! */
-void sys_async_ready(int fd)
-{
- int r;
- while (1) {
- r = write(fd, "0", 1); /* signal main thread fd MUST be async_fd[1] */
- if (r == 1) {
- DEBUGF(("sys_async_ready(): r = 1\r\n"));
- break;
- }
- if (r < 0 && errno == EINTR) {
- DEBUGF(("sys_async_ready(): r = %d\r\n", r));
- continue;
- }
- sys_async_ready_failed(fd, r, errno);
- }
-}
-
-static int async_drv_init(void)
-{
- async_fd[0] = -1;
- async_fd[1] = -1;
- return 0;
-}
-
-static ErlDrvData async_drv_start(ErlDrvPort port_num,
- char* name, SysDriverOpts* opts)
-{
- if (async_fd[0] != -1)
- return ERL_DRV_ERROR_GENERAL;
- if (pipe(async_fd) < 0)
- return ERL_DRV_ERROR_GENERAL;
-
- DEBUGF(("async_drv_start: %d\r\n", port_num));
-
- SET_NONBLOCKING(async_fd[0]);
- driver_select(port_num, async_fd[0], ERL_DRV_READ, 1);
-
- if (init_async(async_fd[1]) < 0)
- return ERL_DRV_ERROR_GENERAL;
- return (ErlDrvData)port_num;
-}
-
-static void async_drv_stop(ErlDrvData e)
-{
- int port_num = (int)(long)e;
-
- DEBUGF(("async_drv_stop: %d\r\n", port_num));
-
- exit_async();
-
- driver_select(port_num, async_fd[0], ERL_DRV_READ, 0);
-
- close(async_fd[0]);
- close(async_fd[1]);
- async_fd[0] = async_fd[1] = -1;
-}
-
-
-static void async_drv_input(ErlDrvData e, ErlDrvEvent fd)
-{
- char *buf[32];
- DEBUGF(("async_drv_input\r\n"));
- while (read((int) fd, (void *) buf, 32) > 0); /* fd MUST be async_fd[0] */
- check_async_ready(); /* invoke all async_ready */
-}
-#endif
void erts_do_break_handling(void)
{
@@ -2483,12 +2376,10 @@ erts_sys_putenv(char *buffer, int sep_ix)
}
int
-erts_sys_getenv(char *key, char *value, size_t *size)
+erts_sys_getenv__(char *key, char *value, size_t *size)
{
- char *orig_value;
int res;
- erts_smp_rwmtx_rlock(&environ_rwmtx);
- orig_value = getenv(key);
+ char *orig_value = getenv(key);
if (!orig_value)
res = -1;
else {
@@ -2503,6 +2394,15 @@ erts_sys_getenv(char *key, char *value, size_t *size)
res = 0;
}
}
+ return res;
+}
+
+int
+erts_sys_getenv(char *key, char *value, size_t *size)
+{
+ int res;
+ erts_smp_rwmtx_rlock(&environ_rwmtx);
+ res = erts_sys_getenv__(key, value, size);
erts_smp_rwmtx_runlock(&environ_rwmtx);
return res;
}
@@ -2514,31 +2414,6 @@ sys_init_io(void)
erts_alloc(ERTS_ALC_T_FD_TAB, max_files * sizeof(struct fd_data));
erts_smp_atomic_add_nob(&sys_misc_mem_sz,
max_files * sizeof(struct fd_data));
-
-#ifdef USE_THREADS
-#ifdef ERTS_SMP
- if (init_async(-1) < 0)
- erl_exit(1, "Failed to initialize async-threads\n");
-#else
- {
- /* This is speical stuff, starting a driver from the
- * system routines, but is a nice way of handling stuff
- * the erlang way
- */
- SysDriverOpts dopts;
- int ret;
-
- sys_memset((void*)&dopts, 0, sizeof(SysDriverOpts));
- add_driver_entry(&async_driver_entry);
- ret = erts_open_driver(NULL, NIL, "async", &dopts, NULL);
- DEBUGF(("open_driver = %d\n", ret));
- if (ret < 0)
- erl_exit(1, "Failed to open async driver\n");
- erts_port[ret].status |= ERTS_PORT_SFLG_IMMORTAL;
- }
-#endif
-#endif
-
}
#if (0) /* unused? */
@@ -2765,15 +2640,7 @@ initiate_report_exit_status(ErtsSysReportExit *rep, int status)
rep->next = report_exit_transit_list;
rep->status = status;
report_exit_transit_list = rep;
- /*
- * We need the scheduler thread to call check_children().
- * If the scheduler thread is sleeping in a poll with a
- * timeout, we need to wake the scheduler thread. We use the
- * functionality of the async driver to do this, instead of
- * implementing yet another driver doing the same thing. A
- * little bit ugly, but it works...
- */
- sys_async_ready(async_fd[1]);
+ erts_sys_schedule_interrupt(1);
}
static int check_children(void)
@@ -2860,19 +2727,11 @@ erl_sys_schedule(int runnable)
{
#ifdef ERTS_SMP
ERTS_CHK_IO(!runnable);
- ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
#else
- if (runnable) {
- ERTS_CHK_IO(0); /* Poll for I/O */
- check_async_ready(); /* Check async completions */
- } else {
- int wait_for_io = !check_async_ready();
- if (wait_for_io)
- wait_for_io = !check_children();
- ERTS_CHK_IO(wait_for_io);
- }
- (void) check_children();
+ ERTS_CHK_IO(runnable ? 0 : !check_children());
#endif
+ ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
+ (void) check_children();
}
diff --git a/erts/emulator/sys/vxworks/sys.c b/erts/emulator/sys/vxworks/sys.c
index 97a2ae7f7b..d6d1fe64e0 100644
--- a/erts/emulator/sys/vxworks/sys.c
+++ b/erts/emulator/sys/vxworks/sys.c
@@ -1520,6 +1520,12 @@ erts_sys_getenv(char *key, char *value, size_t *size)
return res;
}
+int
+erts_sys_getenv__(char *key, char *value, size_t *size)
+{
+ return erts_sys_getenv(key, value, size);
+}
+
void
sys_init_io(void)
{
diff --git a/erts/emulator/sys/win32/sys.c b/erts/emulator/sys/win32/sys.c
index ace1e1fca0..02d16b83a2 100644
--- a/erts/emulator/sys/win32/sys.c
+++ b/erts/emulator/sys/win32/sys.c
@@ -566,51 +566,6 @@ struct erl_drv_entry vanilla_driver_entry = {
stop_select
};
-#if defined(USE_THREADS) && !defined(ERTS_SMP)
-
-static int async_drv_init(void);
-static ErlDrvData async_drv_start(ErlDrvPort, char*, SysDriverOpts*);
-static void async_drv_stop(ErlDrvData);
-static void async_drv_input(ErlDrvData, ErlDrvEvent);
-
-/* INTERNAL use only */
-
-void null_output(ErlDrvData drv_data, char* buf, int len)
-{
-}
-
-void null_ready_output(ErlDrvData drv_data, ErlDrvEvent event)
-{
-}
-
-struct erl_drv_entry async_driver_entry = {
- async_drv_init,
- async_drv_start,
- async_drv_stop,
- null_output,
- async_drv_input,
- null_ready_output,
- "async",
- NULL, /* finish */
- NULL, /* handle */
- NULL, /* control */
- NULL, /* timeout */
- NULL, /* outputv */
- NULL, /* ready_async */
- NULL, /* flush */
- NULL, /* call */
- NULL, /* event */
- ERL_DRV_EXTENDED_MARKER,
- ERL_DRV_EXTENDED_MAJOR_VERSION,
- ERL_DRV_EXTENDED_MINOR_VERSION,
- 0, /* ERL_DRV_FLAGs */
- NULL,
- NULL, /* process_exit */
- stop_select
-};
-
-#endif
-
/*
* Initialises a DriverData structure.
*
@@ -2825,30 +2780,6 @@ sys_init_io(void)
We estimate the number to twice the amount of ports.
We really dont know on windows, do we? */
max_files = 2*erts_max_ports;
-
-#ifdef USE_THREADS
-#ifdef ERTS_SMP
- if (init_async(-1) < 0)
- erl_exit(1, "Failed to initialize async-threads\n");
-#else
- {
- /* This is special stuff, starting a driver from the
- * system routines, but is a nice way of handling stuff
- * the erlang way
- */
- SysDriverOpts dopts;
- int ret;
-
- sys_memset((void*)&dopts, 0, sizeof(SysDriverOpts));
- add_driver_entry(&async_driver_entry);
- ret = erts_open_driver(NULL, NIL, "async", &dopts, NULL);
- DEBUGF(("open_driver = %d\n", ret));
- if (ret < 0)
- erl_exit(1, "Failed to open async driver\n");
- erts_port[ret].status |= ERTS_PORT_SFLG_IMMORTAL;
- }
-#endif
-#endif
}
#ifdef ERTS_SMP
@@ -3382,75 +3313,7 @@ erts_sys_schedule_interrupt_timed(int set, long msec)
void
erl_sys_schedule(int runnable)
{
-#ifdef ERTS_SMP
erts_check_io(!runnable);
ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
-#else
- if (runnable) {
- erts_check_io(0); /* Poll for I/O */
- check_async_ready(); /* Check async completions */
- } else {
- erts_check_io(check_async_ready() ? 0 : 1);
- }
-#endif
-}
-
-#if defined(USE_THREADS) && !defined(ERTS_SMP)
-/*
- * Async operation support.
- */
-
-static ErlDrvEvent async_drv_event;
-
-void
-sys_async_ready(int fd)
-{
- SetEvent((HANDLE)async_drv_event);
}
-static int
-async_drv_init(void)
-{
- async_drv_event = (ErlDrvEvent) NULL;
- return 0;
-}
-
-static ErlDrvData
-async_drv_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts)
-{
- if (async_drv_event != (ErlDrvEvent) NULL) {
- return ERL_DRV_ERROR_GENERAL;
- }
- if ((async_drv_event = (ErlDrvEvent)CreateAutoEvent(FALSE)) == (ErlDrvEvent) NULL) {
- return ERL_DRV_ERROR_GENERAL;
- }
-
- driver_select(port_num, async_drv_event, ERL_DRV_READ|ERL_DRV_USE, 1);
- if (init_async(async_drv_event) < 0) {
- return ERL_DRV_ERROR_GENERAL;
- }
- return (ErlDrvData)port_num;
-}
-
-static void
-async_drv_stop(ErlDrvData port_num)
-{
- exit_async();
- driver_select((ErlDrvPort)port_num, async_drv_event, ERL_DRV_READ|ERL_DRV_USE, 0);
- /*CloseHandle((HANDLE)async_drv_event);*/
- async_drv_event = (ErlDrvEvent) NULL;
-}
-
-
-static void
-async_drv_input(ErlDrvData port_num, ErlDrvEvent e)
-{
- check_async_ready();
-
- /*
- * Our event is auto-resetting.
- */
-}
-
-#endif
-
diff --git a/erts/emulator/sys/win32/sys_env.c b/erts/emulator/sys/win32/sys_env.c
index 02c8433a10..7acc7f07ee 100644
--- a/erts/emulator/sys/win32/sys_env.c
+++ b/erts/emulator/sys/win32/sys_env.c
@@ -55,19 +55,17 @@ erts_sys_putenv(char *key_value, int sep_ix)
}
int
-erts_sys_getenv(char *key, char *value, size_t *size)
+erts_sys_getenv__(char *key, char *value, size_t *size)
{
size_t req_size = 0;
int res = 0;
DWORD new_size;
- erts_smp_rwmtx_rlock(&environ_rwmtx);
SetLastError(0);
new_size = GetEnvironmentVariable((LPCTSTR) key,
(LPTSTR) value,
(DWORD) *size);
res = !new_size && GetLastError() == ERROR_ENVVAR_NOT_FOUND ? -1 : 0;
- erts_smp_rwmtx_runlock(&environ_rwmtx);
if (res < 0)
return res;
res = new_size > *size ? 1 : 0;
@@ -75,6 +73,16 @@ erts_sys_getenv(char *key, char *value, size_t *size)
return res;
}
+int
+erts_sys_getenv(char *key, char *value, size_t *size)
+{
+ int res;
+ erts_smp_rwmtx_rlock(&environ_rwmtx);
+ res = erts_sys_getenv__(key, value, size);
+ erts_smp_rwmtx_runlock(&environ_rwmtx);
+ return res;
+}
+
struct win32_getenv_state {
char *env;
char *next;
diff --git a/erts/emulator/test/driver_SUITE.erl b/erts/emulator/test/driver_SUITE.erl
index bcb0257ed1..c07dbc5871 100644
--- a/erts/emulator/test/driver_SUITE.erl
+++ b/erts/emulator/test/driver_SUITE.erl
@@ -76,7 +76,8 @@
driver_select_use/1,
thread_mseg_alloc_cache_clean/1,
otp_9302/1,
- thr_free_drv/1]).
+ thr_free_drv/1,
+ async_blast/1]).
-export([bin_prefix/2]).
@@ -145,7 +146,8 @@ all() ->
smp_select, driver_select_use,
thread_mseg_alloc_cache_clean,
otp_9302,
- thr_free_drv].
+ thr_free_drv,
+ async_blast].
groups() ->
[{timer, [],
@@ -1911,17 +1913,30 @@ otp_9302(Config) when is_list(Config) ->
?line port_command(Port, ""),
?line {msg, block} = get_port_msg(Port, infinity),
?line {msg, job} = get_port_msg(Port, infinity),
- ?line case erlang:system_info(thread_pool_size) of
- 0 ->
- {msg, cancel} = get_port_msg(Port, infinity);
- _ ->
- ok
- end,
- ?line {msg, job} = get_port_msg(Port, infinity),
+ ?line C = case erlang:system_info(thread_pool_size) of
+ 0 ->
+ ?line {msg, cancel} = get_port_msg(Port, infinity),
+ ?line {msg, job} = get_port_msg(Port, infinity),
+ ?line false;
+ _ ->
+ case get_port_msg(Port, infinity) of
+ {msg, cancel} -> %% Cancel always fail in Rel >= 15
+ ?line {msg, job} = get_port_msg(Port, infinity),
+ ?line false;
+ {msg, job} ->
+ ?line ok,
+ ?line true
+ end
+ end,
?line {msg, end_of_jobs} = get_port_msg(Port, infinity),
?line no_msg = get_port_msg(Port, 2000),
?line port_close(Port),
- ?line ok.
+ ?line case C of
+ true ->
+ ?line {comment, "Async job cancelled"};
+ false ->
+ ?line {comment, "Async job not cancelled"}
+ end.
thr_free_drv(Config) when is_list(Config) ->
?line Path = ?config(data_dir, Config),
@@ -1954,6 +1969,48 @@ thr_free_drv_control(Port, N) ->
% io:format("N=~p, SID=~p", [N, erlang:system_info(scheduler_id)]),
thr_free_drv_control(Port, N+1)
end.
+
+async_blast(Config) when is_list(Config) ->
+ ?line Path = ?config(data_dir, Config),
+ ?line erl_ddll:start(),
+ ?line ok = load_driver(Path, async_blast_drv),
+ ?line SchedOnln = erlang:system_info(schedulers_online),
+ ?line MemBefore = driver_alloc_size(),
+ ?line Start = os:timestamp(),
+ ?line Blast = fun () ->
+ Port = open_port({spawn, async_blast_drv}, []),
+ true = is_port(Port),
+ port_command(Port, ""),
+ receive
+ {Port, done} ->
+ ok
+ end,
+ port_close(Port)
+ end,
+ ?line Ps = lists:map(fun (N) ->
+ spawn_opt(Blast,
+ [{scheduler,
+ (N rem SchedOnln)+ 1},
+ monitor])
+ end,
+ lists:seq(1, 100)),
+ ?line MemMid = driver_alloc_size(),
+ ?line lists:foreach(fun ({Pid, Mon}) ->
+ receive
+ {'DOWN',Mon,process,Pid,_} -> ok
+ end
+ end, Ps),
+ ?line End = os:timestamp(),
+ ?line MemAfter = driver_alloc_size(),
+ ?line io:format("MemBefore=~p, MemMid=~p, MemAfter=~p~n",
+ [MemBefore, MemMid, MemAfter]),
+ ?line AsyncBlastTime = timer:now_diff(End,Start)/1000000,
+ ?line io:format("AsyncBlastTime=~p~n", [AsyncBlastTime]),
+ ?line MemBefore = MemAfter,
+ ?line erlang:display({async_blast_time, AsyncBlastTime}),
+ ?line ok.
+
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Utilities
diff --git a/erts/emulator/test/driver_SUITE_data/Makefile.src b/erts/emulator/test/driver_SUITE_data/Makefile.src
index 62ab5169c0..dd48f6a0f7 100644
--- a/erts/emulator/test/driver_SUITE_data/Makefile.src
+++ b/erts/emulator/test/driver_SUITE_data/Makefile.src
@@ -13,7 +13,8 @@ MISC_DRVS = outputv_drv@dll@ \
missing_callback_drv@dll@ \
thr_alloc_drv@dll@ \
otp_9302_drv@dll@ \
- thr_free_drv@dll@
+ thr_free_drv@dll@ \
+ async_blast_drv@dll@
SYS_INFO_DRVS = sys_info_1_0_drv@dll@ \
sys_info_1_1_drv@dll@ \
diff --git a/erts/emulator/test/driver_SUITE_data/async_blast_drv.c b/erts/emulator/test/driver_SUITE_data/async_blast_drv.c
new file mode 100644
index 0000000000..3821f7e3dc
--- /dev/null
+++ b/erts/emulator/test/driver_SUITE_data/async_blast_drv.c
@@ -0,0 +1,124 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2011. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+#include "erl_driver.h"
+
+#define NO_ASYNC_JOBS 10000
+
+static void stop(ErlDrvData drv_data);
+static ErlDrvData start(ErlDrvPort port,
+ char *command);
+static void output(ErlDrvData drv_data,
+ char *buf, int len);
+static void ready_async(ErlDrvData drv_data,
+ ErlDrvThreadData thread_data);
+
+static ErlDrvEntry async_blast_drv_entry = {
+ NULL /* init */,
+ start,
+ stop,
+ output,
+ NULL /* ready_input */,
+ NULL /* ready_output */,
+ "async_blast_drv",
+ NULL /* finish */,
+ NULL /* handle */,
+ NULL /* control */,
+ NULL /* timeout */,
+ NULL /* outputv */,
+ ready_async,
+ NULL /* flush */,
+ NULL /* call */,
+ NULL /* event */,
+ ERL_DRV_EXTENDED_MARKER,
+ ERL_DRV_EXTENDED_MAJOR_VERSION,
+ ERL_DRV_EXTENDED_MINOR_VERSION,
+ ERL_DRV_FLAG_USE_PORT_LOCKING,
+ NULL /* handle2 */,
+ NULL /* handle_monitor */
+};
+
+typedef struct {
+ ErlDrvPort port;
+ ErlDrvTermData caller;
+ int counter;
+} async_blast_data_t;
+
+
+DRIVER_INIT(async_blast_drv)
+{
+ return &async_blast_drv_entry;
+}
+
+static void stop(ErlDrvData drv_data)
+{
+ driver_free((void *) drv_data);
+}
+
+static ErlDrvData start(ErlDrvPort port,
+ char *command)
+{
+ async_blast_data_t *abd;
+
+ abd = driver_alloc(sizeof(async_blast_data_t));
+ if (!abd)
+ return ERL_DRV_ERROR_GENERAL;
+
+ abd->port = port;
+ abd->counter = 0;
+ return (ErlDrvData) abd;
+}
+
+static void async_invoke(void *data)
+{
+
+}
+#include <stdio.h>
+
+static void ready_async(ErlDrvData drv_data,
+ ErlDrvThreadData thread_data)
+{
+ async_blast_data_t *abd = (async_blast_data_t *) drv_data;
+ if (--abd->counter == 0) {
+ ErlDrvTermData spec[] = {
+ ERL_DRV_PORT, driver_mk_port(abd->port),
+ ERL_DRV_ATOM, driver_mk_atom("done"),
+ ERL_DRV_TUPLE, 2
+ };
+ driver_send_term(abd->port, abd->caller,
+ spec, sizeof(spec)/sizeof(spec[0]));
+ }
+}
+
+static void output(ErlDrvData drv_data,
+ char *buf, int len)
+{
+ async_blast_data_t *abd = (async_blast_data_t *) drv_data;
+ if (abd->counter == 0) {
+ int i;
+ abd->caller = driver_caller(abd->port);
+ abd->counter = NO_ASYNC_JOBS;
+ for (i = 0; i < NO_ASYNC_JOBS; i++) {
+ if (0 > driver_async(abd->port, NULL, async_invoke, NULL, NULL)) {
+ driver_failure_atom(abd->port, "driver_async_failed");
+ break;
+ }
+ }
+ }
+}