/* * %CopyrightBegin% * * Copyright Ericsson AB 2009-2016. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * %CopyrightEnd% */ /* * Author: Rickard Green */ #define ETHR_INLINE_FUNC_NAME_(X) X ## __ #define ETHR_EVENT_IMPL__ #ifdef HAVE_CONFIG_H #include "config.h" #endif #if defined(__APPLE__) && defined(__MACH__) && !defined(__DARWIN__) # define __DARWIN__ 1 #endif #ifdef __DARWIN__ # define _DARWIN_UNLIMITED_SELECT #endif #include "ethread.h" #undef ETHR_INCLUDE_MONOTONIC_CLOCK__ #define ETHR_INCLUDE_MONOTONIC_CLOCK__ #include "ethr_internal.h" #if defined(ETHR_LINUX_FUTEX_IMPL__) /* --- Linux futex implementation of ethread events ------------------------- */ #include #include #define ETHR_YIELD_AFTER_BUSY_LOOPS 50 void ethr_init_event__(void) { } int ethr_event_init(ethr_event *e) { ethr_atomic32_init(&e->futex, ETHR_EVENT_OFF__); return 0; } int ethr_event_prepare_timed(ethr_event *e) { return 0; } int ethr_event_destroy(ethr_event *e) { return 0; } static ETHR_INLINE int wait__(ethr_event *e, int spincount, ethr_sint64_t timeout) { unsigned sc = spincount; int res; ethr_sint32_t val; int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; ethr_sint64_t time = 0; /* SHUT UP annoying faulty warning... */ struct timespec ts, *tsp; #ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME ethr_sint64_t start = 0; /* SHUT UP annoying faulty warning... */ #endif if (spincount < 0) ETHR_FATAL_ERROR__(EINVAL); if (timeout < 0) { tsp = NULL; } else { #ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME start = ethr_get_monotonic_time(); #endif tsp = &ts; time = timeout; if (spincount == 0) { val = ethr_atomic32_read(&e->futex); if (val == ETHR_EVENT_ON__) goto return_event_on; goto set_timeout; } } while (1) { while (1) { val = ethr_atomic32_read(&e->futex); if (val == ETHR_EVENT_ON__) goto return_event_on; if (sc == 0) break; sc--; ETHR_SPIN_BODY; if (--until_yield == 0) { until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; res = ETHR_YIELD(); if (res != 0) ETHR_FATAL_ERROR__(res); } } if (timeout >= 0) { #ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME time = timeout - (ethr_get_monotonic_time() - start); #endif set_timeout: if (time <= 0) { val = ethr_atomic32_read(&e->futex); if (val == ETHR_EVENT_ON__) goto return_event_on; return ETIMEDOUT; } ts.tv_sec = time / (1000*1000*1000); ts.tv_nsec = time % (1000*1000*1000); } if (val != ETHR_EVENT_OFF_WAITER__) { val = ethr_atomic32_cmpxchg(&e->futex, ETHR_EVENT_OFF_WAITER__, ETHR_EVENT_OFF__); if (val == ETHR_EVENT_ON__) goto return_event_on; ETHR_ASSERT(val == ETHR_EVENT_OFF__); } res = ETHR_FUTEX__(&e->futex, ETHR_FUTEX_WAIT__, ETHR_EVENT_OFF_WAITER__, tsp); switch (res) { case EINTR: case ETIMEDOUT: return res; case 0: case EWOULDBLOCK: break; default: ETHR_FATAL_ERROR__(res); } } return_event_on: ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore); return 0; } #elif defined(ETHR_PTHREADS) /* --- Posix mutex/cond pipe/select implementation of events ---------------- */ #include #include #include #include #include #include #include #include "erl_misc_utils.h" #ifdef __DARWIN__ struct ethr_event_fdsets___ { fd_set *rsetp; fd_set *esetp; size_t mem_size; fd_mask mem[1]; }; #endif static void setup_nonblocking_pipe(ethr_event *e) { int flgs; int res; res = pipe(e->fd); if (res != 0) ETHR_FATAL_ERROR__(errno); ETHR_ASSERT(e->fd[0] >= 0 && e->fd[1] >= 0); flgs = fcntl(e->fd[0], F_GETFL, 0); fcntl(e->fd[0], F_SETFL, flgs | O_NONBLOCK); flgs = fcntl(e->fd[1], F_GETFL, 0); fcntl(e->fd[1], F_SETFL, flgs | O_NONBLOCK); #ifndef __DARWIN__ if (e->fd[0] >= FD_SETSIZE) ETHR_FATAL_ERROR__(ENOTSUP); #else { int nmasks; ethr_event_fdsets__ *fdsets; size_t mem_size; nmasks = (e->fd[0]+NFDBITS)/NFDBITS; mem_size = 2*nmasks*sizeof(fd_mask); if (mem_size < 2*sizeof(fd_set)) { mem_size = 2*sizeof(fd_set); nmasks = mem_size/(2*sizeof(fd_mask)); } fdsets = malloc(sizeof(ethr_event_fdsets__) + mem_size - sizeof(fd_mask)); if (!fdsets) ETHR_FATAL_ERROR__(ENOMEM); fdsets->rsetp = (fd_set *) (char *) &fdsets->mem[0]; fdsets->esetp = (fd_set *) (char *) &fdsets->mem[nmasks]; fdsets->mem_size = mem_size; e->fdsets = fdsets; } #endif ETHR_MEMBAR(ETHR_StoreStore); } #define ETHR_EVENT_INVALID_FD__ -1 #define ETHR_EVENT_COND_TIMEDWAIT__ -2 #ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC static pthread_condattr_t monotonic_clock_cond_attr; #endif static pthread_condattr_t *monotonic_clock_cond_attr_p; #ifndef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME # undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC #endif #ifndef ETHR_MONOTONIC_CLOCK_ID # undef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC #endif void ethr_init_event__(void) { monotonic_clock_cond_attr_p = NULL; #ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC if (!ethr_get_monotonic_time_is_broken() && pthread_condattr_init(&monotonic_clock_cond_attr) == 0) { if (pthread_condattr_setclock(&monotonic_clock_cond_attr, ETHR_MONOTONIC_CLOCK_ID) == 0) monotonic_clock_cond_attr_p = &monotonic_clock_cond_attr; else pthread_condattr_destroy(&monotonic_clock_cond_attr); } #endif } int ethr_event_init(ethr_event *e) { int res; ethr_atomic32_init(&e->state, ETHR_EVENT_OFF__); res = pthread_mutex_init(&e->mtx, NULL); if (res != 0) return res; res = pthread_cond_init(&e->cnd, monotonic_clock_cond_attr_p); if (res != 0) { pthread_mutex_destroy(&e->mtx); return res; } #ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC /* * If ethr_get_monotonic_time() is broken we * fall back on the pipe/select solution... */ if (monotonic_clock_cond_attr_p) { e->fd[0] = e->fd[1] = ETHR_EVENT_COND_TIMEDWAIT__; return 0; } #endif e->fd[0] = e->fd[1] = ETHR_EVENT_INVALID_FD__; #ifdef __DARWIN__ e->fdsets = NULL; #endif return 0; } int ethr_event_prepare_timed(ethr_event *e) { if (e->fd[0] == ETHR_EVENT_INVALID_FD__) setup_nonblocking_pipe(e); return 0; } int ethr_event_destroy(ethr_event *e) { int res; if (e->fd[0] >= 0) { close(e->fd[0]); close(e->fd[1]); } #ifdef __DARWIN__ if (e->fdsets) free(e->fdsets); #endif res = pthread_mutex_destroy(&e->mtx); if (res != 0) return res; return pthread_cond_destroy(&e->cnd); } static ETHR_INLINE int wait__(ethr_event *e, int spincount, ethr_sint64_t timeout) { int sc = spincount; ethr_sint32_t val; int res, ulres; int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; ethr_sint64_t time = 0; #ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME ethr_sint64_t timeout_time = 0; /* SHUT UP annoying faulty warning... */ #endif val = ethr_atomic32_read(&e->state); if (val == ETHR_EVENT_ON__) goto return_event_on; if (timeout < 0) { if (spincount == 0) goto set_event_off_waiter; } if (timeout == 0) return ETIMEDOUT; else { #ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME timeout_time = ethr_get_monotonic_time(); timeout_time += timeout; #endif switch (e->fd[0]) { case ETHR_EVENT_INVALID_FD__: time = timeout; setup_nonblocking_pipe(e); break; #ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC case ETHR_EVENT_COND_TIMEDWAIT__: time = -1; if (spincount == 0) goto set_event_off_waiter; break; #endif default: time = timeout; /* Already initialized pipe... */ if (spincount == 0) goto set_select_timeout; break; } } if (spincount < 0) ETHR_FATAL_ERROR__(EINVAL); while (1) { val = ethr_atomic32_read(&e->state); if (val == ETHR_EVENT_ON__) goto return_event_on; if (sc == 0) break; sc--; ETHR_SPIN_BODY; if (--until_yield == 0) { until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; res = ETHR_YIELD(); if (res != 0) ETHR_FATAL_ERROR__(res); } } if (timeout < 0 #ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC || e->fd[0] == ETHR_EVENT_COND_TIMEDWAIT__ #endif ) { #ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC struct timespec cond_timeout; #endif set_event_off_waiter: if (val != ETHR_EVENT_OFF_WAITER__) { ethr_sint32_t act; act = ethr_atomic32_cmpxchg(&e->state, ETHR_EVENT_OFF_WAITER__, val); if (act == ETHR_EVENT_ON__) goto return_event_on; ETHR_ASSERT(act == val); } res = pthread_mutex_lock(&e->mtx); if (res != 0) ETHR_FATAL_ERROR__(res); while (1) { val = ethr_atomic32_read(&e->state); if (val == ETHR_EVENT_ON__) { ETHR_ASSERT(res == 0); ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore); break; } #ifdef ETHR_HAVE_PTHREAD_COND_TIMEDWAIT_MONOTONIC if (timeout > 0) { if (time != timeout_time) { time = timeout_time; #if ERTS_USE_PREMATURE_TIMEOUT { ethr_sint64_t rtmo; rtmo = timeout_time - ethr_get_monotonic_time(); if (rtmo <= 0) { res = ETIMEDOUT; break; } time = timeout_time; time -= ERTS_PREMATURE_TIMEOUT(rtmo, 1000*1000*1000); } #endif cond_timeout.tv_sec = time / (1000*1000*1000); cond_timeout.tv_nsec = time % (1000*1000*1000); } res = pthread_cond_timedwait(&e->cnd, &e->mtx, &cond_timeout); if (res == EINTR || (res == ETIMEDOUT #if ERTS_USE_PREMATURE_TIMEOUT && time == timeout_time #endif )) { break; } } else #endif { res = pthread_cond_wait(&e->cnd, &e->mtx); if (res == EINTR) break; } if (res != 0) ETHR_FATAL_ERROR__(res); } ulres = pthread_mutex_unlock(&e->mtx); if (ulres != 0) ETHR_FATAL_ERROR__(ulres); } else { int fd; int sres; ssize_t rres; #ifndef __DARWIN__ fd_set rset, eset; #endif fd_set *rsetp, *esetp; struct timeval select_timeout; #ifdef ETHR_HAVE_ETHR_GET_MONOTONIC_TIME #if ERTS_USE_PREMATURE_TIMEOUT restart_select: #endif time = timeout_time - ethr_get_monotonic_time(); if (time <= 0) return ETIMEDOUT; #endif set_select_timeout: ETHR_ASSERT(time > 0); /* * timeout in nano-second, but we can only wait * for micro-seconds... */ time = ((time - 1) / 1000) + 1; #if defined(ETHR_HAVE_ETHR_GET_MONOTONIC_TIME) \ && ERTS_USE_PREMATURE_TIMEOUT time -= ERTS_PREMATURE_TIMEOUT(time, 1000*1000); #endif select_timeout.tv_sec = time / (1000*1000); select_timeout.tv_usec = time % (1000*1000); ETHR_ASSERT(val != ETHR_EVENT_ON__); fd = e->fd[0]; /* Cleanup pipe... */ do { char buf[64]; rres = read(fd, buf, sizeof(buf)); } while (rres > 0 || (rres < 0 && errno == EINTR)); if (rres < 0 && errno != EAGAIN && errno != EWOULDBLOCK) ETHR_FATAL_ERROR__(errno); /* * Need to verify that state is still off * after cleaning the pipe... */ if (val == ETHR_EVENT_OFF_WAITER_SELECT__) { val = ethr_atomic32_read(&e->state); if (val == ETHR_EVENT_ON__) goto return_event_on; } else { ethr_sint32_t act; act = ethr_atomic32_cmpxchg(&e->state, ETHR_EVENT_OFF_WAITER_SELECT__, val); if (act == ETHR_EVENT_ON__) goto return_event_on; ETHR_ASSERT(act == val); } #ifdef __DARWIN__ rsetp = e->fdsets->rsetp; esetp = e->fdsets->esetp; memset((void *) &e->fdsets->mem[0], 0, e->fdsets->mem_size); #else FD_ZERO(&rset); FD_ZERO(&eset); rsetp = &rset; esetp = &eset; #endif FD_SET(fd, rsetp); FD_SET(fd, esetp); sres = select(fd + 1, rsetp, NULL, esetp, &select_timeout); if (sres == 0) res = ETIMEDOUT; else { res = EINTR; if (sres < 0 && errno != EINTR) ETHR_FATAL_ERROR__(errno); /* else: * Event is *probably* set, but it can be a * lingering writer. That is, it is important * that we verify that it actually is set. If * it isn't, return EINTR (spurious wakeup). */ } val = ethr_atomic32_read(&e->state); if (val == ETHR_EVENT_ON__) goto return_event_on; #if defined(ETHR_HAVE_ETHR_GET_MONOTONIC_TIME) \ && ERTS_USE_PREMATURE_TIMEOUT if (res == ETIMEDOUT) goto restart_select; /* Verify timeout */ #endif } return res; return_event_on: ETHR_MEMBAR(ETHR_LoadLoad|ETHR_LoadStore); return 0; } #else #error No ethread event implementation #endif void ethr_event_reset(ethr_event *e) { ethr_event_reset__(e); } void ethr_event_set(ethr_event *e) { ethr_event_set__(e); } int ethr_event_wait(ethr_event *e) { return wait__(e, 0, -1); } int ethr_event_swait(ethr_event *e, int spincount) { return wait__(e, spincount, -1); } int ethr_event_twait(ethr_event *e, ethr_sint64_t timeout) { return wait__(e, 0, timeout); } int ethr_event_stwait(ethr_event *e, int spincount, ethr_sint64_t timeout) { return wait__(e, spincount, timeout); }