Diffstat (limited to 'erts/emulator/beam/erl_process.c')
-rw-r--r--  erts/emulator/beam/erl_process.c  3630
1 file changed, 3306 insertions(+), 324 deletions(-)
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 1efd070afd..37e1d07107 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2013. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2014. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
@@ -44,6 +44,7 @@
#include "dtrace-wrapper.h"
#include "erl_ptab.h"
+
#define ERTS_DELAYED_WAKEUP_INFINITY (~(Uint64) 0)
#define ERTS_DELAYED_WAKEUP_REDUCTIONS ((Uint64) CONTEXT_REDS/2)
@@ -53,7 +54,11 @@
#define ERTS_PROC_MIN_CONTEXT_SWITCH_REDS_COST (CONTEXT_REDS/10)
+#ifndef ERTS_SCHED_MIN_SPIN
#define ERTS_SCHED_SPIN_UNTIL_YIELD 100
+#else
+#define ERTS_SCHED_SPIN_UNTIL_YIELD 1
+#endif
#define ERTS_SCHED_SYS_SLEEP_SPINCOUNT_VERY_LONG 40
#define ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_VERY_LONG 1000
@@ -144,7 +149,12 @@ extern BeamInstr beam_exit[];
extern BeamInstr beam_continue_exit[];
int erts_sched_compact_load;
+int erts_sched_balance_util = 0;
Uint erts_no_schedulers;
+#ifdef ERTS_DIRTY_SCHEDULERS
+Uint erts_no_dirty_cpu_schedulers;
+Uint erts_no_dirty_io_schedulers;
+#endif
#define ERTS_THR_PRGR_LATER_CLEANUP_OP_THRESHOLD_VERY_LAZY (4*1024*1024)
#define ERTS_THR_PRGR_LATER_CLEANUP_OP_THRESHOLD_LAZY (512*1024)
@@ -182,6 +192,13 @@ static ErtsAuxWorkData *aux_thread_aux_work_data;
#define ERTS_SCHDLR_SSPND_CHNG_SET(VAL, OLD_VAL) \
erts_smp_atomic32_set_nob(&schdlr_sspnd.changing, (VAL))
+#ifdef ERTS_DIRTY_SCHEDULERS
+#define ERTS_SCHDLR_SSPND_DIRTY_CPU_CHNG_SET(VAL, OLD_VAL) \
+ erts_smp_atomic32_set_nob(&schdlr_sspnd.dirty_cpu_changing, (VAL))
+#define ERTS_SCHDLR_SSPND_DIRTY_IO_CHNG_SET(VAL, OLD_VAL) \
+ erts_smp_atomic32_set_nob(&schdlr_sspnd.dirty_io_changing, (VAL))
+#endif
+
#else
#define ERTS_SCHDLR_SSPND_CHNG_SET(VAL, OLD_VAL) \
@@ -192,6 +209,23 @@ do { \
ASSERT(old_val__ == (OLD_VAL)); \
} while (0)
+#ifdef ERTS_DIRTY_SCHEDULERS
+#define ERTS_SCHDLR_SSPND_DIRTY_CPU_CHNG_SET(VAL, OLD_VAL) \
+do { \
+ erts_aint32_t old_val__; \
+ old_val__ = erts_smp_atomic32_xchg_nob(&schdlr_sspnd.dirty_cpu_changing, \
+ (VAL)); \
+ ASSERT(old_val__ == (OLD_VAL)); \
+} while (0)
+#define ERTS_SCHDLR_SSPND_DIRTY_IO_CHNG_SET(VAL, OLD_VAL) \
+do { \
+ erts_aint32_t old_val__; \
+ old_val__ = erts_smp_atomic32_xchg_nob(&schdlr_sspnd.dirty_io_changing, \
+ (VAL)); \
+ ASSERT(old_val__ == (OLD_VAL)); \
+} while (0)
+#endif
+
#endif
@@ -201,11 +235,29 @@ static struct {
int online;
int curr_online;
int wait_curr_online;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ int dirty_cpu_online;
+ int dirty_cpu_curr_online;
+ int dirty_cpu_wait_curr_online;
+ int dirty_io_online;
+ int dirty_io_curr_online;
+ int dirty_io_wait_curr_online;
+#endif
erts_smp_atomic32_t changing;
erts_smp_atomic32_t active;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ erts_smp_atomic32_t dirty_cpu_changing;
+ erts_smp_atomic32_t dirty_cpu_active;
+ erts_smp_atomic32_t dirty_io_changing;
+ erts_smp_atomic32_t dirty_io_active;
+#endif
struct {
int ongoing;
long wait_active;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ long dirty_cpu_wait_active;
+ long dirty_io_wait_active;
+#endif
ErtsProcList *procs;
} msb; /* Multi Scheduling Block */
} schdlr_sspnd;
@@ -258,6 +310,10 @@ ErtsAlignedRunQueue *erts_aligned_run_queues;
Uint erts_no_run_queues;
ErtsAlignedSchedulerData *erts_aligned_scheduler_data;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ErtsAlignedSchedulerData *erts_aligned_dirty_cpu_scheduler_data;
+ErtsAlignedSchedulerData *erts_aligned_dirty_io_scheduler_data;
+#endif
typedef union {
ErtsSchedulerSleepInfo ssi;
@@ -265,6 +321,12 @@ typedef union {
} ErtsAlignedSchedulerSleepInfo;
static ErtsAlignedSchedulerSleepInfo *aligned_sched_sleep_info;
+#ifdef ERTS_DIRTY_SCHEDULERS
+#ifdef ERTS_SMP
+static ErtsAlignedSchedulerSleepInfo *aligned_dirty_cpu_sched_sleep_info;
+static ErtsAlignedSchedulerSleepInfo *aligned_dirty_io_sched_sleep_info;
+#endif
+#endif
static Uint last_reductions;
static Uint last_exact_reductions;
@@ -283,6 +345,40 @@ struct erts_system_profile_flags_t erts_system_profile_flags;
#error "Need to store process_count in another type"
#endif
+typedef enum {
+ ERTS_PSTT_GC, /* Garbage Collect */
+ ERTS_PSTT_CPC /* Check Process Code */
+} ErtsProcSysTaskType;
+
+#define ERTS_MAX_PROC_SYS_TASK_ARGS 2
+
+struct ErtsProcSysTask_ {
+ ErtsProcSysTask *next;
+ ErtsProcSysTask *prev;
+ ErtsProcSysTaskType type;
+ Eterm requester;
+ Eterm reply_tag;
+ Eterm req_id;
+ Uint req_id_sz;
+ Eterm arg[ERTS_MAX_PROC_SYS_TASK_ARGS];
+ ErlOffHeap off_heap;
+ Eterm heap[1];
+};
+
+#define ERTS_PROC_SYS_TASK_SIZE(HSz) \
+ (sizeof(ErtsProcSysTask) - sizeof(Eterm) + sizeof(Eterm)*(HSz))
+
+struct ErtsProcSysTaskQs_ {
+ int qmask;
+ int ncount;
+ ErtsProcSysTask *q[ERTS_NO_PROC_PRIO_LEVELS];
+};
+
+ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(proc_sys_task_queues,
+ ErtsProcSysTaskQs,
+ 50,
+ ERTS_ALC_T_PROC_SYS_TSK_QS)
+
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_op_list,
ErtsMiscOpList,
10,
@@ -297,6 +393,16 @@ ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(proclist,
(ASSERT(-1 <= ((int) (IX)) \
&& ((int) (IX)) < ((int) erts_no_schedulers)), \
&aligned_sched_sleep_info[(IX)].ssi)
+#ifdef ERTS_DIRTY_SCHEDULERS
+#define ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(IX) \
+ (ASSERT(0 <= ((int) (IX)) \
+ && ((int) (IX)) < ((int) erts_no_dirty_cpu_schedulers)), \
+ &aligned_dirty_cpu_sched_sleep_info[(IX)].ssi)
+#define ERTS_DIRTY_IO_SCHED_SLEEP_INFO_IX(IX) \
+ (ASSERT(0 <= ((int) (IX)) \
+ && ((int) (IX)) < ((int) erts_no_dirty_io_schedulers)), \
+ &aligned_dirty_io_sched_sleep_info[(IX)].ssi)
+#endif
#define ERTS_FOREACH_RUNQ(RQVAR, DO) \
do { \
@@ -353,6 +459,14 @@ static void aux_work_timeout_early_init(int no_schedulers);
static void aux_work_timeout_late_init(void);
static void setup_aux_work_timer(void);
+static int execute_sys_tasks(Process *c_p,
+ erts_aint32_t *statep,
+ int in_reds);
+static int cleanup_sys_tasks(Process *c_p,
+ erts_aint32_t in_state,
+ int in_reds);
+
+
#if defined(DEBUG) || 0
#define ERTS_DBG_CHK_AUX_WORK_VAL(V) dbg_chk_aux_work_val((V))
static void
@@ -399,7 +513,7 @@ dbg_chk_aux_work_val(erts_aint32_t value)
#ifdef ERTS_SMP
static void handle_pending_exiters(ErtsProcList *);
-
+static void wake_scheduler(ErtsRunQueue *rq);
#endif
#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
@@ -439,7 +553,7 @@ void
erts_pre_init_process(void)
{
#ifdef USE_THREADS
- erts_tsd_key_create(&sched_data_key);
+ erts_tsd_key_create(&sched_data_key, "erts_sched_data_key");
#endif
#ifdef ERTS_ENABLE_LOCK_CHECK
@@ -471,6 +585,18 @@ erts_pre_init_process(void)
erts_psd_required_locks[ERTS_PSD_CALL_TIME_BP].set_locks
= ERTS_PSD_CALL_TIME_BP_SET_LOCKS;
+ erts_psd_required_locks[ERTS_PSD_DELAYED_GC_TASK_QS].get_locks
+ = ERTS_PSD_DELAYED_GC_TASK_QS_GET_LOCKS;
+ erts_psd_required_locks[ERTS_PSD_DELAYED_GC_TASK_QS].set_locks
+ = ERTS_PSD_DELAYED_GC_TASK_QS_SET_LOCKS;
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ erts_psd_required_locks[ERTS_PSD_DIRTY_SCHED_TRAP_EXPORT].get_locks
+ = ERTS_PSD_DIRTY_SCHED_TRAP_EXPORT_GET_LOCKS;
+ erts_psd_required_locks[ERTS_PSD_DIRTY_SCHED_TRAP_EXPORT].set_locks
+ = ERTS_PSD_DIRTY_SCHED_TRAP_EXPORT_SET_LOCKS;
+#endif
+
/* Check that we have locks for all entries */
for (ix = 0; ix < ERTS_PSD_SIZE; ix++) {
ERTS_SMP_LC_ASSERT(erts_psd_required_locks[ix].get_locks);
@@ -561,6 +687,7 @@ erts_late_init_process(void)
static void
init_sched_wall_time(ErtsSchedWallTime *swtp)
{
+ swtp->need = erts_sched_balance_util;
swtp->enabled = 0;
swtp->start = 0;
swtp->working.total = 0;
@@ -583,27 +710,253 @@ sched_wall_time_ts(void)
#endif
}
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+
+#ifdef ARCH_64
+
+static ERTS_INLINE Uint64
+aschedtime_read(ErtsAtomicSchedTime *var)
+{
+ return (Uint64) erts_atomic_read_nob((erts_atomic_t *) var);
+}
+
+static ERTS_INLINE void
+aschedtime_set(ErtsAtomicSchedTime *var, Uint64 val)
+{
+ erts_atomic_set_nob((erts_atomic_t *) var, (erts_aint_t) val);
+}
+
+static ERTS_INLINE void
+aschedtime_init(ErtsAtomicSchedTime *var)
+{
+ erts_atomic_init_nob((erts_atomic_t *) var, (erts_aint_t) 0);
+}
+
+#elif defined(ARCH_32)
+
+static ERTS_INLINE Uint64
+aschedtime_read(ErtsAtomicSchedTime *var)
+{
+ erts_dw_aint_t dw;
+ erts_dw_atomic_read_nob((erts_dw_atomic_t *) var, &dw);
+#ifdef ETHR_SU_DW_NAINT_T__
+ return (Uint64) dw.dw_sint;
+#else
+ {
+ Uint64 res;
+ res = (Uint64) ((Uint32) dw.sint[ERTS_DW_AINT_HIGH_WORD]);
+ res <<= 32;
+ res |= (Uint64) ((Uint32) dw.sint[ERTS_DW_AINT_LOW_WORD]);
+ return res;
+ }
+#endif
+}
+
+static ERTS_INLINE void
+aschedtime_set(ErtsAtomicSchedTime *var, Uint64 val)
+{
+ erts_dw_aint_t dw;
+#ifdef ETHR_SU_DW_NAINT_T__
+ dw.dw_sint = (ETHR_SU_DW_NAINT_T__) val;
+#else
+ dw.sint[ERTS_DW_AINT_LOW_WORD] = (erts_aint_t) (val & 0xffffffff);
+ dw.sint[ERTS_DW_AINT_HIGH_WORD] = (erts_aint_t) ((val >> 32) & 0xffffffff);
+#endif
+ erts_dw_atomic_set_nob((erts_dw_atomic_t *) var, &dw);
+}
+
+static ERTS_INLINE void
+aschedtime_init(ErtsAtomicSchedTime *var)
+{
+ erts_dw_aint_t dw;
+ dw.sint[ERTS_DW_AINT_LOW_WORD] = (erts_aint_t) 0;
+ dw.sint[ERTS_DW_AINT_HIGH_WORD] = (erts_aint_t) 0;
+ erts_dw_atomic_init_nob((erts_dw_atomic_t *) var, &dw);
+}
+
+#else
+# error :-/
+#endif
+
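On 32-bit targets the added aschedtime_read()/aschedtime_set() fall back to a double-word atomic and reassemble the 64-bit timestamp from two native words. A minimal standalone sketch of that packing, using standard C types and hypothetical names rather than the ERTS ones:

#include <assert.h>
#include <stdint.h>

/* Sketch only: split a 64-bit timestamp into low/high 32-bit words and
 * reassemble it as (hi << 32) | lo, mirroring the ARCH_32 fallback above. */
int main(void)
{
    uint64_t val = 0x0123456789abcdefULL;
    uint32_t lo = (uint32_t)(val & 0xffffffff);
    uint32_t hi = (uint32_t)((val >> 32) & 0xffffffff);
    uint64_t res = ((uint64_t)hi << 32) | lo;
    assert(res == val);   /* a set followed by a read round-trips exactly */
    return 0;
}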
+#define ERTS_GET_AVG_MAX_UNLOCKED_TRY 50
+#define ERTS_SCHED_AVG_UTIL_WRITE_MARKER (~((Uint64) 0))
+
+/* Intervals in nanoseconds */
+#define ERTS_SCHED_UTIL_SHORT_INTERVAL ((Uint64) 1*1000*1000*1000)
+#define ERTS_SCHED_UTIL_LONG_INTERVAL ((Uint64) 10*1000*1000*1000)
+
+
+#define ERTS_SCHED_UTIL_IGNORE_IMBALANCE_DIFF 5000 /* ppm */
+
+static ERTS_INLINE Uint64
+calc_sched_worktime(int is_working, Uint64 now, Uint64 last,
+ Uint64 interval, Uint64 old_worktime)
+{
+ Uint64 worktime;
+ Uint64 new;
+
+ if (now <= last)
+ return old_worktime;
+
+ new = now - last;
+
+ if (new >= interval)
+ return is_working ? interval : (Uint64) 0;
+
+
+ /*
+ * Division by 1000 in order to avoid
+ * overflow. If this is changed, update the
+ * assertions in init_runq_sched_util().
+ */
+ worktime = old_worktime;
+ worktime *= (interval - new)/1000;
+ worktime /= (interval/1000);
+ if (is_working)
+ worktime += new;
+
+ ASSERT(0 <= worktime && worktime <= interval);
+
+ return worktime;
+}
+
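The division by 1000 in calc_sched_worktime() keeps the 64-bit intermediate in range: with the 10-second long interval both old_worktime and (interval - new) can approach 10^10 ns, so their full product (~10^20) would overflow a Uint64 (max ~1.8*10^19), while pre-scaling one factor by 1000 bounds the product at ~10^17. A runnable sketch of the same decay step, using standard C types and hypothetical names (not part of the patch):

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Sketch of the decay step: scale the previous worktime by
 * (interval - elapsed)/interval without overflowing 64 bits by
 * dividing both factors by 1000 first. */
static uint64_t decay_worktime(uint64_t old_worktime, uint64_t elapsed,
                               uint64_t interval, int is_working)
{
    uint64_t worktime = old_worktime;
    worktime *= (interval - elapsed) / 1000;  /* factor <= 10^7 */
    worktime /= interval / 1000;              /* rescale back */
    if (is_working)
        worktime += elapsed;
    return worktime;
}

int main(void)
{
    const uint64_t interval = (uint64_t)10 * 1000 * 1000 * 1000; /* 10 s in ns */
    /* Scheduler fully busy so far, then idle for half an interval:
     * the averaged worktime halves, i.e. 500000 ppm utilization. */
    uint64_t wt = decay_worktime(interval, interval / 2, interval, 0);
    printf("%llu ppm\n", (unsigned long long)((wt * 1000000) / interval));
    assert(wt == interval / 2);
    return 0;
}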
+static ERTS_INLINE void
+update_avg_sched_util(ErtsSchedulerData *esdp, Uint64 now, int is_working)
+{
+ ErtsRunQueue *rq;
+ int worked;
+ Uint64 swt, lwt, last;
+
+ rq = esdp->run_queue;
+ last = aschedtime_read(&rq->sched_util.last);
+
+ if (now <= last) {
+ ASSERT(last == ERTS_SCHED_AVG_UTIL_WRITE_MARKER);
+ return;
+ }
+
+ ASSERT(now >= last);
+
+ worked = rq->sched_util.is_working;
+
+ swt = calc_sched_worktime(worked, now, last, ERTS_SCHED_UTIL_SHORT_INTERVAL,
+ rq->sched_util.worktime.short_interval);
+ lwt = calc_sched_worktime(worked, now, last, ERTS_SCHED_UTIL_LONG_INTERVAL,
+ rq->sched_util.worktime.long_interval);
+
+ aschedtime_set(&rq->sched_util.last, ERTS_SCHED_AVG_UTIL_WRITE_MARKER);
+ ERTS_THR_WRITE_MEMORY_BARRIER;
+ rq->sched_util.is_working = is_working;
+ rq->sched_util.worktime.short_interval = swt;
+ rq->sched_util.worktime.long_interval = lwt;
+ ERTS_THR_WRITE_MEMORY_BARRIER;
+ aschedtime_set(&rq->sched_util.last, now);
+}
+
+int
+erts_get_sched_util(ErtsRunQueue *rq, int initially_locked, int short_interval)
+{
+ /* Average scheduler utilization in ppm */
+ int util, is_working, try = 0, locked = initially_locked;
+ Uint64 worktime, old_worktime, now, last, interval, *old_worktimep;
+
+ if (short_interval) {
+ old_worktimep = &rq->sched_util.worktime.short_interval;
+ interval = ERTS_SCHED_UTIL_SHORT_INTERVAL;
+ }
+ else {
+ old_worktimep = &rq->sched_util.worktime.long_interval;
+ interval = ERTS_SCHED_UTIL_LONG_INTERVAL;
+ }
+
+ while (1) {
+ Uint64 chk_last;
+ last = aschedtime_read(&rq->sched_util.last);
+ ERTS_THR_READ_MEMORY_BARRIER;
+ is_working = rq->sched_util.is_working;
+ old_worktime = *old_worktimep;
+ ERTS_THR_READ_MEMORY_BARRIER;
+ chk_last = aschedtime_read(&rq->sched_util.last);
+ if (chk_last == last)
+ break;
+ if (!locked) {
+ if (++try >= ERTS_GET_AVG_MAX_UNLOCKED_TRY) {
+ /* Writer will eventually block on runq-lock */
+ erts_smp_runq_lock(rq);
+ locked = 1;
+ }
+ }
+ }
+
+ if (!initially_locked && locked)
+ erts_smp_runq_unlock(rq);
+
+ now = sched_wall_time_ts();
+ worktime = calc_sched_worktime(is_working, now, last, interval, old_worktime);
+
+ util = (int) ((worktime * 1000000)/interval);
+
+ ASSERT(0 <= util && util <= 1000000);
+
+ return util;
+}
+
+static void
+init_runq_sched_util(ErtsRunQueueSchedUtil *rqsu, int enabled)
+{
+ aschedtime_init(&rqsu->last);
+ if (!enabled)
+ aschedtime_set(&rqsu->last, ERTS_SCHED_AVG_UTIL_WRITE_MARKER);
+ rqsu->is_working = 0;
+ rqsu->worktime.short_interval = (Uint64) 0;
+ rqsu->worktime.long_interval = (Uint64) 0;
+
+#ifdef DEBUG
+ {
+ Uint64 intrvl;
+ /*
+ * If one of these asserts fails we may have
+ * overflow in calc_sched_worktime(), which
+ * has to be fixed either by shrinking the
+ * interval size, or by fixing the calculation of
+ * worktime in calc_sched_worktime().
+ */
+ intrvl = ERTS_SCHED_UTIL_SHORT_INTERVAL;
+ ASSERT(intrvl*(intrvl/1000) > intrvl);
+ intrvl = ERTS_SCHED_UTIL_LONG_INTERVAL;
+ ASSERT(intrvl*(intrvl/1000) > intrvl);
+ }
+#endif
+}
+
+#endif /* ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT */
+
static ERTS_INLINE void
sched_wall_time_change(ErtsSchedulerData *esdp, int working)
{
- if (esdp->sched_wall_time.enabled) {
+ if (esdp->sched_wall_time.need) {
Uint64 ts = sched_wall_time_ts();
- if (working) {
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ update_avg_sched_util(esdp, ts, working);
+#endif
+ if (esdp->sched_wall_time.enabled) {
+ if (working) {
#ifdef DEBUG
- ASSERT(!esdp->sched_wall_time.working.currently);
- esdp->sched_wall_time.working.currently = 1;
+ ASSERT(!esdp->sched_wall_time.working.currently);
+ esdp->sched_wall_time.working.currently = 1;
#endif
- ts -= esdp->sched_wall_time.start;
- esdp->sched_wall_time.working.start = ts;
- }
- else {
+ ts -= esdp->sched_wall_time.start;
+ esdp->sched_wall_time.working.start = ts;
+ }
+ else {
#ifdef DEBUG
- ASSERT(esdp->sched_wall_time.working.currently);
- esdp->sched_wall_time.working.currently = 0;
+ ASSERT(esdp->sched_wall_time.working.currently);
+ esdp->sched_wall_time.working.currently = 0;
#endif
- ts -= esdp->sched_wall_time.start;
- ts -= esdp->sched_wall_time.working.start;
- esdp->sched_wall_time.working.total += ts;
+ ts -= esdp->sched_wall_time.start;
+ ts -= esdp->sched_wall_time.working.start;
+ esdp->sched_wall_time.working.total += ts;
+ }
}
}
}
@@ -656,12 +1009,17 @@ reply_sched_wall_time(void *vswtrp)
ErlHeapFragment *bp = NULL;
ASSERT(esdp);
-
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
+#endif
if (swtrp->set) {
- if (!swtrp->enable && esdp->sched_wall_time.enabled)
+ if (!swtrp->enable && esdp->sched_wall_time.enabled) {
+ esdp->sched_wall_time.need = erts_sched_balance_util;
esdp->sched_wall_time.enabled = 0;
+ }
else if (swtrp->enable && !esdp->sched_wall_time.enabled) {
Uint64 ts = sched_wall_time_ts();
+ esdp->sched_wall_time.need = 1;
esdp->sched_wall_time.enabled = 1;
esdp->sched_wall_time.start = ts;
esdp->sched_wall_time.working.total = 0;
@@ -737,6 +1095,9 @@ erts_sched_wall_time_request(Process *c_p, int set, int enable)
if (!set && !esdp->sched_wall_time.enabled)
return THE_NON_VALUE;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
+#endif
swtrp = swtreq_alloc();
ref = erts_make_ref(c_p);
@@ -991,6 +1352,9 @@ static ERTS_INLINE void
haw_thr_prgr_current_check_progress(ErtsAuxWorkData *awdp)
{
ErtsThrPrgrVal current = awdp->current_thr_prgr;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!awdp->esdp || !ERTS_SCHEDULER_IS_DIRTY(awdp->esdp));
+#endif
if (current != ERTS_THR_PRGR_INVALID
&& !erts_thr_progress_equal(current, erts_thr_progress_current())) {
/*
@@ -1007,6 +1371,10 @@ handle_delayed_aux_work_wakeup(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, in
{
int jix, max_jix;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!awdp->esdp || !ERTS_SCHEDULER_IS_DIRTY(awdp->esdp));
+#endif
+
ASSERT(awdp->delayed_wakeup.next != ERTS_DELAYED_WAKEUP_INFINITY);
if (!waiting && awdp->delayed_wakeup.next > awdp->esdp->reductions)
@@ -1162,6 +1530,9 @@ handle_misc_aux_work_thr_prgr(ErtsAuxWorkData *awdp,
erts_aint32_t aux_work,
int waiting)
{
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!awdp->esdp || !ERTS_SCHEDULER_IS_DIRTY(awdp->esdp));
+#endif
if (!erts_thr_progress_has_reached_this(haw_thr_prgr_current(awdp),
awdp->misc.thr_prgr))
return aux_work & ~ERTS_SSI_AUX_WORK_MISC_THR_PRGR;
@@ -1214,6 +1585,9 @@ erts_schedule_multi_misc_aux_work(int ignore_self,
if (ignore_self) {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
+#endif
if (esdp)
self = (int) esdp->no;
}
@@ -1243,6 +1617,9 @@ handle_async_ready(ErtsAuxWorkData *awdp,
int waiting)
{
ErtsSchedulerSleepInfo *ssi = awdp->ssi;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!awdp->esdp || !ERTS_SCHEDULER_IS_DIRTY(awdp->esdp));
+#endif
unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY);
if (erts_check_async_ready(awdp->async_ready.queue)) {
if (set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY)
@@ -1267,6 +1644,9 @@ handle_async_ready_clean(ErtsAuxWorkData *awdp,
{
void *thr_prgr_p;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!awdp->esdp || !ERTS_SCHEDULER_IS_DIRTY(awdp->esdp));
+#endif
#ifdef ERTS_SMP
if (awdp->async_ready.need_thr_prgr
&& !erts_thr_progress_has_reached_this(haw_thr_prgr_current(awdp),
@@ -1304,6 +1684,9 @@ handle_fix_alloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
ErtsSchedulerSleepInfo *ssi = awdp->ssi;
erts_aint32_t res;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!awdp->esdp || !ERTS_SCHEDULER_IS_DIRTY(awdp->esdp));
+#endif
unset_aux_work_flags(ssi, (ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM
| ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC));
aux_work &= ~(ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM
@@ -1323,7 +1706,7 @@ void
erts_alloc_notify_delayed_dealloc(int ix)
{
ErtsSchedulerData *esdp = erts_get_scheduler_data();
- if (esdp)
+ if (esdp && !ERTS_SCHEDULER_IS_DIRTY(esdp))
schedule_aux_work_wakeup(&esdp->aux_work_data,
ix,
ERTS_SSI_AUX_WORK_DD);
@@ -1337,7 +1720,7 @@ erts_alloc_ensure_handle_delayed_dealloc_call(int ix)
{
#ifdef DEBUG
ErtsSchedulerData *esdp = erts_get_scheduler_data();
- ASSERT(!esdp || ix == (int) esdp->no);
+ ASSERT(!esdp || (ERTS_SCHEDULER_IS_DIRTY(esdp) || ix == (int) esdp->no));
#endif
set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(ix-1),
ERTS_SSI_AUX_WORK_DD);
@@ -1351,6 +1734,9 @@ handle_delayed_dealloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waitin
ErtsThrPrgrVal wakeup = ERTS_THR_PRGR_INVALID;
int more_work = 0;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!awdp->esdp || !ERTS_SCHEDULER_IS_DIRTY(awdp->esdp));
+#endif
unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_DD);
erts_alloc_scheduler_handle_delayed_dealloc((void *) awdp->esdp,
&need_thr_progress,
@@ -1390,6 +1776,9 @@ handle_delayed_dealloc_thr_prgr(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, i
ErtsThrPrgrVal wakeup = ERTS_THR_PRGR_INVALID;
ErtsThrPrgrVal current = haw_thr_prgr_current(awdp);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!awdp->esdp || !ERTS_SCHEDULER_IS_DIRTY(awdp->esdp));
+#endif
if (!erts_thr_progress_has_reached_this(current, awdp->dd.thr_prgr))
return aux_work & ~ERTS_SSI_AUX_WORK_DD_THR_PRGR;
@@ -1437,6 +1826,9 @@ handle_thr_prgr_later_op(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int wait
int lops;
ErtsThrPrgrVal current = haw_thr_prgr_current(awdp);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!awdp->esdp || !ERTS_SCHEDULER_IS_DIRTY(awdp->esdp));
+#endif
for (lops = 0; lops < ERTS_MAX_THR_PRGR_LATER_OPS; lops++) {
ErtsThrPrgrLaterOp *lop = awdp->later_op.first;
if (!erts_thr_progress_has_reached_this(current, lop->later))
@@ -1595,6 +1987,14 @@ erts_smp_notify_check_children_needed(void)
for (i = 0; i < erts_no_schedulers; i++)
set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(i),
ERTS_SSI_AUX_WORK_CHECK_CHILDREN);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ for (i = 0; i < erts_no_dirty_cpu_schedulers; i++)
+ set_aux_work_flags_wakeup_nob(ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(i),
+ ERTS_SSI_AUX_WORK_CHECK_CHILDREN);
+ for (i = 0; i < erts_no_dirty_io_schedulers; i++)
+ set_aux_work_flags_wakeup_nob(ERTS_DIRTY_IO_SCHED_SLEEP_INFO_IX(i),
+ ERTS_SSI_AUX_WORK_CHECK_CHILDREN);
+#endif
}
static ERTS_INLINE erts_aint32_t
@@ -1942,6 +2342,9 @@ static ERTS_INLINE void
sched_active_sys(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_RUNQ_IX_IS_DIRTY(rq->ix));
+#endif
ASSERT(rq->waiting < 0);
rq->waiting *= -1;
rq->waiting--;
@@ -1977,14 +2380,24 @@ try_set_sys_scheduling(void)
#endif
static ERTS_INLINE int
-prepare_for_sys_schedule(void)
+prepare_for_sys_schedule(ErtsSchedulerData *esdp)
{
#ifdef ERTS_SMP
while (!erts_port_task_have_outstanding_io_tasks()
&& try_set_sys_scheduling()) {
- if (!erts_port_task_have_outstanding_io_tasks())
- return 1;
+#ifdef ERTS_SCHED_ONLY_POLL_SCHED_1
+ if (esdp->no != 1) {
+ /* If we are not scheduler 1 and ERTS_SCHED_ONLY_POLL_SCHED_1 is used
+ then we make sure to wake scheduler 1 */
+ ErtsRunQueue *rq = ERTS_RUNQ_IX(0);
clear_sys_scheduling();
+ wake_scheduler(rq);
+ return 0;
+ }
+#endif
+ if (!erts_port_task_have_outstanding_io_tasks())
+ return 1;
+ clear_sys_scheduling();
}
return 0;
#else
@@ -1998,6 +2411,9 @@ static ERTS_INLINE void
sched_change_waiting_sys_to_waiting(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_RUNQ_IX_IS_DIRTY(rq->ix));
+#endif
ASSERT(rq->waiting < 0);
rq->waiting *= -1;
}
@@ -2013,7 +2429,7 @@ sched_waiting(Uint no, ErtsRunQueue *rq)
else
rq->waiting++;
rq->woken = 0;
- if (erts_system_profile_flags.scheduler)
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && erts_system_profile_flags.scheduler)
profile_scheduler(make_small(no), am_inactive);
}
@@ -2025,7 +2441,7 @@ sched_active(Uint no, ErtsRunQueue *rq)
rq->waiting++;
else
rq->waiting--;
- if (erts_system_profile_flags.scheduler)
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && erts_system_profile_flags.scheduler)
profile_scheduler(make_small(no), am_active);
}
@@ -2037,10 +2453,9 @@ ongoing_multi_scheduling_block(void)
}
static ERTS_INLINE void
-empty_runq(ErtsRunQueue *rq)
+empty_runq_aux(ErtsRunQueue *rq, Uint32 old_flags)
{
- Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED);
- if (old_flags & ERTS_RUNQ_FLG_NONEMPTY) {
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && old_flags & ERTS_RUNQ_FLG_NONEMPTY) {
#ifdef DEBUG
erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues);
/*
@@ -2060,10 +2475,27 @@ empty_runq(ErtsRunQueue *rq)
}
static ERTS_INLINE void
+empty_runq(ErtsRunQueue *rq)
+{
+ Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED);
+ empty_runq_aux(rq, old_flags);
+}
+
+static ERTS_INLINE Uint32
+empty_protected_runq(ErtsRunQueue *rq)
+{
+ Uint32 old_flags = ERTS_RUNQ_FLGS_BSET(rq,
+ ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED,
+ ERTS_RUNQ_FLG_PROTECTED);
+ empty_runq_aux(rq, old_flags);
+ return old_flags;
+}
+
+static ERTS_INLINE void
non_empty_runq(ErtsRunQueue *rq)
{
Uint32 old_flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_NONEMPTY);
- if (!(old_flags & ERTS_RUNQ_FLG_NONEMPTY)) {
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && (!(old_flags & ERTS_RUNQ_FLG_NONEMPTY))) {
#ifdef DEBUG
erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues);
/*
@@ -2083,6 +2515,18 @@ non_empty_runq(ErtsRunQueue *rq)
}
}
+void
+erts_empty_runq(ErtsRunQueue *rq)
+{
+ empty_runq(rq);
+}
+
+void
+erts_non_empty_runq(ErtsRunQueue *rq)
+{
+ non_empty_runq(rq);
+}
+
static erts_aint32_t
sched_prep_spin_wait(ErtsSchedulerSleepInfo *ssi)
{
@@ -2259,6 +2703,8 @@ aux_thread(void *unused)
erts_thr_progress_active(NULL, thr_prgr_active = 0);
erts_thr_progress_prepare_wait(NULL);
+ ERTS_SCHED_FAIR_YIELD();
+
flgs = sched_spin_wait(ssi, 0);
if (flgs & ERTS_SSI_FLG_SLEEPING) {
@@ -2296,18 +2742,37 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix))
+ erts_smp_spin_lock(&rq->sleepers.lock);
+#endif
flgs = sched_prep_spin_wait(ssi);
if (flgs & ERTS_SSI_FLG_SUSPENDED) {
/* Go suspend instead... */
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix))
+ erts_smp_spin_unlock(&rq->sleepers.lock);
+#endif
return;
}
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix)) {
+ ssi->prev = NULL;
+ ssi->next = rq->sleepers.list;
+ if (rq->sleepers.list)
+ rq->sleepers.list->prev = ssi;
+ rq->sleepers.list = ssi;
+ erts_smp_spin_unlock(&rq->sleepers.lock);
+ }
+#endif
+
/*
* If all schedulers are waiting, one of them *should*
* be waiting in erl_sys_schedule()
*/
- if (!prepare_for_sys_schedule()) {
+ if (ERTS_SCHEDULER_IS_DIRTY(esdp) || !prepare_for_sys_schedule(esdp)) {
sched_waiting(esdp->no, rq);
@@ -2317,30 +2782,35 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
tse_wait:
- if (thr_prgr_active != working)
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && thr_prgr_active != working)
sched_wall_time_change(esdp, thr_prgr_active);
while (1) {
aux_work = erts_atomic32_read_acqb(&ssi->aux_work);
if (aux_work) {
- if (!thr_prgr_active) {
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && !thr_prgr_active) {
erts_thr_progress_active(esdp, thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
aux_work = handle_aux_work(&esdp->aux_work_data, aux_work, 1);
- if (aux_work && erts_thr_progress_update(esdp))
+ if (aux_work && !ERTS_SCHEDULER_IS_DIRTY(esdp)
+ && erts_thr_progress_update(esdp))
erts_thr_progress_leader_update(esdp);
}
if (aux_work)
flgs = erts_smp_atomic32_read_acqb(&ssi->flags);
else {
- if (thr_prgr_active) {
- erts_thr_progress_active(esdp, thr_prgr_active = 0);
- sched_wall_time_change(esdp, 0);
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) {
+ if (thr_prgr_active) {
+ erts_thr_progress_active(esdp, thr_prgr_active = 0);
+ sched_wall_time_change(esdp, 0);
+ }
+ erts_thr_progress_prepare_wait(esdp);
}
- erts_thr_progress_prepare_wait(esdp);
+
+ ERTS_SCHED_FAIR_YIELD();
flgs = sched_spin_wait(ssi, spincount);
if (flgs & ERTS_SSI_FLG_SLEEPING) {
@@ -2355,7 +2825,8 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
} while (res == EINTR);
}
}
- erts_thr_progress_finalize_wait(esdp);
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp))
+ erts_thr_progress_finalize_wait(esdp);
}
if (!(flgs & ERTS_SSI_FLG_WAITING)) {
@@ -2376,7 +2847,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
if (flgs & ~ERTS_SSI_FLG_SUSPENDED)
erts_smp_atomic32_read_band_nob(&ssi->flags, ERTS_SSI_FLG_SUSPENDED);
- if (!thr_prgr_active) {
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && !thr_prgr_active) {
erts_thr_progress_active(esdp, thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
@@ -2393,6 +2864,13 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
erts_smp_atomic32_set_relb(&function_calls, 0);
*fcalls = 0;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
+#endif
+
+#ifdef ERTS_SCHED_ONLY_POLL_SCHED_1
+ ASSERT(esdp->no == 1);
+#endif
sched_waiting_sys(esdp->no, rq);
@@ -2413,7 +2891,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
sched_wall_time_change(esdp, working = 0);
ASSERT(!erts_port_task_have_outstanding_io_tasks());
-
erl_sys_schedule(1); /* Might give us something to do */
dt = erts_do_time_read_and_reset();
@@ -2459,7 +2936,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
* Got to check that we still got I/O tasks; otherwise
* we have to continue checking for I/O...
*/
- if (!prepare_for_sys_schedule()) {
+ if (!prepare_for_sys_schedule(esdp)) {
spincount *= ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT;
goto tse_wait;
}
@@ -2481,7 +2958,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
* Got to check that we still got I/O tasks; otherwise
* we have to wait in erl_sys_schedule() after all...
*/
- if (!prepare_for_sys_schedule()) {
+ if (!prepare_for_sys_schedule(esdp)) {
/*
* Not allowed to wait in erl_sys_schedule;
* do tse wait instead...
@@ -2585,7 +3062,7 @@ ssi_flags_set_wake(ErtsSchedulerSleepInfo *ssi)
}
static void
-wake_scheduler(ErtsRunQueue *rq, int incq)
+wake_scheduler(ErtsRunQueue *rq)
{
ErtsSchedulerSleepInfo *ssi;
erts_aint32_t flgs;
@@ -2604,10 +3081,53 @@ wake_scheduler(ErtsRunQueue *rq, int incq)
flgs = ssi_flags_set_wake(ssi);
erts_sched_finish_poke(ssi, flgs);
+}
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+static void
+wake_dirty_schedulers(ErtsRunQueue *rq, int one)
+{
+ ErtsSchedulerSleepInfo *ssi;
+ ErtsSchedulerSleepList *sl;
+
+ ASSERT(ERTS_RUNQ_IX_IS_DIRTY(rq->ix));
+
+ sl = &rq->sleepers;
+ erts_smp_spin_lock(&sl->lock);
+ ssi = sl->list;
+ if (!ssi) {
+ erts_smp_spin_unlock(&sl->lock);
+ if (one)
+ wake_scheduler(rq);
+ } else if (one) {
+ erts_aint32_t flgs;
+ if (ssi->prev)
+ ssi->prev->next = ssi->next;
+ else {
+ ASSERT(sl->list == ssi);
+ sl->list = ssi->next;
+ }
+ if (ssi->next)
+ ssi->next->prev = ssi->prev;
+
+ erts_smp_spin_unlock(&sl->lock);
+
+ ERTS_THR_MEMORY_BARRIER;
+ flgs = ssi_flags_set_wake(ssi);
+ erts_sched_finish_poke(ssi, flgs);
+ } else {
+ sl->list = NULL;
+ erts_smp_spin_unlock(&sl->lock);
- if (incq && (flgs & ERTS_SSI_FLG_WAITING))
- non_empty_runq(rq);
+ ERTS_THR_MEMORY_BARRIER;
+ do {
+ ErtsSchedulerSleepInfo *wake_ssi = ssi;
+ ssi = ssi->next;
+ erts_sched_finish_poke(wake_ssi, ssi_flags_set_wake(wake_ssi));
+ } while (ssi);
+ }
}
+#endif
#define ERTS_NO_USED_RUNQS_SHIFT 16
#define ERTS_NO_RUNQS_MASK 0xffff
@@ -2697,7 +3217,7 @@ chk_wake_sched(ErtsRunQueue *crq, int ix, int activate)
if (try_inc_no_active_runqs(ix+1))
(void) ERTS_RUNQ_FLGS_UNSET(wrq, ERTS_RUNQ_FLG_INACTIVE);
}
- wake_scheduler(wrq, 0);
+ wake_scheduler(wrq);
return 1;
}
return 0;
@@ -2744,8 +3264,14 @@ static ERTS_INLINE void
smp_notify_inc_runq(ErtsRunQueue *runq)
{
#ifdef ERTS_SMP
- if (runq)
- wake_scheduler(runq, 1);
+ if (runq) {
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(runq->ix))
+ wake_dirty_schedulers(runq, 1);
+ else
+#endif
+ wake_scheduler(runq);
+ }
#endif
}
@@ -2763,7 +3289,7 @@ erts_sched_notify_check_cpu_bind(void)
for (ix = 0; ix < erts_no_run_queues; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
(void) ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_CHK_CPU_BIND);
- wake_scheduler(rq, 0);
+ wake_scheduler(rq);
}
#else
erts_sched_check_cpu_bind(erts_get_scheduler_data());
@@ -2848,7 +3374,7 @@ dequeue_process(ErtsRunQueue *runq, int prio_q, erts_aint32_t *statep)
if (statep)
*statep = state;
- prio = (int) (ERTS_PSFLG_PRIO_MASK & state);
+ prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state);
rqi = &runq->procs.prio_info[prio];
@@ -2891,6 +3417,11 @@ check_immigration_need(ErtsRunQueue *c_rq, ErtsMigrationPath *mp, int prio)
if (!f_rq)
return NULL;
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (mp->sched_util)
+ return NULL;
+#endif
+
f_rq_flags = ERTS_RUNQ_FLGS_GET(f_rq);
if (f_rq_flags & ERTS_RUNQ_FLG_PROTECTED)
return NULL;
@@ -2997,7 +3528,7 @@ immigrate(ErtsRunQueue *c_rq, ErtsMigrationPath *mp)
erts_aint32_t state;
state = erts_smp_atomic32_read_acqb(&proc->state);
if (!(ERTS_PSFLG_BOUND & state)
- && (prio == (int) (ERTS_PSFLG_PRIO_MASK & state))) {
+ && (prio == (int) ERTS_PSFLGS_GET_PRQ_PRIO(state))) {
ErtsRunQueueInfo *rqi = &rq->procs.prio_info[prio];
unqueue_process(rq, rpq, rqi, prio, prev_proc, proc);
erts_smp_runq_unlock(rq);
@@ -3030,10 +3561,11 @@ suspend_run_queue(ErtsRunQueue *rq)
ERTS_SSI_FLG_SUSPENDED);
(void) ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_SUSPENDED);
- wake_scheduler(rq, 0);
+ wake_scheduler(rq);
}
static void scheduler_ix_resume_wake(Uint ix);
+static void scheduler_ssi_resume_wake(ErtsSchedulerSleepInfo *ssi);
static ERTS_INLINE void
resume_run_queue(ErtsRunQueue *rq)
@@ -3060,7 +3592,10 @@ resume_run_queue(ErtsRunQueue *rq)
erts_smp_runq_unlock(rq);
- scheduler_ix_resume_wake(rq->ix);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix))
+#endif
+ scheduler_ix_resume_wake(rq->ix);
}
typedef struct {
@@ -3079,7 +3614,7 @@ schedule_bound_processes(ErtsRunQueue *rq,
while (proc) {
erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state);
next = proc->next;
- enqueue_process(rq, (int) (ERTS_PSFLG_PRIO_MASK & state), proc);
+ enqueue_process(rq, (int) ERTS_PSFLGS_GET_PRQ_PRIO(state), proc);
proc = next;
}
}
@@ -3091,20 +3626,28 @@ evacuate_run_queue(ErtsRunQueue *rq,
int prio_q;
ErtsRunQueue *to_rq;
ErtsMigrationPaths *mps;
- ErtsMigrationPath *mp;
+ ErtsMigrationPath *mp = NULL;
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
(void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
- mps = erts_get_migration_paths_managed();
- mp = &mps->mpath[rq->ix];
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix))
+#endif
+ {
+ mps = erts_get_migration_paths_managed();
+ mp = &mps->mpath[rq->ix];
+ }
/* Evacuate scheduled misc ops */
if (rq->misc.start) {
ErtsMiscOpList *start, *end;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_RUNQ_IX_IS_DIRTY(rq->ix));
+#endif
to_rq = mp->misc_evac_runq;
if (!to_rq)
return;
@@ -3122,6 +3665,9 @@ evacuate_run_queue(ErtsRunQueue *rq,
to_rq->misc.start = start;
to_rq->misc.end = end;
+
+ non_empty_runq(to_rq);
+
erts_smp_runq_unlock(to_rq);
smp_notify_inc_runq(to_rq);
erts_smp_runq_lock(to_rq);
@@ -3130,6 +3676,9 @@ evacuate_run_queue(ErtsRunQueue *rq,
if (rq->ports.start) {
Port *prt;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_RUNQ_IX_IS_DIRTY(rq->ix));
+#endif
to_rq = mp->prio[ERTS_PORT_PRIO_LEVEL].runq;
if (!to_rq)
return;
@@ -3165,15 +3714,26 @@ evacuate_run_queue(ErtsRunQueue *rq,
erts_aint32_t state;
Process *proc;
int notify = 0;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ int requeue;
+#endif
to_rq = NULL;
- if (!mp->prio[prio_q].runq)
- return;
- if (prio_q == PRIORITY_NORMAL && !mp->prio[PRIORITY_LOW].runq)
- return;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix))
+#endif
+ {
+ if (!mp->prio[prio_q].runq)
+ return;
+ if (prio_q == PRIORITY_NORMAL && !mp->prio[PRIORITY_LOW].runq)
+ return;
+ }
proc = dequeue_process(rq, prio_q, &state);
while (proc) {
+#ifdef ERTS_DIRTY_SCHEDULERS
+ requeue = 1;
+#endif
if (ERTS_PSFLG_BOUND & state) {
/* Bound processes get stuck here... */
proc->next = NULL;
@@ -3182,12 +3742,43 @@ evacuate_run_queue(ErtsRunQueue *rq,
else
sbpp->first = proc;
sbpp->last = proc;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ requeue = 0;
+#endif
+ }
+#ifdef ERTS_DIRTY_SCHEDULERS
+ else if (state & ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q) {
+ erts_aint32_t old;
+ old = erts_smp_atomic32_read_band_nob(&proc->state,
+ ~(ERTS_PSFLG_DIRTY_CPU_PROC
+ | ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q));
+ /* assert that no other dirty flags are set */
+ ASSERT(!(old & (ERTS_PSFLG_DIRTY_IO_PROC|ERTS_PSFLG_DIRTY_IO_PROC_IN_Q)));
+ } else if (state & ERTS_PSFLG_DIRTY_IO_PROC_IN_Q) {
+ erts_aint32_t old;
+ old = erts_smp_atomic32_read_band_nob(&proc->state,
+ ~(ERTS_PSFLG_DIRTY_IO_PROC
+ | ERTS_PSFLG_DIRTY_IO_PROC_IN_Q));
+ /* assert that no other dirty flags are set */
+ ASSERT(!(old & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q)));
}
+ if (requeue) {
+#else
else {
- int prio = (int) (ERTS_PSFLG_PRIO_MASK & state);
+#endif
+ int prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state);
erts_smp_runq_unlock(rq);
- to_rq = mp->prio[prio].runq;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix))
+ /*
+ * dirty run queues evacuate only to run
+ * queue 0 during multi-scheduling blocking
+ */
+ to_rq = ERTS_RUNQ_IX(0);
+ else
+#endif
+ to_rq = mp->prio[prio].runq;
RUNQ_SET_RQ(&proc->run_queue, to_rq);
erts_smp_runq_lock(to_rq);
@@ -3257,7 +3848,7 @@ try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq,
erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state);
if (!(ERTS_PSFLG_BOUND & state)) {
/* Steal process */
- int prio = (int) (ERTS_PSFLG_PRIO_MASK & state);
+ int prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state);
ErtsRunQueueInfo *rqi = &vrq->procs.prio_info[prio];
unqueue_process(vrq, rpq, rqi, prio, prev_proc, proc);
erts_smp_runq_unlock(vrq);
@@ -3334,7 +3925,7 @@ try_steal_task(ErtsRunQueue *rq)
Uint32 flags;
/* Protect jobs we steal from getting stolen from us... */
- flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_PROTECTED);
+ flags = empty_protected_runq(rq);
if (flags & ERTS_RUNQ_FLG_SUSPENDED)
return 0; /* go suspend instead... */
@@ -3413,6 +4004,9 @@ typedef struct {
int full_reds_history_change;
int oowc;
int max_len;
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ int sched_util;
+#endif
} ErtsRunQueueBalance;
static ErtsRunQueueBalance *run_queue_info;
@@ -3576,6 +4170,9 @@ check_balance(ErtsRunQueue *c_rq)
Sint64 scheds_reds, full_scheds_reds;
int forced, active, current_active, oowc, half_full_scheds, full_scheds,
mmax_len, blnc_no_rqs, qix, pix, freds_hist_ix;
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ int sched_util_balancing;
+#endif
if (erts_smp_atomic32_xchg_nob(&balance_info.checking_balance, 1)) {
c_rq->check_balance_reds = INT_MAX;
@@ -3631,6 +4228,10 @@ check_balance(ErtsRunQueue *c_rq)
return;
}
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ sched_util_balancing = 0;
+#endif
+
freds_hist_ix = balance_info.full_reds_history_index;
balance_info.full_reds_history_index++;
if (balance_info.full_reds_history_index >= ERTS_FULL_REDS_HISTORY_SIZE)
@@ -3661,7 +4262,12 @@ check_balance(ErtsRunQueue *c_rq)
run_queue_info[qix].oowc = rq->out_of_work_count;
run_queue_info[qix].max_len = rq->max_len;
rq->check_balance_reds = INT_MAX;
-
+
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (erts_sched_balance_util)
+ run_queue_info[qix].sched_util = erts_get_sched_util(rq, 1, 0);
+#endif
+
erts_smp_runq_unlock(rq);
}
@@ -3731,8 +4337,38 @@ check_balance(ErtsRunQueue *c_rq)
mmax_len = run_queue_info[qix].max_len;
}
- if (!erts_sched_compact_load)
+ if (!erts_sched_compact_load) {
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (erts_sched_balance_util && full_scheds < blnc_no_rqs) {
+ int avg_util = 0;
+
+ for (qix = 0; qix < blnc_no_rqs; qix++)
+ avg_util += run_queue_info[qix].sched_util;
+
+ avg_util /= blnc_no_rqs; /* in ppm */
+
+ sched_util_balancing = 1;
+ /*
+ * In order to avoid renaming a large number of fields,
+ * we write utilization values instead of length values
+ * in the 'max_len' and 'migration_limit' fields...
+ */
+ for (qix = 0; qix < blnc_no_rqs; qix++) {
+ run_queue_info[qix].flags = 0; /* Reset for later use... */
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ run_queue_info[qix].prio[pix].emigrate_to = -1;
+ run_queue_info[qix].prio[pix].immigrate_from = -1;
+ run_queue_info[qix].prio[pix].avail = 100;
+ run_queue_info[qix].prio[pix].max_len = run_queue_info[qix].sched_util;
+ run_queue_info[qix].prio[pix].migration_limit = avg_util;
+ }
+ }
+ active = blnc_no_rqs;
+ goto setup_migration_paths;
+ }
+#endif
goto all_active;
+ }
if (!forced && half_full_scheds != blnc_no_rqs) {
int min = 1;
@@ -3849,15 +4485,30 @@ check_balance(ErtsRunQueue *c_rq)
}
}
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ setup_migration_paths:
+#endif
+
/* Setup migration paths for all priorities */
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
int low = 0, high = 0;
for (qix = 0; qix < blnc_no_rqs; qix++) {
int len_diff = run_queue_info[qix].prio[pix].max_len;
len_diff -= run_queue_info[qix].prio[pix].migration_limit;
+
#ifdef DBG_PRINT
if (pix == 2) erts_fprintf(stderr, "%d ", len_diff);
#endif
+
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (sched_util_balancing
+ && -ERTS_SCHED_UTIL_IGNORE_IMBALANCE_DIFF <= len_diff
+ && len_diff <= ERTS_SCHED_UTIL_IGNORE_IMBALANCE_DIFF) {
+ /* ignore minor imbalance */
+ len_diff = 0;
+ }
+#endif
+
run_queue_compare[qix].qix = qix;
run_queue_compare[qix].len = len_diff;
if (len_diff != 0) {
@@ -3984,6 +4635,9 @@ erts_fprintf(stderr, "--------------------------------\n");
Uint32 flags = run_queue_info[qix].flags;
ErtsMigrationPath *mp = &new_mpaths->mpath[qix];
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ mp->sched_util = sched_util_balancing;
+#endif
mp->flags = flags;
mp->misc_evac_runq = NULL;
@@ -4238,13 +4892,20 @@ wakeup_other_check(ErtsRunQueue *rq, Uint32 flags)
rq->wakeup_other += (left_len*wo_reds
+ ERTS_WAKEUP_OTHER_FIXED_INC);
if (rq->wakeup_other > wakeup_other.limit) {
- int empty_rqs =
- erts_smp_atomic32_read_acqb(&no_empty_run_queues);
- if (flags & ERTS_RUNQ_FLG_PROTECTED)
- (void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
- if (empty_rqs != 0)
- wake_scheduler_on_empty_runq(rq);
- rq->wakeup_other = 0;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && rq->waiting)
+ wake_dirty_schedulers(rq, 1);
+ else
+#endif
+ {
+ int empty_rqs =
+ erts_smp_atomic32_read_acqb(&no_empty_run_queues);
+ if (flags & ERTS_RUNQ_FLG_PROTECTED)
+ (void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
+ if (empty_rqs != 0)
+ wake_scheduler_on_empty_runq(rq);
+ rq->wakeup_other = 0;
+ }
}
}
rq->wakeup_other_reds = 0;
@@ -4405,11 +5066,17 @@ erts_early_init_scheduling(int no_schedulers)
wakeup_other.threshold = ERTS_SCHED_WAKEUP_OTHER_THRESHOLD_MEDIUM;
wakeup_other.type = ERTS_SCHED_WAKEUP_OTHER_TYPE_DEFAULT;
#endif
+#ifndef ERTS_SCHED_MIN_SPIN
sched_busy_wait.sys_schedule = ERTS_SCHED_SYS_SLEEP_SPINCOUNT_MEDIUM;
sched_busy_wait.tse = (ERTS_SCHED_SYS_SLEEP_SPINCOUNT_MEDIUM
* ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT);
sched_busy_wait.aux_work = (ERTS_SCHED_SYS_SLEEP_SPINCOUNT_MEDIUM
* ERTS_SCHED_AUX_WORK_SLEEP_SPINCOUNT_FACT_MEDIUM);
+#else
+ sched_busy_wait.sys_schedule = ERTS_SCHED_SYS_SLEEP_SPINCOUNT_NONE;
+ sched_busy_wait.tse = ERTS_SCHED_SYS_SLEEP_SPINCOUNT_NONE;
+ sched_busy_wait.aux_work = ERTS_SCHED_SYS_SLEEP_SPINCOUNT_NONE;
+#endif
}
int
@@ -4525,7 +5192,14 @@ erts_sched_set_wake_cleanup_threshold(char *str)
static void
init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp, char *dawwp)
{
- awdp->sched_id = esdp ? (int) esdp->no : 0;
+ if (!esdp)
+ awdp->sched_id = 0;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ else if (ERTS_SCHEDULER_IS_DIRTY(esdp))
+ awdp->sched_id = (int) ERTS_DIRTY_SCHEDULER_NO(esdp);
+#endif
+ else
+ awdp->sched_id = (int) esdp->no;
awdp->esdp = esdp;
awdp->ssi = esdp ? esdp->ssi : NULL;
#ifdef ERTS_SMP
@@ -4565,41 +5239,122 @@ init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp, char *dawwp)
#endif
}
+static void
+init_scheduler_data(ErtsSchedulerData* esdp, int num,
+ ErtsSchedulerSleepInfo* ssi,
+ ErtsRunQueue* runq,
+ char** daww_ptr, size_t daww_sz)
+{
+#ifdef ERTS_SMP
+ erts_bits_init_state(&esdp->erl_bits_state);
+ esdp->match_pseudo_process = NULL;
+ esdp->free_process = NULL;
+#endif
+ esdp->x_reg_array =
+ erts_alloc_permanent_cache_aligned(ERTS_ALC_T_BEAM_REGISTER,
+ ERTS_X_REGS_ALLOCATED *
+ sizeof(Eterm));
+ esdp->f_reg_array =
+ erts_alloc_permanent_cache_aligned(ERTS_ALC_T_BEAM_REGISTER,
+ MAX_REG * sizeof(FloatDef));
+#if !HEAP_ON_C_STACK
+ esdp->num_tmp_heap_used = 0;
+#endif
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(runq->ix)) {
+ esdp->no = 0;
+ ERTS_DIRTY_SCHEDULER_NO(esdp) = (Uint) num;
+ }
+ else {
+ esdp->no = (Uint) num;
+ ERTS_DIRTY_SCHEDULER_NO(esdp) = 0;
+ }
+#else
+ esdp->no = (Uint) num;
+#endif
+ esdp->ssi = ssi;
+ esdp->current_process = NULL;
+ esdp->current_port = NULL;
+
+ esdp->virtual_reds = 0;
+ esdp->cpu_id = -1;
+
+ erts_init_atom_cache_map(&esdp->atom_cache_map);
+
+ esdp->run_queue = runq;
+ esdp->run_queue->scheduler = esdp;
+
+ if (daww_ptr) {
+ init_aux_work_data(&esdp->aux_work_data, esdp, *daww_ptr);
+#ifdef ERTS_SMP
+ *daww_ptr += daww_sz;
+#endif
+ }
+
+ esdp->reductions = 0;
+
+ init_sched_wall_time(&esdp->sched_wall_time);
+ erts_port_task_handle_init(&esdp->nosuspend_port_task_handle);
+}
+
void
-erts_init_scheduling(int no_schedulers, int no_schedulers_online)
+erts_init_scheduling(int no_schedulers, int no_schedulers_online
+#ifdef ERTS_DIRTY_SCHEDULERS
+ , int no_dirty_cpu_schedulers, int no_dirty_cpu_schedulers_online,
+ int no_dirty_io_schedulers
+#endif
+ )
{
int ix, n, no_ssi;
char *daww_ptr;
-#ifdef ERTS_SMP
size_t daww_sz;
-#endif
+ size_t size_runqs;
init_misc_op_list_alloc();
+ init_proc_sys_task_queues_alloc();
#ifdef ERTS_SMP
set_wakeup_other_data();
#endif
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ if (erts_sched_balance_util)
+ erts_sched_compact_load = 0;
+#endif
+
ASSERT(no_schedulers_online <= no_schedulers);
ASSERT(no_schedulers_online >= 1);
ASSERT(no_schedulers >= 1);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(no_dirty_cpu_schedulers <= no_schedulers);
+ ASSERT(no_dirty_cpu_schedulers >= 1);
+ ASSERT(no_dirty_cpu_schedulers_online <= no_schedulers_online);
+ ASSERT(no_dirty_cpu_schedulers_online >= 1);
+#endif
/* Create and initialize run queues */
n = no_schedulers;
-
- erts_aligned_run_queues =
- erts_alloc_permanent_cache_aligned(ERTS_ALC_T_RUNQS,
- sizeof(ErtsAlignedRunQueue) * n);
+ size_runqs = sizeof(ErtsAlignedRunQueue) * (n + ERTS_NUM_DIRTY_RUNQS);
+ erts_aligned_run_queues =
+ erts_alloc_permanent_cache_aligned(ERTS_ALC_T_RUNQS, size_runqs);
#ifdef ERTS_SMP
+#ifdef ERTS_DIRTY_SCHEDULERS
+ erts_aligned_run_queues += ERTS_NUM_DIRTY_RUNQS;
+#endif
erts_smp_atomic32_init_nob(&no_empty_run_queues, 0);
#endif
erts_no_run_queues = n;
- for (ix = 0; ix < n; ix++) {
+ for (ix = -(ERTS_NUM_DIRTY_RUNQS); ix < n; ix++) {
int pix, rix;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ErtsRunQueue *rq = ERTS_RUNQ_IX_IS_DIRTY(ix) ?
+ ERTS_DIRTY_RUNQ_IX(ix) : ERTS_RUNQ_IX(ix);
+#else
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
+#endif
rq->ix = ix;
@@ -4610,6 +5365,14 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
erts_smp_mtx_init_x(&rq->mtx, "run_queue", make_small(ix + 1));
erts_smp_cnd_init(&rq->cnd);
+#ifdef ERTS_DIRTY_SCHEDULERS
+#ifdef ERTS_SMP
+ if (ERTS_RUNQ_IX_IS_DIRTY(ix))
+ erts_smp_spinlock_init(&rq->sleepers.lock, "dirty_run_queue_sleep_list");
+ rq->sleepers.list = NULL;
+#endif
+#endif
+
rq->waiting = 0;
rq->woken = 0;
ERTS_RUNQ_FLGS_INIT(rq, ERTS_RUNQ_FLG_NONEMPTY);
@@ -4648,6 +5411,11 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
rq->ports.info.reds = 0;
rq->ports.start = NULL;
rq->ports.end = NULL;
+
+#if ERTS_HAVE_SCHED_UTIL_BALANCING_SUPPORT
+ init_runq_sched_util(&rq->sched_util, erts_sched_balance_util);
+#endif
+
}
#ifdef ERTS_SMP
@@ -4665,6 +5433,10 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
n = (int) no_schedulers;
erts_no_schedulers = n;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ erts_no_dirty_cpu_schedulers = no_dirty_cpu_schedulers;
+ erts_no_dirty_io_schedulers = no_dirty_io_schedulers;
+#endif
/* Create and initialize scheduler sleep info */
#ifdef ERTS_SMP
@@ -4691,6 +5463,29 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
#ifdef ERTS_SMP
aligned_sched_sleep_info++;
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ aligned_dirty_cpu_sched_sleep_info =
+ erts_alloc_permanent_cache_aligned(
+ ERTS_ALC_T_SCHDLR_SLP_INFO,
+ no_dirty_cpu_schedulers*sizeof(ErtsAlignedSchedulerSleepInfo));
+ for (ix = 0; ix < no_dirty_cpu_schedulers; ix++) {
+ ErtsSchedulerSleepInfo *ssi = &aligned_dirty_cpu_sched_sleep_info[ix].ssi;
+ erts_smp_atomic32_init_nob(&ssi->flags, 0);
+ ssi->event = NULL; /* initialized in sched_dirty_cpu_thread_func */
+ erts_atomic32_init_nob(&ssi->aux_work, 0);
+ }
+ aligned_dirty_io_sched_sleep_info =
+ erts_alloc_permanent_cache_aligned(
+ ERTS_ALC_T_SCHDLR_SLP_INFO,
+ no_dirty_io_schedulers*sizeof(ErtsAlignedSchedulerSleepInfo));
+ for (ix = 0; ix < no_dirty_io_schedulers; ix++) {
+ ErtsSchedulerSleepInfo *ssi = &aligned_dirty_io_sched_sleep_info[ix].ssi;
+ erts_smp_atomic32_init_nob(&ssi->flags, 0);
+ ssi->event = NULL; /* initialized in sched_dirty_io_thread_func */
+ erts_atomic32_init_nob(&ssi->aux_work, 0);
+ }
+#endif
#endif
/* Create and initialize scheduler specific data */
@@ -4701,6 +5496,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
daww_ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_SCHDLR_DATA,
daww_sz*n);
#else
+ daww_sz = 0;
daww_ptr = NULL;
#endif
@@ -4710,45 +5506,32 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
for (ix = 0; ix < n; ix++) {
ErtsSchedulerData *esdp = ERTS_SCHEDULER_IX(ix);
-#ifdef ERTS_SMP
- erts_bits_init_state(&esdp->erl_bits_state);
- esdp->match_pseudo_process = NULL;
- esdp->free_process = NULL;
-#endif
- esdp->x_reg_array =
- erts_alloc_permanent_cache_aligned(ERTS_ALC_T_BEAM_REGISTER,
- ERTS_X_REGS_ALLOCATED *
- sizeof(Eterm));
- esdp->f_reg_array =
- erts_alloc_permanent_cache_aligned(ERTS_ALC_T_BEAM_REGISTER,
- MAX_REG * sizeof(FloatDef));
-#if !HEAP_ON_C_STACK
- esdp->num_tmp_heap_used = 0;
-#endif
- esdp->no = (Uint) ix+1;
- esdp->ssi = ERTS_SCHED_SLEEP_INFO_IX(ix);
- esdp->current_process = NULL;
- esdp->current_port = NULL;
-
- esdp->virtual_reds = 0;
- esdp->cpu_id = -1;
-
- erts_init_atom_cache_map(&esdp->atom_cache_map);
-
- esdp->run_queue = ERTS_RUNQ_IX(ix);
- esdp->run_queue->scheduler = esdp;
+ init_scheduler_data(esdp, ix+1, ERTS_SCHED_SLEEP_INFO_IX(ix),
+ ERTS_RUNQ_IX(ix), &daww_ptr, daww_sz);
+ }
- init_aux_work_data(&esdp->aux_work_data, esdp, daww_ptr);
+#ifdef ERTS_DIRTY_SCHEDULERS
#ifdef ERTS_SMP
- daww_ptr += daww_sz;
-#endif
-
- esdp->reductions = 0;
-
- init_sched_wall_time(&esdp->sched_wall_time);
- erts_port_task_handle_init(&esdp->nosuspend_port_task_handle);
-
+ erts_aligned_dirty_cpu_scheduler_data =
+ erts_alloc_permanent_cache_aligned(
+ ERTS_ALC_T_SCHDLR_DATA,
+ no_dirty_cpu_schedulers*sizeof(ErtsAlignedSchedulerData));
+ for (ix = 0; ix < no_dirty_cpu_schedulers; ix++) {
+ ErtsSchedulerData *esdp = ERTS_DIRTY_CPU_SCHEDULER_IX(ix);
+ init_scheduler_data(esdp, ix+1, ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(ix),
+ ERTS_DIRTY_CPU_RUNQ, NULL, 0);
+ }
+ erts_aligned_dirty_io_scheduler_data =
+ erts_alloc_permanent_cache_aligned(
+ ERTS_ALC_T_SCHDLR_DATA,
+ no_dirty_io_schedulers*sizeof(ErtsAlignedSchedulerData));
+ for (ix = 0; ix < no_dirty_io_schedulers; ix++) {
+ ErtsSchedulerData *esdp = ERTS_DIRTY_IO_SCHEDULER_IX(ix);
+ init_scheduler_data(esdp, ix+1, ERTS_DIRTY_IO_SCHED_SLEEP_INFO_IX(ix),
+ ERTS_DIRTY_IO_RUNQ, NULL, 0);
}
+#endif
+#endif
init_misc_aux_work();
#if !HALFWORD_HEAP
@@ -4772,6 +5555,16 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
schdlr_sspnd.curr_online = no_schedulers;
schdlr_sspnd.msb.ongoing = 0;
erts_smp_atomic32_init_nob(&schdlr_sspnd.active, no_schedulers);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ erts_smp_atomic32_init_nob(&schdlr_sspnd.dirty_cpu_changing, 0);
+ schdlr_sspnd.dirty_cpu_online = no_dirty_cpu_schedulers_online;
+ schdlr_sspnd.dirty_cpu_curr_online = no_dirty_cpu_schedulers;
+ erts_smp_atomic32_init_nob(&schdlr_sspnd.dirty_cpu_active, no_dirty_cpu_schedulers);
+ erts_smp_atomic32_init_nob(&schdlr_sspnd.dirty_io_changing, 0);
+ schdlr_sspnd.dirty_io_online = no_dirty_io_schedulers;
+ schdlr_sspnd.dirty_io_curr_online = no_dirty_io_schedulers;
+ erts_smp_atomic32_init_nob(&schdlr_sspnd.dirty_io_active, no_dirty_io_schedulers);
+#endif
schdlr_sspnd.msb.procs = NULL;
init_no_runqs(no_schedulers_online, no_schedulers_online);
balance_info.last_active_runqs = no_schedulers;
@@ -4797,6 +5590,21 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
schdlr_sspnd.curr_online *= 2; /* Boot strapping... */
ERTS_SCHDLR_SSPND_CHNG_SET((ERTS_SCHDLR_SSPND_CHNG_ONLN
| ERTS_SCHDLR_SSPND_CHNG_WAITER), 0);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ schdlr_sspnd.dirty_cpu_wait_curr_online = no_dirty_cpu_schedulers_online;
+ schdlr_sspnd.dirty_cpu_curr_online *= 2;
+ ERTS_SCHDLR_SSPND_DIRTY_CPU_CHNG_SET((ERTS_SCHDLR_SSPND_CHNG_ONLN
+ | ERTS_SCHDLR_SSPND_CHNG_WAITER), 0);
+ for (ix = no_dirty_cpu_schedulers_online; ix < no_dirty_cpu_schedulers; ix++) {
+ ErtsSchedulerData* esdp = ERTS_DIRTY_CPU_SCHEDULER_IX(ix);
+ erts_smp_atomic32_read_bor_nob(&esdp->ssi->flags, ERTS_SSI_FLG_SUSPENDED);
+ }
+
+ schdlr_sspnd.dirty_io_wait_curr_online = no_dirty_io_schedulers;
+ schdlr_sspnd.dirty_io_curr_online *= 2;
+ ERTS_SCHDLR_SSPND_DIRTY_IO_CHNG_SET((ERTS_SCHDLR_SSPND_CHNG_ONLN
+ | ERTS_SCHDLR_SSPND_CHNG_WAITER), 0);
+#endif
erts_smp_atomic32_init_nob(&doing_sys_schedule, 0);
@@ -4812,6 +5620,10 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
#endif
}
erts_no_schedulers = 1;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ erts_no_dirty_cpu_schedulers = 0;
+ erts_no_dirty_io_schedulers = 0;
+#endif
#endif
erts_smp_atomic32_init_nob(&function_calls, 0);
@@ -4858,76 +5670,293 @@ erts_get_scheduler_data(void)
#endif
+static Process *
+make_proxy_proc(Process *prev_proxy, Process *proc, erts_aint32_t prio)
+{
+ erts_aint32_t state;
+ Process *proxy;
+#ifdef ERTS_SMP
+ ErtsRunQueue *rq = RUNQ_READ_RQ(&proc->run_queue);
+#endif
+
+ state = (ERTS_PSFLG_PROXY
+ | ERTS_PSFLG_IN_RUNQ
+ | (((erts_aint32_t) 1) << (prio + ERTS_PSFLGS_IN_PRQ_MASK_OFFSET))
+ | (prio << ERTS_PSFLGS_PRQ_PRIO_OFFSET)
+ | (prio << ERTS_PSFLGS_USR_PRIO_OFFSET)
+ | (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET));
+
+ if (prev_proxy) {
+ proxy = prev_proxy;
+ ASSERT(erts_smp_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY);
+ erts_smp_atomic32_set_nob(&proxy->state, state);
+#ifdef ERTS_SMP
+ RUNQ_SET_RQ(&proc->run_queue, rq);
+#endif
+ }
+ else {
+ proxy = erts_alloc(ERTS_ALC_T_PROC, sizeof(Process));
+#ifdef DEBUG
+ {
+ int i;
+ Uint32 *ui32 = (Uint32 *) (char *) proxy;
+ for (i = 0; i < sizeof(Process)/sizeof(Uint32); i++)
+ ui32[i] = (Uint32) 0xdeadbeef;
+ }
+#endif
+ erts_smp_atomic32_init_nob(&proxy->state, state);
+#ifdef ERTS_SMP
+ erts_smp_atomic_init_nob(&proxy->run_queue,
+ erts_smp_atomic_read_nob(&proc->run_queue));
+#endif
+ }
+
+ proxy->common.id = proc->common.id;
+
+ return proxy;
+}
+
+static ERTS_INLINE void
+free_proxy_proc(Process *proxy)
+{
+ ASSERT(erts_smp_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY);
+ erts_free(ERTS_ALC_T_PROC, proxy);
+}
+
+#define ERTS_ENQUEUE_NOT 0
+#define ERTS_ENQUEUE_NORMAL_QUEUE 1
+#ifdef ERTS_DIRTY_SCHEDULERS
+#define ERTS_ENQUEUE_DIRTY_CPU_QUEUE 2
+#define ERTS_ENQUEUE_DIRTY_IO_QUEUE 3
+#endif
+
+static ERTS_INLINE int
+check_enqueue_in_prio_queue(Process *c_p,
+ erts_aint32_t *prq_prio_p,
+ erts_aint32_t *newp,
+ erts_aint32_t actual)
+{
+ erts_aint32_t aprio, qbit, max_qbit;
+
+ aprio = (actual >> ERTS_PSFLGS_ACT_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK;
+ qbit = 1 << aprio;
+
+ *prq_prio_p = aprio;
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (actual & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)) {
+ /*
+ * If we have system tasks of a priority higher
+ * or equal to the user priority, we enqueue
+ * on ordinary run-queue and take care of
+ * those system tasks first.
+ */
+ if (actual & ERTS_PSFLG_ACTIVE_SYS) {
+ erts_aint32_t uprio, stprio, qmask;
+ uprio = (actual >> ERTS_PSFLGS_USR_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK;
+ if (aprio < uprio)
+ goto enqueue_normal_runq; /* system tasks with higher prio */
+ erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);
+ qmask = c_p->sys_task_qs->qmask;
+ erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);
+ switch (qmask & -qmask) {
+ case MAX_BIT:
+ stprio = PRIORITY_MAX;
+ break;
+ case HIGH_BIT:
+ stprio = PRIORITY_HIGH;
+ break;
+ case NORMAL_BIT:
+ stprio = PRIORITY_NORMAL;
+ break;
+ case LOW_BIT:
+ stprio = PRIORITY_LOW;
+ break;
+ default:
+ stprio = PRIORITY_LOW+1;
+ break;
+ }
+ if (stprio <= uprio)
+ goto enqueue_normal_runq; /* system tasks with higher prio */
+ }
+
+ /* Enqueue in dirty run queue if not already enqueued */
+ if (actual & (ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q|ERTS_PSFLG_DIRTY_IO_PROC_IN_Q))
+ return ERTS_ENQUEUE_NOT; /* already in queue */
+ if (actual & ERTS_PSFLG_DIRTY_CPU_PROC) {
+ *newp |= ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q;
+ if (actual & ERTS_PSFLG_IN_RUNQ)
+ return -ERTS_ENQUEUE_DIRTY_CPU_QUEUE; /* use proxy */
+ *newp |= ERTS_PSFLG_IN_RUNQ;
+ return ERTS_ENQUEUE_DIRTY_CPU_QUEUE;
+ }
+ *newp |= ERTS_PSFLG_DIRTY_IO_PROC_IN_Q;
+ if (actual & ERTS_PSFLG_IN_RUNQ)
+ return -ERTS_ENQUEUE_DIRTY_IO_QUEUE; /* use proxy */
+ *newp |= ERTS_PSFLG_IN_RUNQ;
+ return ERTS_ENQUEUE_DIRTY_IO_QUEUE;
+ }
+
+ enqueue_normal_runq:
+#endif
+ max_qbit = (actual >> ERTS_PSFLGS_IN_PRQ_MASK_OFFSET) & ERTS_PSFLGS_QMASK;
+ max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS;
+ max_qbit &= -max_qbit;
+ /*
+ * max_qbit now either contains the bit set for the highest prio queue or a bit
+ * out of range (which will have a value larger than any valid queue bit).
+ */
+
+ if (qbit >= max_qbit)
+ return ERTS_ENQUEUE_NOT; /* Already queued in higher or equal prio */
+
+ /* Need to enqueue (if already enqueued, it is in lower prio) */
+ *newp |= qbit << ERTS_PSFLGS_IN_PRQ_MASK_OFFSET;
+
+ if ((actual & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLGS_USR_PRIO_MASK))
+ != (aprio << ERTS_PSFLGS_USR_PRIO_OFFSET)) {
+ /*
+ * Process struct already enqueued, or actual prio not
+ * equal to user prio, i.e., enqueue using proxy.
+ */
+ return -ERTS_ENQUEUE_NORMAL_QUEUE;
+ }
+
+ /*
+ * Enqueue using process struct.
+ */
+ *newp &= ~ERTS_PSFLGS_PRQ_PRIO_MASK;
+ *newp |= ERTS_PSFLG_IN_RUNQ | (aprio << ERTS_PSFLGS_PRQ_PRIO_OFFSET);
+ return ERTS_ENQUEUE_NORMAL_QUEUE;
+}
+
/*
- * scheduler_out_process() return with c_rq locked.
+ * schedule_out_process() returns with c_rq locked.
*/
static ERTS_INLINE int
-schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p)
+schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Process *proxy)
{
- erts_aint32_t a, e, n;
- int res = 0;
+ erts_aint32_t a, e, n, enq_prio = -1;
+ int enqueue; /* < 0 -> use proxy */
+ Process* sched_p;
+ ErtsRunQueue* runq;
+#ifdef ERTS_SMP
+ int check_emigration_need;
+#endif
a = state;
while (1) {
n = e = a;
- ASSERT(a & ERTS_PSFLG_RUNNING);
- ASSERT(!(a & ERTS_PSFLG_IN_RUNQ));
+ ASSERT(a & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS));
+
+ enqueue = ERTS_ENQUEUE_NOT;
- n &= ~ERTS_PSFLG_RUNNING;
- if ((a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE)
- n |= ERTS_PSFLG_IN_RUNQ;
+ n &= ~(ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS);
+ if (a & ERTS_PSFLG_ACTIVE_SYS
+ || (a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) {
+ enqueue = check_enqueue_in_prio_queue(p, &enq_prio, &n, a);
+ }
a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
if (a == e)
break;
}
- if (!(n & ERTS_PSFLG_IN_RUNQ)) {
- if (erts_system_profile_flags.runnable_procs)
- profile_runnable_proc(p, am_inactive);
+ switch (enqueue) {
+ case ERTS_ENQUEUE_NOT:
+ if (erts_system_profile_flags.runnable_procs) {
+
+ if (!(a & ERTS_PSFLG_ACTIVE_SYS)
+ && (!(a & ERTS_PSFLG_ACTIVE)
+ || (a & ERTS_PSFLG_SUSPENDED))) {
+ /* Process inactive */
+ profile_runnable_proc(p, am_inactive);
+ }
+ }
+
+ if (proxy)
+ free_proxy_proc(proxy);
+
+ erts_smp_runq_lock(c_rq);
+ return 0;
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+#ifdef ERTS_SMP
+ case ERTS_ENQUEUE_DIRTY_CPU_QUEUE:
+ case -ERTS_ENQUEUE_DIRTY_CPU_QUEUE:
+ runq = ERTS_DIRTY_CPU_RUNQ;
+ ASSERT(ERTS_SCHEDULER_IS_DIRTY_CPU(runq->scheduler));
+#ifdef ERTS_SMP
+ check_emigration_need = 0;
+#endif
+ break;
+
+ case ERTS_ENQUEUE_DIRTY_IO_QUEUE:
+ case -ERTS_ENQUEUE_DIRTY_IO_QUEUE:
+ runq = ERTS_DIRTY_IO_RUNQ;
+ ASSERT(ERTS_SCHEDULER_IS_DIRTY_IO(runq->scheduler));
+#ifdef ERTS_SMP
+ check_emigration_need = 0;
+#endif
+ break;
+#endif
+#endif
+
+ default:
+ ASSERT(enqueue == ERTS_ENQUEUE_NORMAL_QUEUE
+ || enqueue == -ERTS_ENQUEUE_NORMAL_QUEUE);
+
+ runq = erts_get_runq_proc(p);
+#ifdef ERTS_SMP
+ check_emigration_need = !(ERTS_PSFLG_BOUND & n);
+#endif
+ break;
}
- else {
- int prio = (int) (ERTS_PSFLG_PRIO_MASK & n);
- ErtsRunQueue *runq = erts_get_runq_proc(p);
- ASSERT(!(n & ERTS_PSFLG_SUSPENDED));
+ ASSERT(!(n & ERTS_PSFLG_SUSPENDED) || (n & ERTS_PSFLG_ACTIVE_SYS));
+
+ if (enqueue < 0)
+ sched_p = make_proxy_proc(proxy, p, enq_prio);
+ else {
+ sched_p = p;
+ if (proxy)
+ free_proxy_proc(proxy);
+ }
#ifdef ERTS_SMP
- if (!(ERTS_PSFLG_BOUND & n)) {
- ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio);
- if (new_runq) {
- RUNQ_SET_RQ(&p->run_queue, new_runq);
- runq = new_runq;
- }
+ if (check_emigration_need) {
+ ErtsRunQueue *new_runq = erts_check_emigration_need(runq, enq_prio);
+ if (new_runq) {
+ RUNQ_SET_RQ(&sched_p->run_queue, new_runq);
+ runq = new_runq;
}
+ }
#endif
- ASSERT(runq);
- res = 1;
- erts_smp_runq_lock(runq);
+ ASSERT(runq);
- /* Enqueue the process */
- enqueue_process(runq, prio, p);
+ erts_smp_runq_lock(runq);
- if (runq == c_rq)
- return res;
- erts_smp_runq_unlock(runq);
- smp_notify_inc_runq(runq);
- }
+ /* Enqueue the process */
+ enqueue_process(runq, (int) enq_prio, sched_p);
+
+ if (runq == c_rq)
+ return 1;
+ erts_smp_runq_unlock(runq);
+ smp_notify_inc_runq(runq);
erts_smp_runq_lock(c_rq);
- return res;
+ return 1;
}
static ERTS_INLINE void
-add2runq(Process *p, erts_aint32_t state)
+add2runq(Process *p, erts_aint32_t state, erts_aint32_t prio)
{
- int prio = (int) (ERTS_PSFLG_PRIO_MASK & state);
ErtsRunQueue *runq = erts_get_runq_proc(p);
#ifdef ERTS_SMP
if (!(ERTS_PSFLG_BOUND & state)) {
- ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio);
+ ErtsRunQueue *new_runq = erts_check_emigration_need(runq, (int) prio);
if (new_runq) {
RUNQ_SET_RQ(&p->run_queue, new_runq);
runq = new_runq;
@@ -4939,101 +5968,236 @@ add2runq(Process *p, erts_aint32_t state)
erts_smp_runq_lock(runq);
/* Enqueue the process */
- enqueue_process(runq, prio, p);
+ enqueue_process(runq, (int) prio, p);
erts_smp_runq_unlock(runq);
smp_notify_inc_runq(runq);
}
-static ERTS_INLINE void
-schedule_process(Process *p, erts_aint32_t state, int active_enq)
+static ERTS_INLINE int
+change_proc_schedule_state(Process *p,
+ erts_aint32_t clear_state_flags,
+ erts_aint32_t set_state_flags,
+ erts_aint32_t *statep,
+ erts_aint32_t *enq_prio_p)
{
- erts_aint32_t a = state, n;
+ /*
+ * NOTE: ERTS_PSFLG_RUNNING, ERTS_PSFLG_RUNNING_SYS and
+ * ERTS_PSFLG_ACTIVE_SYS are not allowed to be
+ * altered by this function!
+ */
+ erts_aint32_t a = *statep, n;
+ int enqueue; /* < 0 -> use proxy */
+
+ ASSERT(!(a & ERTS_PSFLG_PROXY));
+ ASSERT((clear_state_flags & (ERTS_PSFLG_RUNNING
+ | ERTS_PSFLG_RUNNING_SYS
+ | ERTS_PSFLG_ACTIVE_SYS)) == 0);
+ ASSERT((set_state_flags & (ERTS_PSFLG_RUNNING
+ | ERTS_PSFLG_RUNNING_SYS
+ | ERTS_PSFLG_ACTIVE_SYS)) == 0);
while (1) {
erts_aint32_t e;
n = e = a;
+ enqueue = ERTS_ENQUEUE_NOT;
+
if (a & ERTS_PSFLG_FREE)
- return; /* We don't want to schedule free processes... */
- n |= ERTS_PSFLG_ACTIVE;
- if (!(a & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_RUNNING)))
- n |= ERTS_PSFLG_IN_RUNQ;
+ break; /* We don't want to schedule free processes... */
+
+ if (clear_state_flags)
+ n &= ~clear_state_flags;
+
+ if (set_state_flags)
+ n |= set_state_flags;
+
+ if ((n & (ERTS_PSFLG_SUSPENDED
+ | ERTS_PSFLG_RUNNING
+ | ERTS_PSFLG_RUNNING_SYS
+ | ERTS_PSFLG_IN_RUNQ
+ | ERTS_PSFLG_ACTIVE)) == ERTS_PSFLG_ACTIVE) {
+ /*
+ * Active and seemingly in need of being enqueued, but
+ * the process may already be in a run queue via a proxy;
+ * further inspection is needed...
+ */
+ enqueue = check_enqueue_in_prio_queue(p, enq_prio_p, &n, a);
+ }
+
a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
if (a == e)
break;
- if (!active_enq && (a & ERTS_PSFLG_ACTIVE))
- return; /* Someone else activated process ... */
+ if (enqueue == ERTS_ENQUEUE_NOT && n == a)
+ break;
}
- if (erts_system_profile_flags.runnable_procs
- && !(a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) {
- profile_runnable_proc(p, am_active);
+ if (erts_system_profile_flags.runnable_procs) {
+
+ if (((n & (ERTS_PSFLG_SUSPENDED
+ | ERTS_PSFLG_ACTIVE)) == ERTS_PSFLG_ACTIVE)
+ && (!(a & (ERTS_PSFLG_ACTIVE_SYS
+ | ERTS_PSFLG_RUNNING
+ | ERTS_PSFLG_RUNNING_SYS)
+ && (!(a & ERTS_PSFLG_ACTIVE)
+ || (a & ERTS_PSFLG_SUSPENDED))))) {
+ /* We activated a previously inactive process */
+ profile_runnable_proc(p, am_active);
+ }
+
}
- if ((n & ERTS_PSFLG_IN_RUNQ) && !(a & ERTS_PSFLG_IN_RUNQ))
- add2runq(p, n);
+ *statep = a;
+
+ return enqueue;
+}
+
+static ERTS_INLINE void
+schedule_process(Process *p, erts_aint32_t in_state)
+{
+ erts_aint32_t enq_prio = -1;
+ erts_aint32_t state = in_state;
+ int enqueue = change_proc_schedule_state(p,
+ 0,
+ ERTS_PSFLG_ACTIVE,
+ &state,
+ &enq_prio);
+ if (enqueue != ERTS_ENQUEUE_NOT)
+ add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio),
+ state,
+ enq_prio);
}
void
erts_schedule_process(Process *p, erts_aint32_t state)
{
- schedule_process(p, state, 0);
+ schedule_process(p, state);
+}
+
+static void
+schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy)
+{
+ erts_aint32_t a = state, n, enq_prio = -1;
+ int enqueue; /* < 0 -> use proxy */
+
+ ASSERT(!(state & ERTS_PSFLG_PROXY));
+
+ while (1) {
+ erts_aint32_t e;
+ n = e = a;
+
+ if (a & ERTS_PSFLG_FREE)
+ return; /* We don't want to schedule free processes... */
+
+ enqueue = ERTS_ENQUEUE_NOT;
+ n |= ERTS_PSFLG_ACTIVE_SYS;
+ if (!(a & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS)))
+ enqueue = check_enqueue_in_prio_queue(p, &enq_prio, &n, a);
+ a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
+ if (a == e)
+ break;
+ if (a == n && enqueue == ERTS_ENQUEUE_NOT)
+ goto cleanup;
+ }
+
+ if (erts_system_profile_flags.runnable_procs) {
+
+ if (!(a & (ERTS_PSFLG_ACTIVE_SYS
+ | ERTS_PSFLG_RUNNING
+ | ERTS_PSFLG_RUNNING_SYS))
+ && (!(a & ERTS_PSFLG_ACTIVE) || (a & ERTS_PSFLG_SUSPENDED))) {
+ /* We activated a previously inactive process */
+ profile_runnable_proc(p, am_active);
+ }
+
+ }
+
+ if (enqueue != ERTS_ENQUEUE_NOT) {
+ Process *sched_p;
+ if (enqueue > 0)
+ sched_p = p;
+ else {
+ sched_p = make_proxy_proc(proxy, p, enq_prio);
+ proxy = NULL;
+ }
+ add2runq(sched_p, n, enq_prio);
+ }
+
+cleanup:
+ if (proxy)
+ free_proxy_proc(proxy);
}
static ERTS_INLINE int
suspend_process(Process *c_p, Process *p)
{
- erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state);
+ erts_aint32_t state;
int suspended = 0;
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
+ state = erts_smp_atomic32_read_acqb(&p->state);
+
if ((state & ERTS_PSFLG_SUSPENDED))
suspended = -1;
else {
if (c_p == p) {
state = erts_smp_atomic32_read_bor_relb(&p->state,
ERTS_PSFLG_SUSPENDED);
- state |= ERTS_PSFLG_SUSPENDED;
ASSERT(state & ERTS_PSFLG_RUNNING);
- suspended = 1;
+ suspended = (state & ERTS_PSFLG_SUSPENDED) ? -1: 1;
}
else {
while (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_EXITING))) {
- erts_aint32_t e, n;
+ erts_aint32_t n, e;
+
n = e = state;
n |= ERTS_PSFLG_SUSPENDED;
state = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e);
if (state == e) {
- state = n;
suspended = 1;
break;
}
+ if (state & ERTS_PSFLG_SUSPENDED) {
+ suspended = -1;
+ break;
+ }
}
}
}
- if (state & ERTS_PSFLG_SUSPENDED) {
+ if (suspended) {
ASSERT(!(ERTS_PSFLG_RUNNING & state)
|| p == erts_get_current_process());
- if (erts_system_profile_flags.runnable_procs
- && (p->rcount == 0)
- && (state & ERTS_PSFLG_ACTIVE)) {
- profile_runnable_proc(p, am_inactive);
+ if (suspended > 0 && erts_system_profile_flags.runnable_procs) {
+
+ /* 'state' is before our change... */
+
+ if ((state & (ERTS_PSFLG_ACTIVE
+ | ERTS_PSFLG_ACTIVE_SYS
+ | ERTS_PSFLG_RUNNING
+ | ERTS_PSFLG_RUNNING_SYS
+ | ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) {
+ /* We made process inactive */
+ profile_runnable_proc(p, am_inactive);
+ }
+
}
p->rcount++; /* count number of suspend */
}
+
return suspended;
}
static ERTS_INLINE void
resume_process(Process *p)
{
- erts_aint32_t state;
+ erts_aint32_t state, enq_prio = -1;
+ int enqueue;
+
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
ASSERT(p->rcount > 0);
@@ -5041,14 +6205,16 @@ resume_process(Process *p)
if (--p->rcount > 0) /* multiple suspend */
return;
- state = erts_smp_atomic32_read_band_mb(&p->state, ~ERTS_PSFLG_SUSPENDED);
- state &= ~ERTS_PSFLG_SUSPENDED;
- if ((state & (ERTS_PSFLG_EXITING
- | ERTS_PSFLG_ACTIVE
- | ERTS_PSFLG_IN_RUNQ
- | ERTS_PSFLG_RUNNING)) == ERTS_PSFLG_ACTIVE) {
- schedule_process(p, state, 1);
- }
+ state = erts_smp_atomic32_read_nob(&p->state);
+ enqueue = change_proc_schedule_state(p,
+ ERTS_PSFLG_SUSPENDED,
+ 0,
+ &state,
+ &enq_prio);
+ if (enqueue)
+ add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio),
+ state,
+ enq_prio);
}
int
@@ -5070,6 +6236,12 @@ static void
scheduler_ix_resume_wake(Uint ix)
{
ErtsSchedulerSleepInfo *ssi = ERTS_SCHED_SLEEP_INFO_IX(ix);
+ scheduler_ssi_resume_wake(ssi);
+}
+
+static void
+scheduler_ssi_resume_wake(ErtsSchedulerSleepInfo *ssi)
+{
erts_aint32_t xflgs = (ERTS_SSI_FLG_SLEEPING
| ERTS_SSI_FLG_TSE_SLEEPING
| ERTS_SSI_FLG_WAITING
@@ -5160,6 +6332,301 @@ sched_set_suspended_sleeptype(ErtsSchedulerSleepInfo *ssi)
}
}
+#ifdef ERTS_DIRTY_SCHEDULERS
+
+static void
+suspend_scheduler(ErtsSchedulerData *esdp)
+{
+ erts_aint32_t flgs;
+ erts_aint32_t changing;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ long no = (long) (ERTS_SCHEDULER_IS_DIRTY(esdp)
+ ? ERTS_DIRTY_SCHEDULER_NO(esdp)
+ : esdp->no);
+#else
+ long no = (long) esdp->no;
+#endif
+ ErtsSchedulerSleepInfo *ssi = esdp->ssi;
+ long active_schedulers;
+ int curr_online = 1;
+ int wake = 0;
+ erts_aint32_t aux_work;
+ int thr_prgr_active = 1;
+ ErtsStuckBoundProcesses sbp = {NULL, NULL};
+ int* ss_onlinep;
+ int* ss_curr_onlinep;
+ int* ss_wait_curr_onlinep;
+ long* ss_wait_activep;
+ long ss_wait_active_target;
+ erts_smp_atomic32_t* ss_changingp;
+ erts_smp_atomic32_t* ss_activep;
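+
+ /* Added note: the ss_* pointers below select between the bookkeeping
+ * fields for normal, dirty CPU and dirty I/O schedulers, so that the
+ * remainder of this function can treat all three kinds of scheduler
+ * uniformly. */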
+
+ /*
+ * Schedulers may be suspended in two different ways:
+ * - A scheduler may be suspended because it is not online.
+ * All schedulers with scheduler ids greater than
+ * schdlr_sspnd.online are suspended; same for dirty
+ * schedulers and schdlr_sspnd.dirty_cpu_online and
+ * schdlr_sspnd.dirty_io_online.
+ * - Multi scheduling is blocked. All schedulers except the
+ * scheduler with scheduler id 1 are suspended, and all
+ * dirty CPU and dirty I/O schedulers are suspended.
+ *
+ * Regardless of why a scheduler is suspended, it ends up here.
+ */
+
+ ASSERT(ERTS_SCHEDULER_IS_DIRTY(esdp) || no != 1);
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_SCHEDULER_IS_DIRTY(esdp)) {
+ if (erts_smp_mtx_trylock(&schdlr_sspnd.mtx) == EBUSY) {
+ erts_smp_runq_unlock(esdp->run_queue);
+ erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+ erts_smp_runq_lock(esdp->run_queue);
+ }
+ if (ongoing_multi_scheduling_block())
+ evacuate_run_queue(esdp->run_queue, &sbp);
+ } else
+#endif
+ evacuate_run_queue(esdp->run_queue, &sbp);
+
+ erts_smp_runq_unlock(esdp->run_queue);
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp))
+#endif
+ {
+ erts_sched_check_cpu_bind_prep_suspend(esdp);
+
+ if (erts_system_profile_flags.scheduler)
+ profile_scheduler(make_small(esdp->no), am_inactive);
+
+ sched_wall_time_change(esdp, 0);
+
+ erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+ }
+
+ flgs = sched_prep_spin_suspended(ssi, ERTS_SSI_FLG_SUSPENDED);
+ if (flgs & ERTS_SSI_FLG_SUSPENDED) {
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_SCHEDULER_IS_DIRTY(esdp)) {
+ if (ERTS_RUNQ_IS_DIRTY_CPU_RUNQ(esdp->run_queue)) {
+ active_schedulers = erts_smp_atomic32_dec_read_nob(&schdlr_sspnd.dirty_cpu_active);
+ ASSERT(active_schedulers >= 0);
+ changing = erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_cpu_changing);
+ ss_onlinep = &schdlr_sspnd.dirty_cpu_online;
+ ss_curr_onlinep = &schdlr_sspnd.dirty_cpu_curr_online;
+ ss_wait_curr_onlinep = &schdlr_sspnd.dirty_cpu_wait_curr_online;
+ ss_changingp = &schdlr_sspnd.dirty_cpu_changing;
+ ss_wait_activep = &schdlr_sspnd.msb.dirty_cpu_wait_active;
+ ss_activep = &schdlr_sspnd.dirty_cpu_active;
+ } else {
+ active_schedulers = erts_smp_atomic32_dec_read_nob(&schdlr_sspnd.dirty_io_active);
+ ASSERT(active_schedulers >= 0);
+ changing = erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_io_changing);
+ ss_onlinep = &schdlr_sspnd.dirty_io_online;
+ ss_curr_onlinep = &schdlr_sspnd.dirty_io_curr_online;
+ ss_wait_curr_onlinep = &schdlr_sspnd.dirty_io_wait_curr_online;
+ ss_changingp = &schdlr_sspnd.dirty_io_changing;
+ ss_wait_activep = &schdlr_sspnd.msb.dirty_io_wait_active;
+ ss_activep = &schdlr_sspnd.dirty_io_active;
+ }
+ ss_wait_active_target = 0;
+ }
+ else
+#endif
+ {
+ active_schedulers = erts_smp_atomic32_dec_read_nob(&schdlr_sspnd.active);
+ ASSERT(active_schedulers >= 1);
+ changing = erts_smp_atomic32_read_nob(&schdlr_sspnd.changing);
+ ss_onlinep = &schdlr_sspnd.online;
+ ss_curr_onlinep = &schdlr_sspnd.curr_online;
+ ss_wait_curr_onlinep = &schdlr_sspnd.wait_curr_online;
+ ss_changingp = &schdlr_sspnd.changing;
+ ss_wait_activep = &schdlr_sspnd.msb.wait_active;
+ ss_activep = &schdlr_sspnd.active;
+ ss_wait_active_target = 1;
+ }
+ if (changing & ERTS_SCHDLR_SSPND_CHNG_MSB) {
+ if (active_schedulers == *ss_wait_activep)
+ wake = 1;
+ if (active_schedulers == ss_wait_active_target) {
+ changing = erts_smp_atomic32_read_band_nob(ss_changingp,
+ ~ERTS_SCHDLR_SSPND_CHNG_MSB);
+ changing &= ~ERTS_SCHDLR_SSPND_CHNG_MSB;
+ }
+ }
+
+ while (1) {
+ if (changing & ERTS_SCHDLR_SSPND_CHNG_ONLN) {
+ int changed = 0;
+ if (no > *ss_onlinep && curr_online) {
+ (*ss_curr_onlinep)--;
+ curr_online = 0;
+ changed = 1;
+ }
+ else if (no <= *ss_onlinep && !curr_online) {
+ (*ss_curr_onlinep)++;
+ curr_online = 1;
+ changed = 1;
+ }
+ if (changed
+ && *ss_curr_onlinep == *ss_wait_curr_onlinep)
+ wake = 1;
+ if (*ss_onlinep == *ss_curr_onlinep) {
+ changing = erts_smp_atomic32_read_band_nob(ss_changingp,
+ ~ERTS_SCHDLR_SSPND_CHNG_ONLN);
+ changing &= ~ERTS_SCHDLR_SSPND_CHNG_ONLN;
+ }
+ }
+
+ if (wake) {
+ erts_smp_cnd_signal(&schdlr_sspnd.cnd);
+ wake = 0;
+ }
+
+ if (curr_online && !ongoing_multi_scheduling_block()) {
+ flgs = erts_smp_atomic32_read_acqb(&ssi->flags);
+ if (!(flgs & ERTS_SSI_FLG_SUSPENDED))
+ break;
+ }
+ erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
+
+ while (1) {
+ erts_aint32_t qmask;
+ erts_aint32_t flgs;
+
+ qmask = (ERTS_RUNQ_FLGS_GET(esdp->run_queue)
+ & ERTS_RUNQ_FLGS_QMASK);
+ aux_work = erts_atomic32_read_acqb(&ssi->aux_work);
+ if (aux_work|qmask) {
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && !thr_prgr_active) {
+ erts_thr_progress_active(esdp, thr_prgr_active = 1);
+ sched_wall_time_change(esdp, 1);
+ }
+ if (aux_work)
+ aux_work = handle_aux_work(&esdp->aux_work_data,
+ aux_work,
+ 1);
+
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp) &&
+ (aux_work && erts_thr_progress_update(esdp)))
+ erts_thr_progress_leader_update(esdp);
+ if (qmask) {
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_SCHEDULER_IS_DIRTY(esdp)) {
+ erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+ erts_smp_runq_lock(esdp->run_queue);
+ if (ongoing_multi_scheduling_block())
+ evacuate_run_queue(esdp->run_queue, &sbp);
+ erts_smp_runq_unlock(esdp->run_queue);
+ erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
+ } else
+#endif
+ {
+ erts_smp_runq_lock(esdp->run_queue);
+ evacuate_run_queue(esdp->run_queue, &sbp);
+ erts_smp_runq_unlock(esdp->run_queue);
+ }
+ }
+ }
+
+ if (!aux_work) {
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp))
+#endif
+ {
+ if (thr_prgr_active) {
+ erts_thr_progress_active(esdp, thr_prgr_active = 0);
+ sched_wall_time_change(esdp, 0);
+ }
+ erts_thr_progress_prepare_wait(esdp);
+ }
+ flgs = sched_spin_suspended(ssi,
+ ERTS_SCHED_SUSPEND_SLEEP_SPINCOUNT);
+ if (flgs == (ERTS_SSI_FLG_SLEEPING
+ | ERTS_SSI_FLG_WAITING
+ | ERTS_SSI_FLG_SUSPENDED)) {
+ flgs = sched_set_suspended_sleeptype(ssi);
+ if (flgs == (ERTS_SSI_FLG_SLEEPING
+ | ERTS_SSI_FLG_TSE_SLEEPING
+ | ERTS_SSI_FLG_WAITING
+ | ERTS_SSI_FLG_SUSPENDED)) {
+ int res;
+
+ do {
+ res = erts_tse_wait(ssi->event);
+ } while (res == EINTR);
+ }
+ }
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp))
+#endif
+ erts_thr_progress_finalize_wait(esdp);
+ }
+
+ flgs = sched_prep_spin_suspended(ssi, (ERTS_SSI_FLG_WAITING
+ | ERTS_SSI_FLG_SUSPENDED));
+ if (!(flgs & ERTS_SSI_FLG_SUSPENDED))
+ break;
+ changing = erts_smp_atomic32_read_nob(ss_changingp);
+ if (changing & ~ERTS_SCHDLR_SSPND_CHNG_WAITER)
+ break;
+ }
+
+ erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+ changing = erts_smp_atomic32_read_nob(ss_changingp);
+ }
+
+ active_schedulers = erts_smp_atomic32_inc_read_nob(ss_activep);
+ changing = erts_smp_atomic32_read_nob(ss_changingp);
+ if ((changing & ERTS_SCHDLR_SSPND_CHNG_MSB)
+ && *ss_onlinep == active_schedulers) {
+ erts_smp_atomic32_read_band_nob(ss_changingp,
+ ~ERTS_SCHDLR_SSPND_CHNG_MSB);
+ }
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp))
+#endif
+ ASSERT(no <= *ss_onlinep);
+ ASSERT(!ongoing_multi_scheduling_block());
+
+ }
+
+ erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
+
+ ASSERT(curr_online);
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp))
+#endif
+ {
+ if (erts_system_profile_flags.scheduler)
+ profile_scheduler(make_small(esdp->no), am_active);
+
+ if (!thr_prgr_active) {
+ erts_thr_progress_active(esdp, thr_prgr_active = 1);
+ sched_wall_time_change(esdp, 1);
+ }
+ }
+
+ erts_smp_runq_lock(esdp->run_queue);
+ non_empty_runq(esdp->run_queue);
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp))
+#endif
+ {
+ schedule_bound_processes(esdp->run_queue, &sbp);
+
+ erts_sched_check_cpu_bind_post_suspend(esdp);
+ }
+}
+
+#else /* !ERTS_DIRTY_SCHEDULERS */
+
static void
suspend_scheduler(ErtsSchedulerData *esdp)
{
@@ -5348,29 +6815,328 @@ suspend_scheduler(ErtsSchedulerData *esdp)
erts_sched_check_cpu_bind_post_suspend(esdp);
}
+#endif
+
ErtsSchedSuspendResult
erts_schedulers_state(Uint *total,
Uint *online,
Uint *active,
+ Uint *dirty_cpu,
+ Uint *dirty_cpu_online,
+ Uint *dirty_io,
int yield_allowed)
{
- int res;
+ int res = ERTS_SCHDLR_SSPND_EINVAL;
erts_aint32_t changing;
erts_smp_mtx_lock(&schdlr_sspnd.mtx);
changing = erts_smp_atomic32_read_nob(&schdlr_sspnd.changing);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ changing |= (erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_cpu_changing)
+ | erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_io_changing));
+#endif
if (yield_allowed && (changing & ~ERTS_SCHDLR_SSPND_CHNG_WAITER))
res = ERTS_SCHDLR_SSPND_YIELD_RESTART;
else {
- *active = *online = schdlr_sspnd.online;
- if (ongoing_multi_scheduling_block())
+ if (active)
+ *active = schdlr_sspnd.online;
+ if (online)
+ *online = schdlr_sspnd.online;
+ if (ongoing_multi_scheduling_block() && active)
*active = 1;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (dirty_cpu_online)
+ *dirty_cpu_online = schdlr_sspnd.dirty_cpu_online;
+#endif
res = ERTS_SCHDLR_SSPND_DONE;
}
erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
- *total = erts_no_schedulers;
+ if (total)
+ *total = erts_no_schedulers;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (dirty_cpu)
+ *dirty_cpu = erts_no_dirty_cpu_schedulers;
+ if (dirty_io)
+ *dirty_io = erts_no_dirty_io_schedulers;
+#endif
+ return res;
+}
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+
+ErtsSchedSuspendResult
+erts_set_schedulers_online(Process *p,
+ ErtsProcLocks plocks,
+ Sint new_no,
+ Sint *old_no
+#ifdef ERTS_DIRTY_SCHEDULERS
+ , int dirty_only
+#endif
+ )
+{
+ ErtsSchedulerData *esdp;
+ int ix, res = -1, no, have_unlocked_plocks, end_wait;
+ erts_aint32_t changing = 0;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ErtsSchedulerSleepInfo* ssi;
+ int dirty_no, change_dirty;
+#endif
+
+ if (new_no < 1)
+ return ERTS_SCHDLR_SSPND_EINVAL;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ else if (dirty_only && erts_no_dirty_cpu_schedulers < new_no)
+ return ERTS_SCHDLR_SSPND_EINVAL;
+#endif
+ else if (erts_no_schedulers < new_no)
+ return ERTS_SCHDLR_SSPND_EINVAL;
+
+ esdp = ERTS_PROC_GET_SCHDATA(p);
+ end_wait = 0;
+
+ erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+
+ have_unlocked_plocks = 0;
+ no = (int) new_no;
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(schdlr_sspnd.dirty_cpu_online <= erts_no_dirty_cpu_schedulers);
+ if (dirty_only) {
+ if (no > schdlr_sspnd.online) {
+ erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
+ return ERTS_SCHDLR_SSPND_EINVAL;
+ }
+ dirty_no = no;
+ } else {
+ /*
+ * Adjust the number of dirty CPU schedulers online relative to the
+ * adjustment made to the number of normal schedulers online.
+ */
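+ /*
+ * Illustrative example (hypothetical numbers, added for clarity): with
+ * 8 normal and 8 dirty CPU schedulers, all currently online, setting
+ * no = 4 gives total_pct = 100, onln_pct = 50 and thus dirty_no = 4,
+ * i.e. the dirty CPU online count scales with the normal online count.
+ */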
+ int total_pct = erts_no_dirty_cpu_schedulers*100/erts_no_schedulers;
+ int onln_pct = no*total_pct/schdlr_sspnd.online;
+ dirty_no = schdlr_sspnd.dirty_cpu_online*onln_pct/100;
+ if (dirty_no == 0)
+ dirty_no = 1;
+ ASSERT(dirty_no <= erts_no_dirty_cpu_schedulers);
+ }
+#endif
+ changing = erts_smp_atomic32_read_nob(&schdlr_sspnd.changing);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ changing |= erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_cpu_changing);
+#endif
+ if (changing) {
+ res = ERTS_SCHDLR_SSPND_YIELD_RESTART;
+ }
+ else {
+ int online = *old_no = schdlr_sspnd.online;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ int dirty_online = schdlr_sspnd.dirty_cpu_online;
+
+ if (dirty_only) {
+ *old_no = schdlr_sspnd.dirty_cpu_online;
+ if (dirty_no == schdlr_sspnd.dirty_cpu_online) {
+ res = ERTS_SCHDLR_SSPND_DONE;
+ }
+ change_dirty = 1;
+ } else {
+#endif
+ if (no == schdlr_sspnd.online) {
+#ifdef ERTS_DIRTY_SCHEDULERS
+ dirty_only = 1;
+ if (dirty_no == schdlr_sspnd.dirty_cpu_online)
+#endif
+ res = ERTS_SCHDLR_SSPND_DONE;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ else
+ change_dirty = 1;
+#endif
+ }
+#ifdef ERTS_DIRTY_SCHEDULERS
+ else
+ change_dirty = (dirty_no != schdlr_sspnd.dirty_cpu_online);
+ }
+#endif
+ if (res == -1)
+ {
+ int increase = (no > online);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (!dirty_only) {
+#endif
+ ERTS_SCHDLR_SSPND_CHNG_SET((ERTS_SCHDLR_SSPND_CHNG_ONLN
+ | ERTS_SCHDLR_SSPND_CHNG_WAITER), 0);
+ schdlr_sspnd.online = no;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ } else
+ increase = (dirty_no > dirty_online);
+ if (change_dirty) {
+ ERTS_SCHDLR_SSPND_DIRTY_CPU_CHNG_SET((ERTS_SCHDLR_SSPND_CHNG_ONLN
+ | ERTS_SCHDLR_SSPND_CHNG_WAITER), 0);
+ schdlr_sspnd.dirty_cpu_online = dirty_no;
+ }
+#endif
+ if (increase) {
+ int ix;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (!dirty_only) {
+#endif
+ schdlr_sspnd.wait_curr_online = no;
+ if (ongoing_multi_scheduling_block()) {
+ for (ix = online; ix < no; ix++)
+ erts_sched_poke(ERTS_SCHED_SLEEP_INFO_IX(ix));
+ }
+ else {
+ if (plocks) {
+ have_unlocked_plocks = 1;
+ erts_smp_proc_unlock(p, plocks);
+ }
+ change_no_used_runqs(no);
+
+ for (ix = online; ix < no; ix++)
+ resume_run_queue(ERTS_RUNQ_IX(ix));
+
+ for (ix = no; ix < erts_no_run_queues; ix++)
+ suspend_run_queue(ERTS_RUNQ_IX(ix));
+ }
+#ifdef ERTS_DIRTY_SCHEDULERS
+ }
+ if (change_dirty) {
+ schdlr_sspnd.dirty_cpu_wait_curr_online = dirty_no;
+ ASSERT(schdlr_sspnd.dirty_cpu_curr_online !=
+ schdlr_sspnd.dirty_cpu_wait_curr_online);
+ if (ongoing_multi_scheduling_block()) {
+ for (ix = dirty_online; ix < dirty_no; ix++) {
+ ssi = ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(ix);
+ erts_sched_poke(ssi);
+ }
+ } else {
+ for (ix = dirty_online; ix < dirty_no; ix++) {
+ ssi = ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(ix);
+ scheduler_ssi_resume_wake(ssi);
+ erts_smp_atomic32_read_band_nob(&ssi->flags,
+ ~ERTS_SSI_FLG_SUSPENDED);
+ }
+ wake_dirty_schedulers(ERTS_DIRTY_CPU_RUNQ, 0);
+ }
+ }
+#endif
+ res = ERTS_SCHDLR_SSPND_DONE;
+ }
+ else /* if (no < online) */ {
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (change_dirty) {
+ schdlr_sspnd.dirty_cpu_wait_curr_online = dirty_no;
+ ASSERT(schdlr_sspnd.dirty_cpu_curr_online !=
+ schdlr_sspnd.dirty_cpu_wait_curr_online);
+ if (ongoing_multi_scheduling_block()) {
+ for (ix = dirty_no; ix < dirty_online; ix++) {
+ ssi = ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(ix);
+ erts_sched_poke(ssi);
+ }
+ } else {
+ for (ix = dirty_no; ix < dirty_online; ix++) {
+ ssi = ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(ix);
+ erts_smp_atomic32_read_bor_nob(&ssi->flags,
+ ERTS_SSI_FLG_SUSPENDED);
+ }
+ wake_dirty_schedulers(ERTS_DIRTY_CPU_RUNQ, 0);
+ }
+ }
+ if (dirty_only) {
+ res = ERTS_SCHDLR_SSPND_DONE;
+ }
+ else
+#endif
+ {
+ if (p->scheduler_data->no <= no) {
+ res = ERTS_SCHDLR_SSPND_DONE;
+ schdlr_sspnd.wait_curr_online = no;
+ }
+ else {
+ /*
+ * Yield! The current process needs to migrate
+ * before the bif returns.
+ */
+ res = ERTS_SCHDLR_SSPND_YIELD_DONE;
+ schdlr_sspnd.wait_curr_online = no+1;
+ }
+
+ if (ongoing_multi_scheduling_block()) {
+ for (ix = no; ix < online; ix++)
+ erts_sched_poke(ERTS_SCHED_SLEEP_INFO_IX(ix));
+ }
+ else {
+ if (plocks) {
+ have_unlocked_plocks = 1;
+ erts_smp_proc_unlock(p, plocks);
+ }
+
+ change_no_used_runqs(no);
+ for (ix = no; ix < erts_no_run_queues; ix++)
+ suspend_run_queue(ERTS_RUNQ_IX(ix));
+
+ for (ix = no; ix < online; ix++) {
+ ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
+ wake_scheduler(rq);
+ }
+ }
+ }
+ }
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (change_dirty) {
+ while (schdlr_sspnd.dirty_cpu_curr_online != schdlr_sspnd.dirty_cpu_wait_curr_online)
+ erts_smp_cnd_wait(&schdlr_sspnd.cnd, &schdlr_sspnd.mtx);
+ ERTS_SCHDLR_SSPND_DIRTY_CPU_CHNG_SET(0, ERTS_SCHDLR_SSPND_CHNG_WAITER);
+ erts_smp_atomic32_read_band_nob(&schdlr_sspnd.dirty_cpu_changing,
+ ~ERTS_SCHDLR_SSPND_CHNG_WAITER);
+ }
+ if (!dirty_only)
+#endif
+ {
+ if (schdlr_sspnd.curr_online != schdlr_sspnd.wait_curr_online) {
+ erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
+ if (plocks && !have_unlocked_plocks) {
+ have_unlocked_plocks = 1;
+ erts_smp_proc_unlock(p, plocks);
+ }
+ erts_thr_progress_active(esdp, 0);
+ erts_thr_progress_prepare_wait(esdp);
+ end_wait = 1;
+ erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+ }
+
+ while (schdlr_sspnd.curr_online != schdlr_sspnd.wait_curr_online)
+ erts_smp_cnd_wait(&schdlr_sspnd.cnd, &schdlr_sspnd.mtx);
+
+ ASSERT(res != ERTS_SCHDLR_SSPND_DONE
+ ? (ERTS_SCHDLR_SSPND_CHNG_WAITER
+ & erts_smp_atomic32_read_nob(&schdlr_sspnd.changing))
+ : (ERTS_SCHDLR_SSPND_CHNG_WAITER
+ == erts_smp_atomic32_read_nob(&schdlr_sspnd.changing)));
+ erts_smp_atomic32_read_band_nob(&schdlr_sspnd.changing,
+ ~ERTS_SCHDLR_SSPND_CHNG_WAITER);
+ }
+ }
+ }
+
+ erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(schdlr_sspnd.dirty_cpu_online <= schdlr_sspnd.online);
+ if (!dirty_only)
+#endif
+ {
+ if (end_wait) {
+ erts_thr_progress_finalize_wait(esdp);
+ erts_thr_progress_active(esdp, 1);
+ }
+ if (have_unlocked_plocks)
+ erts_smp_proc_lock(p, plocks);
+ }
+
return res;
}
+#else /* !ERTS_DIRTY_SCHEDULERS */
+
ErtsSchedSuspendResult
erts_set_schedulers_online(Process *p,
ErtsProcLocks plocks,
@@ -5457,7 +7223,7 @@ erts_set_schedulers_online(Process *p,
for (ix = no; ix < online; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- wake_scheduler(rq, 0);
+ wake_scheduler(rq);
}
}
}
@@ -5499,15 +7265,24 @@ erts_set_schedulers_online(Process *p,
return res;
}
+#endif
+
ErtsSchedSuspendResult
erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
{
- int ix, res, have_unlocked_plocks = 0;
+ int ix, res, have_unlocked_plocks = 0, online;
erts_aint32_t changing;
ErtsProcList *plp;
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ErtsSchedulerSleepInfo* ssi;
+#endif
erts_smp_mtx_lock(&schdlr_sspnd.mtx);
changing = erts_smp_atomic32_read_nob(&schdlr_sspnd.changing);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ changing |= (erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_cpu_changing)
+ | erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_io_changing));
+#endif
if (changing) {
res = ERTS_SCHDLR_SSPND_YIELD_RESTART; /* Yield */
}
@@ -5517,10 +7292,13 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
erts_proclist_store_last(&schdlr_sspnd.msb.procs, plp);
p->flags |= F_HAVE_BLCKD_MSCHED;
ASSERT(erts_smp_atomic32_read_nob(&schdlr_sspnd.active) == 1);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_cpu_active) == 0);
+ ASSERT(erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_io_active) == 0);
+#endif
ASSERT(p->scheduler_data->no == 1);
res = ERTS_SCHDLR_SSPND_DONE_MSCHED_BLOCKED;
- }
- else {
+ } else {
int online = schdlr_sspnd.online;
p->flags |= F_HAVE_BLCKD_MSCHED;
if (plocks) {
@@ -5532,6 +7310,35 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
if (online == 1) {
res = ERTS_SCHDLR_SSPND_DONE_MSCHED_BLOCKED;
ASSERT(erts_smp_atomic32_read_nob(&schdlr_sspnd.active) == 1);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_cpu_active) == 1);
+ ASSERT(!(erts_smp_atomic32_read_nob(&ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(0)->flags)
+ & ERTS_SSI_FLG_SUSPENDED));
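+ /* Added note: blocking multi-scheduling also takes the dirty CPU and
+ * dirty I/O schedulers out of service; the waits below do not return
+ * until their active counts have reached zero. */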
+ schdlr_sspnd.msb.dirty_cpu_wait_active = 0;
+ ERTS_SCHDLR_SSPND_DIRTY_CPU_CHNG_SET((ERTS_SCHDLR_SSPND_CHNG_MSB
+ | ERTS_SCHDLR_SSPND_CHNG_WAITER), 0);
+ ssi = ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(0);
+ erts_smp_atomic32_read_bor_nob(&ssi->flags, ERTS_SSI_FLG_SUSPENDED);
+ wake_dirty_schedulers(ERTS_DIRTY_CPU_RUNQ, 0);
+ while (erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_cpu_active)
+ != schdlr_sspnd.msb.dirty_cpu_wait_active)
+ erts_smp_cnd_wait(&schdlr_sspnd.cnd, &schdlr_sspnd.mtx);
+ ERTS_SCHDLR_SSPND_DIRTY_CPU_CHNG_SET(0, ERTS_SCHDLR_SSPND_CHNG_WAITER);
+
+ schdlr_sspnd.msb.dirty_io_wait_active = 0;
+ ERTS_SCHDLR_SSPND_DIRTY_IO_CHNG_SET((ERTS_SCHDLR_SSPND_CHNG_MSB
+ | ERTS_SCHDLR_SSPND_CHNG_WAITER), 0);
+ for (ix = 0; ix < erts_no_dirty_io_schedulers; ix++) {
+ ssi = ERTS_DIRTY_IO_SCHED_SLEEP_INFO_IX(ix);
+ erts_smp_atomic32_read_bor_nob(&ssi->flags,
+ ERTS_SSI_FLG_SUSPENDED);
+ }
+ wake_dirty_schedulers(ERTS_DIRTY_IO_RUNQ, 0);
+ while (erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_io_active)
+ != schdlr_sspnd.msb.dirty_io_wait_active)
+ erts_smp_cnd_wait(&schdlr_sspnd.cnd, &schdlr_sspnd.mtx);
+ ERTS_SCHDLR_SSPND_DIRTY_IO_CHNG_SET(0, ERTS_SCHDLR_SSPND_CHNG_WAITER);
+#endif
ASSERT(p->scheduler_data->no == 1);
}
else {
@@ -5550,13 +7357,44 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
schdlr_sspnd.msb.wait_active = 2;
}
+#ifdef ERTS_DIRTY_SCHEDULERS
+ schdlr_sspnd.msb.dirty_cpu_wait_active = 0;
+ ERTS_SCHDLR_SSPND_DIRTY_CPU_CHNG_SET((ERTS_SCHDLR_SSPND_CHNG_MSB
+ | ERTS_SCHDLR_SSPND_CHNG_WAITER), 0);
+ for (ix = 0; ix < erts_no_dirty_cpu_schedulers; ix++) {
+ ssi = ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(ix);
+ erts_smp_atomic32_read_bor_nob(&ssi->flags,
+ ERTS_SSI_FLG_SUSPENDED);
+ }
+ wake_dirty_schedulers(ERTS_DIRTY_CPU_RUNQ, 0);
+ while (erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_cpu_active)
+ != schdlr_sspnd.msb.dirty_cpu_wait_active)
+ erts_smp_cnd_wait(&schdlr_sspnd.cnd, &schdlr_sspnd.mtx);
+ ERTS_SCHDLR_SSPND_DIRTY_CPU_CHNG_SET(0, ERTS_SCHDLR_SSPND_CHNG_WAITER);
+ ASSERT(schdlr_sspnd.dirty_cpu_curr_online == schdlr_sspnd.dirty_cpu_online);
+
+ schdlr_sspnd.msb.dirty_io_wait_active = 0;
+ ERTS_SCHDLR_SSPND_DIRTY_IO_CHNG_SET((ERTS_SCHDLR_SSPND_CHNG_MSB
+ | ERTS_SCHDLR_SSPND_CHNG_WAITER), 0);
+ for (ix = 0; ix < erts_no_dirty_io_schedulers; ix++) {
+ ssi = ERTS_DIRTY_IO_SCHED_SLEEP_INFO_IX(ix);
+ erts_smp_atomic32_read_bor_nob(&ssi->flags,
+ ERTS_SSI_FLG_SUSPENDED);
+ }
+ wake_dirty_schedulers(ERTS_DIRTY_IO_RUNQ, 0);
+ while (erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_io_active)
+ != schdlr_sspnd.msb.dirty_io_wait_active)
+ erts_smp_cnd_wait(&schdlr_sspnd.cnd, &schdlr_sspnd.mtx);
+ ERTS_SCHDLR_SSPND_DIRTY_IO_CHNG_SET(0, ERTS_SCHDLR_SSPND_CHNG_WAITER);
+ ASSERT(schdlr_sspnd.dirty_io_curr_online == schdlr_sspnd.dirty_io_online);
+#endif
change_no_used_runqs(1);
for (ix = 1; ix < erts_no_run_queues; ix++)
suspend_run_queue(ERTS_RUNQ_IX(ix));
for (ix = 1; ix < online; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- wake_scheduler(rq, 0);
+ wake_scheduler(rq);
}
if (erts_smp_atomic32_read_nob(&schdlr_sspnd.active)
@@ -5590,6 +7428,7 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
erts_smp_mtx_lock(&schdlr_sspnd.mtx);
}
+
ASSERT(res != ERTS_SCHDLR_SSPND_DONE_MSCHED_BLOCKED
? (ERTS_SCHDLR_SSPND_CHNG_WAITER
& erts_smp_atomic32_read_nob(&schdlr_sspnd.changing))
@@ -5630,12 +7469,12 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
p->flags &= ~F_HAVE_BLCKD_MSCHED;
schdlr_sspnd.msb.ongoing = 0;
if (schdlr_sspnd.online == 1) {
- /* No schedulers to resume */
+ /* No normal schedulers to resume */
ASSERT(erts_smp_atomic32_read_nob(&schdlr_sspnd.active) == 1);
ERTS_SCHDLR_SSPND_CHNG_SET(0, ERTS_SCHDLR_SSPND_CHNG_MSB);
}
else {
- int online = schdlr_sspnd.online;
+ online = schdlr_sspnd.online;
if (plocks) {
have_unlocked_plocks = 1;
erts_smp_proc_unlock(p, plocks);
@@ -5650,6 +7489,27 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
for (ix = online; ix < erts_no_run_queues; ix++)
suspend_run_queue(ERTS_RUNQ_IX(ix));
}
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ERTS_SCHDLR_SSPND_DIRTY_CPU_CHNG_SET(ERTS_SCHDLR_SSPND_CHNG_MSB, 0);
+ schdlr_sspnd.msb.dirty_cpu_wait_active = schdlr_sspnd.dirty_cpu_online;
+ for (ix = 0; ix < schdlr_sspnd.dirty_cpu_online; ix++) {
+ ssi = ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(ix);
+ scheduler_ssi_resume_wake(ssi);
+ erts_smp_atomic32_read_band_nob(&ssi->flags,
+ ~ERTS_SSI_FLG_SUSPENDED);
+ }
+ wake_dirty_schedulers(ERTS_DIRTY_CPU_RUNQ, 0);
+
+ ERTS_SCHDLR_SSPND_DIRTY_IO_CHNG_SET(ERTS_SCHDLR_SSPND_CHNG_MSB, 0);
+ schdlr_sspnd.msb.dirty_io_wait_active = erts_no_dirty_io_schedulers;
+ for (ix = 0; ix < erts_no_dirty_io_schedulers; ix++) {
+ ssi = ERTS_DIRTY_IO_SCHED_SLEEP_INFO_IX(ix);
+ scheduler_ssi_resume_wake(ssi);
+ erts_smp_atomic32_read_band_nob(&ssi->flags,
+ ~ERTS_SSI_FLG_SUSPENDED);
+ }
+ wake_dirty_schedulers(ERTS_DIRTY_IO_RUNQ, 0);
+#endif
res = ERTS_SCHDLR_SSPND_DONE;
}
}
@@ -5776,7 +7636,11 @@ sched_thread_func(void *vesdp)
erts_smp_atomic32_read_band_nob(&schdlr_sspnd.changing,
~ERTS_SCHDLR_SSPND_CHNG_ONLN);
if (no != 1)
+#ifdef ERTS_DIRTY_SCHEDULERS
+ erts_smp_cnd_broadcast(&schdlr_sspnd.cnd);
+#else
erts_smp_cnd_signal(&schdlr_sspnd.cnd);
+#endif
}
if (no == 1) {
@@ -5806,22 +7670,155 @@ sched_thread_func(void *vesdp)
return NULL;
}
+#ifdef ERTS_DIRTY_SCHEDULERS
+#ifdef ERTS_SMP
+static void*
+sched_dirty_cpu_thread_func(void *vesdp)
+{
+ ErtsThrPrgrCallbacks callbacks;
+ ErtsSchedulerData *esdp = vesdp;
+ Uint no = ERTS_DIRTY_SCHEDULER_NO(esdp);
+ ERTS_DIRTY_SCHEDULER_TYPE(esdp) = ERTS_DIRTY_CPU_SCHEDULER;
+ ASSERT(no != 0);
+ ERTS_DIRTY_CPU_SCHED_SLEEP_INFO_IX(no-1)->event = erts_tse_fetch();
+ callbacks.arg = (void *) esdp->ssi;
+ callbacks.wakeup = thr_prgr_wakeup;
+ callbacks.prepare_wait = NULL;
+ callbacks.wait = NULL;
+ callbacks.finalize_wait = NULL;
+
+ erts_thr_progress_register_unmanaged_thread(&callbacks);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ {
+ char buf[31];
+ erts_snprintf(&buf[0], 31, "dirty cpu scheduler %beu", no);
+ erts_lc_set_thread_name(&buf[0]);
+ }
+#endif
+ erts_tsd_set(sched_data_key, vesdp);
+#if ERTS_USE_ASYNC_READY_Q
+ esdp->aux_work_data.async_ready.queue = NULL;
+#endif
+
+ erts_proc_lock_prepare_proc_lock_waiter();
+
+#ifdef HIPE
+ hipe_thread_signal_init();
+#endif
+ erts_thread_init_float();
+
+ erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+ ASSERT(erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_cpu_changing)
+ & ERTS_SCHDLR_SSPND_CHNG_ONLN);
+
+ if (--schdlr_sspnd.dirty_cpu_curr_online == schdlr_sspnd.dirty_cpu_wait_curr_online) {
+ erts_smp_atomic32_read_band_nob(&schdlr_sspnd.dirty_cpu_changing,
+ ~ERTS_SCHDLR_SSPND_CHNG_ONLN);
+ if (no != 1)
+ erts_smp_cnd_broadcast(&schdlr_sspnd.cnd);
+ }
+
+ if (no == 1) {
+ while (schdlr_sspnd.dirty_cpu_curr_online != schdlr_sspnd.dirty_cpu_wait_curr_online)
+ erts_smp_cnd_wait(&schdlr_sspnd.cnd, &schdlr_sspnd.mtx);
+ ERTS_SCHDLR_SSPND_DIRTY_CPU_CHNG_SET(0, ERTS_SCHDLR_SSPND_CHNG_WAITER);
+ }
+ erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
+
+ process_main();
+ /* No schedulers should *ever* terminate */
+ erl_exit(ERTS_ABORT_EXIT,
+ "Dirty CPU scheduler thread number %beu terminated\n",
+ no);
+ return NULL;
+}
+
+static void*
+sched_dirty_io_thread_func(void *vesdp)
+{
+ ErtsThrPrgrCallbacks callbacks;
+ ErtsSchedulerData *esdp = vesdp;
+ Uint no = ERTS_DIRTY_SCHEDULER_NO(esdp);
+ ERTS_DIRTY_SCHEDULER_TYPE(esdp) = ERTS_DIRTY_IO_SCHEDULER;
+ ASSERT(no != 0);
+ ERTS_DIRTY_IO_SCHED_SLEEP_INFO_IX(no-1)->event = erts_tse_fetch();
+ callbacks.arg = (void *) esdp->ssi;
+ callbacks.wakeup = thr_prgr_wakeup;
+ callbacks.prepare_wait = NULL;
+ callbacks.wait = NULL;
+ callbacks.finalize_wait = NULL;
+
+ erts_thr_progress_register_unmanaged_thread(&callbacks);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ {
+ char buf[31];
+ erts_snprintf(&buf[0], 31, "dirty io scheduler %beu", no);
+ erts_lc_set_thread_name(&buf[0]);
+ }
+#endif
+ erts_tsd_set(sched_data_key, vesdp);
+#if ERTS_USE_ASYNC_READY_Q
+ esdp->aux_work_data.async_ready.queue = NULL;
+#endif
+
+ erts_proc_lock_prepare_proc_lock_waiter();
+
+#ifdef HIPE
+ hipe_thread_signal_init();
+#endif
+ erts_thread_init_float();
+
+ erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+ ASSERT(erts_smp_atomic32_read_nob(&schdlr_sspnd.dirty_io_changing)
+ & ERTS_SCHDLR_SSPND_CHNG_ONLN);
+
+ if (--schdlr_sspnd.dirty_io_curr_online == schdlr_sspnd.dirty_io_wait_curr_online) {
+ erts_smp_atomic32_read_band_nob(&schdlr_sspnd.dirty_io_changing,
+ ~ERTS_SCHDLR_SSPND_CHNG_ONLN);
+ if (no != 1)
+ erts_smp_cnd_broadcast(&schdlr_sspnd.cnd);
+ }
+
+ if (no == 1) {
+ while (schdlr_sspnd.dirty_io_curr_online != schdlr_sspnd.dirty_io_wait_curr_online)
+ erts_smp_cnd_wait(&schdlr_sspnd.cnd, &schdlr_sspnd.mtx);
+ ERTS_SCHDLR_SSPND_DIRTY_IO_CHNG_SET(0, ERTS_SCHDLR_SSPND_CHNG_WAITER);
+ }
+ erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
+
+ process_main();
+ /* No schedulers should *ever* terminate */
+ erl_exit(ERTS_ABORT_EXIT,
+ "Dirty I/O scheduler thread number %beu terminated\n",
+ no);
+ return NULL;
+}
+#endif
+#endif
+
static ethr_tid aux_tid;
void
erts_start_schedulers(void)
{
int res = 0;
- Uint actual = 0;
+ Uint actual;
Uint wanted = erts_no_schedulers;
Uint wanted_no_schedulers = erts_no_schedulers;
ethr_thr_opts opts = ETHR_THR_OPTS_DEFAULT_INITER;
opts.detached = 1;
+#ifdef ETHR_HAVE_THREAD_NAMES
+ opts.name = malloc(80);
+#endif
+
#ifdef ERTS_SMP
if (erts_runq_supervision_interval) {
opts.suggested_stack_size = 16;
+#ifdef ETHR_HAVE_THREAD_NAMES
+ sprintf(opts.name, "runq_supervisor");
+#endif
erts_atomic_init_nob(&runq_supervisor_sleeping, 0);
if (0 != ethr_event_init(&runq_supervision_event))
erl_exit(1, "Failed to create run-queue supervision event\n");
@@ -5843,21 +7840,65 @@ erts_start_schedulers(void)
res = ENOTSUP;
}
- while (actual < wanted) {
+ for (actual = 0; actual < wanted; actual++) {
ErtsSchedulerData *esdp = ERTS_SCHEDULER_IX(actual);
- actual++;
- ASSERT(actual == esdp->no);
- res = ethr_thr_create(&esdp->tid,sched_thread_func,(void*)esdp,&opts);
+
+ ASSERT(actual == esdp->no - 1);
+
+#ifdef ETHR_HAVE_THREAD_NAMES
+ sprintf(opts.name, "scheduler_%d", actual + 1);
+#endif
+
+#ifdef __OSE__
+ /* This should be done in the bind strategy */
+ opts.coreNo = (actual+1) % ose_num_cpus();
+#endif
+
+ res = ethr_thr_create(&esdp->tid, sched_thread_func, (void*)esdp, &opts);
+
if (res != 0) {
- actual--;
- break;
+ break;
}
}
-
+
erts_no_schedulers = actual;
+#ifdef ERTS_DIRTY_SCHEDULERS
+#ifdef ERTS_SMP
+ {
+ int ix;
+ for (ix = 0; ix < erts_no_dirty_cpu_schedulers; ix++) {
+ ErtsSchedulerData *esdp = ERTS_DIRTY_CPU_SCHEDULER_IX(ix);
+#ifdef ETHR_HAVE_THREAD_NAMES
+ sprintf(opts.name,"dirty_cpu_scheduler_%d", ix + 1);
+#endif
+ res = ethr_thr_create(&esdp->tid,sched_dirty_cpu_thread_func,(void*)esdp,&opts);
+ if (res != 0)
+ erl_exit(1, "Failed to create dirty cpu scheduler thread %d\n", ix);
+ }
+ for (ix = 0; ix < erts_no_dirty_io_schedulers; ix++) {
+ ErtsSchedulerData *esdp = ERTS_DIRTY_IO_SCHEDULER_IX(ix);
+#ifdef ETHR_HAVE_THREAD_NAMES
+ sprintf(opts.name,"dirty_io_scheduler_%d", ix + 1);
+#endif
+ res = ethr_thr_create(&esdp->tid,sched_dirty_io_thread_func,(void*)esdp,&opts);
+ if (res != 0)
+ erl_exit(1, "Failed to create dirty io scheduler thread %d\n", ix);
+ }
+ }
+#endif
+#endif
+
ERTS_THR_MEMORY_BARRIER;
+#ifdef ETHR_HAVE_THREAD_NAMES
+ sprintf(opts.name, "aux");
+#endif
+
+#ifdef __OSE__
+ opts.coreNo = 0;
+#endif /* __OSE__ */
+
res = ethr_thr_create(&aux_tid, aux_thread, NULL, &opts);
if (res != 0)
erl_exit(1, "Failed to create aux thread\n");
@@ -5877,6 +7918,10 @@ erts_start_schedulers(void)
actual, actual == 1 ? " was" : "s were");
erts_send_error_to_logger_nogl(dsbufp);
}
+
+#ifdef ETHR_HAVE_THREAD_NAMES
+ free(opts.name);
+#endif
}
#endif /* ERTS_SMP */
@@ -6032,7 +8077,8 @@ pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks,
goto done;
}
else {
- if (!(ERTS_PSFLG_RUNNING & erts_smp_atomic32_read_acqb(&rp->state)))
+ if (!((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS)
+ & erts_smp_atomic32_read_acqb(&rp->state)))
goto done;
}
@@ -6695,7 +8741,7 @@ Eterm
erts_get_process_priority(Process *p)
{
erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state);
- switch (state & ERTS_PSFLG_PRIO_MASK) {
+ switch (ERTS_PSFLGS_GET_USR_PRIO(state)) {
case PRIORITY_MAX: return am_max;
case PRIORITY_HIGH: return am_high;
case PRIORITY_NORMAL: return am_normal;
@@ -6718,18 +8764,68 @@ erts_set_process_priority(Process *p, Eterm value)
}
a = erts_smp_atomic32_read_nob(&p->state);
- if (nprio == (a & ERTS_PSFLG_PRIO_MASK))
+ if (nprio == ERTS_PSFLGS_GET_USR_PRIO(a))
oprio = nprio;
else {
- erts_aint32_t e, n;
+ int slocked = 0;
+ erts_aint32_t e, n, aprio;
+
+ if (a & ERTS_PSFLG_ACTIVE_SYS) {
+ erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
+ slocked = 1;
+ }
+
do {
- oprio = a & ERTS_PSFLG_PRIO_MASK;
+ oprio = ERTS_PSFLGS_GET_USR_PRIO(a);
n = e = a;
- ASSERT(!(a & ERTS_PSFLG_IN_RUNQ));
+ if (!(a & (ERTS_PSFLG_ACTIVE_SYS|ERTS_PSFLG_DELAYED_SYS)))
+ aprio = nprio;
+ else {
+ int max_qbit;
+
+ if (!slocked) {
+ erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
+ slocked = 1;
+ }
+
+ max_qbit = 0;
+ if (a & ERTS_PSFLG_ACTIVE_SYS)
+ max_qbit |= p->sys_task_qs->qmask;
+ if (a & ERTS_PSFLG_DELAYED_SYS) {
+ ErtsProcSysTaskQs *qs;
+ qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(p);
+ ASSERT(qs);
+ max_qbit |= qs->qmask;
+ }
+ max_qbit &= -max_qbit;
+ switch (max_qbit) {
+ case MAX_BIT:
+ aprio = PRIORITY_MAX;
+ break;
+ case HIGH_BIT:
+ aprio = PRIORITY_HIGH;
+ break;
+ case NORMAL_BIT:
+ aprio = PRIORITY_NORMAL;
+ break;
+ case LOW_BIT:
+ aprio = PRIORITY_LOW;
+ break;
+ default:
+ ERTS_INTERNAL_ERROR("Invalid qmask");
+ aprio = -1;
+ }
+
+ if (aprio > nprio) /* low value -> high prio */
+ aprio = nprio;
+ }
+
+ n &= ~(ERTS_PSFLGS_USR_PRIO_MASK
+ | ERTS_PSFLGS_ACT_PRIO_MASK);
+ n |= ((nprio << ERTS_PSFLGS_USR_PRIO_OFFSET)
+ | (aprio << ERTS_PSFLGS_ACT_PRIO_OFFSET));
- n &= ~ERTS_PSFLG_PRIO_MASK;
- n |= nprio;
a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
} while (a != e);
}
@@ -6763,6 +8859,7 @@ erts_set_process_priority(Process *p, Eterm value)
Process *schedule(Process *p, int calls)
{
+ Process *proxy_p = NULL;
ErtsRunQueue *rq;
erts_aint_t dt;
ErtsSchedulerData *esdp;
@@ -6792,7 +8889,8 @@ Process *schedule(Process *p, int calls)
input_reductions = INPUT_REDUCTIONS;
}
- ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
+ ERTS_SMP_LC_ASSERT(ERTS_SCHEDULER_IS_DIRTY(erts_get_scheduler_data())
+ || !erts_thr_progress_is_blocking());
/*
* Clean up after the process being scheduled out.
@@ -6805,6 +8903,8 @@ Process *schedule(Process *p, int calls)
actual_reds = reds = 0;
erts_smp_runq_lock(rq);
} else {
+ sched_out_proc:
+
#ifdef ERTS_SMP
ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p);
esdp = p->scheduler_data;
@@ -6858,10 +8958,11 @@ Process *schedule(Process *p, int calls)
esdp->reductions += reds;
- schedule_out_process(rq, state, p); /* Returns with rq locked! */
+ schedule_out_process(rq, state, p, proxy_p); /* Returns with rq locked! */
+ proxy_p = NULL;
ERTS_PROC_REDUCTIONS_EXECUTED(rq,
- (int) (state & ERTS_PSFLG_PRIO_MASK),
+ (int) ERTS_PSFLGS_GET_USR_PRIO(state),
reds,
actual_reds);
@@ -6870,18 +8971,19 @@ Process *schedule(Process *p, int calls)
p->scheduler_data = NULL;
#endif
+ erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
if (state & ERTS_PSFLG_FREE) {
#ifdef ERTS_SMP
ASSERT(esdp->free_process == p);
esdp->free_process = NULL;
-#else
- erts_free_proc(p);
+#else
+ state = erts_smp_atomic32_read_nob(&p->state);
+ if (!(state & ERTS_PSFLG_IN_RUNQ))
+ erts_free_proc(p);
#endif
}
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
-
#ifdef ERTS_SMP
ASSERT(!esdp->free_process);
#endif
@@ -6899,45 +9001,42 @@ Process *schedule(Process *p, int calls)
}
- ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
+ ERTS_SMP_LC_ASSERT(ERTS_SCHEDULER_IS_DIRTY(esdp)
+ || !erts_thr_progress_is_blocking());
check_activities_to_run: {
#ifdef ERTS_SMP
ErtsMigrationPaths *mps;
ErtsMigrationPath *mp;
-
-#ifdef ERTS_SMP
- {
- ErtsProcList *pnd_xtrs = rq->procs.pending_exiters;
- if (erts_proclist_fetch(&pnd_xtrs, NULL)) {
- rq->procs.pending_exiters = NULL;
- erts_smp_runq_unlock(rq);
- handle_pending_exiters(pnd_xtrs);
- erts_smp_runq_lock(rq);
- }
-
+ ErtsProcList *pnd_xtrs = rq->procs.pending_exiters;
+ if (erts_proclist_fetch(&pnd_xtrs, NULL)) {
+ rq->procs.pending_exiters = NULL;
+ erts_smp_runq_unlock(rq);
+ handle_pending_exiters(pnd_xtrs);
+ erts_smp_runq_lock(rq);
}
-#endif
- if (rq->check_balance_reds <= 0)
- check_balance(rq);
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) {
+ if (rq->check_balance_reds <= 0)
+ check_balance(rq);
- ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+ ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
- mps = erts_get_migration_paths_managed();
- mp = &mps->mpath[rq->ix];
+ mps = erts_get_migration_paths_managed();
+ mp = &mps->mpath[rq->ix];
- if (mp->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK)
- immigrate(rq, mp);
+ if (mp->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK)
+ immigrate(rq, mp);
+ }
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
continue_check_activities_to_run:
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
continue_check_activities_to_run_known_flags:
-
+ ASSERT(ERTS_SCHEDULER_IS_DIRTY(esdp)
+ || flags & ERTS_RUNQ_FLG_NONEMPTY);
if (flags & (ERTS_RUNQ_FLG_CHK_CPU_BIND|ERTS_RUNQ_FLG_SUSPENDED)) {
-
if (flags & ERTS_RUNQ_FLG_SUSPENDED) {
suspend_scheduler(esdp);
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
@@ -6948,22 +9047,32 @@ Process *schedule(Process *p, int calls)
erts_sched_check_cpu_bind(esdp);
}
}
+#ifdef ERTS_DIRTY_SCHEDULERS
+ else if (ERTS_SCHEDULER_IS_DIRTY(esdp)
+ && (erts_smp_atomic32_read_acqb(&esdp->ssi->flags)
+ & ERTS_SSI_FLG_SUSPENDED))
+ suspend_scheduler(esdp);
+#endif
{
erts_aint32_t aux_work;
- int leader_update = erts_thr_progress_update(esdp);
+ int leader_update = ERTS_SCHEDULER_IS_DIRTY(esdp) ? 0
+ : erts_thr_progress_update(esdp);
aux_work = erts_atomic32_read_acqb(&esdp->ssi->aux_work);
- if (aux_work | leader_update) {
+ if (aux_work | leader_update | ERTS_SCHED_FAIR) {
erts_smp_runq_unlock(rq);
if (leader_update)
erts_thr_progress_leader_update(esdp);
+ else if (ERTS_SCHED_FAIR)
+ ERTS_SCHED_FAIR_YIELD();
if (aux_work)
handle_aux_work(&esdp->aux_work_data, aux_work, 0);
erts_smp_runq_lock(rq);
}
- }
- ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
+ ERTS_SMP_LC_ASSERT(ERTS_SCHEDULER_IS_DIRTY(esdp)
+ || !erts_thr_progress_is_blocking());
+ }
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
#else /* ERTS_SMP */
@@ -6977,6 +9086,17 @@ Process *schedule(Process *p, int calls)
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && rq->halt_in_progress) {
+ /*
+ * TODO: if halt in progress, need to put the dirty scheduler
+ * to sleep somewhere around here to prevent it from picking up
+ * new work
+ */
+ }
+ else
+#endif
+
if ((!(flags & ERTS_RUNQ_FLGS_QMASK) && !rq->misc.start)
|| (rq->halt_in_progress && ERTS_EMPTY_RUNQ_PORTS(rq))) {
/* Prepare for scheduler wait */
@@ -6986,20 +9106,16 @@ Process *schedule(Process *p, int calls)
rq->wakeup_other = 0;
rq->wakeup_other_reds = 0;
- empty_runq(rq);
-
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
- if (flags & ERTS_RUNQ_FLG_SUSPENDED) {
- non_empty_runq(rq);
+ if (flags & ERTS_RUNQ_FLG_SUSPENDED)
goto continue_check_activities_to_run_known_flags;
- }
- else if (!(flags & ERTS_RUNQ_FLG_INACTIVE)) {
- if (try_steal_task(rq)) {
- non_empty_runq(rq);
+ if (flags & ERTS_RUNQ_FLG_INACTIVE)
+ empty_runq(rq);
+ else {
+ if (!ERTS_RUNQ_IX_IS_DIRTY(rq->ix) && try_steal_task(rq))
goto continue_check_activities_to_run;
- }
- (void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
+ empty_runq(rq);
/*
* Check for ERTS_RUNQ_FLG_SUSPENDED has to be done
@@ -7008,10 +9124,10 @@ Process *schedule(Process *p, int calls)
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
if (flags & ERTS_RUNQ_FLG_SUSPENDED) {
non_empty_runq(rq);
+ flags |= ERTS_RUNQ_FLG_NONEMPTY;
goto continue_check_activities_to_run_known_flags;
}
}
-
#endif
scheduler_wait(&fcalls, esdp, rq);
@@ -7022,7 +9138,9 @@ Process *schedule(Process *p, int calls)
goto check_activities_to_run;
}
- else if (fcalls > input_reductions && prepare_for_sys_schedule()) {
+ else if (!ERTS_SCHEDULER_IS_DIRTY(esdp) &&
+ (fcalls > input_reductions &&
+ prepare_for_sys_schedule(esdp))) {
/*
* Schedule system-level activities.
*/
@@ -7088,6 +9206,8 @@ Process *schedule(Process *p, int calls)
* Find a new process to run.
*/
pick_next_process: {
+ erts_aint32_t psflg_band_mask;
+ erts_aint32_t running_flag;
int prio_q;
int qmask;
@@ -7121,18 +9241,74 @@ Process *schedule(Process *p, int calls)
ASSERT(p); /* Wrong qmask in rq->flags? */
+ psflg_band_mask = ~(((erts_aint32_t) 1) << (ERTS_PSFLGS_GET_PRQ_PRIO(state)
+ + ERTS_PSFLGS_IN_PRQ_MASK_OFFSET));
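+ /* Added note: psflg_band_mask clears the in-prq qbit for the priority
+ * queue the process was just dequeued from; ERTS_PSFLG_IN_RUNQ is only
+ * cleared as well (below) when the dequeued struct is the real process
+ * struct and not a proxy. */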
+
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT((state & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)) !=
+ (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC));
+ if (state & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)) {
+ ASSERT((ERTS_SCHEDULER_IS_DIRTY_CPU(esdp) && (state & ERTS_PSFLG_DIRTY_CPU_PROC)) ||
+ (ERTS_SCHEDULER_IS_DIRTY_IO(esdp) && (state & ERTS_PSFLG_DIRTY_IO_PROC)));
+ if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && !(state & ERTS_PSFLG_ACTIVE_SYS))
+ goto pick_next_process;
+ state &= ~(ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q|ERTS_PSFLG_DIRTY_IO_PROC_IN_Q);
+ }
+#endif
+
+ if (!(state & ERTS_PSFLG_PROXY))
+ psflg_band_mask &= ~ERTS_PSFLG_IN_RUNQ;
+ else {
+ proxy_p = p;
+ p = erts_proc_lookup_raw(proxy_p->common.id);
+ if (!p) {
+ free_proxy_proc(proxy_p);
+ proxy_p = NULL;
+ goto pick_next_process;
+ }
+ state = erts_smp_atomic32_read_nob(&p->state);
+ }
+
+
+ if (state & ERTS_PSFLG_ACTIVE_SYS)
+ running_flag = ERTS_PSFLG_RUNNING_SYS;
+ else
+ running_flag = ERTS_PSFLG_RUNNING;
+
while (1) {
erts_aint32_t exp, new, tmp;
tmp = new = exp = state;
- new &= ~ERTS_PSFLG_IN_RUNQ;
- tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT);
- if (tmp != ERTS_PSFLG_SUSPENDED)
- new |= ERTS_PSFLG_RUNNING;
+ new &= psflg_band_mask;
+ if (!(state & (ERTS_PSFLG_RUNNING
+ | ERTS_PSFLG_RUNNING_SYS))) {
+ tmp = state & (ERTS_PSFLG_SUSPENDED
+ | ERTS_PSFLG_PENDING_EXIT
+ | ERTS_PSFLG_ACTIVE_SYS);
+ if (tmp != ERTS_PSFLG_SUSPENDED)
+ new |= running_flag;
+ }
state = erts_smp_atomic32_cmpxchg_relb(&p->state, new, exp);
if (state == exp) {
- tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT);
- if (tmp == ERTS_PSFLG_SUSPENDED)
+ if ((state & (ERTS_PSFLG_RUNNING
+ | ERTS_PSFLG_RUNNING_SYS
+ | ERTS_PSFLG_FREE))
+ || ((state & (ERTS_PSFLG_SUSPENDED
+ | ERTS_PSFLG_PENDING_EXIT
+ | ERTS_PSFLG_ACTIVE_SYS))
+ == ERTS_PSFLG_SUSPENDED)) {
+ if (state & ERTS_PSFLG_FREE) {
+#ifdef ERTS_SMP
+ erts_smp_proc_dec_refc(p);
+#else
+ erts_free_proc(p);
+#endif
+ }
+ if (proxy_p) {
+ free_proxy_proc(proxy_p);
+ proxy_p = NULL;
+ }
goto pick_next_process;
+ }
state = new;
break;
}
@@ -7162,7 +9338,11 @@ Process *schedule(Process *p, int calls)
(UWord) esdp->no);
int migrated = old && old != esdp->no;
- prio = (int) (state & ERTS_PSFLG_PRIO_MASK);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
+#endif
+
+ prio = (int) ERTS_PSFLGS_GET_USR_PRIO(state);
erts_smp_spin_lock(&erts_sched_stat.lock);
erts_sched_stat.prio[prio].total_executed++;
@@ -7182,9 +9362,6 @@ Process *schedule(Process *p, int calls)
ASSERT(!p->scheduler_data);
p->scheduler_data = esdp;
#endif
- /* Never run a suspended process */
- ASSERT(!(ERTS_PSFLG_SUSPENDED & erts_smp_atomic32_read_nob(&p->state)));
-
reds = context_reds;
if (IS_TRACED(p)) {
@@ -7210,21 +9387,784 @@ Process *schedule(Process *p, int calls)
erts_check_my_tracer_proc(p);
#endif
+ if (state & ERTS_PSFLG_RUNNING_SYS) {
+ reds -= execute_sys_tasks(p, &state, reds);
+ if (reds <= 0
+#ifdef ERTS_DIRTY_SCHEDULERS
+ || (state & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC))
+#endif
+ ) {
+ p->fcalls = reds;
+ goto sched_out_proc;
+ }
+
+ ASSERT(state & ERTS_PSFLG_RUNNING_SYS);
+ ASSERT(!(state & ERTS_PSFLG_RUNNING));
+
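+ /* Added note: no more system task work to do right now and reductions
+ * remain; try to switch the state from RUNNING_SYS to RUNNING so that
+ * normal execution of the process can continue.  If the process turns
+ * out to be suspended (and not exiting), it is scheduled out instead. */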
+ while (1) {
+ erts_aint32_t n, e;
+
+ if (((state & (ERTS_PSFLG_SUSPENDED
+ | ERTS_PSFLG_ACTIVE)) != ERTS_PSFLG_ACTIVE)
+ && !(state & ERTS_PSFLG_EXITING))
+ goto sched_out_proc;
+
+ n = e = state;
+ n &= ~ERTS_PSFLG_RUNNING_SYS;
+ n |= ERTS_PSFLG_RUNNING;
+
+ state = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
+ if (state == e) {
+ state = n;
+ break;
+ }
+
+ ASSERT(state & ERTS_PSFLG_RUNNING_SYS);
+ ASSERT(!(state & ERTS_PSFLG_RUNNING));
+ }
+ }
+
if (!(state & ERTS_PSFLG_EXITING)
&& ((FLAGS(p) & F_FORCE_GC)
|| (MSO(p).overhead > BIN_VHEAP_SZ(p)))) {
reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity);
- if (reds < 0) {
- reds = 1;
+ if (reds <= 0) {
+ p->fcalls = reds;
+ goto sched_out_proc;
}
}
+
+ if (proxy_p) {
+ free_proxy_proc(proxy_p);
+ proxy_p = NULL;
+ }
p->fcalls = reds;
ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p);
+
+ /* Never run a suspended process */
+ ASSERT(!(ERTS_PSFLG_SUSPENDED & erts_smp_atomic32_read_nob(&p->state)));
+
return p;
}
}
+static int
+notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result)
+{
+ Process *rp = erts_proc_lookup(st->requester);
+ if (rp) {
+ ErtsProcLocks rp_locks;
+ ErlOffHeap *ohp;
+ ErlHeapFragment* bp;
+ Eterm *hp, msg, req_id, result;
+ Uint st_result_sz, hsz;
+#ifdef DEBUG
+ Eterm *hp_start;
+#endif
+
+ rp_locks = (c_p == rp) ? ERTS_PROC_LOCK_MAIN : 0;
+
+ st_result_sz = is_immed(st_result) ? 0 : size_object(st_result);
+ hsz = st->req_id_sz + st_result_sz + 4 /* 3-tuple */;
+
+ hp = erts_alloc_message_heap(hsz,
+ &bp,
+ &ohp,
+ rp,
+ &rp_locks);
+
+#ifdef DEBUG
+ hp_start = hp;
+#endif
+
+ req_id = st->req_id_sz == 0 ? st->req_id : copy_struct(st->req_id,
+ st->req_id_sz,
+ &hp,
+ ohp);
+
+ result = st_result_sz == 0 ? st_result : copy_struct(st_result,
+ st_result_sz,
+ &hp,
+ ohp);
+
+ ASSERT(is_immed(st->reply_tag));
+
+ msg = TUPLE3(hp, st->reply_tag, req_id, result);
+
+#ifdef DEBUG
+ hp += 4;
+ ASSERT(hp_start + hsz == hp);
+#endif
+
+ erts_queue_message(rp,
+ &rp_locks,
+ bp,
+ msg,
+ NIL
+#ifdef USE_VM_PROBES
+ , NIL
+#endif
+ );
+
+ if (c_p == rp)
+ rp_locks &= ~ERTS_PROC_LOCK_MAIN;
+
+ if (rp_locks)
+ erts_smp_proc_unlock(rp, rp_locks);
+ }
+
+ erts_cleanup_offheap(&st->off_heap);
+
+ erts_free(ERTS_ALC_T_PROC_SYS_TSK, st);
+
+ return rp ? 1 : 0;
+}
+
+static ERTS_INLINE ErtsProcSysTask *
+fetch_sys_task(Process *c_p, erts_aint32_t state, int *qmaskp, int *priop)
+{
+ ErtsProcSysTaskQs *unused_qs = NULL;
+ int qbit, qmask;
+ ErtsProcSysTask *st, **qp;
+
+ *priop = -1; /* Shut up annoying erroneous warning */
+
+ erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);
+
+ if (!c_p->sys_task_qs) {
+ qmask = 0;
+ st = NULL;
+ goto update_state;
+ }
+
+ qmask = c_p->sys_task_qs->qmask;
+
+ if ((state & (ERTS_PSFLG_ACTIVE
+ | ERTS_PSFLG_EXITING
+ | ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) {
+ /* No sys tasks if we got exclusively higher prio user work to do */
+ st = NULL;
+ switch (ERTS_PSFLGS_GET_USR_PRIO(state)) {
+ case PRIORITY_MAX:
+ if (!(qmask & MAX_BIT))
+ goto done;
+ break;
+ case PRIORITY_HIGH:
+ if (!(qmask & (MAX_BIT|HIGH_BIT)))
+ goto done;
+ break;
+ default:
+ break;
+ }
+ }
+
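+ /* The lowest set bit selects the highest priority level with queued sys tasks */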
+ qbit = qmask & -qmask;
+ switch (qbit) {
+ case MAX_BIT:
+ qp = &c_p->sys_task_qs->q[PRIORITY_MAX];
+ *priop = PRIORITY_MAX;
+ break;
+ case HIGH_BIT:
+ qp = &c_p->sys_task_qs->q[PRIORITY_HIGH];
+ *priop = PRIORITY_HIGH;
+ break;
+ case NORMAL_BIT:
+ if (!(qmask & LOW_BIT)
+ || ++c_p->sys_task_qs->ncount <= RESCHEDULE_LOW) {
+ qp = &c_p->sys_task_qs->q[PRIORITY_NORMAL];
+ *priop = PRIORITY_NORMAL;
+ break;
+ }
+ c_p->sys_task_qs->ncount = 0;
+ /* Fall through */
+ case LOW_BIT:
+ qp = &c_p->sys_task_qs->q[PRIORITY_LOW];
+ *priop = PRIORITY_LOW;
+ break;
+ default:
+ ERTS_INTERNAL_ERROR("Invalid qmask");
+ }
+
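+ /*
+ * Each priority queue is a circular doubly linked list; unlink
+ * the head, or empty the queue if this was its only element.
+ */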
+ st = *qp;
+ ASSERT(st);
+ if (st->next != st) {
+ *qp = st->next;
+ st->next->prev = st->prev;
+ st->prev->next = st->next;
+ }
+ else {
+ erts_aint32_t a, e, n, st_prio, qmask2;
+
+ *qp = NULL;
+ qmask &= ~qbit;
+ c_p->sys_task_qs->qmask = qmask;
+
+ update_state:
+
+ qmask2 = qmask;
+
+ if (state & ERTS_PSFLG_DELAYED_SYS) {
+ ErtsProcSysTaskQs *qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p);
+ ASSERT(qs);
+ qmask2 |= qs->qmask;
+ }
+
+ switch (qmask2 & -qmask2) {
+ case MAX_BIT:
+ st_prio = PRIORITY_MAX;
+ break;
+ case HIGH_BIT:
+ st_prio = PRIORITY_HIGH;
+ break;
+ case NORMAL_BIT:
+ st_prio = PRIORITY_NORMAL;
+ break;
+ case LOW_BIT:
+ case 0:
+ st_prio = PRIORITY_LOW;
+ break;
+ default:
+ ERTS_INTERNAL_ERROR("Invalid qmask");
+ }
+
+ if (!qmask) {
+ unused_qs = c_p->sys_task_qs;
+ c_p->sys_task_qs = NULL;
+ }
+
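+ /*
+ * Adjust the actual priority to the highest of the user priority
+ * and the highest remaining sys task priority (delayed gc tasks
+ * included), and drop ACTIVE_SYS when the last queue was emptied.
+ */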
+ a = state;
+ do {
+ erts_aint32_t prio = ERTS_PSFLGS_GET_USR_PRIO(a);
+
+ if (prio > st_prio)
+ prio = st_prio;
+
+ n = e = a;
+
+ n &= ~ERTS_PSFLGS_ACT_PRIO_MASK;
+ n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET);
+
+ if (!qmask)
+ n &= ~ERTS_PSFLG_ACTIVE_SYS;
+
+ if (a == n)
+ break;
+ a = erts_smp_atomic32_cmpxchg_nob(&c_p->state, n, e);
+ } while (a != e);
+ }
+
+done:
+
+ erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);
+
+ if (unused_qs)
+ proc_sys_task_queues_free(unused_qs);
+
+ *qmaskp = qmask;
+
+ return st;
+}
+
+static void save_gc_task(Process *c_p, ErtsProcSysTask *st, int prio);
+
+static int
+execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds)
+{
+ int garbage_collected = 0;
+ erts_aint32_t state = *statep;
+ int max_reds = in_reds;
+ int reds = 0;
+ int qmask = 0;
+
+ ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
+
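+ /*
+ * Run queued sys tasks until the reduction budget is spent, the
+ * queues are empty, or the process turns out to be exiting.
+ */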
+ do {
+ ErtsProcSysTask *st;
+ int st_prio;
+ Eterm st_res;
+
+ if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) {
+#ifdef ERTS_SMP
+ if (state & ERTS_PSFLG_PENDING_EXIT)
+ erts_handle_pending_exit(c_p, ERTS_PROC_LOCK_MAIN);
+#endif
+ ASSERT(ERTS_PROC_IS_EXITING(c_p));
+ break;
+ }
+
+ st = fetch_sys_task(c_p, state, &qmask, &st_prio);
+ if (!st)
+ break;
+
+ switch (st->type) {
+ case ERTS_PSTT_GC:
+ if (c_p->flags & F_DISABLE_GC) {
+ save_gc_task(c_p, st, st_prio);
+ st = NULL;
+ reds++;
+ }
+ else {
+ if (!garbage_collected) {
+ FLAGS(c_p) |= F_NEED_FULLSWEEP;
+ reds += erts_garbage_collect(c_p,
+ 0,
+ c_p->arg_reg,
+ c_p->arity);
+ garbage_collected = 1;
+ }
+ st_res = am_true;
+ }
+ break;
+ case ERTS_PSTT_CPC:
+ st_res = erts_check_process_code(c_p,
+ st->arg[0],
+ st->arg[1] == am_true,
+ &reds);
+ if (is_non_value(st_res)) {
+ /* Needed gc, but gc was disabled */
+ save_gc_task(c_p, st, st_prio);
+ st = NULL;
+ }
+ break;
+ default:
+ ERTS_INTERNAL_ERROR("Invalid process sys task type");
+ st_res = am_false;
+ }
+
+ if (st)
+ reds += notify_sys_task_executed(c_p, st, st_res);
+
+ state = erts_smp_atomic32_read_acqb(&c_p->state);
+ } while (qmask && reds < max_reds);
+
+ *statep = state;
+
+ return reds;
+}
+
+static int
+cleanup_sys_tasks(Process *c_p, erts_aint32_t in_state, int in_reds)
+{
+ erts_aint32_t state = in_state;
+ int max_reds = in_reds;
+ int reds = 0;
+ int qmask = 0;
+
+ ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
+
+ do {
+ ErtsProcSysTask *st;
+ Eterm st_res;
+ int st_prio;
+
+ st = fetch_sys_task(c_p, state, &qmask, &st_prio);
+ if (!st)
+ break;
+
+ switch (st->type) {
+ case ERTS_PSTT_GC:
+ st_res = am_false;
+ break;
+ case ERTS_PSTT_CPC:
+ st_res = am_false;
+ break;
+ default:
+ ERTS_INTERNAL_ERROR("Invalid process sys task type");
+ st_res = am_false;
+ break;
+ }
+
+ reds += notify_sys_task_executed(c_p, st, st_res);
+
+ state = erts_smp_atomic32_read_acqb(&c_p->state);
+ } while (qmask && reds < max_reds);
+
+ return reds;
+}
+
+BIF_RETTYPE
+erts_internal_request_system_task_3(BIF_ALIST_3)
+{
+ Process *rp = erts_proc_lookup(BIF_ARG_1);
+ ErtsProcSysTaskQs *stqs, *free_stqs = NULL;
+ ErtsProcSysTask *st = NULL;
+ erts_aint32_t prio, rp_state;
+ int rp_locked;
+ Eterm noproc_res, req_type;
+
+ if (!rp && !is_internal_pid(BIF_ARG_1)) {
+ if (!is_external_pid(BIF_ARG_1))
+ goto badarg;
+ if (external_pid_dist_entry(BIF_ARG_1) != erts_this_dist_entry)
+ goto badarg;
+ }
+
+ switch (BIF_ARG_2) {
+ case am_max: prio = PRIORITY_MAX; break;
+ case am_high: prio = PRIORITY_HIGH; break;
+ case am_normal: prio = PRIORITY_NORMAL; break;
+ case am_low: prio = PRIORITY_LOW; break;
+ default: goto badarg;
+ }
+
+ if (is_not_tuple(BIF_ARG_3))
+ goto badarg;
+ else {
+ int i;
+ Eterm *tp = tuple_val(BIF_ARG_3);
+ Uint arity = arityval(*tp);
+ Eterm req_id;
+ Uint req_id_sz;
+ Eterm arg[ERTS_MAX_PROC_SYS_TASK_ARGS];
+ Uint arg_sz[ERTS_MAX_PROC_SYS_TASK_ARGS];
+ Uint tot_sz;
+ Eterm *hp;
+
+ if (arity < 2)
+ goto badarg;
+ if (arity > 2 + ERTS_MAX_PROC_SYS_TASK_ARGS)
+ goto badarg;
+ req_type = tp[1];
+ req_id = tp[2];
+ req_id_sz = is_immed(req_id) ? 0 : size_object(req_id);
+ tot_sz = req_id_sz;
+ for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) {
+ int tix = 3 + i;
+ if (tix > arity) {
+ arg[i] = THE_NON_VALUE;
+ arg_sz[i] = 0;
+ }
+ else {
+ arg[i] = tp[tix];
+ if (is_immed(arg[i]))
+ arg_sz[i] = 0;
+ else {
+ arg_sz[i] = size_object(arg[i]);
+ tot_sz += arg_sz[i];
+ }
+ }
+ }
+ st = erts_alloc(ERTS_ALC_T_PROC_SYS_TSK,
+ ERTS_PROC_SYS_TASK_SIZE(tot_sz));
+ st->next = st->prev = st; /* Prep for empty prio queue */
+ ERTS_INIT_OFF_HEAP(&st->off_heap);
+ hp = &st->heap[0];
+
+ st->requester = BIF_P->common.id;
+ st->reply_tag = req_type;
+ st->req_id_sz = req_id_sz;
+ st->req_id = req_id_sz == 0 ? req_id : copy_struct(req_id,
+ req_id_sz,
+ &hp,
+ &st->off_heap);
+
+ for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++)
+ st->arg[i] = arg_sz[i] == 0 ? arg[i] : copy_struct(arg[i],
+ arg_sz[i],
+ &hp,
+ &st->off_heap);
+ ASSERT(&st->heap[0] + tot_sz == hp);
+ }
+
+ switch (req_type) {
+
+ case am_garbage_collect:
+ st->type = ERTS_PSTT_GC;
+ noproc_res = am_false;
+ if (!rp)
+ goto noproc;
+ break;
+
+ case am_check_process_code:
+ if (is_not_atom(st->arg[0]))
+ goto badarg;
+ if (st->arg[1] != am_true && st->arg[1] != am_false)
+ goto badarg;
+ noproc_res = am_false;
+ st->type = ERTS_PSTT_CPC;
+ if (!rp)
+ goto noproc;
+ break;
+
+ default:
+ goto badarg;
+ }
+
+ rp_state = erts_smp_atomic32_read_nob(&rp->state);
+
+ rp_locked = 0;
+
+ free_stqs = NULL;
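+ /*
+ * If the target already has active sys tasks it is assumed to
+ * have task queues allocated; otherwise allocate a queue struct
+ * up front (freed below if it turns out not to be needed).
+ */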
+ if (rp_state & ERTS_PSFLG_ACTIVE_SYS)
+ stqs = NULL;
+ else {
+ alloc_qs:
+ stqs = proc_sys_task_queues_alloc();
+ stqs->qmask = 1 << prio;
+ stqs->ncount = 0;
+ stqs->q[PRIORITY_MAX] = NULL;
+ stqs->q[PRIORITY_HIGH] = NULL;
+ stqs->q[PRIORITY_NORMAL] = NULL;
+ stqs->q[PRIORITY_LOW] = NULL;
+ stqs->q[prio] = st;
+ }
+
+ if (!rp_locked) {
+ rp_locked = 1;
+ erts_smp_proc_lock(rp, ERTS_PROC_LOCK_STATUS);
+
+ rp_state = erts_smp_atomic32_read_nob(&rp->state);
+ if (rp_state & ERTS_PSFLG_EXITING) {
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
+ rp = NULL;
+ free_stqs = stqs;
+ goto noproc;
+ }
+ }
+
+ if (!rp->sys_task_qs) {
+ if (stqs)
+ rp->sys_task_qs = stqs;
+ else
+ goto alloc_qs;
+ }
+ else {
+ if (stqs)
+ free_stqs = stqs;
+ stqs = rp->sys_task_qs;
+ if (!stqs->q[prio]) {
+ stqs->q[prio] = st;
+ stqs->qmask |= 1 << prio;
+ }
+ else {
+ st->next = stqs->q[prio];
+ st->prev = stqs->q[prio]->prev;
+ st->next->prev = st;
+ st->prev->next = st;
+ ASSERT(stqs->qmask & (1 << prio));
+ }
+ }
+
+ if (ERTS_PSFLGS_GET_ACT_PRIO(rp_state) > prio) {
+ erts_aint32_t n, a, e;
+ /* Need to elevate actual prio */
+
+ a = rp_state;
+ do {
+ if (ERTS_PSFLGS_GET_ACT_PRIO(a) <= prio) {
+ n = a;
+ break;
+ }
+ n = e = a;
+ n &= ~ERTS_PSFLGS_ACT_PRIO_MASK;
+ n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET);
+ a = erts_smp_atomic32_cmpxchg_nob(&rp->state, n, e);
+ } while (a != e);
+ rp_state = n;
+ }
+
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
+
+ schedule_process_sys_task(rp, rp_state, NULL);
+
+ if (free_stqs)
+ proc_sys_task_queues_free(free_stqs);
+
+ BIF_RET(am_ok);
+
+noproc:
+
+ notify_sys_task_executed(BIF_P, st, noproc_res);
+ if (free_stqs)
+ proc_sys_task_queues_free(free_stqs);
+ BIF_RET(am_ok);
+
+badarg:
+
+ if (st) {
+ erts_cleanup_offheap(&st->off_heap);
+ erts_free(ERTS_ALC_T_PROC_SYS_TSK, st);
+ }
+ if (free_stqs)
+ proc_sys_task_queues_free(free_stqs);
+ BIF_ERROR(BIF_P, BADARG);
+}
+
+static void
+save_gc_task(Process *c_p, ErtsProcSysTask *st, int prio)
+{
+ erts_aint32_t state;
+ ErtsProcSysTaskQs *qs;
+
+ ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
+
+ qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p);
+ if (!qs) {
+ st->next = st->prev = st;
+ qs = proc_sys_task_queues_alloc();
+ qs->qmask = 1 << prio;
+ qs->ncount = 0;
+ qs->q[PRIORITY_MAX] = NULL;
+ qs->q[PRIORITY_HIGH] = NULL;
+ qs->q[PRIORITY_NORMAL] = NULL;
+ qs->q[PRIORITY_LOW] = NULL;
+ qs->q[prio] = st;
+ (void) ERTS_PROC_SET_DELAYED_GC_TASK_QS(c_p, ERTS_PROC_LOCK_MAIN, qs);
+ }
+ else {
+ if (!qs->q[prio]) {
+ st->next = st->prev = st;
+ qs->q[prio] = st;
+ qs->qmask |= 1 << prio;
+ }
+ else {
+ st->next = qs->q[prio];
+ st->prev = qs->q[prio]->prev;
+ st->next->prev = st;
+ st->prev->next = st;
+ ASSERT(qs->qmask & (1 << prio));
+ }
+ }
+
+ state = erts_smp_atomic32_read_nob(&c_p->state);
+ ASSERT((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS) & state);
+
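+ /*
+ * Mark the process as having delayed sys work and, if the saved
+ * task has higher priority, raise the actual priority as well;
+ * retry until the state change sticks.
+ */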
+ while (!(state & ERTS_PSFLG_DELAYED_SYS)
+ || prio < ERTS_PSFLGS_GET_ACT_PRIO(state)) {
+ erts_aint32_t n, e;
+
+ n = e = state;
+ n |= ERTS_PSFLG_DELAYED_SYS;
+ if (prio < ERTS_PSFLGS_GET_ACT_PRIO(state)) {
+ n &= ~ERTS_PSFLGS_ACT_PRIO_MASK;
+ n |= prio << ERTS_PSFLGS_ACT_PRIO_OFFSET;
+ }
+ state = erts_smp_atomic32_cmpxchg_relb(&c_p->state, n, e);
+ if (state == e)
+ break;
+ }
+}
+
+int
+erts_set_gc_state(Process *c_p, int enable)
+{
+ ErtsProcSysTaskQs *dgc_tsk_qs;
+ ASSERT(c_p == erts_get_current_process());
+ ASSERT((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS)
+ & erts_smp_atomic32_read_nob(&c_p->state));
+ ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
+
+ if (!enable) {
+ c_p->flags |= F_DISABLE_GC;
+ return 0;
+ }
+
+ c_p->flags &= ~F_DISABLE_GC;
+
+ dgc_tsk_qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p);
+ if (!dgc_tsk_qs)
+ return 0;
+
+ /* Move delayed gc tasks into sys tasks queues. */
+
+ erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);
+
+ if (!c_p->sys_task_qs) {
+ c_p->sys_task_qs = dgc_tsk_qs;
+ dgc_tsk_qs = NULL;
+ }
+ else {
+ ErtsProcSysTaskQs *stsk_qs;
+ int prio;
+
+ /*
+ * We push delayed tasks to the front of the queue
+ * since they have already made it to the front
+ * once and then been delayed after that.
+ */
+
+ stsk_qs = c_p->sys_task_qs;
+
+ while (dgc_tsk_qs->qmask) {
+ int qbit = dgc_tsk_qs->qmask & -dgc_tsk_qs->qmask;
+ dgc_tsk_qs->qmask &= ~qbit;
+ switch (qbit) {
+ case MAX_BIT:
+ prio = PRIORITY_MAX;
+ break;
+ case HIGH_BIT:
+ prio = PRIORITY_HIGH;
+ break;
+ case NORMAL_BIT:
+ prio = PRIORITY_NORMAL;
+ break;
+ case LOW_BIT:
+ prio = PRIORITY_LOW;
+ break;
+ default:
+ ERTS_INTERNAL_ERROR("Invalid qmask");
+ prio = -1;
+ break;
+ }
+
+ ASSERT(dgc_tsk_qs->q[prio]);
+
+ if (!stsk_qs->q[prio]) {
+ stsk_qs->q[prio] = dgc_tsk_qs->q[prio];
+ stsk_qs->qmask |= 1 << prio;
+ }
+ else {
+ ErtsProcSysTask *first1, *last1, *first2, *last2;
+
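+ /*
+ * Splice the delayed queue in front of the existing sys task
+ * queue; both are circular doubly linked lists.
+ */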
+ ASSERT(stsk_qs->qmask & (1 << prio));
+ first1 = dgc_tsk_qs->q[prio];
+ last1 = first1->prev;
+ first2 = stsk_qs->q[prio];
+ last2 = first2->prev;
+
+ last1->next = first2;
+ first2->prev = last1;
+
+ first1->prev = last2;
+ last2->next = first1;
+
+ stsk_qs->q[prio] = first1;
+ }
+
+ }
+ }
+
+#ifdef DEBUG
+ {
+ int qmask;
+ erts_aint32_t aprio, state =
+#endif
+
+ erts_smp_atomic32_read_bset_nob(&c_p->state,
+ (ERTS_PSFLG_DELAYED_SYS
+ | ERTS_PSFLG_ACTIVE_SYS),
+ ERTS_PSFLG_ACTIVE_SYS);
+
+#ifdef DEBUG
+ ASSERT(state & ERTS_PSFLG_DELAYED_SYS);
+ qmask = c_p->sys_task_qs->qmask;
+ aprio = ERTS_PSFLGS_GET_ACT_PRIO(state);
+ ASSERT(ERTS_PSFLGS_GET_USR_PRIO(state) >= aprio);
+ ASSERT((qmask & -qmask) >= (1 << aprio));
+ }
+#endif
+
+ erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);
+
+ (void) ERTS_PROC_SET_DELAYED_GC_TASK_QS(c_p, ERTS_PROC_LOCK_MAIN, NULL);
+
+ if (dgc_tsk_qs)
+ proc_sys_task_queues_free(dgc_tsk_qs);
+
+ return 1;
+}
+
void
erts_sched_stat_modify(int what)
{
@@ -7324,6 +10264,10 @@ erts_schedule_misc_op(void (*func)(void *), void *arg)
rq->misc.start = molp;
rq->misc.end = molp;
+#ifdef ERTS_SMP
+ non_empty_runq(rq);
+#endif
+
erts_smp_runq_unlock(rq);
smp_notify_inc_runq(rq);
@@ -7506,7 +10450,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
* Check for errors.
*/
- if (is_not_atom(mod) || is_not_atom(func) || ((arity = list_length(args)) < 0)) {
+ if (is_not_atom(mod) || is_not_atom(func) || ((arity = erts_list_length(args)) < 0)) {
so->error_code = BADARG;
goto error;
}
@@ -7521,7 +10465,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
prio = (erts_aint32_t) so->priority;
}
- state |= (prio & ERTS_PSFLG_PRIO_MASK);
+ state |= (((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_ACT_PRIO_OFFSET)
+ | ((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_USR_PRIO_OFFSET));
if (!rq)
rq = erts_get_runq_proc(parent);
@@ -7592,13 +10537,14 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->htop = p->heap;
p->heap_sz = sz;
p->catches = 0;
- p->extra_root = NULL;
p->bin_vheap_sz = p->min_vheap_size;
p->bin_old_vheap_sz = p->min_vheap_size;
p->bin_old_vheap = 0;
p->bin_vheap_mature = 0;
+ p->sys_task_qs = NULL;
+
/* No need to initialize p->fcalls. */
p->current = p->initial+INITIAL_MOD;
@@ -7764,7 +10710,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
* Schedule process for execution.
*/
- schedule_process(p, state, 0);
+ schedule_process(p, state);
VERBOSE(DEBUG_PROCESSES, ("Created a new process: %T\n",p->common.id));
@@ -7815,6 +10761,7 @@ void erts_init_empty_process(Process *p)
p->bin_vheap_sz = BIN_VH_MIN_SIZE;
p->bin_old_vheap_sz = BIN_VH_MIN_SIZE;
p->bin_old_vheap = 0;
+ p->sys_task_qs = NULL;
p->bin_vheap_mature = 0;
#ifdef ERTS_SMP
p->common.u.alive.ptimer = NULL;
@@ -8070,33 +11017,21 @@ delete_process(Process* p)
p->fvalue = NIL;
}
-static ERTS_INLINE erts_aint32_t
-set_proc_exiting_state(Process *p, erts_aint32_t state)
-{
- erts_aint32_t a, n, e;
- a = state;
- while (1) {
- n = e = a;
- n &= ~(ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT);
- n |= ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE;
- if (!(a & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING)))
- n |= ERTS_PSFLG_IN_RUNQ;
- a = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e);
- if (a == e)
- break;
- }
- return a;
-}
-
static ERTS_INLINE void
set_proc_exiting(Process *p,
- erts_aint32_t state,
+ erts_aint32_t in_state,
Eterm reason,
ErlHeapFragment *bp)
{
+ erts_aint32_t state = in_state, enq_prio = -1;
+ int enqueue;
ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(p) == ERTS_PROC_LOCKS_ALL);
- state = set_proc_exiting_state(p, state);
+ enqueue = change_proc_schedule_state(p,
+ ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT,
+ ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE,
+ &state,
+ &enq_prio);
p->fvalue = reason;
if (bp)
@@ -8111,15 +11046,37 @@ set_proc_exiting(Process *p,
cancel_timer(p);
p->i = (BeamInstr *) beam_exit;
- if (erts_system_profile_flags.runnable_procs
- && !(state & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) {
- profile_runnable_proc(p, am_active);
- }
-
- if (!(state & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING)))
- add2runq(p, state);
+ if (enqueue)
+ add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio),
+ state,
+ enq_prio);
}
+static ERTS_INLINE erts_aint32_t
+set_proc_self_exiting(Process *c_p)
+{
+#ifdef DEBUG
+ int enqueue;
+#endif
+ erts_aint32_t state, enq_prio = -1;
+
+ ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCKS_ALL);
+
+ state = erts_smp_atomic32_read_nob(&c_p->state);
+ ASSERT(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS));
+
+#ifdef DEBUG
+ enqueue =
+#endif
+ change_proc_schedule_state(c_p,
+ ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT,
+ ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE,
+ &state,
+ &enq_prio);
+
+ ASSERT(!enqueue);
+ return state;
+}
#ifdef ERTS_SMP
@@ -8167,7 +11124,7 @@ handle_pending_exiters(ErtsProcList *pnd_xtrs)
if (p) {
if (erts_proclist_same(plp, p)) {
erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state);
- if (!(state & ERTS_PSFLG_RUNNING)) {
+ if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS))) {
ASSERT(state & ERTS_PSFLG_PENDING_EXIT);
erts_handle_pending_exit(p, ERTS_PROC_LOCKS_ALL);
}
@@ -8196,8 +11153,15 @@ save_pending_exiter(Process *p)
erts_proclist_store_last(&rq->procs.pending_exiters, plp);
+ non_empty_runq(rq);
+
erts_smp_runq_unlock(rq);
- wake_scheduler(rq, 1);
+#ifdef ERTS_DIRTY_SCHEDULERS
+ if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix))
+ wake_dirty_schedulers(rq, 0);
+ else
+#endif
+ wake_scheduler(rq);
}
#endif
@@ -8388,7 +11352,7 @@ send_exit_signal(Process *c_p, /* current process if and only
}
set_proc_exiting(c_p, state, rsn, NULL);
}
- else if (!(state & ERTS_PSFLG_RUNNING)) {
+ else if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS))) {
/* Process not running ... */
ErtsProcLocks need_locks = ~(*rp_locks) & ERTS_PROC_LOCKS_ALL;
if (need_locks
@@ -8765,9 +11729,6 @@ resume_suspend_monitor(ErtsSuspendMonitor *smon, void *vc_p)
void
erts_do_exit_process(Process* p, Eterm reason)
{
-#ifdef ERTS_SMP
- erts_aint32_t state;
-#endif
p->arity = 0; /* No live registers */
p->fvalue = reason;
@@ -8792,10 +11753,9 @@ erts_do_exit_process(Process* p, Eterm reason)
#endif
#ifndef ERTS_SMP
- set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state));
+ set_proc_self_exiting(p);
#else
- state = set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state));
- if (state & ERTS_PSFLG_PENDING_EXIT) {
+ if (ERTS_PSFLG_PENDING_EXIT & set_proc_self_exiting(p)) {
/* Process exited before pending exit was received... */
p->pending_exit.reason = THE_NON_VALUE;
if (p->pending_exit.bp) {
@@ -8849,6 +11809,7 @@ erts_continue_exit_process(Process *p)
DistEntry *dep;
struct saved_calls *scb;
process_breakpoint_time_t *pbt;
+ erts_aint32_t state;
#ifdef DEBUG
int yield_allowed = 1;
@@ -8885,6 +11846,13 @@ erts_continue_exit_process(Process *p)
p->flags &= ~F_USING_DB;
}
+ erts_set_gc_state(p, 1);
+ state = erts_smp_atomic32_read_acqb(&p->state);
+ if (state & ERTS_PSFLG_ACTIVE_SYS) {
+ if (cleanup_sys_tasks(p, state, CONTEXT_REDS) >= CONTEXT_REDS/2)
+ goto yield;
+ }
+
if (p->flags & F_USING_DDLL) {
erts_ddll_proc_dead(p, ERTS_PROC_LOCK_MAIN);
p->flags &= ~F_USING_DDLL;
@@ -8962,17 +11930,31 @@ erts_continue_exit_process(Process *p)
{
/* Inactivate and notify free */
erts_aint32_t n, e, a = erts_smp_atomic32_read_nob(&p->state);
+#ifdef ERTS_SMP
+ int refc_inced = 0;
+#endif
while (1) {
n = e = a;
ASSERT(a & ERTS_PSFLG_EXITING);
n |= ERTS_PSFLG_FREE;
n &= ~ERTS_PSFLG_ACTIVE;
+#ifdef ERTS_SMP
+ if ((n & ERTS_PSFLG_IN_RUNQ) && !refc_inced) {
+ erts_smp_proc_inc_refc(p);
+ refc_inced = 1;
+ }
+#endif
a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
if (a == e)
break;
}
- }
+#ifdef ERTS_SMP
+ if (refc_inced && !(n & ERTS_PSFLG_IN_RUNQ))
+ erts_smp_proc_dec_refc(p);
+#endif
+ }
+
dep = ((p->flags & F_DISTRIBUTION)
? ERTS_PROC_SET_DIST_ENTRY(p, ERTS_PROC_LOCKS_ALL, NULL)
: NULL);
@@ -9025,12 +12007,6 @@ erts_continue_exit_process(Process *p)
if (pbt)
erts_free(ERTS_ALC_T_BPD, (void *) pbt);
- if (p->extra_root != NULL) {
- (p->extra_root->cleanup)(p->extra_root); /* Should deallocate
- whole structure */
- p->extra_root = NULL;
- }
-
delete_process(p);
#ifdef ERTS_SMP
@@ -9075,7 +12051,7 @@ timeout_proc(Process* p)
state = erts_smp_atomic32_read_acqb(&p->state);
if (!(state & ERTS_PSFLG_ACTIVE))
- schedule_process(p, state, 0);
+ schedule_process(p, state);
}
@@ -9153,7 +12129,9 @@ erts_program_counter_info(int to, void *to_arg, Process *p)
print_function_from_pc(to, to_arg, p->cp);
erts_print(to, to_arg, ")\n");
state = erts_smp_atomic32_read_acqb(&p->state);
- if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_GC))) {
+ if (!(state & (ERTS_PSFLG_RUNNING
+ | ERTS_PSFLG_RUNNING_SYS
+ | ERTS_PSFLG_GC))) {
erts_print(to, to_arg, "arity = %d\n",p->arity);
if (!ERTS_IS_CRASH_DUMPING) {
/*
@@ -9241,6 +12219,10 @@ void erl_halt(int code)
if (-1 == erts_smp_atomic32_cmpxchg_acqb(&erts_halt_progress,
erts_no_schedulers,
-1)) {
+#ifdef ERTS_DIRTY_SCHEDULERS
+ ERTS_DIRTY_CPU_RUNQ->halt_in_progress = 1;
+ ERTS_DIRTY_IO_RUNQ->halt_in_progress = 1;
+#endif
erts_halt_code = code;
notify_reap_ports_relb();
}