aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/sys/common/erl_poll.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2010-09-15 22:14:51 +0200
committerRickard Green <[email protected]>2011-11-13 20:39:30 +0100
commita67e91e658bdbba24fcc3c79b06fdf10ff830bc9 (patch)
tree07f9e6b1fd715d516d2571521307fe1b9d7c3948 /erts/emulator/sys/common/erl_poll.c
parent55358c54778ead444e51f565d00175ba887ef182 (diff)
downloadotp-a67e91e658bdbba24fcc3c79b06fdf10ff830bc9.tar.gz
otp-a67e91e658bdbba24fcc3c79b06fdf10ff830bc9.tar.bz2
otp-a67e91e658bdbba24fcc3c79b06fdf10ff830bc9.zip
Optimize memory allocation
A number of memory allocation optimizations have been implemented. Most optimizations reduce contention caused by synchronization between threads during allocation and deallocation of memory. Most notably: * Synchronization of memory management in scheduler specific allocator instances has been rewritten to use lock-free synchronization. * Synchronization of memory management in scheduler specific pre-allocators has been rewritten to use lock-free synchronization. * The 'mseg_alloc' memory segment allocator now use scheduler specific instances instead of one instance. Apart from reducing contention this also ensures that memory allocators always create memory segments on the local NUMA node on a NUMA system.
Diffstat (limited to 'erts/emulator/sys/common/erl_poll.c')
-rw-r--r--erts/emulator/sys/common/erl_poll.c193
1 files changed, 115 insertions, 78 deletions
diff --git a/erts/emulator/sys/common/erl_poll.c b/erts/emulator/sys/common/erl_poll.c
index 9bd64f5908..80db2055a2 100644
--- a/erts/emulator/sys/common/erl_poll.c
+++ b/erts/emulator/sys/common/erl_poll.c
@@ -68,6 +68,7 @@
# endif
# endif
#endif
+#include "erl_thr_progress.h"
#include "erl_driver.h"
#include "erl_alloc.h"
@@ -114,7 +115,7 @@
#endif
#define ERTS_POLL_USE_WAKEUP_PIPE \
- (ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP))
+ (ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(USE_THREADS))
#ifdef ERTS_SMP
@@ -261,7 +262,6 @@ struct ErtsPollSet_ {
#ifdef ERTS_SMP
erts_atomic32_t polled;
erts_smp_mtx_t mtx;
-#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
#endif
#if ERTS_POLL_USE_WAKEUP_PIPE
int wake_fds[2];
@@ -269,10 +269,8 @@ struct ErtsPollSet_ {
#if ERTS_POLL_USE_FALLBACK
int fallback_used;
#endif
-#ifdef ERTS_SMP
+#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
erts_atomic32_t wakeup_state;
-#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
- volatile int wakeup_state;
#endif
erts_smp_atomic32_t timeout;
#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
@@ -345,21 +343,16 @@ static void print_misc_debug_info(void);
static ERTS_INLINE void
reset_wakeup_state(ErtsPollSet ps)
{
-#ifdef ERTS_SMP
- erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN);
- ERTS_THR_MEMORY_BARRIER;
-#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
- ps->wakeup_state = 0;
+#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
+ erts_atomic32_set_mb(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN);
#endif
}
static ERTS_INLINE int
is_woken(ErtsPollSet ps)
{
-#ifdef ERTS_SMP
+#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
return erts_atomic32_read_acqb(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN;
-#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
- return ps->wakeup_state != ERTS_POLL_NOT_WOKEN;
#else
return 0;
#endif
@@ -368,13 +361,9 @@ is_woken(ErtsPollSet ps)
static ERTS_INLINE int
is_interrupted_reset(ErtsPollSet ps)
{
-#ifdef ERTS_SMP
+#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
return (erts_atomic32_xchg_nob(&ps->wakeup_state, ERTS_POLL_NOT_WOKEN)
== ERTS_POLL_WOKEN_INTR);
-#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
- int res = ps->wakeup_state == ERTS_POLL_WOKEN_INTR;
- ps->wakeup_state = ERTS_POLL_NOT_WOKEN;
- return res;
#else
return 0;
#endif
@@ -383,16 +372,13 @@ is_interrupted_reset(ErtsPollSet ps)
static ERTS_INLINE void
woke_up(ErtsPollSet ps)
{
-#ifdef ERTS_SMP
+#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
erts_aint32_t wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state);
if (wakeup_state == ERTS_POLL_NOT_WOKEN)
(void) erts_atomic32_cmpxchg_nob(&ps->wakeup_state,
ERTS_POLL_WOKEN,
ERTS_POLL_NOT_WOKEN);
ASSERT(erts_atomic32_read_nob(&ps->wakeup_state) != ERTS_POLL_NOT_WOKEN);
-#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
- if (ps->wakeup_state == ERTS_POLL_NOT_WOKEN)
- ps->wakeup_state = ERTS_POLL_WOKEN;
#endif
}
@@ -403,28 +389,27 @@ woke_up(ErtsPollSet ps)
#if ERTS_POLL_USE_WAKEUP_PIPE
static ERTS_INLINE void
-wake_poller(ErtsPollSet ps, int interrupted)
+wake_poller(ErtsPollSet ps, int interrupted, int async_signal_safe)
{
- int wake = 0;
-#ifdef ERTS_SMP
- erts_aint32_t wakeup_state;
- if (!interrupted)
- wakeup_state = erts_atomic32_cmpxchg_relb(&ps->wakeup_state,
- ERTS_POLL_WOKEN,
- ERTS_POLL_NOT_WOKEN);
+ int wake;
+ if (async_signal_safe)
+ wake = 1;
else {
- /*
- * We might unnecessarily write to the pipe, however,
- * that isn't problematic.
- */
- wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state);
- erts_atomic32_set_relb(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR);
+ erts_aint32_t wakeup_state;
+ if (!interrupted)
+ wakeup_state = erts_atomic32_cmpxchg_relb(&ps->wakeup_state,
+ ERTS_POLL_WOKEN,
+ ERTS_POLL_NOT_WOKEN);
+ else {
+ /*
+ * We might unnecessarily write to the pipe, however,
+ * that isn't problematic.
+ */
+ wakeup_state = erts_atomic32_read_nob(&ps->wakeup_state);
+ erts_atomic32_set_relb(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR);
+ }
+ wake = wakeup_state == ERTS_POLL_NOT_WOKEN;
}
- wake = wakeup_state == ERTS_POLL_NOT_WOKEN;
-#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
- wake = ps->wakeup_state == ERTS_POLL_NOT_WOKEN;
- ps->wakeup_state = interrupted ? ERTS_POLL_WOKEN_INTR : ERTS_POLL_NOT_WOKEN;
-#endif
/*
* NOTE: This function might be called from signal handlers in the
* non-smp case; therefore, it has to be async-signal safe in
@@ -439,9 +424,17 @@ wake_poller(ErtsPollSet ps, int interrupted)
res = write(ps->wake_fds[1], "!", 1);
} while (res < 0 && errno == EINTR);
if (res <= 0 && errno != ERRNO_BLOCK) {
- fatal_error_async_signal_safe(__FILE__
- ":XXX:wake_poller(): "
- "Failed to write on wakeup pipe\n");
+ if (async_signal_safe)
+ fatal_error_async_signal_safe(__FILE__
+ ":XXX:wake_poller(): "
+ "Failed to write on wakeup pipe\n");
+ else
+ fatal_error("%s:%d:wake_poller(): "
+ "Failed to write to wakeup pipe fd=%d: "
+ "%s (%d)\n",
+ __FILE__, __LINE__,
+ ps->wake_fds[1],
+ erl_errno_id(errno), errno);
}
}
}
@@ -449,11 +442,18 @@ wake_poller(ErtsPollSet ps, int interrupted)
static ERTS_INLINE void
cleanup_wakeup_pipe(ErtsPollSet ps)
{
+#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
+ int intr = 0;
+#endif
int fd = ps->wake_fds[0];
int res;
do {
char buf[32];
res = read(fd, buf, sizeof(buf));
+#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
+ if (res > 0)
+ intr = 1;
+#endif
} while (res > 0 || (res < 0 && errno == EINTR));
if (res < 0 && errno != ERRNO_BLOCK) {
fatal_error("%s:%d:cleanup_wakeup_pipe(): "
@@ -463,6 +463,10 @@ cleanup_wakeup_pipe(ErtsPollSet ps)
fd,
erl_errno_id(errno), errno);
}
+#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
+ if (intr)
+ erts_atomic32_set_nob(&ps->wakeup_state, ERTS_POLL_WOKEN_INTR);
+#endif
}
static void
@@ -1497,7 +1501,7 @@ ERTS_POLL_EXPORT(erts_poll_controlv)(ErtsPollSet ps,
#ifdef ERTS_SMP
if (final_do_wake)
- wake_poller(ps, 0);
+ wake_poller(ps, 0, 0);
#endif /* ERTS_SMP */
}
@@ -1520,7 +1524,7 @@ ERTS_POLL_EXPORT(erts_poll_control)(ErtsPollSet ps,
#ifdef ERTS_SMP
if (*do_wake) {
- wake_poller(ps, 0);
+ wake_poller(ps, 0, 0);
}
#endif /* ERTS_SMP */
@@ -1893,9 +1897,9 @@ save_poll_result(ErtsPollSet ps, ErtsPollResFd pr[], int max_res,
}
static ERTS_INLINE int
-check_fd_events(ErtsPollSet ps, SysTimeval *tv, int max_res, int *ps_locked)
+check_fd_events(ErtsPollSet ps, SysTimeval *tv, int max_res)
{
- ASSERT(!*ps_locked);
+ int res;
if (erts_smp_atomic_read_nob(&ps->no_of_user_fds) == 0
&& tv->tv_usec == 0 && tv->tv_sec == 0) {
/* Nothing to poll and zero timeout; done... */
@@ -1915,16 +1919,23 @@ check_fd_events(ErtsPollSet ps, SysTimeval *tv, int max_res, int *ps_locked)
timeout = INT_MAX;
if (max_res > ps->res_events_len)
grow_res_events(ps, max_res);
- return epoll_wait(ps->kp_fd, ps->res_events, max_res, (int)timeout);
+#ifdef ERTS_SMP
+ if (timeout)
+ erts_thr_progress_prepare_wait(NULL);
+#endif
+ res = epoll_wait(ps->kp_fd, ps->res_events, max_res, (int)timeout);
#elif ERTS_POLL_USE_KQUEUE /* --- kqueue ------------------------------ */
struct timespec ts;
- ts.tv_sec = tv->tv_sec;
- ts.tv_nsec = tv->tv_usec*1000;
if (max_res > ps->res_events_len)
grow_res_events(ps, max_res);
- return kevent(ps->kp_fd, NULL, 0, ps->res_events, max_res, &ts);
+#ifdef ERTS_SMP
+ if (timeout)
+ erts_thr_progress_prepare_wait(NULL);
+#endif
+ ts.tv_sec = tv->tv_sec;
+ ts.tv_nsec = tv->tv_usec*1000;
+ res = kevent(ps->kp_fd, NULL, 0, ps->res_events, max_res, &ts);
#endif /* ----------------------------------------- */
-
}
else /* use fallback (i.e. poll() or select()) */
#endif /* ERTS_POLL_USE_FALLBACK */
@@ -1947,22 +1958,38 @@ check_fd_events(ErtsPollSet ps, SysTimeval *tv, int max_res, int *ps_locked)
if (poll_res.dp_nfds > ps->res_events_len)
grow_res_events(ps, poll_res.dp_nfds);
poll_res.dp_fds = ps->res_events;
+#ifdef ERTS_SMP
+ if (timeout)
+ erts_thr_progress_prepare_wait(NULL);
+#endif
poll_res.dp_timeout = (int) timeout;
- return ioctl(ps->kp_fd, DP_POLL, &poll_res);
+ res = ioctl(ps->kp_fd, DP_POLL, &poll_res);
#elif ERTS_POLL_USE_POLL /* --- poll -------------------------------- */
if (timeout > INT_MAX)
timeout = INT_MAX;
- return poll(ps->poll_fds, ps->no_poll_fds, (int) timeout);
+#ifdef ERTS_SMP
+ if (timeout)
+ erts_thr_progress_prepare_wait(NULL);
+#endif
+ res = poll(ps->poll_fds, ps->no_poll_fds, (int) timeout);
#elif ERTS_POLL_USE_SELECT /* --- select ------------------------------ */
- int res;
+ SysTimeval to = *tv;
+
ps->res_input_fds = ps->input_fds;
ps->res_output_fds = ps->output_fds;
+
+#ifdef ERTS_SMP
+ if (to.tv_sec || to.tv_usec)
+ erts_thr_progress_prepare_wait(NULL);
+#endif
res = select(ps->max_fd + 1,
&ps->res_input_fds,
&ps->res_output_fds,
NULL,
- tv);
+ &to);
#ifdef ERTS_SMP
+ if (to.tv_sec || to.tv_usec)
+ erts_thr_progress_finalize_wait(NULL);
if (res < 0
&& errno == EBADF
&& ERTS_POLLSET_HAVE_UPDATE_REQUESTS(ps)) {
@@ -1978,15 +2005,16 @@ check_fd_events(ErtsPollSet ps, SysTimeval *tv, int max_res, int *ps_locked)
* have triggered, we fake an EAGAIN error and let the caller
* restart us.
*/
- SysTimeval zero_tv = {0, 0};
- *ps_locked = 1;
+ to.tv_sec = 0;
+ to.tv_usec = 0;
ERTS_POLLSET_LOCK(ps);
handle_update_requests(ps);
+ ERTS_POLLSET_UNLOCK(ps);
res = select(ps->max_fd + 1,
&ps->res_input_fds,
&ps->res_output_fds,
NULL,
- &zero_tv);
+ &to);
if (res == 0) {
errno = EAGAIN;
res = -1;
@@ -1996,6 +2024,11 @@ check_fd_events(ErtsPollSet ps, SysTimeval *tv, int max_res, int *ps_locked)
return res;
#endif /* ----------------------------------------- */
}
+#ifdef ERTS_SMP
+ if (timeout)
+ erts_thr_progress_finalize_wait(NULL);
+#endif
+ return res;
}
}
@@ -2007,7 +2040,9 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps,
{
int res, no_fds;
int ebadf = 0;
- int ps_locked;
+#ifdef ERTS_SMP
+ int ps_locked = 0;
+#endif
SysTimeval *tvp;
SysTimeval itv;
@@ -2049,8 +2084,7 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps,
}
#endif
- ps_locked = 0;
- res = check_fd_events(ps, tvp, no_fds, &ps_locked);
+ res = check_fd_events(ps, tvp, no_fds);
woke_up(ps);
@@ -2072,10 +2106,8 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps,
#endif
#ifdef ERTS_SMP
- if (!ps_locked) {
- ps_locked = 1;
- ERTS_POLLSET_LOCK(ps);
- }
+ ps_locked = 1;
+ ERTS_POLLSET_LOCK(ps);
#endif
no_fds = save_poll_result(ps, pr, no_fds, res, ebadf);
@@ -2111,19 +2143,26 @@ ERTS_POLL_EXPORT(erts_poll_wait)(ErtsPollSet ps,
void
ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet ps, int set)
{
-#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP)
- /*
- * NOTE: This function might be called from signal handlers in the
- * non-smp case; therefore, it has to be async-signal safe in
- * the non-smp case.
- */
+#if defined(USE_THREADS)
if (!set)
reset_wakeup_state(ps);
else
- wake_poller(ps, 1);
+ wake_poller(ps, 1, 0);
#endif
}
+#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
+void
+ERTS_POLL_EXPORT(erts_poll_async_sig_interrupt)(ErtsPollSet ps)
+{
+ /*
+ * NOTE: This function is called from signal handlers, it,
+ * therefore, it has to be async-signal safe.
+ */
+ wake_poller(ps, 1, 1);
+}
+#endif
+
/*
* erts_poll_interrupt_timed():
* If 'set' != 0, interrupt thread blocked in erts_poll_wait() if it
@@ -2139,7 +2178,7 @@ ERTS_POLL_EXPORT(erts_poll_interrupt_timed)(ErtsPollSet ps,
reset_wakeup_state(ps);
else {
if (erts_smp_atomic32_read_acqb(&ps->timeout) > (erts_aint32_t) msec)
- wake_poller(ps, 1);
+ wake_poller(ps, 1, 0);
#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
else {
if (ERTS_POLLSET_IS_POLLED(ps))
@@ -2266,10 +2305,8 @@ ERTS_POLL_EXPORT(erts_poll_create_pollset)(void)
erts_atomic32_init_nob(&ps->polled, 0);
erts_smp_mtx_init(&ps->mtx, "pollset");
#endif
-#ifdef ERTS_SMP
+#if defined(USE_THREADS) || ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
erts_atomic32_init_nob(&ps->wakeup_state, (erts_aint32_t) 0);
-#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT
- ps->wakeup_state = 0;
#endif
#if ERTS_POLL_USE_WAKEUP_PIPE
create_wakeup_pipe(ps);