diff options
Diffstat (limited to 'erts/emulator/beam/erl_process.c')
-rw-r--r-- | erts/emulator/beam/erl_process.c | 122 |
1 files changed, 114 insertions, 8 deletions
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 919d9f9edf..2427d87f66 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -174,7 +174,6 @@ ErtsLcPSDLocks erts_psd_required_locks[ERTS_PSD_SIZE]; typedef struct { int aux_work; int tse; - int sys_schedule; } ErtsBusyWaitParams; static ErtsBusyWaitParams sched_busy_wait_params[ERTS_SCHED_TYPE_LAST + 1]; @@ -344,6 +343,9 @@ erts_sched_stat_t erts_sched_stat; static erts_tsd_key_t ERTS_WRITE_UNLIKELY(sched_data_key); +#if ERTS_POLL_USE_SCHEDULER_POLLING +static erts_atomic32_t doing_sys_schedule; +#endif static erts_atomic32_t no_empty_run_queues; long erts_runq_supervision_interval = 0; static ethr_event runq_supervision_event; @@ -3093,8 +3095,12 @@ aux_thread(void *unused) awdp->ssi = ssi; #if ERTS_POLL_USE_FALLBACK +#if ERTS_POLL_USE_SCHEDULER_POLLING + ssi->psi = erts_create_pollset_thread(-2, tpd); +#else ssi->psi = erts_create_pollset_thread(-1, tpd); #endif +#endif sched_prep_spin_wait(ssi); @@ -3133,7 +3139,7 @@ aux_thread(void *unused) if (flgs & ERTS_SSI_FLG_SLEEPING) { ASSERT(flgs & ERTS_SSI_FLG_POLL_SLEEPING); ASSERT(flgs & ERTS_SSI_FLG_WAITING); - erts_check_io(ssi->psi); + erts_check_io(ssi->psi, ERTS_POLL_INF_TIMEOUT); } } #else @@ -3231,7 +3237,7 @@ poll_thread(void *arg) if (flgs & ERTS_SSI_FLG_SLEEPING) { ASSERT(flgs & ERTS_SSI_FLG_POLL_SLEEPING); ASSERT(flgs & ERTS_SSI_FLG_WAITING); - erts_check_io(psi); + erts_check_io(psi, ERTS_POLL_INF_TIMEOUT); } } } @@ -3241,6 +3247,59 @@ poll_thread(void *arg) return NULL; } +#if ERTS_POLL_USE_SCHEDULER_POLLING +static ERTS_INLINE void +clear_sys_scheduling(void) +{ + erts_atomic32_set_mb(&doing_sys_schedule, 0); +} + +static ERTS_INLINE int +try_set_sys_scheduling(void) +{ + return 0 == erts_atomic32_cmpxchg_acqb(&doing_sys_schedule, 1, 0); +} + + +static ERTS_INLINE int +prepare_for_sys_schedule(void) +{ + while (!erts_port_task_have_outstanding_io_tasks() + && try_set_sys_scheduling()) { + if (!erts_port_task_have_outstanding_io_tasks()) + return 1; + clear_sys_scheduling(); + } + return 0; +} + +static void +check_io_timer(void *null) +{ + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + if (prepare_for_sys_schedule()) { + erts_check_io(esdp->ssi->psi, ERTS_POLL_NO_TIMEOUT); + clear_sys_scheduling(); + } + + /* The timer is cleared if this schedulers run-queue became empty + or if the CHECKIO flag was cleared. The CHECKIO flags is cleared + when a check_balance assigns another scheduler to be the poller in + the overload scenario. */ + if ((ERTS_RUNQ_FLGS_GET_NOB(esdp->run_queue) & (ERTS_RUNQ_FLG_OUT_OF_WORK|ERTS_RUNQ_FLG_CHECKIO)) + == ERTS_RUNQ_FLG_CHECKIO) { + erts_start_timer_callback(ERTS_POLL_SCHEDULER_POLLING_TIMEOUT, + check_io_timer, NULL); + } else { + ERTS_RUNQ_FLGS_UNSET(esdp->run_queue, ERTS_RUNQ_FLG_CHECKIO); + } +} + +#else +#define clear_sys_scheduling() +#define prepare_for_sys_schedule() 0 +#endif + static void scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) { @@ -3330,7 +3389,25 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) sched_wall_time_change(esdp, 1); } } - else { + else if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && prepare_for_sys_schedule()) { + /* We sleep in check_io, only for normal schedulers */ + if (thr_prgr_active) { + erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 0); + sched_wall_time_change(esdp, 0); + } + flgs = sched_spin_wait(ssi, 0); + if (flgs & ERTS_SSI_FLG_SLEEPING) { + ASSERT(flgs & ERTS_SSI_FLG_WAITING); + flgs = sched_set_sleeptype(ssi, ERTS_SSI_FLG_POLL_SLEEPING); + if (flgs & ERTS_SSI_FLG_SLEEPING) { + ASSERT(flgs & ERTS_SSI_FLG_POLL_SLEEPING); + ASSERT(flgs & ERTS_SSI_FLG_WAITING); + erts_check_io(ssi->psi, timeout_time); + current_time = erts_get_monotonic_time(esdp); + } + } + clear_sys_scheduling(); + } else { if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) { if (thr_prgr_active) { erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 0); @@ -3338,7 +3415,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) } erts_thr_progress_prepare_wait(erts_thr_prgr_data(esdp)); } - flgs = sched_spin_wait(ssi, spincount); if (flgs & ERTS_SSI_FLG_SLEEPING) { ASSERT(flgs & ERTS_SSI_FLG_WAITING); @@ -4585,6 +4661,15 @@ check_balance(ErtsRunQueue *c_rq) if (blnc_no_rqs == 1) { c_rq->check_balance_reds = INT_MAX; erts_atomic32_set_nob(&balance_info.checking_balance, 0); +#if ERTS_POLL_USE_SCHEDULER_POLLING + c_rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; + if ((ERTS_RUNQ_FLGS_GET_NOB(c_rq) & (ERTS_RUNQ_FLG_OUT_OF_WORK|ERTS_RUNQ_FLG_CHECKIO)) + == 0) { + ERTS_RUNQ_FLGS_SET(c_rq, ERTS_RUNQ_FLG_CHECKIO); + erts_start_timer_callback(ERTS_POLL_SCHEDULER_POLLING_TIMEOUT, check_io_timer, NULL); + } + ERTS_RUNQ_FLGS_UNSET(c_rq, ERTS_RUNQ_FLGS_MIGRATION_INFO); +#endif return; } @@ -5104,6 +5189,19 @@ erts_fprintf(stderr, "--------------------------------\n"); /* Publish new migration paths... */ erts_atomic_set_wb(&erts_migration_paths, (erts_aint_t) new_mpaths); +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (full_scheds == current_active) { + ERTS_ASSERT(full_scheds <= current_active); + /* All active schedulers ran for full, we need to do active polling, + so we setup a timer that does active polling */ + if (!(ERTS_RUNQ_FLGS_GET_NOB(c_rq) & ERTS_RUNQ_FLG_CHECKIO)) { + /* Active polling is not running, start it */ + erts_start_timer_callback(ERTS_POLL_SCHEDULER_POLLING_TIMEOUT, check_io_timer, NULL); + } + run_queue_info[c_rq->ix].flags |= ERTS_RUNQ_FLG_CHECKIO; + } +#endif + /* Reset balance statistics in all online queues */ for (qix = 0; qix < blnc_no_rqs; qix++) { Uint32 flags = run_queue_info[qix].flags; @@ -5113,6 +5211,8 @@ erts_fprintf(stderr, "--------------------------------\n"); ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK)); if (rq->waiting) flags |= ERTS_RUNQ_FLG_OUT_OF_WORK; + if (rq != c_rq) + flags &= ~ERTS_RUNQ_FLG_CHECKIO; rq->full_reds_history_sum = run_queue_info[qix].full_reds_history_sum; @@ -5122,8 +5222,7 @@ erts_fprintf(stderr, "--------------------------------\n"); ERTS_DBG_CHK_FULL_REDS_HISTORY(rq); rq->out_of_work_count = 0; - (void) ERTS_RUNQ_FLGS_READ_BSET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO, flags); - + (void) ERTS_RUNQ_FLGS_READ_BSET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO|ERTS_RUNQ_FLG_CHECKIO, flags); rq->max_len = erts_atomic32_read_dirty(&rq->len); for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { ErtsRunQueueInfo *rqi; @@ -5562,7 +5661,6 @@ erts_sched_set_busy_wait_threshold(ErtsSchedType sched_type, char *str) return EINVAL; } - params->sys_schedule = sys_sched; params->tse = sys_sched * ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT; params->aux_work = sys_sched * aux_work_fact; @@ -5773,6 +5871,9 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online, int no_poll_th size_runqs = sizeof(ErtsAlignedRunQueue) * tot_rqs; erts_aligned_run_queues = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_RUNQS, size_runqs); +#if ERTS_POLL_USE_SCHEDULER_POLLING + erts_atomic32_init_nob(&doing_sys_schedule, 0); +#endif erts_atomic32_init_nob(&no_empty_run_queues, 0); erts_no_run_queues = n; @@ -8302,6 +8403,11 @@ sched_thread_func(void *vesdp) erts_msacc_init_thread("scheduler", no, 1); erts_thr_progress_register_managed_thread(esdp, &callbacks, 0); + +#if ERTS_POLL_USE_SCHEDULER_POLLING + esdp->ssi->psi = erts_create_pollset_thread(-1, NULL); +#endif + erts_alloc_register_scheduler(vesdp); #ifdef ERTS_ENABLE_LOCK_CHECK { |