Diffstat (limited to 'erts/emulator/beam/erl_process.c')
-rw-r--r--  erts/emulator/beam/erl_process.c | 1045
1 file changed, 878 insertions(+), 167 deletions(-)
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 21fd8dd50a..937881212a 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2013. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2014. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
@@ -144,7 +144,13 @@ extern BeamInstr beam_exit[];
extern BeamInstr beam_continue_exit[];
int erts_sched_compact_load;
+int erts_sched_balance_util = 0;
Uint erts_no_schedulers;
+#ifdef ERTS_DIRTY_SCHEDULERS
+Uint erts_no_dirty_cpu_schedulers;
+Uint erts_no_dirty_cpu_schedulers_online;
+Uint erts_no_dirty_io_schedulers;
+#endif
#define ERTS_THR_PRGR_LATER_CLEANUP_OP_THRESHOLD_VERY_LAZY (4*1024*1024)
#define ERTS_THR_PRGR_LATER_CLEANUP_OP_THRESHOLD_LAZY (512*1024)
@@ -258,6 +264,10 @@ ErtsAlignedRunQueue *erts_aligned_run_queues;
Uint erts_no_run_queues;
ErtsAlignedSchedulerData *erts_aligned_scheduler_data;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ErtsAlignedSchedulerData *erts_aligned_dirty_cpu_scheduler_data;
+ErtsAlignedSchedulerData *erts_aligned_dirty_io_scheduler_data;
+#endif
typedef union {
ErtsSchedulerSleepInfo ssi;
@@ -265,6 +275,12 @@ typedef union {
} ErtsAlignedSchedulerSleepInfo;
static ErtsAlignedSchedulerSleepInfo *aligned_sched_sleep_info;
+#ifdef ERTS_DIRTY_SCHEDULERS
+#ifdef ERTS_SMP
+static ErtsAlignedSchedulerSleepInfo *aligned_dirty_cpu_sched_sleep_info;
+static ErtsAlignedSchedulerSleepInfo *aligned_dirty_io_sched_sleep_info;
+#endif
+#endif
static Uint last_reductions;
static Uint last_exact_reductions;
@@ -331,6 +347,16 @@ ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(proclist,
(ASSERT(-1 <= ((int) (IX)) \
&& ((int) (IX)) < ((int) erts_no_schedulers)), \
&aligned_sched_sleep_info[(IX)].ssi)
+#ifdef ERTS_DIRTY_SCHEDULERS
+#define ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(IX) \
+ (ASSERT(0 <= ((int) (IX)) \
+ && ((int) (IX)) < ((int) erts_no_dirty_cpu_schedulers)), \
+ &aligned_dirty_cpu_sched_sleep_info[(IX)].ssi)
+#define ERTS_DIRTY_IO_SCHED_SLEEP_INFO_IX(IX) \
+ (ASSERT(0 <= ((int) (IX)) \
+ && ((int) (IX)) < ((int) erts_no_dirty_io_schedulers)), \
+ &aligned_dirty_io_sched_sleep_info[(IX)].ssi)
+#endif
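
The sleep-info lookup macros above rely on the C comma operator, so the bounds assertion and the array lookup form a single expression usable wherever a value is expected (the assertion compiles away in release builds). A standalone illustration of the pattern (plain C; the names are mine, not ERTS code):

    #include <assert.h>

    static int table[4];

    /* Evaluates the assertion for its side effect, then yields the
     * element's address -- usable in any expression context. */
    #define TABLE_ENTRY(IX) \
        (assert(0 <= (IX) && (IX) < 4), &table[(IX)])
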
#define ERTS_FOREACH_RUNQ(RQVAR, DO) \
do { \
@@ -518,6 +544,13 @@ erts_pre_init_process(void)
erts_psd_required_locks[ERTS_PSD_DELAYED_GC_TASK_QS].set_locks
= ERTS_PSD_DELAYED_GC_TASK_QS_SET_LOCKS;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ erts_psd_required_locks[ERTS_PSD_DIRTY_SCHED_TRAP_EXPORT].get_locks
+ = ERTS_PSD_DIRTY_SCHED_TRAP_EXPORT_GET_LOCKS;
+ erts_psd_required_locks[ERTS_PSD_DIRTY_SCHED_TRAP_EXPORT].set_locks
+ = ERTS_PSD_DIRTY_SCHED_TRAP_EXPORT_SET_LOCKS;
+#endif
+
/* Check that we have locks for all entries */
for (ix = 0; ix < ERTS_PSD_SIZE; ix++) {
ERTS_SMP_LC_ASSERT(erts_psd_required_locks[ix].get_locks);
@@ -608,6 +641,7 @@ erts_late_init_process(void)
static void
init_sched_wall_time(ErtsSchedWallTime *swtp)
{
+ swtp->need = erts_sched_balance_util;
swtp->enabled = 0;
swtp->start = 0;
swtp->working.total = 0;
@@ -630,27 +664,253 @@ sched_wall_time_ts(void)
#endif
}
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+
+#ifdef ARCH_64
+
+static ERTS_INLINE Uint64
+aschedtime_read(ErtsAtomicSchedTime *var)
+{
+ return (Uint64) erts_atomic_read_nob((erts_atomic_t *) var);
+}
+
+static ERTS_INLINE void
+aschedtime_set(ErtsAtomicSchedTime *var, Uint64 val)
+{
+ erts_atomic_set_nob((erts_atomic_t *) var, (erts_aint_t) val);
+}
+
+static ERTS_INLINE void
+aschedtime_init(ErtsAtomicSchedTime *var)
+{
+ erts_atomic_init_nob((erts_atomic_t *) var, (erts_aint_t) 0);
+}
+
+#elif defined(ARCH_32)
+
+static ERTS_INLINE Uint64
+aschedtime_read(ErtsAtomicSchedTime *var)
+{
+ erts_dw_aint_t dw;
+ erts_dw_atomic_read_nob((erts_dw_atomic_t *) var, &dw);
+#ifdef ETHR_SU_DW_NAINT_T__
+ return (Uint64) dw.dw_sint;
+#else
+ {
+ Uint64 res;
+ res = (Uint64) ((Uint32) dw.sint[ERTS_DW_AINT_HIGH_WORD]);
+ res <<= 32;
+ res |= (Uint64) ((Uint32) dw.sint[ERTS_DW_AINT_LOW_WORD]);
+ return res;
+ }
+#endif
+}
+
+static ERTS_INLINE void
+aschedtime_set(ErtsAtomicSchedTime *var, Uint64 val)
+{
+ erts_dw_aint_t dw;
+#ifdef ETHR_SU_DW_NAINT_T__
+ dw.dw_sint = (ETHR_SU_DW_NAINT_T__) val;
+#else
+ dw.sint[ERTS_DW_AINT_LOW_WORD] = (erts_aint_t) (val & 0xffffffff);
+ dw.sint[ERTS_DW_AINT_HIGH_WORD] = (erts_aint_t) ((val >> 32) & 0xffffffff);
+#endif
+ erts_dw_atomic_set_nob((erts_dw_atomic_t *) var, &dw);
+}
+
+static ERTS_INLINE void
+aschedtime_init(ErtsAtomicSchedTime *var)
+{
+ erts_dw_aint_t dw;
+ dw.sint[ERTS_DW_AINT_LOW_WORD] = (erts_aint_t) 0;
+ dw.sint[ERTS_DW_AINT_HIGH_WORD] = (erts_aint_t) 0;
+ erts_dw_atomic_init_nob((erts_dw_atomic_t *) var, &dw);
+}
+
+#else
+# error "Unsupported word size: expected ARCH_64 or ARCH_32"
+#endif
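
On 32-bit targets without a native 64-bit atomic type (no ETHR_SU_DW_NAINT_T__), the fallback above stores the 64-bit timestamp as two 32-bit words of a double-word atomic and reassembles it on read. A minimal standalone sketch of that split/reassemble step (plain C with an illustrative value, not ERTS code):

    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
        uint64_t val = 0x0123456789abcdefULL;
        /* Split, as in aschedtime_set()'s fallback branch */
        uint32_t lo = (uint32_t) (val & 0xffffffff);
        uint32_t hi = (uint32_t) ((val >> 32) & 0xffffffff);
        /* Reassemble, as in aschedtime_read() */
        uint64_t res = ((uint64_t) hi << 32) | (uint64_t) lo;
        printf("%d\n", val == res); /* prints 1 */
        return 0;
    }
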
+
+#define ERTS_GET_AVG_MAX_UNLOCKED_TRY 50
+#define ERTS_SCHED_AVG_UTIL_WRITE_MARKER (~((Uint64) 0))
+
+/* Intervals in nanoseconds */
+#define ERTS_SCHED_UTIL_SHORT_INTERVAL ((Uint64) 1*1000*1000*1000)
+#define ERTS_SCHED_UTIL_LONG_INTERVAL ((Uint64) 10*1000*1000*1000)
+
+#define ERTS_SCHED_UTIL_IGNORE_IMBALANCE_DIFF 5000 /* ppm */
+
+static ERTS_INLINE Uint64
+calc_sched_worktime(int is_working, Uint64 now, Uint64 last,
+ Uint64 interval, Uint64 old_worktime)
+{
+ Uint64 worktime;
+ Uint64 new;
+
+ if (now <= last)
+ return old_worktime;
+
+ new = now - last;
+
+ if (new >= interval)
+ return is_working ? interval : (Uint64) 0;
+
+ /*
+ * Division by 1000 in order to avoid
+ * overflow. If this is changed, update the
+ * assertions in init_runq_sched_util().
+ */
+ worktime = old_worktime;
+ worktime *= (interval - new)/1000;
+ worktime /= (interval/1000);
+ if (is_working)
+ worktime += new;
+
+ ASSERT(0 <= worktime && worktime <= interval);
+
+ return worktime;
+}
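
calc_sched_worktime() maintains a linearly decaying average over a sliding window: the previous worktime is scaled by (interval - elapsed)/interval, and the elapsed time is added in full when the scheduler was working. Dividing numerator and denominator by 1000 keeps the intermediate product inside 64 bits. A standalone sketch using the short (1 s) interval (plain C; simplified names of my own, not ERTS code):

    #include <stdint.h>
    #include <stdio.h>

    #define INTERVAL ((uint64_t) 1*1000*1000*1000) /* 1 s in ns */

    static uint64_t decay_worktime(int is_working, uint64_t elapsed,
                                   uint64_t old_worktime)
    {
        /* Scale the old value by (INTERVAL - elapsed)/INTERVAL; the
         * division by 1000 on both sides avoids 64-bit overflow. */
        uint64_t w = old_worktime * ((INTERVAL - elapsed)/1000);
        w /= (INTERVAL/1000);
        if (is_working)
            w += elapsed;
        return w;
    }

    int main(void)
    {
        /* 500 ms of accumulated worktime, then 250 ms pass while idle:
         * 500 ms * 750/1000 = 375 ms remain. */
        printf("%llu\n", (unsigned long long)
               decay_worktime(0, 250*1000*1000, 500*1000*1000));
        return 0;
    }
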
+
+static ERTS_INLINE void
+update_avg_sched_util(ErtsSchedulerData *esdp, Uint64 now, int is_working)
+{
+ ErtsRunQueue *rq;
+ int worked;
+ Uint64 swt, lwt, last;
+
+ rq = esdp->run_queue;
+ last = aschedtime_read(&rq->sched_util.last);
+
+ if (now <= last) {
+ ASSERT(last == ERTS_SCHED_AVG_UTIL_WRITE_MARKER);
+ return;
+ }
+
+ ASSERT(now >= last);
+
+ worked = rq->sched_util.is_working;
+
+ swt = calc_sched_worktime(worked, now, last, ERTS_SCHED_UTIL_SHORT_INTERVAL,
+ rq->sched_util.worktime.short_interval);
+ lwt = calc_sched_worktime(worked, now, last, ERTS_SCHED_UTIL_LONG_INTERVAL,
+ rq->sched_util.worktime.long_interval);
+
+ aschedtime_set(&rq->sched_util.last, ERTS_SCHED_AVG_UTIL_WRITE_MARKER);
+ ERTS_THR_WRITE_MEMORY_BARRIER;
+ rq->sched_util.is_working = is_working;
+ rq->sched_util.worktime.short_interval = swt;
+ rq->sched_util.worktime.long_interval = lwt;
+ ERTS_THR_WRITE_MEMORY_BARRIER;
+ aschedtime_set(&rq->sched_util.last, now);
+}
+
+int
+erts_get_sched_util(ErtsRunQueue *rq, int initially_locked, int short_interval)
+{
+ /* Average scheduler utilization in ppm */
+ int util, is_working, try = 0, locked = initially_locked;
+ Uint64 worktime, old_worktime, now, last, interval, *old_worktimep;
+
+ if (short_interval) {
+ old_worktimep = &rq->sched_util.worktime.short_interval;
+ interval = ERTS_SCHED_UTIL_SHORT_INTERVAL;
+ }
+ else {
+ old_worktimep = &rq->sched_util.worktime.long_interval;
+ interval = ERTS_SCHED_UTIL_LONG_INTERVAL;
+ }
+
+ while (1) {
+ Uint64 chk_last;
+ last = aschedtime_read(&rq->sched_util.last);
+ ERTS_THR_READ_MEMORY_BARRIER;
+ is_working = rq->sched_util.is_working;
+ old_worktime = *old_worktimep;
+ ERTS_THR_READ_MEMORY_BARRIER;
+ chk_last = aschedtime_read(&rq->sched_util.last);
+ if (chk_last == last)
+ break;
+ if (!locked) {
+ if (++try >= ERTS_GET_AVG_MAX_UNLOCKED_TRY) {
+ /* Writer will eventually block on runq-lock */
+ erts_smp_runq_lock(rq);
+ locked = 1;
+ }
+ }
+ }
+
+ if (!initially_locked && locked)
+ erts_smp_runq_unlock(rq);
+
+ now = sched_wall_time_ts();
+ worktime = calc_sched_worktime(is_working, now, last, interval, old_worktime);
+
+ util = (int) ((worktime * 1000000)/interval);
+
+ ASSERT(0 <= util && util <= 1000000);
+
+ return util;
+}
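
update_avg_sched_util() and erts_get_sched_util() together implement a sequence lock: the writer parks the impossible marker value in 'last', publishes the payload between write barriers, then stores the real timestamp; the reader rereads 'last' after loading the payload and retries on any mismatch, falling back to the run-queue lock after ERTS_GET_AVG_MAX_UNLOCKED_TRY attempts (the writer must eventually block on that lock, which guarantees progress). A minimal standalone sketch of the same protocol in C11 with a single payload word (my names, not the ERTS primitives):

    #include <stdatomic.h>
    #include <stdint.h>

    #define WRITE_MARKER (~(uint64_t) 0)

    struct util {
        _Atomic uint64_t last;     /* timestamp; WRITE_MARKER during writes */
        _Atomic uint64_t worktime; /* payload */
    };

    static void write_util(struct util *u, uint64_t now, uint64_t wt)
    {
        atomic_store_explicit(&u->last, WRITE_MARKER, memory_order_relaxed);
        atomic_thread_fence(memory_order_seq_cst); /* write barrier */
        atomic_store_explicit(&u->worktime, wt, memory_order_relaxed);
        atomic_thread_fence(memory_order_seq_cst); /* write barrier */
        atomic_store_explicit(&u->last, now, memory_order_relaxed);
    }

    /* Returns 0 on a torn read; the caller retries or takes a lock. */
    static int read_util(struct util *u, uint64_t *wt)
    {
        uint64_t last = atomic_load_explicit(&u->last, memory_order_relaxed);
        atomic_thread_fence(memory_order_seq_cst); /* read barrier */
        uint64_t v = atomic_load_explicit(&u->worktime, memory_order_relaxed);
        atomic_thread_fence(memory_order_seq_cst); /* read barrier */
        if (last == WRITE_MARKER
            || atomic_load_explicit(&u->last, memory_order_relaxed) != last)
            return 0;
        *wt = v;
        return 1;
    }
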
+
+static void
+init_runq_sched_util(ErtsRunQueueSchedUtil *rqsu, int enabled)
+{
+ aschedtime_init(&rqsu->last);
+ if (!enabled)
+ aschedtime_set(&rqsu->last, ERTS_SCHED_AVG_UTIL_WRITE_MARKER);
+ rqsu->is_working = 0;
+ rqsu->worktime.short_interval = (Uint64) 0;
+ rqsu->worktime.long_interval = (Uint64) 0;
+
+#ifdef DEBUG
+ {
+ Uint64 intrvl;
+ /*
+ * If one of these asserts fails, we may get
+ * overflow in calc_sched_worktime(). That has
+ * to be fixed either by shrinking the interval
+ * size, or by fixing the calculation of
+ * worktime in calc_sched_worktime().
+ */
+ intrvl = ERTS_SCHED_UTIL_SHORT_INTERVAL;
+ ASSERT(intrvl*(intrvl/1000) > intrvl);
+ intrvl = ERTS_SCHED_UTIL_LONG_INTERVAL;
+ ASSERT(intrvl*(intrvl/1000) > intrvl);
+ }
+#endif
+}
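
The DEBUG asserts are worth spelling out: the worst-case intermediate product in calc_sched_worktime() is old_worktime * ((interval - new)/1000), bounded by interval * (interval/1000). For the long interval that is 10^10 * 10^7 = 10^17, and for the short one 10^9 * 10^6 = 10^15, both safely below 2^64 - 1 ≈ 1.8 * 10^19; without the division by 1000, the long-interval bound would be 10^20 and would wrap.
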
+
+#endif /* ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT */
+
static ERTS_INLINE void
sched_wall_time_change(ErtsSchedulerData *esdp, int working)
{
- if (esdp->sched_wall_time.enabled) {
+ if (esdp->sched_wall_time.need) {
Uint64 ts = sched_wall_time_ts();
- if (working) {
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ update_avg_sched_util(esdp, ts, working);
+#endif
+ if (esdp->sched_wall_time.enabled) {
+ if (working) {
#ifdef DEBUG
- ASSERT(!esdp->sched_wall_time.working.currently);
- esdp->sched_wall_time.working.currently = 1;
+ ASSERT(!esdp->sched_wall_time.working.currently);
+ esdp->sched_wall_time.working.currently = 1;
#endif
- ts -= esdp->sched_wall_time.start;
- esdp->sched_wall_time.working.start = ts;
- }
- else {
+ ts -= esdp->sched_wall_time.start;
+ esdp->sched_wall_time.working.start = ts;
+ }
+ else {
#ifdef DEBUG
- ASSERT(esdp->sched_wall_time.working.currently);
- esdp->sched_wall_time.working.currently = 0;
+ ASSERT(esdp->sched_wall_time.working.currently);
+ esdp->sched_wall_time.working.currently = 0;
#endif
- ts -= esdp->sched_wall_time.start;
- ts -= esdp->sched_wall_time.working.start;
- esdp->sched_wall_time.working.total += ts;
+ ts -= esdp->sched_wall_time.start;
+ ts -= esdp->sched_wall_time.working.start;
+ esdp->sched_wall_time.working.total += ts;
+ }
}
}
}
@@ -703,12 +963,17 @@ reply_sched_wall_time(void *vswtrp)
ErlHeapFragment *bp = NULL;
ASSERT(esdp);
-
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
+#endif
if (swtrp->set) {
- if (!swtrp->enable && esdp->sched_wall_time.enabled)
+ if (!swtrp->enable && esdp->sched_wall_time.enabled) {
+ esdp->sched_wall_time.need = erts_sched_balance_util;
esdp->sched_wall_time.enabled = 0;
+ }
else if (swtrp->enable && !esdp->sched_wall_time.enabled) {
Uint64 ts = sched_wall_time_ts();
+ esdp->sched_wall_time.need = 1;
esdp->sched_wall_time.enabled = 1;
esdp->sched_wall_time.start = ts;
esdp->sched_wall_time.working.total = 0;
@@ -784,6 +1049,9 @@ erts_sched_wall_time_request(Process *c_p, int set, int enable)
if (!set && !esdp->sched_wall_time.enabled)
return THE_NON_VALUE;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
+#endif
swtrp = swtreq_alloc();
ref = erts_make_ref(c_p);
@@ -1261,6 +1529,9 @@ erts_schedule_multi_misc_aux_work(int ignore_self,
if (ignore_self) {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
+#endif
if (esdp)
self = (int) esdp->no;
}
@@ -1370,7 +1641,7 @@ void
erts_alloc_notify_delayed_dealloc(int ix)
{
ErtsSchedulerData *esdp = erts_get_scheduler_data();
- if (esdp)
+ if (esdp && !ERTS_SCHEDULER_IS_DIRTY(esdp))
schedule_aux_work_wakeup(&esdp->aux_work_data,
ix,
ERTS_SSI_AUX_WORK_DD);
@@ -1384,6 +1655,10 @@ erts_alloc_ensure_handle_delayed_dealloc_call(int ix)
{
#ifdef DEBUG
ErtsSchedulerData *esdp = erts_get_scheduler_data();
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (esdp && ERTS_SCHEDULER_IS_DIRTY(esdp))
+ return;
+#endif
ASSERT(!esdp || ix == (int) esdp->no);
#endif
set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(ix-1),
@@ -1989,6 +2264,9 @@ static ERTS_INLINE void
sched_active_sys(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_RUNQ_IX_IS_DIRTY(rq->ix));
+#endif
ASSERT(rq->waiting < 0);
rq->waiting *= -1;
rq->waiting--;
@@ -2045,6 +2323,9 @@ static ERTS_INLINE void
sched_change_waiting_sys_to_waiting(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_RUNQ_IX_IS_DIRTY(rq->ix));
+#endif
ASSERT(rq->waiting < 0);
rq->waiting *= -1;
}
@@ -2060,7 +2341,7 @@ sched_waiting(Uint no, ErtsRunQueue *rq)
else
rq->waiting++;
rq->woken = 0;
- if (erts_system_profile_flags.scheduler)
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && erts_system_profile_flags.scheduler)
profile_scheduler(make_small(no), am_inactive);
}
@@ -2072,7 +2353,7 @@ sched_active(Uint no, ErtsRunQueue *rq)
rq->waiting++;
else
rq->waiting--;
- if (erts_system_profile_flags.scheduler)
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && erts_system_profile_flags.scheduler)
profile_scheduler(make_small(no), am_active);
}
@@ -2084,10 +2365,9 @@ ongoing_multi_scheduling_block(void)
}
static ERTS_INLINE void
-empty_runq(ErtsRunQueue *rq)
+empty_runq_aux(ErtsRunQueue *rq, Uint32 old_flags)
{
- Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED);
- if (old_flags & ERTS_RUNQ_FLG_NONEMPTY) {
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && old_flags & ERTS_RUNQ_FLG_NONEMPTY) {
#ifdef DEBUG
erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues);
/*
@@ -2107,10 +2387,27 @@ empty_runq(ErtsRunQueue *rq)
}
static ERTS_INLINE void
+empty_runq(ErtsRunQueue *rq)
+{
+ Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED);
+ empty_runq_aux(rq, old_flags);
+}
+
+static ERTS_INLINE Uint32
+empty_protected_runq(ErtsRunQueue *rq)
+{
+ Uint32 old_flags = ERTS_RUNQ_FLGS_BSET(rq,
+ ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED,
+ ERTS_RUNQ_FLG_PROTECTED);
+ empty_runq_aux(rq, old_flags);
+ return old_flags;
+}
+
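
empty_protected_runq() depends on ERTS_RUNQ_FLGS_BSET replacing the bits selected by a mask in one atomic step: within the mask, NONEMPTY is cleared and PROTECTED is set simultaneously, so no other scheduler can observe the queue as unprotected-but-nonempty in between. A standalone sketch of such a masked bit-set as a CAS loop (plain C11; my names, not the ERTS atomics):

    #include <stdatomic.h>
    #include <stdint.h>

    /* Atomically replace the bits selected by 'mask' with 'bits',
     * leaving all other flags untouched; returns the old value. */
    static uint32_t flags_bset(_Atomic uint32_t *flgs,
                               uint32_t mask, uint32_t bits)
    {
        uint32_t old = atomic_load(flgs);
        while (!atomic_compare_exchange_weak(flgs, &old,
                                             (old & ~mask) | (bits & mask)))
            ; /* 'old' is refreshed on failure; just retry */
        return old;
    }
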
+static ERTS_INLINE void
non_empty_runq(ErtsRunQueue *rq)
{
Uint32 old_flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_NONEMPTY);
- if (!(old_flags & ERTS_RUNQ_FLG_NONEMPTY)) {
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && (!(old_flags & ERTS_RUNQ_FLG_NONEMPTY))) {
#ifdef DEBUG
erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues);
/*
@@ -2130,6 +2427,18 @@ non_empty_runq(ErtsRunQueue *rq)
}
}
+void
+erts_empty_runq(ErtsRunQueue *rq)
+{
+ empty_runq(rq);
+}
+
+void
+erts_non_empty_runq(ErtsRunQueue *rq)
+{
+ non_empty_runq(rq);
+}
+
static erts_aint32_t
sched_prep_spin_wait(ErtsSchedulerSleepInfo *ssi)
{
@@ -2343,18 +2652,37 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix))
+ erts_smp_spin_lock(&rq->sleepers.lock);
+#endif
flgs = sched_prep_spin_wait(ssi);
if (flgs & ERTS_SSI_FLG_SUSPENDED) {
/* Go suspend instead... */
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix))
+ erts_smp_spin_unlock(&rq->sleepers.lock);
+#endif
return;
}
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix)) {
+ ssi->prev = NULL;
+ ssi->next = rq->sleepers.list;
+ if (rq->sleepers.list)
+ rq->sleepers.list->prev = ssi;
+ rq->sleepers.list = ssi;
+ erts_smp_spin_unlock(&rq->sleepers.lock);
+ }
+#endif
+
/*
* If all schedulers are waiting, one of them *should*
* be waiting in erl_sys_schedule()
*/
- if (!prepare_for_sys_schedule()) {
+ if (ERTS_SCHEDULER_IS_DIRTY(esdp) || !prepare_for_sys_schedule()) {
sched_waiting(esdp->no, rq);
@@ -2364,12 +2692,13 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
tse_wait:
- if (thr_prgr_active != working)
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && thr_prgr_active != working)
sched_wall_time_change(esdp, thr_prgr_active);
while (1) {
- aux_work = erts_atomic32_read_acqb(&ssi->aux_work);
+ aux_work = ERTS_SCHEDULER_IS_DIRTY(esdp) ? 0 :
+ erts_atomic32_read_acqb(&ssi->aux_work);
if (aux_work) {
if (!thr_prgr_active) {
erts_thr_progress_active(esdp, thr_prgr_active = 1);
@@ -2383,11 +2712,13 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
if (aux_work)
flgs = erts_smp_atomic32_read_acqb(&ssi->flags);
else {
- if (thr_prgr_active) {
- erts_thr_progress_active(esdp, thr_prgr_active = 0);
- sched_wall_time_change(esdp, 0);
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) {
+ if (thr_prgr_active) {
+ erts_thr_progress_active(esdp, thr_prgr_active = 0);
+ sched_wall_time_change(esdp, 0);
+ }
+ erts_thr_progress_prepare_wait(esdp);
}
- erts_thr_progress_prepare_wait(esdp);
flgs = sched_spin_wait(ssi, spincount);
if (flgs & ERTS_SSI_FLG_SLEEPING) {
@@ -2402,7 +2733,8 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
} while (res == EINTR);
}
}
- erts_thr_progress_finalize_wait(esdp);
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp))
+ erts_thr_progress_finalize_wait(esdp);
}
if (!(flgs & ERTS_SSI_FLG_WAITING)) {
@@ -2423,7 +2755,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
if (flgs & ~ERTS_SSI_FLG_SUSPENDED)
erts_smp_atomic32_read_band_nob(&ssi->flags, ERTS_SSI_FLG_SUSPENDED);
- if (!thr_prgr_active) {
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && !thr_prgr_active) {
erts_thr_progress_active(esdp, thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
@@ -2440,6 +2772,9 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
erts_smp_atomic32_set_relb(&function_calls, 0);
*fcalls = 0;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
+#endif
sched_waiting_sys(esdp->no, rq);
@@ -2632,7 +2967,7 @@ ssi_flags_set_wake(ErtsSchedulerSleepInfo *ssi)
}
static void
-wake_scheduler(ErtsRunQueue *rq, int incq)
+wake_scheduler(ErtsRunQueue *rq)
{
ErtsSchedulerSleepInfo *ssi;
erts_aint32_t flgs;
@@ -2651,10 +2986,35 @@ wake_scheduler(ErtsRunQueue *rq, int incq)
flgs = ssi_flags_set_wake(ssi);
erts_sched_finish_poke(ssi, flgs);
+}
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+static void
+wake_dirty_scheduler(ErtsRunQueue *rq)
+{
+ ErtsSchedulerSleepInfo *ssi;
+ ErtsSchedulerSleepList *sl;
- if (incq && (flgs & ERTS_SSI_FLG_WAITING))
- non_empty_runq(rq);
+ ASSERT(ERTS_RUNQ_IX_IS_DIRTY(rq->ix));
+
+ sl = &rq->sleepers;
+ erts_smp_spin_lock(&sl->lock);
+ ssi = sl->list;
+ if (!ssi)
+ erts_smp_spin_unlock(&sl->lock);
+ else {
+ sl->list = NULL;
+ erts_smp_spin_unlock(&sl->lock);
+
+ ERTS_THR_MEMORY_BARRIER;
+ do {
+ ErtsSchedulerSleepInfo *wake_ssi = ssi;
+ ssi = ssi->next;
+ erts_sched_finish_poke(wake_ssi, ssi_flags_set_wake(wake_ssi));
+ } while (ssi);
+ }
}
+#endif
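
wake_dirty_scheduler() uses a detach-then-wake shape: the entire sleeper list is unlinked in O(1) while the spinlock is held, and the individual pokes happen only after the lock is dropped, so slow wakeups never lengthen the critical section; note also that 'next' is saved before poking, since a woken scheduler may immediately reuse its sleep-info entry. A standalone sketch of the pattern (plain C with pthreads; my names, not ERTS code):

    #include <pthread.h>
    #include <stddef.h>

    struct sleeper { struct sleeper *next; };
    struct sleep_list { pthread_mutex_t lock; struct sleeper *list; };

    static void wake_one(struct sleeper *s) { (void) s; /* poke thread */ }

    static void wake_all(struct sleep_list *sl)
    {
        pthread_mutex_lock(&sl->lock);
        struct sleeper *s = sl->list; /* detach whole list in O(1) */
        sl->list = NULL;
        pthread_mutex_unlock(&sl->lock);

        while (s) {
            struct sleeper *next = s->next; /* read before waking */
            wake_one(s);
            s = next;
        }
    }
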
#define ERTS_NO_USED_RUNQS_SHIFT 16
#define ERTS_NO_RUNQS_MASK 0xffff
@@ -2744,7 +3104,7 @@ chk_wake_sched(ErtsRunQueue *crq, int ix, int activate)
if (try_inc_no_active_runqs(ix+1))
(void) ERTS_RUNQ_FLGS_UNSET(wrq, ERTS_RUNQ_FLG_INACTIVE);
}
- wake_scheduler(wrq, 0);
+ wake_scheduler(wrq);
return 1;
}
return 0;
@@ -2791,8 +3151,14 @@ static ERTS_INLINE void
smp_notify_inc_runq(ErtsRunQueue *runq)
{
#ifdef ERTS_SMP
- if (runq)
- wake_scheduler(runq, 1);
+ if (runq) {
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(runq->ix))
+ wake_dirty_scheduler(runq);
+ else
+#endif
+ wake_scheduler(runq);
+ }
#endif
}
@@ -2810,7 +3176,7 @@ erts_sched_notify_check_cpu_bind(void)
for (ix = 0; ix < erts_no_run_queues; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
(void) ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_CHK_CPU_BIND);
- wake_scheduler(rq, 0);
+ wake_scheduler(rq);
}
#else
erts_sched_check_cpu_bind(erts_get_scheduler_data());
@@ -2938,6 +3304,11 @@ check_immigration_need(ErtsRunQueue *c_rq, ErtsMigrationPath *mp, int prio)
if (!f_rq)
return NULL;
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (mp->sched_util)
+ return NULL;
+#endif
+
f_rq_flags = ERTS_RUNQ_FLGS_GET(f_rq);
if (f_rq_flags & ERTS_RUNQ_FLG_PROTECTED)
return NULL;
@@ -3077,7 +3448,7 @@ suspend_run_queue(ErtsRunQueue *rq)
ERTS_SSI_FLG_SUSPENDED);
(void) ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_SUSPENDED);
- wake_scheduler(rq, 0);
+ wake_scheduler(rq);
}
static void scheduler_ix_resume_wake(Uint ix);
@@ -3169,6 +3540,9 @@ evacuate_run_queue(ErtsRunQueue *rq,
to_rq->misc.start = start;
to_rq->misc.end = end;
+
+ non_empty_runq(to_rq);
+
erts_smp_runq_unlock(to_rq);
smp_notify_inc_runq(to_rq);
erts_smp_runq_lock(to_rq);
@@ -3381,7 +3755,7 @@ try_steal_task(ErtsRunQueue *rq)
Uint32 flags;
/* Protect jobs we steal from getting stolen from us... */
- flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_PROTECTED);
+ flags = empty_protected_runq(rq);
if (flags & ERTS_RUNQ_FLG_SUSPENDED)
return 0; /* go suspend instead... */
@@ -3460,6 +3834,9 @@ typedef struct {
int full_reds_history_change;
int oowc;
int max_len;
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ int sched_util;
+#endif
} ErtsRunQueueBalance;
static ErtsRunQueueBalance *run_queue_info;
@@ -3623,6 +4000,9 @@ check_balance(ErtsRunQueue *c_rq)
Sint64 scheds_reds, full_scheds_reds;
int forced, active, current_active, oowc, half_full_scheds, full_scheds,
mmax_len, blnc_no_rqs, qix, pix, freds_hist_ix;
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ int sched_util_balancing;
+#endif
if (erts_smp_atomic32_xchg_nob(&balance_info.checking_balance, 1)) {
c_rq->check_balance_reds = INT_MAX;
@@ -3678,6 +4058,10 @@ check_balance(ErtsRunQueue *c_rq)
return;
}
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ sched_util_balancing = 0;
+#endif
+
freds_hist_ix = balance_info.full_reds_history_index;
balance_info.full_reds_history_index++;
if (balance_info.full_reds_history_index >= ERTS_FULL_REDS_HISTORY_SIZE)
@@ -3708,7 +4092,12 @@ check_balance(ErtsRunQueue *c_rq)
run_queue_info[qix].oowc = rq->out_of_work_count;
run_queue_info[qix].max_len = rq->max_len;
rq->check_balance_reds = INT_MAX;
-
+
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (erts_sched_balance_util)
+ run_queue_info[qix].sched_util = erts_get_sched_util(rq, 1, 0);
+#endif
+
erts_smp_runq_unlock(rq);
}
@@ -3778,8 +4167,38 @@ check_balance(ErtsRunQueue *c_rq)
mmax_len = run_queue_info[qix].max_len;
}
- if (!erts_sched_compact_load)
+ if (!erts_sched_compact_load) {
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (erts_sched_balance_util && full_scheds < blnc_no_rqs) {
+ int avg_util = 0;
+
+ for (qix = 0; qix < blnc_no_rqs; qix++)
+ avg_util += run_queue_info[qix].sched_util;
+
+ avg_util /= blnc_no_rqs; /* in ppm */
+
+ sched_util_balancing = 1;
+ /*
+ * In order to avoid renaming a large number of fields
+ * we write utilization values instead of length values
+ * in the 'max_len' and 'migration_limit' fields...
+ */
+ for (qix = 0; qix < blnc_no_rqs; qix++) {
+ run_queue_info[qix].flags = 0; /* Reset for later use... */
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ run_queue_info[qix].prio[pix].emigrate_to = -1;
+ run_queue_info[qix].prio[pix].immigrate_from = -1;
+ run_queue_info[qix].prio[pix].avail = 100;
+ run_queue_info[qix].prio[pix].max_len = run_queue_info[qix].sched_util;
+ run_queue_info[qix].prio[pix].migration_limit = avg_util;
+ }
+ }
+ active = blnc_no_rqs;
+ goto setup_migration_paths;
+ }
+#endif
goto all_active;
+ }
if (!forced && half_full_scheds != blnc_no_rqs) {
int min = 1;
@@ -3896,15 +4315,30 @@ check_balance(ErtsRunQueue *c_rq)
}
}
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ setup_migration_paths:
+#endif
+
/* Setup migration paths for all priorities */
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
int low = 0, high = 0;
for (qix = 0; qix < blnc_no_rqs; qix++) {
int len_diff = run_queue_info[qix].prio[pix].max_len;
len_diff -= run_queue_info[qix].prio[pix].migration_limit;
+
#ifdef DBG_PRINT
if (pix == 2) erts_fprintf(stderr, "%d ", len_diff);
#endif
+
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (sched_util_balancing
+ && -ERTS_SCHED_UTIL_IGNORE_IMBALANCE_DIFF <= len_diff
+ && len_diff <= ERTS_SCHED_UTIL_IGNORE_IMBALANCE_DIFF) {
+ /* ignore minor imbalance */
+ len_diff = 0;
+ }
+#endif
+
run_queue_compare[qix].qix = qix;
run_queue_compare[qix].len = len_diff;
if (len_diff != 0) {
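
A worked example of the utilization path set up above (numbers are illustrative): with two run queues measuring 900000 and 400000 ppm, avg_util is 650000, so len_diff = max_len - migration_limit comes out as +250000 and -250000. Both lie far outside the ±5000 ppm ignore band, so the ordinary path setup emigrates work from the first queue to the second. Had the queues measured 652000 and 648000, both diffs would fall inside the band, be zeroed here, and cause no migration.
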
@@ -4031,6 +4465,9 @@ erts_fprintf(stderr, "--------------------------------\n");
Uint32 flags = run_queue_info[qix].flags;
ErtsMigrationPath *mp = &new_mpaths->mpath[qix];
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ mp->sched_util = sched_util_balancing;
+#endif
mp->flags = flags;
mp->misc_evac_runq = NULL;
@@ -4572,7 +5009,10 @@ erts_sched_set_wake_cleanup_threshold(char *str)
static void
init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp, char *dawwp)
{
- awdp->sched_id = esdp ? (int) esdp->no : 0;
+ if (!esdp || ERTS_SCHEDULER_IS_DIRTY(esdp))
+ awdp->sched_id = 0;
+ else
+ awdp->sched_id = (int) esdp->no;
awdp->esdp = esdp;
awdp->ssi = esdp ? esdp->ssi : NULL;
#ifdef ERTS_SMP
@@ -4612,14 +5052,71 @@ init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp, char *dawwp)
#endif
}
+static void
+init_scheduler_data(ErtsSchedulerData* esdp, int num,
+ ErtsSchedulerSleepInfo* ssi,
+ ErtsRunQueue* runq,
+ char** daww_ptr, size_t daww_sz)
+{
+#ifdef ERTS_SMP
+ erts_bits_init_state(&esdp->erl_bits_state);
+ esdp->match_pseudo_process = NULL;
+ esdp->free_process = NULL;
+#endif
+ esdp->x_reg_array =
+ erts_alloc_permanent_cache_aligned(ERTS_ALC_T_BEAM_REGISTER,
+ ERTS_X_REGS_ALLOCATED *
+ sizeof(Eterm));
+ esdp->f_reg_array =
+ erts_alloc_permanent_cache_aligned(ERTS_ALC_T_BEAM_REGISTER,
+ MAX_REG * sizeof(FloatDef));
+#if !HEAP_ON_C_STACK
+ esdp->num_tmp_heap_used = 0;
+#endif
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(runq->ix)) {
+ esdp->no = 0;
+ esdp->dirty_no = (Uint) num;
+ }
+ else {
+ esdp->no = (Uint) num;
+ esdp->dirty_no = 0;
+ }
+#else
+ esdp->no = (Uint) num;
+#endif
+ esdp->ssi = ssi;
+ esdp->current_process = NULL;
+ esdp->current_port = NULL;
+
+ esdp->virtual_reds = 0;
+ esdp->cpu_id = -1;
+
+ erts_init_atom_cache_map(&esdp->atom_cache_map);
+
+ esdp->run_queue = runq;
+ esdp->run_queue->scheduler = esdp;
+
+ if (daww_ptr) {
+ init_aux_work_data(&esdp->aux_work_data, esdp, *daww_ptr);
+#ifdef ERTS_SMP
+ *daww_ptr += daww_sz;
+#endif
+ }
+
+ esdp->reductions = 0;
+
+ init_sched_wall_time(&esdp->sched_wall_time);
+ erts_port_task_handle_init(&esdp->nosuspend_port_task_handle);
+}
+
void
erts_init_scheduling(int no_schedulers, int no_schedulers_online)
{
int ix, n, no_ssi;
char *daww_ptr;
-#ifdef ERTS_SMP
size_t daww_sz;
-#endif
+ size_t size_runqs;
init_misc_op_list_alloc();
init_proc_sys_task_queues_alloc();
@@ -4628,6 +5125,11 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
set_wakeup_other_data();
#endif
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (erts_sched_balance_util)
+ erts_sched_compact_load = 0;
+#endif
+
ASSERT(no_schedulers_online <= no_schedulers);
ASSERT(no_schedulers_online >= 1);
ASSERT(no_schedulers >= 1);
@@ -4635,19 +5137,26 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
/* Create and initialize run queues */
n = no_schedulers;
-
- erts_aligned_run_queues =
- erts_alloc_permanent_cache_aligned(ERTS_ALC_T_RUNQS,
- sizeof(ErtsAlignedRunQueue) * n);
+ size_runqs = sizeof(ErtsAlignedRunQueue) * (n + ERTS_NUM_DIRTY_RUNQS);
+ erts_aligned_run_queues =
+ erts_alloc_permanent_cache_aligned(ERTS_ALC_T_RUNQS, size_runqs);
#ifdef ERTS_SMP
+#ifdef ERTS_DIRTY_SCHEDULERS
+ erts_aligned_run_queues += ERTS_NUM_DIRTY_RUNQS;
+#endif
erts_smp_atomic32_init_nob(&no_empty_run_queues, 0);
#endif
erts_no_run_queues = n;
- for (ix = 0; ix < n; ix++) {
+ for (ix = -(ERTS_NUM_DIRTY_RUNQS); ix < n; ix++) {
int pix, rix;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ErtsRunQueue *rq = ERTS_RUNQ_IX_IS_DIRTY(ix) ?
+ ERTS_DIRTY_RUNQ_IX(ix) : ERTS_RUNQ_IX(ix);
+#else
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
+#endif
rq->ix = ix;
@@ -4658,6 +5167,15 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
erts_smp_mtx_init_x(&rq->mtx, "run_queue", make_small(ix + 1));
erts_smp_cnd_init(&rq->cnd);
+#ifdef ERTS_DIRTY_SCHEDULERS
+#ifdef ERTS_SMP
+ if (ERTS_RUNQ_IX_IS_DIRTY(ix)) {
+ erts_smp_spinlock_init(&rq->sleepers.lock, "dirty_run_queue_sleep_list");
+ rq->sleepers.list = NULL;
+ }
+#endif
+#endif
+
rq->waiting = 0;
rq->woken = 0;
ERTS_RUNQ_FLGS_INIT(rq, ERTS_RUNQ_FLG_NONEMPTY);
@@ -4696,6 +5214,11 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
rq->ports.info.reds = 0;
rq->ports.start = NULL;
rq->ports.end = NULL;
+
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ init_runq_sched_util(&rq->sched_util, erts_sched_balance_util);
+#endif
+
}
#ifdef ERTS_SMP
@@ -4739,6 +5262,29 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
#ifdef ERTS_SMP
aligned_sched_sleep_info++;
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ aligned_dirty_cpu_sched_sleep_info =
+ erts_alloc_permanent_cache_aligned(
+ ERTS_ALC_T_SCHDLR_SLP_INFO,
+ erts_no_dirty_cpu_schedulers*sizeof(ErtsAlignedSchedulerSleepInfo));
+ for (ix = 0; ix < erts_no_dirty_cpu_schedulers; ix++) {
+ ErtsSchedulerSleepInfo *ssi = &aligned_dirty_cpu_sched_sleep_info[ix].ssi;
+ erts_smp_atomic32_init_nob(&ssi->flags, 0);
+ ssi->event = NULL; /* initialized in sched_thread_func */
+ erts_atomic32_init_nob(&ssi->aux_work, 0);
+ }
+ aligned_dirty_io_sched_sleep_info =
+ erts_alloc_permanent_cache_aligned(
+ ERTS_ALC_T_SCHDLR_SLP_INFO,
+ erts_no_dirty_io_schedulers*sizeof(ErtsAlignedSchedulerSleepInfo));
+ for (ix = 0; ix < erts_no_dirty_io_schedulers; ix++) {
+ ErtsSchedulerSleepInfo *ssi = &aligned_dirty_io_sched_sleep_info[ix].ssi;
+ erts_smp_atomic32_init_nob(&ssi->flags, 0);
+ ssi->event = NULL; /* initialized in sched_thread_func */
+ erts_atomic32_init_nob(&ssi->aux_work, 0);
+ }
+#endif
#endif
/* Create and initialize scheduler specific data */
@@ -4749,6 +5295,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
daww_ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_SCHDLR_DATA,
daww_sz*n);
#else
+ daww_sz = 0;
daww_ptr = NULL;
#endif
@@ -4758,45 +5305,32 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
for (ix = 0; ix < n; ix++) {
ErtsSchedulerData *esdp = ERTS_SCHEDULER_IX(ix);
-#ifdef ERTS_SMP
- erts_bits_init_state(&esdp->erl_bits_state);
- esdp->match_pseudo_process = NULL;
- esdp->free_process = NULL;
-#endif
- esdp->x_reg_array =
- erts_alloc_permanent_cache_aligned(ERTS_ALC_T_BEAM_REGISTER,
- ERTS_X_REGS_ALLOCATED *
- sizeof(Eterm));
- esdp->f_reg_array =
- erts_alloc_permanent_cache_aligned(ERTS_ALC_T_BEAM_REGISTER,
- MAX_REG * sizeof(FloatDef));
-#if !HEAP_ON_C_STACK
- esdp->num_tmp_heap_used = 0;
-#endif
- esdp->no = (Uint) ix+1;
- esdp->ssi = ERTS_SCHED_SLEEP_INFO_IX(ix);
- esdp->current_process = NULL;
- esdp->current_port = NULL;
-
- esdp->virtual_reds = 0;
- esdp->cpu_id = -1;
-
- erts_init_atom_cache_map(&esdp->atom_cache_map);
-
- esdp->run_queue = ERTS_RUNQ_IX(ix);
- esdp->run_queue->scheduler = esdp;
+ init_scheduler_data(esdp, ix+1, ERTS_SCHED_SLEEP_INFO_IX(ix),
+ ERTS_RUNQ_IX(ix), &daww_ptr, daww_sz);
+ }
- init_aux_work_data(&esdp->aux_work_data, esdp, daww_ptr);
+#ifdef ERTS_DIRTY_SCHEDULERS
#ifdef ERTS_SMP
- daww_ptr += daww_sz;
-#endif
-
- esdp->reductions = 0;
-
- init_sched_wall_time(&esdp->sched_wall_time);
- erts_port_task_handle_init(&esdp->nosuspend_port_task_handle);
-
+ erts_aligned_dirty_cpu_scheduler_data =
+ erts_alloc_permanent_cache_aligned(
+ ERTS_ALC_T_SCHDLR_DATA,
+ erts_no_dirty_cpu_schedulers*sizeof(ErtsAlignedSchedulerData));
+ for (ix = 0; ix < erts_no_dirty_cpu_schedulers; ix++) {
+ ErtsSchedulerData *esdp = ERTS_DIRTY_CPU_SCHEDULER_IX(ix);
+ init_scheduler_data(esdp, ix+1, ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(ix),
+ ERTS_DIRTY_CPU_RUNQ, NULL, 0);
+ }
+ erts_aligned_dirty_io_scheduler_data =
+ erts_alloc_permanent_cache_aligned(
+ ERTS_ALC_T_SCHDLR_DATA,
+ erts_no_dirty_io_schedulers*sizeof(ErtsAlignedSchedulerData));
+ for (ix = 0; ix < erts_no_dirty_io_schedulers; ix++) {
+ ErtsSchedulerData *esdp = ERTS_DIRTY_IO_SCHEDULER_IX(ix);
+ init_scheduler_data(esdp, ix+1, ERTS_DIRTY_IO_SCHED_SLEEP_INFO_IX(ix),
+ ERTS_DIRTY_IO_RUNQ, NULL, 0);
}
+#endif
+#endif
init_misc_aux_work();
#if !HALFWORD_HEAP
@@ -4860,6 +5394,10 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
#endif
}
erts_no_schedulers = 1;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ erts_no_dirty_cpu_schedulers = 0;
+ erts_no_dirty_io_schedulers = 0;
+#endif
#endif
erts_smp_atomic32_init_nob(&function_calls, 0);
@@ -4972,28 +5510,39 @@ check_enqueue_in_prio_queue(erts_aint32_t *prq_prio_p,
*prq_prio_p = aprio;
- max_qbit = (actual >> ERTS_PSFLGS_IN_PRQ_MASK_OFFSET) & ERTS_PSFLGS_QMASK;
- max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS;
- max_qbit &= -max_qbit;
- /*
- * max_qbit now either contain bit set for highest prio queue or a bit
- * out of range (which will have a value larger than valid range).
- */
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (!(actual & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC))) {
+#endif
+ max_qbit = (actual >> ERTS_PSFLGS_IN_PRQ_MASK_OFFSET) & ERTS_PSFLGS_QMASK;
+ max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS;
+ max_qbit &= -max_qbit;
+ /*
+ * max_qbit now either contains the bit set for the highest prio
+ * queue, or a bit out of range (which will have a value larger
+ * than the valid range).
+ */
- if (qbit >= max_qbit)
- return 0; /* Already queued in higher or equal prio */
+ if (qbit >= max_qbit)
+ return 0; /* Already queued in higher or equal prio */
- /* Need to enqueue (if already enqueued, it is in lower prio) */
- *newp |= qbit << ERTS_PSFLGS_IN_PRQ_MASK_OFFSET;
+ /* Need to enqueue (if already enqueued, it is in lower prio) */
+ *newp |= qbit << ERTS_PSFLGS_IN_PRQ_MASK_OFFSET;
- if ((actual & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLGS_USR_PRIO_MASK))
- != (aprio << ERTS_PSFLGS_USR_PRIO_OFFSET)) {
- /*
- * Process struct already enqueued, or actual prio not
- * equal to user prio, i.e., enqueue using proxy.
- */
- return -1;
+ if ((actual & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLGS_USR_PRIO_MASK))
+ != (aprio << ERTS_PSFLGS_USR_PRIO_OFFSET)) {
+ /*
+ * Process struct already enqueued, or actual prio not
+ * equal to user prio, i.e., enqueue using proxy.
+ */
+ return -1;
+ }
+#ifdef ERTS_DIRTY_SCHEDULERS
+ } else {
+ if (actual & ERTS_PSFLG_DIRTY_CPU_PROC)
+ *newp |= ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q;
+ else
+ *newp |= ERTS_PSFLG_DIRTY_IO_PROC_IN_Q;
}
+#endif
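
The retained max_qbit computation uses the two's-complement identity x & -x to isolate the lowest set bit: the in-queue priority bits are OR'ed with a sentinel bit just above the valid range, so the result is either the highest-priority occupied queue's bit or the out-of-range sentinel. A standalone illustration (plain C; my values):

    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
        uint32_t in_prq = 0x0c;          /* queued at prio bits 2 and 3    */
        uint32_t x = in_prq | (1u << 4); /* sentinel above the valid range */
        printf("0x%x\n", (unsigned) (x & -x)); /* 0x4: lowest set bit wins */
        return 0;
    }
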
/*
* Enqueue using process struct.
@@ -5049,10 +5598,21 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Proces
}
else {
Process *sched_p;
- ErtsRunQueue *runq = erts_get_runq_proc(p);
+ ErtsRunQueue *runq;
ASSERT(!(n & ERTS_PSFLG_SUSPENDED) || (n & ERTS_PSFLG_ACTIVE_SYS));
+#ifdef ERTS_DIRTY_SCHEDULERS
+#ifdef ERTS_SMP
+ if (ERTS_PSFLG_DIRTY_CPU_PROC & a)
+ runq = ERTS_DIRTY_CPU_RUNQ;
+ else if (ERTS_PSFLG_DIRTY_IO_PROC & a)
+ runq = ERTS_DIRTY_IO_RUNQ;
+ else
+#endif
+#endif
+ runq = erts_get_runq_proc(p);
+
if (enqueue < 0)
sched_p = make_proxy_proc(proxy, p, enq_prio);
else {
@@ -5062,7 +5622,11 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Proces
}
#ifdef ERTS_SMP
- if (!(ERTS_PSFLG_BOUND & n)) {
+ if (!(ERTS_PSFLG_BOUND & n)
+#ifdef ERTS_DIRTY_SCHEDULERS
+ && !(n & (ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q|ERTS_PSFLG_DIRTY_IO_PROC_IN_Q))
+#endif
+ ) {
ErtsRunQueue *new_runq = erts_check_emigration_need(runq, enq_prio);
if (new_runq) {
RUNQ_SET_RQ(&sched_p->run_queue, new_runq);
@@ -5489,6 +6053,9 @@ suspend_scheduler(ErtsSchedulerData *esdp)
* Regardless of why a scheduler is suspended, it ends up here.
*/
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
+#endif
ASSERT(no != 1);
evacuate_run_queue(esdp->run_queue, &sbp);
@@ -5656,22 +6223,44 @@ ErtsSchedSuspendResult
erts_schedulers_state(Uint *total,
Uint *online,
Uint *active,
+ Uint *dirty_cpu,
+ Uint *dirty_cpu_online,
+ Uint *dirty_io,
int yield_allowed)
{
- int res;
+ int res = ERTS_SCHDLR_SSPND_EINVAL;
erts_aint32_t changing;
- erts_smp_mtx_lock(&schdlr_sspnd.mtx);
- changing = erts_smp_atomic32_read_nob(&schdlr_sspnd.changing);
- if (yield_allowed && (changing & ~ERTS_SCHDLR_SSPND_CHNG_WAITER))
- res = ERTS_SCHDLR_SSPND_YIELD_RESTART;
- else {
- *active = *online = schdlr_sspnd.online;
- if (ongoing_multi_scheduling_block())
- *active = 1;
- res = ERTS_SCHDLR_SSPND_DONE;
+ if (total) {
+ ASSERT(online);
+ ASSERT(active);
+ erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+ changing = erts_smp_atomic32_read_nob(&schdlr_sspnd.changing);
+ if (yield_allowed && (changing & ~ERTS_SCHDLR_SSPND_CHNG_WAITER))
+ res = ERTS_SCHDLR_SSPND_YIELD_RESTART;
+ else {
+ *active = *online = schdlr_sspnd.online;
+ if (ongoing_multi_scheduling_block())
+ *active = 1;
+ res = ERTS_SCHDLR_SSPND_DONE;
+ }
+ erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
+ *total = erts_no_schedulers;
}
- erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
- *total = erts_no_schedulers;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (dirty_cpu)
+ *dirty_cpu = erts_no_dirty_cpu_schedulers;
+ if (dirty_cpu_online)
+ *dirty_cpu_online = erts_no_dirty_cpu_schedulers_online;
+ if (dirty_io)
+ *dirty_io = erts_no_dirty_io_schedulers;
+#else
+ if (dirty_cpu)
+ *dirty_cpu = 0;
+ if (dirty_cpu_online)
+ *dirty_cpu_online = 0;
+ if (dirty_io)
+ *dirty_io = 0;
+#endif
return res;
}
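
The reworked erts_schedulers_state() takes the schdlr_sspnd mutex only when 'total' is non-NULL (in which case 'online' and 'active' must be supplied too); the dirty counts are read without locking, and callers pass NULL for anything they do not need. A hedged usage sketch (assuming the surrounding ERTS declarations are in scope):

    Uint dirty_cpu, dirty_io;
    /* Fetch only the dirty scheduler counts; 'total' is NULL, so the
     * mutex-protected branch is skipped entirely. */
    (void) erts_schedulers_state(NULL, NULL, NULL,
                                 &dirty_cpu, NULL, &dirty_io, 0);
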
@@ -5761,8 +6350,12 @@ erts_set_schedulers_online(Process *p,
for (ix = no; ix < online; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- wake_scheduler(rq, 0);
+ wake_scheduler(rq);
}
+#ifdef ERTS_DIRTY_SCHEDULERS
+ wake_dirty_scheduler(ERTS_DIRTY_CPU_RUNQ);
+ wake_dirty_scheduler(ERTS_DIRTY_IO_RUNQ);
+#endif
}
}
@@ -5860,7 +6453,7 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
for (ix = 1; ix < online; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- wake_scheduler(rq, 0);
+ wake_scheduler(rq);
}
if (erts_smp_atomic32_read_nob(&schdlr_sspnd.active)
@@ -6110,6 +6703,92 @@ sched_thread_func(void *vesdp)
return NULL;
}
+#ifdef ERTS_DIRTY_SCHEDULERS
+#ifdef ERTS_SMP
+static void*
+sched_dirty_cpu_thread_func(void *vesdp)
+{
+ ErtsThrPrgrCallbacks callbacks;
+ ErtsSchedulerData *esdp = vesdp;
+ Uint no = esdp->dirty_no;
+ ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(no-1)->event = erts_tse_fetch();
+ callbacks.arg = (void *) esdp->ssi;
+ callbacks.wakeup = thr_prgr_wakeup;
+ callbacks.prepare_wait = NULL;
+ callbacks.wait = NULL;
+ callbacks.finalize_wait = NULL;
+
+ erts_thr_progress_register_unmanaged_thread(&callbacks);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ {
+ char buf[31];
+ erts_snprintf(&buf[0], 31, "dirty cpu scheduler %beu", no);
+ erts_lc_set_thread_name(&buf[0]);
+ }
+#endif
+ erts_tsd_set(sched_data_key, vesdp);
+#if ERTS_USE_ASYNC_READY_Q
+ esdp->aux_work_data.async_ready.queue = NULL;
+#endif
+
+ erts_proc_lock_prepare_proc_lock_waiter();
+
+#ifdef HIPE
+ hipe_thread_signal_init();
+#endif
+ erts_thread_init_float();
+
+ process_main();
+ /* No schedulers should *ever* terminate */
+ erl_exit(ERTS_ABORT_EXIT,
+ "Dirty CPU scheduler thread number %beu terminated\n",
+ no);
+ return NULL;
+}
+
+static void*
+sched_dirty_io_thread_func(void *vesdp)
+{
+ ErtsThrPrgrCallbacks callbacks;
+ ErtsSchedulerData *esdp = vesdp;
+ Uint no = esdp->dirty_no;
+ ERTS_DIRTY_IO_SCHED_SLEEP_INFO_IX(no-1)->event = erts_tse_fetch();
+ callbacks.arg = (void *) esdp->ssi;
+ callbacks.wakeup = thr_prgr_wakeup;
+ callbacks.prepare_wait = NULL;
+ callbacks.wait = NULL;
+ callbacks.finalize_wait = NULL;
+
+ erts_thr_progress_register_unmanaged_thread(&callbacks);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ {
+ char buf[31];
+ erts_snprintf(&buf[0], 31, "dirty io scheduler %beu", no);
+ erts_lc_set_thread_name(&buf[0]);
+ }
+#endif
+ erts_tsd_set(sched_data_key, vesdp);
+#if ERTS_USE_ASYNC_READY_Q
+ esdp->aux_work_data.async_ready.queue = NULL;
+#endif
+
+ erts_proc_lock_prepare_proc_lock_waiter();
+
+#ifdef HIPE
+ hipe_thread_signal_init();
+#endif
+ erts_thread_init_float();
+
+ process_main();
+ /* No schedulers should *ever* terminate */
+ erl_exit(ERTS_ABORT_EXIT,
+ "Dirty I/O scheduler thread number %beu terminated\n",
+ no);
+ return NULL;
+}
+#endif
+#endif
+
static ethr_tid aux_tid;
void
@@ -6160,6 +6839,26 @@ erts_start_schedulers(void)
erts_no_schedulers = actual;
+#ifdef ERTS_DIRTY_SCHEDULERS
+#ifdef ERTS_SMP
+ {
+ int ix;
+ for (ix = 0; ix < erts_no_dirty_cpu_schedulers; ix++) {
+ ErtsSchedulerData *esdp = ERTS_DIRTY_CPU_SCHEDULER_IX(ix);
+ res = ethr_thr_create(&esdp->tid,sched_dirty_cpu_thread_func,(void*)esdp,&opts);
+ if (res != 0)
+ erl_exit(1, "Failed to create dirty cpu scheduler thread %d\n", ix);
+ }
+ for (ix = 0; ix < erts_no_dirty_io_schedulers; ix++) {
+ ErtsSchedulerData *esdp = ERTS_DIRTY_IO_SCHEDULER_IX(ix);
+ res = ethr_thr_create(&esdp->tid,sched_dirty_io_thread_func,(void*)esdp,&opts);
+ if (res != 0)
+ erl_exit(1, "Failed to create dirty io scheduler thread %d\n", ix);
+ }
+ }
+#endif
+#endif
+
ERTS_THR_MEMORY_BARRIER;
res = ethr_thr_create(&aux_tid, aux_thread, NULL, &opts);
@@ -7148,7 +7847,8 @@ Process *schedule(Process *p, int calls)
input_reductions = INPUT_REDUCTIONS;
}
- ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
+ ERTS_SMP_LC_ASSERT(ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data())
+ || !erts_thr_progress_is_blocking());
/*
* Clean up after the process being scheduled out.
@@ -7259,42 +7959,40 @@ Process *schedule(Process *p, int calls)
}
- ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
+ ERTS_SMP_LC_ASSERT(ERTS_SCHEDULER_IS_DIRTY(esdp)
+ || !erts_thr_progress_is_blocking());
check_activities_to_run: {
#ifdef ERTS_SMP
ErtsMigrationPaths *mps;
ErtsMigrationPath *mp;
-
-#ifdef ERTS_SMP
- {
- ErtsProcList *pnd_xtrs = rq->procs.pending_exiters;
- if (erts_proclist_fetch(&pnd_xtrs, NULL)) {
- rq->procs.pending_exiters = NULL;
- erts_smp_runq_unlock(rq);
- handle_pending_exiters(pnd_xtrs);
- erts_smp_runq_lock(rq);
- }
-
+ ErtsProcList *pnd_xtrs = rq->procs.pending_exiters;
+ if (erts_proclist_fetch(&pnd_xtrs, NULL)) {
+ rq->procs.pending_exiters = NULL;
+ erts_smp_runq_unlock(rq);
+ handle_pending_exiters(pnd_xtrs);
+ erts_smp_runq_lock(rq);
}
-#endif
- if (rq->check_balance_reds <= 0)
- check_balance(rq);
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) {
+ if (rq->check_balance_reds <= 0)
+ check_balance(rq);
- ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+ ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
- mps = erts_get_migration_paths_managed();
- mp = &mps->mpath[rq->ix];
+ mps = erts_get_migration_paths_managed();
+ mp = &mps->mpath[rq->ix];
- if (mp->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK)
- immigrate(rq, mp);
+ if (mp->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK)
+ immigrate(rq, mp);
+ }
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
continue_check_activities_to_run:
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
continue_check_activities_to_run_known_flags:
-
+ ASSERT(ERTS_SCHEDULER_IS_DIRTY(esdp)
+ || flags & ERTS_RUNQ_FLG_NONEMPTY);
if (flags & (ERTS_RUNQ_FLG_CHK_CPU_BIND|ERTS_RUNQ_FLG_SUSPENDED)) {
@@ -7309,7 +8007,7 @@ Process *schedule(Process *p, int calls)
}
}
- {
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) {
erts_aint32_t aux_work;
int leader_update = erts_thr_progress_update(esdp);
aux_work = erts_atomic32_read_acqb(&esdp->ssi->aux_work);
@@ -7321,9 +8019,9 @@ Process *schedule(Process *p, int calls)
handle_aux_work(&esdp->aux_work_data, aux_work, 0);
erts_smp_runq_lock(rq);
}
- }
- ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
+ ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
+ }
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
#else /* ERTS_SMP */
@@ -7346,20 +8044,16 @@ Process *schedule(Process *p, int calls)
rq->wakeup_other = 0;
rq->wakeup_other_reds = 0;
- empty_runq(rq);
-
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
- if (flags & ERTS_RUNQ_FLG_SUSPENDED) {
- non_empty_runq(rq);
+ if (flags & ERTS_RUNQ_FLG_SUSPENDED)
goto continue_check_activities_to_run_known_flags;
- }
- else if (!(flags & ERTS_RUNQ_FLG_INACTIVE)) {
- if (try_steal_task(rq)) {
- non_empty_runq(rq);
+ if (flags & ERTS_RUNQ_FLG_INACTIVE)
+ empty_runq(rq);
+ else {
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && try_steal_task(rq))
goto continue_check_activities_to_run;
- }
- (void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
+ empty_runq(rq);
/*
* Check for ERTS_RUNQ_FLG_SUSPENDED has to be done
@@ -7368,10 +8062,10 @@ Process *schedule(Process *p, int calls)
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
if (flags & ERTS_RUNQ_FLG_SUSPENDED) {
non_empty_runq(rq);
+ flags |= ERTS_RUNQ_FLG_NONEMPTY;
goto continue_check_activities_to_run_known_flags;
}
}
-
#endif
scheduler_wait(&fcalls, esdp, rq);
@@ -7382,7 +8076,8 @@ Process *schedule(Process *p, int calls)
goto check_activities_to_run;
}
- else if (fcalls > input_reductions && prepare_for_sys_schedule()) {
+ else if (!ERTS_SCHEDULER_IS_DIRTY(esdp) &&
+ (fcalls > input_reductions && prepare_for_sys_schedule())) {
/*
* Schedule system-level activities.
*/
@@ -7486,6 +8181,14 @@ Process *schedule(Process *p, int calls)
psflg_band_mask = ~(((erts_aint32_t) 1) << (ERTS_PSFLGS_GET_PRQ_PRIO(state)
+ ERTS_PSFLGS_IN_PRQ_MASK_OFFSET));
+#ifdef ERTS_DIRTY_SCHEDULERS
+ /* If a non-dirty scheduler picks up a process marked as already being
+ in a dirty run queue, just drop it and go get another process. */
+ if (state & (ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q|ERTS_PSFLG_DIRTY_IO_PROC_IN_Q) &&
+ !ERTS_SCHEDULER_IS_DIRTY(esdp))
+ goto pick_next_process;
+#endif
+
if (!(state & ERTS_PSFLG_PROXY))
psflg_band_mask &= ~ERTS_PSFLG_IN_RUNQ;
else {
@@ -7568,6 +8271,10 @@ Process *schedule(Process *p, int calls)
(UWord) esdp->no);
int migrated = old && old != esdp->no;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
+#endif
+
prio = (int) ERTS_PSFLGS_GET_USR_PRIO(state);
erts_smp_spin_lock(&erts_sched_stat.lock);
@@ -8486,6 +9193,10 @@ erts_schedule_misc_op(void (*func)(void *), void *arg)
rq->misc.start = molp;
rq->misc.end = molp;
+#ifdef ERTS_SMP
+ non_empty_runq(rq);
+#endif
+
erts_smp_runq_unlock(rq);
smp_notify_inc_runq(rq);
@@ -8755,7 +9466,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->htop = p->heap;
p->heap_sz = sz;
p->catches = 0;
- p->extra_root = NULL;
p->bin_vheap_sz = p->min_vheap_size;
p->bin_old_vheap_sz = p->min_vheap_size;
@@ -9372,8 +10082,15 @@ save_pending_exiter(Process *p)
erts_proclist_store_last(&rq->procs.pending_exiters, plp);
+ non_empty_runq(rq);
+
erts_smp_runq_unlock(rq);
- wake_scheduler(rq, 1);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix))
+ wake_dirty_scheduler(rq);
+ else
+#endif
+ wake_scheduler(rq);
}
#endif
@@ -10219,12 +10936,6 @@ erts_continue_exit_process(Process *p)
if (pbt)
erts_free(ERTS_ALC_T_BPD, (void *) pbt);
- if (p->extra_root != NULL) {
- (p->extra_root->cleanup)(p->extra_root); /* Should deallocate
- whole structure */
- p->extra_root = NULL;
- }
-
delete_process(p);
#ifdef ERTS_SMP