diff options
author | Lukas Larsson <[email protected]> | 2017-05-30 16:35:18 +0200 |
---|---|---|
committer | Lukas Larsson <[email protected]> | 2017-10-02 10:35:52 +0200 |
commit | 988f5f5e8061ce2e135a314ca782788eda478a06 (patch) | |
tree | 1609c42abb9ed03a176865ee32ad3de803c392d3 /erts/emulator/sys/common/erl_poll.c | |
parent | 1f9003a3dd9bbf9e3dcd84b8fdecef04a3b5e9c7 (diff) | |
download | otp-988f5f5e8061ce2e135a314ca782788eda478a06.tar.gz otp-988f5f5e8061ce2e135a314ca782788eda478a06.tar.bz2 otp-988f5f5e8061ce2e135a314ca782788eda478a06.zip |
erts: Move all I/O polling to a seperate thread
Diffstat (limited to 'erts/emulator/sys/common/erl_poll.c')
-rw-r--r-- | erts/emulator/sys/common/erl_poll.c | 2640 |
1 files changed, 917 insertions, 1723 deletions
diff --git a/erts/emulator/sys/common/erl_poll.c b/erts/emulator/sys/common/erl_poll.c index 341370ca14..c671ea99f4 100644 --- a/erts/emulator/sys/common/erl_poll.c +++ b/erts/emulator/sys/common/erl_poll.c @@ -18,9 +18,8 @@ * %CopyrightEnd% */ -/* - * Description: Poll interface suitable for ERTS with or without - * SMP support. +/** + * @description Poll interface suitable for ERTS * * The interface is currently implemented using: * - select @@ -29,12 +28,36 @@ * - epoll with poll or select as fallback * - kqueue with poll or select as fallback * - * Some time in the future it will also be - * implemented using Solaris ports. * + * @author Rickard Green + * @author Lukas Larsson + * + * There are two major different implementations off IO polling in this + * file. The concurrent and non-concurrent implementations. + * When available epoll/kqueue are used to implement the concurrent + * versions. poll, select and dev/poll use non-concurrent updates. + * + * Concurrent version: + * In the concurrent version erts_poll_control directly modifies + * the kernel pollset without waking the thread that is waiting + * on events. Also the ErtsPollResFd type is directly mapped to + * the native event type, so no extra copying is needed. Note that + * as no locking at all is done, fds can be triggered that have been + * removed from the pollset. The check_io layer has to deal with this. + * + * Non-concurrent version: + * In the non-concurrent version, the pollset has an internal representation + * of the pollset that is updated by erts_poll_control. When an fd is updated, + * its number is placed in the update request queue and then the waiting thread + * is woken in order to see the change. The internal data in the pollset is + * protected by a mutex that has to be taken by both the modifying and waiting + * thread at different times. * + * The non-concurrent pollset cannot have fd's closed in it while a thread is + * waiting on that fd. In order to fix this, when an ERTS_POLL_OP_DEL command + * is issued, the fd is marked as closing and the waiting thread is woken. The + * fd is then returned in the waiting threads results as ERTS_POLL_EV_NONE. * - * Author: Rickard Green */ #ifdef HAVE_CONFIG_H @@ -62,6 +85,8 @@ # ifdef SYS_SELECT_H # include <sys/select.h> # endif +#elif defined(_DARWIN_UNLIMITED_SELECT) +# undef _DARWIN_UNLIMITED_SELECT #endif #ifdef NO_SYSCONF # if ERTS_POLL_USE_SELECT @@ -83,20 +108,35 @@ #error "Missing implementation of erts_poll()" #endif -#if defined(ERTS_KERNEL_POLL_VERSION) && !ERTS_POLL_USE_KERNEL_POLL -#error "Missing kernel poll implementation of erts_poll()" -#endif +#if 0 +#define ERTS_POLL_DEBUG_PRINT 1 + +#define DEBUG_PRINT(FMT, PS, ...) \ + do { \ + int myerrno = errno; \ + erts_printf("%d: " FMT "\r\n", (PS)->id, ##__VA_ARGS__); \ + errno = myerrno; \ + } while(0) -#if defined(ERTS_NO_KERNEL_POLL_VERSION) && ERTS_POLL_USE_KERNEL_POLL -#error "Kernel poll used when it shouldn't be used" +/* Define to print info about modifications done to each fd */ +#define DEBUG_PRINT_FD(FMT, PS, FD, ...) DEBUG_PRINT("%d: " FMT, PS, FD, ##__VA_ARGS__) +/* Define to print entry and exit from erts_poll_wait (can be very spammy) */ +//#define DEBUG_PRINT_WAIT(FMT, PS, ...) DEBUG_PRINT(FMT, PS, ##__VA_ARGS__) + +#else +#define ERTS_POLL_DEBUG_PRINT 0 +#define DEBUG_PRINT(...) #endif -#if 0 -#define ERTS_POLL_DEBUG_PRINT +#ifndef DEBUG_PRINT_FD +#define DEBUG_PRINT_FD(...) +#endif +#ifndef DEBUG_PRINT_WAIT +#define DEBUG_PRINT_WAIT(...) #endif -#ifdef _DARWIN_UNLIMITED_SELECT +#if defined(_DARWIN_UNLIMITED_SELECT) && ERTS_POLL_USE_SELECT typedef struct { size_t sz; fd_set* ptr; @@ -142,44 +182,41 @@ int ERTS_SELECT(int nfds, ERTS_fd_set *readfds, ERTS_fd_set *writefds, # define ERTS_SELECT select #endif -#define ERTS_POLL_USE_BATCH_UPDATE_POLLSET (ERTS_POLL_USE_DEVPOLL \ - || ERTS_POLL_USE_KQUEUE) +#define ERTS_POLL_IS_FALLBACK (ERTS_POLL_USE_POLL || ERTS_POLL_USE_SELECT) && ERTS_ENABLE_KERNEL_POLL -#define ERTS_POLL_USE_CONCURRENT_UPDATE ERTS_POLL_USE_EPOLL +#define ERTS_POLL_USE_CONCURRENT_UPDATE (ERTS_POLL_USE_EPOLL || ERTS_POLL_USE_KQUEUE) -#define ERTS_POLL_COALESCE_KP_RES (ERTS_POLL_USE_KQUEUE || ERTS_POLL_USE_EPOLL) +#if !ERTS_POLL_USE_CONCURRENT_UPDATE + +#define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) \ + erts_atomic32_set_nob(&(PS)->have_update_requests, (erts_aint32_t) 1) +#define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) \ + erts_atomic32_set_nob(&(PS)->have_update_requests, (erts_aint32_t) 0) +#define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) \ + ((int) erts_atomic32_read_nob(&(PS)->have_update_requests)) #define ERTS_POLLSET_LOCK(PS) \ erts_mtx_lock(&(PS)->mtx) #define ERTS_POLLSET_UNLOCK(PS) \ erts_mtx_unlock(&(PS)->mtx) -#define ERTS_POLLSET_SET_POLLED_CHK(PS) \ - ((int) erts_atomic32_xchg_nob(&(PS)->polled, (erts_aint32_t) 1)) -#define ERTS_POLLSET_UNSET_POLLED(PS) \ - erts_atomic32_set_nob(&(PS)->polled, (erts_aint32_t) 0) -#define ERTS_POLLSET_IS_POLLED(PS) \ - ((int) erts_atomic32_read_nob(&(PS)->polled)) +#else +#define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) +#define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) +#define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) 0 -#define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) \ - erts_atomic32_set_nob(&(PS)->have_update_requests, (erts_aint32_t) 1) -#define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) \ - erts_atomic32_set_nob(&(PS)->have_update_requests, (erts_aint32_t) 0) -#define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) \ - ((int) erts_atomic32_read_nob(&(PS)->have_update_requests)) +#define ERTS_POLLSET_LOCK(PS) +#define ERTS_POLLSET_UNLOCK(PS) -#if ERTS_POLL_USE_FALLBACK -# if ERTS_POLL_USE_POLL -# define ERTS_POLL_NEED_FALLBACK(PS) ((PS)->no_poll_fds > 1) -# elif ERTS_POLL_USE_SELECT -# define ERTS_POLL_NEED_FALLBACK(PS) ((PS)->no_select_fds > 1) -# endif #endif + /* * --- Data types ------------------------------------------------------------ */ +#if !ERTS_POLL_USE_CONCURRENT_UPDATE + #define ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE 128 typedef struct ErtsPollSetUpdateRequestsBlock_ ErtsPollSetUpdateRequestsBlock; @@ -189,43 +226,20 @@ struct ErtsPollSetUpdateRequestsBlock_ { int fds[ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE]; }; - - # define ERTS_POLL_FD_FLG_INURQ (((unsigned short) 1) << 0) -#if ERTS_POLL_USE_FALLBACK -# define ERTS_POLL_FD_FLG_INFLBCK (((unsigned short) 1) << 1) -# define ERTS_POLL_FD_FLG_USEFLBCK (((unsigned short) 1) << 2) -#endif -# define ERTS_POLL_FD_FLG_RST (((unsigned short) 1) << 3) +# define ERTS_POLL_FD_FLG_RST (((unsigned short) 1) << 1) + typedef struct { #if ERTS_POLL_USE_POLL int pix; #endif + ErtsPollEvents used_events; ErtsPollEvents events; -#if ERTS_POLL_COALESCE_KP_RES - unsigned short res_ev_ix; -#endif unsigned short flags; } ErtsFdStatus; - -#if ERTS_POLL_COALESCE_KP_RES -/* res_ev_ix max value */ -#define ERTS_POLL_MAX_RES ((1 << sizeof(unsigned short)*8) - 1) -#endif - -#if ERTS_POLL_USE_KQUEUE - -#define ERTS_POLL_KQ_OP_HANDLED 1 -#define ERTS_POLL_KQ_OP_DEL_R 2 -#define ERTS_POLL_KQ_OP_DEL_W 3 -#define ERTS_POLL_KQ_OP_ADD_R 4 -#define ERTS_POLL_KQ_OP_ADD_W 5 -#define ERTS_POLL_KQ_OP_ADD2_R 6 -#define ERTS_POLL_KQ_OP_ADD2_W 7 - #endif /* @@ -233,196 +247,172 @@ typedef struct { * get unique names in debugger for kp/nkp */ struct ERTS_POLL_EXPORT(erts_pollset) { - ErtsPollSet next; + int id; int internal_fd_limit; - ErtsFdStatus *fds_status; erts_atomic_t no_of_user_fds; - int fds_status_len; + #if ERTS_POLL_USE_KERNEL_POLL int kp_fd; - int res_events_len; -#if ERTS_POLL_USE_EPOLL - struct epoll_event *res_events; -#elif ERTS_POLL_USE_KQUEUE - struct kevent *res_events; -#elif ERTS_POLL_USE_DEVPOLL - struct pollfd *res_events; -#endif #endif /* ERTS_POLL_USE_KERNEL_POLL */ + #if ERTS_POLL_USE_POLL int next_poll_fds_ix; int no_poll_fds; int poll_fds_len; - struct pollfd*poll_fds; + struct pollfd *poll_fds; #elif ERTS_POLL_USE_SELECT int next_sel_fd; int max_fd; -#if ERTS_POLL_USE_FALLBACK - int no_select_fds; -#endif ERTS_fd_set input_fds; ERTS_fd_set res_input_fds; ERTS_fd_set output_fds; ERTS_fd_set res_output_fds; +#elif ERTS_POLL_USE_DEVPOLL + struct pollfd *poll_fds; + int poll_fds_ix; #endif + +#if !ERTS_POLL_USE_CONCURRENT_UPDATE + ErtsFdStatus *fds_status; + int fds_status_len; ErtsPollSetUpdateRequestsBlock update_requests; ErtsPollSetUpdateRequestsBlock *curr_upd_req_block; erts_atomic32_t have_update_requests; - erts_atomic32_t polled; erts_mtx_t mtx; int wake_fds[2]; -#if ERTS_POLL_USE_TIMERFD - int timer_fd; -#endif -#if ERTS_POLL_USE_FALLBACK - int fallback_used; -#endif erts_atomic32_t wakeup_state; - erts_atomic64_t timeout_time; -#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS - erts_atomic_t no_avoided_wakeups; - erts_atomic_t no_avoided_interrupts; - erts_atomic_t no_interrupt_timed; #endif }; void erts_silence_warn_unused_result(long unused); static void fatal_error(char *format, ...); -static void fatal_error_async_signal_safe(char *error_str); static int max_fds = -1; -static ErtsPollSet pollsets; -static erts_mtx_t pollsets_lock; #if ERTS_POLL_USE_POLL +#if !ERTS_POLL_IS_FALLBACK +static ERTS_INLINE short ev2pollev(ErtsPollEvents ev) +{ + return ERTS_POLL_EV_E2N(ev); +} + +static ERTS_INLINE ErtsPollEvents pollev2ev(short ev) +{ + return ERTS_POLL_EV_N2E(ev); +} + +#else /* ERTS_POLL_IS_FALLBACK */ + static ERTS_INLINE short ev2pollev(ErtsPollEvents ev) { -#if !ERTS_POLL_USE_FALLBACK || ERTS_POLL_USE_KQUEUE - return ERTS_POLL_EV_E2N(ev); -#else /* Note, we only map events we are interested in */ short res_ev = (short) 0; if (ev & ERTS_POLL_EV_IN) - res_ev |= ERTS_POLL_EV_NKP_IN; + res_ev |= ERTS_POLL_EV_NKP_IN; if (ev & ERTS_POLL_EV_OUT) - res_ev |= ERTS_POLL_EV_NKP_OUT; + res_ev |= ERTS_POLL_EV_NKP_OUT; return res_ev; -#endif } static ERTS_INLINE ErtsPollEvents pollev2ev(short ev) { -#if !ERTS_POLL_USE_FALLBACK || ERTS_POLL_USE_KQUEUE - return ERTS_POLL_EV_N2E(ev); -#else /* Note, we only map events we are interested in */ ErtsPollEvents res_ev = (ErtsPollEvents) 0; if (ev & ERTS_POLL_EV_NKP_IN) - res_ev |= ERTS_POLL_EV_IN; + res_ev |= ERTS_POLL_EV_IN; if (ev & ERTS_POLL_EV_NKP_OUT) - res_ev |= ERTS_POLL_EV_OUT; + res_ev |= ERTS_POLL_EV_OUT; if (ev & ERTS_POLL_EV_NKP_ERR) - res_ev |= ERTS_POLL_EV_ERR; + res_ev |= ERTS_POLL_EV_ERR; if (ev & ERTS_POLL_EV_NKP_NVAL) - res_ev |= ERTS_POLL_EV_NVAL; - return res_ev; -#endif + res_ev |= ERTS_POLL_EV_NVAL; + return res_ev; } -#endif +#endif /* !ERTS_POLL_IS_FALLBACK */ + +#endif /* ERTS_POLL_USE_POLL */ + #ifdef HARD_DEBUG static void check_poll_result(ErtsPollResFd pr[], int len); -#if ERTS_POLL_USE_DEVPOLL -static void check_poll_status(ErtsPollSet ps); -#endif /* ERTS_POLL_USE_DEVPOLL */ #endif /* HARD_DEBUG */ -#ifdef ERTS_POLL_DEBUG_PRINT +#if ERTS_POLL_USE_DEVPOLL && defined(DEBUG) +static void check_poll_status(ErtsPollSet *ps); +#endif /* ERTS_POLL_USE_DEVPOLL && DEBUG */ static void print_misc_debug_info(void); +#if ERTS_POLL_USE_EPOLL +uint32_t epoll_events(int kp_fd, int fd); #endif -static ERTS_INLINE void -init_timeout_time(ErtsPollSet ps) -{ - erts_atomic64_init_nob(&ps->timeout_time, - (erts_aint64_t) ERTS_MONOTONIC_TIME_MAX); -} - -static ERTS_INLINE void -set_timeout_time(ErtsPollSet ps, ErtsMonotonicTime time) -{ - erts_atomic64_set_relb(&ps->timeout_time, - (erts_aint64_t) time); -} - -static ERTS_INLINE ErtsMonotonicTime -get_timeout_time(ErtsPollSet ps) -{ - return (ErtsMonotonicTime) erts_atomic64_read_acqb(&ps->timeout_time); -} #define ERTS_POLL_NOT_WOKEN 0 #define ERTS_POLL_WOKEN -1 #define ERTS_POLL_WOKEN_INTR 1 +#if !ERTS_POLL_USE_CONCURRENT_UPDATE static ERTS_INLINE void -reset_wakeup_state(ErtsPollSet ps) +reset_wakeup_state(ErtsPollSet *ps) { erts_atomic32_set_mb(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN); } +#endif static ERTS_INLINE int -is_woken(ErtsPollSet ps) +is_woken(ErtsPollSet *ps) { +#if !ERTS_POLL_USE_CONCURRENT_UPDATE return erts_atomic32_read_acqb(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN; +#else + return 0; +#endif } static ERTS_INLINE int -is_interrupted_reset(ErtsPollSet ps) +is_interrupted_reset(ErtsPollSet *ps) { +#if !ERTS_POLL_USE_CONCURRENT_UPDATE return (erts_atomic32_xchg_acqb(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN) == ERTS_POLL_WOKEN_INTR); +#else + return 0; +#endif } static ERTS_INLINE void -woke_up(ErtsPollSet ps) +woke_up(ErtsPollSet *ps) { +#if !ERTS_POLL_USE_CONCURRENT_UPDATE erts_aint32_t wakeup_state = erts_atomic32_read_acqb(&ps->wakeup_state); if (wakeup_state == ERTS_POLL_NOT_WOKEN) (void) erts_atomic32_cmpxchg_nob(&ps->wakeup_state, ERTS_POLL_WOKEN, ERTS_POLL_NOT_WOKEN); ASSERT(erts_atomic32_read_nob(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN); +#endif } /* * --- Wakeup pipe ----------------------------------------------------------- */ +#if !ERTS_POLL_USE_CONCURRENT_UPDATE static ERTS_INLINE void -wake_poller(ErtsPollSet ps, int interrupted, int async_signal_safe) +wake_poller(ErtsPollSet *ps, int interrupted) { int wake; - if (async_signal_safe) - wake = 1; - else { - erts_aint32_t wakeup_state; - if (!interrupted) - wakeup_state = erts_atomic32_cmpxchg_relb(&ps->wakeup_state, - ERTS_POLL_WOKEN, - ERTS_POLL_NOT_WOKEN); - else - wakeup_state = erts_atomic32_xchg_relb(&ps->wakeup_state, - ERTS_POLL_WOKEN_INTR); - wake = wakeup_state == ERTS_POLL_NOT_WOKEN; - } - /* - * NOTE: This function might be called from signal handlers in the - * non-smp case; therefore, it has to be async-signal safe in - * the non-smp case. - */ + erts_aint32_t wakeup_state; + if (!interrupted) + wakeup_state = erts_atomic32_cmpxchg_relb(&ps->wakeup_state, + ERTS_POLL_WOKEN, + ERTS_POLL_NOT_WOKEN); + else + wakeup_state = erts_atomic32_xchg_relb(&ps->wakeup_state, + ERTS_POLL_WOKEN_INTR); + wake = wakeup_state == ERTS_POLL_NOT_WOKEN; + if (wake) { ssize_t res; if (ps->wake_fds[1] < 0) @@ -432,29 +422,27 @@ wake_poller(ErtsPollSet ps, int interrupted, int async_signal_safe) res = write(ps->wake_fds[1], "!", 1); } while (res < 0 && errno == EINTR); if (res <= 0 && errno != ERRNO_BLOCK) { - if (async_signal_safe) - fatal_error_async_signal_safe(__FILE__ - ":XXX:wake_poller(): " - "Failed to write on wakeup pipe\n"); - else - fatal_error("%s:%d:wake_poller(): " - "Failed to write to wakeup pipe fd=%d: " - "%s (%d)\n", - __FILE__, __LINE__, - ps->wake_fds[1], - erl_errno_id(errno), errno); + fatal_error("%s:%d:wake_poller(): " + "Failed to write to wakeup pipe fd=%d: " + "%s (%d)\n", + __FILE__, __LINE__, + ps->wake_fds[1], + erl_errno_id(errno), errno); } } } static ERTS_INLINE void -cleanup_wakeup_pipe(ErtsPollSet ps) +cleanup_wakeup_pipe(ErtsPollSet *ps) { + int intr = 0; int fd = ps->wake_fds[0]; int res; do { char buf[32]; res = read(fd, buf, sizeof(buf)); + if (res > 0) + intr = 1; } while (res > 0 || (res < 0 && errno == EINTR)); if (res < 0 && errno != ERRNO_BLOCK) { fatal_error("%s:%d:cleanup_wakeup_pipe(): " @@ -464,10 +452,12 @@ cleanup_wakeup_pipe(ErtsPollSet ps) fd, erl_errno_id(errno), errno); } + if (intr) + erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR); } static void -create_wakeup_pipe(ErtsPollSet ps) +create_wakeup_pipe(ErtsPollSet *ps) { int do_wake = 0; int wake_fds[2]; @@ -484,20 +474,13 @@ create_wakeup_pipe(ErtsPollSet ps) SET_NONBLOCKING(wake_fds[0]); SET_NONBLOCKING(wake_fds[1]); -#ifdef ERTS_POLL_DEBUG_PRINT - erts_printf("wakeup fds = {%d, %d}\n", wake_fds[0], wake_fds[1]); -#endif + DEBUG_PRINT("wakeup fds = {%d, %d}", ps, wake_fds[0], wake_fds[1]); ERTS_POLL_EXPORT(erts_poll_control)(ps, wake_fds[0], + ERTS_POLL_OP_ADD, ERTS_POLL_EV_IN, - 1, &do_wake); -#if ERTS_POLL_USE_FALLBACK - /* We depend on the wakeup pipe being handled by kernel poll */ - if (ps->fds_status[wake_fds[0]].flags & ERTS_POLL_FD_FLG_INFLBCK) - fatal_error("%s:%d:create_wakeup_pipe(): Internal error\n", - __FILE__, __LINE__); -#endif + &do_wake); if (ps->internal_fd_limit <= wake_fds[1]) ps->internal_fd_limit = wake_fds[1] + 1; if (ps->internal_fd_limit <= wake_fds[0]) @@ -508,80 +491,11 @@ create_wakeup_pipe(ErtsPollSet ps) /* - * --- timer fd ----------------------------------------------------------- - */ - -#if ERTS_POLL_USE_TIMERFD - -/* We use the timerfd when using epoll_wait to get high accuracy - timeouts, i.e. we want to sleep with < ms accuracy. */ - -static void -create_timerfd(ErtsPollSet ps) -{ - int do_wake = 0; - int timer_fd; - timer_fd = timerfd_create(CLOCK_MONOTONIC,0); - ERTS_POLL_EXPORT(erts_poll_control)(ps, - timer_fd, - ERTS_POLL_EV_IN, - 1, &do_wake); -#if ERTS_POLL_USE_FALLBACK - /* We depend on the wakeup pipe being handled by kernel poll */ - if (ps->fds_status[timer_fd].flags & ERTS_POLL_FD_FLG_INFLBCK) - fatal_error("%s:%d:create_wakeup_pipe(): Internal error\n", - __FILE__, __LINE__); -#endif - if (ps->internal_fd_limit <= timer_fd) - ps->internal_fd_limit = timer_fd + 1; - ps->timer_fd = timer_fd; -} - -static ERTS_INLINE void -timerfd_set(ErtsPollSet ps, struct itimerspec *its) -{ -#ifdef DEBUG - struct itimerspec old_its; - int res; - res = timerfd_settime(ps->timer_fd, 0, its, &old_its); - ASSERT(res == 0); - ASSERT(old_its.it_interval.tv_sec == 0 && - old_its.it_interval.tv_nsec == 0 && - old_its.it_value.tv_sec == 0 && - old_its.it_value.tv_nsec == 0); - -#else - timerfd_settime(ps->timer_fd, 0, its, NULL); -#endif -} - -static ERTS_INLINE int -timerfd_clear(ErtsPollSet ps, int res, int max_res) { - - struct itimerspec its; - /* we always have to clear the timer */ - its.it_interval.tv_sec = 0; - its.it_interval.tv_nsec = 0; - its.it_value.tv_sec = 0; - its.it_value.tv_nsec = 0; - timerfd_settime(ps->timer_fd, 0, &its, NULL); - - /* only timeout fd triggered */ - if (res == 1 && ps->res_events[0].data.fd == ps->timer_fd) - return 0; - - return res; -} - -#endif /* ERTS_POLL_USE_TIMERFD */ - - -/* * --- Poll set update requests ---------------------------------------------- */ static ERTS_INLINE void -enqueue_update_request(ErtsPollSet ps, int fd) +enqueue_update_request(ErtsPollSet *ps, int fd) { ErtsPollSetUpdateRequestsBlock *urqbp; @@ -596,13 +510,11 @@ enqueue_update_request(ErtsPollSet ps, int fd) urqbp = ps->curr_upd_req_block; if (urqbp->len == ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE) { - ASSERT(!urqbp->next); urqbp = erts_alloc(ERTS_ALC_T_POLLSET_UPDREQ, sizeof(ErtsPollSetUpdateRequestsBlock)); - ps->curr_upd_req_block->next = urqbp; - ps->curr_upd_req_block = urqbp; - urqbp->next = NULL; + urqbp->next = ps->curr_upd_req_block; urqbp->len = 0; + ps->curr_upd_req_block = urqbp; } ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_INURQ; @@ -610,28 +522,29 @@ enqueue_update_request(ErtsPollSet ps, int fd) } static ERTS_INLINE void -free_update_requests_block(ErtsPollSet ps, +free_update_requests_block(ErtsPollSet *ps, ErtsPollSetUpdateRequestsBlock *urqbp) { if (urqbp != &ps->update_requests) erts_free(ERTS_ALC_T_POLLSET_UPDREQ, (void *) urqbp); else { - urqbp->next = NULL; urqbp->len = 0; } } +#endif /* !ERTS_POLL_USE_CONCURRENT_UPDATE */ /* * --- Growing poll set structures ------------------------------------------- */ -#ifndef ERTS_KERNEL_POLL_VERSION /* only one shared implementation */ +#if !ERTS_NO_KERNEL_POLL_VERSION || !ERTS_ENABLE_KERNEL_POLL +/* only one shared implementation */ #define ERTS_FD_TABLE_MIN_LENGTH 1024 #define ERTS_FD_TABLE_EXP_THRESHOLD (2048*1024) -int erts_poll_new_table_len (int old_len, int need_len) +int erts_poll_new_table_len(int old_len, int need_len) { int new_len; @@ -641,7 +554,7 @@ int erts_poll_new_table_len (int old_len, int need_len) } else { new_len = old_len; - do { + do { if (new_len < ERTS_FD_TABLE_EXP_THRESHOLD) new_len *= 2; else @@ -654,30 +567,9 @@ int erts_poll_new_table_len (int old_len, int need_len) } #endif -#if ERTS_POLL_USE_KERNEL_POLL -static void -grow_res_events(ErtsPollSet ps, int new_len) -{ - size_t new_size = sizeof( -#if ERTS_POLL_USE_EPOLL - struct epoll_event -#elif ERTS_POLL_USE_DEVPOLL - struct pollfd -#elif ERTS_POLL_USE_KQUEUE - struct kevent -#endif - ) * erts_poll_new_table_len(ps->res_events_len, new_len); - /* We do not need to save previously stored data */ - if (ps->res_events) - erts_free(ERTS_ALC_T_POLL_RES_EVS, ps->res_events); - ps->res_events = erts_alloc(ERTS_ALC_T_POLL_RES_EVS, new_size); - ps->res_events_len = new_len; -} -#endif /* ERTS_POLL_USE_KERNEL_POLL */ - #if ERTS_POLL_USE_POLL static void -grow_poll_fds(ErtsPollSet ps, int min_ix) +grow_poll_fds(ErtsPollSet *ps, int min_ix) { int i; int new_len = erts_poll_new_table_len(ps->poll_fds_len, min_ix + 1); @@ -725,8 +617,9 @@ ensure_select_fds(int fd, ERTS_fd_set* in, ERTS_fd_set* out) # define ensure_select_fds(fd, in, out) do {} while(0) #endif /* _DARWIN_UNLIMITED_SELECT */ +#if !ERTS_POLL_USE_CONCURRENT_UPDATE static void -grow_fds_status(ErtsPollSet ps, int min_fd) +grow_fds_status(ErtsPollSet *ps, int min_fd) { int i; int new_len = erts_poll_new_table_len(ps->fds_status_len, min_fd + 1); @@ -745,428 +638,26 @@ grow_fds_status(ErtsPollSet ps, int min_fd) #endif ps->fds_status[i].used_events = (ErtsPollEvents) 0; ps->fds_status[i].events = (ErtsPollEvents) 0; -#if ERTS_POLL_COALESCE_KP_RES - ps->fds_status[i].res_ev_ix = (unsigned short) ERTS_POLL_MAX_RES; -#endif ps->fds_status[i].flags = (unsigned short) 0; } ps->fds_status_len = new_len; } +#endif /* * --- Selecting fd to poll on ----------------------------------------------- */ -#if ERTS_POLL_USE_FALLBACK -static int update_fallback_pollset(ErtsPollSet ps, int fd); -#endif - -static ERTS_INLINE int -need_update(ErtsPollSet ps, int fd) -{ -#if ERTS_POLL_USE_KERNEL_POLL - int reset; -#endif - - ASSERT(fd < ps->fds_status_len); - -#if ERTS_POLL_USE_KERNEL_POLL - reset = (int) (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST); - if (reset && !ps->fds_status[fd].used_events) { - ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_RST; - reset = 0; - } -#else - ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_RST; -#endif - - if (ps->fds_status[fd].used_events != ps->fds_status[fd].events) - return 1; - -#if ERTS_POLL_USE_KERNEL_POLL - return reset; -#else - return 0; -#endif -} - -#if ERTS_POLL_USE_BATCH_UPDATE_POLLSET - -#if ERTS_POLL_USE_KQUEUE -#define ERTS_POLL_MIN_BATCH_BUF_SIZE 128 -#else -#define ERTS_POLL_MIN_BATCH_BUF_SIZE 64 -#endif - -typedef struct { - int len; - int size; -#if ERTS_POLL_USE_DEVPOLL - struct pollfd *buf; -#elif ERTS_POLL_USE_KQUEUE - struct kevent *buf; - struct kevent *ebuf; -#endif -} ErtsPollBatchBuf; - - -static ERTS_INLINE void -setup_batch_buf(ErtsPollSet ps, ErtsPollBatchBuf *bbp) -{ - bbp->len = 0; -#if ERTS_POLL_USE_DEVPOLL - bbp->size = ps->res_events_len; - bbp->buf = ps->res_events; -#elif ERTS_POLL_USE_KQUEUE - bbp->size = ps->res_events_len/2; - bbp->buf = ps->res_events; - bbp->ebuf = bbp->buf + bbp->size; -#endif -} - - -#if ERTS_POLL_USE_DEVPOLL - -static void -write_batch_buf(ErtsPollSet ps, ErtsPollBatchBuf *bbp) -{ - ssize_t wres; - char *buf = (char *) bbp->buf; - size_t buf_size = sizeof(struct pollfd)*bbp->len; - - while (1) { - wres = write(ps->kp_fd, (void *) buf, buf_size); - if (wres < 0) { - if (errno == EINTR) - continue; - fatal_error("%s:%d:write_batch_buf(): " - "Failed to write to /dev/poll: " - "%s (%d)\n", - __FILE__, __LINE__, - erl_errno_id(errno), errno); - } - buf_size -= wres; - if (buf_size <= 0) - break; - buf += wres; - } - - if (buf_size < 0) { - fatal_error("%s:%d:write_devpoll_buf(): Internal error\n", - __FILE__, __LINE__); - } - bbp->len = 0; -} - -#elif ERTS_POLL_USE_KQUEUE - -static void -write_batch_buf(ErtsPollSet ps, ErtsPollBatchBuf *bbp) -{ - int res; - int len = bbp->len; - struct kevent *buf = bbp->buf; - struct timespec ts = {0, 0}; - - do { - res = kevent(ps->kp_fd, buf, len, NULL, 0, &ts); - } while (res < 0 && errno == EINTR); - if (res < 0) { - int i; - struct kevent *ebuf = bbp->ebuf; - do { - res = kevent(ps->kp_fd, buf, len, ebuf, len, &ts); - } while (res < 0 && errno == EINTR); - if (res < 0) { - fatal_error("%s:%d: kevent() failed: %s (%d)\n", - __FILE__, __LINE__, erl_errno_id(errno), errno); - } - for (i = 0; i < res; i++) { - if (ebuf[i].flags & EV_ERROR) { - short filter; - int fd = (int) ebuf[i].ident; - - switch ((int) (long) ebuf[i].udata) { - - /* - * Since we use a lazy update approach EV_DELETE will - * frequently fail. This since kqueue automatically - * removes a file descriptor that is closed from the - * poll set. - */ - case ERTS_POLL_KQ_OP_DEL_R: - case ERTS_POLL_KQ_OP_DEL_W: - case ERTS_POLL_KQ_OP_HANDLED: - break; - - /* - * According to the kqueue man page EVFILT_READ support - * does not imply EVFILT_WRITE support; therefore, - * if an EV_ADD fail, we may have to remove other - * events on this fd in the kqueue pollset before - * adding fd to the fallback pollset. - */ - case ERTS_POLL_KQ_OP_ADD_W: - if (ps->fds_status[fd].used_events & ERTS_POLL_EV_IN) { - filter = EVFILT_READ; - goto rm_add_fb; - } - goto add_fb; - case ERTS_POLL_KQ_OP_ADD_R: - if (ps->fds_status[fd].used_events & ERTS_POLL_EV_OUT) { - filter = EVFILT_WRITE; - goto rm_add_fb; - } - goto add_fb; - case ERTS_POLL_KQ_OP_ADD2_W: - case ERTS_POLL_KQ_OP_ADD2_R: { - int j; - for (j = i+1; j < res; j++) { - if (fd == (int) ebuf[j].ident) { - ebuf[j].udata = (void *) ERTS_POLL_KQ_OP_HANDLED; - if (!(ebuf[j].flags & EV_ERROR)) { - switch ((int) (long) ebuf[j].udata) { - case ERTS_POLL_KQ_OP_ADD2_W: - filter = EVFILT_WRITE; - goto rm_add_fb; - case ERTS_POLL_KQ_OP_ADD2_R: - filter = EVFILT_READ; - goto rm_add_fb; - default: - fatal_error("%s:%d:write_batch_buf(): " - "Internal error", - __FILE__, __LINE__); - break; - } - } - goto add_fb; - } - } - /* The other add succeded... */ - filter = ((((int) (long) ebuf[i].udata) - == ERTS_POLL_KQ_OP_ADD2_W) - ? EVFILT_READ - : EVFILT_WRITE); - rm_add_fb: - { - struct kevent kev; - struct timespec ts = {0, 0}; - EV_SET(&kev, fd, filter, EV_DELETE, 0, 0, 0); - (void) kevent(ps->kp_fd, &kev, 1, NULL, 0, &ts); - } - - add_fb: - ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_USEFLBCK; - ASSERT(ps->fds_status[fd].used_events); - ps->fds_status[fd].used_events = 0; - erts_atomic_dec_nob(&ps->no_of_user_fds); - update_fallback_pollset(ps, fd); - ASSERT(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK); - break; - } - default: - fatal_error("%s:%d:write_batch_buf(): Internal error", - __FILE__, __LINE__); - break; - } - } - } - } - bbp->len = 0; -} - -#endif /* ERTS_POLL_USE_KQUEUE */ - -static ERTS_INLINE void -batch_update_pollset(ErtsPollSet ps, int fd, ErtsPollBatchBuf *bbp) -{ - int buf_len; -#if ERTS_POLL_USE_DEVPOLL - short events; - struct pollfd *buf; -#elif ERTS_POLL_USE_KQUEUE - struct kevent *buf; -#endif - -#ifdef ERTS_POLL_DEBUG_PRINT - erts_printf("Doing lazy update on fd=%d\n", fd); -#endif - - if (!need_update(ps, fd)) - return; - - /* Make sure we have room for at least maximum no of entries - per fd */ - if (bbp->size - bbp->len < 2) - write_batch_buf(ps, bbp); - - buf_len = bbp->len; - buf = bbp->buf; - - ASSERT(fd < ps->fds_status_len); - -#if ERTS_POLL_USE_DEVPOLL - events = ERTS_POLL_EV_E2N(ps->fds_status[fd].events); - if (!events) { - buf[buf_len].events = POLLREMOVE; - erts_atomic_dec_nob(&ps->no_of_user_fds); - } - else if (!ps->fds_status[fd].used_events) { - buf[buf_len].events = events; - erts_atomic_inc_nob(&ps->no_of_user_fds); - } - else { - if ((ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST) - || (ps->fds_status[fd].used_events & ~events)) { - /* Reset or removed events... */ - buf[buf_len].fd = fd; - buf[buf_len].events = POLLREMOVE; - buf[buf_len++].revents = 0; - } - buf[buf_len].events = events; - } - buf[buf_len].fd = fd; - buf[buf_len++].revents = 0; - -#elif ERTS_POLL_USE_KQUEUE - - if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK) { - if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_USEFLBCK) - update_fallback_pollset(ps, fd); - else { /* Remove from fallback and try kqueue */ - ErtsPollEvents events = ps->fds_status[fd].events; - ps->fds_status[fd].events = (ErtsPollEvents) 0; - update_fallback_pollset(ps, fd); - ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK)); - if (events) { - ps->fds_status[fd].events = events; - goto try_kqueue; - } - } - } - else { - ErtsPollEvents events, used_events; - int mod_w, mod_r; - try_kqueue: - events = ERTS_POLL_EV_E2N(ps->fds_status[fd].events); - used_events = ERTS_POLL_EV_E2N(ps->fds_status[fd].used_events); - if (!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST)) { - if (!used_events && - (events & ERTS_POLL_EV_IN) && (events & ERTS_POLL_EV_OUT)) - goto do_add_rw; - mod_r = ((events & ERTS_POLL_EV_IN) - != (used_events & ERTS_POLL_EV_IN)); - mod_w = ((events & ERTS_POLL_EV_OUT) - != (used_events & ERTS_POLL_EV_OUT)); - goto do_mod; - } - else { /* Reset */ - if ((events & ERTS_POLL_EV_IN) && (events & ERTS_POLL_EV_OUT)) { - do_add_rw: - EV_SET(&buf[buf_len], fd, EVFILT_READ, EV_ADD, - 0, 0, (void *) ERTS_POLL_KQ_OP_ADD2_R); - buf_len++; - EV_SET(&buf[buf_len], fd, EVFILT_WRITE, EV_ADD, - 0, 0, (void *) ERTS_POLL_KQ_OP_ADD2_W); - buf_len++; - - } - else { - mod_r = 1; - mod_w = 1; - do_mod: - if (mod_r) { - if (events & ERTS_POLL_EV_IN) { - EV_SET(&buf[buf_len], fd, EVFILT_READ, EV_ADD, - 0, 0, (void *) ERTS_POLL_KQ_OP_ADD_R); - buf_len++; - } - else if (used_events & ERTS_POLL_EV_IN) { - EV_SET(&buf[buf_len], fd, EVFILT_READ, EV_DELETE, - 0, 0, (void *) ERTS_POLL_KQ_OP_DEL_R); - buf_len++; - } - } - if (mod_w) { - if (events & ERTS_POLL_EV_OUT) { - EV_SET(&buf[buf_len], fd, EVFILT_WRITE, EV_ADD, - 0, 0, (void *) ERTS_POLL_KQ_OP_ADD_W); - buf_len++; - } - else if (used_events & ERTS_POLL_EV_OUT) { - EV_SET(&buf[buf_len], fd, EVFILT_WRITE, EV_DELETE, - 0, 0, (void *) ERTS_POLL_KQ_OP_DEL_W); - buf_len++; - } - } - } - } - if (used_events) { - if (!events) { - erts_atomic_dec_nob(&ps->no_of_user_fds); - } - } - else { - if (events) - erts_atomic_inc_nob(&ps->no_of_user_fds); - } - ASSERT((events & ~(ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) == 0); - ASSERT((used_events & ~(ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) == 0); - } - -#endif - - ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_RST; - ps->fds_status[fd].used_events = ps->fds_status[fd].events; - - bbp->len = buf_len; -} - -#else /* !ERTS_POLL_USE_BATCH_UPDATE_POLLSET */ - #if ERTS_POLL_USE_EPOLL static int -#if ERTS_POLL_USE_CONCURRENT_UPDATE -conc_update_pollset(ErtsPollSet ps, int fd, int *update_fallback) -#else -update_pollset(ErtsPollSet ps, int fd) -#endif +update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) { int res; - int op; + int epoll_op = EPOLL_CTL_MOD; struct epoll_event epe_templ; struct epoll_event epe; - ASSERT(fd < ps->fds_status_len); - - if (!need_update(ps, fd)) - return 0; - -#ifdef ERTS_POLL_DEBUG_PRINT - erts_printf("Doing update on fd=%d\n", fd); -#endif - if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK) { -#if ERTS_POLL_USE_CONCURRENT_UPDATE - if (!*update_fallback) { - *update_fallback = 1; - return 0; - } -#endif - if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_USEFLBCK) { - return update_fallback_pollset(ps, fd); - } - else { /* Remove from fallback and try epoll */ - ErtsPollEvents events = ps->fds_status[fd].events; - ps->fds_status[fd].events = (ErtsPollEvents) 0; - res = update_fallback_pollset(ps, fd); - ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK)); - if (!events) - return res; - ps->fds_status[fd].events = events; - } - } - - epe_templ.events = ERTS_POLL_EV_E2N(ps->fds_status[fd].events); + epe_templ.events = ERTS_POLL_EV_E2N(events) | EPOLLONESHOT; epe_templ.data.fd = fd; #ifdef VALGRIND @@ -1174,30 +665,24 @@ update_pollset(ErtsPollSet ps, int fd) memset((void *) &epe.data, 0, sizeof(epoll_data_t)); #endif - if (epe_templ.events && ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST) { - do { - /* We init 'epe' every time since epoll_ctl() may modify it - (not declared const and not documented as const). */ - epe.events = epe_templ.events; - epe.data.fd = epe_templ.data.fd; - res = epoll_ctl(ps->kp_fd, EPOLL_CTL_DEL, fd, &epe); - } while (res != 0 && errno == EINTR); - erts_atomic_dec_nob(&ps->no_of_user_fds); - ps->fds_status[fd].used_events = 0; - } - - if (!epe_templ.events) { + switch (op) { + case ERTS_POLL_OP_DEL: /* A note on EPOLL_CTL_DEL: linux kernel versions before 2.6.9 need a non-NULL event pointer even though it is ignored... */ - op = EPOLL_CTL_DEL; + epoll_op = EPOLL_CTL_DEL; + epe_templ.events = 0; erts_atomic_dec_nob(&ps->no_of_user_fds); - } - else if (!ps->fds_status[fd].used_events) { - op = EPOLL_CTL_ADD; + break; + case ERTS_POLL_OP_ADD: + epoll_op = EPOLL_CTL_ADD; erts_atomic_inc_nob(&ps->no_of_user_fds); - } - else { - op = EPOLL_CTL_MOD; + break; + case ERTS_POLL_OP_MOD: + epoll_op = EPOLL_CTL_MOD; + break; + default: + ASSERT(0); + break; } do { @@ -1205,33 +690,32 @@ update_pollset(ErtsPollSet ps, int fd) (not declared const and not documented as const). */ epe.events = epe_templ.events; epe.data.fd = epe_templ.data.fd; - res = epoll_ctl(ps->kp_fd, op, fd, &epe); + res = epoll_ctl(ps->kp_fd, epoll_op, fd, &epe); } while (res != 0 && errno == EINTR); -#if defined(ERTS_POLL_DEBUG_PRINT) && 1 +#if ERTS_POLL_DEBUG_PRINT { int saved_errno = errno; - erts_printf("%s = epoll_ctl(%d, %s, %d, {Ox%x, %d})\n", - res == 0 ? "0" : erl_errno_id(errno), - ps->kp_fd, - (op == EPOLL_CTL_ADD - ? "EPOLL_CTL_ADD" - : (op == EPOLL_CTL_MOD - ? "EPOLL_CTL_MOD" - : (op == EPOLL_CTL_DEL - ? "EPOLL_CTL_DEL" - : "UNKNOWN"))), - fd, - epe_templ.events, - fd); + DEBUG_PRINT_FD("%s = epoll_ctl(%d, %s, %d, {0x%x, %d})", + ps, fd, + res == 0 ? "0" : erl_errno_id(errno), + ps->kp_fd, + (epoll_op == EPOLL_CTL_ADD + ? "EPOLL_CTL_ADD" + : (epoll_op == EPOLL_CTL_MOD + ? "EPOLL_CTL_MOD" + : (epoll_op == EPOLL_CTL_DEL + ? "EPOLL_CTL_DEL" + : "UNKNOWN"))), + fd, + epe_templ.events, + fd); errno = saved_errno; } #endif - if (res == 0) - ps->fds_status[fd].used_events = ps->fds_status[fd].events; - else { + if (res != 0) { switch (op) { - case EPOLL_CTL_MOD: + case ERTS_POLL_OP_MOD: epe.events = 0; do { /* We init 'epe' every time since epoll_ctl() may modify it @@ -1240,29 +724,18 @@ update_pollset(ErtsPollSet ps, int fd) epe.data.fd = fd; res = epoll_ctl(ps->kp_fd, EPOLL_CTL_DEL, fd, &epe); } while (res != 0 && errno == EINTR); - ps->fds_status[fd].used_events = 0; /* Fall through ... */ - case EPOLL_CTL_ADD: { - ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_USEFLBCK; + case ERTS_POLL_OP_ADD: { erts_atomic_dec_nob(&ps->no_of_user_fds); -#if ERTS_POLL_USE_CONCURRENT_UPDATE - if (!*update_fallback) { - *update_fallback = 1; - return 0; - } -#endif - ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK)); - res = update_fallback_pollset(ps, fd); - ASSERT(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK); + res = ERTS_POLL_EV_NVAL; break; } - case EPOLL_CTL_DEL: { + case ERTS_POLL_OP_DEL: { /* * Since we use a lazy update approach EPOLL_CTL_DEL will * frequently fail. This since epoll automatically removes * a filedescriptor that is closed from the poll set. */ - ps->fds_status[fd].used_events = 0; res = 0; break; } @@ -1271,67 +744,277 @@ update_pollset(ErtsPollSet ps, int fd) __FILE__, __LINE__); break; } + } else { + res = events; } - ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_RST; return res; } -#if ERTS_POLL_USE_CONCURRENT_UPDATE +#endif /* ERTS_POLL_USE_EPOLL */ + +#if ERTS_POLL_USE_KQUEUE + +/* Some versions of the EV_SET macro used kevp multiple times, + so we define out own version that make sure that it is safe + to do kevp++ in the argument list. */ +#define ERTS_EV_SET(kevp, a, b, c, f) do { \ + struct kevent *kevp_ = kevp; \ + EV_SET(kevp_, a, b, c, 0, 0, f); \ + } while(0) + static int -update_pollset(ErtsPollSet ps, int fd) +update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) { - int update_fallback = 1; - return conc_update_pollset(ps, fd, &update_fallback); -} -#endif - -#endif /* ERTS_POLL_USE_EPOLL */ + int res = 0, len = 0; + struct kevent evts[2]; + struct timespec ts = {0, 0}; -#endif /* ERTS_POLL_USE_BATCH_UPDATE_POLLSET */ +#ifdef EV_DISPATCH + /* If we have EV_DISPATCH we use it. The kevent descriptions for both + read and write are added on OP_ADD and removed on OP_DEL. And then + after than only EV_ENABLE|EV_DISPATCH are used. + + It could be possible to not modify the pollset when disabling and/or + deleting events, but that may cause the poll threads to be awoken + a lot more than they should so we take the cost here instead of + in the poll thread. + + Note: We need to have EV_DISPATCH both when the event is enabled and + disabled, as otherwise the event may be triggered twice on each re-arm. + Not sure if this is intended or not (can't find anything about it in the + man page), but it seems to be the way it works... + */ + + if (op == ERTS_POLL_OP_DEL) { + erts_atomic_dec_nob(&ps->no_of_user_fds); + /* We could probably skip this delete, do we want to? */ + ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, EV_DELETE, (void *) 0); + ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, EV_DELETE, (void *) 0); + } else if (op == ERTS_POLL_OP_ADD) { + uint32_t flags; + erts_atomic_inc_nob(&ps->no_of_user_fds); + + flags = EV_ADD|EV_DISPATCH; + flags |= ((events & ERTS_POLL_EV_IN) ? 0 : EV_DISABLE); + ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, flags, (void *) ERTS_POLL_EV_IN); + + flags = EV_ADD|EV_DISPATCH; + flags |= ((events & ERTS_POLL_EV_OUT) ? 0 : EV_DISABLE); + ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, flags, (void *) ERTS_POLL_EV_OUT); + } else { + uint32_t flags; + ASSERT(op == ERTS_POLL_OP_MOD); -#if ERTS_POLL_USE_POLL || ERTS_POLL_USE_SELECT || ERTS_POLL_USE_FALLBACK + flags = EV_DISPATCH; + flags |= (events & ERTS_POLL_EV_IN) ? EV_ENABLE : EV_DISABLE; + ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, flags, (void *) ERTS_POLL_EV_IN); -#if ERTS_POLL_USE_FALLBACK -static int update_fallback_pollset(ErtsPollSet ps, int fd) + flags = EV_DISPATCH; + flags |= (events & ERTS_POLL_EV_OUT) ? EV_ENABLE : EV_DISABLE; + ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, flags, (void *) ERTS_POLL_EV_OUT); + } #else -static int update_pollset(ErtsPollSet ps, int fd) + uint32_t flags = EV_ADD|EV_ONESHOT; + + if (op == ERTS_POLL_OP_DEL) { + erts_atomic_dec_nob(&ps->no_of_user_fds); + /* We don't do anything when a delete is issued. The fds will be removed + when they are triggered, or when they are closed. */ + events = 0; + } else if (op == ERTS_POLL_OP_ADD) { + erts_atomic_inc_nob(&ps->no_of_user_fds); + } + + if (events & ERTS_POLL_EV_IN) { + ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, flags, (void *) ERTS_POLL_EV_IN); + } + if (events & ERTS_POLL_EV_OUT) { + ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, flags, (void *) ERTS_POLL_EV_OUT); + } + #endif -{ -#ifdef ERTS_POLL_DEBUG_PRINT -#if ERTS_POLL_USE_FALLBACK - erts_printf("Doing fallback update on fd=%d\n", fd); + if (len) + do { + res = kevent(ps->kp_fd, evts, len, NULL, 0, &ts); + } while (res < 0 && errno == EINTR); +#if ERTS_POLL_DEBUG_PRINT + { + int saved_errno = errno, i; + char keventb[255], *keventbp = keventb; + if (res < 0) + keventbp += sprintf(keventbp,"%s = ",erl_errno_id(saved_errno)); + else + keventbp += sprintf(keventbp,"%d = ",res); + keventbp += sprintf(keventbp, "kevent(%d, {",ps->kp_fd); + for (i = 0; i < len; i++) { + const char *flags = "UNKNOWN"; + if (evts[i].flags == EV_DELETE) flags = "EV_DELETE"; + if (evts[i].flags == (EV_ADD|EV_ONESHOT)) flags = "EV_ADD|EV_ONESHOT"; +#ifdef EV_DISPATCH + if (evts[i].flags == (EV_ADD|EV_DISPATCH)) flags = "EV_ADD|EV_DISPATCH"; + if (evts[i].flags == (EV_ADD|EV_DISABLE)) flags = "EV_ADD|EV_DISABLE"; + if (evts[i].flags == (EV_ENABLE|EV_DISPATCH)) flags = "EV_ENABLE|EV_DISPATCH"; + if (evts[i].flags == EV_DISABLE) flags = "EV_DISABLE"; + if (evts[i].flags == (EV_DISABLE|EV_DISPATCH)) flags = "EV_DISABLE|EV_DISABLE"; +#endif + + keventbp += sprintf(keventbp, "%s{%lu, %s, %s}",i > 0 ? ", " : "", + evts[i].ident, + (evts[i].filter == EVFILT_READ + ? "EVFILT_READ" + : (evts[i].filter == EVFILT_WRITE + ? "EVFILT_WRITE" + : "UNKNOWN")), flags); + } + keventbp += sprintf(keventbp, "}, %d)", len); + DEBUG_PRINT_FD("%s", ps, fd, keventb); + errno = saved_errno; + } +#endif + if (res < 0) { + if (op != ERTS_POLL_OP_DEL) { +#ifdef EV_RECEIPT + struct kevent receipt_evts[2]; + len = 0; + ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, EV_DELETE|EV_RECEIPT, (void *) 0); + ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, EV_DELETE|EV_RECEIPT, (void *) 0); + do { + res = kevent(ps->kp_fd, evts, len, receipt_evts, 2, &ts); + } while (res < 0 && errno == EINTR); #else - erts_printf("Doing update on fd=%d\n", fd); + ERTS_EV_SET(&evts[0], fd, EVFILT_WRITE, EV_DELETE, (void *) 0); + do { + res = kevent(ps->kp_fd, evts, 1, NULL, 0, &ts); + } while (res < 0 && errno == EINTR); + ERTS_EV_SET(&evts[0], fd, EVFILT_READ, EV_DELETE, (void *) 0); + do { + res = kevent(ps->kp_fd, evts, 1, NULL, 0, &ts); + } while (res < 0 && errno == EINTR); +#endif + if (op == ERTS_POLL_OP_ADD) + erts_atomic_dec_nob(&ps->no_of_user_fds); + return ERTS_POLL_EV_NVAL; + } + return 0; + } + return events; +} + +#endif /* ERTS_POLL_USE_KQUEUE */ + +#if !ERTS_POLL_USE_CONCURRENT_UPDATE + +static ERTS_INLINE void +init_batch_update(ErtsPollSet *ps, int len) +{ +#if ERTS_POLL_USE_DEVPOLL + ASSERT(ps->poll_fds == NULL); + ps->poll_fds = erts_alloc(ERTS_ALC_T_TMP, sizeof(struct pollfd) * len); + ps->poll_fds_ix = 0; #endif +} + +static ERTS_INLINE void +write_batch_update(ErtsPollSet *ps) +{ +#if ERTS_POLL_USE_DEVPOLL + ssize_t wres; + char *buf = (char *) ps->poll_fds; + size_t buf_size = sizeof(struct pollfd)*ps->poll_fds_ix; + + while (1) { + wres = write(ps->kp_fd, (void *) buf, buf_size); + if (wres < 0) { + if (errno == EINTR) + continue; + fatal_error("%s:%d:write_batch_buf(): " + "Failed to write to /dev/poll: " + "%s (%d)\n", + __FILE__, __LINE__, + erl_errno_id(errno), errno); + } +#if ERTS_POLL_DEBUG_PRINT + { + int saved_errno = errno, i; + char devpollb[2048], *devpollbp = devpollb; + devpollbp += sprintf(devpollbp, "%d = devpoll(%d, {", wres, ps->kp_fd); + for (i = 0; i < wres / sizeof(struct pollfd); i++) { + if (devpollbp == devpollb) + devpollbp += sprintf(devpollbp, "%d = devpoll(%d, {", wres, ps->kp_fd); + devpollbp += sprintf(devpollbp, "%s{fd = %d, events = %s}", + i > 0 ? ", " : "", + ps->poll_fds[i].fd, + ev2str(ps->poll_fds[i].events)); + if (devpollbp - devpollb > 512) { + devpollbp += sprintf(devpollbp, "}, %d)", ps->poll_fds_ix); + DEBUG_PRINT("%s", ps, devpollb); + devpollbp = devpollb; + } + } + devpollbp += sprintf(devpollbp, "}, %d)", ps->poll_fds_ix); + DEBUG_PRINT("%s", ps, devpollb); + errno = saved_errno; + } #endif - ASSERT(fd < ps->fds_status_len); -#if ERTS_POLL_USE_FALLBACK - ASSERT(ps->fds_status[fd].used_events - ? (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK) - : (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_USEFLBCK)); + buf_size -= wres; + if (buf_size <= 0) + break; + buf += wres; + } + + if (buf_size < 0) { + fatal_error("%s:%d:write_devpoll_buf(): Internal error\n", + __FILE__, __LINE__); + } + erts_free(ERTS_ALC_T_TMP, ps->poll_fds); + ps->poll_fds = NULL; #endif +} - if (!need_update(ps, fd)) - return 0; +static ERTS_INLINE int +need_update(ErtsPollSet *ps, int fd, int *resetp) +{ + int reset; + ASSERT(fd < ps->fds_status_len); -#if ERTS_POLL_USE_FALLBACK + reset = (int) (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST); ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_RST; -#endif + + *resetp = reset; + + if (reset || ps->fds_status[fd].used_events != ps->fds_status[fd].events) + return 1; + + return 0; +} + +static int update_pollset(ErtsPollSet *ps, ErtsPollResFd pr[], int fd) +{ + int res = 0, reset = 0; + ErtsPollEvents events = ps->fds_status[fd].events; + ASSERT(fd < ps->fds_status_len); + + if (!need_update(ps, fd, &reset)) + return res; #if ERTS_POLL_USE_POLL /* --- poll -------------------------------- */ - if (!ps->fds_status[fd].events) { + if (!events) { int pix = ps->fds_status[fd].pix; int last_pix; + + if (reset) { + /* When a fd has been reset, we tell the caller of erts_poll_wait + this by setting the fd as ERTS_POLL_EV_NONE */ + ERTS_POLL_RES_SET_FD(&pr[res], fd); + ERTS_POLL_RES_SET_EVTS(&pr[res], ERTS_POLL_EV_NONE); + DEBUG_PRINT_FD("trig %s (poll)", ps, fd, ev2str(ERTS_POLL_EV_NONE)); + res++; + } + if (pix < 0) { -#if ERTS_POLL_USE_FALLBACK - ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK)); -#endif - return -1; + return res; } -#if ERTS_POLL_USE_FALLBACK - ASSERT(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK); -#endif erts_atomic_dec_nob(&ps->no_of_user_fds); last_pix = --ps->no_poll_fds; if (pix != last_pix) { @@ -1348,126 +1031,151 @@ static int update_pollset(ErtsPollSet ps, int fd) /* Clear this fd status */ ps->fds_status[fd].pix = -1; ps->fds_status[fd].used_events = (ErtsPollEvents) 0; -#if ERTS_POLL_USE_FALLBACK - ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_INFLBCK; -#endif + } else { int pix = ps->fds_status[fd].pix; if (pix < 0) { -#if ERTS_POLL_USE_FALLBACK - ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK) - || fd == ps->kp_fd); -#endif erts_atomic_inc_nob(&ps->no_of_user_fds); ps->fds_status[fd].pix = pix = ps->no_poll_fds++; if (pix >= ps->poll_fds_len) grow_poll_fds(ps, pix); ps->poll_fds[pix].fd = fd; ps->fds_status[fd].pix = pix; -#if ERTS_POLL_USE_FALLBACK - ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_INFLBCK; -#endif } -#if ERTS_POLL_USE_FALLBACK - ASSERT(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK); -#endif - /* Events to be used in next poll */ - ps->poll_fds[pix].events = ev2pollev(ps->fds_status[fd].events); + ps->poll_fds[pix].events = ev2pollev(events); if (ps->poll_fds[pix].revents) { /* Remove result events that we should not poll for anymore */ ps->poll_fds[pix].revents &= ev2pollev(~(~ps->fds_status[fd].used_events - & ps->fds_status[fd].events)); + & events)); } /* Save events to be used in next poll */ - ps->fds_status[fd].used_events = ps->fds_status[fd].events; + ps->fds_status[fd].used_events = events; } - return 0; + return res; #elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */ - { - ErtsPollEvents events = ps->fds_status[fd].events; + if (!events) { + + if (reset) { + /* When a fd has been reset, we tell the caller of erts_poll_wait + this by setting the fd as ERTS_POLL_EV_NONE */ + ERTS_POLL_RES_SET_FD(&pr[res], fd); + ERTS_POLL_RES_SET_EVTS(&pr[res], ERTS_POLL_EV_NONE); + DEBUG_PRINT_FD("trig %s (select)", ps, fd, ev2str(ERTS_POLL_EV_NONE)); + res++; + } + + ERTS_FD_CLR(fd, &ps->input_fds); + ERTS_FD_CLR(fd, &ps->output_fds); + + if (ps->fds_status[fd].used_events) { + erts_atomic_dec_nob(&ps->no_of_user_fds); + ps->fds_status[fd].used_events = (ErtsPollEvents) 0; + } + + if (fd == ps->max_fd) { + int max = ps->max_fd; + for (max = ps->max_fd; max >= 0; max--) + if (ps->fds_status[max].used_events) + break; + ps->max_fd = max; + } + + } else { + ensure_select_fds(fd, &ps->input_fds, &ps->output_fds); - if ((ERTS_POLL_EV_IN & events) - != (ERTS_POLL_EV_IN & ps->fds_status[fd].used_events)) { - if (ERTS_POLL_EV_IN & events) { - ERTS_FD_SET(fd, &ps->input_fds); - } - else { - ERTS_FD_CLR(fd, &ps->input_fds); - } - } - if ((ERTS_POLL_EV_OUT & events) - != (ERTS_POLL_EV_OUT & ps->fds_status[fd].used_events)) { - if (ERTS_POLL_EV_OUT & events) { - ERTS_FD_SET(fd, &ps->output_fds); - } - else { - ERTS_FD_CLR(fd, &ps->output_fds); - } - } - if (!ps->fds_status[fd].used_events) { - ASSERT(events); - erts_atomic_inc_nob(&ps->no_of_user_fds); -#if ERTS_POLL_USE_FALLBACK - ps->no_select_fds++; - ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_INFLBCK; -#endif - } - else if (!events) { - ASSERT(ps->fds_status[fd].used_events); - erts_atomic_dec_nob(&ps->no_of_user_fds); - ps->fds_status[fd].events = events; -#if ERTS_POLL_USE_FALLBACK - ps->no_select_fds--; - ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_INFLBCK; -#endif - } + if (!ps->fds_status[fd].used_events) + erts_atomic_inc_nob(&ps->no_of_user_fds); + + if (events & ERTS_POLL_EV_IN) + ERTS_FD_SET(fd, &ps->input_fds); + else + ERTS_FD_CLR(fd, &ps->input_fds); + + if (events & ERTS_POLL_EV_OUT) + ERTS_FD_SET(fd, &ps->output_fds); + else + ERTS_FD_CLR(fd, &ps->output_fds); ps->fds_status[fd].used_events = events; - if (events && fd > ps->max_fd) - ps->max_fd = fd; - else if (!events && fd == ps->max_fd) { - int max = ps->max_fd; - for (max = ps->max_fd; max >= 0; max--) - if (ps->fds_status[max].used_events) - break; - ps->max_fd = max; - } + if (fd > ps->max_fd) + ps->max_fd = fd; } - return 0; -#endif -} -#endif /* ERTS_POLL_USE_POLL || ERTS_POLL_USE_SELECT || ERTS_POLL_USE_FALLBACK */ + return res; +#elif ERTS_POLL_USE_DEVPOLL + if (!events) { -static void -handle_update_requests(ErtsPollSet ps) -{ - ErtsPollSetUpdateRequestsBlock *urqbp = &ps->update_requests; -#if ERTS_POLL_USE_BATCH_UPDATE_POLLSET - ErtsPollBatchBuf bb; - setup_batch_buf(ps, &bb); + if (reset) { + /* When a fd has been reset, we tell the caller of erts_poll_wait + this by setting the fd as ERTS_POLL_EV_NONE */ + ERTS_POLL_RES_SET_FD(&pr[res], fd); + ERTS_POLL_RES_SET_EVTS(&pr[res], ERTS_POLL_EV_NONE); + DEBUG_PRINT_FD("trig %s (devpoll)", ps, fd, ev2str(ERTS_POLL_EV_NONE)); + res++; + } + + ps->poll_fds[ps->poll_fds_ix].fd = fd; + ps->poll_fds[ps->poll_fds_ix].revents = 0; + ps->poll_fds[ps->poll_fds_ix++].events = POLLREMOVE; + + if (ps->fds_status[fd].used_events) { + erts_atomic_dec_nob(&ps->no_of_user_fds); + ps->fds_status[fd].used_events = 0; + } + + } else { + if (!ps->fds_status[fd].used_events) { + erts_atomic_inc_nob(&ps->no_of_user_fds); + } + ps->poll_fds[ps->poll_fds_ix].fd = fd; + ps->poll_fds[ps->poll_fds_ix].revents = 0; + ps->poll_fds[ps->poll_fds_ix++].events = ERTS_POLL_EV_E2N(events); + ps->fds_status[fd].used_events = ps->fds_status[fd].events; + } + + return res; #endif +} + +static int +handle_update_requests(ErtsPollSet *ps, ErtsPollResFd pr[], int no_fds) +{ + int res = 0; + ErtsPollSetUpdateRequestsBlock *urqbp = ps->curr_upd_req_block; while (urqbp) { ErtsPollSetUpdateRequestsBlock *free_urqbp = urqbp; int i; int len = urqbp->len; + + init_batch_update(ps, len); + for (i = 0; i < len; i++) { int fd = urqbp->fds[i]; ASSERT(fd < ps->fds_status_len); - ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_INURQ; -#if ERTS_POLL_USE_BATCH_UPDATE_POLLSET - batch_update_pollset(ps, fd, &bb); -#else - update_pollset(ps, fd); -#endif + ASSERT(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INURQ); + + /* We have run out of PollResFd slots to put results in, + so we yield here and return later for more. */ + if (res == no_fds && pr != NULL) { + memmove(urqbp->fds, urqbp->fds+i, sizeof(int) * (len - i)); + urqbp->len -= i; + ps->curr_upd_req_block = urqbp; + write_batch_update(ps); + return res; + } + + if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INURQ) { + ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_INURQ; + res += update_pollset(ps, pr + res, fd); + } } free_urqbp = urqbp; @@ -1475,12 +1183,9 @@ handle_update_requests(ErtsPollSet ps) free_update_requests_block(ps, free_urqbp); - } + write_batch_update(ps); -#if ERTS_POLL_USE_BATCH_UPDATE_POLLSET - if (bb.len) - write_batch_buf(ps, &bb); -#endif + } ps->curr_upd_req_block = &ps->update_requests; @@ -1489,16 +1194,19 @@ handle_update_requests(ErtsPollSet ps) #endif ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(ps); + return res; } +#endif /* !ERTS_POLL_USE_CONCURRENT_UPDATE */ static ERTS_INLINE ErtsPollEvents -poll_control(ErtsPollSet ps, int fd, ErtsPollEvents events, int on, int *do_wake) +poll_control(ErtsPollSet *ps, int fd, ErtsPollOp op, + ErtsPollEvents events, int *do_wake) { ErtsPollEvents new_events; if (fd < ps->internal_fd_limit || fd >= max_fds) { - if (fd < 0) { + if (fd < 0 || fd >= max_fds) { new_events = ERTS_POLL_EV_ERR; goto done; } @@ -1508,115 +1216,55 @@ poll_control(ErtsPollSet ps, int fd, ErtsPollEvents events, int on, int *do_wake goto done; } #endif +#if !ERTS_POLL_USE_CONCURRENT_UPDATE if (fd == ps->wake_fds[0] || fd == ps->wake_fds[1]) { new_events = ERTS_POLL_EV_NVAL; goto done; } -#if ERTS_POLL_USE_TIMERFD - if (fd == ps->timer_fd) { - new_events = ERTS_POLL_EV_NVAL; - goto done; - } #endif } +#if ERTS_POLL_USE_CONCURRENT_UPDATE + + new_events = update_pollset(ps, fd, op, events); + +#else /* !ERTS_POLL_USE_CONCURRENT_UPDATE */ if (fd >= ps->fds_status_len) grow_fds_status(ps, fd); ASSERT(fd < ps->fds_status_len); - new_events = ps->fds_status[fd].events; - - if (events == 0) { - *do_wake = 0; - goto done; - } - - if (on) - new_events |= events; - else - new_events &= ~events; - - if (new_events == (ErtsPollEvents) 0) { - ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_RST; -#if ERTS_POLL_USE_FALLBACK - ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_USEFLBCK; -#endif - } - - ps->fds_status[fd].events = new_events; - - if (new_events == ps->fds_status[fd].used_events - && !(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST) - ) { - *do_wake = 0; - goto done; - } - - -#if ERTS_POLL_USE_CONCURRENT_UPDATE - if (ERTS_POLLSET_IS_POLLED(ps)) { - int update_fallback = 0; - conc_update_pollset(ps, fd, &update_fallback); - if (!update_fallback) { - *do_wake = 0; /* no need to wake kernel poller */ - goto done; - } + if (op == ERTS_POLL_OP_DEL) { + ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_RST; + ps->fds_status[fd].events = 0; + *do_wake = 1; + } else if (op == ERTS_POLL_OP_ADD) { + ASSERT(ps->fds_status[fd].events == 0); + ps->fds_status[fd].events = events; + *do_wake = 1; + } else { + ASSERT(op == ERTS_POLL_OP_MOD); + ps->fds_status[fd].events = events; + *do_wake = 1; } -#endif + new_events = ps->fds_status[fd].events; enqueue_update_request(ps, fd); - - /* - * If new events have been added, we need to wake up the - * polling thread, but if events have been removed we don't. - */ - if ((new_events && (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST)) - || (~ps->fds_status[fd].used_events & new_events)) - *do_wake = 1; +#endif /* !ERTS_POLL_USE_CONCURRENT_UPDATE */ done: -#ifdef ERTS_POLL_DEBUG_PRINT - erts_printf("0x%x = poll_control(ps, %d, 0x%x, %s) do_wake=%d\n", - (int) new_events, fd, (int) events, (on ? "on" : "off"), *do_wake); -#endif + DEBUG_PRINT_FD("%s = %s(%p, %d, %s, %s) do_wake=%d", + ps, fd, ev2str(new_events), __FUNCTION__, ps, + fd, op2str(op), ev2str(events), *do_wake); return new_events; } -void -ERTS_POLL_EXPORT(erts_poll_controlv)(ErtsPollSet ps, - ErtsPollControlEntry pcev[], - int len) -{ - int i; - int do_wake; - int final_do_wake = 0; - - ERTS_POLLSET_LOCK(ps); - - for (i = 0; i < len; i++) { - do_wake = 0; - pcev[i].events = poll_control(ps, - pcev[i].fd, - pcev[i].events, - pcev[i].on, - &do_wake); - final_do_wake |= do_wake; - } - - ERTS_POLLSET_UNLOCK(ps); - - if (final_do_wake) - wake_poller(ps, 0, 0); - -} - ErtsPollEvents -ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet ps, +ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet *ps, ErtsSysFdType fd, + ErtsPollOp op, ErtsPollEvents events, - int on, int* do_wake) /* In: Wake up polling thread */ /* Out: Poller is woken */ { @@ -1624,13 +1272,15 @@ ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet ps, ERTS_POLLSET_LOCK(ps); - res = poll_control(ps, fd, events, on, do_wake); + res = poll_control(ps, fd, op, events, do_wake); ERTS_POLLSET_UNLOCK(ps); +#if !ERTS_POLL_USE_CONCURRENT_UPDATE if (*do_wake) { - wake_poller(ps, 0, 0); + wake_poller(ps, 0); } +#endif return res; } @@ -1642,180 +1292,60 @@ ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet ps, #if ERTS_POLL_USE_KERNEL_POLL static ERTS_INLINE int -save_kp_result(ErtsPollSet ps, ErtsPollResFd pr[], int max_res, int chk_fds_res) +ERTS_POLL_EXPORT(save_result)(ErtsPollSet *ps, ErtsPollResFd pr[], int max_res, int chk_fds_res, int ebadf) { - int res = 0; - int i; - int n = chk_fds_res < max_res ? chk_fds_res : max_res; +#if !ERTS_POLL_USE_CONCURRENT_UPDATE || ERTS_POLL_DEBUG_PRINT + int n = chk_fds_res < max_res ? chk_fds_res : max_res, i; + int res = n; +#if !ERTS_POLL_USE_CONCURRENT_UPDATE int wake_fd = ps->wake_fds[0]; -#if ERTS_POLL_USE_TIMERFD - int timer_fd = ps->timer_fd; #endif for (i = 0; i < n; i++) { - -#if ERTS_POLL_USE_EPOLL /* --- epoll ------------------------------- */ - - if (ps->res_events[i].events) { - int fd = ps->res_events[i].data.fd; - int ix; - ErtsPollEvents revents; - if (fd == wake_fd) { - cleanup_wakeup_pipe(ps); - continue; - } -#if ERTS_POLL_USE_TIMERFD - if (fd == timer_fd) { - continue; - } + int fd = ERTS_POLL_RES_GET_FD(&pr[i]); +#ifdef DEBUG_PRINT_MODE + ErtsPollEvents evts = ERTS_POLL_RES_GET_EVTS(pr+i); #endif - ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK)); - /* epoll_wait() can repeat the same fd in result array... */ - ix = (int) ps->fds_status[fd].res_ev_ix; - ASSERT(ix >= 0); - if (ix >= res || pr[ix].fd != fd) { - ix = res; - pr[ix].fd = fd; - pr[ix].events = (ErtsPollEvents) 0; - } - - revents = ERTS_POLL_EV_N2E(ps->res_events[i].events); - pr[ix].events |= revents; - if (revents) { - if (res == ix) { - ps->fds_status[fd].res_ev_ix = (unsigned short) ix; - res++; - } - } - } - -#elif ERTS_POLL_USE_KQUEUE /* --- kqueue ------------------------------ */ - - struct kevent *ev; - int fd; - int ix; - - ev = &ps->res_events[i]; - fd = (int) ev->ident; - ASSERT(fd < ps->fds_status_len); - ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK)); - ix = (int) ps->fds_status[fd].res_ev_ix; - - ASSERT(ix >= 0); - if (ix >= res || pr[ix].fd != fd) { - ix = res; - pr[ix].fd = (int) ev->ident; - pr[ix].events = (ErtsPollEvents) 0; - } - - if (ev->filter == EVFILT_READ) { - if (fd == wake_fd) { - cleanup_wakeup_pipe(ps); - continue; - } - pr[ix].events |= ERTS_POLL_EV_IN; - } - else if (ev->filter == EVFILT_WRITE) - pr[ix].events |= ERTS_POLL_EV_OUT; - if (ev->flags & (EV_ERROR|EV_EOF)) { - if ((ev->flags & EV_ERROR) && (((int) ev->data) == EBADF)) - pr[ix].events |= ERTS_POLL_EV_NVAL; - else - pr[ix].events |= ERTS_POLL_EV_ERR; - } - if (pr[ix].events) { - if (res == ix) { - ps->fds_status[fd].res_ev_ix = (unsigned short) ix; - res++; - } - } - -#elif ERTS_POLL_USE_DEVPOLL /* --- devpoll ----------------------------- */ - - if (ps->res_events[i].revents) { - int fd = ps->res_events[i].fd; - ErtsPollEvents revents; - if (fd == wake_fd) { - cleanup_wakeup_pipe(ps); - continue; - } -#if ERTS_POLL_USE_TIMERFD - if (fd == timer_fd) { - continue; - } -#endif - revents = ERTS_POLL_EV_N2E(ps->res_events[i].events); - pr[res].fd = fd; - pr[res].events = revents; - res++; - } -#endif - - } - - return res; -} - -#endif /* ERTS_POLL_USE_KERNEL_POLL */ - -#if ERTS_POLL_USE_FALLBACK - -static int -get_kp_results(ErtsPollSet ps, ErtsPollResFd pr[], int max_res) -{ - int res; + DEBUG_PRINT_FD("trig %s (%s)", ps, fd, + ev2str(evts), #if ERTS_POLL_USE_KQUEUE - struct timespec ts = {0, 0}; + "kqueue" +#elif ERTS_POLL_USE_EPOLL + "epoll" +#else + "/dev/poll" #endif + ); - if (max_res > ps->res_events_len) - grow_res_events(ps, max_res); +#if !ERTS_POLL_USE_CONCURRENT_UPDATE - do { -#if ERTS_POLL_USE_EPOLL - res = epoll_wait(ps->kp_fd, ps->res_events, max_res, 0); -#elif ERTS_POLL_USE_KQUEUE - res = kevent(ps->kp_fd, NULL, 0, ps->res_events, max_res, &ts); + if (fd == wake_fd) { + cleanup_wakeup_pipe(ps); + ERTS_POLL_RES_SET_EVTS(&pr[i], ERTS_POLL_EV_NONE); + } else { + /* Reset the events to emulate ONESHOT semantics */ + ps->fds_status[fd].events = 0; + enqueue_update_request(ps, fd); + } #endif - } while (res < 0 && errno == EINTR); - - if (res < 0) { - fatal_error("%s:%d: %s() failed: %s (%d)\n", - __FILE__, __LINE__, -#if ERTS_POLL_USE_EPOLL - "epoll_wait", -#elif ERTS_POLL_USE_KQUEUE - "kevent", -#endif - erl_errno_id(errno), errno); } - return save_kp_result(ps, pr, max_res, res); + return res; +#else + ASSERT(chk_fds_res <= max_res); + return chk_fds_res; +#endif } -#endif /* ERTS_POLL_USE_FALLBACK */ - - +#else /* !ERTS_POLL_USE_KERNEL_POLL */ static ERTS_INLINE int -save_poll_result(ErtsPollSet ps, ErtsPollResFd pr[], int max_res, - int chk_fds_res, int ebadf) +ERTS_POLL_EXPORT(save_result)(ErtsPollSet *ps, ErtsPollResFd pr[], int max_res, int chk_fds_res, int ebadf) { -#if ERTS_POLL_USE_DEVPOLL - return save_kp_result(ps, pr, max_res, chk_fds_res); -#elif ERTS_POLL_USE_FALLBACK - if (!ps->fallback_used) - return save_kp_result(ps, pr, max_res, chk_fds_res); - else -#endif /* ERTS_POLL_USE_FALLBACK */ - { - #if ERTS_POLL_USE_POLL /* --- poll -------------------------------- */ int res = 0; -#if !ERTS_POLL_USE_FALLBACK int wake_fd = ps->wake_fds[0]; -#endif int i, first_ix, end_ix; /* @@ -1832,23 +1362,30 @@ save_poll_result(ErtsPollSet ps, ErtsPollResFd pr[], int max_res, if (ps->poll_fds[i].revents != (short) 0) { int fd = ps->poll_fds[i].fd; ErtsPollEvents revents; -#if ERTS_POLL_USE_FALLBACK - if (fd == ps->kp_fd) { - res += get_kp_results(ps, &pr[res], max_res-res); - i++; - continue; - } -#else if (fd == wake_fd) { cleanup_wakeup_pipe(ps); i++; continue; } -#endif revents = pollev2ev(ps->poll_fds[i].revents); - pr[res].fd = fd; - pr[res].events = revents; + ERTS_POLL_RES_SET_FD(&pr[res], fd); + ERTS_POLL_RES_SET_EVTS(&pr[res], revents); + + /* If an fd returns as error, we may want to check the + update_requests queue to see if it has been reset + before delivering the result?!?! This should allow + the user to do driver_dselect + close without waiting + for stop_select... */ + + DEBUG_PRINT_FD("trig %s (poll)", ps, ERTS_POLL_RES_GET_FD(&pr[res]), + ev2str(ERTS_POLL_RES_GET_EVTS(&pr[res]))); + res++; + + /* Clear the events for this fd in order to mimic + how epoll ONESHOT works */ + ps->fds_status[fd].events = 0; + enqueue_update_request(ps, fd); } i++; } @@ -1864,9 +1401,7 @@ save_poll_result(ErtsPollSet ps, ErtsPollResFd pr[], int max_res, #elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */ int res = 0; -#if !ERTS_POLL_USE_FALLBACK int wake_fd = ps->wake_fds[0]; -#endif int fd, first_fd, end_fd; /* @@ -1879,29 +1414,23 @@ save_poll_result(ErtsPollSet ps, ErtsPollResFd pr[], int max_res, if (!ebadf) { while (1) { while (fd < end_fd && res < max_res) { - - pr[res].events = (ErtsPollEvents) 0; + ErtsPollEvents events = 0; if (ERTS_FD_ISSET(fd, &ps->res_input_fds)) { -#if ERTS_POLL_USE_FALLBACK - if (fd == ps->kp_fd) { - res += get_kp_results(ps, &pr[res], max_res-res); - fd++; - continue; - } -#else if (fd == wake_fd) { cleanup_wakeup_pipe(ps); fd++; continue; } -#endif - pr[res].events |= ERTS_POLL_EV_IN; + events |= ERTS_POLL_EV_IN; } if (ERTS_FD_ISSET(fd, &ps->res_output_fds)) - pr[res].events |= ERTS_POLL_EV_OUT; - if (pr[res].events) { - pr[res].fd = fd; + events |= ERTS_POLL_EV_OUT; + if (events) { + ERTS_POLL_RES_SET_FD(&pr[res], fd); + ERTS_POLL_RES_SET_EVTS(&pr[res], events); res++; + ps->fds_status[fd].events = 0; + enqueue_update_request(ps, fd); } fd++; } @@ -1934,7 +1463,7 @@ save_poll_result(ErtsPollSet ps, ErtsPollResFd pr[], int max_res, if (ps->fds_status[fd].events & ERTS_POLL_EV_OUT) { oset = &ps->res_output_fds; ERTS_FD_ZERO(oset); - ERTS_FD_SET(fd, oset); + ERTS_FD_SET(fd, oset); } do { /* Initiate 'tv' each time; @@ -1943,49 +1472,31 @@ save_poll_result(ErtsPollSet ps, ErtsPollResFd pr[], int max_res, sres = ERTS_SELECT(ps->max_fd+1, iset, oset, NULL, &tv); } while (sres < 0 && errno == EINTR); if (sres < 0) { -#if ERTS_POLL_USE_FALLBACK - if (fd == ps->kp_fd) { - res += get_kp_results(ps, - &pr[res], - max_res-res); - fd++; - continue; - } -#else if (fd == wake_fd) { cleanup_wakeup_pipe(ps); fd++; continue; } -#endif - pr[res].fd = fd; - pr[res].events = ERTS_POLL_EV_NVAL; + ERTS_POLL_RES_SET_FD(&pr[res], fd); + ERTS_POLL_RES_SET_EVTS(&pr[res], ERTS_POLL_EV_NVAL); res++; } else if (sres > 0) { - pr[res].fd = fd; + ErtsPollEvents events = 0; + ERTS_POLL_RES_SET_FD(&pr[res], fd); if (iset && ERTS_FD_ISSET(fd, iset)) { -#if ERTS_POLL_USE_FALLBACK - if (fd == ps->kp_fd) { - res += get_kp_results(ps, - &pr[res], - max_res-res); - fd++; - continue; - } -#else if (fd == wake_fd) { cleanup_wakeup_pipe(ps); fd++; continue; } -#endif - pr[res].events |= ERTS_POLL_EV_IN; + events |= ERTS_POLL_EV_IN; } if (oset && ERTS_FD_ISSET(fd, oset)) { - pr[res].events |= ERTS_POLL_EV_OUT; + events |= ERTS_POLL_EV_OUT; } - ASSERT(pr[res].events); + ASSERT(events); + ERTS_POLL_RES_SET_EVTS(&pr[res], events); res++; } } @@ -2000,353 +1511,145 @@ save_poll_result(ErtsPollSet ps, ErtsPollResFd pr[], int max_res, } ps->next_sel_fd = fd; return res; -#endif - } +#endif /* ERTS_POLL_USE_SELECT */ } -static ERTS_INLINE ErtsMonotonicTime -get_timeout(ErtsPollSet ps, - int resolution, - ErtsMonotonicTime timeout_time) -{ - ErtsMonotonicTime timeout, save_timeout_time; - - if (timeout_time == ERTS_POLL_NO_TIMEOUT) { - save_timeout_time = ERTS_MONOTONIC_TIME_MIN; - timeout = 0; - } - else { - ErtsMonotonicTime diff_time, current_time; - current_time = erts_get_monotonic_time(NULL); - diff_time = timeout_time - current_time; - if (diff_time <= 0) { - save_timeout_time = ERTS_MONOTONIC_TIME_MIN; - timeout = 0; - } - else { - save_timeout_time = current_time; - switch (resolution) { - case 1000: - /* Round up to nearest even milli second */ - timeout = ERTS_MONOTONIC_TO_MSEC(diff_time - 1) + 1; - if (timeout > (ErtsMonotonicTime) INT_MAX) - timeout = (ErtsMonotonicTime) INT_MAX; - save_timeout_time += ERTS_MSEC_TO_MONOTONIC(timeout); - timeout -= ERTS_PREMATURE_TIMEOUT(timeout, 1000); - break; - case 1000000: - /* Round up to nearest even micro second */ - timeout = ERTS_MONOTONIC_TO_USEC(diff_time - 1) + 1; - save_timeout_time += ERTS_USEC_TO_MONOTONIC(timeout); - timeout -= ERTS_PREMATURE_TIMEOUT(timeout, 1000*1000); - break; - case 1000000000: - /* Round up to nearest even nano second */ - timeout = ERTS_MONOTONIC_TO_NSEC(diff_time - 1) + 1; - save_timeout_time += ERTS_NSEC_TO_MONOTONIC(timeout); - timeout -= ERTS_PREMATURE_TIMEOUT(timeout, 1000*1000*1000); - break; - default: - ERTS_INTERNAL_ERROR("Invalid resolution"); - timeout = 0; - save_timeout_time = 0; - break; - } - } - } - set_timeout_time(ps, save_timeout_time); - return timeout; -} - -#if ERTS_POLL_USE_SELECT +#endif /* !ERTS_POLL_USE_KERNEL_POLL */ static ERTS_INLINE int -get_timeout_timeval(ErtsPollSet ps, - SysTimeval *tvp, - ErtsMonotonicTime timeout_time) -{ - ErtsMonotonicTime timeout = get_timeout(ps, - 1000*1000, - timeout_time); - - if (!timeout) { - tvp->tv_sec = 0; - tvp->tv_usec = 0; - - return 0; - } - else { - ErtsMonotonicTime sec = timeout/(1000*1000); - tvp->tv_sec = sec; - tvp->tv_usec = timeout - sec*(1000*1000); - - ASSERT(tvp->tv_sec >= 0); - ASSERT(tvp->tv_usec >= 0); - ASSERT(tvp->tv_usec < 1000*1000); - - return !0; - } - -} - -#endif - -#if ERTS_POLL_USE_KQUEUE || (ERTS_POLL_USE_POLL && defined(HAVE_PPOLL)) || ERTS_POLL_USE_TIMERFD - -static ERTS_INLINE int -get_timeout_timespec(ErtsPollSet ps, - struct timespec *tsp, - ErtsMonotonicTime timeout_time) -{ - ErtsMonotonicTime timeout = get_timeout(ps, - 1000*1000*1000, - timeout_time); - - if (!timeout) { - tsp->tv_sec = 0; - tsp->tv_nsec = 0; - return 0; - } - else { - ErtsMonotonicTime sec = timeout/(1000*1000*1000); - tsp->tv_sec = sec; - tsp->tv_nsec = timeout - sec*(1000*1000*1000); - - ASSERT(tsp->tv_sec >= 0); - ASSERT(tsp->tv_nsec >= 0); - ASSERT(tsp->tv_nsec < 1000*1000*1000); - - return !0; - } -} - -#endif - -#if ERTS_POLL_USE_TIMERFD - -static ERTS_INLINE int -get_timeout_itimerspec(ErtsPollSet ps, - struct itimerspec *itsp, - ErtsMonotonicTime timeout_time) -{ - - itsp->it_interval.tv_sec = 0; - itsp->it_interval.tv_nsec = 0; - - return get_timeout_timespec(ps, &itsp->it_value, timeout_time); -} - -#endif - -static ERTS_INLINE int -check_fd_events(ErtsPollSet ps, ErtsMonotonicTime timeout_time, int max_res) +check_fd_events(ErtsPollSet *ps, ErtsPollResFd pr[], int do_wait, int max_res) { int res; - ERTS_MSACC_PUSH_STATE_M(); - if (erts_atomic_read_nob(&ps->no_of_user_fds) == 0 - && timeout_time == ERTS_POLL_NO_TIMEOUT) { - /* Nothing to poll and zero timeout; done... */ - return 0; - } - else { - int timeout; -#if ERTS_POLL_USE_FALLBACK - if (!(ps->fallback_used = ERTS_POLL_NEED_FALLBACK(ps))) { - -#if ERTS_POLL_USE_EPOLL /* --- epoll ------------------------------- */ - if (max_res > ps->res_events_len) - grow_res_events(ps, max_res); -#if ERTS_POLL_USE_TIMERFD - { - struct itimerspec its; - timeout = get_timeout_itimerspec(ps, &its, timeout_time); - if (timeout) { - erts_thr_progress_prepare_wait(NULL); - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_SLEEP); - timerfd_set(ps, &its); - res = epoll_wait(ps->kp_fd, ps->res_events, max_res, -1); - res = timerfd_clear(ps, res, max_res); - } else { - res = epoll_wait(ps->kp_fd, ps->res_events, max_res, 0); - } - } -#else /* !ERTS_POLL_USE_TIMERFD */ - timeout = (int) get_timeout(ps, 1000, timeout_time); - if (timeout) { - erts_thr_progress_prepare_wait(NULL); - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_SLEEP); - } - res = epoll_wait(ps->kp_fd, ps->res_events, max_res, timeout); -#endif /* !ERTS_POLL_USE_TIMERFD */ -#elif ERTS_POLL_USE_KQUEUE /* --- kqueue ------------------------------ */ - struct timespec ts; - if (max_res > ps->res_events_len) - grow_res_events(ps, max_res); - timeout = get_timeout_timespec(ps, &ts, timeout_time); - if (timeout) { - erts_thr_progress_prepare_wait(NULL); - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_SLEEP); - } - res = kevent(ps->kp_fd, NULL, 0, ps->res_events, max_res, &ts); -#endif /* ----------------------------------------- */ - } - else /* use fallback (i.e. poll() or select()) */ -#endif /* ERTS_POLL_USE_FALLBACK */ - { + int timeout = do_wait ? -1 : 0; + DEBUG_PRINT_WAIT("Entering check_fd_events(), do_wait=%d", ps, do_wait); + { +#if ERTS_POLL_USE_EPOLL /* --- epoll ------------------------------- */ + res = epoll_wait(ps->kp_fd, pr, max_res, timeout); + +#elif ERTS_POLL_USE_KQUEUE /* --- kqueue ------------------------------ */ + struct timespec ts = {0, 0}; + struct timespec *tsp = timeout ? NULL : &ts; + res = kevent(ps->kp_fd, NULL, 0, pr, max_res, tsp); +#elif ERTS_POLL_USE_DEVPOLL /* --- devpoll ----------------------------- */ + /* + * The ioctl() will fail with EINVAL on Solaris 10 if dp_nfds + * is set too high. dp_nfds should not be set greater than + * the maximum number of file descriptors in the poll set. + */ + struct dvpoll poll_res; + int nfds = (int) erts_atomic_read_nob(&ps->no_of_user_fds) + 1 /* wakeup pipe */; + poll_res.dp_nfds = nfds < max_res ? nfds : max_res; + poll_res.dp_fds = pr; + poll_res.dp_timeout = timeout; + res = ioctl(ps->kp_fd, DP_POLL, &poll_res); -#if ERTS_POLL_USE_DEVPOLL /* --- devpoll ----------------------------- */ - /* - * The ioctl() will fail with EINVAL on Solaris 10 if dp_nfds - * is set too high. dp_nfds should not be set greater than - * the maximum number of file descriptors in the poll set. - */ - struct dvpoll poll_res; - int nfds = (int) erts_atomic_read_nob(&ps->no_of_user_fds); - nfds++; /* Wakeup pipe */ - timeout = (int) get_timeout(ps, 1000, timeout_time); - poll_res.dp_nfds = nfds < max_res ? nfds : max_res; - if (poll_res.dp_nfds > ps->res_events_len) - grow_res_events(ps, poll_res.dp_nfds); - poll_res.dp_fds = ps->res_events; - if (timeout) { - erts_thr_progress_prepare_wait(NULL); - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_SLEEP); - } - poll_res.dp_timeout = timeout; - res = ioctl(ps->kp_fd, DP_POLL, &poll_res); -#elif ERTS_POLL_USE_POLL && defined(HAVE_PPOLL) /* --- ppoll ---------------- */ - struct timespec ts; - timeout = get_timeout_timespec(ps, &ts, timeout_time); - if (timeout) { - erts_thr_progress_prepare_wait(NULL); - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_SLEEP); - } - res = ppoll(ps->poll_fds, ps->no_poll_fds, &ts, NULL); #elif ERTS_POLL_USE_POLL /* --- poll --------------------------------- */ - timeout = (int) get_timeout(ps, 1000, timeout_time); - if (timeout) { - erts_thr_progress_prepare_wait(NULL); - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_SLEEP); - } - res = poll(ps->poll_fds, ps->no_poll_fds, timeout); + res = poll(ps->poll_fds, ps->no_poll_fds, timeout); + #elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */ - SysTimeval to; - timeout = get_timeout_timeval(ps, &to, timeout_time); + SysTimeval tv = {0, 0}; + SysTimeval *tvp = timeout ? NULL : &tv; - ERTS_FD_COPY(&ps->input_fds, &ps->res_input_fds); - ERTS_FD_COPY(&ps->output_fds, &ps->res_output_fds); + ERTS_FD_COPY(&ps->input_fds, &ps->res_input_fds); + ERTS_FD_COPY(&ps->output_fds, &ps->res_output_fds); - if (timeout) { - erts_thr_progress_prepare_wait(NULL); - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_SLEEP); - } - res = ERTS_SELECT(ps->max_fd + 1, - &ps->res_input_fds, - &ps->res_output_fds, - NULL, - &to); - if (timeout) { - erts_thr_progress_finalize_wait(NULL); - ERTS_MSACC_POP_STATE_M(); - } - if (res < 0 - && errno == EBADF - && ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) { - /* - * This may have happened because another thread deselected - * a fd in our poll set and then closed it, i.e. the driver - * behaved correctly. We wan't to avoid looking for a bad - * fd, that may even not exist anymore. Therefore, handle - * update requests and try again. - * - * We don't know how much of the timeout is left; therfore, - * we use a zero timeout. If no error occur and no events - * have triggered, we fake an EAGAIN error and let the caller - * restart us. - */ - to.tv_sec = 0; - to.tv_usec = 0; - ERTS_POLLSET_LOCK(ps); - handle_update_requests(ps); - ERTS_POLLSET_UNLOCK(ps); - res = ERTS_SELECT(ps->max_fd + 1, - &ps->res_input_fds, - &ps->res_output_fds, - NULL, - &to); - if (res == 0) { - errno = EAGAIN; - res = -1; - } - } - return res; + res = ERTS_SELECT(ps->max_fd + 1, + &ps->res_input_fds, + &ps->res_output_fds, + NULL, + tvp); #endif /* ----------------------------------------- */ - } - if (timeout) { - erts_thr_progress_finalize_wait(NULL); - ERTS_MSACC_POP_STATE_M(); - } - return res; } + DEBUG_PRINT_WAIT("Leaving check_fd_events(), res=%d", ps, res); + return res; } int -ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps, +ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, ErtsPollResFd pr[], - int *len, - ErtsMonotonicTime timeout_time) + int *len) { - ErtsMonotonicTime to; - int res, no_fds; + int res, no_fds, used_fds = 0; int ebadf = 0; + int do_wait; int ps_locked = 0; + ERTS_MSACC_DECLARE_CACHE(); no_fds = *len; -#ifdef ERTS_POLL_MAX_RES - if (no_fds >= ERTS_POLL_MAX_RES) - no_fds = ERTS_POLL_MAX_RES; -#endif - *len = 0; + ASSERT(no_fds > 0); -#ifdef ERTS_POLL_DEBUG_PRINT - erts_printf("Entering erts_poll_wait(), timeout_time=%bps\n", - timeout_time); -#endif +#if !ERTS_POLL_USE_CONCURRENT_UPDATE + if (ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) { + ERTS_POLLSET_LOCK(ps); + used_fds = handle_update_requests(ps, pr, no_fds); + ERTS_POLLSET_UNLOCK(ps); - if (ERTS_POLLSET_SET_POLLED_CHK(ps)) { - res = EINVAL; /* Another thread is in erts_poll_wait() - on this pollset... */ - goto done; + if (used_fds == no_fds) { + *len = used_fds; + return 0; + } } +#endif - to = (is_woken(ps) - ? ERTS_POLL_NO_TIMEOUT /* Use zero timeout */ - : timeout_time); + do_wait = !is_woken(ps) && used_fds == 0; - if (ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) { - ERTS_POLLSET_LOCK(ps); - handle_update_requests(ps); - ERTS_POLLSET_UNLOCK(ps); + DEBUG_PRINT_WAIT("Entering %s(), do_wait=%d", ps, __FUNCTION__, do_wait); + + if (do_wait) { + erts_thr_progress_prepare_wait(NULL); + ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_SLEEP); } while (1) { - res = check_fd_events(ps, to, no_fds); + res = check_fd_events(ps, pr + used_fds, do_wait, no_fds - used_fds); + +#if !ERTS_POLL_USE_CONCURRENT_UPDATE + if (res < 0 + && errno == EBADF + && ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) { + /* + * This may have happened because another thread deselected + * a fd in our poll set and then closed it, i.e. the driver + * behaved correctly. We wan't to avoid looking for a bad + * fd, that may even not exist anymore. Therefore, handle + * update requests and try again. This behaviour should only + * happen when using SELECT as the polling mechanism. + */ + ERTS_POLLSET_LOCK(ps); + used_fds += handle_update_requests(ps, pr + used_fds, no_fds - used_fds); + if (used_fds == no_fds) { + *len = used_fds; + ERTS_POLLSET_UNLOCK(ps); + return 0; + } + res = check_fd_events(ps, pr + used_fds, 0, no_fds - used_fds); + /* Keep the lock over the non-blocking poll in order to not + get any nasty races happening. */ + ERTS_POLLSET_UNLOCK(ps); + if (res == 0) { + errno = EAGAIN; + res = -1; + } + } +#endif + if (res != 0) break; - if (to == ERTS_POLL_NO_TIMEOUT) - break; - if (erts_get_monotonic_time(NULL) >= timeout_time) - break; + if (!do_wait) + break; + } + + if (do_wait) { + erts_thr_progress_finalize_wait(NULL); + ERTS_MSACC_UPDATE_CACHE(); + ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_CHECK_IO); } woke_up(ps); - if (res == 0) { - res = ETIMEDOUT; - } - else if (res < 0) { + if (res < 0) { #if ERTS_POLL_USE_SELECT if (errno == EBADF) { ebadf = 1; @@ -2363,26 +1666,21 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps, ps_locked = 1; ERTS_POLLSET_LOCK(ps); - no_fds = save_poll_result(ps, pr, no_fds, res, ebadf); + used_fds += ERTS_POLL_EXPORT(save_result)(ps, pr + used_fds, no_fds - used_fds, res, ebadf); #ifdef HARD_DEBUG - check_poll_result(pr, no_fds); + check_poll_result(pr, used_fds); #endif - res = (no_fds == 0 ? (is_interrupted_reset(ps) ? EINTR : EAGAIN) : 0); - *len = no_fds; + res = (used_fds == 0 ? (is_interrupted_reset(ps) ? EINTR : EAGAIN) : 0); + *len = used_fds; } if (ps_locked) ERTS_POLLSET_UNLOCK(ps); - ERTS_POLLSET_UNSET_POLLED(ps); - done: - set_timeout_time(ps, ERTS_MONOTONIC_TIME_MAX); -#ifdef ERTS_POLL_DEBUG_PRINT - erts_printf("Leaving %s = erts_poll_wait()\n", - res == 0 ? "0" : erl_errno_id(res)); -#endif + DEBUG_PRINT_WAIT("Leaving %s = %s(len = %d)", ps, + res == 0 ? "0" : erl_errno_id(res), __FUNCTION__, *len); return res; } @@ -2392,40 +1690,14 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps, */ void -ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet ps, int set) +ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet *ps, int set) { +#if !ERTS_POLL_USE_CONCURRENT_UPDATE if (!set) reset_wakeup_state(ps); else - wake_poller(ps, 1, 0); -} - - -/* - * erts_poll_interrupt_timed(): - * If 'set' != 0, interrupt thread blocked in erts_poll_wait() if it - * is not guaranteed that it will timeout before 'msec' milli seconds. - */ -void -ERTS_POLL_EXPORT(erts_poll_interrupt_timed)(ErtsPollSet ps, - int set, - ErtsMonotonicTime timeout_time) -{ - if (!set) - reset_wakeup_state(ps); - else { - ErtsMonotonicTime max_wait_time = get_timeout_time(ps); - if (max_wait_time > timeout_time) - wake_poller(ps, 1, 0); -#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS - else { - if (ERTS_POLLSET_IS_POLLED(ps)) - erts_atomic_inc_nob(&ps->no_avoided_wakeups); - erts_atomic_inc_nob(&ps->no_avoided_interrupts); - } - erts_atomic_inc_nob(&ps->no_interrupt_timed); + wake_poller(ps, 1); #endif - } } int @@ -2438,14 +1710,19 @@ ERTS_POLL_EXPORT(erts_poll_max_fds)(void) */ void -ERTS_POLL_EXPORT(erts_poll_init)(void) +ERTS_POLL_EXPORT(erts_poll_init)(int *concurrent_updates) { - erts_mtx_init(&pollsets_lock, "pollsets_lock", NIL, - ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_IO); - pollsets = NULL; errno = 0; + if (concurrent_updates) { +#if ERTS_POLL_USE_CONCURRENT_UPDATE + *concurrent_updates = 1; +#else + *concurrent_updates = 0; +#endif + } + #if !defined(NO_SYSCONF) max_fds = sysconf(_SC_OPEN_MAX); #elif ERTS_POLL_USE_SELECT @@ -2464,37 +1741,28 @@ ERTS_POLL_EXPORT(erts_poll_init)(void) fatal_error("erts_poll_init(): Failed to get max number of files: %s\n", erl_errno_id(errno)); -#ifdef ERTS_POLL_DEBUG_PRINT print_misc_debug_info(); -#endif } -ErtsPollSet -ERTS_POLL_EXPORT(erts_poll_create_pollset)(void) +ErtsPollSet * +ERTS_POLL_EXPORT(erts_poll_create_pollset)(int id) { #if ERTS_POLL_USE_KERNEL_POLL int kp_fd; #endif - ErtsPollSet ps = erts_alloc(ERTS_ALC_T_POLLSET, + ErtsPollSet *ps = erts_alloc(ERTS_ALC_T_POLLSET, sizeof(struct ERTS_POLL_EXPORT(erts_pollset))); + ps->id = id; ps->internal_fd_limit = 0; - ps->fds_status = NULL; - ps->fds_status_len = 0; erts_atomic_init_nob(&ps->no_of_user_fds, 0); #if ERTS_POLL_USE_KERNEL_POLL ps->kp_fd = -1; #if ERTS_POLL_USE_EPOLL kp_fd = epoll_create(256); - ps->res_events_len = 0; - ps->res_events = NULL; #elif ERTS_POLL_USE_DEVPOLL kp_fd = open("/dev/poll", O_RDWR); - ps->res_events_len = 0; - ps->res_events = NULL; #elif ERTS_POLL_USE_KQUEUE kp_fd = kqueue(); - ps->res_events_len = 0; - ps->res_events = NULL; #endif if (kp_fd < 0) fatal_error("erts_poll_create_pollset(): Failed to " @@ -2508,10 +1776,6 @@ ERTS_POLL_EXPORT(erts_poll_create_pollset)(void) ": %s (%d)\n", erl_errno_id(errno), errno); #endif /* ERTS_POLL_USE_KERNEL_POLL */ -#if ERTS_POLL_USE_BATCH_UPDATE_POLLSET - /* res_events is also used as write buffer */ - grow_res_events(ps, ERTS_POLL_MIN_BATCH_BUF_SIZE); -#endif #if ERTS_POLL_USE_POLL ps->next_poll_fds_ix = 0; ps->no_poll_fds = 0; @@ -2520,9 +1784,6 @@ ERTS_POLL_EXPORT(erts_poll_create_pollset)(void) #elif ERTS_POLL_USE_SELECT ps->next_sel_fd = 0; ps->max_fd = -1; -#if ERTS_POLL_USE_FALLBACK - ps->no_select_fds = 0; -#endif #ifdef _DARWIN_UNLIMITED_SELECT ps->input_fds.sz = 0; ps->input_fds.ptr = NULL; @@ -2539,140 +1800,48 @@ ERTS_POLL_EXPORT(erts_poll_create_pollset)(void) ERTS_FD_ZERO(&ps->res_output_fds); #endif #endif +#if !ERTS_POLL_USE_CONCURRENT_UPDATE + ps->fds_status = NULL; + ps->fds_status_len = 0; ps->update_requests.next = NULL; ps->update_requests.len = 0; ps->curr_upd_req_block = &ps->update_requests; erts_atomic32_init_nob(&ps->have_update_requests, 0); - erts_atomic32_init_nob(&ps->polled, 0); erts_mtx_init(&ps->mtx, "pollset", NIL, ERTS_LOCK_FLAGS_CATEGORY_IO); - erts_atomic32_init_nob(&ps->wakeup_state, (erts_aint32_t) 0); - create_wakeup_pipe(ps); -#if ERTS_POLL_USE_TIMERFD - create_timerfd(ps); -#endif -#if ERTS_POLL_USE_FALLBACK - if (kp_fd >= ps->fds_status_len) - grow_fds_status(ps, kp_fd); - /* Force kernel poll fd into fallback (poll/select) set */ - ps->fds_status[kp_fd].flags - |= ERTS_POLL_FD_FLG_INFLBCK|ERTS_POLL_FD_FLG_USEFLBCK; - { - int do_wake = 0; - ERTS_POLL_EXPORT(erts_poll_control)(ps, kp_fd, ERTS_POLL_EV_IN, 1, - &do_wake); - } #endif #if ERTS_POLL_USE_KERNEL_POLL if (ps->internal_fd_limit <= kp_fd) ps->internal_fd_limit = kp_fd + 1; ps->kp_fd = kp_fd; #endif - init_timeout_time(ps); -#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS - erts_atomic_init_nob(&ps->no_avoided_wakeups, 0); - erts_atomic_init_nob(&ps->no_avoided_interrupts, 0); - erts_atomic_init_nob(&ps->no_interrupt_timed, 0); -#endif - handle_update_requests(ps); -#if ERTS_POLL_USE_FALLBACK - ps->fallback_used = 0; +#if !ERTS_POLL_USE_CONCURRENT_UPDATE + erts_atomic32_init_nob(&ps->wakeup_state, (erts_aint32_t) 0); + create_wakeup_pipe(ps); + handle_update_requests(ps, NULL, 0); + cleanup_wakeup_pipe(ps); #endif erts_atomic_set_nob(&ps->no_of_user_fds, 0); /* Don't count wakeup pipe and fallback fd */ - erts_mtx_lock(&pollsets_lock); - ps->next = pollsets; - pollsets = ps; - erts_mtx_unlock(&pollsets_lock); - return ps; } -void -ERTS_POLL_EXPORT(erts_poll_destroy_pollset)(ErtsPollSet ps) -{ - - if (ps->fds_status) - erts_free(ERTS_ALC_T_FD_STATUS, (void *) ps->fds_status); - -#if ERTS_POLL_USE_EPOLL - if (ps->kp_fd >= 0) - close(ps->kp_fd); - if (ps->res_events) - erts_free(ERTS_ALC_T_POLL_RES_EVS, (void *) ps->res_events); -#elif ERTS_POLL_USE_DEVPOLL - if (ps->kp_fd >= 0) - close(ps->kp_fd); - if (ps->res_events) - erts_free(ERTS_ALC_T_POLL_RES_EVS, (void *) ps->res_events); -#elif ERTS_POLL_USE_POLL - if (ps->poll_fds) - erts_free(ERTS_ALC_T_POLL_FDS, (void *) ps->poll_fds); -#elif ERTS_POLL_USE_SELECT -#ifdef _DARWIN_UNLIMITED_SELECT - if (ps->input_fds.ptr) - erts_free(ERTS_ALC_T_SELECT_FDS, (void *) ps->input_fds.ptr); - if (ps->res_input_fds.ptr) - erts_free(ERTS_ALC_T_SELECT_FDS, (void *) ps->res_input_fds.ptr); - if (ps->output_fds.ptr) - erts_free(ERTS_ALC_T_SELECT_FDS, (void *) ps->output_fds.ptr); - if (ps->res_output_fds.ptr) - erts_free(ERTS_ALC_T_SELECT_FDS, (void *) ps->res_output_fds.ptr); -#endif -#endif - { - ErtsPollSetUpdateRequestsBlock *urqbp = ps->update_requests.next; - while (urqbp) { - ErtsPollSetUpdateRequestsBlock *free_urqbp = urqbp; - urqbp = urqbp->next; - free_update_requests_block(ps, free_urqbp); - } - } - erts_mtx_destroy(&ps->mtx); - if (ps->wake_fds[0] >= 0) - close(ps->wake_fds[0]); - if (ps->wake_fds[1] >= 0) - close(ps->wake_fds[1]); -#if ERTS_POLL_USE_TIMERFD - if (ps->timer_fd >= 0) - close(ps->timer_fd); -#endif - - erts_mtx_lock(&pollsets_lock); - if (ps == pollsets) - pollsets = pollsets->next; - else { - ErtsPollSet prev_ps; - for (prev_ps = pollsets; ps != prev_ps->next; prev_ps = prev_ps->next) - ; - ASSERT(ps == prev_ps->next); - prev_ps->next = ps->next; - } - erts_mtx_unlock(&pollsets_lock); - - erts_free(ERTS_ALC_T_POLLSET, (void *) ps); -} - /* * --- Info ------------------------------------------------------------------ */ void -ERTS_POLL_EXPORT(erts_poll_info)(ErtsPollSet ps, ErtsPollInfo *pip) +ERTS_POLL_EXPORT(erts_poll_info)(ErtsPollSet *ps, ErtsPollInfo *pip) { +#if !ERTS_POLL_USE_CONCURRENT_UPDATE int pending_updates; +#endif Uint size = 0; ERTS_POLLSET_LOCK(ps); size += sizeof(struct ERTS_POLL_EXPORT(erts_pollset)); +#if !ERTS_POLL_USE_CONCURRENT_UPDATE size += ps->fds_status_len*sizeof(ErtsFdStatus); - -#if ERTS_POLL_USE_EPOLL - size += ps->res_events_len*sizeof(struct epoll_event); -#elif ERTS_POLL_USE_DEVPOLL - size += ps->res_events_len*sizeof(struct pollfd); -#elif ERTS_POLL_USE_KQUEUE - size += ps->res_events_len*sizeof(struct kevent); #endif #if ERTS_POLL_USE_POLL @@ -2684,6 +1853,7 @@ ERTS_POLL_EXPORT(erts_poll_info)(ErtsPollSet ps, ErtsPollInfo *pip) #endif #endif +#if !ERTS_POLL_USE_CONCURRENT_UPDATE { ErtsPollSetUpdateRequestsBlock *urqbp = ps->update_requests.next; pending_updates = ps->update_requests.len; @@ -2693,8 +1863,9 @@ ERTS_POLL_EXPORT(erts_poll_info)(ErtsPollSet ps, ErtsPollInfo *pip) urqbp = urqbp->next; } } +#endif - pip->primary = + pip->primary = #if ERTS_POLL_USE_KQUEUE "kqueue" #elif ERTS_POLL_USE_EPOLL @@ -2708,17 +1879,7 @@ ERTS_POLL_EXPORT(erts_poll_info)(ErtsPollSet ps, ErtsPollInfo *pip) #endif ; - pip->fallback = -#if !ERTS_POLL_USE_FALLBACK - NULL -#elif ERTS_POLL_USE_POLL - "poll" -#elif ERTS_POLL_USE_SELECT - "select" -#endif - ; - - pip->kernel_poll = + pip->kernel_poll = #if !ERTS_POLL_USE_KERNEL_POLL NULL #elif ERTS_POLL_USE_KQUEUE @@ -2733,40 +1894,21 @@ ERTS_POLL_EXPORT(erts_poll_info)(ErtsPollSet ps, ErtsPollInfo *pip) pip->memory_size = size; pip->poll_set_size = (int) erts_atomic_read_nob(&ps->no_of_user_fds); +#if !ERTS_POLL_USE_CONCURRENT_UPDATE pip->poll_set_size++; /* Wakeup pipe */ -#if ERTS_POLL_USE_TIMERFD - pip->poll_set_size++; /* timerfd */ -#endif - - pip->fallback_poll_set_size = -#if !ERTS_POLL_USE_FALLBACK - 0 -#elif ERTS_POLL_USE_POLL - ps->no_poll_fds -#elif ERTS_POLL_USE_SELECT - ps->no_select_fds -#endif - ; - -#if ERTS_POLL_USE_FALLBACK - /* If only kp_fd is in fallback poll set we don't use fallback... */ - if (pip->fallback_poll_set_size == 1) - pip->fallback_poll_set_size = 0; - else - pip->poll_set_size++; /* kp_fd */ #endif pip->lazy_updates = +#if !ERTS_POLL_USE_CONCURRENT_UPDATE 1 +#else + 0 +#endif ; pip->pending_updates = +#if !ERTS_POLL_USE_CONCURRENT_UPDATE pending_updates - ; - - pip->batch_updates = -#if ERTS_POLL_USE_BATCH_UPDATE_POLLSET - 1 #else 0 #endif @@ -2780,13 +1922,15 @@ ERTS_POLL_EXPORT(erts_poll_info)(ErtsPollSet ps, ErtsPollInfo *pip) #endif ; - pip->max_fds = max_fds; - -#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS - pip->no_avoided_wakeups = erts_atomic_read_nob(&ps->no_avoided_wakeups); - pip->no_avoided_interrupts = erts_atomic_read_nob(&ps->no_avoided_interrupts); - pip->no_interrupt_timed = erts_atomic_read_nob(&ps->no_interrupt_timed); + pip->is_fallback = +#if ERTS_POLL_IS_FALLBACK + 1 +#else + 0 #endif + ; + + pip->max_fds = max_fds; ERTS_POLLSET_UNLOCK(ps); @@ -2822,35 +1966,60 @@ fatal_error(char *format, ...) abort(); } -static void -fatal_error_async_signal_safe(char *error_str) +/* + * --- Debug ----------------------------------------------------------------- + */ + +#if ERTS_POLL_USE_EPOLL +uint32_t epoll_events(int kp_fd, int fd) { - if (ERTS_SOMEONE_IS_CRASH_DUMPING || ERTS_GOT_SIGUSR1) { - /* See comment above in fatal_error() */ - return; + /* For epoll we read the information about what is selected upon from the proc fs.*/ + char fname[30]; + FILE *f; + unsigned int pos, flags, mnt_id; + int line = 0; + sprintf(fname,"/proc/%d/fdinfo/%d",getpid(), kp_fd); + f = fopen(fname,"r"); + if (!f) { + fprintf(stderr,"failed to open file %s, errno = %d\n", fname, errno); + ASSERT(0); + return 0; } - if (error_str) { - int len = 0; - while (error_str[len]) - len++; - if (len) { - /* async signal safe */ - erts_silence_warn_unused_result(write(2, error_str, len)); - } + if (fscanf(f,"pos:\t%x\nflags:\t%x", &pos, &flags) != 2) { + fprintf(stderr,"failed to parse file %s, errno = %d\n", fname, errno); + ASSERT(0); + return 0; } - abort(); + if (fscanf(f,"\nmnt_id:\t%x\n", &mnt_id)); + line += 3; + while (!feof(f)) { + /* tfd: 10 events: 40000019 data: 180000000a */ + int ev_fd; + uint32_t events; + uint64_t data; + if (fscanf(f,"tfd:%d events:%x data:%lx\n", &ev_fd, &events, &data) != 3) { + fprintf(stderr,"failed to parse file %s on line %d, errno = %d\n", fname, + line, + errno); + return 0; + } + if (fd == ev_fd) { + fclose(f); + return events; + } + } + fclose(f); + return 0; } - -/* - * --- Debug ----------------------------------------------------------------- - */ +#endif void -ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet ps, +ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet *ps, ErtsPollEvents ev[], int len) { int fd; +#if !ERTS_POLL_USE_CONCURRENT_UPDATE ERTS_POLLSET_LOCK(ps); for (fd = 0; fd < len; fd++) { if (fd >= ps->fds_status_len) @@ -2859,9 +2028,6 @@ ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet ps, ev[fd] = ps->fds_status[fd].events; if ( fd == ps->wake_fds[0] || fd == ps->wake_fds[1] || -#if ERTS_POLL_USE_TIMERFD - fd == ps->timer_fd || -#endif #if ERTS_POLL_USE_KERNEL_POLL fd == ps->kp_fd || #endif @@ -2870,7 +2036,50 @@ ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet ps, } } ERTS_POLLSET_UNLOCK(ps); +#elif ERTS_POLL_USE_EPOLL + /* For epoll we read the information about what is selected upon from the proc fs.*/ + char fname[30]; + FILE *f; + unsigned int pos, flags, mnt_id; + int line = 0; + sprintf(fname,"/proc/%d/fdinfo/%d",getpid(), ps->kp_fd); + for (fd = 0; fd < len; fd++) + ev[fd] = ERTS_POLL_EV_NONE; + f = fopen(fname,"r"); + if (!f) { + fprintf(stderr,"failed to open file %s, errno = %d\n", fname, errno); + return; + } + if (fscanf(f,"pos:\t%x\nflags:\t%x", &pos, &flags) != 2) { + fprintf(stderr,"failed to parse file %s, errno = %d\n", fname, errno); + ASSERT(0); + return; + } + if (fscanf(f,"\nmnt_id:\t%x\n", &mnt_id)); + line += 3; + while (!feof(f)) { + /* tfd: 10 events: 40000019 data: 180000000a */ + int fd; + uint32_t events; + uint64_t data; + if (fscanf(f,"tfd:%d events:%x data:%lx\n", &fd, &events, &data) != 3) { + fprintf(stderr,"failed to parse file %s on line %d, errno = %d\n", + fname, line, errno); + ASSERT(0); + return; + } + data &= 0xFFFFFFFF; + ASSERT(fd == data); + /* Events are the events that are being monitored, which of course include + error and hup events, but we are only interested in IN/OUT events */ + ev[fd] = (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT) & ERTS_POLL_EV_N2E(events); + line++; + } +#else + for (fd = 0; fd < len; fd++) + ev[fd] = ERTS_POLL_EV_NONE; +#endif } #ifdef HARD_DEBUG @@ -2890,10 +2099,10 @@ check_poll_result(ErtsPollResFd pr[], int len) } -#if ERTS_POLL_USE_DEVPOLL +#if ERTS_POLL_USE_DEVPOLL && defined(DEBUG) static void -check_poll_status(ErtsPollSet ps) +check_poll_status(ErtsPollSet *ps) { int i; for (i = 0; i < ps->fds_status_len; i++) { @@ -2925,30 +2134,24 @@ check_poll_status(ErtsPollSet ps) #endif /* ERTS_POLL_USE_DEVPOLL */ #endif /* HARD_DEBUG */ -#ifdef ERTS_POLL_DEBUG_PRINT static void print_misc_debug_info(void) { - erts_printf("erts_poll using: %s lazy_updates:%s batch_updates:%s\n", +#if ERTS_POLL_DEBUG_PRINT + erts_printf("erts_poll using: %s lazy_updates:%s\n", #if ERTS_POLL_USE_KQUEUE "kqueue" #elif ERTS_POLL_USE_EPOLL "epoll" #elif ERTS_POLL_USE_DEVPOLL "/dev/poll" -#endif -#if ERTS_POLL_USE_FALLBACK - "-" -#endif -#if ERTS_POLL_USE_POLL +#elif ERTS_POLL_USE_POLL "poll" #elif ERTS_POLL_USE_SELECT "select" #endif , - "true" - , -#if ERTS_POLL_USE_BATCH_UPDATE_POLLSET +#if !ERTS_POLL_USE_CONCURRENT_UPDATE "true" #else "false" @@ -2967,29 +2170,20 @@ print_misc_debug_info(void) #ifdef FD_SETSIZE erts_printf("FD_SETSIZE=%d\n", FD_SETSIZE); #endif -} - #endif +} #ifdef ERTS_ENABLE_LOCK_COUNT -static void erts_lcnt_enable_pollset_lock_count(ErtsPollSet pollset, int enable) { +void ERTS_POLL_EXPORT(erts_lcnt_enable_pollset_lock_count)(ErtsPollSet *pollset, int enable) +{ +#if !ERTS_POLL_USE_CONCURRENT_UPDATE if(enable) { erts_lcnt_install_new_lock_info(&pollset->mtx.lcnt, "pollset_rm", NIL, ERTS_LOCK_TYPE_MUTEX | ERTS_LOCK_FLAGS_CATEGORY_IO); } else { erts_lcnt_uninstall(&pollset->mtx.lcnt); } -} - -void ERTS_POLL_EXPORT(erts_lcnt_update_pollset_locks)(int enable) { - ErtsPollSet iterator; - - erts_mtx_lock(&pollsets_lock); - - for(iterator = pollsets; iterator != NULL; iterator = iterator->next) { - erts_lcnt_enable_pollset_lock_count(iterator, enable); - } - - erts_mtx_unlock(&pollsets_lock); +#endif + return; } #endif |