aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/sys
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2017-05-12 18:05:03 +0200
committerLukas Larsson <[email protected]>2017-10-02 10:34:26 +0200
commit9a0970257aaaf9d343f8045548a34abf30dc0c92 (patch)
treec46a7a7e5acf262f11099d00f1b42bed89f5590b /erts/emulator/sys
parent48e77453536e49b07ddb6be63ba322ddaa5dac45 (diff)
downloadotp-9a0970257aaaf9d343f8045548a34abf30dc0c92.tar.gz
otp-9a0970257aaaf9d343f8045548a34abf30dc0c92.tar.bz2
otp-9a0970257aaaf9d343f8045548a34abf30dc0c92.zip
erts: Add multiple poll sets
Diffstat (limited to 'erts/emulator/sys')
-rw-r--r--erts/emulator/sys/common/erl_check_io.c734
-rw-r--r--erts/emulator/sys/common/erl_check_io.h27
-rw-r--r--erts/emulator/sys/common/erl_poll.h1
-rw-r--r--erts/emulator/sys/common/erl_sys_common_misc.c8
-rw-r--r--erts/emulator/sys/unix/sys.c6
-rw-r--r--erts/emulator/sys/unix/sys_drivers.c11
6 files changed, 463 insertions, 324 deletions
diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c
index 52584e4246..ff0f3ea121 100644
--- a/erts/emulator/sys/common/erl_check_io.c
+++ b/erts/emulator/sys/common/erl_check_io.c
@@ -94,6 +94,7 @@ static struct pollset_info
{
ErtsPollSet ps;
erts_atomic_t in_poll_wait; /* set while doing poll */
+ erts_atomic_t check_io_time;
struct {
int six; /* start index */
int eix; /* end index */
@@ -102,7 +103,7 @@ static struct pollset_info
ErtsSysFdType *array;
} active_fd;
erts_atomic_t removed_list; /* struct removed_fd* */
-}pollset;
+}*pollsetv;
#define NUM_OF_POLLSETS 1
@@ -114,21 +115,39 @@ int ERTS_CIO_EXPORT(enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags
Uint ERTS_CIO_EXPORT(erts_check_io_size)(void);
Eterm ERTS_CIO_EXPORT(erts_check_io_info)(void *);
int ERTS_CIO_EXPORT(erts_check_io_max_files)(void);
-void ERTS_CIO_EXPORT(erts_check_io_interrupt)(int);
-void ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(int, ErtsMonotonicTime);
+void ERTS_CIO_EXPORT(erts_check_io_interrupt)(struct pollset_info*, int);
+void ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(struct pollset_info*, int, ErtsMonotonicTime);
void ERTS_CIO_EXPORT(erts_check_io)(int);
+struct pollset_info* ERTS_CIO_EXPORT(erts_get_pollset)(int);
int ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *);
+void ERTS_CIO_EXPORT(erts_io_notify_port_task_executed)(ErtsPortTaskHandle *pthp);
#ifdef ERTS_ENABLE_LOCK_COUNT
void ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks)(int enable);
#endif
#endif
+/* ToDo: Was inline in erl_check_io.h but now need struct pollset_info */
+void
+ERTS_CIO_EXPORT(erts_io_notify_port_task_executed)(ErtsPortTaskHandle *pthp)
+{
+ ErtsIoTask *itp = ErtsContainerStruct(pthp, ErtsIoTask, task);
+ erts_aint_t ci_time = erts_atomic_read_acqb(&itp->pollset->check_io_time);
+ erts_atomic_set_relb(&itp->executed_time, ci_time);
+}
+
+
+struct pollset_info* ERTS_CIO_EXPORT(erts_get_pollset)(int sched_nr)
+{
+ ASSERT(sched_nr > 0 && sched_nr <= erts_no_schedulers);
+ return &pollsetv[sched_nr - 1];
+}
typedef struct {
#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
SafeHashBucket hb;
#endif
ErtsSysFdType fd;
+ struct pollset_info *pollset;
struct {
ErtsDrvSelectDataState *select; /* ERTS_EV_TYPE_DRV_SEL */
ErtsNifSelectDataState *nif; /* ERTS_EV_TYPE_NIF */
@@ -209,6 +228,7 @@ static ERTS_INLINE ErtsDrvEventState* hash_new_drv_ev_state(ErtsSysFdType fd)
{
ErtsDrvEventState tmpl;
tmpl.fd = fd;
+ tmpl.pollset = NULL;
tmpl.driver.select = NULL;
tmpl.driver.nif = NULL;
tmpl.driver.stop.drv_ptr = NULL;
@@ -252,10 +272,11 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource*,
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(removed_fd, struct removed_fd, 64, ERTS_ALC_T_FD_LIST)
static ERTS_INLINE void
-init_iotask(ErtsIoTask *io_task)
+init_iotask(ErtsIoTask *io_task, struct pollset_info* psi)
{
erts_port_task_handle_init(&io_task->task);
erts_atomic_init_nob(&io_task->executed_time, ~((erts_aint_t) 0));
+ io_task->pollset = psi;
}
static ERTS_INLINE int
@@ -269,14 +290,14 @@ is_iotask_active(ErtsIoTask *io_task, erts_aint_t current_cio_time)
}
static ERTS_INLINE ErtsDrvSelectDataState *
-alloc_drv_select_data(void)
+alloc_drv_select_data(struct pollset_info* psi)
{
ErtsDrvSelectDataState *dsp = erts_alloc(ERTS_ALC_T_DRV_SEL_D_STATE,
sizeof(ErtsDrvSelectDataState));
dsp->inport = NIL;
dsp->outport = NIL;
- init_iotask(&dsp->iniotask);
- init_iotask(&dsp->outiotask);
+ init_iotask(&dsp->iniotask, psi);
+ init_iotask(&dsp->outiotask, psi);
return dsp;
}
@@ -307,11 +328,11 @@ free_nif_select_data(ErtsNifSelectDataState *dsp)
}
static ERTS_INLINE void
-remember_removed(ErtsDrvEventState *state, struct pollset_info* psi)
+remember_removed(ErtsDrvEventState *state)
{
struct removed_fd *fdlp;
ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd)));
- if (erts_atomic_read_nob(&psi->in_poll_wait)) {
+ if (erts_atomic_read_nob(&state->pollset->in_poll_wait)) {
erts_aint_t was_next, exp_next;
state->remove_cnt++;
ASSERT(state->remove_cnt > 0);
@@ -324,11 +345,11 @@ remember_removed(ErtsDrvEventState *state, struct pollset_info* psi)
#endif
/* Lockless atomic insertion in removed_list */
- was_next = erts_atomic_read_acqb(&psi->removed_list);
+ was_next = erts_atomic_read_acqb(&state->pollset->removed_list);
do {
exp_next = was_next;
fdlp->next = (struct removed_fd*) exp_next;
- was_next = erts_atomic_cmpxchg_mb(&psi->removed_list,
+ was_next = erts_atomic_cmpxchg_mb(&state->pollset->removed_list,
(erts_aint_t) fdlp,
exp_next);
}while (was_next != exp_next);
@@ -384,7 +405,7 @@ forget_removed(struct pollset_info* psi)
state->driver.stop.resource = NULL;
ASSERT(resource);
state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
+ state->flags = 0;
goto case_ERTS_EV_TYPE_NONE;
case ERTS_EV_TYPE_STOP_USE:
@@ -392,11 +413,12 @@ forget_removed(struct pollset_info* psi)
drv_ptr = state->driver.stop.drv_ptr;
ASSERT(drv_ptr);
state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
+ state->flags = 0;
state->driver.stop.drv_ptr = NULL;
/* Fall through */
case ERTS_EV_TYPE_NONE:
case_ERTS_EV_TYPE_NONE:
+ state->pollset = NULL;
#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
hash_erase_drv_ev_state(state);
#endif
@@ -455,6 +477,7 @@ grow_drv_ev_state(int min_ix)
sizeof(ErtsDrvEventState)*new_len));
for (i = old_len; i < new_len; i++) {
drv_ev_state.v[i].fd = (ErtsSysFdType) i;
+ drv_ev_state.v[i].pollset = NULL;
drv_ev_state.v[i].driver.select = NULL;
drv_ev_state.v[i].driver.stop.drv_ptr = NULL;
drv_ev_state.v[i].driver.nif = NULL;
@@ -541,7 +564,9 @@ deselect(ErtsDrvEventState *state, int mode)
}
}
- state->events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, rm_events, 0, &do_wake);
+ ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, rm_events,
+ 0, &do_wake);
+ state->events &= ~rm_events;
if (!(state->events)) {
switch (state->type) {
@@ -565,8 +590,8 @@ deselect(ErtsDrvEventState *state, int mode)
}
state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
- remember_removed(state, &pollset);
+ state->flags = 0;
+ remember_removed(state);
}
}
@@ -585,7 +610,7 @@ check_fd_cleanup(ErtsDrvEventState *state,
ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd)));
- current_cio_time = erts_atomic_read_acqb(&erts_check_io_time);
+ current_cio_time = erts_atomic_read_acqb(&state->pollset->check_io_time);
*free_select = NULL;
if (state->driver.select
&& (state->type != ERTS_EV_TYPE_DRV_SEL)
@@ -602,19 +627,27 @@ check_fd_cleanup(ErtsDrvEventState *state,
state->driver.nif = NULL;
}
-#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
if (((state->type != ERTS_EV_TYPE_NONE)
| state->remove_cnt
+ | (state->driver.nif != NULL)
| (state->driver.select != NULL)) == 0) {
+ state->pollset = NULL;
+#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
hash_erase_drv_ev_state(state);
-
- }
#endif
+ }
}
+#ifdef __WIN32__
+# define MUST_DEFER(MAY_SLEEP) 1
+#else
+# define MUST_DEFER(MAY_SLEEP) (MAY_SLEEP)
+#endif
+
static ERTS_INLINE int
-check_cleanup_active_fd(ErtsSysFdType fd,
+check_cleanup_active_fd(struct pollset_info* psi,
+ ErtsSysFdType fd,
#if ERTS_CIO_DEFER_ACTIVE_EVENTS
ErtsPollControlEntry *pce,
int *pce_ix,
@@ -637,14 +670,15 @@ check_cleanup_active_fd(ErtsSysFdType fd,
state = &drv_ev_state.v[(int) fd];
#else
state = hash_get_drv_ev_state(fd); /* may be NULL! */
- if (state)
#endif
+ if (state && state->pollset == psi)
{
if (state->driver.select) {
#if ERTS_CIO_DEFER_ACTIVE_EVENTS
if (is_iotask_active(&state->driver.select->iniotask, current_cio_time)) {
active = 1;
- if ((state->events & ERTS_POLL_EV_IN)
+ if (MUST_DEFER(may_sleep)
+ && (state->events & ERTS_POLL_EV_IN)
&& !(state->flags & ERTS_EV_FLAG_DEFER_IN_EV)) {
evoff |= ERTS_POLL_EV_IN;
state->flags |= ERTS_EV_FLAG_DEFER_IN_EV;
@@ -657,15 +691,17 @@ check_cleanup_active_fd(ErtsSysFdType fd,
}
if (is_iotask_active(&state->driver.select->outiotask, current_cio_time)) {
active = 1;
- if ((state->events & ERTS_POLL_EV_OUT)
+ if (MUST_DEFER(may_sleep)
+ && (state->events & ERTS_POLL_EV_OUT)
&& !(state->flags & ERTS_EV_FLAG_DEFER_OUT_EV)) {
evoff |= ERTS_POLL_EV_OUT;
state->flags |= ERTS_EV_FLAG_DEFER_OUT_EV;
}
}
else if (state->flags & ERTS_EV_FLAG_DEFER_OUT_EV) {
- if (state->events & ERTS_POLL_EV_OUT)
+ if (state->events & ERTS_POLL_EV_OUT) {
evon |= ERTS_POLL_EV_OUT;
+ }
state->flags &= ~ERTS_EV_FLAG_DEFER_OUT_EV;
}
if (active)
@@ -681,6 +717,12 @@ check_cleanup_active_fd(ErtsSysFdType fd,
free_select = state->driver.select;
state->driver.select = NULL;
}
+#if ERTS_CIO_DEFER_ACTIVE_EVENTS
+ if (evon) {
+ int do_wake = 0;
+ ERTS_CIO_POLL_CTL(psi->ps, state->fd, evon, 1, &do_wake);
+ }
+#endif
}
if (state->driver.nif) {
@@ -705,7 +747,7 @@ check_cleanup_active_fd(ErtsSysFdType fd,
}
if (rm_events) {
int do_wake = 0;
- state->events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd,
+ state->events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd,
rm_events, 0, &do_wake);
}
if (state->events)
@@ -720,7 +762,6 @@ check_cleanup_active_fd(ErtsSysFdType fd,
if (((state->type != ERTS_EV_TYPE_NONE) | state->remove_cnt | active) == 0)
hash_erase_drv_ev_state(state);
#endif
-
}
erts_mtx_unlock(mtx);
@@ -737,28 +778,23 @@ check_cleanup_active_fd(ErtsSysFdType fd,
pcep->events = evoff;
pcep->on = 0;
}
- if (evon) {
- ErtsPollControlEntry *pcep = &pce[(*pce_ix)++];
- pcep->fd = fd;
- pcep->events = evon;
- pcep->on = 1;
- }
#endif
return active;
}
static void
-check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep)
+check_cleanup_active_fds(struct pollset_info* psi,
+ erts_aint_t current_cio_time, int may_sleep)
{
- int six = pollset.active_fd.six;
- int eix = pollset.active_fd.eix;
- erts_aint32_t no = erts_atomic32_read_dirty(&pollset.active_fd.no);
- int size = pollset.active_fd.size;
+ int six = psi->active_fd.six;
+ int eix = psi->active_fd.eix;
+ erts_aint32_t no = erts_atomic32_read_dirty(&psi->active_fd.no);
+ const int size = psi->active_fd.size;
int ix = six;
#if ERTS_CIO_DEFER_ACTIVE_EVENTS
- /* every fd might add two entries */
- Uint pce_sz = 2*sizeof(ErtsPollControlEntry)*no;
+ /* every fd might add one entry */
+ Uint pce_sz = sizeof(ErtsPollControlEntry)*no;
ErtsPollControlEntry *pctrl_entries = (pce_sz
? erts_alloc(ERTS_ALC_T_TMP, pce_sz)
: NULL);
@@ -766,12 +802,12 @@ check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep)
#endif
while (ix != eix) {
- ErtsSysFdType fd = pollset.active_fd.array[ix];
+ ErtsSysFdType fd = psi->active_fd.array[ix];
int nix = ix + 1;
if (nix >= size)
nix = 0;
ASSERT(fd != ERTS_SYS_FD_INVALID);
- if (!check_cleanup_active_fd(fd,
+ if (!check_cleanup_active_fd(psi, fd,
#if ERTS_CIO_DEFER_ACTIVE_EVENTS
pctrl_entries,
&pctrl_ix,
@@ -781,14 +817,14 @@ check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep)
no--;
if (ix == six) {
#ifdef DEBUG
- pollset.active_fd.array[ix] = ERTS_SYS_FD_INVALID;
+ psi->active_fd.array[ix] = ERTS_SYS_FD_INVALID;
#endif
six = nix;
}
else {
- pollset.active_fd.array[ix] = pollset.active_fd.array[six];
+ psi->active_fd.array[ix] = psi->active_fd.array[six];
#ifdef DEBUG
- pollset.active_fd.array[six] = ERTS_SYS_FD_INVALID;
+ psi->active_fd.array[six] = ERTS_SYS_FD_INVALID;
#endif
six++;
if (six >= size)
@@ -801,53 +837,53 @@ check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep)
#if ERTS_CIO_DEFER_ACTIVE_EVENTS
ASSERT(pctrl_ix <= pce_sz/sizeof(ErtsPollControlEntry));
if (pctrl_ix)
- ERTS_CIO_POLL_CTLV(pollset.ps, pctrl_entries, pctrl_ix);
+ ERTS_CIO_POLL_CTLV(psi->ps, pctrl_entries, pctrl_ix);
if (pctrl_entries)
erts_free(ERTS_ALC_T_TMP, pctrl_entries);
#endif
- pollset.active_fd.six = six;
- pollset.active_fd.eix = eix;
- erts_atomic32_set_relb(&pollset.active_fd.no, no);
+ psi->active_fd.six = six;
+ psi->active_fd.eix = eix;
+ erts_atomic32_set_relb(&psi->active_fd.no, no);
}
-static void grow_active_fds(void)
+static void grow_active_fds(struct pollset_info *psi)
{
- ASSERT(pollset.active_fd.six == pollset.active_fd.eix);
- pollset.active_fd.six = 0;
- pollset.active_fd.eix = pollset.active_fd.size;
- pollset.active_fd.size += ERTS_ACTIVE_FD_INC;
- pollset.active_fd.array = erts_realloc(ERTS_ALC_T_ACTIVE_FD_ARR,
- pollset.active_fd.array,
- pollset.active_fd.size*sizeof(ErtsSysFdType));
+ ASSERT(psi->active_fd.six == psi->active_fd.eix);
+ psi->active_fd.six = 0;
+ psi->active_fd.eix = psi->active_fd.size;
+ psi->active_fd.size += ERTS_ACTIVE_FD_INC;
+ psi->active_fd.array = erts_realloc(ERTS_ALC_T_ACTIVE_FD_ARR,
+ psi->active_fd.array,
+ psi->active_fd.size*sizeof(ErtsSysFdType));
#ifdef DEBUG
{
int i;
- for (i = pollset.active_fd.eix + 1; i < pollset.active_fd.size; i++)
- pollset.active_fd.array[i] = ERTS_SYS_FD_INVALID;
+ for (i = psi->active_fd.eix + 1; i < psi->active_fd.size; i++)
+ psi->active_fd.array[i] = ERTS_SYS_FD_INVALID;
}
#endif
}
static ERTS_INLINE void
-add_active_fd(ErtsSysFdType fd)
+add_active_fd(struct pollset_info *psi, ErtsSysFdType fd)
{
- int eix = pollset.active_fd.eix;
- int size = pollset.active_fd.size;
+ int eix = psi->active_fd.eix;
+ const int size = psi->active_fd.size;
- pollset.active_fd.array[eix] = fd;
+ psi->active_fd.array[eix] = fd;
- erts_atomic32_set_relb(&pollset.active_fd.no,
- (erts_atomic32_read_dirty(&pollset.active_fd.no)
+ erts_atomic32_set_relb(&psi->active_fd.no,
+ (erts_atomic32_read_dirty(&psi->active_fd.no)
+ 1));
eix++;
if (eix >= size)
eix = 0;
- pollset.active_fd.eix = eix;
+ psi->active_fd.eix = eix;
- if (pollset.active_fd.six == eix) {
- grow_active_fds();
+ if (psi->active_fd.six == eix) {
+ grow_active_fds(psi);
}
}
@@ -862,9 +898,9 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
Eterm id = erts_drvport2id(ix);
ErtsSysFdType fd = (ErtsSysFdType) e;
ErtsPollEvents ctl_events = (ErtsPollEvents) 0;
- ErtsPollEvents new_events, old_events;
+ ErtsPollEvents old_events;
ErtsDrvEventState *state;
- int wake_poller;
+ int wake_poller = 0;
int ret;
ErtsDrvSelectDataState *free_select = NULL;
ErtsNifSelectDataState *free_nif = NULL;
@@ -898,22 +934,25 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
state = hash_get_drv_ev_state(fd); /* may be NULL! */
#endif
- if (!on && (mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
- if (IS_FD_UNKNOWN(state)) {
- /* fast track to stop_select callback */
- stop_select_fn = prt->drv_ptr->stop_select;
-#ifdef USE_VM_PROBES
- strncpy(name, prt->drv_ptr->name,
- sizeof(DTRACE_CHARBUF_NAME(name))-1);
- name[sizeof(name)-1] = '\0';
-#endif
- ret = 0;
- goto done_unknown;
- }
- mode |= (ERL_DRV_READ | ERL_DRV_WRITE);
- wake_poller = 1; /* to eject fd from pollset (if needed) */
+ if (!on) {
+ if (IS_FD_UNKNOWN(state)) {
+ if ((mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
+ /* fast track to stop_select callback */
+ stop_select_fn = prt->drv_ptr->stop_select;
+ #ifdef USE_VM_PROBES
+ strncpy(name, prt->drv_ptr->name,
+ sizeof(DTRACE_CHARBUF_NAME(name))-1);
+ name[sizeof(name)-1] = '\0';
+ #endif
+ }
+ ret = 0;
+ goto done_unknown;
+ }
+ else if ((mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
+ mode |= (ERL_DRV_READ | ERL_DRV_WRITE);
+ wake_poller = 1; /* to eject fd from pollset (if needed) */
+ }
}
- else wake_poller = 0;
#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
if (state == NULL) {
@@ -951,7 +990,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
if (owner != id && is_not_nil(owner))
drv_select_steal(ix, state, mode, on);
}
- ctl_events |= ERTS_POLL_EV_IN;
+ ctl_events = ERTS_POLL_EV_IN;
}
if (mode & ERL_DRV_WRITE) {
if (state->type == ERTS_EV_TYPE_DRV_SEL) {
@@ -962,44 +1001,61 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
ctl_events |= ERTS_POLL_EV_OUT;
}
+
ASSERT((state->type == ERTS_EV_TYPE_DRV_SEL) ||
(state->type == ERTS_EV_TYPE_NONE && !state->events));
- if (!on && !(state->flags & ERTS_EV_FLAG_USED)
- && state->events && !(state->events & ~ctl_events)) {
- /* Old driver removing all events. At least wake poller.
- It will not make close() 100% safe but it will prevent
- actions delayed by poll timeout. */
- wake_poller = 1;
- }
+ old_events = state->events;
- new_events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, ctl_events, on, &wake_poller);
+ if (on) {
+ ctl_events &= ~old_events;
+ state->events |= ctl_events;
+ }
+ else {
+ ctl_events &= old_events;
+ state->events &= ~ctl_events;
- if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
- if (state->type == ERTS_EV_TYPE_DRV_SEL && !state->events) {
- state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
- state->driver.select->inport = NIL;
- state->driver.select->outport = NIL;
- }
- ret = -1;
- goto done;
+ if (!(state->flags & ERTS_EV_FLAG_USED)
+ && old_events && !state->events) {
+ /*
+ * Old driver removing all events. At least wake poller.
+ * It will not make close() 100% safe but it will prevent
+ * actions delayed by poll timeout.
+ */
+ wake_poller = 1;
+ }
}
- old_events = state->events;
+ if (ctl_events) {
+ ErtsPollEvents new_events;
- ASSERT(on
- ? (new_events == (state->events | ctl_events))
- : (new_events == (state->events & ~ctl_events)));
+ if (!state->pollset) {
+ ErtsSchedulerData* esdp = erts_get_scheduler_data();
+ ASSERT(esdp);
+ state->pollset = esdp->pollset;
+ }
- ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL
- || state->type == ERTS_EV_TYPE_NONE);
+ new_events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, ctl_events, on, &wake_poller);
- state->events = new_events;
- if (ctl_events) {
- if (on) {
+ if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
+ if (state->type == ERTS_EV_TYPE_DRV_SEL && !old_events) {
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags = 0;
+ state->driver.select->inport = NIL;
+ state->driver.select->outport = NIL;
+ }
+ ret = -1;
+ goto done;
+ }
+
+ ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL
+ || state->type == ERTS_EV_TYPE_NONE);
+ }
+
+ if (on) {
+ if (ctl_events) {
if (!state->driver.select)
- state->driver.select = alloc_drv_select_data();
+ state->driver.select = alloc_drv_select_data(state->pollset);
if (state->type == ERTS_EV_TYPE_NONE)
state->type = ERTS_EV_TYPE_DRV_SEL;
ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL);
@@ -1010,8 +1066,9 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
if (mode & ERL_DRV_USE) {
state->flags |= ERTS_EV_FLAG_USED;
}
- }
- else { /* off */
+ }
+ }
+ else { /* off */
if (state->type == ERTS_EV_TYPE_DRV_SEL) {
if (ctl_events & ERTS_POLL_EV_IN) {
abort_tasks(state, ERL_DRV_READ);
@@ -1021,20 +1078,20 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
abort_tasks(state, ERL_DRV_WRITE);
state->driver.select->outport = NIL;
}
- if (new_events == 0) {
+ if (state->events == 0) {
if (old_events != 0) {
- remember_removed(state, &pollset);
+ remember_removed(state);
}
if ((mode & ERL_DRV_USE) || !(state->flags & ERTS_EV_FLAG_USED)) {
state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
+ state->flags = 0;
}
/*else keep it, as fd will probably be selected upon again */
}
}
if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
erts_driver_t* drv_ptr = prt->drv_ptr;
- ASSERT(new_events==0);
+ ASSERT(state->events==0);
if (state->remove_cnt == 0 || !wake_poller) {
/* Safe to close fd now as it is not in pollset
or there was no need to eject fd (kernel poll) */
@@ -1053,7 +1110,6 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
}
}
}
- }
}
ret = 0;
@@ -1093,7 +1149,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
ErtsResource* resource = DATA_TO_RESOURCE(obj);
ErtsSysFdType fd = (ErtsSysFdType) e;
ErtsPollEvents ctl_events = (ErtsPollEvents) 0;
- ErtsPollEvents new_events, old_events;
+ ErtsPollEvents old_events;
ErtsDrvEventState *state;
int wake_poller;
int ret;
@@ -1192,32 +1248,45 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
ASSERT((state->type == ERTS_EV_TYPE_NIF) ||
(state->type == ERTS_EV_TYPE_NONE && !state->events));
- new_events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, ctl_events, on, &wake_poller);
+ old_events = state->events;
- if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
- if (state->type == ERTS_EV_TYPE_NIF && !state->events) {
- state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
- state->driver.nif->in.pid = NIL;
- state->driver.nif->out.pid = NIL;
- state->driver.nif->in.ddeselect_cnt = 0;
- state->driver.nif->out.ddeselect_cnt = 0;
- state->driver.stop.resource = NULL;
- }
- ret = INT_MIN | ERL_NIF_SELECT_FAILED;
- goto done;
+ if (on) {
+ ctl_events &= ~old_events;
+ state->events |= ctl_events;
+ }
+ else {
+ ctl_events &= old_events;
+ state->events &= ~ctl_events;
}
- old_events = state->events;
+ if (ctl_events) {
+ ErtsPollEvents new_events;
+
+ if (!state->pollset) {
+ state->pollset = erts_get_scheduler_data()->pollset;
+ }
- ASSERT(on
- ? (new_events == (state->events | ctl_events))
- : (new_events == (state->events & ~ctl_events)));
+ new_events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, ctl_events, on, &wake_poller);
+
+ if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
+ if (state->type == ERTS_EV_TYPE_NIF && !old_events) {
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags = 0;
+ state->driver.nif->in.pid = NIL;
+ state->driver.nif->out.pid = NIL;
+ state->driver.nif->in.ddeselect_cnt = 0;
+ state->driver.nif->out.ddeselect_cnt = 0;
+ state->driver.stop.resource = NULL;
+ }
+ ret = INT_MIN | ERL_NIF_SELECT_FAILED;
+ goto done;
+ }
+ ASSERT(new_events == state->events);
+ }
ASSERT(state->type == ERTS_EV_TYPE_NIF
|| state->type == ERTS_EV_TYPE_NONE);
- state->events = new_events;
if (on) {
const Eterm recipient = pid ? pid->pid : env->proc->common.id;
Uint32* refn;
@@ -1230,7 +1299,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
}
ASSERT(state->type == ERTS_EV_TYPE_NIF);
ASSERT(state->driver.stop.resource == resource);
- if (ctl_events & ERTS_POLL_EV_IN) {
+ if (mode & ERL_DRV_READ) {
state->driver.nif->in.pid = recipient;
if (is_immed(ref)) {
state->driver.nif->in.immed = ref;
@@ -1244,7 +1313,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
}
state->driver.nif->in.ddeselect_cnt = 0;
}
- if (ctl_events & ERTS_POLL_EV_OUT) {
+ if (mode & ERL_DRV_WRITE) {
state->driver.nif->out.pid = recipient;
if (is_immed(ref)) {
state->driver.nif->out.immed = ref;
@@ -1267,10 +1336,10 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
state->driver.nif->in.ddeselect_cnt = 0;
state->driver.nif->out.ddeselect_cnt = 0;
if (old_events != 0) {
- remember_removed(state, &pollset);
+ remember_removed(state);
}
}
- ASSERT(new_events==0);
+ ASSERT(state->events==0);
if (state->remove_cnt == 0 || !wake_poller) {
/*
* Safe to close fd now as it is not in pollset
@@ -1567,7 +1636,7 @@ steal_pending_stop_use(erts_dsprintf_buf_t *dsbufp, ErlDrvPort ix,
erts_ddll_dereference_driver(state->driver.stop.drv_ptr->handle);
}
state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
+ state->flags = 0;
state->driver.stop.drv_ptr = NULL;
}
else {
@@ -1606,7 +1675,7 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource* resource,
enif_release_resource(state->driver.stop.resource);
state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
+ state->flags = 0;
state->driver.stop.resource = NULL;
}
else {
@@ -1656,7 +1725,7 @@ iready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time)
(ErlDrvEvent) state->fd) != 0) {
stale_drv_select(id, state, ERL_DRV_READ);
}
- add_active_fd(state->fd);
+ add_active_fd(state->pollset, state->fd);
}
}
@@ -1674,7 +1743,7 @@ oready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time)
(ErlDrvEvent) state->fd) != 0) {
stale_drv_select(id, state, ERL_DRV_WRITE);
}
- add_active_fd(state->fd);
+ add_active_fd(state->pollset, state->fd);
}
}
@@ -1729,19 +1798,20 @@ send_event_tuple(struct erts_nif_select_event* e, ErtsResource* resource,
static void bad_fd_in_pollset(ErtsDrvEventState *, Eterm inport, Eterm outport);
void
-ERTS_CIO_EXPORT(erts_check_io_interrupt)(int set)
+ERTS_CIO_EXPORT(erts_check_io_interrupt)(struct pollset_info *psi, int set)
{
- ERTS_CIO_POLL_INTR(pollset.ps, set);
+ ERTS_CIO_POLL_INTR(psi->ps, set);
}
void
-ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(int set,
+ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(struct pollset_info *psi,
+ int set,
ErtsMonotonicTime timeout_time)
{
- ERTS_CIO_POLL_INTR_TMD(pollset.ps, set, timeout_time);
+ ERTS_CIO_POLL_INTR_TMD(psi->ps, set, timeout_time);
}
-#if !ERTS_CIO_DEFER_ACTIVE_EVENTS
+#ifndef __WIN32__
/*
* Number of ignored events, for a lingering fd added by enif_select(),
* until we deselect fd-event from pollset.
@@ -1761,8 +1831,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
int poll_ret, i;
erts_aint_t current_cio_time;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
-
- ASSERT(esdp);
+ struct pollset_info *psi = esdp->pollset;
restart:
@@ -1781,24 +1850,25 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
* erts_check_io_time, since only one thread can
* check io at a time.
*/
- current_cio_time = erts_atomic_read_dirty(&erts_check_io_time);
+ current_cio_time = erts_atomic_read_dirty(&psi->check_io_time);
current_cio_time++;
- erts_atomic_set_relb(&erts_check_io_time, current_cio_time);
+ erts_atomic_set_relb(&psi->check_io_time, current_cio_time);
- check_cleanup_active_fds(current_cio_time,
+ check_cleanup_active_fds(psi,
+ current_cio_time,
timeout_time != ERTS_POLL_NO_TIMEOUT);
#ifdef ERTS_ENABLE_LOCK_CHECK
erts_lc_check_exact(NULL, 0); /* No locks should be locked */
#endif
- pollres_len = erts_atomic32_read_dirty(&pollset.active_fd.no) + ERTS_CHECK_IO_POLL_RES_LEN;
+ pollres_len = erts_atomic32_read_dirty(&psi->active_fd.no) + ERTS_CHECK_IO_POLL_RES_LEN;
pollres = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollResFd)*pollres_len);
- erts_atomic_set_nob(&pollset.in_poll_wait, 1);
+ erts_atomic_set_nob(&psi->in_poll_wait, 1);
- poll_ret = ERTS_CIO_POLL_WAIT(pollset.ps, pollres, &pollres_len, timeout_time);
+ poll_ret = ERTS_CIO_POLL_WAIT(psi->ps, pollres, &pollres_len, timeout_time);
#ifdef ERTS_ENABLE_LOCK_CHECK
erts_lc_check_exact(NULL, 0); /* No locks should be locked */
@@ -1810,8 +1880,8 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
#endif
if (poll_ret != 0) {
- erts_atomic_set_nob(&pollset.in_poll_wait, 0);
- forget_removed(&pollset);
+ erts_atomic_set_nob(&psi->in_poll_wait, 0);
+ forget_removed(psi);
erts_free(ERTS_ALC_T_TMP, pollres);
if (poll_ret == EAGAIN) {
goto restart;
@@ -1838,6 +1908,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
erts_mtx_lock(fd_mtx(fd));
+
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
state = &drv_ev_state.v[ (int) fd];
#else
@@ -1848,7 +1919,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
#endif
/* Skip this fd if it was removed from pollset */
- if (is_removed(state)) {
+ if (is_removed(state) || state->pollset != psi) {
goto next_pollres;
}
@@ -1884,7 +1955,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
bad_fd_in_pollset(state,
state->driver.select->inport,
state->driver.select->outport);
- add_active_fd(state->fd);
+ add_active_fd(psi, state->fd);
}
break;
}
@@ -1915,7 +1986,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
resource = state->driver.stop.resource;
state->driver.nif->out.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT;
state->driver.nif->out.pid = NIL;
- add_active_fd(state->fd);
+ add_active_fd(psi, state->fd);
}
else {
ASSERT(state->driver.nif->out.ddeselect_cnt >= 2);
@@ -1928,7 +1999,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
resource = state->driver.stop.resource;
state->driver.nif->in.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT;
state->driver.nif->in.pid = NIL;
- add_active_fd(state->fd);
+ add_active_fd(psi, state->fd);
}
else {
ASSERT(state->driver.nif->in.ddeselect_cnt >= 2);
@@ -1938,7 +2009,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
}
else if (revents & ERTS_POLL_EV_NVAL) {
bad_fd_in_pollset(state, NIL, NIL);
- add_active_fd(state->fd);
+ add_active_fd(psi, state->fd);
}
erts_mtx_unlock(fd_mtx(fd));
@@ -1963,7 +2034,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
(int) state->type);
ASSERT(0);
deselect(state, 0);
- add_active_fd(state->fd);
+ add_active_fd(psi, state->fd);
break;
}
}
@@ -1973,9 +2044,9 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
next_pollres_unlocked:;
}
- erts_atomic_set_nob(&pollset.in_poll_wait, 0);
+ erts_atomic_set_nob(&psi->in_poll_wait, 0);
erts_free(ERTS_ALC_T_TMP, pollres);
- forget_removed(&pollset);
+ forget_removed(psi);
}
static void
@@ -2096,16 +2167,17 @@ static void drv_ev_state_free(void *des)
}
#endif
-
#ifdef ERTS_ENABLE_KERNEL_POLL
struct io_functions {
int (*select)(ErlDrvPort, ErlDrvEvent, int, int);
int (*enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, const ErlNifPid*, Eterm);
- void (*check_io_as_interrupt)(void);
- void (*check_io_interrupt)(int);
- void (*check_io_interrupt_tmd)(int, ErtsMonotonicTime);
+ void (*check_io_as_interrupt)(struct pollset_info*);
+ void (*check_io_interrupt)(struct pollset_info*, int);
+ void (*check_io_interrupt_tmd)(struct pollset_info*, int, ErtsMonotonicTime);
void (*check_io)(int);
+ struct pollset_info* (*get_pollset)(int sched_nr);
+ void (*notify_port_task_executed)(ErtsPortTaskHandle *pthp);
int (*max_files)(void);
Uint (*size)(void);
Eterm (*info)(void *);
@@ -2126,12 +2198,11 @@ extern struct io_functions erts_io_funcs;
void
ERTS_CIO_EXPORT(erts_init_check_io)(void)
{
+ int j;
ERTS_CT_ASSERT((INT_MIN & (ERL_NIF_SELECT_STOP_CALLED |
ERL_NIF_SELECT_STOP_SCHEDULED |
ERL_NIF_SELECT_INVALID_EVENT |
ERL_NIF_SELECT_FAILED)) == 0);
- erts_atomic_init_nob(&erts_check_io_time, 0);
- erts_atomic_init_nob(&pollset.in_poll_wait, 0);
#ifdef ERTS_ENABLE_KERNEL_POLL
ASSERT(erts_io_funcs.select == NULL);
@@ -2140,6 +2211,8 @@ ERTS_CIO_EXPORT(erts_init_check_io)(void)
erts_io_funcs.check_io_interrupt = ERTS_CIO_EXPORT(erts_check_io_interrupt);
erts_io_funcs.check_io_interrupt_tmd= ERTS_CIO_EXPORT(erts_check_io_interrupt_timed);
erts_io_funcs.check_io = ERTS_CIO_EXPORT(erts_check_io);
+ erts_io_funcs.get_pollset = ERTS_CIO_EXPORT(erts_get_pollset);
+ erts_io_funcs.notify_port_task_executed = ERTS_CIO_EXPORT(erts_io_notify_port_task_executed);
erts_io_funcs.max_files = ERTS_CIO_EXPORT(erts_check_io_max_files);
erts_io_funcs.size = ERTS_CIO_EXPORT(erts_check_io_size);
erts_io_funcs.info = ERTS_CIO_EXPORT(erts_check_io_info);
@@ -2149,25 +2222,34 @@ ERTS_CIO_EXPORT(erts_init_check_io)(void)
#endif
#endif
+ init_removed_fd_alloc();
+
ERTS_CIO_POLL_INIT();
- pollset.ps = ERTS_CIO_NEW_POLLSET();
-
- pollset.active_fd.six = 0;
- pollset.active_fd.eix = 0;
- erts_atomic32_init_nob(&pollset.active_fd.no, 0);
- pollset.active_fd.size = ERTS_ACTIVE_FD_INC;
- pollset.active_fd.array = erts_alloc(ERTS_ALC_T_ACTIVE_FD_ARR,
- sizeof(ErtsSysFdType)*ERTS_ACTIVE_FD_INC);
+ pollsetv = erts_alloc(ERTS_ALC_T_POLLSET,
+ sizeof(struct pollset_info)*erts_no_schedulers);
+ for (j=0; j < erts_no_schedulers; j++) {
+ struct pollset_info* psi = &pollsetv[j];
+
+ erts_atomic_init_nob(&psi->check_io_time, 0);
+ erts_atomic_init_nob(&psi->in_poll_wait, 0);
+ psi->ps = ERTS_CIO_NEW_POLLSET();
+ psi->active_fd.six = 0;
+ psi->active_fd.eix = 0;
+ erts_atomic32_init_nob(&psi->active_fd.no, 0);
+ psi->active_fd.size = ERTS_ACTIVE_FD_INC;
+ psi->active_fd.array = erts_alloc(ERTS_ALC_T_ACTIVE_FD_ARR,
+ sizeof(ErtsSysFdType)*ERTS_ACTIVE_FD_INC);
#ifdef DEBUG
- {
- int i;
- for (i = 0; i < ERTS_ACTIVE_FD_INC; i++)
- pollset.active_fd.array[i] = ERTS_SYS_FD_INVALID;
- }
+ {
+ int i;
+ for (i = 0; i < ERTS_ACTIVE_FD_INC; i++)
+ psi->active_fd.array[i] = ERTS_SYS_FD_INVALID;
+ }
#endif
- init_removed_fd_alloc();
- erts_atomic_init_nob(&pollset.removed_list, (erts_aint_t)NULL);
+ erts_atomic_init_nob(&psi->removed_list, (erts_aint_t)NULL);
+ }
+
{
int i;
for (i=0; i<DRV_EV_STATE_LOCK_CNT; i++) {
@@ -2211,10 +2293,14 @@ ERTS_CIO_EXPORT(erts_check_io_max_files)(void)
Uint
ERTS_CIO_EXPORT(erts_check_io_size)(void)
{
- Uint res;
+ Uint res = 0;
ErtsPollInfo pi;
- ERTS_CIO_POLL_INFO(pollset.ps, &pi);
- res = pi.memory_size;
+ int i;
+
+ for (i = 0; i < erts_no_schedulers; i++) {
+ ERTS_CIO_POLL_INFO(pollsetv[i].ps, &pi);
+ res += pi.memory_size;
+ }
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
res += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len);
#else
@@ -2235,105 +2321,122 @@ Eterm
ERTS_CIO_EXPORT(erts_check_io_info)(void *proc)
{
Process *p = (Process *) proc;
- Eterm tags[16], values[16], res;
- Uint sz, *szp, *hp, **hpp, memory_size;
- Sint i;
- ErtsPollInfo pi;
- erts_aint_t cio_time = erts_atomic_read_acqb(&erts_check_io_time);
- int active_fds = (int) erts_atomic32_read_acqb(&pollset.active_fd.no);
-
- while (1) {
- erts_aint_t post_cio_time;
- int post_active_fds;
-
- ERTS_CIO_POLL_INFO(pollset.ps, &pi);
-
- post_cio_time = erts_atomic_read_mb(&erts_check_io_time);
- post_active_fds = (int) erts_atomic32_read_acqb(&pollset.active_fd.no);
- if (cio_time == post_cio_time && active_fds == post_active_fds)
- break;
- cio_time = post_cio_time;
- active_fds = post_active_fds;
- }
+ Eterm tags[16], values[16], res, list = NIL;
+ Uint sz, *szp, *hp, **hpp;
+ ErtsPollInfo *piv;
+ Sint i, j;
+
+ piv = erts_alloc(ERTS_ALC_T_TMP,
+ sizeof(ErtsPollInfo) * erts_no_schedulers);
+
+ for (j = 0; j < erts_no_schedulers; j++) {
+ struct pollset_info *psi = &pollsetv[j];
+ erts_aint_t cio_time = erts_atomic_read_acqb(&psi->check_io_time);
+
+ piv[j].active_fds = (int) erts_atomic32_read_acqb(&psi->active_fd.no);
+ while (1) {
+ erts_aint_t post_cio_time;
+ int post_active_fds;
+
+ ERTS_CIO_POLL_INFO(psi->ps, &piv[j]);
+
+ post_cio_time = erts_atomic_read_mb(&psi->check_io_time);
+ post_active_fds = (int) erts_atomic32_read_acqb(&psi->active_fd.no);
+ if (cio_time == post_cio_time && piv[j].active_fds == post_active_fds)
+ break;
+ cio_time = post_cio_time;
+ piv[j].active_fds = post_active_fds;
+ }
- memory_size = pi.memory_size;
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- memory_size += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len);
-#else
- memory_size += safe_hash_table_sz(&drv_ev_state.tab);
- {
- SafeHashInfo hi;
- safe_hash_get_info(&hi, &drv_ev_state.tab);
- memory_size += hi.objs * sizeof(ErtsDrvEventState);
+ #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ piv[j].memory_size += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len);
+ #else
+ piv[j].memory_size += safe_hash_table_sz(&drv_ev_state.tab);
+ {
+ SafeHashInfo hi;
+ safe_hash_get_info(&hi, &drv_ev_state.tab);
+ piv[j].memory_size += hi.objs * sizeof(ErtsDrvEventState);
+ }
+ erts_spin_lock(&drv_ev_state.prealloc_lock);
+ piv[j].memory_size += drv_ev_state.num_prealloc * sizeof(ErtsDrvEventState);
+ erts_spin_unlock(&drv_ev_state.prealloc_lock);
+ #endif
}
- erts_spin_lock(&drv_ev_state.prealloc_lock);
- memory_size += drv_ev_state.num_prealloc * sizeof(ErtsDrvEventState);
- erts_spin_unlock(&drv_ev_state.prealloc_lock);
-#endif
hpp = NULL;
szp = &sz;
sz = 0;
bld_it:
- i = 0;
- tags[i] = erts_bld_atom(hpp, szp, "name");
- values[i++] = erts_bld_atom(hpp, szp, "erts_poll");
+ for (j = erts_no_schedulers-1; j >= 0; j--) {
+ i = 0;
- tags[i] = erts_bld_atom(hpp, szp, "primary");
- values[i++] = erts_bld_atom(hpp, szp, pi.primary);
+ tags[i] = erts_bld_atom(hpp, szp, "name");
+ values[i++] = erts_bld_atom(hpp, szp, "erts_poll");
- tags[i] = erts_bld_atom(hpp, szp, "fallback");
- values[i++] = erts_bld_atom(hpp, szp, pi.fallback ? pi.fallback : "false");
+ tags[i] = erts_bld_atom(hpp, szp, "primary");
+ values[i++] = erts_bld_atom(hpp, szp, piv[j].primary);
- tags[i] = erts_bld_atom(hpp, szp, "kernel_poll");
- values[i++] = erts_bld_atom(hpp, szp,
- pi.kernel_poll ? pi.kernel_poll : "false");
+ tags[i] = erts_bld_atom(hpp, szp, "fallback");
+ values[i++] = erts_bld_atom(hpp, szp, piv[j].fallback ? piv[j].fallback : "false");
- tags[i] = erts_bld_atom(hpp, szp, "memory_size");
- values[i++] = erts_bld_uint(hpp, szp, memory_size);
+ tags[i] = erts_bld_atom(hpp, szp, "kernel_poll");
+ values[i++] = erts_bld_atom(hpp, szp,
+ piv[j].kernel_poll ? piv[j].kernel_poll : "false");
- tags[i] = erts_bld_atom(hpp, szp, "total_poll_set_size");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.poll_set_size);
+ tags[i] = erts_bld_atom(hpp, szp, "memory_size");
+ values[i++] = erts_bld_uint(hpp, szp, piv[j].memory_size);
- if (pi.fallback) {
- tags[i] = erts_bld_atom(hpp, szp, "fallback_poll_set_size");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.fallback_poll_set_size);
- }
+ tags[i] = erts_bld_atom(hpp, szp, "total_poll_set_size");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].poll_set_size);
+
+ if (piv[j].fallback) {
+ tags[i] = erts_bld_atom(hpp, szp, "fallback_poll_set_size");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].fallback_poll_set_size);
+ }
- tags[i] = erts_bld_atom(hpp, szp, "lazy_updates");
- values[i++] = pi.lazy_updates ? am_true : am_false;
+ tags[i] = erts_bld_atom(hpp, szp, "lazy_updates");
+ values[i++] = piv[j].lazy_updates ? am_true : am_false;
- if (pi.lazy_updates) {
- tags[i] = erts_bld_atom(hpp, szp, "pending_updates");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.pending_updates);
- }
+ if (piv[j].lazy_updates) {
+ tags[i] = erts_bld_atom(hpp, szp, "pending_updates");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].pending_updates);
+ }
- tags[i] = erts_bld_atom(hpp, szp, "batch_updates");
- values[i++] = pi.batch_updates ? am_true : am_false;
+ tags[i] = erts_bld_atom(hpp, szp, "batch_updates");
+ values[i++] = piv[j].batch_updates ? am_true : am_false;
- tags[i] = erts_bld_atom(hpp, szp, "concurrent_updates");
- values[i++] = pi.concurrent_updates ? am_true : am_false;
+ tags[i] = erts_bld_atom(hpp, szp, "concurrent_updates");
+ values[i++] = piv[j].concurrent_updates ? am_true : am_false;
- tags[i] = erts_bld_atom(hpp, szp, "max_fds");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.max_fds);
+ tags[i] = erts_bld_atom(hpp, szp, "max_fds");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].max_fds);
- tags[i] = erts_bld_atom(hpp, szp, "active_fds");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) active_fds);
+ tags[i] = erts_bld_atom(hpp, szp, "active_fds");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].active_fds);
-#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
- tags[i] = erts_bld_atom(hpp, szp, "no_avoided_wakeups");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.no_avoided_wakeups);
+ #ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
+ tags[i] = erts_bld_atom(hpp, szp, "no_avoided_wakeups");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_avoided_wakeups);
- tags[i] = erts_bld_atom(hpp, szp, "no_avoided_interrupts");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.no_avoided_interrupts);
+ tags[i] = erts_bld_atom(hpp, szp, "no_avoided_interrupts");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_avoided_interrupts);
- tags[i] = erts_bld_atom(hpp, szp, "no_interrupt_timed");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.no_interrupt_timed);
-#endif
+ tags[i] = erts_bld_atom(hpp, szp, "no_interrupt_timed");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_interrupt_timed);
+ #endif
- res = erts_bld_2tup_list(hpp, szp, i, tags, values);
+ res = erts_bld_2tup_list(hpp, szp, i, tags, values);
+
+ if (!hpp) {
+ *szp += 2;
+ }
+ else {
+ list = CONS(*hpp, res, list);
+ *hpp += 2;
+ }
+ }
if (!hpp) {
hp = HAlloc(p, sz);
@@ -2342,7 +2445,9 @@ ERTS_CIO_EXPORT(erts_check_io_info)(void *proc)
goto bld_it;
}
- return res;
+ erts_free(ERTS_ALC_T_TMP, piv);
+
+ return list;
}
static ERTS_INLINE ErtsPollEvents
@@ -2373,6 +2478,23 @@ print_events(ErtsPollEvents ev)
return ev;
}
+static ERTS_INLINE void
+print_flags(EventStateFlags f)
+{
+ const char* delim = "";
+ if(f & ERTS_EV_FLAG_USED) {
+ erts_printf("%s","USED");
+ delim = "|";
+ }
+ if(f & ERTS_EV_FLAG_DEFER_IN_EV) {
+ erts_printf("%s%s", delim, "DRIN");
+ delim = "|";
+ }
+ if(f & ERTS_EV_FLAG_DEFER_OUT_EV) {
+ erts_printf("%s%s", delim, "DROUT");
+ }
+}
+
typedef struct {
int used_fds;
int num_errors;
@@ -2419,7 +2541,8 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters)
counters->used_fds++;
#endif
- erts_printf("fd=%d ", (int) fd);
+ erts_printf("pollset=%d fd=%d ",
+ (int)(state->pollset - pollsetv), (int) fd);
#if defined(HAVE_FSTAT) && !defined(NO_FSTAT_ON_SYS_FD_TYPE)
if (fstat((int) fd, &stat_buf) < 0)
@@ -2497,7 +2620,15 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters)
err = 1;
}
else {
- err = 1;
+ ErtsPollEvents ev = cio_events;
+#if ERTS_CIO_DEFER_ACTIVE_EVENTS
+ if (state->flags & ERTS_EV_FLAG_DEFER_IN_EV)
+ ev &= ~ERTS_POLL_EV_IN;
+ if (state->flags & ERTS_EV_FLAG_DEFER_OUT_EV)
+ ev &= ~ERTS_POLL_EV_OUT;
+#endif
+ if (ev != ep_events)
+ err = 1;
erts_printf("cio_ev=");
print_events(cio_events);
erts_printf(" ep_ev=");
@@ -2607,6 +2738,8 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters)
#endif
}
+ erts_printf(" flags="); print_flags(state->flags);
+
if (err) {
counters->num_errors++;
erts_printf(" ERROR");
@@ -2619,18 +2752,23 @@ int
ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip)
{
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- int fd, len;
+ int fd, len, i;
#endif
- IterDebugCounters counters;
+ IterDebugCounters counters = {0};
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
ErtsDrvEventState null_des;
+ null_des.pollset = NULL;
null_des.driver.select = NULL;
null_des.driver.nif = NULL;
null_des.driver.stop.drv_ptr = NULL;
null_des.events = 0;
null_des.remove_cnt = 0;
null_des.type = ERTS_EV_TYPE_NONE;
+ null_des.flags = 0;
+
+ counters.epep = erts_alloc(ERTS_ALC_T_TMP,
+ sizeof(ErtsPollEvents)*drv_ev_state.max_fds);
#endif
erts_printf("--- fds in pollset --------------------------------------\n");
@@ -2642,28 +2780,25 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip)
erts_thr_progress_block(); /* stop the world to avoid messy locking */
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- counters.epep = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollEvents)*drv_ev_state.max_fds);
- ERTS_POLL_EXPORT(erts_poll_get_selected_events)(pollset.ps, counters.epep, drv_ev_state.max_fds);
- counters.internal_fds = 0;
-#endif
- counters.used_fds = 0;
- counters.num_errors = 0;
- counters.no_driver_select_structs = 0;
- counters.no_enif_select_structs = 0;
-
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
len = erts_atomic_read_nob(&drv_ev_state.len);
- for (fd = 0; fd < len; fd++) {
- doit_erts_check_io_debug((void *) &drv_ev_state.v[fd], (void *) &counters);
+
+ for (i = 0; i < erts_no_schedulers; i++) {
+ ERTS_POLL_EXPORT(erts_poll_get_selected_events)(pollsetv[i].ps,
+ counters.epep,
+ drv_ev_state.max_fds);
+ for (fd = 0; fd < len; fd++) {
+ if (drv_ev_state.v[fd].pollset == &pollsetv[i])
+ doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters);
+ }
}
- for ( ; fd < drv_ev_state.max_fds; fd++) {
- null_des.fd = fd;
- doit_erts_check_io_debug((void *) &null_des, (void *) &counters);
+ for (fd = len ; fd < drv_ev_state.max_fds; fd++) {
+ null_des.fd = fd;
+ doit_erts_check_io_debug(&null_des, &counters);
}
#else
- safe_hash_for_each(&drv_ev_state.tab, &doit_erts_check_io_debug, (void *) &counters);
+ safe_hash_for_each(&drv_ev_state.tab, &doit_erts_check_io_debug,
+ &counters);
#endif
-
erts_thr_progress_unblock();
ciodip->no_used_fds = counters.used_fds;
@@ -2696,7 +2831,6 @@ void ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks)(int enable) {
}
#endif /* ERTS_ENABLE_LOCK_COUNT */
-
#ifdef ERTS_ENABLE_KERNEL_POLL
# ifdef ERTS_KERNEL_POLL_VERSION
@@ -2724,6 +2858,16 @@ int enif_select(ErlNifEnv* env, ErlNifEvent event,
return (*erts_io_funcs.enif_select)(env, event, flags, obj, pid, ref);
}
+struct pollset_info* erts_get_pollset(int sched_nr)
+{
+ return (*erts_io_funcs.get_pollset)(sched_nr);
+}
+
+void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp)
+{
+ erts_io_funcs.notify_port_task_executed(pthp);
+}
+
void erts_check_io(int do_wait)
{
erts_io_funcs.check_io(do_wait);
@@ -2751,15 +2895,15 @@ erts_check_io_debug(ErtsCheckIoDebugInfo *ip)
return (*erts_io_funcs.check_io_debug)(ip);
}
-void erts_check_io_interrupt(int set)
+void erts_check_io_interrupt(struct pollset_info* psi, int set)
{
- erts_io_funcs.check_io_interrupt(set);
+ erts_io_funcs.check_io_interrupt(psi, set);
}
-void erts_check_io_interrupt_timed(int set,
+void erts_check_io_interrupt_timed(struct pollset_info* psi, int set,
ErtsMonotonicTime timeout_time)
{
- erts_io_funcs.check_io_interrupt_tmd(set, timeout_time);
+ erts_io_funcs.check_io_interrupt_tmd(psi, set, timeout_time);
}
#ifdef ERTS_ENABLE_LOCK_COUNT
diff --git a/erts/emulator/sys/common/erl_check_io.h b/erts/emulator/sys/common/erl_check_io.h
index f4d7983002..ab53d91756 100644
--- a/erts/emulator/sys/common/erl_check_io.h
+++ b/erts/emulator/sys/common/erl_check_io.h
@@ -30,38 +30,29 @@
#include "sys.h"
#include "erl_sys_driver.h"
+struct pollset_info;
+
Uint erts_check_io_size(void);
Eterm erts_check_io_info(void *);
+void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp);
+void erts_check_io_async_sig_interrupt(struct pollset_info *psi);
int erts_check_io_max_files(void);
-void erts_check_io_interrupt(int);
-void erts_check_io_interrupt_timed(int, ErtsMonotonicTime);
void erts_check_io(int);
void erts_init_check_io(void);
#ifdef ERTS_ENABLE_LOCK_COUNT
void erts_lcnt_update_cio_locks(int enable);
#endif
-extern erts_atomic_t erts_check_io_time;
+void erts_check_io_interrupt(struct pollset_info*, int);
+void erts_check_io_interrupt_timed(struct pollset_info*, int, ErtsMonotonicTime);
+struct pollset_info* erts_get_pollset(int sched_num);
typedef struct {
ErtsPortTaskHandle task;
erts_atomic_t executed_time;
+ struct pollset_info *pollset;
} ErtsIoTask;
-ERTS_GLB_INLINE void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp);
-
-#if ERTS_GLB_INLINE_INCL_FUNC_DEF
-
-ERTS_GLB_INLINE void
-erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp)
-{
- ErtsIoTask *itp = (ErtsIoTask *) (((char *) pthp) - offsetof(ErtsIoTask, task));
- erts_aint_t ci_time = erts_atomic_read_acqb(&erts_check_io_time);
- erts_atomic_set_relb(&itp->executed_time, ci_time);
-}
-
-#endif
-
#endif /* ERL_CHECK_IO_H__ */
#if !defined(ERL_CHECK_IO_C__) && !defined(ERTS_ALLOC_C__)
@@ -80,7 +71,7 @@ erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp)
*/
# define ERTS_CIO_DEFER_ACTIVE_EVENTS 1
#else
-# define ERTS_CIO_DEFER_ACTIVE_EVENTS 0
+# define ERTS_CIO_DEFER_ACTIVE_EVENTS 1
#endif
typedef struct {
diff --git a/erts/emulator/sys/common/erl_poll.h b/erts/emulator/sys/common/erl_poll.h
index 12dfc66e51..6c961205fe 100644
--- a/erts/emulator/sys/common/erl_poll.h
+++ b/erts/emulator/sys/common/erl_poll.h
@@ -225,6 +225,7 @@ typedef struct {
long no_avoided_interrupts;
long no_interrupt_timed;
#endif
+ int active_fds;
} ErtsPollInfo;
void ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet,
diff --git a/erts/emulator/sys/common/erl_sys_common_misc.c b/erts/emulator/sys/common/erl_sys_common_misc.c
index 09237c81ce..420138ff0a 100644
--- a/erts/emulator/sys/common/erl_sys_common_misc.c
+++ b/erts/emulator/sys/common/erl_sys_common_misc.c
@@ -45,14 +45,6 @@
#endif
#endif
-/*
- * erts_check_io_time is used by the erl_check_io implementation. The
- * global erts_check_io_time variable is declared here since there
- * (often) exist two versions of erl_check_io (kernel-poll and
- * non-kernel-poll), and we dont want two versions of this variable.
- */
-erts_atomic_t erts_check_io_time;
-
/* Written once and only once */
static int filename_encoding = ERL_FILENAME_UNKNOWN;
diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c
index 09c515291a..acd7920e86 100644
--- a/erts/emulator/sys/unix/sys.c
+++ b/erts/emulator/sys/unix/sys.c
@@ -499,6 +499,7 @@ static void signal_notify_requested(Eterm type) {
static ERTS_INLINE void
break_requested(void)
{
+ int i;
/*
* just set a flag - checked for and handled by
* scheduler threads erts_check_io() (not signal handler).
@@ -510,7 +511,10 @@ break_requested(void)
erts_exit(ERTS_INTR_EXIT, "");
ERTS_SET_BREAK_REQUESTED;
- erts_check_io_interrupt(1);
+ for (i=0; i < erts_no_schedulers; i++) {
+ /* Make sure we don't sleep in poll */
+ erts_check_io_interrupt(ERTS_SCHEDULER_IX(i)->pollset, 1);
+ }
}
static RETSIGTYPE request_break(int signum)
diff --git a/erts/emulator/sys/unix/sys_drivers.c b/erts/emulator/sys/unix/sys_drivers.c
index 7c9a532fed..0228e1af54 100644
--- a/erts/emulator/sys/unix/sys_drivers.c
+++ b/erts/emulator/sys/unix/sys_drivers.c
@@ -1723,8 +1723,6 @@ static ErlDrvData forker_start(ErlDrvPort port_num, char* name,
SET_NONBLOCKING(forker_fd);
- driver_select(port_num, forker_fd, ERL_DRV_READ|ERL_DRV_USE, 1);
-
return (ErlDrvData)port_num;
}
@@ -1821,10 +1819,19 @@ static void forker_ready_output(ErlDrvData e, ErlDrvEvent fd)
static ErlDrvSSizeT forker_control(ErlDrvData e, unsigned int cmd, char *buf,
ErlDrvSizeT len, char **rbuf, ErlDrvSizeT rlen)
{
+ static int first_call = 1;
ErtsSysForkerProto *proto = (ErtsSysForkerProto *)buf;
ErlDrvPort port_num = (ErlDrvPort)e;
int res;
+ if (first_call) {
+ /*
+ * Do driver_select here when schedulers and their pollsets have started.
+ */
+ driver_select(port_num, forker_fd, ERL_DRV_READ|ERL_DRV_USE, 1);
+ first_call = 0;
+ }
+
driver_enq(port_num, buf, len);
if (driver_sizeq(port_num) > sizeof(*proto)) {
return 0;