aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2015-02-13 03:18:16 +0100
committerRickard Green <[email protected]>2015-03-20 15:28:52 +0100
commitfa7b2c00cbf9212c4a3551980939b92fc6606510 (patch)
treee3978ac46d61aff7b6f66e67fd763de57a4414bd
parent6487aac5977cf470bc6a2cd0964da2850ee38717 (diff)
downloadotp-fa7b2c00cbf9212c4a3551980939b92fc6606510.tar.gz
otp-fa7b2c00cbf9212c4a3551980939b92fc6606510.tar.bz2
otp-fa7b2c00cbf9212c4a3551980939b92fc6606510.zip
Implement ethread events with timeout
-rw-r--r--erts/aclocal.m444
-rw-r--r--erts/emulator/beam/erl_process.c6
-rw-r--r--erts/emulator/beam/erl_threads.h23
-rw-r--r--erts/include/internal/ethr_internal.h27
-rw-r--r--erts/include/internal/ethread_header_config.h.in4
-rw-r--r--erts/include/internal/pthread/ethr_event.h59
-rw-r--r--erts/include/internal/win/ethr_event.h2
-rw-r--r--erts/lib_src/common/ethr_aux.c2
-rw-r--r--erts/lib_src/pthread/ethr_event.c379
-rw-r--r--erts/lib_src/pthread/ethread.c148
-rw-r--r--erts/lib_src/win/ethr_event.c53
11 files changed, 683 insertions, 64 deletions
diff --git a/erts/aclocal.m4 b/erts/aclocal.m4
index 989f697201..1e449bf874 100644
--- a/erts/aclocal.m4
+++ b/erts/aclocal.m4
@@ -1460,6 +1460,50 @@ case "$THR_LIB_NAME" in
AC_DEFINE(ETHR_HAVE_PTHREAD_ATTR_SETGUARDSIZE, 1, \
[Define if you have the pthread_attr_setguardsize function.]))
+ if test "x$erl_monotonic_clock_id" != "x"; then
+ AC_MSG_CHECKING(whether pthread_cond_timedwait() can use the monotonic clock $erl_monotonic_clock_id for timeout)
+ pthread_cond_timedwait_monotonic=no
+ AC_TRY_LINK([
+ #if defined(ETHR_NEED_NPTL_PTHREAD_H)
+ # include <nptl/pthread.h>
+ #elif defined(ETHR_HAVE_MIT_PTHREAD_H)
+ # include <pthread/mit/pthread.h>
+ #elif defined(ETHR_HAVE_PTHREAD_H)
+ # include <pthread.h>
+ #endif
+ #ifdef ETHR_TIME_WITH_SYS_TIME
+ # include <time.h>
+ # include <sys/time.h>
+ #else
+ # ifdef ETHR_HAVE_SYS_TIME_H
+ # include <sys/time.h>
+ # else
+ # include <time.h>
+ # endif
+ #endif
+ #if defined(ETHR_HAVE_MACH_CLOCK_GET_TIME)
+ # include <mach/clock.h>
+ # include <mach/mach.h>
+ #endif
+ ],
+ [
+ int res;
+ pthread_condattr_t attr;
+ pthread_cond_t cond;
+ struct timespec cond_timeout;
+ pthread_mutex_t mutex;
+ res = pthread_condattr_init(&attr);
+ res = pthread_condattr_setclock(&attr, ETHR_MONOTONIC_CLOCK_ID);
+ res = pthread_cond_init(&cond, &attr);
+ res = pthread_cond_timedwait(&cond, &mutex, &cond_timeout);
+ ],
+ [pthread_cond_timedwait_monotonic=yes])
+ AC_MSG_RESULT([$pthread_cond_timedwait_monotonic])
+ if test $pthread_cond_timedwait_monotonic = yes; then
+ AC_DEFINE(ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC, [1], [Define if pthread_cond_timedwait() can be used with a monotonic clock])
+ fi
+ fi
+
linux_futex=no
AC_MSG_CHECKING([for Linux futexes])
AC_TRY_LINK([
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 81bc3d2429..9dcaf2fdca 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -2806,7 +2806,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
ASSERT(flgs & ERTS_SSI_FLG_TSE_SLEEPING);
ASSERT(flgs & ERTS_SSI_FLG_WAITING);
do {
- res = erts_tse_wait(ssi->event);
+ res = erts_tse_twait(ssi->event, -1);
} while (res == EINTR);
}
}
@@ -6594,7 +6594,7 @@ suspend_scheduler(ErtsSchedulerData *esdp)
int res;
do {
- res = erts_tse_wait(ssi->event);
+ res = erts_tse_twait(ssi->event, -1);
} while (res == EINTR);
}
}
@@ -6800,7 +6800,7 @@ suspend_scheduler(ErtsSchedulerData *esdp)
int res;
do {
- res = erts_tse_wait(ssi->event);
+ res = erts_tse_twait(ssi->event, -1);
} while (res == EINTR);
}
}
diff --git a/erts/emulator/beam/erl_threads.h b/erts/emulator/beam/erl_threads.h
index 7214f3ea33..b9f34d4c12 100644
--- a/erts/emulator/beam/erl_threads.h
+++ b/erts/emulator/beam/erl_threads.h
@@ -651,6 +651,8 @@ ERTS_GLB_INLINE void erts_tse_set(erts_tse_t *ep);
ERTS_GLB_INLINE void erts_tse_reset(erts_tse_t *ep);
ERTS_GLB_INLINE int erts_tse_wait(erts_tse_t *ep);
ERTS_GLB_INLINE int erts_tse_swait(erts_tse_t *ep, int spincount);
+ERTS_GLB_INLINE int erts_tse_twait(erts_tse_t *ep, Sint64 tmo);
+ERTS_GLB_INLINE int erts_tse_stwait(erts_tse_t *ep, int spincount, Sint64 tmo);
ERTS_GLB_INLINE int erts_tse_is_tmp(erts_tse_t *ep);
ERTS_GLB_INLINE void erts_thr_set_main_status(int, int);
ERTS_GLB_INLINE int erts_thr_get_main_status(void);
@@ -3473,6 +3475,27 @@ ERTS_GLB_INLINE int erts_tse_swait(erts_tse_t *ep, int spincount)
#endif
}
+ERTS_GLB_INLINE int erts_tse_twait(erts_tse_t *ep, Sint64 tmo)
+{
+#ifdef USE_THREADS
+ return ethr_event_twait(&((ethr_ts_event *) ep)->event,
+ (ethr_sint64_t) tmo);
+#else
+ return ENOTSUP;
+#endif
+}
+
+ERTS_GLB_INLINE int erts_tse_stwait(erts_tse_t *ep, int spincount, Sint64 tmo)
+{
+#ifdef USE_THREADS
+ return ethr_event_stwait(&((ethr_ts_event *) ep)->event,
+ spincount,
+ (ethr_sint64_t) tmo);
+#else
+ return ENOTSUP;
+#endif
+}
+
ERTS_GLB_INLINE int erts_tse_is_tmp(erts_tse_t *ep)
{
#ifdef USE_THREADS
diff --git a/erts/include/internal/ethr_internal.h b/erts/include/internal/ethr_internal.h
index c9b1db5b46..65195145af 100644
--- a/erts/include/internal/ethr_internal.h
+++ b/erts/include/internal/ethr_internal.h
@@ -57,6 +57,33 @@ ETHR_PROTO_NORETURN__ ethr_abort__(void);
int ethr_win_get_errno__(void);
#endif
+#ifdef ETHR_INCLUDE_MONOTONIC_CLOCK__
+#undef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+#if defined(ETHR_HAVE_CLOCK_GETTIME_MONOTONIC) \
+ || defined(ETHR_HAVE_MACH_CLOCK_GET_TIME) \
+ || defined(ETHR_HAVE_GETHRTIME)
+#ifdef ETHR_TIME_WITH_SYS_TIME
+# include <time.h>
+# include <sys/time.h>
+#else
+# ifdef ETHR_HAVE_SYS_TIME_H
+# include <sys/time.h>
+# else
+# include <time.h>
+# endif
+#endif
+#ifdef ETHR_HAVE_MACH_CLOCK_GET_TIME
+#include <mach/clock.h>
+#include <mach/mach.h>
+#endif
+#define ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ethr_sint64_t ethr_get_monotonic_time(void);
+int ethr_get_monotonic_time_is_broken(void);
+#endif
+#endif /* ETHR_INCLUDE_MONOTONIC_CLOCK__ */
+
+void ethr_init_event__(void);
+
/* implemented in lib_src/common/ethread_aux.c */
int ethr_init_common__(ethr_init_data *id);
int ethr_late_init_common__(ethr_late_init_data *lid);
diff --git a/erts/include/internal/ethread_header_config.h.in b/erts/include/internal/ethread_header_config.h.in
index ffd9d350e2..618fcba380 100644
--- a/erts/include/internal/ethread_header_config.h.in
+++ b/erts/include/internal/ethread_header_config.h.in
@@ -250,5 +250,5 @@
/* Define to the monotonic clock id to use */
#undef ETHR_MONOTONIC_CLOCK_ID
-
-
+/* Define if pthread_cond_timedwait() can be used with a monotonic clock */
+#undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
diff --git a/erts/include/internal/pthread/ethr_event.h b/erts/include/internal/pthread/ethr_event.h
index d0a77990cc..f67bac858b 100644
--- a/erts/include/internal/pthread/ethr_event.h
+++ b/erts/include/internal/pthread/ethr_event.h
@@ -46,12 +46,12 @@ typedef struct {
ethr_atomic32_t futex;
} ethr_event;
-#define ETHR_FUTEX__(FTX, OP, VAL) \
+#define ETHR_FUTEX__(FTX, OP, VAL, TIMEOUT) \
(-1 == syscall(__NR_futex, \
(void *) ethr_atomic32_addr((FTX)), \
(OP), \
(int) (VAL), \
- NULL, \
+ (TIMEOUT), \
NULL, \
0) \
? errno : 0)
@@ -64,7 +64,7 @@ ETHR_INLINE_FUNC_NAME_(ethr_event_set)(ethr_event *e)
ethr_sint32_t val;
val = ethr_atomic32_xchg_mb(&e->futex, ETHR_EVENT_ON__);
if (val == ETHR_EVENT_OFF_WAITER__) {
- int res = ETHR_FUTEX__(&e->futex, ETHR_FUTEX_WAKE__, 1);
+ int res = ETHR_FUTEX__(&e->futex, ETHR_FUTEX_WAKE__, 1, NULL);
if (res != 0)
ETHR_FATAL_ERROR__(res);
}
@@ -80,35 +80,58 @@ ETHR_INLINE_FUNC_NAME_(ethr_event_reset)(ethr_event *e)
#endif
#elif defined(ETHR_PTHREADS)
-/* --- Posix mutex/cond implementation of events ---------------------------- */
+/* --- Posix mutex/cond pipe/select implementation of events ---------------- */
+
typedef struct {
ethr_atomic32_t state;
pthread_mutex_t mtx;
pthread_cond_t cnd;
+ int fd[2];
} ethr_event;
-#define ETHR_EVENT_OFF_WAITER__ -1L
-#define ETHR_EVENT_OFF__ 1L
-#define ETHR_EVENT_ON__ 0L
+#define ETHR_EVENT_OFF_WAITER_SELECT__ ((ethr_sint32_t) -2)
+#define ETHR_EVENT_OFF_WAITER__ ((ethr_sint32_t) -1)
+#define ETHR_EVENT_OFF__ ((ethr_sint32_t) 1)
+#define ETHR_EVENT_ON__ ((ethr_sint32_t) 0)
+
+#define ETHR_EVENT_IS_WAITING__(VAL) ((VAL) < 0)
#if defined(ETHR_TRY_INLINE_FUNCS) || defined(ETHR_EVENT_IMPL__)
+#ifndef ETHR_HAVE_PTHREAD_TIMED_COND_MONOTONIC
+#include <unistd.h>
+#include <errno.h>
+#endif
+
static void ETHR_INLINE
ETHR_INLINE_FUNC_NAME_(ethr_event_set)(ethr_event *e)
{
ethr_sint32_t val;
val = ethr_atomic32_xchg_mb(&e->state, ETHR_EVENT_ON__);
- if (val == ETHR_EVENT_OFF_WAITER__) {
- int res = pthread_mutex_lock(&e->mtx);
- if (res != 0)
- ETHR_FATAL_ERROR__(res);
- res = pthread_cond_signal(&e->cnd);
- if (res != 0)
- ETHR_FATAL_ERROR__(res);
- res = pthread_mutex_unlock(&e->mtx);
- if (res != 0)
- ETHR_FATAL_ERROR__(res);
+ if (ETHR_EVENT_IS_WAITING__(val)) {
+ int res;
+ if (val == ETHR_EVENT_OFF_WAITER_SELECT__) {
+ ssize_t wres;
+ int fd = e->fd[1];
+ ETHR_ASSERT(fd >= 0);
+ do {
+ wres = write(fd, "!", 1);
+ } while (wres < 0 && errno == EINTR);
+ if (wres < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
+ ETHR_FATAL_ERROR__(errno);
+ }
+ else {
+ res = pthread_mutex_lock(&e->mtx);
+ if (res != 0)
+ ETHR_FATAL_ERROR__(res);
+ res = pthread_cond_signal(&e->cnd);
+ if (res != 0)
+ ETHR_FATAL_ERROR__(res);
+ res = pthread_mutex_unlock(&e->mtx);
+ if (res != 0)
+ ETHR_FATAL_ERROR__(res);
+ }
}
}
@@ -127,6 +150,8 @@ int ethr_event_init(ethr_event *e);
int ethr_event_destroy(ethr_event *e);
int ethr_event_wait(ethr_event *e);
int ethr_event_swait(ethr_event *e, int spincount);
+int ethr_event_twait(ethr_event *e, ethr_sint64_t timeout);
+int ethr_event_stwait(ethr_event *e, int spincount, ethr_sint64_t timeout);
#if !defined(ETHR_TRY_INLINE_FUNCS) || defined(ETHR_EVENT_IMPL__)
void ethr_event_set(ethr_event *e);
void ethr_event_reset(ethr_event *e);
diff --git a/erts/include/internal/win/ethr_event.h b/erts/include/internal/win/ethr_event.h
index 6363174a74..95e681983f 100644
--- a/erts/include/internal/win/ethr_event.h
+++ b/erts/include/internal/win/ethr_event.h
@@ -58,6 +58,8 @@ int ethr_event_init(ethr_event *e);
int ethr_event_destroy(ethr_event *e);
int ethr_event_wait(ethr_event *e);
int ethr_event_swait(ethr_event *e, int spincount);
+int ethr_event_twait(ethr_event *e, ethr_sint64_t timeout);
+int ethr_event_stwait(ethr_event *e, int spincount, ethr_sint64_t timeout);
#if !defined(ETHR_TRY_INLINE_FUNCS) || defined(ETHR_EVENT_IMPL__)
void ethr_event_set(ethr_event *e);
void ethr_event_reset(ethr_event *e);
diff --git a/erts/lib_src/common/ethr_aux.c b/erts/lib_src/common/ethr_aux.c
index b77f2178f2..1ba51882c3 100644
--- a/erts/lib_src/common/ethr_aux.c
+++ b/erts/lib_src/common/ethr_aux.c
@@ -148,6 +148,8 @@ ethr_init_common__(ethr_init_data *id)
{
int res;
+ ethr_init_event__();
+
#if defined(ETHR_X86_RUNTIME_CONF__)
x86_init();
#endif
diff --git a/erts/lib_src/pthread/ethr_event.c b/erts/lib_src/pthread/ethr_event.c
index 9434d60d0a..b35c599365 100644
--- a/erts/lib_src/pthread/ethr_event.c
+++ b/erts/lib_src/pthread/ethr_event.c
@@ -29,6 +29,9 @@
#endif
#include "ethread.h"
+#undef ETHR_INCLUDE_MONOTONIC_CLOCK__
+#define ETHR_INCLUDE_MONOTONIC_CLOCK__
+#include "ethr_internal.h"
#if defined(ETHR_LINUX_FUTEX_IMPL__)
/* --- Linux futex implementation of ethread events ------------------------- */
@@ -38,6 +41,12 @@
#define ETHR_YIELD_AFTER_BUSY_LOOPS 50
+void
+ethr_init_event__(void)
+{
+
+}
+
int
ethr_event_init(ethr_event *e)
{
@@ -52,21 +61,43 @@ ethr_event_destroy(ethr_event *e)
}
static ETHR_INLINE int
-wait__(ethr_event *e, int spincount)
+wait__(ethr_event *e, int spincount, ethr_sint64_t timeout)
{
unsigned sc = spincount;
int res;
ethr_sint32_t val;
int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS;
+ ethr_sint64_t time = 0; /* SHUT UP annoying faulty warning... */
+ struct timespec ts, *tsp;
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ ethr_sint64_t start = 0; /* SHUT UP annoying faulty warning... */
+#endif
if (spincount < 0)
ETHR_FATAL_ERROR__(EINVAL);
+ if (timeout < 0) {
+ tsp = NULL;
+ }
+ else {
+ tsp = &ts;
+ time = timeout;
+ if (spincount == 0) {
+ val = ethr_atomic32_read(&e->futex);
+ if (val == ETHR_EVENT_ON__)
+ goto return_event_on;
+ goto set_timeout;
+ }
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ start = ethr_get_monotonic_time();
+#endif
+ }
+
while (1) {
while (1) {
val = ethr_atomic32_read(&e->futex);
if (val == ETHR_EVENT_ON__)
- return 0;
+ goto return_event_on;
if (sc == 0)
break;
sc--;
@@ -79,44 +110,147 @@ wait__(ethr_event *e, int spincount)
}
}
+ if (timeout >= 0) {
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ time = timeout - (ethr_get_monotonic_time() - start);
+#endif
+ set_timeout:
+ if (time <= 0) {
+ val = ethr_atomic32_read(&e->futex);
+ if (val == ETHR_EVENT_ON__)
+ goto return_event_on;
+ return ETIMEDOUT;
+ }
+ ts.tv_sec = time / (1000*1000*1000);
+ ts.tv_nsec = time % (1000*1000*1000);
+ }
+
if (val != ETHR_EVENT_OFF_WAITER__) {
val = ethr_atomic32_cmpxchg(&e->futex,
ETHR_EVENT_OFF_WAITER__,
ETHR_EVENT_OFF__);
if (val == ETHR_EVENT_ON__)
- return 0;
+ goto return_event_on;
ETHR_ASSERT(val == ETHR_EVENT_OFF__);
}
res = ETHR_FUTEX__(&e->futex,
ETHR_FUTEX_WAIT__,
- ETHR_EVENT_OFF_WAITER__);
- if (res == EINTR)
+ ETHR_EVENT_OFF_WAITER__,
+ tsp);
+ switch (res) {
+ case EINTR:
+ case ETIMEDOUT:
+ return res;
+ case 0:
+ case EWOULDBLOCK:
break;
- if (res != 0 && res != EWOULDBLOCK)
+ default:
ETHR_FATAL_ERROR__(res);
+ }
}
- return res;
+return_event_on:
+
+ ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
+
+ return 0;
+
}
#elif defined(ETHR_PTHREADS)
-/* --- Posix mutex/cond implementation of events ---------------------------- */
+/* --- Posix mutex/cond pipe/select implementation of events ---------------- */
+
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+#include <sys/select.h>
+#include <errno.h>
+#include <string.h>
+
+static void
+setup_nonblocking_pipe(ethr_event *e)
+{
+ int flgs;
+ int res;
+
+ res = pipe(e->fd);
+ if (res != 0)
+ ETHR_FATAL_ERROR__(errno);
+
+ ETHR_ASSERT(e->fd[0] >= 0 && e->fd[1] >= 0);
+
+ flgs = fcntl(e->fd[0], F_GETFL, 0);
+ fcntl(e->fd[0], F_SETFL, flgs | O_NONBLOCK);
+ flgs = fcntl(e->fd[1], F_GETFL, 0);
+ fcntl(e->fd[1], F_SETFL, flgs | O_NONBLOCK);
+
+ ETHR_MEMBAR(ETHR_StoreStore);
+}
+
+#define ETHR_EVENT_INVALID_FD__ -1
+#define ETHR_EVENT_COND_TIMEDWAIT__ -2
+
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+static pthread_condattr_t monotonic_clock_cond_attr;
+#endif
+static pthread_condattr_t *monotonic_clock_cond_attr_p;
+
+#ifndef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+# undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+#endif
+#ifndef ETHR_MONOTONIC_CLOCK_ID
+# undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+#endif
+
+void
+ethr_init_event__(void)
+{
+ monotonic_clock_cond_attr_p = NULL;
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+ if (!ethr_get_monotonic_time_is_broken()
+ && pthread_condattr_init(&monotonic_clock_cond_attr) == 0) {
+ if (pthread_condattr_setclock(&monotonic_clock_cond_attr,
+ ETHR_MONOTONIC_CLOCK_ID) == 0)
+ monotonic_clock_cond_attr_p = &monotonic_clock_cond_attr;
+ else
+ pthread_condattr_destroy(&monotonic_clock_cond_attr);
+ }
+#endif
+}
int
ethr_event_init(ethr_event *e)
{
int res;
+
ethr_atomic32_init(&e->state, ETHR_EVENT_OFF__);
+
res = pthread_mutex_init(&e->mtx, NULL);
if (res != 0)
return res;
- res = pthread_cond_init(&e->cnd, NULL);
+
+ res = pthread_cond_init(&e->cnd, monotonic_clock_cond_attr_p);
if (res != 0) {
pthread_mutex_destroy(&e->mtx);
return res;
}
+
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+ /*
+ * If ethr_get_monotonic_time() is broken we
+ * fall back on the pipe/select solution...
+ */
+ if (monotonic_clock_cond_attr_p) {
+ e->fd[0] = e->fd[1] = ETHR_EVENT_COND_TIMEDWAIT__;
+ return 0;
+ }
+#endif
+
+ e->fd[0] = e->fd[1] = ETHR_EVENT_INVALID_FD__;
+
return 0;
}
@@ -124,6 +258,10 @@ int
ethr_event_destroy(ethr_event *e)
{
int res;
+ if (e->fd[0] >= 0) {
+ close(e->fd[0]);
+ close(e->fd[1]);
+ }
res = pthread_mutex_destroy(&e->mtx);
if (res != 0)
return res;
@@ -134,12 +272,58 @@ ethr_event_destroy(ethr_event *e)
}
static ETHR_INLINE int
-wait__(ethr_event *e, int spincount)
+wait__(ethr_event *e, int spincount, ethr_sint64_t timeout)
{
int sc = spincount;
ethr_sint32_t val;
int res, ulres;
int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS;
+ ethr_sint64_t time = 0; /* SHUT UP annoying faulty warning... */
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ ethr_sint64_t start = 0; /* SHUT UP annoying faulty warning... */
+#endif
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+ struct timespec cond_timeout;
+#endif
+
+ val = ethr_atomic32_read(&e->state);
+ if (val == ETHR_EVENT_ON__)
+ goto return_event_on;
+
+ if (timeout < 0) {
+ if (spincount == 0)
+ goto set_event_off_waiter;
+ }
+ if (timeout == 0)
+ return ETIMEDOUT;
+ else {
+ time = timeout;
+ switch (e->fd[0]) {
+ case ETHR_EVENT_INVALID_FD__:
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ start = ethr_get_monotonic_time();
+#endif
+ setup_nonblocking_pipe(e);
+ break;
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+ case ETHR_EVENT_COND_TIMEDWAIT__:
+ time += ethr_get_monotonic_time();
+ cond_timeout.tv_sec = time / (1000*1000*1000);
+ cond_timeout.tv_nsec = time % (1000*1000*1000);
+ if (spincount == 0)
+ goto set_event_off_waiter;
+ break;
+#endif
+ default:
+ /* Already initialized pipe... */
+ if (spincount == 0)
+ goto set_select_timeout;
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ start = ethr_get_monotonic_time();
+#endif
+ break;
+ }
+ }
if (spincount < 0)
ETHR_FATAL_ERROR__(EINVAL);
@@ -147,7 +331,8 @@ wait__(ethr_event *e, int spincount)
while (1) {
val = ethr_atomic32_read(&e->state);
if (val == ETHR_EVENT_ON__)
- return 0;
+ goto return_event_on;
+
if (sc == 0)
break;
sc--;
@@ -160,40 +345,150 @@ wait__(ethr_event *e, int spincount)
}
}
- if (val != ETHR_EVENT_OFF_WAITER__) {
- val = ethr_atomic32_cmpxchg(&e->state,
- ETHR_EVENT_OFF_WAITER__,
- ETHR_EVENT_OFF__);
- if (val == ETHR_EVENT_ON__)
- return 0;
- ETHR_ASSERT(val == ETHR_EVENT_OFF__);
+ if (timeout < 0
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+ || e->fd[0] == ETHR_EVENT_COND_TIMEDWAIT__
+#endif
+ ) {
+
+ set_event_off_waiter:
+
+ if (val != ETHR_EVENT_OFF_WAITER__) {
+ ethr_sint32_t act;
+ act = ethr_atomic32_cmpxchg(&e->state,
+ ETHR_EVENT_OFF_WAITER__,
+ val);
+ if (act == ETHR_EVENT_ON__)
+ goto return_event_on;
+ ETHR_ASSERT(act == val);
+ }
+
+ res = pthread_mutex_lock(&e->mtx);
+ if (res != 0)
+ ETHR_FATAL_ERROR__(res);
+
+ while (1) {
+
+ val = ethr_atomic32_read(&e->state);
+ if (val == ETHR_EVENT_ON__) {
+ ETHR_ASSERT(res == 0);
+ ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
+ break;
+ }
+
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+ if (timeout > 0) {
+ res = pthread_cond_timedwait(&e->cnd, &e->mtx, &cond_timeout);
+ if (res == EINTR || res == ETIMEDOUT)
+ break;
+ }
+ else
+#endif
+ {
+ res = pthread_cond_wait(&e->cnd, &e->mtx);
+ if (res == EINTR)
+ break;
+ }
+ if (res != 0)
+ ETHR_FATAL_ERROR__(res);
+ }
+
+ ulres = pthread_mutex_unlock(&e->mtx);
+ if (ulres != 0)
+ ETHR_FATAL_ERROR__(ulres);
+
}
+ else {
+ int fd;
+ int sres;
+ ssize_t rres;
+ fd_set rset;
+ fd_set eset;
+ struct timeval select_timeout;
+
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ time -= ethr_get_monotonic_time() - start;
+ if (time <= 0)
+ return ETIMEDOUT;
+#endif
- ETHR_ASSERT(val == ETHR_EVENT_OFF_WAITER__
- || val == ETHR_EVENT_OFF__);
+ set_select_timeout:
- res = pthread_mutex_lock(&e->mtx);
- if (res != 0)
- ETHR_FATAL_ERROR__(res);
+ ETHR_ASSERT(time > 0);
- while (1) {
+ /*
+ * timeout in nano-second, but we can only wait
+ * for micro-seconds...
+ */
+ time = ((time - 1) / 1000) + 1;
+
+ select_timeout.tv_sec = time / (1000*1000);
+ select_timeout.tv_usec = time % (1000*1000);
+
+ ETHR_ASSERT(val != ETHR_EVENT_ON__);
+
+ fd = e->fd[0];
+
+ /* Cleanup pipe... */
+ do {
+ char buf[64];
+ rres = read(fd, buf, sizeof(buf));
+ } while (rres > 0 || (rres < 0 && errno == EINTR));
+ if (rres < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
+ ETHR_FATAL_ERROR__(errno);
+
+ /*
+ * Need to verify that state is still off
+ * after cleaning the pipe...
+ */
+ if (val == ETHR_EVENT_OFF_WAITER_SELECT__) {
+ val = ethr_atomic32_read(&e->state);
+ if (val == ETHR_EVENT_ON__)
+ goto return_event_on;
+ }
+ else {
+ ethr_sint32_t act;
+ act = ethr_atomic32_cmpxchg(&e->state,
+ ETHR_EVENT_OFF_WAITER_SELECT__,
+ val);
+ if (act == ETHR_EVENT_ON__)
+ goto return_event_on;
+ ETHR_ASSERT(act == val);
+ }
+
+ FD_ZERO(&rset);
+ FD_SET(fd, &rset);
+ FD_ZERO(&eset);
+ FD_SET(fd, &eset);
+
+ sres = select(fd + 1, &rset, NULL, &eset, &select_timeout);
+ if (sres == 0)
+ res = ETIMEDOUT;
+ else {
+ res = EINTR;
+ if (sres < 0 && errno != EINTR)
+ ETHR_FATAL_ERROR__(errno);
+ /* else:
+ * Event is *probably* set, but it can be a
+ * lingering writer. That is, it is important
+ * that we verify that it actually is set. If
+ * it isn't, return EINTR (spurious wakeup).
+ */
+ }
val = ethr_atomic32_read(&e->state);
if (val == ETHR_EVENT_ON__)
- break;
+ goto return_event_on;
- res = pthread_cond_wait(&e->cnd, &e->mtx);
- if (res == EINTR)
- break;
- if (res != 0)
- ETHR_FATAL_ERROR__(res);
}
- ulres = pthread_mutex_unlock(&e->mtx);
- if (ulres != 0)
- ETHR_FATAL_ERROR__(ulres);
+ return res;
+
+return_event_on:
+
+ ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
- return res; /* 0 || EINTR */
+ return 0;
}
#else
@@ -215,11 +510,23 @@ ethr_event_set(ethr_event *e)
int
ethr_event_wait(ethr_event *e)
{
- return wait__(e, 0);
+ return wait__(e, 0, -1);
}
int
ethr_event_swait(ethr_event *e, int spincount)
{
- return wait__(e, spincount);
+ return wait__(e, spincount, -1);
+}
+
+int
+ethr_event_twait(ethr_event *e, ethr_sint64_t timeout)
+{
+ return wait__(e, 0, timeout);
+}
+
+int
+ethr_event_stwait(ethr_event *e, int spincount, ethr_sint64_t timeout)
+{
+ return wait__(e, spincount, timeout);
}
diff --git a/erts/lib_src/pthread/ethread.c b/erts/lib_src/pthread/ethread.c
index 79784c5b84..42d2abd3f1 100644
--- a/erts/lib_src/pthread/ethread.c
+++ b/erts/lib_src/pthread/ethread.c
@@ -49,6 +49,8 @@
#define ETHREAD_IMPL__
#include "ethread.h"
+#undef ETHR_INCLUDE_MONOTONIC_CLOCK__
+#define ETHR_INCLUDE_MONOTONIC_CLOCK__
#include "ethr_internal.h"
#ifndef ETHR_HAVE_ETHREAD_DEFINES
@@ -232,6 +234,10 @@ ethr_x86_cpuid__(int *eax, int *ebx, int *ecx, int *edx)
#endif /* ETHR_X86_RUNTIME_CONF__ */
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+static void init_get_monotonic_time(void);
+#endif
+
/*
* --------------------------------------------------------------------------
* Exported functions
@@ -254,6 +260,10 @@ ethr_init(ethr_init_data *id)
goto error;
#endif
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ init_get_monotonic_time();
+#endif
+
res = ethr_init_common__(id);
if (res != 0)
goto error;
@@ -567,6 +577,144 @@ int ethr_sigwait(const sigset_t *set, int *sig)
#endif /* #if ETHR_HAVE_ETHR_SIG_FUNCS */
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+
+static int broken_get_monotonic_time;
+
+#if defined(ETHR_HAVE_CLOCK_GETTIME_MONOTONIC)
+# ifndef ETHR_MONOTONIC_CLOCK_ID
+# error ETHR_MONOTONIC_CLOCK_ID should have been defined
+# endif
+
+ethr_sint64_t
+ethr_get_monotonic_time(void)
+{
+ ethr_sint64_t time;
+ struct timespec ts;
+
+ if (broken_get_monotonic_time)
+ return (ethr_sint64_t) 0;
+
+ if (0 != clock_gettime(ETHR_MONOTONIC_CLOCK_ID, &ts))
+ ETHR_FATAL_ERROR__(errno);
+
+ time = (ethr_sint64_t) ts.tv_sec;
+ time *= (ethr_sint64_t) 1000*1000*1000;
+ time += (ethr_sint64_t) ts.tv_nsec;
+ return time;
+}
+
+#elif defined(ETHR_HAVE_MACH_CLOCK_GET_TIME)
+# ifndef ETHR_MONOTONIC_CLOCK_ID
+# error ETHR_MONOTONIC_CLOCK_ID should have been defined
+# endif
+
+ethr_sint64_t
+ethr_get_monotonic_time(void)
+{
+ ethr_sint64_t time;
+ kern_return_t res;
+ clock_serv_t clk_srv;
+ mach_timespec_t time_spec;
+
+ if (broken_get_monotonic_time)
+ return (ethr_sint64_t) 0;
+
+ errno = EFAULT;
+ host_get_clock_service(mach_host_self(),
+ ETHR_MONOTONIC_CLOCK_ID,
+ &clk_srv);
+ res = clock_get_time(clk_srv, &time_spec);
+ if (res != KERN_SUCCESS)
+ ETHR_FATAL_ERROR__(errno);
+ mach_port_deallocate(mach_task_self(), clk_srv);
+
+ time = (ethr_sint64_t) time_spec.tv_sec;
+ time *= (ethr_sint64_t) 1000*1000*1000;
+ time += (ethr_sint64_t) time_spec.tv_nsec;
+ return time;
+}
+
+#elif defined(ETHR_HAVE_GETHRTIME)
+
+ethr_sint64_t
+ethr_get_monotonic_time(void)
+{
+ if (broken_get_monotonic_time)
+ return (ethr_sint64_t) 0;
+ return (ethr_sint64_t) gethrtime();
+}
+
+#else
+#error missing monotonic clock
+#endif
+
+int
+ethr_get_monotonic_time_is_broken(void)
+{
+ return broken_get_monotonic_time;
+}
+
+#include <string.h>
+#include <ctype.h>
+#include <sys/utsname.h>
+
+static void
+init_get_monotonic_time(void)
+{
+ struct utsname uts;
+ int vsn[3];
+ int i;
+ char *c;
+
+ broken_get_monotonic_time = 0;
+
+ (void) uname(&uts);
+
+ for (c = uts.sysname; *c; c++) {
+ if (isupper((int) *c))
+ *c = tolower((int) *c);
+ }
+
+ c = uts.release;
+ for (i = 0; i < sizeof(vsn)/sizeof(int); i++) {
+ if (!isdigit((int) *c))
+ vsn[i] = 0;
+ else {
+ char *c2 = c;
+ do {
+ c2++;
+ } while (isdigit((int) *c2));
+ *c2 = '\0';
+ vsn[i] = atoi(c);
+ c = c2;
+ c++;
+ }
+ }
+
+ if (strcmp("linux", uts.sysname) == 0) {
+ if (vsn[0] < 2
+ || (vsn[0] == 2 && vsn[1] < 6)
+ || (vsn[0] == 2 && vsn[1] == 6 && vsn[2] < 33)) {
+ broken_get_monotonic_time = 1;
+ }
+ }
+ else if (strcmp("sunos", uts.sysname) == 0) {
+ if ((vsn[0] < 5
+ || (vsn[0] == 5 && vsn[1] < 8))
+#if defined(HAVE_SYSCONF) && defined(_SC_NPROCESSORS_CONF)
+ && sysconf(_SC_NPROCESSORS_CONF) > 1
+#endif
+ ) {
+ broken_get_monotonic_time = 1;
+ }
+ }
+
+}
+
+
+#endif /* ETHR_HAVE_ETHR_GET_MONOTONIC_TIME */
+
ETHR_IMPL_NORETURN__
ethr_abort__(void)
{
diff --git a/erts/lib_src/win/ethr_event.c b/erts/lib_src/win/ethr_event.c
index bc2f635c26..a0d506356d 100644
--- a/erts/lib_src/win/ethr_event.c
+++ b/erts/lib_src/win/ethr_event.c
@@ -25,9 +25,16 @@
#define ETHR_EVENT_IMPL__
#include "ethread.h"
+#include "ethr_internal.h"
/* --- Windows implementation of thread events ------------------------------ */
+void
+ethr_init_event__(void)
+{
+
+}
+
int
ethr_event_init(ethr_event *e)
{
@@ -58,11 +65,29 @@ ethr_event_reset(ethr_event *e)
}
static ETHR_INLINE int
-wait(ethr_event *e, int spincount)
+wait(ethr_event *e, int spincount, ethr_sint64_t timeout)
{
- DWORD code;
+ DWORD code, tmo;
int sc, res, until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS;
+ if (timeout < 0)
+ tmo = INFINITE;
+ else if (timeout == 0) {
+ ethr_sint32_t state = ethr_atomic32_read(&e->state);
+ if (state == ETHR_EVENT_ON__) {
+ ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
+ return 0;
+ }
+ return ETIMEDOUT;
+ }
+ else {
+ /*
+ * Timeout in nano-seconds, but we can only
+ * wait for milli-seconds...
+ */
+ tmo = (DWORD) (timeout - 1) / (1000*1000) + 1;
+ }
+
if (spincount < 0)
ETHR_FATAL_ERROR__(EINVAL);
@@ -72,8 +97,10 @@ wait(ethr_event *e, int spincount)
ethr_sint32_t state;
while (1) {
state = ethr_atomic32_read(&e->state);
- if (state == ETHR_EVENT_ON__)
+ if (state == ETHR_EVENT_ON__) {
+ ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
return 0;
+ }
if (sc == 0)
break;
sc--;
@@ -95,7 +122,9 @@ wait(ethr_event *e, int spincount)
ETHR_ASSERT(state == ETHR_EVENT_OFF__);
}
- code = WaitForSingleObject(e->handle, INFINITE);
+ code = WaitForSingleObject(e->handle, tmo);
+ if (code == WAIT_TIMEOUT)
+ return ETIMEDOUT;
if (code != WAIT_OBJECT_0)
ETHR_FATAL_ERROR__(ethr_win_get_errno__());
}
@@ -105,11 +134,23 @@ wait(ethr_event *e, int spincount)
int
ethr_event_wait(ethr_event *e)
{
- return wait(e, 0);
+ return wait(e, 0, -1);
}
int
ethr_event_swait(ethr_event *e, int spincount)
{
- return wait(e, spincount);
+ return wait(e, spincount, -1);
+}
+
+int
+ethr_event_twait(ethr_event *e, ethr_sint64_t timeout)
+{
+ return wait(e, 0, timeout);
+}
+
+int
+ethr_event_stwait(ethr_event *e, int spincount, ethr_sint64_t timeout)
+{
+ return wait(e, spincount, timeout);
}