aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/sys/common/erl_poll.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2011-01-11 00:09:05 +0100
committerRickard Green <[email protected]>2011-02-25 09:49:06 +0100
commit9cc0332d42f58b69fbd10123e56c9e246ec4023b (patch)
tree0e6c9bd551c902ac6b58f82441173497f84ddd0d /erts/emulator/sys/common/erl_poll.c
parent99372cc4053c2fa7662da2f871c9813fbf45ba7e (diff)
downloadotp-9cc0332d42f58b69fbd10123e56c9e246ec4023b.tar.gz
otp-9cc0332d42f58b69fbd10123e56c9e246ec4023b.tar.bz2
otp-9cc0332d42f58b69fbd10123e56c9e246ec4023b.zip
Simplify erts_poll_wait() wakeup logic
Diffstat (limited to 'erts/emulator/sys/common/erl_poll.c')
-rw-r--r--erts/emulator/sys/common/erl_poll.c284
1 files changed, 128 insertions, 156 deletions
diff --git a/erts/emulator/sys/common/erl_poll.c b/erts/emulator/sys/common/erl_poll.c
index 4d0ca97889..77ac2de5f6 100644
--- a/erts/emulator/sys/common/erl_poll.c
+++ b/erts/emulator/sys/common/erl_poll.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 2006-2010. All Rights Reserved.
+ * Copyright Ericsson AB 2006-2011. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
@@ -124,25 +124,11 @@
erts_smp_mtx_unlock(&(PS)->mtx)
#define ERTS_POLLSET_SET_POLLED_CHK(PS) \
- ((int) erts_smp_atomic_xchg(&(PS)->polled, (erts_aint_t) 1))
+ ((int) erts_atomic32_xchg(&(PS)->polled, (erts_aint32_t) 1))
#define ERTS_POLLSET_UNSET_POLLED(PS) \
- erts_smp_atomic_set(&(PS)->polled, (erts_aint_t) 0)
+ erts_atomic32_set(&(PS)->polled, (erts_aint32_t) 0)
#define ERTS_POLLSET_IS_POLLED(PS) \
- ((int) erts_smp_atomic_read(&(PS)->polled))
-
-#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) set_poller_woken_chk((PS))
-#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) \
-do { \
- ERTS_THR_MEMORY_BARRIER; \
- erts_smp_atomic_set(&(PS)->woken, (erts_aint_t) 1); \
-} while (0)
-#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) \
-do { \
- erts_smp_atomic_set(&(PS)->woken, (erts_aint_t) 0); \
- ERTS_THR_MEMORY_BARRIER; \
-} while (0)
-#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) \
- ((int) erts_smp_atomic_read(&(PS)->woken))
+ ((int) erts_atomic32_read(&(PS)->polled))
#else
@@ -152,69 +138,21 @@ do { \
#define ERTS_POLLSET_UNSET_POLLED(PS)
#define ERTS_POLLSET_IS_POLLED(PS) 0
-#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
-
-/*
- * Ideally, the ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) operation would
- * be atomic. This operation isn't, but we will do okay anyway. The
- * "woken check" is only an optimization. The only requirement we have:
- * If (PS)->woken is set to a value != 0 when interrupting, we have to
- * write on the the wakeup pipe at least once. Multiple writes are okay.
- */
-#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) ((PS)->woken++)
-#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) ((PS)->woken = 1, (void) 0)
-#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) ((PS)->woken = 0, (void) 0)
-#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) ((PS)->woken)
-
-#else
-
-#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) 1
-#define ERTS_POLLSET_SET_POLLER_WOKEN(PS)
-#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS)
-#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) 1
-
-#endif
-
#endif
#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE
#define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) \
- erts_smp_atomic_set(&(PS)->have_update_requests, (erts_aint_t) 1)
+ erts_smp_atomic32_set(&(PS)->have_update_requests, (erts_aint32_t) 1)
#define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) \
- erts_smp_atomic_set(&(PS)->have_update_requests, (erts_aint_t) 0)
+ erts_smp_atomic32_set(&(PS)->have_update_requests, (erts_aint32_t) 0)
#define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) \
- ((int) erts_smp_atomic_read(&(PS)->have_update_requests))
+ ((int) erts_smp_atomic32_read(&(PS)->have_update_requests))
#else
#define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS)
#define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS)
#define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) 0
#endif
-#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)
-
-#define ERTS_POLLSET_UNSET_INTERRUPTED_CHK(PS) unset_interrupted_chk((PS))
-#define ERTS_POLLSET_UNSET_INTERRUPTED(PS) ((PS)->interrupt = 0, (void) 0)
-#define ERTS_POLLSET_SET_INTERRUPTED(PS) ((PS)->interrupt = 1, (void) 0)
-#define ERTS_POLLSET_IS_INTERRUPTED(PS) ((PS)->interrupt)
-
-#else
-
-#define ERTS_POLLSET_UNSET_INTERRUPTED_CHK(PS) unset_interrupted_chk((PS))
-#define ERTS_POLLSET_UNSET_INTERRUPTED(PS) \
-do { \
- erts_smp_atomic_set(&(PS)->interrupt, (erts_aint_t) 0); \
- ERTS_THR_MEMORY_BARRIER; \
-} while (0)
-#define ERTS_POLLSET_SET_INTERRUPTED(PS) \
-do { \
- ERTS_THR_MEMORY_BARRIER; \
- erts_smp_atomic_set(&(PS)->interrupt, (erts_aint_t) 1); \
-} while (0)
-#define ERTS_POLLSET_IS_INTERRUPTED(PS) \
- ((int) erts_smp_atomic_read(&(PS)->interrupt))
-
-#endif
-
#if ERTS_POLL_USE_FALLBACK
# if ERTS_POLL_USE_POLL
# define ERTS_POLL_NEED_FALLBACK(PS) ((PS)->no_poll_fds > 1)
@@ -318,14 +256,12 @@ struct ErtsPollSet_ {
#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE
ErtsPollSetUpdateRequestsBlock update_requests;
ErtsPollSetUpdateRequestsBlock *curr_upd_req_block;
- erts_smp_atomic_t have_update_requests;
+ erts_smp_atomic32_t have_update_requests;
#endif
#ifdef ERTS_SMP
- erts_smp_atomic_t polled;
- erts_smp_atomic_t woken;
+ erts_atomic32_t polled;
erts_smp_mtx_t mtx;
#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
- volatile int woken;
#endif
#if ERTS_POLL_USE_WAKEUP_PIPE
int wake_fds[2];
@@ -333,12 +269,12 @@ struct ErtsPollSet_ {
#if ERTS_POLL_USE_FALLBACK
int fallback_used;
#endif
-#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)
- volatile int interrupt;
-#else
- erts_smp_atomic_t interrupt;
+#ifdef ERTS_SMP
+ erts_atomic32_t wakeup_state;
+#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
+ volatile int wakeup_state;
#endif
- erts_smp_atomic_t timeout;
+ erts_smp_atomic32_t timeout;
#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
erts_smp_atomic_t no_avoided_wakeups;
erts_smp_atomic_t no_avoided_interrupts;
@@ -346,34 +282,6 @@ struct ErtsPollSet_ {
#endif
};
-static ERTS_INLINE int
-unset_interrupted_chk(ErtsPollSet ps)
-{
- int res;
-#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)
- /* This operation isn't atomic, but we have no need at all for an
- atomic operation here... */
- res = ps->interrupt;
- ps->interrupt = 0;
-#else
- res = (int) erts_smp_atomic_xchg(&ps->interrupt, (erts_aint_t) 0);
- ERTS_THR_MEMORY_BARRIER;
-#endif
- return res;
-
-}
-
-#ifdef ERTS_SMP
-
-static ERTS_INLINE int
-set_poller_woken_chk(ErtsPollSet ps)
-{
- ERTS_THR_MEMORY_BARRIER;
- return (int) erts_smp_atomic_xchg(&ps->woken, (erts_aint_t) 1);
-}
-
-#endif
-
void erts_silence_warn_unused_result(long unused);
static void fatal_error(char *format, ...);
static void fatal_error_async_signal_safe(char *error_str);
@@ -430,6 +338,63 @@ static void check_poll_status(ErtsPollSet ps);
static void print_misc_debug_info(void);
#endif
+#define ERTS_POLL_NOT_WOKEN 0
+#define ERTS_POLL_WOKEN -1
+#define ERTS_POLL_WOKEN_INTR 1
+
+static ERTS_INLINE void
+reset_wakeup_state(ErtsPollSet ps)
+{
+#ifdef ERTS_SMP
+ erts_atomic32_set(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN);
+#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
+ ps->wakeup_state = 0;
+#endif
+}
+
+static ERTS_INLINE int
+is_woken(ErtsPollSet ps)
+{
+#ifdef ERTS_SMP
+ return erts_atomic32_read_acqb(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN;
+#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
+ return ps->wakeup_state != ERTS_POLL_NOT_WOKEN;
+#else
+ return 0;
+#endif
+}
+
+static ERTS_INLINE int
+is_interrupted_reset(ErtsPollSet ps)
+{
+#ifdef ERTS_SMP
+ return (erts_atomic32_xchg(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN)
+ == ERTS_POLL_WOKEN_INTR);
+#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
+ int res = ps->wakeup_state == ERTS_POLL_WOKEN_INTR;
+ ps->wakeup_state = ERTS_POLL_NOT_WOKEN;
+ return res;
+#else
+ return 0;
+#endif
+}
+
+static ERTS_INLINE void
+woke_up(ErtsPollSet ps)
+{
+#ifdef ERTS_SMP
+ erts_aint32_t wakeup_state = erts_atomic32_read(&ps->wakeup_state);
+ if (wakeup_state == ERTS_POLL_NOT_WOKEN)
+ (void) erts_atomic32_cmpxchg(&ps->wakeup_state,
+ ERTS_POLL_WOKEN,
+ ERTS_POLL_NOT_WOKEN);
+ ASSERT(erts_atomic32_read(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN);
+#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
+ if (ps->wakeup_state == ERTS_POLL_NOT_WOKEN)
+ ps->wakeup_state = ERTS_POLL_WOKEN;
+#endif
+}
+
/*
* --- Wakeup pipe -----------------------------------------------------------
*/
@@ -437,14 +402,34 @@ static void print_misc_debug_info(void);
#if ERTS_POLL_USE_WAKEUP_PIPE
static ERTS_INLINE void
-wake_poller(ErtsPollSet ps)
+wake_poller(ErtsPollSet ps, int interrupted)
{
+ int wake;
+#ifdef ERTS_SMP
+ erts_aint32_t wakeup_state;
+ if (!interrupted)
+ wakeup_state = erts_atomic32_cmpxchg_relb(&ps->wakeup_state,
+ ERTS_POLL_WOKEN,
+ ERTS_POLL_NOT_WOKEN);
+ else {
+ /*
+ * We might unnecessarily write to the pipe, however,
+ * that isn't problematic.
+ */
+ wakeup_state = erts_atomic32_read(&ps->wakeup_state);
+ erts_atomic32_set_relb(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR);
+ }
+ wake = wakeup_state == ERTS_POLL_NOT_WOKEN;
+#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
+ wake = ps->wakeup_state == ERTS_POLL_NOT_WOKEN;
+ ps->wakeup_state = interrupted ? ERTS_POLL_WOKEN_INTR : ERTS_POLL_NOT_WOKEN;
+#endif
/*
* NOTE: This function might be called from signal handlers in the
* non-smp case; therefore, it has to be async-signal safe in
* the non-smp case.
*/
- if (!ERTS_POLLSET_SET_POLLER_WOKEN_CHK(ps)) {
+ if (wake) {
ssize_t res;
if (ps->wake_fds[1] < 0)
return; /* Not initialized yet */
@@ -1387,9 +1372,7 @@ handle_update_requests(ErtsPollSet ps)
#endif /* ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE */
static ERTS_INLINE ErtsPollEvents
-poll_control(ErtsPollSet ps, int fd, ErtsPollEvents events, int on,
- int *have_set_have_update_requests,
- int *do_wake)
+poll_control(ErtsPollSet ps, int fd, ErtsPollEvents events, int on, int *do_wake)
{
ErtsPollEvents new_events;
@@ -1493,7 +1476,6 @@ ERTS_POLL_EXPORT(erts_poll_controlv)(ErtsPollSet ps,
int len)
{
int i;
- int hshur = 0;
int do_wake;
int final_do_wake = 0;
@@ -1505,17 +1487,17 @@ ERTS_POLL_EXPORT(erts_poll_controlv)(ErtsPollSet ps,
pcev[i].fd,
pcev[i].events,
pcev[i].on,
- &hshur,
&do_wake);
final_do_wake |= do_wake;
}
+ ERTS_POLLSET_UNLOCK(ps);
+
#ifdef ERTS_SMP
if (final_do_wake)
- wake_poller(ps);
+ wake_poller(ps, 0);
#endif /* ERTS_SMP */
- ERTS_POLLSET_UNLOCK(ps);
}
ErtsPollEvents
@@ -1526,20 +1508,20 @@ ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet ps,
int* do_wake) /* In: Wake up polling thread */
/* Out: Poller is woken */
{
- int hshur = 0;
ErtsPollEvents res;
ERTS_POLLSET_LOCK(ps);
- res = poll_control(ps, fd, events, on, &hshur, do_wake);
+ res = poll_control(ps, fd, events, on, do_wake);
+
+ ERTS_POLLSET_UNLOCK(ps);
#ifdef ERTS_SMP
if (*do_wake) {
- wake_poller(ps);
+ wake_poller(ps, 0);
}
#endif /* ERTS_SMP */
- ERTS_POLLSET_UNLOCK(ps);
return res;
}
@@ -1918,9 +1900,11 @@ check_fd_events(ErtsPollSet ps, SysTimeval *tv, int max_res, int *ps_locked)
return 0;
}
else {
- erts_aint_t timeout = tv->tv_sec*1000 + tv->tv_usec/1000;
+ long timeout = tv->tv_sec*1000 + tv->tv_usec/1000;
+ if (timeout > ERTS_AINT32_T_MAX)
+ timeout = ERTS_AINT32_T_MAX;
ASSERT(timeout >= 0);
- erts_smp_atomic_set(&ps->timeout, timeout);
+ erts_smp_atomic32_set_relb(&ps->timeout, (erts_aint32_t) timeout);
#if ERTS_POLL_USE_FALLBACK
if (!(ps->fallback_used = ERTS_POLL_NEED_FALLBACK(ps))) {
@@ -2042,15 +2026,14 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps,
(int) tv->tv_sec*1000 + tv->tv_usec/1000);
#endif
- ERTS_POLLSET_UNSET_POLLER_WOKEN(ps);
if (ERTS_POLLSET_SET_POLLED_CHK(ps)) {
res = EINVAL; /* Another thread is in erts_poll_wait()
on this pollset... */
goto done;
}
- if (ERTS_POLLSET_IS_INTERRUPTED(ps)) {
- /* Interrupt use zero timeout */
+ if (is_woken(ps)) {
+ /* Use zero timeout */
itv.tv_sec = 0;
itv.tv_usec = 0;
tvp = &itv;
@@ -2067,7 +2050,7 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps,
ps_locked = 0;
res = check_fd_events(ps, tvp, no_fds, &ps_locked);
- ERTS_POLLSET_SET_POLLER_WOKEN(ps);
+ woke_up(ps);
if (res == 0) {
res = ETIMEDOUT;
@@ -2099,9 +2082,7 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps,
check_poll_result(pr, no_fds);
#endif
- res = (no_fds == 0
- ? (ERTS_POLLSET_UNSET_INTERRUPTED_CHK(ps) ? EINTR : EAGAIN)
- : 0);
+ res = (no_fds == 0 ? (is_interrupted_reset(ps) ? EINTR : EAGAIN) : 0);
*len = no_fds;
}
@@ -2112,7 +2093,7 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps,
#endif
done:
- erts_smp_atomic_set(&ps->timeout, ERTS_AINT_T_MAX);
+ erts_smp_atomic32_set_relb(&ps->timeout, ERTS_AINT32_T_MAX);
#ifdef ERTS_POLL_DEBUG_PRINT
erts_printf("Leaving %s = erts_poll_wait()\n",
res == 0 ? "0" : erl_errno_id(res));
@@ -2128,20 +2109,17 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps,
void
ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet ps, int set)
{
+#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP)
/*
* NOTE: This function might be called from signal handlers in the
* non-smp case; therefore, it has to be async-signal safe in
* the non-smp case.
*/
- if (set) {
- ERTS_POLLSET_SET_INTERRUPTED(ps);
-#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP)
- wake_poller(ps);
+ if (!set)
+ reset_wakeup_state(ps);
+ else
+ wake_poller(ps, 1);
#endif
- }
- else {
- ERTS_POLLSET_UNSET_INTERRUPTED(ps);
- }
}
/*
@@ -2154,13 +2132,12 @@ ERTS_POLL_EXPORT(erts_poll_interrupt_timed)(ErtsPollSet ps,
int set,
long msec)
{
- if (set) {
- if (erts_smp_atomic_read(&ps->timeout) > (erts_aint_t) msec) {
- ERTS_POLLSET_SET_INTERRUPTED(ps);
#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP)
- wake_poller(ps);
-#endif
- }
+ if (!set)
+ reset_wakeup_state(ps);
+ else {
+ if (erts_smp_atomic32_read_acqb(&ps->timeout) > (erts_aint32_t) msec)
+ wake_poller(ps, 1);
#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
else {
if (ERTS_POLLSET_IS_POLLED(ps))
@@ -2170,9 +2147,7 @@ ERTS_POLL_EXPORT(erts_poll_interrupt_timed)(ErtsPollSet ps,
erts_smp_atomic_inc(&ps->no_interrupt_timed);
#endif
}
- else {
- ERTS_POLLSET_UNSET_INTERRUPTED(ps);
- }
+#endif
}
int
@@ -2283,14 +2258,16 @@ ERTS_POLL_EXPORT(erts_poll_create_pollset)(void)
ps->update_requests.next = NULL;
ps->update_requests.len = 0;
ps->curr_upd_req_block = &ps->update_requests;
- erts_smp_atomic_init(&ps->have_update_requests, 0);
+ erts_smp_atomic32_init(&ps->have_update_requests, 0);
#endif
#ifdef ERTS_SMP
- erts_smp_atomic_init(&ps->polled, 0);
- erts_smp_atomic_init(&ps->woken, 0);
+ erts_atomic32_init(&ps->polled, 0);
erts_smp_mtx_init(&ps->mtx, "pollset");
+#endif
+#ifdef ERTS_SMP
+ erts_atomic32_init(&ps->wakeup_state, (erts_aint32_t) 0);
#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
- ps->woken = 0;
+ ps->wakeup_state = 0;
#endif
#if ERTS_POLL_USE_WAKEUP_PIPE
create_wakeup_pipe(ps);
@@ -2312,12 +2289,7 @@ ERTS_POLL_EXPORT(erts_poll_create_pollset)(void)
ps->internal_fd_limit = kp_fd + 1;
ps->kp_fd = kp_fd;
#endif
-#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)
- ps->interrupt = 0;
-#else
- erts_smp_atomic_init(&ps->interrupt, 0);
-#endif
- erts_smp_atomic_init(&ps->timeout, ERTS_AINT_T_MAX);
+ erts_smp_atomic32_init(&ps->timeout, ERTS_AINT32_T_MAX);
#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
erts_smp_atomic_init(&ps->no_avoided_wakeups, 0);
erts_smp_atomic_init(&ps->no_avoided_interrupts, 0);