From 6487aac5977cf470bc6a2cd0964da2850ee38717 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Thu, 30 Oct 2014 23:57:01 +0100 Subject: Introduce a new time API The old time API is based on erlang:now/0. The major issue with erlang:now/0 is that it was intended to be used for so many unrelated things. This tied these unrelated operations together and unnecessarily caused performance, scalability as well as accuracy, and precision issues for operations that do not need to have such issues. The new API spreads different functionality over multiple functions in order to improve on this. The new API consists of a number of new BIFs: - erlang:convert_time_unit/3 - erlang:monotonic_time/0 - erlang:monotonic_time/1 - erlang:system_time/0 - erlang:system_time/1 - erlang:time_offset/0 - erlang:time_offset/1 - erlang:timestamp/0 - erlang:unique_integer/0 - erlang:unique_integer/1 - os:system_time/0 - os:system_time/1 and a number of extensions of existing BIFs: - erlang:monitor(time_offset, clock_service) - erlang:system_flag(time_offset, finalize) - erlang:system_info(os_monotonic_time_source) - erlang:system_info(time_offset) - erlang:system_info(time_warp_mode) - erlang:system_info(time_correction) - erlang:system_info(start_time) See the "Time and Time Correction in Erlang" chapter of the ERTS User's Guide for more information. --- erts/emulator/beam/time.c | 477 ++++++++++++++++++++++++++++------------------ 1 file changed, 293 insertions(+), 184 deletions(-) (limited to 'erts/emulator/beam/time.c') diff --git a/erts/emulator/beam/time.c b/erts/emulator/beam/time.c index 2fd8e0cf00..c9f8b68bca 100644 --- a/erts/emulator/beam/time.c +++ b/erts/emulator/beam/time.c @@ -83,6 +83,10 @@ #define ASSERT_NO_LOCKED_LOCKS #endif +#define ERTS_MONOTONIC_DAY ERTS_SEC_TO_MONOTONIC(60*60*24) +#define ERTS_CLKTCKS_DAY ERTS_MONOTONIC_TO_CLKTCKS(ERTS_MONOTONIC_DAY) + +static erts_smp_atomic32_t is_bumping; static erts_smp_mtx_t tiw_lock; @@ -91,18 +95,20 @@ static erts_smp_mtx_t tiw_lock; ** The individual timer cells in tiw are also protected by the same mutex. */ +/* timing wheel size NEED to be a power of 2 */ #ifdef SMALL_MEMORY -#define TIW_SIZE 8192 +#define TIW_SIZE (1 << 13) #else -#define TIW_SIZE 65536 /* timing wheel size (should be a power of 2) */ +#define TIW_SIZE (1 << 20) #endif static ErlTimer** tiw; /* the timing wheel, allocated in init_time() */ -static Uint tiw_pos; /* current position in wheel */ +static ErtsMonotonicTime tiw_pos; /* current position in wheel */ static Uint tiw_nto; /* number of timeouts in wheel */ -static Uint tiw_min; -static ErlTimer *tiw_min_ptr; - -/* END tiw_lock protected variables */ +static struct { + ErlTimer *head; + ErlTimer **tail; + Uint nto; +} tiw_at_once; /* Actual interval time chosen by sys_init_time() */ @@ -114,77 +120,97 @@ static int tiw_itime; /* Constant after init */ # define TIW_ITIME tiw_itime #endif -erts_smp_atomic32_t do_time; /* set at clock interrupt */ -static ERTS_INLINE erts_short_time_t do_time_read(void) -{ - return erts_smp_atomic32_read_acqb(&do_time); -} +static int true_next_timeout_time; +static ErtsMonotonicTime next_timeout_time; +erts_atomic64_t erts_next_timeout__; -static ERTS_INLINE erts_short_time_t do_time_update(void) +static ERTS_INLINE void +init_next_timeout(ErtsMonotonicTime time) { - return do_time_read(); + erts_atomic64_init_nob(&erts_next_timeout__, + (erts_aint64_t) time); } -static ERTS_INLINE void do_time_init(void) +static ERTS_INLINE void +set_next_timeout(ErtsMonotonicTime time, int true_timeout) { - erts_smp_atomic32_init_nob(&do_time, 0); + true_next_timeout_time = true_timeout; + next_timeout_time = time; + erts_atomic64_set_relb(&erts_next_timeout__, + (erts_aint64_t) time); } /* get the time (in units of TIW_ITIME) to the next timeout, or -1 if there are no timeouts */ -static erts_short_time_t next_time_internal(void) /* PRE: tiw_lock taken by caller */ +static ERTS_INLINE ErtsMonotonicTime +find_next_timeout(ErtsMonotonicTime curr_time, + ErtsMonotonicTime max_search_time) { - int i, tm, nto; - Uint32 min; - ErlTimer* p; - erts_short_time_t dt; - - if (tiw_nto == 0) - return -1; /* no timeouts in wheel */ + int start_ix, tiw_pos_ix; + ErlTimer *p; + int true_min_timeout; + ErtsMonotonicTime min_timeout, min_timeout_pos, slot_timeout_pos, timeout_limit; - if (tiw_min_ptr) { - min = tiw_min; - dt = do_time_read(); - return ((min >= dt) ? (min - dt) : 0); + ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&tiw_lock)); + + if (true_next_timeout_time) + return next_timeout_time; + + /* We never set next timeout beyond timeout_limit */ + timeout_limit = curr_time + ERTS_MONOTONIC_DAY; + + if (tiw_nto == 0) { /* no timeouts in wheel */ + true_min_timeout = true_next_timeout_time = 0; + min_timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(timeout_limit); + goto found_next; } - - /* start going through wheel to find next timeout */ - tm = nto = 0; - min = (Uint32) -1; /* max Uint32 */ - i = tiw_pos; + + /* + * Don't want others entering trying to bump + * timers while we are checking... + */ + set_next_timeout(timeout_limit, 0); + + true_min_timeout = 1; + slot_timeout_pos = tiw_pos; + min_timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(curr_time + max_search_time); + + start_ix = tiw_pos_ix = (int) (tiw_pos & (TIW_SIZE-1)); + do { - p = tiw[i]; - while (p != NULL) { - nto++; - if (p->count == 0) { - /* found next timeout */ - dt = do_time_read(); - /* p->count is zero */ - tiw_min_ptr = p; - tiw_min = tm; - return ((tm >= dt) ? (tm - dt) : 0); - } else { - /* keep shortest time in 'min' */ - if (tm + p->count*TIW_SIZE < min) { - min = tm + p->count*TIW_SIZE; - tiw_min_ptr = p; - tiw_min = min; - } + slot_timeout_pos++; + if (slot_timeout_pos >= min_timeout_pos) { + true_min_timeout = 0; + break; + } + + p = tiw[tiw_pos_ix]; + + while (p) { + ErtsMonotonicTime timeout_pos; + ASSERT(p != p->next); + timeout_pos = p->timeout_pos; + if (min_timeout_pos > timeout_pos) { + min_timeout_pos = timeout_pos; + if (min_timeout_pos <= slot_timeout_pos) + goto found_next; } p = p->next; } - /* when we have found all timeouts the shortest time will be in min */ - if (nto == tiw_nto) break; - tm++; - i = (i + 1) % TIW_SIZE; - } while (i != tiw_pos); - dt = do_time_read(); - if (min <= (Uint32) dt) - return 0; - if ((min - (Uint32) dt) > (Uint32) ERTS_SHORT_TIME_T_MAX) - return ERTS_SHORT_TIME_T_MAX; - return (erts_short_time_t) (min - (Uint32) dt); + + tiw_pos_ix++; + if (tiw_pos_ix == TIW_SIZE) + tiw_pos_ix = 0; + } while (start_ix != tiw_pos_ix); + +found_next: + + min_timeout = ERTS_CLKTCKS_TO_MONOTONIC(min_timeout_pos); + if (min_timeout != next_timeout_time) + set_next_timeout(min_timeout, true_min_timeout); + + return min_timeout; } static void remove_timer(ErlTimer *p) { @@ -212,72 +238,153 @@ static void remove_timer(ErlTimer *p) { tiw_nto--; } -/* Private export to erl_time_sup.c */ -erts_short_time_t erts_next_time(void) +ErtsMonotonicTime +erts_check_next_timeout_time(ErtsMonotonicTime max_search_time) { - erts_short_time_t ret; + ErtsMonotonicTime next, curr; + + curr = erts_get_monotonic_time(); erts_smp_mtx_lock(&tiw_lock); - (void)do_time_update(); - ret = next_time_internal(); + + next = find_next_timeout(curr, max_search_time); + erts_smp_mtx_unlock(&tiw_lock); - return ret; + + return next; } -static ERTS_INLINE void bump_timer_internal(erts_short_time_t dt) /* PRE: tiw_lock is write-locked */ +#ifndef DEBUG +#define ERTS_DBG_CHK_SAFE_TO_SKIP_TO(TO) ((void) 0) +#else +#define ERTS_DBG_CHK_SAFE_TO_SKIP_TO(TO) debug_check_safe_to_skip_to((TO)) +static void +debug_check_safe_to_skip_to(ErtsMonotonicTime skip_to_pos) { - Uint keep_pos; - Uint count; - ErlTimer *p, **prev, *timeout_head, **timeout_tail; - Uint dtime = (Uint) dt; + int slots, ix; + ErlTimer *tmr; + ErtsMonotonicTime tmp; + + ix = (int) (tiw_pos & (TIW_SIZE-1)); + tmp = skip_to_pos - tiw_pos; + ASSERT(tmp >= 0); + if (tmp < (ErtsMonotonicTime) TIW_SIZE) + slots = (int) tmp; + else + slots = TIW_SIZE; + + while (slots > 0) { + tmr = tiw[ix]; + while (tmr) { + ASSERT(tmr->timeout_pos > skip_to_pos); + tmr = tmr->next; + } + ix++; + if (ix == TIW_SIZE) + ix = 0; + slots--; + } +} +#endif + +void erts_bump_timers(ErtsMonotonicTime curr_time) +{ + int tiw_pos_ix, slots; + ErlTimer *p, *timeout_head, **timeout_tail; + ErtsMonotonicTime bump_to, tmp_slots; + + if (erts_smp_atomic32_cmpxchg_nob(&is_bumping, 1, 0) != 0) + return; /* Another thread is currently bumping... */ + + bump_to = ERTS_MONOTONIC_TO_CLKTCKS(curr_time); + + erts_smp_mtx_lock(&tiw_lock); + + if (tiw_pos >= bump_to) { + timeout_head = NULL; + goto done; + } + + /* Don't want others here while we are bumping... */ + set_next_timeout(curr_time + ERTS_MONOTONIC_DAY, 0); + + if (!tiw_at_once.head) { + timeout_head = NULL; + timeout_tail = &timeout_head; + } + else { + ASSERT(tiw_nto >= tiw_at_once.nto); + timeout_head = tiw_at_once.head; + timeout_tail = tiw_at_once.tail; + tiw_nto -= tiw_at_once.nto; + tiw_at_once.head = NULL; + tiw_at_once.tail = &tiw_at_once.head; + tiw_at_once.nto = 0; + } - /* no need to bump the position if there aren't any timeouts */ if (tiw_nto == 0) { - erts_smp_mtx_unlock(&tiw_lock); - return; + ERTS_DBG_CHK_SAFE_TO_SKIP_TO(bump_to); + tiw_pos = bump_to; + goto done; } - /* if do_time > TIW_SIZE we want to go around just once */ - count = (Uint)(dtime / TIW_SIZE) + 1; - keep_pos = (tiw_pos + dtime) % TIW_SIZE; - if (dtime > TIW_SIZE) dtime = TIW_SIZE; - - timeout_head = NULL; - timeout_tail = &timeout_head; - while (dtime > 0) { - /* this is to decrease the counters with the right amount */ - /* when dtime >= TIW_SIZE */ - if (tiw_pos == keep_pos) count--; - prev = &tiw[tiw_pos]; - while ((p = *prev) != NULL) { - ASSERT( p != p->next); - if (p->count < count) { /* we have a timeout */ - /* remove min time */ - if (tiw_min_ptr == p) { - tiw_min_ptr = NULL; - tiw_min = 0; - } + if (true_next_timeout_time) { + ErtsMonotonicTime skip_until_pos; + /* + * No need inspecting slots where we know no timeouts + * to trigger should reside. + */ + skip_until_pos = ERTS_MONOTONIC_TO_CLKTCKS(next_timeout_time); + if (skip_until_pos > bump_to) + skip_until_pos = bump_to; + + ERTS_DBG_CHK_SAFE_TO_SKIP_TO(skip_until_pos); + ASSERT(skip_until_pos > tiw_pos); + + tiw_pos = skip_until_pos - 1; + } + + tiw_pos_ix = (int) ((tiw_pos+1) & (TIW_SIZE-1)); + tmp_slots = (bump_to - tiw_pos); + if (tmp_slots < (ErtsMonotonicTime) TIW_SIZE) + slots = (int) tmp_slots; + else + slots = TIW_SIZE; + + while (slots > 0) { + p = tiw[tiw_pos_ix]; + while (p) { + ErlTimer *next = p->next; + ASSERT(p != next); + if (p->timeout_pos <= bump_to) { /* we have a timeout */ /* Remove from list */ remove_timer(p); *timeout_tail = p; /* Insert in timeout queue */ timeout_tail = &p->next; } - else { - /* no timeout, just decrease counter */ - p->count -= count; - prev = &p->next; - } + p = next; } - tiw_pos = (tiw_pos + 1) % TIW_SIZE; - dtime--; + tiw_pos_ix++; + if (tiw_pos_ix == TIW_SIZE) + tiw_pos_ix = 0; + slots--; } - tiw_pos = keep_pos; - if (tiw_min_ptr) - tiw_min -= dt; - + + ASSERT(tmp_slots >= (ErtsMonotonicTime) TIW_SIZE + || tiw_pos_ix == (int) ((bump_to+1) & (TIW_SIZE-1))); + + tiw_pos = bump_to; + + /* Search at most two seconds ahead... */ + (void) find_next_timeout(curr_time, ERTS_SEC_TO_MONOTONIC(2)); + +done: + erts_smp_mtx_unlock(&tiw_lock); + erts_smp_atomic32_set_nob(&is_bumping, 0); + /* Call timedout timers callbacks */ while (timeout_head) { p = timeout_head; @@ -288,6 +395,7 @@ static ERTS_INLINE void bump_timer_internal(erts_short_time_t dt) /* PRE: tiw_lo * accesses any field until the ->timeout * callback is called. */ + ASSERT(p->timeout_pos <= bump_to); p->next = NULL; p->prev = NULL; p->slot = 0; @@ -295,12 +403,6 @@ static ERTS_INLINE void bump_timer_internal(erts_short_time_t dt) /* PRE: tiw_lo } } -void erts_bump_timer(erts_short_time_t dt) /* dt is value from do_time */ -{ - erts_smp_mtx_lock(&tiw_lock); - bump_timer_internal(dt); -} - Uint erts_timer_wheel_memory_size(void) { @@ -310,13 +412,14 @@ erts_timer_wheel_memory_size(void) /* this routine links the time cells into a free list at the start and sets the time queue as empty */ void -erts_init_time(void) +erts_init_time(int time_correction, ErtsTimeWarpMode time_warp_mode) { + ErtsMonotonicTime mtime; int i, itime; /* system dependent init; must be done before do_time_init() if timer thread is enabled */ - itime = erts_init_time_sup(); + itime = erts_init_time_sup(time_correction, time_warp_mode); #ifdef TIW_ITIME_IS_CONSTANT if (itime != TIW_ITIME) { erl_exit(ERTS_ABORT_EXIT, "timer resolution mismatch %d != %d", itime, TIW_ITIME); @@ -325,16 +428,23 @@ erts_init_time(void) tiw_itime = itime; #endif + erts_smp_atomic32_init_nob(&is_bumping, 0); erts_smp_mtx_init(&tiw_lock, "timer_wheel"); tiw = (ErlTimer**) erts_alloc(ERTS_ALC_T_TIMER_WHEEL, TIW_SIZE * sizeof(ErlTimer*)); for(i = 0; i < TIW_SIZE; i++) tiw[i] = NULL; - do_time_init(); - tiw_pos = tiw_nto = 0; - tiw_min_ptr = NULL; - tiw_min = 0; + + mtime = erts_get_monotonic_time(); + tiw_pos = ERTS_MONOTONIC_TO_CLKTCKS(mtime); + tiw_nto = 0; + tiw_at_once.head = NULL; + tiw_at_once.tail = &tiw_at_once.head; + tiw_at_once.nto = 0; + init_next_timeout(mtime + ERTS_MONOTONIC_DAY); + + erts_late_init_time_sup(); } @@ -343,58 +453,62 @@ erts_init_time(void) /* ** Insert a process into the time queue, with a timeout 't' */ -static void -insert_timer(ErlTimer* p, Uint t) +static ErtsMonotonicTime +insert_timer(ErlTimer* p, ErtsMonotonicTime curr_time, ErtsMonotonicTime to) { - Uint tm; - Uint64 ticks; + ErtsMonotonicTime timeout_time, timeout_pos; - /* The current slot (tiw_pos) in timing wheel is the next slot to be - * be processed. Hence no extra time tick is needed. - * - * (x + y - 1)/y is precisely the "number of bins" formula. - */ - ticks = (t + (TIW_ITIME - 1)) / TIW_ITIME; + if (to == 0) { + timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(curr_time); + tiw_nto++; + tiw_at_once.nto++; + *tiw_at_once.tail = p; + p->next = NULL; + p->timeout_pos = timeout_pos; + timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(timeout_pos); + } + else { + int tm; + ErtsMonotonicTime ticks; - /* - * Ticks must be a Uint64, or the addition may overflow here, - * resulting in an incorrect value for p->count below. - */ - ticks += do_time_update(); /* Add backlog of unprocessed time */ - - /* calculate slot */ - tm = (ticks + tiw_pos) % TIW_SIZE; - p->slot = (Uint) tm; - p->count = (Uint) (ticks / TIW_SIZE); + ticks = ERTS_MSEC_TO_CLKTCKS(to); + timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(curr_time - 1) + 1 + ticks; + + /* calculate slot */ + tm = (int) (timeout_pos & (TIW_SIZE-1)); + p->slot = (Uint) tm; - /* insert at head of list at slot */ - p->next = tiw[tm]; - p->prev = NULL; - if (p->next != NULL) - p->next->prev = p; - tiw[tm] = p; + /* insert at head of list at slot */ + p->next = tiw[tm]; + p->prev = NULL; + if (p->next != NULL) + p->next->prev = p; + tiw[tm] = p; + tiw_nto++; - /* insert min time */ - if ((tiw_nto == 0) || ((tiw_min_ptr != NULL) && (ticks < tiw_min))) { - tiw_min = ticks; - tiw_min_ptr = p; - } - if ((tiw_min_ptr == p) && (ticks > tiw_min)) { - /* some other timer might be 'min' now */ - tiw_min = 0; - tiw_min_ptr = NULL; + timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(timeout_pos); + p->timeout_pos = timeout_pos; + + ASSERT(ERTS_MSEC_TO_MONOTONIC(to) <= timeout_time - curr_time); + ASSERT(timeout_time - curr_time + < ERTS_MSEC_TO_MONOTONIC(to) + ERTS_CLKTCKS_TO_MONOTONIC(1)); } - tiw_nto++; + if (timeout_time < next_timeout_time) + set_next_timeout(timeout_time, 1); + + return timeout_time; } void erts_set_timer(ErlTimer* p, ErlTimeoutProc timeout, ErlCancelProc cancel, void* arg, Uint t) { - - erts_deliver_time(); +#ifdef ERTS_SMP + ErtsMonotonicTime timeout_time; +#endif + ErtsMonotonicTime current_time = erts_get_monotonic_time(); erts_smp_mtx_lock(&tiw_lock); if (p->active) { /* XXX assert ? */ erts_smp_mtx_unlock(&tiw_lock); @@ -404,11 +518,15 @@ erts_set_timer(ErlTimer* p, ErlTimeoutProc timeout, ErlCancelProc cancel, p->cancel = cancel; p->arg = arg; p->active = 1; - insert_timer(p, t); +#ifdef ERTS_SMP + timeout_time = +#else + (void) +#endif + insert_timer(p, current_time, (ErtsMonotonicTime) t); erts_smp_mtx_unlock(&tiw_lock); #if defined(ERTS_SMP) - if (t <= (Uint) ERTS_SHORT_TIME_T_MAX) - erts_sys_schedule_interrupt_timed(1, (erts_short_time_t) t); + erts_sys_schedule_interrupt_timed(1, timeout_time); #endif } @@ -421,14 +539,8 @@ erts_cancel_timer(ErlTimer* p) return; } - /* is it the 'min' timer, remove min */ - if (p == tiw_min_ptr) { - tiw_min_ptr = NULL; - tiw_min = 0; - } - remove_timer(p); - p->slot = p->count = 0; + p->slot = 0; if (p->cancel != NULL) { erts_smp_mtx_unlock(&tiw_lock); @@ -447,8 +559,7 @@ erts_cancel_timer(ErlTimer* p) Uint erts_time_left(ErlTimer *p) { - Uint left; - erts_short_time_t dt; + ErtsMonotonicTime current_time, timeout_time; erts_smp_mtx_lock(&tiw_lock); @@ -457,45 +568,43 @@ erts_time_left(ErlTimer *p) return 0; } - if (p->slot < tiw_pos) - left = (p->count + 1) * TIW_SIZE + p->slot - tiw_pos; - else - left = p->count * TIW_SIZE + p->slot - tiw_pos; - dt = do_time_read(); - if (left < dt) - left = 0; - else - left -= dt; + timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos); erts_smp_mtx_unlock(&tiw_lock); - return (Uint) left * TIW_ITIME; + current_time = erts_get_monotonic_time(); + if (timeout_time <= current_time) + return 0; + return (Uint) ERTS_MONOTONIC_TO_MSEC(timeout_time - current_time); } #ifdef DEBUG void erts_p_slpq(void) { + ErtsMonotonicTime current_time = erts_get_monotonic_time(); int i; ErlTimer* p; erts_smp_mtx_lock(&tiw_lock); /* print the whole wheel, starting at the current position */ - erts_printf("\ntiw_pos = %d tiw_nto %d\n", tiw_pos, tiw_nto); + erts_printf("\ncurrent time = %bps tiw_pos = %d tiw_nto %d\n", + current_time, tiw_pos, tiw_nto); i = tiw_pos; if (tiw[i] != NULL) { erts_printf("%d:\n", i); for(p = tiw[i]; p != NULL; p = p->next) { - erts_printf(" (count %d, slot %d)\n", - p->count, p->slot); + erts_printf(" (timeout time %bps, slot %d)\n", + ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos), + p->slot); } } - for(i = (i+1)%TIW_SIZE; i != tiw_pos; i = (i+1)%TIW_SIZE) { + for(i = ((i+1) & (TIW_SIZE-1)); i != (tiw_pos & (TIW_SIZE-1)); i = ((i+1) & (TIW_SIZE-1))) { if (tiw[i] != NULL) { erts_printf("%d:\n", i); for(p = tiw[i]; p != NULL; p = p->next) { - erts_printf(" (count %d, slot %d)\n", - p->count, p->slot); + erts_printf(" (timeout time %bps, slot %d)\n", + ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos), p->slot); } } } -- cgit v1.2.3 From 1d9350693fe2c4d1d6b2baa504aacd070e023a1a Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Fri, 13 Feb 2015 10:12:02 +0100 Subject: Multiple timer wheels --- erts/emulator/beam/time.c | 387 ++++++++++++++++++++++++++-------------------- 1 file changed, 218 insertions(+), 169 deletions(-) (limited to 'erts/emulator/beam/time.c') diff --git a/erts/emulator/beam/time.c b/erts/emulator/beam/time.c index c9f8b68bca..9f997e1d0b 100644 --- a/erts/emulator/beam/time.c +++ b/erts/emulator/beam/time.c @@ -86,9 +86,6 @@ #define ERTS_MONOTONIC_DAY ERTS_SEC_TO_MONOTONIC(60*60*24) #define ERTS_CLKTCKS_DAY ERTS_MONOTONIC_TO_CLKTCKS(ERTS_MONOTONIC_DAY) -static erts_smp_atomic32_t is_bumping; -static erts_smp_mtx_t tiw_lock; - /* BEGIN tiw_lock protected variables ** @@ -101,14 +98,6 @@ static erts_smp_mtx_t tiw_lock; #else #define TIW_SIZE (1 << 20) #endif -static ErlTimer** tiw; /* the timing wheel, allocated in init_time() */ -static ErtsMonotonicTime tiw_pos; /* current position in wheel */ -static Uint tiw_nto; /* number of timeouts in wheel */ -static struct { - ErlTimer *head; - ErlTimer **tail; - Uint nto; -} tiw_at_once; /* Actual interval time chosen by sys_init_time() */ @@ -120,23 +109,52 @@ static int tiw_itime; /* Constant after init */ # define TIW_ITIME tiw_itime #endif -static int true_next_timeout_time; -static ErtsMonotonicTime next_timeout_time; -erts_atomic64_t erts_next_timeout__; +struct ErtsTimerWheel_ { + ErlTimer *w[TIW_SIZE]; + ErtsMonotonicTime pos; + Uint nto; + struct { + ErlTimer *head; + ErlTimer **tail; + Uint nto; + } at_once; + int true_next_timeout_time; + ErtsMonotonicTime next_timeout_time; + erts_atomic64_t next_timeout; + erts_smp_atomic32_t is_bumping; + erts_smp_mtx_t lock; +}; + +ErtsTimerWheel *erts_default_timer_wheel; /* managed by aux thread */ + +static ERTS_INLINE ErtsTimerWheel * +get_timer_wheel(ErlTimer *p) +{ + return (ErtsTimerWheel *) erts_smp_atomic_read_acqb(&p->wheel); +} + +static ERTS_INLINE void +set_timer_wheel(ErlTimer *p, ErtsTimerWheel *tiw) +{ + erts_smp_atomic_set_relb(&p->wheel, (erts_aint_t) tiw); +} static ERTS_INLINE void -init_next_timeout(ErtsMonotonicTime time) +init_next_timeout(ErtsTimerWheel *tiw, + ErtsMonotonicTime time) { - erts_atomic64_init_nob(&erts_next_timeout__, + erts_atomic64_init_nob(&tiw->next_timeout, (erts_aint64_t) time); } static ERTS_INLINE void -set_next_timeout(ErtsMonotonicTime time, int true_timeout) +set_next_timeout(ErtsTimerWheel *tiw, + ErtsMonotonicTime time, + int true_timeout) { - true_next_timeout_time = true_timeout; - next_timeout_time = time; - erts_atomic64_set_relb(&erts_next_timeout__, + tiw->true_next_timeout_time = true_timeout; + tiw->next_timeout_time = time; + erts_atomic64_set_relb(&tiw->next_timeout, (erts_aint64_t) time); } @@ -144,7 +162,8 @@ set_next_timeout(ErtsMonotonicTime time, int true_timeout) or -1 if there are no timeouts */ static ERTS_INLINE ErtsMonotonicTime -find_next_timeout(ErtsMonotonicTime curr_time, +find_next_timeout(ErtsTimerWheel *tiw, + ErtsMonotonicTime curr_time, ErtsMonotonicTime max_search_time) { int start_ix, tiw_pos_ix; @@ -152,16 +171,16 @@ find_next_timeout(ErtsMonotonicTime curr_time, int true_min_timeout; ErtsMonotonicTime min_timeout, min_timeout_pos, slot_timeout_pos, timeout_limit; - ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&tiw_lock)); + ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&tiw->lock)); - if (true_next_timeout_time) - return next_timeout_time; + if (tiw->true_next_timeout_time) + return tiw->next_timeout_time; /* We never set next timeout beyond timeout_limit */ timeout_limit = curr_time + ERTS_MONOTONIC_DAY; - if (tiw_nto == 0) { /* no timeouts in wheel */ - true_min_timeout = true_next_timeout_time = 0; + if (tiw->nto == 0) { /* no timeouts in wheel */ + true_min_timeout = tiw->true_next_timeout_time = 0; min_timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(timeout_limit); goto found_next; } @@ -170,13 +189,13 @@ find_next_timeout(ErtsMonotonicTime curr_time, * Don't want others entering trying to bump * timers while we are checking... */ - set_next_timeout(timeout_limit, 0); + set_next_timeout(tiw, timeout_limit, 0); true_min_timeout = 1; - slot_timeout_pos = tiw_pos; + slot_timeout_pos = tiw->pos; min_timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(curr_time + max_search_time); - start_ix = tiw_pos_ix = (int) (tiw_pos & (TIW_SIZE-1)); + start_ix = tiw_pos_ix = (int) (tiw->pos & (TIW_SIZE-1)); do { slot_timeout_pos++; @@ -185,7 +204,7 @@ find_next_timeout(ErtsMonotonicTime curr_time, break; } - p = tiw[tiw_pos_ix]; + p = tiw->w[tiw_pos_ix]; while (p) { ErtsMonotonicTime timeout_pos; @@ -207,16 +226,18 @@ find_next_timeout(ErtsMonotonicTime curr_time, found_next: min_timeout = ERTS_CLKTCKS_TO_MONOTONIC(min_timeout_pos); - if (min_timeout != next_timeout_time) - set_next_timeout(min_timeout, true_min_timeout); + if (min_timeout != tiw->next_timeout_time) + set_next_timeout(tiw, min_timeout, true_min_timeout); return min_timeout; } -static void remove_timer(ErlTimer *p) { +static void +remove_timer(ErtsTimerWheel *tiw, ErlTimer *p) +{ /* first */ if (!p->prev) { - tiw[p->slot] = p->next; + tiw->w[p->slot] = p->next; if(p->next) p->next->prev = NULL; } else { @@ -233,40 +254,41 @@ static void remove_timer(ErlTimer *p) { p->next = NULL; p->prev = NULL; - /* Make sure cancel callback isn't called */ - p->active = 0; - tiw_nto--; + + set_timer_wheel(p, NULL); + tiw->nto--; } ErtsMonotonicTime -erts_check_next_timeout_time(ErtsMonotonicTime max_search_time) +erts_check_next_timeout_time(ErtsTimerWheel *tiw, + ErtsMonotonicTime max_search_time) { ErtsMonotonicTime next, curr; curr = erts_get_monotonic_time(); - erts_smp_mtx_lock(&tiw_lock); + erts_smp_mtx_lock(&tiw->lock); - next = find_next_timeout(curr, max_search_time); + next = find_next_timeout(tiw, curr, max_search_time); - erts_smp_mtx_unlock(&tiw_lock); + erts_smp_mtx_unlock(&tiw->lock); return next; } #ifndef DEBUG -#define ERTS_DBG_CHK_SAFE_TO_SKIP_TO(TO) ((void) 0) +#define ERTS_DBG_CHK_SAFE_TO_SKIP_TO(TIW, TO) ((void) 0) #else -#define ERTS_DBG_CHK_SAFE_TO_SKIP_TO(TO) debug_check_safe_to_skip_to((TO)) +#define ERTS_DBG_CHK_SAFE_TO_SKIP_TO(TIW, TO) debug_check_safe_to_skip_to((TIW), (TO)) static void -debug_check_safe_to_skip_to(ErtsMonotonicTime skip_to_pos) +debug_check_safe_to_skip_to(ErtsTimerWheel *tiw, ErtsMonotonicTime skip_to_pos) { int slots, ix; ErlTimer *tmr; ErtsMonotonicTime tmp; - ix = (int) (tiw_pos & (TIW_SIZE-1)); - tmp = skip_to_pos - tiw_pos; + ix = (int) (tiw->pos & (TIW_SIZE-1)); + tmp = skip_to_pos - tiw->pos; ASSERT(tmp >= 0); if (tmp < (ErtsMonotonicTime) TIW_SIZE) slots = (int) tmp; @@ -274,7 +296,7 @@ debug_check_safe_to_skip_to(ErtsMonotonicTime skip_to_pos) slots = TIW_SIZE; while (slots > 0) { - tmr = tiw[ix]; + tmr = tiw->w[ix]; while (tmr) { ASSERT(tmr->timeout_pos > skip_to_pos); tmr = tmr->next; @@ -287,79 +309,80 @@ debug_check_safe_to_skip_to(ErtsMonotonicTime skip_to_pos) } #endif -void erts_bump_timers(ErtsMonotonicTime curr_time) +void +erts_bump_timers(ErtsTimerWheel *tiw, ErtsMonotonicTime curr_time) { int tiw_pos_ix, slots; ErlTimer *p, *timeout_head, **timeout_tail; ErtsMonotonicTime bump_to, tmp_slots; - if (erts_smp_atomic32_cmpxchg_nob(&is_bumping, 1, 0) != 0) + if (erts_smp_atomic32_cmpxchg_nob(&tiw->is_bumping, 1, 0) != 0) return; /* Another thread is currently bumping... */ bump_to = ERTS_MONOTONIC_TO_CLKTCKS(curr_time); - erts_smp_mtx_lock(&tiw_lock); + erts_smp_mtx_lock(&tiw->lock); - if (tiw_pos >= bump_to) { + if (tiw->pos >= bump_to) { timeout_head = NULL; goto done; } /* Don't want others here while we are bumping... */ - set_next_timeout(curr_time + ERTS_MONOTONIC_DAY, 0); + set_next_timeout(tiw, curr_time + ERTS_MONOTONIC_DAY, 0); - if (!tiw_at_once.head) { + if (!tiw->at_once.head) { timeout_head = NULL; timeout_tail = &timeout_head; } else { - ASSERT(tiw_nto >= tiw_at_once.nto); - timeout_head = tiw_at_once.head; - timeout_tail = tiw_at_once.tail; - tiw_nto -= tiw_at_once.nto; - tiw_at_once.head = NULL; - tiw_at_once.tail = &tiw_at_once.head; - tiw_at_once.nto = 0; + ASSERT(tiw->nto >= tiw->at_once.nto); + timeout_head = tiw->at_once.head; + timeout_tail = tiw->at_once.tail; + tiw->nto -= tiw->at_once.nto; + tiw->at_once.head = NULL; + tiw->at_once.tail = &tiw->at_once.head; + tiw->at_once.nto = 0; } - if (tiw_nto == 0) { - ERTS_DBG_CHK_SAFE_TO_SKIP_TO(bump_to); - tiw_pos = bump_to; + if (tiw->nto == 0) { + ERTS_DBG_CHK_SAFE_TO_SKIP_TO(tiw, bump_to); + tiw->pos = bump_to; goto done; } - if (true_next_timeout_time) { + if (tiw->true_next_timeout_time) { ErtsMonotonicTime skip_until_pos; /* * No need inspecting slots where we know no timeouts * to trigger should reside. */ - skip_until_pos = ERTS_MONOTONIC_TO_CLKTCKS(next_timeout_time); + skip_until_pos = ERTS_MONOTONIC_TO_CLKTCKS(tiw->next_timeout_time); if (skip_until_pos > bump_to) skip_until_pos = bump_to; - ERTS_DBG_CHK_SAFE_TO_SKIP_TO(skip_until_pos); - ASSERT(skip_until_pos > tiw_pos); + ERTS_DBG_CHK_SAFE_TO_SKIP_TO(tiw, skip_until_pos); + ASSERT(skip_until_pos > tiw->pos); - tiw_pos = skip_until_pos - 1; + tiw->pos = skip_until_pos - 1; } - tiw_pos_ix = (int) ((tiw_pos+1) & (TIW_SIZE-1)); - tmp_slots = (bump_to - tiw_pos); + tiw_pos_ix = (int) ((tiw->pos+1) & (TIW_SIZE-1)); + tmp_slots = (bump_to - tiw->pos); if (tmp_slots < (ErtsMonotonicTime) TIW_SIZE) slots = (int) tmp_slots; else slots = TIW_SIZE; while (slots > 0) { - p = tiw[tiw_pos_ix]; + p = tiw->w[tiw_pos_ix]; while (p) { ErlTimer *next = p->next; ASSERT(p != next); if (p->timeout_pos <= bump_to) { /* we have a timeout */ /* Remove from list */ - remove_timer(p); + remove_timer(tiw, p); *timeout_tail = p; /* Insert in timeout queue */ timeout_tail = &p->next; } @@ -374,19 +397,21 @@ void erts_bump_timers(ErtsMonotonicTime curr_time) ASSERT(tmp_slots >= (ErtsMonotonicTime) TIW_SIZE || tiw_pos_ix == (int) ((bump_to+1) & (TIW_SIZE-1))); - tiw_pos = bump_to; + tiw->pos = bump_to; /* Search at most two seconds ahead... */ - (void) find_next_timeout(curr_time, ERTS_SEC_TO_MONOTONIC(2)); + (void) find_next_timeout(tiw, curr_time, ERTS_SEC_TO_MONOTONIC(2)); done: - erts_smp_mtx_unlock(&tiw_lock); + erts_smp_mtx_unlock(&tiw->lock); - erts_smp_atomic32_set_nob(&is_bumping, 0); + erts_smp_atomic32_set_nob(&tiw->is_bumping, 0); /* Call timedout timers callbacks */ while (timeout_head) { + ErlTimeoutProc timeout; + void *arg; p = timeout_head; timeout_head = p->next; /* Here comes hairy use of the timer fields! @@ -399,23 +424,61 @@ done: p->next = NULL; p->prev = NULL; p->slot = 0; - (*p->timeout)(p->arg); + timeout = p->timeout; + arg = p->arg; + (*timeout)(arg); } } Uint erts_timer_wheel_memory_size(void) { - return (Uint) TIW_SIZE * sizeof(ErlTimer*); +#ifdef ERTS_SMP + return sizeof(ErtsTimerWheel)*(1 + erts_no_schedulers); +#else + return sizeof(ErtsTimerWheel); +#endif } +ErtsTimerWheel * +erts_create_timer_wheel(int no) +{ + ErtsMonotonicTime mtime; + int i; + ErtsTimerWheel *tiw; + tiw = (ErtsTimerWheel *) erts_alloc(ERTS_ALC_T_TIMER_WHEEL, + sizeof(ErtsTimerWheel)); + for(i = 0; i < TIW_SIZE; i++) + tiw->w[i] = NULL; + + erts_smp_atomic32_init_nob(&tiw->is_bumping, 0); + erts_smp_mtx_init_x(&tiw->lock, "timer_wheel", make_small(no)); + + mtime = erts_get_monotonic_time(); + tiw->pos = ERTS_MONOTONIC_TO_CLKTCKS(mtime); + tiw->nto = 0; + tiw->at_once.head = NULL; + tiw->at_once.tail = &tiw->at_once.head; + tiw->at_once.nto = 0; + tiw->true_next_timeout_time = 0; + tiw->next_timeout_time = mtime + ERTS_MONOTONIC_DAY; + init_next_timeout(tiw, mtime + ERTS_MONOTONIC_DAY); + return tiw; +} + +ErtsNextTimeoutRef +erts_get_next_timeout_reference(ErtsTimerWheel *tiw) +{ + return (ErtsNextTimeoutRef) &tiw->next_timeout; +} + + /* this routine links the time cells into a free list at the start and sets the time queue as empty */ void erts_init_time(int time_correction, ErtsTimeWarpMode time_warp_mode) { - ErtsMonotonicTime mtime; - int i, itime; + int itime; /* system dependent init; must be done before do_time_init() if timer thread is enabled */ @@ -428,41 +491,39 @@ erts_init_time(int time_correction, ErtsTimeWarpMode time_warp_mode) tiw_itime = itime; #endif - erts_smp_atomic32_init_nob(&is_bumping, 0); - erts_smp_mtx_init(&tiw_lock, "timer_wheel"); - - tiw = (ErlTimer**) erts_alloc(ERTS_ALC_T_TIMER_WHEEL, - TIW_SIZE * sizeof(ErlTimer*)); - for(i = 0; i < TIW_SIZE; i++) - tiw[i] = NULL; - - mtime = erts_get_monotonic_time(); - tiw_pos = ERTS_MONOTONIC_TO_CLKTCKS(mtime); - tiw_nto = 0; - tiw_at_once.head = NULL; - tiw_at_once.tail = &tiw_at_once.head; - tiw_at_once.nto = 0; - init_next_timeout(mtime + ERTS_MONOTONIC_DAY); - - erts_late_init_time_sup(); + erts_default_timer_wheel = erts_create_timer_wheel(0); } +void +erts_set_timer(ErlTimer *p, ErlTimeoutProc timeout, + ErlCancelProc cancel, void *arg, Uint to) +{ + ErtsMonotonicTime timeout_time, timeout_pos; + ErtsMonotonicTime curr_time; + ErtsTimerWheel *tiw; + ErtsSchedulerData *esdp; + + curr_time = erts_get_monotonic_time(); + esdp = erts_get_scheduler_data(); + if (esdp) + tiw = esdp->timer_wheel; + else + tiw = erts_default_timer_wheel; + erts_smp_mtx_lock(&tiw->lock); + if (get_timer_wheel(p)) + ERTS_INTERNAL_ERROR("Double set timer"); -/* -** Insert a process into the time queue, with a timeout 't' -*/ -static ErtsMonotonicTime -insert_timer(ErlTimer* p, ErtsMonotonicTime curr_time, ErtsMonotonicTime to) -{ - ErtsMonotonicTime timeout_time, timeout_pos; + p->timeout = timeout; + p->cancel = cancel; + p->arg = arg; if (to == 0) { timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(curr_time); - tiw_nto++; - tiw_at_once.nto++; - *tiw_at_once.tail = p; + tiw->nto++; + tiw->at_once.nto++; + *tiw->at_once.tail = p; p->next = NULL; p->timeout_pos = timeout_pos; timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(timeout_pos); @@ -479,13 +540,13 @@ insert_timer(ErlTimer* p, ErtsMonotonicTime curr_time, ErtsMonotonicTime to) p->slot = (Uint) tm; /* insert at head of list at slot */ - p->next = tiw[tm]; + p->next = tiw->w[tm]; p->prev = NULL; if (p->next != NULL) p->next->prev = p; - tiw[tm] = p; + tiw->w[tm] = p; - tiw_nto++; + tiw->nto++; timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(timeout_pos); p->timeout_pos = timeout_pos; @@ -495,59 +556,45 @@ insert_timer(ErlTimer* p, ErtsMonotonicTime curr_time, ErtsMonotonicTime to) < ERTS_MSEC_TO_MONOTONIC(to) + ERTS_CLKTCKS_TO_MONOTONIC(1)); } - if (timeout_time < next_timeout_time) - set_next_timeout(timeout_time, 1); + if (timeout_time < tiw->next_timeout_time) + set_next_timeout(tiw, timeout_time, 1); - return timeout_time; -} + set_timer_wheel(p, tiw); + + erts_smp_mtx_unlock(&tiw->lock); -void -erts_set_timer(ErlTimer* p, ErlTimeoutProc timeout, ErlCancelProc cancel, - void* arg, Uint t) -{ -#ifdef ERTS_SMP - ErtsMonotonicTime timeout_time; -#endif - ErtsMonotonicTime current_time = erts_get_monotonic_time(); - erts_smp_mtx_lock(&tiw_lock); - if (p->active) { /* XXX assert ? */ - erts_smp_mtx_unlock(&tiw_lock); - return; - } - p->timeout = timeout; - p->cancel = cancel; - p->arg = arg; - p->active = 1; -#ifdef ERTS_SMP - timeout_time = -#else - (void) -#endif - insert_timer(p, current_time, (ErtsMonotonicTime) t); - erts_smp_mtx_unlock(&tiw_lock); #if defined(ERTS_SMP) - erts_sys_schedule_interrupt_timed(1, timeout_time); + if (tiw == erts_default_timer_wheel) + erts_interupt_aux_thread_timed(timeout_time); #endif + } void -erts_cancel_timer(ErlTimer* p) +erts_cancel_timer(ErlTimer *p) { - erts_smp_mtx_lock(&tiw_lock); - if (!p->active) { /* allow repeated cancel (drivers) */ - erts_smp_mtx_unlock(&tiw_lock); - return; - } - - remove_timer(p); - p->slot = 0; + ErtsTimerWheel *tiw; + ErlCancelProc cancel; + void *arg; - if (p->cancel != NULL) { - erts_smp_mtx_unlock(&tiw_lock); - (*p->cancel)(p->arg); + tiw = get_timer_wheel(p); + if (!tiw) return; + + erts_smp_mtx_lock(&tiw->lock); + if (tiw != get_timer_wheel(p)) + cancel = NULL; + else { + remove_timer(tiw, p); + p->slot = 0; + + cancel = p->cancel; + arg = p->arg; } - erts_smp_mtx_unlock(&tiw_lock); + erts_smp_mtx_unlock(&tiw->lock); + + if (cancel) + (*cancel)(arg); } /* @@ -559,18 +606,19 @@ erts_cancel_timer(ErlTimer* p) Uint erts_time_left(ErlTimer *p) { + ErtsTimerWheel *tiw; ErtsMonotonicTime current_time, timeout_time; - erts_smp_mtx_lock(&tiw_lock); - - if (!p->active) { - erts_smp_mtx_unlock(&tiw_lock); + tiw = get_timer_wheel(p); + if (!tiw) return 0; - } - timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos); - - erts_smp_mtx_unlock(&tiw_lock); + erts_smp_mtx_lock(&tiw->lock); + if (tiw != get_timer_wheel(p)) + timeout_time = ERTS_MONOTONIC_TIME_MIN; + else + timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos); + erts_smp_mtx_unlock(&tiw->lock); current_time = erts_get_monotonic_time(); if (timeout_time <= current_time) @@ -581,34 +629,35 @@ erts_time_left(ErlTimer *p) #ifdef DEBUG void erts_p_slpq(void) { + ErtsTimerWheel *tiw = erts_default_timer_wheel; ErtsMonotonicTime current_time = erts_get_monotonic_time(); int i; ErlTimer* p; - erts_smp_mtx_lock(&tiw_lock); + erts_smp_mtx_lock(&tiw->lock); /* print the whole wheel, starting at the current position */ erts_printf("\ncurrent time = %bps tiw_pos = %d tiw_nto %d\n", - current_time, tiw_pos, tiw_nto); - i = tiw_pos; - if (tiw[i] != NULL) { + current_time, tiw->pos, tiw->nto); + i = tiw->pos; + if (tiw->w[i] != NULL) { erts_printf("%d:\n", i); - for(p = tiw[i]; p != NULL; p = p->next) { + for(p = tiw->w[i]; p != NULL; p = p->next) { erts_printf(" (timeout time %bps, slot %d)\n", ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos), p->slot); } } - for(i = ((i+1) & (TIW_SIZE-1)); i != (tiw_pos & (TIW_SIZE-1)); i = ((i+1) & (TIW_SIZE-1))) { - if (tiw[i] != NULL) { + for(i = ((i+1) & (TIW_SIZE-1)); i != (tiw->pos & (TIW_SIZE-1)); i = ((i+1) & (TIW_SIZE-1))) { + if (tiw->w[i] != NULL) { erts_printf("%d:\n", i); - for(p = tiw[i]; p != NULL; p = p->next) { + for(p = tiw->w[i]; p != NULL; p = p->next) { erts_printf(" (timeout time %bps, slot %d)\n", ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos), p->slot); } } } - erts_smp_mtx_unlock(&tiw_lock); + erts_smp_mtx_unlock(&tiw->lock); } #endif /* DEBUG */ -- cgit v1.2.3