diff options
Diffstat (limited to 'erts/emulator/sys/common/erl_check_io.c')
-rw-r--r-- | erts/emulator/sys/common/erl_check_io.c | 2304 |
1 files changed, 1043 insertions, 1261 deletions
diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index ff0f3ea121..3131b96536 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -29,7 +29,6 @@ #endif #define ERL_CHECK_IO_C__ -#define ERTS_WANT_BREAK_HANDLING #ifndef WANT_NONBLOCKING # define WANT_NONBLOCKING #endif @@ -44,151 +43,108 @@ #define ERTS_WANT_TIMER_WHEEL_API #include "erl_time.h" +#if 0 +#define DEBUG_PRINT(FMT, ...) erts_printf(FMT "\r\n", ##__VA_ARGS__) +#define DEBUG_PRINT_FD(FMT, STATE, ...) \ + DEBUG_PRINT("%d: " FMT " (ev=%s, ac=%s, flg=%d)", \ + (STATE) ? (STATE)->fd : (ErtsSysFdType)-1, ##__VA_ARGS__, \ + ev2str((STATE) ? (STATE)->events : ERTS_POLL_EV_NONE), \ + ev2str((STATE) ? (STATE)->active_events : ERTS_POLL_EV_NONE), \ + (STATE) ? (STATE)->flags : ERTS_EV_FLAG_CLEAR) +#define DEBUG_PRINT_MODE +#else +#define DEBUG_PRINT(...) +#endif + +#ifndef DEBUG_PRINT_FD +#define DEBUG_PRINT_FD(...) +#endif + #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS # include "safe_hash.h" # define DRV_EV_STATE_HTAB_SIZE 1024 #endif -typedef char EventStateType; -#define ERTS_EV_TYPE_NONE ((EventStateType) 0) -#define ERTS_EV_TYPE_DRV_SEL ((EventStateType) 1) /* driver_select */ -#define ERTS_EV_TYPE_STOP_USE ((EventStateType) 2) /* pending stop_select */ -#define ERTS_EV_TYPE_NIF ((EventStateType) 3) /* enif_select */ -#define ERTS_EV_TYPE_STOP_NIF ((EventStateType) 4) /* pending nif stop */ - -typedef char EventStateFlags; -#define ERTS_EV_FLAG_USED ((EventStateFlags) 1) /* ERL_DRV_USE has been turned on */ -#define ERTS_EV_FLAG_DEFER_IN_EV ((EventStateFlags) 2) -#define ERTS_EV_FLAG_DEFER_OUT_EV ((EventStateFlags) 4) +typedef enum { + ERTS_EV_TYPE_NONE = 0, + ERTS_EV_TYPE_DRV_SEL = 1, /* driver_select */ + ERTS_EV_TYPE_STOP_USE = 2, /* pending stop_select */ + ERTS_EV_TYPE_NIF = 3, /* enif_select */ + ERTS_EV_TYPE_STOP_NIF = 4 /* pending nif stop */ +} EventStateType; -#ifdef DEBUG -# define ERTS_ACTIVE_FD_INC 2 +typedef enum { + ERTS_EV_FLAG_CLEAR = 0, + ERTS_EV_FLAG_USED = 1, /* ERL_DRV_USE has been turned on */ +#ifdef ERTS_ENABLE_KERNEL_POLL + ERTS_EV_FLAG_FALLBACK = 2, /* Set when kernel poll rejected fd + and it was put in the nkp version */ #else -# define ERTS_ACTIVE_FD_INC 128 + ERTS_EV_FLAG_FALLBACK = ERTS_EV_FLAG_CLEAR, #endif -#define ERTS_CHECK_IO_POLL_RES_LEN 512 - -#if defined(ERTS_KERNEL_POLL_VERSION) -# define ERTS_CIO_EXPORT(FUNC) FUNC ## _kp -#elif defined(ERTS_NO_KERNEL_POLL_VERSION) -# define ERTS_CIO_EXPORT(FUNC) FUNC ## _nkp -#else -# define ERTS_CIO_EXPORT(FUNC) FUNC -#endif + /* Combinations */ + ERTS_EV_FLAG_USED_FALLBACK = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_FALLBACK +} EventStateFlags; -#define ERTS_CIO_POLL_CTL ERTS_POLL_EXPORT(erts_poll_control) -#define ERTS_CIO_POLL_CTLV ERTS_POLL_EXPORT(erts_poll_controlv) -#define ERTS_CIO_POLL_WAIT ERTS_POLL_EXPORT(erts_poll_wait) -#define ERTS_CIO_POLL_INTR ERTS_POLL_EXPORT(erts_poll_interrupt) -#define ERTS_CIO_POLL_INTR_TMD ERTS_POLL_EXPORT(erts_poll_interrupt_timed) -#define ERTS_CIO_NEW_POLLSET ERTS_POLL_EXPORT(erts_poll_create_pollset) -#define ERTS_CIO_FREE_POLLSET ERTS_POLL_EXPORT(erts_poll_destroy_pollset) -#define ERTS_CIO_POLL_MAX_FDS ERTS_POLL_EXPORT(erts_poll_max_fds) -#define ERTS_CIO_POLL_INIT ERTS_POLL_EXPORT(erts_poll_init) -#define ERTS_CIO_POLL_INFO ERTS_POLL_EXPORT(erts_poll_info) +#define flag2str(flags) \ + ((flags) == ERTS_EV_FLAG_CLEAR ? "CLEAR" : \ + ((flags) == ERTS_EV_FLAG_USED ? "USED" : \ + ((flags) == ERTS_EV_FLAG_FALLBACK ? "FLBK" : \ + ((flags) == ERTS_EV_FLAG_USED_FALLBACK ? "USED|FLBK" : "ERROR")))) -#define GET_FD(fd) fd +/* How many events that can be handled at once by one erts_poll_wait call */ +#define ERTS_CHECK_IO_POLL_RES_LEN 512 -static struct pollset_info +/* Each I/O Poll Thread has one ErtsPollThread each. The ps field + can point to either a private ErtsPollSet or a shared one. + At the moment only kqueue and epoll pollsets can be + shared across threads. +*/ +typedef struct erts_poll_thread { - ErtsPollSet ps; - erts_atomic_t in_poll_wait; /* set while doing poll */ - erts_atomic_t check_io_time; - struct { - int six; /* start index */ - int eix; /* end index */ - erts_atomic32_t no; - int size; - ErtsSysFdType *array; - } active_fd; - erts_atomic_t removed_list; /* struct removed_fd* */ -}*pollsetv; - -#define NUM_OF_POLLSETS 1 + ErtsPollSet *ps; + ErtsPollResFd *pollres; + int pollres_len; +} ErtsPollThread; -#ifdef ERTS_ENABLE_KERNEL_POLL -void erts_init_check_io_kp(void); -void erts_init_check_io_nkp(void); -int ERTS_CIO_EXPORT(driver_select)(ErlDrvPort, ErlDrvEvent, int, int); -int ERTS_CIO_EXPORT(enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, const ErlNifPid*, Eterm); -Uint ERTS_CIO_EXPORT(erts_check_io_size)(void); -Eterm ERTS_CIO_EXPORT(erts_check_io_info)(void *); -int ERTS_CIO_EXPORT(erts_check_io_max_files)(void); -void ERTS_CIO_EXPORT(erts_check_io_interrupt)(struct pollset_info*, int); -void ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(struct pollset_info*, int, ErtsMonotonicTime); -void ERTS_CIO_EXPORT(erts_check_io)(int); -struct pollset_info* ERTS_CIO_EXPORT(erts_get_pollset)(int); -int ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *); -void ERTS_CIO_EXPORT(erts_io_notify_port_task_executed)(ErtsPortTaskHandle *pthp); -#ifdef ERTS_ENABLE_LOCK_COUNT -void ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks)(int enable); -#endif +/* pollsetv contains pointers to the ErtsPollSets that are in use. + * Which pollset to use is determined by hashing the fd. + */ +static ErtsPollSet **pollsetv; +#if ERTS_POLL_USE_FALLBACK +static ErtsPollSet *flbk_pollset; #endif - -/* ToDo: Was inline in erl_check_io.h but now need struct pollset_info */ -void -ERTS_CIO_EXPORT(erts_io_notify_port_task_executed)(ErtsPortTaskHandle *pthp) -{ - ErtsIoTask *itp = ErtsContainerStruct(pthp, ErtsIoTask, task); - erts_aint_t ci_time = erts_atomic_read_acqb(&itp->pollset->check_io_time); - erts_atomic_set_relb(&itp->executed_time, ci_time); -} - - -struct pollset_info* ERTS_CIO_EXPORT(erts_get_pollset)(int sched_nr) -{ - ASSERT(sched_nr > 0 && sched_nr <= erts_no_schedulers); - return &pollsetv[sched_nr - 1]; -} +static ErtsPollThread *psiv; typedef struct { #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS SafeHashBucket hb; #endif ErtsSysFdType fd; - struct pollset_info *pollset; struct { ErtsDrvSelectDataState *select; /* ERTS_EV_TYPE_DRV_SEL */ ErtsNifSelectDataState *nif; /* ERTS_EV_TYPE_NIF */ union { erts_driver_t* drv_ptr; /* ERTS_EV_TYPE_STOP_USE */ ErtsResource* resource; /* ERTS_EV_TYPE_STOP_NIF */ - }stop; + } stop; } driver; - ErtsPollEvents events; - unsigned short remove_cnt; /* number of removed_fd's referring to this fd */ + ErtsPollEvents events; /* The events that have been selected upon */ + ErtsPollEvents active_events; /* The events currently active in the pollset */ EventStateType type; EventStateFlags flags; } ErtsDrvEventState; -struct removed_fd { - struct removed_fd *next; -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - ErtsSysFdType fd; -#else - ErtsDrvEventState* state; - #ifdef DEBUG - ErtsSysFdType fd; - #endif -#endif - -}; - struct drv_ev_state_shared { - /* The layout of this struct must be independent of kp/nkp compilation */ -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - int max_fds; -#endif - -#define DRV_EV_STATE_LOCK_CNT 128 union { erts_mtx_t lck; byte _cache_line_alignment[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_mtx_t))]; - }locks[DRV_EV_STATE_LOCK_CNT]; + } locks[ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT]; #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS + int max_fds; erts_atomic_t len; ErtsDrvEventState *v; erts_mtx_t grow_lock; /* prevent lock-hogging of racing growers */ @@ -200,51 +156,121 @@ struct drv_ev_state_shared { #endif }; -#ifndef ERTS_KERNEL_POLL_VERSION +int ERTS_WRITE_UNLIKELY(erts_no_pollsets) = 1; +int ERTS_WRITE_UNLIKELY(erts_no_poll_threads) = 1; struct drv_ev_state_shared drv_ev_state; -#else -extern struct drv_ev_state_shared drv_ev_state; -#endif -static ERTS_INLINE erts_mtx_t* fd_mtx(ErtsSysFdType fd) -{ +static ERTS_INLINE int fd_hash(ErtsSysFdType fd) { int hash = (int)fd; # ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS hash ^= (hash >> 9); # endif - return &drv_ev_state.locks[hash % DRV_EV_STATE_LOCK_CNT].lck; + return hash; } -#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS +static ERTS_INLINE erts_mtx_t* fd_mtx(ErtsSysFdType fd) +{ + return &drv_ev_state.locks[fd_hash(fd) % ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT].lck; +} + +#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS + +static ERTS_INLINE ErtsDrvEventState *get_drv_ev_state(ErtsSysFdType fd) +{ + return &drv_ev_state.v[(int) fd]; +} + +#define new_drv_ev_state(State, fd) (State) +#define erase_drv_ev_state(State) + +static ERTS_INLINE int grow_drv_ev_state(ErtsSysFdType fd) { + int i; + int old_len; + int new_len; + + if ((unsigned)fd >= (unsigned)erts_atomic_read_nob(&drv_ev_state.len)) { + + if (fd < 0 || fd >= drv_ev_state.max_fds) + return 0; -static ERTS_INLINE ErtsDrvEventState *hash_get_drv_ev_state(ErtsSysFdType fd) + erts_mtx_lock(&drv_ev_state.grow_lock); + old_len = erts_atomic_read_nob(&drv_ev_state.len); + if (fd >= old_len) { + new_len = erts_poll_new_table_len(old_len, fd + 1); + if (new_len > drv_ev_state.max_fds) + new_len = drv_ev_state.max_fds; + + for (i=0; i<ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT; i++) { /* lock all fd's */ + erts_mtx_lock(&drv_ev_state.locks[i].lck); + } + drv_ev_state.v = (drv_ev_state.v + ? erts_realloc(ERTS_ALC_T_DRV_EV_STATE, + drv_ev_state.v, + sizeof(ErtsDrvEventState)*new_len) + : erts_alloc(ERTS_ALC_T_DRV_EV_STATE, + sizeof(ErtsDrvEventState)*new_len)); + ERTS_CT_ASSERT(ERTS_EV_TYPE_NONE == 0); + sys_memzero(drv_ev_state.v+old_len, + sizeof(ErtsDrvEventState) * (new_len - old_len)); + for (i = old_len; i < new_len; i++) { + drv_ev_state.v[i].fd = (ErtsSysFdType) i; + } + erts_atomic_set_nob(&drv_ev_state.len, new_len); + for (i=0; i<ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT; i++) { + erts_mtx_unlock(&drv_ev_state.locks[i].lck); + } + } + /*else already grown by racing thread */ + + erts_mtx_unlock(&drv_ev_state.grow_lock); + } + return 1; +} + +static int drv_ev_state_len(void) +{ + return erts_atomic_read_nob(&drv_ev_state.len); +} + +#else /* !ERTS_SYS_CONTINOUS_FD_NUMBERS */ + +static ERTS_INLINE ErtsDrvEventState *get_drv_ev_state(ErtsSysFdType fd) { ErtsDrvEventState tmpl; tmpl.fd = fd; return (ErtsDrvEventState *) safe_hash_get(&drv_ev_state.tab, (void *) &tmpl); } -static ERTS_INLINE ErtsDrvEventState* hash_new_drv_ev_state(ErtsSysFdType fd) +static ERTS_INLINE ErtsDrvEventState* new_drv_ev_state(ErtsDrvEventState *state, + ErtsSysFdType fd) { ErtsDrvEventState tmpl; + + if (state) + return state; + tmpl.fd = fd; - tmpl.pollset = NULL; tmpl.driver.select = NULL; tmpl.driver.nif = NULL; tmpl.driver.stop.drv_ptr = NULL; tmpl.events = 0; - tmpl.remove_cnt = 0; + tmpl.active_events = 0; tmpl.type = ERTS_EV_TYPE_NONE; tmpl.flags = 0; + return (ErtsDrvEventState *) safe_hash_put(&drv_ev_state.tab, (void *) &tmpl); } -static ERTS_INLINE void hash_erase_drv_ev_state(ErtsDrvEventState *state) +static ERTS_INLINE void erase_drv_ev_state(ErtsDrvEventState *state) { - ASSERT(state->remove_cnt == 0); safe_hash_erase(&drv_ev_state.tab, (void *) state); } +static int drv_ev_state_len(void) +{ + return erts_atomic_read_nob(&drv_ev_state.tab.nitems); +} + #endif /* !ERTS_SYS_CONTINOUS_FD_NUMBERS */ static void stale_drv_select(Eterm id, ErtsDrvEventState *state, int mode); @@ -268,36 +294,39 @@ steal_pending_stop_use(erts_dsprintf_buf_t*, ErlDrvPort, ErtsDrvEventState*, static void steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource*, ErtsDrvEventState *state, int mode, int on); - -ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(removed_fd, struct removed_fd, 64, ERTS_ALC_T_FD_LIST) +static ERTS_INLINE void +check_fd_cleanup(ErtsDrvEventState *state, + ErtsDrvSelectDataState **free_select, + ErtsNifSelectDataState **free_nif); +#ifdef DEBUG_PRINT_MODE +static char *drvmode2str(int mode); +static char *nifmode2str(enum ErlNifSelectFlags mode); +#endif static ERTS_INLINE void -init_iotask(ErtsIoTask *io_task, struct pollset_info* psi) +init_iotask(ErtsIoTask *io_task, ErtsSysFdType fd) { erts_port_task_handle_init(&io_task->task); - erts_atomic_init_nob(&io_task->executed_time, ~((erts_aint_t) 0)); - io_task->pollset = psi; + io_task->fd = fd; } static ERTS_INLINE int -is_iotask_active(ErtsIoTask *io_task, erts_aint_t current_cio_time) -{ +is_iotask_active(ErtsIoTask *io_task) +{ if (erts_port_task_is_scheduled(&io_task->task)) return 1; - if (erts_atomic_read_nob(&io_task->executed_time) == current_cio_time) - return 1; return 0; } static ERTS_INLINE ErtsDrvSelectDataState * -alloc_drv_select_data(struct pollset_info* psi) +alloc_drv_select_data(ErtsSysFdType fd) { ErtsDrvSelectDataState *dsp = erts_alloc(ERTS_ALC_T_DRV_SEL_D_STATE, sizeof(ErtsDrvSelectDataState)); dsp->inport = NIL; dsp->outport = NIL; - init_iotask(&dsp->iniotask, psi); - init_iotask(&dsp->outiotask, psi); + init_iotask(&dsp->iniotask, fd); + init_iotask(&dsp->outiotask, fd); return dsp; } @@ -308,8 +337,6 @@ alloc_nif_select_data(void) sizeof(ErtsNifSelectDataState)); dsp->in.pid = NIL; dsp->out.pid = NIL; - dsp->in.ddeselect_cnt = 0; - dsp->out.ddeselect_cnt = 0; return dsp; } @@ -327,176 +354,126 @@ free_nif_select_data(ErtsNifSelectDataState *dsp) erts_free(ERTS_ALC_T_NIF_SEL_D_STATE, dsp); } -static ERTS_INLINE void -remember_removed(ErtsDrvEventState *state) +static ERTS_INLINE int +get_pollset_id(ErtsSysFdType fd) { - struct removed_fd *fdlp; - ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd))); - if (erts_atomic_read_nob(&state->pollset->in_poll_wait)) { - erts_aint_t was_next, exp_next; - state->remove_cnt++; - ASSERT(state->remove_cnt > 0); - fdlp = removed_fd_alloc(); - #if defined(ERTS_SYS_CONTINOUS_FD_NUMBERS) || defined(DEBUG) - fdlp->fd = state->fd; - #endif - #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS - fdlp->state = state; - #endif - - /* Lockless atomic insertion in removed_list */ - was_next = erts_atomic_read_acqb(&state->pollset->removed_list); - do { - exp_next = was_next; - fdlp->next = (struct removed_fd*) exp_next; - was_next = erts_atomic_cmpxchg_mb(&state->pollset->removed_list, - (erts_aint_t) fdlp, - exp_next); - }while (was_next != exp_next); - } + return fd_hash(fd) % erts_no_pollsets; } +static ERTS_INLINE ErtsPollSet * +get_pollset(ErtsSysFdType fd) +{ + return pollsetv[get_pollset_id(fd)]; +} -static ERTS_INLINE int -is_removed(ErtsDrvEventState *state) +#if ERTS_POLL_USE_FALLBACK +static ERTS_INLINE ErtsPollSet * +get_fallback(void) { - /* Note that there is a possible race here, where an fd is removed - (increasing remove_cnt) and then added again just before erts_poll_wait - is called by erts_check_io. Any polled event on the re-added fd will then - be falsely ignored. But that does not matter, as the event will trigger - again next time erl_check_io is called. */ - return state->remove_cnt > 0; + return flbk_pollset; } +#endif -static void -forget_removed(struct pollset_info* psi) +/* + * Place a fd within a pollset. This will automatically use + * the fallback ps if needed. + */ +static ERTS_INLINE ErtsPollEvents +erts_io_control_wakeup(ErtsDrvEventState *state, ErtsPollOp op, + ErtsPollEvents pe, int *wake_poller) { - struct removed_fd* fdlp; - struct removed_fd* tofree; + ErtsSysFdType fd = state->fd; + ErtsPollEvents res = 0; + EventStateFlags flags = state->flags; - fdlp = (struct removed_fd*) erts_atomic_xchg_mb(&psi->removed_list, - (erts_aint_t) NULL); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd))); - while (fdlp) { - ErtsResource* resource = NULL; - erts_driver_t* drv_ptr = NULL; - erts_mtx_t* mtx; - ErtsSysFdType fd; - ErtsDrvEventState *state; + if (!(flags & ERTS_EV_FLAG_FALLBACK)) { + res = erts_poll_control(get_pollset(fd), fd, op, pe, wake_poller); -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - fd = fdlp->fd; - mtx = fd_mtx(fd); - erts_mtx_lock(mtx); - state = &drv_ev_state.v[(int) fd]; -#else - state = fdlp->state; - fd = state->fd; - ASSERT(fd == fdlp->fd); - mtx = fd_mtx(fd); - erts_mtx_lock(mtx); -#endif - ASSERT(state->remove_cnt > 0); - if (--state->remove_cnt == 0) { - switch (state->type) { - case ERTS_EV_TYPE_STOP_NIF: - /* Now we can call stop */ - resource = state->driver.stop.resource; - state->driver.stop.resource = NULL; - ASSERT(resource); - state->type = ERTS_EV_TYPE_NONE; - state->flags = 0; - goto case_ERTS_EV_TYPE_NONE; - - case ERTS_EV_TYPE_STOP_USE: - /* Now we can call stop_select */ - drv_ptr = state->driver.stop.drv_ptr; - ASSERT(drv_ptr); - state->type = ERTS_EV_TYPE_NONE; - state->flags = 0; - state->driver.stop.drv_ptr = NULL; - /* Fall through */ - case ERTS_EV_TYPE_NONE: - case_ERTS_EV_TYPE_NONE: - state->pollset = NULL; -#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS - hash_erase_drv_ev_state(state); -#endif - break; - case ERTS_EV_TYPE_DRV_SEL: - break; - default: - ASSERT(0); - } - } - erts_mtx_unlock(mtx); - if (drv_ptr) { - int was_unmasked = erts_block_fpe(); - DTRACE1(driver_stop_select, drv_ptr->name); - LTTNG1(driver_stop_select, drv_ptr->name); - (*drv_ptr->stop_select) ((ErlDrvEvent) fd, NULL); - erts_unblock_fpe(was_unmasked); - if (drv_ptr->handle) { - erts_ddll_dereference_driver(drv_ptr->handle); - } - } - if (resource) { - erts_resource_stop(resource, (ErlNifEvent)fd, 0); - enif_release_resource(resource->data); +#if ERTS_POLL_USE_FALLBACK + if (op == ERTS_POLL_OP_ADD && res == ERTS_POLL_EV_NVAL) { + /* When an add fails with NVAL, the poll/kevent operation could not + put that fd in the pollset, so we instead put it into a fallback pollset */ + state->flags |= ERTS_EV_FLAG_FALLBACK; + res = erts_poll_control_flbk(get_fallback(), fd, op, pe, wake_poller); } - - tofree = fdlp; - fdlp = fdlp->next; - removed_fd_free(tofree); + } else { + ASSERT(op != ERTS_POLL_OP_ADD); + res = erts_poll_control_flbk(get_fallback(), fd, op, pe, wake_poller); +#endif } + + return res; } -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS -static void -grow_drv_ev_state(int min_ix) +static ERTS_INLINE ErtsPollEvents +erts_io_control(ErtsDrvEventState *state, ErtsPollOp op, ErtsPollEvents pe) { - int i; - int old_len; - int new_len; + int wake_poller = 0; + return erts_io_control_wakeup(state, op, pe, &wake_poller); +} - erts_mtx_lock(&drv_ev_state.grow_lock); - old_len = erts_atomic_read_nob(&drv_ev_state.len); - if (min_ix >= old_len) { - new_len = erts_poll_new_table_len(old_len, min_ix + 1); - if (new_len > drv_ev_state.max_fds) - new_len = drv_ev_state.max_fds; - - for (i=0; i<DRV_EV_STATE_LOCK_CNT; i++) { /* lock all fd's */ - erts_mtx_lock(&drv_ev_state.locks[i].lck); - } - drv_ev_state.v = (drv_ev_state.v - ? erts_realloc(ERTS_ALC_T_DRV_EV_STATE, - drv_ev_state.v, - sizeof(ErtsDrvEventState)*new_len) - : erts_alloc(ERTS_ALC_T_DRV_EV_STATE, - sizeof(ErtsDrvEventState)*new_len)); - for (i = old_len; i < new_len; i++) { - drv_ev_state.v[i].fd = (ErtsSysFdType) i; - drv_ev_state.v[i].pollset = NULL; - drv_ev_state.v[i].driver.select = NULL; - drv_ev_state.v[i].driver.stop.drv_ptr = NULL; - drv_ev_state.v[i].driver.nif = NULL; - drv_ev_state.v[i].events = 0; - drv_ev_state.v[i].remove_cnt = 0; - drv_ev_state.v[i].type = ERTS_EV_TYPE_NONE; - drv_ev_state.v[i].flags = 0; - } - erts_atomic_set_nob(&drv_ev_state.len, new_len); - for (i=0; i<DRV_EV_STATE_LOCK_CNT; i++) { - erts_mtx_unlock(&drv_ev_state.locks[i].lck); - } +/* ToDo: Was inline in erl_check_io.h but now need struct erts_poll_thread */ +void +erts_io_notify_port_task_executed(ErtsPortTaskType type, + ErtsPortTaskHandle *pthp, + void (*reset_handle)(ErtsPortTaskHandle *)) +{ + ErtsIoTask *itp = ErtsContainerStruct(pthp, ErtsIoTask, task); + ErtsSysFdType fd = itp->fd; + erts_mtx_t *mtx = fd_mtx(fd); + int active_events; + ErtsDrvEventState *state; + ErtsDrvSelectDataState *free_select = NULL; + ErtsNifSelectDataState *free_nif = NULL; + + erts_mtx_lock(mtx); + state = get_drv_ev_state(fd); + + active_events = state->active_events; + + switch (type) { + case ERTS_PORT_TASK_INPUT: + + DEBUG_PRINT_FD("executed ready_input", state); + + ASSERT(!(state->active_events & ERTS_POLL_EV_IN)); + if (state->events & ERTS_POLL_EV_IN) + active_events |= ERTS_POLL_EV_IN; + break; + case ERTS_PORT_TASK_OUTPUT: + + DEBUG_PRINT_FD("executed ready_output", state); + + ASSERT(!(state->active_events & ERTS_POLL_EV_OUT)); + if (state->events & ERTS_POLL_EV_OUT) + active_events |= ERTS_POLL_EV_OUT; + break; + default: + erts_exit(ERTS_ABORT_EXIT, "Invalid IO port task type"); + break; } - /*else already grown by racing thread */ - erts_mtx_unlock(&drv_ev_state.grow_lock); -} -#endif /* ERTS_SYS_CONTINOUS_FD_NUMBERS */ + reset_handle(pthp); + + if (active_events) { + /* This is not needed if active_events has not changed */ + if (state->active_events != active_events) { + state->active_events = active_events; + erts_io_control(state, ERTS_POLL_OP_MOD, active_events); + } + } else { + check_fd_cleanup(state, &free_select, &free_nif); + } + erts_mtx_unlock(mtx); + + if (free_select) + free_drv_select_data(free_select); + if (free_nif) + free_nif_select_data(free_nif); +} static ERTS_INLINE void abort_task(Eterm id, ErtsPortTaskHandle *pthp, EventStateType type) @@ -542,16 +519,14 @@ abort_tasks(ErtsDrvEventState *state, int mode) static void deselect(ErtsDrvEventState *state, int mode) { - int do_wake = 0; ErtsPollEvents rm_events; ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd))); - ASSERT(state->events); abort_tasks(state, mode); - if (!mode) + if (!mode) { rm_events = state->events; - else { + } else { rm_events = 0; ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL); if (mode & ERL_DRV_READ) { @@ -564,18 +539,16 @@ deselect(ErtsDrvEventState *state, int mode) } } - ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, rm_events, - 0, &do_wake); state->events &= ~rm_events; + state->active_events &= ~rm_events; if (!(state->events)) { + erts_io_control(state, ERTS_POLL_OP_DEL, 0); switch (state->type) { case ERTS_EV_TYPE_NIF: state->driver.nif->in.pid = NIL; state->driver.nif->out.pid = NIL; - state->driver.nif->in.ddeselect_cnt = 0; - state->driver.nif->out.ddeselect_cnt = 0; - enif_release_resource(state->driver.stop.resource); + enif_release_resource(state->driver.stop.resource->data); state->driver.stop.resource = NULL; break; case ERTS_EV_TYPE_DRV_SEL: @@ -588,15 +561,15 @@ deselect(ErtsDrvEventState *state, int mode) ASSERT(0); break; } - state->type = ERTS_EV_TYPE_NONE; state->flags = 0; - remember_removed(state); + } else { + erts_io_control(state, ERTS_POLL_OP_MOD, state->active_events); } } #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS -# define IS_FD_UNKNOWN(state) ((state)->type == ERTS_EV_TYPE_NONE && (state)->remove_cnt == 0) +# define IS_FD_UNKNOWN(state) ((state)->type == ERTS_EV_TYPE_NONE) #else # define IS_FD_UNKNOWN(state) ((state) == NULL) #endif @@ -606,17 +579,13 @@ check_fd_cleanup(ErtsDrvEventState *state, ErtsDrvSelectDataState **free_select, ErtsNifSelectDataState **free_nif) { - erts_aint_t current_cio_time; - ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd))); - - current_cio_time = erts_atomic_read_acqb(&state->pollset->check_io_time); *free_select = NULL; if (state->driver.select && (state->type != ERTS_EV_TYPE_DRV_SEL) - && !is_iotask_active(&state->driver.select->iniotask, current_cio_time) - && !is_iotask_active(&state->driver.select->outiotask, current_cio_time)) { - + && !is_iotask_active(&state->driver.select->iniotask) + && !is_iotask_active(&state->driver.select->outiotask)) { + *free_select = state->driver.select; state->driver.select = NULL; } @@ -628,14 +597,10 @@ check_fd_cleanup(ErtsDrvEventState *state, } if (((state->type != ERTS_EV_TYPE_NONE) - | state->remove_cnt | (state->driver.nif != NULL) | (state->driver.select != NULL)) == 0) { - state->pollset = NULL; -#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS - hash_erase_drv_ev_state(state); -#endif + erase_drv_ev_state(state); } } @@ -645,253 +610,8 @@ check_fd_cleanup(ErtsDrvEventState *state, # define MUST_DEFER(MAY_SLEEP) (MAY_SLEEP) #endif -static ERTS_INLINE int -check_cleanup_active_fd(struct pollset_info* psi, - ErtsSysFdType fd, -#if ERTS_CIO_DEFER_ACTIVE_EVENTS - ErtsPollControlEntry *pce, - int *pce_ix, -#endif - erts_aint_t current_cio_time, - int may_sleep) -{ - ErtsDrvEventState *state; - int active = 0; - erts_mtx_t *mtx = fd_mtx(fd); - void *free_select = NULL; - void *free_nif = NULL; -#if ERTS_CIO_DEFER_ACTIVE_EVENTS - ErtsPollEvents evon = 0, evoff = 0; -#endif - - erts_mtx_lock(mtx); - -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - state = &drv_ev_state.v[(int) fd]; -#else - state = hash_get_drv_ev_state(fd); /* may be NULL! */ -#endif - if (state && state->pollset == psi) - { - if (state->driver.select) { -#if ERTS_CIO_DEFER_ACTIVE_EVENTS - if (is_iotask_active(&state->driver.select->iniotask, current_cio_time)) { - active = 1; - if (MUST_DEFER(may_sleep) - && (state->events & ERTS_POLL_EV_IN) - && !(state->flags & ERTS_EV_FLAG_DEFER_IN_EV)) { - evoff |= ERTS_POLL_EV_IN; - state->flags |= ERTS_EV_FLAG_DEFER_IN_EV; - } - } - else if (state->flags & ERTS_EV_FLAG_DEFER_IN_EV) { - if (state->events & ERTS_POLL_EV_IN) - evon |= ERTS_POLL_EV_IN; - state->flags &= ~ERTS_EV_FLAG_DEFER_IN_EV; - } - if (is_iotask_active(&state->driver.select->outiotask, current_cio_time)) { - active = 1; - if (MUST_DEFER(may_sleep) - && (state->events & ERTS_POLL_EV_OUT) - && !(state->flags & ERTS_EV_FLAG_DEFER_OUT_EV)) { - evoff |= ERTS_POLL_EV_OUT; - state->flags |= ERTS_EV_FLAG_DEFER_OUT_EV; - } - } - else if (state->flags & ERTS_EV_FLAG_DEFER_OUT_EV) { - if (state->events & ERTS_POLL_EV_OUT) { - evon |= ERTS_POLL_EV_OUT; - } - state->flags &= ~ERTS_EV_FLAG_DEFER_OUT_EV; - } - if (active) - (void) 0; - else -#else - if (is_iotask_active(&state->driver.select->iniotask, current_cio_time) - || is_iotask_active(&state->driver.select->outiotask, current_cio_time)) - active = 1; - else -#endif - if (state->type != ERTS_EV_TYPE_DRV_SEL) { - free_select = state->driver.select; - state->driver.select = NULL; - } -#if ERTS_CIO_DEFER_ACTIVE_EVENTS - if (evon) { - int do_wake = 0; - ERTS_CIO_POLL_CTL(psi->ps, state->fd, evon, 1, &do_wake); - } -#endif - } - - if (state->driver.nif) { - ErtsPollEvents rm_events = 0; - if (state->driver.nif->in.ddeselect_cnt) { - ASSERT(state->type == ERTS_EV_TYPE_NIF); - ASSERT(state->events & ERTS_POLL_EV_IN); - ASSERT(is_nil(state->driver.nif->in.pid)); - if (may_sleep || state->driver.nif->in.ddeselect_cnt == 1) { - rm_events = ERTS_POLL_EV_IN; - state->driver.nif->in.ddeselect_cnt = 0; - } - } - if (state->driver.nif->out.ddeselect_cnt) { - ASSERT(state->type == ERTS_EV_TYPE_NIF); - ASSERT(state->events & ERTS_POLL_EV_OUT); - ASSERT(is_nil(state->driver.nif->out.pid)); - if (may_sleep || state->driver.nif->out.ddeselect_cnt == 1) { - rm_events |= ERTS_POLL_EV_OUT; - state->driver.nif->out.ddeselect_cnt = 0; - } - } - if (rm_events) { - int do_wake = 0; - state->events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, - rm_events, 0, &do_wake); - } - if (state->events) - active = 1; - else if (state->type != ERTS_EV_TYPE_NIF) { - free_nif = state->driver.nif; - state->driver.nif = NULL; - } - } - -#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS - if (((state->type != ERTS_EV_TYPE_NONE) | state->remove_cnt | active) == 0) - hash_erase_drv_ev_state(state); -#endif - } - - erts_mtx_unlock(mtx); - - if (free_select) - free_drv_select_data(free_select); - if (free_nif) - free_nif_select_data(free_nif); - -#if ERTS_CIO_DEFER_ACTIVE_EVENTS - if (evoff) { - ErtsPollControlEntry *pcep = &pce[(*pce_ix)++]; - pcep->fd = fd; - pcep->events = evoff; - pcep->on = 0; - } -#endif - - return active; -} - -static void -check_cleanup_active_fds(struct pollset_info* psi, - erts_aint_t current_cio_time, int may_sleep) -{ - int six = psi->active_fd.six; - int eix = psi->active_fd.eix; - erts_aint32_t no = erts_atomic32_read_dirty(&psi->active_fd.no); - const int size = psi->active_fd.size; - int ix = six; -#if ERTS_CIO_DEFER_ACTIVE_EVENTS - /* every fd might add one entry */ - Uint pce_sz = sizeof(ErtsPollControlEntry)*no; - ErtsPollControlEntry *pctrl_entries = (pce_sz - ? erts_alloc(ERTS_ALC_T_TMP, pce_sz) - : NULL); - int pctrl_ix = 0; -#endif - - while (ix != eix) { - ErtsSysFdType fd = psi->active_fd.array[ix]; - int nix = ix + 1; - if (nix >= size) - nix = 0; - ASSERT(fd != ERTS_SYS_FD_INVALID); - if (!check_cleanup_active_fd(psi, fd, -#if ERTS_CIO_DEFER_ACTIVE_EVENTS - pctrl_entries, - &pctrl_ix, -#endif - current_cio_time, - may_sleep)) { - no--; - if (ix == six) { -#ifdef DEBUG - psi->active_fd.array[ix] = ERTS_SYS_FD_INVALID; -#endif - six = nix; - } - else { - psi->active_fd.array[ix] = psi->active_fd.array[six]; -#ifdef DEBUG - psi->active_fd.array[six] = ERTS_SYS_FD_INVALID; -#endif - six++; - if (six >= size) - six = 0; - } - } - ix = nix; - } - -#if ERTS_CIO_DEFER_ACTIVE_EVENTS - ASSERT(pctrl_ix <= pce_sz/sizeof(ErtsPollControlEntry)); - if (pctrl_ix) - ERTS_CIO_POLL_CTLV(psi->ps, pctrl_entries, pctrl_ix); - if (pctrl_entries) - erts_free(ERTS_ALC_T_TMP, pctrl_entries); -#endif - - psi->active_fd.six = six; - psi->active_fd.eix = eix; - erts_atomic32_set_relb(&psi->active_fd.no, no); -} - -static void grow_active_fds(struct pollset_info *psi) -{ - ASSERT(psi->active_fd.six == psi->active_fd.eix); - psi->active_fd.six = 0; - psi->active_fd.eix = psi->active_fd.size; - psi->active_fd.size += ERTS_ACTIVE_FD_INC; - psi->active_fd.array = erts_realloc(ERTS_ALC_T_ACTIVE_FD_ARR, - psi->active_fd.array, - psi->active_fd.size*sizeof(ErtsSysFdType)); -#ifdef DEBUG - { - int i; - for (i = psi->active_fd.eix + 1; i < psi->active_fd.size; i++) - psi->active_fd.array[i] = ERTS_SYS_FD_INVALID; - } -#endif -} - -static ERTS_INLINE void -add_active_fd(struct pollset_info *psi, ErtsSysFdType fd) -{ - int eix = psi->active_fd.eix; - const int size = psi->active_fd.size; - - psi->active_fd.array[eix] = fd; - - erts_atomic32_set_relb(&psi->active_fd.no, - (erts_atomic32_read_dirty(&psi->active_fd.no) - + 1)); - - eix++; - if (eix >= size) - eix = 0; - psi->active_fd.eix = eix; - - if (psi->active_fd.six == eix) { - grow_active_fds(psi); - } -} - int -ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, - ErlDrvEvent e, - int mode, - int on) +driver_select(ErlDrvPort ix, ErlDrvEvent e, int mode, int on) { void (*stop_select_fn)(ErlDrvEvent, void*) = NULL; Port *prt = erts_drvport2port(ix); @@ -899,6 +619,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, ErtsSysFdType fd = (ErtsSysFdType) e; ErtsPollEvents ctl_events = (ErtsPollEvents) 0; ErtsPollEvents old_events; + ErtsPollOp ctl_op = ERTS_POLL_OP_MOD; ErtsDrvEventState *state; int wake_poller = 0; int ret; @@ -907,32 +628,29 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, #ifdef USE_VM_PROBES DTRACE_CHARBUF(name, 64); #endif + ERTS_MSACC_PUSH_AND_SET_STATE(ERTS_MSACC_STATE_CHECK_IO); - if (prt == ERTS_INVALID_ERL_DRV_PORT) + if (prt == ERTS_INVALID_ERL_DRV_PORT) { + ERTS_MSACC_POP_STATE(); return -1; + } ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - if ((unsigned)fd >= (unsigned)erts_atomic_read_nob(&drv_ev_state.len)) { - if (fd < 0) { - return -1; - } - if (fd >= drv_ev_state.max_fds) { - drv_select_large_fd_error(ix, fd, mode, on); - return -1; - } - grow_drv_ev_state(fd); + if (!grow_drv_ev_state(fd)) { + if (fd > 0) drv_select_large_fd_error(ix, fd, mode, on); + ERTS_MSACC_POP_STATE(); + return -1; } #endif erts_mtx_lock(fd_mtx(fd)); -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - state = &drv_ev_state.v[(int) fd]; -#else - state = hash_get_drv_ev_state(fd); /* may be NULL! */ -#endif + state = get_drv_ev_state(fd); /* may be NULL! */ + + DEBUG_PRINT_FD("driver_select(%T, %p, %s, %d)", + state, id, fd, drvmode2str(mode), on); if (!on) { if (IS_FD_UNKNOWN(state)) { @@ -950,15 +668,10 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, } else if ((mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) { mode |= (ERL_DRV_READ | ERL_DRV_WRITE); - wake_poller = 1; /* to eject fd from pollset (if needed) */ } } -#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS - if (state == NULL) { - state = hash_new_drv_ev_state(fd); - } -#endif + state = new_drv_ev_state(state, fd); switch (state->type) { case ERTS_EV_TYPE_NIF: @@ -982,7 +695,9 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, ASSERT(state->type == ERTS_EV_TYPE_NONE); break; - }} + } + default: break; + } if (mode & ERL_DRV_READ) { if (state->type == ERTS_EV_TYPE_DRV_SEL) { @@ -999,7 +714,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, drv_select_steal(ix, state, mode, on); } ctl_events |= ERTS_POLL_EV_OUT; - } + } ASSERT((state->type == ERTS_EV_TYPE_DRV_SEL) || @@ -1010,32 +725,31 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, if (on) { ctl_events &= ~old_events; state->events |= ctl_events; + if (ctl_events & ERTS_POLL_EV_IN && (!state->driver.select || !is_iotask_active(&state->driver.select->iniotask))) + state->active_events |= ERTS_POLL_EV_IN; + if (ctl_events & ERTS_POLL_EV_OUT && (!state->driver.select || !is_iotask_active(&state->driver.select->outiotask))) + state->active_events |= ERTS_POLL_EV_OUT; + if (old_events == 0 && !(state->flags & ERTS_EV_FLAG_USED)) { + ctl_op = ERTS_POLL_OP_ADD; + } } else { ctl_events &= old_events; state->events &= ~ctl_events; + state->active_events &= ~ctl_events; - if (!(state->flags & ERTS_EV_FLAG_USED) - && old_events && !state->events) { - /* - * Old driver removing all events. At least wake poller. - * It will not make close() 100% safe but it will prevent - * actions delayed by poll timeout. - */ - wake_poller = 1; + if (!state->events) { + if (!(state->flags & ERTS_EV_FLAG_USED) || mode & ERL_DRV_USE) + ctl_op = ERTS_POLL_OP_DEL; } } - if (ctl_events) { + if (ctl_events || ctl_op == ERTS_POLL_OP_DEL) { ErtsPollEvents new_events; - if (!state->pollset) { - ErtsSchedulerData* esdp = erts_get_scheduler_data(); - ASSERT(esdp); - state->pollset = esdp->pollset; - } - - new_events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, ctl_events, on, &wake_poller); + new_events = erts_io_control_wakeup(state, ctl_op, + state->active_events, + &wake_poller); if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { if (state->type == ERTS_EV_TYPE_DRV_SEL && !old_events) { @@ -1048,14 +762,13 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, goto done; } - ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL - || state->type == ERTS_EV_TYPE_NONE); + ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL || state->type == ERTS_EV_TYPE_NONE); } if (on) { if (ctl_events) { if (!state->driver.select) - state->driver.select = alloc_drv_select_data(state->pollset); + state->driver.select = alloc_drv_select_data(state->fd); if (state->type == ERTS_EV_TYPE_NONE) state->type = ERTS_EV_TYPE_DRV_SEL; ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL); @@ -1069,47 +782,44 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, } } else { /* off */ - if (state->type == ERTS_EV_TYPE_DRV_SEL) { - if (ctl_events & ERTS_POLL_EV_IN) { - abort_tasks(state, ERL_DRV_READ); - state->driver.select->inport = NIL; - } - if (ctl_events & ERTS_POLL_EV_OUT) { - abort_tasks(state, ERL_DRV_WRITE); - state->driver.select->outport = NIL; - } - if (state->events == 0) { - if (old_events != 0) { - remember_removed(state); - } - if ((mode & ERL_DRV_USE) || !(state->flags & ERTS_EV_FLAG_USED)) { - state->type = ERTS_EV_TYPE_NONE; - state->flags = 0; - } - /*else keep it, as fd will probably be selected upon again */ - } - } - if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) { - erts_driver_t* drv_ptr = prt->drv_ptr; - ASSERT(state->events==0); - if (state->remove_cnt == 0 || !wake_poller) { - /* Safe to close fd now as it is not in pollset - or there was no need to eject fd (kernel poll) */ - stop_select_fn = drv_ptr->stop_select; + if (state->type == ERTS_EV_TYPE_DRV_SEL) { + if (ctl_events & ERTS_POLL_EV_IN) { + abort_tasks(state, ERL_DRV_READ); + state->driver.select->inport = NIL; + } + if (ctl_events & ERTS_POLL_EV_OUT) { + abort_tasks(state, ERL_DRV_WRITE); + state->driver.select->outport = NIL; + } + if (state->events == 0) { + if ((mode & ERL_DRV_USE) || !(state->flags & ERTS_EV_FLAG_USED)) { + state->type = ERTS_EV_TYPE_NONE; + state->flags = 0; + } + /*else keep it, as fd will probably be selected upon again */ + } + } + if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) { + erts_driver_t* drv_ptr = prt->drv_ptr; + ASSERT(state->events==0); + if (!wake_poller) { + /* Safe to close fd now as it is not in pollset + or there was no need to eject fd (kernel poll) */ + stop_select_fn = drv_ptr->stop_select; #ifdef USE_VM_PROBES - strncpy(name, prt->drv_ptr->name, sizeof(name)-1); - name[sizeof(name)-1] = '\0'; + strncpy(name, prt->drv_ptr->name, sizeof(name)-1); + name[sizeof(name)-1] = '\0'; #endif - } - else { - /* Not safe to close fd, postpone stop_select callback. */ - state->type = ERTS_EV_TYPE_STOP_USE; - state->driver.stop.drv_ptr = drv_ptr; - if (drv_ptr->handle) { - erts_ddll_reference_referenced_driver(drv_ptr->handle); - } - } - } + } + else { + /* Not safe to close fd, postpone stop_select callback. */ + state->type = ERTS_EV_TYPE_STOP_USE; + state->driver.stop.drv_ptr = drv_ptr; + if (drv_ptr->handle) { + erts_ddll_reference_referenced_driver(drv_ptr->handle); + } + } + } } ret = 0; @@ -1134,54 +844,47 @@ done_unknown: if (free_nif) free_nif_select_data(free_nif); + ERTS_MSACC_POP_STATE(); + return ret; } int -ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, - ErlNifEvent e, - enum ErlNifSelectFlags mode, - void* obj, - const ErlNifPid* pid, - Eterm ref) +enif_select(ErlNifEnv* env, + ErlNifEvent e, + enum ErlNifSelectFlags mode, + void* obj, + const ErlNifPid* pid, + Eterm ref) { int on; ErtsResource* resource = DATA_TO_RESOURCE(obj); ErtsSysFdType fd = (ErtsSysFdType) e; ErtsPollEvents ctl_events = (ErtsPollEvents) 0; ErtsPollEvents old_events; + ErtsPollOp ctl_op = ERTS_POLL_OP_MOD; ErtsDrvEventState *state; - int wake_poller; - int ret; + int ret, wake_poller = 0; enum { NO_STOP=0, CALL_STOP, CALL_STOP_AND_RELEASE } call_stop = NO_STOP; ErtsDrvSelectDataState *free_select = NULL; ErtsNifSelectDataState *free_nif = NULL; -#ifdef USE_VM_PROBES - DTRACE_CHARBUF(name, 64); -#endif ASSERT(!(resource->monitors && resource->monitors->is_dying)); #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - if ((unsigned)fd >= (unsigned)erts_atomic_read_nob(&drv_ev_state.len)) { - if (fd < 0) { - return INT_MIN | ERL_NIF_SELECT_INVALID_EVENT; - } - if (fd >= drv_ev_state.max_fds) { - nif_select_large_fd_error(fd, mode, resource, ref); - return INT_MIN | ERL_NIF_SELECT_INVALID_EVENT; - } - grow_drv_ev_state(fd); + if (!grow_drv_ev_state(fd)) { + if (fd > 0) nif_select_large_fd_error(fd, mode, resource, ref); + return INT_MIN | ERL_NIF_SELECT_INVALID_EVENT; } #endif erts_mtx_lock(fd_mtx(fd)); -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - state = &drv_ev_state.v[(int) fd]; -#else - state = hash_get_drv_ev_state(fd); /* may be NULL! */ -#endif + state = get_drv_ev_state(fd); /* may be NULL! */ + + DEBUG_PRINT_FD("enif_select(%T, %d, %s, %p, %T, %T)", + state, env->proc->common.id, fd, nifmode2str(mode), resource, + pid ? pid->pid : THE_NON_VALUE, ref); if (mode & ERL_NIF_SELECT_STOP) { ASSERT(resource->type->stop); @@ -1193,13 +896,12 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, } on = 0; mode = ERL_DRV_READ | ERL_DRV_WRITE | ERL_DRV_USE; - wake_poller = 1; /* to eject fd from pollset (if needed) */ ctl_events = ERTS_POLL_EV_IN | ERTS_POLL_EV_OUT; + ctl_op = ERTS_POLL_OP_DEL; } else { on = 1; ASSERT(mode); - wake_poller = 0; if (mode & ERL_DRV_READ) { ctl_events |= ERTS_POLL_EV_IN; } @@ -1208,11 +910,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, } } -#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS - if (state == NULL) { - state = hash_new_drv_ev_state(fd); - } -#endif + state = new_drv_ev_state(state,fd); switch (state->type) { case ERTS_EV_TYPE_NIF: @@ -1243,7 +941,9 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, } ASSERT(state->type == ERTS_EV_TYPE_NONE); break; - }} + } + default: break; + } ASSERT((state->type == ERTS_EV_TYPE_NIF) || (state->type == ERTS_EV_TYPE_NONE && !state->events)); @@ -1253,20 +953,22 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, if (on) { ctl_events &= ~old_events; state->events |= ctl_events; + state->active_events |= ctl_events; + if (state->type == ERTS_EV_TYPE_NONE) + ctl_op = ERTS_POLL_OP_ADD; } else { ctl_events &= old_events; state->events &= ~ctl_events; + state->active_events &= ~ctl_events; } - if (ctl_events) { + if (ctl_events || ctl_op == ERTS_POLL_OP_DEL) { ErtsPollEvents new_events; - if (!state->pollset) { - state->pollset = erts_get_scheduler_data()->pollset; - } - - new_events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, ctl_events, on, &wake_poller); + new_events = erts_io_control_wakeup(state, ctl_op, + state->active_events, + &wake_poller); if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { if (state->type == ERTS_EV_TYPE_NIF && !old_events) { @@ -1274,8 +976,6 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, state->flags = 0; state->driver.nif->in.pid = NIL; state->driver.nif->out.pid = NIL; - state->driver.nif->in.ddeselect_cnt = 0; - state->driver.nif->out.ddeselect_cnt = 0; state->driver.stop.resource = NULL; } ret = INT_MIN | ERL_NIF_SELECT_FAILED; @@ -1307,11 +1007,9 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, ASSERT(is_internal_ref(ref)); refn = internal_ref_numbers(ref); state->driver.nif->in.immed = THE_NON_VALUE; - state->driver.nif->in.refn[0] = refn[0]; - state->driver.nif->in.refn[1] = refn[1]; - state->driver.nif->in.refn[2] = refn[2]; + sys_memcpy(state->driver.nif->in.refn, refn, + sizeof(state->driver.nif->in.refn)); } - state->driver.nif->in.ddeselect_cnt = 0; } if (mode & ERL_DRV_WRITE) { state->driver.nif->out.pid = recipient; @@ -1321,11 +1019,9 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, ASSERT(is_internal_ref(ref)); refn = internal_ref_numbers(ref); state->driver.nif->out.immed = THE_NON_VALUE; - state->driver.nif->out.refn[0] = refn[0]; - state->driver.nif->out.refn[1] = refn[1]; - state->driver.nif->out.refn[2] = refn[2]; + sys_memcpy(state->driver.nif->out.refn, refn, + sizeof(state->driver.nif->out.refn)); } - state->driver.nif->out.ddeselect_cnt = 0; } ret = 0; } @@ -1333,14 +1029,9 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, if (state->type == ERTS_EV_TYPE_NIF) { state->driver.nif->in.pid = NIL; state->driver.nif->out.pid = NIL; - state->driver.nif->in.ddeselect_cnt = 0; - state->driver.nif->out.ddeselect_cnt = 0; - if (old_events != 0) { - remember_removed(state); - } } ASSERT(state->events==0); - if (state->remove_cnt == 0 || !wake_poller) { + if (!wake_poller) { /* * Safe to close fd now as it is not in pollset * or there was no need to eject fd (kernel poll) @@ -1455,7 +1146,7 @@ print_driver_name(erts_dsprintf_buf_t *dsbufp, Eterm id) static void steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode) { - erts_dsprintf(dsbufp, "stealing control of fd=%d from ", (int) GET_FD(state->fd)); + erts_dsprintf(dsbufp, "stealing control of fd=%d from ", (int) state->fd); switch (state->type) { case ERTS_EV_TYPE_DRV_SEL: { int deselect_mode = 0; @@ -1479,7 +1170,7 @@ steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode) if (deselect_mode) deselect(state, deselect_mode); else { - erts_dsprintf(dsbufp, "no one", (int) GET_FD(state->fd)); + erts_dsprintf(dsbufp, "no one", (int) state->fd); ASSERT(0); } erts_dsprintf(dsbufp, "\n"); @@ -1510,7 +1201,7 @@ steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode) break; } default: - erts_dsprintf(dsbufp, "no one\n", (int) GET_FD(state->fd)); + erts_dsprintf(dsbufp, "no one\n", (int) state->fd); ASSERT(0); } } @@ -1524,7 +1215,7 @@ print_drv_select_op(erts_dsprintf_buf_t *dsbufp, "driver_select(%p, %d,%s%s%s%s, %d) " "by ", ix, - (int) GET_FD(fd), + (int) fd, mode & ERL_DRV_READ ? " ERL_DRV_READ" : "", mode & ERL_DRV_WRITE ? " ERL_DRV_WRITE" : "", mode & ERL_DRV_USE ? " ERL_DRV_USE" : "", @@ -1541,7 +1232,7 @@ print_nif_select_op(erts_dsprintf_buf_t *dsbufp, { erts_dsprintf(dsbufp, "enif_select(_, %d,%s%s%s, %T:%T, %T) ", - (int) GET_FD(fd), + (int) fd, mode & ERL_NIF_SELECT_READ ? " READ" : "", mode & ERL_NIF_SELECT_WRITE ? " WRITE" : "", mode & ERL_NIF_SELECT_STOP ? " STOP" : "", @@ -1673,7 +1364,7 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource* resource, erts_dsprintf(dsbufp, "called before stop was called for NIF resource %T:%T\n", rt->module, rt->name); - enif_release_resource(state->driver.stop.resource); + enif_release_resource(state->driver.stop.resource->data); state->type = ERTS_EV_TYPE_NONE; state->flags = 0; state->driver.stop.resource = NULL; @@ -1687,8 +1378,7 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource* resource, static ERTS_INLINE int io_task_schedule_allowed(ErtsDrvEventState *state, - ErtsPortTaskType type, - erts_aint_t current_cio_time) + ErtsPortTaskType type) { ErtsIoTask *io_task; @@ -1708,42 +1398,41 @@ io_task_schedule_allowed(ErtsDrvEventState *state, return 0; } - return !is_iotask_active(io_task, current_cio_time); + return !is_iotask_active(io_task); } static ERTS_INLINE void -iready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time) +iready(Eterm id, ErtsDrvEventState *state) { if (io_task_schedule_allowed(state, - ERTS_PORT_TASK_INPUT, - current_cio_time)) { + ERTS_PORT_TASK_INPUT)) { ErtsIoTask *iotask = &state->driver.select->iniotask; - erts_atomic_set_nob(&iotask->executed_time, current_cio_time); if (erts_port_task_schedule(id, &iotask->task, ERTS_PORT_TASK_INPUT, (ErlDrvEvent) state->fd) != 0) { stale_drv_select(id, state, ERL_DRV_READ); - } - add_active_fd(state->pollset, state->fd); + } else { + DEBUG_PRINT_FD("schedule ready_input(%T, %d)", + state, id, state->fd); + } } } static ERTS_INLINE void -oready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time) +oready(Eterm id, ErtsDrvEventState *state) { if (io_task_schedule_allowed(state, - ERTS_PORT_TASK_OUTPUT, - current_cio_time)) { + ERTS_PORT_TASK_OUTPUT)) { ErtsIoTask *iotask = &state->driver.select->outiotask; - erts_atomic_set_nob(&iotask->executed_time, current_cio_time); if (erts_port_task_schedule(id, &iotask->task, ERTS_PORT_TASK_OUTPUT, (ErlDrvEvent) state->fd) != 0) { stale_drv_select(id, state, ERL_DRV_WRITE); - } - add_active_fd(state->pollset, state->fd); + } else { + DEBUG_PRINT_FD("schedule ready_output(%T, %d)", state, id, state->fd); + } } } @@ -1798,91 +1487,56 @@ send_event_tuple(struct erts_nif_select_event* e, ErtsResource* resource, static void bad_fd_in_pollset(ErtsDrvEventState *, Eterm inport, Eterm outport); void -ERTS_CIO_EXPORT(erts_check_io_interrupt)(struct pollset_info *psi, int set) +erts_check_io_interrupt(ErtsPollThread *psi, int set) { - ERTS_CIO_POLL_INTR(psi->ps, set); + if (psi) { +#if ERTS_POLL_USE_FALLBACK + if (psi->ps == get_fallback()) { + erts_poll_interrupt_flbk(psi->ps, set); + return; + } +#endif + erts_poll_interrupt(psi->ps, set); + } } -void -ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(struct pollset_info *psi, - int set, - ErtsMonotonicTime timeout_time) -{ - ERTS_CIO_POLL_INTR_TMD(psi->ps, set, timeout_time); +ErtsPollThread * +erts_create_pollset_thread(int id) { + return psiv+id; } -#ifndef __WIN32__ -/* - * Number of ignored events, for a lingering fd added by enif_select(), - * until we deselect fd-event from pollset. - */ -# define ERTS_NIF_DELAYED_DESELECT 20 -#else -/* Disable delayed deselect as pollset cannot handle active events */ -# define ERTS_NIF_DELAYED_DESELECT 1 -#endif - void -ERTS_CIO_EXPORT(erts_check_io)(int do_wait) +erts_check_io(ErtsPollThread *psi) { - ErtsPollResFd *pollres; int pollres_len; - ErtsMonotonicTime timeout_time; int poll_ret, i; - erts_aint_t current_cio_time; - ErtsSchedulerData *esdp = erts_get_scheduler_data(); - struct pollset_info *psi = esdp->pollset; + ERTS_MSACC_PUSH_AND_SET_STATE(ERTS_MSACC_STATE_CHECK_IO); restart: -#ifdef ERTS_BREAK_REQUESTED - if (ERTS_BREAK_REQUESTED) - erts_do_break_handling(); -#endif - - /* Figure out timeout value */ - timeout_time = (do_wait - ? erts_check_next_timeout_time(esdp) - : ERTS_POLL_NO_TIMEOUT /* poll only */); - - /* - * No need for an atomic inc op when incrementing - * erts_check_io_time, since only one thread can - * check io at a time. - */ - current_cio_time = erts_atomic_read_dirty(&psi->check_io_time); - current_cio_time++; - erts_atomic_set_relb(&psi->check_io_time, current_cio_time); - - check_cleanup_active_fds(psi, - current_cio_time, - timeout_time != ERTS_POLL_NO_TIMEOUT); - #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_check_exact(NULL, 0); /* No locks should be locked */ #endif - pollres_len = erts_atomic32_read_dirty(&psi->active_fd.no) + ERTS_CHECK_IO_POLL_RES_LEN; + pollres_len = psi->pollres_len; - pollres = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollResFd)*pollres_len); +#if ERTS_POLL_USE_FALLBACK + if (psi->ps == get_fallback()) { - erts_atomic_set_nob(&psi->in_poll_wait, 1); + poll_ret = erts_poll_wait_flbk(psi->ps, psi->pollres, &pollres_len); - poll_ret = ERTS_CIO_POLL_WAIT(psi->ps, pollres, &pollres_len, timeout_time); + } else +#endif + { + poll_ret = erts_poll_wait(psi->ps, psi->pollres, &pollres_len); + } #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_check_exact(NULL, 0); /* No locks should be locked */ #endif -#ifdef ERTS_BREAK_REQUESTED - if (ERTS_BREAK_REQUESTED) - erts_do_break_handling(); -#endif - if (poll_ret != 0) { - erts_atomic_set_nob(&psi->in_poll_wait, 0); - forget_removed(psi); - erts_free(ERTS_ALC_T_TMP, pollres); + if (poll_ret == EAGAIN) { goto restart; } @@ -1898,64 +1552,78 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) erl_errno_id(poll_ret), poll_ret); erts_send_error_to_logger_nogl(dsbufp); } + ERTS_MSACC_POP_STATE(); return; } for (i = 0; i < pollres_len; i++) { - ErtsSysFdType fd = (ErtsSysFdType) pollres[i].fd; + erts_driver_t* drv_ptr = NULL; + ErtsResource* resource = NULL; + ErtsDrvSelectDataState *free_select = NULL; + ErtsNifSelectDataState *free_nif = NULL; + ErtsSysFdType fd = (ErtsSysFdType) ERTS_POLL_RES_GET_FD(&psi->pollres[i]); ErtsDrvEventState *state; + ErtsPollEvents revents; erts_mtx_lock(fd_mtx(fd)); + state = get_drv_ev_state(fd); -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - state = &drv_ev_state.v[ (int) fd]; -#else - state = hash_get_drv_ev_state(fd); if (!state) { - goto next_pollres; + erts_mtx_unlock(fd_mtx(fd)); + continue; } -#endif - /* Skip this fd if it was removed from pollset */ - if (is_removed(state) || state->pollset != psi) { - goto next_pollres; - } + revents = ERTS_POLL_RES_GET_EVTS(&psi->pollres[i]); + + DEBUG_PRINT_FD("triggered %s", state, ev2str(revents)); + + if (revents & ERTS_POLL_EV_ERR) { + /* + * Handle error events by triggering all in/out events + * that has been selected on. + * We *do not* want to call a callback that corresponds + * to an event not selected. + */ + revents = state->active_events; + state->active_events = 0; + } else { + + /* Disregard any events that are not active at the moment, + for instance this could happen if the driver/nif does + select/deselect in rapid succession. */ + revents &= state->active_events | ERTS_POLL_EV_NVAL; + state->active_events &= ~revents; + + /* Reactivate the poll op if there are still active events */ + if (state->active_events) { + DEBUG_PRINT_FD("re-enable %s", state, ev2str(state->active_events)); + erts_io_control(state, ERTS_POLL_OP_MOD, state->active_events); + } + } switch (state->type) { case ERTS_EV_TYPE_DRV_SEL: { /* Requested via driver_select()... */ - ErtsPollEvents revents = pollres[i].events; - - if (revents & ERTS_POLL_EV_ERR) { - /* - * Handle error events by triggering all in/out events - * that the driver has selected. - * We *do not* want to call a callback that corresponds - * to an event not selected. - */ - revents = state->events; - } - else { - revents &= (state->events | ERTS_POLL_EV_NVAL); - } if (revents & (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) { if (revents & ERTS_POLL_EV_OUT) { - oready(state->driver.select->outport, state, current_cio_time); + oready(state->driver.select->outport, state); } /* Someone might have deselected input since revents - was read therefore, update revents... */ - revents &= state->events; + was read (true also on the non-smp emulator since + oready() may have been called); therefore, update + revents... */ + revents &= state->events; if (revents & ERTS_POLL_EV_IN) { - iready(state->driver.select->inport, state, current_cio_time); + iready(state->driver.select->inport, state); } } else if (revents & ERTS_POLL_EV_NVAL) { bad_fd_in_pollset(state, state->driver.select->inport, state->driver.select->outport); - add_active_fd(psi, state->fd); + check_fd_cleanup(state, &free_select, &free_nif); } break; } @@ -1964,89 +1632,116 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) struct erts_nif_select_event in = {NIL}; struct erts_nif_select_event out = {NIL}; ErtsResource* resource = NULL; - ErtsPollEvents revents = pollres[i].events; - - if (revents & ERTS_POLL_EV_ERR) { - /* - * Handle error events by triggering all in/out events - * that the NIF has selected. - * We *do not* want to send a message that corresponds - * to an event not selected. - */ - revents = state->events; - } - else { - revents &= (state->events | ERTS_POLL_EV_NVAL); - } if (revents & (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) { if (revents & ERTS_POLL_EV_OUT) { if (is_not_nil(state->driver.nif->out.pid)) { out = state->driver.nif->out; resource = state->driver.stop.resource; - state->driver.nif->out.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT; state->driver.nif->out.pid = NIL; - add_active_fd(psi, state->fd); - } - else { - ASSERT(state->driver.nif->out.ddeselect_cnt >= 2); - state->driver.nif->out.ddeselect_cnt--; } } if (revents & ERTS_POLL_EV_IN) { if (is_not_nil(state->driver.nif->in.pid)) { in = state->driver.nif->in; resource = state->driver.stop.resource; - state->driver.nif->in.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT; state->driver.nif->in.pid = NIL; - add_active_fd(psi, state->fd); - } - else { - ASSERT(state->driver.nif->in.ddeselect_cnt >= 2); - state->driver.nif->in.ddeselect_cnt--; } } + state->events &= ~revents; } else if (revents & ERTS_POLL_EV_NVAL) { bad_fd_in_pollset(state, NIL, NIL); - add_active_fd(psi, state->fd); + check_fd_cleanup(state, &free_select, &free_nif); } erts_mtx_unlock(fd_mtx(fd)); + if (is_not_nil(in.pid)) { send_event_tuple(&in, resource, am_ready_input); } if (is_not_nil(out.pid)) { send_event_tuple(&out, resource, am_ready_output); } - goto next_pollres_unlocked; + continue; } + case ERTS_EV_TYPE_STOP_NIF: { + resource = state->driver.stop.resource; + state->type = ERTS_EV_TYPE_NONE; + goto case_ERTS_EV_TYPE_NONE; + } + + case ERTS_EV_TYPE_STOP_USE: { +#if ERTS_POLL_USE_FALLBACK + ASSERT(psi->ps == get_fallback()); +#endif + drv_ptr = state->driver.stop.drv_ptr; + state->type = ERTS_EV_TYPE_NONE; + /* fallthrough */ case ERTS_EV_TYPE_NONE: /* Deselected ... */ + case_ERTS_EV_TYPE_NONE: + ASSERT(!state->events && !state->active_events && !state->flags); + check_fd_cleanup(state, &free_select, &free_nif); break; + } default: { /* Error */ erts_dsprintf_buf_t *dsbufp; dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "Invalid event request type for fd in erts_poll()! " - "fd=%d, event request type=%sd\n", (int) state->fd, + "fd=%d, event request type=%d\n", (int) state->fd, (int) state->type); ASSERT(0); deselect(state, 0); - add_active_fd(psi, state->fd); break; } } - next_pollres:; erts_mtx_unlock(fd_mtx(fd)); - next_pollres_unlocked:; - } - erts_atomic_set_nob(&psi->in_poll_wait, 0); - erts_free(ERTS_ALC_T_TMP, pollres); - forget_removed(psi); + if (drv_ptr) { + int was_unmasked = erts_block_fpe(); + DTRACE1(driver_stop_select, drv_ptr->name); + LTTNG1(driver_stop_select, drv_ptr->name); + (*drv_ptr->stop_select)((ErlDrvEvent) fd, NULL); + erts_unblock_fpe(was_unmasked); + if (drv_ptr->handle) { + erts_ddll_dereference_driver(drv_ptr->handle); + } + } + if (resource) { + erts_resource_stop(resource, (ErlNifEvent)fd, 1); + enif_release_resource(resource->data); + } + if (free_select) + free_drv_select_data(free_select); + if (free_nif) + free_nif_select_data(free_nif); + } + + /* The entire pollres array was filled with events, + * grow it for the next call. We do this for two reasons: + * 1. Pulling out more events in on go will increase throughput + * 2. If the polling implementation is not fair, this will make + * sure that we get all fds that we can. i.e. if 12 fds are + * constantly active, but we only have a pollres_len of 10, + * two of the fds may never be triggered depending on what the + * kernel decides to do. + **/ + if (pollres_len == psi->pollres_len) { + int ev_state_len = drv_ev_state_len(); + erts_free(ERTS_ALC_T_POLLSET, psi->pollres); + psi->pollres_len *= 2; + /* Never grow it larger than the current drv_ev_state.len size */ + if (psi->pollres_len > ev_state_len) + psi->pollres_len = ev_state_len; + psi->pollres = erts_alloc(ERTS_ALC_T_POLLSET, + sizeof(ErtsPollResFd) * psi->pollres_len); + } + + ERTS_MSACC_POP_STATE(); } static void @@ -2167,98 +1862,189 @@ static void drv_ev_state_free(void *des) } #endif -#ifdef ERTS_ENABLE_KERNEL_POLL +#define ERTS_MAX_NO_OF_POLL_THREADS ERTS_MAX_NO_OF_SCHEDULERS -struct io_functions { - int (*select)(ErlDrvPort, ErlDrvEvent, int, int); - int (*enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, const ErlNifPid*, Eterm); - void (*check_io_as_interrupt)(struct pollset_info*); - void (*check_io_interrupt)(struct pollset_info*, int); - void (*check_io_interrupt_tmd)(struct pollset_info*, int, ErtsMonotonicTime); - void (*check_io)(int); - struct pollset_info* (*get_pollset)(int sched_nr); - void (*notify_port_task_executed)(ErtsPortTaskHandle *pthp); - int (*max_files)(void); - Uint (*size)(void); - Eterm (*info)(void *); - int (*check_io_debug)(ErtsCheckIoDebugInfo *); -#ifdef ERTS_ENABLE_LOCK_COUNT - void (*lcnt_update_cio_locks)(int enable); -#endif -}; +static char * +get_arg(char* rest, char** argv, int* ip) +{ + int i = *ip; + if (*rest == '\0') { + if (argv[i+1] == NULL) { + erts_fprintf(stderr, "too few arguments\n"); + erts_usage(); + } + argv[i++] = NULL; + rest = argv[i]; + } + argv[i] = NULL; + *ip = i; + return rest; +} -# ifdef ERTS_KERNEL_POLL_VERSION -struct io_functions erts_io_funcs = {0}; -# else -extern struct io_functions erts_io_funcs; -# endif +static void +parse_args(int *argc, char **argv, int concurrent_waiters) +{ + int i = 0, j; + int no_pollsets = 0, no_poll_threads = 0, + no_pollsets_percentage = 0, + no_poll_threads_percentage = 0; + ASSERT(argc && argv); + while (i < *argc) { + if(argv[i][0] == '-') { + switch (argv[i][1]) { + case 'I': { + if (strncmp(argv[i]+2, "Ot", 2) == 0) { + char *arg = get_arg(argv[i]+4, argv, &i); + if (sscanf(arg, "%d", &no_poll_threads) != 1 || + no_poll_threads < 1 || + ERTS_MAX_NO_OF_POLL_THREADS < no_poll_threads) { + erts_fprintf(stderr,"bad I/O poll threads number: %s\n", arg); + erts_usage(); + } + } else if (strncmp(argv[i]+2, "Op", 3) == 0) { + char *arg = get_arg(argv[i]+4, argv, &i); + if (sscanf(arg, "%d", &no_pollsets) != 1 || + no_pollsets < 1) { + erts_fprintf(stderr,"bad I/O pollset number: %s\n", arg); + erts_usage(); + } + } else if (strncmp(argv[i]+2, "OPt", 4) == 0) { + char *arg = get_arg(argv[i]+5, argv, &i); + if (sscanf(arg, "%d", &no_poll_threads_percentage) != 1 || + no_poll_threads_percentage < 0 || + no_poll_threads_percentage > 100) { + erts_fprintf(stderr,"bad I/O poll thread percentage number: %s\n", arg); + erts_usage(); + } + } else if (strncmp(argv[i]+2, "OPp", 4) == 0) { + char *arg = get_arg(argv[i]+5, argv, &i); + if (sscanf(arg, "%d", &no_pollsets_percentage) != 1 || + no_pollsets_percentage < 0 || + no_pollsets_percentage > 100) { + erts_fprintf(stderr,"bad I/O pollset percentage number: %s\n", arg); + erts_usage(); + } + } else { + break; + } + break; + } + case 'K': + (void)get_arg(argv[i]+2, argv, &i); + break; + case '-': + goto args_parsed; + default: + break; + } + } + i++; + } + +args_parsed: -#endif /* ERTS_ENABLE_KERNEL_POLL */ + if (!concurrent_waiters) { + no_pollsets = no_poll_threads; + no_pollsets_percentage = 100; + } + + if (no_poll_threads == 0) { + if (no_poll_threads_percentage == 0) + no_poll_threads = 1; /* This is the default */ + else { + no_poll_threads = erts_no_schedulers * no_poll_threads_percentage / 100; + if (no_poll_threads < 1) + no_poll_threads = 1; + } + } + + if (no_pollsets == 0) { + if (no_pollsets_percentage == 0) + no_pollsets = 1; /* This is the default */ + else { + no_pollsets = no_poll_threads * no_pollsets_percentage / 100; + if (no_pollsets < 1) + no_pollsets = 1; + } + } + + if (no_poll_threads < no_pollsets) { + erts_fprintf(stderr, + "number of IO poll threads has to be greater or equal to " + "the number of \nIO pollsets. Current values are set to: \n" + " -IOt %d -IOp %d\n", + no_poll_threads, no_pollsets); + erts_usage(); + } + + /* Handled arguments have been marked with NULL. Slide arguments + not handled towards the beginning of argv. */ + for (i = 0, j = 0; i < *argc; i++) { + if (argv[i]) + argv[j++] = argv[i]; + } + *argc = j; + + erts_no_pollsets = no_pollsets; + erts_no_poll_threads = no_poll_threads; +} void -ERTS_CIO_EXPORT(erts_init_check_io)(void) +erts_init_check_io(int *argc, char **argv) { - int j; + int j, concurrent_waiters, no_poll_threads; ERTS_CT_ASSERT((INT_MIN & (ERL_NIF_SELECT_STOP_CALLED | ERL_NIF_SELECT_STOP_SCHEDULED | ERL_NIF_SELECT_INVALID_EVENT | ERL_NIF_SELECT_FAILED)) == 0); -#ifdef ERTS_ENABLE_KERNEL_POLL - ASSERT(erts_io_funcs.select == NULL); - erts_io_funcs.select = ERTS_CIO_EXPORT(driver_select); - erts_io_funcs.enif_select = ERTS_CIO_EXPORT(enif_select); - erts_io_funcs.check_io_interrupt = ERTS_CIO_EXPORT(erts_check_io_interrupt); - erts_io_funcs.check_io_interrupt_tmd= ERTS_CIO_EXPORT(erts_check_io_interrupt_timed); - erts_io_funcs.check_io = ERTS_CIO_EXPORT(erts_check_io); - erts_io_funcs.get_pollset = ERTS_CIO_EXPORT(erts_get_pollset); - erts_io_funcs.notify_port_task_executed = ERTS_CIO_EXPORT(erts_io_notify_port_task_executed); - erts_io_funcs.max_files = ERTS_CIO_EXPORT(erts_check_io_max_files); - erts_io_funcs.size = ERTS_CIO_EXPORT(erts_check_io_size); - erts_io_funcs.info = ERTS_CIO_EXPORT(erts_check_io_info); - erts_io_funcs.check_io_debug = ERTS_CIO_EXPORT(erts_check_io_debug); -#ifdef ERTS_ENABLE_LOCK_COUNT - erts_io_funcs.lcnt_update_cio_locks = ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks); -#endif -#endif - - init_removed_fd_alloc(); - - ERTS_CIO_POLL_INIT(); - pollsetv = erts_alloc(ERTS_ALC_T_POLLSET, - sizeof(struct pollset_info)*erts_no_schedulers); - for (j=0; j < erts_no_schedulers; j++) { - struct pollset_info* psi = &pollsetv[j]; - - erts_atomic_init_nob(&psi->check_io_time, 0); - erts_atomic_init_nob(&psi->in_poll_wait, 0); - psi->ps = ERTS_CIO_NEW_POLLSET(); - psi->active_fd.six = 0; - psi->active_fd.eix = 0; - erts_atomic32_init_nob(&psi->active_fd.no, 0); - psi->active_fd.size = ERTS_ACTIVE_FD_INC; - psi->active_fd.array = erts_alloc(ERTS_ALC_T_ACTIVE_FD_ARR, - sizeof(ErtsSysFdType)*ERTS_ACTIVE_FD_INC); -#ifdef DEBUG - { - int i; - for (i = 0; i < ERTS_ACTIVE_FD_INC; i++) - psi->active_fd.array[i] = ERTS_SYS_FD_INVALID; - } + + erts_poll_init(&concurrent_waiters); +#if ERTS_POLL_USE_FALLBACK + erts_poll_init_flbk(NULL); #endif - erts_atomic_init_nob(&psi->removed_list, (erts_aint_t)NULL); + parse_args(argc, argv, concurrent_waiters); + + /* Create the actual pollsets */ + pollsetv = erts_alloc(ERTS_ALC_T_POLLSET,sizeof(ErtsPollSet *) * erts_no_pollsets); + + for (j=0; j < erts_no_pollsets; j++) + pollsetv[j] = erts_poll_create_pollset(j); + +#if ERTS_POLL_USE_FALLBACK + flbk_pollset = erts_poll_create_pollset_flbk(-1); +#endif + + no_poll_threads = erts_no_poll_threads; +#if ERTS_POLL_USE_FALLBACK + no_poll_threads++; +#endif + + psiv = erts_alloc(ERTS_ALC_T_POLLSET, sizeof(ErtsPollThread) * no_poll_threads); + +#if ERTS_POLL_USE_FALLBACK + psiv[0].pollres_len = ERTS_CHECK_IO_POLL_RES_LEN; + psiv[0].pollres = erts_alloc(ERTS_ALC_T_POLLSET, + sizeof(ErtsPollResFd) * ERTS_CHECK_IO_POLL_RES_LEN); + psiv[0].ps = get_fallback(); + psiv++; +#endif + + for (j = 0; j < erts_no_poll_threads; j++) { + psiv[j].pollres_len = ERTS_CHECK_IO_POLL_RES_LEN; + psiv[j].pollres = erts_alloc(ERTS_ALC_T_POLLSET, + sizeof(ErtsPollResFd) * ERTS_CHECK_IO_POLL_RES_LEN); + psiv[j].ps = pollsetv[j % erts_no_pollsets]; } - { - int i; - for (i=0; i<DRV_EV_STATE_LOCK_CNT; i++) { - erts_mtx_init(&drv_ev_state.locks[i].lck, "drv_ev_state", make_small(i), - ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_IO); - } + for (j=0; j < ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT; j++) { + erts_mtx_init(&drv_ev_state.locks[j].lck, "drv_ev_state", make_small(j), + ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_IO); } + #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - drv_ev_state.max_fds = ERTS_CIO_POLL_MAX_FDS(); + drv_ev_state.max_fds = erts_poll_max_fds(); erts_atomic_init_nob(&drv_ev_state.len, 0); drv_ev_state.v = NULL; erts_mtx_init(&drv_ev_state.grow_lock, "drv_ev_state_grow", NIL, @@ -2281,24 +2067,29 @@ ERTS_CIO_EXPORT(erts_init_check_io)(void) } int -ERTS_CIO_EXPORT(erts_check_io_max_files)(void) +erts_check_io_max_files(void) { #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS return drv_ev_state.max_fds; #else - return ERTS_POLL_EXPORT(erts_poll_max_fds)(); + return erts_poll_max_fds(); #endif } Uint -ERTS_CIO_EXPORT(erts_check_io_size)(void) +erts_check_io_size(void) { Uint res = 0; ErtsPollInfo pi; int i; - for (i = 0; i < erts_no_schedulers; i++) { - ERTS_CIO_POLL_INFO(pollsetv[i].ps, &pi); +#if ERTS_POLL_USE_FALLBACK + erts_poll_info(get_fallback(), &pi); + res += pi.memory_size; +#endif + + for (i = 0; i < erts_no_pollsets; i++) { + erts_poll_info(pollsetv[i], &pi); res += pi.memory_size; } #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS @@ -2318,58 +2109,77 @@ ERTS_CIO_EXPORT(erts_check_io_size)(void) } Eterm -ERTS_CIO_EXPORT(erts_check_io_info)(void *proc) +erts_check_io_info(void *proc) { Process *p = (Process *) proc; Eterm tags[16], values[16], res, list = NIL; Uint sz, *szp, *hp, **hpp; ErtsPollInfo *piv; - Sint i, j; - - piv = erts_alloc(ERTS_ALC_T_TMP, - sizeof(ErtsPollInfo) * erts_no_schedulers); + Sint i, j = 0, len; + int no_pollsets = erts_no_pollsets + ERTS_POLL_USE_FALLBACK; + ERTS_CT_ASSERT(ERTS_POLL_USE_FALLBACK == 0 || ERTS_POLL_USE_FALLBACK == 1); - for (j = 0; j < erts_no_schedulers; j++) { - struct pollset_info *psi = &pollsetv[j]; - erts_aint_t cio_time = erts_atomic_read_acqb(&psi->check_io_time); + piv = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollInfo) * no_pollsets); - piv[j].active_fds = (int) erts_atomic32_read_acqb(&psi->active_fd.no); - while (1) { - erts_aint_t post_cio_time; - int post_active_fds; +#if ERTS_POLL_USE_FALLBACK + erts_poll_info_flbk(get_fallback(), &piv[0]); + piv[0].poll_threads = 1; + piv[0].active_fds = 0; + piv++; +#endif - ERTS_CIO_POLL_INFO(psi->ps, &piv[j]); + for (j = 0; j < erts_no_pollsets; j++) { + erts_poll_info(pollsetv[j], &piv[j]); + piv[j].active_fds = 0; + piv[j].poll_threads = erts_no_poll_threads / erts_no_pollsets; + if (erts_no_poll_threads % erts_no_pollsets > j) + piv[j].poll_threads++; + } - post_cio_time = erts_atomic_read_mb(&psi->check_io_time); - post_active_fds = (int) erts_atomic32_read_acqb(&psi->active_fd.no); - if (cio_time == post_cio_time && piv[j].active_fds == post_active_fds) - break; - cio_time = post_cio_time; - piv[j].active_fds = post_active_fds; +#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS + i = 0; + erts_mtx_lock(&drv_ev_state.grow_lock); + len = erts_atomic_read_nob(&drv_ev_state.len); + for (i = 0; i < ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT; i++) { + erts_mtx_lock(&drv_ev_state.locks[i].lck); + for (j = i; j < len; j+=ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT) { + ErtsDrvEventState *state = get_drv_ev_state(j); + int pollsetid = get_pollset_id(j); + ASSERT(fd_mtx(j) == &drv_ev_state.locks[i].lck); + if (state->flags & ERTS_EV_FLAG_FALLBACK) + pollsetid = -1; + if (state->driver.select + && (state->type == ERTS_EV_TYPE_DRV_SEL) + && (is_iotask_active(&state->driver.select->iniotask) + || is_iotask_active(&state->driver.select->outiotask))) + piv[pollsetid].active_fds++; } + erts_mtx_unlock(&drv_ev_state.locks[i].lck); + } + erts_mtx_unlock(&drv_ev_state.grow_lock); - #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - piv[j].memory_size += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len); - #else - piv[j].memory_size += safe_hash_table_sz(&drv_ev_state.tab); - { - SafeHashInfo hi; - safe_hash_get_info(&hi, &drv_ev_state.tab); - piv[j].memory_size += hi.objs * sizeof(ErtsDrvEventState); - } - erts_spin_lock(&drv_ev_state.prealloc_lock); - piv[j].memory_size += drv_ev_state.num_prealloc * sizeof(ErtsDrvEventState); - erts_spin_unlock(&drv_ev_state.prealloc_lock); - #endif + piv[0].memory_size += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len); +#else + piv[0].memory_size += safe_hash_table_sz(&drv_ev_state.tab); + { + SafeHashInfo hi; + safe_hash_get_info(&hi, &drv_ev_state.tab); + piv[0].memory_size += hi.objs * sizeof(ErtsDrvEventState); } + erts_spin_lock(&drv_ev_state.prealloc_lock); + piv[0].memory_size += drv_ev_state.num_prealloc * sizeof(ErtsDrvEventState); + erts_spin_unlock(&drv_ev_state.prealloc_lock); +#endif hpp = NULL; szp = &sz; sz = 0; + piv -= ERTS_POLL_USE_FALLBACK; + bld_it: - for (j = erts_no_schedulers-1; j >= 0; j--) { + for (j = no_pollsets-1; j >= 0; j--) { i = 0; tags[i] = erts_bld_atom(hpp, szp, "name"); @@ -2378,9 +2188,6 @@ ERTS_CIO_EXPORT(erts_check_io_info)(void *proc) tags[i] = erts_bld_atom(hpp, szp, "primary"); values[i++] = erts_bld_atom(hpp, szp, piv[j].primary); - tags[i] = erts_bld_atom(hpp, szp, "fallback"); - values[i++] = erts_bld_atom(hpp, szp, piv[j].fallback ? piv[j].fallback : "false"); - tags[i] = erts_bld_atom(hpp, szp, "kernel_poll"); values[i++] = erts_bld_atom(hpp, szp, piv[j].kernel_poll ? piv[j].kernel_poll : "false"); @@ -2389,20 +2196,13 @@ ERTS_CIO_EXPORT(erts_check_io_info)(void *proc) values[i++] = erts_bld_uint(hpp, szp, piv[j].memory_size); tags[i] = erts_bld_atom(hpp, szp, "total_poll_set_size"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].poll_set_size); - - if (piv[j].fallback) { - tags[i] = erts_bld_atom(hpp, szp, "fallback_poll_set_size"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].fallback_poll_set_size); - } + values[i++] = erts_bld_uint(hpp, szp, piv[j].poll_set_size); tags[i] = erts_bld_atom(hpp, szp, "lazy_updates"); values[i++] = piv[j].lazy_updates ? am_true : am_false; - if (piv[j].lazy_updates) { - tags[i] = erts_bld_atom(hpp, szp, "pending_updates"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].pending_updates); - } + tags[i] = erts_bld_atom(hpp, szp, "pending_updates"); + values[i++] = erts_bld_uint(hpp, szp, piv[j].pending_updates); tags[i] = erts_bld_atom(hpp, szp, "batch_updates"); values[i++] = piv[j].batch_updates ? am_true : am_false; @@ -2410,22 +2210,17 @@ ERTS_CIO_EXPORT(erts_check_io_info)(void *proc) tags[i] = erts_bld_atom(hpp, szp, "concurrent_updates"); values[i++] = piv[j].concurrent_updates ? am_true : am_false; + tags[i] = erts_bld_atom(hpp, szp, "fallback"); + values[i++] = piv[j].is_fallback ? am_true : am_false; + tags[i] = erts_bld_atom(hpp, szp, "max_fds"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].max_fds); + values[i++] = erts_bld_uint(hpp, szp, piv[j].max_fds); tags[i] = erts_bld_atom(hpp, szp, "active_fds"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].active_fds); - - #ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS - tags[i] = erts_bld_atom(hpp, szp, "no_avoided_wakeups"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_avoided_wakeups); + values[i++] = erts_bld_uint(hpp, szp, piv[j].active_fds); - tags[i] = erts_bld_atom(hpp, szp, "no_avoided_interrupts"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_avoided_interrupts); - - tags[i] = erts_bld_atom(hpp, szp, "no_interrupt_timed"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_interrupt_timed); - #endif + tags[i] = erts_bld_atom(hpp, szp, "poll_threads"); + values[i++] = erts_bld_uint(hpp, szp, piv[j].poll_threads); res = erts_bld_2tup_list(hpp, szp, i, tags, values); @@ -2454,6 +2249,10 @@ static ERTS_INLINE ErtsPollEvents print_events(ErtsPollEvents ev) { int first = 1; + if(ev == ERTS_POLL_EV_NONE) { + erts_printf("N/A"); + return 0; + } if(ev & ERTS_POLL_EV_IN) { ev &= ~ERTS_POLL_EV_IN; erts_printf("%s%s", first ? "" : "|", "IN"); @@ -2486,15 +2285,40 @@ print_flags(EventStateFlags f) erts_printf("%s","USED"); delim = "|"; } - if(f & ERTS_EV_FLAG_DEFER_IN_EV) { - erts_printf("%s%s", delim, "DRIN"); + if(f & ERTS_EV_FLAG_FALLBACK) { + erts_printf("%s%s", delim, "FLBK"); delim = "|"; } - if(f & ERTS_EV_FLAG_DEFER_OUT_EV) { - erts_printf("%s%s", delim, "DROUT"); +} + +#ifdef DEBUG_PRINT_MODE + +static ERTS_INLINE char * +drvmode2str(int mode) { + switch (mode) { + case ERL_DRV_READ|ERL_DRV_USE: return "READ|USE"; + case ERL_DRV_WRITE|ERL_DRV_USE: return "WRITE|USE"; + case ERL_DRV_READ|ERL_DRV_WRITE|ERL_DRV_USE: return "READ|WRITE|USE"; + case ERL_DRV_USE: return "USE"; + case ERL_DRV_READ: return "READ"; + case ERL_DRV_WRITE: return "WRITE"; + case ERL_DRV_READ|ERL_DRV_WRITE: return "READ|WRITE"; + default: return "UNKNOWN"; + } +} + +static ERTS_INLINE char * +nifmode2str(enum ErlNifSelectFlags mode) { + switch (mode) { + case ERL_NIF_SELECT_READ: return "READ"; + case ERL_NIF_SELECT_WRITE: return "WRITE"; + case ERL_NIF_SELECT_STOP: return "STOP"; + default: return "UNKNOWN"; } } +#endif + typedef struct { int used_fds; int num_errors; @@ -2506,53 +2330,32 @@ typedef struct { #endif } IterDebugCounters; -static void doit_erts_check_io_debug(void *vstate, void *vcounters) +static int erts_debug_print_checkio_state(ErtsDrvEventState *state, + ErtsPollEvents ep_events, + int internal) { - ErtsDrvEventState *state = (ErtsDrvEventState *) vstate; - IterDebugCounters *counters = (IterDebugCounters *) vcounters; - ErtsPollEvents cio_events = state->events; - ErtsSysFdType fd = state->fd; -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - int internal = 0; - ErtsPollEvents ep_events = counters->epep[(int) fd]; -#endif - int err = 0; - #if defined(HAVE_FSTAT) && !defined(NO_FSTAT_ON_SYS_FD_TYPE) struct stat stat_buf; #endif - if (state->driver.select) - counters->no_driver_select_structs++; - if (state->driver.nif) - counters->no_enif_select_structs++; - + ErtsSysFdType fd = state->fd; + ErtsPollEvents cio_events = state->events; + int err = 0; #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - if (state->events || ep_events) { - if (ep_events & ERTS_POLL_EV_NVAL) { - ep_events &= ~ERTS_POLL_EV_NVAL; - internal = 1; - counters->internal_fds++; - } - else - counters->used_fds++; -#else - if (state->events) { - counters->used_fds++; + ErtsPollEvents aio_events = state->active_events; #endif - - erts_printf("pollset=%d fd=%d ", - (int)(state->pollset - pollsetv), (int) fd); - + erts_printf("pollset=%d fd=%d ", + state->flags & ERTS_EV_FLAG_FALLBACK ? -1 : get_pollset_id(fd), (int) fd); + #if defined(HAVE_FSTAT) && !defined(NO_FSTAT_ON_SYS_FD_TYPE) - if (fstat((int) fd, &stat_buf) < 0) - erts_printf("type=unknown "); - else { - erts_printf("type="); + if (fstat((int) fd, &stat_buf) < 0) + erts_printf("type=unknown "); + else { + erts_printf("type="); #ifdef S_ISSOCK - if (S_ISSOCK(stat_buf.st_mode)) - erts_printf("sock "); - else + if (S_ISSOCK(stat_buf.st_mode)) + erts_printf("sock "); + else #endif #ifdef S_ISFIFO if (S_ISFIFO(stat_buf.st_mode)) @@ -2560,196 +2363,239 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters) else #endif #ifdef S_ISCHR - if (S_ISCHR(stat_buf.st_mode)) - erts_printf("chr "); - else + if (S_ISCHR(stat_buf.st_mode)) + erts_printf("chr "); + else #endif #ifdef S_ISDIR - if (S_ISDIR(stat_buf.st_mode)) - erts_printf("dir "); - else + if (S_ISDIR(stat_buf.st_mode)) + erts_printf("dir "); + else #endif #ifdef S_ISBLK - if (S_ISBLK(stat_buf.st_mode)) - erts_printf("blk "); - else + if (S_ISBLK(stat_buf.st_mode)) + erts_printf("blk "); + else #endif #ifdef S_ISREG - if (S_ISREG(stat_buf.st_mode)) - erts_printf("reg "); - else + if (S_ISREG(stat_buf.st_mode)) + erts_printf("reg "); + else #endif #ifdef S_ISLNK - if (S_ISLNK(stat_buf.st_mode)) - erts_printf("lnk "); - else + if (S_ISLNK(stat_buf.st_mode)) + erts_printf("lnk "); + else #endif #ifdef S_ISDOOR - if (S_ISDOOR(stat_buf.st_mode)) - erts_printf("door "); - else + if (S_ISDOOR(stat_buf.st_mode)) + erts_printf("door "); + else #endif #ifdef S_ISWHT - if (S_ISWHT(stat_buf.st_mode)) - erts_printf("wht "); - else + if (S_ISWHT(stat_buf.st_mode)) + erts_printf("wht "); + else #endif #ifdef S_ISXATTR - if (S_ISXATTR(stat_buf.st_mode)) - erts_printf("xattr "); - else + if (S_ISXATTR(stat_buf.st_mode)) + erts_printf("xattr "); + else #endif - erts_printf("unknown "); - } + erts_printf("unknown "); + } #else - erts_printf("type=unknown "); + erts_printf("type=unknown "); #endif - if (state->type == ERTS_EV_TYPE_DRV_SEL) { - erts_printf("driver_select "); - -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - if (internal) { - erts_printf("internal "); - err = 1; - } - - if (cio_events == ep_events) { - erts_printf("ev="); - if (print_events(cio_events) != 0) - err = 1; - } - else { - ErtsPollEvents ev = cio_events; -#if ERTS_CIO_DEFER_ACTIVE_EVENTS - if (state->flags & ERTS_EV_FLAG_DEFER_IN_EV) - ev &= ~ERTS_POLL_EV_IN; - if (state->flags & ERTS_EV_FLAG_DEFER_OUT_EV) - ev &= ~ERTS_POLL_EV_OUT; -#endif - if (ev != ep_events) - err = 1; - erts_printf("cio_ev="); - print_events(cio_events); - erts_printf(" ep_ev="); - print_events(ep_events); - } -#else - if (print_events(cio_events) != 0) - err = 1; -#endif - erts_printf(" "); - if (cio_events & ERTS_POLL_EV_IN) { - Eterm id = state->driver.select->inport; - if (is_nil(id)) { - erts_printf("inport=none inname=none indrv=none "); - err = 1; - } - else { - ErtsPortNames *pnp = erts_get_port_names(id, ERTS_INVALID_ERL_DRV_PORT); - erts_printf(" inport=%T inname=%s indrv=%s ", - id, - pnp->name ? pnp->name : "unknown", - (pnp->driver_name - ? pnp->driver_name - : "unknown")); - erts_free_port_names(pnp); - } - } - if (cio_events & ERTS_POLL_EV_OUT) { - Eterm id = state->driver.select->outport; - if (is_nil(id)) { - erts_printf("outport=none outname=none outdrv=none "); - err = 1; - } - else { - ErtsPortNames *pnp = erts_get_port_names(id, ERTS_INVALID_ERL_DRV_PORT); - erts_printf(" outport=%T outname=%s outdrv=%s ", - id, - pnp->name ? pnp->name : "unknown", - (pnp->driver_name - ? pnp->driver_name - : "unknown")); - erts_free_port_names(pnp); - } - } - } - else if (state->type == ERTS_EV_TYPE_NIF) { - ErtsResource* r; - erts_printf("enif_select "); + if (state->type == ERTS_EV_TYPE_DRV_SEL) { + erts_printf("driver_select "); #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - if (internal) { - erts_printf("internal "); - err = 1; - } - + if (internal) { + erts_printf("internal "); + err = 1; + } + if (aio_events == cio_events) { if (cio_events == ep_events) { erts_printf("ev="); if (print_events(cio_events) != 0) err = 1; } else { - err = 1; + ErtsPollEvents ev = cio_events; + if (ev != ep_events && ep_events != ERTS_POLL_EV_NONE) + err = 1; erts_printf("cio_ev="); print_events(cio_events); erts_printf(" ep_ev="); print_events(ep_events); } + } else { + erts_printf("cio_ev="); + print_events(cio_events); + erts_printf(" aio_ev="); + print_events(aio_events); + if ((aio_events != ep_events && ep_events != ERTS_POLL_EV_NONE) || + (aio_events != 0 && ep_events == ERTS_POLL_EV_NONE)) { + erts_printf(" ep_ev="); + print_events(ep_events); + err = 1; + } + } #else + if (print_events(cio_events) != 0) + err = 1; +#endif + erts_printf(" "); + if (cio_events & ERTS_POLL_EV_IN) { + Eterm id = state->driver.select->inport; + if (is_nil(id)) { + erts_printf("inport=none inname=none indrv=none "); + err = 1; + } + else { + ErtsPortNames *pnp = erts_get_port_names(id, ERTS_INVALID_ERL_DRV_PORT); + erts_printf(" inport=%T inname=%s indrv=%s ", + id, + pnp->name ? pnp->name : "unknown", + (pnp->driver_name + ? pnp->driver_name + : "unknown")); + erts_free_port_names(pnp); + } + } + if (cio_events & ERTS_POLL_EV_OUT) { + Eterm id = state->driver.select->outport; + if (is_nil(id)) { + erts_printf("outport=none outname=none outdrv=none "); + err = 1; + } + else { + ErtsPortNames *pnp = erts_get_port_names(id, ERTS_INVALID_ERL_DRV_PORT); + erts_printf(" outport=%T outname=%s outdrv=%s ", + id, + pnp->name ? pnp->name : "unknown", + (pnp->driver_name + ? pnp->driver_name + : "unknown")); + erts_free_port_names(pnp); + } + } + } + else if (state->type == ERTS_EV_TYPE_NIF) { + ErtsResource* r; + erts_printf("enif_select "); + +#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS + if (internal) { + erts_printf("internal "); + err = 1; + } + + if (cio_events == ep_events) { + erts_printf("ev="); if (print_events(cio_events) != 0) err = 1; + } + else { + err = 1; + erts_printf("cio_ev="); + print_events(cio_events); + erts_printf(" ep_ev="); + print_events(ep_events); + } +#else + if (print_events(cio_events) != 0) + err = 1; #endif - erts_printf(" inpid=%T dd_cnt=%b32d", state->driver.nif->in.pid, - state->driver.nif->in.ddeselect_cnt); - erts_printf(" outpid=%T dd_cnt=%b32d", state->driver.nif->out.pid, - state->driver.nif->out.ddeselect_cnt); - r = state->driver.stop.resource; - erts_printf(" resource=%p(%T:%T)", r, r->type->module, r->type->name); + erts_printf(" inpid=%T", state->driver.nif->in.pid); + erts_printf(" outpid=%T", state->driver.nif->out.pid); + r = state->driver.stop.resource; + erts_printf(" resource=%p(%T:%T)", r, r->type->module, r->type->name); + } +#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS + else if (internal) { + erts_printf("internal "); + if (cio_events) { + err = 1; + erts_printf("cio_ev="); + print_events(cio_events); + } + if (ep_events) { + erts_printf("ep_ev="); + print_events(ep_events); } + } +#endif + else { + err = 1; + erts_printf("control_type=%d ", (int)state->type); #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - else if (internal) { - erts_printf("internal "); - if (cio_events) { - err = 1; - erts_printf("cio_ev="); - print_events(cio_events); - } - if (ep_events) { - erts_printf("ep_ev="); - print_events(ep_events); - } - } + if (cio_events == ep_events) { + erts_printf("ev="); + print_events(cio_events); + } + else { + erts_printf("cio_ev="); print_events(cio_events); + erts_printf(" ep_ev="); print_events(ep_events); + } +#else + erts_printf("ev=0x%b32x", (Uint32) cio_events); #endif - else { - err = 1; - erts_printf("control_type=%d ", (int)state->type); + } + + erts_printf(" flags="); print_flags(state->flags); + if (err) { + erts_printf(" ERROR"); + } + erts_printf("\r\n"); + return err; +} + +static void doit_erts_check_io_debug(void *vstate, void *vcounters) +{ + ErtsDrvEventState *state = (ErtsDrvEventState *) vstate; + IterDebugCounters *counters = (IterDebugCounters *) vcounters; + int internal = 0; #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - if (cio_events == ep_events) { - erts_printf("ev="); - print_events(cio_events); - } - else { - erts_printf("cio_ev="); print_events(cio_events); - erts_printf(" ep_ev="); print_events(ep_events); - } + ErtsSysFdType fd = state->fd; + ErtsPollEvents ep_events = counters->epep[(int) fd]; #else - erts_printf("ev=0x%b32x", (Uint32) cio_events); + ErtsPollEvents ep_events = ERTS_POLL_EV_NONE; #endif - } - - erts_printf(" flags="); print_flags(state->flags); - if (err) { + if (state->driver.select) { + counters->no_driver_select_structs++; + ASSERT(state->events || (ep_events != 0 && ep_events != ERTS_POLL_EV_NONE)); + } + if (state->driver.nif) { + counters->no_enif_select_structs++; + ASSERT(state->events || (ep_events != 0 && ep_events != ERTS_POLL_EV_NONE)); + } + +#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS + if (state->events || (ep_events != 0 && ep_events != ERTS_POLL_EV_NONE)) { + if (ep_events & ERTS_POLL_EV_NVAL) { + ep_events &= ~ERTS_POLL_EV_NVAL; + internal = 1; + counters->internal_fds++; + } + else + counters->used_fds++; +#else + if (state->events) { + counters->used_fds++; +#endif + if (erts_debug_print_checkio_state(state, ep_events, internal)) { counters->num_errors++; - erts_printf(" ERROR"); } - erts_printf("\n"); } } - + +/* ciodpi can be NULL when called from etp-commands */ int -ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip) +erts_check_io_debug(ErtsCheckIoDebugInfo *ciodip) { #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS int fd, len, i; @@ -2758,12 +2604,10 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip) #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS ErtsDrvEventState null_des; - null_des.pollset = NULL; null_des.driver.select = NULL; null_des.driver.nif = NULL; null_des.driver.stop.drv_ptr = NULL; null_des.events = 0; - null_des.remove_cnt = 0; null_des.type = ERTS_EV_TYPE_NONE; null_des.flags = 0; @@ -2771,24 +2615,36 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip) sizeof(ErtsPollEvents)*drv_ev_state.max_fds); #endif - erts_printf("--- fds in pollset --------------------------------------\n"); #if defined(ERTS_ENABLE_LOCK_CHECK) erts_lc_check_exact(NULL, 0); /* No locks should be locked */ #endif - erts_thr_progress_block(); /* stop the world to avoid messy locking */ + if (ciodip) + erts_thr_progress_block(); /* stop the world to avoid messy locking */ #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS len = erts_atomic_read_nob(&drv_ev_state.len); - for (i = 0; i < erts_no_schedulers; i++) { - ERTS_POLL_EXPORT(erts_poll_get_selected_events)(pollsetv[i].ps, - counters.epep, - drv_ev_state.max_fds); +#if ERTS_POLL_USE_FALLBACK + erts_printf("--- fds in flbk pollset ---------------------------------\n"); + erts_poll_get_selected_events_flbk(get_fallback(), counters.epep, + drv_ev_state.max_fds); + for (fd = 0; fd < len; fd++) { + if (drv_ev_state.v[fd].flags & ERTS_EV_FLAG_FALLBACK) + doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters); + } +#endif + erts_printf("--- fds in pollset --------------------------------------\n"); + + for (i = 0; i < erts_no_pollsets; i++) { + erts_poll_get_selected_events(pollsetv[i], + counters.epep, + drv_ev_state.max_fds); for (fd = 0; fd < len; fd++) { - if (drv_ev_state.v[fd].pollset == &pollsetv[i]) - doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters); + if (!(drv_ev_state.v[fd].flags & ERTS_EV_FLAG_FALLBACK) + && get_pollset_id(fd) == i) + doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters); } } for (fd = len ; fd < drv_ev_state.max_fds; fd++) { @@ -2799,11 +2655,15 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip) safe_hash_for_each(&drv_ev_state.tab, &doit_erts_check_io_debug, &counters); #endif - erts_thr_progress_unblock(); - ciodip->no_used_fds = counters.used_fds; - ciodip->no_driver_select_structs = counters.no_driver_select_structs; - ciodip->no_enif_select_structs = counters.no_enif_select_structs; + if (ciodip) + erts_thr_progress_unblock(); + + if (ciodip) { + ciodip->no_used_fds = counters.used_fds; + ciodip->no_driver_select_structs = counters.no_driver_select_structs; + ciodip->no_enif_select_structs = counters.no_enif_select_structs; + } erts_printf("\n"); erts_printf("used fds=%d\n", counters.used_fds); @@ -2822,97 +2682,19 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip) } #ifdef ERTS_ENABLE_LOCK_COUNT -void ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks)(int enable) { +void erts_lcnt_update_cio_locks(int enable) { + int i; #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS erts_lcnt_enable_hash_lock_count(&drv_ev_state.tab, ERTS_LOCK_FLAGS_CATEGORY_IO, enable); #else (void)enable; #endif -} -#endif /* ERTS_ENABLE_LOCK_COUNT */ - -#ifdef ERTS_ENABLE_KERNEL_POLL -# ifdef ERTS_KERNEL_POLL_VERSION - -/* - * Compile these only once for kp/nkp - */ -void erts_init_check_io(void) -{ - if (erts_use_kernel_poll) - erts_init_check_io_kp(); - else - erts_init_check_io_nkp(); -} - -int -driver_select(ErlDrvPort port, ErlDrvEvent event, int mode, int on) -{ - return (*erts_io_funcs.select)(port, event, mode, on); -} - -int enif_select(ErlNifEnv* env, ErlNifEvent event, - enum ErlNifSelectFlags flags, void* obj, const ErlNifPid* pid, Eterm ref) -{ - return (*erts_io_funcs.enif_select)(env, event, flags, obj, pid, ref); -} - -struct pollset_info* erts_get_pollset(int sched_nr) -{ - return (*erts_io_funcs.get_pollset)(sched_nr); -} - -void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp) -{ - erts_io_funcs.notify_port_task_executed(pthp); -} - -void erts_check_io(int do_wait) -{ - erts_io_funcs.check_io(do_wait); -} - -Uint erts_check_io_size(void) -{ - return erts_io_funcs.size(); -} - - -Eterm erts_check_io_info(void *p) -{ - return (*erts_io_funcs.info)(p); -} - -int erts_check_io_max_files(void) -{ - return erts_io_funcs.max_files(); -} - -int -erts_check_io_debug(ErtsCheckIoDebugInfo *ip) -{ - return (*erts_io_funcs.check_io_debug)(ip); -} - -void erts_check_io_interrupt(struct pollset_info* psi, int set) -{ - erts_io_funcs.check_io_interrupt(psi, set); -} - -void erts_check_io_interrupt_timed(struct pollset_info* psi, int set, - ErtsMonotonicTime timeout_time) -{ - erts_io_funcs.check_io_interrupt_tmd(psi, set, timeout_time); -} - -#ifdef ERTS_ENABLE_LOCK_COUNT -void erts_lcnt_update_cio_locks(int enable) -{ - erts_io_funcs.lcnt_update_cio_locks(enable); -} +#if ERTS_POLL_USE_FALLBACK + erts_lcnt_enable_pollset_lock_count_flbk(get_fallback(), enable); #endif -#endif /* ERTS_KERNEL_POLL_VERSION */ - -#endif /* !ERTS_ENABLE_KERNEL_POLL */ + for (i = 0; i < erts_no_pollsets; i++) + erts_lcnt_enable_pollset_lock_count(pollsetv[i], enable); +} +#endif /* ERTS_ENABLE_LOCK_COUNT */ |