diff options
Diffstat (limited to 'erts/emulator/sys/common/erl_check_io.c')
-rw-r--r-- | erts/emulator/sys/common/erl_check_io.c | 529 |
1 files changed, 351 insertions, 178 deletions
diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index f421794f91..d413659f81 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -46,11 +46,11 @@ #if 0 #define DEBUG_PRINT(FMT, ...) erts_printf(FMT "\r\n", ##__VA_ARGS__) #define DEBUG_PRINT_FD(FMT, STATE, ...) \ - DEBUG_PRINT("%d: " FMT " (ev=%s, ac=%s, flg=%d)", \ + DEBUG_PRINT("%d: " FMT " (ev=%s, ac=%s, flg=%s)", \ (STATE) ? (STATE)->fd : (ErtsSysFdType)-1, ##__VA_ARGS__, \ ev2str((STATE) ? (STATE)->events : ERTS_POLL_EV_NONE), \ ev2str((STATE) ? (STATE)->active_events : ERTS_POLL_EV_NONE), \ - (STATE) ? (STATE)->flags : ERTS_EV_FLAG_CLEAR) + (STATE) ? flag2str((STATE)->flags) : ERTS_EV_FLAG_CLEAR) #define DEBUG_PRINT_MODE #else #define DEBUG_PRINT(...) @@ -76,22 +76,40 @@ typedef enum { typedef enum { ERTS_EV_FLAG_CLEAR = 0, ERTS_EV_FLAG_USED = 1, /* ERL_DRV_USE has been turned on */ -#ifdef ERTS_ENABLE_KERNEL_POLL - ERTS_EV_FLAG_FALLBACK = 2, /* Set when kernel poll rejected fd +#if ERTS_POLL_USE_SCHEDULER_POLLING + ERTS_EV_FLAG_SCHEDULER = 2, /* Set when the fd has been migrated + to scheduler pollset */ + ERTS_EV_FLAG_IN_SCHEDULER = 4, /* Set when the fd is currently in + scheduler pollset */ +#else + ERTS_EV_FLAG_SCHEDULER = ERTS_EV_FLAG_CLEAR, + ERTS_EV_FLAG_IN_SCHEDULER = ERTS_EV_FLAG_CLEAR, +#endif +#ifdef ERTS_POLL_USE_FALLBACK + ERTS_EV_FLAG_FALLBACK = 8, /* Set when kernel poll rejected fd and it was put in the nkp version */ #else ERTS_EV_FLAG_FALLBACK = ERTS_EV_FLAG_CLEAR, #endif /* Combinations */ - ERTS_EV_FLAG_USED_FALLBACK = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_FALLBACK + ERTS_EV_FLAG_USED_FALLBACK = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_FALLBACK, + ERTS_EV_FLAG_USED_SCHEDULER = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_SCHEDULER, + ERTS_EV_FLAG_USED_IN_SCHEDULER = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_SCHEDULER | ERTS_EV_FLAG_IN_SCHEDULER, + ERTS_EV_FLAG_UNUSED_SCHEDULER = ERTS_EV_FLAG_SCHEDULER, + ERTS_EV_FLAG_UNUSED_IN_SCHEDULER = ERTS_EV_FLAG_SCHEDULER | ERTS_EV_FLAG_IN_SCHEDULER } EventStateFlags; #define flag2str(flags) \ ((flags) == ERTS_EV_FLAG_CLEAR ? "CLEAR" : \ ((flags) == ERTS_EV_FLAG_USED ? "USED" : \ ((flags) == ERTS_EV_FLAG_FALLBACK ? "FLBK" : \ - ((flags) == ERTS_EV_FLAG_USED_FALLBACK ? "USED|FLBK" : "ERROR")))) + ((flags) == ERTS_EV_FLAG_USED_FALLBACK ? "USED|FLBK" : \ + ((flags) == ERTS_EV_FLAG_USED_SCHEDULER ? "USED|SCHD" : \ + ((flags) == ERTS_EV_FLAG_UNUSED_SCHEDULER ? "SCHD" : \ + ((flags) == ERTS_EV_FLAG_USED_IN_SCHEDULER ? "USED|IN_SCHD" : \ + ((flags) == ERTS_EV_FLAG_UNUSED_IN_SCHEDULER ? "IN_SCHD" : \ + "ERROR")))))))) /* How many events that can be handled at once by one erts_poll_wait call */ #define ERTS_CHECK_IO_POLL_RES_LEN 512 @@ -105,6 +123,7 @@ typedef struct erts_poll_thread { ErtsPollSet *ps; ErtsPollResFd *pollres; + ErtsThrPrgrData *tpd; int pollres_len; } ErtsPollThread; @@ -112,10 +131,13 @@ typedef struct erts_poll_thread * Which pollset to use is determined by hashing the fd. */ static ErtsPollSet **pollsetv; +static ErtsPollThread *psiv; #if ERTS_POLL_USE_FALLBACK static ErtsPollSet *flbk_pollset; #endif -static ErtsPollThread *psiv; +#if ERTS_POLL_USE_SCHEDULER_POLLING +static ErtsPollSet *sched_pollset; +#endif typedef struct { #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS @@ -130,10 +152,12 @@ typedef struct { ErtsResource* resource; /* ERTS_EV_TYPE_STOP_NIF */ } stop; } driver; - ErtsPollEvents events; /* The events that have been selected upon */ + ErtsPollEvents events; /* The events that have been selected upon */ ErtsPollEvents active_events; /* The events currently active in the pollset */ EventStateType type; EventStateFlags flags; + int count; /* Number of times this fd has triggered + without being deselected. */ } ErtsDrvEventState; struct drv_ev_state_shared { @@ -370,12 +394,22 @@ get_pollset(ErtsSysFdType fd) #if ERTS_POLL_USE_FALLBACK static ERTS_INLINE ErtsPollSet * -get_fallback(void) +get_fallback_pollset(void) { return flbk_pollset; } #endif +static ERTS_INLINE ErtsPollSet * +get_scheduler_pollset(ErtsSysFdType fd) +{ +#if ERTS_POLL_USE_SCHEDULER_POLLING + return sched_pollset; +#else + return get_pollset(fd); +#endif +} + /* * Place a fd within a pollset. This will automatically use * the fallback ps if needed. @@ -391,18 +425,27 @@ erts_io_control_wakeup(ErtsDrvEventState *state, ErtsPollOp op, ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd))); if (!(flags & ERTS_EV_FLAG_FALLBACK)) { - res = erts_poll_control(get_pollset(fd), fd, op, pe, wake_poller); + + if (op == ERTS_POLL_OP_DEL && (flags & ERTS_EV_FLAG_SCHEDULER)) { + erts_poll_control(get_scheduler_pollset(fd), fd, op, pe, wake_poller); + flags &= ~ERTS_EV_FLAG_IN_SCHEDULER; + } + if (!(flags & ERTS_EV_FLAG_IN_SCHEDULER) || (pe & ERTS_POLL_EV_OUT)) { + res = erts_poll_control(get_pollset(fd), fd, op, pe, wake_poller); + } else { + res = erts_poll_control(get_scheduler_pollset(fd), fd, op, pe, wake_poller); + } #if ERTS_POLL_USE_FALLBACK if (op == ERTS_POLL_OP_ADD && res == ERTS_POLL_EV_NVAL) { /* When an add fails with NVAL, the poll/kevent operation could not put that fd in the pollset, so we instead put it into a fallback pollset */ state->flags |= ERTS_EV_FLAG_FALLBACK; - res = erts_poll_control_flbk(get_fallback(), fd, op, pe, wake_poller); + res = erts_poll_control_flbk(get_fallback_pollset(), fd, op, pe, wake_poller); } } else { ASSERT(op != ERTS_POLL_OP_ADD); - res = erts_poll_control_flbk(get_fallback(), fd, op, pe, wake_poller); + res = erts_poll_control_flbk(get_fallback_pollset(), fd, op, pe, wake_poller); #endif } @@ -425,59 +468,77 @@ erts_io_notify_port_task_executed(ErtsPortTaskType type, ErtsIoTask *itp = ErtsContainerStruct(pthp, ErtsIoTask, task); ErtsSysFdType fd = itp->fd; erts_mtx_t *mtx = fd_mtx(fd); - int active_events; + ErtsPollOp op = ERTS_POLL_OP_MOD; + int active_events, new_events = 0; ErtsDrvEventState *state; ErtsDrvSelectDataState *free_select = NULL; ErtsNifSelectDataState *free_nif = NULL; + ERTS_MSACC_PUSH_AND_SET_STATE_M_X(ERTS_MSACC_STATE_CHECK_IO); + erts_mtx_lock(mtx); state = get_drv_ev_state(fd); + reset_handle(pthp); + active_events = state->active_events; - switch (type) { - case ERTS_PORT_TASK_INPUT: + if (!(state->flags & ERTS_EV_FLAG_IN_SCHEDULER) || type == ERTS_PORT_TASK_OUTPUT) { + switch (type) { + case ERTS_PORT_TASK_INPUT: + + DEBUG_PRINT_FD("executed ready_input", state); + + ASSERT(!(state->active_events & ERTS_POLL_EV_IN)); + if (state->events & ERTS_POLL_EV_IN) { + active_events |= ERTS_POLL_EV_IN; + if (state->count > 10 && ERTS_POLL_USE_SCHEDULER_POLLING) { + if (!(state->flags & ERTS_EV_FLAG_SCHEDULER)) + op = ERTS_POLL_OP_ADD; + state->flags |= ERTS_EV_FLAG_IN_SCHEDULER|ERTS_EV_FLAG_SCHEDULER; + new_events = ERTS_POLL_EV_IN; + DEBUG_PRINT_FD("moving to scheduler ps", state); + } else + new_events = active_events; + if (!(state->flags & ERTS_EV_FLAG_FALLBACK) && ERTS_POLL_USE_SCHEDULER_POLLING) + state->count++; + } + break; + case ERTS_PORT_TASK_OUTPUT: - DEBUG_PRINT_FD("executed ready_input", state); + DEBUG_PRINT_FD("executed ready_output", state); - ASSERT(!(state->active_events & ERTS_POLL_EV_IN)); - if (state->events & ERTS_POLL_EV_IN) - active_events |= ERTS_POLL_EV_IN; - break; - case ERTS_PORT_TASK_OUTPUT: + ASSERT(!(state->active_events & ERTS_POLL_EV_OUT)); + if (state->events & ERTS_POLL_EV_OUT) { + active_events |= ERTS_POLL_EV_OUT; + if (state->flags & ERTS_EV_FLAG_IN_SCHEDULER && active_events & ERTS_POLL_EV_IN) + new_events = ERTS_POLL_EV_OUT; + else + new_events = active_events; + } + break; + default: + erts_exit(ERTS_ABORT_EXIT, "Invalid IO port task type"); + break; + } - DEBUG_PRINT_FD("executed ready_output", state); + if (state->active_events != active_events && new_events) { + state->active_events = active_events; + new_events = erts_io_control(state, op, new_events); + } - ASSERT(!(state->active_events & ERTS_POLL_EV_OUT)); - if (state->events & ERTS_POLL_EV_OUT) - active_events |= ERTS_POLL_EV_OUT; - break; - default: - erts_exit(ERTS_ABORT_EXIT, "Invalid IO port task type"); - break; + /* We were unable to re-insert the fd into the pollset, signal the callback. */ + if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { + if (state->active_events & ERTS_POLL_EV_IN) + iready(state->driver.select->inport, state); + if (state->active_events & ERTS_POLL_EV_OUT) + oready(state->driver.select->outport, state); + state->active_events = 0; + } } - reset_handle(pthp); - - if (active_events) { - /* This is not needed if active_events has not changed */ - if (state->active_events != active_events) { - ErtsPollEvents new_events; - state->active_events = active_events; - new_events = erts_io_control(state, ERTS_POLL_OP_MOD, active_events); - - /* We were unable to re-insert the fd into the pollset, signal the callback. */ - if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { - if (active_events & ERTS_POLL_EV_IN) - iready(state->driver.select->inport, state); - if (active_events & ERTS_POLL_EV_OUT) - oready(state->driver.select->outport, state); - state->active_events = 0; - } - } - } else { + if (!active_events) check_fd_cleanup(state, &free_select, &free_nif); - } erts_mtx_unlock(mtx); @@ -485,6 +546,8 @@ erts_io_notify_port_task_executed(ErtsPortTaskType type, free_drv_select_data(free_select); if (free_nif) free_nif_select_data(free_nif); + + ERTS_MSACC_POP_STATE_M_X(); } static ERTS_INLINE void @@ -528,6 +591,96 @@ abort_tasks(ErtsDrvEventState *state, int mode) } } +static void prepare_select_msg(struct erts_nif_select_event* e, + enum ErlNifSelectFlags mode, + Eterm recipient, + ErtsResource* resource, + Eterm msg, + ErlNifEnv* msg_env, + Eterm event_atom) +{ + ErtsMessage* mp; + Eterm* hp; + Uint hsz; + + if (is_not_nil(e->pid)) { + ASSERT(e->mp); + erts_cleanup_messages(e->mp); + } + + if (mode & ERL_NIF_SELECT_CUSTOM_MSG) { + if (msg_env) { + mp = erts_create_message_from_nif_env(msg_env); + ERL_MESSAGE_TERM(mp) = msg; + } + else { + hsz = size_object(msg); + mp = erts_alloc_message(hsz, &hp); + ERL_MESSAGE_TERM(mp) = copy_struct(msg, hsz, &hp, &mp->hfrag.off_heap); + } + } + else { + ErtsBinary* bin; + Eterm resource_term, ref_term, tuple; + Eterm* hp_start; + + /* {select, Resource, Ref, EventAtom} */ + hsz = 5 + ERTS_MAGIC_REF_THING_SIZE; + if (is_internal_ref(msg)) + hsz += ERTS_REF_THING_SIZE; + else + ASSERT(is_immed(msg)); + + mp = erts_alloc_message(hsz, &hp); + hp_start = hp; + + bin = ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(resource); + resource_term = erts_mk_magic_ref(&hp, &mp->hfrag.off_heap, &bin->binary); + if (is_internal_ref(msg)) { + Uint32* refn = internal_ref_numbers(msg); + write_ref_thing(hp, refn[0], refn[1], refn[2]); + ref_term = make_internal_ref(hp); + hp += ERTS_REF_THING_SIZE; + } + else { + ASSERT(is_immed(msg)); + ref_term = msg; + } + tuple = TUPLE4(hp, am_select, resource_term, ref_term, event_atom); + hp += 5; + ERL_MESSAGE_TERM(mp) = tuple; + ASSERT(hp == hp_start + hsz); (void)hp_start; + } + + ASSERT(is_not_nil(recipient)); + e->pid = recipient; + e->mp = mp; +} + +static ERTS_INLINE void send_select_msg(struct erts_nif_select_event* e) +{ + Process* rp = erts_proc_lookup(e->pid); + + ASSERT(is_internal_pid(e->pid)); + if (!rp) { + erts_cleanup_messages(e->mp); + return; + } + + erts_queue_message(rp, 0, e->mp, ERL_MESSAGE_TERM(e->mp), am_system); +} + +static void clear_select_event(struct erts_nif_select_event* e) +{ + if (is_not_nil(e->pid)) { + /* Discard unsent message */ + ASSERT(e->mp); + erts_cleanup_messages(e->mp); + e->mp = NULL; + e->pid = NIL; + } +} + static void deselect(ErtsDrvEventState *state, int mode) { @@ -558,8 +711,8 @@ deselect(ErtsDrvEventState *state, int mode) erts_io_control(state, ERTS_POLL_OP_DEL, 0); switch (state->type) { case ERTS_EV_TYPE_NIF: - state->driver.nif->in.pid = NIL; - state->driver.nif->out.pid = NIL; + clear_select_event(&state->driver.nif->in); + clear_select_event(&state->driver.nif->out); enif_release_resource(state->driver.stop.resource->data); state->driver.stop.resource = NULL; break; @@ -755,11 +908,22 @@ driver_select(ErlDrvPort ix, ErlDrvEvent e, int mode, int on) if (old_events == 0 && !(state->flags & ERTS_EV_FLAG_USED)) { ctl_op = ERTS_POLL_OP_ADD; } + new_events = state->active_events; + if (state->flags & ERTS_EV_FLAG_IN_SCHEDULER) + new_events &= ~ERTS_POLL_EV_IN; } else { ctl_events &= old_events; state->events &= ~ctl_events; state->active_events &= ~ctl_events; + new_events = state->active_events; + + if (ctl_events & ERTS_POLL_EV_IN) { + state->count = 0; + if (state->flags & ERTS_EV_FLAG_IN_SCHEDULER) { + new_events = 0; + } + } if (!state->events) { if (!(state->flags & ERTS_EV_FLAG_USED) || mode & ERL_DRV_USE) @@ -770,7 +934,7 @@ driver_select(ErlDrvPort ix, ErlDrvEvent e, int mode, int on) if (ctl_events || ctl_op == ERTS_POLL_OP_DEL) { new_events = erts_io_control_wakeup(state, ctl_op, - state->active_events, + new_events, &wake_poller); ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL || state->type == ERTS_EV_TYPE_NONE); @@ -802,6 +966,7 @@ driver_select(ErlDrvPort ix, ErlDrvEvent e, int mode, int on) if (ctl_events & ERTS_POLL_EV_IN) { abort_tasks(state, ERL_DRV_READ); state->driver.select->inport = NIL; + state->flags &= ~ERTS_EV_FLAG_IN_SCHEDULER; } if (ctl_events & ERTS_POLL_EV_OUT) { abort_tasks(state, ERL_DRV_WRITE); @@ -810,6 +975,8 @@ driver_select(ErlDrvPort ix, ErlDrvEvent e, int mode, int on) if (state->events == 0) { if ((mode & ERL_DRV_USE) || !(state->flags & ERTS_EV_FLAG_USED)) { state->type = ERTS_EV_TYPE_NONE; + if (state->flags & ERTS_EV_FLAG_SCHEDULER) + erts_atomic32_read_bor_nob(&prt->state, ERTS_PORT_SFLG_CHECK_FD_CLEANUP); state->flags = 0; } /*else keep it, as fd will probably be selected upon again */ @@ -866,12 +1033,21 @@ done_unknown: } int -enif_select(ErlNifEnv* env, - ErlNifEvent e, - enum ErlNifSelectFlags mode, - void* obj, - const ErlNifPid* pid, - Eterm ref) +enif_select(ErlNifEnv* env, ErlNifEvent e, enum ErlNifSelectFlags mode, + void* obj, const ErlNifPid* pid, Eterm msg) +{ + return enif_select_x(env, e, mode, obj, pid, msg, NULL); +} + + +int +enif_select_x(ErlNifEnv* env, + ErlNifEvent e, + enum ErlNifSelectFlags mode, + void* obj, + const ErlNifPid* pid, + Eterm msg, + ErlNifEnv* msg_env) { int on; ErtsResource* resource = DATA_TO_RESOURCE(obj); @@ -889,7 +1065,7 @@ enif_select(ErlNifEnv* env, #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS if (!grow_drv_ev_state(fd)) { - if (fd > 0) nif_select_large_fd_error(fd, mode, resource, ref); + if (fd > 0) nif_select_large_fd_error(fd, mode, resource, msg); return INT_MIN | ERL_NIF_SELECT_INVALID_EVENT; } #endif @@ -935,21 +1111,21 @@ enif_select(ErlNifEnv* env, * Changing process and/or ref is ok (I think?). */ if (state->driver.stop.resource != resource) - nif_select_steal(state, ERL_DRV_READ | ERL_DRV_WRITE, resource, ref); + nif_select_steal(state, ERL_DRV_READ | ERL_DRV_WRITE, resource, msg); break; case ERTS_EV_TYPE_DRV_SEL: - nif_select_steal(state, mode, resource, ref); + nif_select_steal(state, mode, resource, msg); break; case ERTS_EV_TYPE_STOP_USE: { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); - print_nif_select_op(dsbufp, fd, mode, resource, ref); + print_nif_select_op(dsbufp, fd, mode, resource, msg); steal_pending_stop_use(dsbufp, ERTS_INVALID_ERL_DRV_PORT, state, mode, on); ASSERT(state->type == ERTS_EV_TYPE_NONE); break; } case ERTS_EV_TYPE_STOP_NIF: { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); - print_nif_select_op(dsbufp, fd, mode, resource, ref); + print_nif_select_op(dsbufp, fd, mode, resource, msg); steal_pending_stop_nif(dsbufp, resource, state, mode, on); if (state->type == ERTS_EV_TYPE_STOP_NIF) { ret = ERL_NIF_SELECT_STOP_SCHEDULED; /* ?? */ @@ -1005,7 +1181,6 @@ enif_select(ErlNifEnv* env, if (on) { const Eterm recipient = pid ? pid->pid : env->proc->common.id; - Uint32* refn; if (!state->driver.nif) state->driver.nif = alloc_nif_select_data(); if (state->type == ERTS_EV_TYPE_NONE) { @@ -1016,28 +1191,13 @@ enif_select(ErlNifEnv* env, ASSERT(state->type == ERTS_EV_TYPE_NIF); ASSERT(state->driver.stop.resource == resource); if (mode & ERL_DRV_READ) { - state->driver.nif->in.pid = recipient; - if (is_immed(ref)) { - state->driver.nif->in.immed = ref; - } else { - ASSERT(is_internal_ref(ref)); - refn = internal_ref_numbers(ref); - state->driver.nif->in.immed = THE_NON_VALUE; - sys_memcpy(state->driver.nif->in.refn, refn, - sizeof(state->driver.nif->in.refn)); - } + prepare_select_msg(&state->driver.nif->in, mode, recipient, + resource, msg, msg_env, am_ready_input); + msg_env = NULL; } if (mode & ERL_DRV_WRITE) { - state->driver.nif->out.pid = recipient; - if (is_immed(ref)) { - state->driver.nif->out.immed = ref; - } else { - ASSERT(is_internal_ref(ref)); - refn = internal_ref_numbers(ref); - state->driver.nif->out.immed = THE_NON_VALUE; - sys_memcpy(state->driver.nif->out.refn, refn, - sizeof(state->driver.nif->out.refn)); - } + prepare_select_msg(&state->driver.nif->out, mode, recipient, + resource, msg, msg_env, am_ready_output); } ret = 0; } @@ -1046,12 +1206,12 @@ enif_select(ErlNifEnv* env, if (state->type == ERTS_EV_TYPE_NIF) { if (mode & ERL_NIF_SELECT_READ && is_not_nil(state->driver.nif->in.pid)) { - state->driver.nif->in.pid = NIL; + clear_select_event(&state->driver.nif->in); ret |= ERL_NIF_SELECT_READ_CANCELLED; } if (mode & ERL_NIF_SELECT_WRITE && is_not_nil(state->driver.nif->out.pid)) { - state->driver.nif->out.pid = NIL; + clear_select_event(&state->driver.nif->out); ret |= ERL_NIF_SELECT_WRITE_CANCELLED; } } @@ -1440,7 +1600,8 @@ iready(Eterm id, ErtsDrvEventState *state) if (erts_port_task_schedule(id, &iotask->task, ERTS_PORT_TASK_INPUT, - (ErlDrvEvent) state->fd) != 0) { + (ErlDrvEvent) state->fd, + state->flags & ERTS_EV_FLAG_IN_SCHEDULER) != 0) { stale_drv_select(id, state, ERL_DRV_READ); } else { DEBUG_PRINT_FD("schedule ready_input(%T, %d)", @@ -1458,7 +1619,8 @@ oready(Eterm id, ErtsDrvEventState *state) if (erts_port_task_schedule(id, &iotask->task, ERTS_PORT_TASK_OUTPUT, - (ErlDrvEvent) state->fd) != 0) { + (ErlDrvEvent) state->fd, + 0) != 0) { stale_drv_select(id, state, ERL_DRV_WRITE); } else { DEBUG_PRINT_FD("schedule ready_output(%T, %d)", state, id, state->fd); @@ -1466,53 +1628,6 @@ oready(Eterm id, ErtsDrvEventState *state) } } -static ERTS_INLINE void -send_event_tuple(struct erts_nif_select_event* e, ErtsResource* resource, - Eterm event_atom) -{ - Process* rp = erts_proc_lookup(e->pid); - ErtsProcLocks rp_locks = 0; - ErtsMessage* mp; - ErlOffHeap* ohp; - ErtsBinary* bin; - Eterm* hp; - Uint hsz; - Eterm resource_term, ref_term, tuple; - - if (!rp) { - return; - } - - bin = ERTS_MAGIC_BIN_FROM_UNALIGNED_DATA(resource); - - /* {select, Resource, Ref, EventAtom} */ - if (is_value(e->immed)) { - hsz = 5 + ERTS_MAGIC_REF_THING_SIZE; - } - else { - hsz = 5 + ERTS_MAGIC_REF_THING_SIZE + ERTS_REF_THING_SIZE; - } - - mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp); - - resource_term = erts_mk_magic_ref(&hp, ohp, &bin->binary); - if (is_value(e->immed)) { - ASSERT(is_immed(e->immed)); - ref_term = e->immed; - } - else { - write_ref_thing(hp, e->refn[0], e->refn[1], e->refn[2]); - ref_term = make_internal_ref(hp); - hp += ERTS_REF_THING_SIZE; - } - tuple = TUPLE4(hp, am_select, resource_term, ref_term, event_atom); - - erts_queue_message(rp, rp_locks, mp, tuple, am_system); - - if (rp_locks) - erts_proc_unlock(rp, rp_locks); -} - static void bad_fd_in_pollset(ErtsDrvEventState *, Eterm inport, Eterm outport); void @@ -1520,7 +1635,7 @@ erts_check_io_interrupt(ErtsPollThread *psi, int set) { if (psi) { #if ERTS_POLL_USE_FALLBACK - if (psi->ps == get_fallback()) { + if (psi->ps == get_fallback_pollset()) { erts_poll_interrupt_flbk(psi->ps, set); return; } @@ -1530,12 +1645,13 @@ erts_check_io_interrupt(ErtsPollThread *psi, int set) } ErtsPollThread * -erts_create_pollset_thread(int id) { +erts_create_pollset_thread(int id, ErtsThrPrgrData *tpd) { + psiv[id].tpd = tpd; return psiv+id; } void -erts_check_io(ErtsPollThread *psi) +erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time) { int pollres_len; int poll_ret, i; @@ -1550,14 +1666,14 @@ erts_check_io(ErtsPollThread *psi) pollres_len = psi->pollres_len; #if ERTS_POLL_USE_FALLBACK - if (psi->ps == get_fallback()) { + if (psi->ps == get_fallback_pollset()) { - poll_ret = erts_poll_wait_flbk(psi->ps, psi->pollres, &pollres_len); + poll_ret = erts_poll_wait_flbk(psi->ps, psi->pollres, &pollres_len, psi->tpd, timeout_time); } else #endif { - poll_ret = erts_poll_wait(psi->ps, psi->pollres, &pollres_len); + poll_ret = erts_poll_wait(psi->ps, psi->pollres, &pollres_len, psi->tpd, timeout_time); } #ifdef ERTS_ENABLE_LOCK_CHECK @@ -1593,7 +1709,12 @@ erts_check_io(ErtsPollThread *psi) ErtsNifSelectDataState *free_nif = NULL; ErtsSysFdType fd = (ErtsSysFdType) ERTS_POLL_RES_GET_FD(&psi->pollres[i]); ErtsDrvEventState *state; - ErtsPollEvents revents; + ErtsPollEvents revents = ERTS_POLL_RES_GET_EVTS(&psi->pollres[i]); + + /* The fd will be set to -1 if a pollset internal fd was triggered + that was determined to be too expensive to remove from the result. + */ + if (fd == -1) continue; erts_mtx_lock(fd_mtx(fd)); @@ -1604,8 +1725,6 @@ erts_check_io(ErtsPollThread *psi) continue; } - revents = ERTS_POLL_RES_GET_EVTS(&psi->pollres[i]); - DEBUG_PRINT_FD("triggered %s", state, ev2str(revents)); if (revents & ERTS_POLL_EV_ERR) { @@ -1617,25 +1736,39 @@ erts_check_io(ErtsPollThread *psi) */ revents = state->active_events; state->active_events = 0; + + if (state->flags & ERTS_EV_FLAG_IN_SCHEDULER) { + erts_io_control(state, ERTS_POLL_OP_MOD, 0); + state->flags &= ~ERTS_EV_FLAG_IN_SCHEDULER; + } } else { /* Disregard any events that are not active at the moment, for instance this could happen if the driver/nif does select/deselect in rapid succession. */ revents &= state->active_events | ERTS_POLL_EV_NVAL; - state->active_events &= ~revents; - /* Reactivate the poll op if there are still active events */ - if (state->active_events) { - ErtsPollEvents new_events; - DEBUG_PRINT_FD("re-enable %s", state, ev2str(state->active_events)); + if (psi->ps != get_scheduler_pollset(fd) || !ERTS_POLL_USE_SCHEDULER_POLLING) { + ErtsPollEvents reactive_events; + state->active_events &= ~revents; + + reactive_events = state->active_events; + + if (state->flags & ERTS_EV_FLAG_IN_SCHEDULER) + reactive_events &= ~ERTS_POLL_EV_IN; + + /* Reactivate the poll op if there are still active events */ + if (reactive_events) { + ErtsPollEvents new_events; + DEBUG_PRINT_FD("re-enable %s", state, ev2str(reactive_events)); - new_events = erts_io_control(state, ERTS_POLL_OP_MOD, state->active_events); + new_events = erts_io_control(state, ERTS_POLL_OP_MOD, reactive_events); - /* Unable to re-enable the fd, signal all callbacks */ - if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { - revents |= state->active_events; - state->active_events = 0; + /* Unable to re-enable the fd, signal all callbacks */ + if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { + revents |= reactive_events; + state->active_events &= ~reactive_events; + } } } } @@ -1668,7 +1801,6 @@ erts_check_io(ErtsPollThread *psi) case ERTS_EV_TYPE_NIF: { /* Requested via enif_select()... */ struct erts_nif_select_event in = {NIL}; struct erts_nif_select_event out = {NIL}; - ErtsResource* resource = NULL; if (revents & (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) { if (revents & ERTS_POLL_EV_OUT) { @@ -1676,6 +1808,7 @@ erts_check_io(ErtsPollThread *psi) out = state->driver.nif->out; resource = state->driver.stop.resource; state->driver.nif->out.pid = NIL; + state->driver.nif->out.mp = NULL; } } if (revents & ERTS_POLL_EV_IN) { @@ -1683,6 +1816,7 @@ erts_check_io(ErtsPollThread *psi) in = state->driver.nif->in; resource = state->driver.stop.resource; state->driver.nif->in.pid = NIL; + state->driver.nif->in.mp = NULL; } } state->events &= ~revents; @@ -1695,10 +1829,10 @@ erts_check_io(ErtsPollThread *psi) erts_mtx_unlock(fd_mtx(fd)); if (is_not_nil(in.pid)) { - send_event_tuple(&in, resource, am_ready_input); + send_select_msg(&in); } if (is_not_nil(out.pid)) { - send_event_tuple(&out, resource, am_ready_output); + send_select_msg(&out); } continue; } @@ -1711,7 +1845,7 @@ erts_check_io(ErtsPollThread *psi) case ERTS_EV_TYPE_STOP_USE: { #if ERTS_POLL_USE_FALLBACK - ASSERT(psi->ps == get_fallback()); + ASSERT(psi->ps == get_fallback_pollset()); #endif drv_ptr = state->driver.stop.drv_ptr; state->type = ERTS_EV_TYPE_NONE; @@ -2049,12 +2183,17 @@ erts_init_check_io(int *argc, char **argv) for (j=0; j < erts_no_pollsets; j++) pollsetv[j] = erts_poll_create_pollset(j); -#if ERTS_POLL_USE_FALLBACK - flbk_pollset = erts_poll_create_pollset_flbk(-1); + no_poll_threads = erts_no_poll_threads; + + j = -1; + +#if ERTS_POLL_USE_SCHEDULER_POLLING + sched_pollset = erts_poll_create_pollset(j--); + no_poll_threads++; #endif - no_poll_threads = erts_no_poll_threads; #if ERTS_POLL_USE_FALLBACK + flbk_pollset = erts_poll_create_pollset_flbk(j--); no_poll_threads++; #endif @@ -2064,7 +2203,15 @@ erts_init_check_io(int *argc, char **argv) psiv[0].pollres_len = ERTS_CHECK_IO_POLL_RES_LEN; psiv[0].pollres = erts_alloc(ERTS_ALC_T_POLLSET, sizeof(ErtsPollResFd) * ERTS_CHECK_IO_POLL_RES_LEN); - psiv[0].ps = get_fallback(); + psiv[0].ps = get_fallback_pollset(); + psiv++; +#endif + +#if ERTS_POLL_USE_SCHEDULER_POLLING + psiv[0].pollres_len = ERTS_CHECK_IO_POLL_RES_LEN; + psiv[0].pollres = erts_alloc(ERTS_ALC_T_POLLSET, + sizeof(ErtsPollResFd) * ERTS_CHECK_IO_POLL_RES_LEN); + psiv[0].ps = get_scheduler_pollset(0); psiv++; #endif @@ -2121,7 +2268,12 @@ erts_check_io_size(void) int i; #if ERTS_POLL_USE_FALLBACK - erts_poll_info(get_fallback(), &pi); + erts_poll_info(get_fallback_pollset(), &pi); + res += pi.memory_size; +#endif + +#if ERTS_POLL_USE_SCHEDULER_POLLING + erts_poll_info(get_scheduler_pollset(0), &pi); res += pi.memory_size; #endif @@ -2153,14 +2305,22 @@ erts_check_io_info(void *proc) Uint sz, *szp, *hp, **hpp; ErtsPollInfo *piv; Sint i, j = 0, len; - int no_pollsets = erts_no_pollsets + ERTS_POLL_USE_FALLBACK; + int no_pollsets = erts_no_pollsets + ERTS_POLL_USE_FALLBACK + ERTS_POLL_USE_SCHEDULER_POLLING; ERTS_CT_ASSERT(ERTS_POLL_USE_FALLBACK == 0 || ERTS_POLL_USE_FALLBACK == 1); + ERTS_CT_ASSERT(ERTS_POLL_USE_SCHEDULER_POLLING == 0 || ERTS_POLL_USE_SCHEDULER_POLLING == 1); piv = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollInfo) * no_pollsets); #if ERTS_POLL_USE_FALLBACK - erts_poll_info_flbk(get_fallback(), &piv[0]); - piv[0].poll_threads = 1; + erts_poll_info_flbk(get_fallback_pollset(), &piv[0]); + piv[0].poll_threads = 0; + piv[0].active_fds = 0; + piv++; +#endif + +#if ERTS_POLL_USE_SCHEDULER_POLLING + erts_poll_info(get_scheduler_pollset(0), &piv[0]); + piv[0].poll_threads = 0; piv[0].active_fds = 0; piv++; #endif @@ -2213,6 +2373,7 @@ erts_check_io_info(void *proc) sz = 0; piv -= ERTS_POLL_USE_FALLBACK; + piv -= ERTS_POLL_USE_SCHEDULER_POLLING; bld_it: @@ -2317,15 +2478,7 @@ print_events(erts_dsprintf_buf_t *dsbufp, ErtsPollEvents ev) static ERTS_INLINE void print_flags(erts_dsprintf_buf_t *dsbufp, EventStateFlags f) { - const char* delim = ""; - if(f & ERTS_EV_FLAG_USED) { - erts_dsprintf(dsbufp, "%s","USED"); - delim = "|"; - } - if(f & ERTS_EV_FLAG_FALLBACK) { - erts_dsprintf(dsbufp, "%s%s", delim, "FLBK"); - delim = "|"; - } + erts_dsprintf(dsbufp, "%s", flag2str(f)); } #ifdef DEBUG_PRINT_MODE @@ -2673,13 +2826,26 @@ erts_check_io_debug(ErtsCheckIoDebugInfo *ciodip) #if ERTS_POLL_USE_FALLBACK erts_dsprintf(dsbufp, "--- fds in flbk pollset ---------------------------------\n"); - erts_poll_get_selected_events_flbk(get_fallback(), counters.epep, + erts_poll_get_selected_events_flbk(get_fallback_pollset(), counters.epep, drv_ev_state.max_fds); for (fd = 0; fd < len; fd++) { if (drv_ev_state.v[fd].flags & ERTS_EV_FLAG_FALLBACK) doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters, dsbufp); } #endif +#if ERTS_POLL_USE_SCHEDULER_POLLING + erts_dsprintf(dsbufp, "--- fds in scheduler pollset ----------------------------\n"); + erts_poll_get_selected_events(get_scheduler_pollset(0), counters.epep, + drv_ev_state.max_fds); + for (fd = 0; fd < len; fd++) { + if (drv_ev_state.v[fd].flags & ERTS_EV_FLAG_SCHEDULER) { + if (drv_ev_state.v[fd].events && drv_ev_state.v[fd].events != ERTS_POLL_EV_NONE) + counters.epep[fd] &= ~ERTS_POLL_EV_OUT; + doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters, dsbufp); + } + } +#endif + erts_dsprintf(dsbufp, "--- fds in pollset --------------------------------------\n"); for (i = 0; i < erts_no_pollsets; i++) { @@ -2688,8 +2854,15 @@ erts_check_io_debug(ErtsCheckIoDebugInfo *ciodip) drv_ev_state.max_fds); for (fd = 0; fd < len; fd++) { if (!(drv_ev_state.v[fd].flags & ERTS_EV_FLAG_FALLBACK) - && get_pollset_id(fd) == i) + && get_pollset_id(fd) == i) { + if (counters.epep[fd] != ERTS_POLL_EV_NONE && + drv_ev_state.v[fd].flags & ERTS_EV_FLAG_IN_SCHEDULER) { + /* We add the in flag if it is enabled in the scheduler pollset + and get_selected_events works on the platform */ + counters.epep[fd] |= ERTS_POLL_EV_IN; + } doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters, dsbufp); + } } } for (fd = len ; fd < drv_ev_state.max_fds; fd++) { @@ -2736,7 +2909,7 @@ void erts_lcnt_update_cio_locks(int enable) { #endif #if ERTS_POLL_USE_FALLBACK - erts_lcnt_enable_pollset_lock_count_flbk(get_fallback(), enable); + erts_lcnt_enable_pollset_lock_count_flbk(get_fallback_pollset(), enable); #endif for (i = 0; i < erts_no_pollsets; i++) |