From 090f9543f66cc0c4ea4d1ca63902c300b103f271 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Tue, 9 Oct 2018 16:11:43 +0200 Subject: erts: Pass thread progress data where possible The poll thread does a lot of waking up and then going back to sleep. A large part of the waking up is managing thread progress and a large part of that was using thread specific data to get the thread progress data pointer. With this refactor the tpd is passed to each of the functions which greatly decreases the number of ethr_get_tsd calls which in turn halves the CPU usage of the poller thread in certain scenarios. --- erts/emulator/sys/common/erl_check_io.c | 8 +++++--- erts/emulator/sys/common/erl_check_io.h | 5 ++++- erts/emulator/sys/common/erl_poll.c | 9 +++++---- erts/emulator/sys/common/erl_poll_api.h | 4 +++- erts/emulator/sys/win32/erl_poll.c | 7 ++++--- 5 files changed, 21 insertions(+), 12 deletions(-) (limited to 'erts/emulator/sys') diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index 9f115706dc..748555165f 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -105,6 +105,7 @@ typedef struct erts_poll_thread { ErtsPollSet *ps; ErtsPollResFd *pollres; + ErtsThrPrgrData *tpd; int pollres_len; } ErtsPollThread; @@ -1516,7 +1517,8 @@ erts_check_io_interrupt(ErtsPollThread *psi, int set) } ErtsPollThread * -erts_create_pollset_thread(int id) { +erts_create_pollset_thread(int id, ErtsThrPrgrData *tpd) { + psiv[id].tpd = tpd; return psiv+id; } @@ -1538,12 +1540,12 @@ erts_check_io(ErtsPollThread *psi) #if ERTS_POLL_USE_FALLBACK if (psi->ps == get_fallback()) { - poll_ret = erts_poll_wait_flbk(psi->ps, psi->pollres, &pollres_len); + poll_ret = erts_poll_wait_flbk(psi->ps, psi->pollres, &pollres_len, psi->tpd); } else #endif { - poll_ret = erts_poll_wait(psi->ps, psi->pollres, &pollres_len); + poll_ret = erts_poll_wait(psi->ps, psi->pollres, &pollres_len, psi->tpd); } #ifdef ERTS_ENABLE_LOCK_CHECK diff --git a/erts/emulator/sys/common/erl_check_io.h b/erts/emulator/sys/common/erl_check_io.h index 443ef1264c..16aba8a5f3 100644 --- a/erts/emulator/sys/common/erl_check_io.h +++ b/erts/emulator/sys/common/erl_check_io.h @@ -90,8 +90,11 @@ void erts_check_io_interrupt(struct erts_poll_thread *pt, int set); /** * Create a new poll thread structure that is associated with the number no. * It is the callers responsibility that no is unique. + * + * @param no the id of the pollset thread, -2 = aux thread, -1 = scheduler + * @param tpd the thread progress data of the pollset thread */ -struct erts_poll_thread* erts_create_pollset_thread(int no); +struct erts_poll_thread* erts_create_pollset_thread(int no, ErtsThrPrgrData *tpd); #ifdef ERTS_ENABLE_LOCK_COUNT /** * Toggle lock counting on all check io locks diff --git a/erts/emulator/sys/common/erl_poll.c b/erts/emulator/sys/common/erl_poll.c index b4d1575ee5..5a5874ddfa 100644 --- a/erts/emulator/sys/common/erl_poll.c +++ b/erts/emulator/sys/common/erl_poll.c @@ -75,6 +75,7 @@ # define WANT_NONBLOCKING #endif +#include "erl_thr_progress.h" #include "erl_poll.h" #if ERTS_POLL_USE_KQUEUE # include @@ -95,7 +96,6 @@ # include # endif #endif -#include "erl_thr_progress.h" #include "erl_driver.h" #include "erl_alloc.h" #include "erl_msacc.h" @@ -1629,7 +1629,8 @@ check_fd_events(ErtsPollSet *ps, ErtsPollResFd pr[], int do_wait, int max_res) int ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, ErtsPollResFd pr[], - int *len) + int *len, + ErtsThrPrgrData *tpd) { int res, no_fds, used_fds = 0; int ebadf = 0; @@ -1659,7 +1660,7 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, DEBUG_PRINT_WAIT("Entering %s(), do_wait=%d", ps, __FUNCTION__, do_wait); if (do_wait) { - erts_thr_progress_prepare_wait(NULL); + erts_thr_progress_prepare_wait(tpd); ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_SLEEP); } @@ -1703,7 +1704,7 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, } if (do_wait) { - erts_thr_progress_finalize_wait(NULL); + erts_thr_progress_finalize_wait(tpd); ERTS_MSACC_UPDATE_CACHE(); ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_CHECK_IO); } diff --git a/erts/emulator/sys/common/erl_poll_api.h b/erts/emulator/sys/common/erl_poll_api.h index 1170a549b9..f35f64a9f3 100644 --- a/erts/emulator/sys/common/erl_poll_api.h +++ b/erts/emulator/sys/common/erl_poll_api.h @@ -72,11 +72,13 @@ ErtsPollEvents ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet *ps, * @param res an array of fd results that the ready fds are put in. * @param[in] length the length of the res array * @param[out] length the number of ready events returned in res + * @param tpd the thread progress data to note sleep state in * @return 0 on success, else the ERRNO of the error that happened. */ int ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, ErtsPollResFd res[], - int *length); + int *length, + ErtsThrPrgrData *tpd); /** * Interrupt the thread waiting in the pollset. This function should be called * with set = 0 before any thread calls erts_poll_wait in order to clear any diff --git a/erts/emulator/sys/win32/erl_poll.c b/erts/emulator/sys/win32/erl_poll.c index 39bb4d515e..5d832f4d34 100644 --- a/erts/emulator/sys/win32/erl_poll.c +++ b/erts/emulator/sys/win32/erl_poll.c @@ -1017,7 +1017,8 @@ ErtsPollEvents erts_poll_control(ErtsPollSet *ps, int erts_poll_wait(ErtsPollSet *ps, ErtsPollResFd pr[], - int *len) + int *len, + ErtsThrPrgrData *tpd) { int no_fds; DWORD timeout = INFINITE; @@ -1056,10 +1057,10 @@ int erts_poll_wait(ErtsPollSet *ps, HARDDEBUGF(("Start waiting %d [%d]",num_h, (int) timeout)); ERTS_POLLSET_UNLOCK(ps); - erts_thr_progress_prepare_wait(NULL); + erts_thr_progress_prepare_wait(tpd); ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_SLEEP); handle = WaitForMultipleObjects(num_h, harr, FALSE, timeout); - erts_thr_progress_finalize_wait(NULL); + erts_thr_progress_finalize_wait(tpd); ERTS_MSACC_POP_STATE(); ERTS_POLLSET_LOCK(ps); HARDDEBUGF(("Stop waiting %d [%d]",num_h, (int) timeout)); -- cgit v1.2.3 From 19e7675913a0c244231344d4d40db447a0bb7ef1 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 12 Oct 2018 10:17:18 +0200 Subject: erts: Add erts_io_notify_port_task_executed to check_io msacc state OTP-15450 --- erts/emulator/sys/common/erl_check_io.c | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'erts/emulator/sys') diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index 748555165f..1444cee805 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -431,6 +431,8 @@ erts_io_notify_port_task_executed(ErtsPortTaskType type, ErtsDrvSelectDataState *free_select = NULL; ErtsNifSelectDataState *free_nif = NULL; + ERTS_MSACC_PUSH_AND_SET_STATE_M_X(ERTS_MSACC_STATE_CHECK_IO); + erts_mtx_lock(mtx); state = get_drv_ev_state(fd); @@ -486,6 +488,8 @@ erts_io_notify_port_task_executed(ErtsPortTaskType type, free_drv_select_data(free_select); if (free_nif) free_nif_select_data(free_nif); + + ERTS_MSACC_POP_STATE_M_X(); } static ERTS_INLINE void -- cgit v1.2.3 From c6498571109b524fb319300e1b177b942e556f1b Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 12 Oct 2018 18:16:17 +0200 Subject: erts: Move fds with active true behaviour to own pollset At start of the VM a poll-set that the schedulers will check is created where fds that have triggered many (at the moment, many means 10) times without being deselected inbetween. In this scheduler specific poll-set fds do not use ONESHOT, which means that the number of syscalls goes down dramatically for such fds. This pollset is introduced in order to handle fds that are used by the erlang distribution and that never change their state from {active, true}. This pollset only handles ready_input events, ready_output is still handled by the poll threads. During overload, polling the scheduler poll-set is done on a 10ms timer. --- erts/emulator/sys/common/erl_check_io.c | 310 ++++++++++++++------ erts/emulator/sys/common/erl_check_io.h | 16 +- erts/emulator/sys/common/erl_poll.c | 504 ++++++++++++++++++++++++-------- erts/emulator/sys/common/erl_poll.h | 14 +- erts/emulator/sys/common/erl_poll_api.h | 4 +- erts/emulator/sys/win32/erl_poll.c | 5 +- 6 files changed, 617 insertions(+), 236 deletions(-) (limited to 'erts/emulator/sys') diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index 1444cee805..c681fa481f 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -46,11 +46,11 @@ #if 0 #define DEBUG_PRINT(FMT, ...) erts_printf(FMT "\r\n", ##__VA_ARGS__) #define DEBUG_PRINT_FD(FMT, STATE, ...) \ - DEBUG_PRINT("%d: " FMT " (ev=%s, ac=%s, flg=%d)", \ + DEBUG_PRINT("%d: " FMT " (ev=%s, ac=%s, flg=%s)", \ (STATE) ? (STATE)->fd : (ErtsSysFdType)-1, ##__VA_ARGS__, \ ev2str((STATE) ? (STATE)->events : ERTS_POLL_EV_NONE), \ ev2str((STATE) ? (STATE)->active_events : ERTS_POLL_EV_NONE), \ - (STATE) ? (STATE)->flags : ERTS_EV_FLAG_CLEAR) + (STATE) ? flag2str((STATE)->flags) : ERTS_EV_FLAG_CLEAR) #define DEBUG_PRINT_MODE #else #define DEBUG_PRINT(...) @@ -76,22 +76,40 @@ typedef enum { typedef enum { ERTS_EV_FLAG_CLEAR = 0, ERTS_EV_FLAG_USED = 1, /* ERL_DRV_USE has been turned on */ -#ifdef ERTS_ENABLE_KERNEL_POLL - ERTS_EV_FLAG_FALLBACK = 2, /* Set when kernel poll rejected fd +#if ERTS_POLL_USE_SCHEDULER_POLLING + ERTS_EV_FLAG_SCHEDULER = 2, /* Set when the fd has been migrated + to scheduler pollset */ + ERTS_EV_FLAG_IN_SCHEDULER = 4, /* Set when the fd is currently in + scheduler pollset */ +#else + ERTS_EV_FLAG_SCHEDULER = ERTS_EV_FLAG_CLEAR, + ERTS_EV_FLAG_IN_SCHEDULER = ERTS_EV_FLAG_CLEAR, +#endif +#ifdef ERTS_POLL_USE_FALLBACK + ERTS_EV_FLAG_FALLBACK = 8, /* Set when kernel poll rejected fd and it was put in the nkp version */ #else ERTS_EV_FLAG_FALLBACK = ERTS_EV_FLAG_CLEAR, #endif /* Combinations */ - ERTS_EV_FLAG_USED_FALLBACK = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_FALLBACK + ERTS_EV_FLAG_USED_FALLBACK = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_FALLBACK, + ERTS_EV_FLAG_USED_SCHEDULER = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_SCHEDULER, + ERTS_EV_FLAG_USED_IN_SCHEDULER = ERTS_EV_FLAG_USED | ERTS_EV_FLAG_SCHEDULER | ERTS_EV_FLAG_IN_SCHEDULER, + ERTS_EV_FLAG_UNUSED_SCHEDULER = ERTS_EV_FLAG_SCHEDULER, + ERTS_EV_FLAG_UNUSED_IN_SCHEDULER = ERTS_EV_FLAG_SCHEDULER | ERTS_EV_FLAG_IN_SCHEDULER } EventStateFlags; #define flag2str(flags) \ ((flags) == ERTS_EV_FLAG_CLEAR ? "CLEAR" : \ ((flags) == ERTS_EV_FLAG_USED ? "USED" : \ ((flags) == ERTS_EV_FLAG_FALLBACK ? "FLBK" : \ - ((flags) == ERTS_EV_FLAG_USED_FALLBACK ? "USED|FLBK" : "ERROR")))) + ((flags) == ERTS_EV_FLAG_USED_FALLBACK ? "USED|FLBK" : \ + ((flags) == ERTS_EV_FLAG_USED_SCHEDULER ? "USED|SCHD" : \ + ((flags) == ERTS_EV_FLAG_UNUSED_SCHEDULER ? "SCHD" : \ + ((flags) == ERTS_EV_FLAG_USED_IN_SCHEDULER ? "USED|IN_SCHD" : \ + ((flags) == ERTS_EV_FLAG_UNUSED_IN_SCHEDULER ? "IN_SCHD" : \ + "ERROR")))))))) /* How many events that can be handled at once by one erts_poll_wait call */ #define ERTS_CHECK_IO_POLL_RES_LEN 512 @@ -113,10 +131,13 @@ typedef struct erts_poll_thread * Which pollset to use is determined by hashing the fd. */ static ErtsPollSet **pollsetv; +static ErtsPollThread *psiv; #if ERTS_POLL_USE_FALLBACK static ErtsPollSet *flbk_pollset; #endif -static ErtsPollThread *psiv; +#if ERTS_POLL_USE_SCHEDULER_POLLING +static ErtsPollSet *sched_pollset; +#endif typedef struct { #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS @@ -131,10 +152,12 @@ typedef struct { ErtsResource* resource; /* ERTS_EV_TYPE_STOP_NIF */ } stop; } driver; - ErtsPollEvents events; /* The events that have been selected upon */ + ErtsPollEvents events; /* The events that have been selected upon */ ErtsPollEvents active_events; /* The events currently active in the pollset */ EventStateType type; EventStateFlags flags; + int count; /* Number of times this fd has triggered + without being deselected. */ } ErtsDrvEventState; struct drv_ev_state_shared { @@ -371,12 +394,22 @@ get_pollset(ErtsSysFdType fd) #if ERTS_POLL_USE_FALLBACK static ERTS_INLINE ErtsPollSet * -get_fallback(void) +get_fallback_pollset(void) { return flbk_pollset; } #endif +static ERTS_INLINE ErtsPollSet * +get_scheduler_pollset(ErtsSysFdType fd) +{ +#if ERTS_POLL_USE_SCHEDULER_POLLING + return sched_pollset; +#else + return get_pollset(fd); +#endif +} + /* * Place a fd within a pollset. This will automatically use * the fallback ps if needed. @@ -392,18 +425,27 @@ erts_io_control_wakeup(ErtsDrvEventState *state, ErtsPollOp op, ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd))); if (!(flags & ERTS_EV_FLAG_FALLBACK)) { - res = erts_poll_control(get_pollset(fd), fd, op, pe, wake_poller); + + if (op == ERTS_POLL_OP_DEL && (flags & ERTS_EV_FLAG_SCHEDULER)) { + erts_poll_control(get_scheduler_pollset(fd), fd, op, pe, wake_poller); + flags &= ~ERTS_EV_FLAG_IN_SCHEDULER; + } + if (!(flags & ERTS_EV_FLAG_IN_SCHEDULER) || (pe & ERTS_POLL_EV_OUT)) { + res = erts_poll_control(get_pollset(fd), fd, op, pe, wake_poller); + } else { + res = erts_poll_control(get_scheduler_pollset(fd), fd, op, pe, wake_poller); + } #if ERTS_POLL_USE_FALLBACK if (op == ERTS_POLL_OP_ADD && res == ERTS_POLL_EV_NVAL) { /* When an add fails with NVAL, the poll/kevent operation could not put that fd in the pollset, so we instead put it into a fallback pollset */ state->flags |= ERTS_EV_FLAG_FALLBACK; - res = erts_poll_control_flbk(get_fallback(), fd, op, pe, wake_poller); + res = erts_poll_control_flbk(get_fallback_pollset(), fd, op, pe, wake_poller); } } else { ASSERT(op != ERTS_POLL_OP_ADD); - res = erts_poll_control_flbk(get_fallback(), fd, op, pe, wake_poller); + res = erts_poll_control_flbk(get_fallback_pollset(), fd, op, pe, wake_poller); #endif } @@ -426,7 +468,8 @@ erts_io_notify_port_task_executed(ErtsPortTaskType type, ErtsIoTask *itp = ErtsContainerStruct(pthp, ErtsIoTask, task); ErtsSysFdType fd = itp->fd; erts_mtx_t *mtx = fd_mtx(fd); - int active_events; + ErtsPollOp op = ERTS_POLL_OP_MOD; + int active_events, new_events = 0; ErtsDrvEventState *state; ErtsDrvSelectDataState *free_select = NULL; ErtsNifSelectDataState *free_nif = NULL; @@ -436,51 +479,66 @@ erts_io_notify_port_task_executed(ErtsPortTaskType type, erts_mtx_lock(mtx); state = get_drv_ev_state(fd); + reset_handle(pthp); + active_events = state->active_events; - switch (type) { - case ERTS_PORT_TASK_INPUT: + if (!(state->flags & ERTS_EV_FLAG_IN_SCHEDULER) || type == ERTS_PORT_TASK_OUTPUT) { + switch (type) { + case ERTS_PORT_TASK_INPUT: + + DEBUG_PRINT_FD("executed ready_input", state); + + ASSERT(!(state->active_events & ERTS_POLL_EV_IN)); + if (state->events & ERTS_POLL_EV_IN) { + active_events |= ERTS_POLL_EV_IN; + if (state->count > 10 && ERTS_POLL_USE_SCHEDULER_POLLING) { + if (!(state->flags & ERTS_EV_FLAG_SCHEDULER)) + op = ERTS_POLL_OP_ADD; + state->flags |= ERTS_EV_FLAG_IN_SCHEDULER|ERTS_EV_FLAG_SCHEDULER; + new_events = ERTS_POLL_EV_IN; + DEBUG_PRINT_FD("moving to scheduler ps", state); + } else + new_events = active_events; + if (!(state->flags & ERTS_EV_FLAG_FALLBACK) && ERTS_POLL_USE_SCHEDULER_POLLING) + state->count++; + } + break; + case ERTS_PORT_TASK_OUTPUT: - DEBUG_PRINT_FD("executed ready_input", state); + DEBUG_PRINT_FD("executed ready_output", state); - ASSERT(!(state->active_events & ERTS_POLL_EV_IN)); - if (state->events & ERTS_POLL_EV_IN) - active_events |= ERTS_POLL_EV_IN; - break; - case ERTS_PORT_TASK_OUTPUT: + ASSERT(!(state->active_events & ERTS_POLL_EV_OUT)); + if (state->events & ERTS_POLL_EV_OUT) { + active_events |= ERTS_POLL_EV_OUT; + if (state->flags & ERTS_EV_FLAG_IN_SCHEDULER && active_events & ERTS_POLL_EV_IN) + new_events = ERTS_POLL_EV_OUT; + else + new_events = active_events; + } + break; + default: + erts_exit(ERTS_ABORT_EXIT, "Invalid IO port task type"); + break; + } - DEBUG_PRINT_FD("executed ready_output", state); + if (state->active_events != active_events && new_events) { + state->active_events = active_events; + new_events = erts_io_control(state, op, new_events); + } - ASSERT(!(state->active_events & ERTS_POLL_EV_OUT)); - if (state->events & ERTS_POLL_EV_OUT) - active_events |= ERTS_POLL_EV_OUT; - break; - default: - erts_exit(ERTS_ABORT_EXIT, "Invalid IO port task type"); - break; + /* We were unable to re-insert the fd into the pollset, signal the callback. */ + if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { + if (state->active_events & ERTS_POLL_EV_IN) + iready(state->driver.select->inport, state); + if (state->active_events & ERTS_POLL_EV_OUT) + oready(state->driver.select->outport, state); + state->active_events = 0; + } } - reset_handle(pthp); - - if (active_events) { - /* This is not needed if active_events has not changed */ - if (state->active_events != active_events) { - ErtsPollEvents new_events; - state->active_events = active_events; - new_events = erts_io_control(state, ERTS_POLL_OP_MOD, active_events); - - /* We were unable to re-insert the fd into the pollset, signal the callback. */ - if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { - if (active_events & ERTS_POLL_EV_IN) - iready(state->driver.select->inport, state); - if (active_events & ERTS_POLL_EV_OUT) - oready(state->driver.select->outport, state); - state->active_events = 0; - } - } - } else { + if (!active_events) check_fd_cleanup(state, &free_select, &free_nif); - } erts_mtx_unlock(mtx); @@ -760,11 +818,22 @@ driver_select(ErlDrvPort ix, ErlDrvEvent e, int mode, int on) if (old_events == 0 && !(state->flags & ERTS_EV_FLAG_USED)) { ctl_op = ERTS_POLL_OP_ADD; } + new_events = state->active_events; + if (state->flags & ERTS_EV_FLAG_IN_SCHEDULER) + new_events &= ~ERTS_POLL_EV_IN; } else { ctl_events &= old_events; state->events &= ~ctl_events; state->active_events &= ~ctl_events; + new_events = state->active_events; + + if (ctl_events & ERTS_POLL_EV_IN) { + state->count = 0; + if (state->flags & ERTS_EV_FLAG_IN_SCHEDULER) { + new_events = 0; + } + } if (!state->events) { if (!(state->flags & ERTS_EV_FLAG_USED) || mode & ERL_DRV_USE) @@ -775,7 +844,7 @@ driver_select(ErlDrvPort ix, ErlDrvEvent e, int mode, int on) if (ctl_events || ctl_op == ERTS_POLL_OP_DEL) { new_events = erts_io_control_wakeup(state, ctl_op, - state->active_events, + new_events, &wake_poller); ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL || state->type == ERTS_EV_TYPE_NONE); @@ -807,6 +876,7 @@ driver_select(ErlDrvPort ix, ErlDrvEvent e, int mode, int on) if (ctl_events & ERTS_POLL_EV_IN) { abort_tasks(state, ERL_DRV_READ); state->driver.select->inport = NIL; + state->flags &= ~ERTS_EV_FLAG_IN_SCHEDULER; } if (ctl_events & ERTS_POLL_EV_OUT) { abort_tasks(state, ERL_DRV_WRITE); @@ -815,6 +885,8 @@ driver_select(ErlDrvPort ix, ErlDrvEvent e, int mode, int on) if (state->events == 0) { if ((mode & ERL_DRV_USE) || !(state->flags & ERTS_EV_FLAG_USED)) { state->type = ERTS_EV_TYPE_NONE; + if (state->flags & ERTS_EV_FLAG_SCHEDULER) + erts_atomic32_read_bor_nob(&prt->state, ERTS_PORT_SFLG_CHECK_FD_CLEANUP); state->flags = 0; } /*else keep it, as fd will probably be selected upon again */ @@ -1431,7 +1503,8 @@ iready(Eterm id, ErtsDrvEventState *state) if (erts_port_task_schedule(id, &iotask->task, ERTS_PORT_TASK_INPUT, - (ErlDrvEvent) state->fd) != 0) { + (ErlDrvEvent) state->fd, + state->flags & ERTS_EV_FLAG_IN_SCHEDULER) != 0) { stale_drv_select(id, state, ERL_DRV_READ); } else { DEBUG_PRINT_FD("schedule ready_input(%T, %d)", @@ -1449,7 +1522,8 @@ oready(Eterm id, ErtsDrvEventState *state) if (erts_port_task_schedule(id, &iotask->task, ERTS_PORT_TASK_OUTPUT, - (ErlDrvEvent) state->fd) != 0) { + (ErlDrvEvent) state->fd, + 0) != 0) { stale_drv_select(id, state, ERL_DRV_WRITE); } else { DEBUG_PRINT_FD("schedule ready_output(%T, %d)", state, id, state->fd); @@ -1511,7 +1585,7 @@ erts_check_io_interrupt(ErtsPollThread *psi, int set) { if (psi) { #if ERTS_POLL_USE_FALLBACK - if (psi->ps == get_fallback()) { + if (psi->ps == get_fallback_pollset()) { erts_poll_interrupt_flbk(psi->ps, set); return; } @@ -1527,7 +1601,7 @@ erts_create_pollset_thread(int id, ErtsThrPrgrData *tpd) { } void -erts_check_io(ErtsPollThread *psi) +erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time) { int pollres_len; int poll_ret, i; @@ -1542,14 +1616,14 @@ erts_check_io(ErtsPollThread *psi) pollres_len = psi->pollres_len; #if ERTS_POLL_USE_FALLBACK - if (psi->ps == get_fallback()) { + if (psi->ps == get_fallback_pollset()) { - poll_ret = erts_poll_wait_flbk(psi->ps, psi->pollres, &pollres_len, psi->tpd); + poll_ret = erts_poll_wait_flbk(psi->ps, psi->pollres, &pollres_len, psi->tpd, timeout_time); } else #endif { - poll_ret = erts_poll_wait(psi->ps, psi->pollres, &pollres_len, psi->tpd); + poll_ret = erts_poll_wait(psi->ps, psi->pollres, &pollres_len, psi->tpd, timeout_time); } #ifdef ERTS_ENABLE_LOCK_CHECK @@ -1585,7 +1659,12 @@ erts_check_io(ErtsPollThread *psi) ErtsNifSelectDataState *free_nif = NULL; ErtsSysFdType fd = (ErtsSysFdType) ERTS_POLL_RES_GET_FD(&psi->pollres[i]); ErtsDrvEventState *state; - ErtsPollEvents revents; + ErtsPollEvents revents = ERTS_POLL_RES_GET_EVTS(&psi->pollres[i]); + + /* The fd will be set to -1 if a pollset internal fd was triggered + that was determined to be too expensive to remove from the result. + */ + if (fd == -1) continue; erts_mtx_lock(fd_mtx(fd)); @@ -1596,8 +1675,6 @@ erts_check_io(ErtsPollThread *psi) continue; } - revents = ERTS_POLL_RES_GET_EVTS(&psi->pollres[i]); - DEBUG_PRINT_FD("triggered %s", state, ev2str(revents)); if (revents & ERTS_POLL_EV_ERR) { @@ -1609,25 +1686,39 @@ erts_check_io(ErtsPollThread *psi) */ revents = state->active_events; state->active_events = 0; + + if (state->flags & ERTS_EV_FLAG_IN_SCHEDULER) { + erts_io_control(state, ERTS_POLL_OP_MOD, 0); + state->flags &= ~ERTS_EV_FLAG_IN_SCHEDULER; + } } else { /* Disregard any events that are not active at the moment, for instance this could happen if the driver/nif does select/deselect in rapid succession. */ revents &= state->active_events | ERTS_POLL_EV_NVAL; - state->active_events &= ~revents; - /* Reactivate the poll op if there are still active events */ - if (state->active_events) { - ErtsPollEvents new_events; - DEBUG_PRINT_FD("re-enable %s", state, ev2str(state->active_events)); + if (psi->ps != get_scheduler_pollset(fd) || !ERTS_POLL_USE_SCHEDULER_POLLING) { + ErtsPollEvents reactive_events; + state->active_events &= ~revents; + + reactive_events = state->active_events; - new_events = erts_io_control(state, ERTS_POLL_OP_MOD, state->active_events); + if (state->flags & ERTS_EV_FLAG_IN_SCHEDULER) + reactive_events &= ~ERTS_POLL_EV_IN; - /* Unable to re-enable the fd, signal all callbacks */ - if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { - revents |= state->active_events; - state->active_events = 0; + /* Reactivate the poll op if there are still active events */ + if (reactive_events) { + ErtsPollEvents new_events; + DEBUG_PRINT_FD("re-enable %s", state, ev2str(reactive_events)); + + new_events = erts_io_control(state, ERTS_POLL_OP_MOD, reactive_events); + + /* Unable to re-enable the fd, signal all callbacks */ + if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { + revents |= reactive_events; + state->active_events &= ~reactive_events; + } } } } @@ -1703,7 +1794,7 @@ erts_check_io(ErtsPollThread *psi) case ERTS_EV_TYPE_STOP_USE: { #if ERTS_POLL_USE_FALLBACK - ASSERT(psi->ps == get_fallback()); + ASSERT(psi->ps == get_fallback_pollset()); #endif drv_ptr = state->driver.stop.drv_ptr; state->type = ERTS_EV_TYPE_NONE; @@ -2041,12 +2132,17 @@ erts_init_check_io(int *argc, char **argv) for (j=0; j < erts_no_pollsets; j++) pollsetv[j] = erts_poll_create_pollset(j); -#if ERTS_POLL_USE_FALLBACK - flbk_pollset = erts_poll_create_pollset_flbk(-1); + no_poll_threads = erts_no_poll_threads; + + j = -1; + +#if ERTS_POLL_USE_SCHEDULER_POLLING + sched_pollset = erts_poll_create_pollset(j--); + no_poll_threads++; #endif - no_poll_threads = erts_no_poll_threads; #if ERTS_POLL_USE_FALLBACK + flbk_pollset = erts_poll_create_pollset_flbk(j--); no_poll_threads++; #endif @@ -2056,7 +2152,15 @@ erts_init_check_io(int *argc, char **argv) psiv[0].pollres_len = ERTS_CHECK_IO_POLL_RES_LEN; psiv[0].pollres = erts_alloc(ERTS_ALC_T_POLLSET, sizeof(ErtsPollResFd) * ERTS_CHECK_IO_POLL_RES_LEN); - psiv[0].ps = get_fallback(); + psiv[0].ps = get_fallback_pollset(); + psiv++; +#endif + +#if ERTS_POLL_USE_SCHEDULER_POLLING + psiv[0].pollres_len = ERTS_CHECK_IO_POLL_RES_LEN; + psiv[0].pollres = erts_alloc(ERTS_ALC_T_POLLSET, + sizeof(ErtsPollResFd) * ERTS_CHECK_IO_POLL_RES_LEN); + psiv[0].ps = get_scheduler_pollset(0); psiv++; #endif @@ -2113,7 +2217,12 @@ erts_check_io_size(void) int i; #if ERTS_POLL_USE_FALLBACK - erts_poll_info(get_fallback(), &pi); + erts_poll_info(get_fallback_pollset(), &pi); + res += pi.memory_size; +#endif + +#if ERTS_POLL_USE_SCHEDULER_POLLING + erts_poll_info(get_scheduler_pollset(0), &pi); res += pi.memory_size; #endif @@ -2145,13 +2254,21 @@ erts_check_io_info(void *proc) Uint sz, *szp, *hp, **hpp; ErtsPollInfo *piv; Sint i, j = 0, len; - int no_pollsets = erts_no_pollsets + ERTS_POLL_USE_FALLBACK; + int no_pollsets = erts_no_pollsets + ERTS_POLL_USE_FALLBACK + ERTS_POLL_USE_SCHEDULER_POLLING; ERTS_CT_ASSERT(ERTS_POLL_USE_FALLBACK == 0 || ERTS_POLL_USE_FALLBACK == 1); + ERTS_CT_ASSERT(ERTS_POLL_USE_SCHEDULER_POLLING == 0 || ERTS_POLL_USE_SCHEDULER_POLLING == 1); piv = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollInfo) * no_pollsets); #if ERTS_POLL_USE_FALLBACK - erts_poll_info_flbk(get_fallback(), &piv[0]); + erts_poll_info_flbk(get_fallback_pollset(), &piv[0]); + piv[0].poll_threads = 1; + piv[0].active_fds = 0; + piv++; +#endif + +#if ERTS_POLL_USE_SCHEDULER_POLLING + erts_poll_info(get_scheduler_pollset(0), &piv[0]); piv[0].poll_threads = 1; piv[0].active_fds = 0; piv++; @@ -2205,6 +2322,7 @@ erts_check_io_info(void *proc) sz = 0; piv -= ERTS_POLL_USE_FALLBACK; + piv -= ERTS_POLL_USE_SCHEDULER_POLLING; bld_it: @@ -2309,15 +2427,7 @@ print_events(erts_dsprintf_buf_t *dsbufp, ErtsPollEvents ev) static ERTS_INLINE void print_flags(erts_dsprintf_buf_t *dsbufp, EventStateFlags f) { - const char* delim = ""; - if(f & ERTS_EV_FLAG_USED) { - erts_dsprintf(dsbufp, "%s","USED"); - delim = "|"; - } - if(f & ERTS_EV_FLAG_FALLBACK) { - erts_dsprintf(dsbufp, "%s%s", delim, "FLBK"); - delim = "|"; - } + erts_dsprintf(dsbufp, "%s", flag2str(f)); } #ifdef DEBUG_PRINT_MODE @@ -2659,13 +2769,26 @@ erts_check_io_debug(ErtsCheckIoDebugInfo *ciodip) #if ERTS_POLL_USE_FALLBACK erts_dsprintf(dsbufp, "--- fds in flbk pollset ---------------------------------\n"); - erts_poll_get_selected_events_flbk(get_fallback(), counters.epep, + erts_poll_get_selected_events_flbk(get_fallback_pollset(), counters.epep, drv_ev_state.max_fds); for (fd = 0; fd < len; fd++) { if (drv_ev_state.v[fd].flags & ERTS_EV_FLAG_FALLBACK) doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters, dsbufp); } #endif +#if ERTS_POLL_USE_SCHEDULER_POLLING + erts_dsprintf(dsbufp, "--- fds in scheduler pollset ----------------------------\n"); + erts_poll_get_selected_events(get_scheduler_pollset(0), counters.epep, + drv_ev_state.max_fds); + for (fd = 0; fd < len; fd++) { + if (drv_ev_state.v[fd].flags & ERTS_EV_FLAG_SCHEDULER) { + if (drv_ev_state.v[fd].events && drv_ev_state.v[fd].events != ERTS_POLL_EV_NONE) + counters.epep[fd] &= ~ERTS_POLL_EV_OUT; + doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters, dsbufp); + } + } +#endif + erts_dsprintf(dsbufp, "--- fds in pollset --------------------------------------\n"); for (i = 0; i < erts_no_pollsets; i++) { @@ -2674,8 +2797,15 @@ erts_check_io_debug(ErtsCheckIoDebugInfo *ciodip) drv_ev_state.max_fds); for (fd = 0; fd < len; fd++) { if (!(drv_ev_state.v[fd].flags & ERTS_EV_FLAG_FALLBACK) - && get_pollset_id(fd) == i) + && get_pollset_id(fd) == i) { + if (counters.epep[fd] != ERTS_POLL_EV_NONE && + drv_ev_state.v[fd].flags & ERTS_EV_FLAG_IN_SCHEDULER) { + /* We add the in flag if it is enabled in the scheduler pollset + and get_selected_events works on the platform */ + counters.epep[fd] |= ERTS_POLL_EV_IN; + } doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters, dsbufp); + } } } for (fd = len ; fd < drv_ev_state.max_fds; fd++) { @@ -2722,7 +2852,7 @@ void erts_lcnt_update_cio_locks(int enable) { #endif #if ERTS_POLL_USE_FALLBACK - erts_lcnt_enable_pollset_lock_count_flbk(get_fallback(), enable); + erts_lcnt_enable_pollset_lock_count_flbk(get_fallback_pollset(), enable); #endif for (i = 0; i < erts_no_pollsets; i++) diff --git a/erts/emulator/sys/common/erl_check_io.h b/erts/emulator/sys/common/erl_check_io.h index 16aba8a5f3..31182be5ec 100644 --- a/erts/emulator/sys/common/erl_check_io.h +++ b/erts/emulator/sys/common/erl_check_io.h @@ -68,7 +68,7 @@ int erts_check_io_max_files(void); * * @param pt the poll thread structure to use. */ -void erts_check_io(struct erts_poll_thread *pt); +void erts_check_io(struct erts_poll_thread *pt, ErtsMonotonicTime timeout_time); /** * Initialize the check io framework. This function will parse the arguments * and delete any entries that it is interested in. @@ -129,16 +129,6 @@ extern int erts_no_poll_threads; #include "erl_poll.h" #include "erl_port_task.h" -#ifdef __WIN32__ -/* - * Current erts_poll implementation for Windows cannot handle - * active events in the set of events polled. - */ -# define ERTS_CIO_DEFER_ACTIVE_EVENTS 1 -#else -# define ERTS_CIO_DEFER_ACTIVE_EVENTS 1 -#endif - typedef struct { Eterm inport; Eterm outport; @@ -150,10 +140,6 @@ struct erts_nif_select_event { Eterm pid; Eterm immed; Uint32 refn[ERTS_REF_NUMBERS]; - Sint32 ddeselect_cnt; /* 0: No delayed deselect in progress - * 1: Do deselect before next poll - * >1: Countdown of ignored events - */ }; typedef struct { diff --git a/erts/emulator/sys/common/erl_poll.c b/erts/emulator/sys/common/erl_poll.c index 5a5874ddfa..51d50933ff 100644 --- a/erts/emulator/sys/common/erl_poll.c +++ b/erts/emulator/sys/common/erl_poll.c @@ -121,7 +121,8 @@ /* Define to print info about modifications done to each fd */ #define DEBUG_PRINT_FD(FMT, PS, FD, ...) DEBUG_PRINT("%d: " FMT, PS, FD, ##__VA_ARGS__) /* Define to print entry and exit from erts_poll_wait (can be very spammy) */ -//#define DEBUG_PRINT_WAIT(FMT, PS, ...) DEBUG_PRINT(FMT, PS, ##__VA_ARGS__) +// #define DEBUG_PRINT_WAIT(FMT, PS, ...) DEBUG_PRINT(FMT, PS, ##__VA_ARGS__) +// #define DEBUG_PRINT_WAIT(FMT, PS, ...) do { if ((PS)->id != -1) DEBUG_PRINT(FMT, PS, ##__VA_ARGS__); } while(0) #else #define ERTS_POLL_DEBUG_PRINT 0 @@ -200,7 +201,7 @@ int ERTS_SELECT(int nfds, ERTS_fd_set *readfds, ERTS_fd_set *writefds, #define ERTS_POLL_USE_CONCURRENT_UPDATE (ERTS_POLL_USE_EPOLL || ERTS_POLL_USE_KQUEUE) -#define ERTS_POLL_USE_WAKEUP_PIPE (!ERTS_POLL_USE_CONCURRENT_UPDATE) +#define ERTS_POLL_USE_WAKEUP(ps) (!ERTS_POLL_USE_CONCURRENT_UPDATE || (ps)->id < 0) #if !ERTS_POLL_USE_CONCURRENT_UPDATE @@ -269,6 +270,7 @@ struct ERTS_POLL_EXPORT(erts_pollset) { #if ERTS_POLL_USE_KERNEL_POLL int kp_fd; + int oneshot; #endif /* ERTS_POLL_USE_KERNEL_POLL */ #if ERTS_POLL_USE_POLL @@ -295,12 +297,16 @@ struct ERTS_POLL_EXPORT(erts_pollset) { ErtsPollSetUpdateRequestsBlock *curr_upd_req_block; erts_atomic32_t have_update_requests; erts_mtx_t mtx; - erts_atomic32_t wakeup_state; +#else + int do_wakeup; #endif -#if ERTS_POLL_USE_WAKEUP_PIPE - int wake_fds[2]; +#if ERTS_POLL_USE_TIMERFD + int timer_fd; #endif + ErtsMonotonicTime timeout_time; + erts_atomic32_t wakeup_state; + int wake_fds[2]; }; void erts_silence_warn_unused_result(long unused); @@ -365,63 +371,47 @@ static void print_misc_debug_info(void); uint32_t epoll_events(int kp_fd, int fd); #endif - #define ERTS_POLL_NOT_WOKEN 0 #define ERTS_POLL_WOKEN -1 #define ERTS_POLL_WOKEN_INTR 1 -#if !ERTS_POLL_USE_CONCURRENT_UPDATE static ERTS_INLINE void reset_wakeup_state(ErtsPollSet *ps) { erts_atomic32_set_mb(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN); } -#endif static ERTS_INLINE int is_woken(ErtsPollSet *ps) { -#if !ERTS_POLL_USE_CONCURRENT_UPDATE return erts_atomic32_read_acqb(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN; -#else - return 0; -#endif } static ERTS_INLINE int is_interrupted_reset(ErtsPollSet *ps) { -#if !ERTS_POLL_USE_CONCURRENT_UPDATE return (erts_atomic32_xchg_acqb(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN) == ERTS_POLL_WOKEN_INTR); -#else - return 0; -#endif } static ERTS_INLINE void woke_up(ErtsPollSet *ps) { -#if !ERTS_POLL_USE_CONCURRENT_UPDATE erts_aint32_t wakeup_state = erts_atomic32_read_acqb(&ps->wakeup_state); if (wakeup_state == ERTS_POLL_NOT_WOKEN) (void) erts_atomic32_cmpxchg_nob(&ps->wakeup_state, ERTS_POLL_WOKEN, ERTS_POLL_NOT_WOKEN); ASSERT(erts_atomic32_read_nob(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN); -#endif } /* * --- Wakeup pipe ----------------------------------------------------------- */ -#if ERTS_POLL_USE_WAKEUP_PIPE - static ERTS_INLINE void wake_poller(ErtsPollSet *ps, int interrupted) { -#if !ERTS_POLL_USE_CONCURRENT_UPDATE int wake; erts_aint32_t wakeup_state; if (!interrupted) @@ -434,9 +424,9 @@ wake_poller(ErtsPollSet *ps, int interrupted) wake = wakeup_state == ERTS_POLL_NOT_WOKEN; if (wake) -#endif { ssize_t res; + DEBUG_PRINT_WAIT("wake_poller(%d)", ps, interrupted); if (ps->wake_fds[1] < 0) return; /* Not initialized yet */ do { @@ -474,10 +464,8 @@ cleanup_wakeup_pipe(ErtsPollSet *ps) fd, erl_errno_id(errno), errno); } -#if !ERTS_POLL_USE_CONCURRENT_UPDATE if (intr) erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR); -#endif } static void @@ -513,7 +501,67 @@ create_wakeup_pipe(ErtsPollSet *ps) ps->wake_fds[1] = wake_fds[1]; } +/* + * --- timer fd ----------------------------------------------------------- + */ + +#if ERTS_POLL_USE_TIMERFD + +/* We use the timerfd when using epoll_wait to get high accuracy + timeouts, i.e. we want to sleep with < ms accuracy. */ + +static void +create_timerfd(ErtsPollSet *ps) +{ + int do_wake = 0; + int timer_fd = timerfd_create(CLOCK_MONOTONIC,0); + ERTS_POLL_EXPORT(erts_poll_control)(ps, + timer_fd, + ERTS_POLL_OP_ADD, + ERTS_POLL_EV_IN, + &do_wake); + if (ps->internal_fd_limit <= timer_fd) + ps->internal_fd_limit = timer_fd + 1; + ps->timer_fd = timer_fd; +} + +static ERTS_INLINE void +timerfd_set(ErtsPollSet *ps, struct itimerspec *its) +{ +#ifdef DEBUG + struct itimerspec old_its; + int res; + res = timerfd_settime(ps->timer_fd, 0, its, &old_its); + ASSERT(res == 0); + ASSERT(old_its.it_interval.tv_sec == 0 && + old_its.it_interval.tv_nsec == 0 && + old_its.it_value.tv_sec == 0 && + old_its.it_value.tv_nsec == 0); + +#else + timerfd_settime(ps->timer_fd, 0, its, NULL); #endif +} + +static ERTS_INLINE int +timerfd_clear(ErtsPollSet *ps, ErtsPollResFd pr[], int res, int max_res) { + + struct itimerspec its; + /* we always have to clear the timer */ + its.it_interval.tv_sec = 0; + its.it_interval.tv_nsec = 0; + its.it_value.tv_sec = 0; + its.it_value.tv_nsec = 0; + timerfd_settime(ps->timer_fd, 0, &its, NULL); + + /* only timeout fd triggered */ + if (res == 1 && pr[0].data.fd == ps->timer_fd) + return 0; + + return res; +} + +#endif /* ERTS_POLL_USE_TIMERFD */ /* * --- Poll set update requests ---------------------------------------------- @@ -691,9 +739,12 @@ update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) struct epoll_event epe_templ; struct epoll_event epe; - epe_templ.events = ERTS_POLL_EV_E2N(events) | EPOLLONESHOT; + epe_templ.events = ERTS_POLL_EV_E2N(events); epe_templ.data.fd = fd; + if (ps->oneshot) + epe_templ.events |= EPOLLONESHOT; + #ifdef VALGRIND /* Silence invalid valgrind warning ... */ memset((void *) &epe.data, 0, sizeof(epoll_data_t)); @@ -802,6 +853,7 @@ update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) int res = 0, len = 0; struct kevent evts[2]; struct timespec ts = {0, 0}; + uint32_t oneshot = 0; if (op == ERTS_POLL_OP_ADD) { /* This is a hack to make the "noshell" option work; kqueue can poll @@ -840,6 +892,9 @@ update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) man page), but it seems to be the way it works... */ + if (ps->oneshot) + oneshot = EV_DISPATCH; + if (op == ERTS_POLL_OP_DEL) { erts_atomic_dec_nob(&ps->no_of_user_fds); /* We could probably skip this delete, do we want to? */ @@ -849,27 +904,29 @@ update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) uint32_t flags; erts_atomic_inc_nob(&ps->no_of_user_fds); - flags = EV_ADD|EV_DISPATCH; + flags = EV_ADD|oneshot; flags |= ((events & ERTS_POLL_EV_IN) ? 0 : EV_DISABLE); ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, flags, (void *) ERTS_POLL_EV_IN); - flags = EV_ADD|EV_DISPATCH; + flags = EV_ADD|oneshot; flags |= ((events & ERTS_POLL_EV_OUT) ? 0 : EV_DISABLE); ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, flags, (void *) ERTS_POLL_EV_OUT); } else { uint32_t flags; ASSERT(op == ERTS_POLL_OP_MOD); - flags = EV_DISPATCH; + flags = oneshot; flags |= (events & ERTS_POLL_EV_IN) ? EV_ENABLE : EV_DISABLE; ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, flags, (void *) ERTS_POLL_EV_IN); - flags = EV_DISPATCH; + flags = oneshot; flags |= (events & ERTS_POLL_EV_OUT) ? EV_ENABLE : EV_DISABLE; ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, flags, (void *) ERTS_POLL_EV_OUT); } #else - uint32_t flags = EV_ADD|EV_ONESHOT; + uint32_t flags = EV_ADD; + + if (ps->oneshot) flags |= EV_ONESHOT; if (op == ERTS_POLL_OP_DEL) { erts_atomic_dec_nob(&ps->no_of_user_fds); @@ -903,14 +960,17 @@ update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) keventbp += sprintf(keventbp, "kevent(%d, {",ps->kp_fd); for (i = 0; i < len; i++) { const char *flags = "UNKNOWN"; - if (evts[i].flags == EV_DELETE) flags = "EV_DELETE"; + if (evts[i].flags == (EV_DELETE)) flags = "EV_DELETE"; if (evts[i].flags == (EV_ADD|EV_ONESHOT)) flags = "EV_ADD|EV_ONESHOT"; + if (evts[i].flags == (EV_ADD)) flags = "EV_ADD"; #ifdef EV_DISPATCH if (evts[i].flags == (EV_ADD|EV_DISPATCH)) flags = "EV_ADD|EV_DISPATCH"; if (evts[i].flags == (EV_ADD|EV_DISABLE)) flags = "EV_ADD|EV_DISABLE"; if (evts[i].flags == (EV_ENABLE|EV_DISPATCH)) flags = "EV_ENABLE|EV_DISPATCH"; - if (evts[i].flags == EV_DISABLE) flags = "EV_DISABLE"; + if (evts[i].flags == (EV_ENABLE)) flags = "EV_ENABLE"; + if (evts[i].flags == (EV_DISABLE)) flags = "EV_DISABLE"; if (evts[i].flags == (EV_DISABLE|EV_DISPATCH)) flags = "EV_DISABLE|EV_DISABLE"; + if (evts[i].flags == (EV_DISABLE)) flags = "EV_DISABLE"; #endif keventbp += sprintf(keventbp, "%s{%lu, %s, %s}",i > 0 ? ", " : "", @@ -1273,11 +1333,15 @@ poll_control(ErtsPollSet *ps, int fd, ErtsPollOp op, goto done; } #endif -#if ERTS_POLL_USE_WAKEUP_PIPE if (fd == ps->wake_fds[0] || fd == ps->wake_fds[1]) { new_events = ERTS_POLL_EV_NVAL; goto done; } +#if ERTS_POLL_USE_TIMERFD + if (fd == ps->timer_fd) { + new_events = ERTS_POLL_EV_NVAL; + goto done; + } #endif } @@ -1333,11 +1397,8 @@ ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet *ps, ERTS_POLLSET_UNLOCK(ps); -#if !ERTS_POLL_USE_CONCURRENT_UPDATE - if (*do_wake) { + if (*do_wake) wake_poller(ps, 0); - } -#endif return res; } @@ -1351,52 +1412,61 @@ ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet *ps, static ERTS_INLINE int ERTS_POLL_EXPORT(save_result)(ErtsPollSet *ps, ErtsPollResFd pr[], int max_res, int chk_fds_res, int ebadf) { -#if !ERTS_POLL_USE_CONCURRENT_UPDATE || ERTS_POLL_DEBUG_PRINT || ERTS_POLL_USE_WAKEUP_PIPE int n = chk_fds_res < max_res ? chk_fds_res : max_res, i; int res = n; -#if ERTS_POLL_USE_WAKEUP_PIPE int wake_fd = ps->wake_fds[0]; -#endif - for (i = 0; i < n; i++) { - int fd = ERTS_POLL_RES_GET_FD(&pr[i]); -#ifdef DEBUG_PRINT_MODE - ErtsPollEvents evts = ERTS_POLL_RES_GET_EVTS(pr+i); -#endif + if (ERTS_POLL_USE_WAKEUP(ps) || ERTS_POLL_DEBUG_PRINT || ERTS_POLL_USE_TIMERFD) { - DEBUG_PRINT_FD("trig %s (%s)", ps, fd, - ev2str(evts), + for (i = 0; i < n; i++) { + int fd = ERTS_POLL_RES_GET_FD(&pr[i]); +#if ERTS_POLL_DEBUG_PRINT + ErtsPollEvents evts = ERTS_POLL_RES_GET_EVTS(pr+i); + + if (fd != wake_fd +#if ERTS_POLL_USE_TIMERFD + && fd != ps->timer_fd +#endif + ) + DEBUG_PRINT_FD("trig %s (%s)", ps, fd, + ev2str(evts), #if ERTS_POLL_USE_KQUEUE - "kqueue" + "kqueue" #elif ERTS_POLL_USE_EPOLL - "epoll" + "epoll" #else - "/dev/poll" + "/dev/poll" +#endif + ); #endif - ); -#if ERTS_POLL_USE_WAKEUP_PIPE - if (fd == wake_fd) { - cleanup_wakeup_pipe(ps); - ERTS_POLL_RES_SET_EVTS(&pr[i], ERTS_POLL_EV_NONE); - if (n == 1) - return 0; - } + if (ERTS_POLL_USE_WAKEUP(ps) && fd == wake_fd) { + cleanup_wakeup_pipe(ps); + ERTS_POLL_RES_SET_FD(&pr[i], -1); + ERTS_POLL_RES_SET_EVTS(&pr[i], ERTS_POLL_EV_NONE); + res--; + } +#if ERTS_POLL_USE_TIMERFD + else if (fd == ps->timer_fd) { + ERTS_POLL_RES_SET_FD(&pr[i], -1); + ERTS_POLL_RES_SET_EVTS(&pr[i], ERTS_POLL_EV_NONE); + res--; + } #endif #if !ERTS_POLL_USE_CONCURRENT_UPDATE - else { - /* Reset the events to emulate ONESHOT semantics */ - ps->fds_status[fd].events = 0; - enqueue_update_request(ps, fd); - } + else { + /* Reset the events to emulate ONESHOT semantics */ + ps->fds_status[fd].events = 0; + enqueue_update_request(ps, fd); + } #endif + } } - return res; -#else - ASSERT(chk_fds_res <= max_res); - return chk_fds_res; -#endif + if (res == 0) + return res; + else + return n; } #else /* !ERTS_POLL_USE_KERNEL_POLL */ @@ -1577,19 +1647,168 @@ ERTS_POLL_EXPORT(save_result)(ErtsPollSet *ps, ErtsPollResFd pr[], int max_res, #endif /* !ERTS_POLL_USE_KERNEL_POLL */ +static ERTS_INLINE ErtsMonotonicTime +get_timeout(ErtsPollSet *ps, + int resolution, + ErtsMonotonicTime timeout_time) +{ + ErtsMonotonicTime timeout; + + if (timeout_time == ERTS_POLL_NO_TIMEOUT) { + timeout = 0; + } + else if (timeout_time == ERTS_POLL_INF_TIMEOUT) { + timeout = -1; + } + else { + ErtsMonotonicTime diff_time, current_time; + current_time = erts_get_monotonic_time(NULL); + diff_time = timeout_time - current_time; + if (diff_time <= 0) { + timeout = 0; + } + else { + switch (resolution) { + case 1000: + /* Round up to nearest even milli second */ + timeout = ERTS_MONOTONIC_TO_MSEC(diff_time - 1) + 1; + if (timeout > (ErtsMonotonicTime) INT_MAX) + timeout = (ErtsMonotonicTime) INT_MAX; + timeout -= ERTS_PREMATURE_TIMEOUT(timeout, 1000); + break; + case 1000000: + /* Round up to nearest even micro second */ + timeout = ERTS_MONOTONIC_TO_USEC(diff_time - 1) + 1; + timeout -= ERTS_PREMATURE_TIMEOUT(timeout, 1000*1000); + break; + case 1000000000: + /* Round up to nearest even nano second */ + timeout = ERTS_MONOTONIC_TO_NSEC(diff_time - 1) + 1; + timeout -= ERTS_PREMATURE_TIMEOUT(timeout, 1000*1000*1000); + break; + default: + ERTS_INTERNAL_ERROR("Invalid resolution"); + timeout = 0; + break; + } + } + } + return timeout; +} + +#if ERTS_POLL_USE_SELECT + +static ERTS_INLINE int +get_timeout_timeval(ErtsPollSet *ps, + SysTimeval *tvp, + ErtsMonotonicTime timeout_time) +{ + ErtsMonotonicTime timeout = get_timeout(ps, + 1000*1000, + timeout_time); + + if (!timeout) { + tvp->tv_sec = 0; + tvp->tv_usec = 0; + + return 0; + } + else if (timeout == -1) { + return -1; + } + else { + ErtsMonotonicTime sec = timeout/(1000*1000); + tvp->tv_sec = sec; + tvp->tv_usec = timeout - sec*(1000*1000); + + ASSERT(tvp->tv_sec >= 0); + ASSERT(tvp->tv_usec >= 0); + ASSERT(tvp->tv_usec < 1000*1000); + + return 1; + } + +} + +#endif + +#if ERTS_POLL_USE_KQUEUE || (ERTS_POLL_USE_POLL && defined(HAVE_PPOLL)) || ERTS_POLL_USE_TIMERFD + static ERTS_INLINE int -check_fd_events(ErtsPollSet *ps, ErtsPollResFd pr[], int do_wait, int max_res) +get_timeout_timespec(ErtsPollSet *ps, + struct timespec *tsp, + ErtsMonotonicTime timeout_time) +{ + ErtsMonotonicTime timeout = get_timeout(ps, + 1000*1000*1000, + timeout_time); + + if (!timeout) { + tsp->tv_sec = 0; + tsp->tv_nsec = 0; + return 0; + } + else if (timeout == -1) { + return -1; + } + else { + ErtsMonotonicTime sec = timeout/(1000*1000*1000); + tsp->tv_sec = sec; + tsp->tv_nsec = timeout - sec*(1000*1000*1000); + + ASSERT(tsp->tv_sec >= 0); + ASSERT(tsp->tv_nsec >= 0); + ASSERT(tsp->tv_nsec < 1000*1000*1000); + + return 1; + } +} + +#endif + +#if ERTS_POLL_USE_TIMERFD + +static ERTS_INLINE int +get_timeout_itimerspec(ErtsPollSet *ps, + struct itimerspec *itsp, + ErtsMonotonicTime timeout_time) +{ + + itsp->it_interval.tv_sec = 0; + itsp->it_interval.tv_nsec = 0; + + return get_timeout_timespec(ps, &itsp->it_value, timeout_time); +} + +#endif + +static ERTS_INLINE int +check_fd_events(ErtsPollSet *ps, ErtsPollResFd pr[], int max_res, ErtsMonotonicTime timeout_time) { int res; - int timeout = do_wait ? -1 : 0; - DEBUG_PRINT_WAIT("Entering check_fd_events(), do_wait=%d", ps, do_wait); + int timeout; + DEBUG_PRINT_WAIT("Entering check_fd_events(), timeout=%d", ps, timeout_time); { #if ERTS_POLL_USE_EPOLL /* --- epoll ------------------------------- */ +#if ERTS_POLL_USE_TIMERFD + struct itimerspec its; + timeout = get_timeout_itimerspec(ps, &its, timeout_time); + if (timeout > 0) { + timerfd_set(ps, &its); + res = epoll_wait(ps->kp_fd, pr, max_res, -1); + res = timerfd_clear(ps, pr, res, max_res); + } else { + res = epoll_wait(ps->kp_fd, pr, max_res, timeout); + } +#else /* !ERTS_POLL_USE_TIMERFD */ + timeout = (int) get_timeout(ps, 1000, timeout_time); res = epoll_wait(ps->kp_fd, pr, max_res, timeout); - +#endif /* !ERTS_POLL_USE_TIMERFD */ #elif ERTS_POLL_USE_KQUEUE /* --- kqueue ------------------------------ */ - struct timespec ts = {0, 0}; - struct timespec *tsp = timeout ? NULL : &ts; + struct timespec ts; + struct timespec *tsp; + timeout = get_timeout_timespec(ps, &ts, timeout_time); + tsp = timeout < 0 ? NULL : &ts; res = kevent(ps->kp_fd, NULL, 0, pr, max_res, tsp); #elif ERTS_POLL_USE_DEVPOLL /* --- devpoll ----------------------------- */ /* @@ -1601,16 +1820,22 @@ check_fd_events(ErtsPollSet *ps, ErtsPollResFd pr[], int do_wait, int max_res) int nfds = (int) erts_atomic_read_nob(&ps->no_of_user_fds) + 1 /* wakeup pipe */; poll_res.dp_nfds = nfds < max_res ? nfds : max_res; poll_res.dp_fds = pr; - poll_res.dp_timeout = timeout; + poll_res.dp_timeout = (int) get_timeout(ps, 1000, timeout_time); res = ioctl(ps->kp_fd, DP_POLL, &poll_res); - +#elif ERTS_POLL_USE_POLL && defined(HAVE_PPOLL) /* --- ppoll ---------------- */ + struct timespec ts; + struct timespec *tsp = &ts; + timeout = get_timeout_timespec(ps, &ts, timeout_time); + if (timeout < 0) tsp = NULL; + res = ppoll(ps->poll_fds, ps->no_poll_fds, tsp, NULL); #elif ERTS_POLL_USE_POLL /* --- poll --------------------------------- */ - + timeout = (int) get_timeout(ps, 1000, timeout_time); res = poll(ps->poll_fds, ps->no_poll_fds, timeout); - #elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */ - SysTimeval tv = {0, 0}; - SysTimeval *tvp = timeout ? NULL : &tv; + SysTimeval tv; + SysTimeval *tvp; + timeout = get_timeout_timeval(ps, &tv, timeout_time); + tvp = timeout < 0 ? NULL : &tv; ERTS_FD_COPY(&ps->input_fds, &ps->res_input_fds); ERTS_FD_COPY(&ps->output_fds, &ps->res_output_fds); @@ -1630,7 +1855,8 @@ int ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, ErtsPollResFd pr[], int *len, - ErtsThrPrgrData *tpd) + ErtsThrPrgrData *tpd, + ErtsMonotonicTime timeout_time) { int res, no_fds, used_fds = 0; int ebadf = 0; @@ -1655,53 +1881,56 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, } #endif - do_wait = !is_woken(ps) && used_fds == 0; + do_wait = !is_woken(ps) && used_fds == 0 && timeout_time != ERTS_POLL_NO_TIMEOUT; DEBUG_PRINT_WAIT("Entering %s(), do_wait=%d", ps, __FUNCTION__, do_wait); if (do_wait) { + tpd = tpd ? tpd : erts_thr_prgr_data(NULL); erts_thr_progress_prepare_wait(tpd); ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_SLEEP); - } + } else + timeout_time = ERTS_POLL_NO_TIMEOUT; while (1) { - res = check_fd_events(ps, pr + used_fds, do_wait, no_fds - used_fds); + res = check_fd_events(ps, pr + used_fds, no_fds - used_fds, timeout_time); + if (res != 0) + break; + if (timeout_time == ERTS_POLL_NO_TIMEOUT) + break; + if (erts_get_monotonic_time(NULL) >= timeout_time) + break; + } #if !ERTS_POLL_USE_CONCURRENT_UPDATE - if (res < 0 - && errno == EBADF - && ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) { - /* - * This may have happened because another thread deselected - * a fd in our poll set and then closed it, i.e. the driver - * behaved correctly. We wan't to avoid looking for a bad - * fd, that may even not exist anymore. Therefore, handle - * update requests and try again. This behaviour should only - * happen when using SELECT as the polling mechanism. - */ - ERTS_POLLSET_LOCK(ps); - used_fds += handle_update_requests(ps, pr + used_fds, no_fds - used_fds); - if (used_fds == no_fds) { - *len = used_fds; - ERTS_POLLSET_UNLOCK(ps); - return 0; - } - res = check_fd_events(ps, pr + used_fds, 0, no_fds - used_fds); - /* Keep the lock over the non-blocking poll in order to not - get any nasty races happening. */ + if (res < 0 + && errno == EBADF + && ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) { + /* + * This may have happened because another thread deselected + * a fd in our poll set and then closed it, i.e. the driver + * behaved correctly. We wan't to avoid looking for a bad + * fd, that may even not exist anymore. Therefore, handle + * update requests and try again. This behaviour should only + * happen when using SELECT as the polling mechanism. + */ + ERTS_POLLSET_LOCK(ps); + used_fds += handle_update_requests(ps, pr + used_fds, no_fds - used_fds); + if (used_fds == no_fds) { + *len = used_fds; ERTS_POLLSET_UNLOCK(ps); - if (res == 0) { - errno = EAGAIN; - res = -1; - } + return 0; + } + res = check_fd_events(ps, pr + used_fds, no_fds - used_fds, ERTS_POLL_NO_TIMEOUT); + /* Keep the lock over the non-blocking poll in order to not + get any nasty races happening. */ + ERTS_POLLSET_UNLOCK(ps); + if (res == 0) { + errno = EAGAIN; + res = -1; } -#endif - - if (res != 0) - break; - if (!do_wait) - break; } +#endif if (do_wait) { erts_thr_progress_finalize_wait(tpd); @@ -1709,7 +1938,8 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_CHECK_IO); } - woke_up(ps); + if (ERTS_POLL_USE_WAKEUP(ps)) + woke_up(ps); if (res < 0) { #if ERTS_POLL_USE_SELECT @@ -1720,11 +1950,16 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, #endif res = errno; } - else { + else if (res == 0) { + res = used_fds == 0 ? ETIMEDOUT : 0; +#ifdef HARD_DEBUG + check_poll_result(pr, used_fds); +#endif + *len = used_fds; + } else { #if ERTS_POLL_USE_SELECT save_results: #endif - ps_locked = 1; ERTS_POLLSET_LOCK(ps); @@ -1754,12 +1989,13 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, void ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet *ps, int set) { -#if !ERTS_POLL_USE_CONCURRENT_UPDATE - if (!set) - reset_wakeup_state(ps); - else - wake_poller(ps, 1); -#endif + DEBUG_PRINT_WAIT("poll_interrupt(%d)", ps, set); + if (ERTS_POLL_USE_WAKEUP(ps)) { + if (!set) + reset_wakeup_state(ps); + else + wake_poller(ps, 1); + } } int @@ -1875,10 +2111,20 @@ ERTS_POLL_EXPORT(erts_poll_create_pollset)(int id) if (ps->internal_fd_limit <= kp_fd) ps->internal_fd_limit = kp_fd + 1; ps->kp_fd = kp_fd; + if (ps->id == -1) + ps->oneshot = 0; + else + ps->oneshot = 1; #endif -#if !ERTS_POLL_USE_CONCURRENT_UPDATE + erts_atomic32_init_nob(&ps->wakeup_state, (erts_aint32_t) 0); create_wakeup_pipe(ps); + +#if ERTS_POLL_USE_TIMERFD + create_timerfd(ps); +#endif + +#if !ERTS_POLL_USE_CONCURRENT_UPDATE handle_update_requests(ps, NULL, 0); cleanup_wakeup_pipe(ps); #endif @@ -1993,9 +2239,7 @@ ERTS_POLL_EXPORT(erts_poll_info)(ErtsPollSet *ps, ErtsPollInfo *pip) pip->memory_size = size; pip->poll_set_size = (int) erts_atomic_read_nob(&ps->no_of_user_fds); -#if !ERTS_POLL_USE_CONCURRENT_UPDATE pip->poll_set_size++; /* Wakeup pipe */ -#endif pip->lazy_updates = #if !ERTS_POLL_USE_CONCURRENT_UPDATE @@ -2178,6 +2422,12 @@ ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet *ps, ASSERT(0); return; } + if (fd == ps->wake_fds[0] || fd == ps->wake_fds[1]) + continue; +#if ERTS_POLL_USE_TIMERFD + if (fd == ps->timer_fd) + continue; +#endif data &= 0xFFFFFFFF; ASSERT(fd == data); /* Events are the events that are being monitored, which of course include diff --git a/erts/emulator/sys/common/erl_poll.h b/erts/emulator/sys/common/erl_poll.h index e1cea7eb8b..d40dabc529 100644 --- a/erts/emulator/sys/common/erl_poll.h +++ b/erts/emulator/sys/common/erl_poll.h @@ -51,6 +51,7 @@ #include "sys.h" #define ERTS_POLL_NO_TIMEOUT ERTS_MONOTONIC_TIME_MIN +#define ERTS_POLL_INF_TIMEOUT ERTS_MONOTONIC_TIME_MAX #ifdef ERTS_ENABLE_KERNEL_POLL # undef ERTS_ENABLE_KERNEL_POLL @@ -130,6 +131,9 @@ #endif #define ERTS_POLL_USE_FALLBACK (ERTS_POLL_USE_KQUEUE || ERTS_POLL_USE_EPOLL) +#define ERTS_POLL_USE_SCHEDULER_POLLING (ERTS_POLL_USE_KQUEUE || ERTS_POLL_USE_EPOLL) +#define ERTS_POLL_SCHEDULER_POLLING_TIMEOUT 10 +#define ERTS_POLL_USE_TIMERFD 0 typedef Uint32 ErtsPollEvents; @@ -156,6 +160,14 @@ typedef enum { #include +#if ERTS_POLL_USE_EPOLL +#ifdef HAVE_SYS_TIMERFD_H +#include +#undef ERTS_POLL_USE_TIMERFD +#define ERTS_POLL_USE_TIMERFD 1 +#endif +#endif + #define ERTS_POLL_EV_E2N(EV) \ ((uint32_t) (EV)) #define ERTS_POLL_EV_N2E(EV) \ @@ -276,7 +288,7 @@ typedef struct _ErtsPollResFd { #endif -#define ERTS_POLL_EV_NONE (UINT_MAX & ~(ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT|ERTS_POLL_EV_NVAL|ERTS_POLL_EV_ERR)) +#define ERTS_POLL_EV_NONE ERTS_POLL_EV_N2E((UINT_MAX & ~(ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT|ERTS_POLL_EV_NVAL|ERTS_POLL_EV_ERR))) #define ev2str(ev) \ (((ev) == 0 || (ev) == ERTS_POLL_EV_NONE) ? "NONE" : \ diff --git a/erts/emulator/sys/common/erl_poll_api.h b/erts/emulator/sys/common/erl_poll_api.h index f35f64a9f3..f3a91e54f7 100644 --- a/erts/emulator/sys/common/erl_poll_api.h +++ b/erts/emulator/sys/common/erl_poll_api.h @@ -73,12 +73,14 @@ ErtsPollEvents ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet *ps, * @param[in] length the length of the res array * @param[out] length the number of ready events returned in res * @param tpd the thread progress data to note sleep state in + * @param timeout_time the time in native to wake up at * @return 0 on success, else the ERRNO of the error that happened. */ int ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, ErtsPollResFd res[], int *length, - ErtsThrPrgrData *tpd); + ErtsThrPrgrData *tpd, + ErtsMonotonicTime timeout_time); /** * Interrupt the thread waiting in the pollset. This function should be called * with set = 0 before any thread calls erts_poll_wait in order to clear any diff --git a/erts/emulator/sys/win32/erl_poll.c b/erts/emulator/sys/win32/erl_poll.c index 5d832f4d34..3843a27a6e 100644 --- a/erts/emulator/sys/win32/erl_poll.c +++ b/erts/emulator/sys/win32/erl_poll.c @@ -1018,10 +1018,11 @@ ErtsPollEvents erts_poll_control(ErtsPollSet *ps, int erts_poll_wait(ErtsPollSet *ps, ErtsPollResFd pr[], int *len, - ErtsThrPrgrData *tpd) + ErtsThrPrgrData *tpd, + Sint64 timeout_in) { int no_fds; - DWORD timeout = INFINITE; + DWORD timeout = timeout_in == -1 ? INFINITE : timeout_in; EventData* ev; int res = 0; int num = 0; -- cgit v1.2.3