diff options
author | Rickard Green <[email protected]> | 2015-02-13 03:18:16 +0100 |
---|---|---|
committer | Rickard Green <[email protected]> | 2015-03-20 15:28:52 +0100 |
commit | fa7b2c00cbf9212c4a3551980939b92fc6606510 (patch) | |
tree | e3978ac46d61aff7b6f66e67fd763de57a4414bd | |
parent | 6487aac5977cf470bc6a2cd0964da2850ee38717 (diff) | |
download | otp-fa7b2c00cbf9212c4a3551980939b92fc6606510.tar.gz otp-fa7b2c00cbf9212c4a3551980939b92fc6606510.tar.bz2 otp-fa7b2c00cbf9212c4a3551980939b92fc6606510.zip |
Implement ethread events with timeout
-rw-r--r-- | erts/aclocal.m4 | 44 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 6 | ||||
-rw-r--r-- | erts/emulator/beam/erl_threads.h | 23 | ||||
-rw-r--r-- | erts/include/internal/ethr_internal.h | 27 | ||||
-rw-r--r-- | erts/include/internal/ethread_header_config.h.in | 4 | ||||
-rw-r--r-- | erts/include/internal/pthread/ethr_event.h | 59 | ||||
-rw-r--r-- | erts/include/internal/win/ethr_event.h | 2 | ||||
-rw-r--r-- | erts/lib_src/common/ethr_aux.c | 2 | ||||
-rw-r--r-- | erts/lib_src/pthread/ethr_event.c | 379 | ||||
-rw-r--r-- | erts/lib_src/pthread/ethread.c | 148 | ||||
-rw-r--r-- | erts/lib_src/win/ethr_event.c | 53 |
11 files changed, 683 insertions, 64 deletions
diff --git a/erts/aclocal.m4 b/erts/aclocal.m4 index 989f697201..1e449bf874 100644 --- a/erts/aclocal.m4 +++ b/erts/aclocal.m4 @@ -1460,6 +1460,50 @@ case "$THR_LIB_NAME" in AC_DEFINE(ETHR_HAVE_PTHREAD_ATTR_SETGUARDSIZE, 1, \ [Define if you have the pthread_attr_setguardsize function.])) + if test "x$erl_monotonic_clock_id" != "x"; then + AC_MSG_CHECKING(whether pthread_cond_timedwait() can use the monotonic clock $erl_monotonic_clock_id for timeout) + pthread_cond_timedwait_monotonic=no + AC_TRY_LINK([ + #if defined(ETHR_NEED_NPTL_PTHREAD_H) + # include <nptl/pthread.h> + #elif defined(ETHR_HAVE_MIT_PTHREAD_H) + # include <pthread/mit/pthread.h> + #elif defined(ETHR_HAVE_PTHREAD_H) + # include <pthread.h> + #endif + #ifdef ETHR_TIME_WITH_SYS_TIME + # include <time.h> + # include <sys/time.h> + #else + # ifdef ETHR_HAVE_SYS_TIME_H + # include <sys/time.h> + # else + # include <time.h> + # endif + #endif + #if defined(ETHR_HAVE_MACH_CLOCK_GET_TIME) + # include <mach/clock.h> + # include <mach/mach.h> + #endif + ], + [ + int res; + pthread_condattr_t attr; + pthread_cond_t cond; + struct timespec cond_timeout; + pthread_mutex_t mutex; + res = pthread_condattr_init(&attr); + res = pthread_condattr_setclock(&attr, ETHR_MONOTONIC_CLOCK_ID); + res = pthread_cond_init(&cond, &attr); + res = pthread_cond_timedwait(&cond, &mutex, &cond_timeout); + ], + [pthread_cond_timedwait_monotonic=yes]) + AC_MSG_RESULT([$pthread_cond_timedwait_monotonic]) + if test $pthread_cond_timedwait_monotonic = yes; then + AC_DEFINE(ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC, [1], [Define if pthread_cond_timedwait() can be used with a monotonic clock]) + fi + fi + linux_futex=no AC_MSG_CHECKING([for Linux futexes]) AC_TRY_LINK([ diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 81bc3d2429..9dcaf2fdca 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -2806,7 +2806,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) ASSERT(flgs & ERTS_SSI_FLG_TSE_SLEEPING); ASSERT(flgs & ERTS_SSI_FLG_WAITING); do { - res = erts_tse_wait(ssi->event); + res = erts_tse_twait(ssi->event, -1); } while (res == EINTR); } } @@ -6594,7 +6594,7 @@ suspend_scheduler(ErtsSchedulerData *esdp) int res; do { - res = erts_tse_wait(ssi->event); + res = erts_tse_twait(ssi->event, -1); } while (res == EINTR); } } @@ -6800,7 +6800,7 @@ suspend_scheduler(ErtsSchedulerData *esdp) int res; do { - res = erts_tse_wait(ssi->event); + res = erts_tse_twait(ssi->event, -1); } while (res == EINTR); } } diff --git a/erts/emulator/beam/erl_threads.h b/erts/emulator/beam/erl_threads.h index 7214f3ea33..b9f34d4c12 100644 --- a/erts/emulator/beam/erl_threads.h +++ b/erts/emulator/beam/erl_threads.h @@ -651,6 +651,8 @@ ERTS_GLB_INLINE void erts_tse_set(erts_tse_t *ep); ERTS_GLB_INLINE void erts_tse_reset(erts_tse_t *ep); ERTS_GLB_INLINE int erts_tse_wait(erts_tse_t *ep); ERTS_GLB_INLINE int erts_tse_swait(erts_tse_t *ep, int spincount); +ERTS_GLB_INLINE int erts_tse_twait(erts_tse_t *ep, Sint64 tmo); +ERTS_GLB_INLINE int erts_tse_stwait(erts_tse_t *ep, int spincount, Sint64 tmo); ERTS_GLB_INLINE int erts_tse_is_tmp(erts_tse_t *ep); ERTS_GLB_INLINE void erts_thr_set_main_status(int, int); ERTS_GLB_INLINE int erts_thr_get_main_status(void); @@ -3473,6 +3475,27 @@ ERTS_GLB_INLINE int erts_tse_swait(erts_tse_t *ep, int spincount) #endif } +ERTS_GLB_INLINE int erts_tse_twait(erts_tse_t *ep, Sint64 tmo) +{ +#ifdef USE_THREADS + return ethr_event_twait(&((ethr_ts_event *) ep)->event, + (ethr_sint64_t) tmo); +#else + return ENOTSUP; +#endif +} + +ERTS_GLB_INLINE int erts_tse_stwait(erts_tse_t *ep, int spincount, Sint64 tmo) +{ +#ifdef USE_THREADS + return ethr_event_stwait(&((ethr_ts_event *) ep)->event, + spincount, + (ethr_sint64_t) tmo); +#else + return ENOTSUP; +#endif +} + ERTS_GLB_INLINE int erts_tse_is_tmp(erts_tse_t *ep) { #ifdef USE_THREADS diff --git a/erts/include/internal/ethr_internal.h b/erts/include/internal/ethr_internal.h index c9b1db5b46..65195145af 100644 --- a/erts/include/internal/ethr_internal.h +++ b/erts/include/internal/ethr_internal.h @@ -57,6 +57,33 @@ ETHR_PROTO_NORETURN__ ethr_abort__(void); int ethr_win_get_errno__(void); #endif +#ifdef ETHR_INCLUDE_MONOTONIC_CLOCK__ +#undef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME +#if defined(ETHR_HAVE_CLOCK_GETTIME_MONOTONIC) \ + || defined(ETHR_HAVE_MACH_CLOCK_GET_TIME) \ + || defined(ETHR_HAVE_GETHRTIME) +#ifdef ETHR_TIME_WITH_SYS_TIME +# include <time.h> +# include <sys/time.h> +#else +# ifdef ETHR_HAVE_SYS_TIME_H +# include <sys/time.h> +# else +# include <time.h> +# endif +#endif +#ifdef ETHR_HAVE_MACH_CLOCK_GET_TIME +#include <mach/clock.h> +#include <mach/mach.h> +#endif +#define ETHR_HAVE_ETHR_GET_MONOTONIC_TIME +ethr_sint64_t ethr_get_monotonic_time(void); +int ethr_get_monotonic_time_is_broken(void); +#endif +#endif /* ETHR_INCLUDE_MONOTONIC_CLOCK__ */ + +void ethr_init_event__(void); + /* implemented in lib_src/common/ethread_aux.c */ int ethr_init_common__(ethr_init_data *id); int ethr_late_init_common__(ethr_late_init_data *lid); diff --git a/erts/include/internal/ethread_header_config.h.in b/erts/include/internal/ethread_header_config.h.in index ffd9d350e2..618fcba380 100644 --- a/erts/include/internal/ethread_header_config.h.in +++ b/erts/include/internal/ethread_header_config.h.in @@ -250,5 +250,5 @@ /* Define to the monotonic clock id to use */ #undef ETHR_MONOTONIC_CLOCK_ID - - +/* Define if pthread_cond_timedwait() can be used with a monotonic clock */ +#undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC diff --git a/erts/include/internal/pthread/ethr_event.h b/erts/include/internal/pthread/ethr_event.h index d0a77990cc..f67bac858b 100644 --- a/erts/include/internal/pthread/ethr_event.h +++ b/erts/include/internal/pthread/ethr_event.h @@ -46,12 +46,12 @@ typedef struct { ethr_atomic32_t futex; } ethr_event; -#define ETHR_FUTEX__(FTX, OP, VAL) \ +#define ETHR_FUTEX__(FTX, OP, VAL, TIMEOUT) \ (-1 == syscall(__NR_futex, \ (void *) ethr_atomic32_addr((FTX)), \ (OP), \ (int) (VAL), \ - NULL, \ + (TIMEOUT), \ NULL, \ 0) \ ? errno : 0) @@ -64,7 +64,7 @@ ETHR_INLINE_FUNC_NAME_(ethr_event_set)(ethr_event *e) ethr_sint32_t val; val = ethr_atomic32_xchg_mb(&e->futex, ETHR_EVENT_ON__); if (val == ETHR_EVENT_OFF_WAITER__) { - int res = ETHR_FUTEX__(&e->futex, ETHR_FUTEX_WAKE__, 1); + int res = ETHR_FUTEX__(&e->futex, ETHR_FUTEX_WAKE__, 1, NULL); if (res != 0) ETHR_FATAL_ERROR__(res); } @@ -80,35 +80,58 @@ ETHR_INLINE_FUNC_NAME_(ethr_event_reset)(ethr_event *e) #endif #elif defined(ETHR_PTHREADS) -/* --- Posix mutex/cond implementation of events ---------------------------- */ +/* --- Posix mutex/cond pipe/select implementation of events ---------------- */ + typedef struct { ethr_atomic32_t state; pthread_mutex_t mtx; pthread_cond_t cnd; + int fd[2]; } ethr_event; -#define ETHR_EVENT_OFF_WAITER__ -1L -#define ETHR_EVENT_OFF__ 1L -#define ETHR_EVENT_ON__ 0L +#define ETHR_EVENT_OFF_WAITER_SELECT__ ((ethr_sint32_t) -2) +#define ETHR_EVENT_OFF_WAITER__ ((ethr_sint32_t) -1) +#define ETHR_EVENT_OFF__ ((ethr_sint32_t) 1) +#define ETHR_EVENT_ON__ ((ethr_sint32_t) 0) + +#define ETHR_EVENT_IS_WAITING__(VAL) ((VAL) < 0) #if defined(ETHR_TRY_INLINE_FUNCS) || defined(ETHR_EVENT_IMPL__) +#ifndef ETHR_HAVE_PTHREAD_TIMED_COND_MONOTONIC +#include <unistd.h> +#include <errno.h> +#endif + static void ETHR_INLINE ETHR_INLINE_FUNC_NAME_(ethr_event_set)(ethr_event *e) { ethr_sint32_t val; val = ethr_atomic32_xchg_mb(&e->state, ETHR_EVENT_ON__); - if (val == ETHR_EVENT_OFF_WAITER__) { - int res = pthread_mutex_lock(&e->mtx); - if (res != 0) - ETHR_FATAL_ERROR__(res); - res = pthread_cond_signal(&e->cnd); - if (res != 0) - ETHR_FATAL_ERROR__(res); - res = pthread_mutex_unlock(&e->mtx); - if (res != 0) - ETHR_FATAL_ERROR__(res); + if (ETHR_EVENT_IS_WAITING__(val)) { + int res; + if (val == ETHR_EVENT_OFF_WAITER_SELECT__) { + ssize_t wres; + int fd = e->fd[1]; + ETHR_ASSERT(fd >= 0); + do { + wres = write(fd, "!", 1); + } while (wres < 0 && errno == EINTR); + if (wres < 0 && errno != EAGAIN && errno != EWOULDBLOCK) + ETHR_FATAL_ERROR__(errno); + } + else { + res = pthread_mutex_lock(&e->mtx); + if (res != 0) + ETHR_FATAL_ERROR__(res); + res = pthread_cond_signal(&e->cnd); + if (res != 0) + ETHR_FATAL_ERROR__(res); + res = pthread_mutex_unlock(&e->mtx); + if (res != 0) + ETHR_FATAL_ERROR__(res); + } } } @@ -127,6 +150,8 @@ int ethr_event_init(ethr_event *e); int ethr_event_destroy(ethr_event *e); int ethr_event_wait(ethr_event *e); int ethr_event_swait(ethr_event *e, int spincount); +int ethr_event_twait(ethr_event *e, ethr_sint64_t timeout); +int ethr_event_stwait(ethr_event *e, int spincount, ethr_sint64_t timeout); #if !defined(ETHR_TRY_INLINE_FUNCS) || defined(ETHR_EVENT_IMPL__) void ethr_event_set(ethr_event *e); void ethr_event_reset(ethr_event *e); diff --git a/erts/include/internal/win/ethr_event.h b/erts/include/internal/win/ethr_event.h index 6363174a74..95e681983f 100644 --- a/erts/include/internal/win/ethr_event.h +++ b/erts/include/internal/win/ethr_event.h @@ -58,6 +58,8 @@ int ethr_event_init(ethr_event *e); int ethr_event_destroy(ethr_event *e); int ethr_event_wait(ethr_event *e); int ethr_event_swait(ethr_event *e, int spincount); +int ethr_event_twait(ethr_event *e, ethr_sint64_t timeout); +int ethr_event_stwait(ethr_event *e, int spincount, ethr_sint64_t timeout); #if !defined(ETHR_TRY_INLINE_FUNCS) || defined(ETHR_EVENT_IMPL__) void ethr_event_set(ethr_event *e); void ethr_event_reset(ethr_event *e); diff --git a/erts/lib_src/common/ethr_aux.c b/erts/lib_src/common/ethr_aux.c index b77f2178f2..1ba51882c3 100644 --- a/erts/lib_src/common/ethr_aux.c +++ b/erts/lib_src/common/ethr_aux.c @@ -148,6 +148,8 @@ ethr_init_common__(ethr_init_data *id) { int res; + ethr_init_event__(); + #if defined(ETHR_X86_RUNTIME_CONF__) x86_init(); #endif diff --git a/erts/lib_src/pthread/ethr_event.c b/erts/lib_src/pthread/ethr_event.c index 9434d60d0a..b35c599365 100644 --- a/erts/lib_src/pthread/ethr_event.c +++ b/erts/lib_src/pthread/ethr_event.c @@ -29,6 +29,9 @@ #endif #include "ethread.h" +#undef ETHR_INCLUDE_MONOTONIC_CLOCK__ +#define ETHR_INCLUDE_MONOTONIC_CLOCK__ +#include "ethr_internal.h" #if defined(ETHR_LINUX_FUTEX_IMPL__) /* --- Linux futex implementation of ethread events ------------------------- */ @@ -38,6 +41,12 @@ #define ETHR_YIELD_AFTER_BUSY_LOOPS 50 +void +ethr_init_event__(void) +{ + +} + int ethr_event_init(ethr_event *e) { @@ -52,21 +61,43 @@ ethr_event_destroy(ethr_event *e) } static ETHR_INLINE int -wait__(ethr_event *e, int spincount) +wait__(ethr_event *e, int spincount, ethr_sint64_t timeout) { unsigned sc = spincount; int res; ethr_sint32_t val; int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; + ethr_sint64_t time = 0; /* SHUT UP annoying faulty warning... */ + struct timespec ts, *tsp; +#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME + ethr_sint64_t start = 0; /* SHUT UP annoying faulty warning... */ +#endif if (spincount < 0) ETHR_FATAL_ERROR__(EINVAL); + if (timeout < 0) { + tsp = NULL; + } + else { + tsp = &ts; + time = timeout; + if (spincount == 0) { + val = ethr_atomic32_read(&e->futex); + if (val == ETHR_EVENT_ON__) + goto return_event_on; + goto set_timeout; + } +#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME + start = ethr_get_monotonic_time(); +#endif + } + while (1) { while (1) { val = ethr_atomic32_read(&e->futex); if (val == ETHR_EVENT_ON__) - return 0; + goto return_event_on; if (sc == 0) break; sc--; @@ -79,44 +110,147 @@ wait__(ethr_event *e, int spincount) } } + if (timeout >= 0) { +#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME + time = timeout - (ethr_get_monotonic_time() - start); +#endif + set_timeout: + if (time <= 0) { + val = ethr_atomic32_read(&e->futex); + if (val == ETHR_EVENT_ON__) + goto return_event_on; + return ETIMEDOUT; + } + ts.tv_sec = time / (1000*1000*1000); + ts.tv_nsec = time % (1000*1000*1000); + } + if (val != ETHR_EVENT_OFF_WAITER__) { val = ethr_atomic32_cmpxchg(&e->futex, ETHR_EVENT_OFF_WAITER__, ETHR_EVENT_OFF__); if (val == ETHR_EVENT_ON__) - return 0; + goto return_event_on; ETHR_ASSERT(val == ETHR_EVENT_OFF__); } res = ETHR_FUTEX__(&e->futex, ETHR_FUTEX_WAIT__, - ETHR_EVENT_OFF_WAITER__); - if (res == EINTR) + ETHR_EVENT_OFF_WAITER__, + tsp); + switch (res) { + case EINTR: + case ETIMEDOUT: + return res; + case 0: + case EWOULDBLOCK: break; - if (res != 0 && res != EWOULDBLOCK) + default: ETHR_FATAL_ERROR__(res); + } } - return res; +return_event_on: + + ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore); + + return 0; + } #elif defined(ETHR_PTHREADS) -/* --- Posix mutex/cond implementation of events ---------------------------- */ +/* --- Posix mutex/cond pipe/select implementation of events ---------------- */ + +#include <fcntl.h> +#include <sys/types.h> +#include <sys/uio.h> +#include <unistd.h> +#include <sys/select.h> +#include <errno.h> +#include <string.h> + +static void +setup_nonblocking_pipe(ethr_event *e) +{ + int flgs; + int res; + + res = pipe(e->fd); + if (res != 0) + ETHR_FATAL_ERROR__(errno); + + ETHR_ASSERT(e->fd[0] >= 0 && e->fd[1] >= 0); + + flgs = fcntl(e->fd[0], F_GETFL, 0); + fcntl(e->fd[0], F_SETFL, flgs | O_NONBLOCK); + flgs = fcntl(e->fd[1], F_GETFL, 0); + fcntl(e->fd[1], F_SETFL, flgs | O_NONBLOCK); + + ETHR_MEMBAR(ETHR_StoreStore); +} + +#define ETHR_EVENT_INVALID_FD__ -1 +#define ETHR_EVENT_COND_TIMEDWAIT__ -2 + +#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC +static pthread_condattr_t monotonic_clock_cond_attr; +#endif +static pthread_condattr_t *monotonic_clock_cond_attr_p; + +#ifndef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME +# undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC +#endif +#ifndef ETHR_MONOTONIC_CLOCK_ID +# undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC +#endif + +void +ethr_init_event__(void) +{ + monotonic_clock_cond_attr_p = NULL; +#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC + if (!ethr_get_monotonic_time_is_broken() + && pthread_condattr_init(&monotonic_clock_cond_attr) == 0) { + if (pthread_condattr_setclock(&monotonic_clock_cond_attr, + ETHR_MONOTONIC_CLOCK_ID) == 0) + monotonic_clock_cond_attr_p = &monotonic_clock_cond_attr; + else + pthread_condattr_destroy(&monotonic_clock_cond_attr); + } +#endif +} int ethr_event_init(ethr_event *e) { int res; + ethr_atomic32_init(&e->state, ETHR_EVENT_OFF__); + res = pthread_mutex_init(&e->mtx, NULL); if (res != 0) return res; - res = pthread_cond_init(&e->cnd, NULL); + + res = pthread_cond_init(&e->cnd, monotonic_clock_cond_attr_p); if (res != 0) { pthread_mutex_destroy(&e->mtx); return res; } + +#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC + /* + * If ethr_get_monotonic_time() is broken we + * fall back on the pipe/select solution... + */ + if (monotonic_clock_cond_attr_p) { + e->fd[0] = e->fd[1] = ETHR_EVENT_COND_TIMEDWAIT__; + return 0; + } +#endif + + e->fd[0] = e->fd[1] = ETHR_EVENT_INVALID_FD__; + return 0; } @@ -124,6 +258,10 @@ int ethr_event_destroy(ethr_event *e) { int res; + if (e->fd[0] >= 0) { + close(e->fd[0]); + close(e->fd[1]); + } res = pthread_mutex_destroy(&e->mtx); if (res != 0) return res; @@ -134,12 +272,58 @@ ethr_event_destroy(ethr_event *e) } static ETHR_INLINE int -wait__(ethr_event *e, int spincount) +wait__(ethr_event *e, int spincount, ethr_sint64_t timeout) { int sc = spincount; ethr_sint32_t val; int res, ulres; int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; + ethr_sint64_t time = 0; /* SHUT UP annoying faulty warning... */ +#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME + ethr_sint64_t start = 0; /* SHUT UP annoying faulty warning... */ +#endif +#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC + struct timespec cond_timeout; +#endif + + val = ethr_atomic32_read(&e->state); + if (val == ETHR_EVENT_ON__) + goto return_event_on; + + if (timeout < 0) { + if (spincount == 0) + goto set_event_off_waiter; + } + if (timeout == 0) + return ETIMEDOUT; + else { + time = timeout; + switch (e->fd[0]) { + case ETHR_EVENT_INVALID_FD__: +#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME + start = ethr_get_monotonic_time(); +#endif + setup_nonblocking_pipe(e); + break; +#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC + case ETHR_EVENT_COND_TIMEDWAIT__: + time += ethr_get_monotonic_time(); + cond_timeout.tv_sec = time / (1000*1000*1000); + cond_timeout.tv_nsec = time % (1000*1000*1000); + if (spincount == 0) + goto set_event_off_waiter; + break; +#endif + default: + /* Already initialized pipe... */ + if (spincount == 0) + goto set_select_timeout; +#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME + start = ethr_get_monotonic_time(); +#endif + break; + } + } if (spincount < 0) ETHR_FATAL_ERROR__(EINVAL); @@ -147,7 +331,8 @@ wait__(ethr_event *e, int spincount) while (1) { val = ethr_atomic32_read(&e->state); if (val == ETHR_EVENT_ON__) - return 0; + goto return_event_on; + if (sc == 0) break; sc--; @@ -160,40 +345,150 @@ wait__(ethr_event *e, int spincount) } } - if (val != ETHR_EVENT_OFF_WAITER__) { - val = ethr_atomic32_cmpxchg(&e->state, - ETHR_EVENT_OFF_WAITER__, - ETHR_EVENT_OFF__); - if (val == ETHR_EVENT_ON__) - return 0; - ETHR_ASSERT(val == ETHR_EVENT_OFF__); + if (timeout < 0 +#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC + || e->fd[0] == ETHR_EVENT_COND_TIMEDWAIT__ +#endif + ) { + + set_event_off_waiter: + + if (val != ETHR_EVENT_OFF_WAITER__) { + ethr_sint32_t act; + act = ethr_atomic32_cmpxchg(&e->state, + ETHR_EVENT_OFF_WAITER__, + val); + if (act == ETHR_EVENT_ON__) + goto return_event_on; + ETHR_ASSERT(act == val); + } + + res = pthread_mutex_lock(&e->mtx); + if (res != 0) + ETHR_FATAL_ERROR__(res); + + while (1) { + + val = ethr_atomic32_read(&e->state); + if (val == ETHR_EVENT_ON__) { + ETHR_ASSERT(res == 0); + ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore); + break; + } + +#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC + if (timeout > 0) { + res = pthread_cond_timedwait(&e->cnd, &e->mtx, &cond_timeout); + if (res == EINTR || res == ETIMEDOUT) + break; + } + else +#endif + { + res = pthread_cond_wait(&e->cnd, &e->mtx); + if (res == EINTR) + break; + } + if (res != 0) + ETHR_FATAL_ERROR__(res); + } + + ulres = pthread_mutex_unlock(&e->mtx); + if (ulres != 0) + ETHR_FATAL_ERROR__(ulres); + } + else { + int fd; + int sres; + ssize_t rres; + fd_set rset; + fd_set eset; + struct timeval select_timeout; + +#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME + time -= ethr_get_monotonic_time() - start; + if (time <= 0) + return ETIMEDOUT; +#endif - ETHR_ASSERT(val == ETHR_EVENT_OFF_WAITER__ - || val == ETHR_EVENT_OFF__); + set_select_timeout: - res = pthread_mutex_lock(&e->mtx); - if (res != 0) - ETHR_FATAL_ERROR__(res); + ETHR_ASSERT(time > 0); - while (1) { + /* + * timeout in nano-second, but we can only wait + * for micro-seconds... + */ + time = ((time - 1) / 1000) + 1; + + select_timeout.tv_sec = time / (1000*1000); + select_timeout.tv_usec = time % (1000*1000); + + ETHR_ASSERT(val != ETHR_EVENT_ON__); + + fd = e->fd[0]; + + /* Cleanup pipe... */ + do { + char buf[64]; + rres = read(fd, buf, sizeof(buf)); + } while (rres > 0 || (rres < 0 && errno == EINTR)); + if (rres < 0 && errno != EAGAIN && errno != EWOULDBLOCK) + ETHR_FATAL_ERROR__(errno); + + /* + * Need to verify that state is still off + * after cleaning the pipe... + */ + if (val == ETHR_EVENT_OFF_WAITER_SELECT__) { + val = ethr_atomic32_read(&e->state); + if (val == ETHR_EVENT_ON__) + goto return_event_on; + } + else { + ethr_sint32_t act; + act = ethr_atomic32_cmpxchg(&e->state, + ETHR_EVENT_OFF_WAITER_SELECT__, + val); + if (act == ETHR_EVENT_ON__) + goto return_event_on; + ETHR_ASSERT(act == val); + } + + FD_ZERO(&rset); + FD_SET(fd, &rset); + FD_ZERO(&eset); + FD_SET(fd, &eset); + + sres = select(fd + 1, &rset, NULL, &eset, &select_timeout); + if (sres == 0) + res = ETIMEDOUT; + else { + res = EINTR; + if (sres < 0 && errno != EINTR) + ETHR_FATAL_ERROR__(errno); + /* else: + * Event is *probably* set, but it can be a + * lingering writer. That is, it is important + * that we verify that it actually is set. If + * it isn't, return EINTR (spurious wakeup). + */ + } val = ethr_atomic32_read(&e->state); if (val == ETHR_EVENT_ON__) - break; + goto return_event_on; - res = pthread_cond_wait(&e->cnd, &e->mtx); - if (res == EINTR) - break; - if (res != 0) - ETHR_FATAL_ERROR__(res); } - ulres = pthread_mutex_unlock(&e->mtx); - if (ulres != 0) - ETHR_FATAL_ERROR__(ulres); + return res; + +return_event_on: + + ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore); - return res; /* 0 || EINTR */ + return 0; } #else @@ -215,11 +510,23 @@ ethr_event_set(ethr_event *e) int ethr_event_wait(ethr_event *e) { - return wait__(e, 0); + return wait__(e, 0, -1); } int ethr_event_swait(ethr_event *e, int spincount) { - return wait__(e, spincount); + return wait__(e, spincount, -1); +} + +int +ethr_event_twait(ethr_event *e, ethr_sint64_t timeout) +{ + return wait__(e, 0, timeout); +} + +int +ethr_event_stwait(ethr_event *e, int spincount, ethr_sint64_t timeout) +{ + return wait__(e, spincount, timeout); } diff --git a/erts/lib_src/pthread/ethread.c b/erts/lib_src/pthread/ethread.c index 79784c5b84..42d2abd3f1 100644 --- a/erts/lib_src/pthread/ethread.c +++ b/erts/lib_src/pthread/ethread.c @@ -49,6 +49,8 @@ #define ETHREAD_IMPL__ #include "ethread.h" +#undef ETHR_INCLUDE_MONOTONIC_CLOCK__ +#define ETHR_INCLUDE_MONOTONIC_CLOCK__ #include "ethr_internal.h" #ifndef ETHR_HAVE_ETHREAD_DEFINES @@ -232,6 +234,10 @@ ethr_x86_cpuid__(int *eax, int *ebx, int *ecx, int *edx) #endif /* ETHR_X86_RUNTIME_CONF__ */ +#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME +static void init_get_monotonic_time(void); +#endif + /* * -------------------------------------------------------------------------- * Exported functions @@ -254,6 +260,10 @@ ethr_init(ethr_init_data *id) goto error; #endif +#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME + init_get_monotonic_time(); +#endif + res = ethr_init_common__(id); if (res != 0) goto error; @@ -567,6 +577,144 @@ int ethr_sigwait(const sigset_t *set, int *sig) #endif /* #if ETHR_HAVE_ETHR_SIG_FUNCS */ +#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME + +static int broken_get_monotonic_time; + +#if defined(ETHR_HAVE_CLOCK_GETTIME_MONOTONIC) +# ifndef ETHR_MONOTONIC_CLOCK_ID +# error ETHR_MONOTONIC_CLOCK_ID should have been defined +# endif + +ethr_sint64_t +ethr_get_monotonic_time(void) +{ + ethr_sint64_t time; + struct timespec ts; + + if (broken_get_monotonic_time) + return (ethr_sint64_t) 0; + + if (0 != clock_gettime(ETHR_MONOTONIC_CLOCK_ID, &ts)) + ETHR_FATAL_ERROR__(errno); + + time = (ethr_sint64_t) ts.tv_sec; + time *= (ethr_sint64_t) 1000*1000*1000; + time += (ethr_sint64_t) ts.tv_nsec; + return time; +} + +#elif defined(ETHR_HAVE_MACH_CLOCK_GET_TIME) +# ifndef ETHR_MONOTONIC_CLOCK_ID +# error ETHR_MONOTONIC_CLOCK_ID should have been defined +# endif + +ethr_sint64_t +ethr_get_monotonic_time(void) +{ + ethr_sint64_t time; + kern_return_t res; + clock_serv_t clk_srv; + mach_timespec_t time_spec; + + if (broken_get_monotonic_time) + return (ethr_sint64_t) 0; + + errno = EFAULT; + host_get_clock_service(mach_host_self(), + ETHR_MONOTONIC_CLOCK_ID, + &clk_srv); + res = clock_get_time(clk_srv, &time_spec); + if (res != KERN_SUCCESS) + ETHR_FATAL_ERROR__(errno); + mach_port_deallocate(mach_task_self(), clk_srv); + + time = (ethr_sint64_t) time_spec.tv_sec; + time *= (ethr_sint64_t) 1000*1000*1000; + time += (ethr_sint64_t) time_spec.tv_nsec; + return time; +} + +#elif defined(ETHR_HAVE_GETHRTIME) + +ethr_sint64_t +ethr_get_monotonic_time(void) +{ + if (broken_get_monotonic_time) + return (ethr_sint64_t) 0; + return (ethr_sint64_t) gethrtime(); +} + +#else +#error missing monotonic clock +#endif + +int +ethr_get_monotonic_time_is_broken(void) +{ + return broken_get_monotonic_time; +} + +#include <string.h> +#include <ctype.h> +#include <sys/utsname.h> + +static void +init_get_monotonic_time(void) +{ + struct utsname uts; + int vsn[3]; + int i; + char *c; + + broken_get_monotonic_time = 0; + + (void) uname(&uts); + + for (c = uts.sysname; *c; c++) { + if (isupper((int) *c)) + *c = tolower((int) *c); + } + + c = uts.release; + for (i = 0; i < sizeof(vsn)/sizeof(int); i++) { + if (!isdigit((int) *c)) + vsn[i] = 0; + else { + char *c2 = c; + do { + c2++; + } while (isdigit((int) *c2)); + *c2 = '\0'; + vsn[i] = atoi(c); + c = c2; + c++; + } + } + + if (strcmp("linux", uts.sysname) == 0) { + if (vsn[0] < 2 + || (vsn[0] == 2 && vsn[1] < 6) + || (vsn[0] == 2 && vsn[1] == 6 && vsn[2] < 33)) { + broken_get_monotonic_time = 1; + } + } + else if (strcmp("sunos", uts.sysname) == 0) { + if ((vsn[0] < 5 + || (vsn[0] == 5 && vsn[1] < 8)) +#if defined(HAVE_SYSCONF) && defined(_SC_NPROCESSORS_CONF) + && sysconf(_SC_NPROCESSORS_CONF) > 1 +#endif + ) { + broken_get_monotonic_time = 1; + } + } + +} + + +#endif /* ETHR_HAVE_ETHR_GET_MONOTONIC_TIME */ + ETHR_IMPL_NORETURN__ ethr_abort__(void) { diff --git a/erts/lib_src/win/ethr_event.c b/erts/lib_src/win/ethr_event.c index bc2f635c26..a0d506356d 100644 --- a/erts/lib_src/win/ethr_event.c +++ b/erts/lib_src/win/ethr_event.c @@ -25,9 +25,16 @@ #define ETHR_EVENT_IMPL__ #include "ethread.h" +#include "ethr_internal.h" /* --- Windows implementation of thread events ------------------------------ */ +void +ethr_init_event__(void) +{ + +} + int ethr_event_init(ethr_event *e) { @@ -58,11 +65,29 @@ ethr_event_reset(ethr_event *e) } static ETHR_INLINE int -wait(ethr_event *e, int spincount) +wait(ethr_event *e, int spincount, ethr_sint64_t timeout) { - DWORD code; + DWORD code, tmo; int sc, res, until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; + if (timeout < 0) + tmo = INFINITE; + else if (timeout == 0) { + ethr_sint32_t state = ethr_atomic32_read(&e->state); + if (state == ETHR_EVENT_ON__) { + ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore); + return 0; + } + return ETIMEDOUT; + } + else { + /* + * Timeout in nano-seconds, but we can only + * wait for milli-seconds... + */ + tmo = (DWORD) (timeout - 1) / (1000*1000) + 1; + } + if (spincount < 0) ETHR_FATAL_ERROR__(EINVAL); @@ -72,8 +97,10 @@ wait(ethr_event *e, int spincount) ethr_sint32_t state; while (1) { state = ethr_atomic32_read(&e->state); - if (state == ETHR_EVENT_ON__) + if (state == ETHR_EVENT_ON__) { + ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore); return 0; + } if (sc == 0) break; sc--; @@ -95,7 +122,9 @@ wait(ethr_event *e, int spincount) ETHR_ASSERT(state == ETHR_EVENT_OFF__); } - code = WaitForSingleObject(e->handle, INFINITE); + code = WaitForSingleObject(e->handle, tmo); + if (code == WAIT_TIMEOUT) + return ETIMEDOUT; if (code != WAIT_OBJECT_0) ETHR_FATAL_ERROR__(ethr_win_get_errno__()); } @@ -105,11 +134,23 @@ wait(ethr_event *e, int spincount) int ethr_event_wait(ethr_event *e) { - return wait(e, 0); + return wait(e, 0, -1); } int ethr_event_swait(ethr_event *e, int spincount) { - return wait(e, spincount); + return wait(e, spincount, -1); +} + +int +ethr_event_twait(ethr_event *e, ethr_sint64_t timeout) +{ + return wait(e, 0, timeout); +} + +int +ethr_event_stwait(ethr_event *e, int spincount, ethr_sint64_t timeout) +{ + return wait(e, spincount, timeout); } |