aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_process.c
diff options
context:
space:
mode:
authorLukas Larsson <[email protected]>2018-12-06 11:53:18 +0100
committerLukas Larsson <[email protected]>2018-12-06 11:53:18 +0100
commitea7d6c39f2179b2240d55df4a1ddd515b6d32832 (patch)
tree117436f49db44ee51a9b28c0cf94e094c31ed280 /erts/emulator/beam/erl_process.c
parent551e15331967551307b1618248334b20dce3241c (diff)
parent73e4f5f21bbf492ab61c01dee48fa09fd7309a50 (diff)
downloadotp-ea7d6c39f2179b2240d55df4a1ddd515b6d32832.tar.gz
otp-ea7d6c39f2179b2240d55df4a1ddd515b6d32832.tar.bz2
otp-ea7d6c39f2179b2240d55df4a1ddd515b6d32832.zip
Merge branch 'maint'
Conflicts: erts/emulator/beam/erl_process.c
Diffstat (limited to 'erts/emulator/beam/erl_process.c')
-rw-r--r--erts/emulator/beam/erl_process.c200
1 files changed, 156 insertions, 44 deletions
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 1dde9800f8..a24f4bc193 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -174,7 +174,6 @@ ErtsLcPSDLocks erts_psd_required_locks[ERTS_PSD_SIZE];
typedef struct {
int aux_work;
int tse;
- int sys_schedule;
} ErtsBusyWaitParams;
static ErtsBusyWaitParams sched_busy_wait_params[ERTS_SCHED_TYPE_LAST + 1];
@@ -344,6 +343,9 @@ erts_sched_stat_t erts_sched_stat;
static erts_tsd_key_t ERTS_WRITE_UNLIKELY(sched_data_key);
+#if ERTS_POLL_USE_SCHEDULER_POLLING
+static erts_atomic32_t doing_sys_schedule;
+#endif
static erts_atomic32_t no_empty_run_queues;
long erts_runq_supervision_interval = 0;
static ethr_event runq_supervision_event;
@@ -1646,7 +1648,7 @@ haw_thr_prgr_wakeup(ErtsAuxWorkData *awdp, ErtsThrPrgrVal val)
awdp->latest_wakeup = val;
haw_chk_later_cleanup_op_wakeup(awdp, val);
}
- erts_thr_progress_wakeup(awdp->esdp, val);
+ erts_thr_progress_wakeup(erts_thr_prgr_data(awdp->esdp), val);
}
}
@@ -1656,7 +1658,7 @@ haw_thr_prgr_soft_wakeup(ErtsAuxWorkData *awdp, ErtsThrPrgrVal val)
if (erts_thr_progress_cmp(val, awdp->latest_wakeup) > 0) {
awdp->latest_wakeup = val;
haw_chk_later_cleanup_op_wakeup(awdp, val);
- erts_thr_progress_wakeup(awdp->esdp, val);
+ erts_thr_progress_wakeup(erts_thr_prgr_data(awdp->esdp), val);
}
}
@@ -1670,7 +1672,7 @@ haw_thr_prgr_later_cleanup_op_wakeup(ErtsAuxWorkData *awdp, ErtsThrPrgrVal val,
else {
awdp->latest_wakeup = val;
awdp->later_op.size = thr_prgr_later_cleanup_op_threshold;
- erts_thr_progress_wakeup(awdp->esdp, val);
+ erts_thr_progress_wakeup(erts_thr_prgr_data(awdp->esdp), val);
}
}
}
@@ -3066,6 +3068,7 @@ aux_thread(void *unused)
ErtsSchedulerSleepInfo *ssi = ERTS_SCHED_SLEEP_INFO_IX(-1);
erts_aint32_t aux_work;
ErtsThrPrgrCallbacks callbacks;
+ ErtsThrPrgrData *tpd;
int thr_prgr_active = 1;
ERTS_MSACC_DECLARE_CACHE();
@@ -3087,12 +3090,16 @@ aux_thread(void *unused)
callbacks.wait = thr_prgr_wait;
callbacks.finalize_wait = thr_prgr_fin_wait;
- erts_thr_progress_register_managed_thread(NULL, &callbacks, 1);
+ tpd = erts_thr_progress_register_managed_thread(NULL, &callbacks, 1);
init_aux_work_data(awdp, NULL, NULL);
awdp->ssi = ssi;
#if ERTS_POLL_USE_FALLBACK
- ssi->psi = erts_create_pollset_thread(-1);
+#if ERTS_POLL_USE_SCHEDULER_POLLING
+ ssi->psi = erts_create_pollset_thread(-2, tpd);
+#else
+ ssi->psi = erts_create_pollset_thread(-1, tpd);
+#endif
#endif
sched_prep_spin_wait(ssi);
@@ -3105,11 +3112,11 @@ aux_thread(void *unused)
aux_work = erts_atomic32_read_acqb(&ssi->aux_work);
if (aux_work) {
if (!thr_prgr_active)
- erts_thr_progress_active(NULL, thr_prgr_active = 1);
+ erts_thr_progress_active(tpd, thr_prgr_active = 1);
aux_work = handle_aux_work(awdp, aux_work, 1);
ERTS_MSACC_UPDATE_CACHE();
- if (aux_work && erts_thr_progress_update(NULL))
- erts_thr_progress_leader_update(NULL);
+ if (aux_work && erts_thr_progress_update(tpd))
+ erts_thr_progress_leader_update(tpd);
}
if (!aux_work) {
@@ -3120,7 +3127,7 @@ aux_thread(void *unused)
#endif
if (thr_prgr_active)
- erts_thr_progress_active(NULL, thr_prgr_active = 0);
+ erts_thr_progress_active(tpd, thr_prgr_active = 0);
#if ERTS_POLL_USE_FALLBACK
@@ -3132,11 +3139,11 @@ aux_thread(void *unused)
if (flgs & ERTS_SSI_FLG_SLEEPING) {
ASSERT(flgs & ERTS_SSI_FLG_POLL_SLEEPING);
ASSERT(flgs & ERTS_SSI_FLG_WAITING);
- erts_check_io(ssi->psi);
+ erts_check_io(ssi->psi, ERTS_POLL_INF_TIMEOUT);
}
}
#else
- erts_thr_progress_prepare_wait(NULL);
+ erts_thr_progress_prepare_wait(tpd);
flgs = sched_spin_wait(ssi, 0);
@@ -3153,7 +3160,7 @@ aux_thread(void *unused)
ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_OTHER);
}
}
- erts_thr_progress_finalize_wait(NULL);
+ erts_thr_progress_finalize_wait(tpd);
#endif
}
@@ -3171,7 +3178,8 @@ poll_thread(void *arg)
erts_aint32_t aux_work;
ErtsThrPrgrCallbacks callbacks;
int thr_prgr_active = 1;
- struct erts_poll_thread *psi = erts_create_pollset_thread(id);
+ struct erts_poll_thread *psi;
+ ErtsThrPrgrData *tpd;
ERTS_MSACC_DECLARE_CACHE();
#ifdef ERTS_ENABLE_LOCK_CHECK
@@ -3192,9 +3200,12 @@ poll_thread(void *arg)
callbacks.wait = thr_prgr_wait;
callbacks.finalize_wait = thr_prgr_fin_wait;
- erts_thr_progress_register_managed_thread(NULL, &callbacks, 0);
+ tpd = erts_thr_progress_register_managed_thread(NULL, &callbacks, 0);
init_aux_work_data(awdp, NULL, NULL);
awdp->ssi = ssi;
+
+ psi = erts_create_pollset_thread(id, tpd);
+
ssi->psi = psi;
sched_prep_spin_wait(ssi);
@@ -3207,16 +3218,16 @@ poll_thread(void *arg)
aux_work = erts_atomic32_read_acqb(&ssi->aux_work);
if (aux_work) {
if (!thr_prgr_active)
- erts_thr_progress_active(NULL, thr_prgr_active = 1);
+ erts_thr_progress_active(tpd, thr_prgr_active = 1);
aux_work = handle_aux_work(awdp, aux_work, 1);
ERTS_MSACC_UPDATE_CACHE();
- if (aux_work && erts_thr_progress_update(NULL))
- erts_thr_progress_leader_update(NULL);
+ if (aux_work && erts_thr_progress_update(tpd))
+ erts_thr_progress_leader_update(tpd);
}
if (!aux_work) {
if (thr_prgr_active)
- erts_thr_progress_active(NULL, thr_prgr_active = 0);
+ erts_thr_progress_active(tpd, thr_prgr_active = 0);
flgs = sched_spin_wait(ssi, 0);
@@ -3226,7 +3237,7 @@ poll_thread(void *arg)
if (flgs & ERTS_SSI_FLG_SLEEPING) {
ASSERT(flgs & ERTS_SSI_FLG_POLL_SLEEPING);
ASSERT(flgs & ERTS_SSI_FLG_WAITING);
- erts_check_io(psi);
+ erts_check_io(psi, ERTS_POLL_INF_TIMEOUT);
}
}
}
@@ -3236,6 +3247,59 @@ poll_thread(void *arg)
return NULL;
}
+#if ERTS_POLL_USE_SCHEDULER_POLLING
+static ERTS_INLINE void
+clear_sys_scheduling(void)
+{
+ erts_atomic32_set_mb(&doing_sys_schedule, 0);
+}
+
+static ERTS_INLINE int
+try_set_sys_scheduling(void)
+{
+ return 0 == erts_atomic32_cmpxchg_acqb(&doing_sys_schedule, 1, 0);
+}
+
+
+static ERTS_INLINE int
+prepare_for_sys_schedule(void)
+{
+ while (!erts_port_task_have_outstanding_io_tasks()
+ && try_set_sys_scheduling()) {
+ if (!erts_port_task_have_outstanding_io_tasks())
+ return 1;
+ clear_sys_scheduling();
+ }
+ return 0;
+}
+
+static void
+check_io_timer(void *null)
+{
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ if (prepare_for_sys_schedule()) {
+ erts_check_io(esdp->ssi->psi, ERTS_POLL_NO_TIMEOUT);
+ clear_sys_scheduling();
+ }
+
+ /* The timer is cleared if this schedulers run-queue became empty
+ or if the CHECKIO flag was cleared. The CHECKIO flags is cleared
+ when a check_balance assigns another scheduler to be the poller in
+ the overload scenario. */
+ if ((ERTS_RUNQ_FLGS_GET_NOB(esdp->run_queue) & (ERTS_RUNQ_FLG_OUT_OF_WORK|ERTS_RUNQ_FLG_CHECKIO))
+ == ERTS_RUNQ_FLG_CHECKIO) {
+ erts_start_timer_callback(ERTS_POLL_SCHEDULER_POLLING_TIMEOUT,
+ check_io_timer, NULL);
+ } else {
+ ERTS_RUNQ_FLGS_UNSET(esdp->run_queue, ERTS_RUNQ_FLG_CHECKIO);
+ }
+}
+
+#else
+#define clear_sys_scheduling()
+#define prepare_for_sys_schedule() 0
+#endif
+
static void
scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
{
@@ -3291,13 +3355,13 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
if (aux_work) {
if (!thr_prgr_active) {
- erts_thr_progress_active(esdp, thr_prgr_active = 1);
+ erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
aux_work = handle_aux_work(&esdp->aux_work_data, aux_work, 1);
ERTS_MSACC_UPDATE_CACHE();
- if (aux_work && erts_thr_progress_update(esdp))
- erts_thr_progress_leader_update(esdp);
+ if (aux_work && erts_thr_progress_update(erts_thr_prgr_data(esdp)))
+ erts_thr_progress_leader_update(erts_thr_prgr_data(esdp));
}
if (aux_work) {
@@ -3305,7 +3369,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
current_time = erts_get_monotonic_time(esdp);
if (current_time >= erts_next_timeout_time(esdp->next_tmo_ref)) {
if (!thr_prgr_active) {
- erts_thr_progress_active(esdp, thr_prgr_active = 1);
+ erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
erts_bump_timers(esdp->timer_wheel, current_time);
@@ -3324,19 +3388,36 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
}
if (do_timeout) {
if (!thr_prgr_active) {
- erts_thr_progress_active(esdp, thr_prgr_active = 1);
+ erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
}
- else {
+ else if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && prepare_for_sys_schedule()) {
+ /* We sleep in check_io, only for normal schedulers */
+ if (thr_prgr_active) {
+ erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 0);
+ sched_wall_time_change(esdp, 0);
+ }
+ flgs = sched_spin_wait(ssi, 0);
+ if (flgs & ERTS_SSI_FLG_SLEEPING) {
+ ASSERT(flgs & ERTS_SSI_FLG_WAITING);
+ flgs = sched_set_sleeptype(ssi, ERTS_SSI_FLG_POLL_SLEEPING);
+ if (flgs & ERTS_SSI_FLG_SLEEPING) {
+ ASSERT(flgs & ERTS_SSI_FLG_POLL_SLEEPING);
+ ASSERT(flgs & ERTS_SSI_FLG_WAITING);
+ erts_check_io(ssi->psi, timeout_time);
+ current_time = erts_get_monotonic_time(esdp);
+ }
+ }
+ clear_sys_scheduling();
+ } else {
if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) {
if (thr_prgr_active) {
- erts_thr_progress_active(esdp, thr_prgr_active = 0);
+ erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 0);
sched_wall_time_change(esdp, 0);
}
- erts_thr_progress_prepare_wait(esdp);
+ erts_thr_progress_prepare_wait(erts_thr_prgr_data(esdp));
}
-
flgs = sched_spin_wait(ssi, spincount);
if (flgs & ERTS_SSI_FLG_SLEEPING) {
ASSERT(flgs & ERTS_SSI_FLG_WAITING);
@@ -3366,7 +3447,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
}
}
if (!ERTS_SCHEDULER_IS_DIRTY(esdp))
- erts_thr_progress_finalize_wait(esdp);
+ erts_thr_progress_finalize_wait(erts_thr_prgr_data(esdp));
}
if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && current_time >= timeout_time)
erts_bump_timers(esdp->timer_wheel, current_time);
@@ -3395,7 +3476,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
if (ERTS_SCHEDULER_IS_DIRTY(esdp))
dirty_sched_wall_time_change(esdp, working = 1);
else if (!thr_prgr_active) {
- erts_thr_progress_active(esdp, thr_prgr_active = 1);
+ erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
@@ -4576,6 +4657,15 @@ check_balance(ErtsRunQueue *c_rq)
if (blnc_no_rqs == 1) {
c_rq->check_balance_reds = INT_MAX;
erts_atomic32_set_nob(&balance_info.checking_balance, 0);
+#if ERTS_POLL_USE_SCHEDULER_POLLING
+ c_rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS;
+ if ((ERTS_RUNQ_FLGS_GET_NOB(c_rq) & (ERTS_RUNQ_FLG_OUT_OF_WORK|ERTS_RUNQ_FLG_CHECKIO))
+ == 0) {
+ ERTS_RUNQ_FLGS_SET(c_rq, ERTS_RUNQ_FLG_CHECKIO);
+ erts_start_timer_callback(ERTS_POLL_SCHEDULER_POLLING_TIMEOUT, check_io_timer, NULL);
+ }
+ ERTS_RUNQ_FLGS_UNSET(c_rq, ERTS_RUNQ_FLGS_MIGRATION_INFO);
+#endif
return;
}
@@ -5095,6 +5185,19 @@ erts_fprintf(stderr, "--------------------------------\n");
/* Publish new migration paths... */
erts_atomic_set_wb(&erts_migration_paths, (erts_aint_t) new_mpaths);
+#if ERTS_POLL_USE_SCHEDULER_POLLING
+ if (full_scheds == current_active) {
+ ERTS_ASSERT(full_scheds <= current_active);
+ /* All active schedulers ran for full, we need to do active polling,
+ so we setup a timer that does active polling */
+ if (!(ERTS_RUNQ_FLGS_GET_NOB(c_rq) & ERTS_RUNQ_FLG_CHECKIO)) {
+ /* Active polling is not running, start it */
+ erts_start_timer_callback(ERTS_POLL_SCHEDULER_POLLING_TIMEOUT, check_io_timer, NULL);
+ }
+ run_queue_info[c_rq->ix].flags |= ERTS_RUNQ_FLG_CHECKIO;
+ }
+#endif
+
/* Reset balance statistics in all online queues */
for (qix = 0; qix < blnc_no_rqs; qix++) {
Uint32 flags = run_queue_info[qix].flags;
@@ -5104,6 +5207,8 @@ erts_fprintf(stderr, "--------------------------------\n");
ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK));
if (rq->waiting)
flags |= ERTS_RUNQ_FLG_OUT_OF_WORK;
+ if (rq != c_rq)
+ flags &= ~ERTS_RUNQ_FLG_CHECKIO;
rq->full_reds_history_sum
= run_queue_info[qix].full_reds_history_sum;
@@ -5113,8 +5218,7 @@ erts_fprintf(stderr, "--------------------------------\n");
ERTS_DBG_CHK_FULL_REDS_HISTORY(rq);
rq->out_of_work_count = 0;
- (void) ERTS_RUNQ_FLGS_READ_BSET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO, flags);
-
+ (void) ERTS_RUNQ_FLGS_READ_BSET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO|ERTS_RUNQ_FLG_CHECKIO, flags);
rq->max_len = erts_atomic32_read_dirty(&rq->len);
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
ErtsRunQueueInfo *rqi;
@@ -5553,7 +5657,6 @@ erts_sched_set_busy_wait_threshold(ErtsSchedType sched_type, char *str)
return EINVAL;
}
- params->sys_schedule = sys_sched;
params->tse = sys_sched * ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT;
params->aux_work = sys_sched * aux_work_fact;
@@ -5764,6 +5867,9 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online, int no_poll_th
size_runqs = sizeof(ErtsAlignedRunQueue) * tot_rqs;
erts_aligned_run_queues =
erts_alloc_permanent_cache_aligned(ERTS_ALC_T_RUNQS, size_runqs);
+#if ERTS_POLL_USE_SCHEDULER_POLLING
+ erts_atomic32_init_nob(&doing_sys_schedule, 0);
+#endif
erts_atomic32_init_nob(&no_empty_run_queues, 0);
erts_no_run_queues = n;
@@ -7551,7 +7657,8 @@ suspend_scheduler(ErtsSchedulerData *esdp)
if (aux_work|evacuate) {
if (!thr_prgr_active) {
- erts_thr_progress_active(esdp, thr_prgr_active = 1);
+ erts_thr_progress_active(erts_thr_prgr_data(esdp),
+ thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
if (aux_work)
@@ -7559,8 +7666,8 @@ suspend_scheduler(ErtsSchedulerData *esdp)
aux_work,
1);
- if (aux_work && erts_thr_progress_update(esdp))
- erts_thr_progress_leader_update(esdp);
+ if (aux_work && erts_thr_progress_update(erts_thr_prgr_data(esdp)))
+ erts_thr_progress_leader_update(erts_thr_prgr_data(esdp));
if (evacuate) {
erts_runq_lock(esdp->run_queue);
evacuate_run_queue(esdp->run_queue, &sbp);
@@ -7579,18 +7686,18 @@ suspend_scheduler(ErtsSchedulerData *esdp)
if (!aux_work && current_time < timeout_time) {
/* go to sleep... */
if (thr_prgr_active) {
- erts_thr_progress_active(esdp, thr_prgr_active = 0);
+ erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 0);
sched_wall_time_change(esdp, 0);
}
- erts_thr_progress_prepare_wait(NULL);
+ erts_thr_progress_prepare_wait(erts_thr_prgr_data(NULL));
suspend_normal_scheduler_sleep(esdp);
- erts_thr_progress_finalize_wait(NULL);
+ erts_thr_progress_finalize_wait(erts_thr_prgr_data(NULL));
current_time = erts_get_monotonic_time(esdp);
}
if (current_time >= timeout_time) {
if (!thr_prgr_active) {
- erts_thr_progress_active(esdp, thr_prgr_active = 1);
+ erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
erts_bump_timers(esdp->timer_wheel, current_time);
@@ -7647,7 +7754,7 @@ suspend_scheduler(ErtsSchedulerData *esdp)
profile_scheduler(make_small(esdp->no), am_active);
if (!thr_prgr_active) {
- erts_thr_progress_active(esdp, thr_prgr_active = 1);
+ erts_thr_progress_active(erts_thr_prgr_data(esdp), thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
}
@@ -8282,6 +8389,11 @@ sched_thread_func(void *vesdp)
erts_msacc_init_thread("scheduler", no, 1);
erts_thr_progress_register_managed_thread(esdp, &callbacks, 0);
+
+#if ERTS_POLL_USE_SCHEDULER_POLLING
+ esdp->ssi->psi = erts_create_pollset_thread(-1, NULL);
+#endif
+
erts_alloc_register_scheduler(vesdp);
#ifdef ERTS_ENABLE_LOCK_CHECK
{
@@ -9296,12 +9408,12 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
}
}
- leader_update = erts_thr_progress_update(esdp);
+ leader_update = erts_thr_progress_update(erts_thr_prgr_data(esdp));
aux_work = erts_atomic32_read_acqb(&esdp->ssi->aux_work);
if (aux_work | leader_update) {
erts_runq_unlock(rq);
if (leader_update)
- erts_thr_progress_leader_update(esdp);
+ erts_thr_progress_leader_update(erts_thr_prgr_data(esdp));
if (aux_work)
handle_aux_work(&esdp->aux_work_data, aux_work, 0);
erts_runq_lock(rq);