diff options
author | Sverker Eriksson <[email protected]> | 2016-11-23 15:58:15 +0100 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2016-11-30 20:26:47 +0100 |
commit | 0763a36867a702e3075b682973a079e0390144ce (patch) | |
tree | 2fffe305a52e44ed0df4b445a691d4a78380738e /erts/emulator/sys | |
parent | 16b9292ff0914f77ee7ab7e169def914a190f79b (diff) | |
download | otp-0763a36867a702e3075b682973a079e0390144ce.tar.gz otp-0763a36867a702e3075b682973a079e0390144ce.tar.bz2 otp-0763a36867a702e3075b682973a079e0390144ce.zip |
erts: Add enif_select & enif_open_resource_type_x
Diffstat (limited to 'erts/emulator/sys')
-rw-r--r-- | erts/emulator/sys/common/erl_check_io.c | 759 | ||||
-rw-r--r-- | erts/emulator/sys/common/erl_check_io.h | 18 | ||||
-rw-r--r-- | erts/emulator/sys/common/erl_poll.c | 1 | ||||
-rw-r--r-- | erts/emulator/sys/unix/sys.c | 10 |
4 files changed, 728 insertions, 60 deletions
diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index 2c138e1746..6f61fc8a28 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -53,6 +53,8 @@ typedef char EventStateType; #define ERTS_EV_TYPE_DRV_SEL ((EventStateType) 1) /* driver_select */ #define ERTS_EV_TYPE_DRV_EV ((EventStateType) 2) /* driver_event */ #define ERTS_EV_TYPE_STOP_USE ((EventStateType) 3) /* pending stop_select */ +#define ERTS_EV_TYPE_NIF ((EventStateType) 4) /* enif_select */ +#define ERTS_EV_TYPE_STOP_NIF ((EventStateType) 5) /* pending nif stop */ typedef char EventStateFlags; #define ERTS_EV_FLAG_USED ((EventStateFlags) 1) /* ERL_DRV_USE has been turned on */ @@ -122,7 +124,11 @@ typedef struct { #if ERTS_CIO_HAVE_DRV_EVENT ErtsDrvEventDataState *event; /* ERTS_EV_TYPE_DRV_EV */ #endif - erts_driver_t* drv_ptr; /* ERTS_EV_TYPE_STOP_USE */ + ErtsNifSelectDataState *nif; /* ERTS_EV_TYPE_NIF */ + union { + erts_driver_t* drv_ptr; /* ERTS_EV_TYPE_STOP_USE */ + ErlNifResource* resource; /* ERTS_EV_TYPE_STOP_NIF */ + }stop; } driver; ErtsPollEvents events; unsigned short remove_cnt; /* number of removed_fd's referring to this fd */ @@ -212,11 +218,18 @@ static ERTS_INLINE void hash_erase_drv_ev_state(ErtsDrvEventState *state) static void stale_drv_select(Eterm id, ErtsDrvEventState *state, int mode); static void drv_select_steal(ErlDrvPort ix, ErtsDrvEventState *state, - int mode, int on); + int mode, int on); +static void nif_select_steal(ErtsDrvEventState *state, int mode, + ErlNifResource* resource, Eterm ref); + static void print_drv_select_op(erts_dsprintf_buf_t *dsbufp, ErlDrvPort ix, ErtsSysFdType fd, int mode, int on); +static void print_nif_select_op(erts_dsprintf_buf_t*, ErtsSysFdType, + int mode, ErlNifResource*, Eterm ref); + #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS -static void select_large_fd_error(ErlDrvPort, ErtsSysFdType, int, int); +static void drv_select_large_fd_error(ErlDrvPort, ErtsSysFdType, int, int); +static void nif_select_large_fd_error(ErtsSysFdType, int, ErlNifResource*,Eterm ref); #endif #if ERTS_CIO_HAVE_DRV_EVENT static void drv_event_steal(ErlDrvPort ix, ErtsDrvEventState *state, @@ -227,8 +240,12 @@ static void print_drv_event_op(erts_dsprintf_buf_t *dsbufp, static void event_large_fd_error(ErlDrvPort, ErtsSysFdType, ErlDrvEventData); #endif #endif -static void steal_pending_stop_use(erts_dsprintf_buf_t*, ErlDrvPort, - ErtsDrvEventState*, int mode, int on); +static void +steal_pending_stop_use(erts_dsprintf_buf_t*, ErlDrvPort, ErtsDrvEventState*, + int mode, int on); +static void +steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErlNifResource*, + ErtsDrvEventState *state, int mode, int on); #ifdef ERTS_SMP ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(removed_fd, struct removed_fd, 64, ERTS_ALC_T_FD_LIST) @@ -263,6 +280,18 @@ alloc_drv_select_data(void) return dsp; } +static ERTS_INLINE ErtsNifSelectDataState * +alloc_nif_select_data(void) +{ + ErtsNifSelectDataState *dsp = erts_alloc(ERTS_ALC_T_NIF_SEL_D_STATE, + sizeof(ErtsNifSelectDataState)); + dsp->in.pid = NIL; + dsp->out.pid = NIL; + dsp->in.ddeselect_cnt = 0; + dsp->out.ddeselect_cnt = 0; + return dsp; +} + static ERTS_INLINE void free_drv_select_data(ErtsDrvSelectDataState *dsp) { @@ -271,6 +300,12 @@ free_drv_select_data(ErtsDrvSelectDataState *dsp) erts_free(ERTS_ALC_T_DRV_SEL_D_STATE, dsp); } +static ERTS_INLINE void +free_nif_select_data(ErtsNifSelectDataState *dsp) +{ + erts_free(ERTS_ALC_T_NIF_SEL_D_STATE, dsp); +} + #if ERTS_CIO_HAVE_DRV_EVENT static ERTS_INLINE ErtsDrvEventDataState * @@ -352,6 +387,7 @@ forget_removed(struct pollset_info* psi) erts_smp_spin_unlock(&psi->removed_list_lock); while (fdlp) { + ErlNifResource* resource = NULL; erts_driver_t* drv_ptr = NULL; erts_smp_mtx_t* mtx; ErtsSysFdType fd; @@ -372,15 +408,25 @@ forget_removed(struct pollset_info* psi) ASSERT(state->remove_cnt > 0); if (--state->remove_cnt == 0) { switch (state->type) { + case ERTS_EV_TYPE_STOP_NIF: + /* Now we can call stop */ + resource = state->driver.stop.resource; + state->driver.stop.resource = NULL; + ASSERT(resource); + state->type = ERTS_EV_TYPE_NONE; + state->flags &= ~ERTS_EV_FLAG_USED; + goto case_ERTS_EV_TYPE_NONE; + case ERTS_EV_TYPE_STOP_USE: /* Now we can call stop_select */ - drv_ptr = state->driver.drv_ptr; + drv_ptr = state->driver.stop.drv_ptr; ASSERT(drv_ptr); state->type = ERTS_EV_TYPE_NONE; state->flags &= ~ERTS_EV_FLAG_USED; - state->driver.drv_ptr = NULL; + state->driver.stop.drv_ptr = NULL; /* Fall through */ - case ERTS_EV_TYPE_NONE: + case ERTS_EV_TYPE_NONE: + case_ERTS_EV_TYPE_NONE: #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS hash_erase_drv_ev_state(state); #endif @@ -403,6 +449,11 @@ forget_removed(struct pollset_info* psi) erts_ddll_dereference_driver(drv_ptr->handle); } } + if (resource) { + erts_resource_stop(resource); + enif_release_resource(resource->data); + } + tofree = fdlp; fdlp = fdlp->next; removed_fd_free(tofree); @@ -440,7 +491,8 @@ grow_drv_ev_state(int min_ix) #if ERTS_CIO_HAVE_DRV_EVENT drv_ev_state[i].driver.event = NULL; #endif - drv_ev_state[i].driver.drv_ptr = NULL; + drv_ev_state[i].driver.stop.drv_ptr = NULL; + drv_ev_state[i].driver.nif = NULL; drv_ev_state[i].events = 0; drv_ev_state[i].remove_cnt = 0; drv_ev_state[i].type = ERTS_EV_TYPE_NONE; @@ -480,6 +532,7 @@ abort_tasks(ErtsDrvEventState *state, int mode) ERTS_EV_TYPE_DRV_EV); return; #endif + case ERTS_EV_TYPE_NIF: case ERTS_EV_TYPE_NONE: return; default: @@ -534,6 +587,14 @@ deselect(ErtsDrvEventState *state, int mode) if (!(state->events)) { switch (state->type) { + case ERTS_EV_TYPE_NIF: + state->driver.nif->in.pid = NIL; + state->driver.nif->out.pid = NIL; + state->driver.nif->in.ddeselect_cnt = 0; + state->driver.nif->out.ddeselect_cnt = 0; + enif_release_resource(state->driver.stop.resource); + state->driver.stop.resource = NULL; + break; case ERTS_EV_TYPE_DRV_SEL: state->driver.select->inport = NIL; state->driver.select->outport = NIL; @@ -569,7 +630,8 @@ check_fd_cleanup(ErtsDrvEventState *state, #if ERTS_CIO_HAVE_DRV_EVENT ErtsDrvEventDataState **free_event, #endif - ErtsDrvSelectDataState **free_select) + ErtsDrvSelectDataState **free_select, + ErtsNifSelectDataState **free_nif) { erts_aint_t current_cio_time; @@ -586,6 +648,12 @@ check_fd_cleanup(ErtsDrvEventState *state, state->driver.select = NULL; } + *free_nif = NULL; + if (state->driver.nif && (state->type != ERTS_EV_TYPE_NIF)) { + *free_nif = state->driver.nif; + state->driver.nif = NULL; + } + #if ERTS_CIO_HAVE_DRV_EVENT *free_event = NULL; if (state->driver.event @@ -617,12 +685,14 @@ check_cleanup_active_fd(ErtsSysFdType fd, ErtsPollControlEntry *pce, int *pce_ix, #endif - erts_aint_t current_cio_time) + erts_aint_t current_cio_time, + int may_sleep) { ErtsDrvEventState *state; int active = 0; erts_smp_mtx_t *mtx = fd_mtx(fd); void *free_select = NULL; + void *free_nif = NULL; #if ERTS_CIO_HAVE_DRV_EVENT void *free_event = NULL; #endif @@ -682,6 +752,41 @@ check_cleanup_active_fd(ErtsSysFdType fd, } } + if (state->driver.nif) { +#if ERTS_CIO_DEFER_ACTIVE_EVENTS +# error Windows +#endif + int do_wake = 0; + ErtsPollEvents rm_events = 0; + if (state->driver.nif->in.ddeselect_cnt) { + ASSERT(state->type == ERTS_EV_TYPE_NIF); + ASSERT(state->events & ERTS_POLL_EV_IN); + ASSERT(is_nil(state->driver.nif->in.pid)); + if (may_sleep || state->driver.nif->in.ddeselect_cnt == 1) { + rm_events = ERTS_POLL_EV_IN; + state->driver.nif->in.ddeselect_cnt = 0; + } + } + if (state->driver.nif->out.ddeselect_cnt) { + ASSERT(state->type == ERTS_EV_TYPE_NIF); + ASSERT(state->events & ERTS_POLL_EV_OUT); + ASSERT(is_nil(state->driver.nif->out.pid)); + if (may_sleep || state->driver.nif->out.ddeselect_cnt == 1) { + rm_events |= ERTS_POLL_EV_OUT; + state->driver.nif->out.ddeselect_cnt = 0; + } + } + if (rm_events) { + state->events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, rm_events, 0, &do_wake); + } + if (state->events) + active = 1; + else if (state->type != ERTS_EV_TYPE_NIF) { + free_nif = state->driver.nif; + state->driver.nif = NULL; + } + } + #if ERTS_CIO_HAVE_DRV_EVENT if (state->driver.event) { if (is_iotask_active(&state->driver.event->iotask, current_cio_time)) { @@ -722,6 +827,8 @@ check_cleanup_active_fd(ErtsSysFdType fd, if (free_select) free_drv_select_data(free_select); + if (free_nif) + free_nif_select_data(free_nif); #if ERTS_CIO_HAVE_DRV_EVENT if (free_event) free_drv_event_data(free_event); @@ -746,7 +853,7 @@ check_cleanup_active_fd(ErtsSysFdType fd, } static void -check_cleanup_active_fds(erts_aint_t current_cio_time) +check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep) { int six = pollset.active_fd.six; int eix = pollset.active_fd.eix; @@ -773,7 +880,8 @@ check_cleanup_active_fds(erts_aint_t current_cio_time) pctrl_entries, &pctrl_ix, #endif - current_cio_time)) { + current_cio_time, + may_sleep)) { no--; if (ix == six) { #ifdef DEBUG @@ -831,7 +939,6 @@ add_active_fd(ErtsSysFdType fd) int eix = pollset.active_fd.eix; int size = pollset.active_fd.size; - pollset.active_fd.array[eix] = fd; erts_smp_atomic32_set_relb(&pollset.active_fd.no, @@ -867,6 +974,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, ErtsDrvEventDataState *free_event = NULL; #endif ErtsDrvSelectDataState *free_select = NULL; + ErtsNifSelectDataState *free_nif = NULL; #ifdef USE_VM_PROBES DTRACE_CHARBUF(name, 64); #endif @@ -882,7 +990,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, return -1; } if (fd >= max_fds) { - select_large_fd_error(ix, fd, mode, on); + drv_select_large_fd_error(ix, fd, mode, on); return -1; } grow_drv_ev_state(fd); @@ -920,20 +1028,32 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, } #endif + switch (state->type) { #if ERTS_CIO_HAVE_DRV_EVENT - if (state->type == ERTS_EV_TYPE_DRV_EV) - drv_select_steal(ix, state, mode, on); + case ERTS_EV_TYPE_DRV_EV: #endif - if (state->type == ERTS_EV_TYPE_STOP_USE) { - erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); - print_drv_select_op(dsbufp, ix, state->fd, mode, on); - steal_pending_stop_use(dsbufp, ix, state, mode, on); - if (state->type == ERTS_EV_TYPE_STOP_USE) { - ret = 0; - goto done; /* stop_select still pending */ - } - ASSERT(state->type == ERTS_EV_TYPE_NONE); - } + case ERTS_EV_TYPE_NIF: + drv_select_steal(ix, state, mode, on); + break; + case ERTS_EV_TYPE_STOP_USE: { + erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); + print_drv_select_op(dsbufp, ix, state->fd, mode, on); + steal_pending_stop_use(dsbufp, ix, state, mode, on); + if (state->type == ERTS_EV_TYPE_STOP_USE) { + ret = 0; + goto done; /* stop_select still pending */ + } + ASSERT(state->type == ERTS_EV_TYPE_NONE); + break; + } + case ERTS_EV_TYPE_STOP_NIF: { + erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); + print_drv_select_op(dsbufp, ix, state->fd, mode, on); + steal_pending_stop_nif(dsbufp, NULL, state, mode, on); + ASSERT(state->type == ERTS_EV_TYPE_NONE); + break; + + }} if (mode & ERL_DRV_READ) { if (state->type == ERTS_EV_TYPE_DRV_SEL) { @@ -1037,7 +1157,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, else { /* Not safe to close fd, postpone stop_select callback. */ state->type = ERTS_EV_TYPE_STOP_USE; - state->driver.drv_ptr = drv_ptr; + state->driver.stop.drv_ptr = drv_ptr; if (drv_ptr->handle) { erts_ddll_reference_referenced_driver(drv_ptr->handle); } @@ -1054,7 +1174,8 @@ done: #if ERTS_CIO_HAVE_DRV_EVENT &free_event, #endif - &free_select); + &free_select, + &free_nif); done_unknown: erts_smp_mtx_unlock(fd_mtx(fd)); @@ -1067,6 +1188,9 @@ done_unknown: } if (free_select) free_drv_select_data(free_select); + if (free_nif) + free_nif_select_data(free_nif); + #if ERTS_CIO_HAVE_DRV_EVENT if (free_event) free_drv_event_data(free_event); @@ -1075,6 +1199,269 @@ done_unknown: } int +ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, + ErlNifEvent e, + enum ErlNifSelectFlags mode, + void* obj, + Eterm ref) +{ + int on; + const Eterm id = env->proc->common.id; + ErlNifResource* resource = DATA_TO_RESOURCE(obj); + ErtsSysFdType fd = (ErtsSysFdType) e; + ErtsPollEvents ctl_events = (ErtsPollEvents) 0; + ErtsPollEvents new_events, old_events; + ErtsDrvEventState *state; + int wake_poller; + int ret; + enum { NO_STOP=0, CALL_STOP, CALL_STOP_AND_RELEASE } call_stop = NO_STOP; +#if ERTS_CIO_HAVE_DRV_EVENT + ErtsDrvEventDataState *free_event = NULL; +#endif + ErtsDrvSelectDataState *free_select = NULL; + ErtsNifSelectDataState *free_nif = NULL; +#ifdef USE_VM_PROBES + DTRACE_CHARBUF(name, 64); +#endif + +#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS + if ((unsigned)fd >= (unsigned)erts_smp_atomic_read_nob(&drv_ev_state_len)) { + if (fd < 0) { + return -1; + } + if (fd >= max_fds) { + nif_select_large_fd_error(fd, mode, resource, ref); + return -1; + } + grow_drv_ev_state(fd); + } +#endif + + erts_smp_mtx_lock(fd_mtx(fd)); + +#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS + state = &drv_ev_state[(int) fd]; +#else + state = hash_get_drv_ev_state(fd); /* may be NULL! */ +#endif + + if (mode & ERL_NIF_SELECT_STOP) { + ASSERT(resource->type->stop); + if (IS_FD_UNKNOWN(state)) { + /* fast track to stop callback */ + call_stop = CALL_STOP; + ret = 0; + goto done_unknown; + } + on = 0; + mode = ERL_DRV_READ | ERL_DRV_WRITE | ERL_DRV_USE; + wake_poller = 1; /* to eject fd from pollset (if needed) */ + } + else { + on = 1; + ASSERT(mode); + wake_poller = 0; + } + +#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS + if (state == NULL) { + state = hash_new_drv_ev_state(fd); + } +#endif + + switch (state->type) { + case ERTS_EV_TYPE_NIF: + /* + * Changing resource is considered stealing. + * Changing process and/or ref is ok (I think?). + */ + if (state->driver.stop.resource != resource) + nif_select_steal(state, ERL_DRV_READ | ERL_DRV_WRITE, resource, ref); + break; +#if ERTS_CIO_HAVE_DRV_EVENT + case ERTS_EV_TYPE_DRV_EV: +#endif + case ERTS_EV_TYPE_DRV_SEL: + nif_select_steal(state, mode, resource, ref); + break; + case ERTS_EV_TYPE_STOP_USE: { + erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); + print_nif_select_op(dsbufp, fd, mode, resource, ref); + steal_pending_stop_use(dsbufp, ERTS_INVALID_ERL_DRV_PORT, state, mode, on); + ASSERT(state->type == ERTS_EV_TYPE_NONE); + break; + } + case ERTS_EV_TYPE_STOP_NIF: { + erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); + print_nif_select_op(dsbufp, fd, mode, resource, ref); + steal_pending_stop_nif(dsbufp, resource, state, mode, on); + if (state->type == ERTS_EV_TYPE_STOP_NIF) { + ret = 0; + goto done; + } + ASSERT(state->type == ERTS_EV_TYPE_NONE); + break; + }} + + ASSERT(state->type == ERTS_EV_TYPE_NONE || + state->type == ERTS_EV_TYPE_NIF); + + if (mode & ERL_DRV_READ) { + ctl_events |= ERTS_POLL_EV_IN; + } + if (mode & ERL_DRV_WRITE) { + ctl_events |= ERTS_POLL_EV_OUT; + } + + ASSERT((state->type == ERTS_EV_TYPE_NIF) || + (state->type == ERTS_EV_TYPE_NONE && !state->events)); + + new_events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, ctl_events, on, &wake_poller); + + if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { + if (state->type == ERTS_EV_TYPE_NIF && !state->events) { + state->type = ERTS_EV_TYPE_NONE; + state->flags &= ~ERTS_EV_FLAG_USED; + state->driver.nif->in.pid = NIL; + state->driver.nif->out.pid = NIL; + state->driver.nif->in.ddeselect_cnt = 0; + state->driver.nif->out.ddeselect_cnt = 0; + state->driver.stop.resource = NULL; + } + ret = -1; + goto done; + } + + old_events = state->events; + + ASSERT(on + ? (new_events == (state->events | ctl_events)) + : (new_events == (state->events & ~ctl_events))); + + ASSERT(state->type == ERTS_EV_TYPE_NIF + || state->type == ERTS_EV_TYPE_NONE); + + state->events = new_events; + if (ctl_events) { + if (on) { + Uint32* refn; + if (!state->driver.nif) + state->driver.nif = alloc_nif_select_data(); + if (state->type == ERTS_EV_TYPE_NONE) { + state->type = ERTS_EV_TYPE_NIF; + state->driver.stop.resource = resource; + enif_keep_resource(resource->data); + } + ASSERT(state->type == ERTS_EV_TYPE_NIF); + ASSERT(state->driver.stop.resource == resource); + if (ctl_events & ERTS_POLL_EV_IN) { + ASSERT(is_nil(state->driver.nif->in.pid)); + state->driver.nif->in.pid = id; + if (is_immed(ref)) { + state->driver.nif->in.immed = ref; + } else { + ASSERT(is_internal_ref(ref)); + refn = internal_ref_numbers(ref); + state->driver.nif->in.immed = THE_NON_VALUE; + state->driver.nif->in.refn[0] = refn[0]; + state->driver.nif->in.refn[1] = refn[1]; + state->driver.nif->in.refn[2] = refn[2]; + } + state->driver.nif->in.ddeselect_cnt = 0; + } + if (ctl_events & ERTS_POLL_EV_OUT) { + ASSERT(is_nil(state->driver.nif->out.pid)); + state->driver.nif->out.pid = id; + if (is_immed(ref)) { + state->driver.nif->out.immed = ref; + } else { + ASSERT(is_internal_ref(ref)); + refn = internal_ref_numbers(ref); + state->driver.nif->out.immed = THE_NON_VALUE; + state->driver.nif->out.refn[0] = refn[0]; + state->driver.nif->out.refn[1] = refn[1]; + state->driver.nif->out.refn[2] = refn[2]; + } + state->driver.nif->out.ddeselect_cnt = 0; + } + state->flags |= ERTS_EV_FLAG_USED; + } + else { /* off */ + if (state->type == ERTS_EV_TYPE_NIF) { + //erts_fprintf(stderr, "SVERK: enif select clear fd=%d inpid=%T inrsrc=%p\n", + // state->fd, state->driver.nif->inpid, + // state->driver.nif->in.resource); + state->driver.nif->in.pid = NIL; + state->driver.nif->out.pid = NIL; + state->driver.nif->in.ddeselect_cnt = 0; + state->driver.nif->out.ddeselect_cnt = 0; + state->flags &= ~ERTS_EV_FLAG_USED; + if (old_events != 0) { + remember_removed(state, &pollset); + } + } + ASSERT(new_events==0); + if (state->remove_cnt == 0 || !wake_poller) { + /* Safe to close fd now as it is not in pollset + or there was no need to eject fd (kernel poll) */ + //erts_fprintf(stderr, "SVERK : enif_select calling stop before return\n"); + if (state->type == ERTS_EV_TYPE_NIF) { + ASSERT(state->driver.stop.resource == resource); + call_stop = CALL_STOP_AND_RELEASE; + state->driver.stop.resource = NULL; + } + else { + ASSERT(!state->driver.stop.resource); + call_stop = CALL_STOP; + } + state->type = ERTS_EV_TYPE_NONE; + } + else { + /* Not safe to close fd, postpone stop_select callback. */ + //erts_fprintf(stderr, "SVERK: enif_select schedule stop\n"); + if (state->type == ERTS_EV_TYPE_NONE) { + ASSERT(!state->driver.stop.resource); + state->driver.stop.resource = resource; + enif_keep_resource(resource); + } + state->type = ERTS_EV_TYPE_STOP_NIF; + } + } + } + + ret = 0; + +done: + + check_fd_cleanup(state, +#if ERTS_CIO_HAVE_DRV_EVENT + &free_event, +#endif + &free_select, + &free_nif); + +done_unknown: + erts_smp_mtx_unlock(fd_mtx(fd)); + if (call_stop) { + erts_resource_stop(resource); + if (call_stop == CALL_STOP_AND_RELEASE) { + enif_release_resource(resource->data); + } + } + if (free_select) + free_drv_select_data(free_select); + if (free_nif) + free_nif_select_data(free_nif); + +#if ERTS_CIO_HAVE_DRV_EVENT + if (free_event) + free_drv_event_data(free_event); +#endif + return ret; +} + + +int ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix, ErlDrvEvent e, ErlDrvEventData event_data) @@ -1094,6 +1481,7 @@ ERTS_CIO_EXPORT(driver_event)(ErlDrvPort ix, ErtsDrvEventDataState *free_event; #endif ErtsDrvSelectDataState *free_select; + ErtsNifSelectDataState *free_nif; Port *prt = erts_drvport2port(ix); if (prt == ERTS_INVALID_ERL_DRV_PORT) @@ -1203,12 +1591,15 @@ done: #if ERTS_CIO_HAVE_DRV_EVENT &free_event, #endif - &free_select); + &free_select, + &free_nif); erts_smp_mtx_unlock(fd_mtx(fd)); if (free_select) free_drv_select_data(free_select); + if (free_nif) + free_nif_select_data(free_nif); #if ERTS_CIO_HAVE_DRV_EVENT if (free_event) free_drv_event_data(free_event); @@ -1244,13 +1635,19 @@ need2steal(ErtsDrvEventState *state, int mode) state, ERL_DRV_WRITE); break; + case ERTS_EV_TYPE_NIF: + ASSERT(state->driver.stop.resource); + do_steal = 1; + break; + #if ERTS_CIO_HAVE_DRV_EVENT case ERTS_EV_TYPE_DRV_EV: do_steal |= chk_stale(state->driver.event->port, state, 0); break; #endif case ERTS_EV_TYPE_STOP_USE: - ASSERT(0); + case ERTS_EV_TYPE_STOP_NIF: + ASSERT(0); break; default: break; @@ -1311,6 +1708,25 @@ steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode) erts_dsprintf(dsbufp, "\n"); break; } + case ERTS_EV_TYPE_NIF: { + Eterm iid = state->driver.nif->in.pid; + Eterm oid = state->driver.nif->out.pid; + const char* with = "with"; + ErlNifResourceType* rt = state->driver.stop.resource->type; + + erts_dsprintf(dsbufp, "resource %T:%T", rt->module, rt->name); + + if (is_not_nil(iid)) { + erts_dsprintf(dsbufp, " %s in-pid %T", with, iid); + with = "and"; + } + if (is_not_nil(oid)) { + erts_dsprintf(dsbufp, " %s out-pid %T", with, oid); + } + deselect(state, 0); + erts_dsprintf(dsbufp, "\n"); + break; + } #if ERTS_CIO_HAVE_DRV_EVENT case ERTS_EV_TYPE_DRV_EV: { Eterm eid = state->driver.event->port; @@ -1328,7 +1744,8 @@ steal(erts_dsprintf_buf_t *dsbufp, ErtsDrvEventState *state, int mode) break; } #endif - case ERTS_EV_TYPE_STOP_USE: { + case ERTS_EV_TYPE_STOP_USE: + case ERTS_EV_TYPE_STOP_NIF: { ASSERT(0); break; } @@ -1358,6 +1775,23 @@ print_drv_select_op(erts_dsprintf_buf_t *dsbufp, } static void +print_nif_select_op(erts_dsprintf_buf_t *dsbufp, + ErtsSysFdType fd, int mode, + ErlNifResource* resource, Eterm ref) +{ + erts_dsprintf(dsbufp, + "enif_select(_, %d,%s%s%s, %T:%T, %T) ", + (int) GET_FD(fd), + mode & ERL_NIF_SELECT_READ ? " READ" : "", + mode & ERL_NIF_SELECT_WRITE ? " WRITE" : "", + mode & ERL_NIF_SELECT_STOP ? " STOP" : "", + resource->type->module, + resource->type->name, + ref); +} + + +static void drv_select_steal(ErlDrvPort ix, ErtsDrvEventState *state, int mode, int on) { if (need2steal(state, mode)) { @@ -1368,6 +1802,18 @@ drv_select_steal(ErlDrvPort ix, ErtsDrvEventState *state, int mode, int on) } } +static void +nif_select_steal(ErtsDrvEventState *state, int mode, + ErlNifResource* resource, Eterm ref) +{ + if (need2steal(state, mode)) { + erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); + print_nif_select_op(dsbufp, state->fd, mode, resource, ref); + steal(dsbufp, state, mode); + erts_send_error_to_logger_nogl(dsbufp); + } +} + #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS static void large_fd_error_common(erts_dsprintf_buf_t *dsbufp, ErtsSysFdType fd) @@ -1378,7 +1824,7 @@ large_fd_error_common(erts_dsprintf_buf_t *dsbufp, ErtsSysFdType fd) } static void -select_large_fd_error(ErlDrvPort ix, ErtsSysFdType fd, int mode, int on) +drv_select_large_fd_error(ErlDrvPort ix, ErtsSysFdType fd, int mode, int on) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); print_drv_select_op(dsbufp, ix, fd, mode, on); @@ -1386,6 +1832,16 @@ select_large_fd_error(ErlDrvPort ix, ErtsSysFdType fd, int mode, int on) large_fd_error_common(dsbufp, fd); erts_send_error_to_logger_nogl(dsbufp); } +static void +nif_select_large_fd_error(ErtsSysFdType fd, int mode, + ErlNifResource* resource, Eterm ref) +{ + erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); + print_nif_select_op(dsbufp, fd, mode, resource, ref); + erts_dsprintf(dsbufp, "failed: "); + large_fd_error_common(dsbufp, fd); + erts_send_error_to_logger_nogl(dsbufp); +} #endif /* ERTS_SYS_CONTINOUS_FD_NUMBERS */ @@ -1394,38 +1850,78 @@ static void steal_pending_stop_use(erts_dsprintf_buf_t *dsbufp, ErlDrvPort ix, ErtsDrvEventState *state, int mode, int on) { + int cancel = 0; ASSERT(state->type == ERTS_EV_TYPE_STOP_USE); - erts_dsprintf(dsbufp, "failed: fd=%d (re)selected before stop_select " - "was called for driver %s\n", - (int) GET_FD(state->fd), state->driver.drv_ptr->name); - erts_send_error_to_logger_nogl(dsbufp); if (on) { /* Either fd-owner changed its mind about closing * or closed fd before stop_select callback and fd is now reused. * In either case stop_select should not be called. - */ - state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; - if (state->driver.drv_ptr->handle) { - erts_ddll_dereference_driver(state->driver.drv_ptr->handle); - } - state->driver.drv_ptr = NULL; + */ + cancel = 1; } else if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) { Port *prt = erts_drvport2port(ix); - erts_driver_t* drv_ptr = prt != ERTS_INVALID_ERL_DRV_PORT ? prt->drv_ptr : NULL; - if (drv_ptr && drv_ptr != state->driver.drv_ptr) { - /* Some other driver wants the stop_select callback */ - if (state->driver.drv_ptr->handle) { - erts_ddll_dereference_driver(state->driver.drv_ptr->handle); - } - if (drv_ptr->handle) { - erts_ddll_reference_referenced_driver(drv_ptr->handle); - } - state->driver.drv_ptr = drv_ptr; - } + if (prt == ERTS_INVALID_ERL_DRV_PORT + || prt->drv_ptr != state->driver.stop.drv_ptr) { + /* Some other driver or nif wants the stop_select callback */ + cancel = 1; + } + } + + if (cancel) { + erts_dsprintf(dsbufp, "called before stop_select was called for driver '%s'\n", + state->driver.stop.drv_ptr->name); + if (state->driver.stop.drv_ptr->handle) { + erts_ddll_dereference_driver(state->driver.stop.drv_ptr->handle); + } + state->type = ERTS_EV_TYPE_NONE; + state->flags &= ~ERTS_EV_FLAG_USED; + state->driver.stop.drv_ptr = NULL; + } + else { + erts_dsprintf(dsbufp, "ignored repeated call\n"); + } + erts_send_error_to_logger_nogl(dsbufp); +} + +static void +steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErlNifResource* resource, + ErtsDrvEventState *state, int mode, int on) +{ + int cancel = 0; + + ASSERT(state->type == ERTS_EV_TYPE_STOP_NIF); + ASSERT(state->driver.stop.resource); + + if (on) { + ASSERT(mode & (ERL_NIF_SELECT_READ | ERL_NIF_SELECT_WRITE)); + /* Either fd-owner changed its mind about closing + * or closed fd before stop callback and fd is now reused. + * In either case, stop should not be called. + */ + cancel = 1; } + else if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE + && resource != state->driver.stop.resource) { + /* Some driver or other resource wants the stop callback */ + cancel = 1; + } + + if (cancel) { + ErlNifResourceType* rt = state->driver.stop.resource->type; + erts_dsprintf(dsbufp, "called before stop was called for NIF resource %T:%T\n", + rt->module, rt->name); + + enif_release_resource(state->driver.stop.resource); + state->type = ERTS_EV_TYPE_NONE; + state->flags &= ~ERTS_EV_FLAG_USED; + state->driver.stop.resource = NULL; + } + else { + erts_dsprintf(dsbufp, "ignored repeated call\n"); + } + erts_send_error_to_logger_nogl(dsbufp); } @@ -1557,6 +2053,48 @@ oready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time) } } +static ERTS_INLINE void +send_event_tuple(struct erts_nif_select_event* e, ErlNifResource* resource, + Eterm event_atom) +{ + Process* rp = erts_proc_lookup(e->pid); + ErtsProcLocks rp_locks = 0; + ErtsMessage* mp; + ErlOffHeap* ohp; + ErtsBinary* bin; + Eterm* hp; + Uint hsz = 5 + PROC_BIN_SIZE + REF_THING_SIZE; /* {select, Resource, Ref, EventAtom} */ + Eterm resource_term, ref_term, tuple; + + if (!rp) { + erts_fprintf(stderr, "SVERK: Process %T not alive for msg %T\n", e->pid, event_atom); + return; + } + + bin = ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(resource); + + mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp); + + resource_term = erts_mk_magic_binary_term(&hp, ohp, &bin->binary); + if (is_value(e->immed)) { + ASSERT(is_immed(e->immed)); + ref_term = e->immed; + } + else { + write_ref_thing(hp, e->refn[0], e->refn[1], e->refn[2]); + ref_term = make_internal_ref(hp); + } + hp += REF_THING_SIZE; + tuple = TUPLE4(hp, am_select, resource_term, ref_term, event_atom); + + ERL_MESSAGE_TOKEN(mp) = am_undefined; + erts_queue_message(rp, rp_locks, mp, tuple, am_system); + + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); +} + + #if ERTS_CIO_HAVE_DRV_EVENT static ERTS_INLINE void eready(Eterm id, ErtsDrvEventState *state, ErlDrvEventData event_data, @@ -1602,6 +2140,8 @@ ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(int set, ERTS_CIO_POLL_INTR_TMD(pollset.ps, set, timeout_time); } +#define ERTS_NIF_DELAYED_DESELECT 20 + void ERTS_CIO_EXPORT(erts_check_io)(int do_wait) { @@ -1635,7 +2175,8 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) current_cio_time++; erts_smp_atomic_set_relb(&erts_check_io_time, current_cio_time); - check_cleanup_active_fds(current_cio_time); + check_cleanup_active_fds(current_cio_time, + timeout_time != ERTS_POLL_NO_TIMEOUT); #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_check_exact(NULL, 0); /* No locks should be locked */ @@ -1741,6 +2282,69 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) break; } + case ERTS_EV_TYPE_NIF: { /* Requested via enif_select()... */ + struct erts_nif_select_event in = {NIL}; + struct erts_nif_select_event out = {NIL}; + ErlNifResource* resource; + ErtsPollEvents revents = pollres[i].events; + + if (revents & ERTS_POLL_EV_ERR) { + /* + * Handle error events by triggering all in/out events + * that the NIF has selected. + * We *do not* want to send a message that corresponds + * to an event not selected. + */ + revents = state->events; + } + else { + revents &= (state->events | ERTS_POLL_EV_NVAL); + } + + if (revents & (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) { + if (revents & ERTS_POLL_EV_OUT) { + if (is_not_nil(state->driver.nif->out.pid)) { + out = state->driver.nif->out; + resource = state->driver.stop.resource; + state->driver.nif->out.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT; + state->driver.nif->out.pid = NIL; + add_active_fd(state->fd); + } + else { + ASSERT(state->driver.nif->out.ddeselect_cnt >= 2); + state->driver.nif->out.ddeselect_cnt--; + } + } + if (revents & ERTS_POLL_EV_IN) { + if (is_not_nil(state->driver.nif->in.pid)) { + in = state->driver.nif->in; + resource = state->driver.stop.resource; + state->driver.nif->in.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT; + state->driver.nif->in.pid = NIL; + add_active_fd(state->fd); + } + else { + ASSERT(state->driver.nif->in.ddeselect_cnt >= 2); + state->driver.nif->in.ddeselect_cnt--; + } + } + } + else if (revents & ERTS_POLL_EV_NVAL) { + abort(); + } + +#ifdef ERTS_SMP + erts_smp_mtx_unlock(fd_mtx(fd)); +#endif + if (is_not_nil(in.pid)) { + send_event_tuple(&in, resource, am_ready_input); + } + if (is_not_nil(out.pid)) { + send_event_tuple(&out, resource, am_ready_output); + } + goto next_pollres_unlocked; + } + #if ERTS_CIO_HAVE_DRV_EVENT case ERTS_EV_TYPE_DRV_EV: { /* Requested via driver_event()... */ ErlDrvEventData event_data; @@ -1781,6 +2385,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) #ifdef ERTS_SMP erts_smp_mtx_unlock(fd_mtx(fd)); #endif + next_pollres_unlocked:; } erts_smp_atomic_set_nob(&pollset.in_poll_wait, 0); @@ -2306,6 +2911,39 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters) } } } + else if (state->type == ERTS_EV_TYPE_NIF) { + ErlNifResource* r; + erts_printf("enif_select "); + +#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS + if (internal) { + erts_printf("internal "); + err = 1; + } + + if (cio_events == ep_events) { + erts_printf("ev="); + if (print_events(cio_events) != 0) + err = 1; + } + else { + err = 1; + erts_printf("cio_ev="); + print_events(cio_events); + erts_printf(" ep_ev="); + print_events(ep_events); + } +#else + if (print_events(cio_events) != 0) + err = 1; +#endif + erts_printf(" inpid=%T dd_cnt=%b32d", state->driver.nif->in.pid, + state->driver.nif->in.ddeselect_cnt); + erts_printf(" outpid=%T dd_cnt=%b32d", state->driver.nif->out.pid, + state->driver.nif->out.ddeselect_cnt); + r = state->driver.stop.resource; + erts_printf(" resource=%p(%T:%T)", r, r->type->module, r->type->name); + } #if ERTS_CIO_HAVE_DRV_EVENT else if (state->type == ERTS_EV_TYPE_DRV_EV) { Eterm id; @@ -2362,11 +3000,12 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters) erts_printf("control_type=%d ", (int)state->type); #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS if (cio_events == ep_events) { - erts_printf("ev=0x%b32x", (Uint32) cio_events); + erts_printf("ev="); + print_events(cio_events); } else { - erts_printf("cio_ev=0x%b32x", (Uint32) cio_events); - erts_printf(" ep_ev=0x%b32x", (Uint32) ep_events); + erts_printf("cio_ev="); print_events(cio_events); + erts_printf(" ep_ev="); print_events(ep_events); } #else erts_printf("ev=0x%b32x", (Uint32) cio_events); @@ -2395,7 +3034,7 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip) #if ERTS_CIO_HAVE_DRV_EVENT null_des.driver.event = NULL; #endif - null_des.driver.drv_ptr = NULL; + null_des.driver.stop.drv_ptr = NULL; null_des.events = 0; null_des.remove_cnt = 0; null_des.type = ERTS_EV_TYPE_NONE; diff --git a/erts/emulator/sys/common/erl_check_io.h b/erts/emulator/sys/common/erl_check_io.h index 14f1ea3f43..242e7cfcf5 100644 --- a/erts/emulator/sys/common/erl_check_io.h +++ b/erts/emulator/sys/common/erl_check_io.h @@ -34,6 +34,8 @@ int driver_select_kp(ErlDrvPort, ErlDrvEvent, int, int); int driver_select_nkp(ErlDrvPort, ErlDrvEvent, int, int); +int enif_select_kp(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, Eterm); +int enif_select_nkp(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, Eterm); int driver_event_kp(ErlDrvPort, ErlDrvEvent, ErlDrvEventData); int driver_event_nkp(ErlDrvPort, ErlDrvEvent, ErlDrvEventData); Uint erts_check_io_size_kp(void); @@ -136,4 +138,20 @@ typedef struct { ErtsIoTask iniotask; ErtsIoTask outiotask; } ErtsDrvSelectDataState; + +struct erts_nif_select_event { + Eterm pid; + Eterm immed; + Uint32 refn[ERTS_REF_NUMBERS]; + Sint32 ddeselect_cnt; /* 0: No delayed deselect in progress + * 1: Do deselect before next poll + * >1: Countdown of ignored events + */ +}; + +typedef struct { + struct erts_nif_select_event in; + struct erts_nif_select_event out; +} ErtsNifSelectDataState; + #endif /* #ifndef ERL_CHECK_IO_INTERNAL__ */ diff --git a/erts/emulator/sys/common/erl_poll.c b/erts/emulator/sys/common/erl_poll.c index b8a28bcc18..0c4df3e13a 100644 --- a/erts/emulator/sys/common/erl_poll.c +++ b/erts/emulator/sys/common/erl_poll.c @@ -3030,6 +3030,7 @@ ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet ps, ev[fd] = 0; else { ev[fd] = ps->fds_status[fd].events; + ASSERT(ps->fds_status[fd].used_events == ev[fd]); if ( #if ERTS_POLL_USE_WAKEUP_PIPE fd == ps->wake_fds[0] || fd == ps->wake_fds[1] || diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c index 4b2edace0a..789b455f2d 100644 --- a/erts/emulator/sys/unix/sys.c +++ b/erts/emulator/sys/unix/sys.c @@ -161,6 +161,7 @@ int erts_use_kernel_poll = 0; struct { int (*select)(ErlDrvPort, ErlDrvEvent, int, int); + int (*enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, Eterm); int (*event)(ErlDrvPort, ErlDrvEvent, ErlDrvEventData); void (*check_io_as_interrupt)(void); void (*check_io_interrupt)(int); @@ -184,6 +185,13 @@ driver_event(ErlDrvPort port, ErlDrvEvent event, ErlDrvEventData event_data) return (*io_func.event)(port, event, event_data); } +int enif_select(ErlNifEnv* env, ErlNifEvent event, + enum ErlNifSelectFlags flags, void* obj, Eterm ref) +{ + return (*io_func.enif_select)(env, event, flags, obj, ref); +} + + Eterm erts_check_io_info(void *p) { return (*io_func.info)(p); @@ -201,6 +209,7 @@ init_check_io(void) { if (erts_use_kernel_poll) { io_func.select = driver_select_kp; + io_func.enif_select = enif_select_kp; io_func.event = driver_event_kp; #ifdef ERTS_POLL_NEED_ASYNC_INTERRUPT_SUPPORT io_func.check_io_as_interrupt = erts_check_io_async_sig_interrupt_kp; @@ -216,6 +225,7 @@ init_check_io(void) } else { io_func.select = driver_select_nkp; + io_func.enif_select = enif_select_nkp; io_func.event = driver_event_nkp; #ifdef ERTS_POLL_NEED_ASYNC_INTERRUPT_SUPPORT io_func.check_io_as_interrupt = erts_check_io_async_sig_interrupt_nkp; |