diff options
author | Sverker Eriksson <[email protected]> | 2017-05-12 18:05:03 +0200 |
---|---|---|
committer | Lukas Larsson <[email protected]> | 2017-10-02 10:34:26 +0200 |
commit | 9a0970257aaaf9d343f8045548a34abf30dc0c92 (patch) | |
tree | c46a7a7e5acf262f11099d00f1b42bed89f5590b /erts/emulator/sys/common | |
parent | 48e77453536e49b07ddb6be63ba322ddaa5dac45 (diff) | |
download | otp-9a0970257aaaf9d343f8045548a34abf30dc0c92.tar.gz otp-9a0970257aaaf9d343f8045548a34abf30dc0c92.tar.bz2 otp-9a0970257aaaf9d343f8045548a34abf30dc0c92.zip |
erts: Add multiple poll sets
Diffstat (limited to 'erts/emulator/sys/common')
-rw-r--r-- | erts/emulator/sys/common/erl_check_io.c | 734 | ||||
-rw-r--r-- | erts/emulator/sys/common/erl_check_io.h | 27 | ||||
-rw-r--r-- | erts/emulator/sys/common/erl_poll.h | 1 | ||||
-rw-r--r-- | erts/emulator/sys/common/erl_sys_common_misc.c | 8 |
4 files changed, 449 insertions, 321 deletions
diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index 52584e4246..ff0f3ea121 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -94,6 +94,7 @@ static struct pollset_info { ErtsPollSet ps; erts_atomic_t in_poll_wait; /* set while doing poll */ + erts_atomic_t check_io_time; struct { int six; /* start index */ int eix; /* end index */ @@ -102,7 +103,7 @@ static struct pollset_info ErtsSysFdType *array; } active_fd; erts_atomic_t removed_list; /* struct removed_fd* */ -}pollset; +}*pollsetv; #define NUM_OF_POLLSETS 1 @@ -114,21 +115,39 @@ int ERTS_CIO_EXPORT(enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags Uint ERTS_CIO_EXPORT(erts_check_io_size)(void); Eterm ERTS_CIO_EXPORT(erts_check_io_info)(void *); int ERTS_CIO_EXPORT(erts_check_io_max_files)(void); -void ERTS_CIO_EXPORT(erts_check_io_interrupt)(int); -void ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(int, ErtsMonotonicTime); +void ERTS_CIO_EXPORT(erts_check_io_interrupt)(struct pollset_info*, int); +void ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(struct pollset_info*, int, ErtsMonotonicTime); void ERTS_CIO_EXPORT(erts_check_io)(int); +struct pollset_info* ERTS_CIO_EXPORT(erts_get_pollset)(int); int ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *); +void ERTS_CIO_EXPORT(erts_io_notify_port_task_executed)(ErtsPortTaskHandle *pthp); #ifdef ERTS_ENABLE_LOCK_COUNT void ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks)(int enable); #endif #endif +/* ToDo: Was inline in erl_check_io.h but now need struct pollset_info */ +void +ERTS_CIO_EXPORT(erts_io_notify_port_task_executed)(ErtsPortTaskHandle *pthp) +{ + ErtsIoTask *itp = ErtsContainerStruct(pthp, ErtsIoTask, task); + erts_aint_t ci_time = erts_atomic_read_acqb(&itp->pollset->check_io_time); + erts_atomic_set_relb(&itp->executed_time, ci_time); +} + + +struct pollset_info* ERTS_CIO_EXPORT(erts_get_pollset)(int sched_nr) +{ + ASSERT(sched_nr > 0 && sched_nr <= erts_no_schedulers); + return &pollsetv[sched_nr - 1]; +} typedef struct { #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS SafeHashBucket hb; #endif ErtsSysFdType fd; + struct pollset_info *pollset; struct { ErtsDrvSelectDataState *select; /* ERTS_EV_TYPE_DRV_SEL */ ErtsNifSelectDataState *nif; /* ERTS_EV_TYPE_NIF */ @@ -209,6 +228,7 @@ static ERTS_INLINE ErtsDrvEventState* hash_new_drv_ev_state(ErtsSysFdType fd) { ErtsDrvEventState tmpl; tmpl.fd = fd; + tmpl.pollset = NULL; tmpl.driver.select = NULL; tmpl.driver.nif = NULL; tmpl.driver.stop.drv_ptr = NULL; @@ -252,10 +272,11 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource*, ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(removed_fd, struct removed_fd, 64, ERTS_ALC_T_FD_LIST) static ERTS_INLINE void -init_iotask(ErtsIoTask *io_task) +init_iotask(ErtsIoTask *io_task, struct pollset_info* psi) { erts_port_task_handle_init(&io_task->task); erts_atomic_init_nob(&io_task->executed_time, ~((erts_aint_t) 0)); + io_task->pollset = psi; } static ERTS_INLINE int @@ -269,14 +290,14 @@ is_iotask_active(ErtsIoTask *io_task, erts_aint_t current_cio_time) } static ERTS_INLINE ErtsDrvSelectDataState * -alloc_drv_select_data(void) +alloc_drv_select_data(struct pollset_info* psi) { ErtsDrvSelectDataState *dsp = erts_alloc(ERTS_ALC_T_DRV_SEL_D_STATE, sizeof(ErtsDrvSelectDataState)); dsp->inport = NIL; dsp->outport = NIL; - init_iotask(&dsp->iniotask); - init_iotask(&dsp->outiotask); + init_iotask(&dsp->iniotask, psi); + init_iotask(&dsp->outiotask, psi); return dsp; } @@ -307,11 +328,11 @@ free_nif_select_data(ErtsNifSelectDataState *dsp) } static ERTS_INLINE void -remember_removed(ErtsDrvEventState *state, struct pollset_info* psi) +remember_removed(ErtsDrvEventState *state) { struct removed_fd *fdlp; ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd))); - if (erts_atomic_read_nob(&psi->in_poll_wait)) { + if (erts_atomic_read_nob(&state->pollset->in_poll_wait)) { erts_aint_t was_next, exp_next; state->remove_cnt++; ASSERT(state->remove_cnt > 0); @@ -324,11 +345,11 @@ remember_removed(ErtsDrvEventState *state, struct pollset_info* psi) #endif /* Lockless atomic insertion in removed_list */ - was_next = erts_atomic_read_acqb(&psi->removed_list); + was_next = erts_atomic_read_acqb(&state->pollset->removed_list); do { exp_next = was_next; fdlp->next = (struct removed_fd*) exp_next; - was_next = erts_atomic_cmpxchg_mb(&psi->removed_list, + was_next = erts_atomic_cmpxchg_mb(&state->pollset->removed_list, (erts_aint_t) fdlp, exp_next); }while (was_next != exp_next); @@ -384,7 +405,7 @@ forget_removed(struct pollset_info* psi) state->driver.stop.resource = NULL; ASSERT(resource); state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; + state->flags = 0; goto case_ERTS_EV_TYPE_NONE; case ERTS_EV_TYPE_STOP_USE: @@ -392,11 +413,12 @@ forget_removed(struct pollset_info* psi) drv_ptr = state->driver.stop.drv_ptr; ASSERT(drv_ptr); state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; + state->flags = 0; state->driver.stop.drv_ptr = NULL; /* Fall through */ case ERTS_EV_TYPE_NONE: case_ERTS_EV_TYPE_NONE: + state->pollset = NULL; #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS hash_erase_drv_ev_state(state); #endif @@ -455,6 +477,7 @@ grow_drv_ev_state(int min_ix) sizeof(ErtsDrvEventState)*new_len)); for (i = old_len; i < new_len; i++) { drv_ev_state.v[i].fd = (ErtsSysFdType) i; + drv_ev_state.v[i].pollset = NULL; drv_ev_state.v[i].driver.select = NULL; drv_ev_state.v[i].driver.stop.drv_ptr = NULL; drv_ev_state.v[i].driver.nif = NULL; @@ -541,7 +564,9 @@ deselect(ErtsDrvEventState *state, int mode) } } - state->events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, rm_events, 0, &do_wake); + ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, rm_events, + 0, &do_wake); + state->events &= ~rm_events; if (!(state->events)) { switch (state->type) { @@ -565,8 +590,8 @@ deselect(ErtsDrvEventState *state, int mode) } state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; - remember_removed(state, &pollset); + state->flags = 0; + remember_removed(state); } } @@ -585,7 +610,7 @@ check_fd_cleanup(ErtsDrvEventState *state, ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd))); - current_cio_time = erts_atomic_read_acqb(&erts_check_io_time); + current_cio_time = erts_atomic_read_acqb(&state->pollset->check_io_time); *free_select = NULL; if (state->driver.select && (state->type != ERTS_EV_TYPE_DRV_SEL) @@ -602,19 +627,27 @@ check_fd_cleanup(ErtsDrvEventState *state, state->driver.nif = NULL; } -#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS if (((state->type != ERTS_EV_TYPE_NONE) | state->remove_cnt + | (state->driver.nif != NULL) | (state->driver.select != NULL)) == 0) { + state->pollset = NULL; +#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS hash_erase_drv_ev_state(state); - - } #endif + } } +#ifdef __WIN32__ +# define MUST_DEFER(MAY_SLEEP) 1 +#else +# define MUST_DEFER(MAY_SLEEP) (MAY_SLEEP) +#endif + static ERTS_INLINE int -check_cleanup_active_fd(ErtsSysFdType fd, +check_cleanup_active_fd(struct pollset_info* psi, + ErtsSysFdType fd, #if ERTS_CIO_DEFER_ACTIVE_EVENTS ErtsPollControlEntry *pce, int *pce_ix, @@ -637,14 +670,15 @@ check_cleanup_active_fd(ErtsSysFdType fd, state = &drv_ev_state.v[(int) fd]; #else state = hash_get_drv_ev_state(fd); /* may be NULL! */ - if (state) #endif + if (state && state->pollset == psi) { if (state->driver.select) { #if ERTS_CIO_DEFER_ACTIVE_EVENTS if (is_iotask_active(&state->driver.select->iniotask, current_cio_time)) { active = 1; - if ((state->events & ERTS_POLL_EV_IN) + if (MUST_DEFER(may_sleep) + && (state->events & ERTS_POLL_EV_IN) && !(state->flags & ERTS_EV_FLAG_DEFER_IN_EV)) { evoff |= ERTS_POLL_EV_IN; state->flags |= ERTS_EV_FLAG_DEFER_IN_EV; @@ -657,15 +691,17 @@ check_cleanup_active_fd(ErtsSysFdType fd, } if (is_iotask_active(&state->driver.select->outiotask, current_cio_time)) { active = 1; - if ((state->events & ERTS_POLL_EV_OUT) + if (MUST_DEFER(may_sleep) + && (state->events & ERTS_POLL_EV_OUT) && !(state->flags & ERTS_EV_FLAG_DEFER_OUT_EV)) { evoff |= ERTS_POLL_EV_OUT; state->flags |= ERTS_EV_FLAG_DEFER_OUT_EV; } } else if (state->flags & ERTS_EV_FLAG_DEFER_OUT_EV) { - if (state->events & ERTS_POLL_EV_OUT) + if (state->events & ERTS_POLL_EV_OUT) { evon |= ERTS_POLL_EV_OUT; + } state->flags &= ~ERTS_EV_FLAG_DEFER_OUT_EV; } if (active) @@ -681,6 +717,12 @@ check_cleanup_active_fd(ErtsSysFdType fd, free_select = state->driver.select; state->driver.select = NULL; } +#if ERTS_CIO_DEFER_ACTIVE_EVENTS + if (evon) { + int do_wake = 0; + ERTS_CIO_POLL_CTL(psi->ps, state->fd, evon, 1, &do_wake); + } +#endif } if (state->driver.nif) { @@ -705,7 +747,7 @@ check_cleanup_active_fd(ErtsSysFdType fd, } if (rm_events) { int do_wake = 0; - state->events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, + state->events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, rm_events, 0, &do_wake); } if (state->events) @@ -720,7 +762,6 @@ check_cleanup_active_fd(ErtsSysFdType fd, if (((state->type != ERTS_EV_TYPE_NONE) | state->remove_cnt | active) == 0) hash_erase_drv_ev_state(state); #endif - } erts_mtx_unlock(mtx); @@ -737,28 +778,23 @@ check_cleanup_active_fd(ErtsSysFdType fd, pcep->events = evoff; pcep->on = 0; } - if (evon) { - ErtsPollControlEntry *pcep = &pce[(*pce_ix)++]; - pcep->fd = fd; - pcep->events = evon; - pcep->on = 1; - } #endif return active; } static void -check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep) +check_cleanup_active_fds(struct pollset_info* psi, + erts_aint_t current_cio_time, int may_sleep) { - int six = pollset.active_fd.six; - int eix = pollset.active_fd.eix; - erts_aint32_t no = erts_atomic32_read_dirty(&pollset.active_fd.no); - int size = pollset.active_fd.size; + int six = psi->active_fd.six; + int eix = psi->active_fd.eix; + erts_aint32_t no = erts_atomic32_read_dirty(&psi->active_fd.no); + const int size = psi->active_fd.size; int ix = six; #if ERTS_CIO_DEFER_ACTIVE_EVENTS - /* every fd might add two entries */ - Uint pce_sz = 2*sizeof(ErtsPollControlEntry)*no; + /* every fd might add one entry */ + Uint pce_sz = sizeof(ErtsPollControlEntry)*no; ErtsPollControlEntry *pctrl_entries = (pce_sz ? erts_alloc(ERTS_ALC_T_TMP, pce_sz) : NULL); @@ -766,12 +802,12 @@ check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep) #endif while (ix != eix) { - ErtsSysFdType fd = pollset.active_fd.array[ix]; + ErtsSysFdType fd = psi->active_fd.array[ix]; int nix = ix + 1; if (nix >= size) nix = 0; ASSERT(fd != ERTS_SYS_FD_INVALID); - if (!check_cleanup_active_fd(fd, + if (!check_cleanup_active_fd(psi, fd, #if ERTS_CIO_DEFER_ACTIVE_EVENTS pctrl_entries, &pctrl_ix, @@ -781,14 +817,14 @@ check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep) no--; if (ix == six) { #ifdef DEBUG - pollset.active_fd.array[ix] = ERTS_SYS_FD_INVALID; + psi->active_fd.array[ix] = ERTS_SYS_FD_INVALID; #endif six = nix; } else { - pollset.active_fd.array[ix] = pollset.active_fd.array[six]; + psi->active_fd.array[ix] = psi->active_fd.array[six]; #ifdef DEBUG - pollset.active_fd.array[six] = ERTS_SYS_FD_INVALID; + psi->active_fd.array[six] = ERTS_SYS_FD_INVALID; #endif six++; if (six >= size) @@ -801,53 +837,53 @@ check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep) #if ERTS_CIO_DEFER_ACTIVE_EVENTS ASSERT(pctrl_ix <= pce_sz/sizeof(ErtsPollControlEntry)); if (pctrl_ix) - ERTS_CIO_POLL_CTLV(pollset.ps, pctrl_entries, pctrl_ix); + ERTS_CIO_POLL_CTLV(psi->ps, pctrl_entries, pctrl_ix); if (pctrl_entries) erts_free(ERTS_ALC_T_TMP, pctrl_entries); #endif - pollset.active_fd.six = six; - pollset.active_fd.eix = eix; - erts_atomic32_set_relb(&pollset.active_fd.no, no); + psi->active_fd.six = six; + psi->active_fd.eix = eix; + erts_atomic32_set_relb(&psi->active_fd.no, no); } -static void grow_active_fds(void) +static void grow_active_fds(struct pollset_info *psi) { - ASSERT(pollset.active_fd.six == pollset.active_fd.eix); - pollset.active_fd.six = 0; - pollset.active_fd.eix = pollset.active_fd.size; - pollset.active_fd.size += ERTS_ACTIVE_FD_INC; - pollset.active_fd.array = erts_realloc(ERTS_ALC_T_ACTIVE_FD_ARR, - pollset.active_fd.array, - pollset.active_fd.size*sizeof(ErtsSysFdType)); + ASSERT(psi->active_fd.six == psi->active_fd.eix); + psi->active_fd.six = 0; + psi->active_fd.eix = psi->active_fd.size; + psi->active_fd.size += ERTS_ACTIVE_FD_INC; + psi->active_fd.array = erts_realloc(ERTS_ALC_T_ACTIVE_FD_ARR, + psi->active_fd.array, + psi->active_fd.size*sizeof(ErtsSysFdType)); #ifdef DEBUG { int i; - for (i = pollset.active_fd.eix + 1; i < pollset.active_fd.size; i++) - pollset.active_fd.array[i] = ERTS_SYS_FD_INVALID; + for (i = psi->active_fd.eix + 1; i < psi->active_fd.size; i++) + psi->active_fd.array[i] = ERTS_SYS_FD_INVALID; } #endif } static ERTS_INLINE void -add_active_fd(ErtsSysFdType fd) +add_active_fd(struct pollset_info *psi, ErtsSysFdType fd) { - int eix = pollset.active_fd.eix; - int size = pollset.active_fd.size; + int eix = psi->active_fd.eix; + const int size = psi->active_fd.size; - pollset.active_fd.array[eix] = fd; + psi->active_fd.array[eix] = fd; - erts_atomic32_set_relb(&pollset.active_fd.no, - (erts_atomic32_read_dirty(&pollset.active_fd.no) + erts_atomic32_set_relb(&psi->active_fd.no, + (erts_atomic32_read_dirty(&psi->active_fd.no) + 1)); eix++; if (eix >= size) eix = 0; - pollset.active_fd.eix = eix; + psi->active_fd.eix = eix; - if (pollset.active_fd.six == eix) { - grow_active_fds(); + if (psi->active_fd.six == eix) { + grow_active_fds(psi); } } @@ -862,9 +898,9 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, Eterm id = erts_drvport2id(ix); ErtsSysFdType fd = (ErtsSysFdType) e; ErtsPollEvents ctl_events = (ErtsPollEvents) 0; - ErtsPollEvents new_events, old_events; + ErtsPollEvents old_events; ErtsDrvEventState *state; - int wake_poller; + int wake_poller = 0; int ret; ErtsDrvSelectDataState *free_select = NULL; ErtsNifSelectDataState *free_nif = NULL; @@ -898,22 +934,25 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, state = hash_get_drv_ev_state(fd); /* may be NULL! */ #endif - if (!on && (mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) { - if (IS_FD_UNKNOWN(state)) { - /* fast track to stop_select callback */ - stop_select_fn = prt->drv_ptr->stop_select; -#ifdef USE_VM_PROBES - strncpy(name, prt->drv_ptr->name, - sizeof(DTRACE_CHARBUF_NAME(name))-1); - name[sizeof(name)-1] = '\0'; -#endif - ret = 0; - goto done_unknown; - } - mode |= (ERL_DRV_READ | ERL_DRV_WRITE); - wake_poller = 1; /* to eject fd from pollset (if needed) */ + if (!on) { + if (IS_FD_UNKNOWN(state)) { + if ((mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) { + /* fast track to stop_select callback */ + stop_select_fn = prt->drv_ptr->stop_select; + #ifdef USE_VM_PROBES + strncpy(name, prt->drv_ptr->name, + sizeof(DTRACE_CHARBUF_NAME(name))-1); + name[sizeof(name)-1] = '\0'; + #endif + } + ret = 0; + goto done_unknown; + } + else if ((mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) { + mode |= (ERL_DRV_READ | ERL_DRV_WRITE); + wake_poller = 1; /* to eject fd from pollset (if needed) */ + } } - else wake_poller = 0; #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS if (state == NULL) { @@ -951,7 +990,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, if (owner != id && is_not_nil(owner)) drv_select_steal(ix, state, mode, on); } - ctl_events |= ERTS_POLL_EV_IN; + ctl_events = ERTS_POLL_EV_IN; } if (mode & ERL_DRV_WRITE) { if (state->type == ERTS_EV_TYPE_DRV_SEL) { @@ -962,44 +1001,61 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, ctl_events |= ERTS_POLL_EV_OUT; } + ASSERT((state->type == ERTS_EV_TYPE_DRV_SEL) || (state->type == ERTS_EV_TYPE_NONE && !state->events)); - if (!on && !(state->flags & ERTS_EV_FLAG_USED) - && state->events && !(state->events & ~ctl_events)) { - /* Old driver removing all events. At least wake poller. - It will not make close() 100% safe but it will prevent - actions delayed by poll timeout. */ - wake_poller = 1; - } + old_events = state->events; - new_events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, ctl_events, on, &wake_poller); + if (on) { + ctl_events &= ~old_events; + state->events |= ctl_events; + } + else { + ctl_events &= old_events; + state->events &= ~ctl_events; - if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { - if (state->type == ERTS_EV_TYPE_DRV_SEL && !state->events) { - state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; - state->driver.select->inport = NIL; - state->driver.select->outport = NIL; - } - ret = -1; - goto done; + if (!(state->flags & ERTS_EV_FLAG_USED) + && old_events && !state->events) { + /* + * Old driver removing all events. At least wake poller. + * It will not make close() 100% safe but it will prevent + * actions delayed by poll timeout. + */ + wake_poller = 1; + } } - old_events = state->events; + if (ctl_events) { + ErtsPollEvents new_events; - ASSERT(on - ? (new_events == (state->events | ctl_events)) - : (new_events == (state->events & ~ctl_events))); + if (!state->pollset) { + ErtsSchedulerData* esdp = erts_get_scheduler_data(); + ASSERT(esdp); + state->pollset = esdp->pollset; + } - ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL - || state->type == ERTS_EV_TYPE_NONE); + new_events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, ctl_events, on, &wake_poller); - state->events = new_events; - if (ctl_events) { - if (on) { + if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { + if (state->type == ERTS_EV_TYPE_DRV_SEL && !old_events) { + state->type = ERTS_EV_TYPE_NONE; + state->flags = 0; + state->driver.select->inport = NIL; + state->driver.select->outport = NIL; + } + ret = -1; + goto done; + } + + ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL + || state->type == ERTS_EV_TYPE_NONE); + } + + if (on) { + if (ctl_events) { if (!state->driver.select) - state->driver.select = alloc_drv_select_data(); + state->driver.select = alloc_drv_select_data(state->pollset); if (state->type == ERTS_EV_TYPE_NONE) state->type = ERTS_EV_TYPE_DRV_SEL; ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL); @@ -1010,8 +1066,9 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, if (mode & ERL_DRV_USE) { state->flags |= ERTS_EV_FLAG_USED; } - } - else { /* off */ + } + } + else { /* off */ if (state->type == ERTS_EV_TYPE_DRV_SEL) { if (ctl_events & ERTS_POLL_EV_IN) { abort_tasks(state, ERL_DRV_READ); @@ -1021,20 +1078,20 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, abort_tasks(state, ERL_DRV_WRITE); state->driver.select->outport = NIL; } - if (new_events == 0) { + if (state->events == 0) { if (old_events != 0) { - remember_removed(state, &pollset); + remember_removed(state); } if ((mode & ERL_DRV_USE) || !(state->flags & ERTS_EV_FLAG_USED)) { state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; + state->flags = 0; } /*else keep it, as fd will probably be selected upon again */ } } if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) { erts_driver_t* drv_ptr = prt->drv_ptr; - ASSERT(new_events==0); + ASSERT(state->events==0); if (state->remove_cnt == 0 || !wake_poller) { /* Safe to close fd now as it is not in pollset or there was no need to eject fd (kernel poll) */ @@ -1053,7 +1110,6 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, } } } - } } ret = 0; @@ -1093,7 +1149,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, ErtsResource* resource = DATA_TO_RESOURCE(obj); ErtsSysFdType fd = (ErtsSysFdType) e; ErtsPollEvents ctl_events = (ErtsPollEvents) 0; - ErtsPollEvents new_events, old_events; + ErtsPollEvents old_events; ErtsDrvEventState *state; int wake_poller; int ret; @@ -1192,32 +1248,45 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, ASSERT((state->type == ERTS_EV_TYPE_NIF) || (state->type == ERTS_EV_TYPE_NONE && !state->events)); - new_events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, ctl_events, on, &wake_poller); + old_events = state->events; - if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { - if (state->type == ERTS_EV_TYPE_NIF && !state->events) { - state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; - state->driver.nif->in.pid = NIL; - state->driver.nif->out.pid = NIL; - state->driver.nif->in.ddeselect_cnt = 0; - state->driver.nif->out.ddeselect_cnt = 0; - state->driver.stop.resource = NULL; - } - ret = INT_MIN | ERL_NIF_SELECT_FAILED; - goto done; + if (on) { + ctl_events &= ~old_events; + state->events |= ctl_events; + } + else { + ctl_events &= old_events; + state->events &= ~ctl_events; } - old_events = state->events; + if (ctl_events) { + ErtsPollEvents new_events; + + if (!state->pollset) { + state->pollset = erts_get_scheduler_data()->pollset; + } - ASSERT(on - ? (new_events == (state->events | ctl_events)) - : (new_events == (state->events & ~ctl_events))); + new_events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, ctl_events, on, &wake_poller); + + if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { + if (state->type == ERTS_EV_TYPE_NIF && !old_events) { + state->type = ERTS_EV_TYPE_NONE; + state->flags = 0; + state->driver.nif->in.pid = NIL; + state->driver.nif->out.pid = NIL; + state->driver.nif->in.ddeselect_cnt = 0; + state->driver.nif->out.ddeselect_cnt = 0; + state->driver.stop.resource = NULL; + } + ret = INT_MIN | ERL_NIF_SELECT_FAILED; + goto done; + } + ASSERT(new_events == state->events); + } ASSERT(state->type == ERTS_EV_TYPE_NIF || state->type == ERTS_EV_TYPE_NONE); - state->events = new_events; if (on) { const Eterm recipient = pid ? pid->pid : env->proc->common.id; Uint32* refn; @@ -1230,7 +1299,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, } ASSERT(state->type == ERTS_EV_TYPE_NIF); ASSERT(state->driver.stop.resource == resource); - if (ctl_events & ERTS_POLL_EV_IN) { + if (mode & ERL_DRV_READ) { state->driver.nif->in.pid = recipient; if (is_immed(ref)) { state->driver.nif->in.immed = ref; @@ -1244,7 +1313,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, } state->driver.nif->in.ddeselect_cnt = 0; } - if (ctl_events & ERTS_POLL_EV_OUT) { + if (mode & ERL_DRV_WRITE) { state->driver.nif->out.pid = recipient; if (is_immed(ref)) { state->driver.nif->out.immed = ref; @@ -1267,10 +1336,10 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, state->driver.nif->in.ddeselect_cnt = 0; state->driver.nif->out.ddeselect_cnt = 0; if (old_events != 0) { - remember_removed(state, &pollset); + remember_removed(state); } } - ASSERT(new_events==0); + ASSERT(state->events==0); if (state->remove_cnt == 0 || !wake_poller) { /* * Safe to close fd now as it is not in pollset @@ -1567,7 +1636,7 @@ steal_pending_stop_use(erts_dsprintf_buf_t *dsbufp, ErlDrvPort ix, erts_ddll_dereference_driver(state->driver.stop.drv_ptr->handle); } state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; + state->flags = 0; state->driver.stop.drv_ptr = NULL; } else { @@ -1606,7 +1675,7 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource* resource, enif_release_resource(state->driver.stop.resource); state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; + state->flags = 0; state->driver.stop.resource = NULL; } else { @@ -1656,7 +1725,7 @@ iready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time) (ErlDrvEvent) state->fd) != 0) { stale_drv_select(id, state, ERL_DRV_READ); } - add_active_fd(state->fd); + add_active_fd(state->pollset, state->fd); } } @@ -1674,7 +1743,7 @@ oready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time) (ErlDrvEvent) state->fd) != 0) { stale_drv_select(id, state, ERL_DRV_WRITE); } - add_active_fd(state->fd); + add_active_fd(state->pollset, state->fd); } } @@ -1729,19 +1798,20 @@ send_event_tuple(struct erts_nif_select_event* e, ErtsResource* resource, static void bad_fd_in_pollset(ErtsDrvEventState *, Eterm inport, Eterm outport); void -ERTS_CIO_EXPORT(erts_check_io_interrupt)(int set) +ERTS_CIO_EXPORT(erts_check_io_interrupt)(struct pollset_info *psi, int set) { - ERTS_CIO_POLL_INTR(pollset.ps, set); + ERTS_CIO_POLL_INTR(psi->ps, set); } void -ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(int set, +ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(struct pollset_info *psi, + int set, ErtsMonotonicTime timeout_time) { - ERTS_CIO_POLL_INTR_TMD(pollset.ps, set, timeout_time); + ERTS_CIO_POLL_INTR_TMD(psi->ps, set, timeout_time); } -#if !ERTS_CIO_DEFER_ACTIVE_EVENTS +#ifndef __WIN32__ /* * Number of ignored events, for a lingering fd added by enif_select(), * until we deselect fd-event from pollset. @@ -1761,8 +1831,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) int poll_ret, i; erts_aint_t current_cio_time; ErtsSchedulerData *esdp = erts_get_scheduler_data(); - - ASSERT(esdp); + struct pollset_info *psi = esdp->pollset; restart: @@ -1781,24 +1850,25 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) * erts_check_io_time, since only one thread can * check io at a time. */ - current_cio_time = erts_atomic_read_dirty(&erts_check_io_time); + current_cio_time = erts_atomic_read_dirty(&psi->check_io_time); current_cio_time++; - erts_atomic_set_relb(&erts_check_io_time, current_cio_time); + erts_atomic_set_relb(&psi->check_io_time, current_cio_time); - check_cleanup_active_fds(current_cio_time, + check_cleanup_active_fds(psi, + current_cio_time, timeout_time != ERTS_POLL_NO_TIMEOUT); #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_check_exact(NULL, 0); /* No locks should be locked */ #endif - pollres_len = erts_atomic32_read_dirty(&pollset.active_fd.no) + ERTS_CHECK_IO_POLL_RES_LEN; + pollres_len = erts_atomic32_read_dirty(&psi->active_fd.no) + ERTS_CHECK_IO_POLL_RES_LEN; pollres = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollResFd)*pollres_len); - erts_atomic_set_nob(&pollset.in_poll_wait, 1); + erts_atomic_set_nob(&psi->in_poll_wait, 1); - poll_ret = ERTS_CIO_POLL_WAIT(pollset.ps, pollres, &pollres_len, timeout_time); + poll_ret = ERTS_CIO_POLL_WAIT(psi->ps, pollres, &pollres_len, timeout_time); #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_check_exact(NULL, 0); /* No locks should be locked */ @@ -1810,8 +1880,8 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) #endif if (poll_ret != 0) { - erts_atomic_set_nob(&pollset.in_poll_wait, 0); - forget_removed(&pollset); + erts_atomic_set_nob(&psi->in_poll_wait, 0); + forget_removed(psi); erts_free(ERTS_ALC_T_TMP, pollres); if (poll_ret == EAGAIN) { goto restart; @@ -1838,6 +1908,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) erts_mtx_lock(fd_mtx(fd)); + #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS state = &drv_ev_state.v[ (int) fd]; #else @@ -1848,7 +1919,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) #endif /* Skip this fd if it was removed from pollset */ - if (is_removed(state)) { + if (is_removed(state) || state->pollset != psi) { goto next_pollres; } @@ -1884,7 +1955,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) bad_fd_in_pollset(state, state->driver.select->inport, state->driver.select->outport); - add_active_fd(state->fd); + add_active_fd(psi, state->fd); } break; } @@ -1915,7 +1986,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) resource = state->driver.stop.resource; state->driver.nif->out.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT; state->driver.nif->out.pid = NIL; - add_active_fd(state->fd); + add_active_fd(psi, state->fd); } else { ASSERT(state->driver.nif->out.ddeselect_cnt >= 2); @@ -1928,7 +1999,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) resource = state->driver.stop.resource; state->driver.nif->in.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT; state->driver.nif->in.pid = NIL; - add_active_fd(state->fd); + add_active_fd(psi, state->fd); } else { ASSERT(state->driver.nif->in.ddeselect_cnt >= 2); @@ -1938,7 +2009,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) } else if (revents & ERTS_POLL_EV_NVAL) { bad_fd_in_pollset(state, NIL, NIL); - add_active_fd(state->fd); + add_active_fd(psi, state->fd); } erts_mtx_unlock(fd_mtx(fd)); @@ -1963,7 +2034,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) (int) state->type); ASSERT(0); deselect(state, 0); - add_active_fd(state->fd); + add_active_fd(psi, state->fd); break; } } @@ -1973,9 +2044,9 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) next_pollres_unlocked:; } - erts_atomic_set_nob(&pollset.in_poll_wait, 0); + erts_atomic_set_nob(&psi->in_poll_wait, 0); erts_free(ERTS_ALC_T_TMP, pollres); - forget_removed(&pollset); + forget_removed(psi); } static void @@ -2096,16 +2167,17 @@ static void drv_ev_state_free(void *des) } #endif - #ifdef ERTS_ENABLE_KERNEL_POLL struct io_functions { int (*select)(ErlDrvPort, ErlDrvEvent, int, int); int (*enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, const ErlNifPid*, Eterm); - void (*check_io_as_interrupt)(void); - void (*check_io_interrupt)(int); - void (*check_io_interrupt_tmd)(int, ErtsMonotonicTime); + void (*check_io_as_interrupt)(struct pollset_info*); + void (*check_io_interrupt)(struct pollset_info*, int); + void (*check_io_interrupt_tmd)(struct pollset_info*, int, ErtsMonotonicTime); void (*check_io)(int); + struct pollset_info* (*get_pollset)(int sched_nr); + void (*notify_port_task_executed)(ErtsPortTaskHandle *pthp); int (*max_files)(void); Uint (*size)(void); Eterm (*info)(void *); @@ -2126,12 +2198,11 @@ extern struct io_functions erts_io_funcs; void ERTS_CIO_EXPORT(erts_init_check_io)(void) { + int j; ERTS_CT_ASSERT((INT_MIN & (ERL_NIF_SELECT_STOP_CALLED | ERL_NIF_SELECT_STOP_SCHEDULED | ERL_NIF_SELECT_INVALID_EVENT | ERL_NIF_SELECT_FAILED)) == 0); - erts_atomic_init_nob(&erts_check_io_time, 0); - erts_atomic_init_nob(&pollset.in_poll_wait, 0); #ifdef ERTS_ENABLE_KERNEL_POLL ASSERT(erts_io_funcs.select == NULL); @@ -2140,6 +2211,8 @@ ERTS_CIO_EXPORT(erts_init_check_io)(void) erts_io_funcs.check_io_interrupt = ERTS_CIO_EXPORT(erts_check_io_interrupt); erts_io_funcs.check_io_interrupt_tmd= ERTS_CIO_EXPORT(erts_check_io_interrupt_timed); erts_io_funcs.check_io = ERTS_CIO_EXPORT(erts_check_io); + erts_io_funcs.get_pollset = ERTS_CIO_EXPORT(erts_get_pollset); + erts_io_funcs.notify_port_task_executed = ERTS_CIO_EXPORT(erts_io_notify_port_task_executed); erts_io_funcs.max_files = ERTS_CIO_EXPORT(erts_check_io_max_files); erts_io_funcs.size = ERTS_CIO_EXPORT(erts_check_io_size); erts_io_funcs.info = ERTS_CIO_EXPORT(erts_check_io_info); @@ -2149,25 +2222,34 @@ ERTS_CIO_EXPORT(erts_init_check_io)(void) #endif #endif + init_removed_fd_alloc(); + ERTS_CIO_POLL_INIT(); - pollset.ps = ERTS_CIO_NEW_POLLSET(); - - pollset.active_fd.six = 0; - pollset.active_fd.eix = 0; - erts_atomic32_init_nob(&pollset.active_fd.no, 0); - pollset.active_fd.size = ERTS_ACTIVE_FD_INC; - pollset.active_fd.array = erts_alloc(ERTS_ALC_T_ACTIVE_FD_ARR, - sizeof(ErtsSysFdType)*ERTS_ACTIVE_FD_INC); + pollsetv = erts_alloc(ERTS_ALC_T_POLLSET, + sizeof(struct pollset_info)*erts_no_schedulers); + for (j=0; j < erts_no_schedulers; j++) { + struct pollset_info* psi = &pollsetv[j]; + + erts_atomic_init_nob(&psi->check_io_time, 0); + erts_atomic_init_nob(&psi->in_poll_wait, 0); + psi->ps = ERTS_CIO_NEW_POLLSET(); + psi->active_fd.six = 0; + psi->active_fd.eix = 0; + erts_atomic32_init_nob(&psi->active_fd.no, 0); + psi->active_fd.size = ERTS_ACTIVE_FD_INC; + psi->active_fd.array = erts_alloc(ERTS_ALC_T_ACTIVE_FD_ARR, + sizeof(ErtsSysFdType)*ERTS_ACTIVE_FD_INC); #ifdef DEBUG - { - int i; - for (i = 0; i < ERTS_ACTIVE_FD_INC; i++) - pollset.active_fd.array[i] = ERTS_SYS_FD_INVALID; - } + { + int i; + for (i = 0; i < ERTS_ACTIVE_FD_INC; i++) + psi->active_fd.array[i] = ERTS_SYS_FD_INVALID; + } #endif - init_removed_fd_alloc(); - erts_atomic_init_nob(&pollset.removed_list, (erts_aint_t)NULL); + erts_atomic_init_nob(&psi->removed_list, (erts_aint_t)NULL); + } + { int i; for (i=0; i<DRV_EV_STATE_LOCK_CNT; i++) { @@ -2211,10 +2293,14 @@ ERTS_CIO_EXPORT(erts_check_io_max_files)(void) Uint ERTS_CIO_EXPORT(erts_check_io_size)(void) { - Uint res; + Uint res = 0; ErtsPollInfo pi; - ERTS_CIO_POLL_INFO(pollset.ps, &pi); - res = pi.memory_size; + int i; + + for (i = 0; i < erts_no_schedulers; i++) { + ERTS_CIO_POLL_INFO(pollsetv[i].ps, &pi); + res += pi.memory_size; + } #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS res += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len); #else @@ -2235,105 +2321,122 @@ Eterm ERTS_CIO_EXPORT(erts_check_io_info)(void *proc) { Process *p = (Process *) proc; - Eterm tags[16], values[16], res; - Uint sz, *szp, *hp, **hpp, memory_size; - Sint i; - ErtsPollInfo pi; - erts_aint_t cio_time = erts_atomic_read_acqb(&erts_check_io_time); - int active_fds = (int) erts_atomic32_read_acqb(&pollset.active_fd.no); - - while (1) { - erts_aint_t post_cio_time; - int post_active_fds; - - ERTS_CIO_POLL_INFO(pollset.ps, &pi); - - post_cio_time = erts_atomic_read_mb(&erts_check_io_time); - post_active_fds = (int) erts_atomic32_read_acqb(&pollset.active_fd.no); - if (cio_time == post_cio_time && active_fds == post_active_fds) - break; - cio_time = post_cio_time; - active_fds = post_active_fds; - } + Eterm tags[16], values[16], res, list = NIL; + Uint sz, *szp, *hp, **hpp; + ErtsPollInfo *piv; + Sint i, j; + + piv = erts_alloc(ERTS_ALC_T_TMP, + sizeof(ErtsPollInfo) * erts_no_schedulers); + + for (j = 0; j < erts_no_schedulers; j++) { + struct pollset_info *psi = &pollsetv[j]; + erts_aint_t cio_time = erts_atomic_read_acqb(&psi->check_io_time); + + piv[j].active_fds = (int) erts_atomic32_read_acqb(&psi->active_fd.no); + while (1) { + erts_aint_t post_cio_time; + int post_active_fds; + + ERTS_CIO_POLL_INFO(psi->ps, &piv[j]); + + post_cio_time = erts_atomic_read_mb(&psi->check_io_time); + post_active_fds = (int) erts_atomic32_read_acqb(&psi->active_fd.no); + if (cio_time == post_cio_time && piv[j].active_fds == post_active_fds) + break; + cio_time = post_cio_time; + piv[j].active_fds = post_active_fds; + } - memory_size = pi.memory_size; -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - memory_size += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len); -#else - memory_size += safe_hash_table_sz(&drv_ev_state.tab); - { - SafeHashInfo hi; - safe_hash_get_info(&hi, &drv_ev_state.tab); - memory_size += hi.objs * sizeof(ErtsDrvEventState); + #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS + piv[j].memory_size += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len); + #else + piv[j].memory_size += safe_hash_table_sz(&drv_ev_state.tab); + { + SafeHashInfo hi; + safe_hash_get_info(&hi, &drv_ev_state.tab); + piv[j].memory_size += hi.objs * sizeof(ErtsDrvEventState); + } + erts_spin_lock(&drv_ev_state.prealloc_lock); + piv[j].memory_size += drv_ev_state.num_prealloc * sizeof(ErtsDrvEventState); + erts_spin_unlock(&drv_ev_state.prealloc_lock); + #endif } - erts_spin_lock(&drv_ev_state.prealloc_lock); - memory_size += drv_ev_state.num_prealloc * sizeof(ErtsDrvEventState); - erts_spin_unlock(&drv_ev_state.prealloc_lock); -#endif hpp = NULL; szp = &sz; sz = 0; bld_it: - i = 0; - tags[i] = erts_bld_atom(hpp, szp, "name"); - values[i++] = erts_bld_atom(hpp, szp, "erts_poll"); + for (j = erts_no_schedulers-1; j >= 0; j--) { + i = 0; - tags[i] = erts_bld_atom(hpp, szp, "primary"); - values[i++] = erts_bld_atom(hpp, szp, pi.primary); + tags[i] = erts_bld_atom(hpp, szp, "name"); + values[i++] = erts_bld_atom(hpp, szp, "erts_poll"); - tags[i] = erts_bld_atom(hpp, szp, "fallback"); - values[i++] = erts_bld_atom(hpp, szp, pi.fallback ? pi.fallback : "false"); + tags[i] = erts_bld_atom(hpp, szp, "primary"); + values[i++] = erts_bld_atom(hpp, szp, piv[j].primary); - tags[i] = erts_bld_atom(hpp, szp, "kernel_poll"); - values[i++] = erts_bld_atom(hpp, szp, - pi.kernel_poll ? pi.kernel_poll : "false"); + tags[i] = erts_bld_atom(hpp, szp, "fallback"); + values[i++] = erts_bld_atom(hpp, szp, piv[j].fallback ? piv[j].fallback : "false"); - tags[i] = erts_bld_atom(hpp, szp, "memory_size"); - values[i++] = erts_bld_uint(hpp, szp, memory_size); + tags[i] = erts_bld_atom(hpp, szp, "kernel_poll"); + values[i++] = erts_bld_atom(hpp, szp, + piv[j].kernel_poll ? piv[j].kernel_poll : "false"); - tags[i] = erts_bld_atom(hpp, szp, "total_poll_set_size"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.poll_set_size); + tags[i] = erts_bld_atom(hpp, szp, "memory_size"); + values[i++] = erts_bld_uint(hpp, szp, piv[j].memory_size); - if (pi.fallback) { - tags[i] = erts_bld_atom(hpp, szp, "fallback_poll_set_size"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.fallback_poll_set_size); - } + tags[i] = erts_bld_atom(hpp, szp, "total_poll_set_size"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].poll_set_size); + + if (piv[j].fallback) { + tags[i] = erts_bld_atom(hpp, szp, "fallback_poll_set_size"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].fallback_poll_set_size); + } - tags[i] = erts_bld_atom(hpp, szp, "lazy_updates"); - values[i++] = pi.lazy_updates ? am_true : am_false; + tags[i] = erts_bld_atom(hpp, szp, "lazy_updates"); + values[i++] = piv[j].lazy_updates ? am_true : am_false; - if (pi.lazy_updates) { - tags[i] = erts_bld_atom(hpp, szp, "pending_updates"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.pending_updates); - } + if (piv[j].lazy_updates) { + tags[i] = erts_bld_atom(hpp, szp, "pending_updates"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].pending_updates); + } - tags[i] = erts_bld_atom(hpp, szp, "batch_updates"); - values[i++] = pi.batch_updates ? am_true : am_false; + tags[i] = erts_bld_atom(hpp, szp, "batch_updates"); + values[i++] = piv[j].batch_updates ? am_true : am_false; - tags[i] = erts_bld_atom(hpp, szp, "concurrent_updates"); - values[i++] = pi.concurrent_updates ? am_true : am_false; + tags[i] = erts_bld_atom(hpp, szp, "concurrent_updates"); + values[i++] = piv[j].concurrent_updates ? am_true : am_false; - tags[i] = erts_bld_atom(hpp, szp, "max_fds"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.max_fds); + tags[i] = erts_bld_atom(hpp, szp, "max_fds"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].max_fds); - tags[i] = erts_bld_atom(hpp, szp, "active_fds"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) active_fds); + tags[i] = erts_bld_atom(hpp, szp, "active_fds"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].active_fds); -#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS - tags[i] = erts_bld_atom(hpp, szp, "no_avoided_wakeups"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.no_avoided_wakeups); + #ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS + tags[i] = erts_bld_atom(hpp, szp, "no_avoided_wakeups"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_avoided_wakeups); - tags[i] = erts_bld_atom(hpp, szp, "no_avoided_interrupts"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.no_avoided_interrupts); + tags[i] = erts_bld_atom(hpp, szp, "no_avoided_interrupts"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_avoided_interrupts); - tags[i] = erts_bld_atom(hpp, szp, "no_interrupt_timed"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.no_interrupt_timed); -#endif + tags[i] = erts_bld_atom(hpp, szp, "no_interrupt_timed"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_interrupt_timed); + #endif - res = erts_bld_2tup_list(hpp, szp, i, tags, values); + res = erts_bld_2tup_list(hpp, szp, i, tags, values); + + if (!hpp) { + *szp += 2; + } + else { + list = CONS(*hpp, res, list); + *hpp += 2; + } + } if (!hpp) { hp = HAlloc(p, sz); @@ -2342,7 +2445,9 @@ ERTS_CIO_EXPORT(erts_check_io_info)(void *proc) goto bld_it; } - return res; + erts_free(ERTS_ALC_T_TMP, piv); + + return list; } static ERTS_INLINE ErtsPollEvents @@ -2373,6 +2478,23 @@ print_events(ErtsPollEvents ev) return ev; } +static ERTS_INLINE void +print_flags(EventStateFlags f) +{ + const char* delim = ""; + if(f & ERTS_EV_FLAG_USED) { + erts_printf("%s","USED"); + delim = "|"; + } + if(f & ERTS_EV_FLAG_DEFER_IN_EV) { + erts_printf("%s%s", delim, "DRIN"); + delim = "|"; + } + if(f & ERTS_EV_FLAG_DEFER_OUT_EV) { + erts_printf("%s%s", delim, "DROUT"); + } +} + typedef struct { int used_fds; int num_errors; @@ -2419,7 +2541,8 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters) counters->used_fds++; #endif - erts_printf("fd=%d ", (int) fd); + erts_printf("pollset=%d fd=%d ", + (int)(state->pollset - pollsetv), (int) fd); #if defined(HAVE_FSTAT) && !defined(NO_FSTAT_ON_SYS_FD_TYPE) if (fstat((int) fd, &stat_buf) < 0) @@ -2497,7 +2620,15 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters) err = 1; } else { - err = 1; + ErtsPollEvents ev = cio_events; +#if ERTS_CIO_DEFER_ACTIVE_EVENTS + if (state->flags & ERTS_EV_FLAG_DEFER_IN_EV) + ev &= ~ERTS_POLL_EV_IN; + if (state->flags & ERTS_EV_FLAG_DEFER_OUT_EV) + ev &= ~ERTS_POLL_EV_OUT; +#endif + if (ev != ep_events) + err = 1; erts_printf("cio_ev="); print_events(cio_events); erts_printf(" ep_ev="); @@ -2607,6 +2738,8 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters) #endif } + erts_printf(" flags="); print_flags(state->flags); + if (err) { counters->num_errors++; erts_printf(" ERROR"); @@ -2619,18 +2752,23 @@ int ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip) { #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - int fd, len; + int fd, len, i; #endif - IterDebugCounters counters; + IterDebugCounters counters = {0}; #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS ErtsDrvEventState null_des; + null_des.pollset = NULL; null_des.driver.select = NULL; null_des.driver.nif = NULL; null_des.driver.stop.drv_ptr = NULL; null_des.events = 0; null_des.remove_cnt = 0; null_des.type = ERTS_EV_TYPE_NONE; + null_des.flags = 0; + + counters.epep = erts_alloc(ERTS_ALC_T_TMP, + sizeof(ErtsPollEvents)*drv_ev_state.max_fds); #endif erts_printf("--- fds in pollset --------------------------------------\n"); @@ -2642,28 +2780,25 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip) erts_thr_progress_block(); /* stop the world to avoid messy locking */ #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - counters.epep = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollEvents)*drv_ev_state.max_fds); - ERTS_POLL_EXPORT(erts_poll_get_selected_events)(pollset.ps, counters.epep, drv_ev_state.max_fds); - counters.internal_fds = 0; -#endif - counters.used_fds = 0; - counters.num_errors = 0; - counters.no_driver_select_structs = 0; - counters.no_enif_select_structs = 0; - -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS len = erts_atomic_read_nob(&drv_ev_state.len); - for (fd = 0; fd < len; fd++) { - doit_erts_check_io_debug((void *) &drv_ev_state.v[fd], (void *) &counters); + + for (i = 0; i < erts_no_schedulers; i++) { + ERTS_POLL_EXPORT(erts_poll_get_selected_events)(pollsetv[i].ps, + counters.epep, + drv_ev_state.max_fds); + for (fd = 0; fd < len; fd++) { + if (drv_ev_state.v[fd].pollset == &pollsetv[i]) + doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters); + } } - for ( ; fd < drv_ev_state.max_fds; fd++) { - null_des.fd = fd; - doit_erts_check_io_debug((void *) &null_des, (void *) &counters); + for (fd = len ; fd < drv_ev_state.max_fds; fd++) { + null_des.fd = fd; + doit_erts_check_io_debug(&null_des, &counters); } #else - safe_hash_for_each(&drv_ev_state.tab, &doit_erts_check_io_debug, (void *) &counters); + safe_hash_for_each(&drv_ev_state.tab, &doit_erts_check_io_debug, + &counters); #endif - erts_thr_progress_unblock(); ciodip->no_used_fds = counters.used_fds; @@ -2696,7 +2831,6 @@ void ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks)(int enable) { } #endif /* ERTS_ENABLE_LOCK_COUNT */ - #ifdef ERTS_ENABLE_KERNEL_POLL # ifdef ERTS_KERNEL_POLL_VERSION @@ -2724,6 +2858,16 @@ int enif_select(ErlNifEnv* env, ErlNifEvent event, return (*erts_io_funcs.enif_select)(env, event, flags, obj, pid, ref); } +struct pollset_info* erts_get_pollset(int sched_nr) +{ + return (*erts_io_funcs.get_pollset)(sched_nr); +} + +void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp) +{ + erts_io_funcs.notify_port_task_executed(pthp); +} + void erts_check_io(int do_wait) { erts_io_funcs.check_io(do_wait); @@ -2751,15 +2895,15 @@ erts_check_io_debug(ErtsCheckIoDebugInfo *ip) return (*erts_io_funcs.check_io_debug)(ip); } -void erts_check_io_interrupt(int set) +void erts_check_io_interrupt(struct pollset_info* psi, int set) { - erts_io_funcs.check_io_interrupt(set); + erts_io_funcs.check_io_interrupt(psi, set); } -void erts_check_io_interrupt_timed(int set, +void erts_check_io_interrupt_timed(struct pollset_info* psi, int set, ErtsMonotonicTime timeout_time) { - erts_io_funcs.check_io_interrupt_tmd(set, timeout_time); + erts_io_funcs.check_io_interrupt_tmd(psi, set, timeout_time); } #ifdef ERTS_ENABLE_LOCK_COUNT diff --git a/erts/emulator/sys/common/erl_check_io.h b/erts/emulator/sys/common/erl_check_io.h index f4d7983002..ab53d91756 100644 --- a/erts/emulator/sys/common/erl_check_io.h +++ b/erts/emulator/sys/common/erl_check_io.h @@ -30,38 +30,29 @@ #include "sys.h" #include "erl_sys_driver.h" +struct pollset_info; + Uint erts_check_io_size(void); Eterm erts_check_io_info(void *); +void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp); +void erts_check_io_async_sig_interrupt(struct pollset_info *psi); int erts_check_io_max_files(void); -void erts_check_io_interrupt(int); -void erts_check_io_interrupt_timed(int, ErtsMonotonicTime); void erts_check_io(int); void erts_init_check_io(void); #ifdef ERTS_ENABLE_LOCK_COUNT void erts_lcnt_update_cio_locks(int enable); #endif -extern erts_atomic_t erts_check_io_time; +void erts_check_io_interrupt(struct pollset_info*, int); +void erts_check_io_interrupt_timed(struct pollset_info*, int, ErtsMonotonicTime); +struct pollset_info* erts_get_pollset(int sched_num); typedef struct { ErtsPortTaskHandle task; erts_atomic_t executed_time; + struct pollset_info *pollset; } ErtsIoTask; -ERTS_GLB_INLINE void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp); - -#if ERTS_GLB_INLINE_INCL_FUNC_DEF - -ERTS_GLB_INLINE void -erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp) -{ - ErtsIoTask *itp = (ErtsIoTask *) (((char *) pthp) - offsetof(ErtsIoTask, task)); - erts_aint_t ci_time = erts_atomic_read_acqb(&erts_check_io_time); - erts_atomic_set_relb(&itp->executed_time, ci_time); -} - -#endif - #endif /* ERL_CHECK_IO_H__ */ #if !defined(ERL_CHECK_IO_C__) && !defined(ERTS_ALLOC_C__) @@ -80,7 +71,7 @@ erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp) */ # define ERTS_CIO_DEFER_ACTIVE_EVENTS 1 #else -# define ERTS_CIO_DEFER_ACTIVE_EVENTS 0 +# define ERTS_CIO_DEFER_ACTIVE_EVENTS 1 #endif typedef struct { diff --git a/erts/emulator/sys/common/erl_poll.h b/erts/emulator/sys/common/erl_poll.h index 12dfc66e51..6c961205fe 100644 --- a/erts/emulator/sys/common/erl_poll.h +++ b/erts/emulator/sys/common/erl_poll.h @@ -225,6 +225,7 @@ typedef struct { long no_avoided_interrupts; long no_interrupt_timed; #endif + int active_fds; } ErtsPollInfo; void ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet, diff --git a/erts/emulator/sys/common/erl_sys_common_misc.c b/erts/emulator/sys/common/erl_sys_common_misc.c index 09237c81ce..420138ff0a 100644 --- a/erts/emulator/sys/common/erl_sys_common_misc.c +++ b/erts/emulator/sys/common/erl_sys_common_misc.c @@ -45,14 +45,6 @@ #endif #endif -/* - * erts_check_io_time is used by the erl_check_io implementation. The - * global erts_check_io_time variable is declared here since there - * (often) exist two versions of erl_check_io (kernel-poll and - * non-kernel-poll), and we dont want two versions of this variable. - */ -erts_atomic_t erts_check_io_time; - /* Written once and only once */ static int filename_encoding = ERL_FILENAME_UNKNOWN; |