diff options
Diffstat (limited to 'erts/emulator/beam/erl_process.c')
-rw-r--r-- | erts/emulator/beam/erl_process.c | 200 |
1 files changed, 156 insertions, 44 deletions
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 0f7f1598fd..2427d87f66 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -174,7 +174,6 @@ ErtsLcPSDLocks erts_psd_required_locks[ERTS_PSD_SIZE]; typedef struct { int aux_work; int tse; - int sys_schedule; } ErtsBusyWaitParams; static ErtsBusyWaitParams sched_busy_wait_params[ERTS_SCHED_TYPE_LAST + 1]; @@ -344,6 +343,9 @@ erts_sched_stat_t erts_sched_stat; static erts_tsd_key_t ERTS_WRITE_UNLIKELY(sched_data_key); +#if ERTS_POLL_USE_SCHEDULER_POLLING +static erts_atomic32_t doing_sys_schedule; +#endif static erts_atomic32_t no_empty_run_queues; long erts_runq_supervision_interval = 0; static ethr_event runq_supervision_event; @@ -1646,7 +1648,7 @@ haw_thr_prgr_wakeup(ErtsAuxWorkData *awdp, ErtsThrPrgrVal val) awdp->latest_wakeup = val; haw_chk_later_cleanup_op_wakeup(awdp, val); } - erts_thr_progress_wakeup(awdp->esdp, val); + erts_thr_progress_wakeup(erts_thr_prgr_data(awdp->esdp), val); } } @@ -1656,7 +1658,7 @@ haw_thr_prgr_soft_wakeup(ErtsAuxWorkData *awdp, ErtsThrPrgrVal val) if (erts_thr_progress_cmp(val, awdp->latest_wakeup) > 0) { awdp->latest_wakeup = val; haw_chk_later_cleanup_op_wakeup(awdp, val); - erts_thr_progress_wakeup(awdp->esdp, val); + erts_thr_progress_wakeup(erts_thr_prgr_data(awdp->esdp), val); } } @@ -1670,7 +1672,7 @@ haw_thr_prgr_later_cleanup_op_wakeup(ErtsAuxWorkData *awdp, ErtsThrPrgrVal val, else { awdp->latest_wakeup = val; awdp->later_op.size = thr_prgr_later_cleanup_op_threshold; - erts_thr_progress_wakeup(awdp->esdp, val); + erts_thr_progress_wakeup(erts_thr_prgr_data(awdp->esdp), val); } } } @@ -3066,6 +3068,7 @@ aux_thread(void *unused) ErtsSchedulerSleepInfo *ssi = ERTS_SCHED_SLEEP_INFO_IX(-1); erts_aint32_t aux_work; ErtsThrPrgrCallbacks callbacks; + ErtsThrPrgrData *tpd; int thr_prgr_active = 1; ERTS_MSACC_DECLARE_CACHE(); @@ -3087,12 +3090,16 @@ aux_thread(void *unused) callbacks.wait = thr_prgr_wait; callbacks.finalize_wait = thr_prgr_fin_wait; - erts_thr_progress_register_managed_thread(NULL, &callbacks, 1); + tpd = erts_thr_progress_register_managed_thread(NULL, &callbacks, 1); init_aux_work_data(awdp, NULL, NULL); awdp->ssi = ssi; #if ERTS_POLL_USE_FALLBACK - ssi->psi = erts_create_pollset_thread(-1); +#if ERTS_POLL_USE_SCHEDULER_POLLING + ssi->psi = erts_create_pollset_thread(-2, tpd); +#else + ssi->psi = erts_create_pollset_thread(-1, tpd); +#endif #endif sched_prep_spin_wait(ssi); @@ -3105,11 +3112,11 @@ aux_thread(void *unused) aux_work = erts_atomic32_read_acqb(&ssi->aux_work); if (aux_work) { if (!thr_prgr_active) - erts_thr_progress_active(NULL, thr_prgr_active = 1); + erts_thr_progress_active(tpd, thr_prgr_active = 1); aux_work = handle_aux_work(awdp, aux_work, 1); ERTS_MSACC_UPDATE_CACHE(); - if (aux_work && erts_thr_progress_update(NULL)) - erts_thr_progress_leader_update(NULL); + if (aux_work && erts_thr_progress_update(tpd)) + erts_thr_progress_leader_update(tpd); } if (!aux_work) { @@ -3120,7 +3127,7 @@ aux_thread(void *unused) #endif if (thr_prgr_active) - erts_thr_progress_active(NULL, thr_prgr_active = 0); + erts_thr_progress_active(tpd, thr_prgr_active = 0); #if ERTS_POLL_USE_FALLBACK @@ -3132,11 +3139,11 @@ aux_thread(void *unused) if (flgs & ERTS_SSI_FLG_SLEEPING) { ASSERT(flgs & ERTS_SSI_FLG_POLL_SLEEPING); ASSERT(flgs & ERTS_SSI_FLG_WAITING); - erts_check_io(ssi->psi); + erts_check_io(ssi->psi, ERTS_POLL_INF_TIMEOUT); } } #else - erts_thr_progress_prepare_wait(NULL); + erts_thr_progress_prepare_wait(tpd); flgs = sched_spin_wait(ssi, 0); @@ -3153,7 +3160,7 @@ aux_thread(void *unused) ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_OTHER); } } - erts_thr_progress_finalize_wait(NULL); + erts_thr_progress_finalize_wait(tpd); #endif } @@ -3171,7 +3178,8 @@ poll_thread(void *arg) erts_aint32_t aux_work; ErtsThrPrgrCallbacks callbacks; int thr_prgr_active = 1; - struct erts_poll_thread *psi = erts_create_pollset_thread(id); + struct erts_poll_thread *psi; + ErtsThrPrgrData *tpd; ERTS_MSACC_DECLARE_CACHE(); #ifdef ERTS_ENABLE_LOCK_CHECK @@ -3192,9 +3200,12 @@ poll_thread(void *arg) callbacks.wait = thr_prgr_wait; callbacks.finalize_wait = thr_prgr_fin_wait; - erts_thr_progress_register_managed_thread(NULL, &callbacks, 0); + tpd = erts_thr_progress_register_managed_thread(NULL, &callbacks, 0); init_aux_work_data(awdp, NULL, NULL); awdp->ssi = ssi; + + psi = erts_create_pollset_thread(id, tpd); + ssi->psi = psi; sched_prep_spin_wait(ssi); @@ -3207,16 +3218,16 @@ poll_thread(void *arg) aux_work = erts_atomic32_read_acqb(&ssi->aux_work); if (aux_work) { if (!thr_prgr_active) - erts_thr_progress_active(NULL, thr_prgr_active = 1); + erts_thr_progress_active(tpd, thr_prgr_active = 1); aux_work = handle_aux_work(awdp, aux_work, 1); ERTS_MSACC_UPDATE_CACHE(); - if (aux_work && erts_thr_progress_update(NULL)) - erts_thr_progress_leader_update(NULL); + if (aux_work && erts_thr_progress_update(tpd)) + erts_thr_progress_leader_update(tpd); } if (!aux_work) { if (thr_prgr_active) - erts_thr_progress_active(NULL, thr_prgr_active = 0); + erts_thr_progress_active(tpd, thr_prgr_active = 0); flgs = sched_spin_wait(ssi, 0); @@ -3226,7 +3237,7 @@ poll_thread(void *arg) if (flgs & ERTS_SSI_FLG_SLEEPING) { ASSERT(flgs & ERTS_SSI_FLG_POLL_SLEEPING); ASSERT(flgs & ERTS_SSI_FLG_WAITING); - erts_check_io(psi); + erts_check_io(psi, ERTS_POLL_INF_TIMEOUT); } } } @@ -3236,6 +3247,59 @@ poll_thread(void *arg) return NULL; } +#if ERTS_POLL_USE_SCHEDULER_POLLING +static ERTS_INLINE void +clear_sys_scheduling(void) +{ + erts_atomic32_set_mb(&doing_sys_schedule, 0); +} + +static ERTS_INLINE int +try_set_sys_scheduling(void) +{ + return 0 == erts_atomic32_cmpxchg_acqb(&doing_sys_schedule, 1, 0); +} + + +static ERTS_INLINE int +prepare_for_sys_schedule(void) +{ + while (!erts_port_task_have_outstanding_io_tasks() + && try_set_sys_scheduling()) { + if (!erts_port_task_have_outstanding_io_tasks()) + return 1; + clear_sys_scheduling(); + } + return 0; +} + +static void +check_io_timer(void *null) +{ + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + if (prepare_for_sys_schedule()) { + erts_check_io(esdp->ssi->psi, ERTS_POLL_NO_TIMEOUT); + clear_sys_scheduling(); + } + + /* The timer is cleared if this schedulers run-queue became empty + or if the CHECKIO flag was cleared. The CHECKIO flags is cleared + when a check_balance assigns another scheduler to be the poller in + the overload scenario. */ + if ((ERTS_RUNQ_FLGS_GET_NOB(esdp->run_queue) & (ERTS_RUNQ_FLG_OUT_OF_WORK|ERTS_RUNQ_FLG_CHECKIO)) + == ERTS_RUNQ_FLG_CHECKIO) { + erts_start_timer_callback(ERTS_POLL_SCHEDULER_POLLING_TIMEOUT, + check_io_timer, NULL); + } else { + ERTS_RUNQ_FLGS_UNSET(esdp->run_queue, ERTS_RUNQ_FLG_CHECKIO); + } +} + +#else +#define clear_sys_scheduling() +#define prepare_for_sys_schedule() 0 +#endif + static void scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) { @@ -3286,13 +3350,13 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) aux_work = erts_atomic32_read_acqb(&ssi->aux_work); if (aux_work && !ERTS_SCHEDULER_IS_DIRTY(esdp)) { if (!thr_prgr_active) { - erts_thr_progress_active(esdp, thr_prgr_active = 1); + erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1); sched_wall_time_change(esdp, 1); } aux_work = handle_aux_work(&esdp->aux_work_data, aux_work, 1); ERTS_MSACC_UPDATE_CACHE(); - if (aux_work && erts_thr_progress_update(esdp)) - erts_thr_progress_leader_update(esdp); + if (aux_work && erts_thr_progress_update(erts_thr_prgr_data(esdp))) + erts_thr_progress_leader_update(erts_thr_prgr_data(esdp)); } if (aux_work) { @@ -3301,7 +3365,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) current_time = erts_get_monotonic_time(esdp); if (current_time >= erts_next_timeout_time(esdp->next_tmo_ref)) { if (!thr_prgr_active) { - erts_thr_progress_active(esdp, thr_prgr_active = 1); + erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1); sched_wall_time_change(esdp, 1); } erts_bump_timers(esdp->timer_wheel, current_time); @@ -3321,19 +3385,36 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) } if (do_timeout) { if (!thr_prgr_active) { - erts_thr_progress_active(esdp, thr_prgr_active = 1); + erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1); sched_wall_time_change(esdp, 1); } } - else { + else if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && prepare_for_sys_schedule()) { + /* We sleep in check_io, only for normal schedulers */ + if (thr_prgr_active) { + erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 0); + sched_wall_time_change(esdp, 0); + } + flgs = sched_spin_wait(ssi, 0); + if (flgs & ERTS_SSI_FLG_SLEEPING) { + ASSERT(flgs & ERTS_SSI_FLG_WAITING); + flgs = sched_set_sleeptype(ssi, ERTS_SSI_FLG_POLL_SLEEPING); + if (flgs & ERTS_SSI_FLG_SLEEPING) { + ASSERT(flgs & ERTS_SSI_FLG_POLL_SLEEPING); + ASSERT(flgs & ERTS_SSI_FLG_WAITING); + erts_check_io(ssi->psi, timeout_time); + current_time = erts_get_monotonic_time(esdp); + } + } + clear_sys_scheduling(); + } else { if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) { if (thr_prgr_active) { - erts_thr_progress_active(esdp, thr_prgr_active = 0); + erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 0); sched_wall_time_change(esdp, 0); } - erts_thr_progress_prepare_wait(esdp); + erts_thr_progress_prepare_wait(erts_thr_prgr_data(esdp)); } - flgs = sched_spin_wait(ssi, spincount); if (flgs & ERTS_SSI_FLG_SLEEPING) { ASSERT(flgs & ERTS_SSI_FLG_WAITING); @@ -3363,7 +3444,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) } } if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) - erts_thr_progress_finalize_wait(esdp); + erts_thr_progress_finalize_wait(erts_thr_prgr_data(esdp)); } if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && current_time >= timeout_time) erts_bump_timers(esdp->timer_wheel, current_time); @@ -3392,7 +3473,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) if (ERTS_SCHEDULER_IS_DIRTY(esdp)) dirty_sched_wall_time_change(esdp, working = 1); else if (!thr_prgr_active) { - erts_thr_progress_active(esdp, thr_prgr_active = 1); + erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1); sched_wall_time_change(esdp, 1); } @@ -4580,6 +4661,15 @@ check_balance(ErtsRunQueue *c_rq) if (blnc_no_rqs == 1) { c_rq->check_balance_reds = INT_MAX; erts_atomic32_set_nob(&balance_info.checking_balance, 0); +#if ERTS_POLL_USE_SCHEDULER_POLLING + c_rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; + if ((ERTS_RUNQ_FLGS_GET_NOB(c_rq) & (ERTS_RUNQ_FLG_OUT_OF_WORK|ERTS_RUNQ_FLG_CHECKIO)) + == 0) { + ERTS_RUNQ_FLGS_SET(c_rq, ERTS_RUNQ_FLG_CHECKIO); + erts_start_timer_callback(ERTS_POLL_SCHEDULER_POLLING_TIMEOUT, check_io_timer, NULL); + } + ERTS_RUNQ_FLGS_UNSET(c_rq, ERTS_RUNQ_FLGS_MIGRATION_INFO); +#endif return; } @@ -5099,6 +5189,19 @@ erts_fprintf(stderr, "--------------------------------\n"); /* Publish new migration paths... */ erts_atomic_set_wb(&erts_migration_paths, (erts_aint_t) new_mpaths); +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (full_scheds == current_active) { + ERTS_ASSERT(full_scheds <= current_active); + /* All active schedulers ran for full, we need to do active polling, + so we setup a timer that does active polling */ + if (!(ERTS_RUNQ_FLGS_GET_NOB(c_rq) & ERTS_RUNQ_FLG_CHECKIO)) { + /* Active polling is not running, start it */ + erts_start_timer_callback(ERTS_POLL_SCHEDULER_POLLING_TIMEOUT, check_io_timer, NULL); + } + run_queue_info[c_rq->ix].flags |= ERTS_RUNQ_FLG_CHECKIO; + } +#endif + /* Reset balance statistics in all online queues */ for (qix = 0; qix < blnc_no_rqs; qix++) { Uint32 flags = run_queue_info[qix].flags; @@ -5108,6 +5211,8 @@ erts_fprintf(stderr, "--------------------------------\n"); ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK)); if (rq->waiting) flags |= ERTS_RUNQ_FLG_OUT_OF_WORK; + if (rq != c_rq) + flags &= ~ERTS_RUNQ_FLG_CHECKIO; rq->full_reds_history_sum = run_queue_info[qix].full_reds_history_sum; @@ -5117,8 +5222,7 @@ erts_fprintf(stderr, "--------------------------------\n"); ERTS_DBG_CHK_FULL_REDS_HISTORY(rq); rq->out_of_work_count = 0; - (void) ERTS_RUNQ_FLGS_READ_BSET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO, flags); - + (void) ERTS_RUNQ_FLGS_READ_BSET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO|ERTS_RUNQ_FLG_CHECKIO, flags); rq->max_len = erts_atomic32_read_dirty(&rq->len); for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { ErtsRunQueueInfo *rqi; @@ -5557,7 +5661,6 @@ erts_sched_set_busy_wait_threshold(ErtsSchedType sched_type, char *str) return EINVAL; } - params->sys_schedule = sys_sched; params->tse = sys_sched * ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT; params->aux_work = sys_sched * aux_work_fact; @@ -5768,6 +5871,9 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online, int no_poll_th size_runqs = sizeof(ErtsAlignedRunQueue) * tot_rqs; erts_aligned_run_queues = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_RUNQS, size_runqs); +#if ERTS_POLL_USE_SCHEDULER_POLLING + erts_atomic32_init_nob(&doing_sys_schedule, 0); +#endif erts_atomic32_init_nob(&no_empty_run_queues, 0); erts_no_run_queues = n; @@ -7565,7 +7671,8 @@ suspend_scheduler(ErtsSchedulerData *esdp) if (aux_work|evacuate) { if (!thr_prgr_active) { - erts_thr_progress_active(esdp, thr_prgr_active = 1); + erts_thr_progress_active(erts_thr_prgr_data(esdp), + thr_prgr_active = 1); sched_wall_time_change(esdp, 1); } if (aux_work) @@ -7573,8 +7680,8 @@ suspend_scheduler(ErtsSchedulerData *esdp) aux_work, 1); - if (aux_work && erts_thr_progress_update(esdp)) - erts_thr_progress_leader_update(esdp); + if (aux_work && erts_thr_progress_update(erts_thr_prgr_data(esdp))) + erts_thr_progress_leader_update(erts_thr_prgr_data(esdp)); if (evacuate) { erts_runq_lock(esdp->run_queue); evacuate_run_queue(esdp->run_queue, &sbp); @@ -7593,18 +7700,18 @@ suspend_scheduler(ErtsSchedulerData *esdp) if (!aux_work && current_time < timeout_time) { /* go to sleep... */ if (thr_prgr_active) { - erts_thr_progress_active(esdp, thr_prgr_active = 0); + erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 0); sched_wall_time_change(esdp, 0); } - erts_thr_progress_prepare_wait(NULL); + erts_thr_progress_prepare_wait(erts_thr_prgr_data(NULL)); suspend_normal_scheduler_sleep(esdp); - erts_thr_progress_finalize_wait(NULL); + erts_thr_progress_finalize_wait(erts_thr_prgr_data(NULL)); current_time = erts_get_monotonic_time(esdp); } if (current_time >= timeout_time) { if (!thr_prgr_active) { - erts_thr_progress_active(esdp, thr_prgr_active = 1); + erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1); sched_wall_time_change(esdp, 1); } erts_bump_timers(esdp->timer_wheel, current_time); @@ -7661,7 +7768,7 @@ suspend_scheduler(ErtsSchedulerData *esdp) profile_scheduler(make_small(esdp->no), am_active); if (!thr_prgr_active) { - erts_thr_progress_active(esdp, thr_prgr_active = 1); + erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1); sched_wall_time_change(esdp, 1); } } @@ -8296,6 +8403,11 @@ sched_thread_func(void *vesdp) erts_msacc_init_thread("scheduler", no, 1); erts_thr_progress_register_managed_thread(esdp, &callbacks, 0); + +#if ERTS_POLL_USE_SCHEDULER_POLLING + esdp->ssi->psi = erts_create_pollset_thread(-1, NULL); +#endif + erts_alloc_register_scheduler(vesdp); #ifdef ERTS_ENABLE_LOCK_CHECK { @@ -9313,12 +9425,12 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) } } - leader_update = erts_thr_progress_update(esdp); + leader_update = erts_thr_progress_update(erts_thr_prgr_data(esdp)); aux_work = erts_atomic32_read_acqb(&esdp->ssi->aux_work); if (aux_work | leader_update) { erts_runq_unlock(rq); if (leader_update) - erts_thr_progress_leader_update(esdp); + erts_thr_progress_leader_update(erts_thr_prgr_data(esdp)); if (aux_work) handle_aux_work(&esdp->aux_work_data, aux_work, 0); erts_runq_lock(rq); |