/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 1996-2013. All Rights Reserved.
 *
 * The contents of this file are subject to the Erlang Public License,
 * Version 1.1, (the "License"); you may not use this file except in
 * compliance with the License. You should have received a copy of the
 * Erlang Public License along with this software. If not, it can be
 * retrieved online at http://www.erlang.org/.
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * %CopyrightEnd%
 */

/*
 * TIMING WHEEL
 *
 * Timeouts kept in a wheel. A timeout is measured relative to the
 * current slot (tiw_pos) in the wheel, and inserted at slot
 * (tiw_pos + timeout) % TIW_SIZE. Each timeout also has a count
 * equal to timeout/TIW_SIZE, which is needed since the time axis
 * is wrapped around the wheel.
 *
 * Several slots may be processed in one operation. If the number of
 * slots is greater than the wheel size, the wheel is only traversed
 * once.
 *
 * The following example shows a time axis where there is one timeout
 * at each "tick", and where 1, 2, 3 ... wheel slots are released in
 * one operation.
The notation "wheel); } static ERTS_INLINE void set_timer_wheel(ErlTimer *p, ErtsTimerWheel *tiw) { erts_smp_atomic_set_relb(&p->wheel, (erts_aint_t) tiw); } static ERTS_INLINE void init_next_timeout(ErtsTimerWheel *tiw, ErtsMonotonicTime time) { erts_atomic64_init_nob(&tiw->next_timeout, (erts_aint64_t) time); } static ERTS_INLINE void set_next_timeout(ErtsTimerWheel *tiw, ErtsMonotonicTime time, int true_timeout) { tiw->true_next_timeout_time = true_timeout; tiw->next_timeout_time = time; erts_atomic64_set_relb(&tiw->next_timeout, (erts_aint64_t) time); } /* get the time (in units of TIW_ITIME) to the next timeout, or -1 if there are no timeouts */ static ERTS_INLINE ErtsMonotonicTime find_next_timeout(ErtsTimerWheel *tiw, ErtsMonotonicTime curr_time, ErtsMonotonicTime max_search_time) { int start_ix, tiw_pos_ix; ErlTimer *p; int true_min_timeout; ErtsMonotonicTime min_timeout, min_timeout_pos, slot_timeout_pos, timeout_limit; ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&tiw->lock)); if (tiw->true_next_timeout_time) return tiw->next_timeout_time; /* We never set next timeout beyond timeout_limit */ timeout_limit = curr_time + ERTS_MONOTONIC_DAY; if (tiw->nto == 0) { /* no timeouts in wheel */ true_min_timeout = tiw->true_next_timeout_time = 0; min_timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(timeout_limit); goto found_next; } /* * Don't want others entering trying to bump * timers while we are checking... 
*/ set_next_timeout(tiw, timeout_limit, 0); true_min_timeout = 1; slot_timeout_pos = tiw->pos; min_timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(curr_time + max_search_time); start_ix = tiw_pos_ix = (int) (tiw->pos & (TIW_SIZE-1)); do { slot_timeout_pos++; if (slot_timeout_pos >= min_timeout_pos) { true_min_timeout = 0; break; } p = tiw->w[tiw_pos_ix]; while (p) { ErtsMonotonicTime timeout_pos; ASSERT(p != p->next); timeout_pos = p->timeout_pos; if (min_timeout_pos > timeout_pos) { min_timeout_pos = timeout_pos; if (min_timeout_pos <= slot_timeout_pos) goto found_next; } p = p->next; } tiw_pos_ix++; if (tiw_pos_ix == TIW_SIZE) tiw_pos_ix = 0; } while (start_ix != tiw_pos_ix); found_next: min_timeout = ERTS_CLKTCKS_TO_MONOTONIC(min_timeout_pos); if (min_timeout != tiw->next_timeout_time) set_next_timeout(tiw, min_timeout, true_min_timeout); return min_timeout; } static void remove_timer(ErtsTimerWheel *tiw, ErlTimer *p) { if (p->slot >= 0) { /* Timer in wheel... */ ASSERT(p->slot < TIW_SIZE); if (p->prev) p->prev->next = p->next; else { ASSERT(tiw->w[p->slot] == p); tiw->w[p->slot] = p->next; } if(p->next) p->next->prev = p->prev; } else { /* Timer in "at once" queue... 
*/ ASSERT(p->slot == -1); if (p->prev) p->prev->next = p->next; else { ASSERT(tiw->at_once.head == p); tiw->at_once.head = p->next; } if (p->next) p->next->prev = p->prev; else { ASSERT(tiw->at_once.tail == p); tiw->at_once.tail = p->prev; } ASSERT(tiw->at_once.nto > 0); tiw->at_once.nto--; } p->slot = -2; p->next = NULL; p->prev = NULL; set_timer_wheel(p, NULL); tiw->nto--; } ErtsMonotonicTime erts_check_next_timeout_time(ErtsTimerWheel *tiw, ErtsMonotonicTime max_search_time) { ErtsMonotonicTime next, curr; curr = erts_get_monotonic_time(); erts_smp_mtx_lock(&tiw->lock); next = find_next_timeout(tiw, curr, max_search_time); erts_smp_mtx_unlock(&tiw->lock); return next; } #ifndef DEBUG #define ERTS_DBG_CHK_SAFE_TO_SKIP_TO(TIW, TO) ((void) 0) #else #define ERTS_DBG_CHK_SAFE_TO_SKIP_TO(TIW, TO) debug_check_safe_to_skip_to((TIW), (TO)) static void debug_check_safe_to_skip_to(ErtsTimerWheel *tiw, ErtsMonotonicTime skip_to_pos) { int slots, ix; ErlTimer *tmr; ErtsMonotonicTime tmp; ix = (int) (tiw->pos & (TIW_SIZE-1)); tmp = skip_to_pos - tiw->pos; ASSERT(tmp >= 0); if (tmp < (ErtsMonotonicTime) TIW_SIZE) slots = (int) tmp; else slots = TIW_SIZE; while (slots > 0) { tmr = tiw->w[ix]; while (tmr) { ASSERT(tmr->timeout_pos > skip_to_pos); tmr = tmr->next; } ix++; if (ix == TIW_SIZE) ix = 0; slots--; } } #endif void erts_bump_timers(ErtsTimerWheel *tiw, ErtsMonotonicTime curr_time) { int tiw_pos_ix, slots; ErlTimer *p, *timeout_head, **timeout_tail; ErtsMonotonicTime bump_to, tmp_slots; if (erts_smp_atomic32_cmpxchg_nob(&tiw->is_bumping, 1, 0) != 0) return; /* Another thread is currently bumping... */ bump_to = ERTS_MONOTONIC_TO_CLKTCKS(curr_time); erts_smp_mtx_lock(&tiw->lock); if (tiw->pos >= bump_to) { timeout_head = NULL; goto done; } /* Don't want others here while we are bumping... 
*/ set_next_timeout(tiw, curr_time + ERTS_MONOTONIC_DAY, 0); if (!tiw->at_once.head) { timeout_head = NULL; timeout_tail = &timeout_head; } else { ASSERT(tiw->nto >= tiw->at_once.nto); p = timeout_head = tiw->at_once.head; while (1) { set_timer_wheel(p, NULL); if (!p->next) { timeout_tail = &p->next; break; } } tiw->nto -= tiw->at_once.nto; tiw->at_once.head = NULL; tiw->at_once.tail = NULL; tiw->at_once.nto = 0; } if (tiw->nto == 0) { ERTS_DBG_CHK_SAFE_TO_SKIP_TO(tiw, bump_to); tiw->pos = bump_to; goto done; } if (tiw->true_next_timeout_time) { ErtsMonotonicTime skip_until_pos; /* * No need inspecting slots where we know no timeouts * to trigger should reside. */ skip_until_pos = ERTS_MONOTONIC_TO_CLKTCKS(tiw->next_timeout_time); if (skip_until_pos > bump_to) skip_until_pos = bump_to; ERTS_DBG_CHK_SAFE_TO_SKIP_TO(tiw, skip_until_pos); ASSERT(skip_until_pos > tiw->pos); tiw->pos = skip_until_pos - 1; } tiw_pos_ix = (int) ((tiw->pos+1) & (TIW_SIZE-1)); tmp_slots = (bump_to - tiw->pos); if (tmp_slots < (ErtsMonotonicTime) TIW_SIZE) slots = (int) tmp_slots; else slots = TIW_SIZE; while (slots > 0) { p = tiw->w[tiw_pos_ix]; while (p) { ErlTimer *next = p->next; ASSERT(p != next); if (p->timeout_pos <= bump_to) { /* we have a timeout */ /* Remove from list */ remove_timer(tiw, p); *timeout_tail = p; /* Insert in timeout queue */ timeout_tail = &p->next; } p = next; } tiw_pos_ix++; if (tiw_pos_ix == TIW_SIZE) tiw_pos_ix = 0; slots--; } ASSERT(tmp_slots >= (ErtsMonotonicTime) TIW_SIZE || tiw_pos_ix == (int) ((bump_to+1) & (TIW_SIZE-1))); tiw->pos = bump_to; /* Search at most two seconds ahead... */ (void) find_next_timeout(tiw, curr_time, ERTS_SEC_TO_MONOTONIC(2)); done: erts_smp_mtx_unlock(&tiw->lock); erts_smp_atomic32_set_nob(&tiw->is_bumping, 0); /* Call timedout timers callbacks */ while (timeout_head) { ErlTimeoutProc timeout; void *arg; p = timeout_head; timeout_head = p->next; /* Here comes hairy use of the timer fields! * They are reset without having the lock. 
* It is assumed that no code but this will * accesses any field until the ->timeout * callback is called. */ ASSERT(p->timeout_pos <= bump_to); p->next = NULL; p->prev = NULL; p->slot = 0; timeout = p->timeout; arg = p->arg; (*timeout)(arg); } } Uint erts_timer_wheel_memory_size(void) { #ifdef ERTS_SMP return sizeof(ErtsTimerWheel)*(1 + erts_no_schedulers); #else return sizeof(ErtsTimerWheel); #endif } ErtsTimerWheel * erts_create_timer_wheel(int no) { ErtsMonotonicTime mtime; int i; ErtsTimerWheel *tiw; tiw = (ErtsTimerWheel *) erts_alloc(ERTS_ALC_T_TIMER_WHEEL, sizeof(ErtsTimerWheel)); for(i = 0; i < TIW_SIZE; i++) tiw->w[i] = NULL; erts_smp_atomic32_init_nob(&tiw->is_bumping, 0); erts_smp_mtx_init_x(&tiw->lock, "timer_wheel", make_small(no)); mtime = erts_get_monotonic_time(); tiw->pos = ERTS_MONOTONIC_TO_CLKTCKS(mtime); tiw->nto = 0; tiw->at_once.head = NULL; tiw->at_once.tail = NULL; tiw->at_once.nto = 0; tiw->true_next_timeout_time = 0; tiw->next_timeout_time = mtime + ERTS_MONOTONIC_DAY; init_next_timeout(tiw, mtime + ERTS_MONOTONIC_DAY); return tiw; } ErtsNextTimeoutRef erts_get_next_timeout_reference(ErtsTimerWheel *tiw) { return (ErtsNextTimeoutRef) &tiw->next_timeout; } /* this routine links the time cells into a free list at the start and sets the time queue as empty */ void erts_init_time(int time_correction, ErtsTimeWarpMode time_warp_mode) { int itime; /* system dependent init; must be done before do_time_init() if timer thread is enabled */ itime = erts_init_time_sup(time_correction, time_warp_mode); #ifdef TIW_ITIME_IS_CONSTANT if (itime != TIW_ITIME) { erl_exit(ERTS_ABORT_EXIT, "timer resolution mismatch %d != %d", itime, TIW_ITIME); } #else tiw_itime = itime; #endif erts_default_timer_wheel = erts_create_timer_wheel(0); } void erts_set_timer(ErlTimer *p, ErlTimeoutProc timeout, ErlCancelProc cancel, void *arg, Uint to) { ErtsMonotonicTime timeout_time, timeout_pos; ErtsMonotonicTime curr_time; ErtsTimerWheel *tiw; ErtsSchedulerData *esdp; 
curr_time = erts_get_monotonic_time(); esdp = erts_get_scheduler_data(); if (esdp) tiw = esdp->timer_wheel; else tiw = erts_default_timer_wheel; erts_smp_mtx_lock(&tiw->lock); if (get_timer_wheel(p)) ERTS_INTERNAL_ERROR("Double set timer"); p->timeout = timeout; p->cancel = cancel; p->arg = arg; if (to == 0) { timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(curr_time); tiw->nto++; tiw->at_once.nto++; p->next = NULL; p->prev = tiw->at_once.tail; tiw->at_once.tail = p; if (!tiw->at_once.head) tiw->at_once.head = p; p->timeout_pos = timeout_pos; p->slot = -1; timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(timeout_pos); } else { int tm; ErtsMonotonicTime ticks; ticks = ERTS_MSEC_TO_CLKTCKS(to); timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(curr_time - 1) + 1 + ticks; /* calculate slot */ tm = (int) (timeout_pos & (TIW_SIZE-1)); p->slot = tm; /* insert at head of list at slot */ p->next = tiw->w[tm]; p->prev = NULL; if (p->next != NULL) p->next->prev = p; tiw->w[tm] = p; tiw->nto++; timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(timeout_pos); p->timeout_pos = timeout_pos; ASSERT(ERTS_MSEC_TO_MONOTONIC(to) <= timeout_time - curr_time); ASSERT(timeout_time - curr_time < ERTS_MSEC_TO_MONOTONIC(to) + ERTS_CLKTCKS_TO_MONOTONIC(1)); } if (timeout_time < tiw->next_timeout_time) set_next_timeout(tiw, timeout_time, 1); set_timer_wheel(p, tiw); erts_smp_mtx_unlock(&tiw->lock); #if defined(ERTS_SMP) if (tiw == erts_default_timer_wheel) erts_interupt_aux_thread_timed(timeout_time); #endif } void erts_cancel_timer(ErlTimer *p) { ErtsTimerWheel *tiw; ErlCancelProc cancel; void *arg = NULL; /* Shut up faulty warning... */ tiw = get_timer_wheel(p); if (!tiw) return; erts_smp_mtx_lock(&tiw->lock); if (tiw != get_timer_wheel(p)) cancel = NULL; else { remove_timer(tiw, p); cancel = p->cancel; arg = p->arg; } erts_smp_mtx_unlock(&tiw->lock); if (cancel) (*cancel)(arg); } /* Returns the amount of time left in ms until the timer 'p' is triggered. 0 is returned if 'p' isn't active. 
0 is returned also if the timer is overdue (i.e., would have triggered immediately if it hadn't been cancelled). */
Uint
erts_time_left(ErlTimer *p)
{
    ErtsTimerWheel *tiw;
    ErtsMonotonicTime current_time, timeout_time;

    tiw = get_timer_wheel(p);
    if (!tiw)
        return 0;

    erts_smp_mtx_lock(&tiw->lock);
    /* Re-check with the lock held; the timer may have left the wheel */
    if (tiw != get_timer_wheel(p))
        timeout_time = ERTS_MONOTONIC_TIME_MIN;
    else
        timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos);
    erts_smp_mtx_unlock(&tiw->lock);

    current_time = erts_get_monotonic_time();
    if (timeout_time <= current_time)
        return 0;
    return (Uint) ERTS_MONOTONIC_TO_MSEC(timeout_time - current_time);
}

#ifdef DEBUG
/*
 * Debug helper: print every non-empty slot of the default timer
 * wheel, starting at the slot of the current wheel position and
 * going once around the wheel. The wheel lock is held while printing.
 */
void erts_p_slpq(void)
{
    ErtsTimerWheel *tiw = erts_default_timer_wheel;
    ErtsMonotonicTime current_time = erts_get_monotonic_time();
    int i, start_ix;
    ErlTimer* p;

    erts_smp_mtx_lock(&tiw->lock);

    /* print the whole wheel, starting at the current position */
    /* NOTE(review): tiw->pos looks wider than int elsewhere in this
     * file, so passing it for "%d" is suspect -- confirm the
     * erts_printf format/argument types. */
    erts_printf("\ncurrent time = %bps tiw_pos = %d tiw_nto %d\n",
                current_time, tiw->pos, tiw->nto);

    /*
     * tiw->pos is an absolute tick position, not a slot index; it
     * must be masked before it is used to index tiw->w[].
     */
    start_ix = (int) (tiw->pos & (TIW_SIZE-1));
    i = start_ix;
    do {
        if (tiw->w[i] != NULL) {
            erts_printf("%d:\n", i);
            for (p = tiw->w[i]; p != NULL; p = p->next) {
                erts_printf(" (timeout time %bps, slot %d)\n",
                            ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos),
                            p->slot);
            }
        }
        i = (i+1) & (TIW_SIZE-1);
    } while (i != start_ix);

    erts_smp_mtx_unlock(&tiw->lock);
}
#endif /* DEBUG */