From bc5818cfdd56e19a16357f4443d80a56426aa134 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Mon, 10 Oct 2011 21:03:10 +0200 Subject: Replace system block with thread progress block The ERTS internal system block functionality has been replaced by new functionality for blocking the system. The old system block functionality had contention issues and complexity issues. The new functionality piggy-backs on thread progress tracking functionality needed by newly introduced lock-free synchronization in the runtime system. When the functionality for blocking the system isn't used there is more or less no overhead at all. This since the functionality for tracking thread progress is there and needed anyway. --- erts/emulator/beam/erl_process.c | 308 +++++++++++++++++++++++---------------- 1 file changed, 185 insertions(+), 123 deletions(-) (limited to 'erts/emulator/beam/erl_process.c') diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 4dda17f559..f14a35bafd 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -155,7 +155,6 @@ do { \ static struct { erts_smp_mtx_t mtx; erts_smp_cnd_t cnd; - int aux_thread; int online; int curr_online; int wait_curr_online; @@ -622,18 +621,6 @@ erts_psd_set_init(Process *p, ErtsProcLocks plocks, int ix, void *data) #ifdef ERTS_SMP -static void -prepare_for_block(void *vrq) -{ - erts_smp_runq_unlock((ErtsRunQueue *) vrq); -} - -static void -resume_after_block(void *vrq) -{ - erts_smp_runq_lock((ErtsRunQueue *) vrq); -} - void erts_sched_finish_poke(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flags) { @@ -641,6 +628,13 @@ erts_sched_finish_poke(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flags) case ERTS_SSI_FLG_POLL_SLEEPING: erts_sys_schedule_interrupt(1); break; + case ERTS_SSI_FLG_POLL_SLEEPING|ERTS_SSI_FLG_TSE_SLEEPING: + /* + * Thread progress blocking while poll sleeping; need + * to signal on both... + */ + erts_sys_schedule_interrupt(1); + /* fall through */ case ERTS_SSI_FLG_TSE_SLEEPING: erts_tse_set(ssi->event); break; @@ -1482,11 +1476,50 @@ sched_set_sleeptype(ErtsSchedulerSleepInfo *ssi, erts_aint32_t sleep_type) static void -poke_ssi(void *vssi) +thr_prgr_wakeup(void *vssi) { erts_sched_poke((ErtsSchedulerSleepInfo *) vssi); } +static void +thr_prgr_prep_wait(void *vssi) +{ + ErtsSchedulerSleepInfo *ssi = (ErtsSchedulerSleepInfo *) vssi; + erts_smp_atomic32_read_bor_acqb(&ssi->flags, + ERTS_SSI_FLG_SLEEPING); +} + +static void +thr_prgr_wait(void *vssi) +{ + ErtsSchedulerSleepInfo *ssi = (ErtsSchedulerSleepInfo *) vssi; + erts_aint32_t xflgs = ERTS_SSI_FLG_SLEEPING; + + erts_tse_reset(ssi->event); + + while (1) { + erts_aint32_t aflgs, nflgs; + nflgs = xflgs | ERTS_SSI_FLG_TSE_SLEEPING; + aflgs = erts_smp_atomic32_cmpxchg_acqb(&ssi->flags, nflgs, xflgs); + if (aflgs == xflgs) { + erts_tse_wait(ssi->event); + break; + } + if ((aflgs & ERTS_SSI_FLG_SLEEPING) == 0) + break; + xflgs = aflgs; + } +} + +static void +thr_prgr_fin_wait(void *vssi) +{ + ErtsSchedulerSleepInfo *ssi = (ErtsSchedulerSleepInfo *) vssi; + erts_smp_atomic32_read_band_nob(&ssi->flags, + ~(ERTS_SSI_FLG_SLEEPING + | ERTS_SSI_FLG_TSE_SLEEPING)); +} + static void init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp); static void * @@ -1495,25 +1528,21 @@ aux_thread(void *unused) ErtsAuxWorkData *awdp = aux_thread_aux_work_data; ErtsSchedulerSleepInfo *ssi = ERTS_SCHED_SLEEP_INFO_IX(-1); erts_aint32_t aux_work; - ErtsThrPrgrWakeupCallback wake_me; + ErtsThrPrgrCallbacks callbacks; int thr_prgr_active = 1; - wake_me.wakeup = poke_ssi; - wake_me.arg = (void *) ssi; + ssi->event = erts_tse_fetch(); + + callbacks.arg = (void *) ssi; + callbacks.wakeup = thr_prgr_wakeup; + callbacks.prepare_wait = thr_prgr_prep_wait; + callbacks.wait = thr_prgr_wait; + callbacks.finalize_wait = thr_prgr_fin_wait; - erts_thr_progress_register_managed_thread(NULL, &wake_me, 1); + erts_thr_progress_register_managed_thread(NULL, &callbacks, 1); init_aux_work_data(awdp, NULL); awdp->ssi = ssi; - erts_register_blockable_thread(); - - ssi->event = erts_tse_fetch(); - - erts_smp_mtx_lock(&schdlr_sspnd.mtx); - schdlr_sspnd.aux_thread = 1; - erts_smp_cnd_signal(&schdlr_sspnd.cnd); - erts_smp_mtx_unlock(&schdlr_sspnd.mtx); - sched_prep_spin_wait(ssi); while (1) { @@ -1528,8 +1557,6 @@ aux_thread(void *unused) erts_thr_progress_leader_update(NULL); } - erts_smp_activity_begin(ERTS_ACTIVITY_WAIT, NULL, NULL, NULL); - if (!aux_work) { if (thr_prgr_active) erts_thr_progress_active(NULL, thr_prgr_active = 0); @@ -1552,8 +1579,6 @@ aux_thread(void *unused) erts_thr_progress_finalize_wait(NULL); } - erts_smp_activity_end(ERTS_ACTIVITY_WAIT, NULL, NULL, NULL); - flgs = sched_prep_spin_wait(ssi); } return NULL; @@ -1614,8 +1639,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) erts_thr_progress_leader_update(esdp); } - erts_smp_activity_begin(ERTS_ACTIVITY_WAIT, NULL, NULL, NULL); - if (aux_work) flgs = erts_smp_atomic32_read_acqb(&ssi->flags); else { @@ -1639,8 +1662,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) erts_thr_progress_finalize_wait(esdp); } - erts_smp_activity_end(ERTS_ACTIVITY_WAIT, NULL, NULL, NULL); - if (!(flgs & ERTS_SSI_FLG_WAITING)) { ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING)); break; @@ -1659,6 +1680,9 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) if (flgs & ~ERTS_SSI_FLG_SUSPENDED) erts_smp_atomic32_read_band_nob(&ssi->flags, ERTS_SSI_FLG_SUSPENDED); + if (!thr_prgr_active) + erts_thr_progress_active(esdp, thr_prgr_active = 1); + erts_smp_runq_lock(rq); sched_active(esdp->no, rq); @@ -1765,12 +1789,12 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) goto sys_poll_aux_work; } #ifdef ERTS_SMP - if (thr_prgr_active) - erts_thr_progress_active(esdp, thr_prgr_active = 0); flgs = sched_set_sleeptype(ssi, ERTS_SSI_FLG_POLL_SLEEPING); if (!(flgs & ERTS_SSI_FLG_SLEEPING)) { - if (!(flgs & ERTS_SSI_FLG_WAITING)) + if (!(flgs & ERTS_SSI_FLG_WAITING)) { + ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING)); goto sys_locked_woken; + } erts_smp_runq_unlock(rq); flgs = sched_prep_cont_spin_wait(ssi); if (!(flgs & ERTS_SSI_FLG_WAITING)) { @@ -1787,6 +1811,11 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) erts_smp_runq_unlock(rq); +#ifdef ERTS_SMP + if (thr_prgr_active) + erts_thr_progress_active(esdp, thr_prgr_active = 0); +#endif + ASSERT(!erts_port_task_have_outstanding_io_tasks()); erl_sys_schedule(0); @@ -1804,8 +1833,15 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) goto sys_aux_work; sys_woken: + if (!thr_prgr_active) + erts_thr_progress_active(esdp, thr_prgr_active = 1); erts_smp_runq_lock(rq); sys_locked_woken: + if (!thr_prgr_active) { + erts_smp_runq_unlock(rq); + erts_thr_progress_active(esdp, thr_prgr_active = 1); + erts_smp_runq_lock(rq); + } clear_sys_scheduling(); if (flgs & ~ERTS_SSI_FLG_SUSPENDED) erts_smp_atomic32_read_band_nob(&ssi->flags, ERTS_SSI_FLG_SUSPENDED); @@ -1813,12 +1849,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) sched_active_sys(esdp->no, rq); } -#ifdef ERTS_SMP - if (!thr_prgr_active) - erts_thr_progress_active(esdp, thr_prgr_active = 1); - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); -#endif } #ifdef ERTS_SMP @@ -3364,7 +3395,6 @@ erts_init_scheduling(int mrq, int no_schedulers, int no_schedulers_online) erts_smp_cnd_init(&schdlr_sspnd.cnd); erts_smp_atomic32_init_nob(&schdlr_sspnd.changing, 0); - schdlr_sspnd.aux_thread = 0; schdlr_sspnd.online = no_schedulers_online; schdlr_sspnd.curr_online = no_schedulers; schdlr_sspnd.msb.ongoing = 0; @@ -3557,18 +3587,6 @@ erts_get_max_no_executing_schedulers(void) #ifdef ERTS_SMP -static void -susp_sched_prep_block(void *unused) -{ - erts_smp_mtx_unlock(&schdlr_sspnd.mtx); -} - -static void -susp_sched_resume_block(void *unused) -{ - erts_smp_mtx_lock(&schdlr_sspnd.mtx); -} - static void scheduler_ix_resume_wake(Uint ix) { @@ -3759,8 +3777,6 @@ suspend_scheduler(ErtsSchedulerData *esdp) erts_thr_progress_leader_update(esdp); } - erts_smp_activity_begin(ERTS_ACTIVITY_WAIT, NULL, NULL, NULL); - if (!aux_work) { if (thr_prgr_active) erts_thr_progress_active(esdp, thr_prgr_active = 0); @@ -3785,8 +3801,6 @@ suspend_scheduler(ErtsSchedulerData *esdp) erts_thr_progress_finalize_wait(esdp); } - erts_smp_activity_end(ERTS_ACTIVITY_WAIT, NULL, NULL, NULL); - flgs = sched_prep_spin_suspended(ssi, (ERTS_SSI_FLG_WAITING | ERTS_SSI_FLG_SUSPENDED)); if (!(flgs & ERTS_SSI_FLG_SUSPENDED)) @@ -3796,7 +3810,6 @@ suspend_scheduler(ErtsSchedulerData *esdp) break; } - erts_smp_mtx_lock(&schdlr_sspnd.mtx); changing = erts_smp_atomic32_read_nob(&schdlr_sspnd.changing); } @@ -3906,12 +3919,16 @@ erts_set_schedulers_online(Process *p, Sint new_no, Sint *old_no) { - int ix, res, no, have_unlocked_plocks; + ErtsSchedulerData *esdp; + int ix, res, no, have_unlocked_plocks, end_wait; erts_aint32_t changing; if (new_no < 1 || erts_no_schedulers < new_no) return ERTS_SCHDLR_SSPND_EINVAL; + esdp = ERTS_PROC_GET_SCHDATA(p); + end_wait = 0; + erts_smp_mtx_lock(&schdlr_sspnd.mtx); have_unlocked_plocks = 0; @@ -4028,16 +4045,21 @@ erts_set_schedulers_online(Process *p, } } - erts_smp_activity_begin(ERTS_ACTIVITY_WAIT, - susp_sched_prep_block, - susp_sched_resume_block, - NULL); + if (schdlr_sspnd.curr_online != schdlr_sspnd.wait_curr_online) { + erts_smp_mtx_unlock(&schdlr_sspnd.mtx); + if (plocks && !have_unlocked_plocks) { + have_unlocked_plocks = 1; + erts_smp_proc_unlock(p, plocks); + } + erts_thr_progress_active(esdp, 0); + erts_thr_progress_prepare_wait(esdp); + end_wait = 1; + erts_smp_mtx_lock(&schdlr_sspnd.mtx); + } + while (schdlr_sspnd.curr_online != schdlr_sspnd.wait_curr_online) erts_smp_cnd_wait(&schdlr_sspnd.cnd, &schdlr_sspnd.mtx); - erts_smp_activity_end(ERTS_ACTIVITY_WAIT, - susp_sched_prep_block, - susp_sched_resume_block, - NULL); + ASSERT(res != ERTS_SCHDLR_SSPND_DONE ? (ERTS_SCHDLR_SSPND_CHNG_WAITER & erts_smp_atomic32_read_nob(&schdlr_sspnd.changing)) @@ -4045,10 +4067,15 @@ erts_set_schedulers_online(Process *p, == erts_smp_atomic32_read_nob(&schdlr_sspnd.changing))); erts_smp_atomic32_read_band_nob(&schdlr_sspnd.changing, ~ERTS_SCHDLR_SSPND_CHNG_WAITER); + } } erts_smp_mtx_unlock(&schdlr_sspnd.mtx); + if (end_wait) { + erts_thr_progress_finalize_wait(esdp); + erts_thr_progress_active(esdp, 1); + } if (have_unlocked_plocks) erts_smp_proc_lock(p, plocks); @@ -4133,17 +4160,38 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all) erts_smp_mtx_unlock(&balance_info.update_mtx); erts_smp_mtx_lock(&schdlr_sspnd.mtx); } - erts_smp_activity_begin(ERTS_ACTIVITY_WAIT, - susp_sched_prep_block, - susp_sched_resume_block, - NULL); - while (erts_smp_atomic32_read_nob(&schdlr_sspnd.active) - != schdlr_sspnd.msb.wait_active) - erts_smp_cnd_wait(&schdlr_sspnd.cnd, &schdlr_sspnd.mtx); - erts_smp_activity_end(ERTS_ACTIVITY_WAIT, - susp_sched_prep_block, - susp_sched_resume_block, - NULL); + + if (erts_smp_atomic32_read_nob(&schdlr_sspnd.active) + != schdlr_sspnd.msb.wait_active) { + ErtsSchedulerData *esdp; + + erts_smp_mtx_unlock(&schdlr_sspnd.mtx); + + if (plocks && !have_unlocked_plocks) { + have_unlocked_plocks = 1; + erts_smp_proc_unlock(p, plocks); + } + + esdp = ERTS_PROC_GET_SCHDATA(p); + + erts_thr_progress_active(esdp, 0); + erts_thr_progress_prepare_wait(esdp); + + erts_smp_mtx_lock(&schdlr_sspnd.mtx); + + while (erts_smp_atomic32_read_nob(&schdlr_sspnd.active) + != schdlr_sspnd.msb.wait_active) + erts_smp_cnd_wait(&schdlr_sspnd.cnd, + &schdlr_sspnd.mtx); + + erts_smp_mtx_unlock(&schdlr_sspnd.mtx); + + erts_thr_progress_active(esdp, 1); + erts_thr_progress_finalize_wait(esdp); + + erts_smp_mtx_lock(&schdlr_sspnd.mtx); + + } ASSERT(res != ERTS_SCHDLR_SSPND_DONE_MSCHED_BLOCKED ? (ERTS_SCHDLR_SSPND_CHNG_WAITER & erts_smp_atomic32_read_nob(&schdlr_sspnd.changing)) @@ -4331,13 +4379,18 @@ erts_multi_scheduling_blockers(Process *p) static void * sched_thread_func(void *vesdp) { - ErtsThrPrgrWakeupCallback wake_me; + ErtsThrPrgrCallbacks callbacks; ErtsSchedulerData *esdp = vesdp; Uint no = esdp->no; #ifdef ERTS_SMP - wake_me.wakeup = poke_ssi; - wake_me.arg = (void *) esdp->ssi; - erts_thr_progress_register_managed_thread(esdp, &wake_me, 0); + ERTS_SCHED_SLEEP_INFO_IX(no - 1)->event = erts_tse_fetch(); + callbacks.arg = (void *) esdp->ssi; + callbacks.wakeup = thr_prgr_wakeup; + callbacks.prepare_wait = thr_prgr_prep_wait; + callbacks.wait = thr_prgr_wait; + callbacks.finalize_wait = thr_prgr_fin_wait; + + erts_thr_progress_register_managed_thread(esdp, &callbacks, 0); erts_alloc_register_scheduler(vesdp); #endif #ifdef ERTS_ENABLE_LOCK_CHECK @@ -4356,15 +4409,18 @@ sched_thread_func(void *vesdp) erts_sched_init_check_cpu_bind(esdp); erts_proc_lock_prepare_proc_lock_waiter(); - ERTS_SCHED_SLEEP_INFO_IX(no - 1)->event = erts_tse_fetch(); - - #endif - erts_register_blockable_thread(); + #ifdef HIPE hipe_thread_signal_init(); #endif erts_thread_init_float(); + + if (no == 1) { + erts_thr_progress_active(esdp, 0); + erts_thr_progress_prepare_wait(esdp); + } + erts_smp_mtx_lock(&schdlr_sspnd.mtx); ASSERT(erts_smp_atomic32_read_nob(&schdlr_sspnd.changing) @@ -4378,23 +4434,17 @@ sched_thread_func(void *vesdp) } if (no == 1) { - if (schdlr_sspnd.curr_online != schdlr_sspnd.wait_curr_online) { - erts_smp_activity_begin(ERTS_ACTIVITY_WAIT, - susp_sched_prep_block, - susp_sched_resume_block, - NULL); - while (schdlr_sspnd.curr_online != schdlr_sspnd.wait_curr_online - || !schdlr_sspnd.aux_thread) - erts_smp_cnd_wait(&schdlr_sspnd.cnd, &schdlr_sspnd.mtx); - erts_smp_activity_end(ERTS_ACTIVITY_WAIT, - susp_sched_prep_block, - susp_sched_resume_block, - NULL); - } + while (schdlr_sspnd.curr_online != schdlr_sspnd.wait_curr_online) + erts_smp_cnd_wait(&schdlr_sspnd.cnd, &schdlr_sspnd.mtx); ERTS_SCHDLR_SSPND_CHNG_SET(0, ERTS_SCHDLR_SSPND_CHNG_WAITER); } erts_smp_mtx_unlock(&schdlr_sspnd.mtx); + if (no == 1) { + erts_thr_progress_finalize_wait(esdp); + erts_thr_progress_active(esdp, 1); + } + #ifdef ERTS_DO_VERIFY_UNUSED_TEMP_ALLOC esdp->verify_unused_temp_alloc = erts_alloc_get_verify_unused_temp_alloc( @@ -4431,12 +4481,6 @@ erts_start_schedulers(void) res = ENOTSUP; } - erts_block_system(0); - - res = ethr_thr_create(&aux_tid, aux_thread, NULL, &opts); - if (res != 0) - erl_exit(1, "Failed to create aux thread\n"); - while (actual < wanted) { ErtsSchedulerData *esdp = ERTS_SCHEDULER_IX(actual); actual++; @@ -4449,7 +4493,12 @@ erts_start_schedulers(void) } erts_no_schedulers = actual; - erts_release_system(); + + ERTS_THR_MEMORY_BARRIER; + + res = ethr_thr_create(&aux_tid, aux_thread, NULL, &opts); + if (res != 0) + erl_exit(1, "Failed to create aux thread\n"); if (actual < 1) erl_exit(1, @@ -5810,7 +5859,7 @@ Process *schedule(Process *p, int calls) input_reductions = INPUT_REDUCTIONS; } - ERTS_SMP_LC_ASSERT(!ERTS_LC_IS_BLOCKING); + ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); /* * Clean up after the process being scheduled out. @@ -5948,7 +5997,8 @@ Process *schedule(Process *p, int calls) } - ERTS_SMP_LC_ASSERT(!ERTS_LC_IS_BLOCKING); + ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); + check_activities_to_run: { #ifdef ERTS_SMP @@ -5958,7 +6008,7 @@ Process *schedule(Process *p, int calls) check_balance(rq); } - ERTS_SMP_LC_ASSERT(!ERTS_LC_IS_BLOCKING); + ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); if (rq->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK) @@ -5996,11 +6046,7 @@ Process *schedule(Process *p, int calls) } } - erts_smp_chk_system_block(prepare_for_block, - resume_after_block, - (void *) rq); - - ERTS_SMP_LC_ASSERT(!ERTS_LC_IS_BLOCKING); + ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); #else /* ERTS_SMP */ @@ -6327,14 +6373,14 @@ erts_sched_stat_modify(int what) int ix; switch (what) { case ERTS_SCHED_STAT_MODIFY_ENABLE: - erts_smp_block_system(0); + erts_smp_thr_progress_block(); erts_sched_stat.enabled = 1; - erts_smp_release_system(); + erts_smp_thr_progress_unblock(); break; case ERTS_SCHED_STAT_MODIFY_DISABLE: - erts_smp_block_system(0); + erts_smp_thr_progress_block(); erts_sched_stat.enabled = 1; - erts_smp_release_system(); + erts_smp_thr_progress_unblock(); break; case ERTS_SCHED_STAT_MODIFY_CLEAR: erts_smp_spin_lock(&erts_sched_stat.lock); @@ -6489,7 +6535,7 @@ erts_get_exact_total_reductions(Process *c_p, Uint *redsp, Uint *diffp) * Wait for other schedulers to schedule out their processes * and update 'reductions'. */ - erts_smp_block_system(0); + erts_smp_thr_progress_block(); for (reds = 0, ix = 0; ix < erts_no_run_queues; ix++) reds += ERTS_RUNQ_IX(ix)->procs.reductions; if (redsp) @@ -6497,7 +6543,7 @@ erts_get_exact_total_reductions(Process *c_p, Uint *redsp, Uint *diffp) if (diffp) *diffp = reds - last_exact_reductions; last_exact_reductions = reds; - erts_smp_release_system(); + erts_smp_thr_progress_unblock(); erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MAIN); } @@ -9302,6 +9348,22 @@ init_processes_bif(void) * Debug stuff */ +#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK) +int +erts_dbg_check_halloc_lock(Process *p) +{ + if (ERTS_PROC_LOCK_MAIN & erts_proc_lc_my_proc_locks(p)) + return 1; + if (p->id == ERTS_INVALID_PID) + return 1; + if (p->scheduler_data && p == p->scheduler_data->match_pseudo_process) + return 1; + if (erts_thr_progress_is_blocking()) + return 1; + return 0; +} +#endif + Eterm erts_debug_processes(Process *c_p) { -- cgit v1.2.3