diff options
author | Rickard Green <[email protected]> | 2011-11-13 21:42:30 +0100 |
---|---|---|
committer | Rickard Green <[email protected]> | 2011-11-13 21:42:30 +0100 |
commit | ca58731b5df58aa2a8b42c583d1ba7bb929e72b2 (patch) | |
tree | 6bfe9e29277a767a240536897646016a971ead81 /erts | |
parent | 73ee2e00fe0389d0362e89a74d1909510da9e0fd (diff) | |
parent | dcc7ecbf6af5420af2d5dbd0e97fc7a2e0e894a6 (diff) | |
download | otp-ca58731b5df58aa2a8b42c583d1ba7bb929e72b2.tar.gz otp-ca58731b5df58aa2a8b42c583d1ba7bb929e72b2.tar.bz2 otp-ca58731b5df58aa2a8b42c583d1ba7bb929e72b2.zip |
Merge branch 'rickard/generic-thr-queue/OTP-9632'
* rickard/generic-thr-queue/OTP-9632:
Use generic lock-free queue for async threads
Use generic lock-free queue for misc aux work
Implement generic lock-free queue
Diffstat (limited to 'erts')
26 files changed, 2070 insertions, 753 deletions
diff --git a/erts/doc/src/erl_driver.xml b/erts/doc/src/erl_driver.xml index 2fb03954b6..8e18dd6657 100644 --- a/erts/doc/src/erl_driver.xml +++ b/erts/doc/src/erl_driver.xml @@ -1638,12 +1638,19 @@ ERL_DRV_EXT2TERM char *buf, ErlDrvUInt len <fsummary>Cancel an asynchronous call</fsummary> <desc> <marker id="driver_async_cancel"></marker> - <p>This function cancels an asynchronous operation, by removing - it from the queue. Only functions in the queue can be - cancelled; if a function is executing, it's too late to - cancel it. The <c>async_free</c> function is also called.</p> - <p>The return value is 1 if the operation was removed from the - queue, otherwise 0.</p> + <p>This function used to cancel a scheduled asynchronous operation, + if it was still in the queue. It returned 1 if it succeeded, and + 0 if it failed.</p> + <p>Since it could not guarantee success, it was more or less useless. + The user had to implement synchronization of cancellation anyway. + It also unnecessarily complicated the implementation. Therefore, + as of OTP-R15B <c>driver_async_cancel()</c> is deprecated, and + scheduled for removal in OTP-R16. It will currently always fail, + and return 0.</p> + <warning><p><c>driver_async_cancel()</c> is deferred and will + be removed in the OTP-R16 release.</p> + </warning> + </desc> </func> <func> diff --git a/erts/emulator/Makefile.in b/erts/emulator/Makefile.in index afca3b85df..708d4ca0a3 100644 --- a/erts/emulator/Makefile.in +++ b/erts/emulator/Makefile.in @@ -739,7 +739,7 @@ RUN_OBJS = \ $(OBJDIR)/packet_parser.o $(OBJDIR)/safe_hash.o \ $(OBJDIR)/erl_zlib.o $(OBJDIR)/erl_nif.o \ $(OBJDIR)/erl_bif_binary.o $(OBJDIR)/erl_ao_firstfit_alloc.o \ - $(OBJDIR)/erl_sched_spec_pre_alloc.o + $(OBJDIR)/erl_thr_queue.o $(OBJDIR)/erl_sched_spec_pre_alloc.o ifeq ($(TARGET),win32) DRV_OBJS = \ diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c index 705ace26fa..33d6cf5f2f 100644 --- a/erts/emulator/beam/erl_alloc.c +++ b/erts/emulator/beam/erl_alloc.c @@ -41,6 +41,7 @@ #include "erl_monitors.h" #include "erl_bif_timer.h" #include "erl_cpu_topology.h" +#include "erl_thr_queue.h" #if defined(ERTS_ALC_T_DRV_SEL_D_STATE) || defined(ERTS_ALC_T_DRV_EV_D_STATE) #include "erl_check_io.h" #endif @@ -524,6 +525,10 @@ erts_alloc_init(int *argc, char **argv, ErtsAllocInitOpts *eaiop) = sizeof(ErtsDrvSelectDataState); fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_MSG_REF)] = sizeof(ErlMessage); +#ifdef ERTS_SMP + fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_THR_Q_EL_SL)] + = sizeof(ErtsThrQElement_t); +#endif #ifdef HARD_DEBUG hdbg_init(); #endif @@ -3070,10 +3075,10 @@ erts_request_alloc_info(struct process *c_p, #ifdef ERTS_SMP if (erts_no_schedulers > 1) - erts_smp_schedule_misc_aux_work(1, - erts_no_schedulers, - reply_alloc_info, - (void *) air); + erts_schedule_multi_misc_aux_work(1, + erts_no_schedulers, + reply_alloc_info, + (void *) air); #endif reply_alloc_info((void *) air); diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 79d3433fc0..962db8b831 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -50,6 +50,15 @@ # command line argument to make_alloc_types. The variable X is false # after a "+disable X" statement or if it has never been mentioned. ++if smp ++disable threads_no_smp ++else ++if threads ++enable threads_no_smp ++else ++disable threads_no_smp ++endif ++endif # --- Allocator declarations ------------------------------------------------- # @@ -192,7 +201,7 @@ type LINEBUF STANDARD SYSTEM line_buf type IOQ STANDARD SYSTEM io_queue type BITS_BUF STANDARD SYSTEM bits_buf type TMP_DIST_BUF TEMPORARY SYSTEM tmp_dist_buf -type ASYNC_Q LONG_LIVED SYSTEM async_queue +type ASYNC_DATA LONG_LIVED SYSTEM internal_async_data type ESTACK TEMPORARY SYSTEM estack type PORT_CALL_BUF TEMPORARY SYSTEM port_call_buf type DB_TABLE ETS ETS db_tab @@ -253,6 +262,22 @@ type EXT_TERM_DATA SHORT_LIVED PROCESSES external_term_data type ZLIB STANDARD SYSTEM zlib type CPU_GRPS_MAP LONG_LIVED SYSTEM cpu_groups_map type AUX_WORK_TMO LONG_LIVED SYSTEM aux_work_timeouts +type MISC_AUX_WORK_Q LONG_LIVED SYSTEM misc_aux_work_q + ++if threads_no_smp +# Need thread safe allocs, but std_alloc and fix_alloc are not; +# use driver_alloc which is... +type THR_Q_EL DRIVER SYSTEM thr_q_element +type THR_Q_EL_SL DRIVER SYSTEM sl_thr_q_element +type MISC_AUX_WORK DRIVER SYSTEM misc_aux_work ++else +type THR_Q_EL STANDARD SYSTEM thr_q_element +type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element +type MISC_AUX_WORK SHORT_LIVED SYSTEM misc_aux_work ++endif +type THR_Q STANDARD SYSTEM thr_queue +type THR_Q_SL SHORT_LIVED SYSTEM short_lived_thr_queue +type THR_Q_LL LONG_LIVED SYSTEM long_lived_thr_queue +if smp type ASYNC SHORT_LIVED SYSTEM async @@ -268,8 +293,6 @@ type XPORTS_LIST SHORT_LIVED SYSTEM extra_port_list type PROC_LCK_WTR LONG_LIVED SYSTEM proc_lock_waiter type PROC_LCK_QS LONG_LIVED SYSTEM proc_lock_queues type RUNQ_BLNS LONG_LIVED SYSTEM run_queue_balancing -type MISC_AUX_WORK_Q LONG_LIVED SYSTEM misc_aux_work_q -type MISC_AUX_WORK SHORT_LIVED SYSTEM misc_aux_work type THR_PRGR_IDATA LONG_LIVED SYSTEM thr_prgr_internal_data type THR_PRGR_DATA LONG_LIVED SYSTEM thr_prgr_data type T_THR_PRGR_DATA SHORT_LIVED SYSTEM temp_thr_prgr_data @@ -285,12 +308,6 @@ type ETHR_STD STANDARD SYSTEM ethread_standard type ETHR_SL SHORT_LIVED SYSTEM ethread_short_lived type ETHR_LL LONG_LIVED SYSTEM ethread_long_lived -+ifnot smp - -type ARCALLBACK LONG_LIVED SYSTEM async_ready_callback - -+endif - +endif +if shared_heap diff --git a/erts/emulator/beam/erl_async.c b/erts/emulator/beam/erl_async.c index 91b64411d4..2dc7237f7c 100644 --- a/erts/emulator/beam/erl_async.c +++ b/erts/emulator/beam/erl_async.c @@ -24,10 +24,18 @@ #include "erl_sys_driver.h" #include "global.h" #include "erl_threads.h" +#include "erl_thr_queue.h" +#include "erl_async.h" + +#define ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ 20 + +#define ERTS_ASYNC_PRINT_JOB 0 + +#if !defined(ERTS_SMP) && defined(USE_THREADS) && !ERTS_USE_ASYNC_READY_Q +# error "Need async ready queue in non-smp case" +#endif typedef struct _erl_async { - struct _erl_async* next; - struct _erl_async* prev; DE_Handle* hndl; /* The DE_Handle is needed when port is gone */ Eterm port; long async_id; @@ -35,345 +43,498 @@ typedef struct _erl_async { ErlDrvPDL pdl; void (*async_invoke)(void*); void (*async_free)(void*); -} ErlAsync; +#if ERTS_USE_ASYNC_READY_Q + Uint sched_id; + union { + ErtsThrQPrepEnQ_t *prep_enq; + ErtsThrQFinDeQ_t fin_deq; + } q; +#endif +} ErtsAsync; + +#if ERTS_USE_ASYNC_READY_Q + +/* + * We can do without the enqueue mutex since it isn't needed for + * thread safety. Its only purpose is to put async threads to sleep + * during a blast of ready async jobs. This in order to reduce + * contention on the enqueue end of the async ready queues. During + * such a blast without the enqueue mutex much cpu time is consumed + * by the async threads without them doing much progress which in turn + * slow down progress of scheduler threads. + */ +#define ERTS_USE_ASYNC_READY_ENQ_MTX 1 + +#if ERTS_USE_ASYNC_READY_ENQ_MTX typedef struct { - erts_mtx_t mtx; - erts_cnd_t cv; - erts_tid_t thr; - int len; -#ifndef ERTS_SMP - int hndl; + erts_mtx_t enq_mtx; +} ErtsAsyncReadyQXData; + #endif - ErlAsync* head; - ErlAsync* tail; -#ifdef ERTS_ENABLE_LOCK_CHECK - int no; + +typedef struct { +#if ERTS_USE_ASYNC_READY_ENQ_MTX + union { + ErtsAsyncReadyQXData data; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE( + sizeof(ErtsAsyncReadyQXData))]; + } x; #endif -} AsyncQueue; + ErtsThrQ_t thr_q; + ErtsThrQFinDeQ_t fin_deq; +} ErtsAsyncReadyQ; -static erts_smp_spinlock_t async_id_lock; -static long async_id = 0; +typedef union { + ErtsAsyncReadyQ arq; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncReadyQ))]; +} ErtsAlgndAsyncReadyQ; -#ifndef ERTS_SMP +#endif /* ERTS_USE_ASYNC_READY_Q */ -erts_mtx_t async_ready_mtx; -static ErlAsync* async_ready_list = NULL; +typedef struct { + ErtsThrQ_t thr_q; + erts_tid_t thr_id; +} ErtsAsyncQ; + +typedef union { + ErtsAsyncQ aq; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncQ))]; +} ErtsAlgndAsyncQ; +typedef struct { + int no_initialized; + erts_mtx_t mtx; + erts_cnd_t cnd; + erts_atomic_t id; +} ErtsAsyncInit; + +typedef struct { + union { + ErtsAsyncInit data; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncInit))]; + } init; + ErtsAlgndAsyncQ *queue; +#if ERTS_USE_ASYNC_READY_Q + ErtsAlgndAsyncReadyQ *ready_queue; #endif +} ErtsAsyncData; -/* -** Initialize worker threads (if supported) -*/ +int erts_async_max_threads; /* Initialized by erl_init.c */ +int erts_async_thread_suggested_stack_size; /* Initialized by erl_init.c */ -/* Detach from driver */ -static void async_detach(DE_Handle* dh) -{ - return; -} +static ErtsAsyncData *async; +#ifndef USE_THREADS -#ifdef USE_THREADS +void +erts_init_async(void) +{ -static AsyncQueue* async_q; +} -static void* async_main(void*); -static void async_add(ErlAsync*, AsyncQueue*); +#else -#ifndef ERTS_SMP -typedef struct ErtsAsyncReadyCallback_ ErtsAsyncReadyCallback; -struct ErtsAsyncReadyCallback_ { - struct ErtsAsyncReadyCallback_ *next; - void (*callback)(void); -}; +static void *async_main(void *); -static ErtsAsyncReadyCallback *callbacks; -static int async_handle; +static ERTS_INLINE ErtsAsyncQ * +async_q(int i) +{ + return &async->queue[i].aq; +} + +#if ERTS_USE_ASYNC_READY_Q -int erts_register_async_ready_callback(void (*funcp)(void)) +static ERTS_INLINE ErtsAsyncReadyQ * +async_ready_q(Uint sched_id) { - ErtsAsyncReadyCallback *cb = erts_alloc(ERTS_ALC_T_ARCALLBACK, - sizeof(ErtsAsyncReadyCallback)); - cb->next = callbacks; - cb->callback = funcp; - erts_mtx_lock(&async_ready_mtx); - callbacks = cb; - erts_mtx_unlock(&async_ready_mtx); - return async_handle; + return &async->ready_queue[((int)sched_id)-1].arq; } + #endif -int init_async(int hndl) +void +erts_init_async(void) { - erts_thr_opts_t thr_opts = ERTS_THR_OPTS_DEFAULT_INITER; - AsyncQueue* q; - int i; + async = NULL; + if (erts_async_max_threads > 0) { +#if ERTS_USE_ASYNC_READY_Q + ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT; +#endif + erts_thr_opts_t thr_opts = ERTS_THR_OPTS_DEFAULT_INITER; + char *ptr; + size_t tot_size = 0; + int i; + + tot_size += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncData)); + tot_size += sizeof(ErtsAlgndAsyncQ)*erts_async_max_threads; +#if ERTS_USE_ASYNC_READY_Q + tot_size += sizeof(ErtsAlgndAsyncReadyQ)*erts_no_schedulers; +#endif - thr_opts.detached = 0; - thr_opts.suggested_stack_size = erts_async_thread_suggested_stack_size; - -#ifndef ERTS_SMP - callbacks = NULL; - async_handle = hndl; - erts_mtx_init(&async_ready_mtx, "async_ready"); - async_ready_list = NULL; -#endif - - async_id = 0; - erts_smp_spinlock_init(&async_id_lock, "async_id"); - - async_q = q = (AsyncQueue*) - (erts_async_max_threads - ? erts_alloc(ERTS_ALC_T_ASYNC_Q, - erts_async_max_threads * sizeof(AsyncQueue)) - : NULL); - for (i = 0; i < erts_async_max_threads; i++) { - q->head = NULL; - q->tail = NULL; - q->len = 0; -#ifndef ERTS_SMP - q->hndl = hndl; -#endif -#ifdef ERTS_ENABLE_LOCK_CHECK - q->no = i; -#endif - erts_mtx_init(&q->mtx, "asyncq"); - erts_cnd_init(&q->cv); - erts_thr_create(&q->thr, async_main, (void*)q, &thr_opts); - q++; - } - return 0; -} + ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_ASYNC_DATA, + tot_size); + async = (ErtsAsyncData *) ptr; + ptr += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncData)); -int exit_async() -{ - int i; + async->init.data.no_initialized = 0; + erts_mtx_init(&async->init.data.mtx, "async_init_mtx"); + erts_cnd_init(&async->init.data.cnd); + erts_atomic_init_nob(&async->init.data.id, 0); - /* terminate threads */ - for (i = 0; i < erts_async_max_threads; i++) { - ErlAsync* a = (ErlAsync*) erts_alloc(ERTS_ALC_T_ASYNC, - sizeof(ErlAsync)); - a->port = NIL; - async_add(a, &async_q[i]); - } + async->queue = (ErtsAlgndAsyncQ *) ptr; + ptr += sizeof(ErtsAlgndAsyncQ)*erts_async_max_threads; - for (i = 0; i < erts_async_max_threads; i++) { - erts_thr_join(async_q[i].thr, NULL); - erts_mtx_destroy(&async_q[i].mtx); - erts_cnd_destroy(&async_q[i].cv); - } -#ifndef ERTS_SMP - erts_mtx_destroy(&async_ready_mtx); +#if ERTS_USE_ASYNC_READY_Q + + qinit.live.queue = ERTS_THR_Q_LIVE_LONG; + qinit.live.objects = ERTS_THR_Q_LIVE_SHORT; + qinit.notify = erts_notify_check_async_ready_queue; + + async->ready_queue = (ErtsAlgndAsyncReadyQ *) ptr; + ptr += sizeof(ErtsAlgndAsyncReadyQ)*erts_no_schedulers; + + for (i = 1; i <= erts_no_schedulers; i++) { + ErtsAsyncReadyQ *arq = async_ready_q(i); +#if ERTS_USE_ASYNC_READY_ENQ_MTX + erts_mtx_init(&arq->x.data.enq_mtx, "async_enq_mtx"); #endif - if (async_q) - erts_free(ERTS_ALC_T_ASYNC_Q, (void *) async_q); - return 0; + erts_thr_q_finalize_dequeue_state_init(&arq->fin_deq); + qinit.arg = (void *) (SWord) i; + erts_thr_q_initialize(&arq->thr_q, &qinit); + } + +#endif + + /* Create async threads... */ + + thr_opts.detached = 0; + thr_opts.suggested_stack_size + = erts_async_thread_suggested_stack_size; + + for (i = 0; i < erts_async_max_threads; i++) { + ErtsAsyncQ *aq = async_q(i); + erts_thr_create(&aq->thr_id, async_main, (void*) aq, &thr_opts); + } + + /* Wait for async threads to initialize... */ + + erts_mtx_lock(&async->init.data.mtx); + while (async->init.data.no_initialized != erts_async_max_threads) + erts_cnd_wait(&async->init.data.cnd, &async->init.data.mtx); + erts_mtx_unlock(&async->init.data.mtx); + + erts_mtx_destroy(&async->init.data.mtx); + erts_cnd_destroy(&async->init.data.cnd); + + } } +#if ERTS_USE_ASYNC_READY_Q -static void async_add(ErlAsync* a, AsyncQueue* q) +void * +erts_get_async_ready_queue(Uint sched_id) +{ + return (void *) async ? async_ready_q(sched_id) : NULL; +} + +#endif + +static ERTS_INLINE void async_add(ErtsAsync *a, ErtsAsyncQ* q) { if (is_internal_port(a->port)) { - ERTS_LC_ASSERT(erts_drvportid2port(a->port)); +#if ERTS_USE_ASYNC_READY_Q + ErtsAsyncReadyQ *arq = async_ready_q(a->sched_id); + a->q.prep_enq = erts_thr_q_prepare_enqueue(&arq->thr_q); +#endif /* make sure the driver will stay around */ - driver_lock_driver(internal_port_index(a->port)); + if (a->hndl) + erts_ddll_reference_referenced_driver(a->hndl); } - erts_mtx_lock(&q->mtx); +#if ERTS_ASYNC_PRINT_JOB + erts_fprintf(stderr, "-> %ld\n", a->async_id); +#endif - if (q->len == 0) { - q->head = a; - q->tail = a; - q->len = 1; - erts_cnd_signal(&q->cv); - } - else { /* no need to signal (since the worker is working) */ - a->next = q->head; - q->head->prev = a; - q->head = a; - q->len++; - } - erts_mtx_unlock(&q->mtx); + erts_thr_q_enqueue(&q->thr_q, a); } -static ErlAsync* async_get(AsyncQueue* q) +static ERTS_INLINE ErtsAsync *async_get(ErtsThrQ_t *q, + erts_tse_t *tse, + ErtsThrQPrepEnQ_t **prep_enq) { - ErlAsync* a; +#if ERTS_USE_ASYNC_READY_Q + int saved_fin_deq = 0; + ErtsThrQFinDeQ_t fin_deq; +#endif - erts_mtx_lock(&q->mtx); - while((a = q->tail) == NULL) { - erts_cnd_wait(&q->cv, &q->mtx); - } + while (1) { + ErtsAsync *a = (ErtsAsync *) erts_thr_q_dequeue(q); + if (a) { + +#if ERTS_USE_ASYNC_READY_Q + *prep_enq = a->q.prep_enq; + erts_thr_q_get_finalize_dequeue_data(q, &a->q.fin_deq); + if (saved_fin_deq) + erts_thr_q_append_finalize_dequeue_data(&a->q.fin_deq, &fin_deq); +#endif + + return a; + } + + if (ERTS_THR_Q_DIRTY != erts_thr_q_clean(q)) { + ErtsThrQFinDeQ_t tmp_fin_deq; + + erts_tse_reset(tse); + +#if ERTS_USE_ASYNC_READY_Q + chk_fin_deq: + if (erts_thr_q_get_finalize_dequeue_data(q, &tmp_fin_deq)) { + if (!saved_fin_deq) { + erts_thr_q_finalize_dequeue_state_init(&fin_deq); + saved_fin_deq = 1; + } + erts_thr_q_append_finalize_dequeue_data(&fin_deq, + &tmp_fin_deq); + } +#endif + + switch (erts_thr_q_inspect(q, 1)) { + case ERTS_THR_Q_DIRTY: + break; #ifdef ERTS_SMP - ASSERT(a && q->tail == a); + case ERTS_THR_Q_NEED_THR_PRGR: { + ErtsThrPrgrVal prgr = erts_thr_q_need_thr_progress(q); + erts_thr_progress_wakeup(NULL, prgr); + /* + * We do no dequeue finalizing in hope that a new async + * job will arrive before we are woken due to thread + * progress... + */ + erts_tse_wait(tse); + break; + } #endif - if (q->head == q->tail) { - q->head = q->tail = NULL; - q->len = 0; - } - else { - q->tail->prev->next = NULL; - q->tail = q->tail->prev; - q->len--; + case ERTS_THR_Q_CLEAN: + +#if ERTS_USE_ASYNC_READY_Q + if (saved_fin_deq) { + if (erts_thr_q_finalize_dequeue(&fin_deq)) + goto chk_fin_deq; + else + saved_fin_deq = 0; + } +#endif + + erts_tse_wait(tse); + break; + + default: + ASSERT(0); + break; + } + + } } - erts_mtx_unlock(&q->mtx); - return a; } - -static int async_del(long id) +static ERTS_INLINE void call_async_ready(ErtsAsync *a) { - int i; - /* scan all queue for an entry with async_id == 'id' */ - - for (i = 0; i < erts_async_max_threads; i++) { - ErlAsync* a; - erts_mtx_lock(&async_q[i].mtx); - - a = async_q[i].head; - while(a != NULL) { - if (a->async_id == id) { - if (a->prev != NULL) - a->prev->next = a->next; - else - async_q[i].head = a->next; - if (a->next != NULL) - a->next->prev = a->prev; - else - async_q[i].tail = a->prev; - async_q[i].len--; - erts_mtx_unlock(&async_q[i].mtx); - if (a->async_free != NULL) - a->async_free(a->async_data); - async_detach(a->hndl); - erts_free(ERTS_ALC_T_ASYNC, a); - return 1; - } - a = a->next; + Port *p = erts_id2port_sflgs(a->port, + NULL, + 0, + ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); + if (!p) { + if (a->async_free) + a->async_free(a->async_data); + } + else { + if (async_ready(p, a->async_data)) { + if (a->async_free) + a->async_free(a->async_data); } - erts_mtx_unlock(&async_q[i].mtx); + erts_port_release(p); } - return 0; + if (a->hndl) + erts_ddll_dereference_driver(a->hndl); } -static void* async_main(void* arg) +static ERTS_INLINE void async_reply(ErtsAsync *a, ErtsThrQPrepEnQ_t *prep_enq) { - AsyncQueue* q = (AsyncQueue*) arg; +#if ERTS_USE_ASYNC_READY_Q + ErtsAsyncReadyQ *arq; -#ifdef ERTS_ENABLE_LOCK_CHECK - { - char buf[27]; - erts_snprintf(&buf[0], 27, "async %d", q->no); - erts_lc_set_thread_name(&buf[0]); - } + if (a->pdl) + driver_pdl_dec_refc(a->pdl); + +#if ERTS_ASYNC_PRINT_JOB + erts_fprintf(stderr, "=>> %ld\n", a->async_id); #endif - while(1) { - ErlAsync* a = async_get(q); + arq = async_ready_q(a->sched_id); - if (a->port == NIL) { /* TIME TO DIE SIGNAL */ - erts_free(ERTS_ALC_T_ASYNC, (void *) a); - break; - } - else { - (*a->async_invoke)(a->async_data); - /* Major problem if the code for async_invoke - or async_free is removed during a blocking operation */ +#if ERTS_USE_ASYNC_READY_ENQ_MTX + erts_mtx_lock(&arq->x.data.enq_mtx); +#endif + + erts_thr_q_enqueue_prepared(&arq->thr_q, (void *) a, prep_enq); + +#if ERTS_USE_ASYNC_READY_ENQ_MTX + erts_mtx_unlock(&arq->x.data.enq_mtx); +#endif + +#else /* ERTS_USE_ASYNC_READY_Q */ + + call_async_ready(a); + if (a->pdl) + driver_pdl_dec_refc(a->pdl); + erts_free(ERTS_ALC_T_ASYNC, (void *) a); + +#endif /* ERTS_USE_ASYNC_READY_Q */ +} + + +static void +async_wakeup(void *vtse) +{ + erts_tse_set((erts_tse_t *) vtse); +} + +static erts_tse_t *async_thread_init(ErtsAsyncQ *aq) +{ + ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT; + erts_tse_t *tse = erts_tse_fetch(); #ifdef ERTS_SMP - { - Port *p; - p = erts_id2port_sflgs(a->port, - NULL, - 0, - ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); - if (!p) { - if (a->async_free) - (*a->async_free)(a->async_data); - } - else { - if (async_ready(p, a->async_data)) { - if (a->async_free) - (*a->async_free)(a->async_data); - } - async_detach(a->hndl); - erts_port_release(p); - } - if (a->pdl) { - driver_pdl_dec_refc(a->pdl); - } - erts_free(ERTS_ALC_T_ASYNC, (void *) a); - } -#else - if (a->pdl) { - driver_pdl_dec_refc(a->pdl); - } - erts_mtx_lock(&async_ready_mtx); - a->next = async_ready_list; - async_ready_list = a; - erts_mtx_unlock(&async_ready_mtx); - sys_async_ready(q->hndl); + ErtsThrPrgrCallbacks callbacks; + + callbacks.arg = (void *) tse; + callbacks.wakeup = async_wakeup; + callbacks.prepare_wait = NULL; + callbacks.wait = NULL; + + erts_thr_progress_register_unmanaged_thread(&callbacks); #endif - } - } - return NULL; + qinit.live.queue = ERTS_THR_Q_LIVE_LONG; + qinit.live.objects = ERTS_THR_Q_LIVE_SHORT; + qinit.arg = (void *) tse; + qinit.notify = async_wakeup; +#if ERTS_USE_ASYNC_READY_Q + qinit.auto_finalize_dequeue = 0; +#endif + + erts_thr_q_initialize(&aq->thr_q, &qinit); + + /* Inform main thread that we are done initializing... */ + erts_mtx_lock(&async->init.data.mtx); + async->init.data.no_initialized++; + erts_cnd_signal(&async->init.data.cnd); + erts_mtx_unlock(&async->init.data.mtx); + + return tse; } +static void *async_main(void* arg) +{ + ErtsAsyncQ *aq = (ErtsAsyncQ *) arg; + erts_tse_t *tse = async_thread_init(aq); + + while (1) { + ErtsThrQPrepEnQ_t *prep_enq; + ErtsAsync *a = async_get(&aq->thr_q, tse, &prep_enq); + if (is_nil(a->port)) + break; /* Time to die */ +#if ERTS_ASYNC_PRINT_JOB + erts_fprintf(stderr, "<- %ld\n", a->async_id); #endif -#ifndef ERTS_SMP + a->async_invoke(a->async_data); + + async_reply(a, prep_enq); + } + + return NULL; +} + +#endif /* USE_THREADS */ -int check_async_ready(void) +void +erts_exit_flush_async(void) { #ifdef USE_THREADS - ErtsAsyncReadyCallback *cbs; + int i; + ErtsAsync a; + a.port = NIL; + /* + * Terminate threads in order to flush queues. We do not + * bother to clean everything up since we are about to + * terminate the runtime system and a cleanup would only + * delay the termination. + */ + for (i = 0; i < erts_async_max_threads; i++) + async_add(&a, async_q(i)); + for (i = 0; i < erts_async_max_threads; i++) + erts_thr_join(async->queue[i].aq.thr_id, NULL); #endif - ErlAsync* a; - int count = 0; +} - erts_mtx_lock(&async_ready_mtx); - a = async_ready_list; - async_ready_list = NULL; -#ifdef USE_THREADS - cbs = callbacks; -#endif - erts_mtx_unlock(&async_ready_mtx); - - while(a != NULL) { - ErlAsync* a_next = a->next; - /* Every port not dead */ - Port *p = erts_id2port_sflgs(a->port, - NULL, - 0, - ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); - if (!p) { - if (a->async_free) - (*a->async_free)(a->async_data); - } - else { - count++; - if (async_ready(p, a->async_data)) { - if (a->async_free != NULL) - (*a->async_free)(a->async_data); - } - async_detach(a->hndl); - erts_port_release(p); +#if defined(USE_THREADS) && ERTS_USE_ASYNC_READY_Q + +int erts_check_async_ready(void *varq) +{ + ErtsAsyncReadyQ *arq = (ErtsAsyncReadyQ *) varq; + int res = 1; + int i; + + for (i = 0; i < ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ; i++) { + ErtsAsync *a = (ErtsAsync *) erts_thr_q_dequeue(&arq->thr_q); + if (!a) { + res = 0; + break; } + +#if ERTS_ASYNC_PRINT_JOB + erts_fprintf(stderr, "<<= %ld\n", a->async_id); +#endif + erts_thr_q_append_finalize_dequeue_data(&arq->fin_deq, &a->q.fin_deq); + call_async_ready(a); erts_free(ERTS_ALC_T_ASYNC, (void *) a); - a = a_next; } -#ifdef USE_THREADS - for (; cbs; cbs = cbs->next) - (*cbs->callback)(); -#endif - return count; + + erts_thr_q_finalize_dequeue(&arq->fin_deq); + + return res; } +int erts_async_ready_clean(void *varq, void *val) +{ + ErtsAsyncReadyQ *arq = (ErtsAsyncReadyQ *) varq; + ErtsThrQCleanState_t cstate; + + cstate = erts_thr_q_clean(&arq->thr_q); + + if (erts_thr_q_finalize_dequeue(&arq->fin_deq)) + return ERTS_ASYNC_READY_DIRTY; + + switch (cstate) { + case ERTS_THR_Q_DIRTY: + return ERTS_ASYNC_READY_DIRTY; +#ifdef ERTS_SMP + case ERTS_THR_Q_NEED_THR_PRGR: + *((ErtsThrPrgrVal *) val) + = erts_thr_q_need_thr_progress(&arq->thr_q); + return ERTS_ASYNC_READY_NEED_THR_PRGR; #endif + case ERTS_THR_Q_CLEAN: + break; + } + return ERTS_ASYNC_READY_CLEAN; +} +#endif /* ** Schedule async_invoke on a worker thread @@ -393,19 +554,29 @@ long driver_async(ErlDrvPort ix, unsigned int* key, void (*async_invoke)(void*), void* async_data, void (*async_free)(void*)) { - ErlAsync* a = (ErlAsync*) erts_alloc(ERTS_ALC_T_ASYNC, sizeof(ErlAsync)); - Port* prt = erts_drvport2port(ix); + ErtsAsync* a; + Port* prt; long id; unsigned int qix; +#if ERTS_USE_ASYNC_READY_Q + Uint sched_id; + sched_id = erts_get_scheduler_id(); + if (!sched_id) + sched_id = 1; +#endif + prt = erts_drvport2port(ix); if (!prt) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); - a->next = NULL; - a->prev = NULL; + a = (ErtsAsync*) erts_alloc(ERTS_ALC_T_ASYNC, sizeof(ErtsAsync)); + +#if ERTS_USE_ASYNC_READY_Q + a->sched_id = sched_id; +#endif a->hndl = (DE_Handle*)prt->drv_ptr->handle; a->port = prt->id; a->pdl = NULL; @@ -413,12 +584,16 @@ long driver_async(ErlDrvPort ix, unsigned int* key, a->async_invoke = async_invoke; a->async_free = async_free; - erts_smp_spin_lock(&async_id_lock); - async_id = (async_id + 1) & 0x7fffffff; - if (async_id == 0) - async_id++; - id = async_id; - erts_smp_spin_unlock(&async_id_lock); + if (!async) + id = 0; + else { + do { + id = erts_atomic_inc_read_nob(&async->init.data.id); + } while (id == 0); + if (id < 0) + id *= -1; + ASSERT(id > 0); + } a->async_id = id; @@ -437,7 +612,7 @@ long driver_async(ErlDrvPort ix, unsigned int* key, driver_pdl_inc_refc(prt->port_data_lock); a->pdl = prt->port_data_lock; } - async_add(a, &async_q[qix]); + async_add(a, async_q(qix)); return id; } #endif @@ -455,10 +630,16 @@ long driver_async(ErlDrvPort ix, unsigned int* key, int driver_async_cancel(unsigned int id) { -#ifdef USE_THREADS - if (erts_async_max_threads > 0) - return async_del(id); -#endif + /* + * Not supported anymore. Always fail (which is backward + * compatible). + * + * This functionality could be implemented again. However, + * it is (and always has been) completely useless since + * it doesn't give you any guarantees whatsoever. The user + * needs to (and always have had to) synchronize in his/her + * own code in order to get any guarantees. + */ return 0; } diff --git a/erts/emulator/beam/erl_async.h b/erts/emulator/beam/erl_async.h new file mode 100644 index 0000000000..95374a8fc9 --- /dev/null +++ b/erts/emulator/beam/erl_async.h @@ -0,0 +1,66 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +#ifndef ERL_ASYNC_H__ +#define ERL_ASYNC_H__ + +#define ERTS_MAX_NO_OF_ASYNC_THREADS 1024 +extern int erts_async_max_threads; +#define ERTS_ASYNC_THREAD_MIN_STACK_SIZE 16 /* Kilo words */ +#define ERTS_ASYNC_THREAD_MAX_STACK_SIZE 8192 /* Kilo words */ +extern int erts_async_thread_suggested_stack_size; + +#ifdef USE_THREADS + +#ifdef ERTS_SMP +/* + * With smp support we can choose to have, or not to + * have an async ready queue. + */ +#define ERTS_USE_ASYNC_READY_Q 1 +#endif + +#ifndef ERTS_SMP +/* In non-smp case we *need* the async ready queue */ +# undef ERTS_USE_ASYNC_READY_Q +# define ERTS_USE_ASYNC_READY_Q 1 +#endif + +#ifndef ERTS_USE_ASYNC_READY_Q +# define ERTS_USE_ASYNC_READY_Q 0 +#endif + +#if ERTS_USE_ASYNC_READY_Q +int erts_check_async_ready(void *); +int erts_async_ready_clean(void *, void *); +void *erts_get_async_ready_queue(Uint sched_id); +#define ERTS_ASYNC_READY_CLEAN 0 +#define ERTS_ASYNC_READY_DIRTY 1 +#ifdef ERTS_SMP +#define ERTS_ASYNC_READY_NEED_THR_PRGR 2 +#endif +#endif /* ERTS_USE_ASYNC_READY_Q */ + +#endif /* USE_THREADS */ + +void erts_init_async(void); +void erts_exit_flush_async(void); + + +#endif /* ERL_ASYNC_H__ */ diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 7119306a52..a79feaebdb 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -39,6 +39,7 @@ #include "dist.h" #include "erl_gc.h" #include "erl_cpu_topology.h" +#include "erl_async.h" #include "erl_thr_progress.h" #ifdef HIPE #include "hipe_arch.h" diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c index d8b4294a30..0079c13287 100644 --- a/erts/emulator/beam/erl_db.c +++ b/erts/emulator/beam/erl_db.c @@ -280,8 +280,7 @@ static void schedule_free_dbtable(DbTable* tb) ASSERT(scheds >= 1); ASSERT(erts_refc_read(&tb->common.ref, 0) == 0); erts_refc_init(&tb->common.ref, scheds); - ERTS_THR_MEMORY_BARRIER; - erts_smp_schedule_misc_aux_work(0, scheds, chk_free_dbtable, tb); + erts_schedule_multi_misc_aux_work(0, scheds, chk_free_dbtable, tb); #else free_dbtable(tb); #endif diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h index 401967a8de..ae0c9def90 100644 --- a/erts/emulator/beam/erl_driver.h +++ b/erts/emulator/beam/erl_driver.h @@ -28,6 +28,14 @@ # include "config.h" #endif +#define ERL_DRV_DEPRECATED_FUNC +#ifdef __GNUC__ +# if __GNUC__ >= 3 +# undef ERL_DRV_DEPRECATED_FUNC +# define ERL_DRV_DEPRECATED_FUNC __attribute__((deprecated)) +# endif +#endif + #ifdef SIZEOF_CHAR # define SIZEOF_CHAR_SAVED__ SIZEOF_CHAR # undef SIZEOF_CHAR @@ -582,8 +590,11 @@ EXTERN long driver_async(ErlDrvPort ix, void* async_data, void (*async_free)(void*)); - -EXTERN int driver_async_cancel(unsigned int key); +/* + * driver_async_cancel() is deprecated. It is scheduled for removal + * in OTP-R16. For more information see the erl_driver(3) documentation. + */ +EXTERN int driver_async_cancel(unsigned int key) ERL_DRV_DEPRECATED_FUNC; /* Locks the driver in the machine "forever", there is no unlock function. Note that this is almost never useful, as an open diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index 647074a47f..9a09f08618 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -43,6 +43,8 @@ #include "packet_parser.h" #include "erl_cpu_topology.h" #include "erl_thr_progress.h" +#include "erl_thr_queue.h" +#include "erl_async.h" #ifdef HIPE #include "hipe_mode_switch.h" /* for hipe_mode_switch_init() */ @@ -100,8 +102,6 @@ int erts_backtrace_depth; /* How many functions to show in a backtrace * in error codes. */ -int erts_async_max_threads; /* number of threads for async support */ -int erts_async_thread_suggested_stack_size; erts_smp_atomic32_t erts_max_gen_gcs; Eterm erts_error_logger_warnings; /* What to map warning logs to, am_error, @@ -280,6 +280,7 @@ erl_init(int ncpu) erts_init_node_tables(); init_dist(); erl_drv_thr_init(); + erts_init_async(); init_io(); init_copy(); init_load(); @@ -606,6 +607,8 @@ early_init(int *argc, char **argv) /* int max_main_threads; int max_reader_groups; int reader_groups; + char envbuf[21]; /* enough for any 64-bit integer */ + size_t envbufsz; use_multi_run_queue = 1; erts_printf_eterm_func = erts_printf_term; @@ -677,6 +680,16 @@ early_init(int *argc, char **argv) /* schdlrs = no_schedulers; schdlrs_onln = no_schedulers_online; + envbufsz = sizeof(envbuf); + + /* erts_sys_getenv() not initialized yet; need erts_sys_getenv__() */ + if (erts_sys_getenv__("ERL_THREAD_POOL_SIZE", envbuf, &envbufsz) == 0) + erts_async_max_threads = atoi(envbuf); + else + erts_async_max_threads = 0; + if (erts_async_max_threads > ERTS_MAX_NO_OF_ASYNC_THREADS) + erts_async_max_threads = ERTS_MAX_NO_OF_ASYNC_THREADS; + if (argc && argv) { int i = 1; while (i < *argc) { @@ -704,6 +717,20 @@ early_init(int *argc, char **argv) /* } break; } + case 'A': { + /* set number of threads in thread pool */ + char *arg = get_arg(argv[i]+2, argv[i+1], &i); + if (((erts_async_max_threads = atoi(arg)) < 0) || + (erts_async_max_threads > ERTS_MAX_NO_OF_ASYNC_THREADS)) { + erts_fprintf(stderr, + "bad number of async threads %s\n", + arg); + erts_usage(); + VERBOSE(DEBUG_SYSTEM, ("using %d async-threads\n", + erts_async_max_threads)); + } + break; + } case 'S' : { int tot, onln; char *arg = get_arg(argv[i]+2, argv[i+1], &i); @@ -784,10 +811,14 @@ early_init(int *argc, char **argv) /* * ** Aux thread (see erl_process.c) * ** Sys message dispatcher thread (see erl_trace.c) * - * * No unmanaged threads that need to register. + * * Unmanaged threads that need to register: + * ** Async threads (see erl_async.c) */ - erts_thr_progress_init(no_schedulers, no_schedulers+1, 0); + erts_thr_progress_init(no_schedulers, + no_schedulers+2, + erts_async_max_threads); #endif + erts_thr_q_init(); erts_init_utils(); erts_early_init_cpu_topology(no_schedulers, &max_main_threads, @@ -867,7 +898,6 @@ erl_start(int argc, char **argv) int have_break_handler = 1; char envbuf[21]; /* enough for any 64-bit integer */ size_t envbufsz; - int async_max_threads = erts_async_max_threads; int ncpu = early_init(&argc, argv); envbufsz = sizeof(envbuf); @@ -883,11 +913,6 @@ erl_start(int argc, char **argv) (erts_aint32_t) max_gen_gcs); } - envbufsz = sizeof(envbuf); - if (erts_sys_getenv("ERL_THREAD_POOL_SIZE", envbuf, &envbufsz) == 0) { - async_max_threads = atoi(envbuf); - } - #if (defined(__APPLE__) && defined(__MACH__)) || defined(__DARWIN__) /* * The default stack size on MacOS X is too small for pcre. @@ -1317,17 +1342,8 @@ erl_start(int argc, char **argv) break; } - case 'A': - /* set number of threads in thread pool */ - arg = get_arg(argv[i]+2, argv[i+1], &i); - if (((async_max_threads = atoi(arg)) < 0) || - (async_max_threads > ERTS_MAX_NO_OF_ASYNC_THREADS)) { - erts_fprintf(stderr, "bad number of async threads %s\n", arg); - erts_usage(); - } - - VERBOSE(DEBUG_SYSTEM, ("using %d async-threads\n", - async_max_threads)); + case 'A': /* Was handled in early init just read past it */ + (void) get_arg(argv[i]+2, argv[i+1], &i); break; case 'a': @@ -1416,10 +1432,6 @@ erl_start(int argc, char **argv) i++; } -#ifdef USE_THREADS - erts_async_max_threads = async_max_threads; -#endif - /* Delayed check of +P flag */ if (erts_max_processes < ERTS_MIN_PROCESSES || erts_max_processes > ERTS_MAX_PROCESSES @@ -1465,6 +1477,10 @@ erl_start(int argc, char **argv) erts_sys_main_thread(); /* May or may not return! */ #else erts_thr_set_main_status(1, 1); +#if ERTS_USE_ASYNC_READY_Q + erts_get_scheduler_data()->aux_work_data.async_ready.queue + = erts_get_async_ready_queue(1); +#endif set_main_stack_size(); process_main(); #endif @@ -1537,14 +1553,7 @@ system_cleanup(int exit_code) erts_cleanup_incgc(); #endif -#if defined(USE_THREADS) - exit_async(); -#endif - - /* - * A lot more cleaning could/should have been done... - */ - + erts_exit_flush_async(); } /* diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c index 02d1407a2d..44da6b6c51 100644 --- a/erts/emulator/beam/erl_lock_check.c +++ b/erts/emulator/beam/erl_lock_check.c @@ -110,10 +110,6 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "fun_tab", NULL }, { "environ", NULL }, #endif - { "asyncq", "address" }, -#ifndef ERTS_SMP - { "async_ready", NULL }, -#endif { "efile_drv", "address" }, #if defined(ENABLE_CHILD_WAITER_THREAD) || defined(ERTS_SMP) { "child_status", NULL }, @@ -138,6 +134,7 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "alcu_init_atoms", NULL }, { "mseg_init_atoms", NULL }, { "drv_tsd", NULL }, + { "async_enq_mtx", NULL }, #ifdef ERTS_SMP { "sys_msg_q", NULL }, { "atom_tab", NULL }, @@ -173,14 +170,12 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "timeofday", NULL }, { "breakpoints", NULL }, { "pollsets_lock", NULL }, - { "async_id", NULL }, { "pix_lock", "address" }, { "run_queues_lists", NULL }, - { "misc_aux_work_queue", "index" }, - { "misc_aux_work_pre_alloc_lock", "address" }, { "sched_stat", NULL }, { "run_queue_sleep_list", "address" }, #endif + { "async_init_mtx", NULL }, #ifdef ERTS_SMP { "proc_lck_qs_alloc", NULL }, #endif diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 6adeef2e69..a3c1c9577b 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -40,6 +40,8 @@ #include "beam_bp.h" #include "erl_cpu_topology.h" #include "erl_thr_progress.h" +#include "erl_thr_queue.h" +#include "erl_async.h" #define ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED (2000*CONTEXT_REDS) #define ERTS_RUNQ_CALL_CHECK_BALANCE_REDS \ @@ -125,7 +127,6 @@ ErtsLcPSDLocks erts_psd_required_locks[ERTS_PSD_SIZE]; #endif #ifdef ERTS_SMP - int erts_disable_proc_not_running_opt; static ErtsAuxWorkData *aux_thread_aux_work_data; @@ -361,6 +362,15 @@ dbg_chk_aux_work_val(erts_aint32_t value) #ifdef ERTS_SSI_AUX_WORK_MISC valid |= ERTS_SSI_AUX_WORK_MISC; #endif +#ifdef ERTS_SSI_AUX_WORK_MISC_THR_PRGR + valid |= ERTS_SSI_AUX_WORK_MISC_THR_PRGR; +#endif +#ifdef ERTS_SSI_AUX_WORK_ASYNC_READY + valid |= ERTS_SSI_AUX_WORK_ASYNC_READY; +#endif +#ifdef ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN + valid |= ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN; +#endif #ifdef ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM valid |= ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM; @@ -707,37 +717,37 @@ unset_aux_work_flags(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flgs) return erts_atomic32_read_band_nob(&ssi->aux_work, ~flgs); } -#ifdef ERTS_SMP - typedef struct erts_misc_aux_work_t_ erts_misc_aux_work_t; struct erts_misc_aux_work_t_ { - erts_misc_aux_work_t *next; void (*func)(void *); void *arg; }; -typedef struct { - erts_smp_mtx_t mtx; - erts_misc_aux_work_t *first; - erts_misc_aux_work_t *last; -} erts_misc_aux_work_q_t; +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_aux_work, + erts_misc_aux_work_t, + 200, + ERTS_ALC_T_MISC_AUX_WORK) typedef union { - erts_misc_aux_work_q_t data; - char align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_misc_aux_work_q_t))]; + ErtsThrQ_t q; + char align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))]; } erts_algnd_misc_aux_work_q_t; static erts_algnd_misc_aux_work_q_t *misc_aux_work_queues; -ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_aux_work, - erts_misc_aux_work_t, - 200, - ERTS_ALC_T_MISC_AUX_WORK) +static void +notify_aux_work(void *vssi) +{ + set_aux_work_flags_wakeup_nob((ErtsSchedulerSleepInfo *) vssi, + ERTS_SSI_AUX_WORK_MISC); +} static void init_misc_aux_work(void) { int ix; + ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT; + qinit.notify = notify_aux_work; init_misc_aux_work_alloc(); @@ -746,88 +756,189 @@ init_misc_aux_work(void) sizeof(erts_algnd_misc_aux_work_q_t) * (erts_no_schedulers+1)); - for (ix = 0; ix <= erts_no_schedulers; ix++) { - erts_smp_mtx_init_x(&misc_aux_work_queues[ix].data.mtx, - "misc_aux_work_queue", - make_small(ix)); - misc_aux_work_queues[ix].data.first = NULL; - misc_aux_work_queues[ix].data.last = NULL; +#ifdef ERTS_SMP + ix = 0; /* aux_thread + schedulers */ +#else + ix = 1; /* scheduler only */ +#endif + + for (; ix <= erts_no_schedulers; ix++) { + qinit.arg = (void *) ERTS_SCHED_SLEEP_INFO_IX(ix-1); + erts_thr_q_initialize(&misc_aux_work_queues[ix].q, &qinit); + } +} + +static erts_aint32_t +misc_aux_work_clean(ErtsThrQ_t *q, + ErtsAuxWorkData *awdp, + erts_aint32_t aux_work) +{ + switch (erts_thr_q_clean(q)) { + case ERTS_THR_Q_DIRTY: + set_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC); + return aux_work | ERTS_SSI_AUX_WORK_MISC; +#ifdef ERTS_SMP + case ERTS_THR_Q_NEED_THR_PRGR: + set_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC_THR_PRGR); + erts_thr_progress_wakeup(awdp->esdp, + erts_thr_q_need_thr_progress(q)); +#endif + case ERTS_THR_Q_CLEAN: + break; } + return aux_work; } static erts_aint32_t handle_misc_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t aux_work) { - int ix = (int) awdp->sched_id; - erts_misc_aux_work_t *mawp; + ErtsThrQ_t *q = &misc_aux_work_queues[awdp->sched_id].q; unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC); - - erts_smp_mtx_lock(&misc_aux_work_queues[ix].data.mtx); - mawp = misc_aux_work_queues[ix].data.first; - misc_aux_work_queues[ix].data.first = NULL; - misc_aux_work_queues[ix].data.last = NULL; - erts_smp_mtx_unlock(&misc_aux_work_queues[ix].data.mtx); - - while (mawp) { - erts_misc_aux_work_t *free_mawp; + while (1) { + erts_misc_aux_work_t *mawp = erts_thr_q_dequeue(q); + if (!mawp) + break; mawp->func(mawp->arg); - free_mawp = mawp; - mawp = mawp->next; - misc_aux_work_free(free_mawp); + misc_aux_work_free(mawp); } - return aux_work & ~ERTS_SSI_AUX_WORK_MISC; + return misc_aux_work_clean(q, awdp, aux_work & ~ERTS_SSI_AUX_WORK_MISC); } -static void -smp_schedule_misc_aux_work(int ix, - void (*func)(void *), - void *arg) +#ifdef ERTS_SMP + +static erts_aint32_t +handle_misc_aux_work_thr_prgr(ErtsAuxWorkData *awdp, + erts_aint32_t aux_work) { - erts_aint32_t aux_work; + if (!erts_thr_progress_has_reached(awdp->misc.thr_prgr)) + return aux_work; + + unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC_THR_PRGR); + + return misc_aux_work_clean(&misc_aux_work_queues[awdp->sched_id].q, + awdp, + aux_work & ~ERTS_SSI_AUX_WORK_MISC_THR_PRGR); +} + +#endif + +static ERTS_INLINE void +schedule_misc_aux_work(int sched_id, + void (*func)(void *), + void *arg) +{ + ErtsThrQ_t *q; erts_misc_aux_work_t *mawp; - ErtsSchedulerSleepInfo *ssi; - mawp = misc_aux_work_alloc(); +#ifdef ERTS_SMP + ASSERT(0 <= sched_id && sched_id <= erts_no_schedulers); +#else + ASSERT(sched_id == 1); +#endif + q = &misc_aux_work_queues[sched_id].q; + mawp = misc_aux_work_alloc(); mawp->func = func; mawp->arg = arg; - mawp->next = NULL; - - erts_smp_mtx_lock(&misc_aux_work_queues[ix].data.mtx); - if (!misc_aux_work_queues[ix].data.last) - misc_aux_work_queues[ix].data.first = mawp; - else - misc_aux_work_queues[ix].data.last->next = mawp; - misc_aux_work_queues[ix].data.last = mawp; - erts_smp_mtx_unlock(&misc_aux_work_queues[ix].data.mtx); + erts_thr_q_enqueue(q, mawp); +} - set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(ix-1), - ERTS_SSI_AUX_WORK_MISC); +void +erts_schedule_misc_aux_work(int sched_id, + void (*func)(void *), + void *arg) +{ + schedule_misc_aux_work(sched_id, func, arg); } void -erts_smp_schedule_misc_aux_work(int ignore_self, - int max_sched, - void (*func)(void *), - void *arg) +erts_schedule_multi_misc_aux_work(int ignore_self, + int max_sched, + void (*func)(void *), + void *arg) { - int ix, ignore_ix = -1; + int id, self = 0; if (ignore_self) { ErtsSchedulerData *esdp = erts_get_scheduler_data(); if (esdp) - ignore_ix = (int) esdp->no; + self = (int) esdp->no; } ASSERT(0 < max_sched && max_sched <= erts_no_schedulers); - for (ix = 1; ix <= max_sched; ix++) { - if (ix == ignore_ix) + for (id = 1; id <= max_sched; id++) { + if (id == self) continue; - smp_schedule_misc_aux_work(ix, func, arg); + schedule_misc_aux_work(id, func, arg); + } +} + +#if ERTS_USE_ASYNC_READY_Q + +void +erts_notify_check_async_ready_queue(void *vno) +{ + int ix = ((int) (SWord) vno) -1; + set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(ix), + ERTS_SSI_AUX_WORK_ASYNC_READY); +} + +static erts_aint32_t +handle_async_ready(ErtsAuxWorkData *awdp, + erts_aint32_t aux_work) +{ + ErtsSchedulerSleepInfo *ssi = awdp->ssi; + unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY); + if (erts_check_async_ready(awdp->async_ready.queue)) { + if (set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY) + & ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN) { + unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN); + aux_work &= ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN; + } + return aux_work; + } +#ifdef ERTS_SMP + awdp->async_ready.need_thr_prgr = 0; +#endif + set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN); + return ((aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY) + | ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN); +} + +static erts_aint32_t +handle_async_ready_clean(ErtsAuxWorkData *awdp, + erts_aint32_t aux_work) +{ + void *thr_prgr_p; + +#ifdef ERTS_SMP + if (awdp->async_ready.need_thr_prgr + && !erts_thr_progress_has_reached(awdp->misc.thr_prgr)) { + return aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN; + } + + awdp->async_ready.need_thr_prgr = 0; + thr_prgr_p = (void *) &awdp->async_ready.thr_prgr; +#else + thr_prgr_p = NULL; +#endif + + switch (erts_async_ready_clean(awdp->async_ready.queue, thr_prgr_p)) { + case ERTS_ASYNC_READY_CLEAN: + unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN); + return aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN; +#ifdef ERTS_SMP + case ERTS_ASYNC_READY_NEED_THR_PRGR: + erts_thr_progress_wakeup(awdp->esdp, + awdp->async_ready.thr_prgr); + awdp->async_ready.need_thr_prgr = 1; +#endif + default: + return aux_work; } } @@ -964,14 +1075,14 @@ prep_setup_completed_dealloc(void *vproc) erts_aint32_t count = (erts_aint32_t) (erts_no_schedulers+1); if (erts_atomic32_dec_read_mb(&completed_dealloc_count) == count) { /* scheduler threads */ - erts_smp_schedule_misc_aux_work(0, - erts_no_schedulers, - setup_completed_dealloc, - vproc); + erts_schedule_multi_misc_aux_work(0, + erts_no_schedulers, + setup_completed_dealloc, + vproc); /* aux_thread */ - smp_schedule_misc_aux_work(0, - setup_completed_dealloc, - vproc); + erts_schedule_misc_aux_work(0, + setup_completed_dealloc, + vproc); } } @@ -992,14 +1103,14 @@ erts_debug_wait_deallocations(Process *c_p) erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL); erts_smp_proc_inc_refc(c_p); /* scheduler threads */ - erts_smp_schedule_misc_aux_work(0, - erts_no_schedulers, - prep_setup_completed_dealloc, - (void *) c_p); + erts_schedule_multi_misc_aux_work(0, + erts_no_schedulers, + prep_setup_completed_dealloc, + (void *) c_p); /* aux_thread */ - smp_schedule_misc_aux_work(0, - prep_setup_completed_dealloc, - (void *) c_p); + erts_schedule_misc_aux_work(0, + prep_setup_completed_dealloc, + (void *) c_p); return 1; } return 0; @@ -1062,10 +1173,24 @@ handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t aux_work) ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); } #ifdef ERTS_SMP + if (aux_work & ERTS_SSI_AUX_WORK_MISC_THR_PRGR) { + aux_work = handle_misc_aux_work_thr_prgr(awdp, aux_work); + ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); + } +#endif if (aux_work & ERTS_SSI_AUX_WORK_MISC) { aux_work = handle_misc_aux_work(awdp, aux_work); ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); } +#if ERTS_USE_ASYNC_READY_Q + if (aux_work & ERTS_SSI_AUX_WORK_ASYNC_READY) { + aux_work = handle_async_ready(awdp, aux_work); + ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); + } + if (aux_work & ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN) { + aux_work = handle_async_ready_clean(awdp, aux_work); + ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); + } #endif #ifdef ERTS_SMP_SCHEDULERS_NEED_TO_CHECK_CHILDREN if (aux_work & ERTS_SSI_AUX_WORK_CHECK_CHILDREN) { @@ -3191,10 +3316,18 @@ init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp) awdp->esdp = esdp; awdp->ssi = esdp ? esdp->ssi : NULL; #ifdef ERTS_SMP + awdp->misc.thr_prgr = ERTS_THR_PRGR_VAL_WAITING; awdp->dd.thr_prgr = ERTS_THR_PRGR_VAL_WAITING; awdp->dd.completed_callback = NULL; awdp->dd.completed_arg = NULL; #endif +#ifdef ERTS_USE_ASYNC_READY_Q +#ifdef ERTS_SMP + awdp->async_ready.need_thr_prgr = 0; + awdp->async_ready.thr_prgr = ERTS_THR_PRGR_VAL_WAITING; +#endif + awdp->async_ready.queue = NULL; +#endif } void @@ -3385,9 +3518,10 @@ erts_init_scheduling(int mrq, int no_schedulers, int no_schedulers_online) init_aux_work_data(&esdp->aux_work_data, esdp); } -#ifdef ERTS_SMP init_misc_aux_work(); +#ifdef ERTS_SMP + erts_atomic32_init_nob(&completed_dealloc_count, 0); /* debug only */ aux_thread_aux_work_data = @@ -4408,6 +4542,9 @@ sched_thread_func(void *vesdp) #if HAVE_ERTS_MSEG erts_mseg_late_init(); #endif +#if ERTS_USE_ASYNC_READY_Q + esdp->aux_work_data.async_ready.queue = erts_get_async_ready_queue(no); +#endif erts_sched_init_check_cpu_bind(esdp); diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 8a0944236c..4027fade35 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -54,6 +54,7 @@ typedef struct process Process; #include "erl_atom_table.h" #include "external.h" #include "erl_mseg.h" +#include "erl_async.h" #ifdef HIPE #include "hipe_process.h" @@ -251,13 +252,18 @@ typedef enum { #define ERTS_SSI_AUX_WORK_SET_TMO (((erts_aint32_t) 1) << 0) #define ERTS_SSI_AUX_WORK_CHECK_CHILDREN (((erts_aint32_t) 1) << 1) #define ERTS_SSI_AUX_WORK_MISC (((erts_aint32_t) 1) << 2) -#define ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM (((erts_aint32_t) 1) << 3) -#define ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC (((erts_aint32_t) 1) << 4) #ifdef ERTS_SMP -#define ERTS_SSI_AUX_WORK_DD (((erts_aint32_t) 1) << 5) -#define ERTS_SSI_AUX_WORK_DD_THR_PRGR (((erts_aint32_t) 1) << 6) +#define ERTS_SSI_AUX_WORK_MISC_THR_PRGR (((erts_aint32_t) 1) << 3) #endif -#define ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK (((erts_aint32_t) 1) << 7) +#define ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM (((erts_aint32_t) 1) << 4) +#define ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC (((erts_aint32_t) 1) << 5) +#define ERTS_SSI_AUX_WORK_ASYNC_READY (((erts_aint32_t) 1) << 6) +#define ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN (((erts_aint32_t) 1) << 7) +#ifdef ERTS_SMP +#define ERTS_SSI_AUX_WORK_DD (((erts_aint32_t) 1) << 8) +#define ERTS_SSI_AUX_WORK_DD_THR_PRGR (((erts_aint32_t) 1) << 9) +#endif +#define ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK (((erts_aint32_t) 1) << 10) #if !HAVE_ERTS_MSEG # undef ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK @@ -404,6 +410,9 @@ typedef struct { ErtsSchedulerSleepInfo *ssi; struct { int ix; +#ifdef ERTS_SMP + ErtsThrPrgrVal thr_prgr; +#endif } misc; #ifdef ERTS_SMP struct { @@ -412,6 +421,15 @@ typedef struct { void (*completed_arg)(void *); } dd; #endif +#ifdef ERTS_USE_ASYNC_READY_Q + struct { +#ifdef ERTS_SMP + int need_thr_prgr; + ErtsThrPrgrVal thr_prgr; +#endif + void *queue; + } async_ready; +#endif } ErtsAuxWorkData; struct ErtsSchedulerData_ { @@ -1090,12 +1108,17 @@ Eterm erts_multi_scheduling_blockers(Process *); void erts_start_schedulers(void); void erts_alloc_notify_delayed_dealloc(int); void erts_smp_notify_check_children_needed(void); -void -erts_smp_schedule_misc_aux_work(int ignore_self, - int max_sched, - void (*func)(void *), - void *arg); #endif +#if ERTS_USE_ASYNC_READY_Q +void erts_notify_check_async_ready_queue(void *); +#endif +void erts_schedule_misc_aux_work(int sched_id, + void (*func)(void *), + void *arg); +void erts_schedule_multi_misc_aux_work(int ignore_self, + int max_sched, + void (*func)(void *), + void *arg); erts_aint32_t erts_set_aux_work_timeout(int, erts_aint32_t, int); void erts_sched_notify_check_cpu_bind(void); Uint erts_active_schedulers(void); diff --git a/erts/emulator/beam/erl_thr_queue.c b/erts/emulator/beam/erl_thr_queue.c new file mode 100644 index 0000000000..9ac4cd4b8e --- /dev/null +++ b/erts/emulator/beam/erl_thr_queue.c @@ -0,0 +1,745 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +/* + * Description: Lock-free queue for communication between threads. + * + * Currently only a many-to-one version has been, + * implemented, i.e., many threads can enqueue but + * only one thread can dequeue at a time. It doesn't + * have to be the same thread dequeuing every time, but + * synchronization so that only one thread dequeues + * at a time has to be provided by other means. + * + * When/If the need for a many-to-many queue arises, + * this implementation can relatively easy be extended + * to support that too. + * + * Usage instructions below. + * + * Author: Rickard Green + */ + +/* + * ------ Usage instructions ----------------------------------------------- + * + * Dequeuing generates garbage that needs to be cleaned up. + * erts_thr_q_dequeue() automatically cleans, but garbage may have to be + * cleaned up also when the queue is empty. This is done by calling + * erts_thr_q_clean(). In the SMP case thread progress may have to be made + * before cleaning can continue. If so, erts_thr_q_need_thr_progress() in + * combination with erts_thr_progress_wakeup() can be used in order to + * request a wakeup at appropriate time. + * + * Enqueuing implies memory allocation and dequeuing implies memory + * deallocation. Memory allocation can be moved to another more suitable + * thread using erts_thr_q_prepare_enqueue() together with + * erts_thr_q_enqueue_prepared() instead of using erts_thr_q_enqueue(). + * Memory deallocation can can be moved to another more suitable thread by + * disabling auto_finalize_dequeue when initializing the queue and then use + * erts_thr_q_get_finalize_dequeue_data() together + * erts_thr_q_finalize_dequeue() after dequeuing or cleaning. + * + * Ending the life of the queue using either erts_thr_q_destroy() + * or erts_thr_q_finalize() impies cleaning the queue. Both functions + * return the cleaning result and may have to be called multiple times + * until the queue is clean. Once one of these functions have been called + * enqueuing is not allowed. This has to be synchronized by the user. + * If auto_finalize_dequeue has been disabled, the finalize dequeue + * functionality has to be called after ending the life of the queue just + * as when dequeuing or cleaning on a queue that is alive. + * + * ------------------------------------------------------------------------- + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "erl_thr_queue.h" + +#if defined(DEBUG) +#define ERTS_THR_Q_DBG_CHK_DATA 1 +#else +#define ERTS_THR_Q_DBG_CHK_DATA 0 +#endif + +#define ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT 100 +#define ERTS_THR_Q_MAX_SCHED_CLEAN_OPS 50 +#define ERTS_THR_Q_MAX_DEQUEUE_CLEAN_OPS 3 + +#define ERTS_THR_Q_MAX_FINI_DEQ_OPS 50 + +#ifdef ERTS_SMP +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(sl_element, + ErtsThrQElement_t, + 1000, + ERTS_ALC_T_THR_Q_EL_SL) +#else + +static void +init_sl_element_alloc(void) +{ +} + +static ErtsThrQElement_t * +sl_element_alloc(void) +{ + return erts_alloc(ERTS_ALC_T_THR_Q_EL_SL, + sizeof(ErtsThrQElement_t)); +} + +static void +sl_element_free(ErtsThrQElement_t *p) +{ + erts_free(ERTS_ALC_T_THR_Q_EL_SL, p); +} + +#endif + +typedef union { + ErtsThrQ_t q; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))]; +} ErtsAlignedThrQ_t; + +void +erts_thr_q_init(void) +{ + init_sl_element_alloc(); +} + +static void noop_callback(void *arg) { } + +void +erts_thr_q_initialize(ErtsThrQ_t *q, ErtsThrQInit_t *qi) +{ +#ifndef USE_THREADS + q->init = *qi; + if (!q->init.notify) + q->init.notify = noop_callback; + q->first = NULL; + q->last = NULL; + q->q.blk = NULL; +#else + erts_atomic_init_nob(&q->tail.data.marker.next.atmc, ERTS_AINT_NULL); + q->tail.data.marker.data.ptr = NULL; + erts_atomic_init_nob(&q->tail.data.last, + (erts_aint_t) &q->tail.data.marker); + erts_atomic_init_nob(&q->tail.data.um_refc[0], 0); + erts_atomic_init_nob(&q->tail.data.um_refc[1], 0); + erts_atomic32_init_nob(&q->tail.data.um_refc_ix, 0); + q->tail.data.live = qi->live.objects; + q->tail.data.arg = qi->arg; + q->tail.data.notify = qi->notify; + if (!q->tail.data.notify) + q->tail.data.notify = noop_callback; + + q->head.head.ptr = &q->tail.data.marker; + q->head.live = qi->live.objects; + q->head.first = &q->tail.data.marker; + q->head.unref_end = &q->tail.data.marker; + q->head.clean_reached_head_count = 0; + q->head.deq_fini.automatic = qi->auto_finalize_dequeue; + q->head.deq_fini.start = NULL; + q->head.deq_fini.end = NULL; +#ifdef ERTS_SMP + q->head.next.thr_progress = erts_thr_progress_current(); + q->head.next.thr_progress_reached = 1; +#endif + q->head.next.um_refc_ix = 1; + q->head.next.unref_end = &q->tail.data.marker; + q->head.used_marker = 1; + q->head.arg = qi->arg; + q->head.notify = q->tail.data.notify; + q->q.finalizing = 0; + q->q.live = qi->live.queue; + q->q.blk = NULL; +#endif +} + +ErtsThrQCleanState_t +erts_thr_q_finalize(ErtsThrQ_t *q) +{ +#ifdef USE_THREADS + q->q.finalizing = 1; +#endif + while (erts_thr_q_dequeue(q)); + return erts_thr_q_clean(q); +} + +ErtsThrQ_t * +erts_thr_q_create(ErtsThrQInit_t *qi) +{ + ErtsAlcType_t atype; + ErtsThrQ_t *q, *qblk; + UWord qw; + + switch (qi->live.queue) { + case ERTS_THR_Q_LIVE_SHORT: + atype = ERTS_ALC_T_THR_Q_SL; + break; + case ERTS_THR_Q_LIVE_LONG: + atype = ERTS_ALC_T_THR_Q_LL; + break; + default: + atype = ERTS_ALC_T_THR_Q; + break; + } + + qw = (UWord) erts_alloc(atype, + sizeof(ErtsThrQ_t) + (ERTS_CACHE_LINE_SIZE-1)); + qblk = (ErtsThrQ_t *) qw; + if (qw & ERTS_CACHE_LINE_MASK) + qw = (qw & ~ERTS_CACHE_LINE_MASK) + ERTS_CACHE_LINE_SIZE; + ASSERT((qw & ERTS_CACHE_LINE_MASK) == 0); + q = (ErtsThrQ_t *) qw; + erts_thr_q_initialize(q, qi); + q->q.blk = qblk; + return q; +} + +ErtsThrQCleanState_t +erts_thr_q_destroy(ErtsThrQ_t *q) +{ + if (!q->q.blk) + erl_exit(ERTS_ABORT_EXIT, + "Trying to destroy not created thread queue\n"); + return erts_thr_q_finalize(q); +} + +#ifdef USE_THREADS + +static void +destroy(ErtsThrQ_t *q) +{ + ErtsAlcType_t atype; + switch (q->q.live) { + case ERTS_THR_Q_LIVE_SHORT: + atype = ERTS_ALC_T_THR_Q_SL; + break; + case ERTS_THR_Q_LIVE_LONG: + atype = ERTS_ALC_T_THR_Q_LL; + break; + default: + atype = ERTS_ALC_T_THR_Q; + break; + } + erts_free(atype, q->q.blk); +} + +#endif + +static ERTS_INLINE ErtsThrQElement_t * +element_live_alloc(ErtsThrQLive_t live) +{ + switch (live) { + case ERTS_THR_Q_LIVE_SHORT: + return sl_element_alloc(); + default: + return (ErtsThrQElement_t *) erts_alloc(ERTS_ALC_T_THR_Q_EL, + sizeof(ErtsThrQElement_t)); + } +} + +static ERTS_INLINE ErtsThrQElement_t * +element_alloc(ErtsThrQ_t *q) +{ + ErtsThrQLive_t live; +#ifdef USE_THREADS + live = q->tail.data.live; +#else + live = q->init.live.objects; +#endif + return element_live_alloc(live); +} + +static ERTS_INLINE void +element_live_free(ErtsThrQLive_t live, ErtsThrQElement_t *el) +{ + switch (live) { + case ERTS_THR_Q_LIVE_SHORT: + sl_element_free(el); + break; + default: + erts_free(ERTS_ALC_T_THR_Q_EL, el); + } +} + +static ERTS_INLINE void +element_free(ErtsThrQ_t *q, ErtsThrQElement_t *el) +{ + ErtsThrQLive_t live; +#ifdef USE_THREADS + live = q->head.live; +#else + live = q->init.live.objects; +#endif + element_live_free(live, el); +} + +#ifdef USE_THREADS + +static ERTS_INLINE ErtsThrQElement_t * +enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last) +{ + erts_aint_t ilast, itmp; + + erts_atomic_init_nob(&this->next.atmc, ERTS_AINT_NULL); + /* Enqueue at end of list... */ + + ilast = erts_atomic_read_nob(&q->tail.data.last); + while (1) { + ErtsThrQElement_t *last = (ErtsThrQElement_t *) ilast; + itmp = erts_atomic_cmpxchg_mb(&last->next.atmc, + (erts_aint_t) this, + ERTS_AINT_NULL); + if (itmp == ERTS_AINT_NULL) + break; + ilast = itmp; + } + + /* Move last pointer forward... */ + while (1) { + if (want_last) { + if (erts_atomic_read_rb(&this->next.atmc) != ERTS_AINT_NULL) { + /* Someone else will move it forward */ + ilast = erts_atomic_read_rb(&q->tail.data.last); + return (ErtsThrQElement_t *) ilast; + } + } + else { + if (erts_atomic_read_nob(&this->next.atmc) != ERTS_AINT_NULL) { + /* Someone else will move it forward */ + return NULL; + } + } + itmp = erts_atomic_cmpxchg_mb(&q->tail.data.last, + (erts_aint_t) this, + ilast); + if (ilast == itmp) + return want_last ? this : NULL; + ilast = itmp; + } +} + +static ErtsThrQCleanState_t +clean(ErtsThrQ_t *q, int max_ops, int do_notify) +{ + erts_aint_t ilast; + int um_refc_ix; + int ops; + + for (ops = 0; ops < max_ops; ops++) { + ErtsThrQElement_t *tmp; + restart: + ASSERT(q->head.first); + if (q->head.first == q->head.head.ptr) { + q->head.clean_reached_head_count++; + if (q->head.clean_reached_head_count + >= ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT) { + q->head.clean_reached_head_count = 0; + break; + } + goto inspect_head; + } + if (q->head.first == q->head.unref_end) + break; + if (q->head.first == &q->tail.data.marker) { + q->head.used_marker = 0; + q->head.first = q->head.first->next.ptr; + goto restart; + } + tmp = q->head.first; + q->head.first = q->head.first->next.ptr; + if (q->head.deq_fini.automatic) + element_free(q, tmp); + else { + tmp->data.ptr = (void *) (UWord) q->head.live; + if (!q->head.deq_fini.start) + q->head.deq_fini.start = tmp; + else if (q->head.deq_fini.end->next.ptr == &q->tail.data.marker) + q->head.deq_fini.end->next.ptr = tmp; + q->head.deq_fini.end = tmp; + } + } + + ilast = erts_atomic_read_nob(&q->tail.data.last); + if (q->head.first == ((ErtsThrQElement_t *) ilast) + && ((ErtsThrQElement_t *) ilast) == &q->tail.data.marker + && q->head.first == &q->tail.data.marker) { + /* Empty and clean queue */ + if (q->q.finalizing) + destroy(q); + return ERTS_THR_Q_CLEAN; + } + +#ifdef ERTS_SMP + if (q->head.next.thr_progress_reached + || erts_thr_progress_has_reached(q->head.next.thr_progress)) { + q->head.next.thr_progress_reached = 1; +#endif + um_refc_ix = q->head.next.um_refc_ix; + if (erts_atomic_read_acqb(&q->tail.data.um_refc[um_refc_ix]) == 0) { + /* Move unreferenced end pointer forward... */ + q->head.clean_reached_head_count = 0; + q->head.unref_end = q->head.next.unref_end; + + if (!q->head.used_marker + && q->head.unref_end == (ErtsThrQElement_t *) ilast) { + q->head.used_marker = 1; + ilast = (erts_aint_t) enqueue_managed(q, + &q->tail.data.marker, + 1); + if (q->head.head.ptr == q->head.unref_end) { + ErtsThrQElement_t *next; + next = ((ErtsThrQElement_t *) + erts_atomic_read_acqb(&q->head.head.ptr->next.atmc)); + if (next == &q->tail.data.marker) { + q->head.head.ptr->next.ptr = &q->tail.data.marker; + q->head.head.ptr = &q->tail.data.marker; + } + } + } + + if (q->head.unref_end == (ErtsThrQElement_t *) ilast) + ERTS_THR_MEMORY_BARRIER; + else { + q->head.next.unref_end = (ErtsThrQElement_t *) ilast; + ERTS_THR_MEMORY_BARRIER; +#ifdef ERTS_SMP + q->head.next.thr_progress = erts_thr_progress_later(); +#endif + erts_atomic32_set_relb(&q->tail.data.um_refc_ix, + um_refc_ix); + q->head.next.um_refc_ix = um_refc_ix == 0 ? 1 : 0; +#ifdef ERTS_SMP + q->head.next.thr_progress_reached = 0; +#endif + } + } +#ifdef ERTS_SMP + } +#endif + + if (q->head.first == q->head.head.ptr) { + inspect_head: + if (!q->head.used_marker) { + erts_aint_t inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) { + q->head.used_marker = 1; + (void) enqueue_managed(q, &q->tail.data.marker, 0); + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == (erts_aint_t) &q->tail.data.marker) { + q->head.head.ptr->next.ptr = &q->tail.data.marker; + q->head.head.ptr = &q->tail.data.marker; +#ifdef ERTS_SMP + if (!q->head.next.thr_progress_reached) + return ERTS_THR_Q_NEED_THR_PRGR; +#else + if (do_notify) + q->head.notify(q->head.arg); +#endif + return ERTS_THR_Q_DIRTY; + } + } + } + return ERTS_THR_Q_CLEAN; + } + + if (q->head.first != q->head.unref_end) { + if (do_notify) + q->head.notify(q->head.arg); + return ERTS_THR_Q_DIRTY; + } + +#ifdef ERTS_SMP + if (!q->head.next.thr_progress_reached) + return ERTS_THR_Q_NEED_THR_PRGR; +#endif + + return ERTS_THR_Q_CLEAN; /* Waiting for unmanaged threads to complete... */ +} + +#endif + +ErtsThrQCleanState_t +erts_thr_q_clean(ErtsThrQ_t *q) +{ +#ifdef USE_THREADS + return clean(q, ERTS_THR_Q_MAX_SCHED_CLEAN_OPS, 0); +#else + return ERTS_THR_Q_CLEAN; +#endif +} + +ErtsThrQCleanState_t +erts_thr_q_inspect(ErtsThrQ_t *q, int ensure_empty) +{ +#ifdef USE_THREADS + if (ensure_empty) { + erts_aint_t inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext != ERTS_AINT_NULL) { + if (&q->tail.data.marker != (ErtsThrQElement_t *) inext) + return ERTS_THR_Q_DIRTY; + else { + q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; + q->head.head.ptr = (ErtsThrQElement_t *) inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext != ERTS_AINT_NULL) + return ERTS_THR_Q_DIRTY; + } + } + } + + if (q->head.first == q->head.head.ptr) { + if (!q->head.used_marker) { + erts_aint_t inext; + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) + return ERTS_THR_Q_DIRTY; + } + return ERTS_THR_Q_CLEAN; + } + + if (q->head.first != q->head.unref_end) + return ERTS_THR_Q_DIRTY; + +#ifdef ERTS_SMP + if (!q->head.next.thr_progress_reached) + return ERTS_THR_Q_NEED_THR_PRGR; +#endif +#endif + return ERTS_THR_Q_CLEAN; +} + +static void +enqueue(ErtsThrQ_t *q, void *data, ErtsThrQElement_t *this) +{ +#ifndef USE_THREADS + ASSERT(data); + + this->next.ptr = NULL; + this->data.ptr = data; + + if (q->last) + q->last->next.ptr = this; + else { + q->first = q->last = this; + q->init.notify(q->init.arg); + } +#else + int notify; + int um_refc_ix = 0; +#ifdef ERTS_SMP + int unmanaged_thread; +#endif + +#if ERTS_THR_Q_DBG_CHK_DATA + if (!data) + erl_exit(ERTS_ABORT_EXIT, "Missing data in enqueue\n"); +#endif + + ASSERT(!q->q.finalizing); + + this->data.ptr = data; + +#ifdef ERTS_SMP + unmanaged_thread = !erts_thr_progress_is_managed_thread(); + if (unmanaged_thread) +#endif + { + um_refc_ix = erts_atomic32_read_acqb(&q->tail.data.um_refc_ix); + while (1) { + int tmp_um_refc_ix; + erts_atomic_inc_acqb(&q->tail.data.um_refc[um_refc_ix]); + tmp_um_refc_ix = erts_atomic32_read_acqb(&q->tail.data.um_refc_ix); + if (tmp_um_refc_ix == um_refc_ix) + break; + erts_atomic_dec_relb(&q->tail.data.um_refc[um_refc_ix]); + um_refc_ix = tmp_um_refc_ix; + } + } + + notify = this == enqueue_managed(q, this, 1); + + +#ifdef ERTS_SMP + if (unmanaged_thread) +#endif + { + if (notify) + erts_atomic_dec_relb(&q->tail.data.um_refc[um_refc_ix]); + else if (erts_atomic_dec_read_relb(&q->tail.data.um_refc[um_refc_ix]) == 0) + notify = 1; + } + if (notify) + q->tail.data.notify(q->tail.data.arg); +#endif +} + +void +erts_thr_q_enqueue(ErtsThrQ_t *q, void *data) +{ + enqueue(q, data, element_alloc(q)); +} + +ErtsThrQPrepEnQ_t * +erts_thr_q_prepare_enqueue(ErtsThrQ_t *q) +{ + return (ErtsThrQPrepEnQ_t *) element_alloc(q); +} + +int +erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *q, ErtsThrQFinDeQ_t *fdp) +{ +#ifndef USE_THREADS + return 0; +#else +#ifdef DEBUG + if (!q->head.deq_fini.start) { + ASSERT(!q->head.deq_fini.end); + } + else { + ErtsThrQElement_t *e = q->head.deq_fini.start; + ErtsThrQElement_t *end = q->head.deq_fini.end; + while (e != end) { + ASSERT(q->head.head.ptr != e); + ASSERT(q->head.first != e); + ASSERT(q->head.unref_end != e); + e = e->next.ptr; + } + } +#endif + fdp->start = q->head.deq_fini.start; + fdp->end = q->head.deq_fini.end; + if (fdp->end) + fdp->end->next.ptr = NULL; + q->head.deq_fini.start = NULL; + q->head.deq_fini.end = NULL; + return fdp->start != NULL; +#endif +} + +void +erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *fdp0, + ErtsThrQFinDeQ_t *fdp1) +{ +#ifdef USE_THREADS + if (fdp1->start) { + if (fdp0->end) + fdp0->end->next.ptr = fdp1->start; + else + fdp0->start = fdp1->start; + fdp0->end = fdp1->end; + } +#endif +} + + +int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *state) +{ +#ifdef USE_THREADS + ErtsThrQElement_t *start = state->start; + if (start) { + ErtsThrQLive_t live; + int i; + for (i = 0; i < ERTS_THR_Q_MAX_FINI_DEQ_OPS; i++) { + ErtsThrQElement_t *tmp; + if (!start) + break; + tmp = start; + start = start->next.ptr; + live = (ErtsThrQLive_t) (UWord) tmp->data.ptr; + element_live_free(live, tmp); + } + state->start = start; + if (start) + return 1; /* More to do */ + state->end = NULL; + } +#endif + return 0; +} + +void +erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *state) +{ +#ifdef USE_THREADS + state->start = NULL; + state->end = NULL; +#endif +} + + +void +erts_thr_q_enqueue_prepared(ErtsThrQ_t *q, void *data, ErtsThrQPrepEnQ_t *prep) +{ + ASSERT(prep); + enqueue(q, data, (ErtsThrQElement_t *) prep); +} + +void * +erts_thr_q_dequeue(ErtsThrQ_t *q) +{ +#ifndef USE_THREADS + void *res; + ErtsThrQElement_t *tmp; + + if (!q->first) + return NULL; + tmp = q->first; + res = tmp->data.ptr; + q->first = tmp->next.ptr; + if (!q->first) + q->last = NULL; + + element_free(q, tmp); + + return res; +#else + erts_aint_t inext; + void *res; + + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) + return NULL; + q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; + q->head.head.ptr = (ErtsThrQElement_t *) inext; + if (q->head.head.ptr == &q->tail.data.marker) { + inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + if (inext == ERTS_AINT_NULL) + return NULL; + q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; + q->head.head.ptr = (ErtsThrQElement_t *) inext; + } + res = q->head.head.ptr->data.ptr; +#if ERTS_THR_Q_DBG_CHK_DATA + q->head.head.ptr->data.ptr = NULL; + if (!res) + erl_exit(ERTS_ABORT_EXIT, "Missing data in dequeue\n"); +#endif + clean(q, + (q->head.deq_fini.automatic + ? ERTS_THR_Q_MAX_DEQUEUE_CLEAN_OPS + : ERTS_THR_Q_MAX_SCHED_CLEAN_OPS), 1); + return res; +#endif +} diff --git a/erts/emulator/beam/erl_thr_queue.h b/erts/emulator/beam/erl_thr_queue.h new file mode 100644 index 0000000000..407c23f5eb --- /dev/null +++ b/erts/emulator/beam/erl_thr_queue.h @@ -0,0 +1,211 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +/* + * Description: Lock-free queue for communication between threads. + * + * Currently only a many-to-one version has been, + * implemented, i.e., many threads can enqueue but + * only one thread can dequeue at a time. It doesn't + * have to be the same thread dequeuing every time, but + * synchronization so that only one thread dequeues + * at a time has to be provided by other means. + * + * When/If the need for a many-to-many queue arises, + * this implementation can relatively easy be extended + * to support that too. + * + * Usage instructions can be found in erts_thr_queue.c + * + * Author: Rickard Green + */ + +#ifndef ERL_THR_QUEUE_H__ +#define ERL_THR_QUEUE_H__ + +#include "sys.h" +#include "erl_threads.h" +#include "erl_alloc.h" +#include "erl_thr_progress.h" + +typedef enum { + ERTS_THR_Q_LIVE_UNDEF, + ERTS_THR_Q_LIVE_SHORT, + ERTS_THR_Q_LIVE_LONG +} ErtsThrQLive_t; + +#define ERTS_THR_Q_INIT_DEFAULT \ +{ \ + { \ + ERTS_THR_Q_LIVE_UNDEF, \ + ERTS_THR_Q_LIVE_SHORT \ + }, \ + NULL, \ + NULL, \ + 1 \ +} + +typedef struct ErtsThrQ_t_ ErtsThrQ_t; + +typedef struct { + struct { + ErtsThrQLive_t queue; + ErtsThrQLive_t objects; + } live; + void *arg; + void (*notify)(void *); + int auto_finalize_dequeue; +} ErtsThrQInit_t; + +typedef struct ErtsThrQElement_t_ ErtsThrQElement_t; +typedef struct ErtsThrQElement_t ErtsThrQPrepEnQ_t; + +typedef union { + erts_atomic_t atmc; + ErtsThrQElement_t *ptr; +} ErtsThrQPtr_t; + +struct ErtsThrQElement_t_ { + ErtsThrQPtr_t next; + union { + erts_atomic_t atmc; + void *ptr; + } data; +}; + +typedef struct { + ErtsThrQElement_t *start; + ErtsThrQElement_t *end; +} ErtsThrQFinDeQ_t; + +typedef enum { + ERTS_THR_Q_CLEAN, +#ifdef ERTS_SMP + ERTS_THR_Q_NEED_THR_PRGR, +#endif + ERTS_THR_Q_DIRTY, +} ErtsThrQCleanState_t; + +#ifdef USE_THREADS + +typedef struct { + ErtsThrQElement_t marker; + erts_atomic_t last; + erts_atomic_t um_refc[2]; + erts_atomic32_t um_refc_ix; + ErtsThrQLive_t live; +#ifdef ERTS_SMP + erts_atomic32_t thr_prgr_clean_scheduled; +#endif + void *arg; + void (*notify)(void *); +} ErtsThrQTail_t; + +struct ErtsThrQ_t_ { + /* + * This structure needs to be cache line aligned for best + * performance. + */ + union { + /* Modified by threads enqueuing */ + ErtsThrQTail_t data; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQTail_t))]; + } tail; + /* + * Everything below this point is *only* accessed by the + * thread dequeuing. + */ + struct { + ErtsThrQPtr_t head; + ErtsThrQLive_t live; + ErtsThrQElement_t *first; + ErtsThrQElement_t *unref_end; + int clean_reached_head_count; + struct { + int automatic; + ErtsThrQElement_t *start; + ErtsThrQElement_t *end; + } deq_fini; + struct { +#ifdef ERTS_SMP + ErtsThrPrgrVal thr_progress; + int thr_progress_reached; +#endif + int um_refc_ix; + ErtsThrQElement_t *unref_end; + } next; + int used_marker; + void *arg; + void (*notify)(void *); + } head; + struct { + int finalizing; + ErtsThrQLive_t live; + void *blk; + } q; +}; + +#else /* !USE_THREADS */ + +struct ErtsThrQ_t_ { + ErtsThrQInit_t init; + ErtsThrQElement_t *first; + ErtsThrQElement_t *last; + struct { + void *blk; + } q; +}; + +#endif + +void erts_thr_q_init(void); +void erts_thr_q_initialize(ErtsThrQ_t *, ErtsThrQInit_t *); +ErtsThrQCleanState_t erts_thr_q_finalize(ErtsThrQ_t *); +ErtsThrQ_t *erts_thr_q_create(ErtsThrQInit_t *); +ErtsThrQCleanState_t erts_thr_q_destroy(ErtsThrQ_t *); +ErtsThrQCleanState_t erts_thr_q_clean(ErtsThrQ_t *); +ErtsThrQCleanState_t erts_thr_q_inspect(ErtsThrQ_t *, int); +ErtsThrQPrepEnQ_t *erts_thr_q_prepare_enqueue(ErtsThrQ_t *); +void erts_thr_q_enqueue_prepared(ErtsThrQ_t *, void *, ErtsThrQPrepEnQ_t *); +void erts_thr_q_enqueue(ErtsThrQ_t *, void *); +void * erts_thr_q_dequeue(ErtsThrQ_t *); +int erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *, + ErtsThrQFinDeQ_t *); +void erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *, + ErtsThrQFinDeQ_t *); +int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *); +void erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *); + +#ifdef ERTS_SMP +ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_q_need_thr_progress(ErtsThrQ_t *q); +#endif + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +#ifdef ERTS_SMP +ERTS_GLB_INLINE ErtsThrPrgrVal +erts_thr_q_need_thr_progress(ErtsThrQ_t *q) +{ + return q->head.next.thr_progress; +} +#endif + +#endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */ + +#endif /* ERL_THR_QUEUE_H__ */ diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index 684e910fc3..4a4973baab 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -42,12 +42,6 @@ typedef struct port Port; #include "erl_port_task.h" -#define ERTS_MAX_NO_OF_ASYNC_THREADS 1024 -extern int erts_async_max_threads; -#define ERTS_ASYNC_THREAD_MIN_STACK_SIZE 16 /* Kilo words */ -#define ERTS_ASYNC_THREAD_MAX_STACK_SIZE 8192 /* Kilo words */ -extern int erts_async_thread_suggested_stack_size; - typedef struct erts_driver_t_ erts_driver_t; #define SMALL_IO_QUEUE 5 /* Number of fixed elements */ diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 151c776a3d..fff720634d 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -42,6 +42,7 @@ #include "erl_bits.h" #include "erl_version.h" #include "error.h" +#include "erl_async.h" extern ErlDrvEntry fd_driver_entry; extern ErlDrvEntry vanilla_driver_entry; @@ -4579,7 +4580,10 @@ int driver_lock_driver(ErlDrvPort ix) erts_smp_mtx_lock(&erts_driver_list_lock); - if (prt == NULL) return -1; + if (prt == NULL) { + erts_smp_mtx_unlock(&erts_driver_list_lock); + return -1; + } ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); if ((dh = (DE_Handle*)prt->drv_ptr->handle ) == NULL) { diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h index b63fe98f27..f9cbcc5892 100644 --- a/erts/emulator/beam/sys.h +++ b/erts/emulator/beam/sys.h @@ -475,15 +475,6 @@ __decl_noreturn void __noreturn erl_exit(int n, char*, ...); #define ERTS_ABORT_EXIT (INT_MIN + 1) /* no crash dump; only abort() */ #define ERTS_DUMP_EXIT (127) /* crash dump; then exit() */ - -#ifndef ERTS_SMP -int check_async_ready(void); -#ifdef USE_THREADS -void sys_async_ready(int hndl); -int erts_register_async_ready_callback(void (*funcp)(void)); -#endif -#endif - Eterm erts_check_io_info(void *p); /* Size of misc memory allocated from system dependent code */ @@ -671,6 +662,8 @@ int erts_sys_putenv(char *key_value, int sep_ix); *size), a value > 0 if value buffer is too small (*size is set to needed size), and a value < 0 on failure. */ int erts_sys_getenv(char *key, char *value, size_t *size); +/* erts_sys_getenv__() is only allowed to be used in early init phase */ +int erts_sys_getenv__(char *key, char *value, size_t *size); /* Easier to use, but not as efficient, environment functions */ char *erts_read_env(char *key); diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c index 65485241aa..1bd178f280 100644 --- a/erts/emulator/beam/utils.c +++ b/erts/emulator/beam/utils.c @@ -43,6 +43,7 @@ #include "erl_smp.h" #include "erl_time.h" #include "erl_thr_progress.h" +#include "erl_thr_queue.h" #include "erl_sched_spec_pre_alloc.h" #undef M_TRIM_THRESHOLD diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c index d7c4812dad..c6b63350e5 100644 --- a/erts/emulator/sys/unix/sys.c +++ b/erts/emulator/sys/unix/sys.c @@ -128,7 +128,6 @@ static ErtsSysReportExit *report_exit_list; static ErtsSysReportExit *report_exit_transit_list; #endif -extern int check_async_ready(void); extern int driver_interrupt(int, int); extern void do_break(void); @@ -1120,31 +1119,6 @@ struct erl_drv_entry vanilla_driver_entry = { stop_select }; -#if defined(USE_THREADS) && !defined(ERTS_SMP) -static int async_drv_init(void); -static ErlDrvData async_drv_start(ErlDrvPort, char*, SysDriverOpts*); -static void async_drv_stop(ErlDrvData); -static void async_drv_input(ErlDrvData, ErlDrvEvent); - -/* INTERNAL use only */ - -struct erl_drv_entry async_driver_entry = { - async_drv_init, - async_drv_start, - async_drv_stop, - NULL, - async_drv_input, - NULL, - "async", - NULL, - NULL, - NULL, - NULL, - NULL, - NULL -}; -#endif - /* Handle SIGCHLD signals. */ #if (defined(SIG_SIGSET) || defined(SIG_SIGNAL)) static RETSIGTYPE onchld(void) @@ -2329,87 +2303,6 @@ static void stop_select(ErlDrvEvent fd, void* _) close((int)fd); } -/* -** Async opertation support -*/ -#if defined(USE_THREADS) && !defined(ERTS_SMP) -static void -sys_async_ready_failed(int fd, int r, int err) -{ - char buf[120]; - sprintf(buf, "sys_async_ready(): Fatal error: fd=%d, r=%d, errno=%d\n", - fd, r, err); - erts_silence_warn_unused_result(write(2, buf, strlen(buf))); - abort(); -} - -/* called from threads !! */ -void sys_async_ready(int fd) -{ - int r; - while (1) { - r = write(fd, "0", 1); /* signal main thread fd MUST be async_fd[1] */ - if (r == 1) { - DEBUGF(("sys_async_ready(): r = 1\r\n")); - break; - } - if (r < 0 && errno == EINTR) { - DEBUGF(("sys_async_ready(): r = %d\r\n", r)); - continue; - } - sys_async_ready_failed(fd, r, errno); - } -} - -static int async_drv_init(void) -{ - async_fd[0] = -1; - async_fd[1] = -1; - return 0; -} - -static ErlDrvData async_drv_start(ErlDrvPort port_num, - char* name, SysDriverOpts* opts) -{ - if (async_fd[0] != -1) - return ERL_DRV_ERROR_GENERAL; - if (pipe(async_fd) < 0) - return ERL_DRV_ERROR_GENERAL; - - DEBUGF(("async_drv_start: %d\r\n", port_num)); - - SET_NONBLOCKING(async_fd[0]); - driver_select(port_num, async_fd[0], ERL_DRV_READ, 1); - - if (init_async(async_fd[1]) < 0) - return ERL_DRV_ERROR_GENERAL; - return (ErlDrvData)port_num; -} - -static void async_drv_stop(ErlDrvData e) -{ - int port_num = (int)(long)e; - - DEBUGF(("async_drv_stop: %d\r\n", port_num)); - - exit_async(); - - driver_select(port_num, async_fd[0], ERL_DRV_READ, 0); - - close(async_fd[0]); - close(async_fd[1]); - async_fd[0] = async_fd[1] = -1; -} - - -static void async_drv_input(ErlDrvData e, ErlDrvEvent fd) -{ - char *buf[32]; - DEBUGF(("async_drv_input\r\n")); - while (read((int) fd, (void *) buf, 32) > 0); /* fd MUST be async_fd[0] */ - check_async_ready(); /* invoke all async_ready */ -} -#endif void erts_do_break_handling(void) { @@ -2483,12 +2376,10 @@ erts_sys_putenv(char *buffer, int sep_ix) } int -erts_sys_getenv(char *key, char *value, size_t *size) +erts_sys_getenv__(char *key, char *value, size_t *size) { - char *orig_value; int res; - erts_smp_rwmtx_rlock(&environ_rwmtx); - orig_value = getenv(key); + char *orig_value = getenv(key); if (!orig_value) res = -1; else { @@ -2503,6 +2394,15 @@ erts_sys_getenv(char *key, char *value, size_t *size) res = 0; } } + return res; +} + +int +erts_sys_getenv(char *key, char *value, size_t *size) +{ + int res; + erts_smp_rwmtx_rlock(&environ_rwmtx); + res = erts_sys_getenv__(key, value, size); erts_smp_rwmtx_runlock(&environ_rwmtx); return res; } @@ -2514,31 +2414,6 @@ sys_init_io(void) erts_alloc(ERTS_ALC_T_FD_TAB, max_files * sizeof(struct fd_data)); erts_smp_atomic_add_nob(&sys_misc_mem_sz, max_files * sizeof(struct fd_data)); - -#ifdef USE_THREADS -#ifdef ERTS_SMP - if (init_async(-1) < 0) - erl_exit(1, "Failed to initialize async-threads\n"); -#else - { - /* This is speical stuff, starting a driver from the - * system routines, but is a nice way of handling stuff - * the erlang way - */ - SysDriverOpts dopts; - int ret; - - sys_memset((void*)&dopts, 0, sizeof(SysDriverOpts)); - add_driver_entry(&async_driver_entry); - ret = erts_open_driver(NULL, NIL, "async", &dopts, NULL); - DEBUGF(("open_driver = %d\n", ret)); - if (ret < 0) - erl_exit(1, "Failed to open async driver\n"); - erts_port[ret].status |= ERTS_PORT_SFLG_IMMORTAL; - } -#endif -#endif - } #if (0) /* unused? */ @@ -2765,15 +2640,7 @@ initiate_report_exit_status(ErtsSysReportExit *rep, int status) rep->next = report_exit_transit_list; rep->status = status; report_exit_transit_list = rep; - /* - * We need the scheduler thread to call check_children(). - * If the scheduler thread is sleeping in a poll with a - * timeout, we need to wake the scheduler thread. We use the - * functionality of the async driver to do this, instead of - * implementing yet another driver doing the same thing. A - * little bit ugly, but it works... - */ - sys_async_ready(async_fd[1]); + erts_sys_schedule_interrupt(1); } static int check_children(void) @@ -2860,19 +2727,11 @@ erl_sys_schedule(int runnable) { #ifdef ERTS_SMP ERTS_CHK_IO(!runnable); - ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); #else - if (runnable) { - ERTS_CHK_IO(0); /* Poll for I/O */ - check_async_ready(); /* Check async completions */ - } else { - int wait_for_io = !check_async_ready(); - if (wait_for_io) - wait_for_io = !check_children(); - ERTS_CHK_IO(wait_for_io); - } - (void) check_children(); + ERTS_CHK_IO(runnable ? 0 : !check_children()); #endif + ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); + (void) check_children(); } diff --git a/erts/emulator/sys/vxworks/sys.c b/erts/emulator/sys/vxworks/sys.c index 97a2ae7f7b..d6d1fe64e0 100644 --- a/erts/emulator/sys/vxworks/sys.c +++ b/erts/emulator/sys/vxworks/sys.c @@ -1520,6 +1520,12 @@ erts_sys_getenv(char *key, char *value, size_t *size) return res; } +int +erts_sys_getenv__(char *key, char *value, size_t *size) +{ + return erts_sys_getenv(key, value, size); +} + void sys_init_io(void) { diff --git a/erts/emulator/sys/win32/sys.c b/erts/emulator/sys/win32/sys.c index ace1e1fca0..02d16b83a2 100644 --- a/erts/emulator/sys/win32/sys.c +++ b/erts/emulator/sys/win32/sys.c @@ -566,51 +566,6 @@ struct erl_drv_entry vanilla_driver_entry = { stop_select }; -#if defined(USE_THREADS) && !defined(ERTS_SMP) - -static int async_drv_init(void); -static ErlDrvData async_drv_start(ErlDrvPort, char*, SysDriverOpts*); -static void async_drv_stop(ErlDrvData); -static void async_drv_input(ErlDrvData, ErlDrvEvent); - -/* INTERNAL use only */ - -void null_output(ErlDrvData drv_data, char* buf, int len) -{ -} - -void null_ready_output(ErlDrvData drv_data, ErlDrvEvent event) -{ -} - -struct erl_drv_entry async_driver_entry = { - async_drv_init, - async_drv_start, - async_drv_stop, - null_output, - async_drv_input, - null_ready_output, - "async", - NULL, /* finish */ - NULL, /* handle */ - NULL, /* control */ - NULL, /* timeout */ - NULL, /* outputv */ - NULL, /* ready_async */ - NULL, /* flush */ - NULL, /* call */ - NULL, /* event */ - ERL_DRV_EXTENDED_MARKER, - ERL_DRV_EXTENDED_MAJOR_VERSION, - ERL_DRV_EXTENDED_MINOR_VERSION, - 0, /* ERL_DRV_FLAGs */ - NULL, - NULL, /* process_exit */ - stop_select -}; - -#endif - /* * Initialises a DriverData structure. * @@ -2825,30 +2780,6 @@ sys_init_io(void) We estimate the number to twice the amount of ports. We really dont know on windows, do we? */ max_files = 2*erts_max_ports; - -#ifdef USE_THREADS -#ifdef ERTS_SMP - if (init_async(-1) < 0) - erl_exit(1, "Failed to initialize async-threads\n"); -#else - { - /* This is special stuff, starting a driver from the - * system routines, but is a nice way of handling stuff - * the erlang way - */ - SysDriverOpts dopts; - int ret; - - sys_memset((void*)&dopts, 0, sizeof(SysDriverOpts)); - add_driver_entry(&async_driver_entry); - ret = erts_open_driver(NULL, NIL, "async", &dopts, NULL); - DEBUGF(("open_driver = %d\n", ret)); - if (ret < 0) - erl_exit(1, "Failed to open async driver\n"); - erts_port[ret].status |= ERTS_PORT_SFLG_IMMORTAL; - } -#endif -#endif } #ifdef ERTS_SMP @@ -3382,75 +3313,7 @@ erts_sys_schedule_interrupt_timed(int set, long msec) void erl_sys_schedule(int runnable) { -#ifdef ERTS_SMP erts_check_io(!runnable); ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); -#else - if (runnable) { - erts_check_io(0); /* Poll for I/O */ - check_async_ready(); /* Check async completions */ - } else { - erts_check_io(check_async_ready() ? 0 : 1); - } -#endif -} - -#if defined(USE_THREADS) && !defined(ERTS_SMP) -/* - * Async operation support. - */ - -static ErlDrvEvent async_drv_event; - -void -sys_async_ready(int fd) -{ - SetEvent((HANDLE)async_drv_event); } -static int -async_drv_init(void) -{ - async_drv_event = (ErlDrvEvent) NULL; - return 0; -} - -static ErlDrvData -async_drv_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts) -{ - if (async_drv_event != (ErlDrvEvent) NULL) { - return ERL_DRV_ERROR_GENERAL; - } - if ((async_drv_event = (ErlDrvEvent)CreateAutoEvent(FALSE)) == (ErlDrvEvent) NULL) { - return ERL_DRV_ERROR_GENERAL; - } - - driver_select(port_num, async_drv_event, ERL_DRV_READ|ERL_DRV_USE, 1); - if (init_async(async_drv_event) < 0) { - return ERL_DRV_ERROR_GENERAL; - } - return (ErlDrvData)port_num; -} - -static void -async_drv_stop(ErlDrvData port_num) -{ - exit_async(); - driver_select((ErlDrvPort)port_num, async_drv_event, ERL_DRV_READ|ERL_DRV_USE, 0); - /*CloseHandle((HANDLE)async_drv_event);*/ - async_drv_event = (ErlDrvEvent) NULL; -} - - -static void -async_drv_input(ErlDrvData port_num, ErlDrvEvent e) -{ - check_async_ready(); - - /* - * Our event is auto-resetting. - */ -} - -#endif - diff --git a/erts/emulator/sys/win32/sys_env.c b/erts/emulator/sys/win32/sys_env.c index 02c8433a10..7acc7f07ee 100644 --- a/erts/emulator/sys/win32/sys_env.c +++ b/erts/emulator/sys/win32/sys_env.c @@ -55,19 +55,17 @@ erts_sys_putenv(char *key_value, int sep_ix) } int -erts_sys_getenv(char *key, char *value, size_t *size) +erts_sys_getenv__(char *key, char *value, size_t *size) { size_t req_size = 0; int res = 0; DWORD new_size; - erts_smp_rwmtx_rlock(&environ_rwmtx); SetLastError(0); new_size = GetEnvironmentVariable((LPCTSTR) key, (LPTSTR) value, (DWORD) *size); res = !new_size && GetLastError() == ERROR_ENVVAR_NOT_FOUND ? -1 : 0; - erts_smp_rwmtx_runlock(&environ_rwmtx); if (res < 0) return res; res = new_size > *size ? 1 : 0; @@ -75,6 +73,16 @@ erts_sys_getenv(char *key, char *value, size_t *size) return res; } +int +erts_sys_getenv(char *key, char *value, size_t *size) +{ + int res; + erts_smp_rwmtx_rlock(&environ_rwmtx); + res = erts_sys_getenv__(key, value, size); + erts_smp_rwmtx_runlock(&environ_rwmtx); + return res; +} + struct win32_getenv_state { char *env; char *next; diff --git a/erts/emulator/test/driver_SUITE.erl b/erts/emulator/test/driver_SUITE.erl index bcb0257ed1..c07dbc5871 100644 --- a/erts/emulator/test/driver_SUITE.erl +++ b/erts/emulator/test/driver_SUITE.erl @@ -76,7 +76,8 @@ driver_select_use/1, thread_mseg_alloc_cache_clean/1, otp_9302/1, - thr_free_drv/1]). + thr_free_drv/1, + async_blast/1]). -export([bin_prefix/2]). @@ -145,7 +146,8 @@ all() -> smp_select, driver_select_use, thread_mseg_alloc_cache_clean, otp_9302, - thr_free_drv]. + thr_free_drv, + async_blast]. groups() -> [{timer, [], @@ -1911,17 +1913,30 @@ otp_9302(Config) when is_list(Config) -> ?line port_command(Port, ""), ?line {msg, block} = get_port_msg(Port, infinity), ?line {msg, job} = get_port_msg(Port, infinity), - ?line case erlang:system_info(thread_pool_size) of - 0 -> - {msg, cancel} = get_port_msg(Port, infinity); - _ -> - ok - end, - ?line {msg, job} = get_port_msg(Port, infinity), + ?line C = case erlang:system_info(thread_pool_size) of + 0 -> + ?line {msg, cancel} = get_port_msg(Port, infinity), + ?line {msg, job} = get_port_msg(Port, infinity), + ?line false; + _ -> + case get_port_msg(Port, infinity) of + {msg, cancel} -> %% Cancel always fail in Rel >= 15 + ?line {msg, job} = get_port_msg(Port, infinity), + ?line false; + {msg, job} -> + ?line ok, + ?line true + end + end, ?line {msg, end_of_jobs} = get_port_msg(Port, infinity), ?line no_msg = get_port_msg(Port, 2000), ?line port_close(Port), - ?line ok. + ?line case C of + true -> + ?line {comment, "Async job cancelled"}; + false -> + ?line {comment, "Async job not cancelled"} + end. thr_free_drv(Config) when is_list(Config) -> ?line Path = ?config(data_dir, Config), @@ -1954,6 +1969,48 @@ thr_free_drv_control(Port, N) -> % io:format("N=~p, SID=~p", [N, erlang:system_info(scheduler_id)]), thr_free_drv_control(Port, N+1) end. + +async_blast(Config) when is_list(Config) -> + ?line Path = ?config(data_dir, Config), + ?line erl_ddll:start(), + ?line ok = load_driver(Path, async_blast_drv), + ?line SchedOnln = erlang:system_info(schedulers_online), + ?line MemBefore = driver_alloc_size(), + ?line Start = os:timestamp(), + ?line Blast = fun () -> + Port = open_port({spawn, async_blast_drv}, []), + true = is_port(Port), + port_command(Port, ""), + receive + {Port, done} -> + ok + end, + port_close(Port) + end, + ?line Ps = lists:map(fun (N) -> + spawn_opt(Blast, + [{scheduler, + (N rem SchedOnln)+ 1}, + monitor]) + end, + lists:seq(1, 100)), + ?line MemMid = driver_alloc_size(), + ?line lists:foreach(fun ({Pid, Mon}) -> + receive + {'DOWN',Mon,process,Pid,_} -> ok + end + end, Ps), + ?line End = os:timestamp(), + ?line MemAfter = driver_alloc_size(), + ?line io:format("MemBefore=~p, MemMid=~p, MemAfter=~p~n", + [MemBefore, MemMid, MemAfter]), + ?line AsyncBlastTime = timer:now_diff(End,Start)/1000000, + ?line io:format("AsyncBlastTime=~p~n", [AsyncBlastTime]), + ?line MemBefore = MemAfter, + ?line erlang:display({async_blast_time, AsyncBlastTime}), + ?line ok. + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Utilities diff --git a/erts/emulator/test/driver_SUITE_data/Makefile.src b/erts/emulator/test/driver_SUITE_data/Makefile.src index 62ab5169c0..dd48f6a0f7 100644 --- a/erts/emulator/test/driver_SUITE_data/Makefile.src +++ b/erts/emulator/test/driver_SUITE_data/Makefile.src @@ -13,7 +13,8 @@ MISC_DRVS = outputv_drv@dll@ \ missing_callback_drv@dll@ \ thr_alloc_drv@dll@ \ otp_9302_drv@dll@ \ - thr_free_drv@dll@ + thr_free_drv@dll@ \ + async_blast_drv@dll@ SYS_INFO_DRVS = sys_info_1_0_drv@dll@ \ sys_info_1_1_drv@dll@ \ diff --git a/erts/emulator/test/driver_SUITE_data/async_blast_drv.c b/erts/emulator/test/driver_SUITE_data/async_blast_drv.c new file mode 100644 index 0000000000..3821f7e3dc --- /dev/null +++ b/erts/emulator/test/driver_SUITE_data/async_blast_drv.c @@ -0,0 +1,124 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +#include "erl_driver.h" + +#define NO_ASYNC_JOBS 10000 + +static void stop(ErlDrvData drv_data); +static ErlDrvData start(ErlDrvPort port, + char *command); +static void output(ErlDrvData drv_data, + char *buf, int len); +static void ready_async(ErlDrvData drv_data, + ErlDrvThreadData thread_data); + +static ErlDrvEntry async_blast_drv_entry = { + NULL /* init */, + start, + stop, + output, + NULL /* ready_input */, + NULL /* ready_output */, + "async_blast_drv", + NULL /* finish */, + NULL /* handle */, + NULL /* control */, + NULL /* timeout */, + NULL /* outputv */, + ready_async, + NULL /* flush */, + NULL /* call */, + NULL /* event */, + ERL_DRV_EXTENDED_MARKER, + ERL_DRV_EXTENDED_MAJOR_VERSION, + ERL_DRV_EXTENDED_MINOR_VERSION, + ERL_DRV_FLAG_USE_PORT_LOCKING, + NULL /* handle2 */, + NULL /* handle_monitor */ +}; + +typedef struct { + ErlDrvPort port; + ErlDrvTermData caller; + int counter; +} async_blast_data_t; + + +DRIVER_INIT(async_blast_drv) +{ + return &async_blast_drv_entry; +} + +static void stop(ErlDrvData drv_data) +{ + driver_free((void *) drv_data); +} + +static ErlDrvData start(ErlDrvPort port, + char *command) +{ + async_blast_data_t *abd; + + abd = driver_alloc(sizeof(async_blast_data_t)); + if (!abd) + return ERL_DRV_ERROR_GENERAL; + + abd->port = port; + abd->counter = 0; + return (ErlDrvData) abd; +} + +static void async_invoke(void *data) +{ + +} +#include <stdio.h> + +static void ready_async(ErlDrvData drv_data, + ErlDrvThreadData thread_data) +{ + async_blast_data_t *abd = (async_blast_data_t *) drv_data; + if (--abd->counter == 0) { + ErlDrvTermData spec[] = { + ERL_DRV_PORT, driver_mk_port(abd->port), + ERL_DRV_ATOM, driver_mk_atom("done"), + ERL_DRV_TUPLE, 2 + }; + driver_send_term(abd->port, abd->caller, + spec, sizeof(spec)/sizeof(spec[0])); + } +} + +static void output(ErlDrvData drv_data, + char *buf, int len) +{ + async_blast_data_t *abd = (async_blast_data_t *) drv_data; + if (abd->counter == 0) { + int i; + abd->caller = driver_caller(abd->port); + abd->counter = NO_ASYNC_JOBS; + for (i = 0; i < NO_ASYNC_JOBS; i++) { + if (0 > driver_async(abd->port, NULL, async_invoke, NULL, NULL)) { + driver_failure_atom(abd->port, "driver_async_failed"); + break; + } + } + } +} |