/* * %CopyrightBegin% * * Copyright Ericsson AB 2006-2012. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in * compliance with the License. You should have received a copy of the * Erlang Public License along with this software. If not, it can be * retrieved online at http://www.erlang.org/. * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. * * %CopyrightEnd% */ /* * Description: Poll interface suitable for ERTS on OSE with or without * SMP support. * * The interface is currently implemented using: * - receive + receive_fsem * * Author: Lukas Larsson */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "erl_thr_progress.h" #include "erl_driver.h" #include "erl_alloc.h" #include "erl_poll.h" #define NOFILE 4096 /* * Some debug macros */ /* #define HARDDEBUG #define HARDTRACE*/ #ifdef HARDDEBUG #ifdef HARDTRACE #define HARDTRACEF(X, ...) { fprintf(stderr, X, __VA_ARGS__); fprintf(stderr,"\r\n"); } #else #define HARDTRACEF(...) #endif #else #define HARDTRACEF(X,...) #define HARDDEBUGF(...) #endif #if 0 #define ERTS_POLL_DEBUG_PRINT #endif #if defined(DEBUG) && 0 #define HARD_DEBUG #endif # define SEL_ALLOC erts_alloc # define SEL_REALLOC realloc_wrap # define SEL_FREE erts_free #ifdef ERTS_SMP #define ERTS_POLLSET_LOCK(PS) \ erts_smp_mtx_lock(&(PS)->mtx) #define ERTS_POLLSET_UNLOCK(PS) \ erts_smp_mtx_unlock(&(PS)->mtx) #else #define ERTS_POLLSET_LOCK(PS) #define ERTS_POLLSET_UNLOCK(PS) #endif /* * --- Data types ------------------------------------------------------------ */ union SIGNAL { SIGSELECT sig_no; }; typedef struct erts_sigsel_item_ ErtsSigSelItem; struct erts_sigsel_item_ { ErtsSigSelItem *next; ErtsSysFdType fd; ErtsPollEvents events; }; typedef struct erts_sigsel_info_ ErtsSigSelInfo; struct erts_sigsel_info_ { ErtsSigSelInfo *next; SIGSELECT signo; int (*decode)(OseSignal* sig, int* mode); ErtsSigSelItem *fds; }; struct ErtsPollSet_ { SIGSELECT *sigs; ErtsSigSelInfo *info; Uint sig_count; Uint item_count; PROCESS interrupt; erts_atomic32_t wakeup_state; erts_smp_atomic32_t timeout; #ifdef ERTS_SMP erts_smp_mtx_t mtx; #endif }; static int max_fds = -1; #define ERTS_POLL_NOT_WOKEN ((erts_aint32_t) (1 << 0)) #define ERTS_POLL_WOKEN_INTR ((erts_aint32_t) (1 << 1)) #define ERTS_POLL_WOKEN_TIMEDOUT ((erts_aint32_t) (1 << 2)) #define ERTS_POLL_WOKEN_IO_READY ((erts_aint32_t) (1 << 3)) #define ERTS_POLL_SLEEPING ((erts_aint32_t) (1 << 4)) /* signal list prototypes */ static ErtsSigSelInfo *get_sigsel_info(ErtsPollSet ps, SIGSELECT signo); static ErtsSigSelItem *get_sigsel_item(ErtsPollSet ps, ErtsSysFdType fd); static ErtsSigSelInfo *add_sigsel_info(ErtsPollSet ps, ErtsSysFdType fd, int (*decode)(OseSignal* sig, int* mode)); static ErtsSigSelItem *add_sigsel_item(ErtsPollSet ps, ErtsSysFdType fd, int (*decode)(OseSignal* sig, int* mode)); static int del_sigsel_info(ErtsPollSet ps, ErtsSigSelInfo *info); static int del_sigsel_item(ErtsPollSet ps, ErtsSigSelItem *item); static int update_sigsel(ErtsPollSet ps); static ErtsSigSelInfo * get_sigsel_info(ErtsPollSet ps, SIGSELECT signo) { ErtsSigSelInfo *curr = ps->info; while (curr != NULL) { if (curr->signo == signo) return curr; curr = curr->next; } return NULL; } static ErtsSigSelItem * get_sigsel_item(ErtsPollSet ps, ErtsSysFdType fd) { ErtsSigSelInfo *info = get_sigsel_info(ps,fd->signo); ErtsSigSelItem *curr; if (info == NULL) return NULL; curr = info->fds; while (curr != NULL) { if (curr->fd->id == fd->id) { ASSERT(curr->fd->signo == fd->signo); return curr; } curr = curr->next; } return NULL; } static ErtsSigSelInfo * add_sigsel_info(ErtsPollSet ps, ErtsSysFdType fd, int (*decode)(OseSignal* sig, int* mode)) { ErtsSigSelInfo *info = SEL_ALLOC(ERTS_ALC_T_POLLSET, sizeof(ErtsSigSelInfo)); info->next = ps->info; info->fds = NULL; info->signo = fd->signo; info->decode = decode; ps->info = info; ps->sig_count++; return info; } static ErtsSigSelItem * add_sigsel_item(ErtsPollSet ps, ErtsSysFdType fd, int (*decode)(OseSignal* sig, int* mode)) { ErtsSigSelInfo *info = get_sigsel_info(ps,fd->signo); ErtsSigSelItem *item = SEL_ALLOC(ERTS_ALC_T_POLLSET, sizeof(ErtsSigSelItem)); if (info == NULL) info = add_sigsel_info(ps, fd, decode); if (info->decode != decode) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "erts_poll_control() inconsistency: multiple resolve_signal functions for same signal (%d)\n", fd->signo); erts_send_error_to_logger_nogl(dsbufp); } ASSERT(info->decode == decode); item->next = info->fds; item->fd = fd; item->events = 0; info->fds = item; ps->item_count++; return item; } static int del_sigsel_info(ErtsPollSet ps, ErtsSigSelInfo *info) { ErtsSigSelInfo *curr, *prev; if (ps->info == info) { ps->info = ps->info->next; } else { curr = ps->info->next; prev = ps->info; while (curr != info) { if (curr == NULL) return 1; prev = curr; curr = curr->next; } prev->next = curr->next; } ps->sig_count--; SEL_FREE(ERTS_ALC_T_POLLSET, info); return 0; } static int del_sigsel_item(ErtsPollSet ps, ErtsSigSelItem *item) { ErtsSigSelInfo *info = get_sigsel_info(ps,item->fd->signo); ErtsSigSelItem *curr, *prev; ps->item_count--; ASSERT(ps->item_count >= 0); if (info->fds == item) { info->fds = info->fds->next; SEL_FREE(ERTS_ALC_T_POLLSET,item); if (info->fds == NULL) return del_sigsel_info(ps,info); return 0; } curr = info->fds->next; prev = info->fds; while (curr != item) { if (curr == NULL) { /* We did not find an item to delete so we have to * increment item count again. */ ps->item_count++; return 1; } prev = curr; curr = curr->next; } prev->next = curr->next; SEL_FREE(ERTS_ALC_T_POLLSET,item); return 0; } #ifdef ERTS_SMP static void update_redir_tables(ErtsPollSet ps) { struct OS_redir_entry *redir_table; PROCESS sched_1 = ERTS_SCHEDULER_IX(0)->tid.id; int i; redir_table = SEL_ALLOC(ERTS_ALC_T_POLLSET, sizeof(struct OS_redir_entry)*(ps->sig_count+1)); redir_table[0].sig = ps->sig_count+1; redir_table[0].pid = 0; for (i = 1; i < ps->sig_count+1; i++) { ramlog_printf("Adding 0x%p -> 0x%p to redir table\n",ps->sigs[i],sched_1); redir_table[i].sig = ps->sigs[i]; redir_table[i].pid = sched_1; } for (i = 1; i < erts_no_schedulers; i++) { ErtsSchedulerData *esdp = ERTS_SCHEDULER_IX(i); set_redirection(esdp->tid.id,redir_table); ramlog_printf("Setting redir table to 0x%p\n",esdp->tid.id); } SEL_FREE(ERTS_ALC_T_POLLSET,redir_table); } #endif static int update_sigsel(ErtsPollSet ps) { ErtsSigSelInfo *info = ps->info; int i; if (ps->sigs != NULL) SEL_FREE(ERTS_ALC_T_POLLSET,ps->sigs); if (ps->sig_count == 0) { /* If there are no signals we place a non-valid signal to make sure that * we do not trigger on a any unrelated signals which are sent to the * process. */ ps->sigs = SEL_ALLOC(ERTS_ALC_T_POLLSET,sizeof(SIGSELECT)*(2)); ps->sigs[0] = 1; ps->sigs[1] = ERTS_SIGNAL_INVALID; return 0; } ps->sigs = SEL_ALLOC(ERTS_ALC_T_POLLSET,sizeof(SIGSELECT)*(ps->sig_count+1)); ps->sigs[0] = ps->sig_count; for (i = 1; info != NULL; i++, info = info->next) ps->sigs[i] = info->signo; #ifdef ERTS_SMP update_redir_tables(ps); #endif return 0; } static ERTS_INLINE void wake_poller(ErtsPollSet ps) { erts_aint32_t wakeup_state; ERTS_THR_MEMORY_BARRIER; wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state); while (wakeup_state != ERTS_POLL_WOKEN_IO_READY && wakeup_state != ERTS_POLL_WOKEN_INTR) { erts_aint32_t act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR, wakeup_state); if (act == wakeup_state) { wakeup_state = act; break; } wakeup_state = act; } if (wakeup_state == ERTS_POLL_SLEEPING) { /* * Since we don't know the internals of signal_fsem() we issue * a memory barrier as a safety precaution ensuring that * the store we just made to wakeup_state wont be reordered * with loads in signal_fsem(). */ ERTS_THR_MEMORY_BARRIER; signal_fsem(ps->interrupt); } } static ERTS_INLINE void reset_interrupt(ErtsPollSet ps) { /* We need to keep io-ready if set */ erts_aint32_t wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state); while (wakeup_state != ERTS_POLL_NOT_WOKEN && wakeup_state != ERTS_POLL_SLEEPING) { erts_aint32_t act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN, wakeup_state); if (wakeup_state == act) break; wakeup_state = act; } ERTS_THR_MEMORY_BARRIER; } static ERTS_INLINE void set_interrupt(ErtsPollSet ps) { wake_poller(ps); } void erts_poll_interrupt(ErtsPollSet ps,int set) { HARDTRACEF("erts_poll_interrupt called!\n"); if (!set) reset_interrupt(ps); else set_interrupt(ps); } void erts_poll_interrupt_timed(ErtsPollSet ps,int set,erts_short_time_t msec) { HARDTRACEF("erts_poll_interrupt_timed called!\n"); if (!set) reset_interrupt(ps); else if (erts_smp_atomic32_read_acqb(&ps->timeout) > (erts_aint32_t) msec) set_interrupt(ps); } ErtsPollEvents erts_poll_control(ErtsPollSet ps, ErtsSysFdType fd, ErtsPollEvents pe, int on, int* do_wake, int(*decode)(OseSignal* sig, int* mode)) { ErtsSigSelItem *curr; ErtsPollEvents new_events; int old_sig_count; HARDTRACEF( "%ux: In erts_poll_control, fd = %d, pe = %d, on = %d, *do_wake = %d, curr = 0x%xu", ps, fd, pe, on, do_wake, curr); ERTS_POLLSET_LOCK(ps); curr = get_sigsel_item(ps, fd); old_sig_count = ps->sig_count; if (curr == NULL && on) { curr = add_sigsel_item(ps, fd, decode); } else if (curr == NULL && !on) { new_events = ERTS_POLL_EV_NVAL; goto done; } new_events = curr->events; if (pe == 0) { *do_wake = 0; goto done; } if (on) { new_events |= pe; curr->events = new_events; } else { new_events &= ~pe; curr->events = new_events; if (new_events == 0 && del_sigsel_item(ps, curr)) { new_events = ERTS_POLL_EV_NVAL; goto done; } } if (ps->sig_count != old_sig_count) { if (update_sigsel(ps)) new_events = ERTS_POLL_EV_NVAL; } done: ERTS_POLLSET_UNLOCK(ps); HARDTRACEF("%ux: Out erts_poll_control", ps); return new_events; } int erts_poll_wait(ErtsPollSet ps, ErtsPollResFd pr[], int *len, SysTimeval *utvp) { int res = ETIMEDOUT, no_fds, currid = 0; OSTIME timeout; OseSignal *sig; // HARDTRACEF("%ux: In erts_poll_wait",ps); if (ps->interrupt == (PROCESS)0) ps->interrupt = current_process(); ASSERT(current_process() == ps->interrupt); ASSERT(get_fsem(current_process()) == 0); ASSERT(erts_atomic32_read_nob(&ps->wakeup_state) & (ERTS_POLL_NOT_WOKEN | ERTS_POLL_WOKEN_INTR)); /* Max no of spots avable in pr */ no_fds = *len; *len = 0; ASSERT(utvp); /* erts_printf("Entering erts_poll_wait(), timeout=%d\n", (int) utvp->tv_sec*1000 + utvp->tv_usec/1000); */ timeout = utvp->tv_sec*1000 + utvp->tv_usec/1000; if (timeout > ((time_t) ERTS_AINT32_T_MAX)) timeout = ERTS_AINT32_T_MAX; erts_smp_atomic32_set_relb(&ps->timeout, (erts_aint32_t) timeout); while (currid < no_fds) { if (timeout > 0) { erts_aint32_t act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state, ERTS_POLL_SLEEPING, ERTS_POLL_NOT_WOKEN); if (act == ERTS_POLL_NOT_WOKEN) { #ifdef ERTS_SMP erts_thr_progress_prepare_wait(NULL); #endif sig = receive_fsem(timeout, ps->sigs, 1); #ifdef ERTS_SMP erts_thr_progress_finalize_wait(NULL); #endif } else { ASSERT(act == ERTS_POLL_WOKEN_INTR); sig = OS_RCV_FSEM; } } else sig = receive_w_tmo(0, ps->sigs); if (sig == NULL) { if (timeout > 0) { erts_aint32_t act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_TIMEDOUT, ERTS_POLL_SLEEPING); if (act == ERTS_POLL_WOKEN_INTR) /* Restore fsem as it was signaled but we got a timeout */ wait_fsem(1); } else erts_atomic32_cmpxchg_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_TIMEDOUT, ERTS_POLL_NOT_WOKEN); break; } else if (sig == OS_RCV_FSEM) { ASSERT(erts_atomic32_read_nob(&ps->wakeup_state) == ERTS_POLL_WOKEN_INTR); break; } { ErtsSigSelInfo *info = get_sigsel_info(ps, sig->sig_no); int mode = -1; struct erts_sys_fd_type fd = { sig->sig_no, info->decode(sig, &mode) }; ErtsSigSelItem *item = get_sigsel_item(ps, &fd); ASSERT(sig); if (currid == 0 && timeout > 0) { erts_aint32_t act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_IO_READY, ERTS_POLL_SLEEPING); if (act == ERTS_POLL_WOKEN_INTR) { /* Restore fsem as it was signaled but we got a msg */ wait_fsem(1); act = erts_atomic32_cmpxchg_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_IO_READY, ERTS_POLL_WOKEN_INTR); } } else if (currid == 0) { erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_IO_READY); } if (item == NULL) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf( dsbufp, "erts_poll_wait() failed: found unkown signal id %d (signo %u) " "(curr_proc 0x%x /sender 0x%x)\n", fd.id, fd.signo, current_process(), sender(&sig)); erts_send_error_to_logger_nogl(dsbufp); timeout = 0; ASSERT(0); } else if (mode == -1 && item->events == (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf( dsbufp, "erts_poll_wait() failed: found ambigous signal id %d (signo %u) " "(curr_proc 0x%x /sender 0x%x)\n You have to give a specify a mode " "in the resolve_signal callback for this signal.\n", fd.id, fd.signo, current_process(), sender(&sig)); erts_send_error_to_logger_nogl(dsbufp); timeout = 0; ASSERT(0); } else { int i; struct erts_sys_fd_type *fd = NULL; ErtsPollOseMsgList *tl,*new; /* Figure out which mode to set and which queue to store the signal in */ if (mode == -1) mode = item->events; else if (mode == 0) mode = ERTS_POLL_EV_IN; else if (mode == 1) mode = ERTS_POLL_EV_OUT; else abort(); /* Check if this fd has already been triggered by a previous signal */ for (i = 0; i < currid;i++) { if (pr[i].fd == item->fd) { fd = pr[i].fd; pr[i].events |= mode; break; } } /* First time this fd is triggered */ if (fd == NULL) { pr[currid].fd = item->fd; pr[currid].events = mode; fd = item->fd; timeout = 0; currid++; } /* Insert new signal in approriate list */ new = erts_alloc(ERTS_ALC_T_FD_SIG_LIST,sizeof(ErtsPollOseMsgList)); new->next = NULL; new->data = sig; ethr_mutex_lock(&fd->mtx); if (mode & ERTS_POLL_EV_IN) tl = fd->imsgs; else if (mode & ERTS_POLL_EV_OUT) tl = fd->omsgs; if (tl == NULL) { if (mode & ERTS_POLL_EV_IN) fd->imsgs = new; else if (mode & ERTS_POLL_EV_OUT) fd->omsgs = new; } else { while (tl->next != NULL) tl = tl->next; tl->next = new; } ethr_mutex_unlock(&fd->mtx); } } } { erts_aint32_t wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state); switch (wakeup_state) { case ERTS_POLL_WOKEN_IO_READY: res = 0; break; case ERTS_POLL_WOKEN_INTR: res = EINTR; break; case ERTS_POLL_WOKEN_TIMEDOUT: res = ETIMEDOUT; break; case ERTS_POLL_NOT_WOKEN: /* This happens when we get an invalid signal only */ res = EINVAL; break; default: res = 0; erl_exit(ERTS_ABORT_EXIT, "%s:%d: Internal error: Invalid wakeup_state=%d\n", __FILE__, __LINE__, (int) wakeup_state); } } erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN); erts_smp_atomic32_set_nob(&ps->timeout, ERTS_AINT32_T_MAX); *len = currid; // HARDTRACEF("%ux: Out erts_poll_wait",ps); return res; } int erts_poll_max_fds(void) { HARDTRACEF("In/Out erts_poll_max_fds -> %d",max_fds); return max_fds; } void erts_poll_info(ErtsPollSet ps, ErtsPollInfo *pip) { Uint size = 0; Uint num_events = 0; size += sizeof(struct ErtsPollSet_); size += sizeof(ErtsSigSelInfo)*ps->sig_count; size += sizeof(ErtsSigSelItem)*ps->item_count; size += sizeof(SIGSELECT)*(ps->sig_count+1); pip->primary = "receive_fsem"; pip->fallback = NULL; pip->kernel_poll = NULL; pip->memory_size = size; pip->poll_set_size = num_events; pip->fallback_poll_set_size = 0; pip->lazy_updates = 0; pip->pending_updates = 0; pip->batch_updates = 0; pip->concurrent_updates = 0; pip->max_fds = erts_poll_max_fds(); HARDTRACEF("%ux: Out erts_poll_info",ps); } ErtsPollSet erts_poll_create_pollset(void) { ErtsPollSet ps = SEL_ALLOC(ERTS_ALC_T_POLLSET, sizeof(struct ErtsPollSet_)); ps->sigs = NULL; ps->sig_count = 0; ps->item_count = 0; ps->info = NULL; ps->interrupt = (PROCESS)0; erts_atomic32_init_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN); erts_smp_atomic32_init_nob(&ps->timeout, ERTS_AINT32_T_MAX); #ifdef ERTS_SMP erts_smp_mtx_init(&ps->mtx, "pollset"); #endif update_sigsel(ps); HARDTRACEF("%ux: Out erts_poll_create_pollset",ps); return ps; } void erts_poll_destroy_pollset(ErtsPollSet ps) { ErtsSigSelInfo *info; for (info = ps->info; ps->info != NULL; info = ps->info, ps->info = ps->info->next) { ErtsSigSelItem *item; for (item = info->fds; info->fds != NULL; item = info->fds, info->fds = info->fds->next) SEL_FREE(ERTS_ALC_T_POLLSET, item); SEL_FREE(ERTS_ALC_T_POLLSET, info); } SEL_FREE(ERTS_ALC_T_POLLSET,ps->sigs); #ifdef ERTS_SMP erts_smp_mtx_destroy(&ps->mtx); #endif SEL_FREE(ERTS_ALC_T_POLLSET,ps); } void erts_poll_init(void) { HARDTRACEF("In %s", __FUNCTION__); max_fds = 256; HARDTRACEF("Out %s", __FUNCTION__); }