/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 2006-2012. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* %CopyrightEnd%
*/
/*
* Description: Poll interface suitable for ERTS on OSE with or without
* SMP support.
*
* The interface is currently implemented using:
* - receive + receive_fsem
*
* Author: Lukas Larsson
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "erl_thr_progress.h"
#include "erl_driver.h"
#include "erl_alloc.h"
#include "erl_poll.h"
#define NOFILE 4096
/*
* Some debug macros
*/
/* #define HARDDEBUG
#define HARDTRACE*/
#ifdef HARDDEBUG
#ifdef HARDTRACE
#define HARDTRACEF(X, ...) { fprintf(stderr, X, __VA_ARGS__); fprintf(stderr,"\r\n"); }
#else
#define HARDTRACEF(...)
#endif
#else
#define HARDTRACEF(X,...)
#define HARDDEBUGF(...)
#endif
#if 0
#define ERTS_POLL_DEBUG_PRINT
#endif
#if defined(DEBUG) && 0
#define HARD_DEBUG
#endif
# define SEL_ALLOC erts_alloc
# define SEL_REALLOC realloc_wrap
# define SEL_FREE erts_free
#ifdef ERTS_SMP
#define ERTS_POLLSET_LOCK(PS) \
erts_smp_mtx_lock(&(PS)->mtx)
#define ERTS_POLLSET_UNLOCK(PS) \
erts_smp_mtx_unlock(&(PS)->mtx)
#else
#define ERTS_POLLSET_LOCK(PS)
#define ERTS_POLLSET_UNLOCK(PS)
#endif
/*
* --- Data types ------------------------------------------------------------
*/
union SIGNAL {
SIGSELECT sig_no;
};
typedef struct erts_sigsel_item_ ErtsSigSelItem;
struct erts_sigsel_item_ {
ErtsSigSelItem *next;
ErtsSysFdType fd;
ErtsPollEvents events;
};
typedef struct erts_sigsel_info_ ErtsSigSelInfo;
struct erts_sigsel_info_ {
ErtsSigSelInfo *next;
SIGSELECT signo;
int (*decode)(OseSignal* sig, int* mode);
ErtsSigSelItem *fds;
};
struct ErtsPollSet_ {
SIGSELECT *sigs;
ErtsSigSelInfo *info;
Uint sig_count;
Uint item_count;
PROCESS interrupt;
erts_atomic32_t wakeup_state;
erts_smp_atomic32_t timeout;
#ifdef ERTS_SMP
erts_smp_mtx_t mtx;
#endif
};
static int max_fds = -1;
#define ERTS_POLL_NOT_WOKEN ((erts_aint32_t) (1 << 0))
#define ERTS_POLL_WOKEN_INTR ((erts_aint32_t) (1 << 1))
#define ERTS_POLL_WOKEN_TIMEDOUT ((erts_aint32_t) (1 << 2))
#define ERTS_POLL_WOKEN_IO_READY ((erts_aint32_t) (1 << 3))
#define ERTS_POLL_SLEEPING ((erts_aint32_t) (1 << 4))
/* signal list prototypes */
static ErtsSigSelInfo *get_sigsel_info(ErtsPollSet ps, SIGSELECT signo);
static ErtsSigSelItem *get_sigsel_item(ErtsPollSet ps, ErtsSysFdType fd);
static ErtsSigSelInfo *add_sigsel_info(ErtsPollSet ps, ErtsSysFdType fd, int (*decode)(OseSignal* sig, int* mode));
static ErtsSigSelItem *add_sigsel_item(ErtsPollSet ps, ErtsSysFdType fd, int (*decode)(OseSignal* sig, int* mode));
static int del_sigsel_info(ErtsPollSet ps, ErtsSigSelInfo *info);
static int del_sigsel_item(ErtsPollSet ps, ErtsSigSelItem *item);
static int update_sigsel(ErtsPollSet ps);
static ErtsSigSelInfo *
get_sigsel_info(ErtsPollSet ps, SIGSELECT signo) {
ErtsSigSelInfo *curr = ps->info;
while (curr != NULL) {
if (curr->signo == signo)
return curr;
curr = curr->next;
}
return NULL;
}
static ErtsSigSelItem *
get_sigsel_item(ErtsPollSet ps, ErtsSysFdType fd) {
ErtsSigSelInfo *info = get_sigsel_info(ps,fd->signo);
ErtsSigSelItem *curr;
if (info == NULL)
return NULL;
curr = info->fds;
while (curr != NULL) {
if (curr->fd->id == fd->id) {
ASSERT(curr->fd->signo == fd->signo);
return curr;
}
curr = curr->next;
}
return NULL;
}
static ErtsSigSelInfo *
add_sigsel_info(ErtsPollSet ps, ErtsSysFdType fd,
int (*decode)(OseSignal* sig, int* mode)) {
ErtsSigSelInfo *info = SEL_ALLOC(ERTS_ALC_T_POLLSET,
sizeof(ErtsSigSelInfo));
info->next = ps->info;
info->fds = NULL;
info->signo = fd->signo;
info->decode = decode;
ps->info = info;
ps->sig_count++;
return info;
}
static ErtsSigSelItem *
add_sigsel_item(ErtsPollSet ps, ErtsSysFdType fd,
int (*decode)(OseSignal* sig, int* mode)) {
ErtsSigSelInfo *info = get_sigsel_info(ps,fd->signo);
ErtsSigSelItem *item = SEL_ALLOC(ERTS_ALC_T_POLLSET,
sizeof(ErtsSigSelItem));
if (info == NULL)
info = add_sigsel_info(ps, fd, decode);
if (info->decode != decode) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
erts_dsprintf(dsbufp, "erts_poll_control() inconsistency: multiple resolve_signal functions for same signal (%d)\n",
fd->signo);
erts_send_error_to_logger_nogl(dsbufp);
}
ASSERT(info->decode == decode);
item->next = info->fds;
item->fd = fd;
item->events = 0;
info->fds = item;
ps->item_count++;
return item;
}
static int del_sigsel_info(ErtsPollSet ps, ErtsSigSelInfo *info) {
ErtsSigSelInfo *curr, *prev;
if (ps->info == info) {
ps->info = ps->info->next;
} else {
curr = ps->info->next;
prev = ps->info;
while (curr != info) {
if (curr == NULL)
return 1;
prev = curr;
curr = curr->next;
}
prev->next = curr->next;
}
ps->sig_count--;
SEL_FREE(ERTS_ALC_T_POLLSET, info);
return 0;
}
static int del_sigsel_item(ErtsPollSet ps, ErtsSigSelItem *item) {
ErtsSigSelInfo *info = get_sigsel_info(ps,item->fd->signo);
ErtsSigSelItem *curr, *prev;
ps->item_count--;
ASSERT(ps->item_count >= 0);
if (info->fds == item) {
info->fds = info->fds->next;
SEL_FREE(ERTS_ALC_T_POLLSET,item);
if (info->fds == NULL)
return del_sigsel_info(ps,info);
return 0;
}
curr = info->fds->next;
prev = info->fds;
while (curr != item) {
if (curr == NULL) {
/* We did not find an item to delete so we have to
* increment item count again.
*/
ps->item_count++;
return 1;
}
prev = curr;
curr = curr->next;
}
prev->next = curr->next;
SEL_FREE(ERTS_ALC_T_POLLSET,item);
return 0;
}
#ifdef ERTS_SMP
static void update_redir_tables(ErtsPollSet ps) {
struct OS_redir_entry *redir_table;
PROCESS sched_1 = ERTS_SCHEDULER_IX(0)->tid.id;
int i;
redir_table = SEL_ALLOC(ERTS_ALC_T_POLLSET,
sizeof(struct OS_redir_entry)*(ps->sig_count+1));
redir_table[0].sig = ps->sig_count+1;
redir_table[0].pid = 0;
for (i = 1; i < ps->sig_count+1; i++) {
redir_table[i].sig = ps->sigs[i];
redir_table[i].pid = sched_1;
}
for (i = 1; i < erts_no_schedulers; i++) {
ErtsSchedulerData *esdp = ERTS_SCHEDULER_IX(i);
set_redirection(esdp->tid.id,redir_table);
}
SEL_FREE(ERTS_ALC_T_POLLSET,redir_table);
}
#endif
static int update_sigsel(ErtsPollSet ps) {
ErtsSigSelInfo *info = ps->info;
int i;
if (ps->sigs != NULL)
SEL_FREE(ERTS_ALC_T_POLLSET,ps->sigs);
if (ps->sig_count == 0) {
/* If there are no signals we place a non-valid signal to make sure that
* we do not trigger on a any unrelated signals which are sent to the
* process.
*/
ps->sigs = SEL_ALLOC(ERTS_ALC_T_POLLSET,sizeof(SIGSELECT)*(2));
ps->sigs[0] = 1;
ps->sigs[1] = ERTS_SIGNAL_INVALID;
return 0;
}
ps->sigs = SEL_ALLOC(ERTS_ALC_T_POLLSET,sizeof(SIGSELECT)*(ps->sig_count+1));
ps->sigs[0] = ps->sig_count;
for (i = 1; info != NULL; i++, info = info->next)
ps->sigs[i] = info->signo;
#ifdef ERTS_SMP
update_redir_tables(ps);
#endif
return 0;
}
static ERTS_INLINE void
wake_poller(ErtsPollSet ps)
{
erts_aint32_t wakeup_state;
ERTS_THR_MEMORY_BARRIER;
wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state);
while (wakeup_state != ERTS_POLL_WOKEN_IO_READY
&& wakeup_state != ERTS_POLL_WOKEN_INTR) {
erts_aint32_t act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
ERTS_POLL_WOKEN_INTR,
wakeup_state);
if (act == wakeup_state) {
wakeup_state = act;
break;
}
wakeup_state = act;
}
if (wakeup_state == ERTS_POLL_SLEEPING) {
/*
* Since we don't know the internals of signal_fsem() we issue
* a memory barrier as a safety precaution ensuring that
* the store we just made to wakeup_state wont be reordered
* with loads in signal_fsem().
*/
ERTS_THR_MEMORY_BARRIER;
signal_fsem(ps->interrupt);
}
}
static ERTS_INLINE void
reset_interrupt(ErtsPollSet ps)
{
/* We need to keep io-ready if set */
erts_aint32_t wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state);
while (wakeup_state != ERTS_POLL_NOT_WOKEN &&
wakeup_state != ERTS_POLL_SLEEPING) {
erts_aint32_t act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
ERTS_POLL_NOT_WOKEN,
wakeup_state);
if (wakeup_state == act)
break;
wakeup_state = act;
}
ERTS_THR_MEMORY_BARRIER;
}
static ERTS_INLINE void
set_interrupt(ErtsPollSet ps)
{
wake_poller(ps);
}
void erts_poll_interrupt(ErtsPollSet ps,int set) {
HARDTRACEF("erts_poll_interrupt called!\n");
if (!set)
reset_interrupt(ps);
else
set_interrupt(ps);
}
void erts_poll_interrupt_timed(ErtsPollSet ps,int set,erts_short_time_t msec) {
HARDTRACEF("erts_poll_interrupt_timed called!\n");
if (!set)
reset_interrupt(ps);
else if (erts_smp_atomic32_read_acqb(&ps->timeout) > (erts_aint32_t) msec)
set_interrupt(ps);
}
ErtsPollEvents erts_poll_control(ErtsPollSet ps, ErtsSysFdType fd,
ErtsPollEvents pe, int on, int* do_wake,
int(*decode)(OseSignal* sig, int* mode)) {
ErtsSigSelItem *curr;
ErtsPollEvents new_events;
int old_sig_count;
HARDTRACEF(
"%ux: In erts_poll_control, fd = %d, pe = %d, on = %d, *do_wake = %d, curr = 0x%xu",
ps, fd, pe, on, do_wake, curr);
ERTS_POLLSET_LOCK(ps);
curr = get_sigsel_item(ps, fd);
old_sig_count = ps->sig_count;
if (curr == NULL && on) {
curr = add_sigsel_item(ps, fd, decode);
} else if (curr == NULL && !on) {
new_events = ERTS_POLL_EV_NVAL;
goto done;
}
new_events = curr->events;
if (pe == 0) {
*do_wake = 0;
goto done;
}
if (on) {
new_events |= pe;
curr->events = new_events;
} else {
new_events &= ~pe;
curr->events = new_events;
if (new_events == 0 && del_sigsel_item(ps, curr)) {
new_events = ERTS_POLL_EV_NVAL;
goto done;
}
}
if (ps->sig_count != old_sig_count) {
if (update_sigsel(ps))
new_events = ERTS_POLL_EV_NVAL;
}
done:
ERTS_POLLSET_UNLOCK(ps);
HARDTRACEF("%ux: Out erts_poll_control", ps);
return new_events;
}
int erts_poll_wait(ErtsPollSet ps,
ErtsPollResFd pr[],
int *len,
SysTimeval *utvp) {
int res = ETIMEDOUT, no_fds, currid = 0;
OSTIME timeout;
OseSignal *sig;
// HARDTRACEF("%ux: In erts_poll_wait",ps);
if (ps->interrupt == (PROCESS)0)
ps->interrupt = current_process();
ASSERT(current_process() == ps->interrupt);
ASSERT(get_fsem(current_process()) == 0);
ASSERT(erts_atomic32_read_nob(&ps->wakeup_state) &
(ERTS_POLL_NOT_WOKEN | ERTS_POLL_WOKEN_INTR));
/* Max no of spots avable in pr */
no_fds = *len;
*len = 0;
ASSERT(utvp);
/* erts_printf("Entering erts_poll_wait(), timeout=%d\n",
(int) utvp->tv_sec*1000 + utvp->tv_usec/1000); */
timeout = utvp->tv_sec*1000 + utvp->tv_usec/1000;
if (timeout > ((time_t) ERTS_AINT32_T_MAX))
timeout = ERTS_AINT32_T_MAX;
erts_smp_atomic32_set_relb(&ps->timeout, (erts_aint32_t) timeout);
while (currid < no_fds) {
if (timeout > 0) {
erts_aint32_t act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
ERTS_POLL_SLEEPING,
ERTS_POLL_NOT_WOKEN);
if (act == ERTS_POLL_NOT_WOKEN) {
#ifdef ERTS_SMP
erts_thr_progress_prepare_wait(NULL);
#endif
sig = receive_fsem(timeout, ps->sigs, 1);
#ifdef ERTS_SMP
erts_thr_progress_finalize_wait(NULL);
#endif
} else {
ASSERT(act == ERTS_POLL_WOKEN_INTR);
sig = OS_RCV_FSEM;
}
} else
sig = receive_w_tmo(0, ps->sigs);
if (sig == NULL) {
if (timeout > 0) {
erts_aint32_t act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
ERTS_POLL_WOKEN_TIMEDOUT,
ERTS_POLL_SLEEPING);
if (act == ERTS_POLL_WOKEN_INTR)
/* Restore fsem as it was signaled but we got a timeout */
wait_fsem(1);
} else
erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
ERTS_POLL_WOKEN_TIMEDOUT,
ERTS_POLL_NOT_WOKEN);
break;
} else if (sig == OS_RCV_FSEM) {
ASSERT(erts_atomic32_read_nob(&ps->wakeup_state) == ERTS_POLL_WOKEN_INTR);
break;
}
{
ErtsSigSelInfo *info = get_sigsel_info(ps, sig->sig_no);
int mode = -1;
struct erts_sys_fd_type fd = { sig->sig_no, info->decode(sig, &mode) };
ErtsSigSelItem *item = get_sigsel_item(ps, &fd);
ASSERT(sig);
if (currid == 0 && timeout > 0) {
erts_aint32_t act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
ERTS_POLL_WOKEN_IO_READY,
ERTS_POLL_SLEEPING);
if (act == ERTS_POLL_WOKEN_INTR) {
/* Restore fsem as it was signaled but we got a msg */
wait_fsem(1);
act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
ERTS_POLL_WOKEN_IO_READY,
ERTS_POLL_WOKEN_INTR);
}
} else if (currid == 0) {
erts_atomic32_set_nob(&ps->wakeup_state,
ERTS_POLL_WOKEN_IO_READY);
}
if (item == NULL) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
erts_dsprintf(
dsbufp,
"erts_poll_wait() failed: found unkown signal id %d (signo %u) "
"(curr_proc 0x%x /sender 0x%x)\n",
fd.id, fd.signo, current_process(), sender(&sig));
erts_send_error_to_logger_nogl(dsbufp);
timeout = 0;
ASSERT(0);
} else if (mode == -1 && item->events == (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
erts_dsprintf(
dsbufp,
"erts_poll_wait() failed: found ambigous signal id %d (signo %u) "
"(curr_proc 0x%x /sender 0x%x)\n You have to give a specify a mode "
"in the resolve_signal callback for this signal.\n",
fd.id, fd.signo, current_process(), sender(&sig));
erts_send_error_to_logger_nogl(dsbufp);
timeout = 0;
ASSERT(0);
} else {
int i;
struct erts_sys_fd_type *fd = NULL;
ErtsPollOseMsgList *tl,*new;
/* Figure out which mode to set and which queue to store
the signal in */
if (mode == -1)
mode = item->events;
else if (mode == 0)
mode = ERTS_POLL_EV_IN;
else if (mode == 1)
mode = ERTS_POLL_EV_OUT;
else
abort();
/* Check if this fd has already been triggered by a previous signal */
for (i = 0; i < currid;i++) {
if (pr[i].fd == item->fd) {
fd = pr[i].fd;
pr[i].events |= mode;
break;
}
}
/* First time this fd is triggered */
if (fd == NULL) {
pr[currid].fd = item->fd;
pr[currid].events = mode;
fd = item->fd;
timeout = 0;
currid++;
}
/* Insert new signal in approriate list */
new = erts_alloc(ERTS_ALC_T_FD_SIG_LIST,sizeof(ErtsPollOseMsgList));
new->next = NULL;
new->data = sig;
ethr_mutex_lock(&fd->mtx);
if (mode & ERTS_POLL_EV_IN)
tl = fd->imsgs;
else if (mode & ERTS_POLL_EV_OUT)
tl = fd->omsgs;
if (tl == NULL) {
if (mode & ERTS_POLL_EV_IN)
fd->imsgs = new;
else if (mode & ERTS_POLL_EV_OUT)
fd->omsgs = new;
} else {
while (tl->next != NULL)
tl = tl->next;
tl->next = new;
}
ethr_mutex_unlock(&fd->mtx);
}
}
}
{
erts_aint32_t wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state);
switch (wakeup_state) {
case ERTS_POLL_WOKEN_IO_READY:
res = 0;
break;
case ERTS_POLL_WOKEN_INTR:
res = EINTR;
break;
case ERTS_POLL_WOKEN_TIMEDOUT:
res = ETIMEDOUT;
break;
case ERTS_POLL_NOT_WOKEN:
/* This happens when we get an invalid signal only */
res = EINVAL;
break;
default:
res = 0;
erl_exit(ERTS_ABORT_EXIT,
"%s:%d: Internal error: Invalid wakeup_state=%d\n",
__FILE__, __LINE__, (int) wakeup_state);
}
}
erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN);
erts_smp_atomic32_set_nob(&ps->timeout, ERTS_AINT32_T_MAX);
*len = currid;
// HARDTRACEF("%ux: Out erts_poll_wait",ps);
return res;
}
int erts_poll_max_fds(void)
{
HARDTRACEF("In/Out erts_poll_max_fds -> %d",max_fds);
return max_fds;
}
void erts_poll_info(ErtsPollSet ps,
ErtsPollInfo *pip)
{
Uint size = 0;
Uint num_events = 0;
size += sizeof(struct ErtsPollSet_);
size += sizeof(ErtsSigSelInfo)*ps->sig_count;
size += sizeof(ErtsSigSelItem)*ps->item_count;
size += sizeof(SIGSELECT)*(ps->sig_count+1);
pip->primary = "receive_fsem";
pip->fallback = NULL;
pip->kernel_poll = NULL;
pip->memory_size = size;
pip->poll_set_size = num_events;
pip->fallback_poll_set_size = 0;
pip->lazy_updates = 0;
pip->pending_updates = 0;
pip->batch_updates = 0;
pip->concurrent_updates = 0;
pip->max_fds = erts_poll_max_fds();
HARDTRACEF("%ux: Out erts_poll_info",ps);
}
ErtsPollSet erts_poll_create_pollset(void)
{
ErtsPollSet ps = SEL_ALLOC(ERTS_ALC_T_POLLSET,
sizeof(struct ErtsPollSet_));
ps->sigs = NULL;
ps->sig_count = 0;
ps->item_count = 0;
ps->info = NULL;
ps->interrupt = (PROCESS)0;
erts_atomic32_init_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN);
erts_smp_atomic32_init_nob(&ps->timeout, ERTS_AINT32_T_MAX);
#ifdef ERTS_SMP
erts_smp_mtx_init(&ps->mtx, "pollset");
#endif
update_sigsel(ps);
HARDTRACEF("%ux: Out erts_poll_create_pollset",ps);
return ps;
}
void erts_poll_destroy_pollset(ErtsPollSet ps)
{
ErtsSigSelInfo *info;
for (info = ps->info; ps->info != NULL; info = ps->info, ps->info = ps->info->next) {
ErtsSigSelItem *item;
for (item = info->fds; info->fds != NULL; item = info->fds, info->fds = info->fds->next)
SEL_FREE(ERTS_ALC_T_POLLSET, item);
SEL_FREE(ERTS_ALC_T_POLLSET, info);
}
SEL_FREE(ERTS_ALC_T_POLLSET,ps->sigs);
#ifdef ERTS_SMP
erts_smp_mtx_destroy(&ps->mtx);
#endif
SEL_FREE(ERTS_ALC_T_POLLSET,ps);
}
void erts_poll_init(void)
{
HARDTRACEF("In %s", __FUNCTION__);
max_fds = 256;
HARDTRACEF("Out %s", __FUNCTION__);
}