diff options
Diffstat (limited to 'erts/emulator/beam/erl_process.c')
-rw-r--r-- | erts/emulator/beam/erl_process.c | 2992 |
1 files changed, 1416 insertions, 1576 deletions
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 7174991ade..ba46b9011d 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -94,12 +94,44 @@ #define HIGH_BIT (1 << PRIORITY_HIGH) #define NORMAL_BIT (1 << PRIORITY_NORMAL) #define LOW_BIT (1 << PRIORITY_LOW) +#define PORT_BIT (1 << ERTS_PORT_PRIO_LEVEL) -#define ERTS_EMPTY_RUNQ(RQ) \ - ((RQ)->len == 0 && (RQ)->misc.start == NULL) +#define ERTS_EMPTY_RUNQ(RQ) \ + ((ERTS_RUNQ_FLGS_GET_NOB((RQ)) & ERTS_RUNQ_FLGS_QMASK) == 0 \ + && (RQ)->misc.start == NULL) + +#undef RUNQ_READ_RQ +#undef RUNQ_SET_RQ +#define RUNQ_READ_RQ(X) ((ErtsRunQueue *) erts_smp_atomic_read_nob((X))) +#define RUNQ_SET_RQ(X, RQ) erts_smp_atomic_set_nob((X), (erts_aint_t) (RQ)) + +#ifdef DEBUG +# if defined(ARCH_64) && !HALFWORD_HEAP +# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \ + (RUNQ_SET_RQ((RQP), (0xdeadbeefdead0003LL | ((N) << 4))) +# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \ +do { \ + ASSERT((RQP) != NULL); \ + ASSERT(((((Uint) (RQP)) & ((Uint) 0x3))) == ((Uint) 0)); \ + ASSERT((((Uint) (RQP)) & ~((Uint) 0xffff)) != ((Uint) 0xdeadbeefdead0000LL));\ +} while (0) +# else +# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \ + (RUNQ_SET_RQ((RQP), (0xdead0003 | ((N) << 4)))) +# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \ +do { \ + ASSERT((RQP) != NULL); \ + ASSERT(((((UWord) (RQP)) & ((UWord) 1))) == ((UWord) 0)); \ + ASSERT((((UWord) (RQP)) & ~((UWord) 0xffff)) != ((UWord) 0xdead0000)); \ +} while (0) +# endif +#else +# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) +# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) +#endif #define ERTS_EMPTY_RUNQ_PORTS(RQ) \ - ((RQ)->ports.info.len == 0 && (RQ)->misc.start == NULL) + (RUNQ_READ_LEN(&(RQ)->ports.info.len) == 0 && (RQ)->misc.start == NULL) extern BeamInstr beam_apply[]; extern BeamInstr beam_exit[]; @@ -312,7 +344,7 @@ static struct { struct { int active_runqs; int reds; - int max_len; + erts_aint32_t max_len; } prev_rise; Uint n; } balance_info; @@ -1884,8 +1916,8 @@ sched_waiting_sys(Uint no, ErtsRunQueue *rq) { ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); ASSERT(rq->waiting >= 0); - rq->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK - | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK); + ERTS_RUNQ_FLGS_SET(rq, (ERTS_RUNQ_FLG_OUT_OF_WORK + | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK)); rq->waiting++; rq->waiting *= -1; rq->woken = 0; @@ -1961,8 +1993,8 @@ static ERTS_INLINE void sched_waiting(Uint no, ErtsRunQueue *rq) { ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); - rq->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK - | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK); + ERTS_RUNQ_FLGS_SET(rq, (ERTS_RUNQ_FLG_OUT_OF_WORK + | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK)); if (rq->waiting < 0) rq->waiting--; else @@ -1994,9 +2026,8 @@ ongoing_multi_scheduling_block(void) static ERTS_INLINE void empty_runq(ErtsRunQueue *rq) { - erts_aint32_t oifls = erts_smp_atomic32_read_band_nob(&rq->info_flags, - ~ERTS_RUNQ_IFLG_NONEMPTY); - if (oifls & ERTS_RUNQ_IFLG_NONEMPTY) { + Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED); + if (old_flags & ERTS_RUNQ_FLG_NONEMPTY) { #ifdef DEBUG erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues); /* @@ -2012,9 +2043,8 @@ empty_runq(ErtsRunQueue *rq) static ERTS_INLINE void non_empty_runq(ErtsRunQueue *rq) { - erts_aint32_t oifls = erts_smp_atomic32_read_bor_nob(&rq->info_flags, - ERTS_RUNQ_IFLG_NONEMPTY); - if (!(oifls & ERTS_RUNQ_IFLG_NONEMPTY)) { + Uint32 old_flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_NONEMPTY); + if (!(old_flags & ERTS_RUNQ_FLG_NONEMPTY)) { #ifdef DEBUG erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues); /* @@ -2627,19 +2657,16 @@ try_inc_no_active_runqs(int active) static ERTS_INLINE int chk_wake_sched(ErtsRunQueue *crq, int ix, int activate) { - erts_aint32_t iflgs; + Uint32 flags; ErtsRunQueue *wrq; if (crq->ix == ix) return 0; wrq = ERTS_RUNQ_IX(ix); - iflgs = erts_smp_atomic32_read_nob(&wrq->info_flags); - if (!(iflgs & (ERTS_RUNQ_IFLG_SUSPENDED|ERTS_RUNQ_IFLG_NONEMPTY))) { + flags = ERTS_RUNQ_FLGS_GET(wrq); + if (!(flags & (ERTS_RUNQ_FLG_SUSPENDED|ERTS_RUNQ_FLG_NONEMPTY))) { if (activate) { - if (try_inc_no_active_runqs(ix+1)) { - erts_smp_xrunq_lock(crq, wrq); - wrq->flags &= ~ERTS_RUNQ_FLG_INACTIVE; - erts_smp_xrunq_unlock(crq, wrq); - } + if (try_inc_no_active_runqs(ix+1)) + ERTS_RUNQ_FLGS_UNSET(wrq, ERTS_RUNQ_FLG_INACTIVE); } wake_scheduler(wrq, 0); return 1; @@ -2706,9 +2733,7 @@ erts_sched_notify_check_cpu_bind(void) int ix; for (ix = 0; ix < erts_no_run_queues; ix++) { ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); - erts_smp_runq_lock(rq); - rq->flags |= ERTS_RUNQ_FLG_CHK_CPU_BIND; - erts_smp_runq_unlock(rq); + ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_CHK_CPU_BIND); wake_scheduler(rq, 0); } #else @@ -2717,409 +2742,547 @@ erts_sched_notify_check_cpu_bind(void) } -#ifdef ERTS_SMP +static ERTS_INLINE void +enqueue_process(ErtsRunQueue *runq, int prio, Process *p) +{ + ErtsRunPrioQueue *rpq; -ErtsRunQueue * -erts_prepare_emigrate(ErtsRunQueue *c_rq, ErtsRunQueueInfo *c_rqi, int prio) + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); + + erts_smp_inc_runq_len(runq, &runq->procs.prio_info[prio], prio); + + if (prio == PRIORITY_LOW) { + p->schedule_count = RESCHEDULE_LOW; + rpq = &runq->procs.prio[PRIORITY_NORMAL]; + } + else { + p->schedule_count = 1; + rpq = &runq->procs.prio[prio]; + } + + p->next = NULL; + if (rpq->last) + rpq->last->next = p; + else + rpq->first = p; + rpq->last = p; +} + + +static ERTS_INLINE void +unqueue_process(ErtsRunQueue *runq, + ErtsRunPrioQueue *rpq, + ErtsRunQueueInfo *rqi, + int prio, + Process *prev_proc, + Process *proc) { - ASSERT(ERTS_CHK_RUNQ_FLG_EMIGRATE(c_rq->flags, prio)); - ASSERT(ERTS_CHK_RUNQ_FLG_EVACUATE(c_rq->flags, prio) - || c_rqi->len >= c_rqi->migrate.limit.this); + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); - while (1) { - ErtsRunQueue *n_rq = c_rqi->migrate.runq; - ERTS_DBG_VERIFY_VALID_RUNQP(n_rq); - erts_smp_xrunq_lock(c_rq, n_rq); - - /* - * erts_smp_xrunq_lock() may release lock on c_rq! We have - * to check that we still want to emigrate and emigrate - * to the same run queue as before. - */ + if (prev_proc) + prev_proc->next = proc->next; + else + rpq->first = proc->next; + if (!proc->next) + rpq->last = prev_proc; - if (ERTS_CHK_RUNQ_FLG_EMIGRATE(c_rq->flags, prio)) { - Uint32 force = (ERTS_CHK_RUNQ_FLG_EVACUATE(c_rq->flags, prio) - | (c_rq->flags & ERTS_RUNQ_FLG_INACTIVE)); - if (force || c_rqi->len > c_rqi->migrate.limit.this) { - ErtsRunQueueInfo *n_rqi; - /* We still want to emigrate */ - - if (n_rq != c_rqi->migrate.runq) { - /* Ahh... run queue changed; need to do it all over again... */ - erts_smp_runq_unlock(n_rq); - continue; - } - else { + if (!rpq->first) + rpq->last = NULL; - if (prio == ERTS_PORT_PRIO_LEVEL) - n_rqi = &n_rq->ports.info; - else - n_rqi = &n_rq->procs.prio_info[prio]; + erts_smp_dec_runq_len(runq, rqi, prio); +} - if (force || (n_rqi->len < c_rqi->migrate.limit.other)) { - /* emigrate ... */ - return n_rq; - } - } - } - } - ASSERT(n_rq != c_rq); - erts_smp_runq_unlock(n_rq); - if (!(c_rq->flags & ERTS_RUNQ_FLG_INACTIVE)) { - /* No more emigrations to this runq */ - ERTS_UNSET_RUNQ_FLG_EMIGRATE(c_rq->flags, prio); - ERTS_DBG_SET_INVALID_RUNQP(c_rqi->migrate.runq, 0x3); - } +static ERTS_INLINE Process * +dequeue_process(ErtsRunQueue *runq, int prio_q, erts_aint32_t *statep) +{ + erts_aint32_t state; + int prio; + ErtsRunPrioQueue *rpq; + ErtsRunQueueInfo *rqi; + Process *p; + + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); + + ASSERT(PRIORITY_NORMAL == prio_q + || PRIORITY_HIGH == prio_q + || PRIORITY_MAX == prio_q); + rpq = &runq->procs.prio[prio_q]; + p = rpq->first; + if (!p) return NULL; + + ERTS_SMP_DATA_DEPENDENCY_READ_MEMORY_BARRIER; + + state = erts_smp_atomic32_read_nob(&p->state); + if (statep) + *statep = state; + + prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + + rqi = &runq->procs.prio_info[prio]; + + if (p) + unqueue_process(runq, rpq, rqi, prio, NULL, p); + + return p; +} + +static ERTS_INLINE int +check_requeue_process(ErtsRunQueue *rq, int prio_q) +{ + ErtsRunPrioQueue *rpq = &rq->procs.prio[prio_q]; + Process *p = rpq->first; + if (--p->schedule_count > 0 && p != rpq->last) { + /* reschedule */ + rpq->first = p->next; + rpq->last->next = p; + rpq->last = p; + p->next = NULL; + return 1; + } + return 0; +} + +#ifdef ERTS_SMP + +static ErtsRunQueue * +check_immigration_need(ErtsRunQueue *c_rq, ErtsMigrationPath *mp, int prio) +{ + int len; + Uint32 f_flags, f_rq_flags; + ErtsRunQueue *f_rq; + + f_flags = mp->prio[prio].flags; + + ASSERT(ERTS_CHK_RUNQ_FLG_IMMIGRATE(mp->flags, prio)); + + f_rq = mp->prio[prio].runq; + if (!f_rq) + return NULL; + + f_rq_flags = ERTS_RUNQ_FLGS_GET(f_rq); + if (f_rq_flags & ERTS_RUNQ_FLG_PROTECTED) + return NULL; + + if (ERTS_CHK_RUNQ_FLG_EVACUATE(f_flags, prio)) + return f_rq; + + if (f_rq_flags & ERTS_RUNQ_FLG_INACTIVE) + return f_rq; + + if (prio == ERTS_PORT_PRIO_LEVEL) + len = RUNQ_READ_LEN(&c_rq->ports.info.len); + else + len = RUNQ_READ_LEN(&c_rq->procs.prio_info[prio].len); + + if (len < mp->prio[prio].limit.this) { + if (prio == ERTS_PORT_PRIO_LEVEL) + len = RUNQ_READ_LEN(&f_rq->ports.info.len); + else + len = RUNQ_READ_LEN(&f_rq->procs.prio_info[prio].len); + + if (len > mp->prio[prio].limit.other) + return f_rq; } + return NULL; } static void -immigrate(ErtsRunQueue *rq) +immigrate(ErtsRunQueue *c_rq, ErtsMigrationPath *mp) { - int prio; + Uint32 iflags, iflag; + erts_smp_runq_unlock(c_rq); - ASSERT(rq->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK); + ASSERT(erts_thr_progress_is_managed_thread()); - for (prio = 0; prio < ERTS_NO_PRIO_LEVELS; prio++) { - if (ERTS_CHK_RUNQ_FLG_IMMIGRATE(rq->flags, prio)) { - ErtsRunQueueInfo *rqi = (prio == ERTS_PORT_PRIO_LEVEL - ? &rq->ports.info - : &rq->procs.prio_info[prio]); - ErtsRunQueue *from_rq = rqi->migrate.runq; - int rq_locked, from_rq_locked; + iflags = mp->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK; - ERTS_DBG_VERIFY_VALID_RUNQP(from_rq); + iflag = iflags & -iflags; - rq_locked = 1; - from_rq_locked = 1; - erts_smp_xrunq_lock(rq, from_rq); - /* - * erts_smp_xrunq_lock() may release lock on rq! We have - * to check that we still want to immigrate from the same - * run queue as before. - */ - if (ERTS_CHK_RUNQ_FLG_IMMIGRATE(rq->flags, prio) - && from_rq == rqi->migrate.runq) { - ErtsRunQueueInfo *from_rqi = (prio == ERTS_PORT_PRIO_LEVEL - ? &from_rq->ports.info - : &from_rq->procs.prio_info[prio]); - if ((ERTS_CHK_RUNQ_FLG_EVACUATE(rq->flags, prio) - && ERTS_CHK_RUNQ_FLG_EVACUATE(from_rq->flags, prio) - && from_rqi->len) - || (from_rqi->len > rqi->migrate.limit.other - && rqi->len < rqi->migrate.limit.this)) { - if (prio == ERTS_PORT_PRIO_LEVEL) { - Port *prt = from_rq->ports.start; - if (prt) { - int prt_locked = 0; - (void) erts_port_migrate(prt, &prt_locked, - from_rq, &from_rq_locked, - rq, &rq_locked); - if (prt_locked) - erts_smp_port_unlock(prt); - } - } - else { - Process *proc; - ErtsRunPrioQueue *from_rpq; - from_rpq = (prio == PRIORITY_LOW - ? &from_rq->procs.prio[PRIORITY_NORMAL] - : &from_rq->procs.prio[prio]); - for (proc = from_rpq->first; proc; proc = proc->next) - if (proc->prio == prio && !proc->bound_runq) - break; - if (proc) { - ErtsProcLocks proc_locks = 0; - (void) erts_proc_migrate(proc, &proc_locks, - from_rq, &from_rq_locked, - rq, &rq_locked); - if (proc_locks) - erts_smp_proc_unlock(proc, proc_locks); - } + while (iflag) { + ErtsRunQueue *rq; + int prio; + + switch (iflag) { + case (MAX_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT): + prio = PRIORITY_MAX; + break; + case (HIGH_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT): + prio = PRIORITY_HIGH; + break; + case (NORMAL_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT): + prio = PRIORITY_NORMAL; + break; + case (LOW_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT): + prio = PRIORITY_LOW; + break; + case (PORT_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT): + prio = ERTS_PORT_PRIO_LEVEL; + break; + default: + erl_exit(ERTS_ABORT_EXIT, + "%s:%d:%s(): Invalid immigrate queue mask", + __FILE__, __LINE__, __func__); + prio = 0; + break; + } + + iflags &= ~iflag; + iflag = iflags & -iflags; + + rq = check_immigration_need(c_rq, mp, prio); + if (rq) { + erts_smp_runq_lock(rq); + if (prio == ERTS_PORT_PRIO_LEVEL) { + Port *prt; + prt = erts_dequeue_port(rq); + if (prt) + RUNQ_SET_RQ(&prt->run_queue, c_rq); + erts_smp_runq_unlock(rq); + if (prt) { + /* port might terminate while we have no lock... */ + rq = erts_port_runq(prt); + if (rq) { + if (rq != c_rq) + erl_exit(ERTS_ABORT_EXIT, + "%s:%d:%s(): Internal error", + __FILE__, __LINE__, __func__); + erts_enqueue_port(c_rq, prt); + if (!iflag) + return; /* done */ + erts_smp_runq_unlock(c_rq); } } - else { - ERTS_UNSET_RUNQ_FLG_IMMIGRATE(rq->flags, prio); - ERTS_DBG_SET_INVALID_RUNQP(rqi->migrate.runq, 0x1); + } + else { + ErtsRunPrioQueue *rpq = &rq->procs.prio[prio == PRIORITY_LOW + ? PRIORITY_NORMAL + : prio]; + Process *prev_proc = NULL; + Process *proc = rpq->first; + int rq_locked = 1; + + while (proc) { + erts_aint32_t state; + state = erts_smp_atomic32_read_acqb(&proc->state); + if (!(ERTS_PSFLG_BOUND & state) + && (prio == (int) (ERTS_PSFLG_PRIO_MASK & state))) { + ErtsRunQueueInfo *rqi = &rq->procs.prio_info[prio]; + unqueue_process(rq, rpq, rqi, prio, prev_proc, proc); + erts_smp_runq_unlock(rq); + RUNQ_SET_RQ(&proc->run_queue, c_rq); + rq_locked = 0; + + erts_smp_runq_lock(c_rq); + enqueue_process(c_rq, prio, proc); + if (!iflag) + return; /* done */ + erts_smp_runq_unlock(c_rq); + break; + } + prev_proc = proc; + proc = proc->next; } + if (rq_locked) + erts_smp_runq_unlock(rq); } - if (from_rq_locked) - erts_smp_runq_unlock(from_rq); - if (!rq_locked) - erts_smp_runq_lock(rq); } } + + erts_smp_runq_lock(c_rq); +} + +static ERTS_INLINE void +suspend_run_queue(ErtsRunQueue *rq) +{ + erts_smp_atomic32_read_bor_nob(&rq->scheduler->ssi->flags, + ERTS_SSI_FLG_SUSPENDED); + ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_SUSPENDED); + + wake_scheduler(rq, 0); +} + +static void scheduler_ix_resume_wake(Uint ix); + +static ERTS_INLINE void +resume_run_queue(ErtsRunQueue *rq) +{ + int pix; + + erts_smp_runq_lock(rq); + + (void) ERTS_RUNQ_FLGS_MASK_SET(rq, + (ERTS_RUNQ_FLG_OUT_OF_WORK + | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK + | ERTS_RUNQ_FLG_SUSPENDED), + (ERTS_RUNQ_FLG_OUT_OF_WORK + | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK)); + + rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; + for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) { + rq->procs.prio_info[pix].max_len = 0; + rq->procs.prio_info[pix].reds = 0; + } + rq->ports.info.max_len = 0; + rq->ports.info.reds = 0; + rq->max_len = 0; + + erts_smp_runq_unlock(rq); + + scheduler_ix_resume_wake(rq->ix); } +typedef struct { + Process *first; + Process *last; +} ErtsStuckBoundProcesses; + static void -evacuate_run_queue(ErtsRunQueue *evac_rq, ErtsRunQueue *rq) +schedule_bound_processes(ErtsRunQueue *rq, + ErtsStuckBoundProcesses *sbpp) { - Port *prt; - int notify_to_rq = 0; - int prio; - int prt_locked = 0; - int rq_locked = 0; - int evac_rq_locked = 1; - ErtsMigrateResult mres; + Process *proc, *next; + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); + + proc = sbpp->first; + while (proc) { + erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state); + next = proc->next; + enqueue_process(rq, (int) (ERTS_PSFLG_PRIO_MASK & state), proc); + proc = next; + } +} - erts_smp_runq_lock(evac_rq); +static void +evacuate_run_queue(ErtsRunQueue *rq, + ErtsStuckBoundProcesses *sbpp) +{ + int prio_q; + ErtsRunQueue *to_rq; + ErtsMigrationPaths *mps; + ErtsMigrationPath *mp; - erts_smp_atomic32_read_bor_nob(&evac_rq->scheduler->ssi->flags, - ERTS_SSI_FLG_SUSPENDED); + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); - evac_rq->flags &= ~ERTS_RUNQ_FLGS_IMMIGRATE_QMASK; - evac_rq->flags |= (ERTS_RUNQ_FLGS_EMIGRATE_QMASK - | ERTS_RUNQ_FLGS_EVACUATE_QMASK - | ERTS_RUNQ_FLG_SUSPENDED); + ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED); - erts_smp_atomic32_read_bor_nob(&evac_rq->info_flags, ERTS_RUNQ_IFLG_SUSPENDED); - /* - * Need to set up evacuation paths first since we - * may release the run queue lock on evac_rq - * when evacuating. - */ - evac_rq->misc.evac_runq = rq; - evac_rq->ports.info.migrate.runq = rq; - for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++) - evac_rq->procs.prio_info[prio].migrate.runq = rq; + mps = erts_get_migration_paths_managed(); + mp = &mps->mpath[rq->ix]; /* Evacuate scheduled misc ops */ - if (evac_rq->misc.start) { - rq_locked = 1; - erts_smp_xrunq_lock(evac_rq, rq); - if (rq->misc.end) - rq->misc.end->next = evac_rq->misc.start; + if (rq->misc.start) { + ErtsMiscOpList *start, *end; + + to_rq = mp->misc_evac_runq; + if (!to_rq) + return; + + start = rq->misc.start; + end = rq->misc.end; + rq->misc.start = NULL; + rq->misc.end = NULL; + erts_smp_runq_unlock(rq); + + erts_smp_runq_lock(to_rq); + if (to_rq->misc.end) + to_rq->misc.end->next = start; else - rq->misc.start = evac_rq->misc.start; - rq->misc.end = evac_rq->misc.end; - evac_rq->misc.start = NULL; - evac_rq->misc.end = NULL; + to_rq->misc.start = start; + + to_rq->misc.end = end; + erts_smp_runq_unlock(to_rq); + smp_notify_inc_runq(to_rq); + erts_smp_runq_lock(to_rq); } - /* Evacuate scheduled ports */ - prt = evac_rq->ports.start; - while (prt) { - mres = erts_port_migrate(prt, &prt_locked, - evac_rq, &evac_rq_locked, - rq, &rq_locked); - if (mres == ERTS_MIGRATE_SUCCESS) - notify_to_rq = 1; - if (prt_locked) - erts_smp_port_unlock(prt); - if (!evac_rq_locked) { - evac_rq_locked = 1; - erts_smp_runq_lock(evac_rq); + if (rq->ports.start) { + Port *prt; + + to_rq = mp->prio[ERTS_PORT_PRIO_LEVEL].runq; + if (!to_rq) + return; + + /* Evacuate scheduled ports */ + prt = rq->ports.start; + while (prt) { + ErtsRunQueue *prt_rq; + prt = erts_dequeue_port(rq); + RUNQ_SET_RQ(&prt->run_queue, to_rq); + erts_smp_runq_unlock(rq); + /* + * The port might terminate while + * we have no lock on it... + */ + prt_rq = erts_port_runq(prt); + if (prt_rq) { + if (prt_rq != to_rq) + erl_exit(ERTS_ABORT_EXIT, + "%s:%d:%s() internal error\n", + __FILE__, __LINE__, __func__); + erts_enqueue_port(to_rq, prt); + erts_smp_runq_unlock(to_rq); + } + erts_smp_runq_lock(rq); + prt = rq->ports.start; } - prt = evac_rq->ports.start; + smp_notify_inc_runq(to_rq); } /* Evacuate scheduled processes */ - for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++) { + for (prio_q = 0; prio_q < ERTS_NO_PROC_PRIO_QUEUES; prio_q++) { + erts_aint32_t state; Process *proc; + int notify = 0; + to_rq = NULL; - switch (prio) { - case PRIORITY_MAX: - case PRIORITY_HIGH: - case PRIORITY_NORMAL: - proc = evac_rq->procs.prio[prio].first; - while (proc) { - ErtsProcLocks proc_locks = 0; - - /* Bound processes are stuck... */ - while (proc->bound_runq) { - proc = proc->next; - if (!proc) - goto end_of_proc; - } - - mres = erts_proc_migrate(proc, &proc_locks, - evac_rq, &evac_rq_locked, - rq, &rq_locked); - if (mres == ERTS_MIGRATE_SUCCESS) - notify_to_rq = 1; - if (proc_locks) - erts_smp_proc_unlock(proc, proc_locks); - if (!evac_rq_locked) { - erts_smp_runq_lock(evac_rq); - evac_rq_locked = 1; - } + if (!mp->prio[prio_q].runq) + return; + if (prio_q == PRIORITY_NORMAL && !mp->prio[PRIORITY_LOW].runq) + return; - proc = evac_rq->procs.prio[prio].first; + proc = dequeue_process(rq, prio_q, &state); + while (proc) { + if (ERTS_PSFLG_BOUND & state) { + /* Bound processes get stuck here... */ + proc->next = NULL; + if (sbpp->last) + sbpp->last->next = proc; + else + sbpp->first = proc; + sbpp->last = proc; } + else { + int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + erts_smp_runq_unlock(rq); - end_of_proc: + to_rq = mp->prio[prio].runq; + RUNQ_SET_RQ(&proc->run_queue, to_rq); -#ifdef DEBUG - for (proc = evac_rq->procs.prio[prio].first; - proc; - proc = proc->next) { - ASSERT(proc->bound_runq); + erts_smp_runq_lock(to_rq); + enqueue_process(to_rq, prio, proc); + erts_smp_runq_unlock(to_rq); + notify = 1; + + erts_smp_runq_lock(rq); } -#endif - break; - case PRIORITY_LOW: - break; - default: - ASSERT(!"Invalid process priority"); - break; + proc = dequeue_process(rq, prio_q, &state); } + if (notify) + smp_notify_inc_runq(to_rq); } - if (rq_locked) - erts_smp_runq_unlock(rq); - - if (evac_rq_locked) - erts_smp_runq_unlock(evac_rq); - - if (notify_to_rq) - smp_notify_inc_runq(rq); - - wake_scheduler(evac_rq, 0); + if (ERTS_EMPTY_RUNQ(rq)) + empty_runq(rq); } static int -try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq) +try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq, Uint32 flags) { - Process *proc; - int vrq_locked; + Uint32 procs_qmask = flags & ERTS_RUNQ_FLGS_PROCS_QMASK; + int max_prio_bit; + ErtsRunPrioQueue *rpq; - if (*rq_lockedp) - erts_smp_xrunq_lock(rq, vrq); - else - erts_smp_runq_lock(vrq); - vrq_locked = 1; + if (*rq_lockedp) { + erts_smp_runq_unlock(rq); + *rq_lockedp = 0; + } + + ERTS_SMP_LC_ASSERT(!erts_smp_lc_runq_is_locked(rq)); - ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp); - ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked); + erts_smp_runq_lock(vrq); if (rq->halt_in_progress) - goto try_steal_port; + goto no_procs; /* * Check for a runnable process to steal... */ - switch (vrq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) { - case MAX_BIT: - case MAX_BIT|HIGH_BIT: - case MAX_BIT|NORMAL_BIT: - case MAX_BIT|LOW_BIT: - case MAX_BIT|HIGH_BIT|NORMAL_BIT: - case MAX_BIT|HIGH_BIT|LOW_BIT: - case MAX_BIT|NORMAL_BIT|LOW_BIT: - case MAX_BIT|HIGH_BIT|NORMAL_BIT|LOW_BIT: - for (proc = vrq->procs.prio[PRIORITY_MAX].last; - proc; - proc = proc->prev) { - if (!proc->bound_runq) - break; - } - if (proc) + while (procs_qmask) { + Process *prev_proc; + Process *proc; + + max_prio_bit = procs_qmask & -procs_qmask; + switch (max_prio_bit) { + case MAX_BIT: + rpq = &vrq->procs.prio[PRIORITY_MAX]; break; - case HIGH_BIT: - case HIGH_BIT|NORMAL_BIT: - case HIGH_BIT|LOW_BIT: - case HIGH_BIT|NORMAL_BIT|LOW_BIT: - for (proc = vrq->procs.prio[PRIORITY_HIGH].last; - proc; - proc = proc->prev) { - if (!proc->bound_runq) - break; - } - if (proc) + case HIGH_BIT: + rpq = &vrq->procs.prio[PRIORITY_HIGH]; break; - case NORMAL_BIT: - case LOW_BIT: - case NORMAL_BIT|LOW_BIT: - for (proc = vrq->procs.prio[PRIORITY_NORMAL].last; - proc; - proc = proc->prev) { - if (!proc->bound_runq) - break; - } - if (proc) + case NORMAL_BIT: + case LOW_BIT: + rpq = &vrq->procs.prio[PRIORITY_NORMAL]; break; - case 0: - proc = NULL; - break; - default: - ASSERT(!"Invalid queue mask"); - proc = NULL; - break; - } + case 0: + goto no_procs; + default: + ASSERT(!"Invalid queue mask"); + goto no_procs; + } - if (proc) { - ErtsProcLocks proc_locks = 0; - int res; - ErtsMigrateResult mres; - mres = erts_proc_migrate(proc, &proc_locks, - vrq, &vrq_locked, - rq, rq_lockedp); - if (proc_locks) - erts_smp_proc_unlock(proc, proc_locks); - res = !0; - switch (mres) { - case ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED: - res = 0; - case ERTS_MIGRATE_SUCCESS: - if (vrq_locked) + prev_proc = NULL; + proc = rpq->first; + + while (proc) { + erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state); + if (!(ERTS_PSFLG_BOUND & state)) { + /* Steal process */ + int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + ErtsRunQueueInfo *rqi = &vrq->procs.prio_info[prio]; + unqueue_process(vrq, rpq, rqi, prio, prev_proc, proc); erts_smp_runq_unlock(vrq); - return res; - default: /* Other failures */ - break; - } - } + RUNQ_SET_RQ(&proc->run_queue, rq); - ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp); - ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked); + erts_smp_runq_lock(rq); + *rq_lockedp = 1; + enqueue_process(rq, prio, proc); + return !0; + } + prev_proc = proc; + proc = proc->next; + } - if (!vrq_locked) { - if (*rq_lockedp) - erts_smp_xrunq_lock(rq, vrq); - else - erts_smp_runq_lock(vrq); - vrq_locked = 1; + procs_qmask &= ~max_prio_bit; } - try_steal_port: +no_procs: - ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp); - ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked); + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(vrq)); /* * Check for a runnable port to steal... */ - if (vrq->ports.info.len) { - Port *prt = vrq->ports.end; - int prt_locked = 0; - int res; - ErtsMigrateResult mres; - - mres = erts_port_migrate(prt, &prt_locked, - vrq, &vrq_locked, - rq, rq_lockedp); - if (prt_locked) - erts_smp_port_unlock(prt); - res = !0; - switch (mres) { - case ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED: - res = 0; - case ERTS_MIGRATE_SUCCESS: - if (vrq_locked) - erts_smp_runq_unlock(vrq); - return res; - default: /* Other failures */ - break; + if (vrq->ports.start) { + ErtsRunQueue *prt_rq; + Port *prt = erts_dequeue_port(vrq); + RUNQ_SET_RQ(&prt->run_queue, rq); + erts_smp_runq_unlock(vrq); + + /* + * The port might terminate while + * we have no lock on it... + */ + + prt_rq = erts_port_runq(prt); + if (!prt_rq) + return 0; + else { + if (prt_rq != rq) + erl_exit(ERTS_ABORT_EXIT, + "%s:%d:%s() internal error\n", + __FILE__, __LINE__, __func__); + *rq_lockedp = 1; + erts_enqueue_port(rq, prt); + return !0; } } - if (vrq_locked) - erts_smp_runq_unlock(vrq); + erts_smp_runq_unlock(vrq); return 0; } @@ -3129,9 +3292,10 @@ static ERTS_INLINE int check_possible_steal_victim(ErtsRunQueue *rq, int *rq_lockedp, int vix) { ErtsRunQueue *vrq = ERTS_RUNQ_IX(vix); - erts_aint32_t iflgs = erts_smp_atomic32_read_nob(&vrq->info_flags); - if (iflgs & ERTS_RUNQ_IFLG_NONEMPTY) - return try_steal_task_from_victim(rq, rq_lockedp, vrq); + Uint32 flags = ERTS_RUNQ_FLGS_GET(vrq); + if ((flags & (ERTS_RUNQ_FLG_NONEMPTY + | ERTS_RUNQ_FLG_PROTECTED)) == ERTS_RUNQ_FLG_NONEMPTY) + return try_steal_task_from_victim(rq, rq_lockedp, vrq, flags); else return 0; } @@ -3141,15 +3305,12 @@ static int try_steal_task(ErtsRunQueue *rq) { int res, rq_locked, vix, active_rqs, blnc_rqs; + Uint32 flags; - /* - * We are not allowed to steal jobs to this run queue - * if it is suspended. Note that it might get suspended - * at any time when we don't have the lock on the run - * queue. - */ - if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED) - return 0; + /* Protect jobs we steal from getting stolen from us... */ + flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_PROTECTED); + if (flags & ERTS_RUNQ_FLG_SUSPENDED) + return 0; /* go suspend instead... */ res = 0; rq_locked = 1; @@ -3268,12 +3429,123 @@ do { \ ASSERT(sum__ == (RQ)->full_reds_history_sum); \ } while (0); +#define ERTS_PRE_ALLOCED_MPATHS 8 + +erts_atomic_t erts_migration_paths; + +static struct { + size_t size; + ErtsMigrationPaths *freelist; + struct { + ErtsMigrationPaths *first; + ErtsMigrationPaths *last; + } retired; +} mpaths; + +static void +init_migration_paths(void) +{ + int qix, i; + char *p; + ErtsMigrationPaths *mps; + + mpaths.size = sizeof(ErtsMigrationPaths); + mpaths.size += sizeof(ErtsMigrationPath)*(erts_no_schedulers-1); + mpaths.size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(mpaths.size); + + p = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_LL_MPATHS, + (mpaths.size + * ERTS_PRE_ALLOCED_MPATHS)); + mpaths.freelist = NULL; + for (i = 0; i < ERTS_PRE_ALLOCED_MPATHS-1; i++) { + mps = (ErtsMigrationPaths *) p; + mps->next = mpaths.freelist; + mpaths.freelist = mps; + p += mpaths.size; + } + + mps = (ErtsMigrationPaths *) p; + mps->block = NULL; + for (qix = 0; qix < erts_no_run_queues; qix++) { + int pix; + mps->mpath[qix].flags = 0; + mps->mpath[qix].misc_evac_runq = NULL; + for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { + mps->mpath[qix].prio[pix].limit.this = -1; + mps->mpath[qix].prio[pix].limit.other = -1; + mps->mpath[qix].prio[pix].runq = NULL; + mps->mpath[qix].prio[pix].flags = 0; + } + } + erts_atomic_init_wb(&erts_migration_paths, (erts_aint_t) mps); +} + +static ERTS_INLINE ErtsMigrationPaths * +alloc_mpaths(void) +{ + void *block; + ErtsMigrationPaths *res; + ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&balance_info.update_mtx)); + + res = mpaths.freelist; + if (res) { + mpaths.freelist = res->next; + res->block = NULL; + return res; + } + res = erts_alloc(ERTS_ALC_T_SL_MPATHS, + mpaths.size+ERTS_CACHE_LINE_SIZE); + block = (void *) res; + if (((UWord) res) & ERTS_CACHE_LINE_MASK) + res = (ErtsMigrationPaths *) ((((UWord) res) & ~ERTS_CACHE_LINE_MASK) + + ERTS_CACHE_LINE_SIZE); + res->block = block; + return res; +} + +static ERTS_INLINE void +retire_mpaths(ErtsMigrationPaths *mps) +{ + ErtsThrPrgrVal current; + + ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&balance_info.update_mtx)); + + current = erts_thr_progress_current(); + + while (mpaths.retired.first) { + ErtsMigrationPaths *tmp = mpaths.retired.first; + if (!erts_thr_progress_has_reached_this(current, tmp->thr_prgr)) + break; + mpaths.retired.first = tmp->next; + if (tmp->block) { + erts_free(ERTS_ALC_T_SL_MPATHS, tmp->block); + } + else { + tmp->next = mpaths.freelist; + mpaths.freelist = tmp; + } + } + + if (!mpaths.retired.first) + mpaths.retired.last = NULL; + + mps->thr_prgr = erts_thr_progress_later_than(current); + mps->next = NULL; + + if (mpaths.retired.last) + mpaths.retired.last->next = mps; + else + mpaths.retired.first = mps; + mpaths.retired.last = mps; +} + static void check_balance(ErtsRunQueue *c_rq) { #if ERTS_MAX_PROCESSES >= (1 << 27) # error check_balance() assumes ERTS_MAX_PROCESS < (1 << 27) #endif + ErtsMigrationPaths *new_mpaths, *old_mpaths; ErtsRunQueueBalance avg = {0}; Sint64 scheds_reds, full_scheds_reds; int forced, active, current_active, oowc, half_full_scheds, full_scheds, @@ -3299,9 +3571,9 @@ check_balance(ErtsRunQueue *c_rq) ERTS_FOREACH_RUNQ(rq, { if (rq->waiting) - rq->flags |= ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK; + ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK); else - rq->flags &= ~ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK; + ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK); rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; }); @@ -3343,7 +3615,7 @@ check_balance(ErtsRunQueue *c_rq) ErtsRunQueue *rq = ERTS_RUNQ_IX(qix); erts_smp_runq_lock(rq); - run_queue_info[qix].flags = rq->flags; + run_queue_info[qix].flags = ERTS_RUNQ_FLGS_GET_NOB(rq); for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) { run_queue_info[qix].prio[pix].max_len = rq->procs.prio_info[pix].max_len; @@ -3677,47 +3949,27 @@ erts_fprintf(stderr, "--------------------------------\n"); set_no_active_runqs(active); balance_info.halftime = 1; - erts_smp_atomic32_set_nob(&balance_info.checking_balance, 0); + new_mpaths = alloc_mpaths(); + + /* Write migration paths */ - /* Write migration paths and reset balance statistics in all queues */ for (qix = 0; qix < blnc_no_rqs; qix++) { int mqix; - Uint32 flags; - ErtsRunQueue *rq = ERTS_RUNQ_IX(qix); - ErtsRunQueueInfo *rqi; - flags = run_queue_info[qix].flags; - erts_smp_runq_lock(rq); - flags |= (rq->flags & ~ERTS_RUNQ_FLGS_MIGRATION_INFO); - ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK)); - if (rq->waiting) - flags |= ERTS_RUNQ_FLG_OUT_OF_WORK; - - rq->full_reds_history_sum - = run_queue_info[qix].full_reds_history_sum; - rq->full_reds_history[freds_hist_ix] - = run_queue_info[qix].full_reds_history_change; + Uint32 flags = run_queue_info[qix].flags; + ErtsMigrationPath *mp = &new_mpaths->mpath[qix]; - ERTS_DBG_CHK_FULL_REDS_HISTORY(rq); + mp->flags = flags; + mp->misc_evac_runq = NULL; - rq->out_of_work_count = 0; - rq->flags = flags; - rq->max_len = rq->len; for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { - rqi = (pix == ERTS_PORT_PRIO_LEVEL - ? &rq->ports.info - : &rq->procs.prio_info[pix]); - rqi->max_len = rqi->len; - rqi->reds = 0; if (!(ERTS_CHK_RUNQ_FLG_EMIGRATE(flags, pix) | ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix))) { ASSERT(run_queue_info[qix].prio[pix].immigrate_from < 0); ASSERT(run_queue_info[qix].prio[pix].emigrate_to < 0); -#ifdef DEBUG - rqi->migrate.limit.this = -1; - rqi->migrate.limit.other = -1; - ERTS_DBG_SET_INVALID_RUNQP(rqi->migrate.runq, 0x2); -#endif - + mp->prio[pix].limit.this = -1; + mp->prio[pix].limit.other = -1; + mp->prio[pix].runq = NULL; + mp->prio[pix].flags = 0; } else if (ERTS_CHK_RUNQ_FLG_EMIGRATE(flags, pix)) { ASSERT(!ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix)); @@ -3725,11 +3977,12 @@ erts_fprintf(stderr, "--------------------------------\n"); ASSERT(run_queue_info[qix].prio[pix].emigrate_to >= 0); mqix = run_queue_info[qix].prio[pix].emigrate_to; - rqi->migrate.limit.this + mp->prio[pix].limit.this = run_queue_info[qix].prio[pix].migration_limit; - rqi->migrate.limit.other + mp->prio[pix].limit.other = run_queue_info[mqix].prio[pix].migration_limit; - rqi->migrate.runq = ERTS_RUNQ_IX(mqix); + mp->prio[pix].runq = ERTS_RUNQ_IX(mqix); + mp->prio[pix].flags = run_queue_info[mqix].flags; } else { ASSERT(ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix)); @@ -3737,24 +3990,142 @@ erts_fprintf(stderr, "--------------------------------\n"); ASSERT(run_queue_info[qix].prio[pix].immigrate_from >= 0); mqix = run_queue_info[qix].prio[pix].immigrate_from; - rqi->migrate.limit.this + mp->prio[pix].limit.this = run_queue_info[qix].prio[pix].migration_limit; - rqi->migrate.limit.other + mp->prio[pix].limit.other = run_queue_info[mqix].prio[pix].migration_limit; - rqi->migrate.runq = ERTS_RUNQ_IX(mqix); + mp->prio[pix].runq = ERTS_RUNQ_IX(mqix); + mp->prio[pix].flags = run_queue_info[mqix].flags; } } + } + + old_mpaths = erts_get_migration_paths_managed(); + + /* Keep offline run-queues as is */ + for (qix = blnc_no_rqs; qix < erts_no_schedulers; qix++) { + ErtsMigrationPath *nmp = &new_mpaths->mpath[qix]; + ErtsMigrationPath *omp = &old_mpaths->mpath[qix]; + + nmp->flags = omp->flags; + nmp->misc_evac_runq = omp->misc_evac_runq; + + for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { + nmp->prio[pix].limit.this = omp->prio[pix].limit.this; + nmp->prio[pix].limit.other = omp->prio[pix].limit.other; + nmp->prio[pix].runq = omp->prio[pix].runq; + nmp->prio[pix].flags = omp->prio[pix].flags; + } + } + + + /* Publish new migration paths... */ + erts_atomic_set_wb(&erts_migration_paths, (erts_aint_t) new_mpaths); + + /* Reset balance statistics in all online queues */ + for (qix = 0; qix < blnc_no_rqs; qix++) { + Uint32 flags = run_queue_info[qix].flags; + ErtsRunQueue *rq = ERTS_RUNQ_IX(qix); + + erts_smp_runq_lock(rq); + ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK)); + if (rq->waiting) + flags |= ERTS_RUNQ_FLG_OUT_OF_WORK; + + rq->full_reds_history_sum + = run_queue_info[qix].full_reds_history_sum; + rq->full_reds_history[freds_hist_ix] + = run_queue_info[qix].full_reds_history_change; + + ERTS_DBG_CHK_FULL_REDS_HISTORY(rq); + + rq->out_of_work_count = 0; + (void) ERTS_RUNQ_FLGS_MASK_SET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO, flags); + + rq->max_len = rq->len; + for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { + ErtsRunQueueInfo *rqi; + rqi = (pix == ERTS_PORT_PRIO_LEVEL + ? &rq->ports.info + : &rq->procs.prio_info[pix]); + erts_smp_reset_max_len(rq, rqi); + rqi->reds = 0; + } rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; erts_smp_runq_unlock(rq); } + erts_smp_atomic32_set_nob(&balance_info.checking_balance, 0); + balance_info.n++; + retire_mpaths(old_mpaths); erts_smp_mtx_unlock(&balance_info.update_mtx); erts_smp_runq_lock(c_rq); } +static void +change_no_used_runqs(int used) +{ + ErtsMigrationPaths *new_mpaths, *old_mpaths; + int active, qix; + erts_smp_mtx_lock(&balance_info.update_mtx); + get_no_runqs(&active, NULL); + set_no_used_runqs(used); + + old_mpaths = erts_get_migration_paths_managed(); + new_mpaths = alloc_mpaths(); + + /* Write migration paths... */ + + for (qix = 0; qix < used; qix++) { + int pix; + ErtsMigrationPath *omp = &old_mpaths->mpath[qix]; + ErtsMigrationPath *nmp = &new_mpaths->mpath[qix]; + + nmp->flags = omp->flags & ~ERTS_RUNQ_FLGS_MIGRATION_QMASKS; + nmp->misc_evac_runq = NULL; + + for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { + nmp->prio[pix].limit.this = -1; + nmp->prio[pix].limit.other = -1; + nmp->prio[pix].runq = NULL; + nmp->prio[pix].flags = 0; + } + } + for (qix = used; qix < erts_no_run_queues; qix++) { + int pix; + ErtsRunQueue *to_rq = ERTS_RUNQ_IX(qix % used); + ErtsMigrationPath *nmp = &new_mpaths->mpath[qix]; + + nmp->flags = (ERTS_RUNQ_FLGS_EMIGRATE_QMASK + | ERTS_RUNQ_FLGS_EVACUATE_QMASK); + nmp->misc_evac_runq = to_rq; + for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { + nmp->prio[pix].limit.this = -1; + nmp->prio[pix].limit.other = -1; + nmp->prio[pix].runq = to_rq; + nmp->prio[pix].flags = 0; + } + } + + /* ... and publish them. */ + erts_atomic_set_wb(&erts_migration_paths, (erts_aint_t) new_mpaths); + + retire_mpaths(old_mpaths); + + /* Make sure that we balance soon... */ + balance_info.forced_check_balance = 1; + + erts_smp_mtx_unlock(&balance_info.update_mtx); + + erts_smp_runq_lock(ERTS_RUNQ_IX(0)); + ERTS_RUNQ_IX(0)->check_balance_reds = 0; + erts_smp_runq_unlock(ERTS_RUNQ_IX(0)); +} + + #endif /* #ifdef ERTS_SMP */ Uint @@ -3846,7 +4217,6 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); rq->ix = ix; - erts_smp_atomic32_init_nob(&rq->info_flags, ERTS_RUNQ_IFLG_NONEMPTY); /* make sure that the "extra" id correponds to the schedulers * id if the esdp->no <-> ix+1 mapping change. @@ -3857,7 +4227,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) rq->waiting = 0; rq->woken = 0; - rq->flags = 0; + ERTS_RUNQ_FLGS_INIT(rq, ERTS_RUNQ_FLG_NONEMPTY); rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; rq->full_reds_history_sum = 0; for (rix = 0; rix < ERTS_FULL_REDS_HISTORY_SIZE; rix++) { @@ -3871,19 +4241,14 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) rq->wakeup_other_reds = 0; rq->halt_in_progress = 0; - rq->procs.len = 0; rq->procs.pending_exiters = NULL; rq->procs.context_switches = 0; rq->procs.reductions = 0; for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) { - rq->procs.prio_info[pix].len = 0; + erts_smp_atomic32_init_nob(&rq->procs.prio_info[pix].len, 0); rq->procs.prio_info[pix].max_len = 0; rq->procs.prio_info[pix].reds = 0; - rq->procs.prio_info[pix].migrate.limit.this = 0; - rq->procs.prio_info[pix].migrate.limit.other = 0; - ERTS_DBG_SET_INVALID_RUNQP(rq->procs.prio_info[pix].migrate.runq, - 0x0); if (pix < ERTS_NO_PROC_PRIO_LEVELS - 1) { rq->procs.prio[pix].first = NULL; rq->procs.prio[pix].last = NULL; @@ -3892,14 +4257,10 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) rq->misc.start = NULL; rq->misc.end = NULL; - rq->misc.evac_runq = NULL; - rq->ports.info.len = 0; + erts_smp_atomic32_init_nob(&rq->ports.info.len, 0); rq->ports.info.max_len = 0; rq->ports.info.reds = 0; - rq->ports.info.migrate.limit.this = 0; - rq->ports.info.migrate.limit.other = 0; - rq->ports.info.migrate.runq = NULL; rq->ports.start = NULL; rq->ports.end = NULL; } @@ -4022,10 +4383,12 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) balance_info.prev_rise.reds = 0; balance_info.n = 0; + init_migration_paths(); + if (no_schedulers_online < no_schedulers) { + change_no_used_runqs(no_schedulers_online); for (ix = no_schedulers_online; ix < erts_no_run_queues; ix++) - evacuate_run_queue(ERTS_RUNQ_IX(ix), - ERTS_RUNQ_IX(ix % no_schedulers_online)); + suspend_run_queue(ERTS_RUNQ_IX(ix)); } schdlr_sspnd.wait_curr_online = no_schedulers_online; @@ -4088,91 +4451,208 @@ erts_get_scheduler_data(void) #endif -static int remove_proc_from_runq(ErtsRunQueue *rq, Process *p, int to_inactive); +/* + * scheduler_out_process() return with c_rq locked. + */ +static ERTS_INLINE int +schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p) +{ + erts_aint32_t a, e, n; + int res = 0; + + a = state; + + while (1) { + n = e = a; + + ASSERT(a & ERTS_PSFLG_RUNNING); + ASSERT(!(a & ERTS_PSFLG_IN_RUNQ)); + + n &= ~ERTS_PSFLG_RUNNING; + if ((a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) + n |= ERTS_PSFLG_IN_RUNQ; + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + if (a == e) + break; + } + + if (!(n & ERTS_PSFLG_IN_RUNQ)) { + if (erts_system_profile_flags.runnable_procs) + profile_runnable_proc(p, am_inactive); + } + else { + int prio = (int) (ERTS_PSFLG_PRIO_MASK & n); + ErtsRunQueue *runq = erts_get_runq_proc(p); + + ASSERT(!(n & ERTS_PSFLG_SUSPENDED)); + +#ifdef ERTS_SMP + if (!(ERTS_PSFLG_BOUND & n)) { + ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio); + if (new_runq) { + RUNQ_SET_RQ(&p->run_queue, new_runq); + runq = new_runq; + } + } +#endif + ASSERT(runq); + res = 1; + + erts_smp_runq_lock(runq); + + /* Enqueue the process */ + enqueue_process(runq, prio, p); + + if (runq == c_rq) + return res; + erts_smp_runq_unlock(runq); + smp_notify_inc_runq(runq); + } + erts_smp_runq_lock(c_rq); + return res; +} static ERTS_INLINE void -suspend_process(ErtsRunQueue *rq, Process *p) +add2runq(Process *p, erts_aint32_t state) { - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); - p->rcount++; /* count number of suspend */ -#ifdef ERTS_SMP - ASSERT(!(p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) - || p == erts_get_current_process()); - ASSERT(p->status != P_RUNNING - || p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING); - if (p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ) - goto runable; -#endif - switch(p->status) { - case P_SUSPENDED: - break; - case P_RUNABLE: + int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + ErtsRunQueue *runq = erts_get_runq_proc(p); + #ifdef ERTS_SMP - runable: - if (!ERTS_PROC_PENDING_EXIT(p)) + if (!(ERTS_PSFLG_BOUND & state)) { + ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio); + if (new_runq) { + RUNQ_SET_RQ(&p->run_queue, new_runq); + runq = new_runq; + } + } #endif - remove_proc_from_runq(rq, p, 1); - /* else: - * leave process in schedq so it will discover the pending exit - */ - p->rstatus = P_RUNABLE; /* wakeup as runnable */ - break; - case P_RUNNING: - p->rstatus = P_RUNABLE; /* wakeup as runnable */ - break; - case P_WAITING: - p->rstatus = P_WAITING; /* wakeup as waiting */ - break; - case P_EXITING: - return; /* ignore this */ - case P_GARBING: - case P_FREE: - erl_exit(1, "bad state in suspend_process()\n"); + ASSERT(runq); + + erts_smp_runq_lock(runq); + + /* Enqueue the process */ + enqueue_process(runq, prio, p); + + erts_smp_runq_unlock(runq); + smp_notify_inc_runq(runq); + +} + +static ERTS_INLINE void +schedule_process(Process *p, erts_aint32_t state, int active_enq) +{ + erts_aint32_t a = state, n; + + while (1) { + erts_aint32_t e; + n = e = a; + ASSERT(!(a & ERTS_PSFLG_FREE)); + n |= ERTS_PSFLG_ACTIVE; + if (!(a & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_RUNNING))) + n |= ERTS_PSFLG_IN_RUNQ; + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + if (a == e) + break; + if (!active_enq && (a & ERTS_PSFLG_ACTIVE)) + return; /* Someone else activated process ... */ + } + +#ifdef RRR_DEBUG + if (active_enq) + erts_fprintf(stderr, "! state=0x%x\n", n); +#endif + + if (erts_system_profile_flags.runnable_procs + && !(a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) { + profile_runnable_proc(p, am_active); } - if ((erts_system_profile_flags.runnable_procs) && (p->rcount == 1) && (p->status != P_WAITING)) { - profile_runnable_proc(p, am_inactive); + if ((n & ERTS_PSFLG_IN_RUNQ) && !(a & ERTS_PSFLG_IN_RUNQ)) { +#ifdef RRR_DEBUG + if (active_enq) + erts_fprintf(stderr, "-->\n"); +#endif + add2runq(p, n); } +} - p->status = P_SUSPENDED; - +void +erts_schedule_process(Process *p, erts_aint32_t state) +{ + schedule_process(p, state, 0); } -static ERTS_INLINE void -resume_process(Process *p) +static ERTS_INLINE int +suspend_process(Process *c_p, Process *p) { - Uint32 *statusp; + erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); + int suspended = 0; ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - switch (p->status) { - case P_SUSPENDED: - statusp = &p->status; - break; - case P_GARBING: - if (p->gcstatus == P_SUSPENDED) { - statusp = &p->gcstatus; - break; + + if ((state & ERTS_PSFLG_SUSPENDED)) + suspended = -1; + else { + if (c_p == p) { + state = erts_smp_atomic32_read_bor_relb(&p->state, + ERTS_PSFLG_SUSPENDED); + state |= ERTS_PSFLG_SUSPENDED; + ASSERT(state & ERTS_PSFLG_RUNNING); + suspended = 1; } - /* Fall through */ - default: - return; + else { + while (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_EXITING))) { + erts_aint32_t e, n; + n = e = state; + n |= ERTS_PSFLG_SUSPENDED; + state = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e); + if (state == e) { + state = n; + suspended = 1; + break; + } + } + } + } + + if (state & ERTS_PSFLG_SUSPENDED) { + + ASSERT(!(ERTS_PSFLG_RUNNING & state) + || p == erts_get_current_process()); + + if (erts_system_profile_flags.runnable_procs + && (p->rcount == 0) + && (state & ERTS_PSFLG_ACTIVE)) { + profile_runnable_proc(p, am_inactive); + } + + p->rcount++; /* count number of suspend */ } + return suspended; +} + +static ERTS_INLINE void +resume_process(Process *p) +{ + erts_aint32_t state; + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); ASSERT(p->rcount > 0); if (--p->rcount > 0) /* multiple suspend */ return; - switch(p->rstatus) { - case P_RUNABLE: - erts_add_to_runq(p); - break; - case P_WAITING: - *statusp = P_WAITING; - break; - default: - erl_exit(1, "bad state in resume_process()\n"); + + state = erts_smp_atomic32_read_band_mb(&p->state, ~ERTS_PSFLG_SUSPENDED); + state &= ~ERTS_PSFLG_SUSPENDED; +#ifdef RRR_DEBUG + erts_fprintf(stderr, "%T - state=0x%x\n", p->id, state); +#endif + if ((state & (ERTS_PSFLG_EXITING + | ERTS_PSFLG_ACTIVE + | ERTS_PSFLG_IN_RUNQ + | ERTS_PSFLG_RUNNING)) == ERTS_PSFLG_ACTIVE) { + schedule_process(p, state, 1); } - p->rstatus = P_FREE; } int @@ -4296,6 +4776,7 @@ suspend_scheduler(ErtsSchedulerData *esdp) int wake = 0; erts_aint32_t aux_work; int thr_prgr_active = 1; + ErtsStuckBoundProcesses sbp = {NULL, NULL}; /* * Schedulers may be suspended in two different ways: @@ -4310,6 +4791,8 @@ suspend_scheduler(ErtsSchedulerData *esdp) ASSERT(no != 1); + evacuate_run_queue(esdp->run_queue, &sbp); + erts_smp_runq_unlock(esdp->run_queue); erts_sched_check_cpu_bind_prep_suspend(esdp); @@ -4373,17 +4856,26 @@ suspend_scheduler(ErtsSchedulerData *esdp) erts_smp_mtx_unlock(&schdlr_sspnd.mtx); while (1) { + erts_aint32_t qmask; erts_aint32_t flgs; + qmask = (ERTS_RUNQ_FLGS_GET(esdp->run_queue) + & ERTS_RUNQ_FLGS_QMASK); aux_work = erts_atomic32_read_acqb(&ssi->aux_work); - if (aux_work) { + if (aux_work|qmask) { if (!thr_prgr_active) { erts_thr_progress_active(esdp, thr_prgr_active = 1); sched_wall_time_change(esdp, 1); } - aux_work = handle_aux_work(&esdp->aux_work_data, aux_work); + if (aux_work) + aux_work = handle_aux_work(&esdp->aux_work_data, aux_work); if (aux_work && erts_thr_progress_update(esdp)) erts_thr_progress_leader_update(esdp); + if (qmask) { + erts_smp_runq_lock(esdp->run_queue); + evacuate_run_queue(esdp->run_queue, &sbp); + erts_smp_runq_unlock(esdp->run_queue); + } } if (!aux_work) { @@ -4453,56 +4945,11 @@ suspend_scheduler(ErtsSchedulerData *esdp) erts_smp_runq_lock(esdp->run_queue); non_empty_runq(esdp->run_queue); + schedule_bound_processes(esdp->run_queue, &sbp); + erts_sched_check_cpu_bind_post_suspend(esdp); } -#define ERTS_RUNQ_RESET_SUSPEND_INFO(RQ, DBG_ID) \ -do { \ - int pix__; \ - (RQ)->misc.evac_runq = NULL; \ - (RQ)->ports.info.migrate.runq = NULL; \ - (RQ)->flags &= ~(ERTS_RUNQ_FLGS_IMMIGRATE_QMASK \ - | ERTS_RUNQ_FLGS_EMIGRATE_QMASK \ - | ERTS_RUNQ_FLGS_EVACUATE_QMASK \ - | ERTS_RUNQ_FLG_SUSPENDED); \ - (RQ)->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK \ - | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK); \ - (RQ)->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; \ - erts_smp_atomic32_read_band_nob(&(RQ)->info_flags, ~ERTS_RUNQ_IFLG_SUSPENDED);\ - for (pix__ = 0; pix__ < ERTS_NO_PROC_PRIO_LEVELS; pix__++) { \ - (RQ)->procs.prio_info[pix__].max_len = 0; \ - (RQ)->procs.prio_info[pix__].reds = 0; \ - ERTS_DBG_SET_INVALID_RUNQP((RQ)->procs.prio_info[pix__].migrate.runq,\ - (DBG_ID)); \ - } \ - (RQ)->ports.info.max_len = 0; \ - (RQ)->ports.info.reds = 0; \ -} while (0) - -#define ERTS_RUNQ_RESET_MIGRATION_PATHS__(RQ) \ -do { \ - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked((RQ))); \ - (RQ)->misc.evac_runq = NULL; \ - (RQ)->ports.info.migrate.runq = NULL; \ - (RQ)->flags &= ~(ERTS_RUNQ_FLGS_IMMIGRATE_QMASK \ - | ERTS_RUNQ_FLGS_EMIGRATE_QMASK \ - | ERTS_RUNQ_FLGS_EVACUATE_QMASK); \ -} while (0) - -#ifdef DEBUG -#define ERTS_RUNQ_RESET_MIGRATION_PATHS(RQ, DBG_ID) \ -do { \ - int pix__; \ - ERTS_RUNQ_RESET_MIGRATION_PATHS__((RQ)); \ - for (pix__ = 0; pix__ < ERTS_NO_PROC_PRIO_LEVELS; pix__++) \ - ERTS_DBG_SET_INVALID_RUNQP((RQ)->procs.prio_info[pix__].migrate.runq,\ - (DBG_ID)); \ -} while (0) -#else -#define ERTS_RUNQ_RESET_MIGRATION_PATHS(RQ, DBG_ID) \ - ERTS_RUNQ_RESET_MIGRATION_PATHS__((RQ)) -#endif - ErtsSchedSuspendResult erts_schedulers_state(Uint *total, Uint *online, @@ -4572,27 +5019,13 @@ erts_set_schedulers_online(Process *p, have_unlocked_plocks = 1; erts_smp_proc_unlock(p, plocks); } - erts_smp_mtx_unlock(&schdlr_sspnd.mtx); - erts_smp_mtx_lock(&balance_info.update_mtx); - for (ix = online; ix < no; ix++) { - ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); - erts_smp_runq_lock(rq); - ERTS_RUNQ_RESET_SUSPEND_INFO(rq, 0x5); - erts_smp_runq_unlock(rq); - scheduler_ix_resume_wake(ix); - } - /* - * Spread evacuation paths among all online - * run queues. - */ - for (ix = no; ix < erts_no_run_queues; ix++) { - ErtsRunQueue *from_rq = ERTS_RUNQ_IX(ix); - ErtsRunQueue *to_rq = ERTS_RUNQ_IX(ix % no); - evacuate_run_queue(from_rq, to_rq); - } - set_no_used_runqs(no); - erts_smp_mtx_unlock(&balance_info.update_mtx); - erts_smp_mtx_lock(&schdlr_sspnd.mtx); + change_no_used_runqs(no); + + for (ix = online; ix < no; ix++) + resume_run_queue(ERTS_RUNQ_IX(ix)); + + for (ix = no; ix < erts_no_run_queues; ix++) + suspend_run_queue(ERTS_RUNQ_IX(ix)); } res = ERTS_SCHDLR_SSPND_DONE; } @@ -4619,25 +5052,11 @@ erts_set_schedulers_online(Process *p, have_unlocked_plocks = 1; erts_smp_proc_unlock(p, plocks); } - erts_smp_mtx_unlock(&schdlr_sspnd.mtx); - erts_smp_mtx_lock(&balance_info.update_mtx); - - for (ix = 0; ix < online; ix++) { - ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); - erts_smp_runq_lock(rq); - ERTS_RUNQ_RESET_MIGRATION_PATHS(rq, 0x6); - erts_smp_runq_unlock(rq); - } - /* - * Evacutation order important! Newly suspended run queues - * has to be evacuated last. - */ - for (ix = erts_no_run_queues-1; ix >= no; ix--) - evacuate_run_queue(ERTS_RUNQ_IX(ix), - ERTS_RUNQ_IX(ix % no)); - set_no_used_runqs(no); - erts_smp_mtx_unlock(&balance_info.update_mtx); - erts_smp_mtx_lock(&schdlr_sspnd.mtx); + + change_no_used_runqs(no); + for (ix = no; ix < erts_no_run_queues; ix++) + suspend_run_queue(ERTS_RUNQ_IX(ix)); + for (ix = no; ix < online; ix++) { ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); wake_scheduler(rq, 0); @@ -4734,25 +5153,14 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all) schdlr_sspnd.msb.wait_active = 2; } - erts_smp_mtx_unlock(&schdlr_sspnd.mtx); - erts_smp_mtx_lock(&balance_info.update_mtx); - set_no_used_runqs(1); - for (ix = 0; ix < online; ix++) { + change_no_used_runqs(1); + for (ix = 1; ix < erts_no_run_queues; ix++) + suspend_run_queue(ERTS_RUNQ_IX(ix)); + + for (ix = 1; ix < online; ix++) { ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); - erts_smp_runq_lock(rq); - ASSERT(!(rq->flags & ERTS_RUNQ_FLG_SUSPENDED)); - ERTS_RUNQ_RESET_MIGRATION_PATHS(rq, 0x7); - erts_smp_runq_unlock(rq); + wake_scheduler(rq, 0); } - /* - * Evacuate all activities in all other run queues - * into the first run queue. Note order is important, - * online run queues has to be evacuated last. - */ - for (ix = erts_no_run_queues-1; ix >= 1; ix--) - evacuate_run_queue(ERTS_RUNQ_IX(ix), ERTS_RUNQ_IX(0)); - erts_smp_mtx_unlock(&balance_info.update_mtx); - erts_smp_mtx_lock(&schdlr_sspnd.mtx); if (erts_smp_atomic32_read_nob(&schdlr_sspnd.active) != schdlr_sspnd.msb.wait_active) { @@ -4796,15 +5204,6 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all) plp = proclist_create(p); plp->next = schdlr_sspnd.msb.procs; schdlr_sspnd.msb.procs = plp; -#ifdef DEBUG - ERTS_FOREACH_RUNQ(srq, - { - if (srq != ERTS_RUNQ_IX(0)) { - ASSERT(ERTS_EMPTY_RUNQ(srq)); - ASSERT(srq->flags & ERTS_RUNQ_FLG_SUSPENDED); - } - }); -#endif ASSERT(p->scheduler_data); } } @@ -4836,27 +5235,6 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all) res = ERTS_SCHDLR_SSPND_DONE_MSCHED_BLOCKED; else { ERTS_SCHDLR_SSPND_CHNG_SET(ERTS_SCHDLR_SSPND_CHNG_MSB, 0); -#ifdef DEBUG - ERTS_FOREACH_RUNQ(rq, - { - if (rq != p->scheduler_data->run_queue) { - if (!ERTS_EMPTY_RUNQ(rq)) { - Process *rp; - int pix; - ASSERT(rq->ports.info.len == 0); - for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) { - for (rp = rq->procs.prio[pix].first; - rp; - rp = rp->next) { - ASSERT(rp->bound_runq); - } - } - } - - ASSERT(rq->flags & ERTS_RUNQ_FLG_SUSPENDED); - } - }); -#endif p->flags &= ~F_HAVE_BLCKD_MSCHED; schdlr_sspnd.msb.ongoing = 0; if (schdlr_sspnd.online == 1) { @@ -4866,35 +5244,19 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all) } else { int online = schdlr_sspnd.online; - erts_smp_mtx_unlock(&schdlr_sspnd.mtx); if (plocks) { have_unlocked_plocks = 1; erts_smp_proc_unlock(p, plocks); } - erts_smp_mtx_lock(&balance_info.update_mtx); + + change_no_used_runqs(online); /* Resume all online run queues */ - for (ix = 1; ix < online; ix++) { - ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); - erts_smp_runq_lock(rq); - ERTS_RUNQ_RESET_SUSPEND_INFO(rq, 0x4); - erts_smp_runq_unlock(rq); - scheduler_ix_resume_wake(ix); - } + for (ix = 1; ix < online; ix++) + resume_run_queue(ERTS_RUNQ_IX(ix)); - /* Spread evacuation paths among all online run queues */ for (ix = online; ix < erts_no_run_queues; ix++) - evacuate_run_queue(ERTS_RUNQ_IX(ix), - ERTS_RUNQ_IX(ix % online)); - - set_no_used_runqs(online); - /* Make sure that we balance soon... */ - balance_info.forced_check_balance = 1; - erts_smp_runq_lock(ERTS_RUNQ_IX(0)); - ERTS_RUNQ_IX(0)->check_balance_reds = 0; - erts_smp_runq_unlock(ERTS_RUNQ_IX(0)); - erts_smp_mtx_unlock(&balance_info.update_mtx); - erts_smp_mtx_lock(&schdlr_sspnd.mtx); + suspend_run_queue(ERTS_RUNQ_IX(ix)); } res = ERTS_SCHDLR_SSPND_DONE; } @@ -5201,10 +5563,7 @@ handle_pend_sync_suspend(Process *suspendee, if (suspender) { ASSERT(is_nil(suspender->suspendee)); if (suspendee_alive) { - ErtsRunQueue *rq = erts_get_runq_proc(suspendee); - erts_smp_runq_lock(rq); - suspend_process(rq, suspendee); - erts_smp_runq_unlock(rq); + erts_suspend(suspendee, suspendee_locks, NULL); suspender->suspendee = suspendee->id; } /* suspender is suspended waiting for suspendee to suspend; @@ -5247,10 +5606,9 @@ pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks, resume_process(rp); } else { - ErtsRunQueue *cp_rq, *rp_rq; rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS, - pid, ERTS_PROC_LOCK_STATUS); + pid, pid_locks|ERTS_PROC_LOCK_STATUS); if (!rp) { c_p->flags &= ~F_P2PNR_RESCHED; @@ -5259,58 +5617,36 @@ pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks, ASSERT(!(c_p->flags & F_P2PNR_RESCHED)); - cp_rq = erts_get_runq_proc(c_p); - rp_rq = erts_get_runq_proc(rp); - erts_smp_runqs_lock(cp_rq, rp_rq); - if (rp->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) { - running: - /* Phiu... */ - - /* - * If we got pending suspenders and suspend ourselves waiting - * to suspend another process we might deadlock. - * In this case we have to yield, be suspended by - * someone else and then do it all over again. - */ - if (!c_p->pending_suspenders) { - /* Mark rp pending for suspend by c_p */ - add_pend_suspend(rp, c_p->id, handle_pend_sync_suspend); - ASSERT(is_nil(c_p->suspendee)); - - /* Suspend c_p; when rp is suspended c_p will be resumed. */ - suspend_process(cp_rq, c_p); - c_p->flags |= F_P2PNR_RESCHED; - } - /* Yield (caller is assumed to yield immediately in bif). */ - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); - rp = ERTS_PROC_LOCK_BUSY; + if (suspend) { + if (suspend_process(c_p, rp)) + goto done; } else { - ErtsProcLocks need_locks = pid_locks & ~ERTS_PROC_LOCK_STATUS; - if (need_locks && erts_smp_proc_trylock(rp, need_locks) == EBUSY) { - erts_smp_runqs_unlock(cp_rq, rp_rq); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); - rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS, - pid, pid_locks|ERTS_PROC_LOCK_STATUS); - if (!rp) - goto done; - /* run-queues may have changed */ - cp_rq = erts_get_runq_proc(c_p); - rp_rq = erts_get_runq_proc(rp); - erts_smp_runqs_lock(cp_rq, rp_rq); - if (rp->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) { - /* Ahh... */ - erts_smp_proc_unlock(rp, - pid_locks & ~ERTS_PROC_LOCK_STATUS); - goto running; - } - } + if (!(ERTS_PSFLG_RUNNING & erts_smp_atomic32_read_acqb(&rp->state))) + goto done; - /* rp is not running and we got the locks we want... */ - if (suspend) - suspend_process(rp_rq, rp); } - erts_smp_runqs_unlock(cp_rq, rp_rq); + + /* Other process running */ + + /* + * If we got pending suspenders and suspend ourselves waiting + * to suspend another process we might deadlock. + * In this case we have to yield, be suspended by + * someone else and then do it all over again. + */ + if (!c_p->pending_suspenders) { + /* Mark rp pending for suspend by c_p */ + add_pend_suspend(rp, c_p->id, handle_pend_sync_suspend); + ASSERT(is_nil(c_p->suspendee)); + + /* Suspend c_p; when rp is suspended c_p will be resumed. */ + suspend_process(c_p, c_p); + c_p->flags |= F_P2PNR_RESCHED; + } + /* Yield (caller is assumed to yield immediately in bif). */ + erts_smp_proc_unlock(rp, pid_locks|ERTS_PROC_LOCK_STATUS); + rp = ERTS_PROC_LOCK_BUSY; } done: @@ -5367,10 +5703,10 @@ erts_pid2proc_nropt(Process *c_p, ErtsProcLocks c_p_locks, return erts_pid2proc_not_running(c_p, c_p_locks, pid, pid_locks); } -static ERTS_INLINE void -do_bif_suspend_process(ErtsSuspendMonitor *smon, - Process *suspendee, - ErtsRunQueue *locked_runq) +static ERTS_INLINE int +do_bif_suspend_process(Process *c_p, + ErtsSuspendMonitor *smon, + Process *suspendee) { ASSERT(suspendee); ASSERT(!ERTS_PROC_IS_EXITING(suspendee)); @@ -5378,25 +5714,15 @@ do_bif_suspend_process(ErtsSuspendMonitor *smon, & erts_proc_lc_my_proc_locks(suspendee)); if (smon) { if (!smon->active) { - ErtsRunQueue *rq; - - if (locked_runq) - rq = locked_runq; - else { - rq = erts_get_runq_proc(suspendee); - erts_smp_runq_lock(rq); - } - - suspend_process(rq, suspendee); - - if (!locked_runq) - erts_smp_runq_unlock(rq); + if (!suspend_process(c_p, suspendee)) + return 0; } smon->active += smon->pending; ASSERT(smon->active); smon->pending = 0; + return 1; } - + return 0; } static void @@ -5419,10 +5745,17 @@ handle_pend_bif_sync_suspend(Process *suspendee, erts_delete_suspend_monitor(&suspender->suspend_monitors, suspendee->id); else { +#ifdef DEBUG + int res; +#endif ErtsSuspendMonitor *smon; smon = erts_lookup_suspend_monitor(suspender->suspend_monitors, suspendee->id); - do_bif_suspend_process(smon, suspendee, NULL); +#ifdef DEBUG + res = +#endif + do_bif_suspend_process(suspendee, smon, suspendee); + ASSERT(!smon || res != 0); suspender->suspendee = suspendee->id; } /* suspender is suspended waiting for suspendee to suspend; @@ -5454,10 +5787,17 @@ handle_pend_bif_async_suspend(Process *suspendee, erts_delete_suspend_monitor(&suspender->suspend_monitors, suspendee->id); else { +#ifdef DEBUG + int res; +#endif ErtsSuspendMonitor *smon; smon = erts_lookup_suspend_monitor(suspender->suspend_monitors, suspendee->id); - do_bif_suspend_process(smon, suspendee, NULL); +#ifdef DEBUG + res = +#endif + do_bif_suspend_process(suspendee, smon, suspendee); + ASSERT(!smon || res != 0); } erts_smp_proc_unlock(suspender, ERTS_PROC_LOCK_LINK); } @@ -5542,7 +5882,8 @@ suspend_process_2(BIF_ALIST_2) /* This is really a piece of cake without SMP support... */ if (!smon->active) { - suspend_process(ERTS_RUNQ_IX(0), suspendee); + erts_smp_atomic32_read_bor_nob(&suspendee->state, ERTS_PSFLG_SUSPENDED); + suspend_process(BIF_P, suspendee); smon->active++; res = am_true; } @@ -5585,21 +5926,15 @@ suspend_process_2(BIF_ALIST_2) if (smon->pending && unless_suspending) res = am_false; else { - ErtsRunQueue *rq; if (smon->pending == INT_MAX) goto system_limit; smon->pending++; - rq = erts_get_runq_proc(suspendee); - erts_smp_runq_lock(rq); - if (suspendee->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) + if (!do_bif_suspend_process(BIF_P, smon, suspendee)) add_pend_suspend(suspendee, BIF_P->id, handle_pend_bif_async_suspend); - else - do_bif_suspend_process(smon, suspendee, rq); - erts_smp_runq_unlock(rq); res = am_true; } @@ -5636,7 +5971,6 @@ suspend_process_2(BIF_ALIST_2) /* done */ } else { - ErtsRunQueue *cp_rq, *s_rq; /* We haven't got any active suspends on the suspendee */ /* @@ -5653,12 +5987,7 @@ suspend_process_2(BIF_ALIST_2) if (!unless_suspending || smon->pending == 0) smon->pending++; - cp_rq = erts_get_runq_proc(BIF_P); - s_rq = erts_get_runq_proc(suspendee); - erts_smp_runqs_lock(cp_rq, s_rq); - if (!(suspendee->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING)) { - do_bif_suspend_process(smon, suspendee, s_rq); - erts_smp_runqs_unlock(cp_rq, s_rq); + if (do_bif_suspend_process(BIF_P, smon, suspendee)) { res = (!unless_suspending || smon->active == 1 ? am_true : am_false); @@ -5678,8 +6007,7 @@ suspend_process_2(BIF_ALIST_2) * This time with BIF_P->suspendee == BIF_ARG_1 (see * above). */ - suspend_process(cp_rq, BIF_P); - erts_smp_runqs_unlock(cp_rq, s_rq); + suspend_process(BIF_P, BIF_P); goto yield; } } @@ -5687,9 +6015,15 @@ suspend_process_2(BIF_ALIST_2) } #endif /* ERTS_SMP */ - - ASSERT(suspendee->status == P_SUSPENDED || (asynchronous && smon->pending)); - ASSERT(suspendee->status == P_SUSPENDED || !smon->active); +#ifdef DEBUG + { + erts_aint32_t state = erts_smp_atomic32_read_acqb(&suspendee->state); + ASSERT((state & ERTS_PSFLG_SUSPENDED) + || (asynchronous && smon->pending)); + ASSERT((state & ERTS_PSFLG_SUSPENDED) + || !smon->active); + } +#endif erts_smp_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS); erts_smp_proc_unlock(BIF_P, xlocks); @@ -5784,9 +6118,8 @@ resume_process_1(BIF_ALIST_1) if (!suspendee) goto no_suspendee; - ASSERT(suspendee->status == P_SUSPENDED - || (suspendee->status == P_GARBING - && suspendee->gcstatus == P_SUSPENDED)); + ASSERT(ERTS_PSFLG_SUSPENDED + & erts_smp_atomic32_read_nob(&suspendee->state)); resume_process(suspendee); erts_smp_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS); @@ -5815,449 +6148,46 @@ erts_run_queues_len(Uint *qlen) Uint len = 0; ERTS_ATOMIC_FOREACH_RUNQ(rq, { - if (qlen) - qlen[i++] = rq->procs.len; - len += rq->procs.len; + Sint pqlen = 0; + int pix; + for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) + pqlen += RUNQ_READ_LEN(&rq->procs.prio_info[pix].len); + + if (pqlen < 0) + pqlen = 0; + if (qlen) + qlen[i++] = pqlen; + len += pqlen; } ); return len; } -#ifdef HARDDEBUG_RUNQS -static void -check_procs_runq(ErtsRunQueue *runq, Process *p_in_q, Process *p_not_in_q) -{ - int len[ERTS_NO_PROC_PRIO_LEVELS] = {0}; - int tot_len; - int prioq, prio; - int found_p_in_q; - Process *p, *prevp; - - found_p_in_q = 0; - for (prioq = 0; prioq < ERTS_NO_PROC_PRIO_LEVELS - 1; prioq++) { - prevp = NULL; - for (p = runq->procs.prio[prioq].first; p; p = p->next) { - ASSERT(p != p_not_in_q); - if (p == p_in_q) - found_p_in_q = 1; - switch (p->prio) { - case PRIORITY_MAX: - case PRIORITY_HIGH: - case PRIORITY_NORMAL: - ASSERT(prioq == p->prio); - break; - case PRIORITY_LOW: - ASSERT(prioq == PRIORITY_NORMAL); - break; - default: - ASSERT(!"Bad prio on process"); - } - len[p->prio]++; - ASSERT(prevp == p->prev); - if (p->prev) { - ASSERT(p->prev->next == p); - } - else { - ASSERT(runq->procs.prio[prioq].first == p); - } - if (p->next) { - ASSERT(p->next->prev == p); - } - else { - ASSERT(runq->procs.prio[prioq].last == p); - } - ASSERT(p->run_queue == runq); - prevp = p; - } - } - - ASSERT(!p_in_q || found_p_in_q); - - tot_len = 0; - for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++) { - ASSERT(len[prio] == runq->procs.prio_info[prio].len); - if (len[prio]) { - ASSERT(runq->flags & (1 << prio)); - } - else { - ASSERT(!(runq->flags & (1 << prio))); - } - tot_len += len[prio]; - } - ASSERT(runq->procs.len == tot_len); -} -# define ERTS_DBG_CHK_PROCS_RUNQ(RQ) check_procs_runq((RQ), NULL, NULL) -# define ERTS_DBG_CHK_PROCS_RUNQ_PROC(RQ, P) check_procs_runq((RQ), (P), NULL) -# define ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(RQ, P) check_procs_runq((RQ), NULL, (P)) -#else -# define ERTS_DBG_CHK_PROCS_RUNQ(RQ) -# define ERTS_DBG_CHK_PROCS_RUNQ_PROC(RQ, P) -# define ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(RQ, P) -#endif - - -static ERTS_INLINE void -enqueue_process(ErtsRunQueue *runq, Process *p) -{ - ErtsRunPrioQueue *rpq; - ErtsRunQueueInfo *rqi; - - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - - ASSERT(p->bound_runq || !(runq->flags & ERTS_RUNQ_FLG_SUSPENDED)); - - rqi = &runq->procs.prio_info[p->prio]; - rqi->len++; - if (rqi->max_len < rqi->len) - rqi->max_len = rqi->len; - - runq->procs.len++; - runq->len++; - if (runq->max_len < runq->len) - runq->max_len = runq->len; - - runq->flags |= (1 << p->prio); - - rpq = (p->prio == PRIORITY_LOW - ? &runq->procs.prio[PRIORITY_NORMAL] - : &runq->procs.prio[p->prio]); - - p->next = NULL; - p->prev = rpq->last; - if (rpq->last) - rpq->last->next = p; - else - rpq->first = p; - rpq->last = p; - - switch (p->status) { - case P_EXITING: - break; - case P_GARBING: - p->gcstatus = P_RUNABLE; - break; - default: - p->status = P_RUNABLE; - break; - } - -#ifdef ERTS_SMP - p->status_flags |= ERTS_PROC_SFLG_INRUNQ; -#endif - - ERTS_DBG_CHK_PROCS_RUNQ_PROC(runq, p); -} - - -static ERTS_INLINE int -dequeue_process(ErtsRunQueue *runq, Process *p) -{ - ErtsRunPrioQueue *rpq; - int res = 1; - - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - - ERTS_DBG_CHK_PROCS_RUNQ(runq); - - rpq = &runq->procs.prio[p->prio == PRIORITY_LOW ? PRIORITY_NORMAL : p->prio]; - if (p->prev) { - p->prev->next = p->next; - } - else if (rpq->first == p) { - rpq->first = p->next; - } - else { - res = 0; - } - if (p->next) { - p->next->prev = p->prev; - } - else if (rpq->last == p) { - rpq->last = p->prev; - } - else { - ASSERT(res == 0); - } - - if (res) { - - if (--runq->procs.prio_info[p->prio].len == 0) - runq->flags &= ~(1 << p->prio); - runq->procs.len--; - runq->len--; - -#ifdef ERTS_SMP - p->status_flags &= ~ERTS_PROC_SFLG_INRUNQ; -#endif - } - - ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(runq, p); - return res; -} - -/* schedule a process */ -static ERTS_INLINE ErtsRunQueue * -internal_add_to_runq(ErtsRunQueue *runq, Process *p) -{ - Uint32 prev_status = p->status; - ErtsRunQueue *add_runq; -#ifdef ERTS_SMP - - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); - - if (p->status_flags & ERTS_PROC_SFLG_INRUNQ) - return NULL; - else if (p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) { - ASSERT(ERTS_PROC_IS_EXITING(p) || p->rcount == 0); - ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(runq, p); - p->status_flags |= ERTS_PROC_SFLG_PENDADD2SCHEDQ; - return NULL; - } - ASSERT(!p->scheduler_data); -#endif - - ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(runq, p); -#ifndef ERTS_SMP - /* Never schedule a suspended process (ok in smp case) */ - ASSERT(ERTS_PROC_IS_EXITING(p) || p->rcount == 0); - add_runq = runq; -#else - ASSERT(!p->bound_runq || p->bound_runq == p->run_queue); - if (p->bound_runq) { - if (p->bound_runq == runq) - add_runq = runq; - else { - add_runq = p->bound_runq; - erts_smp_xrunq_lock(runq, add_runq); - } - } - else { - add_runq = erts_check_emigration_need(runq, p->prio); - if (!add_runq) - add_runq = runq; - else /* Process emigrated */ - p->run_queue = add_runq; - } -#endif - - /* Enqueue the process */ - enqueue_process(add_runq, p); - - if ((erts_system_profile_flags.runnable_procs) - && (prev_status == P_WAITING - || prev_status == P_SUSPENDED)) { - profile_runnable_proc(p, am_active); - } - - if (add_runq != runq) - erts_smp_runq_unlock(add_runq); - - return add_runq; -} - - -void -erts_add_to_runq(Process *p) -{ - ErtsRunQueue *notify_runq; - ErtsRunQueue *runq = erts_get_runq_proc(p); - erts_smp_runq_lock(runq); - notify_runq = internal_add_to_runq(runq, p); - erts_smp_runq_unlock(runq); - smp_notify_inc_runq(notify_runq); - -} - -/* Possibly remove a scheduled process we need to suspend */ - -static int -remove_proc_from_runq(ErtsRunQueue *rq, Process *p, int to_inactive) -{ - int res; - - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - -#ifdef ERTS_SMP - if (p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ) { - p->status_flags &= ~ERTS_PROC_SFLG_PENDADD2SCHEDQ; - ASSERT(!remove_proc_from_runq(rq, p, 0)); - return 1; - } -#endif - - res = dequeue_process(rq, p); - - if (res && erts_system_profile_flags.runnable_procs && to_inactive) - profile_runnable_proc(p, am_inactive); - -#ifdef ERTS_SMP - ASSERT(!(p->status_flags & ERTS_PROC_SFLG_INRUNQ)); -#endif - - return res; -} - -#ifdef ERTS_SMP - -ErtsMigrateResult -erts_proc_migrate(Process *p, ErtsProcLocks *plcks, - ErtsRunQueue *from_rq, int *from_locked, - ErtsRunQueue *to_rq, int *to_locked) -{ - ERTS_SMP_LC_ASSERT(*plcks == erts_proc_lc_my_proc_locks(p)); - ERTS_SMP_LC_ASSERT((ERTS_PROC_LOCK_STATUS & *plcks) - || from_locked); - ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked); - ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked); - - /* - * If we have the lock on the run queue to migrate to, - * check that it isn't suspended. If it is suspended, - * we will refuse to migrate to it anyway. - */ - if (*to_locked && (to_rq->flags & ERTS_RUNQ_FLG_SUSPENDED)) - return ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED; - - /* We need status lock on process and locks on both run queues */ - - if (!(ERTS_PROC_LOCK_STATUS & *plcks)) { - if (erts_smp_proc_trylock(p, ERTS_PROC_LOCK_STATUS) == EBUSY) { - ErtsProcLocks lcks = *plcks; - Eterm pid = p->id; - Process *proc = *plcks ? p : NULL; - - if (*from_locked) { - *from_locked = 0; - erts_smp_runq_unlock(from_rq); - } - if (*to_locked) { - *to_locked = 0; - erts_smp_runq_unlock(to_rq); - } - - proc = erts_pid2proc_opt(proc, - lcks, - pid, - lcks|ERTS_PROC_LOCK_STATUS, - ERTS_P2P_FLG_ALLOW_OTHER_X); - if (!proc) { - *plcks = 0; - return ERTS_MIGRATE_FAILED_NOT_IN_RUNQ; - } - ASSERT(proc == p); - } - *plcks |= ERTS_PROC_LOCK_STATUS; - } - - ASSERT(!p->bound_runq); - - ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked); - ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked); - - if (p->run_queue != from_rq) - return ERTS_MIGRATE_FAILED_RUNQ_CHANGED; - - if (!*from_locked || !*to_locked) { - if (from_rq < to_rq) { - if (!*to_locked) { - if (!*from_locked) - erts_smp_runq_lock(from_rq); - erts_smp_runq_lock(to_rq); - } - else if (erts_smp_runq_trylock(from_rq) == EBUSY) { - erts_smp_runq_unlock(to_rq); - erts_smp_runq_lock(from_rq); - erts_smp_runq_lock(to_rq); - } - } - else { - if (!*from_locked) { - if (!*to_locked) - erts_smp_runq_lock(to_rq); - erts_smp_runq_lock(from_rq); - } - else if (erts_smp_runq_trylock(to_rq) == EBUSY) { - erts_smp_runq_unlock(from_rq); - erts_smp_runq_lock(to_rq); - erts_smp_runq_lock(from_rq); - } - } - *to_locked = *from_locked = 1; - } - - ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked); - ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked); - - /* Ok we now got all locks we need; do it... */ - - /* Refuse to migrate to a suspended run queue */ - if (to_rq->flags & ERTS_RUNQ_FLG_SUSPENDED) - return ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED; - - if ((p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) - || !(p->status_flags & ERTS_PROC_SFLG_INRUNQ)) - return ERTS_MIGRATE_FAILED_NOT_IN_RUNQ; - - dequeue_process(from_rq, p); - p->run_queue = to_rq; - enqueue_process(to_rq, p); - - return ERTS_MIGRATE_SUCCESS; -} -#endif /* ERTS_SMP */ - Eterm erts_process_status(Process *c_p, ErtsProcLocks c_p_locks, Process *rp, Eterm rpid) { Eterm res = am_undefined; - Process *p; - - if (rp) { - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS - & erts_proc_lc_my_proc_locks(rp)); - p = rp; - } - else { - p = erts_pid2proc_opt(c_p, c_p_locks, - rpid, ERTS_PROC_LOCK_STATUS, - ERTS_P2P_FLG_ALLOW_OTHER_X); - } + Process *p = rp ? rp : erts_proc_lookup_raw(rpid); if (p) { - switch (p->status) { - case P_RUNABLE: - res = am_runnable; - break; - case P_WAITING: - res = am_waiting; - break; - case P_RUNNING: - res = am_running; - break; - case P_EXITING: + erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); + if (state & ERTS_PSFLG_FREE) + res = am_free; + else if (state & ERTS_PSFLG_EXITING) res = am_exiting; - break; - case P_GARBING: + else if (state & ERTS_PSFLG_GC) res = am_garbage_collecting; - break; - case P_SUSPENDED: + else if (state & ERTS_PSFLG_SUSPENDED) res = am_suspended; - break; - case P_FREE: /* We cannot look up a process in P_FREE... */ - default: /* Not a valid status... */ - erl_exit(1, "Bad status (%b32u) found for process %T\n", - p->status, p->id); - break; - } - -#ifdef ERTS_SMP - if (!rp && (p != c_p || !(ERTS_PROC_LOCK_STATUS & c_p_locks))) - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); + else if (state & ERTS_PSFLG_RUNNING) + res = am_running; + else if (state & ERTS_PSFLG_ACTIVE) + res = am_runnable; + else + res = am_waiting; } +#ifdef ERTS_SMP else { int i; ErtsSchedulerData *esdp; @@ -6272,42 +6202,40 @@ erts_process_status(Process *c_p, ErtsProcLocks c_p_locks, } erts_smp_runq_unlock(esdp->run_queue); } - -#endif - } - +#endif return res; } /* -** Suspend a process +** Suspend a currently executing process ** If we are to suspend on a port the busy_port is the thing ** otherwise busy_port is NIL */ void -erts_suspend(Process* process, ErtsProcLocks process_locks, Port *busy_port) +erts_suspend(Process* c_p, ErtsProcLocks c_p_locks, Port *busy_port) { - ErtsRunQueue *rq; - - ERTS_SMP_LC_ASSERT(process_locks == erts_proc_lc_my_proc_locks(process)); - if (!(process_locks & ERTS_PROC_LOCK_STATUS)) - erts_smp_proc_lock(process, ERTS_PROC_LOCK_STATUS); - - rq = erts_get_runq_proc(process); - - erts_smp_runq_lock(rq); +#ifdef DEBUG + int res; +#endif + ASSERT(c_p == erts_get_current_process()); + ERTS_SMP_LC_ASSERT(c_p_locks == erts_proc_lc_my_proc_locks(c_p)); + if (!(c_p_locks & ERTS_PROC_LOCK_STATUS)) + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); - suspend_process(rq, process); +#ifdef DEBUG + res = +#endif + suspend_process(c_p, c_p); - erts_smp_runq_unlock(rq); + ASSERT(res); if (busy_port) - erts_wake_process_later(busy_port, process); + erts_wake_process_later(busy_port, c_p); - if (!(process_locks & ERTS_PROC_LOCK_STATUS)) - erts_smp_proc_unlock(process, ERTS_PROC_LOCK_STATUS); + if (!(c_p_locks & ERTS_PROC_LOCK_STATUS)) + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); } @@ -6348,57 +6276,54 @@ erts_resume_processes(ErtsProcList *plp) Eterm erts_get_process_priority(Process *p) { - ErtsRunQueue *rq; - Eterm value; - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - rq = erts_get_runq_proc(p); - erts_smp_runq_lock(rq); - switch(p->prio) { - case PRIORITY_MAX: value = am_max; break; - case PRIORITY_HIGH: value = am_high; break; - case PRIORITY_NORMAL: value = am_normal; break; - case PRIORITY_LOW: value = am_low; break; - default: ASSERT(0); value = am_undefined; break; + erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); + switch (state & ERTS_PSFLG_PRIO_MASK) { + case PRIORITY_MAX: return am_max; + case PRIORITY_HIGH: return am_high; + case PRIORITY_NORMAL: return am_normal; + case PRIORITY_LOW: return am_low; + default: ASSERT(0); return am_undefined; } - erts_smp_runq_unlock(rq); - return value; } Eterm -erts_set_process_priority(Process *p, Eterm new_value) +erts_set_process_priority(Process *p, Eterm value) { - ErtsRunQueue *rq; - Eterm old_value; - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - rq = erts_get_runq_proc(p); -#ifdef ERTS_SMP - ASSERT(!(p->status_flags & ERTS_PROC_SFLG_INRUNQ)); -#endif - erts_smp_runq_lock(rq); - switch(p->prio) { - case PRIORITY_MAX: old_value = am_max; break; - case PRIORITY_HIGH: old_value = am_high; break; - case PRIORITY_NORMAL: old_value = am_normal; break; - case PRIORITY_LOW: old_value = am_low; break; - default: ASSERT(0); old_value = am_undefined; break; - } - switch (new_value) { - case am_max: p->prio = PRIORITY_MAX; break; - case am_high: p->prio = PRIORITY_HIGH; break; - case am_normal: p->prio = PRIORITY_NORMAL; break; - case am_low: p->prio = PRIORITY_LOW; break; - default: old_value = THE_NON_VALUE; break; + erts_aint32_t a, oprio, nprio; + + switch (value) { + case am_max: nprio = (erts_aint32_t) PRIORITY_MAX; break; + case am_high: nprio = (erts_aint32_t) PRIORITY_HIGH; break; + case am_normal: nprio = (erts_aint32_t) PRIORITY_NORMAL; break; + case am_low: nprio = (erts_aint32_t) PRIORITY_LOW; break; + default: return THE_NON_VALUE; break; } - erts_smp_runq_unlock(rq); - return old_value; -} -/* note that P_RUNNING is only set so that we don't try to remove -** running processes from the schedule queue if they exit - a running -** process not being in the schedule queue!! -** Schedule for up to INPUT_REDUCTIONS context switches, -** return 1 if more to do. -*/ + a = erts_smp_atomic32_read_nob(&p->state); + if (nprio == (a & ERTS_PSFLG_PRIO_MASK)) + oprio = nprio; + else { + erts_aint32_t e, n; + do { + oprio = a & ERTS_PSFLG_PRIO_MASK; + n = e = a; + + ASSERT(!(a & ERTS_PSFLG_IN_RUNQ)); + + n &= ~ERTS_PSFLG_PRIO_MASK; + n |= nprio; + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + } while (a != e); + } + + switch (oprio) { + case PRIORITY_MAX: return am_max; + case PRIORITY_HIGH: return am_high; + case PRIORITY_NORMAL: return am_normal; + case PRIORITY_LOW: return am_low; + default: ASSERT(0); return am_undefined; + } +} /* * schedule() is called from BEAM (process_main()) or HiPE @@ -6421,7 +6346,6 @@ erts_set_process_priority(Process *p, Eterm new_value) Process *schedule(Process *p, int calls) { ErtsRunQueue *rq; - ErtsRunPrioQueue *rpq; erts_aint_t dt; ErtsSchedulerData *esdp; int context_reds; @@ -6429,6 +6353,8 @@ Process *schedule(Process *p, int calls) int input_reductions; int actual_reds; int reds; + Uint32 flags; + erts_aint32_t state = 0; /* Supress warning... */ #ifdef USE_VM_PROBES if (p != NULL && DTRACE_ENABLED(process_unscheduled)) { @@ -6484,68 +6410,49 @@ Process *schedule(Process *p, int calls) erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); - if ((erts_system_profile_flags.runnable_procs) - && (p->status == P_WAITING)) { - profile_runnable_proc(p, am_inactive); - } + state = erts_smp_atomic32_read_acqb(&p->state); if (IS_TRACED(p)) { - if (IS_TRACED_FL(p, F_TRACE_CALLS) && p->status != P_FREE) { + if (IS_TRACED_FL(p, F_TRACE_CALLS) && !(state & ERTS_PSFLG_FREE)) { erts_schedule_time_break(p, ERTS_BP_CALL_TIME_SCHEDULE_OUT); } - switch (p->status) { - case P_EXITING: - if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_EXIT)) - trace_sched(p, am_out_exiting); - break; - case P_FREE: + if (state & (ERTS_PSFLG_FREE|ERTS_PSFLG_EXITING)) { if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_EXIT)) - trace_sched(p, am_out_exited); - break; - default: + trace_sched(p, ((state & ERTS_PSFLG_FREE) + ? am_out_exited + : am_out_exiting)); + } + else { if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED)) trace_sched(p, am_out); else if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_PROCS)) trace_virtual_sched(p, am_out); - break; } } #ifdef ERTS_SMP - if (ERTS_PROC_PENDING_EXIT(p)) { - erts_handle_pending_exit(p, - ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); - p->status_flags |= ERTS_PROC_SFLG_PENDADD2SCHEDQ; - } - - if (p->pending_suspenders) { - handle_pending_suspend(p, - ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); - ASSERT(!(p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ) - || p->rcount == 0); - } + if (state & ERTS_PSFLG_PENDING_EXIT) + erts_handle_pending_exit(p, (ERTS_PROC_LOCK_MAIN + | ERTS_PROC_LOCK_STATUS)); + if (p->pending_suspenders) + handle_pending_suspend(p, (ERTS_PROC_LOCK_MAIN + | ERTS_PROC_LOCK_STATUS)); #endif - erts_smp_runq_lock(rq); - ERTS_PROC_REDUCTIONS_EXECUTED(rq, p->prio, reds, actual_reds); + schedule_out_process(rq, state, p); /* Returns with rq locked! */ + + ERTS_PROC_REDUCTIONS_EXECUTED(rq, + (int) (state & ERTS_PSFLG_PRIO_MASK), + reds, + actual_reds); esdp->current_process = NULL; #ifdef ERTS_SMP p->scheduler_data = NULL; - p->runq_flags &= ~ERTS_PROC_RUNQ_FLG_RUNNING; - p->status_flags &= ~ERTS_PROC_SFLG_RUNNING; - - if (p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ) { - ErtsRunQueue *notify_runq; - p->status_flags &= ~ERTS_PROC_SFLG_PENDADD2SCHEDQ; - notify_runq = internal_add_to_runq(rq, p); - if (notify_runq != rq) - smp_notify_inc_runq(notify_runq); - } #endif - if (p->status == P_FREE) { + if (state & ERTS_PSFLG_FREE) { #ifdef ERTS_SMP ASSERT(esdp->free_process == p); esdp->free_process = NULL; @@ -6576,8 +6483,9 @@ Process *schedule(Process *p, int calls) ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); check_activities_to_run: { - #ifdef ERTS_SMP + ErtsMigrationPaths *mps; + ErtsMigrationPath *mp; #ifdef ERTS_SMP { @@ -6599,20 +6507,28 @@ Process *schedule(Process *p, int calls) ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); - if (rq->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK) - immigrate(rq); + mps = erts_get_migration_paths_managed(); + mp = &mps->mpath[rq->ix]; + + if (mp->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK) + immigrate(rq, mp); - continue_check_activities_to_run: + continue_check_activities_to_run: + flags = ERTS_RUNQ_FLGS_GET_NOB(rq); + continue_check_activities_to_run_known_flags: - if (rq->flags & (ERTS_RUNQ_FLG_CHK_CPU_BIND - | ERTS_RUNQ_FLG_SUSPENDED)) { - if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED) { - ASSERT(erts_smp_atomic32_read_nob(&esdp->ssi->flags) - & ERTS_SSI_FLG_SUSPENDED); + + if (flags & (ERTS_RUNQ_FLG_CHK_CPU_BIND|ERTS_RUNQ_FLG_SUSPENDED)) { + + if (flags & ERTS_RUNQ_FLG_SUSPENDED) { suspend_scheduler(esdp); + flags = ERTS_RUNQ_FLGS_GET_NOB(rq); } - if (rq->flags & ERTS_RUNQ_FLG_CHK_CPU_BIND) + if (flags & ERTS_RUNQ_FLG_CHK_CPU_BIND) { + flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_CHK_CPU_BIND); + flags &= ~ ERTS_RUNQ_FLG_CHK_CPU_BIND; erts_sched_check_cpu_bind(esdp); + } } { @@ -6641,14 +6557,12 @@ Process *schedule(Process *p, int calls) } #endif /* ERTS_SMP */ - ASSERT(rq->len == rq->procs.len + rq->ports.info.len); - - if ((rq->len == 0 && !rq->misc.start) - || (rq->halt_in_progress - && rq->ports.info.len == 0 && !rq->misc.start)) { + flags = ERTS_RUNQ_FLGS_GET_NOB(rq); + if ((!(flags & ERTS_RUNQ_FLGS_QMASK) && !rq->misc.start) + || (rq->halt_in_progress && ERTS_EMPTY_RUNQ_PORTS(rq))) { + /* Prepare for scheduler wait */ #ifdef ERTS_SMP - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); rq->wakeup_other = 0; @@ -6656,21 +6570,27 @@ Process *schedule(Process *p, int calls) empty_runq(rq); - if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED) { - ASSERT(erts_smp_atomic32_read_nob(&esdp->ssi->flags) - & ERTS_SSI_FLG_SUSPENDED); + flags = ERTS_RUNQ_FLGS_GET_NOB(rq); + if (flags & ERTS_RUNQ_FLG_SUSPENDED) { non_empty_runq(rq); - goto continue_check_activities_to_run; + goto continue_check_activities_to_run_known_flags; } - else if (!(rq->flags & ERTS_RUNQ_FLG_INACTIVE)) { + else if (!(flags & ERTS_RUNQ_FLG_INACTIVE)) { + if (try_steal_task(rq)) { + non_empty_runq(rq); + goto continue_check_activities_to_run; + } + + ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED); + /* * Check for ERTS_RUNQ_FLG_SUSPENDED has to be done * after trying to steal a task. */ - if (try_steal_task(rq) - || (rq->flags & ERTS_RUNQ_FLG_SUSPENDED)) { + flags = ERTS_RUNQ_FLGS_GET_NOB(rq); + if (flags & ERTS_RUNQ_FLG_SUSPENDED) { non_empty_runq(rq); - goto continue_check_activities_to_run; + goto continue_check_activities_to_run_known_flags; } } @@ -6701,6 +6621,7 @@ Process *schedule(Process *p, int calls) erl_sys_schedule(1); dt = erts_do_time_read_and_reset(); if (dt) erts_bump_timer(dt); + #ifdef ERTS_SMP erts_smp_runq_lock(rq); clear_sys_scheduling(); @@ -6717,14 +6638,17 @@ Process *schedule(Process *p, int calls) { int wo_reds = rq->wakeup_other_reds; if (wo_reds) { - if (rq->len < 2) { + erts_aint32_t len = rq->len; + if (len < 2) { rq->wakeup_other -= ERTS_WAKEUP_OTHER_DEC*wo_reds; if (rq->wakeup_other < 0) rq->wakeup_other = 0; } else if (rq->wakeup_other < wakeup_other_limit) - rq->wakeup_other += rq->len*wo_reds + ERTS_WAKEUP_OTHER_FIXED_INC; + rq->wakeup_other += len*wo_reds + ERTS_WAKEUP_OTHER_FIXED_INC; else { + if (flags & ERTS_RUNQ_FLG_PROTECTED) + ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED); if (erts_smp_atomic32_read_acqb(&no_empty_run_queues) != 0) { wake_scheduler_on_empty_runq(rq); rq->wakeup_other = 0; @@ -6740,7 +6664,7 @@ Process *schedule(Process *p, int calls) * Find a new port to run. */ - if (rq->ports.info.len) { + if (RUNQ_READ_LEN(&rq->ports.info.len)) { int have_outstanding_io; have_outstanding_io = erts_port_task_execute(rq, &esdp->current_port); if ((have_outstanding_io && fcalls > 2*input_reductions) @@ -6767,160 +6691,123 @@ Process *schedule(Process *p, int calls) /* * Find a new process to run. */ - pick_next_process: - - ERTS_DBG_CHK_PROCS_RUNQ(rq); - - switch (rq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) { - case MAX_BIT: - case MAX_BIT|HIGH_BIT: - case MAX_BIT|NORMAL_BIT: - case MAX_BIT|LOW_BIT: - case MAX_BIT|HIGH_BIT|NORMAL_BIT: - case MAX_BIT|HIGH_BIT|LOW_BIT: - case MAX_BIT|NORMAL_BIT|LOW_BIT: - case MAX_BIT|HIGH_BIT|NORMAL_BIT|LOW_BIT: - rpq = &rq->procs.prio[PRIORITY_MAX]; - break; - case HIGH_BIT: - case HIGH_BIT|NORMAL_BIT: - case HIGH_BIT|LOW_BIT: - case HIGH_BIT|NORMAL_BIT|LOW_BIT: - rpq = &rq->procs.prio[PRIORITY_HIGH]; - break; - case NORMAL_BIT: - rpq = &rq->procs.prio[PRIORITY_NORMAL]; - break; - case LOW_BIT: - rpq = &rq->procs.prio[PRIORITY_NORMAL]; - break; - case NORMAL_BIT|LOW_BIT: - rpq = &rq->procs.prio[PRIORITY_NORMAL]; - ASSERT(rpq->first != NULL); - p = rpq->first; - if (p->prio == PRIORITY_LOW) { - if (p == rpq->last || p->skipped >= RESCHEDULE_LOW-1) - p->skipped = 0; - else { - /* skip it */ - p->skipped++; - rpq->first = p->next; - rpq->first->prev = NULL; - rpq->last->next = p; - p->prev = rpq->last; - p->next = NULL; - rpq->last = p; + pick_next_process: { + int prio_q; + int qmask; + + flags = ERTS_RUNQ_FLGS_GET_NOB(rq); + qmask = (int) (flags & ERTS_RUNQ_FLGS_PROCS_QMASK); + switch (qmask & -qmask) { + case MAX_BIT: + prio_q = PRIORITY_MAX; + break; + case HIGH_BIT: + prio_q = PRIORITY_HIGH; + break; + case NORMAL_BIT: + case LOW_BIT: + prio_q = PRIORITY_NORMAL; + if (check_requeue_process(rq, PRIORITY_NORMAL)) goto pick_next_process; - } + break; + case 0: /* No process at all */ + default: + ASSERT(qmask == 0); + goto check_activities_to_run; } - break; - case 0: /* No process at all */ - default: - ASSERT((rq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) == 0); - ASSERT(rq->procs.len == 0); - goto check_activities_to_run; - } - - BM_START_TIMER(system); - - /* - * Take the chosen process out of the queue. - */ - ASSERT(rpq->first); /* Wrong qmask in rq->flags? */ - p = rpq->first; -#ifdef ERTS_SMP - ERTS_SMP_LC_ASSERT(rq == p->run_queue); -#endif - rpq->first = p->next; - if (!rpq->first) - rpq->last = NULL; - else - rpq->first->prev = NULL; - p->next = p->prev = NULL; + BM_START_TIMER(system); - if (--rq->procs.prio_info[p->prio].len == 0) - rq->flags &= ~(1 << p->prio); - ASSERT(rq->procs.len > 0); - rq->procs.len--; - ASSERT(rq->len > 0); - rq->len--; + /* + * Take the chosen process out of the queue. + */ + p = dequeue_process(rq, prio_q, &state); - { - Uint32 ee_flgs = (ERTS_RUNQ_FLG_EVACUATE(p->prio) - | ERTS_RUNQ_FLG_EMIGRATE(p->prio)); + ASSERT(p); /* Wrong qmask in rq->flags? */ - if ((rq->flags & (ERTS_RUNQ_FLG_SUSPENDED|ee_flgs)) == ee_flgs) - ERTS_UNSET_RUNQ_FLG_EVACUATE(rq->flags, p->prio); - } + while (1) { + erts_aint32_t exp, new, tmp; + tmp = new = exp = state; + new &= ~ERTS_PSFLG_IN_RUNQ; + tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); + if (tmp != ERTS_PSFLG_SUSPENDED) + new |= ERTS_PSFLG_RUNNING; + state = erts_smp_atomic32_cmpxchg_relb(&p->state, new, exp); + if (state == exp) { + tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); + if (tmp == ERTS_PSFLG_SUSPENDED) + goto pick_next_process; + state = new; + break; + } + } - ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(rq, p); + rq->procs.context_switches++; - rq->procs.context_switches++; + esdp->current_process = p; - esdp->current_process = p; + } #ifdef ERTS_SMP - p->runq_flags |= ERTS_PROC_RUNQ_FLG_RUNNING; erts_smp_runq_unlock(rq); + if (flags & ERTS_RUNQ_FLG_PROTECTED) + ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED); + ERTS_SMP_CHK_NO_PROC_LOCKS; erts_smp_proc_lock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); if (erts_sched_stat.enabled) { + int prio; UWord old = ERTS_PROC_SCHED_ID(p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_STATUS), (UWord) esdp->no); int migrated = old && old != esdp->no; + prio = (int) (state & ERTS_PSFLG_PRIO_MASK); + erts_smp_spin_lock(&erts_sched_stat.lock); - erts_sched_stat.prio[p->prio].total_executed++; - erts_sched_stat.prio[p->prio].executed++; + erts_sched_stat.prio[prio].total_executed++; + erts_sched_stat.prio[prio].executed++; if (migrated) { - erts_sched_stat.prio[p->prio].total_migrated++; - erts_sched_stat.prio[p->prio].migrated++; + erts_sched_stat.prio[prio].total_migrated++; + erts_sched_stat.prio[prio].migrated++; } erts_smp_spin_unlock(&erts_sched_stat.lock); } - p->status_flags |= ERTS_PROC_SFLG_RUNNING; - p->status_flags &= ~ERTS_PROC_SFLG_INRUNQ; if (ERTS_PROC_PENDING_EXIT(p)) { erts_handle_pending_exit(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); + state = erts_smp_atomic32_read_nob(&p->state); } ASSERT(!p->scheduler_data); p->scheduler_data = esdp; - #endif - ASSERT(p->status != P_SUSPENDED); /* Never run a suspended process */ + /* Never run a suspended process */ + ASSERT(!(ERTS_PSFLG_SUSPENDED & erts_smp_atomic32_read_nob(&p->state))); ACTIVATE(p); reds = context_reds; if (IS_TRACED(p)) { - switch (p->status) { - case P_EXITING: + if (state & ERTS_PSFLG_EXITING) { if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_EXIT)) trace_sched(p, am_in_exiting); - break; - default: + } + else { if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED)) trace_sched(p, am_in); else if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_PROCS)) trace_virtual_sched(p, am_in); - break; } if (IS_TRACED_FL(p, F_TRACE_CALLS)) { erts_schedule_time_break(p, ERTS_BP_CALL_TIME_SCHEDULE_IN); } } - if (p->status != P_EXITING) - p->status = P_RUNNING; - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); #ifdef ERTS_SMP @@ -6928,7 +6815,7 @@ Process *schedule(Process *p, int calls) erts_check_my_tracer_proc(p); #endif - if (!ERTS_PROC_IS_EXITING(p) + if (!(state & ERTS_PSFLG_EXITING) && ((FLAGS(p) & F_FORCE_GC) || (MSO(p).overhead > BIN_VHEAP_SZ(p)))) { reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity); @@ -7020,17 +6907,19 @@ erts_schedule_misc_op(void (*func)(void *), void *arg) ErtsSchedulerData *esdp = erts_get_scheduler_data(); ErtsRunQueue *rq = esdp ? esdp->run_queue : ERTS_RUNQ_IX(0); ErtsMiscOpList *molp = misc_op_list_alloc(); +#ifdef ERTS_SMP + ErtsMigrationPaths *mpaths = erts_get_migration_paths(); - erts_smp_runq_lock(rq); - - while (rq->misc.evac_runq) { - ErtsRunQueue *tmp_rq = rq->misc.evac_runq; - erts_smp_runq_unlock(rq); - rq = tmp_rq; - erts_smp_runq_lock(rq); + if (!mpaths) + rq = ERTS_RUNQ_IX(0); + else { + ErtsRunQueue *erq = mpaths->mpath[rq->ix].misc_evac_runq; + if (erq) + rq = erq; } +#endif - ASSERT(!(rq->flags & ERTS_RUNQ_FLG_SUSPENDED)); + erts_smp_runq_lock(rq); molp->next = NULL; molp->func = func; @@ -7040,7 +6929,9 @@ erts_schedule_misc_op(void (*func)(void *), void *arg) else rq->misc.start = molp; rq->misc.end = molp; + erts_smp_runq_unlock(rq); + smp_notify_inc_runq(rq); } @@ -7201,7 +7092,7 @@ erts_free_proc(Process *p) ** Allocate process and find out where to place next process. */ static Process* -alloc_process(void) +alloc_process(ErtsRunQueue *rq, erts_aint32_t state) { int pix; Process* p; @@ -7280,6 +7171,12 @@ alloc_process(void) exp_lpd = act_lpd; } +#ifdef ERTS_SMP + RUNQ_SET_RQ(&p->run_queue, rq); +#endif + + erts_smp_atomic32_init_relb(&p->state, state); + #ifdef DEBUG pid = p->id; #endif @@ -7303,7 +7200,6 @@ alloc_process(void) erts_smp_rwmtx_runlock(&erts_proc_tab_rwmtx); - p->rstatus = P_FREE; p->rcount = 0; ASSERT(p == (Process *) @@ -7317,6 +7213,7 @@ system_limit: erts_smp_rwmtx_runlock(&erts_proc_tab_rwmtx); return NULL; + } Eterm @@ -7326,7 +7223,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). Eterm args, /* Arguments for function (must be well-formed list). */ ErlSpawnOpts* so) /* Options for spawn. */ { - ErtsRunQueue *rq, *notify_runq; + ErtsRunQueue *rq = NULL; Process *p; Sint arity; /* Number of arguments. */ #ifndef HYBRID @@ -7335,6 +7232,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). Uint sz; /* Needed words on heap. */ Uint heap_need; /* Size needed on heap. */ Eterm res = THE_NON_VALUE; + erts_aint32_t state = 0; + erts_aint32_t prio = (erts_aint32_t) PRIORITY_NORMAL; #ifdef ERTS_SMP erts_smp_proc_lock(parent, ERTS_PROC_LOCKS_ALL_MINOR); @@ -7359,8 +7258,24 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). so->error_code = BADARG; goto error; } - p = alloc_process(); /* All proc locks are locked by this thread - on success */ + + if (so->flags & SPO_USE_ARGS) { + if (so->scheduler) { + int ix = so->scheduler-1; + ASSERT(0 <= ix && ix < erts_no_run_queues); + rq = ERTS_RUNQ_IX(ix); + state |= ERTS_PSFLG_BOUND; + } + prio = (erts_aint32_t) so->priority; + } + + state |= (prio & ERTS_PSFLG_PRIO_MASK); + + if (!rq) + rq = erts_get_runq_proc(parent); + + p = alloc_process(rq, state); /* All proc locks are locked by this thread + on success */ if (!p) { erts_send_error_to_logger_str(parent->group_leader, "Too many processes\n"); @@ -7382,22 +7297,16 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->flags = erts_default_process_flags; - /* Scheduler queue mutex should be locked when changeing - * prio. In this case we don't have to lock it, since - * noone except us has access to the process. - */ if (so->flags & SPO_USE_ARGS) { p->min_heap_size = so->min_heap_size; p->min_vheap_size = so->min_vheap_size; - p->prio = so->priority; p->max_gen_gcs = so->max_gen_gcs; } else { p->min_heap_size = H_MIN_SIZE; p->min_vheap_size = BIN_VH_MIN_SIZE; - p->prio = PRIORITY_NORMAL; p->max_gen_gcs = (Uint16) erts_smp_atomic32_read_nob(&erts_max_gen_gcs); } - p->skipped = 0; + p->schedule_count = 0; ASSERT(p->min_heap_size == erts_next_heap_size(p->min_heap_size, 0)); p->initial[INITIAL_MOD] = mod; @@ -7501,12 +7410,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). : STORE_NC(&p->htop, &p->off_heap, parent->group_leader); } -#if 1 - p->trace_flags = ~TRACEE_FLAGS; - p->tracer_proc = NIL; -#else erts_get_default_tracing(&p->trace_flags, &p->tracer_proc); -#endif p->msg.first = NULL; p->msg.last = &p->msg.first; @@ -7516,7 +7420,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->msg_inq.first = NULL; p->msg_inq.last = &p->msg_inq.first; p->msg_inq.len = 0; - p->bound_runq = NULL; #endif p->bif_timers = NULL; p->mbuf = NULL; @@ -7618,9 +7521,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). #ifdef ERTS_SMP p->scheduler_data = NULL; - erts_atomic32_init_nob(&p->aflags, 0); - p->status_flags = 0; - p->runq_flags = 0; p->suspendee = NIL; p->pending_suspenders = NULL; p->pending_exit.reason = THE_NON_VALUE; @@ -7631,34 +7531,15 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->fp_exception = 0; #endif + erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL); + + res = p->id; + /* * Schedule process for execution. */ - if (!((so->flags & SPO_USE_ARGS) && so->scheduler)) - rq = erts_get_runq_proc(parent); - else { - int ix = so->scheduler-1; - ASSERT(0 <= ix && ix < erts_no_run_queues); - rq = ERTS_RUNQ_IX(ix); - p->bound_runq = rq; - } - - erts_smp_runq_lock(rq); - -#ifdef ERTS_SMP - p->run_queue = rq; -#endif - - p->status = P_WAITING; - notify_runq = internal_add_to_runq(rq, p); - - erts_smp_runq_unlock(rq); - - smp_notify_inc_runq(notify_runq); - - res = p->id; - erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL); + schedule_process(p, state, 0); VERBOSE(DEBUG_PROCESSES, ("Created a new process: %T\n",p->id)); @@ -7694,12 +7575,8 @@ void erts_init_empty_process(Process *p) p->max_gen_gcs = 0; p->min_heap_size = 0; p->min_vheap_size = 0; - p->status = P_RUNABLE; - p->gcstatus = P_RUNABLE; - p->rstatus = P_RUNABLE; p->rcount = 0; p->id = ERTS_INVALID_PID; - p->prio = PRIORITY_NORMAL; p->reds = 0; p->tracer_proc = NIL; p->trace_flags = F_INITIAL_TRACE_FLAGS; @@ -7716,7 +7593,6 @@ void erts_init_empty_process(Process *p) p->bin_vheap_mature = 0; #ifdef ERTS_SMP p->u.ptimer = NULL; - p->bound_runq = NULL; #else memset(&(p->u.tm), 0, sizeof(ErlTimer)); #endif @@ -7793,12 +7669,10 @@ void erts_init_empty_process(Process *p) p->last_old_htop = NULL; #endif + erts_smp_atomic32_init_nob(&p->state, (erts_aint32_t) PRIORITY_NORMAL); #ifdef ERTS_SMP p->scheduler_data = NULL; - erts_atomic32_init_nob(&p->aflags, 0); - p->status_flags = 0; - p->runq_flags = 0; p->msg_inq.first = NULL; p->msg_inq.last = &p->msg_inq.first; p->msg_inq.len = 0; @@ -7808,7 +7682,7 @@ void erts_init_empty_process(Process *p) p->pending_exit.bp = NULL; erts_proc_lock_init(p); erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL); - p->run_queue = ERTS_RUNQ_IX(0); + RUNQ_SET_RQ(&p->run_queue, ERTS_RUNQ_IX(0)); #endif #if !defined(NO_FPE_SIGNALS) || defined(HIPE) @@ -8003,26 +7877,34 @@ delete_process(Process* p) } +static ERTS_INLINE erts_aint32_t +set_proc_exiting_state(Process *p, erts_aint32_t state) +{ + erts_aint32_t a, n, e; + a = state; + while (1) { + n = e = a; + n &= ~(ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); + n |= ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE; + if (!(a & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING))) + n |= ERTS_PSFLG_IN_RUNQ; + a = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e); + if (a == e) + break; + } + return a; +} + static ERTS_INLINE void -set_proc_exiting(Process *p, Eterm reason, ErlHeapFragment *bp) +set_proc_exiting(Process *p, + erts_aint32_t state, + Eterm reason, + ErlHeapFragment *bp) { -#ifdef ERTS_SMP - erts_pix_lock_t *pix_lock = ERTS_PID2PIXLOCK(p->id); ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(p) == ERTS_PROC_LOCKS_ALL); - /* - * You are required to have all proc locks and the pix lock when going - * to status P_EXITING. This makes it is enough to take any lock when - * looking up a process (pid2proc()) to prevent the looked up process - * from exiting until the lock has been released. - */ - erts_pix_lock(pix_lock); - erts_atomic32_read_bor_relb(&p->aflags, ERTS_PROC_AFLG_EXITING); -#endif - p->status = P_EXITING; -#ifdef ERTS_SMP - erts_pix_unlock(pix_lock); -#endif + state = set_proc_exiting_state(p, state); + p->fvalue = reason; if (bp) erts_link_mbuf_to_proc(p, bp); @@ -8035,6 +7917,14 @@ set_proc_exiting(Process *p, Eterm reason, ErlHeapFragment *bp) KILL_CATCHES(p); cancel_timer(p); p->i = (BeamInstr *) beam_exit; + + if (erts_system_profile_flags.runnable_procs + && !(state & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) { + profile_runnable_proc(p, am_active); + } + + if (!(state & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING))) + add2runq(p, state); } @@ -8047,8 +7937,8 @@ erts_handle_pending_exit(Process *c_p, ErtsProcLocks locks) ASSERT(is_value(c_p->pending_exit.reason)); ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == locks); ERTS_SMP_LC_ASSERT(locks & ERTS_PROC_LOCK_MAIN); - ERTS_SMP_LC_ASSERT(c_p->status != P_EXITING); - ERTS_SMP_LC_ASSERT(c_p->status != P_FREE); + ERTS_SMP_LC_ASSERT(!((ERTS_PSFLG_EXITING|ERTS_PSFLG_FREE) + & erts_smp_atomic32_read_nob(&c_p->state))); /* Ensure that all locks on c_p are locked before proceeding... */ if (locks == ERTS_PROC_LOCKS_ALL) @@ -8061,7 +7951,10 @@ erts_handle_pending_exit(Process *c_p, ErtsProcLocks locks) } } - set_proc_exiting(c_p, c_p->pending_exit.reason, c_p->pending_exit.bp); + set_proc_exiting(c_p, + erts_smp_atomic32_read_acqb(&c_p->state), + c_p->pending_exit.reason, + c_p->pending_exit.bp); c_p->pending_exit.reason = THE_NON_VALUE; c_p->pending_exit.bp = NULL; @@ -8077,11 +7970,12 @@ handle_pending_exiters(ErtsProcList *pnd_xtrs) while (plp) { Process *p = erts_pid2proc(NULL, 0, plp->pid, ERTS_PROC_LOCKS_ALL); if (p) { - if (proclist_same(plp, p) - && !(p->status_flags & ERTS_PROC_SFLG_RUNNING)) { - ASSERT(p->status_flags & ERTS_PROC_SFLG_INRUNQ); - ASSERT(ERTS_PROC_PENDING_EXIT(p)); - erts_handle_pending_exit(p, ERTS_PROC_LOCKS_ALL); + if (proclist_same(plp, p)) { + erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); + if (!(state & ERTS_PSFLG_RUNNING)) { + ASSERT(state & ERTS_PSFLG_PENDING_EXIT); + erts_handle_pending_exit(p, ERTS_PROC_LOCKS_ALL); + } } erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL); } @@ -8171,7 +8065,7 @@ send_exit_message(Process *to, ErtsProcLocks *to_locksp, * SMP emulator). When the signal is received the receiver receives an * 'EXIT' message if it is trapping exits; otherwise, it will either * ignore the signal if the exit reason is normal, or go into an - * exiting state (status P_EXITING). When a process has gone into the + * exiting state (ERTS_PSFLG_EXITING). When a process has gone into the * exiting state it will not execute any more Erlang code, but it might * take a while before it actually exits. The exit signal is being * received when the 'EXIT' message is put in the message queue, the @@ -8244,6 +8138,7 @@ send_exit_signal(Process *c_p, /* current process if and only Uint32 flags /* flags */ ) { + erts_aint32_t state = erts_smp_atomic32_read_nob(&rp->state); Eterm rsn = reason == am_kill ? am_killed : reason; ERTS_SMP_LC_ASSERT(*rp_locks == erts_proc_lc_my_proc_locks(rp)); @@ -8265,7 +8160,7 @@ send_exit_signal(Process *c_p, /* current process if and only } #endif - if (ERTS_PROC_IS_TRAPPING_EXITS(rp) + if ((state & ERTS_PSFLG_TRAP_EXIT) && (reason != am_kill || (flags & ERTS_XSIG_FLG_IGN_KILL))) { if (is_not_nil(token) #ifdef USE_VM_PROBES @@ -8281,9 +8176,7 @@ send_exit_signal(Process *c_p, /* current process if and only } else if (reason != am_normal || (flags & ERTS_XSIG_FLG_NO_IGN_NORMAL)) { #ifdef ERTS_SMP - if (!ERTS_PROC_PENDING_EXIT(rp) && !ERTS_PROC_IS_EXITING(rp)) { - ASSERT(rp->status != P_EXITING); - ASSERT(rp->status != P_FREE); + if (!(state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))) { ASSERT(!rp->pending_exit.bp); if (rp == c_p && (*rp_locks & ERTS_PROC_LOCK_MAIN)) { @@ -8299,9 +8192,9 @@ send_exit_signal(Process *c_p, /* current process if and only } *rp_locks = ERTS_PROC_LOCKS_ALL; } - set_proc_exiting(c_p, rsn, NULL); + set_proc_exiting(c_p, state, rsn, NULL); } - else if (!(rp->status_flags & ERTS_PROC_SFLG_RUNNING)) { + else if (!(state & ERTS_PSFLG_RUNNING)) { /* Process not running ... */ ErtsProcLocks need_locks = ~(*rp_locks) & ERTS_PROC_LOCKS_ALL; if (need_locks @@ -8318,6 +8211,7 @@ send_exit_signal(Process *c_p, /* current process if and only /* ...and we have all locks on it... */ *rp_locks = ERTS_PROC_LOCKS_ALL; set_proc_exiting(rp, + state, (is_immed(rsn) ? rsn : copy_object(rsn, rp)), @@ -8347,13 +8241,9 @@ send_exit_signal(Process *c_p, /* current process if and only &bp->off_heap); rp->pending_exit.bp = bp; } - erts_atomic32_read_bor_relb(&rp->aflags, - ERTS_PROC_AFLG_PENDING_EXIT); - ASSERT(ERTS_PROC_PENDING_EXIT(rp)); + erts_smp_atomic32_read_bor_relb(&rp->state, + ERTS_PSFLG_PENDING_EXIT); } - if (!(rp->status_flags - & (ERTS_PROC_SFLG_INRUNQ|ERTS_PROC_SFLG_RUNNING))) - erts_add_to_runq(rp); } /* else: * @@ -8365,18 +8255,15 @@ send_exit_signal(Process *c_p, /* current process if and only * exit or by itself before seeing the pending exit. */ #else /* !ERTS_SMP */ - if (c_p == rp) { - rp->status = P_EXITING; - c_p->fvalue = rsn; - } - else if (rp->status != P_EXITING) { /* No recursive process exits /PaN */ - Eterm old_status = rp->status; + erts_aint32_t state = erts_smp_atomic32_read_nob(&rp->state); + if (!(state & ERTS_PSFLG_EXITING)) { set_proc_exiting(rp, - is_immed(rsn) ? rsn : copy_object(rsn, rp), + state, + (is_immed(rsn) || c_p == rp + ? rsn + : copy_object(rsn, rp)), NULL); ACTIVATE(rp); - if (old_status != P_RUNABLE && old_status != P_RUNNING) - erts_add_to_runq(rp); } #endif return -1; /* Receiver will exit */ @@ -8688,22 +8575,14 @@ proc_dec_refc(void *vproc) #endif -static void -continue_exit_process(Process *p -#ifdef ERTS_SMP - , erts_pix_lock_t *pix_lock -#endif - ); - /* this function fishishes a process and propagates exit messages - called by process_main when a process dies */ void erts_do_exit_process(Process* p, Eterm reason) { #ifdef ERTS_SMP - erts_pix_lock_t *pix_lock = ERTS_PID2PIXLOCK(p->id); + erts_aint32_t state; #endif - p->arity = 0; /* No live registers */ p->fvalue = reason; @@ -8721,27 +8600,17 @@ erts_do_exit_process(Process* p, Eterm reason) #ifdef ERTS_SMP ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); /* By locking all locks (main lock is already locked) when going - to status P_EXITING, it is enough to take any lock when + to exiting state (ERTS_PSFLG_EXITING), it is enough to take any lock when looking up a process (erts_pid2proc()) to prevent the looked up process from exiting until the lock has been released. */ erts_smp_proc_lock(p, ERTS_PROC_LOCKS_ALL_MINOR); #endif - - if (erts_system_profile_flags.runnable_procs && (p->status != P_WAITING)) { - profile_runnable_proc(p, am_inactive); - } - -#ifdef ERTS_SMP - erts_pix_lock(pix_lock); - erts_atomic32_read_bor_relb(&p->aflags, ERTS_PROC_AFLG_EXITING); -#endif - - p->status = P_EXITING; -#ifdef ERTS_SMP - erts_pix_unlock(pix_lock); - - if (ERTS_PROC_PENDING_EXIT(p)) { +#ifndef ERTS_SMP + set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state)); +#else + state = set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state)); + if (state & ERTS_PSFLG_PENDING_EXIT) { /* Process exited before pending exit was received... */ p->pending_exit.reason = THE_NON_VALUE; if (p->pending_exit.bp) { @@ -8780,29 +8649,11 @@ erts_do_exit_process(Process* p, Eterm reason) erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL_MINOR); -#ifdef ERTS_SMP - continue_exit_process(p, pix_lock); -#else - continue_exit_process(p); -#endif + erts_continue_exit_process(p); } void -erts_continue_exit_process(Process *c_p) -{ -#ifdef ERTS_SMP - continue_exit_process(c_p, ERTS_PID2PIXLOCK(c_p->id)); -#else - continue_exit_process(c_p); -#endif -} - -static void -continue_exit_process(Process *p -#ifdef ERTS_SMP - , erts_pix_lock_t *pix_lock -#endif - ) +erts_continue_exit_process(Process *p) { ErtsLink* lnk; ErtsMonitor *mon; @@ -8818,11 +8669,7 @@ continue_exit_process(Process *p ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(p)); -#ifdef DEBUG - erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); - ASSERT(p->status == P_EXITING); - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); -#endif + ASSERT(ERTS_PROC_IS_EXITING(p)); #ifdef ERTS_SMP if (p->flags & F_HAVE_BLCKD_MSCHED) { @@ -8914,7 +8761,6 @@ continue_exit_process(Process *p p->scheduler_data->current_process = NULL; p->scheduler_data->free_process = p; - p->status_flags = 0; #endif /* Time of death! */ erts_smp_atomic_set_relb(&erts_proc.tab[pix], ERTS_AINT_NULL); @@ -8946,7 +8792,21 @@ continue_exit_process(Process *p lnk = p->nlinks; p->nlinks = NULL; - p->status = P_FREE; + + { + /* Inactivate and notify free */ + erts_aint32_t n, e, a = erts_smp_atomic32_read_nob(&p->state); + while (1) { + n = e = a; + ASSERT(a & ERTS_PSFLG_EXITING); + n |= ERTS_PSFLG_FREE; + n &= ~ERTS_PSFLG_ACTIVE; + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + if (a == e) + break; + } + } + dep = ((p->flags & F_DISTRIBUTION) ? ERTS_PROC_SET_DIST_ENTRY(p, ERTS_PROC_LOCKS_ALL, NULL) : NULL); @@ -9032,8 +8892,6 @@ continue_exit_process(Process *p ERTS_SMP_LC_ASSERT(curr_locks == erts_proc_lc_my_proc_locks(p)); ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN & curr_locks); - ASSERT(p->status == P_EXITING); - p->i = (BeamInstr *) beam_continue_exit; if (!(curr_locks & ERTS_PROC_LOCK_STATUS)) { @@ -9041,8 +8899,6 @@ continue_exit_process(Process *p curr_locks |= ERTS_PROC_LOCK_STATUS; } - erts_add_to_runq(p); - if (curr_locks != ERTS_PROC_LOCK_MAIN) erts_smp_proc_unlock(p, ~ERTS_PROC_LOCK_MAIN & curr_locks); @@ -9054,33 +8910,15 @@ continue_exit_process(Process *p static void timeout_proc(Process* p) { + erts_aint32_t state; BeamInstr** pi = (BeamInstr **) p->def_arg_reg; p->i = *pi; p->flags |= F_TIMO; p->flags &= ~F_INSLPQUEUE; - switch (p->status) { - case P_GARBING: - switch (p->gcstatus) { - case P_SUSPENDED: - goto suspended; - case P_WAITING: - goto waiting; - default: - break; - } - break; - case P_WAITING: - waiting: - erts_add_to_runq(p); - break; - case P_SUSPENDED: - suspended: - p->rstatus = P_RUNABLE; /* MUST set resume status to runnable */ - break; - default: - break; - } + state = erts_smp_atomic32_read_acqb(&p->state); + if (!(state & ERTS_PSFLG_ACTIVE)) + schedule_process(p, state, 0); } @@ -9148,6 +8986,7 @@ erts_stack_dump(int to, void *to_arg, Process *p) void erts_program_counter_info(int to, void *to_arg, Process *p) { + erts_aint32_t state; int i; erts_print(to, to_arg, "Program counter: %p (", p->i); @@ -9156,7 +8995,8 @@ erts_program_counter_info(int to, void *to_arg, Process *p) erts_print(to, to_arg, "CP: %p (", p->cp); print_function_from_pc(to, to_arg, p->cp); erts_print(to, to_arg, ")\n"); - if (!((p->status == P_RUNNING) || (p->status == P_GARBING))) { + state = erts_smp_atomic32_read_acqb(&p->state); + if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_GC))) { erts_print(to, to_arg, "arity = %d\n",p->arity); if (!ERTS_IS_CRASH_DUMPING) { /* |