aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/sys
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/sys')
-rw-r--r--erts/emulator/sys/common/erl_check_io.c890
-rw-r--r--erts/emulator/sys/common/erl_check_io.h18
-rw-r--r--erts/emulator/sys/common/erl_poll.c1
-rw-r--r--erts/emulator/sys/unix/sys.c10
4 files changed, 793 insertions, 126 deletions
diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c
index 44a77f3ea5..4fc95624c7 100644
--- a/erts/emulator/sys/common/erl_check_io.c
+++ b/erts/emulator/sys/common/erl_check_io.c
@@ -53,6 +53,8 @@ typedef char EventStateType;
#define ERTS_EV_TYPE_DRV_SEL ((EventStateType) 1) /* driver_select */
#define ERTS_EV_TYPE_DRV_EV ((EventStateType) 2) /* driver_event */
#define ERTS_EV_TYPE_STOP_USE ((EventStateType) 3) /* pending stop_select */
+#define ERTS_EV_TYPE_NIF ((EventStateType) 4) /* enif_select */
+#define ERTS_EV_TYPE_STOP_NIF ((EventStateType) 5) /* pending nif stop */
typedef char EventStateFlags;
#define ERTS_EV_FLAG_USED ((EventStateFlags) 1) /* ERL_DRV_USE has been turned on */
@@ -122,7 +124,11 @@ typedef struct {
#if ERTS_CIO_HAVE_DRV_EVENT
ErtsDrvEventDataState *event; /* ERTS_EV_TYPE_DRV_EV */
#endif
- erts_driver_t* drv_ptr; /* ERTS_EV_TYPE_STOP_USE */
+ ErtsNifSelectDataState *nif; /* ERTS_EV_TYPE_NIF */
+ union {
+ erts_driver_t* drv_ptr; /* ERTS_EV_TYPE_STOP_USE */
+ ErlNifResource* resource; /* ERTS_EV_TYPE_STOP_NIF */
+ }stop;
} driver;
ErtsPollEvents events;
unsigned short remove_cnt; /* number of removed_fd's referring to this fd */
@@ -211,24 +217,35 @@ static ERTS_INLINE void hash_erase_drv_ev_state(ErtsDrvEventState *state)
#endif /* !ERTS_SYS_CONTINOUS_FD_NUMBERS */
static void stale_drv_select(Eterm id, ErtsDrvEventState *state, int mode);
-static void select_steal(ErlDrvPort ix, ErtsDrvEventState *state,
- int mode, int on);
-static void print_select_op(erts_dsprintf_buf_t *dsbufp,
- ErlDrvPort ix, ErtsSysFdType fd, int mode, int on);
+static void drv_select_steal(ErlDrvPort ix, ErtsDrvEventState *state,
+ int mode, int on);
+static void nif_select_steal(ErtsDrvEventState *state, int mode,
+ ErlNifResource* resource, Eterm ref);
+
+static void print_drv_select_op(erts_dsprintf_buf_t *dsbufp,
+ ErlDrvPort ix, ErtsSysFdType fd, int mode, int on);
+static void print_nif_select_op(erts_dsprintf_buf_t*, ErtsSysFdType,
+ int mode, ErlNifResource*, Eterm ref);
+
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
-static void select_large_fd_error(ErlDrvPort, ErtsSysFdType, int, int);
+static void drv_select_large_fd_error(ErlDrvPort, ErtsSysFdType, int, int);
+static void nif_select_large_fd_error(ErtsSysFdType, int, ErlNifResource*,Eterm ref);
#endif
#if ERTS_CIO_HAVE_DRV_EVENT
-static void event_steal(ErlDrvPort ix, ErtsDrvEventState *state,
+static void drv_event_steal(ErlDrvPort ix, ErtsDrvEventState *state,
ErlDrvEventData event_data);
-static void print_event_op(erts_dsprintf_buf_t *dsbufp,
- ErlDrvPort, ErtsSysFdType, ErlDrvEventData);
+static void print_drv_event_op(erts_dsprintf_buf_t *dsbufp,
+ ErlDrvPort, ErtsSysFdType, ErlDrvEventData);
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
static void event_large_fd_error(ErlDrvPort, ErtsSysFdType, ErlDrvEventData);
#endif
#endif
-static void steal_pending_stop_select(erts_dsprintf_buf_t*, ErlDrvPort,
- ErtsDrvEventState*, int mode, int on);
+static void
+steal_pending_stop_use(erts_dsprintf_buf_t*, ErlDrvPort, ErtsDrvEventState*,
+ int mode, int on);
+static void
+steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErlNifResource*,
+ ErtsDrvEventState *state, int mode, int on);
#ifdef ERTS_SMP
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(removed_fd, struct removed_fd, 64, ERTS_ALC_T_FD_LIST)
@@ -263,6 +280,18 @@ alloc_drv_select_data(void)
return dsp;
}
+static ERTS_INLINE ErtsNifSelectDataState *
+alloc_nif_select_data(void)
+{
+ ErtsNifSelectDataState *dsp = erts_alloc(ERTS_ALC_T_NIF_SEL_D_STATE,
+ sizeof(ErtsNifSelectDataState));
+ dsp->in.pid = NIL;
+ dsp->out.pid = NIL;
+ dsp->in.ddeselect_cnt = 0;
+ dsp->out.ddeselect_cnt = 0;
+ return dsp;
+}
+
static ERTS_INLINE void
free_drv_select_data(ErtsDrvSelectDataState *dsp)
{
@@ -271,6 +300,12 @@ free_drv_select_data(ErtsDrvSelectDataState *dsp)
erts_free(ERTS_ALC_T_DRV_SEL_D_STATE, dsp);
}
+static ERTS_INLINE void
+free_nif_select_data(ErtsNifSelectDataState *dsp)
+{
+ erts_free(ERTS_ALC_T_NIF_SEL_D_STATE, dsp);
+}
+
#if ERTS_CIO_HAVE_DRV_EVENT
static ERTS_INLINE ErtsDrvEventDataState *
@@ -352,6 +387,7 @@ forget_removed(struct pollset_info* psi)
erts_smp_spin_unlock(&psi->removed_list_lock);
while (fdlp) {
+ ErlNifResource* resource = NULL;
erts_driver_t* drv_ptr = NULL;
erts_smp_mtx_t* mtx;
ErtsSysFdType fd;
@@ -372,15 +408,25 @@ forget_removed(struct pollset_info* psi)
ASSERT(state->remove_cnt > 0);
if (--state->remove_cnt == 0) {
switch (state->type) {
+ case ERTS_EV_TYPE_STOP_NIF:
+ /* Now we can call stop */
+ resource = state->driver.stop.resource;
+ state->driver.stop.resource = NULL;
+ ASSERT(resource);
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags &= ~ERTS_EV_FLAG_USED;
+ goto case_ERTS_EV_TYPE_NONE;
+
case ERTS_EV_TYPE_STOP_USE:
/* Now we can call stop_select */
- drv_ptr = state->driver.drv_ptr;
+ drv_ptr = state->driver.stop.drv_ptr;
ASSERT(drv_ptr);
state->type = ERTS_EV_TYPE_NONE;
state->flags &= ~ERTS_EV_FLAG_USED;
- state->driver.drv_ptr = NULL;
+ state->driver.stop.drv_ptr = NULL;
/* Fall through */
- case ERTS_EV_TYPE_NONE:
+ case ERTS_EV_TYPE_NONE:
+ case_ERTS_EV_TYPE_NONE:
#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
hash_erase_drv_ev_state(state);
#endif
@@ -403,6 +449,11 @@ forget_removed(struct pollset_info* psi)
erts_ddll_dereference_driver(drv_ptr->handle);
}
}
+ if (resource) {
+ erts_resource_stop(resource, (ErlNifEvent)fd, 0);
+ enif_release_resource(resource->data);
+ }
+
tofree = fdlp;
fdlp = fdlp->next;
removed_fd_free(tofree);
@@ -440,7 +491,8 @@ grow_drv_ev_state(int min_ix)
#if ERTS_CIO_HAVE_DRV_EVENT
drv_ev_state[i].driver.event = NULL;
#endif
- drv_ev_state[i].driver.drv_ptr = NULL;
+ drv_ev_state[i].driver.stop.drv_ptr = NULL;
+ drv_ev_state[i].driver.nif = NULL;
drv_ev_state[i].events = 0;
drv_ev_state[i].remove_cnt = 0;
drv_ev_state[i].type = ERTS_EV_TYPE_NONE;
@@ -480,6 +532,7 @@ abort_tasks(ErtsDrvEventState *state, int mode)
ERTS_EV_TYPE_DRV_EV);
return;
#endif
+ case ERTS_EV_TYPE_NIF:
case ERTS_EV_TYPE_NONE:
return;
default:
@@ -534,6 +587,14 @@ deselect(ErtsDrvEventState *state, int mode)
if (!(state->events)) {
switch (state->type) {
+ case ERTS_EV_TYPE_NIF:
+ state->driver.nif->in.pid = NIL;
+ state->driver.nif->out.pid = NIL;
+ state->driver.nif->in.ddeselect_cnt = 0;
+ state->driver.nif->out.ddeselect_cnt = 0;
+ enif_release_resource(state->driver.stop.resource);
+ state->driver.stop.resource = NULL;
+ break;
case ERTS_EV_TYPE_DRV_SEL:
state->driver.select->inport = NIL;
state->driver.select->outport = NIL;
@@ -569,7 +630,8 @@ check_fd_cleanup(ErtsDrvEventState *state,
#if ERTS_CIO_HAVE_DRV_EVENT
ErtsDrvEventDataState **free_event,
#endif
- ErtsDrvSelectDataState **free_select)
+ ErtsDrvSelectDataState **free_select,
+ ErtsNifSelectDataState **free_nif)
{
erts_aint_t current_cio_time;
@@ -586,6 +648,12 @@ check_fd_cleanup(ErtsDrvEventState *state,
state->driver.select = NULL;
}
+ *free_nif = NULL;
+ if (state->driver.nif && (state->type != ERTS_EV_TYPE_NIF)) {
+ *free_nif = state->driver.nif;
+ state->driver.nif = NULL;
+ }
+
#if ERTS_CIO_HAVE_DRV_EVENT
*free_event = NULL;
if (state->driver.event
@@ -617,12 +685,14 @@ check_cleanup_active_fd(ErtsSysFdType fd,
ErtsPollControlEntry *pce,
int *pce_ix,
#endif
- erts_aint_t current_cio_time)
+ erts_aint_t current_cio_time,
+ int may_sleep)
{
ErtsDrvEventState *state;
int active = 0;
erts_smp_mtx_t *mtx = fd_mtx(fd);
void *free_select = NULL;
+ void *free_nif = NULL;
#if ERTS_CIO_HAVE_DRV_EVENT
void *free_event = NULL;
#endif
@@ -682,6 +752,41 @@ check_cleanup_active_fd(ErtsSysFdType fd,
}
}
+ if (state->driver.nif) {
+#if ERTS_CIO_DEFER_ACTIVE_EVENTS
+# error Windows
+#endif
+ int do_wake = 0;
+ ErtsPollEvents rm_events = 0;
+ if (state->driver.nif->in.ddeselect_cnt) {
+ ASSERT(state->type == ERTS_EV_TYPE_NIF);
+ ASSERT(state->events & ERTS_POLL_EV_IN);
+ ASSERT(is_nil(state->driver.nif->in.pid));
+ if (may_sleep || state->driver.nif->in.ddeselect_cnt == 1) {
+ rm_events = ERTS_POLL_EV_IN;
+ state->driver.nif->in.ddeselect_cnt = 0;
+ }
+ }
+ if (state->driver.nif->out.ddeselect_cnt) {
+ ASSERT(state->type == ERTS_EV_TYPE_NIF);
+ ASSERT(state->events & ERTS_POLL_EV_OUT);
+ ASSERT(is_nil(state->driver.nif->out.pid));
+ if (may_sleep || state->driver.nif->out.ddeselect_cnt == 1) {
+ rm_events |= ERTS_POLL_EV_OUT;
+ state->driver.nif->out.ddeselect_cnt = 0;
+ }
+ }
+ if (rm_events) {
+ state->events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, rm_events, 0, &do_wake);
+ }
+ if (state->events)
+ active = 1;
+ else if (state->type != ERTS_EV_TYPE_NIF) {
+ free_nif = state->driver.nif;
+ state->driver.nif = NULL;
+ }
+ }
+
#if ERTS_CIO_HAVE_DRV_EVENT
if (state->driver.event) {
if (is_iotask_active(&state->driver.event->iotask, current_cio_time)) {
@@ -722,6 +827,8 @@ check_cleanup_active_fd(ErtsSysFdType fd,
if (free_select)
free_drv_select_data(free_select);
+ if (free_nif)
+ free_nif_select_data(free_nif);
#if ERTS_CIO_HAVE_DRV_EVENT
if (free_event)
free_drv_event_data(free_event);
@@ -746,7 +853,7 @@ check_cleanup_active_fd(ErtsSysFdType fd,
}
static void
-check_cleanup_active_fds(erts_aint_t current_cio_time)
+check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep)
{
int six = pollset.active_fd.six;
int eix = pollset.active_fd.eix;
@@ -773,7 +880,8 @@ check_cleanup_active_fds(erts_aint_t current_cio_time)
pctrl_entries,
&pctrl_ix,
#endif
- current_cio_time)) {
+ current_cio_time,
+ may_sleep)) {
no--;
if (ix == six) {
#ifdef DEBUG
@@ -807,13 +915,30 @@ check_cleanup_active_fds(erts_aint_t current_cio_time)
erts_smp_atomic32_set_relb(&pollset.active_fd.no, no);
}
+static void grow_active_fds(void)
+{
+ ASSERT(pollset.active_fd.six == pollset.active_fd.eix);
+ pollset.active_fd.six = 0;
+ pollset.active_fd.eix = pollset.active_fd.size;
+ pollset.active_fd.size += ERTS_ACTIVE_FD_INC;
+ pollset.active_fd.array = erts_realloc(ERTS_ALC_T_ACTIVE_FD_ARR,
+ pollset.active_fd.array,
+ pollset.active_fd.size*sizeof(ErtsSysFdType));
+#ifdef DEBUG
+ {
+ int i;
+ for (i = pollset.active_fd.eix + 1; i < pollset.active_fd.size; i++)
+ pollset.active_fd.array[i] = ERTS_SYS_FD_INVALID;
+ }
+#endif
+}
+
static ERTS_INLINE void
add_active_fd(ErtsSysFdType fd)
{
int eix = pollset.active_fd.eix;
int size = pollset.active_fd.size;
-
pollset.active_fd.array[eix] = fd;
erts_smp_atomic32_set_relb(&pollset.active_fd.no,
@@ -823,25 +948,11 @@ add_active_fd(ErtsSysFdType fd)
eix++;
if (eix >= size)
eix = 0;
- if (pollset.active_fd.six == eix) {
- pollset.active_fd.six = 0;
- eix = size;
- size += ERTS_ACTIVE_FD_INC;
- pollset.active_fd.array = erts_realloc(ERTS_ALC_T_ACTIVE_FD_ARR,
- pollset.active_fd.array,
- sizeof(ErtsSysFdType)*size);
- pollset.active_fd.size = size;
-#ifdef DEBUG
- {
- int i;
- for (i = eix + 1; i < size; i++)
- pollset.active_fd.array[i] = ERTS_SYS_FD_INVALID;
- }
-#endif
+ pollset.active_fd.eix = eix;
+ if (pollset.active_fd.six == eix) {
+ grow_active_fds();
}
-
- pollset.active_fd.eix = eix;
}
int
@@ -863,6 +974,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
ErtsDrvEventDataState *free_event = NULL;
#endif
ErtsDrvSelectDataState *free_select = NULL;
+ ErtsNifSelectDataState *free_nif = NULL;
#ifdef USE_VM_PROBES
DTRACE_CHARBUF(name, 64);
#endif
@@ -878,7 +990,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
return -1;
}
if (fd >= max_fds) {
- select_large_fd_error(ix, fd, mode, on);
+ drv_select_large_fd_error(ix, fd, mode, on);
return -1;
}
grow_drv_ev_state(fd);
@@ -916,26 +1028,38 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
}
#endif
+ switch (state->type) {
#if ERTS_CIO_HAVE_DRV_EVENT
- if (state->type == ERTS_EV_TYPE_DRV_EV)
- select_steal(ix, state, mode, on);
+ case ERTS_EV_TYPE_DRV_EV:
#endif
- if (state->type == ERTS_EV_TYPE_STOP_USE) {
- erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- print_select_op(dsbufp, ix, state->fd, mode, on);
- steal_pending_stop_select(dsbufp, ix, state, mode, on);
- if (state->type == ERTS_EV_TYPE_STOP_USE) {
- ret = 0;
- goto done; /* stop_select still pending */
- }
- ASSERT(state->type == ERTS_EV_TYPE_NONE);
- }
+ case ERTS_EV_TYPE_NIF:
+ drv_select_steal(ix, state, mode, on);
+ break;
+ case ERTS_EV_TYPE_STOP_USE: {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ print_drv_select_op(dsbufp, ix, state->fd, mode, on);
+ steal_pending_stop_use(dsbufp, ix, state, mode, on);
+ if (state->type == ERTS_EV_TYPE_STOP_USE) {
+ ret = 0;
+ goto done; /* stop_select still pending */
+ }
+ ASSERT(state->type == ERTS_EV_TYPE_NONE);
+ break;
+ }
+ case ERTS_EV_TYPE_STOP_NIF: {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ print_drv_select_op(dsbufp, ix, state->fd, mode, on);
+ steal_pending_stop_nif(dsbufp, NULL, state, mode, on);
+ ASSERT(state->type == ERTS_EV_TYPE_NONE);
+ break;
+
+ }}
if (mode & ERL_DRV_READ) {
if (state->type == ERTS_EV_TYPE_DRV_SEL) {
Eterm owner = state->driver.select->inport;
if (owner != id && is_not_nil(owner))
- select_steal(ix, state, mode, on);
+ drv_select_steal(ix, state, mode, on);
}
ctl_events |= ERTS_POLL_EV_IN;
}
@@ -943,7 +1067,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
if (state->type == ERTS_EV_TYPE_DRV_SEL) {
Eterm owner = state->driver.select->outport;
if (owner != id && is_not_nil(owner))
- select_steal(ix, state, mode, on);
+ drv_select_steal(ix, state, mode, on);
}
ctl_events |= ERTS_POLL_EV_OUT;
}
@@ -1033,7 +1157,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
else {
/* Not safe to close fd, postpone stop_select callback. */
state->type = ERTS_EV_TYPE_STOP_USE;
- state->driver.drv_ptr = drv_ptr;
+ state->driver.stop.drv_ptr = drv_ptr;
if (drv_ptr->handle) {
erts_ddll_reference_referenced_driver(drv_ptr->handle);
}
@@ -1050,7 +1174,8 @@ done:
#if ERTS_CIO_HAVE_DRV_EVENT
&free_event,
#endif
- &free_select);
+ &free_select,
+ &free_nif);
done_unknown:
erts_smp_mtx_unlock(fd_mtx(fd));
@@ -1063,6 +1188,263 @@ done_unknown:
}
if (free_select)
free_drv_select_data(free_select);
+ if (free_nif)
+ free_nif_select_data(free_nif);
+
+#if ERTS_CIO_HAVE_DRV_EVENT
+ if (free_event)
+ free_drv_event_data(free_event);
+#endif
+ return ret;
+}
+
+enum ErlNifSelectReturn
+ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
+ ErlNifEvent e,
+ enum ErlNifSelectFlags mode,
+ void* obj,
+ Eterm ref)
+{
+ int on;
+ const Eterm id = env->proc->common.id;
+ ErlNifResource* resource = DATA_TO_RESOURCE(obj);
+ ErtsSysFdType fd = (ErtsSysFdType) e;
+ ErtsPollEvents ctl_events = (ErtsPollEvents) 0;
+ ErtsPollEvents new_events, old_events;
+ ErtsDrvEventState *state;
+ int wake_poller;
+ enum ErlNifSelectReturn ret;
+ enum { NO_STOP=0, CALL_STOP, CALL_STOP_AND_RELEASE } call_stop = NO_STOP;
+#if ERTS_CIO_HAVE_DRV_EVENT
+ ErtsDrvEventDataState *free_event = NULL;
+#endif
+ ErtsDrvSelectDataState *free_select = NULL;
+ ErtsNifSelectDataState *free_nif = NULL;
+#ifdef USE_VM_PROBES
+ DTRACE_CHARBUF(name, 64);
+#endif
+
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ if ((unsigned)fd >= (unsigned)erts_smp_atomic_read_nob(&drv_ev_state_len)) {
+ if (fd < 0) {
+ return ERL_NIF_SELECT_ERROR | ERL_NIF_SELECT_INVALID_EVENT;
+ }
+ if (fd >= max_fds) {
+ nif_select_large_fd_error(fd, mode, resource, ref);
+ return ERL_NIF_SELECT_ERROR | ERL_NIF_SELECT_INVALID_EVENT;
+ }
+ grow_drv_ev_state(fd);
+ }
+#endif
+
+ erts_smp_mtx_lock(fd_mtx(fd));
+
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ state = &drv_ev_state[(int) fd];
+#else
+ state = hash_get_drv_ev_state(fd); /* may be NULL! */
+#endif
+
+ if (mode & ERL_NIF_SELECT_STOP) {
+ ASSERT(resource->type->stop);
+ if (IS_FD_UNKNOWN(state)) {
+ /* fast track to stop callback */
+ call_stop = CALL_STOP;
+ ret = ERL_NIF_SELECT_STOP_CALLED;
+ goto done_unknown;
+ }
+ on = 0;
+ mode = ERL_DRV_READ | ERL_DRV_WRITE | ERL_DRV_USE;
+ wake_poller = 1; /* to eject fd from pollset (if needed) */
+ ctl_events = ERTS_POLL_EV_IN | ERTS_POLL_EV_OUT;
+ }
+ else {
+ on = 1;
+ ASSERT(mode);
+ wake_poller = 0;
+ if (mode & ERL_DRV_READ) {
+ ctl_events |= ERTS_POLL_EV_IN;
+ }
+ if (mode & ERL_DRV_WRITE) {
+ ctl_events |= ERTS_POLL_EV_OUT;
+ }
+ }
+
+#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ if (state == NULL) {
+ state = hash_new_drv_ev_state(fd);
+ }
+#endif
+
+ switch (state->type) {
+ case ERTS_EV_TYPE_NIF:
+ /*
+ * Changing resource is considered stealing.
+ * Changing process and/or ref is ok (I think?).
+ */
+ if (state->driver.stop.resource != resource)
+ nif_select_steal(state, ERL_DRV_READ | ERL_DRV_WRITE, resource, ref);
+ break;
+#if ERTS_CIO_HAVE_DRV_EVENT
+ case ERTS_EV_TYPE_DRV_EV:
+#endif
+ case ERTS_EV_TYPE_DRV_SEL:
+ nif_select_steal(state, mode, resource, ref);
+ break;
+ case ERTS_EV_TYPE_STOP_USE: {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ print_nif_select_op(dsbufp, fd, mode, resource, ref);
+ steal_pending_stop_use(dsbufp, ERTS_INVALID_ERL_DRV_PORT, state, mode, on);
+ ASSERT(state->type == ERTS_EV_TYPE_NONE);
+ break;
+ }
+ case ERTS_EV_TYPE_STOP_NIF: {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ print_nif_select_op(dsbufp, fd, mode, resource, ref);
+ steal_pending_stop_nif(dsbufp, resource, state, mode, on);
+ if (state->type == ERTS_EV_TYPE_STOP_NIF) {
+ ret = ERL_NIF_SELECT_STOP_SCHEDULED; /* ?? */
+ goto done;
+ }
+ ASSERT(state->type == ERTS_EV_TYPE_NONE);
+ break;
+ }}
+
+ ASSERT((state->type == ERTS_EV_TYPE_NIF) ||
+ (state->type == ERTS_EV_TYPE_NONE && !state->events));
+
+ new_events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, ctl_events, on, &wake_poller);
+
+ if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
+ if (state->type == ERTS_EV_TYPE_NIF && !state->events) {
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags &= ~ERTS_EV_FLAG_USED;
+ state->driver.nif->in.pid = NIL;
+ state->driver.nif->out.pid = NIL;
+ state->driver.nif->in.ddeselect_cnt = 0;
+ state->driver.nif->out.ddeselect_cnt = 0;
+ state->driver.stop.resource = NULL;
+ }
+ ret = ERL_NIF_SELECT_ERROR | ERL_NIF_SELECT_FAILED;
+ goto done;
+ }
+
+ old_events = state->events;
+
+ ASSERT(on
+ ? (new_events == (state->events | ctl_events))
+ : (new_events == (state->events & ~ctl_events)));
+
+ ASSERT(state->type == ERTS_EV_TYPE_NIF
+ || state->type == ERTS_EV_TYPE_NONE);
+
+ state->events = new_events;
+ if (on) {
+ Uint32* refn;
+ if (!state->driver.nif)
+ state->driver.nif = alloc_nif_select_data();
+ if (state->type == ERTS_EV_TYPE_NONE) {
+ state->type = ERTS_EV_TYPE_NIF;
+ state->driver.stop.resource = resource;
+ enif_keep_resource(resource->data);
+ }
+ ASSERT(state->type == ERTS_EV_TYPE_NIF);
+ ASSERT(state->driver.stop.resource == resource);
+ if (ctl_events & ERTS_POLL_EV_IN) {
+ state->driver.nif->in.pid = id;
+ if (is_immed(ref)) {
+ state->driver.nif->in.immed = ref;
+ } else {
+ ASSERT(is_internal_ref(ref));
+ refn = internal_ref_numbers(ref);
+ state->driver.nif->in.immed = THE_NON_VALUE;
+ state->driver.nif->in.refn[0] = refn[0];
+ state->driver.nif->in.refn[1] = refn[1];
+ state->driver.nif->in.refn[2] = refn[2];
+ }
+ state->driver.nif->in.ddeselect_cnt = 0;
+ }
+ if (ctl_events & ERTS_POLL_EV_OUT) {
+ state->driver.nif->out.pid = id;
+ if (is_immed(ref)) {
+ state->driver.nif->out.immed = ref;
+ } else {
+ ASSERT(is_internal_ref(ref));
+ refn = internal_ref_numbers(ref);
+ state->driver.nif->out.immed = THE_NON_VALUE;
+ state->driver.nif->out.refn[0] = refn[0];
+ state->driver.nif->out.refn[1] = refn[1];
+ state->driver.nif->out.refn[2] = refn[2];
+ }
+ state->driver.nif->out.ddeselect_cnt = 0;
+ }
+ ret = 0;
+ }
+ else { /* off */
+ if (state->type == ERTS_EV_TYPE_NIF) {
+ //erts_fprintf(stderr, "SVERK: enif select clear fd=%d inpid=%T inrsrc=%p\n",
+ // state->fd, state->driver.nif->inpid,
+ // state->driver.nif->in.resource);
+ state->driver.nif->in.pid = NIL;
+ state->driver.nif->out.pid = NIL;
+ state->driver.nif->in.ddeselect_cnt = 0;
+ state->driver.nif->out.ddeselect_cnt = 0;
+ if (old_events != 0) {
+ remember_removed(state, &pollset);
+ }
+ }
+ ASSERT(new_events==0);
+ if (state->remove_cnt == 0 || !wake_poller) {
+ /* Safe to close fd now as it is not in pollset
+ or there was no need to eject fd (kernel poll) */
+ //erts_fprintf(stderr, "SVERK : enif_select calling stop before return\n");
+ if (state->type == ERTS_EV_TYPE_NIF) {
+ ASSERT(state->driver.stop.resource == resource);
+ call_stop = CALL_STOP_AND_RELEASE;
+ state->driver.stop.resource = NULL;
+ }
+ else {
+ ASSERT(!state->driver.stop.resource);
+ call_stop = CALL_STOP;
+ }
+ state->type = ERTS_EV_TYPE_NONE;
+ ret = ERL_NIF_SELECT_STOP_CALLED;
+ }
+ else {
+ /* Not safe to close fd, postpone stop_select callback. */
+ //erts_fprintf(stderr, "SVERK: enif_select schedule stop\n");
+ if (state->type == ERTS_EV_TYPE_NONE) {
+ ASSERT(!state->driver.stop.resource);
+ state->driver.stop.resource = resource;
+ enif_keep_resource(resource);
+ }
+ state->type = ERTS_EV_TYPE_STOP_NIF;
+ ret = ERL_NIF_SELECT_STOP_SCHEDULED;
+ }
+ }
+
+done:
+
+ check_fd_cleanup(state,
+#if ERTS_CIO_HAVE_DRV_EVENT
+ &free_event,
+#endif
+ &free_select,
+ &free_nif);
+
+done_unknown:
+ erts_smp_mtx_unlock(fd_mtx(fd));
+ if (call_stop) {
+ erts_resource_stop(resource, (ErlNifEvent)fd, 1);
+ if (call_stop == CALL_STOP_AND_RELEASE) {
+ enif_release_resource(resource->data);
+ }
+ }
+ if (free_select)
+ free_drv_select_data(free_select);
+ if (free_nif)
+ free_nif_select_data(free_nif);
+
#if ERTS_CIO_HAVE_DRV_EVENT
if (free_event)
free_drv_event_data(free_event);
@@ -1070,6 +1452,7 @@ done_unknown:
return ret;
}
+
int
ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix,
ErlDrvEvent e,
@@ -1090,6 +1473,7 @@ ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix,
ErtsDrvEventDataState *free_event;
#endif
ErtsDrvSelectDataState *free_select;
+ ErtsNifSelectDataState *free_nif;
Port *prt = erts_drvport2port(ix);
if (prt == ERTS_INVALID_ERL_DRV_PORT)
@@ -1126,12 +1510,12 @@ ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix,
if (state->driver.event->port == id) break;
/*fall through*/
case ERTS_EV_TYPE_DRV_SEL:
- event_steal(ix, state, event_data);
+ drv_event_steal(ix, state, event_data);
break;
case ERTS_EV_TYPE_STOP_USE: {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- print_event_op(dsbufp, ix, fd, event_data);
- steal_pending_stop_select(dsbufp, ix, state, 0, 1);
+ print_drv_event_op(dsbufp, ix, fd, event_data);
+ steal_pending_stop_use(dsbufp, ix, state, 0, 1);
break;
}
}
@@ -1199,12 +1583,15 @@ done:
#if ERTS_CIO_HAVE_DRV_EVENT
&free_event,
#endif
- &free_select);
+ &free_select,
+ &free_nif);
erts_smp_mtx_unlock(fd_mtx(fd));
if (free_select)
free_drv_select_data(free_select);
+ if (free_nif)
+ free_nif_select_data(free_nif);
#if ERTS_CIO_HAVE_DRV_EVENT
if (free_event)
free_drv_event_data(free_event);
@@ -1240,13 +1627,19 @@ need2steal(ErtsDrvEventState *state, int mode)
state,
ERL_DRV_WRITE);
break;
+ case ERTS_EV_TYPE_NIF:
+ ASSERT(state->driver.stop.resource);
+ do_steal = 1;
+ break;
+
#if ERTS_CIO_HAVE_DRV_EVENT
case ERTS_EV_TYPE_DRV_EV:
do_steal |= chk_stale(state->driver.event->port, state, 0);
break;
#endif
case ERTS_EV_TYPE_STOP_USE:
- ASSERT(0);
+ case ERTS_EV_TYPE_STOP_NIF:
+ ASSERT(0);
break;
default:
break;
@@ -1307,6 +1700,25 @@ steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode)
erts_dsprintf(dsbufp, "\n");
break;
}
+ case ERTS_EV_TYPE_NIF: {
+ Eterm iid = state->driver.nif->in.pid;
+ Eterm oid = state->driver.nif->out.pid;
+ const char* with = "with";
+ ErlNifResourceType* rt = state->driver.stop.resource->type;
+
+ erts_dsprintf(dsbufp, "resource %T:%T", rt->module, rt->name);
+
+ if (is_not_nil(iid)) {
+ erts_dsprintf(dsbufp, " %s in-pid %T", with, iid);
+ with = "and";
+ }
+ if (is_not_nil(oid)) {
+ erts_dsprintf(dsbufp, " %s out-pid %T", with, oid);
+ }
+ deselect(state, 0);
+ erts_dsprintf(dsbufp, "\n");
+ break;
+ }
#if ERTS_CIO_HAVE_DRV_EVENT
case ERTS_EV_TYPE_DRV_EV: {
Eterm eid = state->driver.event->port;
@@ -1324,7 +1736,8 @@ steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode)
break;
}
#endif
- case ERTS_EV_TYPE_STOP_USE: {
+ case ERTS_EV_TYPE_STOP_USE:
+ case ERTS_EV_TYPE_STOP_NIF: {
ASSERT(0);
break;
}
@@ -1335,8 +1748,8 @@ steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode)
}
static void
-print_select_op(erts_dsprintf_buf_t *dsbufp,
- ErlDrvPort ix, ErtsSysFdType fd, int mode, int on)
+print_drv_select_op(erts_dsprintf_buf_t *dsbufp,
+ ErlDrvPort ix, ErtsSysFdType fd, int mode, int on)
{
Port *pp = erts_drvport2port(ix);
erts_dsprintf(dsbufp,
@@ -1354,11 +1767,40 @@ print_select_op(erts_dsprintf_buf_t *dsbufp,
}
static void
-select_steal(ErlDrvPort ix, ErtsDrvEventState *state, int mode, int on)
+print_nif_select_op(erts_dsprintf_buf_t *dsbufp,
+ ErtsSysFdType fd, int mode,
+ ErlNifResource* resource, Eterm ref)
+{
+ erts_dsprintf(dsbufp,
+ "enif_select(_, %d,%s%s%s, %T:%T, %T) ",
+ (int) GET_FD(fd),
+ mode & ERL_NIF_SELECT_READ ? " READ" : "",
+ mode & ERL_NIF_SELECT_WRITE ? " WRITE" : "",
+ mode & ERL_NIF_SELECT_STOP ? " STOP" : "",
+ resource->type->module,
+ resource->type->name,
+ ref);
+}
+
+
+static void
+drv_select_steal(ErlDrvPort ix, ErtsDrvEventState *state, int mode, int on)
{
if (need2steal(state, mode)) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- print_select_op(dsbufp, ix, state->fd, mode, on);
+ print_drv_select_op(dsbufp, ix, state->fd, mode, on);
+ steal(dsbufp, state, mode);
+ erts_send_error_to_logger_nogl(dsbufp);
+ }
+}
+
+static void
+nif_select_steal(ErtsDrvEventState *state, int mode,
+ ErlNifResource* resource, Eterm ref)
+{
+ if (need2steal(state, mode)) {
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ print_nif_select_op(dsbufp, state->fd, mode, resource, ref);
steal(dsbufp, state, mode);
erts_send_error_to_logger_nogl(dsbufp);
}
@@ -1374,10 +1816,20 @@ large_fd_error_common(erts_dsprintf_buf_t *dsbufp, ErtsSysFdType fd)
}
static void
-select_large_fd_error(ErlDrvPort ix, ErtsSysFdType fd, int mode, int on)
+drv_select_large_fd_error(ErlDrvPort ix, ErtsSysFdType fd, int mode, int on)
+{
+ erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
+ print_drv_select_op(dsbufp, ix, fd, mode, on);
+ erts_dsprintf(dsbufp, "failed: ");
+ large_fd_error_common(dsbufp, fd);
+ erts_send_error_to_logger_nogl(dsbufp);
+}
+static void
+nif_select_large_fd_error(ErtsSysFdType fd, int mode,
+ ErlNifResource* resource, Eterm ref)
{
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- print_select_op(dsbufp, ix, fd, mode, on);
+ print_nif_select_op(dsbufp, fd, mode, resource, ref);
erts_dsprintf(dsbufp, "failed: ");
large_fd_error_common(dsbufp, fd);
erts_send_error_to_logger_nogl(dsbufp);
@@ -1387,41 +1839,81 @@ select_large_fd_error(ErlDrvPort ix, ErtsSysFdType fd, int mode, int on)
static void
-steal_pending_stop_select(erts_dsprintf_buf_t *dsbufp, ErlDrvPort ix,
- ErtsDrvEventState *state, int mode, int on)
+steal_pending_stop_use(erts_dsprintf_buf_t *dsbufp, ErlDrvPort ix,
+ ErtsDrvEventState *state, int mode, int on)
{
+ int cancel = 0;
ASSERT(state->type == ERTS_EV_TYPE_STOP_USE);
- erts_dsprintf(dsbufp, "failed: fd=%d (re)selected before stop_select "
- "was called for driver %s\n",
- (int) GET_FD(state->fd), state->driver.drv_ptr->name);
- erts_send_error_to_logger_nogl(dsbufp);
if (on) {
/* Either fd-owner changed its mind about closing
* or closed fd before stop_select callback and fd is now reused.
* In either case stop_select should not be called.
- */
- state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
- if (state->driver.drv_ptr->handle) {
- erts_ddll_dereference_driver(state->driver.drv_ptr->handle);
- }
- state->driver.drv_ptr = NULL;
+ */
+ cancel = 1;
}
else if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
Port *prt = erts_drvport2port(ix);
- erts_driver_t* drv_ptr = prt != ERTS_INVALID_ERL_DRV_PORT ? prt->drv_ptr : NULL;
- if (drv_ptr && drv_ptr != state->driver.drv_ptr) {
- /* Some other driver wants the stop_select callback */
- if (state->driver.drv_ptr->handle) {
- erts_ddll_dereference_driver(state->driver.drv_ptr->handle);
- }
- if (drv_ptr->handle) {
- erts_ddll_reference_referenced_driver(drv_ptr->handle);
- }
- state->driver.drv_ptr = drv_ptr;
- }
+ if (prt == ERTS_INVALID_ERL_DRV_PORT
+ || prt->drv_ptr != state->driver.stop.drv_ptr) {
+ /* Some other driver or nif wants the stop_select callback */
+ cancel = 1;
+ }
+ }
+
+ if (cancel) {
+ erts_dsprintf(dsbufp, "called before stop_select was called for driver '%s'\n",
+ state->driver.stop.drv_ptr->name);
+ if (state->driver.stop.drv_ptr->handle) {
+ erts_ddll_dereference_driver(state->driver.stop.drv_ptr->handle);
+ }
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags &= ~ERTS_EV_FLAG_USED;
+ state->driver.stop.drv_ptr = NULL;
+ }
+ else {
+ erts_dsprintf(dsbufp, "ignored repeated call\n");
}
+ erts_send_error_to_logger_nogl(dsbufp);
+}
+
+static void
+steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErlNifResource* resource,
+ ErtsDrvEventState *state, int mode, int on)
+{
+ int cancel = 0;
+
+ ASSERT(state->type == ERTS_EV_TYPE_STOP_NIF);
+ ASSERT(state->driver.stop.resource);
+
+ if (on) {
+ ASSERT(mode & (ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE));
+ /* Either fd-owner changed its mind about closing
+ * or closed fd before stop callback and fd is now reused.
+ * In either case, stop should not be called.
+ */
+ cancel = 1;
+ }
+ else if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE
+ && resource != state->driver.stop.resource) {
+ /* Some driver or other resource wants the stop callback */
+ cancel = 1;
+ }
+
+ if (cancel) {
+ ErlNifResourceType* rt = state->driver.stop.resource->type;
+ erts_dsprintf(dsbufp, "called before stop was called for NIF resource %T:%T\n",
+ rt->module, rt->name);
+
+ enif_release_resource(state->driver.stop.resource);
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags &= ~ERTS_EV_FLAG_USED;
+ state->driver.stop.resource = NULL;
+ }
+ else {
+ erts_dsprintf(dsbufp, "ignored repeated call\n");
+ }
+ erts_send_error_to_logger_nogl(dsbufp);
}
@@ -1429,8 +1921,8 @@ steal_pending_stop_select(erts_dsprintf_buf_t *dsbufp, ErlDrvPort ix,
#if ERTS_CIO_HAVE_DRV_EVENT
static void
-print_event_op(erts_dsprintf_buf_t *dsbufp,
- ErlDrvPort ix, ErtsSysFdType fd, ErlDrvEventData event_data)
+print_drv_event_op(erts_dsprintf_buf_t *dsbufp,
+ ErlDrvPort ix, ErtsSysFdType fd, ErlDrvEventData event_data)
{
Port *pp = erts_drvport2port(ix);
erts_dsprintf(dsbufp, "driver_event(%p, %d, ", ix, (int) fd);
@@ -1447,11 +1939,11 @@ print_event_op(erts_dsprintf_buf_t *dsbufp,
}
static void
-event_steal(ErlDrvPort ix, ErtsDrvEventState *state, ErlDrvEventData event_data)
+drv_event_steal(ErlDrvPort ix, ErtsDrvEventState *state, ErlDrvEventData event_data)
{
if (need2steal(state, ERL_DRV_READ|ERL_DRV_WRITE)) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- print_event_op(dsbufp, ix, state->fd, event_data);
+ print_drv_event_op(dsbufp, ix, state->fd, event_data);
steal(dsbufp, state, ERL_DRV_READ|ERL_DRV_WRITE);
erts_send_error_to_logger_nogl(dsbufp);
}
@@ -1466,7 +1958,7 @@ static void
event_large_fd_error(ErlDrvPort ix, ErtsSysFdType fd, ErlDrvEventData event_data)
{
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
- print_event_op(dsbufp, ix, fd, event_data);
+ print_drv_event_op(dsbufp, ix, fd, event_data);
erts_dsprintf(dsbufp, "failed: ");
large_fd_error_common(dsbufp, fd);
erts_send_error_to_logger_nogl(dsbufp);
@@ -1553,6 +2045,56 @@ oready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time)
}
}
+static ERTS_INLINE void
+send_event_tuple(struct erts_nif_select_event* e, ErlNifResource* resource,
+ Eterm event_atom)
+{
+ Process* rp = erts_proc_lookup(e->pid);
+ ErtsProcLocks rp_locks = 0;
+ ErtsMessage* mp;
+ ErlOffHeap* ohp;
+ ErtsBinary* bin;
+ Eterm* hp;
+ Uint hsz;
+ Eterm resource_term, ref_term, tuple;
+
+ if (!rp) {
+ erts_fprintf(stderr, "SVERK: Process %T not alive for msg %T\n", e->pid, event_atom);
+ return;
+ }
+
+ bin = ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(resource);
+
+ /* {select, Resource, Ref, EventAtom} */
+ if (is_value(e->immed)) {
+ hsz = 5 + PROC_BIN_SIZE;
+ }
+ else {
+ hsz = 5 + PROC_BIN_SIZE + REF_THING_SIZE;
+ }
+
+ mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp);
+
+ resource_term = erts_mk_magic_binary_term(&hp, ohp, &bin->binary);
+ if (is_value(e->immed)) {
+ ASSERT(is_immed(e->immed));
+ ref_term = e->immed;
+ }
+ else {
+ write_ref_thing(hp, e->refn[0], e->refn[1], e->refn[2]);
+ ref_term = make_internal_ref(hp);
+ hp += REF_THING_SIZE;
+ }
+ tuple = TUPLE4(hp, am_select, resource_term, ref_term, event_atom);
+
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
+ erts_queue_message(rp, rp_locks, mp, tuple, am_system);
+
+ if (rp_locks)
+ erts_smp_proc_unlock(rp, rp_locks);
+}
+
+
#if ERTS_CIO_HAVE_DRV_EVENT
static ERTS_INLINE void
eready(Eterm id, ErtsDrvEventState *state, ErlDrvEventData event_data,
@@ -1598,6 +2140,12 @@ ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(int set,
ERTS_CIO_POLL_INTR_TMD(pollset.ps, set, timeout_time);
}
+/*
+ * Number of ignored events, for a lingering fd added by enif_select(),
+ * until we deselect fd-event from pollset.
+ */
+#define ERTS_NIF_DELAYED_DESELECT 20
+
void
ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
{
@@ -1631,7 +2179,8 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
current_cio_time++;
erts_smp_atomic_set_relb(&erts_check_io_time, current_cio_time);
- check_cleanup_active_fds(current_cio_time);
+ check_cleanup_active_fds(current_cio_time,
+ timeout_time != ERTS_POLL_NO_TIMEOUT);
#ifdef ERTS_ENABLE_LOCK_CHECK
erts_lc_check_exact(NULL, 0); /* No locks should be locked */
@@ -1699,31 +2248,22 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
switch (state->type) {
case ERTS_EV_TYPE_DRV_SEL: { /* Requested via driver_select()... */
- ErtsPollEvents revents;
- ErtsPollEvents revent_mask;
-
- revent_mask = ~(ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT);
- revent_mask |= state->events;
- revents = pollres[i].events & revent_mask;
-
- if (revents & ERTS_POLL_EV_ERR) {
- /*
- * Let the driver handle the error condition. Only input,
- * only output, or nothing might have been selected.
- * We *do not* want to call a callback that corresponds
- * to an event not selected. revents might give us a clue
- * on which one to call.
- */
- if ((revents & ERTS_POLL_EV_IN)
- || (!(revents & ERTS_POLL_EV_OUT)
- && state->events & ERTS_POLL_EV_IN)) {
- iready(state->driver.select->inport, state, current_cio_time);
- }
- else if (state->events & ERTS_POLL_EV_OUT) {
- oready(state->driver.select->outport, state, current_cio_time);
- }
- }
- else if (revents & (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) {
+ ErtsPollEvents revents = pollres[i].events;
+
+ if (revents & ERTS_POLL_EV_ERR) {
+ /*
+ * Handle error events by triggering all in/out events
+ * that the driver has selected.
+ * We *do not* want to call a callback that corresponds
+ * to an event not selected.
+ */
+ revents = state->events;
+ }
+ else {
+ revents &= (state->events | ERTS_POLL_EV_NVAL);
+ }
+
+ if (revents & (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) {
if (revents & ERTS_POLL_EV_OUT) {
oready(state->driver.select->outport, state, current_cio_time);
}
@@ -1731,7 +2271,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
was read (true also on the non-smp emulator since
oready() may have been called); therefore, update
revents... */
- revents &= ~(~state->events & ERTS_POLL_EV_IN);
+ revents &= state->events;
if (revents & ERTS_POLL_EV_IN) {
iready(state->driver.select->inport, state, current_cio_time);
}
@@ -1746,6 +2286,69 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
break;
}
+ case ERTS_EV_TYPE_NIF: { /* Requested via enif_select()... */
+ struct erts_nif_select_event in = {NIL};
+ struct erts_nif_select_event out = {NIL};
+ ErlNifResource* resource;
+ ErtsPollEvents revents = pollres[i].events;
+
+ if (revents & ERTS_POLL_EV_ERR) {
+ /*
+ * Handle error events by triggering all in/out events
+ * that the NIF has selected.
+ * We *do not* want to send a message that corresponds
+ * to an event not selected.
+ */
+ revents = state->events;
+ }
+ else {
+ revents &= (state->events | ERTS_POLL_EV_NVAL);
+ }
+
+ if (revents & (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) {
+ if (revents & ERTS_POLL_EV_OUT) {
+ if (is_not_nil(state->driver.nif->out.pid)) {
+ out = state->driver.nif->out;
+ resource = state->driver.stop.resource;
+ state->driver.nif->out.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT;
+ state->driver.nif->out.pid = NIL;
+ add_active_fd(state->fd);
+ }
+ else {
+ ASSERT(state->driver.nif->out.ddeselect_cnt >= 2);
+ state->driver.nif->out.ddeselect_cnt--;
+ }
+ }
+ if (revents & ERTS_POLL_EV_IN) {
+ if (is_not_nil(state->driver.nif->in.pid)) {
+ in = state->driver.nif->in;
+ resource = state->driver.stop.resource;
+ state->driver.nif->in.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT;
+ state->driver.nif->in.pid = NIL;
+ add_active_fd(state->fd);
+ }
+ else {
+ ASSERT(state->driver.nif->in.ddeselect_cnt >= 2);
+ state->driver.nif->in.ddeselect_cnt--;
+ }
+ }
+ }
+ else if (revents & ERTS_POLL_EV_NVAL) {
+ abort();
+ }
+
+#ifdef ERTS_SMP
+ erts_smp_mtx_unlock(fd_mtx(fd));
+#endif
+ if (is_not_nil(in.pid)) {
+ send_event_tuple(&in, resource, am_ready_input);
+ }
+ if (is_not_nil(out.pid)) {
+ send_event_tuple(&out, resource, am_ready_output);
+ }
+ goto next_pollres_unlocked;
+ }
+
#if ERTS_CIO_HAVE_DRV_EVENT
case ERTS_EV_TYPE_DRV_EV: { /* Requested via driver_event()... */
ErlDrvEventData event_data;
@@ -1786,6 +2389,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
#ifdef ERTS_SMP
erts_smp_mtx_unlock(fd_mtx(fd));
#endif
+ next_pollres_unlocked:;
}
erts_smp_atomic_set_nob(&pollset.in_poll_wait, 0);
@@ -2311,6 +2915,39 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters)
}
}
}
+ else if (state->type == ERTS_EV_TYPE_NIF) {
+ ErlNifResource* r;
+ erts_printf("enif_select ");
+
+#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ if (internal) {
+ erts_printf("internal ");
+ err = 1;
+ }
+
+ if (cio_events == ep_events) {
+ erts_printf("ev=");
+ if (print_events(cio_events) != 0)
+ err = 1;
+ }
+ else {
+ err = 1;
+ erts_printf("cio_ev=");
+ print_events(cio_events);
+ erts_printf(" ep_ev=");
+ print_events(ep_events);
+ }
+#else
+ if (print_events(cio_events) != 0)
+ err = 1;
+#endif
+ erts_printf(" inpid=%T dd_cnt=%b32d", state->driver.nif->in.pid,
+ state->driver.nif->in.ddeselect_cnt);
+ erts_printf(" outpid=%T dd_cnt=%b32d", state->driver.nif->out.pid,
+ state->driver.nif->out.ddeselect_cnt);
+ r = state->driver.stop.resource;
+ erts_printf(" resource=%p(%T:%T)", r, r->type->module, r->type->name);
+ }
#if ERTS_CIO_HAVE_DRV_EVENT
else if (state->type == ERTS_EV_TYPE_DRV_EV) {
Eterm id;
@@ -2367,11 +3004,12 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters)
erts_printf("control_type=%d ", (int)state->type);
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
if (cio_events == ep_events) {
- erts_printf("ev=0x%b32x", (Uint32) cio_events);
+ erts_printf("ev=");
+ print_events(cio_events);
}
else {
- erts_printf("cio_ev=0x%b32x", (Uint32) cio_events);
- erts_printf(" ep_ev=0x%b32x", (Uint32) ep_events);
+ erts_printf("cio_ev="); print_events(cio_events);
+ erts_printf(" ep_ev="); print_events(ep_events);
}
#else
erts_printf("ev=0x%b32x", (Uint32) cio_events);
@@ -2400,7 +3038,7 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip)
#if ERTS_CIO_HAVE_DRV_EVENT
null_des.driver.event = NULL;
#endif
- null_des.driver.drv_ptr = NULL;
+ null_des.driver.stop.drv_ptr = NULL;
null_des.events = 0;
null_des.remove_cnt = 0;
null_des.type = ERTS_EV_TYPE_NONE;
diff --git a/erts/emulator/sys/common/erl_check_io.h b/erts/emulator/sys/common/erl_check_io.h
index 14f1ea3f43..41e2e9210a 100644
--- a/erts/emulator/sys/common/erl_check_io.h
+++ b/erts/emulator/sys/common/erl_check_io.h
@@ -34,6 +34,8 @@
int driver_select_kp(ErlDrvPort, ErlDrvEvent, int, int);
int driver_select_nkp(ErlDrvPort, ErlDrvEvent, int, int);
+enum ErlNifSelectReturn enif_select_kp(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, Eterm);
+enum ErlNifSelectReturn enif_select_nkp(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, Eterm);
int driver_event_kp(ErlDrvPort, ErlDrvEvent, ErlDrvEventData);
int driver_event_nkp(ErlDrvPort, ErlDrvEvent, ErlDrvEventData);
Uint erts_check_io_size_kp(void);
@@ -136,4 +138,20 @@ typedef struct {
ErtsIoTask iniotask;
ErtsIoTask outiotask;
} ErtsDrvSelectDataState;
+
+struct erts_nif_select_event {
+ Eterm pid;
+ Eterm immed;
+ Uint32 refn[ERTS_REF_NUMBERS];
+ Sint32 ddeselect_cnt; /* 0: No delayed deselect in progress
+ * 1: Do deselect before next poll
+ * >1: Countdown of ignored events
+ */
+};
+
+typedef struct {
+ struct erts_nif_select_event in;
+ struct erts_nif_select_event out;
+} ErtsNifSelectDataState;
+
#endif /* #ifndef ERL_CHECK_IO_INTERNAL__ */
diff --git a/erts/emulator/sys/common/erl_poll.c b/erts/emulator/sys/common/erl_poll.c
index b8a28bcc18..0c4df3e13a 100644
--- a/erts/emulator/sys/common/erl_poll.c
+++ b/erts/emulator/sys/common/erl_poll.c
@@ -3030,6 +3030,7 @@ ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet ps,
ev[fd] = 0;
else {
ev[fd] = ps->fds_status[fd].events;
+ ASSERT(ps->fds_status[fd].used_events == ev[fd]);
if (
#if ERTS_POLL_USE_WAKEUP_PIPE
fd == ps->wake_fds[0] || fd == ps->wake_fds[1] ||
diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c
index 2fc802a2c6..a852550915 100644
--- a/erts/emulator/sys/unix/sys.c
+++ b/erts/emulator/sys/unix/sys.c
@@ -161,6 +161,7 @@ int erts_use_kernel_poll = 0;
struct {
int (*select)(ErlDrvPort, ErlDrvEvent, int, int);
+ enum ErlNifSelectReturn (*enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, Eterm);
int (*event)(ErlDrvPort, ErlDrvEvent, ErlDrvEventData);
void (*check_io_as_interrupt)(void);
void (*check_io_interrupt)(int);
@@ -184,6 +185,13 @@ driver_event(ErlDrvPort port, ErlDrvEvent event, ErlDrvEventData event_data)
return (*io_func.event)(port, event, event_data);
}
+int enif_select(ErlNifEnv* env, ErlNifEvent event,
+ enum ErlNifSelectFlags flags, void* obj, Eterm ref)
+{
+ return (*io_func.enif_select)(env, event, flags, obj, ref);
+}
+
+
Eterm erts_check_io_info(void *p)
{
return (*io_func.info)(p);
@@ -201,6 +209,7 @@ init_check_io(void)
{
if (erts_use_kernel_poll) {
io_func.select = driver_select_kp;
+ io_func.enif_select = enif_select_kp;
io_func.event = driver_event_kp;
#ifdef ERTS_POLL_NEED_ASYNC_INTERRUPT_SUPPORT
io_func.check_io_as_interrupt = erts_check_io_async_sig_interrupt_kp;
@@ -216,6 +225,7 @@ init_check_io(void)
}
else {
io_func.select = driver_select_nkp;
+ io_func.enif_select = enif_select_nkp;
io_func.event = driver_event_nkp;
#ifdef ERTS_POLL_NEED_ASYNC_INTERRUPT_SUPPORT
io_func.check_io_as_interrupt = erts_check_io_async_sig_interrupt_nkp;