diff options
author | Rickard Green <rickard@erlang.org> | 2016-01-27 14:45:32 +0100 |
---|---|---|
committer | Rickard Green <rickard@erlang.org> | 2016-02-15 09:55:42 +0100 |
commit | a7f48d4972be0c7984d5cb0e08e73260c0fdbe1b (patch) | |
tree | 7cd5326a18f7d4e2a0651606bd2566dab0b71bfb /erts/emulator/beam/erl_process.c | |
parent | 7cf9a621c5280a3e97967c4c63ab6ca1adde69c3 (diff) | |
download | otp-a7f48d4972be0c7984d5cb0e08e73260c0fdbe1b.tar.gz otp-a7f48d4972be0c7984d5cb0e08e73260c0fdbe1b.tar.bz2 otp-a7f48d4972be0c7984d5cb0e08e73260c0fdbe1b.zip |
Ensure that work is done on the correct type of schedulers
Only the actual call to the dirty nif is allowed to execute on
dirty schedulers. The dirty nif is not allowed to execute on
normal schedulers if dirty schedulers are available.
Arrival of exit signals and system tasks, while a process was
scheduled for execution on a dirty scheduler, could mess up the
process internal state.
Preparation for dirty system task has been made, but is currently
unused.
Diffstat (limited to 'erts/emulator/beam/erl_process.c')
-rw-r--r-- | erts/emulator/beam/erl_process.c | 650 |
1 files changed, 412 insertions, 238 deletions
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index d583118e7b..1765315ed3 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -2969,7 +2969,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) sched_wall_time_change(esdp, thr_prgr_active); while (1) { - ErtsMonotonicTime current_time; + ErtsMonotonicTime current_time = 0; aux_work = erts_atomic32_read_acqb(&ssi->aux_work); if (aux_work) { @@ -3635,6 +3635,13 @@ check_requeue_process(ErtsRunQueue *rq, int prio_q) return 0; } +static ERTS_INLINE void +free_proxy_proc(Process *proxy) +{ + ASSERT(erts_smp_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY); + erts_free(ERTS_ALC_T_PROC, proxy); +} + #ifdef ERTS_SMP static ErtsRunQueue * @@ -3949,9 +3956,6 @@ evacuate_run_queue(ErtsRunQueue *rq, erts_aint32_t state; Process *proc; int notify = 0; -#ifdef ERTS_DIRTY_SCHEDULERS - int requeue; -#endif to_rq = NULL; #ifdef ERTS_DIRTY_SCHEDULERS @@ -3966,49 +3970,94 @@ evacuate_run_queue(ErtsRunQueue *rq, proc = dequeue_process(rq, prio_q, &state); while (proc) { -#ifdef ERTS_DIRTY_SCHEDULERS - requeue = 1; + Process *real_proc; + int prio; + erts_aint32_t max_qbit, qbit, real_state; + + prio = ERTS_PSFLGS_GET_PRQ_PRIO(state); + qbit = ((erts_aint32_t) 1) << prio; + + if (!(state & ERTS_PSFLG_PROXY)) { + real_proc = proc; + real_state = state; + } + else { + real_proc = erts_proc_lookup_raw(proc->common.id); + if (!real_proc) { + free_proxy_proc(proc); + goto handle_next_proc; + } + real_state = erts_smp_atomic32_read_acqb(&real_proc->state); + } + + max_qbit = (state >> ERTS_PSFLGS_IN_PRQ_MASK_OFFSET); + max_qbit &= ERTS_PSFLGS_QMASK; + max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS; + max_qbit &= -max_qbit; + + if (qbit > max_qbit) { + /* Process already queued with higher prio; drop it... */ + if (real_proc != proc) + free_proxy_proc(proc); + else { + erts_aint32_t clr_bits; +#ifdef DEBUG + erts_aint32_t old; #endif - if (ERTS_PSFLG_BOUND & state) { - /* Bound processes get stuck here... */ - proc->next = NULL; - if (sbpp->last) - sbpp->last->next = proc; - else - sbpp->first = proc; - sbpp->last = proc; -#ifdef ERTS_DIRTY_SCHEDULERS - requeue = 0; + + clr_bits = ERTS_PSFLG_IN_RUNQ; + clr_bits |= qbit << ERTS_PSFLGS_IN_PRQ_MASK_OFFSET; + +#ifdef DEBUG + old = +#else + (void) #endif + erts_smp_atomic32_read_band_mb(&proc->state, + ~clr_bits); + ASSERT((old & clr_bits) == clr_bits); + + } + + goto handle_next_proc; } + #ifdef ERTS_DIRTY_SCHEDULERS - else if (state & ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q) { + + if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix)) { + erts_aint32_t dqbit = qbit; #ifdef DEBUG - erts_aint32_t old = -#else - (void) + erts_aint32_t old_dqbit; #endif - erts_smp_atomic32_read_band_nob(&proc->state, - ~(ERTS_PSFLG_DIRTY_CPU_PROC - | ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q)); - /* assert that no other dirty flags are set */ - ASSERT(!(old & (ERTS_PSFLG_DIRTY_IO_PROC|ERTS_PSFLG_DIRTY_IO_PROC_IN_Q))); - } else if (state & ERTS_PSFLG_DIRTY_IO_PROC_IN_Q) { + + if (rq == ERTS_DIRTY_CPU_RUNQ) + dqbit <<= ERTS_PDSFLGS_IN_CPU_PRQ_MASK_OFFSET; + else { + ASSERT(rq == ERTS_DIRTY_IO_RUNQ); + dqbit <<= ERTS_PDSFLGS_IN_IO_PRQ_MASK_OFFSET; + } + #ifdef DEBUG - erts_aint32_t old = + old_dqbit = (int) #else - (void) + (void) #endif - erts_smp_atomic32_read_band_nob(&proc->state, - ~(ERTS_PSFLG_DIRTY_IO_PROC - | ERTS_PSFLG_DIRTY_IO_PROC_IN_Q)); - /* assert that no other dirty flags are set */ - ASSERT(!(old & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q))); + erts_smp_atomic32_read_band_mb(&real_proc->dirty_state, + ~dqbit); + ASSERT(old_dqbit & dqbit); } - if (requeue) { -#else - else { #endif + + if (ERTS_PSFLG_BOUND & real_state) { + /* Bound processes get stuck here... */ + proc->next = NULL; + if (sbpp->last) + sbpp->last->next = proc; + else + sbpp->first = proc; + sbpp->last = proc; + } + else { int prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state); erts_smp_runq_unlock(rq); @@ -4031,6 +4080,8 @@ evacuate_run_queue(ErtsRunQueue *rq, erts_smp_runq_lock(rq); } + + handle_next_proc: proc = dequeue_process(rq, prio_q, &state); } if (notify) @@ -5974,19 +6025,96 @@ make_proxy_proc(Process *prev_proxy, Process *proc, erts_aint32_t prio) return proxy; } -static ERTS_INLINE void -free_proxy_proc(Process *proxy) -{ - ASSERT(erts_smp_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY); - erts_free(ERTS_ALC_T_PROC, proxy); -} - #define ERTS_ENQUEUE_NOT 0 #define ERTS_ENQUEUE_NORMAL_QUEUE 1 -#ifdef ERTS_DIRTY_SCHEDULERS #define ERTS_ENQUEUE_DIRTY_CPU_QUEUE 2 #define ERTS_ENQUEUE_DIRTY_IO_QUEUE 3 + +#ifdef ERTS_DIRTY_SCHEDULERS + +static int +check_dirty_enqueue_in_prio_queue(Process *c_p, + erts_aint32_t *newp, + erts_aint32_t actual, + erts_aint32_t aprio, + erts_aint32_t qbit) +{ + int queue; + erts_aint32_t dact, max_qbit; + + /* Termination should be done on an ordinary scheduler */ + if (actual & ERTS_PSFLG_EXITING) { + *newp &= ~ERTS_PSFLGS_DIRTY_WORK; + return ERTS_ENQUEUE_NORMAL_QUEUE; + } + + /* + * If we have system tasks, we enqueue on ordinary run-queue + * and take care of those system tasks first. + */ + if (actual & ERTS_PSFLG_ACTIVE_SYS) + return ERTS_ENQUEUE_NORMAL_QUEUE; + + dact = erts_smp_atomic32_read_mb(&c_p->dirty_state); + if (actual & (ERTS_PSFLG_DIRTY_ACTIVE_SYS + | ERTS_PSFLG_DIRTY_CPU_PROC)) { + max_qbit = ((dact >> ERTS_PDSFLGS_IN_CPU_PRQ_MASK_OFFSET) + & ERTS_PDSFLGS_QMASK); + queue = ERTS_ENQUEUE_DIRTY_CPU_QUEUE; + } + else { + ASSERT(actual & ERTS_PSFLG_DIRTY_IO_PROC); + max_qbit = ((dact >> ERTS_PDSFLGS_IN_IO_PRQ_MASK_OFFSET) + & ERTS_PDSFLGS_QMASK); + queue = ERTS_ENQUEUE_DIRTY_IO_QUEUE; + } + + max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS; + max_qbit &= -max_qbit; + + if (qbit >= max_qbit) + return ERTS_ENQUEUE_NOT; /* Already queued in higher or equal prio */ + if ((actual & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLGS_USR_PRIO_MASK)) + != (aprio << ERTS_PSFLGS_USR_PRIO_OFFSET)) { + /* + * Process struct already enqueued, or actual prio not + * equal to user prio, i.e., enqueue using proxy. + */ + return -1*queue; + } + + *newp |= ERTS_PSFLG_IN_RUNQ; + return queue; +} + +static ERTS_INLINE int +fin_dirty_enq_s_change(Process *p, + int pstruct_reserved, + erts_aint32_t enq_prio, + int qmask_offset) +{ + erts_aint32_t qbit = 1 << enq_prio; + qbit <<= qmask_offset; + + if (qbit & erts_smp_atomic32_read_bor_mb(&p->dirty_state, qbit)) { + /* Already enqueue by someone else... */ + if (pstruct_reserved) { + /* We reserved process struct for enqueue; clear it... */ +#ifdef DEBUG + erts_aint32_t old = +#else + (void) #endif + erts_smp_atomic32_read_band_nob(&p->state, ~ERTS_PSFLG_IN_RUNQ); + ASSERT(old & ERTS_PSFLG_IN_RUNQ); + } + return 0; + } + + return !0; +} + +#endif /* ERTS_DIRTY_SCHEDULERS */ static ERTS_INLINE int check_enqueue_in_prio_queue(Process *c_p, @@ -6002,61 +6130,14 @@ check_enqueue_in_prio_queue(Process *c_p, *prq_prio_p = aprio; #ifdef ERTS_DIRTY_SCHEDULERS - if (actual & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)) { - /* - * If we have system tasks of a priority higher - * or equal to the user priority, we enqueue - * on ordinary run-queue and take care of - * those system tasks first. - */ - if (actual & ERTS_PSFLG_ACTIVE_SYS) { - erts_aint32_t uprio, stprio, qmask; - uprio = (actual >> ERTS_PSFLGS_USR_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK; - if (aprio < uprio) - goto enqueue_normal_runq; /* system tasks with higher prio */ - erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); - qmask = c_p->sys_task_qs->qmask; - erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); - switch (qmask & -qmask) { - case MAX_BIT: - stprio = PRIORITY_MAX; - break; - case HIGH_BIT: - stprio = PRIORITY_HIGH; - break; - case NORMAL_BIT: - stprio = PRIORITY_NORMAL; - break; - case LOW_BIT: - stprio = PRIORITY_LOW; - break; - default: - stprio = PRIORITY_LOW+1; - break; - } - if (stprio <= uprio) - goto enqueue_normal_runq; /* system tasks with higher prio */ - } - - /* Enqueue in dirty run queue if not already enqueued */ - if (actual & (ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q|ERTS_PSFLG_DIRTY_IO_PROC_IN_Q)) - return ERTS_ENQUEUE_NOT; /* already in queue */ - if (actual & ERTS_PSFLG_DIRTY_CPU_PROC) { - *newp |= ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q; - if (actual & ERTS_PSFLG_IN_RUNQ) - return -ERTS_ENQUEUE_DIRTY_CPU_QUEUE; /* use proxy */ - *newp |= ERTS_PSFLG_IN_RUNQ; - return ERTS_ENQUEUE_DIRTY_CPU_QUEUE; - } - *newp |= ERTS_PSFLG_DIRTY_IO_PROC_IN_Q; - if (actual & ERTS_PSFLG_IN_RUNQ) - return -ERTS_ENQUEUE_DIRTY_IO_QUEUE; /* use proxy */ - *newp |= ERTS_PSFLG_IN_RUNQ; - return ERTS_ENQUEUE_DIRTY_IO_QUEUE; + if (actual & ERTS_PSFLGS_DIRTY_WORK) { + int res = check_dirty_enqueue_in_prio_queue(c_p, newp, actual, + aprio, qbit); + if (res != ERTS_ENQUEUE_NORMAL_QUEUE) + return res; } - - enqueue_normal_runq: #endif + max_qbit = (actual >> ERTS_PSFLGS_IN_PRQ_MASK_OFFSET) & ERTS_PSFLGS_QMASK; max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS; max_qbit &= -max_qbit; @@ -6088,6 +6169,65 @@ check_enqueue_in_prio_queue(Process *c_p, return ERTS_ENQUEUE_NORMAL_QUEUE; } +static ERTS_INLINE ErtsRunQueue * +select_enqueue_run_queue(int enqueue, int enq_prio, Process *p, erts_aint32_t state) +{ + + switch (enqueue) { + + case ERTS_ENQUEUE_NOT: + + return NULL; + +#ifdef ERTS_DIRTY_SCHEDULERS + + case ERTS_ENQUEUE_DIRTY_CPU_QUEUE: + case -ERTS_ENQUEUE_DIRTY_CPU_QUEUE: + + if (fin_dirty_enq_s_change(p, enqueue > 0, enq_prio, + ERTS_PDSFLGS_IN_CPU_PRQ_MASK_OFFSET)) + return ERTS_DIRTY_CPU_RUNQ; + + return NULL; + + + case ERTS_ENQUEUE_DIRTY_IO_QUEUE: + case -ERTS_ENQUEUE_DIRTY_IO_QUEUE: + + if (fin_dirty_enq_s_change(p, enqueue > 0, enq_prio, + ERTS_PDSFLGS_IN_IO_PRQ_MASK_OFFSET)) + return ERTS_DIRTY_IO_RUNQ; + + return NULL; + +#endif + + default: { + ErtsRunQueue* runq; + + ASSERT(enqueue == ERTS_ENQUEUE_NORMAL_QUEUE + || enqueue == -ERTS_ENQUEUE_NORMAL_QUEUE); + + runq = erts_get_runq_proc(p); + +#ifdef ERTS_SMP + if (!(ERTS_PSFLG_BOUND & state)) { + ErtsRunQueue *new_runq = erts_check_emigration_need(runq, enq_prio); + if (new_runq) { + RUNQ_SET_RQ(&p->run_queue, new_runq); + runq = new_runq; + } + } +#endif + + ASSERT(runq); + + return runq; + } + } +} + + /* * schedule_out_process() return with c_rq locked. */ @@ -6096,11 +6236,7 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Proces { erts_aint32_t a, e, n, enq_prio = -1; int enqueue; /* < 0 -> use proxy */ - Process* sched_p; ErtsRunQueue* runq; -#ifdef ERTS_SMP - int check_emigration_need; -#endif a = state; @@ -6112,7 +6248,7 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Proces enqueue = ERTS_ENQUEUE_NOT; n &= ~(ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS); - if (a & ERTS_PSFLG_ACTIVE_SYS + if (a & (ERTS_PSFLG_ACTIVE_SYS|ERTS_PSFLG_DIRTY_ACTIVE_SYS) || (a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) { enqueue = check_enqueue_in_prio_queue(p, &enq_prio, &n, a); } @@ -6121,16 +6257,17 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Proces break; } - switch (enqueue) { - case ERTS_ENQUEUE_NOT: + runq = select_enqueue_run_queue(enqueue, enq_prio, p, n); + + if (!runq) { + if (erts_system_profile_flags.runnable_procs) { /* Status lock prevents out of order "runnable proc" trace msgs */ ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - if (!(a & ERTS_PSFLG_ACTIVE_SYS) - && (!(a & ERTS_PSFLG_ACTIVE) - || (a & ERTS_PSFLG_SUSPENDED))) { + if (!(a & (ERTS_PSFLG_ACTIVE_SYS|ERTS_PSFLG_DIRTY_ACTIVE_SYS)) + && (!(a & ERTS_PSFLG_ACTIVE) || (a & ERTS_PSFLG_SUSPENDED))) { /* Process inactive */ profile_runnable_proc(p, am_inactive); } @@ -6143,98 +6280,76 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Proces return 0; -#ifdef ERTS_DIRTY_SCHEDULERS -#ifdef ERTS_SMP - case ERTS_ENQUEUE_DIRTY_CPU_QUEUE: - case -ERTS_ENQUEUE_DIRTY_CPU_QUEUE: - runq = ERTS_DIRTY_CPU_RUNQ; - ASSERT(ERTS_SCHEDULER_IS_DIRTY_CPU(runq->scheduler)); -#ifdef ERTS_SMP - check_emigration_need = 0; -#endif - break; + } + else { + Process* sched_p; - case ERTS_ENQUEUE_DIRTY_IO_QUEUE: - case -ERTS_ENQUEUE_DIRTY_IO_QUEUE: - runq = ERTS_DIRTY_IO_RUNQ; - ASSERT(ERTS_SCHEDULER_IS_DIRTY_IO(runq->scheduler)); -#ifdef ERTS_SMP - check_emigration_need = 0; -#endif - break; -#endif -#endif + ASSERT(!(n & ERTS_PSFLG_SUSPENDED) || (n & (ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_DIRTY_ACTIVE_SYS))); - default: - ASSERT(enqueue == ERTS_ENQUEUE_NORMAL_QUEUE - || enqueue == -ERTS_ENQUEUE_NORMAL_QUEUE); + if (enqueue < 0) + sched_p = make_proxy_proc(proxy, p, enq_prio); + else { + sched_p = p; + if (proxy) + free_proxy_proc(proxy); + } - runq = erts_get_runq_proc(p); -#ifdef ERTS_SMP - check_emigration_need = !(ERTS_PSFLG_BOUND & n); -#endif - break; - } + ASSERT(runq); - ASSERT(!(n & ERTS_PSFLG_SUSPENDED) || (n & ERTS_PSFLG_ACTIVE_SYS)); + erts_smp_runq_lock(runq); - if (enqueue < 0) - sched_p = make_proxy_proc(proxy, p, enq_prio); - else { - sched_p = p; - if (proxy) - free_proxy_proc(proxy); - } + /* Enqueue the process */ + enqueue_process(runq, (int) enq_prio, sched_p); -#ifdef ERTS_SMP - if (check_emigration_need) { - ErtsRunQueue *new_runq = erts_check_emigration_need(runq, enq_prio); - if (new_runq) { - RUNQ_SET_RQ(&sched_p->run_queue, new_runq); - runq = new_runq; - } - } -#endif + if (runq == c_rq) + return 1; - ASSERT(runq); + erts_smp_runq_unlock(runq); - erts_smp_runq_lock(runq); + smp_notify_inc_runq(runq); - /* Enqueue the process */ - enqueue_process(runq, (int) enq_prio, sched_p); + erts_smp_runq_lock(c_rq); - if (runq == c_rq) return 1; - erts_smp_runq_unlock(runq); - smp_notify_inc_runq(runq); - erts_smp_runq_lock(c_rq); - return 1; + } + } static ERTS_INLINE void -add2runq(Process *p, erts_aint32_t state, erts_aint32_t prio) +add2runq(int enqueue, erts_aint32_t prio, + Process *proc, erts_aint32_t state, + Process **proxy) { - ErtsRunQueue *runq = erts_get_runq_proc(p); + ErtsRunQueue *runq; -#ifdef ERTS_SMP - if (!(ERTS_PSFLG_BOUND & state)) { - ErtsRunQueue *new_runq = erts_check_emigration_need(runq, (int) prio); - if (new_runq) { - RUNQ_SET_RQ(&p->run_queue, new_runq); - runq = new_runq; - } - } -#endif - ASSERT(runq); + runq = select_enqueue_run_queue(enqueue, prio, proc, state); - erts_smp_runq_lock(runq); + if (runq) { + Process *sched_p; - /* Enqueue the process */ - enqueue_process(runq, (int) prio, p); + if (enqueue > 0) + sched_p = proc; + else { + Process *pxy; - erts_smp_runq_unlock(runq); - smp_notify_inc_runq(runq); + if (!proxy) + pxy = NULL; + else { + pxy = *proxy; + *proxy = NULL; + } + sched_p = make_proxy_proc(pxy, proc, prio); + } + + erts_smp_runq_lock(runq); + + /* Enqueue the process */ + enqueue_process(runq, (int) prio, sched_p); + erts_smp_runq_unlock(runq); + smp_notify_inc_runq(runq); + } } static ERTS_INLINE int @@ -6340,10 +6455,7 @@ schedule_process(Process *p, erts_aint32_t in_state, ErtsProcLocks locks) &state, &enq_prio, locks); - if (enqueue != ERTS_ENQUEUE_NOT) - add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), - state, - enq_prio); + add2runq(enqueue, enq_prio, p, state, NULL); } void @@ -6403,16 +6515,7 @@ schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy) prof_runnable_procs = 0; } - if (enqueue != ERTS_ENQUEUE_NOT) { - Process *sched_p; - if (enqueue > 0) - sched_p = p; - else { - sched_p = make_proxy_proc(proxy, p, enq_prio); - proxy = NULL; - } - add2runq(sched_p, n, enq_prio); - } + add2runq(enqueue, enq_prio, p, n, &proxy); cleanup: @@ -6508,10 +6611,7 @@ resume_process(Process *p, ErtsProcLocks locks) &state, &enq_prio, locks); - if (enqueue) - add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), - state, - enq_prio); + add2runq(enqueue, enq_prio, p, state, NULL); } int @@ -9361,15 +9461,16 @@ Process *schedule(Process *p, int calls) #ifdef ERTS_SMP ErtsMigrationPaths *mps; ErtsMigrationPath *mp; - ErtsProcList *pnd_xtrs = rq->procs.pending_exiters; - if (erts_proclist_fetch(&pnd_xtrs, NULL)) { + + if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) { + ErtsProcList *pnd_xtrs = rq->procs.pending_exiters; + if (erts_proclist_fetch(&pnd_xtrs, NULL)) { rq->procs.pending_exiters = NULL; erts_smp_runq_unlock(rq); handle_pending_exiters(pnd_xtrs); erts_smp_runq_lock(rq); } - if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) { if (rq->check_balance_reds <= 0) check_balance(rq); @@ -9564,11 +9665,12 @@ Process *schedule(Process *p, int calls) pick_next_process: { erts_aint32_t psflg_band_mask; int prio_q; - int qmask; + int qmask, qbit; flags = ERTS_RUNQ_FLGS_GET_NOB(rq); qmask = (int) (flags & ERTS_RUNQ_FLGS_PROCS_QMASK); - switch (qmask & -qmask) { + qbit = qmask & -qmask; + switch (qbit) { case MAX_BIT: prio_q = PRIORITY_MAX; break; @@ -9596,20 +9698,11 @@ Process *schedule(Process *p, int calls) ASSERT(p); /* Wrong qmask in rq->flags? */ - psflg_band_mask = ~(((erts_aint32_t) 1) << (ERTS_PSFLGS_GET_PRQ_PRIO(state) - + ERTS_PSFLGS_IN_PRQ_MASK_OFFSET)); - -#ifdef ERTS_DIRTY_SCHEDULERS - ASSERT((state & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)) != - (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)); - if (state & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)) { - ASSERT((ERTS_SCHEDULER_IS_DIRTY_CPU(esdp) && (state & ERTS_PSFLG_DIRTY_CPU_PROC)) || - (ERTS_SCHEDULER_IS_DIRTY_IO(esdp) && (state & ERTS_PSFLG_DIRTY_IO_PROC))); - if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && !(state & ERTS_PSFLG_ACTIVE_SYS)) - goto pick_next_process; - state &= ~(ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q|ERTS_PSFLG_DIRTY_IO_PROC_IN_Q); - } -#endif + if (ERTS_SCHEDULER_IS_DIRTY(esdp)) + psflg_band_mask = ~((erts_aint32_t) 0); + else + psflg_band_mask = ~(((erts_aint32_t) 1) << (ERTS_PSFLGS_GET_PRQ_PRIO(state) + + ERTS_PSFLGS_IN_PRQ_MASK_OFFSET)); if (!(state & ERTS_PSFLG_PROXY)) psflg_band_mask &= ~ERTS_PSFLG_IN_RUNQ; @@ -9632,9 +9725,11 @@ Process *schedule(Process *p, int calls) | ERTS_PSFLG_RUNNING_SYS))) { tmp = state & (ERTS_PSFLG_SUSPENDED | ERTS_PSFLG_PENDING_EXIT - | ERTS_PSFLG_ACTIVE_SYS); + | ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_DIRTY_ACTIVE_SYS); if (tmp != ERTS_PSFLG_SUSPENDED) { - if (state & ERTS_PSFLG_ACTIVE_SYS) + if (state & (ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_DIRTY_ACTIVE_SYS)) new |= ERTS_PSFLG_RUNNING_SYS; else new |= ERTS_PSFLG_RUNNING; @@ -9647,7 +9742,8 @@ Process *schedule(Process *p, int calls) | ERTS_PSFLG_FREE)) || ((state & (ERTS_PSFLG_SUSPENDED | ERTS_PSFLG_PENDING_EXIT - | ERTS_PSFLG_ACTIVE_SYS)) + | ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_DIRTY_ACTIVE_SYS)) == ERTS_PSFLG_SUSPENDED)) { if (state & ERTS_PSFLG_FREE) erts_proc_dec_refc(p); @@ -9666,10 +9762,42 @@ Process *schedule(Process *p, int calls) esdp->current_process = p; + + reds = context_reds; + +#ifdef ERTS_SMP + + erts_smp_runq_unlock(rq); + +#ifdef ERTS_DIRTY_SCHEDULERS + if (ERTS_SCHEDULER_IS_DIRTY(esdp)) { +#ifdef DEBUG + int old_dqbit; +#endif + int dqbit = qbit; + + if (rq == ERTS_DIRTY_CPU_RUNQ) + dqbit <<= ERTS_PDSFLGS_IN_CPU_PRQ_MASK_OFFSET; + else { + ASSERT(rq == ERTS_DIRTY_IO_RUNQ); + dqbit <<= ERTS_PDSFLGS_IN_IO_PRQ_MASK_OFFSET; + } + +#ifdef DEBUG + old_dqbit = (int) +#else + (void) +#endif + erts_smp_atomic32_read_band_mb(&p->dirty_state, ~dqbit); + ASSERT(old_dqbit & dqbit); + } +#endif /* ERTS_DIRTY_SCHEDULERS */ + +#endif /* ERTS_SMP */ + } #ifdef ERTS_SMP - erts_smp_runq_unlock(rq); if (flags & ERTS_RUNQ_FLG_PROTECTED) (void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED); @@ -9702,15 +9830,56 @@ Process *schedule(Process *p, int calls) erts_smp_spin_unlock(&erts_sched_stat.lock); } - if (ERTS_PROC_PENDING_EXIT(p)) { + ASSERT(!p->scheduler_data); + p->scheduler_data = esdp; + + state = erts_smp_atomic32_read_nob(&p->state); + +#ifdef ERTS_DIRTY_SCHEDULERS + if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) { + if (!!(state & ERTS_PSFLGS_DIRTY_WORK) + & !(state & ERTS_PSFLG_ACTIVE_SYS)) { + /* Migrate to dirty scheduler... */ + sunlock_sched_out_proc: + erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); + p->fcalls = reds; + goto sched_out_proc; + + } + } + else { + if (state & (ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_PENDING_EXIT + | ERTS_PSFLG_EXITING)) { + /* Migrate to normal scheduler... */ + goto sunlock_sched_out_proc; + } + if ((state & ERTS_PSFLG_DIRTY_ACTIVE_SYS) + && rq == ERTS_DIRTY_IO_RUNQ) { + /* Migrate to dirty cpu scheduler... */ + goto sunlock_sched_out_proc; + } + + ASSERT((state & ERTS_PSFLG_DIRTY_ACTIVE_SYS) + || *p->i == (BeamInstr) em_call_nif); + + ASSERT(rq == ERTS_DIRTY_CPU_RUNQ + ? (state & (ERTS_PSFLG_DIRTY_CPU_PROC + | ERTS_PSFLG_DIRTY_ACTIVE_SYS)) + : (rq == ERTS_DIRTY_IO_RUNQ + && (state & ERTS_PSFLG_DIRTY_IO_PROC))); + } +#endif + + if (state & ERTS_PSFLG_PENDING_EXIT) { erts_handle_pending_exit(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); state = erts_smp_atomic32_read_nob(&p->state); } - ASSERT(!p->scheduler_data); - p->scheduler_data = esdp; -#endif - reds = context_reds; + +#endif /* ERTS_SMP */ + + p->fcalls = reds; if (IS_TRACED(p)) { if (state & ERTS_PSFLG_EXITING) { @@ -9739,7 +9908,8 @@ Process *schedule(Process *p, int calls) reds -= execute_sys_tasks(p, &state, reds); if (reds <= 0 #ifdef ERTS_DIRTY_SCHEDULERS - || (state & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)) + || ERTS_SCHEDULER_IS_DIRTY(esdp) + || (state & ERTS_PSFLGS_DIRTY_WORK) #endif ) { p->fcalls = reds; @@ -9787,7 +9957,6 @@ Process *schedule(Process *p, int calls) proxy_p = NULL; } - p->fcalls = reds; ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); /* Never run a suspended process */ @@ -10732,6 +10901,9 @@ static void early_init_process_struct(void *varg, Eterm data) Process *proc = arg->proc; proc->common.id = make_internal_pid(data); +#ifdef ERTS_DIRTY_SCHEDULERS + erts_smp_atomic32_init_nob(&proc->dirty_state, 0); +#endif erts_smp_atomic32_init_relb(&proc->state, arg->state); #ifdef ERTS_SMP @@ -11194,6 +11366,9 @@ void erts_init_empty_process(Process *p) p->last_old_htop = NULL; #endif +#ifdef ERTS_DIRTY_SCHEDULERS + erts_smp_atomic32_init_nob(&p->dirty_state, 0); +#endif erts_smp_atomic32_init_nob(&p->state, (erts_aint32_t) PRIORITY_NORMAL); #ifdef ERTS_SMP @@ -11395,7 +11570,9 @@ set_proc_exiting(Process *p, ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(p) == ERTS_PROC_LOCKS_ALL); enqueue = change_proc_schedule_state(p, - ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT, + (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_PENDING_EXIT + | ERTS_PSFLGS_DIRTY_WORK), ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE, &state, &enq_prio, @@ -11429,10 +11606,7 @@ set_proc_exiting(Process *p, } #endif - if (enqueue) - add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), - state, - enq_prio); + add2runq(enqueue, enq_prio, p, state, NULL); } static ERTS_INLINE erts_aint32_t @@ -11535,6 +11709,11 @@ save_pending_exiter(Process *p) else rq = esdp->run_queue; +#ifdef ERTS_DIRTY_SCHEDULERS + if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix)) + rq = ERTS_RUNQ_IX(0); /* Handle on ordinary scheduler */ +#endif + plp = proclist_create(p); erts_smp_runq_lock(rq); @@ -11544,13 +11723,8 @@ save_pending_exiter(Process *p) non_empty_runq(rq); erts_smp_runq_unlock(rq); -#ifdef ERTS_DIRTY_SCHEDULERS - if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix)) - wake_dirty_schedulers(rq, 0); - else -#endif - wake_scheduler(rq); + wake_scheduler(rq); } #endif |