/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2006-2012. All Rights Reserved.
 *
 * The contents of this file are subject to the Erlang Public License,
 * Version 1.1, (the "License"); you may not use this file except in
 * compliance with the License. You should have received a copy of the
 * Erlang Public License along with this software. If not, it can be
 * retrieved online at http://www.erlang.org/.
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Description: Poll interface suitable for ERTS with or without
 *              SMP support.
 *
 *              The interface is currently implemented using:
 *              - select
 *              - poll
 *              - /dev/poll
 *              - epoll with poll or select as fallback
 *              - kqueue with poll or select as fallback
 *
 *              Some time in the future it will also be
 *              implemented using Solaris ports.
 *
 * Author: Rickard Green
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#if defined(__DARWIN__) || defined(__APPLE__) && defined(__MACH__)
#  define _DARWIN_UNLIMITED_SELECT
#endif

#ifndef WANT_NONBLOCKING
#  define WANT_NONBLOCKING
#endif

#define ERTS_WANT_GOT_SIGUSR1

#include "erl_poll.h"
#if ERTS_POLL_USE_KQUEUE
#  include <sys/types.h>
#  include <sys/event.h>
#  include <sys/time.h>
#endif
#if ERTS_POLL_USE_SELECT
#  ifdef SYS_SELECT_H
#    include <sys/select.h>
#  endif
#endif
#ifdef NO_SYSCONF
#  if ERTS_POLL_USE_SELECT
#    include <sys/param.h>
#  else
#    include <limits.h>
#  endif
#endif
#include "erl_thr_progress.h"
#include "erl_driver.h"
#include "erl_alloc.h"

#if !defined(ERTS_POLL_USE_EPOLL) \
    && !defined(ERTS_POLL_USE_DEVPOLL) \
    && !defined(ERTS_POLL_USE_POLL) \
    && !defined(ERTS_POLL_USE_SELECT)
#error "Missing implementation of erts_poll()"
#endif

#if defined(ERTS_KERNEL_POLL_VERSION) && !ERTS_POLL_USE_KERNEL_POLL
#error "Missing kernel poll implementation of erts_poll()"
#endif

#if defined(ERTS_NO_KERNEL_POLL_VERSION) && ERTS_POLL_USE_KERNEL_POLL
#error "Kernel poll used when it shouldn't be used"
#endif

#if 0
#define ERTS_POLL_DEBUG_PRINT
#endif

#if defined(DEBUG) && 0
#define HARD_DEBUG
#endif

#ifdef _DARWIN_UNLIMITED_SELECT
#  define FDS fd_set *
#  define FDS_CLR FD_CLR
#  define FDS_ISSET FD_ISSET
#  define FDS_SET FD_SET
#  define FDS_SIZE(n) ((((n)+NFDBITS-1)/NFDBITS)*sizeof(fd_mask))
#else
#  define FDS fd_set
#  define FDS_CLR(fd, fds) FD_CLR((fd), &(fds))
#  define FDS_ISSET(fd, fds) FD_ISSET((fd), &(fds))
#  define FDS_SET(fd, fds) FD_SET((fd), &(fds))
#endif

#define ERTS_POLL_USE_BATCH_UPDATE_POLLSET (ERTS_POLL_USE_DEVPOLL \
                                            || ERTS_POLL_USE_KQUEUE)
#define ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE \
    (defined(ERTS_SMP) || ERTS_POLL_USE_KERNEL_POLL || ERTS_POLL_USE_POLL)
#define ERTS_POLL_USE_CONCURRENT_UPDATE \
    (defined(ERTS_SMP) && ERTS_POLL_USE_EPOLL)

#define ERTS_POLL_COALESCE_KP_RES (ERTS_POLL_USE_KQUEUE || ERTS_POLL_USE_EPOLL)

#define ERTS_EV_TABLE_MIN_LENGTH 1024
#define ERTS_EV_TABLE_EXP_THRESHOLD (2048*1024)

#ifdef ERTS_POLL_NEED_ASYNC_INTERRUPT_SUPPORT
#  define ERTS_POLL_ASYNC_INTERRUPT_SUPPORT 1
#else
#  define ERTS_POLL_ASYNC_INTERRUPT_SUPPORT 0
#endif

#define ERTS_POLL_USE_WAKEUP_PIPE \
    (ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(USE_THREADS))

#ifdef ERTS_SMP

#define ERTS_POLLSET_LOCK(PS) \
    erts_smp_mtx_lock(&(PS)->mtx)
#define ERTS_POLLSET_UNLOCK(PS) \
    erts_smp_mtx_unlock(&(PS)->mtx)

#define ERTS_POLLSET_SET_POLLED_CHK(PS) \
    ((int) erts_atomic32_xchg_nob(&(PS)->polled, (erts_aint32_t) 1))
#define ERTS_POLLSET_UNSET_POLLED(PS) \
    erts_atomic32_set_nob(&(PS)->polled,
(erts_aint32_t) 0) #define ERTS_POLLSET_IS_POLLED(PS) \ ((int) erts_atomic32_read_nob(&(PS)->polled)) #else #define ERTS_POLLSET_LOCK(PS) #define ERTS_POLLSET_UNLOCK(PS) #define ERTS_POLLSET_SET_POLLED_CHK(PS) 0 #define ERTS_POLLSET_UNSET_POLLED(PS) #define ERTS_POLLSET_IS_POLLED(PS) 0 #endif #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE #define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) \ erts_smp_atomic32_set_nob(&(PS)->have_update_requests, (erts_aint32_t) 1) #define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) \ erts_smp_atomic32_set_nob(&(PS)->have_update_requests, (erts_aint32_t) 0) #define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) \ ((int) erts_smp_atomic32_read_nob(&(PS)->have_update_requests)) #else #define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) #define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) #define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) 0 #endif #if ERTS_POLL_USE_FALLBACK # if ERTS_POLL_USE_POLL # define ERTS_POLL_NEED_FALLBACK(PS) ((PS)->no_poll_fds > 1) # elif ERTS_POLL_USE_SELECT # define ERTS_POLL_NEED_FALLBACK(PS) ((PS)->no_select_fds > 1) # endif #endif /* * --- Data types ------------------------------------------------------------ */ #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE #define ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE 128 typedef struct ErtsPollSetUpdateRequestsBlock_ ErtsPollSetUpdateRequestsBlock; struct ErtsPollSetUpdateRequestsBlock_ { ErtsPollSetUpdateRequestsBlock *next; int len; int fds[ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE]; }; #endif #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE # define ERTS_POLL_FD_FLG_INURQ (((unsigned short) 1) << 0) #endif #if ERTS_POLL_USE_FALLBACK # define ERTS_POLL_FD_FLG_INFLBCK (((unsigned short) 1) << 1) # define ERTS_POLL_FD_FLG_USEFLBCK (((unsigned short) 1) << 2) #endif #if ERTS_POLL_USE_KERNEL_POLL || defined(ERTS_SMP) # define ERTS_POLL_FD_FLG_RST (((unsigned short) 1) << 3) #endif typedef struct { #if ERTS_POLL_USE_POLL int pix; #endif ErtsPollEvents used_events; ErtsPollEvents events; #if ERTS_POLL_COALESCE_KP_RES unsigned short res_ev_ix; #endif #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE || ERTS_POLL_USE_FALLBACK unsigned short flags; #endif } ErtsFdStatus; #if ERTS_POLL_COALESCE_KP_RES /* res_ev_ix max value */ #define ERTS_POLL_MAX_RES ((1 << sizeof(unsigned short)*8) - 1) #endif #if ERTS_POLL_USE_KQUEUE #define ERTS_POLL_KQ_OP_HANDLED 1 #define ERTS_POLL_KQ_OP_DEL_R 2 #define ERTS_POLL_KQ_OP_DEL_W 3 #define ERTS_POLL_KQ_OP_ADD_R 4 #define ERTS_POLL_KQ_OP_ADD_W 5 #define ERTS_POLL_KQ_OP_ADD2_R 6 #define ERTS_POLL_KQ_OP_ADD2_W 7 #endif struct ErtsPollSet_ { ErtsPollSet next; int internal_fd_limit; ErtsFdStatus *fds_status; erts_smp_atomic_t no_of_user_fds; int fds_status_len; #if ERTS_POLL_USE_KERNEL_POLL int kp_fd; int res_events_len; #if ERTS_POLL_USE_EPOLL struct epoll_event *res_events; #elif ERTS_POLL_USE_KQUEUE struct kevent *res_events; #elif ERTS_POLL_USE_DEVPOLL struct pollfd *res_events; #endif #endif /* ERTS_POLL_USE_KERNEL_POLL */ #if ERTS_POLL_USE_POLL int next_poll_fds_ix; int no_poll_fds; int poll_fds_len; struct pollfd*poll_fds; #elif ERTS_POLL_USE_SELECT int next_sel_fd; int max_fd; #if ERTS_POLL_USE_FALLBACK int no_select_fds; #endif #ifdef _DARWIN_UNLIMITED_SELECT size_t select_fds_len; char resize_res_sets; #endif FDS input_fds; FDS res_input_fds; FDS output_fds; FDS res_output_fds; #endif #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE ErtsPollSetUpdateRequestsBlock update_requests; ErtsPollSetUpdateRequestsBlock *curr_upd_req_block; erts_smp_atomic32_t have_update_requests; #endif #ifdef ERTS_SMP erts_atomic32_t polled; 
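/* 'polled' (above) is set while a thread is inside erts_poll_wait() on this pollset; 'mtx' (below) protects the pollset data when it is accessed from multiple threads. */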
erts_smp_mtx_t mtx; #endif #if ERTS_POLL_USE_WAKEUP_PIPE int wake_fds[2]; #endif #if ERTS_POLL_USE_FALLBACK int fallback_used; #endif #if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT erts_atomic32_t wakeup_state; #endif erts_smp_atomic32_t timeout; #ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS erts_smp_atomic_t no_avoided_wakeups; erts_smp_atomic_t no_avoided_interrupts; erts_smp_atomic_t no_interrupt_timed; #endif }; void erts_silence_warn_unused_result(long unused); static void fatal_error(char *format, ...); static void fatal_error_async_signal_safe(char *error_str); static int max_fds = -1; static ErtsPollSet pollsets; static erts_smp_spinlock_t pollsets_lock; #if ERTS_POLL_USE_POLL static ERTS_INLINE short ev2pollev(ErtsPollEvents ev) { #if !ERTS_POLL_USE_FALLBACK || ERTS_POLL_USE_KQUEUE return ERTS_POLL_EV_E2N(ev); #else /* Note, we only map events we are interested in */ short res_ev = (short) 0; if (ev & ERTS_POLL_EV_IN) res_ev |= ERTS_POLL_EV_NKP_IN; if (ev & ERTS_POLL_EV_OUT) res_ev |= ERTS_POLL_EV_NKP_OUT; return res_ev; #endif } static ERTS_INLINE ErtsPollEvents pollev2ev(short ev) { #if !ERTS_POLL_USE_FALLBACK || ERTS_POLL_USE_KQUEUE return ERTS_POLL_EV_N2E(ev); #else /* Note, we only map events we are interested in */ ErtsPollEvents res_ev = (ErtsPollEvents) 0; if (ev & ERTS_POLL_EV_NKP_IN) res_ev |= ERTS_POLL_EV_IN; if (ev & ERTS_POLL_EV_NKP_OUT) res_ev |= ERTS_POLL_EV_OUT; if (ev & ERTS_POLL_EV_NKP_ERR) res_ev |= ERTS_POLL_EV_ERR; if (ev & ERTS_POLL_EV_NKP_NVAL) res_ev |= ERTS_POLL_EV_NVAL; return res_ev; #endif } #endif #ifdef HARD_DEBUG static void check_poll_result(ErtsPollResFd pr[], int len); #if ERTS_POLL_USE_DEVPOLL static void check_poll_status(ErtsPollSet ps); #endif /* ERTS_POLL_USE_DEVPOLL */ #endif /* HARD_DEBUG */ #ifdef ERTS_POLL_DEBUG_PRINT static void print_misc_debug_info(void); #endif #define ERTS_POLL_NOT_WOKEN 0 #define ERTS_POLL_WOKEN -1 #define ERTS_POLL_WOKEN_INTR 1 static ERTS_INLINE void reset_wakeup_state(ErtsPollSet ps) { #if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT erts_atomic32_set_mb(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN); #endif } static ERTS_INLINE int is_woken(ErtsPollSet ps) { #if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT return erts_atomic32_read_acqb(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN; #else return 0; #endif } static ERTS_INLINE int is_interrupted_reset(ErtsPollSet ps) { #if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT return (erts_atomic32_xchg_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN) == ERTS_POLL_WOKEN_INTR); #else return 0; #endif } static ERTS_INLINE void woke_up(ErtsPollSet ps) { #if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT erts_aint32_t wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state); if (wakeup_state == ERTS_POLL_NOT_WOKEN) (void) erts_atomic32_cmpxchg_nob(&ps->wakeup_state, ERTS_POLL_WOKEN, ERTS_POLL_NOT_WOKEN); ASSERT(erts_atomic32_read_nob(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN); #endif } /* * --- Wakeup pipe ----------------------------------------------------------- */ #if ERTS_POLL_USE_WAKEUP_PIPE static ERTS_INLINE void wake_poller(ErtsPollSet ps, int interrupted, int async_signal_safe) { int wake; if (async_signal_safe) wake = 1; else { erts_aint32_t wakeup_state; if (!interrupted) wakeup_state = erts_atomic32_cmpxchg_relb(&ps->wakeup_state, ERTS_POLL_WOKEN, ERTS_POLL_NOT_WOKEN); else { /* * We might unnecessarily write to the pipe, however, * that isn't problematic. 
*/ wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state); erts_atomic32_set_relb(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR); } wake = wakeup_state == ERTS_POLL_NOT_WOKEN; } /* * NOTE: This function might be called from signal handlers in the * non-smp case; therefore, it has to be async-signal safe in * the non-smp case. */ if (wake) { ssize_t res; if (ps->wake_fds[1] < 0) return; /* Not initialized yet */ do { /* write() is async-signal safe (according to posix) */ res = write(ps->wake_fds[1], "!", 1); } while (res < 0 && errno == EINTR); if (res <= 0 && errno != ERRNO_BLOCK) { if (async_signal_safe) fatal_error_async_signal_safe(__FILE__ ":XXX:wake_poller(): " "Failed to write on wakeup pipe\n"); else fatal_error("%s:%d:wake_poller(): " "Failed to write to wakeup pipe fd=%d: " "%s (%d)\n", __FILE__, __LINE__, ps->wake_fds[1], erl_errno_id(errno), errno); } } } static ERTS_INLINE void cleanup_wakeup_pipe(ErtsPollSet ps) { #if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT int intr = 0; #endif int fd = ps->wake_fds[0]; int res; do { char buf[32]; res = read(fd, buf, sizeof(buf)); #if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT if (res > 0) intr = 1; #endif } while (res > 0 || (res < 0 && errno == EINTR)); if (res < 0 && errno != ERRNO_BLOCK) { fatal_error("%s:%d:cleanup_wakeup_pipe(): " "Failed to read on wakeup pipe fd=%d: " "%s (%d)\n", __FILE__, __LINE__, fd, erl_errno_id(errno), errno); } #if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT if (intr) erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR); #endif } static void create_wakeup_pipe(ErtsPollSet ps) { int do_wake = 0; int wake_fds[2]; ps->wake_fds[0] = -1; ps->wake_fds[1] = -1; if (pipe(wake_fds) < 0) { fatal_error("%s:%d:create_wakeup_pipe(): " "Failed to create pipe: %s (%d)\n", __FILE__, __LINE__, erl_errno_id(errno), errno); } SET_NONBLOCKING(wake_fds[0]); SET_NONBLOCKING(wake_fds[1]); #ifdef ERTS_POLL_DEBUG_PRINT erts_printf("wakeup fds = {%d, %d}\n", wake_fds[0], wake_fds[1]); #endif ERTS_POLL_EXPORT(erts_poll_control)(ps, wake_fds[0], ERTS_POLL_EV_IN, 1, &do_wake); #if ERTS_POLL_USE_FALLBACK /* We depend on the wakeup pipe being handled by kernel poll */ if (ps->fds_status[wake_fds[0]].flags & ERTS_POLL_FD_FLG_INFLBCK) fatal_error("%s:%d:create_wakeup_pipe(): Internal error\n", __FILE__, __LINE__); #endif if (ps->internal_fd_limit <= wake_fds[1]) ps->internal_fd_limit = wake_fds[1] + 1; if (ps->internal_fd_limit <= wake_fds[0]) ps->internal_fd_limit = wake_fds[0] + 1; ps->wake_fds[0] = wake_fds[0]; ps->wake_fds[1] = wake_fds[1]; } #endif /* ERTS_POLL_USE_WAKEUP_PIPE */ /* * --- Poll set update requests ---------------------------------------------- */ #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE static ERTS_INLINE void enqueue_update_request(ErtsPollSet ps, int fd) { ErtsPollSetUpdateRequestsBlock *urqbp; ASSERT(fd < ps->fds_status_len); if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INURQ) return; if (ps->update_requests.len == 0) ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(ps); urqbp = ps->curr_upd_req_block; if (urqbp->len == ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE) { ASSERT(!urqbp->next); urqbp = erts_alloc(ERTS_ALC_T_POLLSET_UPDREQ, sizeof(ErtsPollSetUpdateRequestsBlock)); ps->curr_upd_req_block->next = urqbp; ps->curr_upd_req_block = urqbp; urqbp->next = NULL; urqbp->len = 0; } ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_INURQ; urqbp->fds[urqbp->len++] = fd; } static ERTS_INLINE void free_update_requests_block(ErtsPollSet ps, ErtsPollSetUpdateRequestsBlock *urqbp) { if (urqbp != &ps->update_requests) erts_free(ERTS_ALC_T_POLLSET_UPDREQ, (void *) urqbp); 
else { urqbp->next = NULL; urqbp->len = 0; } } #endif /* ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE */ /* * --- Growing poll set structures ------------------------------------------- */ int ERTS_POLL_EXPORT(erts_poll_get_table_len) (int new_len) { if (new_len < ERTS_EV_TABLE_MIN_LENGTH) { new_len = ERTS_EV_TABLE_MIN_LENGTH; } else if (new_len < ERTS_EV_TABLE_EXP_THRESHOLD) { /* find next power of 2 */ --new_len; new_len |= new_len >> 1; new_len |= new_len >> 2; new_len |= new_len >> 4; new_len |= new_len >> 8; new_len |= new_len >> 16; ++new_len; } else { /* grow incrementally */ new_len += ERTS_EV_TABLE_EXP_THRESHOLD; } return new_len; } #if ERTS_POLL_USE_KERNEL_POLL static void grow_res_events(ErtsPollSet ps, int new_len) { size_t new_size = sizeof( #if ERTS_POLL_USE_EPOLL struct epoll_event #elif ERTS_POLL_USE_DEVPOLL struct pollfd #elif ERTS_POLL_USE_KQUEUE struct kevent #endif ) * ERTS_POLL_EXPORT(erts_poll_get_table_len)(new_len); /* We do not need to save previously stored data */ if (ps->res_events) erts_free(ERTS_ALC_T_POLL_RES_EVS, ps->res_events); ps->res_events = erts_alloc(ERTS_ALC_T_POLL_RES_EVS, new_size); ps->res_events_len = new_len; } #endif /* ERTS_POLL_USE_KERNEL_POLL */ #if ERTS_POLL_USE_POLL static void grow_poll_fds(ErtsPollSet ps, int min_ix) { int i; int new_len = ERTS_POLL_EXPORT(erts_poll_get_table_len)(min_ix + 1); if (new_len > max_fds) new_len = max_fds; ps->poll_fds = (ps->poll_fds_len ? erts_realloc(ERTS_ALC_T_POLL_FDS, ps->poll_fds, sizeof(struct pollfd)*new_len) : erts_alloc(ERTS_ALC_T_POLL_FDS, sizeof(struct pollfd)*new_len)); for (i = ps->poll_fds_len; i < new_len; i++) { ps->poll_fds[i].fd = -1; ps->poll_fds[i].events = (short) 0; ps->poll_fds[i].revents = (short) 0; } ps->poll_fds_len = new_len; } #endif #ifdef _DARWIN_UNLIMITED_SELECT static void grow_select_fds(ErtsPollSet ps, int fd) { int new_len = ERTS_POLL_EXPORT(erts_poll_get_table_len)(fd + 1); if (new_len > max_fds) new_len = max_fds; new_len = FDS_SIZE(new_len); ps->input_fds = ps->select_fds_len ? erts_realloc(ERTS_ALC_T_SELECT_FDS, ps->input_fds, new_len) : erts_alloc(ERTS_ALC_T_SELECT_FDS, new_len); ps->output_fds = ps->select_fds_len ? erts_realloc(ERTS_ALC_T_SELECT_FDS, ps->output_fds, new_len) : erts_alloc(ERTS_ALC_T_SELECT_FDS, new_len); memset((void *) ps->input_fds + ps->select_fds_len, 0, new_len - ps->select_fds_len); memset((void *) ps->output_fds + ps->select_fds_len, 0, new_len - ps->select_fds_len); ps->select_fds_len = new_len; ps->resize_res_sets = 1; } #endif /* _DARWIN_UNLIMITED_SELECT */ static void grow_fds_status(ErtsPollSet ps, int min_fd) { int i; int new_len = ERTS_POLL_EXPORT(erts_poll_get_table_len)(min_fd + 1); ASSERT(min_fd < max_fds); if (new_len > max_fds) new_len = max_fds; ps->fds_status = (ps->fds_status_len ? 
erts_realloc(ERTS_ALC_T_FD_STATUS, ps->fds_status, sizeof(ErtsFdStatus)*new_len) : erts_alloc(ERTS_ALC_T_FD_STATUS, sizeof(ErtsFdStatus)*new_len)); for (i = ps->fds_status_len; i < new_len; i++) { #if ERTS_POLL_USE_POLL ps->fds_status[i].pix = -1; #endif ps->fds_status[i].used_events = (ErtsPollEvents) 0; ps->fds_status[i].events = (ErtsPollEvents) 0; #if ERTS_POLL_COALESCE_KP_RES ps->fds_status[i].res_ev_ix = (unsigned short) ERTS_POLL_MAX_RES; #endif #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE || ERTS_POLL_USE_FALLBACK ps->fds_status[i].flags = (unsigned short) 0; #endif } ps->fds_status_len = new_len; } /* * --- Selecting fd to poll on ----------------------------------------------- */ #if ERTS_POLL_USE_FALLBACK static int update_fallback_pollset(ErtsPollSet ps, int fd); #endif static ERTS_INLINE int need_update(ErtsPollSet ps, int fd) { #if ERTS_POLL_USE_KERNEL_POLL int reset; #endif ASSERT(fd < ps->fds_status_len); #if ERTS_POLL_USE_KERNEL_POLL reset = (int) (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST); if (reset && !ps->fds_status[fd].used_events) { ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_RST; reset = 0; } #elif defined(ERTS_SMP) ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_RST; #endif if (ps->fds_status[fd].used_events != ps->fds_status[fd].events) return 1; #if ERTS_POLL_USE_KERNEL_POLL return reset; #else return 0; #endif } #if ERTS_POLL_USE_BATCH_UPDATE_POLLSET #if ERTS_POLL_USE_KQUEUE #define ERTS_POLL_MIN_BATCH_BUF_SIZE 128 #else #define ERTS_POLL_MIN_BATCH_BUF_SIZE 64 #endif typedef struct { int len; int size; #if ERTS_POLL_USE_DEVPOLL struct pollfd *buf; #elif ERTS_POLL_USE_KQUEUE struct kevent *buf; struct kevent *ebuf; #endif } ErtsPollBatchBuf; static ERTS_INLINE void setup_batch_buf(ErtsPollSet ps, ErtsPollBatchBuf *bbp) { bbp->len = 0; #if ERTS_POLL_USE_DEVPOLL bbp->size = ps->res_events_len; bbp->buf = ps->res_events; #elif ERTS_POLL_USE_KQUEUE bbp->size = ps->res_events_len/2; bbp->buf = ps->res_events; bbp->ebuf = bbp->buf + bbp->size; #endif } #if ERTS_POLL_USE_DEVPOLL static void write_batch_buf(ErtsPollSet ps, ErtsPollBatchBuf *bbp) { ssize_t wres; char *buf = (char *) bbp->buf; size_t buf_size = sizeof(struct pollfd)*bbp->len; while (1) { wres = write(ps->kp_fd, (void *) buf, buf_size); if (wres < 0) { if (errno == EINTR) continue; fatal_error("%s:%d:write_batch_buf(): " "Failed to write to /dev/poll: " "%s (%d)\n", __FILE__, __LINE__, erl_errno_id(errno), errno); } buf_size -= wres; if (buf_size <= 0) break; buf += wres; } if (buf_size < 0) { fatal_error("%s:%d:write_devpoll_buf(): Internal error\n", __FILE__, __LINE__); } bbp->len = 0; } #elif ERTS_POLL_USE_KQUEUE static void write_batch_buf(ErtsPollSet ps, ErtsPollBatchBuf *bbp) { int res; int len = bbp->len; struct kevent *buf = bbp->buf; struct timespec ts = {0, 0}; do { res = kevent(ps->kp_fd, buf, len, NULL, 0, &ts); } while (res < 0 && errno == EINTR); if (res < 0) { int i; struct kevent *ebuf = bbp->ebuf; do { res = kevent(ps->kp_fd, buf, len, ebuf, len, &ts); } while (res < 0 && errno == EINTR); if (res < 0) { fatal_error("%s:%d: kevent() failed: %s (%d)\n", __FILE__, __LINE__, erl_errno_id(errno), errno); } for (i = 0; i < res; i++) { if (ebuf[i].flags & EV_ERROR) { short filter; int fd = (int) ebuf[i].ident; switch ((int) (long) ebuf[i].udata) { /* * Since we use a lazy update approach EV_DELETE will * frequently fail. This since kqueue automatically * removes a file descriptor that is closed from the * poll set. 
*/ case ERTS_POLL_KQ_OP_DEL_R: case ERTS_POLL_KQ_OP_DEL_W: case ERTS_POLL_KQ_OP_HANDLED: break; /* * According to the kqueue man page EVFILT_READ support * does not imply EVFILT_WRITE support; therefore, * if an EV_ADD fail, we may have to remove other * events on this fd in the kqueue pollset before * adding fd to the fallback pollset. */ case ERTS_POLL_KQ_OP_ADD_W: if (ps->fds_status[fd].used_events & ERTS_POLL_EV_IN) { filter = EVFILT_READ; goto rm_add_fb; } goto add_fb; case ERTS_POLL_KQ_OP_ADD_R: if (ps->fds_status[fd].used_events & ERTS_POLL_EV_OUT) { filter = EVFILT_WRITE; goto rm_add_fb; } goto add_fb; case ERTS_POLL_KQ_OP_ADD2_W: case ERTS_POLL_KQ_OP_ADD2_R: { int j; for (j = i+1; j < res; j++) { if (fd == (int) ebuf[j].ident) { ebuf[j].udata = (void *) ERTS_POLL_KQ_OP_HANDLED; if (!(ebuf[j].flags & EV_ERROR)) { switch ((int) (long) ebuf[j].udata) { case ERTS_POLL_KQ_OP_ADD2_W: filter = EVFILT_WRITE; goto rm_add_fb; case ERTS_POLL_KQ_OP_ADD2_R: filter = EVFILT_READ; goto rm_add_fb; default: fatal_error("%s:%d:write_batch_buf(): " "Internal error", __FILE__, __LINE__); break; } } goto add_fb; } } /* The other add succeded... */ filter = ((((int) (long) ebuf[i].udata) == ERTS_POLL_KQ_OP_ADD2_W) ? EVFILT_READ : EVFILT_WRITE); rm_add_fb: { struct kevent kev; struct timespec ts = {0, 0}; EV_SET(&kev, fd, filter, EV_DELETE, 0, 0, 0); (void) kevent(ps->kp_fd, &kev, 1, NULL, 0, &ts); } add_fb: ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_USEFLBCK; ASSERT(ps->fds_status[fd].used_events); ps->fds_status[fd].used_events = 0; erts_smp_atomic_dec_nob(&ps->no_of_user_fds); update_fallback_pollset(ps, fd); ASSERT(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK); break; } default: fatal_error("%s:%d:write_batch_buf(): Internal error", __FILE__, __LINE__); break; } } } } bbp->len = 0; } #endif /* ERTS_POLL_USE_KQUEUE */ static ERTS_INLINE void batch_update_pollset(ErtsPollSet ps, int fd, ErtsPollBatchBuf *bbp) { int buf_len; #if ERTS_POLL_USE_DEVPOLL short events; struct pollfd *buf; #elif ERTS_POLL_USE_KQUEUE struct kevent *buf; #endif #ifdef ERTS_POLL_DEBUG_PRINT erts_printf("Doing lazy update on fd=%d\n", fd); #endif if (!need_update(ps, fd)) return; /* Make sure we have room for at least maximum no of entries per fd */ if (bbp->size - bbp->len < 2) write_batch_buf(ps, bbp); buf_len = bbp->len; buf = bbp->buf; ASSERT(fd < ps->fds_status_len); #if ERTS_POLL_USE_DEVPOLL events = ERTS_POLL_EV_E2N(ps->fds_status[fd].events); if (!events) { buf[buf_len].events = POLLREMOVE; erts_smp_atomic_dec_nob(&ps->no_of_user_fds); } else if (!ps->fds_status[fd].used_events) { buf[buf_len].events = events; erts_smp_atomic_inc_nob(&ps->no_of_user_fds); } else { if ((ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST) || (ps->fds_status[fd].used_events & ~events)) { /* Reset or removed events... 
*/ buf[buf_len].fd = fd; buf[buf_len].events = POLLREMOVE; buf[buf_len++].revents = 0; } buf[buf_len].events = events; } buf[buf_len].fd = fd; buf[buf_len++].revents = 0; #elif ERTS_POLL_USE_KQUEUE if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK) { if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_USEFLBCK) update_fallback_pollset(ps, fd); else { /* Remove from fallback and try kqueue */ ErtsPollEvents events = ps->fds_status[fd].events; ps->fds_status[fd].events = (ErtsPollEvents) 0; update_fallback_pollset(ps, fd); ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK)); if (events) { ps->fds_status[fd].events = events; goto try_kqueue; } } } else { ErtsPollEvents events, used_events; int mod_w, mod_r; try_kqueue: events = ERTS_POLL_EV_E2N(ps->fds_status[fd].events); used_events = ERTS_POLL_EV_E2N(ps->fds_status[fd].used_events); if (!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST)) { if (!used_events && (events & ERTS_POLL_EV_IN) && (events & ERTS_POLL_EV_OUT)) goto do_add_rw; mod_r = ((events & ERTS_POLL_EV_IN) != (used_events & ERTS_POLL_EV_IN)); mod_w = ((events & ERTS_POLL_EV_OUT) != (used_events & ERTS_POLL_EV_OUT)); goto do_mod; } else { /* Reset */ if ((events & ERTS_POLL_EV_IN) && (events & ERTS_POLL_EV_OUT)) { do_add_rw: EV_SET(&buf[buf_len], fd, EVFILT_READ, EV_ADD, 0, 0, (void *) ERTS_POLL_KQ_OP_ADD2_R); buf_len++; EV_SET(&buf[buf_len], fd, EVFILT_WRITE, EV_ADD, 0, 0, (void *) ERTS_POLL_KQ_OP_ADD2_W); buf_len++; } else { mod_r = 1; mod_w = 1; do_mod: if (mod_r) { if (events & ERTS_POLL_EV_IN) { EV_SET(&buf[buf_len], fd, EVFILT_READ, EV_ADD, 0, 0, (void *) ERTS_POLL_KQ_OP_ADD_R); buf_len++; } else if (used_events & ERTS_POLL_EV_IN) { EV_SET(&buf[buf_len], fd, EVFILT_READ, EV_DELETE, 0, 0, (void *) ERTS_POLL_KQ_OP_DEL_R); buf_len++; } } if (mod_w) { if (events & ERTS_POLL_EV_OUT) { EV_SET(&buf[buf_len], fd, EVFILT_WRITE, EV_ADD, 0, 0, (void *) ERTS_POLL_KQ_OP_ADD_W); buf_len++; } else if (used_events & ERTS_POLL_EV_OUT) { EV_SET(&buf[buf_len], fd, EVFILT_WRITE, EV_DELETE, 0, 0, (void *) ERTS_POLL_KQ_OP_DEL_W); buf_len++; } } } } if (used_events) { if (!events) { erts_smp_atomic_dec_nob(&ps->no_of_user_fds); } } else { if (events) erts_smp_atomic_inc_nob(&ps->no_of_user_fds); } ASSERT((events & ~(ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) == 0); ASSERT((used_events & ~(ERTS_POLL_EV_IN|ERTS_POLL_EV_OUT)) == 0); } #endif ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_RST; ps->fds_status[fd].used_events = ps->fds_status[fd].events; bbp->len = buf_len; } #else /* !ERTS_POLL_USE_BATCH_UPDATE_POLLSET */ #if ERTS_POLL_USE_EPOLL static int #if ERTS_POLL_USE_CONCURRENT_UPDATE conc_update_pollset(ErtsPollSet ps, int fd, int *update_fallback) #else update_pollset(ErtsPollSet ps, int fd) #endif { int res; int op; struct epoll_event epe_templ; struct epoll_event epe; ASSERT(fd < ps->fds_status_len); if (!need_update(ps, fd)) return 0; #ifdef ERTS_POLL_DEBUG_PRINT erts_printf("Doing update on fd=%d\n", fd); #endif if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK) { #if ERTS_POLL_USE_CONCURRENT_UPDATE if (!*update_fallback) { *update_fallback = 1; return 0; } #endif if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_USEFLBCK) { return update_fallback_pollset(ps, fd); } else { /* Remove from fallback and try epoll */ ErtsPollEvents events = ps->fds_status[fd].events; ps->fds_status[fd].events = (ErtsPollEvents) 0; res = update_fallback_pollset(ps, fd); ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK)); if (!events) return res; ps->fds_status[fd].events = events; } } 
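/* Not handled by the fallback set: build the epoll_event template for the epoll_ctl() calls below; the fd is carried in epe.data.fd so results can be mapped back to fds_status[]. */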
epe_templ.events = ERTS_POLL_EV_E2N(ps->fds_status[fd].events); epe_templ.data.fd = fd; #ifdef VALGRIND /* Silence invalid valgrind warning ... */ memset((void *) &epe.data, 0, sizeof(epoll_data_t)); #endif if (epe_templ.events && ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST) { do { /* We init 'epe' every time since epoll_ctl() may modify it (not declared const and not documented as const). */ epe.events = epe_templ.events; epe.data.fd = epe_templ.data.fd; res = epoll_ctl(ps->kp_fd, EPOLL_CTL_DEL, fd, &epe); } while (res != 0 && errno == EINTR); erts_smp_atomic_dec_nob(&ps->no_of_user_fds); ps->fds_status[fd].used_events = 0; } if (!epe_templ.events) { /* A note on EPOLL_CTL_DEL: linux kernel versions before 2.6.9 need a non-NULL event pointer even though it is ignored... */ op = EPOLL_CTL_DEL; erts_smp_atomic_dec_nob(&ps->no_of_user_fds); } else if (!ps->fds_status[fd].used_events) { op = EPOLL_CTL_ADD; erts_smp_atomic_inc_nob(&ps->no_of_user_fds); } else { op = EPOLL_CTL_MOD; } do { /* We init 'epe' every time since epoll_ctl() may modify it (not declared const and not documented as const). */ epe.events = epe_templ.events; epe.data.fd = epe_templ.data.fd; res = epoll_ctl(ps->kp_fd, op, fd, &epe); } while (res != 0 && errno == EINTR); #if defined(ERTS_POLL_DEBUG_PRINT) && 1 { int saved_errno = errno; erts_printf("%s = epoll_ctl(%d, %s, %d, {Ox%x, %d})\n", res == 0 ? "0" : erl_errno_id(errno), ps->kp_fd, (op == EPOLL_CTL_ADD ? "EPOLL_CTL_ADD" : (op == EPOLL_CTL_MOD ? "EPOLL_CTL_MOD" : (op == EPOLL_CTL_DEL ? "EPOLL_CTL_DEL" : "UNKNOWN"))), fd, epe_templ.events, fd); errno = saved_errno; } #endif if (res == 0) ps->fds_status[fd].used_events = ps->fds_status[fd].events; else { switch (op) { case EPOLL_CTL_MOD: epe.events = 0; do { /* We init 'epe' every time since epoll_ctl() may modify it (not declared const and not documented as const). */ epe.events = 0; epe.data.fd = fd; res = epoll_ctl(ps->kp_fd, EPOLL_CTL_DEL, fd, &epe); } while (res != 0 && errno == EINTR); ps->fds_status[fd].used_events = 0; /* Fall through ... */ case EPOLL_CTL_ADD: { ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_USEFLBCK; erts_smp_atomic_dec_nob(&ps->no_of_user_fds); #if ERTS_POLL_USE_CONCURRENT_UPDATE if (!*update_fallback) { *update_fallback = 1; return 0; } #endif ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK)); res = update_fallback_pollset(ps, fd); ASSERT(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK); break; } case EPOLL_CTL_DEL: { /* * Since we use a lazy update approach EPOLL_CTL_DEL will * frequently fail. This since epoll automatically removes * a filedescriptor that is closed from the poll set. 
*/ ps->fds_status[fd].used_events = 0; res = 0; break; } default: fatal_error("%s:%d:update_pollset(): Internal error\n", __FILE__, __LINE__); break; } } ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_RST; return res; } #if ERTS_POLL_USE_CONCURRENT_UPDATE static int update_pollset(ErtsPollSet ps, int fd) { int update_fallback = 1; return conc_update_pollset(ps, fd, &update_fallback); } #endif #endif /* ERTS_POLL_USE_EPOLL */ #endif /* ERTS_POLL_USE_BATCH_UPDATE_POLLSET */ #if ERTS_POLL_USE_POLL || ERTS_POLL_USE_SELECT || ERTS_POLL_USE_FALLBACK #if ERTS_POLL_USE_FALLBACK static int update_fallback_pollset(ErtsPollSet ps, int fd) #else static int update_pollset(ErtsPollSet ps, int fd) #endif { #ifdef ERTS_POLL_DEBUG_PRINT #if ERTS_POLL_USE_FALLBACK erts_printf("Doing fallback update on fd=%d\n", fd); #else erts_printf("Doing update on fd=%d\n", fd); #endif #endif ASSERT(fd < ps->fds_status_len); #if ERTS_POLL_USE_FALLBACK ASSERT(ps->fds_status[fd].used_events ? (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK) : (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_USEFLBCK)); #endif if (!need_update(ps, fd)) return 0; #if ERTS_POLL_USE_FALLBACK ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_RST; #endif #if ERTS_POLL_USE_POLL /* --- poll -------------------------------- */ if (!ps->fds_status[fd].events) { int pix = ps->fds_status[fd].pix; int last_pix; if (pix < 0) { #if ERTS_POLL_USE_FALLBACK ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK)); #endif return -1; } #if ERTS_POLL_USE_FALLBACK ASSERT(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK); #endif erts_smp_atomic_dec_nob(&ps->no_of_user_fds); last_pix = --ps->no_poll_fds; if (pix != last_pix) { /* Move last pix to this pix */ ps->poll_fds[pix].fd = ps->poll_fds[last_pix].fd; ps->poll_fds[pix].events = ps->poll_fds[last_pix].events; ps->poll_fds[pix].revents = ps->poll_fds[last_pix].revents; ps->fds_status[ps->poll_fds[pix].fd].pix = pix; } /* Clear last pix */ ps->poll_fds[last_pix].fd = -1; ps->poll_fds[last_pix].events = (short) 0; ps->poll_fds[last_pix].revents = (short) 0; /* Clear this fd status */ ps->fds_status[fd].pix = -1; ps->fds_status[fd].used_events = (ErtsPollEvents) 0; #if ERTS_POLL_USE_FALLBACK ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_INFLBCK; #endif } else { int pix = ps->fds_status[fd].pix; if (pix < 0) { #if ERTS_POLL_USE_FALLBACK ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK) || fd == ps->kp_fd); #endif erts_smp_atomic_inc_nob(&ps->no_of_user_fds); ps->fds_status[fd].pix = pix = ps->no_poll_fds++; if (pix >= ps->poll_fds_len) grow_poll_fds(ps, pix); ps->poll_fds[pix].fd = fd; ps->fds_status[fd].pix = pix; #if ERTS_POLL_USE_FALLBACK ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_INFLBCK; #endif } #if ERTS_POLL_USE_FALLBACK ASSERT(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK); #endif /* Events to be used in next poll */ ps->poll_fds[pix].events = ev2pollev(ps->fds_status[fd].events); if (ps->poll_fds[pix].revents) { /* Remove result events that we should not poll for anymore */ ps->poll_fds[pix].revents &= ev2pollev(~(~ps->fds_status[fd].used_events & ps->fds_status[fd].events)); } /* Save events to be used in next poll */ ps->fds_status[fd].used_events = ps->fds_status[fd].events; } return 0; #elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */ { ErtsPollEvents events = ps->fds_status[fd].events; #ifdef _DARWIN_UNLIMITED_SELECT if (FDS_SIZE(fd) > ps->select_fds_len) grow_select_fds(ps, fd); #endif if ((ERTS_POLL_EV_IN & events) != (ERTS_POLL_EV_IN & 
ps->fds_status[fd].used_events)) { if (ERTS_POLL_EV_IN & events) { FDS_SET(fd, ps->input_fds); } else { FDS_CLR(fd, ps->input_fds); } } if ((ERTS_POLL_EV_OUT & events) != (ERTS_POLL_EV_OUT & ps->fds_status[fd].used_events)) { if (ERTS_POLL_EV_OUT & events) { FDS_SET(fd, ps->output_fds); } else { FDS_CLR(fd, ps->output_fds); } } if (!ps->fds_status[fd].used_events) { ASSERT(events); erts_smp_atomic_inc_nob(&ps->no_of_user_fds); #if ERTS_POLL_USE_FALLBACK ps->no_select_fds++; ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_INFLBCK; #endif } else if (!events) { ASSERT(ps->fds_status[fd].used_events); erts_smp_atomic_dec_nob(&ps->no_of_user_fds); ps->fds_status[fd].events = events; #if ERTS_POLL_USE_FALLBACK ps->no_select_fds--; ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_INFLBCK; #endif } ps->fds_status[fd].used_events = events; if (events && fd > ps->max_fd) ps->max_fd = fd; else if (!events && fd == ps->max_fd) { int max = ps->max_fd; for (max = ps->max_fd; max >= 0; max--) if (ps->fds_status[max].used_events) break; ps->max_fd = max; } } return 0; #endif } #endif /* ERTS_POLL_USE_POLL || ERTS_POLL_USE_SELECT || ERTS_POLL_USE_FALLBACK */ #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE static void handle_update_requests(ErtsPollSet ps) { ErtsPollSetUpdateRequestsBlock *urqbp = &ps->update_requests; #if ERTS_POLL_USE_BATCH_UPDATE_POLLSET ErtsPollBatchBuf bb; setup_batch_buf(ps, &bb); #endif while (urqbp) { ErtsPollSetUpdateRequestsBlock *free_urqbp = urqbp; int i; int len = urqbp->len; for (i = 0; i < len; i++) { int fd = urqbp->fds[i]; ASSERT(fd < ps->fds_status_len); ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_INURQ; #if ERTS_POLL_USE_BATCH_UPDATE_POLLSET batch_update_pollset(ps, fd, &bb); #else update_pollset(ps, fd); #endif } free_urqbp = urqbp; urqbp = urqbp->next; free_update_requests_block(ps, free_urqbp); } #if ERTS_POLL_USE_BATCH_UPDATE_POLLSET if (bb.len) write_batch_buf(ps, &bb); #endif ps->curr_upd_req_block = &ps->update_requests; #if ERTS_POLL_USE_DEVPOLL && defined(HARD_DEBUG) check_poll_status(ps); #endif ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(ps); } #endif /* ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE */ static ERTS_INLINE ErtsPollEvents poll_control(ErtsPollSet ps, int fd, ErtsPollEvents events, int on, int *do_wake) { ErtsPollEvents new_events; if (fd < ps->internal_fd_limit || fd >= max_fds) { if (fd < 0) { new_events = ERTS_POLL_EV_ERR; goto done; } #if ERTS_POLL_USE_KERNEL_POLL if (fd == ps->kp_fd) { new_events = ERTS_POLL_EV_NVAL; goto done; } #endif #if ERTS_POLL_USE_WAKEUP_PIPE if (fd == ps->wake_fds[0] || fd == ps->wake_fds[1]) { new_events = ERTS_POLL_EV_NVAL; goto done; } #endif } if (fd >= ps->fds_status_len) grow_fds_status(ps, fd); ASSERT(fd < ps->fds_status_len); new_events = ps->fds_status[fd].events; if (events == 0) { *do_wake = 0; goto done; } if (on) new_events |= events; else new_events &= ~events; if (new_events == (ErtsPollEvents) 0) { #if ERTS_POLL_USE_KERNEL_POLL || defined(ERTS_SMP) ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_RST; #endif #if ERTS_POLL_USE_FALLBACK ps->fds_status[fd].flags &= ~ERTS_POLL_FD_FLG_USEFLBCK; #endif } ps->fds_status[fd].events = new_events; if (new_events == ps->fds_status[fd].used_events #if ERTS_POLL_USE_KERNEL_POLL || defined(ERTS_SMP) && !(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST) #endif ) { *do_wake = 0; goto done; } #if !ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE if (update_pollset(ps, fd) != 0) new_events = ERTS_POLL_EV_ERR; #else /* ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE */ #if ERTS_POLL_USE_CONCURRENT_UPDATE if 
(ERTS_POLLSET_IS_POLLED(ps)) { int update_fallback = 0; conc_update_pollset(ps, fd, &update_fallback); if (!update_fallback) { *do_wake = 0; /* no need to wake kernel poller */ goto done; } } #endif enqueue_update_request(ps, fd); #ifdef ERTS_SMP /* * If new events have been added, we need to wake up the * polling thread, but if events have been removed we don't. */ if ((new_events && (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_RST)) || (~ps->fds_status[fd].used_events & new_events)) *do_wake = 1; #endif /* ERTS_SMP */ #endif /* ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE */ done: #ifdef ERTS_POLL_DEBUG_PRINT erts_printf("0x%x = poll_control(ps, %d, 0x%x, %s) do_wake=%d\n", (int) new_events, fd, (int) events, (on ? "on" : "off"), *do_wake); #endif return new_events; } void ERTS_POLL_EXPORT(erts_poll_controlv)(ErtsPollSet ps, ErtsPollControlEntry pcev[], int len) { int i; int do_wake; int final_do_wake = 0; ERTS_POLLSET_LOCK(ps); for (i = 0; i < len; i++) { do_wake = 0; pcev[i].events = poll_control(ps, pcev[i].fd, pcev[i].events, pcev[i].on, &do_wake); final_do_wake |= do_wake; } ERTS_POLLSET_UNLOCK(ps); #ifdef ERTS_SMP if (final_do_wake) wake_poller(ps, 0, 0); #endif /* ERTS_SMP */ } ErtsPollEvents ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet ps, ErtsSysFdType fd, ErtsPollEvents events, int on, int* do_wake) /* In: Wake up polling thread */ /* Out: Poller is woken */ { ErtsPollEvents res; ERTS_POLLSET_LOCK(ps); res = poll_control(ps, fd, events, on, do_wake); ERTS_POLLSET_UNLOCK(ps); #ifdef ERTS_SMP if (*do_wake) { wake_poller(ps, 0, 0); } #endif /* ERTS_SMP */ return res; } /* * --- Wait on poll set ------------------------------------------------------ */ #if ERTS_POLL_USE_KERNEL_POLL static ERTS_INLINE int save_kp_result(ErtsPollSet ps, ErtsPollResFd pr[], int max_res, int chk_fds_res) { int res = 0; int i; int n = chk_fds_res < max_res ? chk_fds_res : max_res; #if ERTS_POLL_USE_WAKEUP_PIPE int wake_fd = ps->wake_fds[0]; #endif for (i = 0; i < n; i++) { #if ERTS_POLL_USE_EPOLL /* --- epoll ------------------------------- */ if (ps->res_events[i].events) { int fd = ps->res_events[i].data.fd; int ix; ErtsPollEvents revents; #if ERTS_POLL_USE_WAKEUP_PIPE if (fd == wake_fd) { cleanup_wakeup_pipe(ps); continue; } #endif ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK)); /* epoll_wait() can repeat the same fd in result array... 
*/ ix = (int) ps->fds_status[fd].res_ev_ix; ASSERT(ix >= 0); if (ix >= res || pr[ix].fd != fd) { ix = res; pr[ix].fd = fd; pr[ix].events = (ErtsPollEvents) 0; } revents = ERTS_POLL_EV_N2E(ps->res_events[i].events); pr[ix].events |= revents; if (revents) { if (res == ix) { ps->fds_status[fd].res_ev_ix = (unsigned short) ix; res++; } } } #elif ERTS_POLL_USE_KQUEUE /* --- kqueue ------------------------------ */ struct kevent *ev; int fd; int ix; ev = &ps->res_events[i]; fd = (int) ev->ident; ASSERT(fd < ps->fds_status_len); ASSERT(!(ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INFLBCK)); ix = (int) ps->fds_status[fd].res_ev_ix; ASSERT(ix >= 0); if (ix >= res || pr[ix].fd != fd) { ix = res; pr[ix].fd = (int) ev->ident; pr[ix].events = (ErtsPollEvents) 0; } if (ev->filter == EVFILT_READ) { #if ERTS_POLL_USE_WAKEUP_PIPE if (fd == wake_fd) { cleanup_wakeup_pipe(ps); continue; } #endif pr[ix].events |= ERTS_POLL_EV_IN; } else if (ev->filter == EVFILT_WRITE) pr[ix].events |= ERTS_POLL_EV_OUT; if (ev->flags & (EV_ERROR|EV_EOF)) { if ((ev->flags & EV_ERROR) && (((int) ev->data) == EBADF)) pr[ix].events |= ERTS_POLL_EV_NVAL; else pr[ix].events |= ERTS_POLL_EV_ERR; } if (pr[ix].events) { if (res == ix) { ps->fds_status[fd].res_ev_ix = (unsigned short) ix; res++; } } #elif ERTS_POLL_USE_DEVPOLL /* --- devpoll ----------------------------- */ if (ps->res_events[i].revents) { int fd = ps->res_events[i].fd; ErtsPollEvents revents; #if ERTS_POLL_USE_WAKEUP_PIPE if (fd == wake_fd) { cleanup_wakeup_pipe(ps); continue; } #endif revents = ERTS_POLL_EV_N2E(ps->res_events[i].events); pr[res].fd = fd; pr[res].events = revents; res++; } #endif } return res; } #endif /* ERTS_POLL_USE_KERNEL_POLL */ #if ERTS_POLL_USE_FALLBACK static int get_kp_results(ErtsPollSet ps, ErtsPollResFd pr[], int max_res) { int res; #if ERTS_POLL_USE_KQUEUE struct timespec ts = {0, 0}; #endif if (max_res > ps->res_events_len) grow_res_events(ps, max_res); do { #if ERTS_POLL_USE_EPOLL res = epoll_wait(ps->kp_fd, ps->res_events, max_res, 0); #elif ERTS_POLL_USE_KQUEUE res = kevent(ps->kp_fd, NULL, 0, ps->res_events, max_res, &ts); #endif } while (res < 0 && errno == EINTR); if (res < 0) { fatal_error("%s:%d: %s() failed: %s (%d)\n", __FILE__, __LINE__, #if ERTS_POLL_USE_EPOLL "epoll_wait", #elif ERTS_POLL_USE_KQUEUE "kevent", #endif erl_errno_id(errno), errno); } return save_kp_result(ps, pr, max_res, res); } #endif /* ERTS_POLL_USE_FALLBACK */ static ERTS_INLINE int save_poll_result(ErtsPollSet ps, ErtsPollResFd pr[], int max_res, int chk_fds_res, int ebadf) { #if ERTS_POLL_USE_DEVPOLL return save_kp_result(ps, pr, max_res, chk_fds_res); #elif ERTS_POLL_USE_FALLBACK if (!ps->fallback_used) return save_kp_result(ps, pr, max_res, chk_fds_res); else #endif /* ERTS_POLL_USE_FALLBACK */ { #if ERTS_POLL_USE_POLL /* --- poll -------------------------------- */ int res = 0; #if ERTS_POLL_USE_WAKEUP_PIPE && !ERTS_POLL_USE_FALLBACK int wake_fd = ps->wake_fds[0]; #endif int i, first_ix, end_ix; /* * In order to be somewhat fair, we continue on the poll_fds * index where we stopped last time. */ first_ix = i = ((ps->next_poll_fds_ix < ps->no_poll_fds) ? 
ps->next_poll_fds_ix : 0); end_ix = ps->no_poll_fds; while (1) { while (i < end_ix && res < max_res) { if (ps->poll_fds[i].revents != (short) 0) { int fd = ps->poll_fds[i].fd; ErtsPollEvents revents; #if ERTS_POLL_USE_FALLBACK if (fd == ps->kp_fd) { res += get_kp_results(ps, &pr[res], max_res-res); i++; continue; } #elif ERTS_POLL_USE_WAKEUP_PIPE if (fd == wake_fd) { cleanup_wakeup_pipe(ps); i++; continue; } #endif revents = pollev2ev(ps->poll_fds[i].revents); pr[res].fd = fd; pr[res].events = revents; res++; } i++; } if (res == max_res || i == first_ix) break; ASSERT(i == ps->no_poll_fds); i = 0; end_ix = first_ix; } ps->next_poll_fds_ix = i; return res; #elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */ int res = 0; #if ERTS_POLL_USE_WAKEUP_PIPE && !ERTS_POLL_USE_FALLBACK int wake_fd = ps->wake_fds[0]; #endif int fd, first_fd, end_fd; /* * In order to be fair, we continue on the fd where we stopped * last time. */ first_fd = fd = ps->next_sel_fd <= ps->max_fd ? ps->next_sel_fd : 0; end_fd = ps->max_fd + 1; if (!ebadf) { while (1) { while (fd < end_fd && res < max_res) { pr[res].events = (ErtsPollEvents) 0; if (FDS_ISSET(fd, ps->res_input_fds)) { #if ERTS_POLL_USE_FALLBACK if (fd == ps->kp_fd) { res += get_kp_results(ps, &pr[res], max_res-res); fd++; continue; } #elif ERTS_POLL_USE_WAKEUP_PIPE if (fd == wake_fd) { cleanup_wakeup_pipe(ps); fd++; continue; } #endif pr[res].events |= ERTS_POLL_EV_IN; } if (FDS_ISSET(fd, ps->res_output_fds)) pr[res].events |= ERTS_POLL_EV_OUT; if (pr[res].events) { pr[res].fd = fd; res++; } fd++; } if (res == max_res || fd == first_fd) break; ASSERT(fd == ps->max_fd + 1); fd = 0; end_fd = first_fd; } } else { /* * Bad file descriptors in poll set. * * This only happens when running poorly written * drivers. This code could be optimized, but we * don't bother since it should never happen... 
*/ while (1) { while (fd < end_fd && res < max_res) { if (ps->fds_status[fd].events) { int sres; fd_set *iset = NULL; fd_set *oset = NULL; if (ps->fds_status[fd].events & ERTS_POLL_EV_IN) { #ifdef _DARWIN_UNLIMITED_SELECT iset = ps->res_input_fds; memset((void *) iset, 0, ps->select_fds_len); #else iset = &ps->res_input_fds; FD_ZERO(iset); #endif FD_SET(fd, iset); } if (ps->fds_status[fd].events & ERTS_POLL_EV_OUT) { #ifdef _DARWIN_UNLIMITED_SELECT oset = ps->res_output_fds; memset((char *)oset, 0, ps->select_fds_len); #else oset = &ps->res_output_fds; FD_ZERO(oset); #endif FD_SET(fd, oset); } do { /* Initiate 'tv' each time; select() may modify it */ SysTimeval tv = {0, 0}; sres = select(ps->max_fd+1, iset, oset, NULL, &tv); } while (sres < 0 && errno == EINTR); if (sres < 0) { #if ERTS_POLL_USE_FALLBACK if (fd == ps->kp_fd) { res += get_kp_results(ps, &pr[res], max_res-res); fd++; continue; } #elif ERTS_POLL_USE_WAKEUP_PIPE if (fd == wake_fd) { cleanup_wakeup_pipe(ps); fd++; continue; } #endif pr[res].fd = fd; pr[res].events = ERTS_POLL_EV_NVAL; res++; } else if (sres > 0) { pr[res].fd = fd; if (iset && FD_ISSET(fd, iset)) { #if ERTS_POLL_USE_FALLBACK if (fd == ps->kp_fd) { res += get_kp_results(ps, &pr[res], max_res-res); fd++; continue; } #elif ERTS_POLL_USE_WAKEUP_PIPE if (fd == wake_fd) { cleanup_wakeup_pipe(ps); fd++; continue; } #endif pr[res].events |= ERTS_POLL_EV_IN; } if (oset && FD_ISSET(fd, oset)) { pr[res].events |= ERTS_POLL_EV_OUT; } ASSERT(pr[res].events); res++; } } fd++; } if (res == max_res || fd == first_fd) break; ASSERT(fd == ps->max_fd + 1); fd = 0; end_fd = first_fd; } } ps->next_sel_fd = fd; return res; #endif } } static ERTS_INLINE int check_fd_events(ErtsPollSet ps, SysTimeval *tv, int max_res) { int res; if (erts_smp_atomic_read_nob(&ps->no_of_user_fds) == 0 && tv->tv_usec == 0 && tv->tv_sec == 0) { /* Nothing to poll and zero timeout; done... */ return 0; } else { long timeout = tv->tv_sec*1000 + tv->tv_usec/1000; if (timeout > ERTS_AINT32_T_MAX) timeout = ERTS_AINT32_T_MAX; ASSERT(timeout >= 0); erts_smp_atomic32_set_relb(&ps->timeout, (erts_aint32_t) timeout); #if ERTS_POLL_USE_FALLBACK if (!(ps->fallback_used = ERTS_POLL_NEED_FALLBACK(ps))) { #if ERTS_POLL_USE_EPOLL /* --- epoll ------------------------------- */ if (timeout > INT_MAX) timeout = INT_MAX; if (max_res > ps->res_events_len) grow_res_events(ps, max_res); #ifdef ERTS_SMP if (timeout) erts_thr_progress_prepare_wait(NULL); #endif res = epoll_wait(ps->kp_fd, ps->res_events, max_res, (int)timeout); #elif ERTS_POLL_USE_KQUEUE /* --- kqueue ------------------------------ */ struct timespec ts; if (max_res > ps->res_events_len) grow_res_events(ps, max_res); #ifdef ERTS_SMP if (timeout) erts_thr_progress_prepare_wait(NULL); #endif ts.tv_sec = tv->tv_sec; ts.tv_nsec = tv->tv_usec*1000; res = kevent(ps->kp_fd, NULL, 0, ps->res_events, max_res, &ts); #endif /* ----------------------------------------- */ } else /* use fallback (i.e. poll() or select()) */ #endif /* ERTS_POLL_USE_FALLBACK */ { #if ERTS_POLL_USE_DEVPOLL /* --- devpoll ----------------------------- */ /* * The ioctl() will fail with EINVAL on Solaris 10 if dp_nfds * is set too high. dp_nfds should not be set greater than * the maximum number of file descriptors in the poll set. */ struct dvpoll poll_res; int nfds = (int) erts_smp_atomic_read_nob(&ps->no_of_user_fds); #if ERTS_POLL_USE_WAKEUP_PIPE nfds++; /* Wakeup pipe */ #endif if (timeout > INT_MAX) timeout = INT_MAX; poll_res.dp_nfds = nfds < max_res ? 
nfds : max_res; if (poll_res.dp_nfds > ps->res_events_len) grow_res_events(ps, poll_res.dp_nfds); poll_res.dp_fds = ps->res_events; #ifdef ERTS_SMP if (timeout) erts_thr_progress_prepare_wait(NULL); #endif poll_res.dp_timeout = (int) timeout; res = ioctl(ps->kp_fd, DP_POLL, &poll_res); #elif ERTS_POLL_USE_POLL /* --- poll -------------------------------- */ if (timeout > INT_MAX) timeout = INT_MAX; #ifdef ERTS_SMP if (timeout) erts_thr_progress_prepare_wait(NULL); #endif res = poll(ps->poll_fds, ps->no_poll_fds, (int) timeout); #elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */ SysTimeval to = *tv; #ifdef _DARWIN_UNLIMITED_SELECT if (ps->resize_res_sets) { ps->res_input_fds = ps->res_input_fds ? erts_realloc(ERTS_ALC_T_SELECT_FDS, ps->res_input_fds, ps->select_fds_len) : erts_alloc(ERTS_ALC_T_SELECT_FDS, ps->select_fds_len); ps->res_output_fds = ps->res_output_fds ? erts_realloc(ERTS_ALC_T_SELECT_FDS, ps->res_output_fds, ps->select_fds_len) : erts_alloc(ERTS_ALC_T_SELECT_FDS, ps->select_fds_len); ps->resize_res_sets = 0; } memcpy(ps->res_input_fds, ps->input_fds, ps->select_fds_len); memcpy(ps->res_output_fds, ps->output_fds, ps->select_fds_len); #else ps->res_input_fds = ps->input_fds; ps->res_output_fds = ps->output_fds; #endif #ifdef ERTS_SMP if (to.tv_sec || to.tv_usec) erts_thr_progress_prepare_wait(NULL); #endif res = select(ps->max_fd + 1, #ifdef _DARWIN_UNLIMITED_SELECT ps->res_input_fds, ps->res_output_fds, #else &ps->res_input_fds, &ps->res_output_fds, #endif NULL, &to); #ifdef ERTS_SMP if (to.tv_sec || to.tv_usec) erts_thr_progress_finalize_wait(NULL); if (res < 0 && errno == EBADF && ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) { /* * This may have happened because another thread deselected * a fd in our poll set and then closed it, i.e. the driver * behaved correctly. We wan't to avoid looking for a bad * fd, that may even not exist anymore. Therefore, handle * update requests and try again. * * We don't know how much of the timeout is left; therfore, * we use a zero timeout. If no error occur and no events * have triggered, we fake an EAGAIN error and let the caller * restart us. */ to.tv_sec = 0; to.tv_usec = 0; ERTS_POLLSET_LOCK(ps); handle_update_requests(ps); ERTS_POLLSET_UNLOCK(ps); res = select(ps->max_fd + 1, #ifdef _DARWIN_UNLIMITED_SELECT ps->res_input_fds, ps->res_output_fds, #else &ps->res_input_fds, &ps->res_output_fds, #endif NULL, &to); if (res == 0) { errno = EAGAIN; res = -1; } } #endif /* ERTS_SMP */ return res; #endif /* ----------------------------------------- */ } #ifdef ERTS_SMP if (timeout) erts_thr_progress_finalize_wait(NULL); #endif return res; } } int ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps, ErtsPollResFd pr[], int *len, SysTimeval *utvp) { int res, no_fds; int ebadf = 0; #ifdef ERTS_SMP int ps_locked = 0; #endif SysTimeval *tvp; SysTimeval itv; no_fds = *len; #ifdef ERTS_POLL_MAX_RES if (no_fds >= ERTS_POLL_MAX_RES) no_fds = ERTS_POLL_MAX_RES; #endif *len = 0; ASSERT(utvp); tvp = utvp; #ifdef ERTS_POLL_DEBUG_PRINT erts_printf("Entering erts_poll_wait(), timeout=%d\n", (int) tv->tv_sec*1000 + tv->tv_usec/1000); #endif if (ERTS_POLLSET_SET_POLLED_CHK(ps)) { res = EINVAL; /* Another thread is in erts_poll_wait() on this pollset... 
     */
	goto done;
    }

    if (is_woken(ps)) {
	/* Use zero timeout */
	itv.tv_sec = 0;
	itv.tv_usec = 0;
	tvp = &itv;
    }

#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE
    if (ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) {
	ERTS_POLLSET_LOCK(ps);
	handle_update_requests(ps);
	ERTS_POLLSET_UNLOCK(ps);
    }
#endif

    res = check_fd_events(ps, tvp, no_fds);

    woke_up(ps);

    if (res == 0) {
	res = ETIMEDOUT;
    }
    else if (res < 0) {
#if ERTS_POLL_USE_SELECT
	if (errno == EBADF) {
	    ebadf = 1;
	    goto save_results;
	}
#endif
	res = errno;
    }
    else {
#if ERTS_POLL_USE_SELECT
    save_results:
#endif
#ifdef ERTS_SMP
	ps_locked = 1;
	ERTS_POLLSET_LOCK(ps);
#endif

	no_fds = save_poll_result(ps, pr, no_fds, res, ebadf);

#ifdef HARD_DEBUG
	check_poll_result(pr, no_fds);
#endif

	res = (no_fds == 0
	       ? (is_interrupted_reset(ps) ? EINTR : EAGAIN)
	       : 0);
	*len = no_fds;
    }

#ifdef ERTS_SMP
    if (ps_locked)
	ERTS_POLLSET_UNLOCK(ps);
    ERTS_POLLSET_UNSET_POLLED(ps);
#endif

 done:
    erts_smp_atomic32_set_relb(&ps->timeout, ERTS_AINT32_T_MAX);
#ifdef ERTS_POLL_DEBUG_PRINT
    erts_printf("Leaving %s = erts_poll_wait()\n",
		res == 0 ? "0" : erl_errno_id(res));
#endif

    return res;
}

/*
 * --- Interrupt a thread doing erts_poll_wait() -----------------------------
 */

void
ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet ps, int set)
{
#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
    if (!set)
	reset_wakeup_state(ps);
    else
	wake_poller(ps, 1, 0);
#endif
}

#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
void
ERTS_POLL_EXPORT(erts_poll_async_sig_interrupt)(ErtsPollSet ps)
{
    /*
     * NOTE: This function is called from signal handlers; it therefore
     *       has to be async-signal safe.
     */
    wake_poller(ps, 1, 1);
}
#endif

/*
 * erts_poll_interrupt_timed():
 *   If 'set' != 0, interrupt a thread blocked in erts_poll_wait() if it
 *   is not guaranteed that it will time out before 'msec' milliseconds.
*/ void ERTS_POLL_EXPORT(erts_poll_interrupt_timed)(ErtsPollSet ps, int set, erts_short_time_t msec) { #if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP) if (!set) reset_wakeup_state(ps); else { if (erts_smp_atomic32_read_acqb(&ps->timeout) > (erts_aint32_t) msec) wake_poller(ps, 1, 0); #ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS else { if (ERTS_POLLSET_IS_POLLED(ps)) erts_smp_atomic_inc_nob(&ps->no_avoided_wakeups); erts_smp_atomic_inc_nob(&ps->no_avoided_interrupts); } erts_smp_atomic_inc_nob(&ps->no_interrupt_timed); #endif } #endif } int ERTS_POLL_EXPORT(erts_poll_max_fds)(void) { return max_fds; } /* * --- Initialization -------------------------------------------------------- */ void ERTS_POLL_EXPORT(erts_poll_init)(void) { erts_smp_spinlock_init(&pollsets_lock, "pollsets_lock"); pollsets = NULL; errno = 0; #if !defined(NO_SYSCONF) max_fds = sysconf(_SC_OPEN_MAX); #elif ERTS_POLL_USE_SELECT max_fds = NOFILE; #else max_fds = OPEN_MAX; #endif #if ERTS_POLL_USE_SELECT && defined(FD_SETSIZE) && \ !defined(_DARWIN_UNLIMITED_SELECT) if (max_fds > FD_SETSIZE) max_fds = FD_SETSIZE; #endif if (max_fds < 0) fatal_error("erts_poll_init(): Failed to get max number of files: %s\n", erl_errno_id(errno)); #ifdef ERTS_POLL_DEBUG_PRINT print_misc_debug_info(); #endif } ErtsPollSet ERTS_POLL_EXPORT(erts_poll_create_pollset)(void) { #if ERTS_POLL_USE_KERNEL_POLL int kp_fd; #endif ErtsPollSet ps = erts_alloc(ERTS_ALC_T_POLLSET, sizeof(struct ErtsPollSet_)); ps->internal_fd_limit = 0; ps->fds_status = NULL; ps->fds_status_len = 0; erts_smp_atomic_init_nob(&ps->no_of_user_fds, 0); #if ERTS_POLL_USE_KERNEL_POLL ps->kp_fd = -1; #if ERTS_POLL_USE_EPOLL kp_fd = epoll_create(256); ps->res_events_len = 0; ps->res_events = NULL; #elif ERTS_POLL_USE_DEVPOLL kp_fd = open("/dev/poll", O_RDWR); ps->res_events_len = 0; ps->res_events = NULL; #elif ERTS_POLL_USE_KQUEUE kp_fd = kqueue(); ps->res_events_len = 0; ps->res_events = NULL; #endif if (kp_fd < 0) fatal_error("erts_poll_create_pollset(): Failed to " #if ERTS_POLL_USE_EPOLL "create epoll set" #elif ERTS_POLL_USE_DEVPOLL "to open /dev/poll" #elif ERTS_POLL_USE_KQUEUE "create kqueue" #endif ": %s (%d)\n", erl_errno_id(errno), errno); #endif /* ERTS_POLL_USE_KERNEL_POLL */ #if ERTS_POLL_USE_BATCH_UPDATE_POLLSET /* res_events is also used as write buffer */ grow_res_events(ps, ERTS_POLL_MIN_BATCH_BUF_SIZE); #endif #if ERTS_POLL_USE_POLL ps->next_poll_fds_ix = 0; ps->no_poll_fds = 0; ps->poll_fds_len = 0; ps->poll_fds = NULL; #elif ERTS_POLL_USE_SELECT ps->next_sel_fd = 0; ps->max_fd = -1; #if ERTS_POLL_USE_FALLBACK ps->no_select_fds = 0; #endif #ifdef _DARWIN_UNLIMITED_SELECT ps->select_fds_len = 0; ps->resize_res_sets = 0; ps->input_fds = NULL; ps->res_input_fds = NULL; ps->output_fds = NULL; ps->res_output_fds = NULL; #else FD_ZERO(&ps->input_fds); FD_ZERO(&ps->res_input_fds); FD_ZERO(&ps->output_fds); FD_ZERO(&ps->res_output_fds); #endif #endif #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE ps->update_requests.next = NULL; ps->update_requests.len = 0; ps->curr_upd_req_block = &ps->update_requests; erts_smp_atomic32_init_nob(&ps->have_update_requests, 0); #endif #ifdef ERTS_SMP erts_atomic32_init_nob(&ps->polled, 0); erts_smp_mtx_init(&ps->mtx, "pollset"); #endif #if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT erts_atomic32_init_nob(&ps->wakeup_state, (erts_aint32_t) 0); #endif #if ERTS_POLL_USE_WAKEUP_PIPE create_wakeup_pipe(ps); #endif #if ERTS_POLL_USE_FALLBACK if (kp_fd >= ps->fds_status_len) grow_fds_status(ps, kp_fd); /* Force kernel poll fd into 
fallback (poll/select) set */ ps->fds_status[kp_fd].flags |= ERTS_POLL_FD_FLG_INFLBCK|ERTS_POLL_FD_FLG_USEFLBCK; { int do_wake = 0; ERTS_POLL_EXPORT(erts_poll_control)(ps, kp_fd, ERTS_POLL_EV_IN, 1, &do_wake); } #endif #if ERTS_POLL_USE_KERNEL_POLL if (ps->internal_fd_limit <= kp_fd) ps->internal_fd_limit = kp_fd + 1; ps->kp_fd = kp_fd; #endif erts_smp_atomic32_init_nob(&ps->timeout, ERTS_AINT32_T_MAX); #ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS erts_smp_atomic_init_nob(&ps->no_avoided_wakeups, 0); erts_smp_atomic_init_nob(&ps->no_avoided_interrupts, 0); erts_smp_atomic_init_nob(&ps->no_interrupt_timed, 0); #endif #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE handle_update_requests(ps); #endif #if ERTS_POLL_USE_FALLBACK ps->fallback_used = 0; #endif erts_smp_atomic_set_nob(&ps->no_of_user_fds, 0); /* Don't count wakeup pipe and fallback fd */ erts_smp_spin_lock(&pollsets_lock); ps->next = pollsets; pollsets = ps; erts_smp_spin_unlock(&pollsets_lock); return ps; } void ERTS_POLL_EXPORT(erts_poll_destroy_pollset)(ErtsPollSet ps) { if (ps->fds_status) erts_free(ERTS_ALC_T_FD_STATUS, (void *) ps->fds_status); #if ERTS_POLL_USE_EPOLL if (ps->kp_fd >= 0) close(ps->kp_fd); if (ps->res_events) erts_free(ERTS_ALC_T_POLL_RES_EVS, (void *) ps->res_events); #elif ERTS_POLL_USE_DEVPOLL if (ps->kp_fd >= 0) close(ps->kp_fd); if (ps->res_events) erts_free(ERTS_ALC_T_POLL_RES_EVS, (void *) ps->res_events); #elif ERTS_POLL_USE_POLL if (ps->poll_fds) erts_free(ERTS_ALC_T_POLL_FDS, (void *) ps->poll_fds); #elif ERTS_POLL_USE_SELECT #ifdef _DARWIN_UNLIMITED_SELECT if (ps->input_fds) erts_free(ERTS_ALC_T_SELECT_FDS, (void *) ps->input_fds); if (ps->res_input_fds) erts_free(ERTS_ALC_T_SELECT_FDS, (void *) ps->res_input_fds); if (ps->output_fds) erts_free(ERTS_ALC_T_SELECT_FDS, (void *) ps->output_fds); if (ps->res_output_fds) erts_free(ERTS_ALC_T_SELECT_FDS, (void *) ps->res_output_fds); #endif #endif #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE { ErtsPollSetUpdateRequestsBlock *urqbp = ps->update_requests.next; while (urqbp) { ErtsPollSetUpdateRequestsBlock *free_urqbp = urqbp; urqbp = urqbp->next; free_update_requests_block(ps, free_urqbp); } } #endif #ifdef ERTS_SMP erts_smp_mtx_destroy(&ps->mtx); #endif #if ERTS_POLL_USE_WAKEUP_PIPE if (ps->wake_fds[0] >= 0) close(ps->wake_fds[0]); if (ps->wake_fds[1] >= 0) close(ps->wake_fds[1]); #endif erts_smp_spin_lock(&pollsets_lock); if (ps == pollsets) pollsets = pollsets->next; else { ErtsPollSet prev_ps; for (prev_ps = pollsets; ps != prev_ps->next; prev_ps = prev_ps->next); ASSERT(ps == prev_ps->next); prev_ps->next = ps->next; } erts_smp_spin_unlock(&pollsets_lock); erts_free(ERTS_ALC_T_POLLSET, (void *) ps); } /* * --- Info ------------------------------------------------------------------ */ void ERTS_POLL_EXPORT(erts_poll_info)(ErtsPollSet ps, ErtsPollInfo *pip) { #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE int pending_updates; #endif Uint size = 0; ERTS_POLLSET_LOCK(ps); size += sizeof(struct ErtsPollSet_); size += ps->fds_status_len*sizeof(ErtsFdStatus); #if ERTS_POLL_USE_EPOLL size += ps->res_events_len*sizeof(struct epoll_event); #elif ERTS_POLL_USE_DEVPOLL size += ps->res_events_len*sizeof(struct pollfd); #elif ERTS_POLL_USE_KQUEUE size += ps->res_events_len*sizeof(struct kevent); #endif #if ERTS_POLL_USE_POLL size += ps->poll_fds_len*sizeof(struct pollfd); #elif ERTS_POLL_USE_SELECT #ifdef _DARWIN_UNLIMITED_SELECT size += ps->select_fds_len*4; #endif #endif #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE { ErtsPollSetUpdateRequestsBlock *urqbp = ps->update_requests.next; 
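/* Count update requests queued in the embedded first block and in any overflow blocks chained after it. */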
        pending_updates = ps->update_requests.len;
        while (urqbp) {
            size += sizeof(ErtsPollSetUpdateRequestsBlock);
            pending_updates += urqbp->len;
            urqbp = urqbp->next;
        }
    }
#endif

    pip->primary =
#if ERTS_POLL_USE_KQUEUE
        "kqueue"
#elif ERTS_POLL_USE_EPOLL
        "epoll"
#elif ERTS_POLL_USE_DEVPOLL
        "/dev/poll"
#elif ERTS_POLL_USE_POLL
        "poll"
#elif ERTS_POLL_USE_SELECT
        "select"
#endif
        ;

    pip->fallback =
#if !ERTS_POLL_USE_FALLBACK
        NULL
#elif ERTS_POLL_USE_POLL
        "poll"
#elif ERTS_POLL_USE_SELECT
        "select"
#endif
        ;

    pip->kernel_poll =
#if !ERTS_POLL_USE_KERNEL_POLL
        NULL
#elif ERTS_POLL_USE_KQUEUE
        "kqueue"
#elif ERTS_POLL_USE_EPOLL
        "epoll"
#elif ERTS_POLL_USE_DEVPOLL
        "/dev/poll"
#endif
        ;

    pip->memory_size = size;

    pip->poll_set_size = (int) erts_smp_atomic_read_nob(&ps->no_of_user_fds);
#if ERTS_POLL_USE_WAKEUP_PIPE
    pip->poll_set_size++; /* Wakeup pipe */
#endif

    pip->fallback_poll_set_size =
#if !ERTS_POLL_USE_FALLBACK
        0
#elif ERTS_POLL_USE_POLL
        ps->no_poll_fds
#elif ERTS_POLL_USE_SELECT
        ps->no_select_fds
#endif
        ;

#if ERTS_POLL_USE_FALLBACK
    /* If only kp_fd is in the fallback poll set we don't use the fallback... */
    if (pip->fallback_poll_set_size == 1)
        pip->fallback_poll_set_size = 0;
    else
        pip->poll_set_size++; /* kp_fd */
#endif

    pip->lazy_updates =
#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE
        1
#else
        0
#endif
        ;

    pip->pending_updates =
#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE
        pending_updates
#else
        0
#endif
        ;

    pip->batch_updates =
#if ERTS_POLL_USE_BATCH_UPDATE_POLLSET
        1
#else
        0
#endif
        ;

    pip->concurrent_updates =
#if ERTS_POLL_USE_CONCURRENT_UPDATE
        1
#else
        0
#endif
        ;

    pip->max_fds = max_fds;

#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
    pip->no_avoided_wakeups = erts_smp_atomic_read_nob(&ps->no_avoided_wakeups);
    pip->no_avoided_interrupts = erts_smp_atomic_read_nob(&ps->no_avoided_interrupts);
    pip->no_interrupt_timed = erts_smp_atomic_read_nob(&ps->no_interrupt_timed);
#endif

    ERTS_POLLSET_UNLOCK(ps);
}

/*
 * Fatal error...
 */

#ifndef ERTS_GOT_SIGUSR1
# define ERTS_GOT_SIGUSR1 0
#endif

static void
fatal_error(char *format, ...)
{
    va_list ap;

    if (ERTS_SOMEONE_IS_CRASH_DUMPING || ERTS_GOT_SIGUSR1) {
        /*
         * Crash dump writing and reception of sigusr1 (which will
         * result in a crash dump) close all file descriptors. This
         * typically results in a fatal error for erts_poll() (wakeup
         * pipes and kernel poll fds are closed).
         *
         * We ignore the error and let the crash dump writing continue...
         */
        return;
    }
    va_start(ap, format);
    erts_vfprintf(stderr, format, ap);
    va_end(ap);
    abort();
}

static void
fatal_error_async_signal_safe(char *error_str)
{
    if (ERTS_SOMEONE_IS_CRASH_DUMPING || ERTS_GOT_SIGUSR1) {
        /* See comment above in fatal_error() */
        return;
    }
    if (error_str) {
        int len = 0;
        while (error_str[len])
            len++;
        if (len) {
            /* async signal safe */
            erts_silence_warn_unused_result(write(2, error_str, len));
        }
    }
    abort();
}

/*
 * --- Debug -----------------------------------------------------------------
 */

void
ERTS_POLL_EXPORT(erts_poll_get_selected_events)(ErtsPollSet ps,
                                                ErtsPollEvents ev[],
                                                int len)
{
    int fd;
    ERTS_POLLSET_LOCK(ps);
    for (fd = 0; fd < len; fd++) {
        if (fd >= ps->fds_status_len)
            ev[fd] = 0;
        else {
            ev[fd] = ps->fds_status[fd].events;
#if ERTS_POLL_USE_WAKEUP_PIPE
            if (fd == ps->wake_fds[0] || fd == ps->wake_fds[1])
                ev[fd] |= ERTS_POLL_EV_NVAL;
#endif
#if ERTS_POLL_USE_KERNEL_POLL
            if (fd == ps->kp_fd)
                ev[fd] |= ERTS_POLL_EV_NVAL;
#endif
        }
    }
    ERTS_POLLSET_UNLOCK(ps);
}

#ifdef HARD_DEBUG

static void
check_poll_result(ErtsPollResFd pr[], int len)
{
    int i, j;

    for (i = 0; i < len; i++) {
        ASSERT(pr[i].fd >= 0);
        ASSERT(pr[i].fd < max_fds);
        for (j = 0; j < len; j++) {
            ASSERT(i == j || pr[i].fd != pr[j].fd);
        }
    }
}

#if ERTS_POLL_USE_DEVPOLL

static void
check_poll_status(ErtsPollSet ps)
{
    int i;
    for (i = 0; i < ps->fds_status_len; i++) {
        int ires;
        struct pollfd dp_fd;
        short events = ERTS_POLL_EV_E2N(ps->fds_status[i].events);

        dp_fd.fd = i;
        dp_fd.events = (short) 0;
        dp_fd.revents = (short) 0;

        ires = ioctl(ps->kp_fd, DP_ISPOLLED, &dp_fd);

        if (ires == 0) {
            ASSERT(!events);
        }
        else if (ires == 1) {
            ASSERT(events);
            ASSERT(events == dp_fd.revents);
        }
        else {
            ASSERT(0);
        }
        ASSERT(dp_fd.fd == i);
        ASSERT(ps->fds_status[i].events == ps->fds_status[i].used_events);
    }
}

#endif /* ERTS_POLL_USE_DEVPOLL */
#endif /* HARD_DEBUG */

#ifdef ERTS_POLL_DEBUG_PRINT

static void
print_misc_debug_info(void)
{
    erts_printf("erts_poll using: %s lazy_updates:%s batch_updates:%s\n",
#if ERTS_POLL_USE_KQUEUE
                "kqueue"
#elif ERTS_POLL_USE_EPOLL
                "epoll"
#elif ERTS_POLL_USE_DEVPOLL
                "/dev/poll"
#endif
#if ERTS_POLL_USE_FALLBACK
                "-"
#endif
#if ERTS_POLL_USE_POLL
                "poll"
#elif ERTS_POLL_USE_SELECT
                "select"
#endif
                ,
#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE
                "true"
#else
                "false"
#endif
                ,
#if ERTS_POLL_USE_BATCH_UPDATE_POLLSET
                "true"
#else
                "false"
#endif
                );

    erts_printf("ERTS_POLL_EV_IN=0x%x\n"
                "ERTS_POLL_EV_OUT=0x%x\n"
                "ERTS_POLL_EV_NVAL=0x%x\n"
                "ERTS_POLL_EV_ERR=0x%x\n",
                ERTS_POLL_EV_IN,
                ERTS_POLL_EV_OUT,
                ERTS_POLL_EV_NVAL,
                ERTS_POLL_EV_ERR);

#ifdef FD_SETSIZE
    erts_printf("FD_SETSIZE=%d\n", FD_SETSIZE);
#endif
}

#endif
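
/*
 * Illustrative sketch (not part of the original sources, kept disabled with
 * #if 0): how a caller might use erts_poll_info() above to inspect a pollset
 * for debugging. The example_print_poll_info() name and the output format are
 * assumptions made only for illustration; it relies solely on the ErtsPollInfo
 * fields that erts_poll_info() fills in.
 */
#if 0
static void
example_print_poll_info(ErtsPollSet ps)
{
    ErtsPollInfo pi;

    /* Collect the pollset statistics under the pollset lock */
    ERTS_POLL_EXPORT(erts_poll_info)(ps, &pi);

    /* Which poll mechanisms are in use */
    erts_printf("primary=%s fallback=%s kernel_poll=%s\n",
                pi.primary,
                pi.fallback ? pi.fallback : "none",
                pi.kernel_poll ? pi.kernel_poll : "none");

    /* Size accounting: user fds plus wakeup pipe and (possibly) kp_fd */
    erts_printf("poll_set_size=%d fallback_poll_set_size=%d max_fds=%d\n",
                pi.poll_set_size, pi.fallback_poll_set_size, pi.max_fds);

    /* Update strategy and approximate memory footprint */
    erts_printf("memory_size=%lu lazy_updates=%d batch_updates=%d\n",
                (unsigned long) pi.memory_size,
                pi.lazy_updates,
                pi.batch_updates);
}
#endif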