/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 2000-2010. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* %CopyrightEnd%
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "sys.h"
#include "erl_sys_driver.h"
#include "global.h"
#include "erl_threads.h"
typedef struct _erl_async {
struct _erl_async* next;
struct _erl_async* prev;
    DE_Handle* hndl; /* The DE_Handle is needed if the port is gone when the job completes */
Eterm port;
long async_id;
void* async_data;
ErlDrvPDL pdl;
void (*async_invoke)(void*);
void (*async_free)(void*);
} ErlAsync;
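/*
** One AsyncQueue per async worker thread: a doubly linked list of
** pending ErlAsync jobs, protected by 'mtx'. 'cv' is used to wake the
** worker when the queue goes from empty to non-empty.
*/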
typedef struct {
erts_mtx_t mtx;
erts_cnd_t cv;
erts_tid_t thr;
int len;
#ifndef ERTS_SMP
int hndl;
#endif
ErlAsync* head;
ErlAsync* tail;
#ifdef ERTS_ENABLE_LOCK_CHECK
int no;
#endif
} AsyncQueue;
static erts_smp_spinlock_t async_id_lock;
static long async_id = 0;
#ifndef ERTS_SMP
erts_mtx_t async_ready_mtx;
static ErlAsync* async_ready_list = NULL;
#endif
/*
** Worker thread setup is done in init_async() below (only when the
** emulator is built with async thread support).
*/
/* Detach from the driver (currently a no-op) */
static void async_detach(DE_Handle* dh)
{
return;
}
#ifdef USE_THREADS
static AsyncQueue* async_q;
static void* async_main(void*);
static void async_add(ErlAsync*, AsyncQueue*);
#ifndef ERTS_SMP
typedef struct ErtsAsyncReadyCallback_ ErtsAsyncReadyCallback;
struct ErtsAsyncReadyCallback_ {
struct ErtsAsyncReadyCallback_ *next;
void (*callback)(void);
};
static ErtsAsyncReadyCallback *callbacks;
static int async_handle;
int erts_register_async_ready_callback(void (*funcp)(void))
{
ErtsAsyncReadyCallback *cb = erts_alloc(ERTS_ALC_T_ARCALLBACK,
sizeof(ErtsAsyncReadyCallback));
cb->next = callbacks;
cb->callback = funcp;
erts_mtx_lock(&async_ready_mtx);
callbacks = cb;
erts_mtx_unlock(&async_ready_mtx);
return async_handle;
}
#endif
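/*
** Illustrative sketch (kept out of the build with #if 0) of how the
** non-SMP async-ready callback hook above can be used. The function
** names below are hypothetical examples, not part of the emulator.
*/
#if 0
static void my_async_ready_hook(void)
{
    /* Runs at the end of check_async_ready(), after all completed
       jobs have been handed back to their ports. */
}

static void example_install_hook(void)
{
    /* The return value is the handle that sys_async_ready() is called
       with when a worker thread has queued a completed job. */
    int hndl = erts_register_async_ready_callback(my_async_ready_hook);
    (void) hndl;
}
#endif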
int init_async(int hndl)
{
erts_thr_opts_t thr_opts = ERTS_THR_OPTS_DEFAULT_INITER;
AsyncQueue* q;
int i;
thr_opts.detached = 0;
thr_opts.suggested_stack_size = erts_async_thread_suggested_stack_size;
#ifndef ERTS_SMP
callbacks = NULL;
async_handle = hndl;
erts_mtx_init(&async_ready_mtx, "async_ready");
async_ready_list = NULL;
#endif
async_id = 0;
erts_smp_spinlock_init(&async_id_lock, "async_id");
async_q = q = (AsyncQueue*)
(erts_async_max_threads
? erts_alloc(ERTS_ALC_T_ASYNC_Q,
erts_async_max_threads * sizeof(AsyncQueue))
: NULL);
for (i = 0; i < erts_async_max_threads; i++) {
q->head = NULL;
q->tail = NULL;
q->len = 0;
#ifndef ERTS_SMP
q->hndl = hndl;
#endif
#ifdef ERTS_ENABLE_LOCK_CHECK
q->no = i;
#endif
erts_mtx_init(&q->mtx, "asyncq");
erts_cnd_init(&q->cv);
erts_thr_create(&q->thr, async_main, (void*)q, &thr_opts);
q++;
}
return 0;
}
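/*
** Shut down the async threads: queue one job with port == NIL per
** worker (interpreted as a shutdown request by async_main()), then
** join each thread and tear down its queue.
*/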
int exit_async(void)
{
int i;
/* terminate threads */
for (i = 0; i < erts_async_max_threads; i++) {
ErlAsync* a = (ErlAsync*) erts_alloc(ERTS_ALC_T_ASYNC,
sizeof(ErlAsync));
a->port = NIL;
async_add(a, &async_q[i]);
}
for (i = 0; i < erts_async_max_threads; i++) {
erts_thr_join(async_q[i].thr, NULL);
erts_mtx_destroy(&async_q[i].mtx);
erts_cnd_destroy(&async_q[i].cv);
}
#ifndef ERTS_SMP
erts_mtx_destroy(&async_ready_mtx);
#endif
if (async_q)
erts_free(ERTS_ALC_T_ASYNC_Q, (void *) async_q);
return 0;
}
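/*
** Enqueue a job. New jobs are linked in at the head and the worker
** consumes from the tail, giving FIFO order. The worker is only
** signalled on the empty -> non-empty transition.
*/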
static void async_add(ErlAsync* a, AsyncQueue* q)
{
if (is_internal_port(a->port)) {
ERTS_LC_ASSERT(erts_drvportid2port(a->port));
/* make sure the driver will stay around */
driver_lock_driver(internal_port_index(a->port));
}
erts_mtx_lock(&q->mtx);
if (q->len == 0) {
q->head = a;
q->tail = a;
q->len = 1;
erts_cnd_signal(&q->cv);
}
    else { /* worker already awake (queue non-empty), no need to signal */
a->next = q->head;
q->head->prev = a;
q->head = a;
q->len++;
}
erts_mtx_unlock(&q->mtx);
}
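/*
** Dequeue the oldest job (the tail), blocking on the condition
** variable while the queue is empty. Only called by the worker thread
** that owns the queue.
*/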
static ErlAsync* async_get(AsyncQueue* q)
{
ErlAsync* a;
erts_mtx_lock(&q->mtx);
while((a = q->tail) == NULL) {
erts_cnd_wait(&q->cv, &q->mtx);
}
#ifdef ERTS_SMP
ASSERT(a && q->tail == a);
#endif
if (q->head == q->tail) {
q->head = q->tail = NULL;
q->len = 0;
}
else {
q->tail->prev->next = NULL;
q->tail = q->tail->prev;
q->len--;
}
erts_mtx_unlock(&q->mtx);
return a;
}
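/*
** Used by driver_async_cancel(): remove a queued job that no worker
** has picked up yet. Returns 1 if the job was found and freed,
** 0 otherwise (it may already be running or completed).
*/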
static int async_del(long id)
{
int i;
    /* Scan all queues for an entry with async_id == 'id' */
for (i = 0; i < erts_async_max_threads; i++) {
ErlAsync* a;
erts_mtx_lock(&async_q[i].mtx);
a = async_q[i].head;
while(a != NULL) {
if (a->async_id == id) {
if (a->prev != NULL)
a->prev->next = a->next;
else
async_q[i].head = a->next;
if (a->next != NULL)
a->next->prev = a->prev;
else
async_q[i].tail = a->prev;
async_q[i].len--;
erts_mtx_unlock(&async_q[i].mtx);
if (a->async_free != NULL)
a->async_free(a->async_data);
async_detach(a->hndl);
erts_free(ERTS_ALC_T_ASYNC, a);
return 1;
}
a = a->next;
}
erts_mtx_unlock(&async_q[i].mtx);
}
return 0;
}
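/*
** Worker thread main loop: take one job at a time, run async_invoke,
** then either complete the job against the port directly (SMP) or
** move it to async_ready_list and notify the emulator (non-SMP).
*/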
static void* async_main(void* arg)
{
AsyncQueue* q = (AsyncQueue*) arg;
#ifdef ERTS_ENABLE_LOCK_CHECK
{
char buf[27];
erts_snprintf(&buf[0], 27, "async %d", q->no);
erts_lc_set_thread_name(&buf[0]);
}
#endif
while(1) {
ErlAsync* a = async_get(q);
        if (a->port == NIL) { /* shutdown job queued by exit_async() */
erts_free(ERTS_ALC_T_ASYNC, (void *) a);
break;
}
else {
(*a->async_invoke)(a->async_data);
            /* It is a serious problem if the driver code implementing
               async_invoke or async_free is unloaded while a blocking
               operation is still in progress. */
#ifdef ERTS_SMP
{
Port *p;
p = erts_id2port_sflgs(a->port,
NULL,
0,
ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
if (!p) {
if (a->async_free)
(*a->async_free)(a->async_data);
}
else {
if (async_ready(p, a->async_data)) {
if (a->async_free)
(*a->async_free)(a->async_data);
}
async_detach(a->hndl);
erts_port_release(p);
}
if (a->pdl) {
driver_pdl_dec_refc(a->pdl);
}
erts_free(ERTS_ALC_T_ASYNC, (void *) a);
}
#else
if (a->pdl) {
driver_pdl_dec_refc(a->pdl);
}
erts_mtx_lock(&async_ready_mtx);
a->next = async_ready_list;
async_ready_list = a;
erts_mtx_unlock(&async_ready_mtx);
sys_async_ready(q->hndl);
#endif
}
}
return NULL;
}
#endif
#ifndef ERTS_SMP
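/*
** Called in the emulator thread to drain async_ready_list: each
** completed job is handed to its port via async_ready(), then any
** registered ready-callbacks are run. Returns the number of ports
** that were served.
*/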
int check_async_ready(void)
{
#ifdef USE_THREADS
ErtsAsyncReadyCallback *cbs;
#endif
ErlAsync* a;
int count = 0;
erts_mtx_lock(&async_ready_mtx);
a = async_ready_list;
async_ready_list = NULL;
#ifdef USE_THREADS
cbs = callbacks;
#endif
erts_mtx_unlock(&async_ready_mtx);
while(a != NULL) {
ErlAsync* a_next = a->next;
        /* The port may have died while the job was in flight */
Port *p = erts_id2port_sflgs(a->port,
NULL,
0,
ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
if (!p) {
if (a->async_free)
(*a->async_free)(a->async_data);
}
else {
count++;
if (async_ready(p, a->async_data)) {
if (a->async_free != NULL)
(*a->async_free)(a->async_data);
}
async_detach(a->hndl);
erts_port_release(p);
}
erts_free(ERTS_ALC_T_ASYNC, (void *) a);
a = a_next;
}
#ifdef USE_THREADS
for (; cbs; cbs = cbs->next)
(*cbs->callback)();
#endif
return count;
}
#endif
/*
** Schedule async_invoke on a worker thread
** NOTE: runs synchronously (in the caller) when async threads are
**       not available
** return values:
**   -1   error (the port is invalid)
**   N    async job id (> 0); can be passed to driver_async_cancel()
** arguments:
**   ix            driver port
**   key           pointer to a queue selection key: jobs with the same
**                 key end up on the same queue (NULL means round robin)
**   async_invoke  function to run on the worker thread
**   async_data    data passed to async_invoke (and async_free)
**   async_free    function used to release async_data in case of failure
**
** An illustrative usage sketch appears at the end of this file.
*/
long driver_async(ErlDrvPort ix, unsigned int* key,
void (*async_invoke)(void*), void* async_data,
void (*async_free)(void*))
{
ErlAsync* a = (ErlAsync*) erts_alloc(ERTS_ALC_T_ASYNC, sizeof(ErlAsync));
Port* prt = erts_drvport2port(ix);
long id;
unsigned int qix;
if (!prt)
return -1;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
a->next = NULL;
a->prev = NULL;
a->hndl = (DE_Handle*)prt->drv_ptr->handle;
a->port = prt->id;
a->pdl = NULL;
a->async_data = async_data;
a->async_invoke = async_invoke;
a->async_free = async_free;
erts_smp_spin_lock(&async_id_lock);
async_id = (async_id + 1) & 0x7fffffff;
if (async_id == 0)
async_id++;
id = async_id;
erts_smp_spin_unlock(&async_id_lock);
a->async_id = id;
if (key == NULL) {
qix = (erts_async_max_threads > 0)
? (id % erts_async_max_threads) : 0;
}
else {
qix = (erts_async_max_threads > 0) ?
(*key % erts_async_max_threads) : 0;
*key = qix;
}
#ifdef USE_THREADS
if (erts_async_max_threads > 0) {
if (prt->port_data_lock) {
driver_pdl_inc_refc(prt->port_data_lock);
a->pdl = prt->port_data_lock;
}
async_add(a, &async_q[qix]);
return id;
}
#endif
(*a->async_invoke)(a->async_data);
if (async_ready(prt, a->async_data)) {
if (a->async_free != NULL)
(*a->async_free)(a->async_data);
}
erts_free(ERTS_ALC_T_ASYNC, (void *) a);
return id;
}
int driver_async_cancel(unsigned int id)
{
#ifdef USE_THREADS
if (erts_async_max_threads > 0)
return async_del(id);
#endif
return 0;
}
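/*
** Illustrative sketch (kept out of the build with #if 0) of how a
** linked-in driver might use driver_async()/driver_async_cancel()
** from its output callback. The driver state, job struct and function
** names below are hypothetical; they are not part of this file.
*/
#if 0
typedef struct {
    ErlDrvPort port;      /* saved by the driver's start callback */
} my_drv_state;

typedef struct {
    long job_id;          /* id returned by driver_async() */
    int  result;          /* filled in on the worker thread */
} my_job;

/* Runs on an async worker thread; may block. */
static void my_invoke(void* data)
{
    my_job* job = (my_job*) data;
    job->result = 42;     /* stand-in for some blocking work */
}

/* Used to release the job if the result cannot be delivered. */
static void my_free(void* data)
{
    driver_free(data);
}

/* Driver output callback; 'd' is assumed to point at a my_drv_state
   allocated by the start callback. */
static void my_output(ErlDrvData d, char* buf, int len)
{
    my_drv_state* st = (my_drv_state*) d;
    my_job* job = (my_job*) driver_alloc(sizeof(my_job));
    unsigned int key = 1; /* jobs with the same key share a queue */

    (void) buf;
    (void) len;
    job->job_id = driver_async(st->port, &key, my_invoke, job, my_free);
    /* driver_async_cancel() with this id would drop the job if it is
       still waiting in a queue. */
}
#endif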