aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r--erts/emulator/beam/erl_init.c26
-rw-r--r--erts/emulator/beam/erl_process.c424
-rw-r--r--erts/emulator/beam/erl_process.h104
3 files changed, 497 insertions, 57 deletions
diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c
index 1af80dd04b..1d4f617746 100644
--- a/erts/emulator/beam/erl_init.c
+++ b/erts/emulator/beam/erl_init.c
@@ -537,6 +537,12 @@ void erts_usage(void)
erts_fprintf(stderr, " see the erl(1) documentation for more info.\n");
erts_fprintf(stderr, "-sct cput set cpu topology,\n");
erts_fprintf(stderr, " see the erl(1) documentation for more info.\n");
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT_OPT
+ erts_fprintf(stderr, "-sub bool enable/disable scheduler utilization balancing,\n");
+#else
+ erts_fprintf(stderr, "-sub false disable scheduler utilization balancing,\n");
+#endif
+ erts_fprintf(stderr, " see the erl(1) documentation for more info.\n");
erts_fprintf(stderr, "-sws val set scheduler wakeup strategy, valid values are:\n");
erts_fprintf(stderr, " default|legacy.\n");
erts_fprintf(stderr, "-swct val set scheduler wake cleanup threshold, valid values are:\n");
@@ -1512,6 +1518,26 @@ erl_start(int argc, char **argv)
erts_usage();
}
}
+ else if (has_prefix("ub", sub_param)) {
+ arg = get_arg(sub_param+2, argv[i+1], &i);
+ if (sys_strcmp("true", arg) == 0) {
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT_OPT
+ erts_sched_balance_util = 1;
+#else
+ erts_fprintf(stderr,
+ "scheduler utilization balancing not "
+ "supported on this system\n");
+ erts_usage();
+#endif
+ }
+ else if (sys_strcmp("false", arg) == 0)
+ erts_sched_balance_util = 0;
+ else {
+ erts_fprintf(stderr, "bad scheduler utilization balancing "
+ " value '%s'\n", arg);
+ erts_usage();
+ }
+ }
else if (has_prefix("wct", sub_param)) {
arg = get_arg(sub_param+3, argv[i+1], &i);
if (erts_sched_set_wake_cleanup_threshold(arg) != 0) {
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 9983a26688..2f383f4c01 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -144,6 +144,7 @@ extern BeamInstr beam_exit[];
extern BeamInstr beam_continue_exit[];
int erts_sched_compact_load;
+int erts_sched_balance_util = 0;
Uint erts_no_schedulers;
#define ERTS_THR_PRGR_LATER_CLEANUP_OP_THRESHOLD_VERY_LAZY (4*1024*1024)
@@ -608,6 +609,7 @@ erts_late_init_process(void)
static void
init_sched_wall_time(ErtsSchedWallTime *swtp)
{
+ swtp->need = erts_sched_balance_util;
swtp->enabled = 0;
swtp->start = 0;
swtp->working.total = 0;
@@ -630,27 +632,253 @@ sched_wall_time_ts(void)
#endif
}
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+
+#ifdef ARCH_64
+
+static ERTS_INLINE Uint64
+aschedtime_read(ErtsAtomicSchedTime *var)
+{
+ return (Uint64) erts_atomic_read_nob((erts_atomic_t *) var);
+}
+
+static ERTS_INLINE void
+aschedtime_set(ErtsAtomicSchedTime *var, Uint64 val)
+{
+ erts_atomic_set_nob((erts_atomic_t *) var, (erts_aint_t) val);
+}
+
+static ERTS_INLINE void
+aschedtime_init(ErtsAtomicSchedTime *var)
+{
+ erts_atomic_init_nob((erts_atomic_t *) var, (erts_aint_t) 0);
+}
+
+#elif defined(ARCH_32)
+
+static ERTS_INLINE Uint64
+aschedtime_read(ErtsAtomicSchedTime *var)
+{
+ erts_dw_aint_t dw;
+ erts_dw_atomic_read_nob((erts_dw_atomic_t *) var, &dw);
+#ifdef ETHR_SU_DW_NAINT_T__
+ return (Uint64) dw.dw_sint;
+#else
+ {
+ Uint64 res;
+ res = (Uint64) ((Uint32) dw.sint[ERTS_DW_AINT_HIGH_WORD]);
+ res <<= 32;
+ res |= (Uint64) ((Uint32) dw.sint[ERTS_DW_AINT_LOW_WORD]);
+ return res;
+ }
+#endif
+}
+
+static ERTS_INLINE void
+aschedtime_set(ErtsAtomicSchedTime *var, Uint64 val)
+{
+ erts_dw_aint_t dw;
+#ifdef ETHR_SU_DW_NAINT_T__
+ dw.dw_sint = (ETHR_SU_DW_NAINT_T__) val;
+#else
+ dw.sint[ERTS_DW_AINT_LOW_WORD] = (erts_aint_t) (val & 0xffffffff);
+ dw.sint[ERTS_DW_AINT_HIGH_WORD] = (erts_aint_t) ((val >> 32) & 0xffffffff);
+#endif
+ erts_dw_atomic_set_nob((erts_dw_atomic_t *) var, &dw);
+}
+
+static ERTS_INLINE void
+aschedtime_init(ErtsAtomicSchedTime *var)
+{
+ erts_dw_aint_t dw;
+ dw.sint[ERTS_DW_AINT_LOW_WORD] = (erts_aint_t) 0;
+ dw.sint[ERTS_DW_AINT_HIGH_WORD] = (erts_aint_t) 0;
+ erts_dw_atomic_init_nob((erts_dw_atomic_t *) var, &dw);
+}
+
+#else
+# error :-/
+#endif
+
+#define ERTS_GET_AVG_MAX_UNLOCKED_TRY 50
+#define ERTS_SCHED_AVG_UTIL_WRITE_MARKER (~((Uint64) 0))
+
+/* Intervals in nanoseconds */
+#define ERTS_SCHED_UTIL_SHORT_INTERVAL ((Uint64) 1*1000*1000*1000)
+#define ERTS_SCHED_UTIL_LONG_INTERVAL ((Uint64) 10*1000*1000*1000)
+
+
+#define ERTS_SCHED_UTIL_IGNORE_IMBALANCE_DIFF 5000 /* ppm */
+
+static ERTS_INLINE Uint64
+calc_sched_worktime(int is_working, Uint64 now, Uint64 last,
+ Uint64 interval, Uint64 old_worktime)
+{
+ Uint64 worktime;
+ Uint64 new;
+
+ if (now <= last)
+ return old_worktime;
+
+ new = now - last;
+
+ if (new >= interval)
+ return is_working ? interval : (Uint64) 0;
+
+
+ /*
+ * Division by 1000 in order to avoid
+ * overflow. If changed update assertions
+ * in init_runq_sched_util().
+ */
+ worktime = old_worktime;
+ worktime *= (interval - new)/1000;
+ worktime /= (interval/1000);
+ if (is_working)
+ worktime += new;
+
+ ASSERT(0 <= worktime && worktime <= interval);
+
+ return worktime;
+}
+
+static ERTS_INLINE void
+update_avg_sched_util(ErtsSchedulerData *esdp, Uint64 now, int is_working)
+{
+ ErtsRunQueue *rq;
+ int worked;
+ Uint64 swt, lwt, last;
+
+ rq = esdp->run_queue;
+ last = aschedtime_read(&rq->sched_util.last);
+
+ if (now <= last) {
+ ASSERT(last == ERTS_SCHED_AVG_UTIL_WRITE_MARKER);
+ return;
+ }
+
+ ASSERT(now >= last);
+
+ worked = rq->sched_util.is_working;
+
+ swt = calc_sched_worktime(worked, now, last, ERTS_SCHED_UTIL_SHORT_INTERVAL,
+ rq->sched_util.worktime.short_interval);
+ lwt = calc_sched_worktime(worked, now, last, ERTS_SCHED_UTIL_LONG_INTERVAL,
+ rq->sched_util.worktime.long_interval);
+
+ aschedtime_set(&rq->sched_util.last, ERTS_SCHED_AVG_UTIL_WRITE_MARKER);
+ ERTS_THR_WRITE_MEMORY_BARRIER;
+ rq->sched_util.is_working = is_working;
+ rq->sched_util.worktime.short_interval = swt;
+ rq->sched_util.worktime.long_interval = lwt;
+ ERTS_THR_WRITE_MEMORY_BARRIER;
+ aschedtime_set(&rq->sched_util.last, now);
+}
+
+int
+erts_get_sched_util(ErtsRunQueue *rq, int initially_locked, int short_interval)
+{
+ /* Average scheduler utilization in ppm */
+ int util, is_working, try = 0, locked = initially_locked;
+ Uint64 worktime, old_worktime, now, last, interval, *old_worktimep;
+
+ if (short_interval) {
+ old_worktimep = &rq->sched_util.worktime.short_interval;
+ interval = ERTS_SCHED_UTIL_SHORT_INTERVAL;
+ }
+ else {
+ old_worktimep = &rq->sched_util.worktime.long_interval;
+ interval = ERTS_SCHED_UTIL_LONG_INTERVAL;
+ }
+
+ while (1) {
+ Uint64 chk_last;
+ last = aschedtime_read(&rq->sched_util.last);
+ ERTS_THR_READ_MEMORY_BARRIER;
+ is_working = rq->sched_util.is_working;
+ old_worktime = *old_worktimep;
+ ERTS_THR_READ_MEMORY_BARRIER;
+ chk_last = aschedtime_read(&rq->sched_util.last);
+ if (chk_last == last)
+ break;
+ if (!locked) {
+ if (++try >= ERTS_GET_AVG_MAX_UNLOCKED_TRY) {
+ /* Writer will eventually block on runq-lock */
+ erts_smp_runq_lock(rq);
+ locked = 1;
+ }
+ }
+ }
+
+ if (!initially_locked && locked)
+ erts_smp_runq_unlock(rq);
+
+ now = sched_wall_time_ts();
+ worktime = calc_sched_worktime(is_working, now, last, interval, old_worktime);
+
+ util = (int) ((worktime * 1000000)/interval);
+
+ ASSERT(0 <= util && util <= 1000000);
+
+ return util;
+}
+
+static void
+init_runq_sched_util(ErtsRunQueueSchedUtil *rqsu, int enabled)
+{
+ aschedtime_init(&rqsu->last);
+ if (!enabled)
+ aschedtime_set(&rqsu->last, ERTS_SCHED_AVG_UTIL_WRITE_MARKER);
+ rqsu->is_working = 0;
+ rqsu->worktime.short_interval = (Uint64) 0;
+ rqsu->worktime.long_interval = (Uint64) 0;
+
+#ifdef DEBUG
+ {
+ Uint64 intrvl;
+ /*
+ * If one of these asserts fail we may have
+ * overflow in calc_sched_worktime(). Which
+ * have to be fixed either by shrinking
+ * interval size, or fix calculation of
+ * worktime in calc_sched_worktime().
+ */
+ intrvl = ERTS_SCHED_UTIL_SHORT_INTERVAL;
+ ASSERT(intrvl*(intrvl/1000) > intrvl);
+ intrvl = ERTS_SCHED_UTIL_LONG_INTERVAL;
+ ASSERT(intrvl*(intrvl/1000) > intrvl);
+ }
+#endif
+}
+
+#endif /* ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT */
+
static ERTS_INLINE void
sched_wall_time_change(ErtsSchedulerData *esdp, int working)
{
- if (esdp->sched_wall_time.enabled) {
+ if (esdp->sched_wall_time.need) {
Uint64 ts = sched_wall_time_ts();
- if (working) {
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ update_avg_sched_util(esdp, ts, working);
+#endif
+ if (esdp->sched_wall_time.enabled) {
+ if (working) {
#ifdef DEBUG
- ASSERT(!esdp->sched_wall_time.working.currently);
- esdp->sched_wall_time.working.currently = 1;
+ ASSERT(!esdp->sched_wall_time.working.currently);
+ esdp->sched_wall_time.working.currently = 1;
#endif
- ts -= esdp->sched_wall_time.start;
- esdp->sched_wall_time.working.start = ts;
- }
- else {
+ ts -= esdp->sched_wall_time.start;
+ esdp->sched_wall_time.working.start = ts;
+ }
+ else {
#ifdef DEBUG
- ASSERT(esdp->sched_wall_time.working.currently);
- esdp->sched_wall_time.working.currently = 0;
+ ASSERT(esdp->sched_wall_time.working.currently);
+ esdp->sched_wall_time.working.currently = 0;
#endif
- ts -= esdp->sched_wall_time.start;
- ts -= esdp->sched_wall_time.working.start;
- esdp->sched_wall_time.working.total += ts;
+ ts -= esdp->sched_wall_time.start;
+ ts -= esdp->sched_wall_time.working.start;
+ esdp->sched_wall_time.working.total += ts;
+ }
}
}
}
@@ -705,10 +933,13 @@ reply_sched_wall_time(void *vswtrp)
ASSERT(esdp);
if (swtrp->set) {
- if (!swtrp->enable && esdp->sched_wall_time.enabled)
+ if (!swtrp->enable && esdp->sched_wall_time.enabled) {
+ esdp->sched_wall_time.need = erts_sched_balance_util;
esdp->sched_wall_time.enabled = 0;
+ }
else if (swtrp->enable && !esdp->sched_wall_time.enabled) {
Uint64 ts = sched_wall_time_ts();
+ esdp->sched_wall_time.need = 1;
esdp->sched_wall_time.enabled = 1;
esdp->sched_wall_time.start = ts;
esdp->sched_wall_time.working.total = 0;
@@ -2084,9 +2315,8 @@ ongoing_multi_scheduling_block(void)
}
static ERTS_INLINE void
-empty_runq(ErtsRunQueue *rq)
+empty_runq_aux(ErtsRunQueue *rq, Uint32 old_flags)
{
- Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED);
if (old_flags & ERTS_RUNQ_FLG_NONEMPTY) {
#ifdef DEBUG
erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues);
@@ -2107,6 +2337,23 @@ empty_runq(ErtsRunQueue *rq)
}
static ERTS_INLINE void
+empty_runq(ErtsRunQueue *rq)
+{
+ Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED);
+ empty_runq_aux(rq, old_flags);
+}
+
+static ERTS_INLINE Uint32
+empty_protected_runq(ErtsRunQueue *rq)
+{
+ Uint32 old_flags = ERTS_RUNQ_FLGS_BSET(rq,
+ ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED,
+ ERTS_RUNQ_FLG_PROTECTED);
+ empty_runq_aux(rq, old_flags);
+ return old_flags;
+}
+
+static ERTS_INLINE void
non_empty_runq(ErtsRunQueue *rq)
{
Uint32 old_flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_NONEMPTY);
@@ -2130,6 +2377,18 @@ non_empty_runq(ErtsRunQueue *rq)
}
}
+void
+erts_empty_runq(ErtsRunQueue *rq)
+{
+ empty_runq(rq);
+}
+
+void
+erts_non_empty_runq(ErtsRunQueue *rq)
+{
+ non_empty_runq(rq);
+}
+
static erts_aint32_t
sched_prep_spin_wait(ErtsSchedulerSleepInfo *ssi)
{
@@ -2632,7 +2891,7 @@ ssi_flags_set_wake(ErtsSchedulerSleepInfo *ssi)
}
static void
-wake_scheduler(ErtsRunQueue *rq, int incq)
+wake_scheduler(ErtsRunQueue *rq)
{
ErtsSchedulerSleepInfo *ssi;
erts_aint32_t flgs;
@@ -2651,9 +2910,6 @@ wake_scheduler(ErtsRunQueue *rq, int incq)
flgs = ssi_flags_set_wake(ssi);
erts_sched_finish_poke(ssi, flgs);
-
- if (incq && (flgs & ERTS_SSI_FLG_WAITING))
- non_empty_runq(rq);
}
#define ERTS_NO_USED_RUNQS_SHIFT 16
@@ -2744,7 +3000,7 @@ chk_wake_sched(ErtsRunQueue *crq, int ix, int activate)
if (try_inc_no_active_runqs(ix+1))
(void) ERTS_RUNQ_FLGS_UNSET(wrq, ERTS_RUNQ_FLG_INACTIVE);
}
- wake_scheduler(wrq, 0);
+ wake_scheduler(wrq);
return 1;
}
return 0;
@@ -2792,7 +3048,7 @@ smp_notify_inc_runq(ErtsRunQueue *runq)
{
#ifdef ERTS_SMP
if (runq)
- wake_scheduler(runq, 1);
+ wake_scheduler(runq);
#endif
}
@@ -2810,7 +3066,7 @@ erts_sched_notify_check_cpu_bind(void)
for (ix = 0; ix < erts_no_run_queues; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
(void) ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_CHK_CPU_BIND);
- wake_scheduler(rq, 0);
+ wake_scheduler(rq);
}
#else
erts_sched_check_cpu_bind(erts_get_scheduler_data());
@@ -2938,6 +3194,11 @@ check_immigration_need(ErtsRunQueue *c_rq, ErtsMigrationPath *mp, int prio)
if (!f_rq)
return NULL;
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (mp->sched_util)
+ return NULL;
+#endif
+
f_rq_flags = ERTS_RUNQ_FLGS_GET(f_rq);
if (f_rq_flags & ERTS_RUNQ_FLG_PROTECTED)
return NULL;
@@ -3077,7 +3338,7 @@ suspend_run_queue(ErtsRunQueue *rq)
ERTS_SSI_FLG_SUSPENDED);
(void) ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_SUSPENDED);
- wake_scheduler(rq, 0);
+ wake_scheduler(rq);
}
static void scheduler_ix_resume_wake(Uint ix);
@@ -3169,6 +3430,9 @@ evacuate_run_queue(ErtsRunQueue *rq,
to_rq->misc.start = start;
to_rq->misc.end = end;
+
+ non_empty_runq(to_rq);
+
erts_smp_runq_unlock(to_rq);
smp_notify_inc_runq(to_rq);
erts_smp_runq_lock(to_rq);
@@ -3381,7 +3645,7 @@ try_steal_task(ErtsRunQueue *rq)
Uint32 flags;
/* Protect jobs we steal from getting stolen from us... */
- flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_PROTECTED);
+ flags = empty_protected_runq(rq);
if (flags & ERTS_RUNQ_FLG_SUSPENDED)
return 0; /* go suspend instead... */
@@ -3460,6 +3724,9 @@ typedef struct {
int full_reds_history_change;
int oowc;
int max_len;
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ int sched_util;
+#endif
} ErtsRunQueueBalance;
static ErtsRunQueueBalance *run_queue_info;
@@ -3623,6 +3890,9 @@ check_balance(ErtsRunQueue *c_rq)
Sint64 scheds_reds, full_scheds_reds;
int forced, active, current_active, oowc, half_full_scheds, full_scheds,
mmax_len, blnc_no_rqs, qix, pix, freds_hist_ix;
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ int sched_util_balancing;
+#endif
if (erts_smp_atomic32_xchg_nob(&balance_info.checking_balance, 1)) {
c_rq->check_balance_reds = INT_MAX;
@@ -3678,6 +3948,10 @@ check_balance(ErtsRunQueue *c_rq)
return;
}
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ sched_util_balancing = 0;
+#endif
+
freds_hist_ix = balance_info.full_reds_history_index;
balance_info.full_reds_history_index++;
if (balance_info.full_reds_history_index >= ERTS_FULL_REDS_HISTORY_SIZE)
@@ -3708,7 +3982,12 @@ check_balance(ErtsRunQueue *c_rq)
run_queue_info[qix].oowc = rq->out_of_work_count;
run_queue_info[qix].max_len = rq->max_len;
rq->check_balance_reds = INT_MAX;
-
+
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (erts_sched_balance_util)
+ run_queue_info[qix].sched_util = erts_get_sched_util(rq, 1, 0);
+#endif
+
erts_smp_runq_unlock(rq);
}
@@ -3778,8 +4057,38 @@ check_balance(ErtsRunQueue *c_rq)
mmax_len = run_queue_info[qix].max_len;
}
- if (!erts_sched_compact_load)
+ if (!erts_sched_compact_load) {
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (erts_sched_balance_util && full_scheds < blnc_no_rqs) {
+ int avg_util = 0;
+
+ for (qix = 0; qix < blnc_no_rqs; qix++)
+ avg_util += run_queue_info[qix].sched_util;
+
+ avg_util /= blnc_no_rqs; /* in ppm */
+
+ sched_util_balancing = 1;
+ /*
+ * In order to avoid renaming a large amount of fields
+ * we write utilization values instead of lenght values
+ * in the 'max_len' and 'migration_limit' fields...
+ */
+ for (qix = 0; qix < blnc_no_rqs; qix++) {
+ run_queue_info[qix].flags = 0; /* Reset for later use... */
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ run_queue_info[qix].prio[pix].emigrate_to = -1;
+ run_queue_info[qix].prio[pix].immigrate_from = -1;
+ run_queue_info[qix].prio[pix].avail = 100;
+ run_queue_info[qix].prio[pix].max_len = run_queue_info[qix].sched_util;
+ run_queue_info[qix].prio[pix].migration_limit = avg_util;
+ }
+ }
+ active = blnc_no_rqs;
+ goto setup_migration_paths;
+ }
+#endif
goto all_active;
+ }
if (!forced && half_full_scheds != blnc_no_rqs) {
int min = 1;
@@ -3896,15 +4205,30 @@ check_balance(ErtsRunQueue *c_rq)
}
}
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ setup_migration_paths:
+#endif
+
/* Setup migration paths for all priorities */
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
int low = 0, high = 0;
for (qix = 0; qix < blnc_no_rqs; qix++) {
int len_diff = run_queue_info[qix].prio[pix].max_len;
len_diff -= run_queue_info[qix].prio[pix].migration_limit;
+
#ifdef DBG_PRINT
if (pix == 2) erts_fprintf(stderr, "%d ", len_diff);
#endif
+
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (sched_util_balancing
+ && -ERTS_SCHED_UTIL_IGNORE_IMBALANCE_DIFF <= len_diff
+ && len_diff <= ERTS_SCHED_UTIL_IGNORE_IMBALANCE_DIFF) {
+ /* ignore minor imbalance */
+ len_diff = 0;
+ }
+#endif
+
run_queue_compare[qix].qix = qix;
run_queue_compare[qix].len = len_diff;
if (len_diff != 0) {
@@ -4031,6 +4355,9 @@ erts_fprintf(stderr, "--------------------------------\n");
Uint32 flags = run_queue_info[qix].flags;
ErtsMigrationPath *mp = &new_mpaths->mpath[qix];
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ mp->sched_util = sched_util_balancing;
+#endif
mp->flags = flags;
mp->misc_evac_runq = NULL;
@@ -4628,6 +4955,11 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
set_wakeup_other_data();
#endif
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (erts_sched_balance_util)
+ erts_sched_compact_load = 0;
+#endif
+
ASSERT(no_schedulers_online <= no_schedulers);
ASSERT(no_schedulers_online >= 1);
ASSERT(no_schedulers >= 1);
@@ -4696,6 +5028,11 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
rq->ports.info.reds = 0;
rq->ports.start = NULL;
rq->ports.end = NULL;
+
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ init_runq_sched_util(&rq->sched_util, erts_sched_balance_util);
+#endif
+
}
#ifdef ERTS_SMP
@@ -4794,6 +5131,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
esdp->reductions = 0;
init_sched_wall_time(&esdp->sched_wall_time);
+
erts_port_task_handle_init(&esdp->nosuspend_port_task_handle);
}
@@ -5761,7 +6099,7 @@ erts_set_schedulers_online(Process *p,
for (ix = no; ix < online; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- wake_scheduler(rq, 0);
+ wake_scheduler(rq);
}
}
}
@@ -5860,7 +6198,7 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
for (ix = 1; ix < online; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- wake_scheduler(rq, 0);
+ wake_scheduler(rq);
}
if (erts_smp_atomic32_read_nob(&schdlr_sspnd.active)
@@ -7294,7 +7632,7 @@ Process *schedule(Process *p, int calls)
continue_check_activities_to_run:
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
continue_check_activities_to_run_known_flags:
-
+ ASSERT(flags & ERTS_RUNQ_FLG_NONEMPTY);
if (flags & (ERTS_RUNQ_FLG_CHK_CPU_BIND|ERTS_RUNQ_FLG_SUSPENDED)) {
@@ -7346,20 +7684,16 @@ Process *schedule(Process *p, int calls)
rq->wakeup_other = 0;
rq->wakeup_other_reds = 0;
- empty_runq(rq);
-
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
- if (flags & ERTS_RUNQ_FLG_SUSPENDED) {
- non_empty_runq(rq);
+ if (flags & ERTS_RUNQ_FLG_SUSPENDED)
goto continue_check_activities_to_run_known_flags;
- }
- else if (!(flags & ERTS_RUNQ_FLG_INACTIVE)) {
- if (try_steal_task(rq)) {
- non_empty_runq(rq);
+ if (flags & ERTS_RUNQ_FLG_INACTIVE)
+ empty_runq(rq);
+ else {
+ if (try_steal_task(rq))
goto continue_check_activities_to_run;
- }
- (void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
+ empty_runq(rq);
/*
* Check for ERTS_RUNQ_FLG_SUSPENDED has to be done
@@ -7371,7 +7705,6 @@ Process *schedule(Process *p, int calls)
goto continue_check_activities_to_run_known_flags;
}
}
-
#endif
scheduler_wait(&fcalls, esdp, rq);
@@ -8486,6 +8819,10 @@ erts_schedule_misc_op(void (*func)(void *), void *arg)
rq->misc.start = molp;
rq->misc.end = molp;
+#ifdef ERTS_SMP
+ non_empty_runq(rq);
+#endif
+
erts_smp_runq_unlock(rq);
smp_notify_inc_runq(rq);
@@ -9371,8 +9708,11 @@ save_pending_exiter(Process *p)
erts_proclist_store_last(&rq->procs.pending_exiters, plp);
+ non_empty_runq(rq);
+
erts_smp_runq_unlock(rq);
- wake_scheduler(rq, 1);
+
+ wake_scheduler(rq);
}
#endif
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index e35d1c785c..6155f99b85 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -70,6 +70,9 @@ typedef struct process Process;
struct ErtsNodesMonitor_;
+#define ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT_OPT 0
+#define ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT 0
+
#define ERTS_MAX_NO_OF_SCHEDULERS 1024
#define ERTS_DEFAULT_MAX_PROCESSES (1 << 18)
@@ -98,6 +101,7 @@ struct saved_calls {
extern Export exp_send, exp_receive, exp_timeout;
extern int erts_sched_compact_load;
+extern int erts_sched_balance_util;
extern Uint erts_no_schedulers;
extern Uint erts_no_run_queues;
extern int erts_sched_thread_suggested_stack_size;
@@ -198,6 +202,10 @@ extern int erts_sched_thread_suggested_stack_size;
#define ERTS_RUNQ_FLGS_SET(RQ, FLGS) \
((Uint32) erts_smp_atomic32_read_bor_relb(&(RQ)->flags, \
(erts_aint32_t) (FLGS)))
+#define ERTS_RUNQ_FLGS_BSET(RQ, MSK, FLGS) \
+ ((Uint32) erts_smp_atomic32_read_bset_relb(&(RQ)->flags, \
+ (erts_aint32_t) (MSK), \
+ (erts_aint32_t) (FLGS)))
#define ERTS_RUNQ_FLGS_UNSET(RQ, FLGS) \
((Uint32) erts_smp_atomic32_read_band_relb(&(RQ)->flags, \
(erts_aint32_t) ~(FLGS)))
@@ -316,9 +324,40 @@ typedef struct {
int reds;
} ErtsRunQueueInfo;
+
+#ifdef HAVE_GETHRTIME
+# undef ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT_OPT
+# define ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT_OPT 1
+#endif
+
#ifdef ERTS_SMP
+#undef ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+#define ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT_OPT
+
+#ifdef ARCH_64
+typedef erts_atomic_t ErtsAtomicSchedTime;
+#elif defined(ARCH_32)
+typedef erts_dw_atomic_t ErtsAtomicSchedTime;
+#else
+# error :-/
+#endif
+
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+typedef struct {
+ ErtsAtomicSchedTime last;
+ struct {
+ Uint64 short_interval;
+ Uint64 long_interval;
+ } worktime;
+ int is_working;
+} ErtsRunQueueSchedUtil;
+#endif
+
typedef struct {
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ int sched_util;
+#endif
Uint32 flags;
ErtsRunQueue *misc_evac_runq;
struct {
@@ -385,6 +424,9 @@ struct ErtsRunQueue_ {
Port *start;
Port *end;
} ports;
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ ErtsRunQueueSchedUtil sched_util;
+#endif
};
#ifdef ERTS_SMP
@@ -414,6 +456,7 @@ do { \
} while (0)
typedef struct {
+ int need; /* "+sbu true" or scheduler_wall_time enabled */
int enabled;
Uint64 start;
struct {
@@ -542,6 +585,12 @@ int erts_smp_lc_runq_is_locked(ErtsRunQueue *);
#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
+#ifdef ERTS_SMP
+void erts_empty_runq(ErtsRunQueue *rq);
+void erts_non_empty_runq(ErtsRunQueue *rq);
+#endif
+
+
/*
* Run queue locked during modifications. We use atomic ops since
* other threads peek at values without run queue lock.
@@ -574,6 +623,10 @@ erts_smp_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio)
erts_smp_atomic32_set_relb(&rqi->len, len);
+#ifdef ERTS_SMP
+ if (rq->len == 0)
+ erts_non_empty_runq(rq);
+#endif
rq->len++;
if (rq->max_len < rq->len)
rq->max_len = len;
@@ -1686,6 +1739,13 @@ erts_proc_set_error_handler(Process *p, ErtsProcLocks plocks, Eterm handler)
extern erts_atomic_t erts_migration_paths;
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+int erts_get_sched_util(ErtsRunQueue *rq,
+ int initially_locked,
+ int short_interval);
+#endif
+
+
ERTS_GLB_INLINE ErtsMigrationPaths *erts_get_migration_paths_managed(void);
ERTS_GLB_INLINE ErtsMigrationPaths *erts_get_migration_paths(void);
ERTS_GLB_INLINE ErtsRunQueue *erts_check_emigration_need(ErtsRunQueue *c_rq,
@@ -1737,22 +1797,36 @@ erts_check_emigration_need(ErtsRunQueue *c_rq, int prio)
return mp->prio[prio].runq;
}
-
- if (prio == ERTS_PORT_PRIO_LEVEL)
- len = RUNQ_READ_LEN(&c_rq->ports.info.len);
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (mp->sched_util) {
+ ErtsRunQueue *rq = mp->prio[prio].runq;
+ /* No migration if other is non-empty */
+ if (!(ERTS_RUNQ_FLGS_GET(rq) & ERTS_RUNQ_FLG_NONEMPTY)
+ && erts_get_sched_util(rq, 0, 1) < mp->prio[prio].limit.other
+ && erts_get_sched_util(c_rq, 0, 1) > mp->prio[prio].limit.this) {
+ return rq;
+ }
+ }
else
- len = RUNQ_READ_LEN(&c_rq->procs.prio_info[prio].len);
-
- if (len > mp->prio[prio].limit.this) {
- ErtsRunQueue *n_rq = mp->prio[prio].runq;
- if (n_rq) {
- if (prio == ERTS_PORT_PRIO_LEVEL)
- len = RUNQ_READ_LEN(&n_rq->ports.info.len);
- else
- len = RUNQ_READ_LEN(&n_rq->procs.prio_info[prio].len);
-
- if (len < mp->prio[prio].limit.other)
- return n_rq;
+#endif
+ {
+
+ if (prio == ERTS_PORT_PRIO_LEVEL)
+ len = RUNQ_READ_LEN(&c_rq->ports.info.len);
+ else
+ len = RUNQ_READ_LEN(&c_rq->procs.prio_info[prio].len);
+
+ if (len > mp->prio[prio].limit.this) {
+ ErtsRunQueue *n_rq = mp->prio[prio].runq;
+ if (n_rq) {
+ if (prio == ERTS_PORT_PRIO_LEVEL)
+ len = RUNQ_READ_LEN(&n_rq->ports.info.len);
+ else
+ len = RUNQ_READ_LEN(&n_rq->procs.prio_info[prio].len);
+
+ if (len < mp->prio[prio].limit.other)
+ return n_rq;
+ }
}
}
}