/* * %CopyrightBegin% * * Copyright Ericsson AB 2006-2016. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * %CopyrightEnd% */ /** * @description Poll interface suitable for ERTS * * The interface is currently implemented using: * - select * - poll * - /dev/poll * - epoll with poll or select as fallback * - kqueue with poll or select as fallback * * * @author Rickard Green * @author Lukas Larsson * * There are two major different implementations off IO polling in this * file. The concurrent and non-concurrent implementations. * When available epoll/kqueue are used to implement the concurrent * versions. poll, select and dev/poll use non-concurrent updates. * * Concurrent version: * In the concurrent version erts_poll_control directly modifies * the kernel pollset without waking the thread that is waiting * on events. Also the ErtsPollResFd type is directly mapped to * the native event type, so no extra copying is needed. Note that * as no locking at all is done, fds can be triggered that have been * removed from the pollset. The check_io layer has to deal with this. * * Non-concurrent version: * In the non-concurrent version, the pollset has an internal representation * of the pollset that is updated by erts_poll_control. When an fd is updated, * its number is placed in the update request queue and then the waiting thread * is woken in order to see the change. The internal data in the pollset is * protected by a mutex that has to be taken by both the modifying and waiting * thread at different times. * * The non-concurrent pollset cannot have fd's closed in it while a thread is * waiting on that fd. In order to fix this, when an ERTS_POLL_OP_DEL command * is issued, the fd is marked as closing and the waiting thread is woken. The * fd is then returned in the waiting threads results as ERTS_POLL_EV_NONE. * */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #if defined(__DARWIN__) || defined(__APPLE__) && defined(__MACH__) /* Setting _DARWIN_UNLIMITED_SELECT before including sys/select.h enables * the version of select() that does not place a limit on the fd_set. */ # define _DARWIN_UNLIMITED_SELECT #endif #ifndef WANT_NONBLOCKING # define WANT_NONBLOCKING #endif #include "erl_poll.h" #if ERTS_POLL_USE_KQUEUE # include # include # include #endif #if ERTS_POLL_USE_SELECT # ifdef SYS_SELECT_H # include # endif #elif defined(_DARWIN_UNLIMITED_SELECT) # undef _DARWIN_UNLIMITED_SELECT #endif #ifdef NO_SYSCONF # if ERTS_POLL_USE_SELECT # include # else # include # endif #endif #include "erl_thr_progress.h" #include "erl_driver.h" #include "erl_alloc.h" #include "erl_msacc.h" #include "erl_misc_utils.h" #if !defined(ERTS_POLL_USE_EPOLL) \ && !defined(ERTS_POLL_USE_DEVPOLL) \ && !defined(ERTS_POLL_USE_POLL) \ && !defined(ERTS_POLL_USE_SELECT) #error "Missing implementation of erts_poll()" #endif #if 0 #define ERTS_POLL_DEBUG_PRINT 1 #define DEBUG_PRINT(FMT, PS, ...) \ do { \ int myerrno = errno; \ erts_printf("%d: " FMT "\r\n", (PS)->id, ##__VA_ARGS__); \ errno = myerrno; \ } while(0) /* Define to print info about modifications done to each fd */ #define DEBUG_PRINT_FD(FMT, PS, FD, ...) DEBUG_PRINT("%d: " FMT, PS, FD, ##__VA_ARGS__) /* Define to print entry and exit from erts_poll_wait (can be very spammy) */ //#define DEBUG_PRINT_WAIT(FMT, PS, ...) DEBUG_PRINT(FMT, PS, ##__VA_ARGS__) #else #define ERTS_POLL_DEBUG_PRINT 0 #define DEBUG_PRINT(...) #endif #ifndef DEBUG_PRINT_FD #define DEBUG_PRINT_FD(...) #endif #ifndef DEBUG_PRINT_WAIT #define DEBUG_PRINT_WAIT(...) #endif #if defined(_DARWIN_UNLIMITED_SELECT) && ERTS_POLL_USE_SELECT typedef struct { size_t sz; fd_set* ptr; }ERTS_fd_set; # define ERTS_FD_CLR(fd, fds) FD_CLR((fd), (fds)->ptr) # define ERTS_FD_SET(fd, fds) FD_SET((fd), (fds)->ptr) # define ERTS_FD_ISSET(fd,fds) FD_ISSET((fd), (fds)->ptr) # define ERTS_FD_ZERO(fds) memset((fds)->ptr, 0, (fds)->sz) # define ERTS_FD_SIZE(n) ((((n)+NFDBITS-1)/NFDBITS)*sizeof(fd_mask)) static void ERTS_FD_COPY(ERTS_fd_set *src, ERTS_fd_set *dst) { if (dst->sz != src->sz) { dst->ptr = dst->ptr ? erts_realloc(ERTS_ALC_T_SELECT_FDS, dst->ptr, src->sz) : erts_alloc(ERTS_ALC_T_SELECT_FDS, src->sz); dst->sz = src->sz; } memcpy(dst->ptr, src->ptr, src->sz); } static ERTS_INLINE int ERTS_SELECT(int nfds, ERTS_fd_set *readfds, ERTS_fd_set *writefds, ERTS_fd_set *exceptfds, struct timeval *timeout) { ASSERT(!readfds || readfds->sz >= ERTS_FD_SIZE(nfds)); ASSERT(!writefds || writefds->sz >= ERTS_FD_SIZE(nfds)); ASSERT(!exceptfds); return select(nfds, (readfds ? readfds->ptr : NULL ), (writefds ? writefds->ptr : NULL), NULL, timeout); } #else /* !_DARWIN_UNLIMITED_SELECT */ # define ERTS_fd_set fd_set # define ERTS_FD_CLR FD_CLR # define ERTS_FD_ISSET FD_ISSET # define ERTS_FD_SET FD_SET # define ERTS_FD_ZERO FD_ZERO # define ERTS_FD_COPY(src,dst) (*(dst) = *(src)) # define ERTS_SELECT select #endif #define ERTS_POLL_IS_FALLBACK (ERTS_POLL_USE_POLL || ERTS_POLL_USE_SELECT) && ERTS_ENABLE_KERNEL_POLL #define ERTS_POLL_USE_CONCURRENT_UPDATE (ERTS_POLL_USE_EPOLL || ERTS_POLL_USE_KQUEUE) #if !ERTS_POLL_USE_CONCURRENT_UPDATE #define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) \ erts_atomic32_set_nob(&(PS)->have_update_requests, (erts_aint32_t) 1) #define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) \ erts_atomic32_set_nob(&(PS)->have_update_requests, (erts_aint32_t) 0) #define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) \ ((int) erts_atomic32_read_nob(&(PS)->have_update_requests)) #define ERTS_POLLSET_LOCK(PS) \ erts_mtx_lock(&(PS)->mtx) #define ERTS_POLLSET_UNLOCK(PS) \ erts_mtx_unlock(&(PS)->mtx) #else #define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) #define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) #define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) 0 #define ERTS_POLLSET_LOCK(PS) #define ERTS_POLLSET_UNLOCK(PS) #endif /* * --- Data types ------------------------------------------------------------ */ #if !ERTS_POLL_USE_CONCURRENT_UPDATE #define ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE 128 typedef struct ErtsPollSetUpdateRequestsBlock_ ErtsPollSetUpdateRequestsBlock; struct ErtsPollSetUpdateRequestsBlock_ { ErtsPollSetUpdateRequestsBlock *next; int len; int fds[ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE]; }; # define ERTS_POLL_FD_FLG_INURQ (((unsigned short) 1) << 0) # define ERTS_POLL_FD_FLG_RST (((unsigned short) 1) << 1) typedef struct { #if ERTS_POLL_USE_POLL int pix; #endif ErtsPollEvents used_events; ErtsPollEvents events; unsigned short flags; } ErtsFdStatus; #endif /* * This struct is not really exported, but it's nice to * get unique names in debugger for kp/nkp */ struct ERTS_POLL_EXPORT(erts_pollset) { int id; int internal_fd_limit; erts_atomic_t no_of_user_fds; #if ERTS_POLL_USE_KERNEL_POLL int kp_fd; #endif /* ERTS_POLL_USE_KERNEL_POLL */ #if ERTS_POLL_USE_POLL int next_poll_fds_ix; int no_poll_fds; int poll_fds_len; struct pollfd *poll_fds; #elif ERTS_POLL_USE_SELECT int next_sel_fd; int max_fd; ERTS_fd_set input_fds; ERTS_fd_set res_input_fds; ERTS_fd_set output_fds; ERTS_fd_set res_output_fds; #elif ERTS_POLL_USE_DEVPOLL struct pollfd *poll_fds; int poll_fds_ix; #endif #if !ERTS_POLL_USE_CONCURRENT_UPDATE ErtsFdStatus *fds_status; int fds_status_len; ErtsPollSetUpdateRequestsBlock update_requests; ErtsPollSetUpdateRequestsBlock *curr_upd_req_block; erts_atomic32_t have_update_requests; erts_mtx_t mtx; int wake_fds[2]; erts_atomic32_t wakeup_state; #endif }; void erts_silence_warn_unused_result(long unused); static void fatal_error(char *format, ...); static int max_fds = -1; #if ERTS_POLL_USE_POLL #if !ERTS_POLL_IS_FALLBACK static ERTS_INLINE short ev2pollev(ErtsPollEvents ev) { return ERTS_POLL_EV_E2N(ev); } static ERTS_INLINE ErtsPollEvents pollev2ev(short ev) { return ERTS_POLL_EV_N2E(ev); } #else /* ERTS_POLL_IS_FALLBACK */ static ERTS_INLINE short ev2pollev(ErtsPollEvents ev) { short res_ev = (short) 0; if (ev & ERTS_POLL_EV_IN) res_ev |= ERTS_POLL_EV_NKP_IN; if (ev & ERTS_POLL_EV_OUT) res_ev |= ERTS_POLL_EV_NKP_OUT; return res_ev; } static ERTS_INLINE ErtsPollEvents pollev2ev(short ev) { ErtsPollEvents res_ev = (ErtsPollEvents) 0; if (ev & ERTS_POLL_EV_NKP_IN) res_ev |= ERTS_POLL_EV_IN; if (ev & ERTS_POLL_EV_NKP_OUT) res_ev |= ERTS_POLL_EV_OUT; if (ev & ERTS_POLL_EV_NKP_ERR) res_ev |= ERTS_POLL_EV_ERR; if (ev & ERTS_POLL_EV_NKP_NVAL) res_ev |= ERTS_POLL_EV_NVAL; return res_ev; } #endif /* !ERTS_POLL_IS_FALLBACK */ #endif /* ERTS_POLL_USE_POLL */ #ifdef HARD_DEBUG static void check_poll_result(ErtsPollResFd pr[], int len); #endif /* HARD_DEBUG */ #if ERTS_POLL_USE_DEVPOLL && defined(DEBUG) static void check_poll_status(ErtsPollSet *ps); #endif /* ERTS_POLL_USE_DEVPOLL && DEBUG */ static void print_misc_debug_info(void); #if ERTS_POLL_USE_EPOLL uint32_t epoll_events(int kp_fd, int fd); #endif #define ERTS_POLL_NOT_WOKEN 0 #define ERTS_POLL_WOKEN -1 #define ERTS_POLL_WOKEN_INTR 1 #if !ERTS_POLL_USE_CONCURRENT_UPDATE static ERTS_INLINE void reset_wakeup_state(ErtsPollSet *ps) { erts_atomic32_set_mb(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN); } #endif static ERTS_INLINE int is_woken(ErtsPollSet *ps) { #if !ERTS_POLL_USE_CONCURRENT_UPDATE return erts_atomic32_read_acqb(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN; #else return 0; #endif } static ERTS_INLINE int is_interrupted_reset(ErtsPollSet *ps) { #if !ERTS_POLL_USE_CONCURRENT_UPDATE return (erts_atomic32_xchg_acqb(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN) == ERTS_POLL_WOKEN_INTR); #else return 0; #endif } static ERTS_INLINE void woke_up(ErtsPollSet *ps) { #if !ERTS_POLL_USE_CONCURRENT_UPDATE erts_aint32_t wakeup_state = erts_atomic32_read_acqb(&ps->wakeup_state); if (wakeup_state == ERTS_POLL_NOT_WOKEN) (void) erts_atomic32_cmpxchg_nob(&ps->wakeup_state, ERTS_POLL_WOKEN, ERTS_POLL_NOT_WOKEN); ASSERT(erts_atomic32_read_nob(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN); #endif } /* * --- Wakeup pipe ----------------------------------------------------------- */ #if !ERTS_POLL_USE_CONCURRENT_UPDATE static ERTS_INLINE void wake_poller(ErtsPollSet *ps, int interrupted) { int wake; erts_aint32_t wakeup_state; if (!interrupted) wakeup_state = erts_atomic32_cmpxchg_relb(&ps->wakeup_state, ERTS_POLL_WOKEN, ERTS_POLL_NOT_WOKEN); else wakeup_state = erts_atomic32_xchg_relb(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR); wake = wakeup_state == ERTS_POLL_NOT_WOKEN; if (wake) { ssize_t res; if (ps->wake_fds[1] < 0) return; /* Not initialized yet */ do { /* write() is async-signal safe (according to posix) */ res = write(ps->wake_fds[1], "!", 1); } while (res < 0 && errno == EINTR); if (res <= 0 && errno != ERRNO_BLOCK) { fatal_error("%s:%d:wake_poller(): " "Failed to write to wakeup pipe fd=%d: " "%s (%d)\n", __FILE__, __LINE__, ps->wake_fds[1], erl_errno_id(errno), errno); } } } static ERTS_INLINE void cleanup_wakeup_pipe(ErtsPollSet *ps) { int intr = 0; int fd = ps->wake_fds[0]; int res; do { char buf[32]; res = read(fd, buf, sizeof(buf)); if (res > 0) intr = 1; } while (res > 0 || (res < 0 && errno == EINTR)); if (res < 0 && errno != ERRNO_BLOCK) { fatal_error("%s:%d:cleanup_wakeup_pipe(): " "Failed to read on wakeup pipe fd=%d: " "%s (%d)\n", __FILE__, __LINE__, fd, erl_errno_id(errno), errno); } if (intr) erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR); } static void create_wakeup_pipe(ErtsPollSet *ps) { int do_wake = 0; int wake_fds[2]; ps->wake_fds[0] = -1; ps->wake_fds[1] = -1; if (pipe(wake_fds) < 0) { fatal_error("%s:%d:create_wakeup_pipe(): " "Failed to create pipe: %s (%d)\n", __FILE__, __LINE__, erl_errno_id(errno), errno); } SET_NONBLOCKING(wake_fds[0]); SET_NONBLOCKING(wake_fds[1]); DEBUG_PRINT("wakeup fds = {%d, %d}", ps, wake_fds[0], wake_fds[1]); ERTS_POLL_EXPORT(erts_poll_control)(ps, wake_fds[0], ERTS_POLL_OP_ADD, ERTS_POLL_EV_IN, &do_wake); if (ps->internal_fd_limit <= wake_fds[1]) ps->internal_fd_limit = wake_fds[1] + 1; if (ps->internal_fd_limit <= wake_fds[0]) ps->internal_fd_limit = wake_fds[0] + 1; ps->wake_fds[0] = wake_fds[0]; ps->wake_fds[1] = wake_fds[1]; } /* * --- Poll set update requests ---------------------------------------------- */ static ERTS_INLINE void enqueue_update_request(ErtsPollSet *ps, int fd) { ErtsPollSetUpdateRequestsBlock *urqbp; ASSERT(fd < ps->fds_status_len); if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INURQ) return; if (ps->update_requests.len == 0) ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(ps); urqbp = ps->curr_upd_req_block; if (urqbp->len == ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE) { urqbp = erts_alloc(ERTS_ALC_T_POLLSET_UPDREQ, sizeof(ErtsPollSetUpdateRequestsBlock)); urqbp->next = ps->curr_upd_req_block; urqbp->len = 0; ps->curr_upd_req_block = urqbp; } ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_INURQ; urqbp->fds[urqbp->len++] = fd; } static ERTS_INLINE void free_update_requests_block(ErtsPollSet *ps, ErtsPollSetUpdateRequestsBlock *urqbp) { if (urqbp != &ps->update_requests) erts_free(ERTS_ALC_T_POLLSET_UPDREQ, (void *) urqbp); else { urqbp->len = 0; } } #endif /* !ERTS_POLL_USE_CONCURRENT_UPDATE */ /* * --- Growing poll set structures ------------------------------------------- */ #if !ERTS_NO_KERNEL_POLL_VERSION || !ERTS_ENABLE_KERNEL_POLL /* only one shared implementation */ #define ERTS_FD_TABLE_MIN_LENGTH 1024 #define ERTS_FD_TABLE_EXP_THRESHOLD (2048*1024) int erts_poll_new_table_len(int old_len, int need_len) { int new_len; ASSERT(need_len > old_len); if (need_len < ERTS_FD_TABLE_MIN_LENGTH) { new_len = ERTS_FD_TABLE_MIN_LENGTH; } else { new_len = old_len; do { if (new_len < ERTS_FD_TABLE_EXP_THRESHOLD) new_len *= 2; else new_len += ERTS_FD_TABLE_EXP_THRESHOLD; } while (new_len < need_len); } ASSERT(new_len >= need_len); return new_len; } #endif #if ERTS_POLL_USE_POLL static void grow_poll_fds(ErtsPollSet *ps, int min_ix) { int i; int new_len = erts_poll_new_table_len(ps->poll_fds_len, min_ix + 1); if (new_len > max_fds) new_len = max_fds; ps->poll_fds = (ps->poll_fds_len ? erts_realloc(ERTS_ALC_T_POLL_FDS, ps->poll_fds, sizeof(struct pollfd)*new_len) : erts_alloc(ERTS_ALC_T_POLL_FDS, sizeof(struct pollfd)*new_len)); for (i = ps->poll_fds_len; i < new_len; i++) { ps->poll_fds[i].fd = -1; ps->poll_fds[i].events = (short) 0; ps->poll_fds[i].revents = (short) 0; } ps->poll_fds_len = new_len; } #endif #ifdef _DARWIN_UNLIMITED_SELECT static void grow_select_fds(int fd, ERTS_fd_set* fds) { int new_len = erts_poll_new_table_len(fds->sz, fd + 1); if (new_len > max_fds) new_len = max_fds; new_len = ERTS_FD_SIZE(new_len); fds->ptr = fds->sz ? erts_realloc(ERTS_ALC_T_SELECT_FDS, fds->ptr, new_len) : erts_alloc(ERTS_ALC_T_SELECT_FDS, new_len); memset((char*)fds->ptr + fds->sz, 0, new_len - fds->sz); fds->sz = new_len; } static ERTS_INLINE void ensure_select_fds(int fd, ERTS_fd_set* in, ERTS_fd_set* out) { ASSERT(in->sz == out->sz); if (ERTS_FD_SIZE(fd+1) > in->sz) { grow_select_fds(fd, in); grow_select_fds(fd, out); } } #else # define ensure_select_fds(fd, in, out) do {} while(0) #endif /* _DARWIN_UNLIMITED_SELECT */ #if !ERTS_POLL_USE_CONCURRENT_UPDATE static void grow_fds_status(ErtsPollSet *ps, int min_fd) { int i; int new_len = erts_poll_new_table_len(ps->fds_status_len, min_fd + 1); ASSERT(min_fd < max_fds); if (new_len > max_fds) new_len = max_fds; ps->fds_status = (ps->fds_status_len ? erts_realloc(ERTS_ALC_T_FD_STATUS, ps->fds_status, sizeof(ErtsFdStatus)*new_len) : erts_alloc(ERTS_ALC_T_FD_STATUS, sizeof(ErtsFdStatus)*new_len)); for (i = ps->fds_status_len; i < new_len; i++) { #if ERTS_POLL_USE_POLL ps->fds_status[i].pix = -1; #endif ps->fds_status[i].used_events = (ErtsPollEvents) 0; ps->fds_status[i].events = (ErtsPollEvents) 0; ps->fds_status[i].flags = (unsigned short) 0; } ps->fds_status_len = new_len; } #endif /* * --- Selecting fd to poll on ----------------------------------------------- */ #if ERTS_POLL_USE_EPOLL static int update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) { int res; int epoll_op = EPOLL_CTL_MOD; struct epoll_event epe_templ; struct epoll_event epe; epe_templ.events = ERTS_POLL_EV_E2N(events) | EPOLLONESHOT; epe_templ.data.fd = fd; #ifdef VALGRIND /* Silence invalid valgrind warning ... */ memset((void *) &epe.data, 0, sizeof(epoll_data_t)); #endif switch (op) { case ERTS_POLL_OP_DEL: /* A note on EPOLL_CTL_DEL: linux kernel versions before 2.6.9 need a non-NULL event pointer even though it is ignored... */ epoll_op = EPOLL_CTL_DEL; epe_templ.events = 0; erts_atomic_dec_nob(&ps->no_of_user_fds); break; case ERTS_POLL_OP_ADD: epoll_op = EPOLL_CTL_ADD; erts_atomic_inc_nob(&ps->no_of_user_fds); break; case ERTS_POLL_OP_MOD: epoll_op = EPOLL_CTL_MOD; break; default: ASSERT(0); break; } do { /* We init 'epe' every time since epoll_ctl() may modify it (not declared const and not documented as const). */ epe.events = epe_templ.events; epe.data.fd = epe_templ.data.fd; res = epoll_ctl(ps->kp_fd, epoll_op, fd, &epe); } while (res != 0 && errno == EINTR); #if ERTS_POLL_DEBUG_PRINT { int saved_errno = errno; DEBUG_PRINT_FD("%s = epoll_ctl(%d, %s, %d, {0x%x, %d})", ps, fd, res == 0 ? "0" : erl_errno_id(errno), ps->kp_fd, (epoll_op == EPOLL_CTL_ADD ? "EPOLL_CTL_ADD" : (epoll_op == EPOLL_CTL_MOD ? "EPOLL_CTL_MOD" : (epoll_op == EPOLL_CTL_DEL ? "EPOLL_CTL_DEL" : "UNKNOWN"))), fd, epe_templ.events, fd); errno = saved_errno; } #endif if (res != 0) { switch (op) { case ERTS_POLL_OP_MOD: epe.events = 0; do { /* We init 'epe' every time since epoll_ctl() may modify it (not declared const and not documented as const). */ epe.events = 0; epe.data.fd = fd; res = epoll_ctl(ps->kp_fd, EPOLL_CTL_DEL, fd, &epe); } while (res != 0 && errno == EINTR); /* Fall through ... */ case ERTS_POLL_OP_ADD: { erts_atomic_dec_nob(&ps->no_of_user_fds); res = ERTS_POLL_EV_NVAL; break; } case ERTS_POLL_OP_DEL: { /* * Since we use a lazy update approach EPOLL_CTL_DEL will * frequently fail. This since epoll automatically removes * a filedescriptor that is closed from the poll set. */ res = 0; break; } default: fatal_error("%s:%d:update_pollset(): Internal error\n", __FILE__, __LINE__); break; } } else { res = events; } return res; } #endif /* ERTS_POLL_USE_EPOLL */ #if ERTS_POLL_USE_KQUEUE /* Some versions of the EV_SET macro used kevp multiple times, so we define out own version that make sure that it is safe to do kevp++ in the argument list. */ #define ERTS_EV_SET(kevp, a, b, c, f) do { \ struct kevent *kevp_ = kevp; \ EV_SET(kevp_, a, b, c, 0, 0, f); \ } while(0) static int update_pollset(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events) { int res = 0, len = 0; struct kevent evts[2]; struct timespec ts = {0, 0}; #ifdef EV_DISPATCH /* If we have EV_DISPATCH we use it. The kevent descriptions for both read and write are added on OP_ADD and removed on OP_DEL. And then after than only EV_ENABLE|EV_DISPATCH are used. It could be possible to not modify the pollset when disabling and/or deleting events, but that may cause the poll threads to be awoken a lot more than they should so we take the cost here instead of in the poll thread. Note: We need to have EV_DISPATCH both when the event is enabled and disabled, as otherwise the event may be triggered twice on each re-arm. Not sure if this is intended or not (can't find anything about it in the man page), but it seems to be the way it works... */ if (op == ERTS_POLL_OP_DEL) { erts_atomic_dec_nob(&ps->no_of_user_fds); /* We could probably skip this delete, do we want to? */ ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, EV_DELETE, (void *) 0); ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, EV_DELETE, (void *) 0); } else if (op == ERTS_POLL_OP_ADD) { uint32_t flags; erts_atomic_inc_nob(&ps->no_of_user_fds); flags = EV_ADD|EV_DISPATCH; flags |= ((events & ERTS_POLL_EV_IN) ? 0 : EV_DISABLE); ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, flags, (void *) ERTS_POLL_EV_IN); flags = EV_ADD|EV_DISPATCH; flags |= ((events & ERTS_POLL_EV_OUT) ? 0 : EV_DISABLE); ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, flags, (void *) ERTS_POLL_EV_OUT); } else { uint32_t flags; ASSERT(op == ERTS_POLL_OP_MOD); flags = EV_DISPATCH; flags |= (events & ERTS_POLL_EV_IN) ? EV_ENABLE : EV_DISABLE; ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, flags, (void *) ERTS_POLL_EV_IN); flags = EV_DISPATCH; flags |= (events & ERTS_POLL_EV_OUT) ? EV_ENABLE : EV_DISABLE; ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, flags, (void *) ERTS_POLL_EV_OUT); } #else uint32_t flags = EV_ADD|EV_ONESHOT; if (op == ERTS_POLL_OP_DEL) { erts_atomic_dec_nob(&ps->no_of_user_fds); /* We don't do anything when a delete is issued. The fds will be removed when they are triggered, or when they are closed. */ events = 0; } else if (op == ERTS_POLL_OP_ADD) { erts_atomic_inc_nob(&ps->no_of_user_fds); } if (events & ERTS_POLL_EV_IN) { ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, flags, (void *) ERTS_POLL_EV_IN); } if (events & ERTS_POLL_EV_OUT) { ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, flags, (void *) ERTS_POLL_EV_OUT); } #endif if (len) do { res = kevent(ps->kp_fd, evts, len, NULL, 0, &ts); } while (res < 0 && errno == EINTR); #if ERTS_POLL_DEBUG_PRINT { int saved_errno = errno, i; char keventb[255], *keventbp = keventb; if (res < 0) keventbp += sprintf(keventbp,"%s = ",erl_errno_id(saved_errno)); else keventbp += sprintf(keventbp,"%d = ",res); keventbp += sprintf(keventbp, "kevent(%d, {",ps->kp_fd); for (i = 0; i < len; i++) { const char *flags = "UNKNOWN"; if (evts[i].flags == EV_DELETE) flags = "EV_DELETE"; if (evts[i].flags == (EV_ADD|EV_ONESHOT)) flags = "EV_ADD|EV_ONESHOT"; #ifdef EV_DISPATCH if (evts[i].flags == (EV_ADD|EV_DISPATCH)) flags = "EV_ADD|EV_DISPATCH"; if (evts[i].flags == (EV_ADD|EV_DISABLE)) flags = "EV_ADD|EV_DISABLE"; if (evts[i].flags == (EV_ENABLE|EV_DISPATCH)) flags = "EV_ENABLE|EV_DISPATCH"; if (evts[i].flags == EV_DISABLE) flags = "EV_DISABLE"; if (evts[i].flags == (EV_DISABLE|EV_DISPATCH)) flags = "EV_DISABLE|EV_DISABLE"; #endif keventbp += sprintf(keventbp, "%s{%lu, %s, %s}",i > 0 ? ", " : "", evts[i].ident, (evts[i].filter == EVFILT_READ ? "EVFILT_READ" : (evts[i].filter == EVFILT_WRITE ? "EVFILT_WRITE" : "UNKNOWN")), flags); } keventbp += sprintf(keventbp, "}, %d)", len); DEBUG_PRINT_FD("%s", ps, fd, keventb); errno = saved_errno; } #endif if (res < 0) { if (op != ERTS_POLL_OP_DEL) { #ifdef EV_RECEIPT struct kevent receipt_evts[2]; len = 0; ERTS_EV_SET(&evts[len++], fd, EVFILT_WRITE, EV_DELETE|EV_RECEIPT, (void *) 0); ERTS_EV_SET(&evts[len++], fd, EVFILT_READ, EV_DELETE|EV_RECEIPT, (void *) 0); do { res = kevent(ps->kp_fd, evts, len, receipt_evts, 2, &ts); } while (res < 0 && errno == EINTR); #else ERTS_EV_SET(&evts[0], fd, EVFILT_WRITE, EV_DELETE, (void *) 0); do { res = kevent(ps->kp_fd, evts, 1, NULL, 0, &ts); } while (res < 0 && errno == EINTR); ERTS_EV_SET(&evts[0], fd, EVFILT_READ, EV_DELETE, (void *) 0); do { res = kevent(ps->kp_fd, evts, 1, NULL, 0, &ts); } while (res < 0 && errno == EINTR); #endif if (op == ERTS_POLL_OP_ADD) erts_atomic_dec_nob(&ps->no_of_user_fds); return ERTS_POLL_EV_NVAL; } return 0; } return events; } #endif /* ERTS_POLL_USE_KQUEUE */ #if !ERTS_POLL_USE_CONCURRENT_UPDATE static ERTS_INLINE void init_batch_update(ErtsPollSet *ps, int len) { #if ERTS_POLL_USE_DEVPOLL ASSERT(ps->poll_fds == NULL); ps->poll_fds = erts_alloc(ERTS_ALC_T_TMP, sizeof(struct pollfd) * len); ps->poll_fds_ix = 0; #endif } static ERTS_INLINE void write_batch_update(ErtsPollSet *ps) { #if ERTS_POLL_USE_DEVPOLL ssize_t wres; char *buf = (char *) ps->poll_fds; size_t buf_size = sizeof(struct pollfd)*ps->poll_fds_ix; while (1) { wres = write(ps->kp_fd, (void *) buf, buf_size); if (wres < 0) { if (errno == EINTR) continue; fatal_error("%s:%d:write_batch_buf(): " "Failed to write to /dev/poll: " "%s (%d)\n", __FILE__, __LINE__, erl_errno_id(errno), errno); } #if ERTS_POLL_DEBUG_PRINT { int saved_errno = errno, i; char devpollb[2048], *devpollbp = devpollb; devpollbp += sprintf(devpollbp, "%d = devpoll(%d, {", wres, ps->kp_fd); for (i = 0; i < wres / sizeof(struct pollfd); i++) { if (devpollbp == devpollb) devpollbp += sprintf(devpollbp, "%d = devpoll(%d, {", wres, ps->kp_fd); devpollbp += sprintf(devpollbp, "%s{fd = %d, events = %s}", i > 0 ? ", " : "", ps->poll_fds[i].fd, ev2str(ps->poll_fds[i].events)); if (devpollbp - devpollb > 512) { devpollbp += sprintf(devpollbp, "}, %d)", ps->poll_fds_ix); DEBUG_PRINT("%s", ps, devpollb); devpollbp = devpollb; } } devpollbp += sprintf(devpollbp, "}, %d)", ps->poll_fds_ix); DEBUG_PRINT("%s", ps, devpollb); errno = saved_errno; } #endif buf_size -= wres; if (buf_size <= 0) break; buf += wres; } if (buf_size < 0) { fatal_error("%s:%d:write_devpoll_buf(): Internal error\n", __FILE__, __LINE__); } erts_free(ERTS_ALC_T_TMP, ps->poll_fds); ps->poll_fds = NULL; #endif } static ERTS_INLINE int need_update(ErtsPollSet *ps, int fd, int *resetp) { int reset; ASSERT(fd < ps->fds_status_len); reset = (int) (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST); ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_RST; *resetp = reset; if (reset || ps->fds_status[fd].used_events != ps->fds_status[fd].events) return 1; return 0; } static int update_pollset(ErtsPollSet *ps, ErtsPollResFd pr[], int fd) { int res = 0, reset = 0; ErtsPollEvents events = ps->fds_status[fd].events; ASSERT(fd < ps->fds_status_len); if (!need_update(ps, fd, &reset)) return res; #if ERTS_POLL_USE_POLL /* --- poll -------------------------------- */ if (!events) { int pix = ps->fds_status[fd].pix; int last_pix; if (reset) { /* When a fd has been reset, we tell the caller of erts_poll_wait this by setting the fd as ERTS_POLL_EV_NONE */ ERTS_POLL_RES_SET_FD(&pr[res], fd); ERTS_POLL_RES_SET_EVTS(&pr[res], ERTS_POLL_EV_NONE); DEBUG_PRINT_FD("trig %s (poll)", ps, fd, ev2str(ERTS_POLL_EV_NONE)); res++; } if (pix < 0) { return res; } erts_atomic_dec_nob(&ps->no_of_user_fds); last_pix = --ps->no_poll_fds; if (pix != last_pix) { /* Move last pix to this pix */ ps->poll_fds[pix].fd = ps->poll_fds[last_pix].fd; ps->poll_fds[pix].events = ps->poll_fds[last_pix].events; ps->poll_fds[pix].revents = ps->poll_fds[last_pix].revents; ps->fds_status[ps->poll_fds[pix].fd].pix = pix; } /* Clear last pix */ ps->poll_fds[last_pix].fd = -1; ps->poll_fds[last_pix].events = (short) 0; ps->poll_fds[last_pix].revents = (short) 0; /* Clear this fd status */ ps->fds_status[fd].pix = -1; ps->fds_status[fd].used_events = (ErtsPollEvents) 0; } else { int pix = ps->fds_status[fd].pix; if (pix < 0) { erts_atomic_inc_nob(&ps->no_of_user_fds); ps->fds_status[fd].pix = pix = ps->no_poll_fds++; if (pix >= ps->poll_fds_len) grow_poll_fds(ps, pix); ps->poll_fds[pix].fd = fd; ps->fds_status[fd].pix = pix; } /* Events to be used in next poll */ ps->poll_fds[pix].events = ev2pollev(events); if (ps->poll_fds[pix].revents) { /* Remove result events that we should not poll for anymore */ ps->poll_fds[pix].revents &= ev2pollev(~(~ps->fds_status[fd].used_events & events)); } /* Save events to be used in next poll */ ps->fds_status[fd].used_events = events; } return res; #elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */ if (!events) { if (reset) { /* When a fd has been reset, we tell the caller of erts_poll_wait this by setting the fd as ERTS_POLL_EV_NONE */ ERTS_POLL_RES_SET_FD(&pr[res], fd); ERTS_POLL_RES_SET_EVTS(&pr[res], ERTS_POLL_EV_NONE); DEBUG_PRINT_FD("trig %s (select)", ps, fd, ev2str(ERTS_POLL_EV_NONE)); res++; } ERTS_FD_CLR(fd, &ps->input_fds); ERTS_FD_CLR(fd, &ps->output_fds); if (ps->fds_status[fd].used_events) { erts_atomic_dec_nob(&ps->no_of_user_fds); ps->fds_status[fd].used_events = (ErtsPollEvents) 0; } if (fd == ps->max_fd) { int max = ps->max_fd; for (max = ps->max_fd; max >= 0; max--) if (ps->fds_status[max].used_events) break; ps->max_fd = max; } } else { ensure_select_fds(fd, &ps->input_fds, &ps->output_fds); if (!ps->fds_status[fd].used_events) erts_atomic_inc_nob(&ps->no_of_user_fds); if (events & ERTS_POLL_EV_IN) ERTS_FD_SET(fd, &ps->input_fds); else ERTS_FD_CLR(fd, &ps->input_fds); if (events & ERTS_POLL_EV_OUT) ERTS_FD_SET(fd, &ps->output_fds); else ERTS_FD_CLR(fd, &ps->output_fds); ps->fds_status[fd].used_events = events; if (fd > ps->max_fd) ps->max_fd = fd; } return res; #elif ERTS_POLL_USE_DEVPOLL if (!events) { if (reset) { /* When a fd has been reset, we tell the caller of erts_poll_wait this by setting the fd as ERTS_POLL_EV_NONE */ ERTS_POLL_RES_SET_FD(&pr[res], fd); ERTS_POLL_RES_SET_EVTS(&pr[res], ERTS_POLL_EV_NONE); DEBUG_PRINT_FD("trig %s (devpoll)", ps, fd, ev2str(ERTS_POLL_EV_NONE)); res++; } ps->poll_fds[ps->poll_fds_ix].fd = fd; ps->poll_fds[ps->poll_fds_ix].revents = 0; ps->poll_fds[ps->poll_fds_ix++].events = POLLREMOVE; if (ps->fds_status[fd].used_events) { erts_atomic_dec_nob(&ps->no_of_user_fds); ps->fds_status[fd].used_events = 0; } } else { if (!ps->fds_status[fd].used_events) { erts_atomic_inc_nob(&ps->no_of_user_fds); } ps->poll_fds[ps->poll_fds_ix].fd = fd; ps->poll_fds[ps->poll_fds_ix].revents = 0; ps->poll_fds[ps->poll_fds_ix++].events = ERTS_POLL_EV_E2N(events); ps->fds_status[fd].used_events = ps->fds_status[fd].events; } return res; #endif } static int handle_update_requests(ErtsPollSet *ps, ErtsPollResFd pr[], int no_fds) { int res = 0; ErtsPollSetUpdateRequestsBlock *urqbp = ps->curr_upd_req_block; while (urqbp) { ErtsPollSetUpdateRequestsBlock *free_urqbp = urqbp; int i; int len = urqbp->len; init_batch_update(ps, len); for (i = 0; i < len; i++) { int fd = urqbp->fds[i]; ASSERT(fd < ps->fds_status_len); ASSERT(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INURQ); /* We have run out of PollResFd slots to put results in, so we yield here and return later for more. */ if (res == no_fds && pr != NULL) { memmove(urqbp->fds, urqbp->fds+i, sizeof(int) * (len - i)); urqbp->len -= i; ps->curr_upd_req_block = urqbp; write_batch_update(ps); return res; } if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INURQ) { ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_INURQ; res += update_pollset(ps, pr + res, fd); } } free_urqbp = urqbp; urqbp = urqbp->next; free_update_requests_block(ps, free_urqbp); write_batch_update(ps); } ps->curr_upd_req_block = &ps->update_requests; #if ERTS_POLL_USE_DEVPOLL && defined(HARD_DEBUG) check_poll_status(ps); #endif ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(ps); return res; } #endif /* !ERTS_POLL_USE_CONCURRENT_UPDATE */ static ERTS_INLINE ErtsPollEvents poll_control(ErtsPollSet *ps, int fd, ErtsPollOp op, ErtsPollEvents events, int *do_wake) { ErtsPollEvents new_events; if (fd < ps->internal_fd_limit || fd >= max_fds) { if (fd < 0 || fd >= max_fds) { new_events = ERTS_POLL_EV_ERR; goto done; } #if ERTS_POLL_USE_KERNEL_POLL if (fd == ps->kp_fd) { new_events = ERTS_POLL_EV_NVAL; goto done; } #endif #if !ERTS_POLL_USE_CONCURRENT_UPDATE if (fd == ps->wake_fds[0] || fd == ps->wake_fds[1]) { new_events = ERTS_POLL_EV_NVAL; goto done; } #endif } #if ERTS_POLL_USE_CONCURRENT_UPDATE new_events = update_pollset(ps, fd, op, events); #else /* !ERTS_POLL_USE_CONCURRENT_UPDATE */ if (fd >= ps->fds_status_len) grow_fds_status(ps, fd); ASSERT(fd < ps->fds_status_len); if (op == ERTS_POLL_OP_DEL) { ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_RST; ps->fds_status[fd].events = 0; *do_wake = 1; } else if (op == ERTS_POLL_OP_ADD) { ASSERT(ps->fds_status[fd].events == 0); ps->fds_status[fd].events = events; *do_wake = 1; } else { ASSERT(op == ERTS_POLL_OP_MOD); ps->fds_status[fd].events = events; *do_wake = 1; } new_events = ps->fds_status[fd].events; enqueue_update_request(ps, fd); #endif /* !ERTS_POLL_USE_CONCURRENT_UPDATE */ done: DEBUG_PRINT_FD("%s = %s(%p, %d, %s, %s) do_wake=%d", ps, fd, ev2str(new_events), __FUNCTION__, ps, fd, op2str(op), ev2str(events), *do_wake); return new_events; } ErtsPollEvents ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet *ps, ErtsSysFdType fd, ErtsPollOp op, ErtsPollEvents events, int* do_wake) /* In: Wake up polling thread */ /* Out: Poller is woken */ { ErtsPollEvents res; ERTS_POLLSET_LOCK(ps); res = poll_control(ps, fd, op, events, do_wake); ERTS_POLLSET_UNLOCK(ps); #if !ERTS_POLL_USE_CONCURRENT_UPDATE if (*do_wake) { wake_poller(ps, 0); } #endif return res; } /* * --- Wait on poll set ------------------------------------------------------ */ #if ERTS_POLL_USE_KERNEL_POLL static ERTS_INLINE int ERTS_POLL_EXPORT(save_result)(ErtsPollSet *ps, ErtsPollResFd pr[], int max_res, int chk_fds_res, int ebadf) { #if !ERTS_POLL_USE_CONCURRENT_UPDATE || ERTS_POLL_DEBUG_PRINT int n = chk_fds_res < max_res ? chk_fds_res : max_res, i; int res = n; #if !ERTS_POLL_USE_CONCURRENT_UPDATE int wake_fd = ps->wake_fds[0]; #endif for (i = 0; i < n; i++) { int fd = ERTS_POLL_RES_GET_FD(&pr[i]); #ifdef DEBUG_PRINT_MODE ErtsPollEvents evts = ERTS_POLL_RES_GET_EVTS(pr+i); #endif DEBUG_PRINT_FD("trig %s (%s)", ps, fd, ev2str(evts), #if ERTS_POLL_USE_KQUEUE "kqueue" #elif ERTS_POLL_USE_EPOLL "epoll" #else "/dev/poll" #endif ); #if !ERTS_POLL_USE_CONCURRENT_UPDATE if (fd == wake_fd) { cleanup_wakeup_pipe(ps); ERTS_POLL_RES_SET_EVTS(&pr[i], ERTS_POLL_EV_NONE); } else { /* Reset the events to emulate ONESHOT semantics */ ps->fds_status[fd].events = 0; enqueue_update_request(ps, fd); } #endif } return res; #else ASSERT(chk_fds_res <= max_res); return chk_fds_res; #endif } #else /* !ERTS_POLL_USE_KERNEL_POLL */ static ERTS_INLINE int ERTS_POLL_EXPORT(save_result)(ErtsPollSet *ps, ErtsPollResFd pr[], int max_res, int chk_fds_res, int ebadf) { #if ERTS_POLL_USE_POLL /* --- poll -------------------------------- */ int res = 0; int wake_fd = ps->wake_fds[0]; int i, first_ix, end_ix; /* * In order to be somewhat fair, we continue on the poll_fds * index where we stopped last time. */ first_ix = i = ((ps->next_poll_fds_ix < ps->no_poll_fds) ? ps->next_poll_fds_ix : 0); end_ix = ps->no_poll_fds; while (1) { while (i < end_ix && res < max_res) { if (ps->poll_fds[i].revents != (short) 0) { int fd = ps->poll_fds[i].fd; ErtsPollEvents revents; if (fd == wake_fd) { cleanup_wakeup_pipe(ps); i++; continue; } revents = pollev2ev(ps->poll_fds[i].revents); ERTS_POLL_RES_SET_FD(&pr[res], fd); ERTS_POLL_RES_SET_EVTS(&pr[res], revents); /* If an fd returns as error, we may want to check the update_requests queue to see if it has been reset before delivering the result?!?! This should allow the user to do driver_dselect + close without waiting for stop_select... */ DEBUG_PRINT_FD("trig %s (poll)", ps, ERTS_POLL_RES_GET_FD(&pr[res]), ev2str(ERTS_POLL_RES_GET_EVTS(&pr[res]))); res++; /* Clear the events for this fd in order to mimic how epoll ONESHOT works */ ps->fds_status[fd].events = 0; enqueue_update_request(ps, fd); } i++; } if (res == max_res || i == first_ix) break; ASSERT(i == ps->no_poll_fds); i = 0; end_ix = first_ix; } ps->next_poll_fds_ix = i; return res; #elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */ int res = 0; int wake_fd = ps->wake_fds[0]; int fd, first_fd, end_fd; /* * In order to be fair, we continue on the fd where we stopped * last time. */ first_fd = fd = ps->next_sel_fd <= ps->max_fd ? ps->next_sel_fd : 0; end_fd = ps->max_fd + 1; if (!ebadf) { while (1) { while (fd < end_fd && res < max_res) { ErtsPollEvents events = 0; if (ERTS_FD_ISSET(fd, &ps->res_input_fds)) { if (fd == wake_fd) { cleanup_wakeup_pipe(ps); fd++; continue; } events |= ERTS_POLL_EV_IN; } if (ERTS_FD_ISSET(fd, &ps->res_output_fds)) events |= ERTS_POLL_EV_OUT; if (events) { ERTS_POLL_RES_SET_FD(&pr[res], fd); ERTS_POLL_RES_SET_EVTS(&pr[res], events); res++; ps->fds_status[fd].events = 0; enqueue_update_request(ps, fd); } fd++; } if (res == max_res || fd == first_fd) break; ASSERT(fd == ps->max_fd + 1); fd = 0; end_fd = first_fd; } } else { /* * Bad file descriptors in poll set. * * This only happens when running poorly written * drivers. This code could be optimized, but we * don't bother since it should never happen... */ while (1) { while (fd < end_fd && res < max_res) { if (ps->fds_status[fd].events) { int sres; ERTS_fd_set *iset = NULL; ERTS_fd_set *oset = NULL; if (ps->fds_status[fd].events & ERTS_POLL_EV_IN) { iset = &ps->res_input_fds; ERTS_FD_ZERO(iset); ERTS_FD_SET(fd, iset); } if (ps->fds_status[fd].events & ERTS_POLL_EV_OUT) { oset = &ps->res_output_fds; ERTS_FD_ZERO(oset); ERTS_FD_SET(fd, oset); } do { /* Initiate 'tv' each time; select() may modify it */ SysTimeval tv = {0, 0}; sres = ERTS_SELECT(ps->max_fd+1, iset, oset, NULL, &tv); } while (sres < 0 && errno == EINTR); if (sres < 0) { if (fd == wake_fd) { cleanup_wakeup_pipe(ps); fd++; continue; } ERTS_POLL_RES_SET_FD(&pr[res], fd); ERTS_POLL_RES_SET_EVTS(&pr[res], ERTS_POLL_EV_NVAL); res++; } else if (sres > 0) { ErtsPollEvents events = 0; ERTS_POLL_RES_SET_FD(&pr[res], fd); if (iset && ERTS_FD_ISSET(fd, iset)) { if (fd == wake_fd) { cleanup_wakeup_pipe(ps); fd++; continue; } events |= ERTS_POLL_EV_IN; } if (oset && ERTS_FD_ISSET(fd, oset)) { events |= ERTS_POLL_EV_OUT; } ASSERT(events); ERTS_POLL_RES_SET_EVTS(&pr[res], events); res++; } } fd++; } if (res == max_res || fd == first_fd) break; ASSERT(fd == ps->max_fd + 1); fd = 0; end_fd = first_fd; } } ps->next_sel_fd = fd; return res; #endif /* ERTS_POLL_USE_SELECT */ } #endif /* !ERTS_POLL_USE_KERNEL_POLL */ static ERTS_INLINE int check_fd_events(ErtsPollSet *ps, ErtsPollResFd pr[], int do_wait, int max_res) { int res; int timeout = do_wait ? -1 : 0; DEBUG_PRINT_WAIT("Entering check_fd_events(), do_wait=%d", ps, do_wait); { #if ERTS_POLL_USE_EPOLL /* --- epoll ------------------------------- */ res = epoll_wait(ps->kp_fd, pr, max_res, timeout); #elif ERTS_POLL_USE_KQUEUE /* --- kqueue ------------------------------ */ struct timespec ts = {0, 0}; struct timespec *tsp = timeout ? NULL : &ts; res = kevent(ps->kp_fd, NULL, 0, pr, max_res, tsp); #elif ERTS_POLL_USE_DEVPOLL /* --- devpoll ----------------------------- */ /* * The ioctl() will fail with EINVAL on Solaris 10 if dp_nfds * is set too high. dp_nfds should not be set greater than * the maximum number of file descriptors in the poll set. */ struct dvpoll poll_res; int nfds = (int) erts_atomic_read_nob(&ps->no_of_user_fds) + 1 /* wakeup pipe */; poll_res.dp_nfds = nfds < max_res ? nfds : max_res; poll_res.dp_fds = pr; poll_res.dp_timeout = timeout; res = ioctl(ps->kp_fd, DP_POLL, &poll_res); #elif ERTS_POLL_USE_POLL /* --- poll --------------------------------- */ res = poll(ps->poll_fds, ps->no_poll_fds, timeout); #elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */ SysTimeval tv = {0, 0}; SysTimeval *tvp = timeout ? NULL : &tv; ERTS_FD_COPY(&ps->input_fds, &ps->res_input_fds); ERTS_FD_COPY(&ps->output_fds, &ps->res_output_fds); res = ERTS_SELECT(ps->max_fd + 1, &ps->res_input_fds, &ps->res_output_fds, NULL, tvp); #endif /* ----------------------------------------- */ } DEBUG_PRINT_WAIT("Leaving check_fd_events(), res=%d", ps, res); return res; } int ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet *ps, ErtsPollResFd pr[], int *len) { int res, no_fds, used_fds = 0; int ebadf = 0; int do_wait; int ps_locked = 0; ERTS_MSACC_DECLARE_CACHE(); no_fds = *len; *len = 0; ASSERT(no_fds > 0); #if !ERTS_POLL_USE_CONCURRENT_UPDATE if (ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) { ERTS_POLLSET_LOCK(ps); used_fds = handle_update_requests(ps, pr, no_fds); ERTS_POLLSET_UNLOCK(ps); if (used_fds == no_fds) { *len = used_fds; return 0; } } #endif do_wait = !is_woken(ps) && used_fds == 0; DEBUG_PRINT_WAIT("Entering %s(), do_wait=%d", ps, __FUNCTION__, do_wait); if (do_wait) { erts_thr_progress_prepare_wait(NULL); ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_SLEEP); } while (1) { res = check_fd_events(ps, pr + used_fds, do_wait, no_fds - used_fds); #if !ERTS_POLL_USE_CONCURRENT_UPDATE if (res < 0 && errno == EBADF && ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) { /* * This may have happened because another thread deselected * a fd in our poll set and then closed it, i.e. the driver * behaved correctly. We wan't to avoid looking for a bad * fd, that may even not exist anymore. Therefore, handle * update requests and try again. This behaviour should only * happen when using SELECT as the polling mechanism. */ ERTS_POLLSET_LOCK(ps); used_fds += handle_update_requests(ps, pr + used_fds, no_fds - used_fds); if (used_fds == no_fds) { *len = used_fds; ERTS_POLLSET_UNLOCK(ps); return 0; } res = check_fd_events(ps, pr + used_fds, 0, no_fds - used_fds); /* Keep the lock over the non-blocking poll in order to not get any nasty races happening. */ ERTS_POLLSET_UNLOCK(ps); if (res == 0) { errno = EAGAIN; res = -1; } } #endif if (res != 0) break; if (!do_wait) break; } if (do_wait) { erts_thr_progress_finalize_wait(NULL); ERTS_MSACC_UPDATE_CACHE(); ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_CHECK_IO); } woke_up(ps); if (res < 0) { #if ERTS_POLL_USE_SELECT if (errno == EBADF) { ebadf = 1; goto save_results; } #endif res = errno; } else { #if ERTS_POLL_USE_SELECT save_results: #endif ps_locked = 1; ERTS_POLLSET_LOCK(ps); used_fds += ERTS_POLL_EXPORT(save_result)(ps, pr + used_fds, no_fds - used_fds, res, ebadf); #ifdef HARD_DEBUG check_poll_result(pr, used_fds); #endif res = (used_fds == 0 ? (is_interrupted_reset(ps) ? EINTR : EAGAIN) : 0); *len = used_fds; } if (ps_locked) ERTS_POLLSET_UNLOCK(ps); DEBUG_PRINT_WAIT("Leaving %s = %s(len = %d)", ps, res == 0 ? "0" : erl_errno_id(res), __FUNCTION__, *len); return res; } /* * --- Interrupt a thread doing erts_poll_wait() ----------------------------- */ void ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet *ps, int set) { #if !ERTS_POLL_USE_CONCURRENT_UPDATE if (!set) reset_wakeup_state(ps); else wake_poller(ps, 1); #endif } int ERTS_POLL_EXPORT(erts_poll_max_fds)(void) { return max_fds; } /* * --- Initialization -------------------------------------------------------- */ void ERTS_POLL_EXPORT(erts_poll_init)(int *concurrent_updates) { errno = 0; if (concurrent_updates) { #if ERTS_POLL_USE_CONCURRENT_UPDATE *concurrent_updates = 1; #else *concurrent_updates = 0; #endif } #if !defined(NO_SYSCONF) max_fds = sysconf(_SC_OPEN_MAX); #elif ERTS_POLL_USE_SELECT max_fds = NOFILE; #else max_fds = OPEN_MAX; #endif #if ERTS_POLL_USE_SELECT && defined(FD_SETSIZE) && \ !defined(_DARWIN_UNLIMITED_SELECT) if (max_fds > FD_SETSIZE) max_fds = FD_SETSIZE; #endif if (max_fds < 0) fatal_error("erts_poll_init(): Failed to get max number of files: %s\n", erl_errno_id(errno)); print_misc_debug_info(); } ErtsPollSet * ERTS_POLL_EXPORT(erts_poll_create_pollset)(int id) { #if ERTS_POLL_USE_KERNEL_POLL int kp_fd; #endif ErtsPollSet *ps = erts_alloc(ERTS_ALC_T_POLLSET, sizeof(struct ERTS_POLL_EXPORT(erts_pollset))); ps->id = id; ps->internal_fd_limit = 0; erts_atomic_init_nob(&ps->no_of_user_fds, 0); #if ERTS_POLL_USE_KERNEL_POLL ps->kp_fd = -1; #if ERTS_POLL_USE_EPOLL kp_fd = epoll_create(256); #elif ERTS_POLL_USE_DEVPOLL kp_fd = open("/dev/poll", O_RDWR); #elif ERTS_POLL_USE_KQUEUE kp_fd = kqueue(); #endif if (kp_fd < 0) fatal_error("erts_poll_create_pollset(): Failed to " #if ERTS_POLL_USE_EPOLL "create epoll set" #elif ERTS_POLL_USE_DEVPOLL "to open /dev/poll" #elif ERTS_POLL_USE_KQUEUE "create kqueue" #endif ": %s (%d)\n", erl_errno_id(errno), errno); #endif /* ERTS_POLL_USE_KERNEL_POLL */ #if ERTS_POLL_USE_POLL ps->next_poll_fds_ix = 0; ps->no_poll_fds = 0; ps->poll_fds_len = 0; ps->poll_fds = NULL; #elif ERTS_POLL_USE_SELECT ps->next_sel_fd = 0; ps->max_fd = -1; #ifdef _DARWIN_UNLIMITED_SELECT ps->input_fds.sz = 0; ps->input_fds.ptr = NULL; ps->res_input_fds.sz = 0; ps->res_input_fds.ptr = NULL; ps->output_fds.sz = 0; ps->output_fds.ptr = NULL; ps->res_output_fds.sz = 0; ps->res_output_fds.ptr = NULL; #else ERTS_FD_ZERO(&ps->input_fds); ERTS_FD_ZERO(&ps->res_input_fds); ERTS_FD_ZERO(&ps->output_fds); ERTS_FD_ZERO(&ps->res_output_fds); #endif #endif #if !ERTS_POLL_USE_CONCURRENT_UPDATE ps->fds_status = NULL; ps->fds_status_len = 0; ps->update_requests.next = NULL; ps->update_requests.len = 0; ps->curr_upd_req_block = &ps->update_requests; erts_atomic32_init_nob(&ps->have_update_requests, 0); erts_mtx_init(&ps->mtx, "pollset", NIL, ERTS_LOCK_FLAGS_CATEGORY_IO); #endif #if ERTS_POLL_USE_KERNEL_POLL if (ps->internal_fd_limit <= kp_fd) ps->internal_fd_limit = kp_fd + 1; ps->kp_fd = kp_fd; #endif #if !ERTS_POLL_USE_CONCURRENT_UPDATE erts_atomic32_init_nob(&ps->wakeup_state, (erts_aint32_t) 0); create_wakeup_pipe(ps); handle_update_requests(ps, NULL, 0); cleanup_wakeup_pipe(ps); #endif erts_atomic_set_nob(&ps->no_of_user_fds, 0); /* Don't count wakeup pipe and fallback fd */ return ps; } /* * --- Info ------------------------------------------------------------------ */ void ERTS_POLL_EXPORT(erts_poll_info)(ErtsPollSet *ps, ErtsPollInfo *pip) { #if !ERTS_POLL_USE_CONCURRENT_UPDATE int pending_updates; #endif Uint size = 0; ERTS_POLLSET_LOCK(ps); size += sizeof(struct ERTS_POLL_EXPORT(erts_pollset)); #if !ERTS_POLL_USE_CONCURRENT_UPDATE size += ps->fds_status_len*sizeof(ErtsFdStatus); #endif #if ERTS_POLL_USE_POLL size += ps->poll_fds_len*sizeof(struct pollfd); #elif ERTS_POLL_USE_SELECT #ifdef _DARWIN_UNLIMITED_SELECT size += ps->input_fds.sz + ps->res_input_fds.sz + ps->output_fds.sz + ps->res_output_fds.sz; #endif #endif #if !ERTS_POLL_USE_CONCURRENT_UPDATE { ErtsPollSetUpdateRequestsBlock *urqbp = ps->update_requests.next; pending_updates = ps->update_requests.len; while (urqbp) { size += sizeof(ErtsPollSetUpdateRequestsBlock); pending_updates += urqbp->len; urqbp = urqbp->next; } } #endif pip->primary = #if ERTS_POLL_USE_KQUEUE "kqueue" #elif ERTS_POLL_USE_EPOLL "epoll" #elif ERTS_POLL_USE_DEVPOLL "/dev/poll" #elif ERTS_POLL_USE_POLL "poll" #elif ERTS_POLL_USE_SELECT "select" #endif ; pip->kernel_poll = #if !ERTS_POLL_USE_KERNEL_POLL NULL #elif ERTS_POLL_USE_KQUEUE "kqueue" #elif ERTS_POLL_USE_EPOLL "epoll" #elif ERTS_POLL_USE_DEVPOLL "/dev/poll" #endif ; pip->memory_size = size; pip->poll_set_size = (int) erts_atomic_read_nob(&ps->no_of_user_fds); #if !ERTS_POLL_USE_CONCURRENT_UPDATE pip->poll_set_size++; /* Wakeup pipe */ #endif pip->lazy_updates = #if !ERTS_POLL_USE_CONCURRENT_UPDATE 1 #else 0 #endif ; pip->pending_updates = #if !ERTS_POLL_USE_CONCURRENT_UPDATE pending_updates #else 0 #endif ; pip->concurrent_updates = #if ERTS_POLL_USE_CONCURRENT_UPDATE 1 #else 0 #endif ; pip->is_fallback = #if ERTS_POLL_IS_FALLBACK 1 #else 0 #endif ; pip->batch_updates = #if ERTS_POLL_USE_DEVPOLL 1 #else 0 #endif ; pip->max_fds = max_fds; ERTS_POLLSET_UNLOCK(ps); } /* * Fatal error... */ #ifndef ERTS_GOT_SIGUSR1 # define ERTS_GOT_SIGUSR1 0 #endif static void fatal_error(char *format, ...) { va_list ap; if (ERTS_SOMEONE_IS_CRASH_DUMPING || ERTS_GOT_SIGUSR1) { /* * Crash dump writing and reception of sigusr1 (which will * result in a crash dump) closes all file descriptors. This * typically results in a fatal error for erts_poll() (wakeup * pipes and kernel poll fds are closed). * * We ignore the error and let the crash dump writing continue... */ return; } va_start(ap, format); erts_vfprintf(stderr, format, ap); va_end(ap); abort(); } /* * --- Debug ----------------------------------------------------------------- */ #if ERTS_POLL_USE_EPOLL uint32_t epoll_events(int kp_fd, int fd) { /* For epoll we read the information about what is selected upon from the proc fs.*/ char fname[30]; FILE *f; unsigned int pos, flags, mnt_id; int line = 0; sprintf(fname,"/proc/%d/fdinfo/%d",getpid(), kp_fd); f = fopen(fname,"r"); if (!f) { fprintf(stderr,"failed to open file %s, errno = %d\n", fname, errno); ASSERT(0); return 0; } if (fscanf(f,"pos:\t%x\nflags:\t%x", &pos, &flags) != 2) { fprintf(stderr,"failed to parse file %s, errno = %d\n", fname, errno); ASSERT(0); return 0; } if (fscanf(f,"\nmnt_id:\t%x\n", &mnt_id)); line += 3; while (!feof(f)) { /* tfd: 10 events: 40000019 data: 180000000a */ int ev_fd; uint32_t events; uint64_t data; if (fscanf(f,"tfd:%d events:%x data:%lx\n", &ev_fd, &events, &data) != 3) { fprintf(stderr,"failed to parse file %s on line %d, errno = %d\n", fname, line, errno); return 0; } if (fd == ev_fd) { fclose(f); return events; } } fclose(f); return 0; } #endif void ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet *ps, ErtsPollEvents ev[], int len) { int fd; #if !ERTS_POLL_USE_CONCURRENT_UPDATE ERTS_POLLSET_LOCK(ps); for (fd = 0; fd < len; fd++) { if (fd >= ps->fds_status_len) ev[fd] = 0; else { ev[fd] = ps->fds_status[fd].events; if ( fd == ps->wake_fds[0] || fd == ps->wake_fds[1] || #if ERTS_POLL_USE_KERNEL_POLL fd == ps->kp_fd || #endif 0) ev[fd] |= ERTS_POLL_EV_NVAL; } } ERTS_POLLSET_UNLOCK(ps); #elif ERTS_POLL_USE_EPOLL /* For epoll we read the information about what is selected upon from the proc fs.*/ char fname[30]; FILE *f; unsigned int pos, flags, mnt_id; int line = 0; sprintf(fname,"/proc/%d/fdinfo/%d",getpid(), ps->kp_fd); for (fd = 0; fd < len; fd++) ev[fd] = ERTS_POLL_EV_NONE; f = fopen(fname,"r"); if (!f) { fprintf(stderr,"failed to open file %s, errno = %d\n", fname, errno); return; } if (fscanf(f,"pos:\t%x\nflags:\t%x", &pos, &flags) != 2) { fprintf(stderr,"failed to parse file %s, errno = %d\n", fname, errno); ASSERT(0); return; } if (fscanf(f,"\nmnt_id:\t%x\n", &mnt_id)); line += 3; while (!feof(f)) { /* tfd: 10 events: 40000019 data: 180000000a */ int fd; uint32_t events; uint64_t data; if (fscanf(f,"tfd:%d events:%x data:%lx\n", &fd, &events, &data) != 3) { fprintf(stderr,"failed to parse file %s on line %d, errno = %d\n", fname, line, errno); ASSERT(0); return; } data &= 0xFFFFFFFF; ASSERT(fd == data); /* Events are the events that are being monitored, which of course include error and hup events, but we are only interested in IN/OUT events */ ev[fd] = (ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT) & ERTS_POLL_EV_N2E(events); line++; } #else for (fd = 0; fd < len; fd++) ev[fd] = ERTS_POLL_EV_NONE; #endif } #ifdef HARD_DEBUG static void check_poll_result(ErtsPollResFd pr[], int len) { int i, j; for (i = 0; i < len; i++) { ASSERT(pr[i].fd >= 0); ASSERT(pr[i].fd < max_fds); for (j = 0; j < len; j++) { ASSERT(i == j || pr[i].fd != pr[j].fd); } } } #if ERTS_POLL_USE_DEVPOLL && defined(DEBUG) static void check_poll_status(ErtsPollSet *ps) { int i; for (i = 0; i < ps->fds_status_len; i++) { int ires; struct pollfd dp_fd; short events = ERTS_POLL_EV_E2N(ps->fds_status[i].events); dp_fd.fd = i; dp_fd.events = (short) 0; dp_fd.revents = (short) 0; ires = ioctl(ps->kp_fd, DP_ISPOLLED, &dp_fd); if (ires == 0) { ASSERT(!events); } else if (ires == 1) { ASSERT(events); ASSERT(events == dp_fd.revents); } else { ASSERT(0); } ASSERT(dp_fd.fd == i); ASSERT(ps->fds_status[i].events == ps->fds_status[i].used_events); } } #endif /* ERTS_POLL_USE_DEVPOLL */ #endif /* HARD_DEBUG */ static void print_misc_debug_info(void) { #if ERTS_POLL_DEBUG_PRINT erts_printf("erts_poll using: %s lazy_updates:%s\n", #if ERTS_POLL_USE_KQUEUE "kqueue" #elif ERTS_POLL_USE_EPOLL "epoll" #elif ERTS_POLL_USE_DEVPOLL "/dev/poll" #elif ERTS_POLL_USE_POLL "poll" #elif ERTS_POLL_USE_SELECT "select" #endif , #if !ERTS_POLL_USE_CONCURRENT_UPDATE "true" #else "false" #endif ); erts_printf("ERTS_POLL_EV_IN=0x%x\n" "ERTS_POLL_EV_OUT=0x%x\n" "ERTS_POLL_EV_NVAL=0x%x\n" "ERTS_POLL_EV_ERR=0x%x\n", ERTS_POLL_EV_IN, ERTS_POLL_EV_OUT, ERTS_POLL_EV_NVAL, ERTS_POLL_EV_ERR); #ifdef FD_SETSIZE erts_printf("FD_SETSIZE=%d\n", FD_SETSIZE); #endif #endif } #ifdef ERTS_ENABLE_LOCK_COUNT void ERTS_POLL_EXPORT(erts_lcnt_enable_pollset_lock_count)(ErtsPollSet *pollset, int enable) { #if !ERTS_POLL_USE_CONCURRENT_UPDATE if(enable) { erts_lcnt_install_new_lock_info(&pollset->mtx.lcnt, "pollset_rm", NIL, ERTS_LOCK_TYPE_MUTEX | ERTS_LOCK_FLAGS_CATEGORY_IO); } else { erts_lcnt_uninstall(&pollset->mtx.lcnt); } #endif return; } #endif