/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2000-2013. All Rights Reserved.
 *
 * The contents of this file are subject to the Erlang Public License,
 * Version 1.1, (the "License"); you may not use this file except in
 * compliance with the License. You should have received a copy of the
 * Erlang Public License along with this software. If not, it can be
 * retrieved online at http://www.erlang.org/.
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Async thread pool used by drivers through driver_async(). Jobs are
 * queued to async threads and executed there. The completed job is then
 * handed back to the issuing port: via a per-scheduler ready queue when
 * ERTS_USE_ASYNC_READY_Q is set, otherwise directly from the async
 * thread.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "sys.h"
#include "erl_sys_driver.h"
#include "global.h"
#include "erl_threads.h"
#include "erl_thr_queue.h"
#include "erl_async.h"
#include "dtrace-wrapper.h"

/* Max number of completed jobs handled per erts_check_async_ready() call. */
#define ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ 20

/* Set to 1 to trace each job's enqueue/dequeue/reply on stderr. */
#define ERTS_ASYNC_PRINT_JOB 0

#if !defined(ERTS_SMP) && defined(USE_THREADS) && !ERTS_USE_ASYNC_READY_Q
#  error "Need async ready queue in non-smp case"
#endif

/*
 * One async job. Allocated (ERTS_ALC_T_ASYNC) and filled in by
 * driver_async().
 *
 * NOTE(review): tags starting with an underscore are reserved at file
 * scope; the tag '_erl_async' is not otherwise referenced here and
 * could be renamed.
 */
typedef struct _erl_async {
    DE_Handle*		hndl;   /* The DE_Handle is needed when port is gone */
    Eterm		port;	/* Id of the issuing port; NIL marks the
				 * termination sentinel (see async_main()) */
    long		async_id;   /* Id returned by driver_async() */
    void*		async_data; /* Argument to async_invoke/async_free */
    ErlDrvPDL		pdl;	/* Referenced port data lock, or NULL */
    void (*async_invoke)(void*);  /* Job body, run on an async thread */
    void (*async_free)(void*);	  /* Frees async_data when the port is gone
				   * or async_ready() returns non-zero */
#if ERTS_USE_ASYNC_READY_Q
    Uint		sched_id; /* Scheduler whose ready queue receives
				   * the completed job */
    union {
	ErtsThrQPrepEnQ_t *prep_enq;  /* Set in async_add(); used while the
				       * job waits for an async thread */
	ErtsThrQFinDeQ_t fin_deq;     /* Written by async_get() once the job
				       * has been dequeued */
    } q;
#endif
} ErtsAsync;

#if ERTS_USE_ASYNC_READY_Q

/*
 * We can do without the enqueue mutex since it isn't needed for
 * thread safety. Its only purpose is to put async threads to sleep
 * during a blast of ready async jobs. This is done in order to reduce
 * contention on the enqueue end of the async ready queues. During
 * such a blast without the enqueue mutex, much CPU time is consumed
 * by the async threads without them making much progress, which in
 * turn slows down progress of scheduler threads.
 */
#define ERTS_USE_ASYNC_READY_ENQ_MTX 1

#if ERTS_USE_ASYNC_READY_ENQ_MTX

typedef struct {
    erts_mtx_t enq_mtx;	/* Taken around ready-queue enqueue in async_reply() */
} ErtsAsyncReadyQXData;

#endif

/*
 * Per-scheduler queue of jobs completed by async threads.
 *   x.data  - enqueue mutex, padded with ERTS_ALC_CACHE_LINE_ALIGN_SIZE
 *   thr_q   - the completed jobs
 *   fin_deq - dequeue-finalization data collected from processed jobs
 *             (see erts_check_async_ready())
 */
typedef struct {
#if ERTS_USE_ASYNC_READY_ENQ_MTX
    union {
	ErtsAsyncReadyQXData data;
	char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(
		sizeof(ErtsAsyncReadyQXData))];
    } x;
#endif
    ErtsThrQ_t thr_q;
    ErtsThrQFinDeQ_t fin_deq;
} ErtsAsyncReadyQ;

/*
 * Ready queue sized with ERTS_ALC_CACHE_LINE_ALIGN_SIZE (presumably
 * rounded up to whole cache lines) so that entries of the array
 * allocated in erts_init_async() do not share cache lines.
 */
typedef union {
    ErtsAsyncReadyQ arq;
    char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncReadyQ))];
} ErtsAlgndAsyncReadyQ;

#endif /* ERTS_USE_ASYNC_READY_Q */

/* Job queue of one async thread, plus that thread's id. */
typedef struct {
    ErtsThrQ_t thr_q;
    erts_tid_t thr_id;
} ErtsAsyncQ;

/* ErtsAsyncQ sized with ERTS_ALC_CACHE_LINE_ALIGN_SIZE (see above). */
typedef union {
    ErtsAsyncQ aq;
    char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncQ))];
} ErtsAlgndAsyncQ;

/*
 * Startup handshake and job id state.
 *   no_initialized - async threads that finished async_thread_init()
 *   mtx, cnd       - protect/signal no_initialized; destroyed by
 *                    erts_init_async() once all threads are up
 *   id             - job id counter used by driver_async()
 */
typedef struct {
    int no_initialized;
    erts_mtx_t mtx;
    erts_cnd_t cnd;
    erts_atomic_t id;
} ErtsAsyncInit;

/*
 * All pool state; allocated as one chunk by erts_init_async().
 *   queue       - erts_async_max_threads job queues, one per async thread
 *   ready_queue - erts_no_schedulers ready queues, indexed by
 *                 scheduler id - 1
 */
typedef struct {
    union {
	ErtsAsyncInit data;
	char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncInit))];
    } init;
    ErtsAlgndAsyncQ *queue;
#if ERTS_USE_ASYNC_READY_Q
    ErtsAlgndAsyncReadyQ *ready_queue;
#endif
} ErtsAsyncData;

#if defined(USE_THREADS) && defined(USE_VM_PROBES)

/*
 * Some compilers, e.g. GCC 4.2.1 and -O3, will optimize away DTrace
 * calls if they're the last thing in the function.
:-( * Many thanks to Trond Norbye, via: * https://github.com/memcached/memcached/commit/6298b3978687530bc9d219b6ac707a1b681b2a46 */ static unsigned gcc_optimizer_hack = 0; #endif int erts_async_max_threads; /* Initialized by erl_init.c */ int erts_async_thread_suggested_stack_size; /* Initialized by erl_init.c */ static ErtsAsyncData *async; #ifndef USE_THREADS void erts_init_async(void) { } #else static void *async_main(void *); static ERTS_INLINE ErtsAsyncQ * async_q(int i) { return &async->queue[i].aq; } #if ERTS_USE_ASYNC_READY_Q static ERTS_INLINE ErtsAsyncReadyQ * async_ready_q(Uint sched_id) { return &async->ready_queue[((int)sched_id)-1].arq; } #endif void erts_init_async(void) { async = NULL; if (erts_async_max_threads > 0) { #if ERTS_USE_ASYNC_READY_Q ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT; #endif erts_thr_opts_t thr_opts = ERTS_THR_OPTS_DEFAULT_INITER; char *ptr; size_t tot_size = 0; int i; tot_size += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncData)); tot_size += sizeof(ErtsAlgndAsyncQ)*erts_async_max_threads; #if ERTS_USE_ASYNC_READY_Q tot_size += sizeof(ErtsAlgndAsyncReadyQ)*erts_no_schedulers; #endif ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_ASYNC_DATA, tot_size); async = (ErtsAsyncData *) ptr; ptr += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncData)); async->init.data.no_initialized = 0; erts_mtx_init(&async->init.data.mtx, "async_init_mtx"); erts_cnd_init(&async->init.data.cnd); erts_atomic_init_nob(&async->init.data.id, 0); async->queue = (ErtsAlgndAsyncQ *) ptr; ptr += sizeof(ErtsAlgndAsyncQ)*erts_async_max_threads; #if ERTS_USE_ASYNC_READY_Q qinit.live.queue = ERTS_THR_Q_LIVE_LONG; qinit.live.objects = ERTS_THR_Q_LIVE_SHORT; qinit.notify = erts_notify_check_async_ready_queue; async->ready_queue = (ErtsAlgndAsyncReadyQ *) ptr; ptr += sizeof(ErtsAlgndAsyncReadyQ)*erts_no_schedulers; for (i = 1; i <= erts_no_schedulers; i++) { ErtsAsyncReadyQ *arq = async_ready_q(i); #if ERTS_USE_ASYNC_READY_ENQ_MTX 
erts_mtx_init(&arq->x.data.enq_mtx, "async_enq_mtx"); #endif erts_thr_q_finalize_dequeue_state_init(&arq->fin_deq); qinit.arg = (void *) (SWord) i; erts_thr_q_initialize(&arq->thr_q, &qinit); } #endif /* Create async threads... */ thr_opts.detached = 0; thr_opts.suggested_stack_size = erts_async_thread_suggested_stack_size; for (i = 0; i < erts_async_max_threads; i++) { ErtsAsyncQ *aq = async_q(i); erts_thr_create(&aq->thr_id, async_main, (void*) aq, &thr_opts); } /* Wait for async threads to initialize... */ erts_mtx_lock(&async->init.data.mtx); while (async->init.data.no_initialized != erts_async_max_threads) erts_cnd_wait(&async->init.data.cnd, &async->init.data.mtx); erts_mtx_unlock(&async->init.data.mtx); erts_mtx_destroy(&async->init.data.mtx); erts_cnd_destroy(&async->init.data.cnd); } } #if ERTS_USE_ASYNC_READY_Q void * erts_get_async_ready_queue(Uint sched_id) { return (void *) async ? async_ready_q(sched_id) : NULL; } #endif static ERTS_INLINE void async_add(ErtsAsync *a, ErtsAsyncQ* q) { #ifdef USE_VM_PROBES int len; #endif if (is_internal_port(a->port)) { #if ERTS_USE_ASYNC_READY_Q ErtsAsyncReadyQ *arq = async_ready_q(a->sched_id); a->q.prep_enq = erts_thr_q_prepare_enqueue(&arq->thr_q); #endif /* make sure the driver will stay around */ if (a->hndl) erts_ddll_reference_referenced_driver(a->hndl); } #if ERTS_ASYNC_PRINT_JOB erts_fprintf(stderr, "-> %ld\n", a->async_id); #endif erts_thr_q_enqueue(&q->thr_q, a); #ifdef USE_VM_PROBES if (DTRACE_ENABLED(aio_pool_add)) { DTRACE_CHARBUF(port_str, 16); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", a->port); /* DTRACE TODO: Get the queue length from erts_thr_q_enqueue() ? 
*/ len = -1; DTRACE2(aio_pool_add, port_str, len); } gcc_optimizer_hack++; #endif } static ERTS_INLINE ErtsAsync *async_get(ErtsThrQ_t *q, erts_tse_t *tse, ErtsThrQPrepEnQ_t **prep_enq) { #if ERTS_USE_ASYNC_READY_Q int saved_fin_deq = 0; ErtsThrQFinDeQ_t fin_deq; #endif #ifdef USE_VM_PROBES int len; #endif while (1) { ErtsAsync *a = (ErtsAsync *) erts_thr_q_dequeue(q); if (a) { #if ERTS_USE_ASYNC_READY_Q *prep_enq = a->q.prep_enq; erts_thr_q_get_finalize_dequeue_data(q, &a->q.fin_deq); if (saved_fin_deq) erts_thr_q_append_finalize_dequeue_data(&a->q.fin_deq, &fin_deq); #endif #ifdef USE_VM_PROBES if (DTRACE_ENABLED(aio_pool_get)) { DTRACE_CHARBUF(port_str, 16); erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", a->port); /* DTRACE TODO: Get the length from erts_thr_q_dequeue() ? */ len = -1; DTRACE2(aio_pool_get, port_str, len); } #endif return a; } if (ERTS_THR_Q_DIRTY != erts_thr_q_clean(q)) { ErtsThrQFinDeQ_t tmp_fin_deq; erts_tse_reset(tse); #if ERTS_USE_ASYNC_READY_Q chk_fin_deq: if (erts_thr_q_get_finalize_dequeue_data(q, &tmp_fin_deq)) { if (!saved_fin_deq) { erts_thr_q_finalize_dequeue_state_init(&fin_deq); saved_fin_deq = 1; } erts_thr_q_append_finalize_dequeue_data(&fin_deq, &tmp_fin_deq); } #endif switch (erts_thr_q_inspect(q, 1)) { case ERTS_THR_Q_DIRTY: break; case ERTS_THR_Q_NEED_THR_PRGR: #ifdef ERTS_SMP { ErtsThrPrgrVal prgr = erts_thr_q_need_thr_progress(q); erts_thr_progress_wakeup(NULL, prgr); /* * We do no dequeue finalizing in hope that a new async * job will arrive before we are woken due to thread * progress... 
*/ erts_tse_wait(tse); break; } #endif case ERTS_THR_Q_CLEAN: #if ERTS_USE_ASYNC_READY_Q if (saved_fin_deq) { if (erts_thr_q_finalize_dequeue(&fin_deq)) goto chk_fin_deq; else saved_fin_deq = 0; } #endif erts_tse_wait(tse); break; default: ASSERT(0); break; } } } } static ERTS_INLINE void call_async_ready(ErtsAsync *a) { #if ERTS_USE_ASYNC_READY_Q Port *p = erts_id2port_sflgs(a->port, NULL, 0, ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); #else Port *p = erts_thr_id2port_sflgs(a->port, ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); #endif if (!p) { if (a->async_free) a->async_free(a->async_data); } else { if (async_ready(p, a->async_data)) { if (a->async_free) a->async_free(a->async_data); } #if ERTS_USE_ASYNC_READY_Q erts_port_release(p); #else erts_thr_port_release(p); #endif } if (a->pdl) driver_pdl_dec_refc(a->pdl); if (a->hndl) erts_ddll_dereference_driver(a->hndl); } static ERTS_INLINE void async_reply(ErtsAsync *a, ErtsThrQPrepEnQ_t *prep_enq) { #if ERTS_USE_ASYNC_READY_Q ErtsAsyncReadyQ *arq; #if ERTS_ASYNC_PRINT_JOB erts_fprintf(stderr, "=>> %ld\n", a->async_id); #endif arq = async_ready_q(a->sched_id); #if ERTS_USE_ASYNC_READY_ENQ_MTX erts_mtx_lock(&arq->x.data.enq_mtx); #endif erts_thr_q_enqueue_prepared(&arq->thr_q, (void *) a, prep_enq); #if ERTS_USE_ASYNC_READY_ENQ_MTX erts_mtx_unlock(&arq->x.data.enq_mtx); #endif #else /* ERTS_USE_ASYNC_READY_Q */ call_async_ready(a); erts_free(ERTS_ALC_T_ASYNC, (void *) a); #endif /* ERTS_USE_ASYNC_READY_Q */ } static void async_wakeup(void *vtse) { erts_tse_set((erts_tse_t *) vtse); } static erts_tse_t *async_thread_init(ErtsAsyncQ *aq) { ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT; erts_tse_t *tse = erts_tse_fetch(); #ifdef ERTS_SMP ErtsThrPrgrCallbacks callbacks; callbacks.arg = (void *) tse; callbacks.wakeup = async_wakeup; callbacks.prepare_wait = NULL; callbacks.wait = NULL; erts_thr_progress_register_unmanaged_thread(&callbacks); #endif qinit.live.queue = ERTS_THR_Q_LIVE_LONG; qinit.live.objects = 
ERTS_THR_Q_LIVE_SHORT; qinit.arg = (void *) tse; qinit.notify = async_wakeup; #if ERTS_USE_ASYNC_READY_Q qinit.auto_finalize_dequeue = 0; #endif erts_thr_q_initialize(&aq->thr_q, &qinit); /* Inform main thread that we are done initializing... */ erts_mtx_lock(&async->init.data.mtx); async->init.data.no_initialized++; erts_cnd_signal(&async->init.data.cnd); erts_mtx_unlock(&async->init.data.mtx); return tse; } static void *async_main(void* arg) { ErtsAsyncQ *aq = (ErtsAsyncQ *) arg; erts_tse_t *tse = async_thread_init(aq); while (1) { ErtsThrQPrepEnQ_t *prep_enq; ErtsAsync *a = async_get(&aq->thr_q, tse, &prep_enq); if (is_nil(a->port)) break; /* Time to die */ #if ERTS_ASYNC_PRINT_JOB erts_fprintf(stderr, "<- %ld\n", a->async_id); #endif a->async_invoke(a->async_data); async_reply(a, prep_enq); } return NULL; } #endif /* USE_THREADS */ void erts_exit_flush_async(void) { #ifdef USE_THREADS int i; ErtsAsync a; a.port = NIL; /* * Terminate threads in order to flush queues. We do not * bother to clean everything up since we are about to * terminate the runtime system and a cleanup would only * delay the termination. 
*/ for (i = 0; i < erts_async_max_threads; i++) async_add(&a, async_q(i)); for (i = 0; i < erts_async_max_threads; i++) erts_thr_join(async->queue[i].aq.thr_id, NULL); #endif } #if defined(USE_THREADS) && ERTS_USE_ASYNC_READY_Q int erts_check_async_ready(void *varq) { ErtsAsyncReadyQ *arq = (ErtsAsyncReadyQ *) varq; int res = 1; int i; for (i = 0; i < ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ; i++) { ErtsAsync *a = (ErtsAsync *) erts_thr_q_dequeue(&arq->thr_q); if (!a) { res = 0; break; } #if ERTS_ASYNC_PRINT_JOB erts_fprintf(stderr, "<<= %ld\n", a->async_id); #endif erts_thr_q_append_finalize_dequeue_data(&arq->fin_deq, &a->q.fin_deq); call_async_ready(a); erts_free(ERTS_ALC_T_ASYNC, (void *) a); } erts_thr_q_finalize_dequeue(&arq->fin_deq); return res; } int erts_async_ready_clean(void *varq, void *val) { ErtsAsyncReadyQ *arq = (ErtsAsyncReadyQ *) varq; ErtsThrQCleanState_t cstate; cstate = erts_thr_q_clean(&arq->thr_q); if (erts_thr_q_finalize_dequeue(&arq->fin_deq)) return ERTS_ASYNC_READY_DIRTY; switch (cstate) { case ERTS_THR_Q_DIRTY: return ERTS_ASYNC_READY_DIRTY; case ERTS_THR_Q_NEED_THR_PRGR: #ifdef ERTS_SMP *((ErtsThrPrgrVal *) val) = erts_thr_q_need_thr_progress(&arq->thr_q); return ERTS_ASYNC_READY_NEED_THR_PRGR; #endif case ERTS_THR_Q_CLEAN: break; } return ERTS_ASYNC_READY_CLEAN; } #endif /* ** Generate a fair async key prom an ErlDrvPort ** The port data gives a fair distribution grom port pointer ** to unsigned integer - to be used in key for driver_async below. 
*/
unsigned int driver_async_port_key(ErlDrvPort port)
{
    ErlDrvTermData td = driver_mk_port(port);
    /* An invalid port maps to key 0. */
    if (td == (ErlDrvTermData) NIL) {
	return 0;
    }
    return (unsigned int) (UWord) internal_port_data(td);
}

/*
** Schedule async_invoke on a worker thread
** NOTE: runs synchronously (in the caller) when there are no async
** threads
** return values:
**  0  completed synchronously (no async pool)
** -1  error: the port is invalid. Nothing is scheduled and async_free
**     is NOT called; async_data stays owned by the caller.
**  N  job id, N > 0 (driver_async_cancel() is a no-op)
** arguments:
**      ix             the calling port (must be locked by the caller)
**      key            pointer to schedule key (NULL means round robin);
**                     overwritten with the chosen queue index
**      async_invoke   function to run in thread
**      async_data     data to pass to invoke function
**      async_free     function to release async_data when the port is
**                     gone or async_ready() returns non-zero
*/
long driver_async(ErlDrvPort ix, unsigned int* key,
		  void (*async_invoke)(void*), void* async_data,
		  void (*async_free)(void*))
{
    ErtsAsync* a;
    Port* prt;
    long id;
    unsigned int qix;
#if ERTS_USE_ASYNC_READY_Q
    Uint sched_id;

    /* Non-scheduler threads (id 0) report to scheduler 1's ready queue. */
    sched_id = erts_get_scheduler_id();
    if (!sched_id)
	sched_id = 1;
#endif

    prt = erts_drvport2port(ix);
    if (prt == ERTS_INVALID_ERL_DRV_PORT)
	return -1;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));

    a = (ErtsAsync*) erts_alloc(ERTS_ALC_T_ASYNC, sizeof(ErtsAsync));

#if ERTS_USE_ASYNC_READY_Q
    a->sched_id = sched_id;
#endif
    a->hndl = (DE_Handle*)prt->drv_ptr->handle;
    a->port = prt->common.id;
    a->pdl = NULL;
    a->async_data = async_data;
    a->async_invoke = async_invoke;
    a->async_free = async_free;

    if (!async)
	id = 0;
    else {
	/*
	 * The shared counter eventually wraps into negative values. Fold
	 * those onto [0, LONG_MAX] with -(id + 1), which cannot overflow.
	 * Plain negation would overflow (undefined behavior) for LONG_MIN
	 * and leave id negative, so the queue index computed below would
	 * point outside async->queue. 0 is skipped since it means
	 * "completed synchronously".
	 */
	do {
	    id = erts_atomic_inc_read_nob(&async->init.data.id);
	    if (id < 0)
		id = -(id + 1);
	} while (id == 0);
	ASSERT(id > 0);
    }

    a->async_id = id;

    if (key == NULL) {
	qix = (erts_async_max_threads > 0)
	    ? (id % erts_async_max_threads) : 0;
    }
    else {
	qix = (erts_async_max_threads > 0) ?
	    (*key % erts_async_max_threads) : 0;
	*key = qix;
    }
#ifdef USE_THREADS
    if (erts_async_max_threads > 0) {
	if (prt->port_data_lock) {
	    /* Keep the port data lock alive until call_async_ready(). */
	    driver_pdl_inc_refc(prt->port_data_lock);
	    a->pdl = prt->port_data_lock;
	}
	async_add(a, async_q(qix));
	return id;
    }
#endif

    /* No async threads: run the job and deliver its result right here. */
    (*a->async_invoke)(a->async_data);

    if (async_ready(prt, a->async_data)) {
	if (a->async_free != NULL)
	    (*a->async_free)(a->async_data);
    }
    erts_free(ERTS_ALC_T_ASYNC, (void *) a);

    return id;
}

int driver_async_cancel(unsigned int id)
{
    /*
     * Not supported anymore. Always fail (which is backward
     * compatible).
     *
     * This functionality could be implemented again. However,
     * it is (and always has been) completely useless since
     * it doesn't give you any guarantees whatsoever. The user
     * needs to (and always have had to) synchronize in his/her
     * own code in order to get any guarantees.
     */
    return 0;
}