diff options
-rw-r--r-- | erts/emulator/beam/erl_nif.c | 15 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 650 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 48 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process_dump.c | 10 |
4 files changed, 458 insertions, 265 deletions
diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c index d7a2076d85..962cb4858f 100644 --- a/erts/emulator/beam/erl_nif.c +++ b/erts/emulator/beam/erl_nif.c @@ -1772,12 +1772,10 @@ execute_dirty_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) ep = (NifExport*) ERTS_PROC_GET_NIF_TRAP_EXPORT(proc); ASSERT(ep); ep->fp = NULL; + erts_smp_atomic32_read_band_mb(&proc->state, ~(ERTS_PSFLG_DIRTY_CPU_PROC + | ERTS_PSFLG_DIRTY_IO_PROC)); result = (*fp)(env, argc, argv); - erts_smp_atomic32_read_band_mb(&proc->state, - ~(ERTS_PSFLG_DIRTY_CPU_PROC - |ERTS_PSFLG_DIRTY_IO_PROC - |ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q - |ERTS_PSFLG_DIRTY_IO_PROC_IN_Q)); + if (erts_refc_dectest(&env->mod_nif->rt_dtor_cnt, 0) == 0 && env->mod_nif->mod == NULL) close_lib(env->mod_nif); /* @@ -1825,13 +1823,6 @@ schedule_dirty_nif(ErlNifEnv* env, int flags, int argc, const ERL_NIF_TERM argv[ a = erts_smp_atomic32_read_acqb(&proc->state); while (1) { n = state = a; - /* - * clear any current dirty flags and dirty queue indicators, - * in case the application is shifting a job from one type - * of dirty scheduler to the other - */ - n &= ~(ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC - |ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q|ERTS_PSFLG_DIRTY_IO_PROC_IN_Q); if (flags == ERL_NIF_DIRTY_JOB_CPU_BOUND) n |= ERTS_PSFLG_DIRTY_CPU_PROC; else diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index d583118e7b..1765315ed3 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -2969,7 +2969,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) sched_wall_time_change(esdp, thr_prgr_active); while (1) { - ErtsMonotonicTime current_time; + ErtsMonotonicTime current_time = 0; aux_work = erts_atomic32_read_acqb(&ssi->aux_work); if (aux_work) { @@ -3635,6 +3635,13 @@ check_requeue_process(ErtsRunQueue *rq, int prio_q) return 0; } +static ERTS_INLINE void +free_proxy_proc(Process *proxy) +{ + ASSERT(erts_smp_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY); + erts_free(ERTS_ALC_T_PROC, proxy); +} + #ifdef ERTS_SMP static ErtsRunQueue * @@ -3949,9 +3956,6 @@ evacuate_run_queue(ErtsRunQueue *rq, erts_aint32_t state; Process *proc; int notify = 0; -#ifdef ERTS_DIRTY_SCHEDULERS - int requeue; -#endif to_rq = NULL; #ifdef ERTS_DIRTY_SCHEDULERS @@ -3966,49 +3970,94 @@ evacuate_run_queue(ErtsRunQueue *rq, proc = dequeue_process(rq, prio_q, &state); while (proc) { -#ifdef ERTS_DIRTY_SCHEDULERS - requeue = 1; + Process *real_proc; + int prio; + erts_aint32_t max_qbit, qbit, real_state; + + prio = ERTS_PSFLGS_GET_PRQ_PRIO(state); + qbit = ((erts_aint32_t) 1) << prio; + + if (!(state & ERTS_PSFLG_PROXY)) { + real_proc = proc; + real_state = state; + } + else { + real_proc = erts_proc_lookup_raw(proc->common.id); + if (!real_proc) { + free_proxy_proc(proc); + goto handle_next_proc; + } + real_state = erts_smp_atomic32_read_acqb(&real_proc->state); + } + + max_qbit = (state >> ERTS_PSFLGS_IN_PRQ_MASK_OFFSET); + max_qbit &= ERTS_PSFLGS_QMASK; + max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS; + max_qbit &= -max_qbit; + + if (qbit > max_qbit) { + /* Process already queued with higher prio; drop it... */ + if (real_proc != proc) + free_proxy_proc(proc); + else { + erts_aint32_t clr_bits; +#ifdef DEBUG + erts_aint32_t old; #endif - if (ERTS_PSFLG_BOUND & state) { - /* Bound processes get stuck here... */ - proc->next = NULL; - if (sbpp->last) - sbpp->last->next = proc; - else - sbpp->first = proc; - sbpp->last = proc; -#ifdef ERTS_DIRTY_SCHEDULERS - requeue = 0; + + clr_bits = ERTS_PSFLG_IN_RUNQ; + clr_bits |= qbit << ERTS_PSFLGS_IN_PRQ_MASK_OFFSET; + +#ifdef DEBUG + old = +#else + (void) #endif + erts_smp_atomic32_read_band_mb(&proc->state, + ~clr_bits); + ASSERT((old & clr_bits) == clr_bits); + + } + + goto handle_next_proc; } + #ifdef ERTS_DIRTY_SCHEDULERS - else if (state & ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q) { + + if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix)) { + erts_aint32_t dqbit = qbit; #ifdef DEBUG - erts_aint32_t old = -#else - (void) + erts_aint32_t old_dqbit; #endif - erts_smp_atomic32_read_band_nob(&proc->state, - ~(ERTS_PSFLG_DIRTY_CPU_PROC - | ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q)); - /* assert that no other dirty flags are set */ - ASSERT(!(old & (ERTS_PSFLG_DIRTY_IO_PROC|ERTS_PSFLG_DIRTY_IO_PROC_IN_Q))); - } else if (state & ERTS_PSFLG_DIRTY_IO_PROC_IN_Q) { + + if (rq == ERTS_DIRTY_CPU_RUNQ) + dqbit <<= ERTS_PDSFLGS_IN_CPU_PRQ_MASK_OFFSET; + else { + ASSERT(rq == ERTS_DIRTY_IO_RUNQ); + dqbit <<= ERTS_PDSFLGS_IN_IO_PRQ_MASK_OFFSET; + } + #ifdef DEBUG - erts_aint32_t old = + old_dqbit = (int) #else - (void) + (void) #endif - erts_smp_atomic32_read_band_nob(&proc->state, - ~(ERTS_PSFLG_DIRTY_IO_PROC - | ERTS_PSFLG_DIRTY_IO_PROC_IN_Q)); - /* assert that no other dirty flags are set */ - ASSERT(!(old & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q))); + erts_smp_atomic32_read_band_mb(&real_proc->dirty_state, + ~dqbit); + ASSERT(old_dqbit & dqbit); } - if (requeue) { -#else - else { #endif + + if (ERTS_PSFLG_BOUND & real_state) { + /* Bound processes get stuck here... */ + proc->next = NULL; + if (sbpp->last) + sbpp->last->next = proc; + else + sbpp->first = proc; + sbpp->last = proc; + } + else { int prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state); erts_smp_runq_unlock(rq); @@ -4031,6 +4080,8 @@ evacuate_run_queue(ErtsRunQueue *rq, erts_smp_runq_lock(rq); } + + handle_next_proc: proc = dequeue_process(rq, prio_q, &state); } if (notify) @@ -5974,19 +6025,96 @@ make_proxy_proc(Process *prev_proxy, Process *proc, erts_aint32_t prio) return proxy; } -static ERTS_INLINE void -free_proxy_proc(Process *proxy) -{ - ASSERT(erts_smp_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY); - erts_free(ERTS_ALC_T_PROC, proxy); -} - #define ERTS_ENQUEUE_NOT 0 #define ERTS_ENQUEUE_NORMAL_QUEUE 1 -#ifdef ERTS_DIRTY_SCHEDULERS #define ERTS_ENQUEUE_DIRTY_CPU_QUEUE 2 #define ERTS_ENQUEUE_DIRTY_IO_QUEUE 3 + +#ifdef ERTS_DIRTY_SCHEDULERS + +static int +check_dirty_enqueue_in_prio_queue(Process *c_p, + erts_aint32_t *newp, + erts_aint32_t actual, + erts_aint32_t aprio, + erts_aint32_t qbit) +{ + int queue; + erts_aint32_t dact, max_qbit; + + /* Termination should be done on an ordinary scheduler */ + if (actual & ERTS_PSFLG_EXITING) { + *newp &= ~ERTS_PSFLGS_DIRTY_WORK; + return ERTS_ENQUEUE_NORMAL_QUEUE; + } + + /* + * If we have system tasks, we enqueue on ordinary run-queue + * and take care of those system tasks first. + */ + if (actual & ERTS_PSFLG_ACTIVE_SYS) + return ERTS_ENQUEUE_NORMAL_QUEUE; + + dact = erts_smp_atomic32_read_mb(&c_p->dirty_state); + if (actual & (ERTS_PSFLG_DIRTY_ACTIVE_SYS + | ERTS_PSFLG_DIRTY_CPU_PROC)) { + max_qbit = ((dact >> ERTS_PDSFLGS_IN_CPU_PRQ_MASK_OFFSET) + & ERTS_PDSFLGS_QMASK); + queue = ERTS_ENQUEUE_DIRTY_CPU_QUEUE; + } + else { + ASSERT(actual & ERTS_PSFLG_DIRTY_IO_PROC); + max_qbit = ((dact >> ERTS_PDSFLGS_IN_IO_PRQ_MASK_OFFSET) + & ERTS_PDSFLGS_QMASK); + queue = ERTS_ENQUEUE_DIRTY_IO_QUEUE; + } + + max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS; + max_qbit &= -max_qbit; + + if (qbit >= max_qbit) + return ERTS_ENQUEUE_NOT; /* Already queued in higher or equal prio */ + if ((actual & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLGS_USR_PRIO_MASK)) + != (aprio << ERTS_PSFLGS_USR_PRIO_OFFSET)) { + /* + * Process struct already enqueued, or actual prio not + * equal to user prio, i.e., enqueue using proxy. + */ + return -1*queue; + } + + *newp |= ERTS_PSFLG_IN_RUNQ; + return queue; +} + +static ERTS_INLINE int +fin_dirty_enq_s_change(Process *p, + int pstruct_reserved, + erts_aint32_t enq_prio, + int qmask_offset) +{ + erts_aint32_t qbit = 1 << enq_prio; + qbit <<= qmask_offset; + + if (qbit & erts_smp_atomic32_read_bor_mb(&p->dirty_state, qbit)) { + /* Already enqueue by someone else... */ + if (pstruct_reserved) { + /* We reserved process struct for enqueue; clear it... */ +#ifdef DEBUG + erts_aint32_t old = +#else + (void) #endif + erts_smp_atomic32_read_band_nob(&p->state, ~ERTS_PSFLG_IN_RUNQ); + ASSERT(old & ERTS_PSFLG_IN_RUNQ); + } + return 0; + } + + return !0; +} + +#endif /* ERTS_DIRTY_SCHEDULERS */ static ERTS_INLINE int check_enqueue_in_prio_queue(Process *c_p, @@ -6002,61 +6130,14 @@ check_enqueue_in_prio_queue(Process *c_p, *prq_prio_p = aprio; #ifdef ERTS_DIRTY_SCHEDULERS - if (actual & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)) { - /* - * If we have system tasks of a priority higher - * or equal to the user priority, we enqueue - * on ordinary run-queue and take care of - * those system tasks first. - */ - if (actual & ERTS_PSFLG_ACTIVE_SYS) { - erts_aint32_t uprio, stprio, qmask; - uprio = (actual >> ERTS_PSFLGS_USR_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK; - if (aprio < uprio) - goto enqueue_normal_runq; /* system tasks with higher prio */ - erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); - qmask = c_p->sys_task_qs->qmask; - erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); - switch (qmask & -qmask) { - case MAX_BIT: - stprio = PRIORITY_MAX; - break; - case HIGH_BIT: - stprio = PRIORITY_HIGH; - break; - case NORMAL_BIT: - stprio = PRIORITY_NORMAL; - break; - case LOW_BIT: - stprio = PRIORITY_LOW; - break; - default: - stprio = PRIORITY_LOW+1; - break; - } - if (stprio <= uprio) - goto enqueue_normal_runq; /* system tasks with higher prio */ - } - - /* Enqueue in dirty run queue if not already enqueued */ - if (actual & (ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q|ERTS_PSFLG_DIRTY_IO_PROC_IN_Q)) - return ERTS_ENQUEUE_NOT; /* already in queue */ - if (actual & ERTS_PSFLG_DIRTY_CPU_PROC) { - *newp |= ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q; - if (actual & ERTS_PSFLG_IN_RUNQ) - return -ERTS_ENQUEUE_DIRTY_CPU_QUEUE; /* use proxy */ - *newp |= ERTS_PSFLG_IN_RUNQ; - return ERTS_ENQUEUE_DIRTY_CPU_QUEUE; - } - *newp |= ERTS_PSFLG_DIRTY_IO_PROC_IN_Q; - if (actual & ERTS_PSFLG_IN_RUNQ) - return -ERTS_ENQUEUE_DIRTY_IO_QUEUE; /* use proxy */ - *newp |= ERTS_PSFLG_IN_RUNQ; - return ERTS_ENQUEUE_DIRTY_IO_QUEUE; + if (actual & ERTS_PSFLGS_DIRTY_WORK) { + int res = check_dirty_enqueue_in_prio_queue(c_p, newp, actual, + aprio, qbit); + if (res != ERTS_ENQUEUE_NORMAL_QUEUE) + return res; } - - enqueue_normal_runq: #endif + max_qbit = (actual >> ERTS_PSFLGS_IN_PRQ_MASK_OFFSET) & ERTS_PSFLGS_QMASK; max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS; max_qbit &= -max_qbit; @@ -6088,6 +6169,65 @@ check_enqueue_in_prio_queue(Process *c_p, return ERTS_ENQUEUE_NORMAL_QUEUE; } +static ERTS_INLINE ErtsRunQueue * +select_enqueue_run_queue(int enqueue, int enq_prio, Process *p, erts_aint32_t state) +{ + + switch (enqueue) { + + case ERTS_ENQUEUE_NOT: + + return NULL; + +#ifdef ERTS_DIRTY_SCHEDULERS + + case ERTS_ENQUEUE_DIRTY_CPU_QUEUE: + case -ERTS_ENQUEUE_DIRTY_CPU_QUEUE: + + if (fin_dirty_enq_s_change(p, enqueue > 0, enq_prio, + ERTS_PDSFLGS_IN_CPU_PRQ_MASK_OFFSET)) + return ERTS_DIRTY_CPU_RUNQ; + + return NULL; + + + case ERTS_ENQUEUE_DIRTY_IO_QUEUE: + case -ERTS_ENQUEUE_DIRTY_IO_QUEUE: + + if (fin_dirty_enq_s_change(p, enqueue > 0, enq_prio, + ERTS_PDSFLGS_IN_IO_PRQ_MASK_OFFSET)) + return ERTS_DIRTY_IO_RUNQ; + + return NULL; + +#endif + + default: { + ErtsRunQueue* runq; + + ASSERT(enqueue == ERTS_ENQUEUE_NORMAL_QUEUE + || enqueue == -ERTS_ENQUEUE_NORMAL_QUEUE); + + runq = erts_get_runq_proc(p); + +#ifdef ERTS_SMP + if (!(ERTS_PSFLG_BOUND & state)) { + ErtsRunQueue *new_runq = erts_check_emigration_need(runq, enq_prio); + if (new_runq) { + RUNQ_SET_RQ(&p->run_queue, new_runq); + runq = new_runq; + } + } +#endif + + ASSERT(runq); + + return runq; + } + } +} + + /* * schedule_out_process() return with c_rq locked. */ @@ -6096,11 +6236,7 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Proces { erts_aint32_t a, e, n, enq_prio = -1; int enqueue; /* < 0 -> use proxy */ - Process* sched_p; ErtsRunQueue* runq; -#ifdef ERTS_SMP - int check_emigration_need; -#endif a = state; @@ -6112,7 +6248,7 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Proces enqueue = ERTS_ENQUEUE_NOT; n &= ~(ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS); - if (a & ERTS_PSFLG_ACTIVE_SYS + if (a & (ERTS_PSFLG_ACTIVE_SYS|ERTS_PSFLG_DIRTY_ACTIVE_SYS) || (a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) { enqueue = check_enqueue_in_prio_queue(p, &enq_prio, &n, a); } @@ -6121,16 +6257,17 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Proces break; } - switch (enqueue) { - case ERTS_ENQUEUE_NOT: + runq = select_enqueue_run_queue(enqueue, enq_prio, p, n); + + if (!runq) { + if (erts_system_profile_flags.runnable_procs) { /* Status lock prevents out of order "runnable proc" trace msgs */ ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - if (!(a & ERTS_PSFLG_ACTIVE_SYS) - && (!(a & ERTS_PSFLG_ACTIVE) - || (a & ERTS_PSFLG_SUSPENDED))) { + if (!(a & (ERTS_PSFLG_ACTIVE_SYS|ERTS_PSFLG_DIRTY_ACTIVE_SYS)) + && (!(a & ERTS_PSFLG_ACTIVE) || (a & ERTS_PSFLG_SUSPENDED))) { /* Process inactive */ profile_runnable_proc(p, am_inactive); } @@ -6143,98 +6280,76 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Proces return 0; -#ifdef ERTS_DIRTY_SCHEDULERS -#ifdef ERTS_SMP - case ERTS_ENQUEUE_DIRTY_CPU_QUEUE: - case -ERTS_ENQUEUE_DIRTY_CPU_QUEUE: - runq = ERTS_DIRTY_CPU_RUNQ; - ASSERT(ERTS_SCHEDULER_IS_DIRTY_CPU(runq->scheduler)); -#ifdef ERTS_SMP - check_emigration_need = 0; -#endif - break; + } + else { + Process* sched_p; - case ERTS_ENQUEUE_DIRTY_IO_QUEUE: - case -ERTS_ENQUEUE_DIRTY_IO_QUEUE: - runq = ERTS_DIRTY_IO_RUNQ; - ASSERT(ERTS_SCHEDULER_IS_DIRTY_IO(runq->scheduler)); -#ifdef ERTS_SMP - check_emigration_need = 0; -#endif - break; -#endif -#endif + ASSERT(!(n & ERTS_PSFLG_SUSPENDED) || (n & (ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_DIRTY_ACTIVE_SYS))); - default: - ASSERT(enqueue == ERTS_ENQUEUE_NORMAL_QUEUE - || enqueue == -ERTS_ENQUEUE_NORMAL_QUEUE); + if (enqueue < 0) + sched_p = make_proxy_proc(proxy, p, enq_prio); + else { + sched_p = p; + if (proxy) + free_proxy_proc(proxy); + } - runq = erts_get_runq_proc(p); -#ifdef ERTS_SMP - check_emigration_need = !(ERTS_PSFLG_BOUND & n); -#endif - break; - } + ASSERT(runq); - ASSERT(!(n & ERTS_PSFLG_SUSPENDED) || (n & ERTS_PSFLG_ACTIVE_SYS)); + erts_smp_runq_lock(runq); - if (enqueue < 0) - sched_p = make_proxy_proc(proxy, p, enq_prio); - else { - sched_p = p; - if (proxy) - free_proxy_proc(proxy); - } + /* Enqueue the process */ + enqueue_process(runq, (int) enq_prio, sched_p); -#ifdef ERTS_SMP - if (check_emigration_need) { - ErtsRunQueue *new_runq = erts_check_emigration_need(runq, enq_prio); - if (new_runq) { - RUNQ_SET_RQ(&sched_p->run_queue, new_runq); - runq = new_runq; - } - } -#endif + if (runq == c_rq) + return 1; - ASSERT(runq); + erts_smp_runq_unlock(runq); - erts_smp_runq_lock(runq); + smp_notify_inc_runq(runq); - /* Enqueue the process */ - enqueue_process(runq, (int) enq_prio, sched_p); + erts_smp_runq_lock(c_rq); - if (runq == c_rq) return 1; - erts_smp_runq_unlock(runq); - smp_notify_inc_runq(runq); - erts_smp_runq_lock(c_rq); - return 1; + } + } static ERTS_INLINE void -add2runq(Process *p, erts_aint32_t state, erts_aint32_t prio) +add2runq(int enqueue, erts_aint32_t prio, + Process *proc, erts_aint32_t state, + Process **proxy) { - ErtsRunQueue *runq = erts_get_runq_proc(p); + ErtsRunQueue *runq; -#ifdef ERTS_SMP - if (!(ERTS_PSFLG_BOUND & state)) { - ErtsRunQueue *new_runq = erts_check_emigration_need(runq, (int) prio); - if (new_runq) { - RUNQ_SET_RQ(&p->run_queue, new_runq); - runq = new_runq; - } - } -#endif - ASSERT(runq); + runq = select_enqueue_run_queue(enqueue, prio, proc, state); - erts_smp_runq_lock(runq); + if (runq) { + Process *sched_p; - /* Enqueue the process */ - enqueue_process(runq, (int) prio, p); + if (enqueue > 0) + sched_p = proc; + else { + Process *pxy; - erts_smp_runq_unlock(runq); - smp_notify_inc_runq(runq); + if (!proxy) + pxy = NULL; + else { + pxy = *proxy; + *proxy = NULL; + } + sched_p = make_proxy_proc(pxy, proc, prio); + } + + erts_smp_runq_lock(runq); + + /* Enqueue the process */ + enqueue_process(runq, (int) prio, sched_p); + erts_smp_runq_unlock(runq); + smp_notify_inc_runq(runq); + } } static ERTS_INLINE int @@ -6340,10 +6455,7 @@ schedule_process(Process *p, erts_aint32_t in_state, ErtsProcLocks locks) &state, &enq_prio, locks); - if (enqueue != ERTS_ENQUEUE_NOT) - add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), - state, - enq_prio); + add2runq(enqueue, enq_prio, p, state, NULL); } void @@ -6403,16 +6515,7 @@ schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy) prof_runnable_procs = 0; } - if (enqueue != ERTS_ENQUEUE_NOT) { - Process *sched_p; - if (enqueue > 0) - sched_p = p; - else { - sched_p = make_proxy_proc(proxy, p, enq_prio); - proxy = NULL; - } - add2runq(sched_p, n, enq_prio); - } + add2runq(enqueue, enq_prio, p, n, &proxy); cleanup: @@ -6508,10 +6611,7 @@ resume_process(Process *p, ErtsProcLocks locks) &state, &enq_prio, locks); - if (enqueue) - add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), - state, - enq_prio); + add2runq(enqueue, enq_prio, p, state, NULL); } int @@ -9361,15 +9461,16 @@ Process *schedule(Process *p, int calls) #ifdef ERTS_SMP ErtsMigrationPaths *mps; ErtsMigrationPath *mp; - ErtsProcList *pnd_xtrs = rq->procs.pending_exiters; - if (erts_proclist_fetch(&pnd_xtrs, NULL)) { + + if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) { + ErtsProcList *pnd_xtrs = rq->procs.pending_exiters; + if (erts_proclist_fetch(&pnd_xtrs, NULL)) { rq->procs.pending_exiters = NULL; erts_smp_runq_unlock(rq); handle_pending_exiters(pnd_xtrs); erts_smp_runq_lock(rq); } - if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) { if (rq->check_balance_reds <= 0) check_balance(rq); @@ -9564,11 +9665,12 @@ Process *schedule(Process *p, int calls) pick_next_process: { erts_aint32_t psflg_band_mask; int prio_q; - int qmask; + int qmask, qbit; flags = ERTS_RUNQ_FLGS_GET_NOB(rq); qmask = (int) (flags & ERTS_RUNQ_FLGS_PROCS_QMASK); - switch (qmask & -qmask) { + qbit = qmask & -qmask; + switch (qbit) { case MAX_BIT: prio_q = PRIORITY_MAX; break; @@ -9596,20 +9698,11 @@ Process *schedule(Process *p, int calls) ASSERT(p); /* Wrong qmask in rq->flags? */ - psflg_band_mask = ~(((erts_aint32_t) 1) << (ERTS_PSFLGS_GET_PRQ_PRIO(state) - + ERTS_PSFLGS_IN_PRQ_MASK_OFFSET)); - -#ifdef ERTS_DIRTY_SCHEDULERS - ASSERT((state & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)) != - (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)); - if (state & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)) { - ASSERT((ERTS_SCHEDULER_IS_DIRTY_CPU(esdp) && (state & ERTS_PSFLG_DIRTY_CPU_PROC)) || - (ERTS_SCHEDULER_IS_DIRTY_IO(esdp) && (state & ERTS_PSFLG_DIRTY_IO_PROC))); - if (!ERTS_SCHEDULER_IS_DIRTY(esdp) && !(state & ERTS_PSFLG_ACTIVE_SYS)) - goto pick_next_process; - state &= ~(ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q|ERTS_PSFLG_DIRTY_IO_PROC_IN_Q); - } -#endif + if (ERTS_SCHEDULER_IS_DIRTY(esdp)) + psflg_band_mask = ~((erts_aint32_t) 0); + else + psflg_band_mask = ~(((erts_aint32_t) 1) << (ERTS_PSFLGS_GET_PRQ_PRIO(state) + + ERTS_PSFLGS_IN_PRQ_MASK_OFFSET)); if (!(state & ERTS_PSFLG_PROXY)) psflg_band_mask &= ~ERTS_PSFLG_IN_RUNQ; @@ -9632,9 +9725,11 @@ Process *schedule(Process *p, int calls) | ERTS_PSFLG_RUNNING_SYS))) { tmp = state & (ERTS_PSFLG_SUSPENDED | ERTS_PSFLG_PENDING_EXIT - | ERTS_PSFLG_ACTIVE_SYS); + | ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_DIRTY_ACTIVE_SYS); if (tmp != ERTS_PSFLG_SUSPENDED) { - if (state & ERTS_PSFLG_ACTIVE_SYS) + if (state & (ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_DIRTY_ACTIVE_SYS)) new |= ERTS_PSFLG_RUNNING_SYS; else new |= ERTS_PSFLG_RUNNING; @@ -9647,7 +9742,8 @@ Process *schedule(Process *p, int calls) | ERTS_PSFLG_FREE)) || ((state & (ERTS_PSFLG_SUSPENDED | ERTS_PSFLG_PENDING_EXIT - | ERTS_PSFLG_ACTIVE_SYS)) + | ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_DIRTY_ACTIVE_SYS)) == ERTS_PSFLG_SUSPENDED)) { if (state & ERTS_PSFLG_FREE) erts_proc_dec_refc(p); @@ -9666,10 +9762,42 @@ Process *schedule(Process *p, int calls) esdp->current_process = p; + + reds = context_reds; + +#ifdef ERTS_SMP + + erts_smp_runq_unlock(rq); + +#ifdef ERTS_DIRTY_SCHEDULERS + if (ERTS_SCHEDULER_IS_DIRTY(esdp)) { +#ifdef DEBUG + int old_dqbit; +#endif + int dqbit = qbit; + + if (rq == ERTS_DIRTY_CPU_RUNQ) + dqbit <<= ERTS_PDSFLGS_IN_CPU_PRQ_MASK_OFFSET; + else { + ASSERT(rq == ERTS_DIRTY_IO_RUNQ); + dqbit <<= ERTS_PDSFLGS_IN_IO_PRQ_MASK_OFFSET; + } + +#ifdef DEBUG + old_dqbit = (int) +#else + (void) +#endif + erts_smp_atomic32_read_band_mb(&p->dirty_state, ~dqbit); + ASSERT(old_dqbit & dqbit); + } +#endif /* ERTS_DIRTY_SCHEDULERS */ + +#endif /* ERTS_SMP */ + } #ifdef ERTS_SMP - erts_smp_runq_unlock(rq); if (flags & ERTS_RUNQ_FLG_PROTECTED) (void) ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED); @@ -9702,15 +9830,56 @@ Process *schedule(Process *p, int calls) erts_smp_spin_unlock(&erts_sched_stat.lock); } - if (ERTS_PROC_PENDING_EXIT(p)) { + ASSERT(!p->scheduler_data); + p->scheduler_data = esdp; + + state = erts_smp_atomic32_read_nob(&p->state); + +#ifdef ERTS_DIRTY_SCHEDULERS + if (!ERTS_SCHEDULER_IS_DIRTY(esdp)) { + if (!!(state & ERTS_PSFLGS_DIRTY_WORK) + & !(state & ERTS_PSFLG_ACTIVE_SYS)) { + /* Migrate to dirty scheduler... */ + sunlock_sched_out_proc: + erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); + p->fcalls = reds; + goto sched_out_proc; + + } + } + else { + if (state & (ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_PENDING_EXIT + | ERTS_PSFLG_EXITING)) { + /* Migrate to normal scheduler... */ + goto sunlock_sched_out_proc; + } + if ((state & ERTS_PSFLG_DIRTY_ACTIVE_SYS) + && rq == ERTS_DIRTY_IO_RUNQ) { + /* Migrate to dirty cpu scheduler... */ + goto sunlock_sched_out_proc; + } + + ASSERT((state & ERTS_PSFLG_DIRTY_ACTIVE_SYS) + || *p->i == (BeamInstr) em_call_nif); + + ASSERT(rq == ERTS_DIRTY_CPU_RUNQ + ? (state & (ERTS_PSFLG_DIRTY_CPU_PROC + | ERTS_PSFLG_DIRTY_ACTIVE_SYS)) + : (rq == ERTS_DIRTY_IO_RUNQ + && (state & ERTS_PSFLG_DIRTY_IO_PROC))); + } +#endif + + if (state & ERTS_PSFLG_PENDING_EXIT) { erts_handle_pending_exit(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); state = erts_smp_atomic32_read_nob(&p->state); } - ASSERT(!p->scheduler_data); - p->scheduler_data = esdp; -#endif - reds = context_reds; + +#endif /* ERTS_SMP */ + + p->fcalls = reds; if (IS_TRACED(p)) { if (state & ERTS_PSFLG_EXITING) { @@ -9739,7 +9908,8 @@ Process *schedule(Process *p, int calls) reds -= execute_sys_tasks(p, &state, reds); if (reds <= 0 #ifdef ERTS_DIRTY_SCHEDULERS - || (state & (ERTS_PSFLG_DIRTY_CPU_PROC|ERTS_PSFLG_DIRTY_IO_PROC)) + || ERTS_SCHEDULER_IS_DIRTY(esdp) + || (state & ERTS_PSFLGS_DIRTY_WORK) #endif ) { p->fcalls = reds; @@ -9787,7 +9957,6 @@ Process *schedule(Process *p, int calls) proxy_p = NULL; } - p->fcalls = reds; ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); /* Never run a suspended process */ @@ -10732,6 +10901,9 @@ static void early_init_process_struct(void *varg, Eterm data) Process *proc = arg->proc; proc->common.id = make_internal_pid(data); +#ifdef ERTS_DIRTY_SCHEDULERS + erts_smp_atomic32_init_nob(&proc->dirty_state, 0); +#endif erts_smp_atomic32_init_relb(&proc->state, arg->state); #ifdef ERTS_SMP @@ -11194,6 +11366,9 @@ void erts_init_empty_process(Process *p) p->last_old_htop = NULL; #endif +#ifdef ERTS_DIRTY_SCHEDULERS + erts_smp_atomic32_init_nob(&p->dirty_state, 0); +#endif erts_smp_atomic32_init_nob(&p->state, (erts_aint32_t) PRIORITY_NORMAL); #ifdef ERTS_SMP @@ -11395,7 +11570,9 @@ set_proc_exiting(Process *p, ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(p) == ERTS_PROC_LOCKS_ALL); enqueue = change_proc_schedule_state(p, - ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT, + (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_PENDING_EXIT + | ERTS_PSFLGS_DIRTY_WORK), ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE, &state, &enq_prio, @@ -11429,10 +11606,7 @@ set_proc_exiting(Process *p, } #endif - if (enqueue) - add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), - state, - enq_prio); + add2runq(enqueue, enq_prio, p, state, NULL); } static ERTS_INLINE erts_aint32_t @@ -11535,6 +11709,11 @@ save_pending_exiter(Process *p) else rq = esdp->run_queue; +#ifdef ERTS_DIRTY_SCHEDULERS + if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix)) + rq = ERTS_RUNQ_IX(0); /* Handle on ordinary scheduler */ +#endif + plp = proclist_create(p); erts_smp_runq_lock(rq); @@ -11544,13 +11723,8 @@ save_pending_exiter(Process *p) non_empty_runq(rq); erts_smp_runq_unlock(rq); -#ifdef ERTS_DIRTY_SCHEDULERS - if (ERTS_RUNQ_IX_IS_DIRTY(rq->ix)) - wake_dirty_schedulers(rq, 0); - else -#endif - wake_scheduler(rq); + wake_scheduler(rq); } #endif diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 10c6fa4a67..32e458aa1d 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -1033,6 +1033,9 @@ struct process { ErtsProcSysTaskQs *sys_task_qs; erts_smp_atomic32_t state; /* Process state flags (see ERTS_PSFLG_*) */ +#ifdef ERTS_DIRTY_SCHEDULERS + erts_smp_atomic32_t dirty_state; /* Process dirty state flags (see ERTS_PDSFLG_*) */ +#endif #ifdef ERTS_SMP ErlMessageInQueue msg_inq; @@ -1150,15 +1153,14 @@ void erts_check_for_holes(Process* p); #define ERTS_PSFLG_RUNNING_SYS ERTS_PSFLG_BIT(15) #define ERTS_PSFLG_PROXY ERTS_PSFLG_BIT(16) #define ERTS_PSFLG_DELAYED_SYS ERTS_PSFLG_BIT(17) -#ifdef ERTS_DIRTY_SCHEDULERS #define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(18) #define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(19) -#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(20) -#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(21) -#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 22) -#else -#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 18) -#endif +#define ERTS_PSFLG_DIRTY_ACTIVE_SYS ERTS_PSFLG_BIT(20) +#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 20) + +#define ERTS_PSFLGS_DIRTY_WORK (ERTS_PSFLG_DIRTY_CPU_PROC \ + | ERTS_PSFLG_DIRTY_IO_PROC \ + | ERTS_PSFLG_DIRTY_ACTIVE_SYS) #define ERTS_PSFLGS_IN_PRQ_MASK (ERTS_PSFLG_IN_PRQ_MAX \ | ERTS_PSFLG_IN_PRQ_HIGH \ @@ -1170,7 +1172,37 @@ void erts_check_for_holes(Process* p); #define ERTS_PSFLGS_GET_USR_PRIO(PSFLGS) \ (((PSFLGS) >> ERTS_PSFLGS_USR_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) #define ERTS_PSFLGS_GET_PRQ_PRIO(PSFLGS) \ - (((PSFLGS) >> ERTS_PSFLGS_USR_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) + (((PSFLGS) >> ERTS_PSFLGS_PRQ_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) + +#ifdef ERTS_DIRTY_SCHEDULERS + +/* + * Flags in the dirty_state field. + */ + +#define ERTS_PDSFLG_IN_CPU_PRQ_MAX (((erts_aint32_t) 1) << 0) +#define ERTS_PDSFLG_IN_CPU_PRQ_HIGH (((erts_aint32_t) 1) << 1) +#define ERTS_PDSFLG_IN_CPU_PRQ_NORMAL (((erts_aint32_t) 1) << 2) +#define ERTS_PDSFLG_IN_CPU_PRQ_LOW (((erts_aint32_t) 1) << 3) +#define ERTS_PDSFLG_IN_IO_PRQ_MAX (((erts_aint32_t) 1) << 4) +#define ERTS_PDSFLG_IN_IO_PRQ_HIGH (((erts_aint32_t) 1) << 5) +#define ERTS_PDSFLG_IN_IO_PRQ_NORMAL (((erts_aint32_t) 1) << 6) +#define ERTS_PDSFLG_IN_IO_PRQ_LOW (((erts_aint32_t) 1) << 7) + +#define ERTS_PDSFLGS_QMASK ERTS_PSFLGS_QMASK +#define ERTS_PDSFLGS_IN_CPU_PRQ_MASK_OFFSET 0 +#define ERTS_PDSFLGS_IN_IO_PRQ_MASK_OFFSET ERTS_PSFLGS_QMASK_BITS + +#define ERTS_PDSFLG_IN_CPU_PRQ_MASK (ERTS_PDSFLG_IN_CPU_PRQ_MAX \ + | ERTS_PDSFLG_IN_CPU_PRQ_HIGH \ + | ERTS_PDSFLG_IN_CPU_PRQ_NORMAL\ + | ERTS_PDSFLG_IN_CPU_PRQ_LOW) +#define ERTS_PDSFLG_IN_IO_PRQ_MASK (ERTS_PDSFLG_IN_CPU_PRQ_MAX \ + | ERTS_PDSFLG_IN_CPU_PRQ_HIGH \ + | ERTS_PDSFLG_IN_CPU_PRQ_NORMAL\ + | ERTS_PDSFLG_IN_CPU_PRQ_LOW) +#endif + /* * Static flags that do not change after process creation. diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c index 25f0b1ed38..598c1d66ba 100644 --- a/erts/emulator/beam/erl_process_dump.c +++ b/erts/emulator/beam/erl_process_dump.c @@ -617,7 +617,7 @@ erts_dump_extended_process_state(int to, void *to_arg, erts_aint32_t psflg) { if (psflg) erts_print(to, to_arg, " | "); - for (i = 0; i < ERTS_PSFLG_MAX && psflg; i++) { + for (i = 0; i <= ERTS_PSFLG_MAX && psflg; i++) { erts_aint32_t chk = (1 << i); if (psflg & chk) { switch (chk) { @@ -657,16 +657,12 @@ erts_dump_extended_process_state(int to, void *to_arg, erts_aint32_t psflg) { erts_print(to, to_arg, "PROXY"); break; case ERTS_PSFLG_DELAYED_SYS: erts_print(to, to_arg, "DELAYED_SYS"); break; -#ifdef ERTS_DIRTY_SCHEDULERS case ERTS_PSFLG_DIRTY_CPU_PROC: erts_print(to, to_arg, "DIRTY_CPU_PROC"); break; case ERTS_PSFLG_DIRTY_IO_PROC: erts_print(to, to_arg, "DIRTY_IO_PROC"); break; - case ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q: - erts_print(to, to_arg, "DIRTY_CPU_PROC_IN_Q"); break; - case ERTS_PSFLG_DIRTY_IO_PROC_IN_Q: - erts_print(to, to_arg, "DIRTY_IO_PROC_IN_Q"); break; -#endif + case ERTS_PSFLG_DIRTY_ACTIVE_SYS: + erts_print(to, to_arg, "DIRTY_ACTIVE_SYS"); break; default: erts_print(to, to_arg, "UNKNOWN(%d)", chk); break; } |