aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_process.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-04-27 13:06:27 +0200
committerRickard Green <[email protected]>2012-04-27 13:06:27 +0200
commit398bb9a9a5b2a56f0333ce81efe00380692ec93a (patch)
treeadb7239744ae6f3d45f672e29d9c2bd97e0e67d9 /erts/emulator/beam/erl_process.c
parent52312a8b93e8b250099e2f1b1b802e63e37971cc (diff)
parent3730e28ad736f0538141d4474e0038a9cc48df71 (diff)
downloadotp-398bb9a9a5b2a56f0333ce81efe00380692ec93a.tar.gz
otp-398bb9a9a5b2a56f0333ce81efe00380692ec93a.tar.bz2
otp-398bb9a9a5b2a56f0333ce81efe00380692ec93a.zip
Merge branch 'rickard/proc-sched/OTP-9892'
* rickard/proc-sched/OTP-9892: Teach etp-commands to understand new emulator internal data structures Optimize process state changes Optimize process table access Implement possibility to use ordinary mutexes as process locks Conflicts: erts/emulator/beam/erl_alloc.types
Diffstat (limited to 'erts/emulator/beam/erl_process.c')
-rw-r--r--erts/emulator/beam/erl_process.c3742
1 files changed, 1929 insertions, 1813 deletions
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 7942a05153..bb8edd2624 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -94,34 +94,194 @@
#define HIGH_BIT (1 << PRIORITY_HIGH)
#define NORMAL_BIT (1 << PRIORITY_NORMAL)
#define LOW_BIT (1 << PRIORITY_LOW)
+#define PORT_BIT (1 << ERTS_PORT_PRIO_LEVEL)
-#define ERTS_MAYBE_SAVE_TERMINATING_PROCESS(P) \
-do { \
- ERTS_SMP_LC_ASSERT(erts_lc_mtx_is_locked(&proc_tab_mtx)); \
- if (saved_term_procs.end) \
- save_terminating_process((P)); \
-} while (0)
+#define ERTS_EMPTY_RUNQ(RQ) \
+ ((ERTS_RUNQ_FLGS_GET_NOB((RQ)) & ERTS_RUNQ_FLGS_QMASK) == 0 \
+ && (RQ)->misc.start == NULL)
+
+#undef RUNQ_READ_RQ
+#undef RUNQ_SET_RQ
+#define RUNQ_READ_RQ(X) ((ErtsRunQueue *) erts_smp_atomic_read_nob((X)))
+#define RUNQ_SET_RQ(X, RQ) erts_smp_atomic_set_nob((X), (erts_aint_t) (RQ))
-#define ERTS_EMPTY_RUNQ(RQ) \
- ((RQ)->len == 0 && (RQ)->misc.start == NULL)
+#ifdef DEBUG
+# if defined(ARCH_64) && !HALFWORD_HEAP
+# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \
+ (RUNQ_SET_RQ((RQP), (0xdeadbeefdead0003LL | ((N) << 4)))
+# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \
+do { \
+ ASSERT((RQP) != NULL); \
+ ASSERT(((((Uint) (RQP)) & ((Uint) 0x3))) == ((Uint) 0)); \
+ ASSERT((((Uint) (RQP)) & ~((Uint) 0xffff)) != ((Uint) 0xdeadbeefdead0000LL));\
+} while (0)
+# else
+# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \
+ (RUNQ_SET_RQ((RQP), (0xdead0003 | ((N) << 4))))
+# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \
+do { \
+ ASSERT((RQP) != NULL); \
+ ASSERT(((((UWord) (RQP)) & ((UWord) 1))) == ((UWord) 0)); \
+ ASSERT((((UWord) (RQP)) & ~((UWord) 0xffff)) != ((UWord) 0xdead0000)); \
+} while (0)
+# endif
+#else
+# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N)
+# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP)
+#endif
#define ERTS_EMPTY_RUNQ_PORTS(RQ) \
- ((RQ)->ports.info.len == 0 && (RQ)->misc.start == NULL)
+ (RUNQ_READ_LEN(&(RQ)->ports.info.len) == 0 && (RQ)->misc.start == NULL)
extern BeamInstr beam_apply[];
extern BeamInstr beam_exit[];
extern BeamInstr beam_continue_exit[];
-static Sint p_last;
-static Sint p_next;
-static Sint p_serial;
-static Uint p_serial_mask;
-static Uint p_serial_shift;
+#ifdef ARCH_32
+
+union {
+ erts_smp_dw_atomic_t pid_data;
+ char align[ERTS_CACHE_LINE_SIZE];
+} last erts_align_attribute(ERTS_CACHE_LINE_SIZE);
+
+
+static ERTS_INLINE Uint64
+dw_aint_to_uint64(erts_dw_aint_t *dw)
+{
+#ifdef ETHR_SU_DW_NAINT_T__
+ return (Uint64) dw->dw_sint;
+#else
+ Uint64 res;
+ res = (Uint64) ((Uint32) dw->sint[ERTS_DW_AINT_HIGH_WORD]);
+ res <<= 32;
+ res |= (Uint64) ((Uint32) dw->sint[ERTS_DW_AINT_LOW_WORD]);
+ return res;
+#endif
+}
+
+static void
+unint64_to_dw_aint(erts_dw_aint_t *dw, Uint64 val)
+{
+#ifdef ETHR_SU_DW_NAINT_T__
+ dw->dw_sint = (ETHR_SU_DW_NAINT_T__) val;
+#else
+ dw->sint[ERTS_DW_AINT_LOW_WORD] = (erts_aint_t) (val & 0xffffffff);
+ dw->sint[ERTS_DW_AINT_HIGH_WORD] = (erts_aint_t) ((val >> 32) & 0xffffffff);
+#endif
+}
+
+static ERTS_INLINE void
+last_pid_data_init_nob(Uint64 val)
+{
+ erts_dw_aint_t dw;
+ unint64_to_dw_aint(&dw, val);
+ erts_smp_dw_atomic_init_nob(&last.pid_data, &dw);
+}
+
+static ERTS_INLINE void
+last_pid_data_set_relb(Uint64 val)
+{
+ erts_dw_aint_t dw;
+ unint64_to_dw_aint(&dw, val);
+ erts_smp_dw_atomic_set_relb(&last.pid_data, &dw);
+}
+
+static ERTS_INLINE Uint64
+last_pid_data_read_nob(void)
+{
+ erts_dw_aint_t dw;
+ erts_smp_dw_atomic_read_nob(&last.pid_data, &dw);
+ return dw_aint_to_uint64(&dw);
+}
+
+static ERTS_INLINE Uint64
+last_pid_data_read_acqb(void)
+{
+ erts_dw_aint_t dw;
+ erts_smp_dw_atomic_read_acqb(&last.pid_data, &dw);
+ return dw_aint_to_uint64(&dw);
+}
+
+static ERTS_INLINE Uint64
+last_pid_data_cmpxchg_relb(Uint64 new, Uint64 exp)
+{
+ erts_dw_aint_t dw_new, dw_xchg;
+
+ unint64_to_dw_aint(&dw_new, new);
+ unint64_to_dw_aint(&dw_xchg, exp);
+
+ if (erts_smp_dw_atomic_cmpxchg_relb(&last.pid_data, &dw_new, &dw_xchg))
+ return exp;
+ else
+ return dw_aint_to_uint64(&dw_xchg);
+}
+
+#elif defined(ARCH_64)
+
+union {
+ erts_smp_atomic_t pid_data;
+ char align[ERTS_CACHE_LINE_SIZE];
+} last erts_align_attribute(ERTS_CACHE_LINE_SIZE);
+
+static ERTS_INLINE void
+last_pid_data_init_nob(Uint64 val)
+{
+ erts_smp_atomic_init_nob(&last.pid_data, (erts_aint_t) val);
+}
+
+static ERTS_INLINE void
+last_pid_data_set_relb(Uint64 val)
+{
+ erts_smp_atomic_set_relb(&last.pid_data, (erts_aint_t) val);
+}
+
+static ERTS_INLINE Uint64
+last_pid_data_read_nob(void)
+{
+ return (Uint64) erts_smp_atomic_read_nob(&last.pid_data);
+}
+
+static ERTS_INLINE Uint64
+last_pid_data_read_acqb(void)
+{
+ return (Uint64) erts_smp_atomic_read_acqb(&last.pid_data);
+}
+
+static ERTS_INLINE Uint64
+last_pid_data_cmpxchg_relb(Uint64 new, Uint64 exp)
+{
+ return (Uint64) erts_smp_atomic_cmpxchg_relb(&last.pid_data,
+ (erts_aint_t) new,
+ (erts_aint_t) exp);
+}
+
+#else
+# error "Not 64-bit, nor 32-bit architecture..."
+#endif
+
+static ERTS_INLINE int
+last_pid_data_cmp(Uint64 lpd1, Uint64 lpd2)
+{
+ Uint64 lpd1_wrap;
+
+ if (lpd1 == lpd2)
+ return 0;
+
+ lpd1_wrap = lpd1 + (((Uint64) 1) << 63);
+
+ if (lpd1 < lpd1_wrap)
+ return (lpd1 < lpd2 && lpd2 < lpd1_wrap) ? -1 : 1;
+ else
+ return (lpd1_wrap <= lpd2 && lpd2 < lpd1) ? 1 : -1;
+}
+
+
+#define ERTS_PID_DATA_MASK__ ((1 << _PID_DATA_SIZE) - 1)
int erts_sched_compact_load;
Uint erts_no_schedulers;
-Uint erts_max_processes = ERTS_DEFAULT_MAX_PROCESSES;
-Uint erts_process_tab_index_mask;
+
+ErtsProcTab erts_proc erts_align_attribute(ERTS_CACHE_LINE_SIZE);
static int wakeup_other_limit;
@@ -184,7 +344,7 @@ static struct {
struct {
int active_runqs;
int reds;
- int max_len;
+ erts_aint32_t max_len;
} prev_rise;
Uint n;
} balance_info;
@@ -204,7 +364,7 @@ erts_sched_stat_t erts_sched_stat;
static erts_tsd_key_t sched_data_key;
#endif
-static erts_smp_mtx_t proc_tab_mtx;
+erts_smp_rwmtx_t erts_proc_tab_rwmtx;
static erts_smp_atomic32_t function_calls;
@@ -227,7 +387,6 @@ typedef union {
static ErtsAlignedSchedulerSleepInfo *aligned_sched_sleep_info;
-Process** process_tab;
static Uint last_reductions;
static Uint last_exact_reductions;
Uint erts_default_process_flags;
@@ -258,11 +417,11 @@ struct ErtsTermProcElement_ {
union {
struct {
Eterm pid;
- SysTimeval spawned;
- SysTimeval exited;
+ Uint64 spawned;
+ Uint64 exited;
} process;
struct {
- SysTimeval time;
+ Uint64 interval;
} bif_invocation;
} u;
};
@@ -402,6 +561,53 @@ erts_smp_lc_runq_is_locked(ErtsRunQueue *runq)
}
#endif
+static erts_interval_t *proc_interval;
+
+static void
+proc_interval_init(void)
+{
+ proc_interval = erts_alloc_permanent_cache_aligned(
+ ERTS_ALC_T_PROC_INTERVAL,
+ sizeof(erts_interval_t));
+ erts_smp_interval_init(proc_interval);
+}
+
+static ERTS_INLINE Uint64
+get_proc_interval(void)
+{
+ return erts_smp_current_interval_nob(proc_interval);
+}
+
+static ERTS_INLINE Uint64
+ensure_later_proc_interval(Uint64 interval)
+{
+ return erts_smp_ensure_later_interval_nob(proc_interval, interval);
+}
+
+static ERTS_INLINE Uint64
+step_proc_interval(void)
+{
+ return erts_smp_step_interval_nob(proc_interval);
+}
+
+Uint64
+erts_get_proc_interval(void)
+{
+ return get_proc_interval();
+}
+
+Uint64
+erts_ensure_later_proc_interval(Uint64 interval)
+{
+ return ensure_later_proc_interval(interval);
+}
+
+Uint64
+erts_step_proc_interval(void)
+{
+ return step_proc_interval();
+}
+
void
erts_pre_init_process(void)
{
@@ -451,7 +657,16 @@ erts_pre_init_process(void)
void
erts_init_process(int ncpu)
{
- Uint proc_bits = ERTS_PROC_BITS;
+ int proc_tab_sz;
+ int max_proc_bits;
+ int proc_bits = ERTS_PROC_BITS;
+ erts_smp_atomic_t *proc_entry;
+ char *proc_tab_end;
+ erts_smp_rwmtx_opt_t proc_tab_rwmtx_opts = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER;
+ proc_tab_rwmtx_opts.type = ERTS_SMP_RWMTX_TYPE_EXTREMELY_FREQUENT_READ;
+ proc_tab_rwmtx_opts.lived = ERTS_SMP_RWMTX_LONG_LIVED;
+
+ proc_interval_init();
#ifdef ERTS_SMP
erts_disable_proc_not_running_opt = 0;
@@ -462,29 +677,57 @@ erts_init_process(int ncpu)
erts_smp_atomic32_init_nob(&process_count, 0);
- if (erts_use_r9_pids_ports) {
+ if (erts_use_r9_pids_ports)
proc_bits = ERTS_R9_PROC_BITS;
- ASSERT(erts_max_processes <= (1 << ERTS_R9_PROC_BITS));
- }
- process_tab = (Process**) erts_alloc(ERTS_ALC_T_PROC_TABLE,
- erts_max_processes*sizeof(Process*));
- sys_memzero(process_tab, erts_max_processes * sizeof(Process*));
+ if (erts_proc.max > (1 << proc_bits))
+ erts_proc.max = 1 << proc_bits;
+
+ proc_tab_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(erts_proc.max
+ * sizeof(erts_smp_atomic_t));
+ erts_proc.tab = erts_alloc(ERTS_ALC_T_PROC_TABLE, proc_tab_sz);
+ proc_tab_end = ((char *) erts_proc.tab) + proc_tab_sz;
+ proc_entry = erts_proc.tab;
+ while (proc_tab_end > ((char *) proc_entry)) {
+ erts_smp_atomic_init_nob(proc_entry, ERTS_AINT_NULL);
+ proc_entry++;
+ }
#ifdef HYBRID
erts_active_procs = (Process**)
erts_alloc(ERTS_ALC_T_ACTIVE_PROCS,
- erts_max_processes * sizeof(Process*));
+ erts_proc.max * sizeof(Process*));
erts_num_active_procs = 0;
#endif
- erts_smp_mtx_init(&proc_tab_mtx, "proc_tab");
- p_last = -1;
- p_next = 0;
- p_serial = 0;
+ erts_smp_rwmtx_init_opt(&erts_proc_tab_rwmtx,
+ &proc_tab_rwmtx_opts,
+ "proc_tab");
+ last_pid_data_init_nob(~((Uint64) 0));
+
+ max_proc_bits = erts_fit_in_bits_int32((Sint32) erts_proc.max - 1);
+
+ erts_proc.tab_cache_lines = proc_tab_sz/ERTS_CACHE_LINE_SIZE;
+ erts_proc.pix_per_cache_line = ERTS_CACHE_LINE_SIZE/sizeof(erts_smp_atomic_t);
+ if ((erts_proc.max & (erts_proc.max - 1))
+ | (erts_proc.pix_per_cache_line & (erts_proc.pix_per_cache_line - 1))) {
+ /*
+ * erts_proc.max or erts_proc.pix_per_cache_line
+ * not a power of 2 :(
+ */
+ erts_proc.pix_cl_mask = 0;
+ erts_proc.pix_cl_shift = 0;
+ erts_proc.pix_cli_mask = 0;
+ erts_proc.pix_cli_shift = 0;
+ }
+ else {
+ ASSERT((erts_proc.tab_cache_lines
+ & (erts_proc.tab_cache_lines - 1)) == 0);
+ erts_proc.pix_cl_mask = erts_proc.tab_cache_lines-1;
+ erts_proc.pix_cl_shift = erts_fit_in_bits_int32(erts_proc.pix_per_cache_line-1);
+ erts_proc.pix_cli_shift = erts_fit_in_bits_int32(erts_proc.pix_cl_mask);
+ erts_proc.pix_cli_mask = (1 << (max_proc_bits - erts_proc.pix_cli_shift)) - 1;
+ }
- p_serial_shift = erts_fit_in_bits(erts_max_processes - 1);
- p_serial_mask = ((~(~((Uint) 0) << proc_bits)) >> p_serial_shift);
- erts_process_tab_index_mask = ~(~((Uint) 0) << p_serial_shift);
last_reductions = 0;
last_exact_reductions = 0;
erts_default_process_flags = 0;
@@ -742,8 +985,9 @@ static ERTS_INLINE ErtsProcList *
proclist_create(Process *p)
{
ErtsProcList *plp = proclist_alloc();
+ ensure_later_proc_interval(p->started_interval);
plp->pid = p->id;
- plp->started = p->started;
+ plp->started_interval = p->started_interval;
return plp;
}
@@ -756,8 +1000,7 @@ proclist_destroy(ErtsProcList *plp)
static ERTS_INLINE int
proclist_same(ErtsProcList *plp, Process *p)
{
- return (plp->pid == p->id
- && erts_cmp_timeval(&plp->started, &p->started) == 0);
+ return plp->pid == p->id && plp->started_interval == p->started_interval;
}
ErtsProcList *
@@ -1719,8 +1962,8 @@ sched_waiting_sys(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
ASSERT(rq->waiting >= 0);
- rq->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK
- | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK);
+ ERTS_RUNQ_FLGS_SET(rq, (ERTS_RUNQ_FLG_OUT_OF_WORK
+ | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK));
rq->waiting++;
rq->waiting *= -1;
rq->woken = 0;
@@ -1796,8 +2039,8 @@ static ERTS_INLINE void
sched_waiting(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
- rq->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK
- | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK);
+ ERTS_RUNQ_FLGS_SET(rq, (ERTS_RUNQ_FLG_OUT_OF_WORK
+ | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK));
if (rq->waiting < 0)
rq->waiting--;
else
@@ -1829,9 +2072,8 @@ ongoing_multi_scheduling_block(void)
static ERTS_INLINE void
empty_runq(ErtsRunQueue *rq)
{
- erts_aint32_t oifls = erts_smp_atomic32_read_band_nob(&rq->info_flags,
- ~ERTS_RUNQ_IFLG_NONEMPTY);
- if (oifls & ERTS_RUNQ_IFLG_NONEMPTY) {
+ Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED);
+ if (old_flags & ERTS_RUNQ_FLG_NONEMPTY) {
#ifdef DEBUG
erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues);
/*
@@ -1847,9 +2089,8 @@ empty_runq(ErtsRunQueue *rq)
static ERTS_INLINE void
non_empty_runq(ErtsRunQueue *rq)
{
- erts_aint32_t oifls = erts_smp_atomic32_read_bor_nob(&rq->info_flags,
- ERTS_RUNQ_IFLG_NONEMPTY);
- if (!(oifls & ERTS_RUNQ_IFLG_NONEMPTY)) {
+ Uint32 old_flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_NONEMPTY);
+ if (!(old_flags & ERTS_RUNQ_FLG_NONEMPTY)) {
#ifdef DEBUG
erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues);
/*
@@ -2462,19 +2703,16 @@ try_inc_no_active_runqs(int active)
static ERTS_INLINE int
chk_wake_sched(ErtsRunQueue *crq, int ix, int activate)
{
- erts_aint32_t iflgs;
+ Uint32 flags;
ErtsRunQueue *wrq;
if (crq->ix == ix)
return 0;
wrq = ERTS_RUNQ_IX(ix);
- iflgs = erts_smp_atomic32_read_nob(&wrq->info_flags);
- if (!(iflgs & (ERTS_RUNQ_IFLG_SUSPENDED|ERTS_RUNQ_IFLG_NONEMPTY))) {
+ flags = ERTS_RUNQ_FLGS_GET(wrq);
+ if (!(flags & (ERTS_RUNQ_FLG_SUSPENDED|ERTS_RUNQ_FLG_NONEMPTY))) {
if (activate) {
- if (try_inc_no_active_runqs(ix+1)) {
- erts_smp_xrunq_lock(crq, wrq);
- wrq->flags &= ~ERTS_RUNQ_FLG_INACTIVE;
- erts_smp_xrunq_unlock(crq, wrq);
- }
+ if (try_inc_no_active_runqs(ix+1))
+ ERTS_RUNQ_FLGS_UNSET(wrq, ERTS_RUNQ_FLG_INACTIVE);
}
wake_scheduler(wrq, 0);
return 1;
@@ -2541,9 +2779,7 @@ erts_sched_notify_check_cpu_bind(void)
int ix;
for (ix = 0; ix < erts_no_run_queues; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- erts_smp_runq_lock(rq);
- rq->flags |= ERTS_RUNQ_FLG_CHK_CPU_BIND;
- erts_smp_runq_unlock(rq);
+ ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_CHK_CPU_BIND);
wake_scheduler(rq, 0);
}
#else
@@ -2552,409 +2788,547 @@ erts_sched_notify_check_cpu_bind(void)
}
-#ifdef ERTS_SMP
+static ERTS_INLINE void
+enqueue_process(ErtsRunQueue *runq, int prio, Process *p)
+{
+ ErtsRunPrioQueue *rpq;
-ErtsRunQueue *
-erts_prepare_emigrate(ErtsRunQueue *c_rq, ErtsRunQueueInfo *c_rqi, int prio)
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
+
+ erts_smp_inc_runq_len(runq, &runq->procs.prio_info[prio], prio);
+
+ if (prio == PRIORITY_LOW) {
+ p->schedule_count = RESCHEDULE_LOW;
+ rpq = &runq->procs.prio[PRIORITY_NORMAL];
+ }
+ else {
+ p->schedule_count = 1;
+ rpq = &runq->procs.prio[prio];
+ }
+
+ p->next = NULL;
+ if (rpq->last)
+ rpq->last->next = p;
+ else
+ rpq->first = p;
+ rpq->last = p;
+}
+
+
+static ERTS_INLINE void
+unqueue_process(ErtsRunQueue *runq,
+ ErtsRunPrioQueue *rpq,
+ ErtsRunQueueInfo *rqi,
+ int prio,
+ Process *prev_proc,
+ Process *proc)
{
- ASSERT(ERTS_CHK_RUNQ_FLG_EMIGRATE(c_rq->flags, prio));
- ASSERT(ERTS_CHK_RUNQ_FLG_EVACUATE(c_rq->flags, prio)
- || c_rqi->len >= c_rqi->migrate.limit.this);
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
- while (1) {
- ErtsRunQueue *n_rq = c_rqi->migrate.runq;
- ERTS_DBG_VERIFY_VALID_RUNQP(n_rq);
- erts_smp_xrunq_lock(c_rq, n_rq);
-
- /*
- * erts_smp_xrunq_lock() may release lock on c_rq! We have
- * to check that we still want to emigrate and emigrate
- * to the same run queue as before.
- */
+ if (prev_proc)
+ prev_proc->next = proc->next;
+ else
+ rpq->first = proc->next;
+ if (!proc->next)
+ rpq->last = prev_proc;
- if (ERTS_CHK_RUNQ_FLG_EMIGRATE(c_rq->flags, prio)) {
- Uint32 force = (ERTS_CHK_RUNQ_FLG_EVACUATE(c_rq->flags, prio)
- | (c_rq->flags & ERTS_RUNQ_FLG_INACTIVE));
- if (force || c_rqi->len > c_rqi->migrate.limit.this) {
- ErtsRunQueueInfo *n_rqi;
- /* We still want to emigrate */
-
- if (n_rq != c_rqi->migrate.runq) {
- /* Ahh... run queue changed; need to do it all over again... */
- erts_smp_runq_unlock(n_rq);
- continue;
- }
- else {
+ if (!rpq->first)
+ rpq->last = NULL;
- if (prio == ERTS_PORT_PRIO_LEVEL)
- n_rqi = &n_rq->ports.info;
- else
- n_rqi = &n_rq->procs.prio_info[prio];
+ erts_smp_dec_runq_len(runq, rqi, prio);
+}
- if (force || (n_rqi->len < c_rqi->migrate.limit.other)) {
- /* emigrate ... */
- return n_rq;
- }
- }
- }
- }
- ASSERT(n_rq != c_rq);
- erts_smp_runq_unlock(n_rq);
- if (!(c_rq->flags & ERTS_RUNQ_FLG_INACTIVE)) {
- /* No more emigrations to this runq */
- ERTS_UNSET_RUNQ_FLG_EMIGRATE(c_rq->flags, prio);
- ERTS_DBG_SET_INVALID_RUNQP(c_rqi->migrate.runq, 0x3);
- }
+static ERTS_INLINE Process *
+dequeue_process(ErtsRunQueue *runq, int prio_q, erts_aint32_t *statep)
+{
+ erts_aint32_t state;
+ int prio;
+ ErtsRunPrioQueue *rpq;
+ ErtsRunQueueInfo *rqi;
+ Process *p;
+
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
+
+ ASSERT(PRIORITY_NORMAL == prio_q
+ || PRIORITY_HIGH == prio_q
+ || PRIORITY_MAX == prio_q);
+
+ rpq = &runq->procs.prio[prio_q];
+ p = rpq->first;
+ if (!p)
+ return NULL;
+
+ ERTS_SMP_DATA_DEPENDENCY_READ_MEMORY_BARRIER;
+
+ state = erts_smp_atomic32_read_nob(&p->state);
+ if (statep)
+ *statep = state;
+
+ prio = (int) (ERTS_PSFLG_PRIO_MASK & state);
+
+ rqi = &runq->procs.prio_info[prio];
+
+ if (p)
+ unqueue_process(runq, rpq, rqi, prio, NULL, p);
+
+ return p;
+}
+
+static ERTS_INLINE int
+check_requeue_process(ErtsRunQueue *rq, int prio_q)
+{
+ ErtsRunPrioQueue *rpq = &rq->procs.prio[prio_q];
+ Process *p = rpq->first;
+ if (--p->schedule_count > 0 && p != rpq->last) {
+ /* reschedule */
+ rpq->first = p->next;
+ rpq->last->next = p;
+ rpq->last = p;
+ p->next = NULL;
+ return 1;
+ }
+ return 0;
+}
+
+#ifdef ERTS_SMP
+
+static ErtsRunQueue *
+check_immigration_need(ErtsRunQueue *c_rq, ErtsMigrationPath *mp, int prio)
+{
+ int len;
+ Uint32 f_flags, f_rq_flags;
+ ErtsRunQueue *f_rq;
+
+ f_flags = mp->prio[prio].flags;
+
+ ASSERT(ERTS_CHK_RUNQ_FLG_IMMIGRATE(mp->flags, prio));
+
+ f_rq = mp->prio[prio].runq;
+ if (!f_rq)
+ return NULL;
+ f_rq_flags = ERTS_RUNQ_FLGS_GET(f_rq);
+ if (f_rq_flags & ERTS_RUNQ_FLG_PROTECTED)
return NULL;
+
+ if (ERTS_CHK_RUNQ_FLG_EVACUATE(f_flags, prio))
+ return f_rq;
+
+ if (f_rq_flags & ERTS_RUNQ_FLG_INACTIVE)
+ return f_rq;
+
+ if (prio == ERTS_PORT_PRIO_LEVEL)
+ len = RUNQ_READ_LEN(&c_rq->ports.info.len);
+ else
+ len = RUNQ_READ_LEN(&c_rq->procs.prio_info[prio].len);
+
+ if (len < mp->prio[prio].limit.this) {
+ if (prio == ERTS_PORT_PRIO_LEVEL)
+ len = RUNQ_READ_LEN(&f_rq->ports.info.len);
+ else
+ len = RUNQ_READ_LEN(&f_rq->procs.prio_info[prio].len);
+
+ if (len > mp->prio[prio].limit.other)
+ return f_rq;
}
+ return NULL;
}
static void
-immigrate(ErtsRunQueue *rq)
+immigrate(ErtsRunQueue *c_rq, ErtsMigrationPath *mp)
{
- int prio;
+ Uint32 iflags, iflag;
+ erts_smp_runq_unlock(c_rq);
- ASSERT(rq->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK);
+ ASSERT(erts_thr_progress_is_managed_thread());
- for (prio = 0; prio < ERTS_NO_PRIO_LEVELS; prio++) {
- if (ERTS_CHK_RUNQ_FLG_IMMIGRATE(rq->flags, prio)) {
- ErtsRunQueueInfo *rqi = (prio == ERTS_PORT_PRIO_LEVEL
- ? &rq->ports.info
- : &rq->procs.prio_info[prio]);
- ErtsRunQueue *from_rq = rqi->migrate.runq;
- int rq_locked, from_rq_locked;
+ iflags = mp->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK;
- ERTS_DBG_VERIFY_VALID_RUNQP(from_rq);
+ iflag = iflags & -iflags;
- rq_locked = 1;
- from_rq_locked = 1;
- erts_smp_xrunq_lock(rq, from_rq);
- /*
- * erts_smp_xrunq_lock() may release lock on rq! We have
- * to check that we still want to immigrate from the same
- * run queue as before.
- */
- if (ERTS_CHK_RUNQ_FLG_IMMIGRATE(rq->flags, prio)
- && from_rq == rqi->migrate.runq) {
- ErtsRunQueueInfo *from_rqi = (prio == ERTS_PORT_PRIO_LEVEL
- ? &from_rq->ports.info
- : &from_rq->procs.prio_info[prio]);
- if ((ERTS_CHK_RUNQ_FLG_EVACUATE(rq->flags, prio)
- && ERTS_CHK_RUNQ_FLG_EVACUATE(from_rq->flags, prio)
- && from_rqi->len)
- || (from_rqi->len > rqi->migrate.limit.other
- && rqi->len < rqi->migrate.limit.this)) {
- if (prio == ERTS_PORT_PRIO_LEVEL) {
- Port *prt = from_rq->ports.start;
- if (prt) {
- int prt_locked = 0;
- (void) erts_port_migrate(prt, &prt_locked,
- from_rq, &from_rq_locked,
- rq, &rq_locked);
- if (prt_locked)
- erts_smp_port_unlock(prt);
- }
- }
- else {
- Process *proc;
- ErtsRunPrioQueue *from_rpq;
- from_rpq = (prio == PRIORITY_LOW
- ? &from_rq->procs.prio[PRIORITY_NORMAL]
- : &from_rq->procs.prio[prio]);
- for (proc = from_rpq->first; proc; proc = proc->next)
- if (proc->prio == prio && !proc->bound_runq)
- break;
- if (proc) {
- ErtsProcLocks proc_locks = 0;
- (void) erts_proc_migrate(proc, &proc_locks,
- from_rq, &from_rq_locked,
- rq, &rq_locked);
- if (proc_locks)
- erts_smp_proc_unlock(proc, proc_locks);
- }
+ while (iflag) {
+ ErtsRunQueue *rq;
+ int prio;
+
+ switch (iflag) {
+ case (MAX_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT):
+ prio = PRIORITY_MAX;
+ break;
+ case (HIGH_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT):
+ prio = PRIORITY_HIGH;
+ break;
+ case (NORMAL_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT):
+ prio = PRIORITY_NORMAL;
+ break;
+ case (LOW_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT):
+ prio = PRIORITY_LOW;
+ break;
+ case (PORT_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT):
+ prio = ERTS_PORT_PRIO_LEVEL;
+ break;
+ default:
+ erl_exit(ERTS_ABORT_EXIT,
+ "%s:%d:%s(): Invalid immigrate queue mask",
+ __FILE__, __LINE__, __func__);
+ prio = 0;
+ break;
+ }
+
+ iflags &= ~iflag;
+ iflag = iflags & -iflags;
+
+ rq = check_immigration_need(c_rq, mp, prio);
+ if (rq) {
+ erts_smp_runq_lock(rq);
+ if (prio == ERTS_PORT_PRIO_LEVEL) {
+ Port *prt;
+ prt = erts_dequeue_port(rq);
+ if (prt)
+ RUNQ_SET_RQ(&prt->run_queue, c_rq);
+ erts_smp_runq_unlock(rq);
+ if (prt) {
+ /* port might terminate while we have no lock... */
+ rq = erts_port_runq(prt);
+ if (rq) {
+ if (rq != c_rq)
+ erl_exit(ERTS_ABORT_EXIT,
+ "%s:%d:%s(): Internal error",
+ __FILE__, __LINE__, __func__);
+ erts_enqueue_port(c_rq, prt);
+ if (!iflag)
+ return; /* done */
+ erts_smp_runq_unlock(c_rq);
}
}
- else {
- ERTS_UNSET_RUNQ_FLG_IMMIGRATE(rq->flags, prio);
- ERTS_DBG_SET_INVALID_RUNQP(rqi->migrate.runq, 0x1);
+ }
+ else {
+ ErtsRunPrioQueue *rpq = &rq->procs.prio[prio == PRIORITY_LOW
+ ? PRIORITY_NORMAL
+ : prio];
+ Process *prev_proc = NULL;
+ Process *proc = rpq->first;
+ int rq_locked = 1;
+
+ while (proc) {
+ erts_aint32_t state;
+ state = erts_smp_atomic32_read_acqb(&proc->state);
+ if (!(ERTS_PSFLG_BOUND & state)
+ && (prio == (int) (ERTS_PSFLG_PRIO_MASK & state))) {
+ ErtsRunQueueInfo *rqi = &rq->procs.prio_info[prio];
+ unqueue_process(rq, rpq, rqi, prio, prev_proc, proc);
+ erts_smp_runq_unlock(rq);
+ RUNQ_SET_RQ(&proc->run_queue, c_rq);
+ rq_locked = 0;
+
+ erts_smp_runq_lock(c_rq);
+ enqueue_process(c_rq, prio, proc);
+ if (!iflag)
+ return; /* done */
+ erts_smp_runq_unlock(c_rq);
+ break;
+ }
+ prev_proc = proc;
+ proc = proc->next;
}
+ if (rq_locked)
+ erts_smp_runq_unlock(rq);
}
- if (from_rq_locked)
- erts_smp_runq_unlock(from_rq);
- if (!rq_locked)
- erts_smp_runq_lock(rq);
}
}
+
+ erts_smp_runq_lock(c_rq);
+}
+
+static ERTS_INLINE void
+suspend_run_queue(ErtsRunQueue *rq)
+{
+ erts_smp_atomic32_read_bor_nob(&rq->scheduler->ssi->flags,
+ ERTS_SSI_FLG_SUSPENDED);
+ ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_SUSPENDED);
+
+ wake_scheduler(rq, 0);
}
+static void scheduler_ix_resume_wake(Uint ix);
+
+static ERTS_INLINE void
+resume_run_queue(ErtsRunQueue *rq)
+{
+ int pix;
+
+ erts_smp_runq_lock(rq);
+
+ (void) ERTS_RUNQ_FLGS_MASK_SET(rq,
+ (ERTS_RUNQ_FLG_OUT_OF_WORK
+ | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK
+ | ERTS_RUNQ_FLG_SUSPENDED),
+ (ERTS_RUNQ_FLG_OUT_OF_WORK
+ | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK));
+
+ rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS;
+ for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) {
+ rq->procs.prio_info[pix].max_len = 0;
+ rq->procs.prio_info[pix].reds = 0;
+ }
+ rq->ports.info.max_len = 0;
+ rq->ports.info.reds = 0;
+ rq->max_len = 0;
+
+ erts_smp_runq_unlock(rq);
+
+ scheduler_ix_resume_wake(rq->ix);
+}
+
+typedef struct {
+ Process *first;
+ Process *last;
+} ErtsStuckBoundProcesses;
+
static void
-evacuate_run_queue(ErtsRunQueue *evac_rq, ErtsRunQueue *rq)
+schedule_bound_processes(ErtsRunQueue *rq,
+ ErtsStuckBoundProcesses *sbpp)
{
- Port *prt;
- int notify_to_rq = 0;
- int prio;
- int prt_locked = 0;
- int rq_locked = 0;
- int evac_rq_locked = 1;
- ErtsMigrateResult mres;
+ Process *proc, *next;
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
- erts_smp_runq_lock(evac_rq);
+ proc = sbpp->first;
+ while (proc) {
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state);
+ next = proc->next;
+ enqueue_process(rq, (int) (ERTS_PSFLG_PRIO_MASK & state), proc);
+ proc = next;
+ }
+}
- erts_smp_atomic32_read_bor_nob(&evac_rq->scheduler->ssi->flags,
- ERTS_SSI_FLG_SUSPENDED);
+static void
+evacuate_run_queue(ErtsRunQueue *rq,
+ ErtsStuckBoundProcesses *sbpp)
+{
+ int prio_q;
+ ErtsRunQueue *to_rq;
+ ErtsMigrationPaths *mps;
+ ErtsMigrationPath *mp;
- evac_rq->flags &= ~ERTS_RUNQ_FLGS_IMMIGRATE_QMASK;
- evac_rq->flags |= (ERTS_RUNQ_FLGS_EMIGRATE_QMASK
- | ERTS_RUNQ_FLGS_EVACUATE_QMASK
- | ERTS_RUNQ_FLG_SUSPENDED);
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
- erts_smp_atomic32_read_bor_nob(&evac_rq->info_flags, ERTS_RUNQ_IFLG_SUSPENDED);
- /*
- * Need to set up evacuation paths first since we
- * may release the run queue lock on evac_rq
- * when evacuating.
- */
- evac_rq->misc.evac_runq = rq;
- evac_rq->ports.info.migrate.runq = rq;
- for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++)
- evac_rq->procs.prio_info[prio].migrate.runq = rq;
+ ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
+
+ mps = erts_get_migration_paths_managed();
+ mp = &mps->mpath[rq->ix];
/* Evacuate scheduled misc ops */
- if (evac_rq->misc.start) {
- rq_locked = 1;
- erts_smp_xrunq_lock(evac_rq, rq);
- if (rq->misc.end)
- rq->misc.end->next = evac_rq->misc.start;
+ if (rq->misc.start) {
+ ErtsMiscOpList *start, *end;
+
+ to_rq = mp->misc_evac_runq;
+ if (!to_rq)
+ return;
+
+ start = rq->misc.start;
+ end = rq->misc.end;
+ rq->misc.start = NULL;
+ rq->misc.end = NULL;
+ erts_smp_runq_unlock(rq);
+
+ erts_smp_runq_lock(to_rq);
+ if (to_rq->misc.end)
+ to_rq->misc.end->next = start;
else
- rq->misc.start = evac_rq->misc.start;
- rq->misc.end = evac_rq->misc.end;
- evac_rq->misc.start = NULL;
- evac_rq->misc.end = NULL;
+ to_rq->misc.start = start;
+
+ to_rq->misc.end = end;
+ erts_smp_runq_unlock(to_rq);
+ smp_notify_inc_runq(to_rq);
+ erts_smp_runq_lock(to_rq);
}
- /* Evacuate scheduled ports */
- prt = evac_rq->ports.start;
- while (prt) {
- mres = erts_port_migrate(prt, &prt_locked,
- evac_rq, &evac_rq_locked,
- rq, &rq_locked);
- if (mres == ERTS_MIGRATE_SUCCESS)
- notify_to_rq = 1;
- if (prt_locked)
- erts_smp_port_unlock(prt);
- if (!evac_rq_locked) {
- evac_rq_locked = 1;
- erts_smp_runq_lock(evac_rq);
+ if (rq->ports.start) {
+ Port *prt;
+
+ to_rq = mp->prio[ERTS_PORT_PRIO_LEVEL].runq;
+ if (!to_rq)
+ return;
+
+ /* Evacuate scheduled ports */
+ prt = rq->ports.start;
+ while (prt) {
+ ErtsRunQueue *prt_rq;
+ prt = erts_dequeue_port(rq);
+ RUNQ_SET_RQ(&prt->run_queue, to_rq);
+ erts_smp_runq_unlock(rq);
+ /*
+ * The port might terminate while
+ * we have no lock on it...
+ */
+ prt_rq = erts_port_runq(prt);
+ if (prt_rq) {
+ if (prt_rq != to_rq)
+ erl_exit(ERTS_ABORT_EXIT,
+ "%s:%d:%s() internal error\n",
+ __FILE__, __LINE__, __func__);
+ erts_enqueue_port(to_rq, prt);
+ erts_smp_runq_unlock(to_rq);
+ }
+ erts_smp_runq_lock(rq);
+ prt = rq->ports.start;
}
- prt = evac_rq->ports.start;
+ smp_notify_inc_runq(to_rq);
}
/* Evacuate scheduled processes */
- for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++) {
+ for (prio_q = 0; prio_q < ERTS_NO_PROC_PRIO_QUEUES; prio_q++) {
+ erts_aint32_t state;
Process *proc;
+ int notify = 0;
+ to_rq = NULL;
- switch (prio) {
- case PRIORITY_MAX:
- case PRIORITY_HIGH:
- case PRIORITY_NORMAL:
- proc = evac_rq->procs.prio[prio].first;
- while (proc) {
- ErtsProcLocks proc_locks = 0;
-
- /* Bound processes are stuck... */
- while (proc->bound_runq) {
- proc = proc->next;
- if (!proc)
- goto end_of_proc;
- }
-
- mres = erts_proc_migrate(proc, &proc_locks,
- evac_rq, &evac_rq_locked,
- rq, &rq_locked);
- if (mres == ERTS_MIGRATE_SUCCESS)
- notify_to_rq = 1;
- if (proc_locks)
- erts_smp_proc_unlock(proc, proc_locks);
- if (!evac_rq_locked) {
- erts_smp_runq_lock(evac_rq);
- evac_rq_locked = 1;
- }
+ if (!mp->prio[prio_q].runq)
+ return;
+ if (prio_q == PRIORITY_NORMAL && !mp->prio[PRIORITY_LOW].runq)
+ return;
- proc = evac_rq->procs.prio[prio].first;
+ proc = dequeue_process(rq, prio_q, &state);
+ while (proc) {
+ if (ERTS_PSFLG_BOUND & state) {
+ /* Bound processes get stuck here... */
+ proc->next = NULL;
+ if (sbpp->last)
+ sbpp->last->next = proc;
+ else
+ sbpp->first = proc;
+ sbpp->last = proc;
}
+ else {
+ int prio = (int) (ERTS_PSFLG_PRIO_MASK & state);
+ erts_smp_runq_unlock(rq);
- end_of_proc:
+ to_rq = mp->prio[prio].runq;
+ RUNQ_SET_RQ(&proc->run_queue, to_rq);
-#ifdef DEBUG
- for (proc = evac_rq->procs.prio[prio].first;
- proc;
- proc = proc->next) {
- ASSERT(proc->bound_runq);
+ erts_smp_runq_lock(to_rq);
+ enqueue_process(to_rq, prio, proc);
+ erts_smp_runq_unlock(to_rq);
+ notify = 1;
+
+ erts_smp_runq_lock(rq);
}
-#endif
- break;
- case PRIORITY_LOW:
- break;
- default:
- ASSERT(!"Invalid process priority");
- break;
+ proc = dequeue_process(rq, prio_q, &state);
}
+ if (notify)
+ smp_notify_inc_runq(to_rq);
}
- if (rq_locked)
- erts_smp_runq_unlock(rq);
-
- if (evac_rq_locked)
- erts_smp_runq_unlock(evac_rq);
-
- if (notify_to_rq)
- smp_notify_inc_runq(rq);
-
- wake_scheduler(evac_rq, 0);
+ if (ERTS_EMPTY_RUNQ(rq))
+ empty_runq(rq);
}
static int
-try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq)
+try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq, Uint32 flags)
{
- Process *proc;
- int vrq_locked;
+ Uint32 procs_qmask = flags & ERTS_RUNQ_FLGS_PROCS_QMASK;
+ int max_prio_bit;
+ ErtsRunPrioQueue *rpq;
- if (*rq_lockedp)
- erts_smp_xrunq_lock(rq, vrq);
- else
- erts_smp_runq_lock(vrq);
- vrq_locked = 1;
+ if (*rq_lockedp) {
+ erts_smp_runq_unlock(rq);
+ *rq_lockedp = 0;
+ }
+
+ ERTS_SMP_LC_ASSERT(!erts_smp_lc_runq_is_locked(rq));
- ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked);
+ erts_smp_runq_lock(vrq);
if (rq->halt_in_progress)
- goto try_steal_port;
+ goto no_procs;
/*
* Check for a runnable process to steal...
*/
- switch (vrq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) {
- case MAX_BIT:
- case MAX_BIT|HIGH_BIT:
- case MAX_BIT|NORMAL_BIT:
- case MAX_BIT|LOW_BIT:
- case MAX_BIT|HIGH_BIT|NORMAL_BIT:
- case MAX_BIT|HIGH_BIT|LOW_BIT:
- case MAX_BIT|NORMAL_BIT|LOW_BIT:
- case MAX_BIT|HIGH_BIT|NORMAL_BIT|LOW_BIT:
- for (proc = vrq->procs.prio[PRIORITY_MAX].last;
- proc;
- proc = proc->prev) {
- if (!proc->bound_runq)
- break;
- }
- if (proc)
+ while (procs_qmask) {
+ Process *prev_proc;
+ Process *proc;
+
+ max_prio_bit = procs_qmask & -procs_qmask;
+ switch (max_prio_bit) {
+ case MAX_BIT:
+ rpq = &vrq->procs.prio[PRIORITY_MAX];
break;
- case HIGH_BIT:
- case HIGH_BIT|NORMAL_BIT:
- case HIGH_BIT|LOW_BIT:
- case HIGH_BIT|NORMAL_BIT|LOW_BIT:
- for (proc = vrq->procs.prio[PRIORITY_HIGH].last;
- proc;
- proc = proc->prev) {
- if (!proc->bound_runq)
- break;
- }
- if (proc)
+ case HIGH_BIT:
+ rpq = &vrq->procs.prio[PRIORITY_HIGH];
break;
- case NORMAL_BIT:
- case LOW_BIT:
- case NORMAL_BIT|LOW_BIT:
- for (proc = vrq->procs.prio[PRIORITY_NORMAL].last;
- proc;
- proc = proc->prev) {
- if (!proc->bound_runq)
- break;
- }
- if (proc)
+ case NORMAL_BIT:
+ case LOW_BIT:
+ rpq = &vrq->procs.prio[PRIORITY_NORMAL];
break;
- case 0:
- proc = NULL;
- break;
- default:
- ASSERT(!"Invalid queue mask");
- proc = NULL;
- break;
- }
+ case 0:
+ goto no_procs;
+ default:
+ ASSERT(!"Invalid queue mask");
+ goto no_procs;
+ }
+
+ prev_proc = NULL;
+ proc = rpq->first;
- if (proc) {
- ErtsProcLocks proc_locks = 0;
- int res;
- ErtsMigrateResult mres;
- mres = erts_proc_migrate(proc, &proc_locks,
- vrq, &vrq_locked,
- rq, rq_lockedp);
- if (proc_locks)
- erts_smp_proc_unlock(proc, proc_locks);
- res = !0;
- switch (mres) {
- case ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED:
- res = 0;
- case ERTS_MIGRATE_SUCCESS:
- if (vrq_locked)
+ while (proc) {
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state);
+ if (!(ERTS_PSFLG_BOUND & state)) {
+ /* Steal process */
+ int prio = (int) (ERTS_PSFLG_PRIO_MASK & state);
+ ErtsRunQueueInfo *rqi = &vrq->procs.prio_info[prio];
+ unqueue_process(vrq, rpq, rqi, prio, prev_proc, proc);
erts_smp_runq_unlock(vrq);
- return res;
- default: /* Other failures */
- break;
- }
- }
+ RUNQ_SET_RQ(&proc->run_queue, rq);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked);
+ erts_smp_runq_lock(rq);
+ *rq_lockedp = 1;
+ enqueue_process(rq, prio, proc);
+ return !0;
+ }
+ prev_proc = proc;
+ proc = proc->next;
+ }
- if (!vrq_locked) {
- if (*rq_lockedp)
- erts_smp_xrunq_lock(rq, vrq);
- else
- erts_smp_runq_lock(vrq);
- vrq_locked = 1;
+ procs_qmask &= ~max_prio_bit;
}
- try_steal_port:
+no_procs:
- ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked);
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(vrq));
/*
* Check for a runnable port to steal...
*/
- if (vrq->ports.info.len) {
- Port *prt = vrq->ports.end;
- int prt_locked = 0;
- int res;
- ErtsMigrateResult mres;
-
- mres = erts_port_migrate(prt, &prt_locked,
- vrq, &vrq_locked,
- rq, rq_lockedp);
- if (prt_locked)
- erts_smp_port_unlock(prt);
- res = !0;
- switch (mres) {
- case ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED:
- res = 0;
- case ERTS_MIGRATE_SUCCESS:
- if (vrq_locked)
- erts_smp_runq_unlock(vrq);
- return res;
- default: /* Other failures */
- break;
+ if (vrq->ports.start) {
+ ErtsRunQueue *prt_rq;
+ Port *prt = erts_dequeue_port(vrq);
+ RUNQ_SET_RQ(&prt->run_queue, rq);
+ erts_smp_runq_unlock(vrq);
+
+ /*
+ * The port might terminate while
+ * we have no lock on it...
+ */
+
+ prt_rq = erts_port_runq(prt);
+ if (!prt_rq)
+ return 0;
+ else {
+ if (prt_rq != rq)
+ erl_exit(ERTS_ABORT_EXIT,
+ "%s:%d:%s() internal error\n",
+ __FILE__, __LINE__, __func__);
+ *rq_lockedp = 1;
+ erts_enqueue_port(rq, prt);
+ return !0;
}
}
- if (vrq_locked)
- erts_smp_runq_unlock(vrq);
+ erts_smp_runq_unlock(vrq);
return 0;
}
@@ -2964,9 +3338,10 @@ static ERTS_INLINE int
check_possible_steal_victim(ErtsRunQueue *rq, int *rq_lockedp, int vix)
{
ErtsRunQueue *vrq = ERTS_RUNQ_IX(vix);
- erts_aint32_t iflgs = erts_smp_atomic32_read_nob(&vrq->info_flags);
- if (iflgs & ERTS_RUNQ_IFLG_NONEMPTY)
- return try_steal_task_from_victim(rq, rq_lockedp, vrq);
+ Uint32 flags = ERTS_RUNQ_FLGS_GET(vrq);
+ if ((flags & (ERTS_RUNQ_FLG_NONEMPTY
+ | ERTS_RUNQ_FLG_PROTECTED)) == ERTS_RUNQ_FLG_NONEMPTY)
+ return try_steal_task_from_victim(rq, rq_lockedp, vrq, flags);
else
return 0;
}
@@ -2976,15 +3351,12 @@ static int
try_steal_task(ErtsRunQueue *rq)
{
int res, rq_locked, vix, active_rqs, blnc_rqs;
+ Uint32 flags;
- /*
- * We are not allowed to steal jobs to this run queue
- * if it is suspended. Note that it might get suspended
- * at any time when we don't have the lock on the run
- * queue.
- */
- if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED)
- return 0;
+ /* Protect jobs we steal from getting stolen from us... */
+ flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_PROTECTED);
+ if (flags & ERTS_RUNQ_FLG_SUSPENDED)
+ return 0; /* go suspend instead... */
res = 0;
rq_locked = 1;
@@ -3103,12 +3475,123 @@ do { \
ASSERT(sum__ == (RQ)->full_reds_history_sum); \
} while (0);
+#define ERTS_PRE_ALLOCED_MPATHS 8
+
+erts_atomic_t erts_migration_paths;
+
+static struct {
+ size_t size;
+ ErtsMigrationPaths *freelist;
+ struct {
+ ErtsMigrationPaths *first;
+ ErtsMigrationPaths *last;
+ } retired;
+} mpaths;
+
+static void
+init_migration_paths(void)
+{
+ int qix, i;
+ char *p;
+ ErtsMigrationPaths *mps;
+
+ mpaths.size = sizeof(ErtsMigrationPaths);
+ mpaths.size += sizeof(ErtsMigrationPath)*(erts_no_schedulers-1);
+ mpaths.size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(mpaths.size);
+
+ p = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_LL_MPATHS,
+ (mpaths.size
+ * ERTS_PRE_ALLOCED_MPATHS));
+ mpaths.freelist = NULL;
+ for (i = 0; i < ERTS_PRE_ALLOCED_MPATHS-1; i++) {
+ mps = (ErtsMigrationPaths *) p;
+ mps->next = mpaths.freelist;
+ mpaths.freelist = mps;
+ p += mpaths.size;
+ }
+
+ mps = (ErtsMigrationPaths *) p;
+ mps->block = NULL;
+ for (qix = 0; qix < erts_no_run_queues; qix++) {
+ int pix;
+ mps->mpath[qix].flags = 0;
+ mps->mpath[qix].misc_evac_runq = NULL;
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ mps->mpath[qix].prio[pix].limit.this = -1;
+ mps->mpath[qix].prio[pix].limit.other = -1;
+ mps->mpath[qix].prio[pix].runq = NULL;
+ mps->mpath[qix].prio[pix].flags = 0;
+ }
+ }
+ erts_atomic_init_wb(&erts_migration_paths, (erts_aint_t) mps);
+}
+
+static ERTS_INLINE ErtsMigrationPaths *
+alloc_mpaths(void)
+{
+ void *block;
+ ErtsMigrationPaths *res;
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&balance_info.update_mtx));
+
+ res = mpaths.freelist;
+ if (res) {
+ mpaths.freelist = res->next;
+ res->block = NULL;
+ return res;
+ }
+ res = erts_alloc(ERTS_ALC_T_SL_MPATHS,
+ mpaths.size+ERTS_CACHE_LINE_SIZE);
+ block = (void *) res;
+ if (((UWord) res) & ERTS_CACHE_LINE_MASK)
+ res = (ErtsMigrationPaths *) ((((UWord) res) & ~ERTS_CACHE_LINE_MASK)
+ + ERTS_CACHE_LINE_SIZE);
+ res->block = block;
+ return res;
+}
+
+static ERTS_INLINE void
+retire_mpaths(ErtsMigrationPaths *mps)
+{
+ ErtsThrPrgrVal current;
+
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&balance_info.update_mtx));
+
+ current = erts_thr_progress_current();
+
+ while (mpaths.retired.first) {
+ ErtsMigrationPaths *tmp = mpaths.retired.first;
+ if (!erts_thr_progress_has_reached_this(current, tmp->thr_prgr))
+ break;
+ mpaths.retired.first = tmp->next;
+ if (tmp->block) {
+ erts_free(ERTS_ALC_T_SL_MPATHS, tmp->block);
+ }
+ else {
+ tmp->next = mpaths.freelist;
+ mpaths.freelist = tmp;
+ }
+ }
+
+ if (!mpaths.retired.first)
+ mpaths.retired.last = NULL;
+
+ mps->thr_prgr = erts_thr_progress_later_than(current);
+ mps->next = NULL;
+
+ if (mpaths.retired.last)
+ mpaths.retired.last->next = mps;
+ else
+ mpaths.retired.first = mps;
+ mpaths.retired.last = mps;
+}
+
static void
check_balance(ErtsRunQueue *c_rq)
{
#if ERTS_MAX_PROCESSES >= (1 << 27)
# error check_balance() assumes ERTS_MAX_PROCESS < (1 << 27)
#endif
+ ErtsMigrationPaths *new_mpaths, *old_mpaths;
ErtsRunQueueBalance avg = {0};
Sint64 scheds_reds, full_scheds_reds;
int forced, active, current_active, oowc, half_full_scheds, full_scheds,
@@ -3134,9 +3617,9 @@ check_balance(ErtsRunQueue *c_rq)
ERTS_FOREACH_RUNQ(rq,
{
if (rq->waiting)
- rq->flags |= ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK;
+ ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK);
else
- rq->flags &= ~ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK;
+ ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK);
rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS;
});
@@ -3178,7 +3661,7 @@ check_balance(ErtsRunQueue *c_rq)
ErtsRunQueue *rq = ERTS_RUNQ_IX(qix);
erts_smp_runq_lock(rq);
- run_queue_info[qix].flags = rq->flags;
+ run_queue_info[qix].flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) {
run_queue_info[qix].prio[pix].max_len
= rq->procs.prio_info[pix].max_len;
@@ -3512,47 +3995,27 @@ erts_fprintf(stderr, "--------------------------------\n");
set_no_active_runqs(active);
balance_info.halftime = 1;
- erts_smp_atomic32_set_nob(&balance_info.checking_balance, 0);
+ new_mpaths = alloc_mpaths();
+
+ /* Write migration paths */
- /* Write migration paths and reset balance statistics in all queues */
for (qix = 0; qix < blnc_no_rqs; qix++) {
int mqix;
- Uint32 flags;
- ErtsRunQueue *rq = ERTS_RUNQ_IX(qix);
- ErtsRunQueueInfo *rqi;
- flags = run_queue_info[qix].flags;
- erts_smp_runq_lock(rq);
- flags |= (rq->flags & ~ERTS_RUNQ_FLGS_MIGRATION_INFO);
- ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK));
- if (rq->waiting)
- flags |= ERTS_RUNQ_FLG_OUT_OF_WORK;
+ Uint32 flags = run_queue_info[qix].flags;
+ ErtsMigrationPath *mp = &new_mpaths->mpath[qix];
- rq->full_reds_history_sum
- = run_queue_info[qix].full_reds_history_sum;
- rq->full_reds_history[freds_hist_ix]
- = run_queue_info[qix].full_reds_history_change;
+ mp->flags = flags;
+ mp->misc_evac_runq = NULL;
- ERTS_DBG_CHK_FULL_REDS_HISTORY(rq);
-
- rq->out_of_work_count = 0;
- rq->flags = flags;
- rq->max_len = rq->len;
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
- rqi = (pix == ERTS_PORT_PRIO_LEVEL
- ? &rq->ports.info
- : &rq->procs.prio_info[pix]);
- rqi->max_len = rqi->len;
- rqi->reds = 0;
if (!(ERTS_CHK_RUNQ_FLG_EMIGRATE(flags, pix)
| ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix))) {
ASSERT(run_queue_info[qix].prio[pix].immigrate_from < 0);
ASSERT(run_queue_info[qix].prio[pix].emigrate_to < 0);
-#ifdef DEBUG
- rqi->migrate.limit.this = -1;
- rqi->migrate.limit.other = -1;
- ERTS_DBG_SET_INVALID_RUNQP(rqi->migrate.runq, 0x2);
-#endif
-
+ mp->prio[pix].limit.this = -1;
+ mp->prio[pix].limit.other = -1;
+ mp->prio[pix].runq = NULL;
+ mp->prio[pix].flags = 0;
}
else if (ERTS_CHK_RUNQ_FLG_EMIGRATE(flags, pix)) {
ASSERT(!ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix));
@@ -3560,11 +4023,12 @@ erts_fprintf(stderr, "--------------------------------\n");
ASSERT(run_queue_info[qix].prio[pix].emigrate_to >= 0);
mqix = run_queue_info[qix].prio[pix].emigrate_to;
- rqi->migrate.limit.this
+ mp->prio[pix].limit.this
= run_queue_info[qix].prio[pix].migration_limit;
- rqi->migrate.limit.other
+ mp->prio[pix].limit.other
= run_queue_info[mqix].prio[pix].migration_limit;
- rqi->migrate.runq = ERTS_RUNQ_IX(mqix);
+ mp->prio[pix].runq = ERTS_RUNQ_IX(mqix);
+ mp->prio[pix].flags = run_queue_info[mqix].flags;
}
else {
ASSERT(ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix));
@@ -3572,24 +4036,142 @@ erts_fprintf(stderr, "--------------------------------\n");
ASSERT(run_queue_info[qix].prio[pix].immigrate_from >= 0);
mqix = run_queue_info[qix].prio[pix].immigrate_from;
- rqi->migrate.limit.this
+ mp->prio[pix].limit.this
= run_queue_info[qix].prio[pix].migration_limit;
- rqi->migrate.limit.other
+ mp->prio[pix].limit.other
= run_queue_info[mqix].prio[pix].migration_limit;
- rqi->migrate.runq = ERTS_RUNQ_IX(mqix);
+ mp->prio[pix].runq = ERTS_RUNQ_IX(mqix);
+ mp->prio[pix].flags = run_queue_info[mqix].flags;
}
}
+ }
+
+ old_mpaths = erts_get_migration_paths_managed();
+
+ /* Keep offline run-queues as is */
+ for (qix = blnc_no_rqs; qix < erts_no_schedulers; qix++) {
+ ErtsMigrationPath *nmp = &new_mpaths->mpath[qix];
+ ErtsMigrationPath *omp = &old_mpaths->mpath[qix];
+
+ nmp->flags = omp->flags;
+ nmp->misc_evac_runq = omp->misc_evac_runq;
+
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ nmp->prio[pix].limit.this = omp->prio[pix].limit.this;
+ nmp->prio[pix].limit.other = omp->prio[pix].limit.other;
+ nmp->prio[pix].runq = omp->prio[pix].runq;
+ nmp->prio[pix].flags = omp->prio[pix].flags;
+ }
+ }
+
+
+ /* Publish new migration paths... */
+ erts_atomic_set_wb(&erts_migration_paths, (erts_aint_t) new_mpaths);
+
+ /* Reset balance statistics in all online queues */
+ for (qix = 0; qix < blnc_no_rqs; qix++) {
+ Uint32 flags = run_queue_info[qix].flags;
+ ErtsRunQueue *rq = ERTS_RUNQ_IX(qix);
+
+ erts_smp_runq_lock(rq);
+ ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK));
+ if (rq->waiting)
+ flags |= ERTS_RUNQ_FLG_OUT_OF_WORK;
+
+ rq->full_reds_history_sum
+ = run_queue_info[qix].full_reds_history_sum;
+ rq->full_reds_history[freds_hist_ix]
+ = run_queue_info[qix].full_reds_history_change;
+
+ ERTS_DBG_CHK_FULL_REDS_HISTORY(rq);
+
+ rq->out_of_work_count = 0;
+ (void) ERTS_RUNQ_FLGS_MASK_SET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO, flags);
+
+ rq->max_len = rq->len;
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ ErtsRunQueueInfo *rqi;
+ rqi = (pix == ERTS_PORT_PRIO_LEVEL
+ ? &rq->ports.info
+ : &rq->procs.prio_info[pix]);
+ erts_smp_reset_max_len(rq, rqi);
+ rqi->reds = 0;
+ }
rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS;
erts_smp_runq_unlock(rq);
}
+ erts_smp_atomic32_set_nob(&balance_info.checking_balance, 0);
+
balance_info.n++;
+ retire_mpaths(old_mpaths);
erts_smp_mtx_unlock(&balance_info.update_mtx);
erts_smp_runq_lock(c_rq);
}
+static void
+change_no_used_runqs(int used)
+{
+ ErtsMigrationPaths *new_mpaths, *old_mpaths;
+ int active, qix;
+ erts_smp_mtx_lock(&balance_info.update_mtx);
+ get_no_runqs(&active, NULL);
+ set_no_used_runqs(used);
+
+ old_mpaths = erts_get_migration_paths_managed();
+ new_mpaths = alloc_mpaths();
+
+ /* Write migration paths... */
+
+ for (qix = 0; qix < used; qix++) {
+ int pix;
+ ErtsMigrationPath *omp = &old_mpaths->mpath[qix];
+ ErtsMigrationPath *nmp = &new_mpaths->mpath[qix];
+
+ nmp->flags = omp->flags & ~ERTS_RUNQ_FLGS_MIGRATION_QMASKS;
+ nmp->misc_evac_runq = NULL;
+
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ nmp->prio[pix].limit.this = -1;
+ nmp->prio[pix].limit.other = -1;
+ nmp->prio[pix].runq = NULL;
+ nmp->prio[pix].flags = 0;
+ }
+ }
+ for (qix = used; qix < erts_no_run_queues; qix++) {
+ int pix;
+ ErtsRunQueue *to_rq = ERTS_RUNQ_IX(qix % used);
+ ErtsMigrationPath *nmp = &new_mpaths->mpath[qix];
+
+ nmp->flags = (ERTS_RUNQ_FLGS_EMIGRATE_QMASK
+ | ERTS_RUNQ_FLGS_EVACUATE_QMASK);
+ nmp->misc_evac_runq = to_rq;
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ nmp->prio[pix].limit.this = -1;
+ nmp->prio[pix].limit.other = -1;
+ nmp->prio[pix].runq = to_rq;
+ nmp->prio[pix].flags = 0;
+ }
+ }
+
+ /* ... and publish them. */
+ erts_atomic_set_wb(&erts_migration_paths, (erts_aint_t) new_mpaths);
+
+ retire_mpaths(old_mpaths);
+
+ /* Make sure that we balance soon... */
+ balance_info.forced_check_balance = 1;
+
+ erts_smp_mtx_unlock(&balance_info.update_mtx);
+
+ erts_smp_runq_lock(ERTS_RUNQ_IX(0));
+ ERTS_RUNQ_IX(0)->check_balance_reds = 0;
+ erts_smp_runq_unlock(ERTS_RUNQ_IX(0));
+}
+
+
#endif /* #ifdef ERTS_SMP */
Uint
@@ -3681,7 +4263,6 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
rq->ix = ix;
- erts_smp_atomic32_init_nob(&rq->info_flags, ERTS_RUNQ_IFLG_NONEMPTY);
/* make sure that the "extra" id correponds to the schedulers
* id if the esdp->no <-> ix+1 mapping change.
@@ -3692,7 +4273,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
rq->waiting = 0;
rq->woken = 0;
- rq->flags = 0;
+ ERTS_RUNQ_FLGS_INIT(rq, ERTS_RUNQ_FLG_NONEMPTY);
rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS;
rq->full_reds_history_sum = 0;
for (rix = 0; rix < ERTS_FULL_REDS_HISTORY_SIZE; rix++) {
@@ -3706,19 +4287,14 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
rq->wakeup_other_reds = 0;
rq->halt_in_progress = 0;
- rq->procs.len = 0;
rq->procs.pending_exiters = NULL;
rq->procs.context_switches = 0;
rq->procs.reductions = 0;
for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) {
- rq->procs.prio_info[pix].len = 0;
+ erts_smp_atomic32_init_nob(&rq->procs.prio_info[pix].len, 0);
rq->procs.prio_info[pix].max_len = 0;
rq->procs.prio_info[pix].reds = 0;
- rq->procs.prio_info[pix].migrate.limit.this = 0;
- rq->procs.prio_info[pix].migrate.limit.other = 0;
- ERTS_DBG_SET_INVALID_RUNQP(rq->procs.prio_info[pix].migrate.runq,
- 0x0);
if (pix < ERTS_NO_PROC_PRIO_LEVELS - 1) {
rq->procs.prio[pix].first = NULL;
rq->procs.prio[pix].last = NULL;
@@ -3727,14 +4303,10 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
rq->misc.start = NULL;
rq->misc.end = NULL;
- rq->misc.evac_runq = NULL;
- rq->ports.info.len = 0;
+ erts_smp_atomic32_init_nob(&rq->ports.info.len, 0);
rq->ports.info.max_len = 0;
rq->ports.info.reds = 0;
- rq->ports.info.migrate.limit.this = 0;
- rq->ports.info.migrate.limit.other = 0;
- rq->ports.info.migrate.runq = NULL;
rq->ports.start = NULL;
rq->ports.end = NULL;
}
@@ -3857,10 +4429,12 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
balance_info.prev_rise.reds = 0;
balance_info.n = 0;
+ init_migration_paths();
+
if (no_schedulers_online < no_schedulers) {
+ change_no_used_runqs(no_schedulers_online);
for (ix = no_schedulers_online; ix < erts_no_run_queues; ix++)
- evacuate_run_queue(ERTS_RUNQ_IX(ix),
- ERTS_RUNQ_IX(ix % no_schedulers_online));
+ suspend_run_queue(ERTS_RUNQ_IX(ix));
}
schdlr_sspnd.wait_curr_online = no_schedulers_online;
@@ -3923,91 +4497,208 @@ erts_get_scheduler_data(void)
#endif
-static int remove_proc_from_runq(ErtsRunQueue *rq, Process *p, int to_inactive);
+/*
+ * scheduler_out_process() return with c_rq locked.
+ */
+static ERTS_INLINE int
+schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p)
+{
+ erts_aint32_t a, e, n;
+ int res = 0;
+
+ a = state;
+
+ while (1) {
+ n = e = a;
+
+ ASSERT(a & ERTS_PSFLG_RUNNING);
+ ASSERT(!(a & ERTS_PSFLG_IN_RUNQ));
+
+ n &= ~ERTS_PSFLG_RUNNING;
+ if ((a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE)
+ n |= ERTS_PSFLG_IN_RUNQ;
+ a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
+ if (a == e)
+ break;
+ }
+
+ if (!(n & ERTS_PSFLG_IN_RUNQ)) {
+ if (erts_system_profile_flags.runnable_procs)
+ profile_runnable_proc(p, am_inactive);
+ }
+ else {
+ int prio = (int) (ERTS_PSFLG_PRIO_MASK & n);
+ ErtsRunQueue *runq = erts_get_runq_proc(p);
+
+ ASSERT(!(n & ERTS_PSFLG_SUSPENDED));
+
+#ifdef ERTS_SMP
+ if (!(ERTS_PSFLG_BOUND & n)) {
+ ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio);
+ if (new_runq) {
+ RUNQ_SET_RQ(&p->run_queue, new_runq);
+ runq = new_runq;
+ }
+ }
+#endif
+ ASSERT(runq);
+ res = 1;
+
+ erts_smp_runq_lock(runq);
+
+ /* Enqueue the process */
+ enqueue_process(runq, prio, p);
+
+ if (runq == c_rq)
+ return res;
+ erts_smp_runq_unlock(runq);
+ smp_notify_inc_runq(runq);
+ }
+ erts_smp_runq_lock(c_rq);
+ return res;
+}
static ERTS_INLINE void
-suspend_process(ErtsRunQueue *rq, Process *p)
+add2runq(Process *p, erts_aint32_t state)
{
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
- p->rcount++; /* count number of suspend */
-#ifdef ERTS_SMP
- ASSERT(!(p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING)
- || p == erts_get_current_process());
- ASSERT(p->status != P_RUNNING
- || p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING);
- if (p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ)
- goto runable;
-#endif
- switch(p->status) {
- case P_SUSPENDED:
- break;
- case P_RUNABLE:
+ int prio = (int) (ERTS_PSFLG_PRIO_MASK & state);
+ ErtsRunQueue *runq = erts_get_runq_proc(p);
+
#ifdef ERTS_SMP
- runable:
- if (!ERTS_PROC_PENDING_EXIT(p))
+ if (!(ERTS_PSFLG_BOUND & state)) {
+ ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio);
+ if (new_runq) {
+ RUNQ_SET_RQ(&p->run_queue, new_runq);
+ runq = new_runq;
+ }
+ }
#endif
- remove_proc_from_runq(rq, p, 1);
- /* else:
- * leave process in schedq so it will discover the pending exit
- */
- p->rstatus = P_RUNABLE; /* wakeup as runnable */
- break;
- case P_RUNNING:
- p->rstatus = P_RUNABLE; /* wakeup as runnable */
- break;
- case P_WAITING:
- p->rstatus = P_WAITING; /* wakeup as waiting */
- break;
- case P_EXITING:
- return; /* ignore this */
- case P_GARBING:
- case P_FREE:
- erl_exit(1, "bad state in suspend_process()\n");
+ ASSERT(runq);
+
+ erts_smp_runq_lock(runq);
+
+ /* Enqueue the process */
+ enqueue_process(runq, prio, p);
+
+ erts_smp_runq_unlock(runq);
+ smp_notify_inc_runq(runq);
+
+}
+
+static ERTS_INLINE void
+schedule_process(Process *p, erts_aint32_t state, int active_enq)
+{
+ erts_aint32_t a = state, n;
+
+ while (1) {
+ erts_aint32_t e;
+ n = e = a;
+ ASSERT(!(a & ERTS_PSFLG_FREE));
+ n |= ERTS_PSFLG_ACTIVE;
+ if (!(a & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_RUNNING)))
+ n |= ERTS_PSFLG_IN_RUNQ;
+ a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
+ if (a == e)
+ break;
+ if (!active_enq && (a & ERTS_PSFLG_ACTIVE))
+ return; /* Someone else activated process ... */
}
- if ((erts_system_profile_flags.runnable_procs) && (p->rcount == 1) && (p->status != P_WAITING)) {
- profile_runnable_proc(p, am_inactive);
+#ifdef RRR_DEBUG
+ if (active_enq)
+ erts_fprintf(stderr, "! state=0x%x\n", n);
+#endif
+
+ if (erts_system_profile_flags.runnable_procs
+ && !(a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) {
+ profile_runnable_proc(p, am_active);
}
- p->status = P_SUSPENDED;
-
+ if ((n & ERTS_PSFLG_IN_RUNQ) && !(a & ERTS_PSFLG_IN_RUNQ)) {
+#ifdef RRR_DEBUG
+ if (active_enq)
+ erts_fprintf(stderr, "-->\n");
+#endif
+ add2runq(p, n);
+ }
}
-static ERTS_INLINE void
-resume_process(Process *p)
+void
+erts_schedule_process(Process *p, erts_aint32_t state)
{
- Uint32 *statusp;
+ schedule_process(p, state, 0);
+}
+
+static ERTS_INLINE int
+suspend_process(Process *c_p, Process *p)
+{
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state);
+ int suspended = 0;
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
- switch (p->status) {
- case P_SUSPENDED:
- statusp = &p->status;
- break;
- case P_GARBING:
- if (p->gcstatus == P_SUSPENDED) {
- statusp = &p->gcstatus;
- break;
+
+ if ((state & ERTS_PSFLG_SUSPENDED))
+ suspended = -1;
+ else {
+ if (c_p == p) {
+ state = erts_smp_atomic32_read_bor_relb(&p->state,
+ ERTS_PSFLG_SUSPENDED);
+ state |= ERTS_PSFLG_SUSPENDED;
+ ASSERT(state & ERTS_PSFLG_RUNNING);
+ suspended = 1;
+ }
+ else {
+ while (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_EXITING))) {
+ erts_aint32_t e, n;
+ n = e = state;
+ n |= ERTS_PSFLG_SUSPENDED;
+ state = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e);
+ if (state == e) {
+ state = n;
+ suspended = 1;
+ break;
+ }
+ }
}
- /* Fall through */
- default:
- return;
}
+ if (state & ERTS_PSFLG_SUSPENDED) {
+
+ ASSERT(!(ERTS_PSFLG_RUNNING & state)
+ || p == erts_get_current_process());
+
+ if (erts_system_profile_flags.runnable_procs
+ && (p->rcount == 0)
+ && (state & ERTS_PSFLG_ACTIVE)) {
+ profile_runnable_proc(p, am_inactive);
+ }
+
+ p->rcount++; /* count number of suspend */
+ }
+ return suspended;
+}
+
+static ERTS_INLINE void
+resume_process(Process *p)
+{
+ erts_aint32_t state;
+ ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
+
ASSERT(p->rcount > 0);
if (--p->rcount > 0) /* multiple suspend */
return;
- switch(p->rstatus) {
- case P_RUNABLE:
- erts_add_to_runq(p);
- break;
- case P_WAITING:
- *statusp = P_WAITING;
- break;
- default:
- erl_exit(1, "bad state in resume_process()\n");
+
+ state = erts_smp_atomic32_read_band_mb(&p->state, ~ERTS_PSFLG_SUSPENDED);
+ state &= ~ERTS_PSFLG_SUSPENDED;
+#ifdef RRR_DEBUG
+ erts_fprintf(stderr, "%T - state=0x%x\n", p->id, state);
+#endif
+ if ((state & (ERTS_PSFLG_EXITING
+ | ERTS_PSFLG_ACTIVE
+ | ERTS_PSFLG_IN_RUNQ
+ | ERTS_PSFLG_RUNNING)) == ERTS_PSFLG_ACTIVE) {
+ schedule_process(p, state, 1);
}
- p->rstatus = P_FREE;
}
int
@@ -4131,6 +4822,7 @@ suspend_scheduler(ErtsSchedulerData *esdp)
int wake = 0;
erts_aint32_t aux_work;
int thr_prgr_active = 1;
+ ErtsStuckBoundProcesses sbp = {NULL, NULL};
/*
* Schedulers may be suspended in two different ways:
@@ -4145,6 +4837,8 @@ suspend_scheduler(ErtsSchedulerData *esdp)
ASSERT(no != 1);
+ evacuate_run_queue(esdp->run_queue, &sbp);
+
erts_smp_runq_unlock(esdp->run_queue);
erts_sched_check_cpu_bind_prep_suspend(esdp);
@@ -4208,17 +4902,26 @@ suspend_scheduler(ErtsSchedulerData *esdp)
erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
while (1) {
+ erts_aint32_t qmask;
erts_aint32_t flgs;
+ qmask = (ERTS_RUNQ_FLGS_GET(esdp->run_queue)
+ & ERTS_RUNQ_FLGS_QMASK);
aux_work = erts_atomic32_read_acqb(&ssi->aux_work);
- if (aux_work) {
+ if (aux_work|qmask) {
if (!thr_prgr_active) {
erts_thr_progress_active(esdp, thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
- aux_work = handle_aux_work(&esdp->aux_work_data, aux_work);
+ if (aux_work)
+ aux_work = handle_aux_work(&esdp->aux_work_data, aux_work);
if (aux_work && erts_thr_progress_update(esdp))
erts_thr_progress_leader_update(esdp);
+ if (qmask) {
+ erts_smp_runq_lock(esdp->run_queue);
+ evacuate_run_queue(esdp->run_queue, &sbp);
+ erts_smp_runq_unlock(esdp->run_queue);
+ }
}
if (!aux_work) {
@@ -4288,56 +4991,11 @@ suspend_scheduler(ErtsSchedulerData *esdp)
erts_smp_runq_lock(esdp->run_queue);
non_empty_runq(esdp->run_queue);
+ schedule_bound_processes(esdp->run_queue, &sbp);
+
erts_sched_check_cpu_bind_post_suspend(esdp);
}
-#define ERTS_RUNQ_RESET_SUSPEND_INFO(RQ, DBG_ID) \
-do { \
- int pix__; \
- (RQ)->misc.evac_runq = NULL; \
- (RQ)->ports.info.migrate.runq = NULL; \
- (RQ)->flags &= ~(ERTS_RUNQ_FLGS_IMMIGRATE_QMASK \
- | ERTS_RUNQ_FLGS_EMIGRATE_QMASK \
- | ERTS_RUNQ_FLGS_EVACUATE_QMASK \
- | ERTS_RUNQ_FLG_SUSPENDED); \
- (RQ)->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK \
- | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK); \
- (RQ)->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; \
- erts_smp_atomic32_read_band_nob(&(RQ)->info_flags, ~ERTS_RUNQ_IFLG_SUSPENDED);\
- for (pix__ = 0; pix__ < ERTS_NO_PROC_PRIO_LEVELS; pix__++) { \
- (RQ)->procs.prio_info[pix__].max_len = 0; \
- (RQ)->procs.prio_info[pix__].reds = 0; \
- ERTS_DBG_SET_INVALID_RUNQP((RQ)->procs.prio_info[pix__].migrate.runq,\
- (DBG_ID)); \
- } \
- (RQ)->ports.info.max_len = 0; \
- (RQ)->ports.info.reds = 0; \
-} while (0)
-
-#define ERTS_RUNQ_RESET_MIGRATION_PATHS__(RQ) \
-do { \
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked((RQ))); \
- (RQ)->misc.evac_runq = NULL; \
- (RQ)->ports.info.migrate.runq = NULL; \
- (RQ)->flags &= ~(ERTS_RUNQ_FLGS_IMMIGRATE_QMASK \
- | ERTS_RUNQ_FLGS_EMIGRATE_QMASK \
- | ERTS_RUNQ_FLGS_EVACUATE_QMASK); \
-} while (0)
-
-#ifdef DEBUG
-#define ERTS_RUNQ_RESET_MIGRATION_PATHS(RQ, DBG_ID) \
-do { \
- int pix__; \
- ERTS_RUNQ_RESET_MIGRATION_PATHS__((RQ)); \
- for (pix__ = 0; pix__ < ERTS_NO_PROC_PRIO_LEVELS; pix__++) \
- ERTS_DBG_SET_INVALID_RUNQP((RQ)->procs.prio_info[pix__].migrate.runq,\
- (DBG_ID)); \
-} while (0)
-#else
-#define ERTS_RUNQ_RESET_MIGRATION_PATHS(RQ, DBG_ID) \
- ERTS_RUNQ_RESET_MIGRATION_PATHS__((RQ))
-#endif
-
ErtsSchedSuspendResult
erts_schedulers_state(Uint *total,
Uint *online,
@@ -4407,27 +5065,13 @@ erts_set_schedulers_online(Process *p,
have_unlocked_plocks = 1;
erts_smp_proc_unlock(p, plocks);
}
- erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
- erts_smp_mtx_lock(&balance_info.update_mtx);
- for (ix = online; ix < no; ix++) {
- ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- erts_smp_runq_lock(rq);
- ERTS_RUNQ_RESET_SUSPEND_INFO(rq, 0x5);
- erts_smp_runq_unlock(rq);
- scheduler_ix_resume_wake(ix);
- }
- /*
- * Spread evacuation paths among all online
- * run queues.
- */
- for (ix = no; ix < erts_no_run_queues; ix++) {
- ErtsRunQueue *from_rq = ERTS_RUNQ_IX(ix);
- ErtsRunQueue *to_rq = ERTS_RUNQ_IX(ix % no);
- evacuate_run_queue(from_rq, to_rq);
- }
- set_no_used_runqs(no);
- erts_smp_mtx_unlock(&balance_info.update_mtx);
- erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+ change_no_used_runqs(no);
+
+ for (ix = online; ix < no; ix++)
+ resume_run_queue(ERTS_RUNQ_IX(ix));
+
+ for (ix = no; ix < erts_no_run_queues; ix++)
+ suspend_run_queue(ERTS_RUNQ_IX(ix));
}
res = ERTS_SCHDLR_SSPND_DONE;
}
@@ -4454,25 +5098,11 @@ erts_set_schedulers_online(Process *p,
have_unlocked_plocks = 1;
erts_smp_proc_unlock(p, plocks);
}
- erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
- erts_smp_mtx_lock(&balance_info.update_mtx);
-
- for (ix = 0; ix < online; ix++) {
- ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- erts_smp_runq_lock(rq);
- ERTS_RUNQ_RESET_MIGRATION_PATHS(rq, 0x6);
- erts_smp_runq_unlock(rq);
- }
- /*
- * Evacutation order important! Newly suspended run queues
- * has to be evacuated last.
- */
- for (ix = erts_no_run_queues-1; ix >= no; ix--)
- evacuate_run_queue(ERTS_RUNQ_IX(ix),
- ERTS_RUNQ_IX(ix % no));
- set_no_used_runqs(no);
- erts_smp_mtx_unlock(&balance_info.update_mtx);
- erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+
+ change_no_used_runqs(no);
+ for (ix = no; ix < erts_no_run_queues; ix++)
+ suspend_run_queue(ERTS_RUNQ_IX(ix));
+
for (ix = no; ix < online; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
wake_scheduler(rq, 0);
@@ -4569,25 +5199,14 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
schdlr_sspnd.msb.wait_active = 2;
}
- erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
- erts_smp_mtx_lock(&balance_info.update_mtx);
- set_no_used_runqs(1);
- for (ix = 0; ix < online; ix++) {
+ change_no_used_runqs(1);
+ for (ix = 1; ix < erts_no_run_queues; ix++)
+ suspend_run_queue(ERTS_RUNQ_IX(ix));
+
+ for (ix = 1; ix < online; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- erts_smp_runq_lock(rq);
- ASSERT(!(rq->flags & ERTS_RUNQ_FLG_SUSPENDED));
- ERTS_RUNQ_RESET_MIGRATION_PATHS(rq, 0x7);
- erts_smp_runq_unlock(rq);
+ wake_scheduler(rq, 0);
}
- /*
- * Evacuate all activities in all other run queues
- * into the first run queue. Note order is important,
- * online run queues has to be evacuated last.
- */
- for (ix = erts_no_run_queues-1; ix >= 1; ix--)
- evacuate_run_queue(ERTS_RUNQ_IX(ix), ERTS_RUNQ_IX(0));
- erts_smp_mtx_unlock(&balance_info.update_mtx);
- erts_smp_mtx_lock(&schdlr_sspnd.mtx);
if (erts_smp_atomic32_read_nob(&schdlr_sspnd.active)
!= schdlr_sspnd.msb.wait_active) {
@@ -4631,15 +5250,6 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
plp = proclist_create(p);
plp->next = schdlr_sspnd.msb.procs;
schdlr_sspnd.msb.procs = plp;
-#ifdef DEBUG
- ERTS_FOREACH_RUNQ(srq,
- {
- if (srq != ERTS_RUNQ_IX(0)) {
- ASSERT(ERTS_EMPTY_RUNQ(srq));
- ASSERT(srq->flags & ERTS_RUNQ_FLG_SUSPENDED);
- }
- });
-#endif
ASSERT(p->scheduler_data);
}
}
@@ -4671,27 +5281,6 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
res = ERTS_SCHDLR_SSPND_DONE_MSCHED_BLOCKED;
else {
ERTS_SCHDLR_SSPND_CHNG_SET(ERTS_SCHDLR_SSPND_CHNG_MSB, 0);
-#ifdef DEBUG
- ERTS_FOREACH_RUNQ(rq,
- {
- if (rq != p->scheduler_data->run_queue) {
- if (!ERTS_EMPTY_RUNQ(rq)) {
- Process *rp;
- int pix;
- ASSERT(rq->ports.info.len == 0);
- for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) {
- for (rp = rq->procs.prio[pix].first;
- rp;
- rp = rp->next) {
- ASSERT(rp->bound_runq);
- }
- }
- }
-
- ASSERT(rq->flags & ERTS_RUNQ_FLG_SUSPENDED);
- }
- });
-#endif
p->flags &= ~F_HAVE_BLCKD_MSCHED;
schdlr_sspnd.msb.ongoing = 0;
if (schdlr_sspnd.online == 1) {
@@ -4701,35 +5290,19 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
}
else {
int online = schdlr_sspnd.online;
- erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
if (plocks) {
have_unlocked_plocks = 1;
erts_smp_proc_unlock(p, plocks);
}
- erts_smp_mtx_lock(&balance_info.update_mtx);
+
+ change_no_used_runqs(online);
/* Resume all online run queues */
- for (ix = 1; ix < online; ix++) {
- ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- erts_smp_runq_lock(rq);
- ERTS_RUNQ_RESET_SUSPEND_INFO(rq, 0x4);
- erts_smp_runq_unlock(rq);
- scheduler_ix_resume_wake(ix);
- }
+ for (ix = 1; ix < online; ix++)
+ resume_run_queue(ERTS_RUNQ_IX(ix));
- /* Spread evacuation paths among all online run queues */
for (ix = online; ix < erts_no_run_queues; ix++)
- evacuate_run_queue(ERTS_RUNQ_IX(ix),
- ERTS_RUNQ_IX(ix % online));
-
- set_no_used_runqs(online);
- /* Make sure that we balance soon... */
- balance_info.forced_check_balance = 1;
- erts_smp_runq_lock(ERTS_RUNQ_IX(0));
- ERTS_RUNQ_IX(0)->check_balance_reds = 0;
- erts_smp_runq_unlock(ERTS_RUNQ_IX(0));
- erts_smp_mtx_unlock(&balance_info.update_mtx);
- erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+ suspend_run_queue(ERTS_RUNQ_IX(ix));
}
res = ERTS_SCHDLR_SSPND_DONE;
}
@@ -5036,10 +5609,7 @@ handle_pend_sync_suspend(Process *suspendee,
if (suspender) {
ASSERT(is_nil(suspender->suspendee));
if (suspendee_alive) {
- ErtsRunQueue *rq = erts_get_runq_proc(suspendee);
- erts_smp_runq_lock(rq);
- suspend_process(rq, suspendee);
- erts_smp_runq_unlock(rq);
+ erts_suspend(suspendee, suspendee_locks, NULL);
suspender->suspendee = suspendee->id;
}
/* suspender is suspended waiting for suspendee to suspend;
@@ -5082,10 +5652,9 @@ pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks,
resume_process(rp);
}
else {
- ErtsRunQueue *cp_rq, *rp_rq;
rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS,
- pid, ERTS_PROC_LOCK_STATUS);
+ pid, pid_locks|ERTS_PROC_LOCK_STATUS);
if (!rp) {
c_p->flags &= ~F_P2PNR_RESCHED;
@@ -5094,58 +5663,36 @@ pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks,
ASSERT(!(c_p->flags & F_P2PNR_RESCHED));
- cp_rq = erts_get_runq_proc(c_p);
- rp_rq = erts_get_runq_proc(rp);
- erts_smp_runqs_lock(cp_rq, rp_rq);
- if (rp->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) {
- running:
- /* Phiu... */
-
- /*
- * If we got pending suspenders and suspend ourselves waiting
- * to suspend another process we might deadlock.
- * In this case we have to yield, be suspended by
- * someone else and then do it all over again.
- */
- if (!c_p->pending_suspenders) {
- /* Mark rp pending for suspend by c_p */
- add_pend_suspend(rp, c_p->id, handle_pend_sync_suspend);
- ASSERT(is_nil(c_p->suspendee));
-
- /* Suspend c_p; when rp is suspended c_p will be resumed. */
- suspend_process(cp_rq, c_p);
- c_p->flags |= F_P2PNR_RESCHED;
- }
- /* Yield (caller is assumed to yield immediately in bif). */
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
- rp = ERTS_PROC_LOCK_BUSY;
+ if (suspend) {
+ if (suspend_process(c_p, rp))
+ goto done;
}
else {
- ErtsProcLocks need_locks = pid_locks & ~ERTS_PROC_LOCK_STATUS;
- if (need_locks && erts_smp_proc_trylock(rp, need_locks) == EBUSY) {
- erts_smp_runqs_unlock(cp_rq, rp_rq);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
- rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS,
- pid, pid_locks|ERTS_PROC_LOCK_STATUS);
- if (!rp)
- goto done;
- /* run-queues may have changed */
- cp_rq = erts_get_runq_proc(c_p);
- rp_rq = erts_get_runq_proc(rp);
- erts_smp_runqs_lock(cp_rq, rp_rq);
- if (rp->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) {
- /* Ahh... */
- erts_smp_proc_unlock(rp,
- pid_locks & ~ERTS_PROC_LOCK_STATUS);
- goto running;
- }
- }
+ if (!(ERTS_PSFLG_RUNNING & erts_smp_atomic32_read_acqb(&rp->state)))
+ goto done;
+
+ }
+
+ /* Other process running */
- /* rp is not running and we got the locks we want... */
- if (suspend)
- suspend_process(rp_rq, rp);
+ /*
+ * If we got pending suspenders and suspend ourselves waiting
+ * to suspend another process we might deadlock.
+ * In this case we have to yield, be suspended by
+ * someone else and then do it all over again.
+ */
+ if (!c_p->pending_suspenders) {
+ /* Mark rp pending for suspend by c_p */
+ add_pend_suspend(rp, c_p->id, handle_pend_sync_suspend);
+ ASSERT(is_nil(c_p->suspendee));
+
+ /* Suspend c_p; when rp is suspended c_p will be resumed. */
+ suspend_process(c_p, c_p);
+ c_p->flags |= F_P2PNR_RESCHED;
}
- erts_smp_runqs_unlock(cp_rq, rp_rq);
+ /* Yield (caller is assumed to yield immediately in bif). */
+ erts_smp_proc_unlock(rp, pid_locks|ERTS_PROC_LOCK_STATUS);
+ rp = ERTS_PROC_LOCK_BUSY;
}
done:
@@ -5202,36 +5749,26 @@ erts_pid2proc_nropt(Process *c_p, ErtsProcLocks c_p_locks,
return erts_pid2proc_not_running(c_p, c_p_locks, pid, pid_locks);
}
-static ERTS_INLINE void
-do_bif_suspend_process(ErtsSuspendMonitor *smon,
- Process *suspendee,
- ErtsRunQueue *locked_runq)
+static ERTS_INLINE int
+do_bif_suspend_process(Process *c_p,
+ ErtsSuspendMonitor *smon,
+ Process *suspendee)
{
ASSERT(suspendee);
- ASSERT(!suspendee->is_exiting);
+ ASSERT(!ERTS_PROC_IS_EXITING(suspendee));
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS
& erts_proc_lc_my_proc_locks(suspendee));
if (smon) {
if (!smon->active) {
- ErtsRunQueue *rq;
-
- if (locked_runq)
- rq = locked_runq;
- else {
- rq = erts_get_runq_proc(suspendee);
- erts_smp_runq_lock(rq);
- }
-
- suspend_process(rq, suspendee);
-
- if (!locked_runq)
- erts_smp_runq_unlock(rq);
+ if (!suspend_process(c_p, suspendee))
+ return 0;
}
smon->active += smon->pending;
ASSERT(smon->active);
smon->pending = 0;
+ return 1;
}
-
+ return 0;
}
static void
@@ -5254,10 +5791,17 @@ handle_pend_bif_sync_suspend(Process *suspendee,
erts_delete_suspend_monitor(&suspender->suspend_monitors,
suspendee->id);
else {
+#ifdef DEBUG
+ int res;
+#endif
ErtsSuspendMonitor *smon;
smon = erts_lookup_suspend_monitor(suspender->suspend_monitors,
suspendee->id);
- do_bif_suspend_process(smon, suspendee, NULL);
+#ifdef DEBUG
+ res =
+#endif
+ do_bif_suspend_process(suspendee, smon, suspendee);
+ ASSERT(!smon || res != 0);
suspender->suspendee = suspendee->id;
}
/* suspender is suspended waiting for suspendee to suspend;
@@ -5289,10 +5833,17 @@ handle_pend_bif_async_suspend(Process *suspendee,
erts_delete_suspend_monitor(&suspender->suspend_monitors,
suspendee->id);
else {
+#ifdef DEBUG
+ int res;
+#endif
ErtsSuspendMonitor *smon;
smon = erts_lookup_suspend_monitor(suspender->suspend_monitors,
suspendee->id);
- do_bif_suspend_process(smon, suspendee, NULL);
+#ifdef DEBUG
+ res =
+#endif
+ do_bif_suspend_process(suspendee, smon, suspendee);
+ ASSERT(!smon || res != 0);
}
erts_smp_proc_unlock(suspender, ERTS_PROC_LOCK_LINK);
}
@@ -5377,7 +5928,8 @@ suspend_process_2(BIF_ALIST_2)
/* This is really a piece of cake without SMP support... */
if (!smon->active) {
- suspend_process(ERTS_RUNQ_IX(0), suspendee);
+ erts_smp_atomic32_read_bor_nob(&suspendee->state, ERTS_PSFLG_SUSPENDED);
+ suspend_process(BIF_P, suspendee);
smon->active++;
res = am_true;
}
@@ -5420,21 +5972,15 @@ suspend_process_2(BIF_ALIST_2)
if (smon->pending && unless_suspending)
res = am_false;
else {
- ErtsRunQueue *rq;
if (smon->pending == INT_MAX)
goto system_limit;
smon->pending++;
- rq = erts_get_runq_proc(suspendee);
- erts_smp_runq_lock(rq);
- if (suspendee->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING)
+ if (!do_bif_suspend_process(BIF_P, smon, suspendee))
add_pend_suspend(suspendee,
BIF_P->id,
handle_pend_bif_async_suspend);
- else
- do_bif_suspend_process(smon, suspendee, rq);
- erts_smp_runq_unlock(rq);
res = am_true;
}
@@ -5471,7 +6017,6 @@ suspend_process_2(BIF_ALIST_2)
/* done */
}
else {
- ErtsRunQueue *cp_rq, *s_rq;
/* We haven't got any active suspends on the suspendee */
/*
@@ -5488,12 +6033,7 @@ suspend_process_2(BIF_ALIST_2)
if (!unless_suspending || smon->pending == 0)
smon->pending++;
- cp_rq = erts_get_runq_proc(BIF_P);
- s_rq = erts_get_runq_proc(suspendee);
- erts_smp_runqs_lock(cp_rq, s_rq);
- if (!(suspendee->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING)) {
- do_bif_suspend_process(smon, suspendee, s_rq);
- erts_smp_runqs_unlock(cp_rq, s_rq);
+ if (do_bif_suspend_process(BIF_P, smon, suspendee)) {
res = (!unless_suspending || smon->active == 1
? am_true
: am_false);
@@ -5513,8 +6053,7 @@ suspend_process_2(BIF_ALIST_2)
* This time with BIF_P->suspendee == BIF_ARG_1 (see
* above).
*/
- suspend_process(cp_rq, BIF_P);
- erts_smp_runqs_unlock(cp_rq, s_rq);
+ suspend_process(BIF_P, BIF_P);
goto yield;
}
}
@@ -5522,9 +6061,15 @@ suspend_process_2(BIF_ALIST_2)
}
#endif /* ERTS_SMP */
-
- ASSERT(suspendee->status == P_SUSPENDED || (asynchronous && smon->pending));
- ASSERT(suspendee->status == P_SUSPENDED || !smon->active);
+#ifdef DEBUG
+ {
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&suspendee->state);
+ ASSERT((state & ERTS_PSFLG_SUSPENDED)
+ || (asynchronous && smon->pending));
+ ASSERT((state & ERTS_PSFLG_SUSPENDED)
+ || !smon->active);
+ }
+#endif
erts_smp_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS);
erts_smp_proc_unlock(BIF_P, xlocks);
@@ -5619,9 +6164,8 @@ resume_process_1(BIF_ALIST_1)
if (!suspendee)
goto no_suspendee;
- ASSERT(suspendee->status == P_SUSPENDED
- || (suspendee->status == P_GARBING
- && suspendee->gcstatus == P_SUSPENDED));
+ ASSERT(ERTS_PSFLG_SUSPENDED
+ & erts_smp_atomic32_read_nob(&suspendee->state));
resume_process(suspendee);
erts_smp_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS);
@@ -5650,449 +6194,46 @@ erts_run_queues_len(Uint *qlen)
Uint len = 0;
ERTS_ATOMIC_FOREACH_RUNQ(rq,
{
- if (qlen)
- qlen[i++] = rq->procs.len;
- len += rq->procs.len;
+ Sint pqlen = 0;
+ int pix;
+ for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++)
+ pqlen += RUNQ_READ_LEN(&rq->procs.prio_info[pix].len);
+
+ if (pqlen < 0)
+ pqlen = 0;
+ if (qlen)
+ qlen[i++] = pqlen;
+ len += pqlen;
}
);
return len;
}
-#ifdef HARDDEBUG_RUNQS
-static void
-check_procs_runq(ErtsRunQueue *runq, Process *p_in_q, Process *p_not_in_q)
-{
- int len[ERTS_NO_PROC_PRIO_LEVELS] = {0};
- int tot_len;
- int prioq, prio;
- int found_p_in_q;
- Process *p, *prevp;
-
- found_p_in_q = 0;
- for (prioq = 0; prioq < ERTS_NO_PROC_PRIO_LEVELS - 1; prioq++) {
- prevp = NULL;
- for (p = runq->procs.prio[prioq].first; p; p = p->next) {
- ASSERT(p != p_not_in_q);
- if (p == p_in_q)
- found_p_in_q = 1;
- switch (p->prio) {
- case PRIORITY_MAX:
- case PRIORITY_HIGH:
- case PRIORITY_NORMAL:
- ASSERT(prioq == p->prio);
- break;
- case PRIORITY_LOW:
- ASSERT(prioq == PRIORITY_NORMAL);
- break;
- default:
- ASSERT(!"Bad prio on process");
- }
- len[p->prio]++;
- ASSERT(prevp == p->prev);
- if (p->prev) {
- ASSERT(p->prev->next == p);
- }
- else {
- ASSERT(runq->procs.prio[prioq].first == p);
- }
- if (p->next) {
- ASSERT(p->next->prev == p);
- }
- else {
- ASSERT(runq->procs.prio[prioq].last == p);
- }
- ASSERT(p->run_queue == runq);
- prevp = p;
- }
- }
-
- ASSERT(!p_in_q || found_p_in_q);
-
- tot_len = 0;
- for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++) {
- ASSERT(len[prio] == runq->procs.prio_info[prio].len);
- if (len[prio]) {
- ASSERT(runq->flags & (1 << prio));
- }
- else {
- ASSERT(!(runq->flags & (1 << prio)));
- }
- tot_len += len[prio];
- }
- ASSERT(runq->procs.len == tot_len);
-}
-# define ERTS_DBG_CHK_PROCS_RUNQ(RQ) check_procs_runq((RQ), NULL, NULL)
-# define ERTS_DBG_CHK_PROCS_RUNQ_PROC(RQ, P) check_procs_runq((RQ), (P), NULL)
-# define ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(RQ, P) check_procs_runq((RQ), NULL, (P))
-#else
-# define ERTS_DBG_CHK_PROCS_RUNQ(RQ)
-# define ERTS_DBG_CHK_PROCS_RUNQ_PROC(RQ, P)
-# define ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(RQ, P)
-#endif
-
-
-static ERTS_INLINE void
-enqueue_process(ErtsRunQueue *runq, Process *p)
-{
- ErtsRunPrioQueue *rpq;
- ErtsRunQueueInfo *rqi;
-
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
-
- ASSERT(p->bound_runq || !(runq->flags & ERTS_RUNQ_FLG_SUSPENDED));
-
- rqi = &runq->procs.prio_info[p->prio];
- rqi->len++;
- if (rqi->max_len < rqi->len)
- rqi->max_len = rqi->len;
-
- runq->procs.len++;
- runq->len++;
- if (runq->max_len < runq->len)
- runq->max_len = runq->len;
-
- runq->flags |= (1 << p->prio);
-
- rpq = (p->prio == PRIORITY_LOW
- ? &runq->procs.prio[PRIORITY_NORMAL]
- : &runq->procs.prio[p->prio]);
-
- p->next = NULL;
- p->prev = rpq->last;
- if (rpq->last)
- rpq->last->next = p;
- else
- rpq->first = p;
- rpq->last = p;
-
- switch (p->status) {
- case P_EXITING:
- break;
- case P_GARBING:
- p->gcstatus = P_RUNABLE;
- break;
- default:
- p->status = P_RUNABLE;
- break;
- }
-
-#ifdef ERTS_SMP
- p->status_flags |= ERTS_PROC_SFLG_INRUNQ;
-#endif
-
- ERTS_DBG_CHK_PROCS_RUNQ_PROC(runq, p);
-}
-
-
-static ERTS_INLINE int
-dequeue_process(ErtsRunQueue *runq, Process *p)
-{
- ErtsRunPrioQueue *rpq;
- int res = 1;
-
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
-
- ERTS_DBG_CHK_PROCS_RUNQ(runq);
-
- rpq = &runq->procs.prio[p->prio == PRIORITY_LOW ? PRIORITY_NORMAL : p->prio];
- if (p->prev) {
- p->prev->next = p->next;
- }
- else if (rpq->first == p) {
- rpq->first = p->next;
- }
- else {
- res = 0;
- }
- if (p->next) {
- p->next->prev = p->prev;
- }
- else if (rpq->last == p) {
- rpq->last = p->prev;
- }
- else {
- ASSERT(res == 0);
- }
-
- if (res) {
-
- if (--runq->procs.prio_info[p->prio].len == 0)
- runq->flags &= ~(1 << p->prio);
- runq->procs.len--;
- runq->len--;
-
-#ifdef ERTS_SMP
- p->status_flags &= ~ERTS_PROC_SFLG_INRUNQ;
-#endif
- }
-
- ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(runq, p);
- return res;
-}
-
-/* schedule a process */
-static ERTS_INLINE ErtsRunQueue *
-internal_add_to_runq(ErtsRunQueue *runq, Process *p)
-{
- Uint32 prev_status = p->status;
- ErtsRunQueue *add_runq;
-#ifdef ERTS_SMP
-
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
-
- if (p->status_flags & ERTS_PROC_SFLG_INRUNQ)
- return NULL;
- else if (p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) {
- ASSERT(ERTS_PROC_IS_EXITING(p) || p->rcount == 0);
- ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(runq, p);
- p->status_flags |= ERTS_PROC_SFLG_PENDADD2SCHEDQ;
- return NULL;
- }
- ASSERT(!p->scheduler_data);
-#endif
-
- ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(runq, p);
-#ifndef ERTS_SMP
- /* Never schedule a suspended process (ok in smp case) */
- ASSERT(ERTS_PROC_IS_EXITING(p) || p->rcount == 0);
- add_runq = runq;
-#else
- ASSERT(!p->bound_runq || p->bound_runq == p->run_queue);
- if (p->bound_runq) {
- if (p->bound_runq == runq)
- add_runq = runq;
- else {
- add_runq = p->bound_runq;
- erts_smp_xrunq_lock(runq, add_runq);
- }
- }
- else {
- add_runq = erts_check_emigration_need(runq, p->prio);
- if (!add_runq)
- add_runq = runq;
- else /* Process emigrated */
- p->run_queue = add_runq;
- }
-#endif
-
- /* Enqueue the process */
- enqueue_process(add_runq, p);
-
- if ((erts_system_profile_flags.runnable_procs)
- && (prev_status == P_WAITING
- || prev_status == P_SUSPENDED)) {
- profile_runnable_proc(p, am_active);
- }
-
- if (add_runq != runq)
- erts_smp_runq_unlock(add_runq);
-
- return add_runq;
-}
-
-
-void
-erts_add_to_runq(Process *p)
-{
- ErtsRunQueue *notify_runq;
- ErtsRunQueue *runq = erts_get_runq_proc(p);
- erts_smp_runq_lock(runq);
- notify_runq = internal_add_to_runq(runq, p);
- erts_smp_runq_unlock(runq);
- smp_notify_inc_runq(notify_runq);
-
-}
-
-/* Possibly remove a scheduled process we need to suspend */
-
-static int
-remove_proc_from_runq(ErtsRunQueue *rq, Process *p, int to_inactive)
-{
- int res;
-
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
-
-#ifdef ERTS_SMP
- if (p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ) {
- p->status_flags &= ~ERTS_PROC_SFLG_PENDADD2SCHEDQ;
- ASSERT(!remove_proc_from_runq(rq, p, 0));
- return 1;
- }
-#endif
-
- res = dequeue_process(rq, p);
-
- if (res && erts_system_profile_flags.runnable_procs && to_inactive)
- profile_runnable_proc(p, am_inactive);
-
-#ifdef ERTS_SMP
- ASSERT(!(p->status_flags & ERTS_PROC_SFLG_INRUNQ));
-#endif
-
- return res;
-}
-
-#ifdef ERTS_SMP
-
-ErtsMigrateResult
-erts_proc_migrate(Process *p, ErtsProcLocks *plcks,
- ErtsRunQueue *from_rq, int *from_locked,
- ErtsRunQueue *to_rq, int *to_locked)
-{
- ERTS_SMP_LC_ASSERT(*plcks == erts_proc_lc_my_proc_locks(p));
- ERTS_SMP_LC_ASSERT((ERTS_PROC_LOCK_STATUS & *plcks)
- || from_locked);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked);
-
- /*
- * If we have the lock on the run queue to migrate to,
- * check that it isn't suspended. If it is suspended,
- * we will refuse to migrate to it anyway.
- */
- if (*to_locked && (to_rq->flags & ERTS_RUNQ_FLG_SUSPENDED))
- return ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED;
-
- /* We need status lock on process and locks on both run queues */
-
- if (!(ERTS_PROC_LOCK_STATUS & *plcks)) {
- if (erts_smp_proc_trylock(p, ERTS_PROC_LOCK_STATUS) == EBUSY) {
- ErtsProcLocks lcks = *plcks;
- Eterm pid = p->id;
- Process *proc = *plcks ? p : NULL;
-
- if (*from_locked) {
- *from_locked = 0;
- erts_smp_runq_unlock(from_rq);
- }
- if (*to_locked) {
- *to_locked = 0;
- erts_smp_runq_unlock(to_rq);
- }
-
- proc = erts_pid2proc_opt(proc,
- lcks,
- pid,
- lcks|ERTS_PROC_LOCK_STATUS,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- if (!proc) {
- *plcks = 0;
- return ERTS_MIGRATE_FAILED_NOT_IN_RUNQ;
- }
- ASSERT(proc == p);
- }
- *plcks |= ERTS_PROC_LOCK_STATUS;
- }
-
- ASSERT(!p->bound_runq);
-
- ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked);
-
- if (p->run_queue != from_rq)
- return ERTS_MIGRATE_FAILED_RUNQ_CHANGED;
-
- if (!*from_locked || !*to_locked) {
- if (from_rq < to_rq) {
- if (!*to_locked) {
- if (!*from_locked)
- erts_smp_runq_lock(from_rq);
- erts_smp_runq_lock(to_rq);
- }
- else if (erts_smp_runq_trylock(from_rq) == EBUSY) {
- erts_smp_runq_unlock(to_rq);
- erts_smp_runq_lock(from_rq);
- erts_smp_runq_lock(to_rq);
- }
- }
- else {
- if (!*from_locked) {
- if (!*to_locked)
- erts_smp_runq_lock(to_rq);
- erts_smp_runq_lock(from_rq);
- }
- else if (erts_smp_runq_trylock(to_rq) == EBUSY) {
- erts_smp_runq_unlock(from_rq);
- erts_smp_runq_lock(to_rq);
- erts_smp_runq_lock(from_rq);
- }
- }
- *to_locked = *from_locked = 1;
- }
-
- ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked);
-
- /* Ok we now got all locks we need; do it... */
-
- /* Refuse to migrate to a suspended run queue */
- if (to_rq->flags & ERTS_RUNQ_FLG_SUSPENDED)
- return ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED;
-
- if ((p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING)
- || !(p->status_flags & ERTS_PROC_SFLG_INRUNQ))
- return ERTS_MIGRATE_FAILED_NOT_IN_RUNQ;
-
- dequeue_process(from_rq, p);
- p->run_queue = to_rq;
- enqueue_process(to_rq, p);
-
- return ERTS_MIGRATE_SUCCESS;
-}
-#endif /* ERTS_SMP */
-
Eterm
erts_process_status(Process *c_p, ErtsProcLocks c_p_locks,
Process *rp, Eterm rpid)
{
Eterm res = am_undefined;
- Process *p;
-
- if (rp) {
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS
- & erts_proc_lc_my_proc_locks(rp));
- p = rp;
- }
- else {
- p = erts_pid2proc_opt(c_p, c_p_locks,
- rpid, ERTS_PROC_LOCK_STATUS,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- }
+ Process *p = rp ? rp : erts_proc_lookup_raw(rpid);
if (p) {
- switch (p->status) {
- case P_RUNABLE:
- res = am_runnable;
- break;
- case P_WAITING:
- res = am_waiting;
- break;
- case P_RUNNING:
- res = am_running;
- break;
- case P_EXITING:
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state);
+ if (state & ERTS_PSFLG_FREE)
+ res = am_free;
+ else if (state & ERTS_PSFLG_EXITING)
res = am_exiting;
- break;
- case P_GARBING:
+ else if (state & ERTS_PSFLG_GC)
res = am_garbage_collecting;
- break;
- case P_SUSPENDED:
+ else if (state & ERTS_PSFLG_SUSPENDED)
res = am_suspended;
- break;
- case P_FREE: /* We cannot look up a process in P_FREE... */
- default: /* Not a valid status... */
- erl_exit(1, "Bad status (%b32u) found for process %T\n",
- p->status, p->id);
- break;
- }
-
-#ifdef ERTS_SMP
- if (!rp && (p != c_p || !(ERTS_PROC_LOCK_STATUS & c_p_locks)))
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
+ else if (state & ERTS_PSFLG_RUNNING)
+ res = am_running;
+ else if (state & ERTS_PSFLG_ACTIVE)
+ res = am_runnable;
+ else
+ res = am_waiting;
}
+#ifdef ERTS_SMP
else {
int i;
ErtsSchedulerData *esdp;
@@ -6107,42 +6248,40 @@ erts_process_status(Process *c_p, ErtsProcLocks c_p_locks,
}
erts_smp_runq_unlock(esdp->run_queue);
}
-
-#endif
-
}
-
+#endif
return res;
}
/*
-** Suspend a process
+** Suspend a currently executing process
** If we are to suspend on a port the busy_port is the thing
** otherwise busy_port is NIL
*/
void
-erts_suspend(Process* process, ErtsProcLocks process_locks, Port *busy_port)
+erts_suspend(Process* c_p, ErtsProcLocks c_p_locks, Port *busy_port)
{
- ErtsRunQueue *rq;
-
- ERTS_SMP_LC_ASSERT(process_locks == erts_proc_lc_my_proc_locks(process));
- if (!(process_locks & ERTS_PROC_LOCK_STATUS))
- erts_smp_proc_lock(process, ERTS_PROC_LOCK_STATUS);
-
- rq = erts_get_runq_proc(process);
-
- erts_smp_runq_lock(rq);
+#ifdef DEBUG
+ int res;
+#endif
+ ASSERT(c_p == erts_get_current_process());
+ ERTS_SMP_LC_ASSERT(c_p_locks == erts_proc_lc_my_proc_locks(c_p));
+ if (!(c_p_locks & ERTS_PROC_LOCK_STATUS))
+ erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);
- suspend_process(rq, process);
+#ifdef DEBUG
+ res =
+#endif
+ suspend_process(c_p, c_p);
- erts_smp_runq_unlock(rq);
+ ASSERT(res);
if (busy_port)
- erts_wake_process_later(busy_port, process);
+ erts_wake_process_later(busy_port, c_p);
- if (!(process_locks & ERTS_PROC_LOCK_STATUS))
- erts_smp_proc_unlock(process, ERTS_PROC_LOCK_STATUS);
+ if (!(c_p_locks & ERTS_PROC_LOCK_STATUS))
+ erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);
}
@@ -6183,57 +6322,54 @@ erts_resume_processes(ErtsProcList *plp)
Eterm
erts_get_process_priority(Process *p)
{
- ErtsRunQueue *rq;
- Eterm value;
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
- rq = erts_get_runq_proc(p);
- erts_smp_runq_lock(rq);
- switch(p->prio) {
- case PRIORITY_MAX: value = am_max; break;
- case PRIORITY_HIGH: value = am_high; break;
- case PRIORITY_NORMAL: value = am_normal; break;
- case PRIORITY_LOW: value = am_low; break;
- default: ASSERT(0); value = am_undefined; break;
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state);
+ switch (state & ERTS_PSFLG_PRIO_MASK) {
+ case PRIORITY_MAX: return am_max;
+ case PRIORITY_HIGH: return am_high;
+ case PRIORITY_NORMAL: return am_normal;
+ case PRIORITY_LOW: return am_low;
+ default: ASSERT(0); return am_undefined;
}
- erts_smp_runq_unlock(rq);
- return value;
}
Eterm
-erts_set_process_priority(Process *p, Eterm new_value)
+erts_set_process_priority(Process *p, Eterm value)
{
- ErtsRunQueue *rq;
- Eterm old_value;
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
- rq = erts_get_runq_proc(p);
-#ifdef ERTS_SMP
- ASSERT(!(p->status_flags & ERTS_PROC_SFLG_INRUNQ));
-#endif
- erts_smp_runq_lock(rq);
- switch(p->prio) {
- case PRIORITY_MAX: old_value = am_max; break;
- case PRIORITY_HIGH: old_value = am_high; break;
- case PRIORITY_NORMAL: old_value = am_normal; break;
- case PRIORITY_LOW: old_value = am_low; break;
- default: ASSERT(0); old_value = am_undefined; break;
- }
- switch (new_value) {
- case am_max: p->prio = PRIORITY_MAX; break;
- case am_high: p->prio = PRIORITY_HIGH; break;
- case am_normal: p->prio = PRIORITY_NORMAL; break;
- case am_low: p->prio = PRIORITY_LOW; break;
- default: old_value = THE_NON_VALUE; break;
+ erts_aint32_t a, oprio, nprio;
+
+ switch (value) {
+ case am_max: nprio = (erts_aint32_t) PRIORITY_MAX; break;
+ case am_high: nprio = (erts_aint32_t) PRIORITY_HIGH; break;
+ case am_normal: nprio = (erts_aint32_t) PRIORITY_NORMAL; break;
+ case am_low: nprio = (erts_aint32_t) PRIORITY_LOW; break;
+ default: return THE_NON_VALUE; break;
}
- erts_smp_runq_unlock(rq);
- return old_value;
-}
-/* note that P_RUNNING is only set so that we don't try to remove
-** running processes from the schedule queue if they exit - a running
-** process not being in the schedule queue!!
-** Schedule for up to INPUT_REDUCTIONS context switches,
-** return 1 if more to do.
-*/
+ a = erts_smp_atomic32_read_nob(&p->state);
+ if (nprio == (a & ERTS_PSFLG_PRIO_MASK))
+ oprio = nprio;
+ else {
+ erts_aint32_t e, n;
+ do {
+ oprio = a & ERTS_PSFLG_PRIO_MASK;
+ n = e = a;
+
+ ASSERT(!(a & ERTS_PSFLG_IN_RUNQ));
+
+ n &= ~ERTS_PSFLG_PRIO_MASK;
+ n |= nprio;
+ a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
+ } while (a != e);
+ }
+
+ switch (oprio) {
+ case PRIORITY_MAX: return am_max;
+ case PRIORITY_HIGH: return am_high;
+ case PRIORITY_NORMAL: return am_normal;
+ case PRIORITY_LOW: return am_low;
+ default: ASSERT(0); return am_undefined;
+ }
+}
/*
* schedule() is called from BEAM (process_main()) or HiPE
@@ -6256,7 +6392,6 @@ erts_set_process_priority(Process *p, Eterm new_value)
Process *schedule(Process *p, int calls)
{
ErtsRunQueue *rq;
- ErtsRunPrioQueue *rpq;
erts_aint_t dt;
ErtsSchedulerData *esdp;
int context_reds;
@@ -6264,6 +6399,8 @@ Process *schedule(Process *p, int calls)
int input_reductions;
int actual_reds;
int reds;
+ Uint32 flags;
+ erts_aint32_t state = 0; /* Supress warning... */
#ifdef USE_VM_PROBES
if (p != NULL && DTRACE_ENABLED(process_unscheduled)) {
@@ -6319,92 +6456,60 @@ Process *schedule(Process *p, int calls)
erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
- if ((erts_system_profile_flags.runnable_procs)
- && (p->status == P_WAITING)) {
- profile_runnable_proc(p, am_inactive);
- }
+ state = erts_smp_atomic32_read_acqb(&p->state);
if (IS_TRACED(p)) {
- if (IS_TRACED_FL(p, F_TRACE_CALLS) && p->status != P_FREE) {
+ if (IS_TRACED_FL(p, F_TRACE_CALLS) && !(state & ERTS_PSFLG_FREE)) {
erts_schedule_time_break(p, ERTS_BP_CALL_TIME_SCHEDULE_OUT);
}
- switch (p->status) {
- case P_EXITING:
- if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_EXIT))
- trace_sched(p, am_out_exiting);
- break;
- case P_FREE:
+ if (state & (ERTS_PSFLG_FREE|ERTS_PSFLG_EXITING)) {
if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_EXIT))
- trace_sched(p, am_out_exited);
- break;
- default:
+ trace_sched(p, ((state & ERTS_PSFLG_FREE)
+ ? am_out_exited
+ : am_out_exiting));
+ }
+ else {
if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED))
trace_sched(p, am_out);
else if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_PROCS))
trace_virtual_sched(p, am_out);
- break;
}
}
#ifdef ERTS_SMP
- if (ERTS_PROC_PENDING_EXIT(p)) {
- erts_handle_pending_exit(p,
- ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
- p->status_flags |= ERTS_PROC_SFLG_PENDADD2SCHEDQ;
- }
-
- if (p->pending_suspenders) {
- handle_pending_suspend(p,
- ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
- ASSERT(!(p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ)
- || p->rcount == 0);
- }
+ if (state & ERTS_PSFLG_PENDING_EXIT)
+ erts_handle_pending_exit(p, (ERTS_PROC_LOCK_MAIN
+ | ERTS_PROC_LOCK_STATUS));
+ if (p->pending_suspenders)
+ handle_pending_suspend(p, (ERTS_PROC_LOCK_MAIN
+ | ERTS_PROC_LOCK_STATUS));
#endif
- erts_smp_runq_lock(rq);
- ERTS_PROC_REDUCTIONS_EXECUTED(rq, p->prio, reds, actual_reds);
+ schedule_out_process(rq, state, p); /* Returns with rq locked! */
+
+ ERTS_PROC_REDUCTIONS_EXECUTED(rq,
+ (int) (state & ERTS_PSFLG_PRIO_MASK),
+ reds,
+ actual_reds);
esdp->current_process = NULL;
#ifdef ERTS_SMP
p->scheduler_data = NULL;
- p->runq_flags &= ~ERTS_PROC_RUNQ_FLG_RUNNING;
- p->status_flags &= ~ERTS_PROC_SFLG_RUNNING;
-
- if (p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ) {
- ErtsRunQueue *notify_runq;
- p->status_flags &= ~ERTS_PROC_SFLG_PENDADD2SCHEDQ;
- notify_runq = internal_add_to_runq(rq, p);
- if (notify_runq != rq)
- smp_notify_inc_runq(notify_runq);
- }
#endif
- if (p->status == P_FREE) {
+ if (state & ERTS_PSFLG_FREE) {
#ifdef ERTS_SMP
ASSERT(esdp->free_process == p);
esdp->free_process = NULL;
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
- erts_smp_proc_dec_refc(p);
#else
erts_free_proc(p);
#endif
- } else {
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
}
-#ifdef ERTS_SMP
- {
- ErtsProcList *pnd_xtrs = rq->procs.pending_exiters;
- rq->procs.pending_exiters = NULL;
+ erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
- if (pnd_xtrs) {
- erts_smp_runq_unlock(rq);
- handle_pending_exiters(pnd_xtrs);
- erts_smp_runq_lock(rq);
- }
-
- }
+#ifdef ERTS_SMP
ASSERT(!esdp->free_process);
#endif
ASSERT(!esdp->current_process);
@@ -6424,8 +6529,23 @@ Process *schedule(Process *p, int calls)
ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
check_activities_to_run: {
+#ifdef ERTS_SMP
+ ErtsMigrationPaths *mps;
+ ErtsMigrationPath *mp;
#ifdef ERTS_SMP
+ {
+ ErtsProcList *pnd_xtrs = rq->procs.pending_exiters;
+ rq->procs.pending_exiters = NULL;
+
+ if (pnd_xtrs) {
+ erts_smp_runq_unlock(rq);
+ handle_pending_exiters(pnd_xtrs);
+ erts_smp_runq_lock(rq);
+ }
+
+ }
+#endif
if (rq->check_balance_reds <= 0)
check_balance(rq);
@@ -6433,20 +6553,28 @@ Process *schedule(Process *p, int calls)
ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
- if (rq->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK)
- immigrate(rq);
+ mps = erts_get_migration_paths_managed();
+ mp = &mps->mpath[rq->ix];
- continue_check_activities_to_run:
+ if (mp->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK)
+ immigrate(rq, mp);
- if (rq->flags & (ERTS_RUNQ_FLG_CHK_CPU_BIND
- | ERTS_RUNQ_FLG_SUSPENDED)) {
- if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED) {
- ASSERT(erts_smp_atomic32_read_nob(&esdp->ssi->flags)
- & ERTS_SSI_FLG_SUSPENDED);
+ continue_check_activities_to_run:
+ flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
+ continue_check_activities_to_run_known_flags:
+
+
+ if (flags & (ERTS_RUNQ_FLG_CHK_CPU_BIND|ERTS_RUNQ_FLG_SUSPENDED)) {
+
+ if (flags & ERTS_RUNQ_FLG_SUSPENDED) {
suspend_scheduler(esdp);
+ flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
}
- if (rq->flags & ERTS_RUNQ_FLG_CHK_CPU_BIND)
+ if (flags & ERTS_RUNQ_FLG_CHK_CPU_BIND) {
+ flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_CHK_CPU_BIND);
+ flags &= ~ ERTS_RUNQ_FLG_CHK_CPU_BIND;
erts_sched_check_cpu_bind(esdp);
+ }
}
{
@@ -6475,14 +6603,12 @@ Process *schedule(Process *p, int calls)
}
#endif /* ERTS_SMP */
- ASSERT(rq->len == rq->procs.len + rq->ports.info.len);
-
- if ((rq->len == 0 && !rq->misc.start)
- || (rq->halt_in_progress
- && rq->ports.info.len == 0 && !rq->misc.start)) {
+ flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
+ if ((!(flags & ERTS_RUNQ_FLGS_QMASK) && !rq->misc.start)
+ || (rq->halt_in_progress && ERTS_EMPTY_RUNQ_PORTS(rq))) {
+ /* Prepare for scheduler wait */
#ifdef ERTS_SMP
-
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
rq->wakeup_other = 0;
@@ -6490,21 +6616,27 @@ Process *schedule(Process *p, int calls)
empty_runq(rq);
- if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED) {
- ASSERT(erts_smp_atomic32_read_nob(&esdp->ssi->flags)
- & ERTS_SSI_FLG_SUSPENDED);
+ flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
+ if (flags & ERTS_RUNQ_FLG_SUSPENDED) {
non_empty_runq(rq);
- goto continue_check_activities_to_run;
+ goto continue_check_activities_to_run_known_flags;
}
- else if (!(rq->flags & ERTS_RUNQ_FLG_INACTIVE)) {
+ else if (!(flags & ERTS_RUNQ_FLG_INACTIVE)) {
+ if (try_steal_task(rq)) {
+ non_empty_runq(rq);
+ goto continue_check_activities_to_run;
+ }
+
+ ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
+
/*
* Check for ERTS_RUNQ_FLG_SUSPENDED has to be done
* after trying to steal a task.
*/
- if (try_steal_task(rq)
- || (rq->flags & ERTS_RUNQ_FLG_SUSPENDED)) {
+ flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
+ if (flags & ERTS_RUNQ_FLG_SUSPENDED) {
non_empty_runq(rq);
- goto continue_check_activities_to_run;
+ goto continue_check_activities_to_run_known_flags;
}
}
@@ -6535,6 +6667,7 @@ Process *schedule(Process *p, int calls)
erl_sys_schedule(1);
dt = erts_do_time_read_and_reset();
if (dt) erts_bump_timer(dt);
+
#ifdef ERTS_SMP
erts_smp_runq_lock(rq);
clear_sys_scheduling();
@@ -6551,14 +6684,17 @@ Process *schedule(Process *p, int calls)
{
int wo_reds = rq->wakeup_other_reds;
if (wo_reds) {
- if (rq->len < 2) {
+ erts_aint32_t len = rq->len;
+ if (len < 2) {
rq->wakeup_other -= ERTS_WAKEUP_OTHER_DEC*wo_reds;
if (rq->wakeup_other < 0)
rq->wakeup_other = 0;
}
else if (rq->wakeup_other < wakeup_other_limit)
- rq->wakeup_other += rq->len*wo_reds + ERTS_WAKEUP_OTHER_FIXED_INC;
+ rq->wakeup_other += len*wo_reds + ERTS_WAKEUP_OTHER_FIXED_INC;
else {
+ if (flags & ERTS_RUNQ_FLG_PROTECTED)
+ ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
if (erts_smp_atomic32_read_acqb(&no_empty_run_queues) != 0) {
wake_scheduler_on_empty_runq(rq);
rq->wakeup_other = 0;
@@ -6574,7 +6710,7 @@ Process *schedule(Process *p, int calls)
* Find a new port to run.
*/
- if (rq->ports.info.len) {
+ if (RUNQ_READ_LEN(&rq->ports.info.len)) {
int have_outstanding_io;
have_outstanding_io = erts_port_task_execute(rq, &esdp->current_port);
if ((have_outstanding_io && fcalls > 2*input_reductions)
@@ -6601,160 +6737,123 @@ Process *schedule(Process *p, int calls)
/*
* Find a new process to run.
*/
- pick_next_process:
-
- ERTS_DBG_CHK_PROCS_RUNQ(rq);
-
- switch (rq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) {
- case MAX_BIT:
- case MAX_BIT|HIGH_BIT:
- case MAX_BIT|NORMAL_BIT:
- case MAX_BIT|LOW_BIT:
- case MAX_BIT|HIGH_BIT|NORMAL_BIT:
- case MAX_BIT|HIGH_BIT|LOW_BIT:
- case MAX_BIT|NORMAL_BIT|LOW_BIT:
- case MAX_BIT|HIGH_BIT|NORMAL_BIT|LOW_BIT:
- rpq = &rq->procs.prio[PRIORITY_MAX];
- break;
- case HIGH_BIT:
- case HIGH_BIT|NORMAL_BIT:
- case HIGH_BIT|LOW_BIT:
- case HIGH_BIT|NORMAL_BIT|LOW_BIT:
- rpq = &rq->procs.prio[PRIORITY_HIGH];
- break;
- case NORMAL_BIT:
- rpq = &rq->procs.prio[PRIORITY_NORMAL];
- break;
- case LOW_BIT:
- rpq = &rq->procs.prio[PRIORITY_NORMAL];
- break;
- case NORMAL_BIT|LOW_BIT:
- rpq = &rq->procs.prio[PRIORITY_NORMAL];
- ASSERT(rpq->first != NULL);
- p = rpq->first;
- if (p->prio == PRIORITY_LOW) {
- if (p == rpq->last || p->skipped >= RESCHEDULE_LOW-1)
- p->skipped = 0;
- else {
- /* skip it */
- p->skipped++;
- rpq->first = p->next;
- rpq->first->prev = NULL;
- rpq->last->next = p;
- p->prev = rpq->last;
- p->next = NULL;
- rpq->last = p;
+ pick_next_process: {
+ int prio_q;
+ int qmask;
+
+ flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
+ qmask = (int) (flags & ERTS_RUNQ_FLGS_PROCS_QMASK);
+ switch (qmask & -qmask) {
+ case MAX_BIT:
+ prio_q = PRIORITY_MAX;
+ break;
+ case HIGH_BIT:
+ prio_q = PRIORITY_HIGH;
+ break;
+ case NORMAL_BIT:
+ case LOW_BIT:
+ prio_q = PRIORITY_NORMAL;
+ if (check_requeue_process(rq, PRIORITY_NORMAL))
goto pick_next_process;
- }
+ break;
+ case 0: /* No process at all */
+ default:
+ ASSERT(qmask == 0);
+ goto check_activities_to_run;
}
- break;
- case 0: /* No process at all */
- default:
- ASSERT((rq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) == 0);
- ASSERT(rq->procs.len == 0);
- goto check_activities_to_run;
- }
-
- BM_START_TIMER(system);
-
- /*
- * Take the chosen process out of the queue.
- */
- ASSERT(rpq->first); /* Wrong qmask in rq->flags? */
- p = rpq->first;
-#ifdef ERTS_SMP
- ERTS_SMP_LC_ASSERT(rq == p->run_queue);
-#endif
- rpq->first = p->next;
- if (!rpq->first)
- rpq->last = NULL;
- else
- rpq->first->prev = NULL;
- p->next = p->prev = NULL;
+ BM_START_TIMER(system);
- if (--rq->procs.prio_info[p->prio].len == 0)
- rq->flags &= ~(1 << p->prio);
- ASSERT(rq->procs.len > 0);
- rq->procs.len--;
- ASSERT(rq->len > 0);
- rq->len--;
+ /*
+ * Take the chosen process out of the queue.
+ */
+ p = dequeue_process(rq, prio_q, &state);
- {
- Uint32 ee_flgs = (ERTS_RUNQ_FLG_EVACUATE(p->prio)
- | ERTS_RUNQ_FLG_EMIGRATE(p->prio));
+ ASSERT(p); /* Wrong qmask in rq->flags? */
- if ((rq->flags & (ERTS_RUNQ_FLG_SUSPENDED|ee_flgs)) == ee_flgs)
- ERTS_UNSET_RUNQ_FLG_EVACUATE(rq->flags, p->prio);
- }
+ while (1) {
+ erts_aint32_t exp, new, tmp;
+ tmp = new = exp = state;
+ new &= ~ERTS_PSFLG_IN_RUNQ;
+ tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT);
+ if (tmp != ERTS_PSFLG_SUSPENDED)
+ new |= ERTS_PSFLG_RUNNING;
+ state = erts_smp_atomic32_cmpxchg_relb(&p->state, new, exp);
+ if (state == exp) {
+ tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT);
+ if (tmp == ERTS_PSFLG_SUSPENDED)
+ goto pick_next_process;
+ state = new;
+ break;
+ }
+ }
- ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(rq, p);
+ rq->procs.context_switches++;
- rq->procs.context_switches++;
+ esdp->current_process = p;
- esdp->current_process = p;
+ }
#ifdef ERTS_SMP
- p->runq_flags |= ERTS_PROC_RUNQ_FLG_RUNNING;
erts_smp_runq_unlock(rq);
+ if (flags & ERTS_RUNQ_FLG_PROTECTED)
+ ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
+
ERTS_SMP_CHK_NO_PROC_LOCKS;
erts_smp_proc_lock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
if (erts_sched_stat.enabled) {
+ int prio;
UWord old = ERTS_PROC_SCHED_ID(p,
(ERTS_PROC_LOCK_MAIN
| ERTS_PROC_LOCK_STATUS),
(UWord) esdp->no);
int migrated = old && old != esdp->no;
+ prio = (int) (state & ERTS_PSFLG_PRIO_MASK);
+
erts_smp_spin_lock(&erts_sched_stat.lock);
- erts_sched_stat.prio[p->prio].total_executed++;
- erts_sched_stat.prio[p->prio].executed++;
+ erts_sched_stat.prio[prio].total_executed++;
+ erts_sched_stat.prio[prio].executed++;
if (migrated) {
- erts_sched_stat.prio[p->prio].total_migrated++;
- erts_sched_stat.prio[p->prio].migrated++;
+ erts_sched_stat.prio[prio].total_migrated++;
+ erts_sched_stat.prio[prio].migrated++;
}
erts_smp_spin_unlock(&erts_sched_stat.lock);
}
- p->status_flags |= ERTS_PROC_SFLG_RUNNING;
- p->status_flags &= ~ERTS_PROC_SFLG_INRUNQ;
if (ERTS_PROC_PENDING_EXIT(p)) {
erts_handle_pending_exit(p,
ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
+ state = erts_smp_atomic32_read_nob(&p->state);
}
ASSERT(!p->scheduler_data);
p->scheduler_data = esdp;
-
#endif
- ASSERT(p->status != P_SUSPENDED); /* Never run a suspended process */
+ /* Never run a suspended process */
+ ASSERT(!(ERTS_PSFLG_SUSPENDED & erts_smp_atomic32_read_nob(&p->state)));
ACTIVATE(p);
reds = context_reds;
if (IS_TRACED(p)) {
- switch (p->status) {
- case P_EXITING:
+ if (state & ERTS_PSFLG_EXITING) {
if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_EXIT))
trace_sched(p, am_in_exiting);
- break;
- default:
+ }
+ else {
if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED))
trace_sched(p, am_in);
else if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_PROCS))
trace_virtual_sched(p, am_in);
- break;
}
if (IS_TRACED_FL(p, F_TRACE_CALLS)) {
erts_schedule_time_break(p, ERTS_BP_CALL_TIME_SCHEDULE_IN);
}
}
- if (p->status != P_EXITING)
- p->status = P_RUNNING;
-
erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
#ifdef ERTS_SMP
@@ -6762,7 +6861,7 @@ Process *schedule(Process *p, int calls)
erts_check_my_tracer_proc(p);
#endif
- if (!ERTS_PROC_IS_EXITING(p)
+ if (!(state & ERTS_PSFLG_EXITING)
&& ((FLAGS(p) & F_FORCE_GC)
|| (MSO(p).overhead > BIN_VHEAP_SZ(p)))) {
reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity);
@@ -6854,17 +6953,19 @@ erts_schedule_misc_op(void (*func)(void *), void *arg)
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ErtsRunQueue *rq = esdp ? esdp->run_queue : ERTS_RUNQ_IX(0);
ErtsMiscOpList *molp = misc_op_list_alloc();
+#ifdef ERTS_SMP
+ ErtsMigrationPaths *mpaths = erts_get_migration_paths();
- erts_smp_runq_lock(rq);
-
- while (rq->misc.evac_runq) {
- ErtsRunQueue *tmp_rq = rq->misc.evac_runq;
- erts_smp_runq_unlock(rq);
- rq = tmp_rq;
- erts_smp_runq_lock(rq);
+ if (!mpaths)
+ rq = ERTS_RUNQ_IX(0);
+ else {
+ ErtsRunQueue *erq = mpaths->mpath[rq->ix].misc_evac_runq;
+ if (erq)
+ rq = erq;
}
+#endif
- ASSERT(!(rq->flags & ERTS_RUNQ_FLG_SUSPENDED));
+ erts_smp_runq_lock(rq);
molp->next = NULL;
molp->func = func;
@@ -6874,7 +6975,9 @@ erts_schedule_misc_op(void (*func)(void *), void *arg)
else
rq->misc.start = molp;
rq->misc.end = molp;
+
erts_smp_runq_unlock(rq);
+
smp_notify_inc_runq(rq);
}
@@ -6964,44 +7067,50 @@ erts_get_exact_total_reductions(Process *c_p, Uint *redsp, Uint *diffp)
Sint
erts_test_next_pid(int set, Uint next)
{
+ Uint64 lpd;
Sint res;
- Sint p_prev;
+ Eterm pid_data;
+ int first_pix = -1;
+ erts_smp_rwmtx_rwlock(&erts_proc_tab_rwmtx);
- erts_smp_mtx_lock(&proc_tab_mtx);
-
- if (!set) {
- res = p_next < 0 ? -1 : (p_serial << p_serial_shift | p_next);
- }
+ if (!set)
+ lpd = last_pid_data_read_nob();
else {
- p_serial = (Sint) ((next >> p_serial_shift) & p_serial_mask);
- p_next = (Sint) (erts_process_tab_index_mask & next);
-
- if (p_next >= erts_max_processes) {
- p_next = 0;
- p_serial++;
- p_serial &= p_serial_mask;
+ lpd = (Uint64) next;
+ pid_data = (Eterm) (lpd & ERTS_PID_DATA_MASK__);
+ if (ERTS_INVALID_PID == make_internal_pid(pid_data)) {
+ lpd += erts_proc.max;
+ ASSERT(erts_pid_data2ix(pid_data)
+ == erts_pid_data2ix(lpd & ERTS_PID_DATA_MASK__));
}
+ last_pid_data_set_relb(lpd);
+ }
- p_prev = p_next;
-
- do {
- if (!process_tab[p_next])
- break;
- p_next++;
- if(p_next >= erts_max_processes) {
- p_next = 0;
- p_serial++;
- p_serial &= p_serial_mask;
+ while (1) {
+ int pix;
+ lpd++;
+ pix = (int) (lpd % erts_proc.max);
+ if (first_pix < 0)
+ first_pix = pix;
+ else if (pix == first_pix) {
+ res = -1;
+ break;
+ }
+ if (ERTS_AINT_NULL == erts_smp_atomic_read_nob(&erts_proc.tab[pix])) {
+ pid_data = (Eterm) (lpd & ERTS_PID_DATA_MASK__);
+ if (ERTS_INVALID_PID == make_internal_pid(pid_data)) {
+ lpd += erts_proc.max;
+ ASSERT(erts_pid_data2ix(pid_data)
+ == erts_pid_data2ix(lpd & ERTS_PID_DATA_MASK__));
}
- } while (p_prev != p_next);
-
- res = process_tab[p_next] ? -1 : (p_serial << p_serial_shift | p_next);
-
+ res = lpd & ERTS_PID_DATA_MASK__;
+ break;
+ }
}
- erts_smp_mtx_unlock(&proc_tab_mtx);
+ erts_smp_rwmtx_rwunlock(&erts_proc_tab_rwmtx);
return res;
@@ -7010,6 +7119,8 @@ erts_test_next_pid(int set, Uint next)
Uint erts_process_count(void)
{
erts_aint32_t res = erts_smp_atomic32_read_nob(&process_count);
+ if (res > erts_proc.max)
+ return erts_proc.max;
ASSERT(res >= 0);
return (Uint) res;
}
@@ -7017,97 +7128,138 @@ Uint erts_process_count(void)
void
erts_free_proc(Process *p)
{
-#if defined(ERTS_ENABLE_LOCK_COUNT) && defined(ERTS_SMP)
- erts_lcnt_proc_lock_destroy(p);
+#ifdef ERTS_SMP
+ erts_proc_lock_fin(p);
#endif
erts_free(ERTS_ALC_T_PROC, (void *) p);
}
-
/*
** Allocate process and find out where to place next process.
*/
static Process*
-alloc_process(void)
+alloc_process(ErtsRunQueue *rq, erts_aint32_t state)
{
-#ifdef ERTS_SMP
- erts_pix_lock_t *pix_lock;
-#endif
+ int pix;
Process* p;
- int p_prev;
+ Uint64 lpd, exp_lpd;
+ Eterm pid_data;
+ erts_aint32_t proc_count;
+#ifdef DEBUG
+ Eterm pid;
+#endif
+
+ erts_smp_rwmtx_rlock(&erts_proc_tab_rwmtx);
- erts_smp_mtx_lock(&proc_tab_mtx);
+ proc_count = erts_smp_atomic32_inc_read_acqb(&process_count);
+ if (proc_count > erts_proc.max) {
+ while (1) {
+ erts_aint32_t act_proc_count;
- if (p_next == -1) {
- p = NULL;
- goto error; /* Process table full! */
+ act_proc_count = erts_smp_atomic32_cmpxchg_relb(&process_count,
+ proc_count-1,
+ proc_count);
+ if (act_proc_count == proc_count)
+ goto system_limit;
+ proc_count = act_proc_count;
+ if (proc_count <= erts_proc.max)
+ break;
+ }
}
p = (Process*) erts_alloc_fnf(ERTS_ALC_T_PROC, sizeof(Process));
if (!p)
- goto error; /* ENOMEM */
+ goto enomem;
- p_last = p_next;
+ p->approx_started = erts_get_approx_time();
+ p->started_interval = get_proc_interval();
- erts_get_emu_time(&p->started);
+ lpd = last_pid_data_read_acqb();
-#ifdef ERTS_SMP
- pix_lock = ERTS_PIX2PIXLOCK(p_next);
- erts_pix_lock(pix_lock);
-#endif
- ASSERT(!process_tab[p_next]);
+ /* Reserve slot */
+ while (1) {
+ lpd++;
+ pix = erts_pid_data2ix((Eterm) (lpd & ERTS_PID_DATA_MASK__));
+ if (erts_smp_atomic_read_nob(&erts_proc.tab[pix]) == ERTS_AINT_NULL) {
+ erts_aint_t val;
+ val = erts_smp_atomic_cmpxchg_relb(&erts_proc.tab[pix],
+ ((erts_aint_t)
+ ERTS_PROC_LOCK_BUSY),
+ ERTS_AINT_NULL);
+
+ if (ERTS_AINT_NULL == val)
+ break;
+ }
+ }
- process_tab[p_next] = p;
- erts_smp_atomic32_inc_nob(&process_count);
- p->id = make_internal_pid(p_serial << p_serial_shift | p_next);
+ pid_data = (Eterm) lpd & ERTS_PID_DATA_MASK__;
+
+ p->id = make_internal_pid(pid_data);
if (p->id == ERTS_INVALID_PID) {
/* Do not use the invalid pid; change serial */
- p_serial++;
- p_serial &= p_serial_mask;
- p->id = make_internal_pid(p_serial << p_serial_shift | p_next);
+ lpd += erts_proc.max;
+ ASSERT(pix == erts_pid_data2ix((Eterm) (lpd & ERTS_PID_DATA_MASK__)));
+ pid_data = (Eterm) lpd & ERTS_PID_DATA_MASK__;
+ p->id = make_internal_pid(pid_data);
ASSERT(p->id != ERTS_INVALID_PID);
}
- ASSERT(internal_pid_serial(p->id) <= (erts_use_r9_pids_ports
- ? ERTS_MAX_PID_R9_SERIAL
- : ERTS_MAX_PID_SERIAL));
+
+ exp_lpd = last_pid_data_read_nob();
+
+ /* Move last pid data forward */
+ while (1) {
+ Uint64 act_lpd;
+ if (last_pid_data_cmp(lpd, exp_lpd) < 0)
+ break;
+ act_lpd = last_pid_data_cmpxchg_relb(lpd, exp_lpd);
+ if (act_lpd == exp_lpd)
+ break;
+ exp_lpd = act_lpd;
+ }
#ifdef ERTS_SMP
- erts_proc_lock_init(p); /* All locks locked */
- erts_pix_unlock(pix_lock);
+ RUNQ_SET_RQ(&p->run_queue, rq);
#endif
- p->rstatus = P_FREE;
- p->rcount = 0;
+ erts_smp_atomic32_init_relb(&p->state, state);
- /*
- * set p_next to the next available slot
- */
+#ifdef DEBUG
+ pid = p->id;
+#endif
- p_prev = p_next;
+#ifdef ERTS_SMP
+ erts_proc_lock_init(p); /* All locks locked */
+#endif
- while (1) {
- p_next++;
- if(p_next >= erts_max_processes) {
- p_serial++;
- p_serial &= p_serial_mask;
- p_next = 0;
- }
+ /* Move into slot reserved */
+#ifdef DEBUG
+ ASSERT(ERTS_PROC_LOCK_BUSY
+ == (Process *) erts_smp_atomic_xchg_relb(&erts_proc.tab[pix],
+ (erts_aint_t) p));
+#else
+ erts_smp_atomic_set_relb(&erts_proc.tab[pix], (erts_aint_t) p);
+#endif
- if (p_prev == p_next) {
- p_next = -1;
- break; /* Table full! */
- }
+ ASSERT(internal_pid_serial(p->id) <= (erts_use_r9_pids_ports
+ ? ERTS_MAX_PID_R9_SERIAL
+ : ERTS_MAX_PID_SERIAL));
- if (!process_tab[p_next])
- break; /* found a free slot */
- }
+ erts_smp_rwmtx_runlock(&erts_proc_tab_rwmtx);
- error:
+ p->rcount = 0;
- erts_smp_mtx_unlock(&proc_tab_mtx);
+ ASSERT(p == (Process *)
+ erts_smp_atomic_read_nob(
+ &erts_proc.tab[internal_pid_index(pid)]));
return p;
+enomem:
+system_limit:
+
+ erts_smp_rwmtx_runlock(&erts_proc_tab_rwmtx);
+ return NULL;
+
}
Eterm
@@ -7117,7 +7269,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
Eterm args, /* Arguments for function (must be well-formed list). */
ErlSpawnOpts* so) /* Options for spawn. */
{
- ErtsRunQueue *rq, *notify_runq;
+ ErtsRunQueue *rq = NULL;
Process *p;
Sint arity; /* Number of arguments. */
#ifndef HYBRID
@@ -7126,6 +7278,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
Uint sz; /* Needed words on heap. */
Uint heap_need; /* Size needed on heap. */
Eterm res = THE_NON_VALUE;
+ erts_aint32_t state = 0;
+ erts_aint32_t prio = (erts_aint32_t) PRIORITY_NORMAL;
#ifdef ERTS_SMP
erts_smp_proc_lock(parent, ERTS_PROC_LOCKS_ALL_MINOR);
@@ -7135,7 +7289,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
/*
* Copy the arguments to the global heap
* Since global GC might occur we want to do this before adding the
- * new process to the process_tab.
+ * new process to the erts_proc.tab.
*/
BM_SWAP_TIMER(system,copy);
LAZY_COPY(parent,args);
@@ -7150,8 +7304,24 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
so->error_code = BADARG;
goto error;
}
- p = alloc_process(); /* All proc locks are locked by this thread
- on success */
+
+ if (so->flags & SPO_USE_ARGS) {
+ if (so->scheduler) {
+ int ix = so->scheduler-1;
+ ASSERT(0 <= ix && ix < erts_no_run_queues);
+ rq = ERTS_RUNQ_IX(ix);
+ state |= ERTS_PSFLG_BOUND;
+ }
+ prio = (erts_aint32_t) so->priority;
+ }
+
+ state |= (prio & ERTS_PSFLG_PRIO_MASK);
+
+ if (!rq)
+ rq = erts_get_runq_proc(parent);
+
+ p = alloc_process(rq, state); /* All proc locks are locked by this thread
+ on success */
if (!p) {
erts_send_error_to_logger_str(parent->group_leader,
"Too many processes\n");
@@ -7173,22 +7343,16 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->flags = erts_default_process_flags;
- /* Scheduler queue mutex should be locked when changeing
- * prio. In this case we don't have to lock it, since
- * noone except us has access to the process.
- */
if (so->flags & SPO_USE_ARGS) {
p->min_heap_size = so->min_heap_size;
p->min_vheap_size = so->min_vheap_size;
- p->prio = so->priority;
p->max_gen_gcs = so->max_gen_gcs;
} else {
p->min_heap_size = H_MIN_SIZE;
p->min_vheap_size = BIN_VH_MIN_SIZE;
- p->prio = PRIORITY_NORMAL;
p->max_gen_gcs = (Uint16) erts_smp_atomic32_read_nob(&erts_max_gen_gcs);
}
- p->skipped = 0;
+ p->schedule_count = 0;
ASSERT(p->min_heap_size == erts_next_heap_size(p->min_heap_size, 0));
p->initial[INITIAL_MOD] = mod;
@@ -7302,7 +7466,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->msg_inq.first = NULL;
p->msg_inq.last = &p->msg_inq.first;
p->msg_inq.len = 0;
- p->bound_runq = NULL;
#endif
p->bif_timers = NULL;
p->mbuf = NULL;
@@ -7404,9 +7567,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
#ifdef ERTS_SMP
p->scheduler_data = NULL;
- p->is_exiting = 0;
- p->status_flags = 0;
- p->runq_flags = 0;
p->suspendee = NIL;
p->pending_suspenders = NULL;
p->pending_exit.reason = THE_NON_VALUE;
@@ -7417,34 +7577,15 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->fp_exception = 0;
#endif
+ erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL);
+
+ res = p->id;
+
/*
* Schedule process for execution.
*/
- if (!((so->flags & SPO_USE_ARGS) && so->scheduler))
- rq = erts_get_runq_proc(parent);
- else {
- int ix = so->scheduler-1;
- ASSERT(0 <= ix && ix < erts_no_run_queues);
- rq = ERTS_RUNQ_IX(ix);
- p->bound_runq = rq;
- }
-
- erts_smp_runq_lock(rq);
-
-#ifdef ERTS_SMP
- p->run_queue = rq;
-#endif
-
- p->status = P_WAITING;
- notify_runq = internal_add_to_runq(rq, p);
-
- erts_smp_runq_unlock(rq);
-
- smp_notify_inc_runq(notify_runq);
-
- res = p->id;
- erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL);
+ schedule_process(p, state, 0);
VERBOSE(DEBUG_PROCESSES, ("Created a new process: %T\n",p->id));
@@ -7480,12 +7621,8 @@ void erts_init_empty_process(Process *p)
p->max_gen_gcs = 0;
p->min_heap_size = 0;
p->min_vheap_size = 0;
- p->status = P_RUNABLE;
- p->gcstatus = P_RUNABLE;
- p->rstatus = P_RUNABLE;
p->rcount = 0;
p->id = ERTS_INVALID_PID;
- p->prio = PRIORITY_NORMAL;
p->reds = 0;
p->tracer_proc = NIL;
p->trace_flags = F_INITIAL_TRACE_FLAGS;
@@ -7502,7 +7639,6 @@ void erts_init_empty_process(Process *p)
p->bin_vheap_mature = 0;
#ifdef ERTS_SMP
p->u.ptimer = NULL;
- p->bound_runq = NULL;
#else
memset(&(p->u.tm), 0, sizeof(ErlTimer));
#endif
@@ -7556,8 +7692,8 @@ void erts_init_empty_process(Process *p)
p->def_arg_reg[5] = 0;
p->parent = NIL;
- p->started.tv_sec = 0;
- p->started.tv_usec = 0;
+ p->approx_started = 0;
+ p->started_interval = 0;
#ifdef HIPE
hipe_init_process(&p->hipe);
@@ -7579,12 +7715,10 @@ void erts_init_empty_process(Process *p)
p->last_old_htop = NULL;
#endif
+ erts_smp_atomic32_init_nob(&p->state, (erts_aint32_t) PRIORITY_NORMAL);
#ifdef ERTS_SMP
p->scheduler_data = NULL;
- p->is_exiting = 0;
- p->status_flags = 0;
- p->runq_flags = 0;
p->msg_inq.first = NULL;
p->msg_inq.last = &p->msg_inq.first;
p->msg_inq.len = 0;
@@ -7594,7 +7728,7 @@ void erts_init_empty_process(Process *p)
p->pending_exit.bp = NULL;
erts_proc_lock_init(p);
erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL);
- p->run_queue = ERTS_RUNQ_IX(0);
+ RUNQ_SET_RQ(&p->run_queue, ERTS_RUNQ_IX(0));
#endif
#if !defined(NO_FPE_SIGNALS) || defined(HIPE)
@@ -7675,8 +7809,8 @@ erts_cleanup_empty_process(Process* p)
free_message_buffer(p->mbuf);
p->mbuf = NULL;
}
-#if defined(ERTS_ENABLE_LOCK_COUNT) && defined(ERTS_SMP)
- erts_lcnt_proc_lock_destroy(p);
+#ifdef ERTS_SMP
+ erts_proc_lock_fin(p);
#endif
#ifdef DEBUG
erts_debug_verify_clean_empty_process(p);
@@ -7789,26 +7923,34 @@ delete_process(Process* p)
}
+static ERTS_INLINE erts_aint32_t
+set_proc_exiting_state(Process *p, erts_aint32_t state)
+{
+ erts_aint32_t a, n, e;
+ a = state;
+ while (1) {
+ n = e = a;
+ n &= ~(ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT);
+ n |= ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE;
+ if (!(a & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING)))
+ n |= ERTS_PSFLG_IN_RUNQ;
+ a = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e);
+ if (a == e)
+ break;
+ }
+ return a;
+}
+
static ERTS_INLINE void
-set_proc_exiting(Process *p, Eterm reason, ErlHeapFragment *bp)
+set_proc_exiting(Process *p,
+ erts_aint32_t state,
+ Eterm reason,
+ ErlHeapFragment *bp)
{
-#ifdef ERTS_SMP
- erts_pix_lock_t *pix_lock = ERTS_PID2PIXLOCK(p->id);
ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(p) == ERTS_PROC_LOCKS_ALL);
- /*
- * You are required to have all proc locks and the pix lock when going
- * to status P_EXITING. This makes it is enough to take any lock when
- * looking up a process (pid2proc()) to prevent the looked up process
- * from exiting until the lock has been released.
- */
- erts_pix_lock(pix_lock);
- p->is_exiting = 1;
-#endif
- p->status = P_EXITING;
-#ifdef ERTS_SMP
- erts_pix_unlock(pix_lock);
-#endif
+ state = set_proc_exiting_state(p, state);
+
p->fvalue = reason;
if (bp)
erts_link_mbuf_to_proc(p, bp);
@@ -7821,6 +7963,14 @@ set_proc_exiting(Process *p, Eterm reason, ErlHeapFragment *bp)
KILL_CATCHES(p);
cancel_timer(p);
p->i = (BeamInstr *) beam_exit;
+
+ if (erts_system_profile_flags.runnable_procs
+ && !(state & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) {
+ profile_runnable_proc(p, am_active);
+ }
+
+ if (!(state & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING)))
+ add2runq(p, state);
}
@@ -7833,8 +7983,8 @@ erts_handle_pending_exit(Process *c_p, ErtsProcLocks locks)
ASSERT(is_value(c_p->pending_exit.reason));
ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == locks);
ERTS_SMP_LC_ASSERT(locks & ERTS_PROC_LOCK_MAIN);
- ERTS_SMP_LC_ASSERT(c_p->status != P_EXITING);
- ERTS_SMP_LC_ASSERT(c_p->status != P_FREE);
+ ERTS_SMP_LC_ASSERT(!((ERTS_PSFLG_EXITING|ERTS_PSFLG_FREE)
+ & erts_smp_atomic32_read_nob(&c_p->state)));
/* Ensure that all locks on c_p are locked before proceeding... */
if (locks == ERTS_PROC_LOCKS_ALL)
@@ -7847,7 +7997,10 @@ erts_handle_pending_exit(Process *c_p, ErtsProcLocks locks)
}
}
- set_proc_exiting(c_p, c_p->pending_exit.reason, c_p->pending_exit.bp);
+ set_proc_exiting(c_p,
+ erts_smp_atomic32_read_acqb(&c_p->state),
+ c_p->pending_exit.reason,
+ c_p->pending_exit.bp);
c_p->pending_exit.reason = THE_NON_VALUE;
c_p->pending_exit.bp = NULL;
@@ -7863,11 +8016,12 @@ handle_pending_exiters(ErtsProcList *pnd_xtrs)
while (plp) {
Process *p = erts_pid2proc(NULL, 0, plp->pid, ERTS_PROC_LOCKS_ALL);
if (p) {
- if (proclist_same(plp, p)
- && !(p->status_flags & ERTS_PROC_SFLG_RUNNING)) {
- ASSERT(p->status_flags & ERTS_PROC_SFLG_INRUNQ);
- ASSERT(ERTS_PROC_PENDING_EXIT(p));
- erts_handle_pending_exit(p, ERTS_PROC_LOCKS_ALL);
+ if (proclist_same(plp, p)) {
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state);
+ if (!(state & ERTS_PSFLG_RUNNING)) {
+ ASSERT(state & ERTS_PSFLG_PENDING_EXIT);
+ erts_handle_pending_exit(p, ERTS_PROC_LOCKS_ALL);
+ }
}
erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL);
}
@@ -7895,7 +8049,7 @@ save_pending_exiter(Process *p)
rq->procs.pending_exiters = plp;
erts_smp_runq_unlock(rq);
-
+ wake_scheduler(rq, 1);
}
#endif
@@ -7957,7 +8111,7 @@ send_exit_message(Process *to, ErtsProcLocks *to_locksp,
* SMP emulator). When the signal is received the receiver receives an
* 'EXIT' message if it is trapping exits; otherwise, it will either
* ignore the signal if the exit reason is normal, or go into an
- * exiting state (status P_EXITING). When a process has gone into the
+ * exiting state (ERTS_PSFLG_EXITING). When a process has gone into the
* exiting state it will not execute any more Erlang code, but it might
* take a while before it actually exits. The exit signal is being
* received when the 'EXIT' message is put in the message queue, the
@@ -8030,6 +8184,7 @@ send_exit_signal(Process *c_p, /* current process if and only
Uint32 flags /* flags */
)
{
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&rp->state);
Eterm rsn = reason == am_kill ? am_killed : reason;
ERTS_SMP_LC_ASSERT(*rp_locks == erts_proc_lc_my_proc_locks(rp));
@@ -8051,7 +8206,7 @@ send_exit_signal(Process *c_p, /* current process if and only
}
#endif
- if (ERTS_PROC_IS_TRAPPING_EXITS(rp)
+ if ((state & ERTS_PSFLG_TRAP_EXIT)
&& (reason != am_kill || (flags & ERTS_XSIG_FLG_IGN_KILL))) {
if (is_not_nil(token)
#ifdef USE_VM_PROBES
@@ -8067,9 +8222,7 @@ send_exit_signal(Process *c_p, /* current process if and only
}
else if (reason != am_normal || (flags & ERTS_XSIG_FLG_NO_IGN_NORMAL)) {
#ifdef ERTS_SMP
- if (!ERTS_PROC_PENDING_EXIT(rp) && !rp->is_exiting) {
- ASSERT(rp->status != P_EXITING);
- ASSERT(rp->status != P_FREE);
+ if (!(state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))) {
ASSERT(!rp->pending_exit.bp);
if (rp == c_p && (*rp_locks & ERTS_PROC_LOCK_MAIN)) {
@@ -8085,9 +8238,9 @@ send_exit_signal(Process *c_p, /* current process if and only
}
*rp_locks = ERTS_PROC_LOCKS_ALL;
}
- set_proc_exiting(c_p, rsn, NULL);
+ set_proc_exiting(c_p, state, rsn, NULL);
}
- else if (!(rp->status_flags & ERTS_PROC_SFLG_RUNNING)) {
+ else if (!(state & ERTS_PSFLG_RUNNING)) {
/* Process not running ... */
ErtsProcLocks need_locks = ~(*rp_locks) & ERTS_PROC_LOCKS_ALL;
if (need_locks
@@ -8104,6 +8257,7 @@ send_exit_signal(Process *c_p, /* current process if and only
/* ...and we have all locks on it... */
*rp_locks = ERTS_PROC_LOCKS_ALL;
set_proc_exiting(rp,
+ state,
(is_immed(rsn)
? rsn
: copy_object(rsn, rp)),
@@ -8133,11 +8287,9 @@ send_exit_signal(Process *c_p, /* current process if and only
&bp->off_heap);
rp->pending_exit.bp = bp;
}
- ASSERT(ERTS_PROC_PENDING_EXIT(rp));
+ erts_smp_atomic32_read_bor_relb(&rp->state,
+ ERTS_PSFLG_PENDING_EXIT);
}
- if (!(rp->status_flags
- & (ERTS_PROC_SFLG_INRUNQ|ERTS_PROC_SFLG_RUNNING)))
- erts_add_to_runq(rp);
}
/* else:
*
@@ -8149,18 +8301,15 @@ send_exit_signal(Process *c_p, /* current process if and only
* exit or by itself before seeing the pending exit.
*/
#else /* !ERTS_SMP */
- if (c_p == rp) {
- rp->status = P_EXITING;
- c_p->fvalue = rsn;
- }
- else if (rp->status != P_EXITING) { /* No recursive process exits /PaN */
- Eterm old_status = rp->status;
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&rp->state);
+ if (!(state & ERTS_PSFLG_EXITING)) {
set_proc_exiting(rp,
- is_immed(rsn) ? rsn : copy_object(rsn, rp),
+ state,
+ (is_immed(rsn) || c_p == rp
+ ? rsn
+ : copy_object(rsn, rp)),
NULL);
ACTIVATE(rp);
- if (old_status != P_RUNABLE && old_status != P_RUNNING)
- erts_add_to_runq(rp);
}
#endif
return -1; /* Receiver will exit */
@@ -8463,12 +8612,14 @@ resume_suspend_monitor(ErtsSuspendMonitor *smon, void *vc_p)
erts_destroy_suspend_monitor(smon);
}
-static void
-continue_exit_process(Process *p
#ifdef ERTS_SMP
- , erts_pix_lock_t *pix_lock
+static void
+proc_dec_refc(void *vproc)
+{
+ erts_smp_proc_dec_refc((Process *) vproc);
+}
#endif
- );
+
/* this function fishishes a process and propagates exit messages - called
by process_main when a process dies */
@@ -8476,9 +8627,8 @@ void
erts_do_exit_process(Process* p, Eterm reason)
{
#ifdef ERTS_SMP
- erts_pix_lock_t *pix_lock = ERTS_PID2PIXLOCK(p->id);
+ erts_aint32_t state;
#endif
-
p->arity = 0; /* No live registers */
p->fvalue = reason;
@@ -8496,27 +8646,17 @@ erts_do_exit_process(Process* p, Eterm reason)
#ifdef ERTS_SMP
ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p);
/* By locking all locks (main lock is already locked) when going
- to status P_EXITING, it is enough to take any lock when
+ to exiting state (ERTS_PSFLG_EXITING), it is enough to take any lock when
looking up a process (erts_pid2proc()) to prevent the looked up
process from exiting until the lock has been released. */
erts_smp_proc_lock(p, ERTS_PROC_LOCKS_ALL_MINOR);
#endif
-
- if (erts_system_profile_flags.runnable_procs && (p->status != P_WAITING)) {
- profile_runnable_proc(p, am_inactive);
- }
-
-#ifdef ERTS_SMP
- erts_pix_lock(pix_lock);
- p->is_exiting = 1;
-#endif
-
- p->status = P_EXITING;
-
-#ifdef ERTS_SMP
- erts_pix_unlock(pix_lock);
- if (ERTS_PROC_PENDING_EXIT(p)) {
+#ifndef ERTS_SMP
+ set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state));
+#else
+ state = set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state));
+ if (state & ERTS_PSFLG_PENDING_EXIT) {
/* Process exited before pending exit was received... */
p->pending_exit.reason = THE_NON_VALUE;
if (p->pending_exit.bp) {
@@ -8555,29 +8695,11 @@ erts_do_exit_process(Process* p, Eterm reason)
erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL_MINOR);
-#ifdef ERTS_SMP
- continue_exit_process(p, pix_lock);
-#else
- continue_exit_process(p);
-#endif
+ erts_continue_exit_process(p);
}
void
-erts_continue_exit_process(Process *c_p)
-{
-#ifdef ERTS_SMP
- continue_exit_process(c_p, ERTS_PID2PIXLOCK(c_p->id));
-#else
- continue_exit_process(c_p);
-#endif
-}
-
-static void
-continue_exit_process(Process *p
-#ifdef ERTS_SMP
- , erts_pix_lock_t *pix_lock
-#endif
- )
+erts_continue_exit_process(Process *p)
{
ErtsLink* lnk;
ErtsMonitor *mon;
@@ -8593,11 +8715,7 @@ continue_exit_process(Process *p
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(p));
-#ifdef DEBUG
- erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
- ASSERT(p->status == P_EXITING);
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
-#endif
+ ASSERT(ERTS_PROC_IS_EXITING(p));
#ifdef ERTS_SMP
if (p->flags & F_HAVE_BLCKD_MSCHED) {
@@ -8665,48 +8783,47 @@ continue_exit_process(Process *p
#endif
{
+ int maybe_save;
int pix;
/* Do *not* use erts_get_runq_proc() */
ErtsRunQueue *rq;
rq = erts_get_runq_current(ERTS_GET_SCHEDULER_DATA_FROM_PROC(p));
- ASSERT(internal_pid_index(p->id) < erts_max_processes);
pix = internal_pid_index(p->id);
- erts_smp_mtx_lock(&proc_tab_mtx);
+ erts_smp_rwmtx_rlock(&erts_proc_tab_rwmtx);
+ maybe_save = saved_term_procs.end != NULL;
+ if (maybe_save) {
+ erts_smp_rwmtx_runlock(&erts_proc_tab_rwmtx);
+ erts_smp_rwmtx_rwlock(&erts_proc_tab_rwmtx);
+ }
+
erts_smp_runq_lock(rq);
#ifdef ERTS_SMP
- erts_pix_lock(pix_lock);
-
ASSERT(p->scheduler_data);
ASSERT(p->scheduler_data->current_process == p);
ASSERT(p->scheduler_data->free_process == NULL);
p->scheduler_data->current_process = NULL;
p->scheduler_data->free_process = p;
- p->status_flags = 0;
#endif
- process_tab[pix] = NULL; /* Time of death! */
+ /* Time of death! */
+ erts_smp_atomic_set_relb(&erts_proc.tab[pix], ERTS_AINT_NULL);
+
ASSERT(erts_smp_atomic32_read_nob(&process_count) > 0);
- erts_smp_atomic32_dec_nob(&process_count);
+ erts_smp_atomic32_dec_relb(&process_count);
-#ifdef ERTS_SMP
- erts_pix_unlock(pix_lock);
-#endif
erts_smp_runq_unlock(rq);
- if (p_next < 0) {
- if (p_last >= p_next) {
- p_serial++;
- p_serial &= p_serial_mask;
- }
- p_next = pix;
+ if (!maybe_save)
+ erts_smp_rwmtx_runlock(&erts_proc_tab_rwmtx);
+ else {
+ if (saved_term_procs.end)
+ save_terminating_process(p);
+ erts_smp_rwmtx_rwunlock(&erts_proc_tab_rwmtx);
}
- ERTS_MAYBE_SAVE_TERMINATING_PROCESS(p);
-
- erts_smp_mtx_unlock(&proc_tab_mtx);
}
/*
@@ -8721,7 +8838,21 @@ continue_exit_process(Process *p
lnk = p->nlinks;
p->nlinks = NULL;
- p->status = P_FREE;
+
+ {
+ /* Inactivate and notify free */
+ erts_aint32_t n, e, a = erts_smp_atomic32_read_nob(&p->state);
+ while (1) {
+ n = e = a;
+ ASSERT(a & ERTS_PSFLG_EXITING);
+ n |= ERTS_PSFLG_FREE;
+ n &= ~ERTS_PSFLG_ACTIVE;
+ a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
+ if (a == e)
+ break;
+ }
+ }
+
dep = ((p->flags & F_DISTRIBUTION)
? ERTS_PROC_SET_DIST_ENTRY(p, ERTS_PROC_LOCKS_ALL, NULL)
: NULL);
@@ -8776,8 +8907,25 @@ continue_exit_process(Process *p
delete_process(p);
+#ifdef ERTS_SMP
+ /*
+ * Each scheduler will decrease refc by one via misc aux work;
+ * we have one refc for reference from process table which we
+ * now want to remove, i.e. we increase refc with schedulers-1.
+ *
+ * Process struct wont be deallocated until (earliest) when
+ * all schedulers have decreased refc via misc aux work...
+ */
+ if (erts_no_schedulers != 1)
+ erts_smp_proc_add_refc(p, (Sint32) erts_no_schedulers-1);
+ erts_schedule_multi_misc_aux_work(0,
+ erts_no_schedulers,
+ proc_dec_refc,
+ (void *) p);
+
erts_smp_proc_lock(p, ERTS_PROC_LOCK_MAIN);
ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p);
+#endif
return;
@@ -8790,8 +8938,6 @@ continue_exit_process(Process *p
ERTS_SMP_LC_ASSERT(curr_locks == erts_proc_lc_my_proc_locks(p));
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN & curr_locks);
- ASSERT(p->status == P_EXITING);
-
p->i = (BeamInstr *) beam_continue_exit;
if (!(curr_locks & ERTS_PROC_LOCK_STATUS)) {
@@ -8799,8 +8945,6 @@ continue_exit_process(Process *p
curr_locks |= ERTS_PROC_LOCK_STATUS;
}
- erts_add_to_runq(p);
-
if (curr_locks != ERTS_PROC_LOCK_MAIN)
erts_smp_proc_unlock(p, ~ERTS_PROC_LOCK_MAIN & curr_locks);
@@ -8812,33 +8956,15 @@ continue_exit_process(Process *p
static void
timeout_proc(Process* p)
{
+ erts_aint32_t state;
BeamInstr** pi = (BeamInstr **) p->def_arg_reg;
p->i = *pi;
p->flags |= F_TIMO;
p->flags &= ~F_INSLPQUEUE;
- switch (p->status) {
- case P_GARBING:
- switch (p->gcstatus) {
- case P_SUSPENDED:
- goto suspended;
- case P_WAITING:
- goto waiting;
- default:
- break;
- }
- break;
- case P_WAITING:
- waiting:
- erts_add_to_runq(p);
- break;
- case P_SUSPENDED:
- suspended:
- p->rstatus = P_RUNABLE; /* MUST set resume status to runnable */
- break;
- default:
- break;
- }
+ state = erts_smp_atomic32_read_acqb(&p->state);
+ if (!(state & ERTS_PSFLG_ACTIVE))
+ schedule_process(p, state, 0);
}
@@ -8906,6 +9032,7 @@ erts_stack_dump(int to, void *to_arg, Process *p)
void
erts_program_counter_info(int to, void *to_arg, Process *p)
{
+ erts_aint32_t state;
int i;
erts_print(to, to_arg, "Program counter: %p (", p->i);
@@ -8914,7 +9041,8 @@ erts_program_counter_info(int to, void *to_arg, Process *p)
erts_print(to, to_arg, "CP: %p (", p->cp);
print_function_from_pc(to, to_arg, p->cp);
erts_print(to, to_arg, ")\n");
- if (!((p->status == P_RUNNING) || (p->status == P_GARBING))) {
+ state = erts_smp_atomic32_read_acqb(&p->state);
+ if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_GC))) {
erts_print(to, to_arg, "arity = %d\n",p->arity);
if (!ERTS_IS_CRASH_DUMPING) {
/*
@@ -9087,13 +9215,13 @@ do { \
#endif
#if ERTS_PROCESSES_BIF_DEBUGLEVEL >= ERTS_PROCS_DBGLVL_CHK_FOUND_PIDS
-# define ERTS_PROCS_DBG_CHK_PID_FOUND(PBDP, PID, TVP) \
- debug_processes_check_found_pid((PBDP), (PID), (TVP), 1)
-# define ERTS_PROCS_DBG_CHK_PID_NOT_FOUND(PBDP, PID, TVP) \
- debug_processes_check_found_pid((PBDP), (PID), (TVP), 0)
+# define ERTS_PROCS_DBG_CHK_PID_FOUND(PBDP, PID, IC) \
+ debug_processes_check_found_pid((PBDP), (PID), (IC), 1)
+# define ERTS_PROCS_DBG_CHK_PID_NOT_FOUND(PBDP, PID, IC) \
+ debug_processes_check_found_pid((PBDP), (PID), (IC), 0)
#else
-# define ERTS_PROCS_DBG_CHK_PID_FOUND(PBDP, PID, TVP)
-# define ERTS_PROCS_DBG_CHK_PID_NOT_FOUND(PBDP, PID, TVP)
+# define ERTS_PROCS_DBG_CHK_PID_FOUND(PBDP, PID, IC)
+# define ERTS_PROCS_DBG_CHK_PID_NOT_FOUND(PBDP, PID, IC)
#endif
#if ERTS_PROCESSES_BIF_DEBUGLEVEL >= ERTS_PROCS_DBGLVL_CHK_TERM_PROC_LIST
@@ -9138,7 +9266,7 @@ static Uint processes_bif_tab_chunks;
static Export processes_trap_export;
typedef struct {
- SysTimeval time;
+ Uint64 interval;
} ErtsProcessesBifChunkInfo;
typedef enum {
@@ -9163,7 +9291,7 @@ typedef struct {
struct {
Eterm caller;
#if ERTS_PROCESSES_BIF_DEBUGLEVEL >= ERTS_PROCS_DBGLVL_CHK_FOUND_PIDS
- SysTimeval *pid_started;
+ Uint64 *pid_started;
#endif
#if ERTS_PROCESSES_BIF_DEBUGLEVEL >= ERTS_PROCS_DBGLVL_CHK_HALLOC
Eterm *heap;
@@ -9192,11 +9320,10 @@ static void debug_processes_verify_all_pids(ErtsProcessesBifData *pbdp);
#if ERTS_PROCESSES_BIF_DEBUGLEVEL >= ERTS_PROCS_DBGLVL_CHK_FOUND_PIDS
static void debug_processes_check_found_pid(ErtsProcessesBifData *pbdp,
Eterm pid,
- SysTimeval *started,
+ Uint64 ic,
int pid_should_be_found);
#endif
#if ERTS_PROCESSES_BIF_DEBUGLEVEL >= ERTS_PROCS_DBGLVL_CHK_TERM_PROC_LIST
-static SysTimeval debug_tv_start;
static void debug_processes_check_term_proc_list(void);
static void debug_processes_check_term_proc_free_list(ErtsTermProcElement *tpep);
#endif
@@ -9207,7 +9334,7 @@ save_terminating_process(Process *p)
ErtsTermProcElement *tpep = erts_alloc(ERTS_ALC_T_PROCS_TPROC_EL,
sizeof(ErtsTermProcElement));
ERTS_PROCS_ASSERT(saved_term_procs.start && saved_term_procs.end);
- ERTS_SMP_LC_ASSERT(erts_lc_mtx_is_locked(&proc_tab_mtx));
+ ERTS_SMP_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&erts_proc_tab_rwmtx));
ERTS_PROCS_DBG_CHK_TPLIST();
@@ -9215,19 +9342,19 @@ save_terminating_process(Process *p)
tpep->next = NULL;
tpep->ix = internal_pid_index(p->id);
tpep->u.process.pid = p->id;
- tpep->u.process.spawned = p->started;
- erts_get_emu_time(&tpep->u.process.exited);
+ tpep->u.process.spawned = p->started_interval;
+ tpep->u.process.exited = get_proc_interval();
saved_term_procs.end->next = tpep;
saved_term_procs.end = tpep;
ERTS_PROCS_DBG_CHK_TPLIST();
- ERTS_PROCS_ASSERT((tpep->prev->ix >= 0
- ? erts_cmp_timeval(&tpep->u.process.exited,
- &tpep->prev->u.process.exited)
- : erts_cmp_timeval(&tpep->u.process.exited,
- &tpep->prev->u.bif_invocation.time)) > 0);
+ ERTS_PROCS_ASSERT(tpep->prev->ix >= 0
+ ? (tpep->u.process.exited
+ >= tpep->prev->u.process.exited)
+ : (tpep->u.process.exited
+ >= tpep->prev->u.bif_invocation.interval));
}
static void
@@ -9258,7 +9385,7 @@ cleanup_processes_bif_data(Binary *bp)
if (pbdp->bif_invocation) {
ErtsTermProcElement *tpep;
- erts_smp_mtx_lock(&proc_tab_mtx);
+ erts_smp_rwmtx_rwlock(&erts_proc_tab_rwmtx);
ERTS_PROCS_DBG_TRACE(pbdp->debug.caller,
cleanup_processes_bif_data,
@@ -9312,7 +9439,7 @@ cleanup_processes_bif_data(Binary *bp)
ERTS_PROCS_DBG_CHK_TPLIST();
- erts_smp_mtx_unlock(&proc_tab_mtx);
+ erts_smp_rwmtx_rwunlock(&erts_proc_tab_rwmtx);
}
}
@@ -9340,7 +9467,7 @@ processes_bif_engine(Process *p, Eterm *res_accp, Binary *mbp)
pbdp->tix = 0;
pbdp->pid_ix = 0;
- erts_smp_mtx_lock(&proc_tab_mtx);
+ erts_smp_rwmtx_rwlock(&erts_proc_tab_rwmtx);
locked = 1;
ERTS_PROCS_DBG_TRACE(p->id, processes_bif_engine, init);
@@ -9351,7 +9478,7 @@ processes_bif_engine(Process *p, Eterm *res_accp, Binary *mbp)
#if ERTS_PROCESSES_BIF_DEBUGLEVEL >= ERTS_PROCS_DBGLVL_CHK_FOUND_PIDS
pbdp->debug.pid_started = erts_alloc(ERTS_ALC_T_PROCS_PIDS,
- sizeof(SysTimeval)*pbdp->pid_sz);
+ sizeof(Uint64)*pbdp->pid_sz);
#endif
ERTS_PROCS_DBG_SAVE_PIDS(pbdp);
@@ -9366,7 +9493,8 @@ processes_bif_engine(Process *p, Eterm *res_accp, Binary *mbp)
pbdp->bif_invocation = erts_alloc(ERTS_ALC_T_PROCS_TPROC_EL,
sizeof(ErtsTermProcElement));
pbdp->bif_invocation->ix = -1;
- erts_get_emu_time(&pbdp->bif_invocation->u.bif_invocation.time);
+ pbdp->bif_invocation->u.bif_invocation.interval
+ = step_proc_interval();
ERTS_PROCS_DBG_CHK_TPLIST();
pbdp->bif_invocation->next = NULL;
@@ -9393,30 +9521,31 @@ processes_bif_engine(Process *p, Eterm *res_accp, Binary *mbp)
int indices = ERTS_PROCESSES_BIF_TAB_CHUNK_SIZE;
int cix = ix / ERTS_PROCESSES_BIF_TAB_CHUNK_SIZE;
int end_ix = ix + indices;
- SysTimeval *invocation_timep;
+ Uint64 *invocation_interval_p;
- invocation_timep = (pbdp->bif_invocation
- ? &pbdp->bif_invocation->u.bif_invocation.time
- : NULL);
+ invocation_interval_p
+ = (pbdp->bif_invocation
+ ? &pbdp->bif_invocation->u.bif_invocation.interval
+ : NULL);
ERTS_PROCS_ASSERT(is_nil(*res_accp));
if (!locked) {
- erts_smp_mtx_lock(&proc_tab_mtx);
+ erts_smp_rwmtx_rwlock(&erts_proc_tab_rwmtx);
locked = 1;
}
- ERTS_SMP_LC_ASSERT(erts_lc_mtx_is_locked(&proc_tab_mtx));
+ ERTS_SMP_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&erts_proc_tab_rwmtx));
ERTS_PROCS_DBG_TRACE(p->id, processes_bif_engine, insp_table);
if (cix != 0)
- erts_get_emu_time(&pbdp->chunk[cix].time);
+ pbdp->chunk[cix].interval = step_proc_interval();
else if (pbdp->bif_invocation)
- pbdp->chunk[0].time = *invocation_timep;
- /* else: Time is irrelevant */
+ pbdp->chunk[0].interval = *invocation_interval_p;
+ /* else: interval is irrelevant */
- if (end_ix >= erts_max_processes) {
+ if (end_ix >= erts_proc.max) {
ERTS_PROCS_ASSERT(cix+1 == processes_bif_tab_chunks);
- end_ix = erts_max_processes;
+ end_ix = erts_proc.max;
indices = end_ix - ix;
/* What to do when done with this chunk */
pbdp->state = (processes_bif_tab_chunks == 1
@@ -9425,16 +9554,15 @@ processes_bif_engine(Process *p, Eterm *res_accp, Binary *mbp)
}
for (; ix < end_ix; ix++) {
- Process *rp = process_tab[ix];
+ Process *rp = erts_pix2proc(ix);
if (rp
- && (!invocation_timep
- || erts_cmp_timeval(&rp->started,
- invocation_timep) < 0)) {
+ && (!invocation_interval_p
+ || rp->started_interval < *invocation_interval_p)) {
ERTS_PROCS_ASSERT(is_internal_pid(rp->id));
pbdp->pid[pbdp->pid_ix] = rp->id;
#if ERTS_PROCESSES_BIF_DEBUGLEVEL >= ERTS_PROCS_DBGLVL_CHK_FOUND_PIDS
- pbdp->debug.pid_started[pbdp->pid_ix] = rp->started;
+ pbdp->debug.pid_started[pbdp->pid_ix] = rp->started_interval;
#endif
pbdp->pid_ix++;
@@ -9444,7 +9572,7 @@ processes_bif_engine(Process *p, Eterm *res_accp, Binary *mbp)
pbdp->tix = end_ix;
- erts_smp_mtx_unlock(&proc_tab_mtx);
+ erts_smp_rwmtx_rwunlock(&erts_proc_tab_rwmtx);
locked = 0;
reds = indices/ERTS_PROCESSES_BIF_TAB_INSPECT_INDICES_PER_RED;
@@ -9456,8 +9584,8 @@ processes_bif_engine(Process *p, Eterm *res_accp, Binary *mbp)
ix = pbdp->tix;
indices = ERTS_PROCESSES_BIF_TAB_CHUNK_SIZE;
end_ix = ix + indices;
- if (end_ix > erts_max_processes) {
- end_ix = erts_max_processes;
+ if (end_ix > erts_proc.max) {
+ end_ix = erts_proc.max;
indices = end_ix - ix;
}
@@ -9476,20 +9604,20 @@ processes_bif_engine(Process *p, Eterm *res_accp, Binary *mbp)
int i;
int max_reds;
int free_term_procs = 0;
- SysTimeval *invocation_timep;
+ Uint64 invocation_interval;
ErtsTermProcElement *tpep;
ErtsTermProcElement *free_list = NULL;
tpep = pbdp->bif_invocation;
ERTS_PROCS_ASSERT(tpep);
- invocation_timep = &tpep->u.bif_invocation.time;
+ invocation_interval = tpep->u.bif_invocation.interval;
max_reds = have_reds = ERTS_BIF_REDS_LEFT(p);
if (max_reds > ERTS_PROCESSES_INSPECT_TERM_PROC_MAX_REDS)
max_reds = ERTS_PROCESSES_INSPECT_TERM_PROC_MAX_REDS;
reds = 0;
- erts_smp_mtx_lock(&proc_tab_mtx);
+ erts_smp_rwmtx_rwlock(&erts_proc_tab_rwmtx);
ERTS_PROCS_DBG_TRACE(p->id, processes_bif_engine, insp_term_procs);
ERTS_PROCS_DBG_CHK_TPLIST();
@@ -9528,20 +9656,19 @@ processes_bif_engine(Process *p, Eterm *res_accp, Binary *mbp)
}
else {
int cix = tpep->ix/ERTS_PROCESSES_BIF_TAB_CHUNK_SIZE;
- SysTimeval *chunk_timep = &pbdp->chunk[cix].time;
+ Uint64 chunk_interval = pbdp->chunk[cix].interval;
Eterm pid = tpep->u.process.pid;
ERTS_PROCS_ASSERT(is_internal_pid(pid));
- if (erts_cmp_timeval(&tpep->u.process.spawned,
- invocation_timep) < 0) {
- if (erts_cmp_timeval(&tpep->u.process.exited,
- chunk_timep) < 0) {
+ if (tpep->u.process.spawned < invocation_interval) {
+ if (tpep->u.process.exited < chunk_interval) {
ERTS_PROCS_DBG_CHK_PID_NOT_FOUND(pbdp,
pid,
- &tpep->u.process.spawned);
+ tpep->u.process.spawned);
pbdp->pid[pbdp->pid_ix] = pid;
#if ERTS_PROCESSES_BIF_DEBUGLEVEL >= ERTS_PROCS_DBGLVL_CHK_FOUND_PIDS
- pbdp->debug.pid_started[pbdp->pid_ix] = tpep->u.process.spawned;
+ pbdp->debug.pid_started[pbdp->pid_ix]
+ = tpep->u.process.spawned;
#endif
pbdp->pid_ix++;
ERTS_PROCS_ASSERT(pbdp->pid_ix <= pbdp->pid_sz);
@@ -9549,13 +9676,13 @@ processes_bif_engine(Process *p, Eterm *res_accp, Binary *mbp)
else {
ERTS_PROCS_DBG_CHK_PID_FOUND(pbdp,
pid,
- &tpep->u.process.spawned);
+ tpep->u.process.spawned);
}
}
else {
ERTS_PROCS_DBG_CHK_PID_NOT_FOUND(pbdp,
pid,
- &tpep->u.process.spawned);
+ tpep->u.process.spawned);
}
i++;
@@ -9604,7 +9731,7 @@ processes_bif_engine(Process *p, Eterm *res_accp, Binary *mbp)
ERTS_PROCS_DBG_CHK_TPLIST();
ERTS_PROCS_DBG_CHK_FREELIST(free_list);
- erts_smp_mtx_unlock(&proc_tab_mtx);
+ erts_smp_rwmtx_rwunlock(&erts_proc_tab_rwmtx);
/*
* We do the actual free of term proc structures now when we
@@ -9664,8 +9791,9 @@ processes_bif_engine(Process *p, Eterm *res_accp, Binary *mbp)
sizeof(Eterm)*pbdp->pid_sz);
#if ERTS_PROCESSES_BIF_DEBUGLEVEL >= ERTS_PROCS_DBGLVL_CHK_FOUND_PIDS
pbdp->debug.pid_started = erts_realloc(ERTS_ALC_T_PROCS_PIDS,
- pbdp->debug.pid_started,
- sizeof(SysTimeval)*pbdp->pid_sz);
+ pbdp->debug.pid_started,
+ (sizeof(Uint64)
+ * pbdp->pid_sz));
#endif
}
reds = conses/ERTS_PROCESSES_BIF_BUILD_RESULT_CONSES_PER_RED;
@@ -9785,7 +9913,7 @@ init_processes_bif(void)
{
saved_term_procs.start = NULL;
saved_term_procs.end = NULL;
- processes_bif_tab_chunks = (((erts_max_processes - 1)
+ processes_bif_tab_chunks = (((erts_proc.max - 1)
/ ERTS_PROCESSES_BIF_TAB_CHUNK_SIZE)
+ 1);
@@ -9793,10 +9921,6 @@ init_processes_bif(void)
erts_init_trap_export(&processes_trap_export, am_erlang, am_processes_trap, 2,
&processes_trap);
-#if ERTS_PROCESSES_BIF_DEBUGLEVEL >= ERTS_PROCS_DBGLVL_CHK_TERM_PROC_LIST
- erts_get_emu_time(&debug_tv_start);
-#endif
-
}
/*
@@ -9828,31 +9952,29 @@ erts_debug_processes(Process *c_p)
Eterm res;
Eterm* hp;
Process *p;
-#ifdef DEBUG
Eterm *hp_end;
-#endif
- erts_smp_mtx_lock(&proc_tab_mtx);
+ erts_smp_rwmtx_rwlock(&erts_proc_tab_rwmtx);
res = NIL;
need = erts_process_count() * 2;
hp = HAlloc(c_p, need); /* we need two heap words for each pid */
-#ifdef DEBUG
hp_end = hp + need;
-#endif
/* make the list by scanning bakward */
- for (i = erts_max_processes-1; i >= 0; i--) {
- if ((p = process_tab[i]) != NULL) {
- res = CONS(hp, process_tab[i]->id, res);
+ for (i = erts_proc.max-1; i >= 0; i--) {
+ p = erts_pix2proc(i);
+ if (p) {
+ res = CONS(hp, p->id, res);
hp += 2;
}
}
- ASSERT(hp == hp_end);
- erts_smp_mtx_unlock(&proc_tab_mtx);
+ erts_smp_rwmtx_rwunlock(&erts_proc_tab_rwmtx);
+
+ HRelease(c_p, hp_end, hp);
return res;
}
@@ -9884,14 +10006,12 @@ erts_debug_processes_bif_info(Process *c_p)
static void
debug_processes_check_found_pid(ErtsProcessesBifData *pbdp,
Eterm pid,
- SysTimeval *tvp,
+ Uint64 ic,
int pid_should_be_found)
{
int i;
for (i = 0; i < pbdp->pid_ix; i++) {
- if (pbdp->pid[i] == pid
- && pbdp->debug.pid_started[i].tv_sec == tvp->tv_sec
- && pbdp->debug.pid_started[i].tv_usec == tvp->tv_usec) {
+ if (pbdp->pid[i] == pid && pbdp->debug.pid_started[i] == ic) {
ERTS_PROCS_ASSERT(pid_should_be_found);
return;
}
@@ -9925,8 +10045,8 @@ debug_processes_save_all_pids(ErtsProcessesBifData *pbdp)
pbdp->debug.correct_pids = erts_alloc(ERTS_ALC_T_PROCS_PIDS,
sizeof(Eterm)*pbdp->pid_sz);
- for (tix = 0, cpix = 0; tix < erts_max_processes; tix++) {
- Process *rp = process_tab[tix];
+ for (tix = 0, cpix = 0; tix < erts_proc.max; tix++) {
+ Process *rp = erts_pix2proc(tix);
if (rp) {
ERTS_PROCS_ASSERT(is_internal_pid(rp->id));
pbdp->debug.correct_pids[cpix++] = rp->id;
@@ -9979,14 +10099,13 @@ debug_processes_verify_all_pids(ErtsProcessesBifData *pbdp)
static void
debug_processes_check_term_proc_list(void)
{
- ERTS_SMP_LC_ASSERT(erts_lc_mtx_is_locked(&proc_tab_mtx));
+ ERTS_SMP_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&erts_proc_tab_rwmtx));
if (!saved_term_procs.start)
ERTS_PROCS_ASSERT(!saved_term_procs.end);
else {
- SysTimeval tv_now;
- SysTimeval *prev_xtvp = NULL;
+ Uint64 curr_interval = get_proc_interval();
+ Uint64 *prev_x_interval_p = NULL;
ErtsTermProcElement *tpep;
- erts_get_emu_time(&tv_now);
for (tpep = saved_term_procs.start; tpep; tpep = tpep->next) {
if (!tpep->prev)
@@ -9998,20 +10117,17 @@ debug_processes_check_term_proc_list(void)
else
ERTS_PROCS_ASSERT(tpep->next->prev == tpep);
if (tpep->ix < 0) {
- SysTimeval *tvp = &tpep->u.bif_invocation.time;
- ERTS_PROCS_ASSERT(erts_cmp_timeval(&debug_tv_start, tvp) < 0
- && erts_cmp_timeval(tvp, &tv_now) < 0);
+ Uint64 interval = tpep->u.bif_invocation.interval;
+ ERTS_PROCS_ASSERT(interval <= curr_interval);
}
else {
- SysTimeval *stvp = &tpep->u.process.spawned;
- SysTimeval *xtvp = &tpep->u.process.exited;
+ Uint64 s_interval = tpep->u.process.spawned;
+ Uint64 x_interval = tpep->u.process.exited;
- ERTS_PROCS_ASSERT(erts_cmp_timeval(&debug_tv_start,
- stvp) < 0);
- ERTS_PROCS_ASSERT(erts_cmp_timeval(stvp, xtvp) < 0);
- if (prev_xtvp)
- ERTS_PROCS_ASSERT(erts_cmp_timeval(prev_xtvp, xtvp) < 0);
- prev_xtvp = xtvp;
+ ERTS_PROCS_ASSERT(s_interval <= x_interval);
+ if (prev_x_interval_p)
+ ERTS_PROCS_ASSERT(*prev_x_interval_p <= x_interval);
+ prev_x_interval_p = &tpep->u.process.exited;
ERTS_PROCS_ASSERT(is_internal_pid(tpep->u.process.pid));
ERTS_PROCS_ASSERT(tpep->ix
== internal_pid_index(tpep->u.process.pid));