From 9cc0332d42f58b69fbd10123e56c9e246ec4023b Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Tue, 11 Jan 2011 00:09:05 +0100 Subject: Simplify erts_poll_wait() wakeup logic --- erts/emulator/sys/win32/erl_poll.c | 382 ++++++++++++++++---------------- erts/emulator/sys/win32/sys_interrupt.c | 8 +- 2 files changed, 195 insertions(+), 195 deletions(-) (limited to 'erts/emulator/sys/win32') diff --git a/erts/emulator/sys/win32/erl_poll.c b/erts/emulator/sys/win32/erl_poll.c index d84ae2ede2..1f2877b682 100644 --- a/erts/emulator/sys/win32/erl_poll.c +++ b/erts/emulator/sys/win32/erl_poll.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 2007-2010. All Rights Reserved. + * Copyright Ericsson AB 2007-2011. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in @@ -274,7 +274,6 @@ struct ErtsPollSet_ { Waiter** waiter; int allocated_waiters; /* Size ow waiter array */ int num_waiters; /* Number of waiter threads. */ - erts_atomic_t sys_io_ready; /* Tells us there is I/O ready (already). */ int restore_events; /* Tells us to restore waiters events next time around */ HANDLE event_io_ready; /* To be used when waiting for io */ @@ -282,12 +281,11 @@ struct ErtsPollSet_ { volatile int standby_wait_counter; /* Number of threads to wait for */ CRITICAL_SECTION standby_crit; /* CS to guard the counter */ HANDLE standby_wait_event; /* Event signalled when counte == 0 */ + erts_atomic32_t wakeup_state; #ifdef ERTS_SMP - erts_smp_atomic_t woken; erts_smp_mtx_t mtx; - erts_smp_atomic_t interrupt; #endif - erts_smp_atomic_t timeout; + erts_smp_atomic32_t timeout; }; #ifdef ERTS_SMP @@ -296,126 +294,24 @@ struct ErtsPollSet_ { erts_smp_mtx_lock(&(PS)->mtx) #define ERTS_POLLSET_UNLOCK(PS) \ erts_smp_mtx_unlock(&(PS)->mtx) -#define ERTS_POLLSET_SET_POLLED_CHK(PS) \ - ((int) erts_smp_atomic_xchg(&(PS)->polled, (erts_aint_t) 1)) -#define ERTS_POLLSET_SET_POLLED(PS) \ - erts_smp_atomic_set(&(PS)->polled, (erts_aint_t) 1) -#define ERTS_POLLSET_UNSET_POLLED(PS) \ - erts_smp_atomic_set(&(PS)->polled, (erts_aint_t) 0) -#define ERTS_POLLSET_IS_POLLED(PS) \ - ((int) erts_smp_atomic_read(&(PS)->polled)) - -#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) set_poller_woken_chk((PS)) -#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) \ -do { \ - ERTS_THR_MEMORY_BARRIER; \ - erts_smp_atomic_set(&(PS)->woken, (erts_aint_t) 1); \ -} while (0) -#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) \ -do { \ - erts_smp_atomic_set(&(PS)->woken, (erts_aint_t) 0); \ - ERTS_THR_MEMORY_BARRIER; \ -} while (0) -#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) \ - ((int) erts_smp_atomic_read(&(PS)->woken)) - -#define ERTS_POLLSET_UNSET_INTERRUPTED_CHK(PS) unset_interrupted_chk((PS)) -#define ERTS_POLLSET_UNSET_INTERRUPTED(PS) \ -do { \ - erts_smp_atomic_set(&(PS)->interrupt, (erts_aint_t) 0); \ - ERTS_THR_MEMORY_BARRIER; \ -} while (0) -#define ERTS_POLLSET_SET_INTERRUPTED(PS) \ -do { \ - ERTS_THR_MEMORY_BARRIER; \ - erts_smp_atomic_set(&(PS)->interrupt, (erts_aint_t) 1); \ -} while (0) -#define ERTS_POLLSET_IS_INTERRUPTED(PS) \ - ((int) erts_smp_atomic_read(&(PS)->interrupt)) - -static ERTS_INLINE int -unset_interrupted_chk(ErtsPollSet ps) -{ - int res = (int) erts_smp_atomic_xchg(&ps->interrupt, (erts_aint_t) 0); - ERTS_THR_MEMORY_BARRIER; - return res; - -} - -static ERTS_INLINE int -set_poller_woken_chk(ErtsPollSet ps) -{ - ERTS_THR_MEMORY_BARRIER; - return (int) erts_smp_atomic_xchg(&ps->woken, (erts_aint_t) 1); -} #else #define ERTS_POLLSET_LOCK(PS) #define ERTS_POLLSET_UNLOCK(PS) -#define ERTS_POLLSET_SET_POLLED_CHK(PS) 0 -#define ERTS_POLLSET_UNSET_POLLED(PS) -#define ERTS_POLLSET_IS_POLLED(PS) 0 -#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) 1 -#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) -#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) -#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) 1 - #endif -/* - * While atomics are not yet implemented for windows in the common library... - * - * MSDN doc states that SMP machines and old compilers require - * InterLockedExchange to properly read and write interlocked - * variables, otherwise the processors might reschedule - * the access and order of atomics access is destroyed... - * While they only mention it in white-papers, the problem - * in VS2003 is due to the IA64 arch, so we can still count - * on the CPU not rescheduling the access to volatile in X86 arch using - * even the slightly older compiler... - * - * So here's (hopefully) a subset of the generally working atomic - * variable access... - */ - -#if defined(__GNUC__) -# if defined(__i386__) || defined(__x86_64__) -# define VOLATILE_IN_SEQUENCE 1 -# else -# define VOLATILE_IN_SEQUENCE 0 -# endif -#elif defined(_MSC_VER) -# if _MSC_VER < 1300 -# define VOLATILE_IN_SEQUENCE 0 /* Dont trust really old compilers */ -# else -# if defined(_M_IX86) -# define VOLATILE_IN_SEQUENCE 1 -# else /* I.e. IA64 */ -# if _MSC_VER >= 1400 -# define VOLATILE_IN_SEQUENCE 1 -# else -# define VOLATILE_IN_SEQUENCE 0 -# endif -# endif -# endif -#else -# define VOLATILE_IN_SEQUENCE 0 -#endif - - - /* * Communication with sys_interrupt */ #ifdef ERTS_SMP -extern erts_smp_atomic_t erts_break_requested; +extern erts_smp_atomic32_t erts_break_requested; #define ERTS_SET_BREAK_REQUESTED \ - erts_smp_atomic_set(&erts_break_requested, (erts_aint_t) 1) + erts_smp_atomic32_set(&erts_break_requested, (erts_aint32_t) 1) #define ERTS_UNSET_BREAK_REQUESTED \ - erts_smp_atomic_set(&erts_break_requested, (erts_aint_t) 0) + erts_smp_atomic32_set(&erts_break_requested, (erts_aint32_t) 0) #else extern volatile int erts_break_requested; #define ERTS_SET_BREAK_REQUESTED (erts_break_requested = 1) @@ -424,7 +320,7 @@ extern volatile int erts_break_requested; static erts_mtx_t break_waiter_lock; static HANDLE break_happened_event; -static erts_atomic_t break_waiter_state; +static erts_atomic32_t break_waiter_state; #define BREAK_WAITER_GOT_BREAK 1 #define BREAK_WAITER_GOT_HALT 2 @@ -467,29 +363,168 @@ do { \ wait_standby(PS); \ } while(0) -#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP) +#define ERTS_POLL_NOT_WOKEN ((erts_aint32_t) 0) +#define ERTS_POLL_WOKEN_IO_READY ((erts_aint32_t) 1) +#define ERTS_POLL_WOKEN_INTR ((erts_aint32_t) 2) +#define ERTS_POLL_WOKEN_TIMEDOUT ((erts_aint32_t) 3) static ERTS_INLINE int -unset_interrupted_chk(ErtsPollSet ps) +is_io_ready(ErtsPollSet ps) { - /* This operation isn't atomic, but we have no need at all for an - atomic operation here... */ - int res = ps->interrupt; - ps->interrupt = 0; - return res; + return erts_atomic32_read(&ps->wakeup_state) == ERTS_POLL_WOKEN_IO_READY; } +static ERTS_INLINE void +woke_up(ErtsPollSet ps) +{ + if (erts_atomic32_read(&ps->wakeup_state) == ERTS_POLL_NOT_WOKEN) + erts_atomic32_cmpxchg(&ps->wakeup_state, + ERTS_POLL_WOKEN_TIMEDOUT, + ERTS_POLL_NOT_WOKEN); +#ifdef DEBUG + { + erts_aint32_t wakeup_state = erts_atomic32_read(&ps->wakeup_state); + switch (wakeup_state) { + case ERTS_POLL_WOKEN_IO_READY: + case ERTS_POLL_WOKEN_INTR: + case ERTS_POLL_WOKEN_TIMEDOUT: + break; + default: + ASSERT(0); + break; + } + } #endif +} + +static ERTS_INLINE int +wakeup_cause(ErtsPollSet ps) +{ + int res; + erts_aint32_t wakeup_state = erts_atomic32_read(&ps->wakeup_state); + switch (wakeup_state) { + case ERTS_POLL_WOKEN_IO_READY: + res = 0; + break; + case ERTS_POLL_WOKEN_INTR: + res = EINTR; + break; + case ERTS_POLL_WOKEN_TIMEDOUT: + res = ETIMEDOUT; + break; + default: + res = 0; + erl_exit(ERTS_ABORT_EXIT, + "%s:%d: Internal error: Invalid wakeup_state=%d\n", + __FILE__, __LINE__, (int) wakeup_state); + } + return res; +} + +static ERTS_INLINE DWORD +poll_wait_timeout(ErtsPollSet ps, SysTimeval *tvp) +{ + time_t timeout = tvp->tv_sec * 1000 + tvp->tv_usec / 1000; + + if (timeout <= 0) { + woke_up(ps); + return (DWORD) 0; + } + + ResetEvent(ps->event_io_ready); + /* + * Since we don't know the internals of ResetEvent() we issue + * a memory barrier as a safety precaution ensuring that + * the load of wakeup_state wont be reordered with stores made + * by ResetEvent(). + */ + ERTS_THR_MEMORY_BARRIER; + if (erts_atomic32_read(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN) + return (DWORD) 0; + + if (timeout > ERTS_AINT32_T_MAX) /* Also prevents DWORD overflow */ + timeout = ERTS_AINT32_T_MAX; + + erts_smp_atomic32_set_relb(&ps->timeout, (erts_aint32_t) timeout); + return (DWORD) timeout; +} -#ifdef ERTS_SMP static ERTS_INLINE void -wake_poller(ErtsPollSet ps) +wake_poller(ErtsPollSet ps, int io_ready) { - if (!ERTS_POLLSET_SET_POLLER_WOKEN_CHK(ps)) { + erts_aint32_t wakeup_state = erts_atomic32_read(&ps->wakeup_state); + if (io_ready) { + /* We may set the event multiple times. This is, however, harmless. */ + erts_atomic32_set(&ps->wakeup_state, ERTS_POLL_WOKEN_IO_READY); + } + else { + while (wakeup_state != ERTS_POLL_WOKEN_IO_READY + && wakeup_state != ERTS_POLL_WOKEN_INTR) { + erts_aint32_t act = erts_atomic32_cmpxchg(&ps->wakeup_state, + ERTS_POLL_WOKEN_INTR, + wakeup_state); + if (act == wakeup_state) { + wakeup_state = act; + break; + } + wakeup_state = act; + } + } + if (wakeup_state == ERTS_POLL_NOT_WOKEN) { + /* + * Since we don't know the internals of SetEvent() we issue + * a memory barrier as a safety precaution ensuring that + * the store we just made to wakeup_state wont be reordered + * with loads in SetEvent(). + */ + ERTS_THR_MEMORY_BARRIER; SetEvent(ps->event_io_ready); } } -#endif + +static ERTS_INLINE void +reset_io_ready(ErtsPollSet ps) +{ + erts_atomic32_set(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN); +} + +static ERTS_INLINE void +restore_io_ready(ErtsPollSet ps) +{ + erts_atomic32_set(&ps->wakeup_state, ERTS_POLL_WOKEN_IO_READY); +} + +/* + * notify_io_ready() is used by threads waiting for events, when + * notifying a poller thread about I/O ready. + */ +static ERTS_INLINE void +notify_io_ready(ErtsPollSet ps) +{ + wake_poller(ps, 1); +} + +static ERTS_INLINE void +reset_interrupt(ErtsPollSet ps) +{ + /* We need to keep io-ready if set */ + erts_aint32_t wakeup_state = erts_atomic32_read(&ps->wakeup_state); + while (wakeup_state != ERTS_POLL_WOKEN_IO_READY + && wakeup_state != ERTS_POLL_NOT_WOKEN) { + erts_aint32_t act = erts_atomic32_cmpxchg(&ps->wakeup_state, + ERTS_POLL_NOT_WOKEN, + wakeup_state); + if (wakeup_state == act) + break; + wakeup_state = act; + } +} + +static ERTS_INLINE void +set_interrupt(ErtsPollSet ps) +{ + wake_poller(ps, 0); +} static void setup_standby_wait(ErtsPollSet ps, int num_threads) { @@ -653,14 +688,14 @@ static void *break_waiter(void *param) case WAIT_OBJECT_0: ResetEvent(harr[0]); erts_mtx_lock(&break_waiter_lock); - erts_atomic_set(&break_waiter_state,BREAK_WAITER_GOT_BREAK); + erts_atomic32_set(&break_waiter_state,BREAK_WAITER_GOT_BREAK); SetEvent(break_happened_event); erts_mtx_unlock(&break_waiter_lock); break; case (WAIT_OBJECT_0+1): ResetEvent(harr[1]); erts_mtx_lock(&break_waiter_lock); - erts_atomic_set(&break_waiter_state,BREAK_WAITER_GOT_HALT); + erts_atomic32_set(&break_waiter_state,BREAK_WAITER_GOT_HALT); SetEvent(break_happened_event); erts_mtx_unlock(&break_waiter_lock); break; @@ -767,12 +802,7 @@ event_happened: consistency_check(w); #endif ASSERT(WAIT_OBJECT_0 < i && i < WAIT_OBJECT_0+w->active_events); - if (!erts_atomic_xchg(&ps->sys_io_ready,1)) { - HARDDEBUGF(("SET EventIoReady (%d)",erts_atomic_read(&ps->sys_io_ready))); - SetEvent(ps->event_io_ready); - } else { - HARDDEBUGF(("DONT SET EventIoReady")); - } + notify_io_ready(ps); /* * The main thread wont start working on our arrays untill we're @@ -967,15 +997,10 @@ static int cancel_driver_select(ErtsPollSet ps, HANDLE event) void erts_poll_interrupt(ErtsPollSet ps, int set /* bool */) { HARDTRACEF(("In erts_poll_interrupt(%d)",set)); -#ifdef ERTS_SMP - if (set) { - ERTS_POLLSET_SET_INTERRUPTED(ps); - wake_poller(ps); - } - else { - ERTS_POLLSET_UNSET_INTERRUPTED(ps); - } -#endif + if (!set) + reset_interrupt(ps); + else + set_interrupt(ps); HARDTRACEF(("Out erts_poll_interrupt(%d)",set)); } @@ -984,17 +1009,10 @@ void erts_poll_interrupt_timed(ErtsPollSet ps, long msec) { HARDTRACEF(("In erts_poll_interrupt_timed(%d,%ld)",set,msec)); -#ifdef ERTS_SMP - if (set) { - if (erts_smp_atomic_read(&ps->timeout) > (erts_aint_t) msec) { - ERTS_POLLSET_SET_INTERRUPTED(ps); - wake_poller(ps); - } - } - else { - ERTS_POLLSET_UNSET_INTERRUPTED(ps); - } -#endif + if (!set) + reset_interrupt(ps); + else if (erts_smp_atomic32_read_acqb(&ps->timeout) > (erts_aint32_t) msec) + set_interrupt(ps); HARDTRACEF(("Out erts_poll_interrupt_timed")); } @@ -1068,10 +1086,8 @@ void erts_poll_controlv(ErtsPollSet ps, int erts_poll_wait(ErtsPollSet ps, ErtsPollResFd pr[], int *len, - SysTimeval *utvp) + SysTimeval *tvp) { - SysTimeval *tvp = utvp; - SysTimeval itv; int no_fds; DWORD timeout; EventData* ev; @@ -1084,7 +1100,7 @@ int erts_poll_wait(ErtsPollSet ps, HARDTRACEF(("In erts_poll_wait")); ERTS_POLLSET_LOCK(ps); - if (!erts_atomic_read(&ps->sys_io_ready) && ps->restore_events) { + if (!is_io_ready(ps) && ps->restore_events) { HARDDEBUGF(("Restore events: %d",ps->num_waiters)); ps->restore_events = 0; for (i = 0; i < ps->num_waiters; ++i) { @@ -1102,7 +1118,7 @@ int erts_poll_wait(ErtsPollSet ps, if (w->highwater != w->active_events) { HARDDEBUGF(("Oups!")); /* Oups, got signalled before we took the lock, can't reset */ - if(erts_atomic_read(&ps->sys_io_ready) == 0) { + if(!is_io_ready(ps)) { erl_exit(1,"Internal error: " "Inconsistent io structures in erl_poll.\n"); } @@ -1127,39 +1143,27 @@ int erts_poll_wait(ErtsPollSet ps, no_fds = ERTS_POLL_MAX_RES; #endif + timeout = poll_wait_timeout(ps, tvp); - ResetEvent(ps->event_io_ready); - ERTS_POLLSET_UNSET_POLLER_WOKEN(ps); - -#ifdef ERTS_SMP - if (ERTS_POLLSET_IS_INTERRUPTED(ps)) { - /* Interrupt use zero timeout */ - itv.tv_sec = 0; - itv.tv_usec = 0; - tvp = &itv; - } -#endif - - timeout = tvp->tv_sec * 1000 + tvp->tv_usec / 1000; /*HARDDEBUGF(("timeout = %ld",(long) timeout));*/ - erts_smp_atomic_set(&ps->timeout, timeout); - if (timeout > 0 && ! erts_atomic_read(&ps->sys_io_ready) && ! erts_atomic_read(&break_waiter_state)) { + if (timeout > 0 && !erts_atomic32_read(&break_waiter_state)) { HANDLE harr[2] = {ps->event_io_ready, break_happened_event}; int num_h = 2; - HARDDEBUGF(("Start waiting %d [%d]",num_h, (long) timeout)); + HARDDEBUGF(("Start waiting %d [%d]",num_h, (int) timeout)); ERTS_POLLSET_UNLOCK(ps); WaitForMultipleObjects(num_h, harr, FALSE, timeout); ERTS_POLLSET_LOCK(ps); - HARDDEBUGF(("Stop waiting %d [%d]",num_h, (long) timeout)); + HARDDEBUGF(("Stop waiting %d [%d]",num_h, (int) timeout)); + woke_up(ps); } ERTS_UNSET_BREAK_REQUESTED; - if(erts_atomic_read(&break_waiter_state)) { + if(erts_atomic32_read(&break_waiter_state)) { erts_mtx_lock(&break_waiter_lock); - break_state = erts_atomic_read(&break_waiter_state); - erts_atomic_set(&break_waiter_state,0); + break_state = erts_atomic32_read(&break_waiter_state); + erts_atomic32_set(&break_waiter_state,0); ResetEvent(break_happened_event); erts_mtx_unlock(&break_waiter_lock); switch (break_state) { @@ -1174,15 +1178,13 @@ int erts_poll_wait(ErtsPollSet ps, } } - ERTS_POLLSET_SET_POLLER_WOKEN(ps); - - if (!erts_atomic_read(&ps->sys_io_ready)) { - res = EINTR; - HARDDEBUGF(("EINTR!")); - goto done; + res = wakeup_cause(ps); + if (res != 0) { + HARDDEBUGF(("%s!", res == EINTR ? "EINTR" : "ETIMEDOUT")); + goto done; } - erts_atomic_set(&ps->sys_io_ready,0); + reset_io_ready(ps); n = ps->num_waiters; @@ -1204,9 +1206,9 @@ int erts_poll_wait(ErtsPollSet ps, if (num >= no_fds) { w->highwater=j+1; erts_mtx_unlock(&w->mtx); - /* This might mean we still have data to report, set - back the global flag! */ - erts_atomic_set(&ps->sys_io_ready,1); + /* This might mean we still have data to report, + restore flag indicating I/O ready! */ + restore_io_ready(ps); HARDDEBUGF(("To many FD's to report!")); goto done; } @@ -1228,7 +1230,7 @@ int erts_poll_wait(ErtsPollSet ps, erts_mtx_unlock(&w->mtx); } done: - erts_smp_atomic_set(&ps->timeout, ERTS_AINT_T_MAX); + erts_smp_atomic32_set(&ps->timeout, ERTS_AINT32_T_MAX); *len = num; ERTS_POLLSET_UNLOCK(ps); HARDTRACEF(("Out erts_poll_wait")); @@ -1306,15 +1308,13 @@ ErtsPollSet erts_poll_create_pollset(void) ps->standby_wait_counter = 0; ps->event_io_ready = CreateManualEvent(FALSE); ps->standby_wait_event = CreateManualEvent(FALSE); - erts_atomic_init(&ps->sys_io_ready,0); ps->restore_events = 0; + erts_atomic32_init(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN); #ifdef ERTS_SMP - erts_smp_atomic_init(&ps->woken, 0); erts_smp_mtx_init(&ps->mtx, "pollset"); - erts_smp_atomic_init(&ps->interrupt, 0); #endif - erts_smp_atomic_init(&ps->timeout, ERTS_AINT_T_MAX); + erts_smp_atomic32_init(&ps->timeout, ERTS_AINT32_T_MAX); HARDTRACEF(("Out erts_poll_create_pollset")); return ps; @@ -1366,7 +1366,7 @@ void erts_poll_init(void) erts_mtx_init(&break_waiter_lock,"break_waiter_lock"); break_happened_event = CreateManualEvent(FALSE); - erts_atomic_init(&break_waiter_state, 0); + erts_atomic32_init(&break_waiter_state, 0); erts_thr_create(&thread, &break_waiter, NULL, NULL); ERTS_UNSET_BREAK_REQUESTED; diff --git a/erts/emulator/sys/win32/sys_interrupt.c b/erts/emulator/sys/win32/sys_interrupt.c index 262f84babc..943c338794 100644 --- a/erts/emulator/sys/win32/sys_interrupt.c +++ b/erts/emulator/sys/win32/sys_interrupt.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1997-2010. All Rights Reserved. + * Copyright Ericsson AB 1997-2011. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in @@ -31,11 +31,11 @@ #endif #ifdef ERTS_SMP -erts_smp_atomic_t erts_break_requested; +erts_smp_atomic32_t erts_break_requested; #define ERTS_SET_BREAK_REQUESTED \ - erts_smp_atomic_set(&erts_break_requested, (erts_aint_t) 1) + erts_smp_atomic32_set(&erts_break_requested, (erts_aint32_t) 1) #define ERTS_UNSET_BREAK_REQUESTED \ - erts_smp_atomic_set(&erts_break_requested, (erts_aint_t) 0) + erts_smp_atomic32_set(&erts_break_requested, (erts_aint32_t) 0) #else volatile int erts_break_requested = 0; #define ERTS_SET_BREAK_REQUESTED (erts_break_requested = 1) -- cgit v1.2.3