From 9cc0332d42f58b69fbd10123e56c9e246ec4023b Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Tue, 11 Jan 2011 00:09:05 +0100 Subject: Simplify erts_poll_wait() wakeup logic --- erts/emulator/sys/common/erl_poll.c | 284 ++++++++++++++++-------------------- 1 file changed, 128 insertions(+), 156 deletions(-) (limited to 'erts/emulator/sys/common/erl_poll.c') diff --git a/erts/emulator/sys/common/erl_poll.c b/erts/emulator/sys/common/erl_poll.c index 4d0ca97889..77ac2de5f6 100644 --- a/erts/emulator/sys/common/erl_poll.c +++ b/erts/emulator/sys/common/erl_poll.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 2006-2010. All Rights Reserved. + * Copyright Ericsson AB 2006-2011. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in @@ -124,25 +124,11 @@ erts_smp_mtx_unlock(&(PS)->mtx) #define ERTS_POLLSET_SET_POLLED_CHK(PS) \ - ((int) erts_smp_atomic_xchg(&(PS)->polled, (erts_aint_t) 1)) + ((int) erts_atomic32_xchg(&(PS)->polled, (erts_aint32_t) 1)) #define ERTS_POLLSET_UNSET_POLLED(PS) \ - erts_smp_atomic_set(&(PS)->polled, (erts_aint_t) 0) + erts_atomic32_set(&(PS)->polled, (erts_aint32_t) 0) #define ERTS_POLLSET_IS_POLLED(PS) \ - ((int) erts_smp_atomic_read(&(PS)->polled)) - -#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) set_poller_woken_chk((PS)) -#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) \ -do { \ - ERTS_THR_MEMORY_BARRIER; \ - erts_smp_atomic_set(&(PS)->woken, (erts_aint_t) 1); \ -} while (0) -#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) \ -do { \ - erts_smp_atomic_set(&(PS)->woken, (erts_aint_t) 0); \ - ERTS_THR_MEMORY_BARRIER; \ -} while (0) -#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) \ - ((int) erts_smp_atomic_read(&(PS)->woken)) + ((int) erts_atomic32_read(&(PS)->polled)) #else @@ -152,69 +138,21 @@ do { \ #define ERTS_POLLSET_UNSET_POLLED(PS) #define ERTS_POLLSET_IS_POLLED(PS) 0 -#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT - -/* - * Ideally, the ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) operation would - * be atomic. This operation isn't, but we will do okay anyway. The - * "woken check" is only an optimization. The only requirement we have: - * If (PS)->woken is set to a value != 0 when interrupting, we have to - * write on the the wakeup pipe at least once. Multiple writes are okay. - */ -#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) ((PS)->woken++) -#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) ((PS)->woken = 1, (void) 0) -#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) ((PS)->woken = 0, (void) 0) -#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) ((PS)->woken) - -#else - -#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) 1 -#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) -#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) -#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) 1 - -#endif - #endif #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE #define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) \ - erts_smp_atomic_set(&(PS)->have_update_requests, (erts_aint_t) 1) + erts_smp_atomic32_set(&(PS)->have_update_requests, (erts_aint32_t) 1) #define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) \ - erts_smp_atomic_set(&(PS)->have_update_requests, (erts_aint_t) 0) + erts_smp_atomic32_set(&(PS)->have_update_requests, (erts_aint32_t) 0) #define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) \ - ((int) erts_smp_atomic_read(&(PS)->have_update_requests)) + ((int) erts_smp_atomic32_read(&(PS)->have_update_requests)) #else #define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) #define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) #define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) 0 #endif -#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP) - -#define ERTS_POLLSET_UNSET_INTERRUPTED_CHK(PS) unset_interrupted_chk((PS)) -#define ERTS_POLLSET_UNSET_INTERRUPTED(PS) ((PS)->interrupt = 0, (void) 0) -#define ERTS_POLLSET_SET_INTERRUPTED(PS) ((PS)->interrupt = 1, (void) 0) -#define ERTS_POLLSET_IS_INTERRUPTED(PS) ((PS)->interrupt) - -#else - -#define ERTS_POLLSET_UNSET_INTERRUPTED_CHK(PS) unset_interrupted_chk((PS)) -#define ERTS_POLLSET_UNSET_INTERRUPTED(PS) \ -do { \ - erts_smp_atomic_set(&(PS)->interrupt, (erts_aint_t) 0); \ - ERTS_THR_MEMORY_BARRIER; \ -} while (0) -#define ERTS_POLLSET_SET_INTERRUPTED(PS) \ -do { \ - ERTS_THR_MEMORY_BARRIER; \ - erts_smp_atomic_set(&(PS)->interrupt, (erts_aint_t) 1); \ -} while (0) -#define ERTS_POLLSET_IS_INTERRUPTED(PS) \ - ((int) erts_smp_atomic_read(&(PS)->interrupt)) - -#endif - #if ERTS_POLL_USE_FALLBACK # if ERTS_POLL_USE_POLL # define ERTS_POLL_NEED_FALLBACK(PS) ((PS)->no_poll_fds > 1) @@ -318,14 +256,12 @@ struct ErtsPollSet_ { #if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE ErtsPollSetUpdateRequestsBlock update_requests; ErtsPollSetUpdateRequestsBlock *curr_upd_req_block; - erts_smp_atomic_t have_update_requests; + erts_smp_atomic32_t have_update_requests; #endif #ifdef ERTS_SMP - erts_smp_atomic_t polled; - erts_smp_atomic_t woken; + erts_atomic32_t polled; erts_smp_mtx_t mtx; #elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT - volatile int woken; #endif #if ERTS_POLL_USE_WAKEUP_PIPE int wake_fds[2]; @@ -333,12 +269,12 @@ struct ErtsPollSet_ { #if ERTS_POLL_USE_FALLBACK int fallback_used; #endif -#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP) - volatile int interrupt; -#else - erts_smp_atomic_t interrupt; +#ifdef ERTS_SMP + erts_atomic32_t wakeup_state; +#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT + volatile int wakeup_state; #endif - erts_smp_atomic_t timeout; + erts_smp_atomic32_t timeout; #ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS erts_smp_atomic_t no_avoided_wakeups; erts_smp_atomic_t no_avoided_interrupts; @@ -346,34 +282,6 @@ struct ErtsPollSet_ { #endif }; -static ERTS_INLINE int -unset_interrupted_chk(ErtsPollSet ps) -{ - int res; -#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP) - /* This operation isn't atomic, but we have no need at all for an - atomic operation here... */ - res = ps->interrupt; - ps->interrupt = 0; -#else - res = (int) erts_smp_atomic_xchg(&ps->interrupt, (erts_aint_t) 0); - ERTS_THR_MEMORY_BARRIER; -#endif - return res; - -} - -#ifdef ERTS_SMP - -static ERTS_INLINE int -set_poller_woken_chk(ErtsPollSet ps) -{ - ERTS_THR_MEMORY_BARRIER; - return (int) erts_smp_atomic_xchg(&ps->woken, (erts_aint_t) 1); -} - -#endif - void erts_silence_warn_unused_result(long unused); static void fatal_error(char *format, ...); static void fatal_error_async_signal_safe(char *error_str); @@ -430,6 +338,63 @@ static void check_poll_status(ErtsPollSet ps); static void print_misc_debug_info(void); #endif +#define ERTS_POLL_NOT_WOKEN 0 +#define ERTS_POLL_WOKEN -1 +#define ERTS_POLL_WOKEN_INTR 1 + +static ERTS_INLINE void +reset_wakeup_state(ErtsPollSet ps) +{ +#ifdef ERTS_SMP + erts_atomic32_set(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN); +#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT + ps->wakeup_state = 0; +#endif +} + +static ERTS_INLINE int +is_woken(ErtsPollSet ps) +{ +#ifdef ERTS_SMP + return erts_atomic32_read_acqb(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN; +#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT + return ps->wakeup_state != ERTS_POLL_NOT_WOKEN; +#else + return 0; +#endif +} + +static ERTS_INLINE int +is_interrupted_reset(ErtsPollSet ps) +{ +#ifdef ERTS_SMP + return (erts_atomic32_xchg(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN) + == ERTS_POLL_WOKEN_INTR); +#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT + int res = ps->wakeup_state == ERTS_POLL_WOKEN_INTR; + ps->wakeup_state = ERTS_POLL_NOT_WOKEN; + return res; +#else + return 0; +#endif +} + +static ERTS_INLINE void +woke_up(ErtsPollSet ps) +{ +#ifdef ERTS_SMP + erts_aint32_t wakeup_state = erts_atomic32_read(&ps->wakeup_state); + if (wakeup_state == ERTS_POLL_NOT_WOKEN) + (void) erts_atomic32_cmpxchg(&ps->wakeup_state, + ERTS_POLL_WOKEN, + ERTS_POLL_NOT_WOKEN); + ASSERT(erts_atomic32_read(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN); +#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT + if (ps->wakeup_state == ERTS_POLL_NOT_WOKEN) + ps->wakeup_state = ERTS_POLL_WOKEN; +#endif +} + /* * --- Wakeup pipe ----------------------------------------------------------- */ @@ -437,14 +402,34 @@ static void print_misc_debug_info(void); #if ERTS_POLL_USE_WAKEUP_PIPE static ERTS_INLINE void -wake_poller(ErtsPollSet ps) +wake_poller(ErtsPollSet ps, int interrupted) { + int wake; +#ifdef ERTS_SMP + erts_aint32_t wakeup_state; + if (!interrupted) + wakeup_state = erts_atomic32_cmpxchg_relb(&ps->wakeup_state, + ERTS_POLL_WOKEN, + ERTS_POLL_NOT_WOKEN); + else { + /* + * We might unnecessarily write to the pipe, however, + * that isn't problematic. + */ + wakeup_state = erts_atomic32_read(&ps->wakeup_state); + erts_atomic32_set_relb(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR); + } + wake = wakeup_state == ERTS_POLL_NOT_WOKEN; +#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT + wake = ps->wakeup_state == ERTS_POLL_NOT_WOKEN; + ps->wakeup_state = interrupted ? ERTS_POLL_WOKEN_INTR : ERTS_POLL_NOT_WOKEN; +#endif /* * NOTE: This function might be called from signal handlers in the * non-smp case; therefore, it has to be async-signal safe in * the non-smp case. */ - if (!ERTS_POLLSET_SET_POLLER_WOKEN_CHK(ps)) { + if (wake) { ssize_t res; if (ps->wake_fds[1] < 0) return; /* Not initialized yet */ @@ -1387,9 +1372,7 @@ handle_update_requests(ErtsPollSet ps) #endif /* ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE */ static ERTS_INLINE ErtsPollEvents -poll_control(ErtsPollSet ps, int fd, ErtsPollEvents events, int on, - int *have_set_have_update_requests, - int *do_wake) +poll_control(ErtsPollSet ps, int fd, ErtsPollEvents events, int on, int *do_wake) { ErtsPollEvents new_events; @@ -1493,7 +1476,6 @@ ERTS_POLL_EXPORT(erts_poll_controlv)(ErtsPollSet ps, int len) { int i; - int hshur = 0; int do_wake; int final_do_wake = 0; @@ -1505,17 +1487,17 @@ ERTS_POLL_EXPORT(erts_poll_controlv)(ErtsPollSet ps, pcev[i].fd, pcev[i].events, pcev[i].on, - &hshur, &do_wake); final_do_wake |= do_wake; } + ERTS_POLLSET_UNLOCK(ps); + #ifdef ERTS_SMP if (final_do_wake) - wake_poller(ps); + wake_poller(ps, 0); #endif /* ERTS_SMP */ - ERTS_POLLSET_UNLOCK(ps); } ErtsPollEvents @@ -1526,20 +1508,20 @@ ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet ps, int* do_wake) /* In: Wake up polling thread */ /* Out: Poller is woken */ { - int hshur = 0; ErtsPollEvents res; ERTS_POLLSET_LOCK(ps); - res = poll_control(ps, fd, events, on, &hshur, do_wake); + res = poll_control(ps, fd, events, on, do_wake); + + ERTS_POLLSET_UNLOCK(ps); #ifdef ERTS_SMP if (*do_wake) { - wake_poller(ps); + wake_poller(ps, 0); } #endif /* ERTS_SMP */ - ERTS_POLLSET_UNLOCK(ps); return res; } @@ -1918,9 +1900,11 @@ check_fd_events(ErtsPollSet ps, SysTimeval *tv, int max_res, int *ps_locked) return 0; } else { - erts_aint_t timeout = tv->tv_sec*1000 + tv->tv_usec/1000; + long timeout = tv->tv_sec*1000 + tv->tv_usec/1000; + if (timeout > ERTS_AINT32_T_MAX) + timeout = ERTS_AINT32_T_MAX; ASSERT(timeout >= 0); - erts_smp_atomic_set(&ps->timeout, timeout); + erts_smp_atomic32_set_relb(&ps->timeout, (erts_aint32_t) timeout); #if ERTS_POLL_USE_FALLBACK if (!(ps->fallback_used = ERTS_POLL_NEED_FALLBACK(ps))) { @@ -2042,15 +2026,14 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps, (int) tv->tv_sec*1000 + tv->tv_usec/1000); #endif - ERTS_POLLSET_UNSET_POLLER_WOKEN(ps); if (ERTS_POLLSET_SET_POLLED_CHK(ps)) { res = EINVAL; /* Another thread is in erts_poll_wait() on this pollset... */ goto done; } - if (ERTS_POLLSET_IS_INTERRUPTED(ps)) { - /* Interrupt use zero timeout */ + if (is_woken(ps)) { + /* Use zero timeout */ itv.tv_sec = 0; itv.tv_usec = 0; tvp = &itv; @@ -2067,7 +2050,7 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps, ps_locked = 0; res = check_fd_events(ps, tvp, no_fds, &ps_locked); - ERTS_POLLSET_SET_POLLER_WOKEN(ps); + woke_up(ps); if (res == 0) { res = ETIMEDOUT; @@ -2099,9 +2082,7 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps, check_poll_result(pr, no_fds); #endif - res = (no_fds == 0 - ? (ERTS_POLLSET_UNSET_INTERRUPTED_CHK(ps) ? EINTR : EAGAIN) - : 0); + res = (no_fds == 0 ? (is_interrupted_reset(ps) ? EINTR : EAGAIN) : 0); *len = no_fds; } @@ -2112,7 +2093,7 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps, #endif done: - erts_smp_atomic_set(&ps->timeout, ERTS_AINT_T_MAX); + erts_smp_atomic32_set_relb(&ps->timeout, ERTS_AINT32_T_MAX); #ifdef ERTS_POLL_DEBUG_PRINT erts_printf("Leaving %s = erts_poll_wait()\n", res == 0 ? "0" : erl_errno_id(res)); @@ -2128,20 +2109,17 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps, void ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet ps, int set) { +#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP) /* * NOTE: This function might be called from signal handlers in the * non-smp case; therefore, it has to be async-signal safe in * the non-smp case. */ - if (set) { - ERTS_POLLSET_SET_INTERRUPTED(ps); -#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP) - wake_poller(ps); + if (!set) + reset_wakeup_state(ps); + else + wake_poller(ps, 1); #endif - } - else { - ERTS_POLLSET_UNSET_INTERRUPTED(ps); - } } /* @@ -2154,13 +2132,12 @@ ERTS_POLL_EXPORT(erts_poll_interrupt_timed)(ErtsPollSet ps, int set, long msec) { - if (set) { - if (erts_smp_atomic_read(&ps->timeout) > (erts_aint_t) msec) { - ERTS_POLLSET_SET_INTERRUPTED(ps); #if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP) - wake_poller(ps); -#endif - } + if (!set) + reset_wakeup_state(ps); + else { + if (erts_smp_atomic32_read_acqb(&ps->timeout) > (erts_aint32_t) msec) + wake_poller(ps, 1); #ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS else { if (ERTS_POLLSET_IS_POLLED(ps)) @@ -2170,9 +2147,7 @@ ERTS_POLL_EXPORT(erts_poll_interrupt_timed)(ErtsPollSet ps, erts_smp_atomic_inc(&ps->no_interrupt_timed); #endif } - else { - ERTS_POLLSET_UNSET_INTERRUPTED(ps); - } +#endif } int @@ -2283,14 +2258,16 @@ ERTS_POLL_EXPORT(erts_poll_create_pollset)(void) ps->update_requests.next = NULL; ps->update_requests.len = 0; ps->curr_upd_req_block = &ps->update_requests; - erts_smp_atomic_init(&ps->have_update_requests, 0); + erts_smp_atomic32_init(&ps->have_update_requests, 0); #endif #ifdef ERTS_SMP - erts_smp_atomic_init(&ps->polled, 0); - erts_smp_atomic_init(&ps->woken, 0); + erts_atomic32_init(&ps->polled, 0); erts_smp_mtx_init(&ps->mtx, "pollset"); +#endif +#ifdef ERTS_SMP + erts_atomic32_init(&ps->wakeup_state, (erts_aint32_t) 0); #elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT - ps->woken = 0; + ps->wakeup_state = 0; #endif #if ERTS_POLL_USE_WAKEUP_PIPE create_wakeup_pipe(ps); @@ -2312,12 +2289,7 @@ ERTS_POLL_EXPORT(erts_poll_create_pollset)(void) ps->internal_fd_limit = kp_fd + 1; ps->kp_fd = kp_fd; #endif -#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP) - ps->interrupt = 0; -#else - erts_smp_atomic_init(&ps->interrupt, 0); -#endif - erts_smp_atomic_init(&ps->timeout, ERTS_AINT_T_MAX); + erts_smp_atomic32_init(&ps->timeout, ERTS_AINT32_T_MAX); #ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS erts_smp_atomic_init(&ps->no_avoided_wakeups, 0); erts_smp_atomic_init(&ps->no_avoided_interrupts, 0); -- cgit v1.2.3