diff options
author | Rickard Green <[email protected]> | 2015-02-13 10:12:02 +0100 |
---|---|---|
committer | Rickard Green <[email protected]> | 2015-03-20 15:28:52 +0100 |
commit | 1d9350693fe2c4d1d6b2baa504aacd070e023a1a (patch) | |
tree | 3ac1fe2c79aaeadacdb06acaa0d106095713679a /erts/emulator/beam | |
parent | 6b1921d767de5cd1a980234f83b36dbfa13d9fc7 (diff) | |
download | otp-1d9350693fe2c4d1d6b2baa504aacd070e023a1a.tar.gz otp-1d9350693fe2c4d1d6b2baa504aacd070e023a1a.tar.bz2 otp-1d9350693fe2c4d1d6b2baa504aacd070e023a1a.zip |
Multiple timer wheels
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r-- | erts/emulator/beam/erl_bif_timer.c | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_init.c | 1 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 242 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 4 | ||||
-rw-r--r-- | erts/emulator/beam/erl_time.h | 30 | ||||
-rw-r--r-- | erts/emulator/beam/erl_time_sup.c | 1 | ||||
-rw-r--r-- | erts/emulator/beam/time.c | 387 | ||||
-rw-r--r-- | erts/emulator/beam/utils.c | 2 |
8 files changed, 424 insertions, 245 deletions
diff --git a/erts/emulator/beam/erl_bif_timer.c b/erts/emulator/beam/erl_bif_timer.c index 8b444f2b01..0bd8d20c34 100644 --- a/erts/emulator/beam/erl_bif_timer.c +++ b/erts/emulator/beam/erl_bif_timer.c @@ -481,7 +481,7 @@ setup_bif_timer(Uint32 xflags, tab_insert(btm); ASSERT(btm == tab_find(ref)); - btm->tm.active = 0; /* MUST be initalized */ + erts_init_timer(&btm->tm); erts_set_timer(&btm->tm, (ErlTimeoutProc) bif_timer_timeout, (ErlCancelProc) bif_timer_cleanup, diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index be2c5ced9e..6708324a29 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -340,6 +340,7 @@ erl_init(int ncpu, no_dirty_io_schedulers #endif ); + erts_late_init_time_sup(); erts_init_cpu_topology(); /* Must be after init_scheduling */ erts_init_gc(); /* Must be after init_scheduling */ erts_alloc_late_init(); diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 6e562e16c8..45b6fc5fb3 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -2187,7 +2187,7 @@ aux_work_timeout_late_init(void) { aux_work_tmo->initialized = 1; if (erts_atomic32_read_nob(&aux_work_tmo->refc)) { - aux_work_tmo->timer.data.active = 0; + erts_init_timer(&aux_work_tmo->timer.data); erts_set_timer(&aux_work_tmo->timer.data, aux_work_timeout, NULL, @@ -2220,7 +2220,6 @@ aux_work_timeout(void *unused) if (refc != 1 || 1 != erts_atomic32_cmpxchg_relb(&aux_work_tmo->refc, 0, 1)) { /* Setup next timeout... */ - aux_work_tmo->timer.data.active = 0; erts_set_timer(&aux_work_tmo->timer.data, aux_work_timeout, NULL, @@ -2239,7 +2238,7 @@ setup_aux_work_timer(void) else #endif { - aux_work_tmo->timer.data.active = 0; + erts_init_timer(&aux_work_tmo->timer.data); erts_set_timer(&aux_work_tmo->timer.data, aux_work_timeout, NULL, @@ -2641,6 +2640,13 @@ thr_prgr_fin_wait(void *vssi) static void init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp, char *dawwp); +void +erts_interupt_aux_thread_timed(ErtsMonotonicTime timeout_time) +{ + /* TODO only poke when needed (based on timeout_time) */ + erts_sched_poke(ERTS_SCHED_SLEEP_INFO_IX(-1)); +} + static void * aux_thread(void *unused) { @@ -2649,6 +2655,11 @@ aux_thread(void *unused) erts_aint32_t aux_work; ErtsThrPrgrCallbacks callbacks; int thr_prgr_active = 1; + ErtsTimerWheel *timer_wheel = erts_default_timer_wheel; + ErtsNextTimeoutRef nxt_tmo_ref = erts_get_next_timeout_reference(timer_wheel); + + if (!timer_wheel) + ERTS_INTERNAL_ERROR("Missing aux timer wheel"); #ifdef ERTS_ENABLE_LOCK_CHECK { @@ -2672,6 +2683,7 @@ aux_thread(void *unused) sched_prep_spin_wait(ssi); while (1) { + ErtsMonotonicTime current_time; erts_aint32_t flgs; aux_work = erts_atomic32_read_acqb(&ssi->aux_work); @@ -2683,28 +2695,56 @@ aux_thread(void *unused) erts_thr_progress_leader_update(NULL); } - if (!aux_work) { - if (thr_prgr_active) - erts_thr_progress_active(NULL, thr_prgr_active = 0); - erts_thr_progress_prepare_wait(NULL); + if (aux_work) { + current_time = erts_get_monotonic_time(); + if (current_time >= erts_next_timeout_time(nxt_tmo_ref)) { + if (!thr_prgr_active) + erts_thr_progress_active(NULL, thr_prgr_active = 1); + erts_bump_timers(timer_wheel, current_time); + } + } + else { + ErtsMonotonicTime timeout_time; + timeout_time = erts_check_next_timeout_time(timer_wheel, + ERTS_SEC_TO_MONOTONIC(10*60)); + current_time = erts_get_monotonic_time(); + if (current_time >= timeout_time) { + if (!thr_prgr_active) + erts_thr_progress_active(NULL, thr_prgr_active = 1); + } + else { + if (thr_prgr_active) + erts_thr_progress_active(NULL, thr_prgr_active = 0); + erts_thr_progress_prepare_wait(NULL); - ERTS_SCHED_FAIR_YIELD(); + ERTS_SCHED_FAIR_YIELD(); - flgs = sched_spin_wait(ssi, 0); + flgs = sched_spin_wait(ssi, 0); - if (flgs & ERTS_SSI_FLG_SLEEPING) { - ASSERT(flgs & ERTS_SSI_FLG_WAITING); - flgs = sched_set_sleeptype(ssi, ERTS_SSI_FLG_TSE_SLEEPING); if (flgs & ERTS_SSI_FLG_SLEEPING) { - int res; - ASSERT(flgs & ERTS_SSI_FLG_TSE_SLEEPING); ASSERT(flgs & ERTS_SSI_FLG_WAITING); - do { - res = erts_tse_wait(ssi->event); - } while (res == EINTR); + flgs = sched_set_sleeptype(ssi, ERTS_SSI_FLG_TSE_SLEEPING); + if (flgs & ERTS_SSI_FLG_SLEEPING) { + int res; + ASSERT(flgs & ERTS_SSI_FLG_TSE_SLEEPING); + ASSERT(flgs & ERTS_SSI_FLG_WAITING); + current_time = erts_get_monotonic_time(); + do { + Sint64 timeout; + if (current_time >= timeout_time) + break; + timeout = ERTS_MONOTONIC_TO_NSEC(timeout_time + - current_time + - 1) + 1; + res = erts_tse_twait(ssi->event, timeout); + current_time = erts_get_monotonic_time(); + } while (res == EINTR); + } } + erts_thr_progress_finalize_wait(NULL); } - erts_thr_progress_finalize_wait(NULL); + if (current_time >= timeout_time) + erts_bump_timers(timer_wheel, current_time); } flgs = sched_prep_spin_wait(ssi); @@ -2771,6 +2811,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) sched_wall_time_change(esdp, thr_prgr_active); while (1) { + ErtsMonotonicTime current_time; aux_work = erts_atomic32_read_acqb(&ssi->aux_work); if (aux_work) { @@ -2784,34 +2825,65 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) erts_thr_progress_leader_update(esdp); } - if (aux_work) + if (aux_work) { flgs = erts_smp_atomic32_read_acqb(&ssi->flags); + current_time = erts_get_monotonic_time(); + if (current_time >= erts_next_timeout_time(esdp->next_tmo_ref)) { + if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && !thr_prgr_active) { + erts_thr_progress_active(esdp, thr_prgr_active = 1); + sched_wall_time_change(esdp, 1); + } + erts_bump_timers(esdp->timer_wheel, current_time); + } + } else { - if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) { - if (thr_prgr_active) { - erts_thr_progress_active(esdp, thr_prgr_active = 0); - sched_wall_time_change(esdp, 0); + ErtsMonotonicTime timeout_time; + timeout_time = erts_check_next_timeout_time(esdp->timer_wheel, + ERTS_SEC_TO_MONOTONIC(10*60)); + current_time = erts_get_monotonic_time(); + if (current_time >= timeout_time) { + if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && !thr_prgr_active) { + erts_thr_progress_active(esdp, thr_prgr_active = 1); + sched_wall_time_change(esdp, 1); } - erts_thr_progress_prepare_wait(esdp); } + else { + if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) { + if (thr_prgr_active) { + erts_thr_progress_active(esdp, thr_prgr_active = 0); + sched_wall_time_change(esdp, 0); + } + erts_thr_progress_prepare_wait(esdp); + } - ERTS_SCHED_FAIR_YIELD(); + ERTS_SCHED_FAIR_YIELD(); - flgs = sched_spin_wait(ssi, spincount); - if (flgs & ERTS_SSI_FLG_SLEEPING) { - ASSERT(flgs & ERTS_SSI_FLG_WAITING); - flgs = sched_set_sleeptype(ssi, ERTS_SSI_FLG_TSE_SLEEPING); + flgs = sched_spin_wait(ssi, spincount); if (flgs & ERTS_SSI_FLG_SLEEPING) { - int res; - ASSERT(flgs & ERTS_SSI_FLG_TSE_SLEEPING); ASSERT(flgs & ERTS_SSI_FLG_WAITING); - do { - res = erts_tse_twait(ssi->event, -1); - } while (res == EINTR); + flgs = sched_set_sleeptype(ssi, ERTS_SSI_FLG_TSE_SLEEPING); + if (flgs & ERTS_SSI_FLG_SLEEPING) { + int res; + ASSERT(flgs & ERTS_SSI_FLG_TSE_SLEEPING); + ASSERT(flgs & ERTS_SSI_FLG_WAITING); + current_time = erts_get_monotonic_time(); + do { + Sint64 timeout; + if (current_time >= timeout_time) + break; + timeout = ERTS_MONOTONIC_TO_NSEC(timeout_time + - current_time + - 1) + 1; + res = erts_tse_twait(ssi->event, timeout); + current_time = erts_get_monotonic_time(); + } while (res == EINTR); + } } + if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) + erts_thr_progress_finalize_wait(esdp); } - if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) - erts_thr_progress_finalize_wait(esdp); + if (current_time >= timeout_time) + erts_bump_timers(esdp->timer_wheel, current_time); } if (!(flgs & ERTS_SSI_FLG_WAITING)) { @@ -2879,8 +2951,8 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) erl_sys_schedule(1); /* Might give us something to do */ current_time = erts_get_monotonic_time(); - if (current_time >= erts_next_timeout_time()) - erts_bump_timers(current_time); + if (current_time >= erts_next_timeout_time(esdp->next_tmo_ref)) + erts_bump_timers(esdp->timer_wheel, current_time); sys_aux_work: #ifndef ERTS_SMP @@ -2997,8 +3069,8 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) { ErtsMonotonicTime current_time = erts_get_monotonic_time(); - if (current_time >= erts_next_timeout_time()) - erts_bump_timers(current_time); + if (current_time >= erts_next_timeout_time(esdp->next_tmo_ref)) + erts_bump_timers(esdp->timer_wheel, current_time); } #ifndef ERTS_SMP @@ -5269,6 +5341,10 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num, #else esdp->no = (Uint) num; #endif + + esdp->timer_wheel = erts_default_timer_wheel; + esdp->next_tmo_ref = erts_get_next_timeout_reference(esdp->timer_wheel); + esdp->ssi = ssi; esdp->current_process = NULL; esdp->current_port = NULL; @@ -6765,6 +6841,7 @@ suspend_scheduler(ErtsSchedulerData *esdp) erts_smp_mtx_unlock(&schdlr_sspnd.mtx); while (1) { + ErtsMonotonicTime current_time; erts_aint32_t qmask; erts_aint32_t flgs; @@ -6789,30 +6866,64 @@ suspend_scheduler(ErtsSchedulerData *esdp) } } - if (!aux_work) { - if (thr_prgr_active) { - erts_thr_progress_active(esdp, thr_prgr_active = 0); - sched_wall_time_change(esdp, 0); + if (aux_work) { + current_time = erts_get_monotonic_time(); + if (current_time >= erts_next_timeout_time(esdp->next_tmo_ref)) { + if (!thr_prgr_active) { + erts_thr_progress_active(esdp, thr_prgr_active = 1); + sched_wall_time_change(esdp, 1); + } + erts_bump_timers(esdp->timer_wheel, current_time); } - erts_thr_progress_prepare_wait(esdp); - flgs = sched_spin_suspended(ssi, - ERTS_SCHED_SUSPEND_SLEEP_SPINCOUNT); - if (flgs == (ERTS_SSI_FLG_SLEEPING - | ERTS_SSI_FLG_WAITING - | ERTS_SSI_FLG_SUSPENDED)) { - flgs = sched_set_suspended_sleeptype(ssi); + } + else { + ErtsMonotonicTime timeout_time; + timeout_time = erts_check_next_timeout_time(esdp->timer_wheel, + ERTS_SEC_TO_MONOTONIC(60*60)); + current_time = erts_get_monotonic_time(); + + if (current_time >= timeout_time) { + if (!thr_prgr_active) { + erts_thr_progress_active(esdp, thr_prgr_active = 1); + sched_wall_time_change(esdp, 1); + } + } + else { + if (thr_prgr_active) { + erts_thr_progress_active(esdp, thr_prgr_active = 0); + sched_wall_time_change(esdp, 0); + } + erts_thr_progress_prepare_wait(esdp); + flgs = sched_spin_suspended(ssi, + ERTS_SCHED_SUSPEND_SLEEP_SPINCOUNT); if (flgs == (ERTS_SSI_FLG_SLEEPING - | ERTS_SSI_FLG_TSE_SLEEPING | ERTS_SSI_FLG_WAITING | ERTS_SSI_FLG_SUSPENDED)) { - int res; - - do { - res = erts_tse_twait(ssi->event, -1); - } while (res == EINTR); + flgs = sched_set_suspended_sleeptype(ssi); + if (flgs == (ERTS_SSI_FLG_SLEEPING + | ERTS_SSI_FLG_TSE_SLEEPING + | ERTS_SSI_FLG_WAITING + | ERTS_SSI_FLG_SUSPENDED)) { + int res; + + current_time = erts_get_monotonic_time(); + do { + Sint64 timeout; + if (current_time >= timeout_time) + break; + timeout = ERTS_MONOTONIC_TO_NSEC(timeout_time + - current_time + - 1) + 1; + res = erts_tse_twait(ssi->event, timeout); + current_time = erts_get_monotonic_time(); + } while (res == EINTR); + } } + erts_thr_progress_finalize_wait(esdp); } - erts_thr_progress_finalize_wait(esdp); + + if (current_time >= timeout_time) + erts_bump_timers(esdp->timer_wheel, current_time); } flgs = sched_prep_spin_suspended(ssi, (ERTS_SSI_FLG_WAITING @@ -7631,6 +7742,9 @@ sched_thread_func(void *vesdp) ErtsThrPrgrCallbacks callbacks; ErtsSchedulerData *esdp = vesdp; Uint no = esdp->no; + + esdp->timer_wheel = erts_create_timer_wheel((int) no); + esdp->next_tmo_ref = erts_get_next_timeout_reference(esdp->timer_wheel); #ifdef ERTS_SMP ERTS_SCHED_SLEEP_INFO_IX(no - 1)->event = erts_tse_fetch(); callbacks.arg = (void *) esdp->ssi; @@ -9048,9 +9162,9 @@ Process *schedule(Process *p, int calls) { ErtsMonotonicTime current_time = erts_get_monotonic_time(); - if (current_time >= erts_next_timeout_time()) { + if (current_time >= erts_next_timeout_time(esdp->next_tmo_ref)) { erts_smp_runq_unlock(rq); - erts_bump_timers(current_time); + erts_bump_timers(esdp->timer_wheel, current_time); erts_smp_runq_lock(rq); } } @@ -9213,8 +9327,8 @@ Process *schedule(Process *p, int calls) erl_sys_schedule(1); current_time = erts_get_monotonic_time(); - if (current_time >= erts_next_timeout_time()) - erts_bump_timers(current_time); + if (current_time >= erts_next_timeout_time(esdp->next_tmo_ref)) + erts_bump_timers(esdp->timer_wheel, current_time); #ifdef ERTS_SMP erts_smp_runq_lock(rq); @@ -10652,7 +10766,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). #ifdef ERTS_SMP p->common.u.alive.ptimer = NULL; #else - sys_memset(&p->common.u.alive.tm, 0, sizeof(ErlTimer)); + erts_init_timer(&p->common.u.alive.tm); #endif p->common.u.alive.reg = NULL; @@ -10845,7 +10959,7 @@ void erts_init_empty_process(Process *p) #ifdef ERTS_SMP p->common.u.alive.ptimer = NULL; #else - memset(&(p->common.u.alive.tm), 0, sizeof(ErlTimer)); + erts_init_timer(&p->common.u.alive.tm); #endif p->next = NULL; p->off_heap.first = NULL; diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 53a992e115..77a4b45b09 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -562,6 +562,8 @@ struct ErtsSchedulerData_ { Eterm* x_reg_array; /* X registers */ FloatDef* f_reg_array; /* Floating point registers. */ + ErtsTimerWheel *timer_wheel; + ErtsNextTimeoutRef next_tmo_ref; #ifdef ERTS_SMP ethr_tid tid; /* Thread id */ struct erl_bits_state erl_bits_state; /* erl_bits.c state */ @@ -2237,6 +2239,8 @@ extern int erts_disable_proc_not_running_opt; void erts_smp_notify_inc_runq(ErtsRunQueue *runq); +void erts_interupt_aux_thread_timed(ErtsMonotonicTime timeout_time); + #ifdef ERTS_SMP void erts_sched_finish_poke(ErtsSchedulerSleepInfo *, erts_aint32_t); ERTS_GLB_INLINE void erts_sched_poke(ErtsSchedulerSleepInfo *ssi); diff --git a/erts/emulator/beam/erl_time.h b/erts/emulator/beam/erl_time.h index e461594e9c..7933f4973d 100644 --- a/erts/emulator/beam/erl_time.h +++ b/erts/emulator/beam/erl_time.h @@ -26,6 +26,10 @@ #define ERTS_TIME_ASSERT(B) ((void) 1) #endif +typedef struct ErtsTimerWheel_ ErtsTimerWheel; +typedef erts_atomic64_t * ErtsNextTimeoutRef; +extern ErtsTimerWheel *erts_default_timer_wheel; + extern SysTimeval erts_first_emu_time; /* @@ -35,8 +39,8 @@ typedef struct erl_timer { struct erl_timer* next; /* next entry tiw slot or chain */ struct erl_timer* prev; /* prev entry tiw slot or chain */ Uint slot; /* slot in timer wheel */ + erts_smp_atomic_t wheel; ErtsMonotonicTime timeout_pos; /* Timeout in absolute clock ticks */ - int active; /* 1=activated, 0=deactivated */ /* called when timeout */ void (*timeout)(void*); /* called when cancel (may be NULL) */ @@ -63,7 +67,6 @@ union ErtsSmpPTimer_ { ErtsSmpPTimer *next; }; - void erts_create_smp_ptimer(ErtsSmpPTimer **timer_ref, Eterm id, ErlTimeoutProc timeout_func, @@ -79,28 +82,35 @@ void erts_late_init_time_sup(void); /* timer-wheel api */ +ErtsTimerWheel *erts_create_timer_wheel(int); +ErtsNextTimeoutRef erts_get_next_timeout_reference(ErtsTimerWheel *); void erts_init_time(int time_correction, ErtsTimeWarpMode time_warp_mode); void erts_set_timer(ErlTimer*, ErlTimeoutProc, ErlCancelProc, void*, Uint); void erts_cancel_timer(ErlTimer*); -void erts_bump_timers(ErtsMonotonicTime); -Uint erts_timer_wheel_memory_size(void); Uint erts_time_left(ErlTimer *); +void erts_bump_timers(ErtsTimerWheel *, ErtsMonotonicTime); +Uint erts_timer_wheel_memory_size(void); #ifdef DEBUG void erts_p_slpq(void); #endif -ErtsMonotonicTime erts_check_next_timeout_time(ErtsMonotonicTime); +ErtsMonotonicTime erts_check_next_timeout_time(ErtsTimerWheel *, + ErtsMonotonicTime); -extern erts_atomic64_t erts_next_timeout__; - -ERTS_GLB_INLINE ErtsMonotonicTime erts_next_timeout_time(void); +ERTS_GLB_INLINE void erts_init_timer(ErlTimer *p); +ERTS_GLB_INLINE ErtsMonotonicTime erts_next_timeout_time(ErtsNextTimeoutRef); #if ERTS_GLB_INLINE_INCL_FUNC_DEF -ERTS_GLB_INLINE ErtsMonotonicTime erts_next_timeout_time(void) +ERTS_GLB_INLINE void erts_init_timer(ErlTimer *p) +{ + erts_smp_atomic_init_nob(&p->wheel, (erts_aint_t) NULL); +} + +ERTS_GLB_INLINE ErtsMonotonicTime erts_next_timeout_time(ErtsNextTimeoutRef nxt_tmo_ref) { - return (ErtsMonotonicTime) erts_atomic64_read_acqb(&erts_next_timeout__); + return (ErtsMonotonicTime) erts_atomic64_read_acqb((erts_atomic64_t *) nxt_tmo_ref); } #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ diff --git a/erts/emulator/beam/erl_time_sup.c b/erts/emulator/beam/erl_time_sup.c index 7dfa7d8743..d47d1682d7 100644 --- a/erts/emulator/beam/erl_time_sup.c +++ b/erts/emulator/beam/erl_time_sup.c @@ -690,6 +690,7 @@ static void late_init_time_correction(void) { if (time_sup.inf.c.finalized_offset) { + erts_init_timer(&time_sup.inf.c.parmon.timer); erts_set_timer(&time_sup.inf.c.parmon.timer, #ifndef ERTS_HAVE_CORRECTED_OS_MONOTONIC init_check_time_correction, diff --git a/erts/emulator/beam/time.c b/erts/emulator/beam/time.c index c9f8b68bca..9f997e1d0b 100644 --- a/erts/emulator/beam/time.c +++ b/erts/emulator/beam/time.c @@ -86,9 +86,6 @@ #define ERTS_MONOTONIC_DAY ERTS_SEC_TO_MONOTONIC(60*60*24) #define ERTS_CLKTCKS_DAY ERTS_MONOTONIC_TO_CLKTCKS(ERTS_MONOTONIC_DAY) -static erts_smp_atomic32_t is_bumping; -static erts_smp_mtx_t tiw_lock; - /* BEGIN tiw_lock protected variables ** @@ -101,14 +98,6 @@ static erts_smp_mtx_t tiw_lock; #else #define TIW_SIZE (1 << 20) #endif -static ErlTimer** tiw; /* the timing wheel, allocated in init_time() */ -static ErtsMonotonicTime tiw_pos; /* current position in wheel */ -static Uint tiw_nto; /* number of timeouts in wheel */ -static struct { - ErlTimer *head; - ErlTimer **tail; - Uint nto; -} tiw_at_once; /* Actual interval time chosen by sys_init_time() */ @@ -120,23 +109,52 @@ static int tiw_itime; /* Constant after init */ # define TIW_ITIME tiw_itime #endif -static int true_next_timeout_time; -static ErtsMonotonicTime next_timeout_time; -erts_atomic64_t erts_next_timeout__; +struct ErtsTimerWheel_ { + ErlTimer *w[TIW_SIZE]; + ErtsMonotonicTime pos; + Uint nto; + struct { + ErlTimer *head; + ErlTimer **tail; + Uint nto; + } at_once; + int true_next_timeout_time; + ErtsMonotonicTime next_timeout_time; + erts_atomic64_t next_timeout; + erts_smp_atomic32_t is_bumping; + erts_smp_mtx_t lock; +}; + +ErtsTimerWheel *erts_default_timer_wheel; /* managed by aux thread */ + +static ERTS_INLINE ErtsTimerWheel * +get_timer_wheel(ErlTimer *p) +{ + return (ErtsTimerWheel *) erts_smp_atomic_read_acqb(&p->wheel); +} + +static ERTS_INLINE void +set_timer_wheel(ErlTimer *p, ErtsTimerWheel *tiw) +{ + erts_smp_atomic_set_relb(&p->wheel, (erts_aint_t) tiw); +} static ERTS_INLINE void -init_next_timeout(ErtsMonotonicTime time) +init_next_timeout(ErtsTimerWheel *tiw, + ErtsMonotonicTime time) { - erts_atomic64_init_nob(&erts_next_timeout__, + erts_atomic64_init_nob(&tiw->next_timeout, (erts_aint64_t) time); } static ERTS_INLINE void -set_next_timeout(ErtsMonotonicTime time, int true_timeout) +set_next_timeout(ErtsTimerWheel *tiw, + ErtsMonotonicTime time, + int true_timeout) { - true_next_timeout_time = true_timeout; - next_timeout_time = time; - erts_atomic64_set_relb(&erts_next_timeout__, + tiw->true_next_timeout_time = true_timeout; + tiw->next_timeout_time = time; + erts_atomic64_set_relb(&tiw->next_timeout, (erts_aint64_t) time); } @@ -144,7 +162,8 @@ set_next_timeout(ErtsMonotonicTime time, int true_timeout) or -1 if there are no timeouts */ static ERTS_INLINE ErtsMonotonicTime -find_next_timeout(ErtsMonotonicTime curr_time, +find_next_timeout(ErtsTimerWheel *tiw, + ErtsMonotonicTime curr_time, ErtsMonotonicTime max_search_time) { int start_ix, tiw_pos_ix; @@ -152,16 +171,16 @@ find_next_timeout(ErtsMonotonicTime curr_time, int true_min_timeout; ErtsMonotonicTime min_timeout, min_timeout_pos, slot_timeout_pos, timeout_limit; - ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&tiw_lock)); + ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&tiw->lock)); - if (true_next_timeout_time) - return next_timeout_time; + if (tiw->true_next_timeout_time) + return tiw->next_timeout_time; /* We never set next timeout beyond timeout_limit */ timeout_limit = curr_time + ERTS_MONOTONIC_DAY; - if (tiw_nto == 0) { /* no timeouts in wheel */ - true_min_timeout = true_next_timeout_time = 0; + if (tiw->nto == 0) { /* no timeouts in wheel */ + true_min_timeout = tiw->true_next_timeout_time = 0; min_timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(timeout_limit); goto found_next; } @@ -170,13 +189,13 @@ find_next_timeout(ErtsMonotonicTime curr_time, * Don't want others entering trying to bump * timers while we are checking... */ - set_next_timeout(timeout_limit, 0); + set_next_timeout(tiw, timeout_limit, 0); true_min_timeout = 1; - slot_timeout_pos = tiw_pos; + slot_timeout_pos = tiw->pos; min_timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(curr_time + max_search_time); - start_ix = tiw_pos_ix = (int) (tiw_pos & (TIW_SIZE-1)); + start_ix = tiw_pos_ix = (int) (tiw->pos & (TIW_SIZE-1)); do { slot_timeout_pos++; @@ -185,7 +204,7 @@ find_next_timeout(ErtsMonotonicTime curr_time, break; } - p = tiw[tiw_pos_ix]; + p = tiw->w[tiw_pos_ix]; while (p) { ErtsMonotonicTime timeout_pos; @@ -207,16 +226,18 @@ find_next_timeout(ErtsMonotonicTime curr_time, found_next: min_timeout = ERTS_CLKTCKS_TO_MONOTONIC(min_timeout_pos); - if (min_timeout != next_timeout_time) - set_next_timeout(min_timeout, true_min_timeout); + if (min_timeout != tiw->next_timeout_time) + set_next_timeout(tiw, min_timeout, true_min_timeout); return min_timeout; } -static void remove_timer(ErlTimer *p) { +static void +remove_timer(ErtsTimerWheel *tiw, ErlTimer *p) +{ /* first */ if (!p->prev) { - tiw[p->slot] = p->next; + tiw->w[p->slot] = p->next; if(p->next) p->next->prev = NULL; } else { @@ -233,40 +254,41 @@ static void remove_timer(ErlTimer *p) { p->next = NULL; p->prev = NULL; - /* Make sure cancel callback isn't called */ - p->active = 0; - tiw_nto--; + + set_timer_wheel(p, NULL); + tiw->nto--; } ErtsMonotonicTime -erts_check_next_timeout_time(ErtsMonotonicTime max_search_time) +erts_check_next_timeout_time(ErtsTimerWheel *tiw, + ErtsMonotonicTime max_search_time) { ErtsMonotonicTime next, curr; curr = erts_get_monotonic_time(); - erts_smp_mtx_lock(&tiw_lock); + erts_smp_mtx_lock(&tiw->lock); - next = find_next_timeout(curr, max_search_time); + next = find_next_timeout(tiw, curr, max_search_time); - erts_smp_mtx_unlock(&tiw_lock); + erts_smp_mtx_unlock(&tiw->lock); return next; } #ifndef DEBUG -#define ERTS_DBG_CHK_SAFE_TO_SKIP_TO(TO) ((void) 0) +#define ERTS_DBG_CHK_SAFE_TO_SKIP_TO(TIW, TO) ((void) 0) #else -#define ERTS_DBG_CHK_SAFE_TO_SKIP_TO(TO) debug_check_safe_to_skip_to((TO)) +#define ERTS_DBG_CHK_SAFE_TO_SKIP_TO(TIW, TO) debug_check_safe_to_skip_to((TIW), (TO)) static void -debug_check_safe_to_skip_to(ErtsMonotonicTime skip_to_pos) +debug_check_safe_to_skip_to(ErtsTimerWheel *tiw, ErtsMonotonicTime skip_to_pos) { int slots, ix; ErlTimer *tmr; ErtsMonotonicTime tmp; - ix = (int) (tiw_pos & (TIW_SIZE-1)); - tmp = skip_to_pos - tiw_pos; + ix = (int) (tiw->pos & (TIW_SIZE-1)); + tmp = skip_to_pos - tiw->pos; ASSERT(tmp >= 0); if (tmp < (ErtsMonotonicTime) TIW_SIZE) slots = (int) tmp; @@ -274,7 +296,7 @@ debug_check_safe_to_skip_to(ErtsMonotonicTime skip_to_pos) slots = TIW_SIZE; while (slots > 0) { - tmr = tiw[ix]; + tmr = tiw->w[ix]; while (tmr) { ASSERT(tmr->timeout_pos > skip_to_pos); tmr = tmr->next; @@ -287,79 +309,80 @@ debug_check_safe_to_skip_to(ErtsMonotonicTime skip_to_pos) } #endif -void erts_bump_timers(ErtsMonotonicTime curr_time) +void +erts_bump_timers(ErtsTimerWheel *tiw, ErtsMonotonicTime curr_time) { int tiw_pos_ix, slots; ErlTimer *p, *timeout_head, **timeout_tail; ErtsMonotonicTime bump_to, tmp_slots; - if (erts_smp_atomic32_cmpxchg_nob(&is_bumping, 1, 0) != 0) + if (erts_smp_atomic32_cmpxchg_nob(&tiw->is_bumping, 1, 0) != 0) return; /* Another thread is currently bumping... */ bump_to = ERTS_MONOTONIC_TO_CLKTCKS(curr_time); - erts_smp_mtx_lock(&tiw_lock); + erts_smp_mtx_lock(&tiw->lock); - if (tiw_pos >= bump_to) { + if (tiw->pos >= bump_to) { timeout_head = NULL; goto done; } /* Don't want others here while we are bumping... */ - set_next_timeout(curr_time + ERTS_MONOTONIC_DAY, 0); + set_next_timeout(tiw, curr_time + ERTS_MONOTONIC_DAY, 0); - if (!tiw_at_once.head) { + if (!tiw->at_once.head) { timeout_head = NULL; timeout_tail = &timeout_head; } else { - ASSERT(tiw_nto >= tiw_at_once.nto); - timeout_head = tiw_at_once.head; - timeout_tail = tiw_at_once.tail; - tiw_nto -= tiw_at_once.nto; - tiw_at_once.head = NULL; - tiw_at_once.tail = &tiw_at_once.head; - tiw_at_once.nto = 0; + ASSERT(tiw->nto >= tiw->at_once.nto); + timeout_head = tiw->at_once.head; + timeout_tail = tiw->at_once.tail; + tiw->nto -= tiw->at_once.nto; + tiw->at_once.head = NULL; + tiw->at_once.tail = &tiw->at_once.head; + tiw->at_once.nto = 0; } - if (tiw_nto == 0) { - ERTS_DBG_CHK_SAFE_TO_SKIP_TO(bump_to); - tiw_pos = bump_to; + if (tiw->nto == 0) { + ERTS_DBG_CHK_SAFE_TO_SKIP_TO(tiw, bump_to); + tiw->pos = bump_to; goto done; } - if (true_next_timeout_time) { + if (tiw->true_next_timeout_time) { ErtsMonotonicTime skip_until_pos; /* * No need inspecting slots where we know no timeouts * to trigger should reside. */ - skip_until_pos = ERTS_MONOTONIC_TO_CLKTCKS(next_timeout_time); + skip_until_pos = ERTS_MONOTONIC_TO_CLKTCKS(tiw->next_timeout_time); if (skip_until_pos > bump_to) skip_until_pos = bump_to; - ERTS_DBG_CHK_SAFE_TO_SKIP_TO(skip_until_pos); - ASSERT(skip_until_pos > tiw_pos); + ERTS_DBG_CHK_SAFE_TO_SKIP_TO(tiw, skip_until_pos); + ASSERT(skip_until_pos > tiw->pos); - tiw_pos = skip_until_pos - 1; + tiw->pos = skip_until_pos - 1; } - tiw_pos_ix = (int) ((tiw_pos+1) & (TIW_SIZE-1)); - tmp_slots = (bump_to - tiw_pos); + tiw_pos_ix = (int) ((tiw->pos+1) & (TIW_SIZE-1)); + tmp_slots = (bump_to - tiw->pos); if (tmp_slots < (ErtsMonotonicTime) TIW_SIZE) slots = (int) tmp_slots; else slots = TIW_SIZE; while (slots > 0) { - p = tiw[tiw_pos_ix]; + p = tiw->w[tiw_pos_ix]; while (p) { ErlTimer *next = p->next; ASSERT(p != next); if (p->timeout_pos <= bump_to) { /* we have a timeout */ /* Remove from list */ - remove_timer(p); + remove_timer(tiw, p); *timeout_tail = p; /* Insert in timeout queue */ timeout_tail = &p->next; } @@ -374,19 +397,21 @@ void erts_bump_timers(ErtsMonotonicTime curr_time) ASSERT(tmp_slots >= (ErtsMonotonicTime) TIW_SIZE || tiw_pos_ix == (int) ((bump_to+1) & (TIW_SIZE-1))); - tiw_pos = bump_to; + tiw->pos = bump_to; /* Search at most two seconds ahead... */ - (void) find_next_timeout(curr_time, ERTS_SEC_TO_MONOTONIC(2)); + (void) find_next_timeout(tiw, curr_time, ERTS_SEC_TO_MONOTONIC(2)); done: - erts_smp_mtx_unlock(&tiw_lock); + erts_smp_mtx_unlock(&tiw->lock); - erts_smp_atomic32_set_nob(&is_bumping, 0); + erts_smp_atomic32_set_nob(&tiw->is_bumping, 0); /* Call timedout timers callbacks */ while (timeout_head) { + ErlTimeoutProc timeout; + void *arg; p = timeout_head; timeout_head = p->next; /* Here comes hairy use of the timer fields! @@ -399,23 +424,61 @@ done: p->next = NULL; p->prev = NULL; p->slot = 0; - (*p->timeout)(p->arg); + timeout = p->timeout; + arg = p->arg; + (*timeout)(arg); } } Uint erts_timer_wheel_memory_size(void) { - return (Uint) TIW_SIZE * sizeof(ErlTimer*); +#ifdef ERTS_SMP + return sizeof(ErtsTimerWheel)*(1 + erts_no_schedulers); +#else + return sizeof(ErtsTimerWheel); +#endif } +ErtsTimerWheel * +erts_create_timer_wheel(int no) +{ + ErtsMonotonicTime mtime; + int i; + ErtsTimerWheel *tiw; + tiw = (ErtsTimerWheel *) erts_alloc(ERTS_ALC_T_TIMER_WHEEL, + sizeof(ErtsTimerWheel)); + for(i = 0; i < TIW_SIZE; i++) + tiw->w[i] = NULL; + + erts_smp_atomic32_init_nob(&tiw->is_bumping, 0); + erts_smp_mtx_init_x(&tiw->lock, "timer_wheel", make_small(no)); + + mtime = erts_get_monotonic_time(); + tiw->pos = ERTS_MONOTONIC_TO_CLKTCKS(mtime); + tiw->nto = 0; + tiw->at_once.head = NULL; + tiw->at_once.tail = &tiw->at_once.head; + tiw->at_once.nto = 0; + tiw->true_next_timeout_time = 0; + tiw->next_timeout_time = mtime + ERTS_MONOTONIC_DAY; + init_next_timeout(tiw, mtime + ERTS_MONOTONIC_DAY); + return tiw; +} + +ErtsNextTimeoutRef +erts_get_next_timeout_reference(ErtsTimerWheel *tiw) +{ + return (ErtsNextTimeoutRef) &tiw->next_timeout; +} + + /* this routine links the time cells into a free list at the start and sets the time queue as empty */ void erts_init_time(int time_correction, ErtsTimeWarpMode time_warp_mode) { - ErtsMonotonicTime mtime; - int i, itime; + int itime; /* system dependent init; must be done before do_time_init() if timer thread is enabled */ @@ -428,41 +491,39 @@ erts_init_time(int time_correction, ErtsTimeWarpMode time_warp_mode) tiw_itime = itime; #endif - erts_smp_atomic32_init_nob(&is_bumping, 0); - erts_smp_mtx_init(&tiw_lock, "timer_wheel"); - - tiw = (ErlTimer**) erts_alloc(ERTS_ALC_T_TIMER_WHEEL, - TIW_SIZE * sizeof(ErlTimer*)); - for(i = 0; i < TIW_SIZE; i++) - tiw[i] = NULL; - - mtime = erts_get_monotonic_time(); - tiw_pos = ERTS_MONOTONIC_TO_CLKTCKS(mtime); - tiw_nto = 0; - tiw_at_once.head = NULL; - tiw_at_once.tail = &tiw_at_once.head; - tiw_at_once.nto = 0; - init_next_timeout(mtime + ERTS_MONOTONIC_DAY); - - erts_late_init_time_sup(); + erts_default_timer_wheel = erts_create_timer_wheel(0); } +void +erts_set_timer(ErlTimer *p, ErlTimeoutProc timeout, + ErlCancelProc cancel, void *arg, Uint to) +{ + ErtsMonotonicTime timeout_time, timeout_pos; + ErtsMonotonicTime curr_time; + ErtsTimerWheel *tiw; + ErtsSchedulerData *esdp; + + curr_time = erts_get_monotonic_time(); + esdp = erts_get_scheduler_data(); + if (esdp) + tiw = esdp->timer_wheel; + else + tiw = erts_default_timer_wheel; + erts_smp_mtx_lock(&tiw->lock); + if (get_timer_wheel(p)) + ERTS_INTERNAL_ERROR("Double set timer"); -/* -** Insert a process into the time queue, with a timeout 't' -*/ -static ErtsMonotonicTime -insert_timer(ErlTimer* p, ErtsMonotonicTime curr_time, ErtsMonotonicTime to) -{ - ErtsMonotonicTime timeout_time, timeout_pos; + p->timeout = timeout; + p->cancel = cancel; + p->arg = arg; if (to == 0) { timeout_pos = ERTS_MONOTONIC_TO_CLKTCKS(curr_time); - tiw_nto++; - tiw_at_once.nto++; - *tiw_at_once.tail = p; + tiw->nto++; + tiw->at_once.nto++; + *tiw->at_once.tail = p; p->next = NULL; p->timeout_pos = timeout_pos; timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(timeout_pos); @@ -479,13 +540,13 @@ insert_timer(ErlTimer* p, ErtsMonotonicTime curr_time, ErtsMonotonicTime to) p->slot = (Uint) tm; /* insert at head of list at slot */ - p->next = tiw[tm]; + p->next = tiw->w[tm]; p->prev = NULL; if (p->next != NULL) p->next->prev = p; - tiw[tm] = p; + tiw->w[tm] = p; - tiw_nto++; + tiw->nto++; timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(timeout_pos); p->timeout_pos = timeout_pos; @@ -495,59 +556,45 @@ insert_timer(ErlTimer* p, ErtsMonotonicTime curr_time, ErtsMonotonicTime to) < ERTS_MSEC_TO_MONOTONIC(to) + ERTS_CLKTCKS_TO_MONOTONIC(1)); } - if (timeout_time < next_timeout_time) - set_next_timeout(timeout_time, 1); + if (timeout_time < tiw->next_timeout_time) + set_next_timeout(tiw, timeout_time, 1); - return timeout_time; -} + set_timer_wheel(p, tiw); + + erts_smp_mtx_unlock(&tiw->lock); -void -erts_set_timer(ErlTimer* p, ErlTimeoutProc timeout, ErlCancelProc cancel, - void* arg, Uint t) -{ -#ifdef ERTS_SMP - ErtsMonotonicTime timeout_time; -#endif - ErtsMonotonicTime current_time = erts_get_monotonic_time(); - erts_smp_mtx_lock(&tiw_lock); - if (p->active) { /* XXX assert ? */ - erts_smp_mtx_unlock(&tiw_lock); - return; - } - p->timeout = timeout; - p->cancel = cancel; - p->arg = arg; - p->active = 1; -#ifdef ERTS_SMP - timeout_time = -#else - (void) -#endif - insert_timer(p, current_time, (ErtsMonotonicTime) t); - erts_smp_mtx_unlock(&tiw_lock); #if defined(ERTS_SMP) - erts_sys_schedule_interrupt_timed(1, timeout_time); + if (tiw == erts_default_timer_wheel) + erts_interupt_aux_thread_timed(timeout_time); #endif + } void -erts_cancel_timer(ErlTimer* p) +erts_cancel_timer(ErlTimer *p) { - erts_smp_mtx_lock(&tiw_lock); - if (!p->active) { /* allow repeated cancel (drivers) */ - erts_smp_mtx_unlock(&tiw_lock); - return; - } - - remove_timer(p); - p->slot = 0; + ErtsTimerWheel *tiw; + ErlCancelProc cancel; + void *arg; - if (p->cancel != NULL) { - erts_smp_mtx_unlock(&tiw_lock); - (*p->cancel)(p->arg); + tiw = get_timer_wheel(p); + if (!tiw) return; + + erts_smp_mtx_lock(&tiw->lock); + if (tiw != get_timer_wheel(p)) + cancel = NULL; + else { + remove_timer(tiw, p); + p->slot = 0; + + cancel = p->cancel; + arg = p->arg; } - erts_smp_mtx_unlock(&tiw_lock); + erts_smp_mtx_unlock(&tiw->lock); + + if (cancel) + (*cancel)(arg); } /* @@ -559,18 +606,19 @@ erts_cancel_timer(ErlTimer* p) Uint erts_time_left(ErlTimer *p) { + ErtsTimerWheel *tiw; ErtsMonotonicTime current_time, timeout_time; - erts_smp_mtx_lock(&tiw_lock); - - if (!p->active) { - erts_smp_mtx_unlock(&tiw_lock); + tiw = get_timer_wheel(p); + if (!tiw) return 0; - } - timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos); - - erts_smp_mtx_unlock(&tiw_lock); + erts_smp_mtx_lock(&tiw->lock); + if (tiw != get_timer_wheel(p)) + timeout_time = ERTS_MONOTONIC_TIME_MIN; + else + timeout_time = ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos); + erts_smp_mtx_unlock(&tiw->lock); current_time = erts_get_monotonic_time(); if (timeout_time <= current_time) @@ -581,34 +629,35 @@ erts_time_left(ErlTimer *p) #ifdef DEBUG void erts_p_slpq(void) { + ErtsTimerWheel *tiw = erts_default_timer_wheel; ErtsMonotonicTime current_time = erts_get_monotonic_time(); int i; ErlTimer* p; - erts_smp_mtx_lock(&tiw_lock); + erts_smp_mtx_lock(&tiw->lock); /* print the whole wheel, starting at the current position */ erts_printf("\ncurrent time = %bps tiw_pos = %d tiw_nto %d\n", - current_time, tiw_pos, tiw_nto); - i = tiw_pos; - if (tiw[i] != NULL) { + current_time, tiw->pos, tiw->nto); + i = tiw->pos; + if (tiw->w[i] != NULL) { erts_printf("%d:\n", i); - for(p = tiw[i]; p != NULL; p = p->next) { + for(p = tiw->w[i]; p != NULL; p = p->next) { erts_printf(" (timeout time %bps, slot %d)\n", ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos), p->slot); } } - for(i = ((i+1) & (TIW_SIZE-1)); i != (tiw_pos & (TIW_SIZE-1)); i = ((i+1) & (TIW_SIZE-1))) { - if (tiw[i] != NULL) { + for(i = ((i+1) & (TIW_SIZE-1)); i != (tiw->pos & (TIW_SIZE-1)); i = ((i+1) & (TIW_SIZE-1))) { + if (tiw->w[i] != NULL) { erts_printf("%d:\n", i); - for(p = tiw[i]; p != NULL; p = p->next) { + for(p = tiw->w[i]; p != NULL; p = p->next) { erts_printf(" (timeout time %bps, slot %d)\n", ERTS_CLKTCKS_TO_MONOTONIC(p->timeout_pos), p->slot); } } } - erts_smp_mtx_unlock(&tiw_lock); + erts_smp_mtx_unlock(&tiw->lock); } #endif /* DEBUG */ diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c index 54f1a122c4..da03960b59 100644 --- a/erts/emulator/beam/utils.c +++ b/erts/emulator/beam/utils.c @@ -3777,7 +3777,7 @@ erts_create_smp_ptimer(ErtsSmpPTimer **timer_ref, res->timer.timeout_func = timeout_func; res->timer.timer_ref = timer_ref; res->timer.id = id; - res->timer.tm.active = 0; /* MUST be initalized */ + erts_init_timer(&res->timer.tm); ASSERT(!*timer_ref); |