aboutsummaryrefslogtreecommitdiffstats
path: root/erts/lib_src/pthread
diff options
context:
space:
mode:
Diffstat (limited to 'erts/lib_src/pthread')
-rw-r--r--erts/lib_src/pthread/ethr_event.c379
-rw-r--r--erts/lib_src/pthread/ethread.c148
2 files changed, 491 insertions, 36 deletions
diff --git a/erts/lib_src/pthread/ethr_event.c b/erts/lib_src/pthread/ethr_event.c
index 9434d60d0a..b35c599365 100644
--- a/erts/lib_src/pthread/ethr_event.c
+++ b/erts/lib_src/pthread/ethr_event.c
@@ -29,6 +29,9 @@
#endif
#include "ethread.h"
+#undef ETHR_INCLUDE_MONOTONIC_CLOCK__
+#define ETHR_INCLUDE_MONOTONIC_CLOCK__
+#include "ethr_internal.h"
#if defined(ETHR_LINUX_FUTEX_IMPL__)
/* --- Linux futex implementation of ethread events ------------------------- */
@@ -38,6 +41,12 @@
#define ETHR_YIELD_AFTER_BUSY_LOOPS 50
+void
+ethr_init_event__(void)
+{
+
+}
+
int
ethr_event_init(ethr_event *e)
{
@@ -52,21 +61,43 @@ ethr_event_destroy(ethr_event *e)
}
static ETHR_INLINE int
-wait__(ethr_event *e, int spincount)
+wait__(ethr_event *e, int spincount, ethr_sint64_t timeout)
{
unsigned sc = spincount;
int res;
ethr_sint32_t val;
int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS;
+ ethr_sint64_t time = 0; /* SHUT UP annoying faulty warning... */
+ struct timespec ts, *tsp;
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ ethr_sint64_t start = 0; /* SHUT UP annoying faulty warning... */
+#endif
if (spincount < 0)
ETHR_FATAL_ERROR__(EINVAL);
+ if (timeout < 0) {
+ tsp = NULL;
+ }
+ else {
+ tsp = &ts;
+ time = timeout;
+ if (spincount == 0) {
+ val = ethr_atomic32_read(&e->futex);
+ if (val == ETHR_EVENT_ON__)
+ goto return_event_on;
+ goto set_timeout;
+ }
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ start = ethr_get_monotonic_time();
+#endif
+ }
+
while (1) {
while (1) {
val = ethr_atomic32_read(&e->futex);
if (val == ETHR_EVENT_ON__)
- return 0;
+ goto return_event_on;
if (sc == 0)
break;
sc--;
@@ -79,44 +110,147 @@ wait__(ethr_event *e, int spincount)
}
}
+ if (timeout >= 0) {
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ time = timeout - (ethr_get_monotonic_time() - start);
+#endif
+ set_timeout:
+ if (time <= 0) {
+ val = ethr_atomic32_read(&e->futex);
+ if (val == ETHR_EVENT_ON__)
+ goto return_event_on;
+ return ETIMEDOUT;
+ }
+ ts.tv_sec = time / (1000*1000*1000);
+ ts.tv_nsec = time % (1000*1000*1000);
+ }
+
if (val != ETHR_EVENT_OFF_WAITER__) {
val = ethr_atomic32_cmpxchg(&e->futex,
ETHR_EVENT_OFF_WAITER__,
ETHR_EVENT_OFF__);
if (val == ETHR_EVENT_ON__)
- return 0;
+ goto return_event_on;
ETHR_ASSERT(val == ETHR_EVENT_OFF__);
}
res = ETHR_FUTEX__(&e->futex,
ETHR_FUTEX_WAIT__,
- ETHR_EVENT_OFF_WAITER__);
- if (res == EINTR)
+ ETHR_EVENT_OFF_WAITER__,
+ tsp);
+ switch (res) {
+ case EINTR:
+ case ETIMEDOUT:
+ return res;
+ case 0:
+ case EWOULDBLOCK:
break;
- if (res != 0 && res != EWOULDBLOCK)
+ default:
ETHR_FATAL_ERROR__(res);
+ }
}
- return res;
+return_event_on:
+
+ ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
+
+ return 0;
+
}
#elif defined(ETHR_PTHREADS)
-/* --- Posix mutex/cond implementation of events ---------------------------- */
+/* --- Posix mutex/cond pipe/select implementation of events ---------------- */
+
+#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/uio.h>
+#include <unistd.h>
+#include <sys/select.h>
+#include <errno.h>
+#include <string.h>
+
+static void
+setup_nonblocking_pipe(ethr_event *e)
+{
+ int flgs;
+ int res;
+
+ res = pipe(e->fd);
+ if (res != 0)
+ ETHR_FATAL_ERROR__(errno);
+
+ ETHR_ASSERT(e->fd[0] >= 0 && e->fd[1] >= 0);
+
+ flgs = fcntl(e->fd[0], F_GETFL, 0);
+ fcntl(e->fd[0], F_SETFL, flgs | O_NONBLOCK);
+ flgs = fcntl(e->fd[1], F_GETFL, 0);
+ fcntl(e->fd[1], F_SETFL, flgs | O_NONBLOCK);
+
+ ETHR_MEMBAR(ETHR_StoreStore);
+}
+
+#define ETHR_EVENT_INVALID_FD__ -1
+#define ETHR_EVENT_COND_TIMEDWAIT__ -2
+
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+static pthread_condattr_t monotonic_clock_cond_attr;
+#endif
+static pthread_condattr_t *monotonic_clock_cond_attr_p;
+
+#ifndef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+# undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+#endif
+#ifndef ETHR_MONOTONIC_CLOCK_ID
+# undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+#endif
+
+void
+ethr_init_event__(void)
+{
+ monotonic_clock_cond_attr_p = NULL;
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+ if (!ethr_get_monotonic_time_is_broken()
+ && pthread_condattr_init(&monotonic_clock_cond_attr) == 0) {
+ if (pthread_condattr_setclock(&monotonic_clock_cond_attr,
+ ETHR_MONOTONIC_CLOCK_ID) == 0)
+ monotonic_clock_cond_attr_p = &monotonic_clock_cond_attr;
+ else
+ pthread_condattr_destroy(&monotonic_clock_cond_attr);
+ }
+#endif
+}
int
ethr_event_init(ethr_event *e)
{
int res;
+
ethr_atomic32_init(&e->state, ETHR_EVENT_OFF__);
+
res = pthread_mutex_init(&e->mtx, NULL);
if (res != 0)
return res;
- res = pthread_cond_init(&e->cnd, NULL);
+
+ res = pthread_cond_init(&e->cnd, monotonic_clock_cond_attr_p);
if (res != 0) {
pthread_mutex_destroy(&e->mtx);
return res;
}
+
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+ /*
+ * If ethr_get_monotonic_time() is broken we
+ * fall back on the pipe/select solution...
+ */
+ if (monotonic_clock_cond_attr_p) {
+ e->fd[0] = e->fd[1] = ETHR_EVENT_COND_TIMEDWAIT__;
+ return 0;
+ }
+#endif
+
+ e->fd[0] = e->fd[1] = ETHR_EVENT_INVALID_FD__;
+
return 0;
}
@@ -124,6 +258,10 @@ int
ethr_event_destroy(ethr_event *e)
{
int res;
+ if (e->fd[0] >= 0) {
+ close(e->fd[0]);
+ close(e->fd[1]);
+ }
res = pthread_mutex_destroy(&e->mtx);
if (res != 0)
return res;
@@ -134,12 +272,58 @@ ethr_event_destroy(ethr_event *e)
}
static ETHR_INLINE int
-wait__(ethr_event *e, int spincount)
+wait__(ethr_event *e, int spincount, ethr_sint64_t timeout)
{
int sc = spincount;
ethr_sint32_t val;
int res, ulres;
int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS;
+ ethr_sint64_t time = 0; /* SHUT UP annoying faulty warning... */
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ ethr_sint64_t start = 0; /* SHUT UP annoying faulty warning... */
+#endif
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+ struct timespec cond_timeout;
+#endif
+
+ val = ethr_atomic32_read(&e->state);
+ if (val == ETHR_EVENT_ON__)
+ goto return_event_on;
+
+ if (timeout < 0) {
+ if (spincount == 0)
+ goto set_event_off_waiter;
+ }
+ if (timeout == 0)
+ return ETIMEDOUT;
+ else {
+ time = timeout;
+ switch (e->fd[0]) {
+ case ETHR_EVENT_INVALID_FD__:
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ start = ethr_get_monotonic_time();
+#endif
+ setup_nonblocking_pipe(e);
+ break;
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+ case ETHR_EVENT_COND_TIMEDWAIT__:
+ time += ethr_get_monotonic_time();
+ cond_timeout.tv_sec = time / (1000*1000*1000);
+ cond_timeout.tv_nsec = time % (1000*1000*1000);
+ if (spincount == 0)
+ goto set_event_off_waiter;
+ break;
+#endif
+ default:
+ /* Already initialized pipe... */
+ if (spincount == 0)
+ goto set_select_timeout;
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ start = ethr_get_monotonic_time();
+#endif
+ break;
+ }
+ }
if (spincount < 0)
ETHR_FATAL_ERROR__(EINVAL);
@@ -147,7 +331,8 @@ wait__(ethr_event *e, int spincount)
while (1) {
val = ethr_atomic32_read(&e->state);
if (val == ETHR_EVENT_ON__)
- return 0;
+ goto return_event_on;
+
if (sc == 0)
break;
sc--;
@@ -160,40 +345,150 @@ wait__(ethr_event *e, int spincount)
}
}
- if (val != ETHR_EVENT_OFF_WAITER__) {
- val = ethr_atomic32_cmpxchg(&e->state,
- ETHR_EVENT_OFF_WAITER__,
- ETHR_EVENT_OFF__);
- if (val == ETHR_EVENT_ON__)
- return 0;
- ETHR_ASSERT(val == ETHR_EVENT_OFF__);
+ if (timeout < 0
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+ || e->fd[0] == ETHR_EVENT_COND_TIMEDWAIT__
+#endif
+ ) {
+
+ set_event_off_waiter:
+
+ if (val != ETHR_EVENT_OFF_WAITER__) {
+ ethr_sint32_t act;
+ act = ethr_atomic32_cmpxchg(&e->state,
+ ETHR_EVENT_OFF_WAITER__,
+ val);
+ if (act == ETHR_EVENT_ON__)
+ goto return_event_on;
+ ETHR_ASSERT(act == val);
+ }
+
+ res = pthread_mutex_lock(&e->mtx);
+ if (res != 0)
+ ETHR_FATAL_ERROR__(res);
+
+ while (1) {
+
+ val = ethr_atomic32_read(&e->state);
+ if (val == ETHR_EVENT_ON__) {
+ ETHR_ASSERT(res == 0);
+ ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
+ break;
+ }
+
+#ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC
+ if (timeout > 0) {
+ res = pthread_cond_timedwait(&e->cnd, &e->mtx, &cond_timeout);
+ if (res == EINTR || res == ETIMEDOUT)
+ break;
+ }
+ else
+#endif
+ {
+ res = pthread_cond_wait(&e->cnd, &e->mtx);
+ if (res == EINTR)
+ break;
+ }
+ if (res != 0)
+ ETHR_FATAL_ERROR__(res);
+ }
+
+ ulres = pthread_mutex_unlock(&e->mtx);
+ if (ulres != 0)
+ ETHR_FATAL_ERROR__(ulres);
+
}
+ else {
+ int fd;
+ int sres;
+ ssize_t rres;
+ fd_set rset;
+ fd_set eset;
+ struct timeval select_timeout;
+
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ time -= ethr_get_monotonic_time() - start;
+ if (time <= 0)
+ return ETIMEDOUT;
+#endif
- ETHR_ASSERT(val == ETHR_EVENT_OFF_WAITER__
- || val == ETHR_EVENT_OFF__);
+ set_select_timeout:
- res = pthread_mutex_lock(&e->mtx);
- if (res != 0)
- ETHR_FATAL_ERROR__(res);
+ ETHR_ASSERT(time > 0);
- while (1) {
+ /*
+ * timeout in nano-second, but we can only wait
+ * for micro-seconds...
+ */
+ time = ((time - 1) / 1000) + 1;
+
+ select_timeout.tv_sec = time / (1000*1000);
+ select_timeout.tv_usec = time % (1000*1000);
+
+ ETHR_ASSERT(val != ETHR_EVENT_ON__);
+
+ fd = e->fd[0];
+
+ /* Cleanup pipe... */
+ do {
+ char buf[64];
+ rres = read(fd, buf, sizeof(buf));
+ } while (rres > 0 || (rres < 0 && errno == EINTR));
+ if (rres < 0 && errno != EAGAIN && errno != EWOULDBLOCK)
+ ETHR_FATAL_ERROR__(errno);
+
+ /*
+ * Need to verify that state is still off
+ * after cleaning the pipe...
+ */
+ if (val == ETHR_EVENT_OFF_WAITER_SELECT__) {
+ val = ethr_atomic32_read(&e->state);
+ if (val == ETHR_EVENT_ON__)
+ goto return_event_on;
+ }
+ else {
+ ethr_sint32_t act;
+ act = ethr_atomic32_cmpxchg(&e->state,
+ ETHR_EVENT_OFF_WAITER_SELECT__,
+ val);
+ if (act == ETHR_EVENT_ON__)
+ goto return_event_on;
+ ETHR_ASSERT(act == val);
+ }
+
+ FD_ZERO(&rset);
+ FD_SET(fd, &rset);
+ FD_ZERO(&eset);
+ FD_SET(fd, &eset);
+
+ sres = select(fd + 1, &rset, NULL, &eset, &select_timeout);
+ if (sres == 0)
+ res = ETIMEDOUT;
+ else {
+ res = EINTR;
+ if (sres < 0 && errno != EINTR)
+ ETHR_FATAL_ERROR__(errno);
+ /* else:
+ * Event is *probably* set, but it can be a
+ * lingering writer. That is, it is important
+ * that we verify that it actually is set. If
+ * it isn't, return EINTR (spurious wakeup).
+ */
+ }
val = ethr_atomic32_read(&e->state);
if (val == ETHR_EVENT_ON__)
- break;
+ goto return_event_on;
- res = pthread_cond_wait(&e->cnd, &e->mtx);
- if (res == EINTR)
- break;
- if (res != 0)
- ETHR_FATAL_ERROR__(res);
}
- ulres = pthread_mutex_unlock(&e->mtx);
- if (ulres != 0)
- ETHR_FATAL_ERROR__(ulres);
+ return res;
+
+return_event_on:
+
+ ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore);
- return res; /* 0 || EINTR */
+ return 0;
}
#else
@@ -215,11 +510,23 @@ ethr_event_set(ethr_event *e)
int
ethr_event_wait(ethr_event *e)
{
- return wait__(e, 0);
+ return wait__(e, 0, -1);
}
int
ethr_event_swait(ethr_event *e, int spincount)
{
- return wait__(e, spincount);
+ return wait__(e, spincount, -1);
+}
+
+int
+ethr_event_twait(ethr_event *e, ethr_sint64_t timeout)
+{
+ return wait__(e, 0, timeout);
+}
+
+int
+ethr_event_stwait(ethr_event *e, int spincount, ethr_sint64_t timeout)
+{
+ return wait__(e, spincount, timeout);
}
diff --git a/erts/lib_src/pthread/ethread.c b/erts/lib_src/pthread/ethread.c
index 7cf38580c5..c0b1dad0b6 100644
--- a/erts/lib_src/pthread/ethread.c
+++ b/erts/lib_src/pthread/ethread.c
@@ -50,6 +50,8 @@
#define ETHREAD_IMPL__
#include "ethread.h"
+#undef ETHR_INCLUDE_MONOTONIC_CLOCK__
+#define ETHR_INCLUDE_MONOTONIC_CLOCK__
#include "ethr_internal.h"
#ifndef ETHR_HAVE_ETHREAD_DEFINES
@@ -237,6 +239,10 @@ ethr_x86_cpuid__(int *eax, int *ebx, int *ecx, int *edx)
#endif /* ETHR_X86_RUNTIME_CONF__ */
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+static void init_get_monotonic_time(void);
+#endif
+
/*
* --------------------------------------------------------------------------
* Exported functions
@@ -259,6 +265,10 @@ ethr_init(ethr_init_data *id)
goto error;
#endif
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+ init_get_monotonic_time();
+#endif
+
res = ethr_init_common__(id);
if (res != 0)
goto error;
@@ -613,6 +623,144 @@ int ethr_kill(const ethr_tid tid, const int sig)
#endif /* #if ETHR_HAVE_ETHR_SIG_FUNCS */
+#ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME
+
+static int broken_get_monotonic_time;
+
+#if defined(ETHR_HAVE_CLOCK_GETTIME_MONOTONIC)
+# ifndef ETHR_MONOTONIC_CLOCK_ID
+# error ETHR_MONOTONIC_CLOCK_ID should have been defined
+# endif
+
+ethr_sint64_t
+ethr_get_monotonic_time(void)
+{
+ ethr_sint64_t time;
+ struct timespec ts;
+
+ if (broken_get_monotonic_time)
+ return (ethr_sint64_t) 0;
+
+ if (0 != clock_gettime(ETHR_MONOTONIC_CLOCK_ID, &ts))
+ ETHR_FATAL_ERROR__(errno);
+
+ time = (ethr_sint64_t) ts.tv_sec;
+ time *= (ethr_sint64_t) 1000*1000*1000;
+ time += (ethr_sint64_t) ts.tv_nsec;
+ return time;
+}
+
+#elif defined(ETHR_HAVE_MACH_CLOCK_GET_TIME)
+# ifndef ETHR_MONOTONIC_CLOCK_ID
+# error ETHR_MONOTONIC_CLOCK_ID should have been defined
+# endif
+
+ethr_sint64_t
+ethr_get_monotonic_time(void)
+{
+ ethr_sint64_t time;
+ kern_return_t res;
+ clock_serv_t clk_srv;
+ mach_timespec_t time_spec;
+
+ if (broken_get_monotonic_time)
+ return (ethr_sint64_t) 0;
+
+ errno = EFAULT;
+ host_get_clock_service(mach_host_self(),
+ ETHR_MONOTONIC_CLOCK_ID,
+ &clk_srv);
+ res = clock_get_time(clk_srv, &time_spec);
+ if (res != KERN_SUCCESS)
+ ETHR_FATAL_ERROR__(errno);
+ mach_port_deallocate(mach_task_self(), clk_srv);
+
+ time = (ethr_sint64_t) time_spec.tv_sec;
+ time *= (ethr_sint64_t) 1000*1000*1000;
+ time += (ethr_sint64_t) time_spec.tv_nsec;
+ return time;
+}
+
+#elif defined(ETHR_HAVE_GETHRTIME)
+
+ethr_sint64_t
+ethr_get_monotonic_time(void)
+{
+ if (broken_get_monotonic_time)
+ return (ethr_sint64_t) 0;
+ return (ethr_sint64_t) gethrtime();
+}
+
+#else
+#error missing monotonic clock
+#endif
+
+int
+ethr_get_monotonic_time_is_broken(void)
+{
+ return broken_get_monotonic_time;
+}
+
+#include <string.h>
+#include <ctype.h>
+#include <sys/utsname.h>
+
+static void
+init_get_monotonic_time(void)
+{
+ struct utsname uts;
+ int vsn[3];
+ int i;
+ char *c;
+
+ broken_get_monotonic_time = 0;
+
+ (void) uname(&uts);
+
+ for (c = uts.sysname; *c; c++) {
+ if (isupper((int) *c))
+ *c = tolower((int) *c);
+ }
+
+ c = uts.release;
+ for (i = 0; i < sizeof(vsn)/sizeof(int); i++) {
+ if (!isdigit((int) *c))
+ vsn[i] = 0;
+ else {
+ char *c2 = c;
+ do {
+ c2++;
+ } while (isdigit((int) *c2));
+ *c2 = '\0';
+ vsn[i] = atoi(c);
+ c = c2;
+ c++;
+ }
+ }
+
+ if (strcmp("linux", uts.sysname) == 0) {
+ if (vsn[0] < 2
+ || (vsn[0] == 2 && vsn[1] < 6)
+ || (vsn[0] == 2 && vsn[1] == 6 && vsn[2] < 33)) {
+ broken_get_monotonic_time = 1;
+ }
+ }
+ else if (strcmp("sunos", uts.sysname) == 0) {
+ if ((vsn[0] < 5
+ || (vsn[0] == 5 && vsn[1] < 8))
+#if defined(HAVE_SYSCONF) && defined(_SC_NPROCESSORS_CONF)
+ && sysconf(_SC_NPROCESSORS_CONF) > 1
+#endif
+ ) {
+ broken_get_monotonic_time = 1;
+ }
+ }
+
+}
+
+
+#endif /* ETHR_HAVE_ETHR_GET_MONOTONIC_TIME */
+
ETHR_IMPL_NORETURN__
ethr_abort__(void)
{