diff options
Diffstat (limited to 'erts/emulator/sys/common/erl_poll.c')
-rw-r--r-- | erts/emulator/sys/common/erl_poll.c | 504 |
1 files changed, 377 insertions, 127 deletions
diff --git a/erts/emulator/sys/common/erl_poll.c b/erts/emulator/sys/common/erl_poll.c index 5a5874ddfa..51d50933ff 100644 --- a/erts/emulator/sys/common/erl_poll.c +++ b/erts/emulator/sys/common/erl_poll.c @@ -121,7 +121,8 @@ /* Define to print info about modifications done to each fd */ #define DEBUG_PRINT_FD(FMT, PS, FD, ...) DEBUG_PRINT("%d: " FMT, PS, FD, ##__VA_ARGS__) /* Define to print entry and exit from erts_poll_wait (can be very spammy) */ -//#define DEBUG_PRINT_WAIT(FMT, PS, ...) DEBUG_PRINT(FMT, PS, ##__VA_ARGS__) +// #define DEBUG_PRINT_WAIT(FMT, PS, ...) DEBUG_PRINT(FMT, PS, ##__VA_ARGS__) +// #define DEBUG_PRINT_WAIT(FMT, PS, ...) do { if ((PS)->id != -1) DEBUG_PRINT(FMT, PS, ##__VA_ARGS__); } while(0) #else #define ERTS_POLL_DEBUG_PRINT 0 @@ -200,7 +201,7 @@ int ERTS_SELECT(int nfds, ERTS_fd_set *readfds, ERTS_fd_set *writefds, #define ERTS_POLL_USE_CONCURRENT_UPDATE (ERTS_POLL_USE_EPOLL || ERTS_POLL_USE_KQUEUE) -#define ERTS_POLL_USE_WAKEUP_PIPE (!ERTS_POLL_USE_CONCURRENT_UPDATE) +#define ERTS_POLL_USE_WAKEUP(ps) (!ERTS_POLL_USE_CONCURRENT_UPDATE || (ps)->id < 0) #if !ERTS_POLL_USE_CONCURRENT_UPDATE @@ -269,6 +270,7 @@ struct ERTS_POLL_EXPORT(erts_pollset) { #if ERTS_POLL_USE_KERNEL_POLL int kp_fd; + int oneshot; #endif /* ERTS_POLL_USE_KERNEL_POLL */ #if ERTS_POLL_USE_POLL @@ -295,12 +297,16 @@ struct ERTS_POLL_EXPORT(erts_pollset) { ErtsPollSetUpdateRequestsBlock *curr_upd_req_block; erts_atomic32_t have_update_requests; erts_mtx_t mtx; - erts_atomic32_t wakeup_state; +#else + int do_wakeup; #endif -#if ERTS_POLL_USE_WAKEUP_PIPE - int wake_fds[2]; +#if ERTS_POLL_USE_TIMERFD + int timer_fd; #endif + ErtsMonotonicTime timeout_time; + erts_atomic32_t wakeup_state; + int wake_fds[2]; }; void erts_silence_warn_unused_result(long unused); @@ -365,63 +371,47 @@ static void print_misc_debug_info(void); uint32_t epoll_events(int kp_fd, int fd); #endif - #define ERTS_POLL_NOT_WOKEN 0 #define ERTS_POLL_WOKEN -1 #define ERTS_POLL_WOKEN_INTR 1 -#if !ERTS_POLL_USE_CONCURRENT_UPDATE static ERTS_INLINE void reset_wakeup_state(ErtsPollSet *ps) { erts_atomic32_set_mb(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN); } -#endif static ERTS_INLINE int is_woken(ErtsPollSet *ps) { -#if !ERTS_POLL_USE_CONCURRENT_UPDATE return erts_atomic32_read_acqb(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN; -#else - return 0; -#endif } static ERTS_INLINE int is_interrupted_reset(ErtsPollSet *ps) { -#if !ERTS_POLL_USE_CONCURRENT_UPDATE return (erts_atomic32_xchg_acqb(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN) == ERTS_POLL_WOKEN_INTR); -#else - return 0; -#endif } static ERTS_INLINE void woke_up(ErtsPollSet *ps) { -#if !ERTS_POLL_USE_CONCURRENT_UPDATE erts_aint32_t wakeup_state = erts_atomic32_read_acqb(&ps->wakeup_state); if (wakeup_state == ERTS_POLL_NOT_WOKEN) (void) erts_atomic32_cmpxchg_nob(&ps->wakeup_state, ERTS_POLL_WOKEN, ERTS_POLL_NOT_WOKEN); ASSERT(erts_atomic32_read_nob(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN); -#endif } /* * --- Wakeup pipe ----------------------------------------------------------- */ -#if ERTS_POLL_USE_WAKEUP_PIPE - static ERTS_INLINE void wake_poller(ErtsPollSet *ps, int interrupted) { -#if !ERTS_POLL_USE_CONCURRENT_UPDATE int wake; erts_aint32_t wakeup_state; if (!interrupted) @@ -434,9 +424,9 @@ wake_poller(ErtsPollSet *ps, int interrupted) wake = wakeup_state == ERTS_POLL_NOT_WOKEN; if (wake) -#endif { ssize_t res; + DEBUG_PRINT_WAIT("wake_poller(%d)", ps, interrupted); if (ps->wake_fds[1] < 0) return; /* Not initialized yet */ do { @@ -474,10 +464,8 @@ cleanup_wakeup_pipe(ErtsPollSet *ps) fd, erl_errno_id(errno), errno); } -#if !ERTS_POLL_USE_CONCURRENT_UPDATE if (intr) erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR); -#endif } static void @@ -513,7 +501,67 @@ create_wakeup_pipe(ErtsPollSet *ps) ps->wake_fds[1] = wake_fds[1]; } +/* + * --- timer fd ----------------------------------------------------------- + */ + +#if ERTS_POLL_USE_TIMERFD + +/* We use the timerfd when using epoll_wait to get high accuracy + timeouts, i.e. we want to sleep with < ms accuracy. */ + +static void +create_timerfd(ErtsPollSet *ps) +{ + int do_wake = 0; + int timer_fd = timerfd_create(CLOCK_MONOTONIC,0); + ERTS_POLL_EXPORT(erts_poll_control)(ps, + timer_fd, + ERTS_POLL_OP_ADD, + ERTS_POLL_EV_IN, + &do_wake); + if (ps->internal_fd_limit <= timer_fd) + ps->internal_fd_limit = timer_fd + 1; + ps->timer_fd = timer_fd; +} + +static ERTS_INLINE void +timerfd_set(ErtsPollSet *ps, struct itimerspec *its) +{ +#ifdef DEBUG + struct itimerspec old_its; + int res; + res = timerfd_settime(ps->timer_fd, 0, its, &old_its); + ASSERT(res == 0); + ASSERT(old_its.it_interval.tv_sec == 0 && + old_its.it_interval.tv_nsec == 0 && + old_its.it_value.tv_sec == 0 && + old_its.it_value.tv_nsec == 0); + +#else + timerfd_settime(ps->timer_fd, 0, its, NULL); #endif +} + +static ERTS_INLINE int +timerfd_clear(ErtsPollSet *ps, ErtsPollResFd pr[], int res, int max_res) { + + struct itimerspec its; + /* we always have to clear the timer */ + its.it_interval.tv_sec = 0; + its.it_interval.tv_nsec = 0; + its.it_value.tv_sec = 0; + its.it_value.tv_nsec = 0; + timerfd_settime(ps->timer_fd, 0, &its, NULL); + + /* only timeout fd triggered */ + if (res == 1 && pr[0].data.fd == ps->timer_fd) + return 0; + + return res; +} + +#endif /* ERTS_POLL_USE_TIMERFD */ /* * --- Poll set update requests ---------------------------------------------- @@ -691,9 +739,12 @@ update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) struct epoll_event epe_templ; struct epoll_event epe; - epe_templ.events = ERTS_POLL_EV_E2N(events) | EPOLLONESHOT; + epe_templ.events = ERTS_POLL_EV_E2N(events); epe_templ.data.fd = fd; + if (ps->oneshot) + epe_templ.events |= EPOLLONESHOT; + #ifdef VALGRIND /* Silence invalid valgrind warning ... */ memset((void *) &epe.data, 0, sizeof(epoll_data_t)); @@ -802,6 +853,7 @@ update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) int res = 0, len = 0; struct kevent evts[2]; struct timespec ts = {0, 0}; + uint32_t oneshot = 0; if (op == ERTS_POLL_OP_ADD) { /* This is a hack to make the "noshell" option work; kqueue can poll @@ -840,6 +892,9 @@ update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) man page), but it seems to be the way it works... */ + if (ps->oneshot) + oneshot = EV_DISPATCH; + if (op == ERTS_POLL_OP_DEL) { erts_atomic_dec_nob(&ps->no_of_user_fds); /* We could probably skip this delete, do we want to? */ @@ -849,27 +904,29 @@ update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) uint32_t flags; erts_atomic_inc_nob(&ps->no_of_user_fds); - flags = EV_ADD|EV_DISPATCH; + flags = EV_ADD|oneshot; flags |= ((events & ERTS_POLL_EV_IN) ? 0 : EV_DISABLE); ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, flags, (void *) ERTS_POLL_EV_IN); - flags = EV_ADD|EV_DISPATCH; + flags = EV_ADD|oneshot; flags |= ((events & ERTS_POLL_EV_OUT) ? 0 : EV_DISABLE); ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, flags, (void *) ERTS_POLL_EV_OUT); } else { uint32_t flags; ASSERT(op == ERTS_POLL_OP_MOD); - flags = EV_DISPATCH; + flags = oneshot; flags |= (events & ERTS_POLL_EV_IN) ? EV_ENABLE : EV_DISABLE; ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, flags, (void *) ERTS_POLL_EV_IN); - flags = EV_DISPATCH; + flags = oneshot; flags |= (events & ERTS_POLL_EV_OUT) ? EV_ENABLE : EV_DISABLE; ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, flags, (void *) ERTS_POLL_EV_OUT); } #else - uint32_t flags = EV_ADD|EV_ONESHOT; + uint32_t flags = EV_ADD; + + if (ps->oneshot) flags |= EV_ONESHOT; if (op == ERTS_POLL_OP_DEL) { erts_atomic_dec_nob(&ps->no_of_user_fds); @@ -903,14 +960,17 @@ update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) keventbp += sprintf(keventbp, "kevent(%d, {",ps->kp_fd); for (i = 0; i < len; i++) { const char *flags = "UNKNOWN"; - if (evts[i].flags == EV_DELETE) flags = "EV_DELETE"; + if (evts[i].flags == (EV_DELETE)) flags = "EV_DELETE"; if (evts[i].flags == (EV_ADD|EV_ONESHOT)) flags = "EV_ADD|EV_ONESHOT"; + if (evts[i].flags == (EV_ADD)) flags = "EV_ADD"; #ifdef EV_DISPATCH if (evts[i].flags == (EV_ADD|EV_DISPATCH)) flags = "EV_ADD|EV_DISPATCH"; if (evts[i].flags == (EV_ADD|EV_DISABLE)) flags = "EV_ADD|EV_DISABLE"; if (evts[i].flags == (EV_ENABLE|EV_DISPATCH)) flags = "EV_ENABLE|EV_DISPATCH"; - if (evts[i].flags == EV_DISABLE) flags = "EV_DISABLE"; + if (evts[i].flags == (EV_ENABLE)) flags = "EV_ENABLE"; + if (evts[i].flags == (EV_DISABLE)) flags = "EV_DISABLE"; if (evts[i].flags == (EV_DISABLE|EV_DISPATCH)) flags = "EV_DISABLE|EV_DISABLE"; + if (evts[i].flags == (EV_DISABLE)) flags = "EV_DISABLE"; #endif keventbp += sprintf(keventbp, "%s{%lu, %s, %s}",i > 0 ? ", " : "", @@ -1273,11 +1333,15 @@ poll_control(ErtsPollSet *ps, int fd, ErtsPollOp op, goto done; } #endif -#if ERTS_POLL_USE_WAKEUP_PIPE if (fd == ps->wake_fds[0] || fd == ps->wake_fds[1]) { new_events = ERTS_POLL_EV_NVAL; goto done; } +#if ERTS_POLL_USE_TIMERFD + if (fd == ps->timer_fd) { + new_events = ERTS_POLL_EV_NVAL; + goto done; + } #endif } @@ -1333,11 +1397,8 @@ ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet *ps, ERTS_POLLSET_UNLOCK(ps); -#if !ERTS_POLL_USE_CONCURRENT_UPDATE - if (*do_wake) { + if (*do_wake) wake_poller(ps, 0); - } -#endif return res; } @@ -1351,52 +1412,61 @@ ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet *ps, static ERTS_INLINE int ERTS_POLL_EXPORT(save_result)(ErtsPollSet *ps, ErtsPollResFd pr[], int max_res, int chk_fds_res, int ebadf) { -#if !ERTS_POLL_USE_CONCURRENT_UPDATE || ERTS_POLL_DEBUG_PRINT || ERTS_POLL_USE_WAKEUP_PIPE int n = chk_fds_res < max_res ? chk_fds_res : max_res, i; int res = n; -#if ERTS_POLL_USE_WAKEUP_PIPE int wake_fd = ps->wake_fds[0]; -#endif - for (i = 0; i < n; i++) { - int fd = ERTS_POLL_RES_GET_FD(&pr[i]); -#ifdef DEBUG_PRINT_MODE - ErtsPollEvents evts = ERTS_POLL_RES_GET_EVTS(pr+i); -#endif + if (ERTS_POLL_USE_WAKEUP(ps) || ERTS_POLL_DEBUG_PRINT || ERTS_POLL_USE_TIMERFD) { - DEBUG_PRINT_FD("trig %s (%s)", ps, fd, - ev2str(evts), + for (i = 0; i < n; i++) { + int fd = ERTS_POLL_RES_GET_FD(&pr[i]); +#if ERTS_POLL_DEBUG_PRINT + ErtsPollEvents evts = ERTS_POLL_RES_GET_EVTS(pr+i); + + if (fd != wake_fd +#if ERTS_POLL_USE_TIMERFD + && fd != ps->timer_fd +#endif + ) + DEBUG_PRINT_FD("trig %s (%s)", ps, fd, + ev2str(evts), #if ERTS_POLL_USE_KQUEUE - "kqueue" + "kqueue" #elif ERTS_POLL_USE_EPOLL - "epoll" + "epoll" #else - "/dev/poll" + "/dev/poll" +#endif + ); #endif - ); -#if ERTS_POLL_USE_WAKEUP_PIPE - if (fd == wake_fd) { - cleanup_wakeup_pipe(ps); - ERTS_POLL_RES_SET_EVTS(&pr[i], ERTS_POLL_EV_NONE); - if (n == 1) - return 0; - } + if (ERTS_POLL_USE_WAKEUP(ps) && fd == wake_fd) { + cleanup_wakeup_pipe(ps); + ERTS_POLL_RES_SET_FD(&pr[i], -1); + ERTS_POLL_RES_SET_EVTS(&pr[i], ERTS_POLL_EV_NONE); + res--; + } +#if ERTS_POLL_USE_TIMERFD + else if (fd == ps->timer_fd) { + ERTS_POLL_RES_SET_FD(&pr[i], -1); + ERTS_POLL_RES_SET_EVTS(&pr[i], ERTS_POLL_EV_NONE); + res--; + } #endif #if !ERTS_POLL_USE_CONCURRENT_UPDATE - else { - /* Reset the events to emulate ONESHOT semantics */ - ps->fds_status[fd].events = 0; - enqueue_update_request(ps, fd); - } + else { + /* Reset the events to emulate ONESHOT semantics */ + ps->fds_status[fd].events = 0; + enqueue_update_request(ps, fd); + } #endif + } } - return res; -#else - ASSERT(chk_fds_res <= max_res); - return chk_fds_res; -#endif + if (res == 0) + return res; + else + return n; } #else /* !ERTS_POLL_USE_KERNEL_POLL */ @@ -1577,19 +1647,168 @@ ERTS_POLL_EXPORT(save_result)(ErtsPollSet *ps, ErtsPollResFd pr[], int max_res, #endif /* !ERTS_POLL_USE_KERNEL_POLL */ +static ERTS_INLINE ErtsMonotonicTime +get_timeout(ErtsPollSet *ps, + int resolution, + ErtsMonotonicTime timeout_time) +{ + ErtsMonotonicTime timeout; + + if (timeout_time == ERTS_POLL_NO_TIMEOUT) { + timeout = 0; + } + else if (timeout_time == ERTS_POLL_INF_TIMEOUT) { + timeout = -1; + } + else { + ErtsMonotonicTime diff_time, current_time; + current_time = erts_get_monotonic_time(NULL); + diff_time = timeout_time - current_time; + if (diff_time <= 0) { + timeout = 0; + } + else { + switch (resolution) { + case 1000: + /* Round up to nearest even milli second */ + timeout = ERTS_MONOTONIC_TO_MSEC(diff_time - 1) + 1; + if (timeout > (ErtsMonotonicTime) INT_MAX) + timeout = (ErtsMonotonicTime) INT_MAX; + timeout -= ERTS_PREMATURE_TIMEOUT(timeout, 1000); + break; + case 1000000: + /* Round up to nearest even micro second */ + timeout = ERTS_MONOTONIC_TO_USEC(diff_time - 1) + 1; + timeout -= ERTS_PREMATURE_TIMEOUT(timeout, 1000*1000); + break; + case 1000000000: + /* Round up to nearest even nano second */ + timeout = ERTS_MONOTONIC_TO_NSEC(diff_time - 1) + 1; + timeout -= ERTS_PREMATURE_TIMEOUT(timeout, 1000*1000*1000); + break; + default: + ERTS_INTERNAL_ERROR("Invalid resolution"); + timeout = 0; + break; + } + } + } + return timeout; +} + +#if ERTS_POLL_USE_SELECT + +static ERTS_INLINE int +get_timeout_timeval(ErtsPollSet *ps, + SysTimeval *tvp, + ErtsMonotonicTime timeout_time) +{ + ErtsMonotonicTime timeout = get_timeout(ps, + 1000*1000, + timeout_time); + + if (!timeout) { + tvp->tv_sec = 0; + tvp->tv_usec = 0; + + return 0; + } + else if (timeout == -1) { + return -1; + } + else { + ErtsMonotonicTime sec = timeout/(1000*1000); + tvp->tv_sec = sec; + tvp->tv_usec = timeout - sec*(1000*1000); + + ASSERT(tvp->tv_sec >= 0); + ASSERT(tvp->tv_usec >= 0); + ASSERT(tvp->tv_usec < 1000*1000); + + return 1; + } + +} + +#endif + +#if ERTS_POLL_USE_KQUEUE || (ERTS_POLL_USE_POLL && defined(HAVE_PPOLL)) || ERTS_POLL_USE_TIMERFD + static ERTS_INLINE int -check_fd_events(ErtsPollSet *ps, ErtsPollResFd pr[], int do_wait, int max_res) +get_timeout_timespec(ErtsPollSet *ps, + struct timespec *tsp, + ErtsMonotonicTime timeout_time) +{ + ErtsMonotonicTime timeout = get_timeout(ps, + 1000*1000*1000, + timeout_time); + + if (!timeout) { + tsp->tv_sec = 0; + tsp->tv_nsec = 0; + return 0; + } + else if (timeout == -1) { + return -1; + } + else { + ErtsMonotonicTime sec = timeout/(1000*1000*1000); + tsp->tv_sec = sec; + tsp->tv_nsec = timeout - sec*(1000*1000*1000); + + ASSERT(tsp->tv_sec >= 0); + ASSERT(tsp->tv_nsec >= 0); + ASSERT(tsp->tv_nsec < 1000*1000*1000); + + return 1; + } +} + +#endif + +#if ERTS_POLL_USE_TIMERFD + +static ERTS_INLINE int +get_timeout_itimerspec(ErtsPollSet *ps, + struct itimerspec *itsp, + ErtsMonotonicTime timeout_time) +{ + + itsp->it_interval.tv_sec = 0; + itsp->it_interval.tv_nsec = 0; + + return get_timeout_timespec(ps, &itsp->it_value, timeout_time); +} + +#endif + +static ERTS_INLINE int +check_fd_events(ErtsPollSet *ps, ErtsPollResFd pr[], int max_res, ErtsMonotonicTime timeout_time) { int res; - int timeout = do_wait ? -1 : 0; - DEBUG_PRINT_WAIT("Entering check_fd_events(), do_wait=%d", ps, do_wait); + int timeout; + DEBUG_PRINT_WAIT("Entering check_fd_events(), timeout=%d", ps, timeout_time); { #if ERTS_POLL_USE_EPOLL /* --- epoll ------------------------------- */ +#if ERTS_POLL_USE_TIMERFD + struct itimerspec its; + timeout = get_timeout_itimerspec(ps, &its, timeout_time); + if (timeout > 0) { + timerfd_set(ps, &its); + res = epoll_wait(ps->kp_fd, pr, max_res, -1); + res = timerfd_clear(ps, pr, res, max_res); + } else { + res = epoll_wait(ps->kp_fd, pr, max_res, timeout); + } +#else /* !ERTS_POLL_USE_TIMERFD */ + timeout = (int) get_timeout(ps, 1000, timeout_time); res = epoll_wait(ps->kp_fd, pr, max_res, timeout); - +#endif /* !ERTS_POLL_USE_TIMERFD */ #elif ERTS_POLL_USE_KQUEUE /* --- kqueue ------------------------------ */ - struct timespec ts = {0, 0}; - struct timespec *tsp = timeout ? NULL : &ts; + struct timespec ts; + struct timespec *tsp; + timeout = get_timeout_timespec(ps, &ts, timeout_time); + tsp = timeout < 0 ? NULL : &ts; res = kevent(ps->kp_fd, NULL, 0, pr, max_res, tsp); #elif ERTS_POLL_USE_DEVPOLL /* --- devpoll ----------------------------- */ /* @@ -1601,16 +1820,22 @@ check_fd_events(ErtsPollSet *ps, ErtsPollResFd pr[], int do_wait, int max_res) int nfds = (int) erts_atomic_read_nob(&ps->no_of_user_fds) + 1 /* wakeup pipe */; poll_res.dp_nfds = nfds < max_res ? nfds : max_res; poll_res.dp_fds = pr; - poll_res.dp_timeout = timeout; + poll_res.dp_timeout = (int) get_timeout(ps, 1000, timeout_time); res = ioctl(ps->kp_fd, DP_POLL, &poll_res); - +#elif ERTS_POLL_USE_POLL && defined(HAVE_PPOLL) /* --- ppoll ---------------- */ + struct timespec ts; + struct timespec *tsp = &ts; + timeout = get_timeout_timespec(ps, &ts, timeout_time); + if (timeout < 0) tsp = NULL; + res = ppoll(ps->poll_fds, ps->no_poll_fds, tsp, NULL); #elif ERTS_POLL_USE_POLL /* --- poll --------------------------------- */ - + timeout = (int) get_timeout(ps, 1000, timeout_time); res = poll(ps->poll_fds, ps->no_poll_fds, timeout); - #elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */ - SysTimeval tv = {0, 0}; - SysTimeval *tvp = timeout ? NULL : &tv; + SysTimeval tv; + SysTimeval *tvp; + timeout = get_timeout_timeval(ps, &tv, timeout_time); + tvp = timeout < 0 ? NULL : &tv; ERTS_FD_COPY(&ps->input_fds, &ps->res_input_fds); ERTS_FD_COPY(&ps->output_fds, &ps->res_output_fds); @@ -1630,7 +1855,8 @@ int ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, ErtsPollResFd pr[], int *len, - ErtsThrPrgrData *tpd) + ErtsThrPrgrData *tpd, + ErtsMonotonicTime timeout_time) { int res, no_fds, used_fds = 0; int ebadf = 0; @@ -1655,53 +1881,56 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, } #endif - do_wait = !is_woken(ps) && used_fds == 0; + do_wait = !is_woken(ps) && used_fds == 0 && timeout_time != ERTS_POLL_NO_TIMEOUT; DEBUG_PRINT_WAIT("Entering %s(), do_wait=%d", ps, __FUNCTION__, do_wait); if (do_wait) { + tpd = tpd ? tpd : erts_thr_prgr_data(NULL); erts_thr_progress_prepare_wait(tpd); ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_SLEEP); - } + } else + timeout_time = ERTS_POLL_NO_TIMEOUT; while (1) { - res = check_fd_events(ps, pr + used_fds, do_wait, no_fds - used_fds); + res = check_fd_events(ps, pr + used_fds, no_fds - used_fds, timeout_time); + if (res != 0) + break; + if (timeout_time == ERTS_POLL_NO_TIMEOUT) + break; + if (erts_get_monotonic_time(NULL) >= timeout_time) + break; + } #if !ERTS_POLL_USE_CONCURRENT_UPDATE - if (res < 0 - && errno == EBADF - && ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) { - /* - * This may have happened because another thread deselected - * a fd in our poll set and then closed it, i.e. the driver - * behaved correctly. We wan't to avoid looking for a bad - * fd, that may even not exist anymore. Therefore, handle - * update requests and try again. This behaviour should only - * happen when using SELECT as the polling mechanism. - */ - ERTS_POLLSET_LOCK(ps); - used_fds += handle_update_requests(ps, pr + used_fds, no_fds - used_fds); - if (used_fds == no_fds) { - *len = used_fds; - ERTS_POLLSET_UNLOCK(ps); - return 0; - } - res = check_fd_events(ps, pr + used_fds, 0, no_fds - used_fds); - /* Keep the lock over the non-blocking poll in order to not - get any nasty races happening. */ + if (res < 0 + && errno == EBADF + && ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) { + /* + * This may have happened because another thread deselected + * a fd in our poll set and then closed it, i.e. the driver + * behaved correctly. We wan't to avoid looking for a bad + * fd, that may even not exist anymore. Therefore, handle + * update requests and try again. This behaviour should only + * happen when using SELECT as the polling mechanism. + */ + ERTS_POLLSET_LOCK(ps); + used_fds += handle_update_requests(ps, pr + used_fds, no_fds - used_fds); + if (used_fds == no_fds) { + *len = used_fds; ERTS_POLLSET_UNLOCK(ps); - if (res == 0) { - errno = EAGAIN; - res = -1; - } + return 0; + } + res = check_fd_events(ps, pr + used_fds, no_fds - used_fds, ERTS_POLL_NO_TIMEOUT); + /* Keep the lock over the non-blocking poll in order to not + get any nasty races happening. */ + ERTS_POLLSET_UNLOCK(ps); + if (res == 0) { + errno = EAGAIN; + res = -1; } -#endif - - if (res != 0) - break; - if (!do_wait) - break; } +#endif if (do_wait) { erts_thr_progress_finalize_wait(tpd); @@ -1709,7 +1938,8 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_CHECK_IO); } - woke_up(ps); + if (ERTS_POLL_USE_WAKEUP(ps)) + woke_up(ps); if (res < 0) { #if ERTS_POLL_USE_SELECT @@ -1720,11 +1950,16 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, #endif res = errno; } - else { + else if (res == 0) { + res = used_fds == 0 ? ETIMEDOUT : 0; +#ifdef HARD_DEBUG + check_poll_result(pr, used_fds); +#endif + *len = used_fds; + } else { #if ERTS_POLL_USE_SELECT save_results: #endif - ps_locked = 1; ERTS_POLLSET_LOCK(ps); @@ -1754,12 +1989,13 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, void ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet *ps, int set) { -#if !ERTS_POLL_USE_CONCURRENT_UPDATE - if (!set) - reset_wakeup_state(ps); - else - wake_poller(ps, 1); -#endif + DEBUG_PRINT_WAIT("poll_interrupt(%d)", ps, set); + if (ERTS_POLL_USE_WAKEUP(ps)) { + if (!set) + reset_wakeup_state(ps); + else + wake_poller(ps, 1); + } } int @@ -1875,10 +2111,20 @@ ERTS_POLL_EXPORT(erts_poll_create_pollset)(int id) if (ps->internal_fd_limit <= kp_fd) ps->internal_fd_limit = kp_fd + 1; ps->kp_fd = kp_fd; + if (ps->id == -1) + ps->oneshot = 0; + else + ps->oneshot = 1; #endif -#if !ERTS_POLL_USE_CONCURRENT_UPDATE + erts_atomic32_init_nob(&ps->wakeup_state, (erts_aint32_t) 0); create_wakeup_pipe(ps); + +#if ERTS_POLL_USE_TIMERFD + create_timerfd(ps); +#endif + +#if !ERTS_POLL_USE_CONCURRENT_UPDATE handle_update_requests(ps, NULL, 0); cleanup_wakeup_pipe(ps); #endif @@ -1993,9 +2239,7 @@ ERTS_POLL_EXPORT(erts_poll_info)(ErtsPollSet *ps, ErtsPollInfo *pip) pip->memory_size = size; pip->poll_set_size = (int) erts_atomic_read_nob(&ps->no_of_user_fds); -#if !ERTS_POLL_USE_CONCURRENT_UPDATE pip->poll_set_size++; /* Wakeup pipe */ -#endif pip->lazy_updates = #if !ERTS_POLL_USE_CONCURRENT_UPDATE @@ -2178,6 +2422,12 @@ ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet *ps, ASSERT(0); return; } + if (fd == ps->wake_fds[0] || fd == ps->wake_fds[1]) + continue; +#if ERTS_POLL_USE_TIMERFD + if (fd == ps->timer_fd) + continue; +#endif data &= 0xFFFFFFFF; ASSERT(fd == data); /* Events are the events that are being monitored, which of course include |