From fa7b2c00cbf9212c4a3551980939b92fc6606510 Mon Sep 17 00:00:00 2001
From: Rickard Green
Date: Fri, 13 Feb 2015 03:18:16 +0100
Subject: Implement ethread events with timeout

---
 erts/lib_src/pthread/ethr_event.c | 379 ++++++++++++++++++++++++++++++++++----
 1 file changed, 343 insertions(+), 36 deletions(-)

diff --git a/erts/lib_src/pthread/ethr_event.c b/erts/lib_src/pthread/ethr_event.c
index 9434d60d0a..b35c599365 100644
--- a/erts/lib_src/pthread/ethr_event.c
+++ b/erts/lib_src/pthread/ethr_event.c
@@ -29,6 +29,9 @@
 #endif
 
 #include "ethread.h"
+#undef ETHR_INCLUDE_MONOTONIC_CLOCK__
+#define ETHR_INCLUDE_MONOTONIC_CLOCK__
+#include "ethr_internal.h"
 
 #if defined(ETHR_LINUX_FUTEX_IMPL__)
 /* --- Linux futex implementation of ethread events ------------------------- */
@@ -38,6 +41,12 @@
 
 #define ETHR_YIELD_AFTER_BUSY_LOOPS 50
 
+void
+ethr_init_event__(void)
+{
+
+}
+
 int
 ethr_event_init(ethr_event *e)
 {
@@ -52,21 +61,43 @@ ethr_event_destroy(ethr_event *e)
 }
 
 static ETHR_INLINE int
-wait__(ethr_event *e, int spincount)
+wait__(ethr_event *e, int spincount, ethr_sint64_t timeout)
 {
     unsigned sc = spincount;
     int res;
     ethr_sint32_t val;
     int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS;
+    ethr_sint64_t time = 0; /* SHUT UP annoying faulty warning... */
+    struct timespec ts, *tsp;
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+    ethr_sint64_t start = 0; /* SHUT UP annoying faulty warning... */
+#endif
 
     if (spincount < 0)
         ETHR_FATAL_ERROR__(EINVAL);
 
+    if (timeout < 0) {
+        tsp = NULL;
+    }
+    else {
+        tsp = &ts;
+        time = timeout;
+        if (spincount == 0) {
+            val = ethr_atomic32_read(&e->futex);
+            if (val == ETHR_EVENT_ON__)
+                goto return_event_on;
+            goto set_timeout;
+        }
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+        start = ethr_get_monotonic_time();
+#endif
+    }
+
     while (1) {
         while (1) {
             val = ethr_atomic32_read(&e->futex);
             if (val == ETHR_EVENT_ON__)
-                return 0;
+                goto return_event_on;
             if (sc == 0)
                 break;
             sc--;
@@ -79,44 +110,147 @@ wait__(ethr_event *e, int spincount)
             }
         }
 
+        if (timeout >= 0) {
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+            time = timeout - (ethr_get_monotonic_time() - start);
+#endif
+        set_timeout:
+            if (time <= 0) {
+                val = ethr_atomic32_read(&e->futex);
+                if (val == ETHR_EVENT_ON__)
+                    goto return_event_on;
+                return ETIMEDOUT;
+            }
+            ts.tv_sec = time / (1000*1000*1000);
+            ts.tv_nsec = time % (1000*1000*1000);
+        }
+
         if (val != ETHR_EVENT_OFF_WAITER__) {
             val = ethr_atomic32_cmpxchg(&e->futex,
                                         ETHR_EVENT_OFF_WAITER__,
                                         ETHR_EVENT_OFF__);
             if (val == ETHR_EVENT_ON__)
-                return 0;
+                goto return_event_on;
             ETHR_ASSERT(val == ETHR_EVENT_OFF__);
         }
 
         res = ETHR_FUTEX__(&e->futex,
                            ETHR_FUTEX_WAIT__,
-                           ETHR_EVENT_OFF_WAITER__);
-        if (res == EINTR)
+                           ETHR_EVENT_OFF_WAITER__,
+                           tsp);
+        switch (res) {
+        case EINTR:
+        case ETIMEDOUT:
+            return res;
+        case 0:
+        case EWOULDBLOCK:
             break;
-        if (res != 0 && res != EWOULDBLOCK)
+        default:
             ETHR_FATAL_ERROR__(res);
+        }
     }
 
-    return res;
+return_event_on:
+
+    ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
+
+    return 0;
+
 }
 
 #elif defined(ETHR_PTHREADS)
-/* --- Posix mutex/cond implementation of events ---------------------------- */
+/* --- Posix mutex/cond pipe/select implementation of events ---------------- */
+
+#include <fcntl.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+#include <sys/select.h>
+#include <unistd.h>
+
+static void
+setup_nonblocking_pipe(ethr_event *e)
+{
+    int flgs;
+    int res;
+
+    res = pipe(e->fd);
+    if (res != 0)
+        ETHR_FATAL_ERROR__(errno);
+
+    ETHR_ASSERT(e->fd[0] >= 0 && e->fd[1] >= 0);
+
+    flgs = fcntl(e->fd[0], F_GETFL, 0);
+    fcntl(e->fd[0], F_SETFL, flgs | O_NONBLOCK);
+    flgs = fcntl(e->fd[1], F_GETFL, 0);
+    fcntl(e->fd[1], F_SETFL, flgs | O_NONBLOCK);
+
+    ETHR_MEMBAR(ETHR_StoreStore);
+}
+
+#define ETHR_EVENT_INVALID_FD__ -1
+#define ETHR_EVENT_COND_TIMEDWAIT__ -2
+
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+static pthread_condattr_t monotonic_clock_cond_attr;
+#endif
+static pthread_condattr_t *monotonic_clock_cond_attr_p;
+
+#ifndef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+# undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+#endif
+#ifndef ETHR_MONOTONIC_CLOCK_ID
+# undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+#endif
+
+void
+ethr_init_event__(void)
+{
+    monotonic_clock_cond_attr_p = NULL;
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+    if (!ethr_get_monotonic_time_is_broken()
+        && pthread_condattr_init(&monotonic_clock_cond_attr) == 0) {
+        if (pthread_condattr_setclock(&monotonic_clock_cond_attr,
+                                      ETHR_MONOTONIC_CLOCK_ID) == 0)
+            monotonic_clock_cond_attr_p = &monotonic_clock_cond_attr;
+        else
+            pthread_condattr_destroy(&monotonic_clock_cond_attr);
+    }
+#endif
+}
 
 int
 ethr_event_init(ethr_event *e)
 {
     int res;
+    ethr_atomic32_init(&e->state, ETHR_EVENT_OFF__);
+
     res = pthread_mutex_init(&e->mtx, NULL);
     if (res != 0)
         return res;
-    res = pthread_cond_init(&e->cnd, NULL);
+
+    res = pthread_cond_init(&e->cnd, monotonic_clock_cond_attr_p);
     if (res != 0) {
         pthread_mutex_destroy(&e->mtx);
         return res;
     }
+
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+    /*
+     * If ethr_get_monotonic_time() is broken we
+     * fall back on the pipe/select solution...
+     */
+    if (monotonic_clock_cond_attr_p) {
+        e->fd[0] = e->fd[1] = ETHR_EVENT_COND_TIMEDWAIT__;
+        return 0;
+    }
+#endif
+
+    e->fd[0] = e->fd[1] = ETHR_EVENT_INVALID_FD__;
+
     return 0;
 }
@@ -124,6 +258,10 @@ int
 ethr_event_destroy(ethr_event *e)
 {
     int res;
+    if (e->fd[0] >= 0) {
+        close(e->fd[0]);
+        close(e->fd[1]);
+    }
     res = pthread_mutex_destroy(&e->mtx);
     if (res != 0)
         return res;
@@ -134,12 +272,58 @@ ethr_event_destroy(ethr_event *e)
 }
 
 static ETHR_INLINE int
-wait__(ethr_event *e, int spincount)
+wait__(ethr_event *e, int spincount, ethr_sint64_t timeout)
 {
     int sc = spincount;
     ethr_sint32_t val;
     int res, ulres;
     int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS;
+    ethr_sint64_t time = 0; /* SHUT UP annoying faulty warning... */
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+    ethr_sint64_t start = 0; /* SHUT UP annoying faulty warning... */
+#endif
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+    struct timespec cond_timeout;
+#endif
+
+    val = ethr_atomic32_read(&e->state);
+    if (val == ETHR_EVENT_ON__)
+        goto return_event_on;
+
+    if (timeout < 0) {
+        if (spincount == 0)
+            goto set_event_off_waiter;
+    }
+    if (timeout == 0)
+        return ETIMEDOUT;
+    else {
+        time = timeout;
+        switch (e->fd[0]) {
+        case ETHR_EVENT_INVALID_FD__:
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+            start = ethr_get_monotonic_time();
+#endif
+            setup_nonblocking_pipe(e);
+            break;
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+        case ETHR_EVENT_COND_TIMEDWAIT__:
+            time += ethr_get_monotonic_time();
+            cond_timeout.tv_sec = time / (1000*1000*1000);
+            cond_timeout.tv_nsec = time % (1000*1000*1000);
+            if (spincount == 0)
+                goto set_event_off_waiter;
+            break;
+#endif
+        default:
+            /* Already initialized pipe... */
+            if (spincount == 0)
+                goto set_select_timeout;
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+            start = ethr_get_monotonic_time();
+#endif
+            break;
+        }
+    }
 
     if (spincount < 0)
         ETHR_FATAL_ERROR__(EINVAL);
@@ -147,7 +331,8 @@ wait__(ethr_event *e, int spincount)
     while (1) {
         val = ethr_atomic32_read(&e->state);
         if (val == ETHR_EVENT_ON__)
-            return 0;
+            goto return_event_on;
+
         if (sc == 0)
             break;
         sc--;
@@ -160,40 +345,150 @@ wait__(ethr_event *e, int spincount)
         }
     }
 
-    if (val != ETHR_EVENT_OFF_WAITER__) {
-        val = ethr_atomic32_cmpxchg(&e->state,
-                                    ETHR_EVENT_OFF_WAITER__,
-                                    ETHR_EVENT_OFF__);
-        if (val == ETHR_EVENT_ON__)
-            return 0;
-        ETHR_ASSERT(val == ETHR_EVENT_OFF__);
+    if (timeout < 0
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+        || e->fd[0] == ETHR_EVENT_COND_TIMEDWAIT__
+#endif
+        ) {
+
+    set_event_off_waiter:
+
+        if (val != ETHR_EVENT_OFF_WAITER__) {
+            ethr_sint32_t act;
+            act = ethr_atomic32_cmpxchg(&e->state,
+                                        ETHR_EVENT_OFF_WAITER__,
+                                        val);
+            if (act == ETHR_EVENT_ON__)
+                goto return_event_on;
+            ETHR_ASSERT(act == val);
+        }
+
+        res = pthread_mutex_lock(&e->mtx);
+        if (res != 0)
+            ETHR_FATAL_ERROR__(res);
+
+        while (1) {
+
+            val = ethr_atomic32_read(&e->state);
+            if (val == ETHR_EVENT_ON__) {
+                ETHR_ASSERT(res == 0);
+                ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
+                break;
+            }
+
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+            if (timeout > 0) {
+                res = pthread_cond_timedwait(&e->cnd, &e->mtx, &cond_timeout);
+                if (res == EINTR || res == ETIMEDOUT)
+                    break;
+            }
+            else
+#endif
+            {
+                res = pthread_cond_wait(&e->cnd, &e->mtx);
+                if (res == EINTR)
+                    break;
+            }
+            if (res != 0)
+                ETHR_FATAL_ERROR__(res);
+        }
+
+        ulres = pthread_mutex_unlock(&e->mtx);
+        if (ulres != 0)
+            ETHR_FATAL_ERROR__(ulres);
+
     }
+    else {
+        int fd;
+        int sres;
+        ssize_t rres;
+        fd_set rset;
+        fd_set eset;
+        struct timeval select_timeout;
+
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+        time -= ethr_get_monotonic_time() - start;
+        if (time <= 0)
+            return ETIMEDOUT;
+#endif
 
-    ETHR_ASSERT(val == ETHR_EVENT_OFF_WAITER__
-                || val == ETHR_EVENT_OFF__);
+    set_select_timeout:
 
-    res = pthread_mutex_lock(&e->mtx);
-    if (res != 0)
-        ETHR_FATAL_ERROR__(res);
+        ETHR_ASSERT(time > 0);
 
-    while (1) {
+        /*
+         * timeout in nano-second, but we can only wait
+         * for micro-seconds...
+         */
+        time = ((time - 1) / 1000) + 1;
+
+        select_timeout.tv_sec = time / (1000*1000);
+        select_timeout.tv_usec = time % (1000*1000);
+
+        ETHR_ASSERT(val != ETHR_EVENT_ON__);
+
+        fd = e->fd[0];
+
+        /* Cleanup pipe... */
+        do {
+            char buf[64];
+            rres = read(fd, buf, sizeof(buf));
+        } while (rres > 0 || (rres < 0 && errno == EINTR));
+        if (rres < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
+            ETHR_FATAL_ERROR__(errno);
+
+        /*
+         * Need to verify that state is still off
+         * after cleaning the pipe...
+         */
+        if (val == ETHR_EVENT_OFF_WAITER_SELECT__) {
+            val = ethr_atomic32_read(&e->state);
+            if (val == ETHR_EVENT_ON__)
+                goto return_event_on;
+        }
+        else {
+            ethr_sint32_t act;
+            act = ethr_atomic32_cmpxchg(&e->state,
+                                        ETHR_EVENT_OFF_WAITER_SELECT__,
+                                        val);
+            if (act == ETHR_EVENT_ON__)
+                goto return_event_on;
+            ETHR_ASSERT(act == val);
+        }
+
+        FD_ZERO(&rset);
+        FD_SET(fd, &rset);
+        FD_ZERO(&eset);
+        FD_SET(fd, &eset);
+
+        sres = select(fd + 1, &rset, NULL, &eset, &select_timeout);
+        if (sres == 0)
+            res = ETIMEDOUT;
+        else {
+            res = EINTR;
+            if (sres < 0 && errno != EINTR)
+                ETHR_FATAL_ERROR__(errno);
+            /* else:
+             *   Event is *probably* set, but it can be a
+             *   lingering writer. That is, it is important
+             *   that we verify that it actually is set. If
+             *   it isn't, return EINTR (spurious wakeup).
+             */
+        }
 
         val = ethr_atomic32_read(&e->state);
         if (val == ETHR_EVENT_ON__)
-            break;
+            goto return_event_on;
 
-        res = pthread_cond_wait(&e->cnd, &e->mtx);
-        if (res == EINTR)
-            break;
-        if (res != 0)
-            ETHR_FATAL_ERROR__(res);
     }
 
-    ulres = pthread_mutex_unlock(&e->mtx);
-    if (ulres != 0)
-        ETHR_FATAL_ERROR__(ulres);
+    return res;
+
+return_event_on:
+
+    ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
 
-    return res; /* 0 || EINTR */
+    return 0;
 }
 
 #else
@@ -215,11 +510,23 @@ ethr_event_set(ethr_event *e)
 int
 ethr_event_wait(ethr_event *e)
 {
-    return wait__(e, 0);
+    return wait__(e, 0, -1);
 }
 
 int
 ethr_event_swait(ethr_event *e, int spincount)
 {
-    return wait__(e, spincount);
+    return wait__(e, spincount, -1);
+}
+
+int
+ethr_event_twait(ethr_event *e, ethr_sint64_t timeout)
+{
+    return wait__(e, 0, timeout);
+}
+
+int
+ethr_event_stwait(ethr_event *e, int spincount, ethr_sint64_t timeout)
+{
+    return wait__(e, spincount, timeout);
 }
--
cgit v1.2.3
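
Both wait__() variants in this patch treat the timeout argument as a count of nanoseconds: the futex and condition-variable paths split it into a struct timespec with /(1000*1000*1000) and %(1000*1000*1000), while the pipe/select fallback first rounds the remaining time up to whole microseconds ("timeout in nano-second, but we can only wait for micro-seconds..."). The helpers below are not part of the patch; they are a small sketch that mirrors those expressions, using hypothetical names ns_to_timespec()/ns_to_timeval():

    /* Sketch only: mirrors the timeout conversions used in the patch. */
    #include <time.h>       /* struct timespec */
    #include <sys/time.h>   /* struct timeval  */

    /* Futex/cond paths: nanoseconds straight into a timespec. */
    static void
    ns_to_timespec(long long ns, struct timespec *ts)
    {
        ts->tv_sec = ns / (1000*1000*1000);
        ts->tv_nsec = ns % (1000*1000*1000);
    }

    /* select() path: round up to whole microseconds first (ns > 0 assumed,
     * matching the ETHR_ASSERT(time > 0) in the patch), so a short but
     * non-zero timeout never turns into a zero-length select(). */
    static void
    ns_to_timeval(long long ns, struct timeval *tv)
    {
        long long us = ((ns - 1) / 1000) + 1;
        tv->tv_sec = us / (1000*1000);
        tv->tv_usec = us % (1000*1000);
    }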
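The last hunk adds the externally visible entry points ethr_event_twait() and ethr_event_stwait() next to the existing ethr_event_wait()/ethr_event_swait(); a negative timeout means "wait forever", and the return value is 0 once the event is set, ETIMEDOUT on timeout, or EINTR on a spurious wakeup. The sketch below is not from the patch: it assumes the internal ethread API from ethread.h (ethr_event_init(), ethr_event_reset(), ethr_event_set()), and work_available() is a hypothetical stand-in for whatever condition the caller is really waiting on.

    /* Sketch only: a waiter that gives up after 500 ms.
     * Assumes ethr_event_init(&evt) was called during start-up and that
     * the thread producing work calls ethr_event_set(&evt). */
    #include <errno.h>
    #include <stdio.h>
    #include "ethread.h"

    extern int work_available(void);   /* hypothetical predicate */

    static ethr_event evt;

    static void
    wait_for_work(void)
    {
        ethr_sint64_t timeout = 500*1000*1000; /* nanoseconds */

        while (1) {
            ethr_event_reset(&evt);    /* arm the event before re-checking */

            if (work_available())      /* re-check to avoid a lost wakeup */
                break;

            switch (ethr_event_twait(&evt, timeout)) {
            case 0:                    /* event was set */
            case EINTR:                /* spurious wakeup; check again */
                continue;
            case ETIMEDOUT:
                printf("no work within 500 ms\n");
                return;
            }
        }
        /* ... handle the available work ... */
    }

Resetting before the final check matters: if a concurrent ethr_event_set() slips in between the check and the wait, the event state is already ON and the next twait call returns immediately instead of sleeping.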