diff options
Diffstat (limited to 'erts/emulator/beam')
23 files changed, 2206 insertions, 2294 deletions
diff --git a/erts/emulator/beam/beam_bp.c b/erts/emulator/beam/beam_bp.c index f4e7119d4d..a6e00f87e0 100644 --- a/erts/emulator/beam/beam_bp.c +++ b/erts/emulator/beam/beam_bp.c @@ -713,7 +713,7 @@ void erts_trace_time_break(Process *p, BeamInstr *pc, BpDataTime *bdt, Uint type BpDataTime *pbdt = NULL; ASSERT(p); - ASSERT(p->status == P_RUNNING); + ASSERT(ERTS_PSFLG_RUNNING & erts_smp_atomic32_read_acqb(&p->state)); /* get previous timestamp and breakpoint * from the process psd */ diff --git a/erts/emulator/beam/beam_emu.c b/erts/emulator/beam/beam_emu.c index 5ca3e8060b..b831147295 100644 --- a/erts/emulator/beam/beam_emu.c +++ b/erts/emulator/beam/beam_emu.c @@ -1875,13 +1875,12 @@ void process_main(void) msgp = PEEK_MESSAGE(c_p); if (msgp) erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); - else { + else #endif + { SET_I((BeamInstr *) Arg(0)); Goto(*I); /* Jump to a wait or wait_timeout instruction */ -#ifdef ERTS_SMP } -#endif } ErtsMoveMsgAttachmentIntoProc(msgp, c_p, E, HTOP, FCALLS, { @@ -2110,11 +2109,11 @@ void process_main(void) OpCase(wait_f): wait2: { - ASSERT(!ERTS_PROC_IS_EXITING(c_p)); c_p->i = (BeamInstr *) Arg(0); /* L1 */ SWAPOUT; c_p->arity = 0; - c_p->status = P_WAITING; + erts_smp_atomic32_read_band_relb(&c_p->state, ~ERTS_PSFLG_ACTIVE); + ASSERT(!ERTS_PROC_IS_EXITING(c_p)); erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); c_p->current = NULL; goto do_schedule; @@ -3193,10 +3192,6 @@ void process_main(void) c_p->arg_reg[0] = r(0); SWAPOUT; c_p->i = I; - erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); - if (c_p->status != P_SUSPENDED) - erts_add_to_runq(c_p); - erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); goto do_schedule1; } @@ -5159,9 +5154,6 @@ void process_main(void) c_p->arity = 1; /* One living register (the 'true' return value) */ SWAPOUT; c_p->i = I + 1; /* Next instruction */ - erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); - erts_add_to_runq(c_p); - erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); c_p->current = NULL; goto do_schedule; } @@ -6255,9 +6247,7 @@ erts_hibernate(Process* c_p, Eterm module, Eterm function, Eterm args, Eterm* re */ erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS); ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p); - if (c_p->msg.len > 0) { - erts_add_to_runq(c_p); - } else { + if (!c_p->msg.len) { erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS); c_p->fvalue = NIL; PROCESS_MAIN_CHK_LOCKS(c_p); @@ -6265,14 +6255,12 @@ erts_hibernate(Process* c_p, Eterm module, Eterm function, Eterm args, Eterm* re ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); PROCESS_MAIN_CHK_LOCKS(c_p); erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS); - ASSERT(!ERTS_PROC_IS_EXITING(c_p)); #ifdef ERTS_SMP ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p); - if (c_p->msg.len > 0) - erts_add_to_runq(c_p); - else + if (!c_p->msg.len) #endif - c_p->status = P_WAITING; + erts_smp_atomic32_read_band_relb(&c_p->state, ~ERTS_PSFLG_ACTIVE); + ASSERT(!ERTS_PROC_IS_EXITING(c_p)); } erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS); c_p->current = bif_export[BIF_hibernate_3]->code; diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index dd9488afa6..160b1ef0fc 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -233,15 +233,17 @@ BIF_RETTYPE link_1(BIF_ALIST_1) BIF_ERROR(BIF_P, BADARG); - res_no_proc: - if (BIF_P->flags & F_TRAPEXIT) { - ErtsProcLocks locks = ERTS_PROC_LOCK_MAIN; - erts_deliver_exit_message(BIF_ARG_1, BIF_P, &locks, am_noproc, NIL); - erts_smp_proc_unlock(BIF_P, ~ERTS_PROC_LOCK_MAIN & locks); - BIF_RET(am_true); +res_no_proc: { + erts_aint32_t state = erts_smp_atomic32_read_nob(&BIF_P->state); + if (state & ERTS_PSFLG_TRAP_EXIT) { + ErtsProcLocks locks = ERTS_PROC_LOCK_MAIN; + erts_deliver_exit_message(BIF_ARG_1, BIF_P, &locks, am_noproc, NIL); + erts_smp_proc_unlock(BIF_P, ~ERTS_PROC_LOCK_MAIN & locks); + BIF_RET(am_true); + } + else + BIF_ERROR(BIF_P, EXC_NOPROC); } - else - BIF_ERROR(BIF_P, EXC_NOPROC); } #define ERTS_DEMONITOR_FALSE 2 @@ -1103,8 +1105,9 @@ BIF_RETTYPE hibernate_3(BIF_ALIST_3) if (erts_hibernate(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, reg)) { /* - * If hibernate succeeded, TRAP. The process will be suspended - * if status is P_WAITING or continue (if any message was in the queue). + * If hibernate succeeded, TRAP. The process will be wait in a + * hibernated state if its state is inactive (!ERTS_PSFLG_ACTIVE); + * otherwise, continue executing (if any message was in the queue). */ BIF_TRAP_CODE_PTR_(BIF_P, BIF_P->i); } @@ -1499,14 +1502,13 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2) BIF_RET(old_value); } else if (BIF_ARG_1 == am_priority) { - erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_STATUS); old_value = erts_set_process_priority(BIF_P, BIF_ARG_2); - erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_STATUS); if (old_value == THE_NON_VALUE) goto error; BIF_RET(old_value); } else if (BIF_ARG_1 == am_trap_exit) { + erts_aint32_t state; Uint trap_exit; if (BIF_ARG_2 == am_true) { trap_exit = 1; @@ -1521,59 +1523,52 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2) * For more info, see implementation of erts_send_exit_signal(). */ erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_STATUS); + ERTS_SMP_LC_ASSERT((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS) + & erts_proc_lc_my_proc_locks(BIF_P)); ERTS_SMP_BIF_CHK_PENDING_EXIT(BIF_P, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); - old_value = ERTS_PROC_IS_TRAPPING_EXITS(BIF_P) ? am_true : am_false; - if (trap_exit) { - ERTS_PROC_SET_TRAP_EXIT(BIF_P); - } else { - ERTS_PROC_UNSET_TRAP_EXIT(BIF_P); - } + if (trap_exit) + state = erts_smp_atomic32_read_bor_nob(&BIF_P->state, + ERTS_PSFLG_TRAP_EXIT); + else + state = erts_smp_atomic32_read_band_nob(&BIF_P->state, + ~ERTS_PSFLG_TRAP_EXIT); + old_value = (state & ERTS_PSFLG_TRAP_EXIT) ? am_true : am_false; erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_STATUS); BIF_RET(old_value); } else if (BIF_ARG_1 == am_scheduler) { - int yield; - ErtsRunQueue *old; - ErtsRunQueue *new; + ErtsRunQueue *old, *new, *curr; Sint sched; + erts_aint32_t state; + if (!is_small(BIF_ARG_2)) goto error; sched = signed_val(BIF_ARG_2); if (sched < 0 || erts_no_schedulers < sched) goto error; - erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_STATUS); - old = BIF_P->bound_runq; -#ifdef ERTS_SMP - ASSERT(!old || old == BIF_P->run_queue); -#endif - new = !sched ? NULL : erts_schedid2runq(sched); -#ifndef ERTS_SMP - yield = 0; -#else - if (new == old) - yield = 0; + + if (sched == 0) { + new = NULL; + state = erts_smp_atomic32_read_band_mb(&BIF_P->state, + ~ERTS_PSFLG_BOUND); + } else { - ErtsRunQueue *curr = BIF_P->run_queue; - if (!new) - erts_smp_runq_lock(curr); - else - erts_smp_runqs_lock(curr, new); - yield = new && BIF_P->run_queue != new; -#endif - BIF_P->bound_runq = new; + new = erts_schedid2runq(sched); #ifdef ERTS_SMP - if (new) - BIF_P->run_queue = new; - if (!new) - erts_smp_runq_unlock(curr); - else - erts_smp_runqs_unlock(curr, new); - } + erts_atomic_set_nob(&BIF_P->run_queue, (erts_aint_t) new); #endif - erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_STATUS); + state = erts_smp_atomic32_read_bor_mb(&BIF_P->state, + ERTS_PSFLG_BOUND); + } + + curr = ERTS_GET_SCHEDULER_DATA_FROM_PROC(BIF_P)->run_queue; + old = (ERTS_PSFLG_BOUND & state) ? curr : NULL; + + ASSERT(!old || old == curr); + old_value = old ? make_small(old->ix+1) : make_small(0); - if (yield) + if (new && new != curr) ERTS_BIF_YIELD_RETURN_X(BIF_P, old_value, am_scheduler); else BIF_RET(old_value); diff --git a/erts/emulator/beam/break.c b/erts/emulator/beam/break.c index 0a5fc26ee8..9cb5f2cc16 100644 --- a/erts/emulator/beam/break.c +++ b/erts/emulator/beam/break.c @@ -73,8 +73,8 @@ process_info(int to, void *to_arg) for (i = 0; i < erts_max_processes; i++) { Process *p = erts_pix2proc(i); if (p && p->i != ENULL) { - if (p->status != P_EXITING) - print_process_info(to, to_arg, p); + if (!ERTS_PROC_IS_EXITING(p)) + print_process_info(to, to_arg, p); } } @@ -99,11 +99,20 @@ process_killer(void) if ((j = sys_get_key(0)) <= 0) erl_exit(0, ""); switch(j) { - case 'k': - if (rp->status == P_WAITING) { - ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; - erts_smp_proc_inc_refc(rp); - erts_smp_proc_lock(rp, rp_locks); + case 'k': { + ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; + erts_aint32_t state; + erts_smp_proc_inc_refc(rp); + erts_smp_proc_lock(rp, rp_locks); + state = erts_smp_atomic32_read_acqb(&rp->state); + if (state & (ERTS_PSFLG_FREE + | ERTS_PSFLG_EXITING + | ERTS_PSFLG_ACTIVE + | ERTS_PSFLG_IN_RUNQ + | ERTS_PSFLG_RUNNING)) { + erts_printf("Can only kill WAITING processes this way\n"); + } + else { (void) erts_send_exit_signal(NULL, NIL, rp, @@ -112,12 +121,10 @@ process_killer(void) NIL, NULL, 0); - erts_smp_proc_unlock(rp, rp_locks); - erts_smp_proc_dec_refc(rp); } - else - erts_printf("Can only kill WAITING processes this way\n"); - + erts_smp_proc_unlock(rp, rp_locks); + erts_smp_proc_dec_refc(rp); + } case 'n': br = 1; break; case 'r': return; default: return; @@ -186,38 +193,34 @@ print_process_info(int to, void *to_arg, Process *p) int garbing = 0; int running = 0; struct saved_calls *scb; + erts_aint32_t state; /* display the PID */ erts_print(to, to_arg, "=proc:%T\n", p->id); /* Display the state */ erts_print(to, to_arg, "State: "); - switch (p->status) { - case P_FREE: + + state = erts_smp_atomic32_read_acqb(&p->state); + if (state & ERTS_PSFLG_FREE) erts_print(to, to_arg, "Non Existing\n"); /* Should never happen */ - break; - case P_RUNABLE: - erts_print(to, to_arg, "Scheduled\n"); - break; - case P_WAITING: - erts_print(to, to_arg, "Waiting\n"); - break; - case P_SUSPENDED: - erts_print(to, to_arg, "Suspended\n"); - break; - case P_RUNNING: - erts_print(to, to_arg, "Running\n"); - running = 1; - break; - case P_EXITING: + else if (state & ERTS_PSFLG_EXITING) erts_print(to, to_arg, "Exiting\n"); - break; - case P_GARBING: - erts_print(to, to_arg, "Garbing\n"); + else if (state & ERTS_PSFLG_GC) { garbing = 1; running = 1; - break; + erts_print(to, to_arg, "Garbing\n"); + } + else if (state & ERTS_PSFLG_SUSPENDED) + erts_print(to, to_arg, "Suspended\n"); + else if (state & ERTS_PSFLG_RUNNING) { + running = 1; + erts_print(to, to_arg, "Running\n"); } + else if (state & ERTS_PSFLG_ACTIVE) + erts_print(to, to_arg, "Scheduled\n"); + else + erts_print(to, to_arg, "Waiting\n"); /* * If the process is registered as a global process, display the diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index b30e6dc018..d1e3b4b0ef 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -336,6 +336,8 @@ type SL_PTIMER SHORT_LIVED SYSTEM ptimer_sl type LL_PTIMER STANDARD SYSTEM ptimer_ll type SYS_MSG_Q SHORT_LIVED PROCESSES system_messages_queue type FP_EXCEPTION LONG_LIVED SYSTEM fp_exception +type LL_MPATHS LONG_LIVED SYSTEM ll_migration_paths +type SL_MPATHS SHORT_LIVED SYSTEM sl_migration_paths +endif +if hipe diff --git a/erts/emulator/beam/erl_alloc_util.c b/erts/emulator/beam/erl_alloc_util.c index e0d525bdde..62225d3572 100644 --- a/erts/emulator/beam/erl_alloc_util.c +++ b/erts/emulator/beam/erl_alloc_util.c @@ -832,14 +832,14 @@ init_dd_queue(ErtsAllctrDDQueue_t *ddq) static ERTS_INLINE erts_aint_t ddq_managed_thread_enqueue(ErtsAllctrDDQueue_t *ddq, void *ptr) { - erts_aint_t ilast, itmp; + erts_aint_t first_ilast, ilast, itmp; ErtsAllctrDDBlock_t *this = ptr; erts_atomic_init_nob(&this->atmc_next, ERTS_AINT_NULL); /* Enqueue at end of list... */ - ilast = erts_atomic_read_nob(&ddq->tail.data.last); + first_ilast = ilast = erts_atomic_read_nob(&ddq->tail.data.last); while (1) { ErtsAllctrDDBlock_t *last = (ErtsAllctrDDBlock_t *) ilast; itmp = erts_atomic_cmpxchg_mb(&last->atmc_next, @@ -853,8 +853,11 @@ ddq_managed_thread_enqueue(ErtsAllctrDDQueue_t *ddq, void *ptr) /* Move last pointer forward... */ while (1) { if (erts_atomic_read_rb(&this->atmc_next) != ERTS_AINT_NULL) { - /* Someone else will move it forward */ - return erts_atomic_read_rb(&ddq->tail.data.last); + ilast = erts_atomic_read_rb(&ddq->tail.data.last); + if (first_ilast != ilast) { + /* Someone else will move it forward */ + return ilast; + } } itmp = erts_atomic_cmpxchg_mb(&ddq->tail.data.last, (erts_aint_t) this, diff --git a/erts/emulator/beam/erl_bif_ddll.c b/erts/emulator/beam/erl_bif_ddll.c index c338ee1c4b..7f7c975e78 100644 --- a/erts/emulator/beam/erl_bif_ddll.c +++ b/erts/emulator/beam/erl_bif_ddll.c @@ -368,13 +368,11 @@ BIF_RETTYPE erl_ddll_try_load_3(BIF_ALIST_3) #endif for (j = 0; j < erts_max_ports; j++) { Port* prt = &erts_port[j]; -#ifdef DDLL_SMP erts_smp_port_state_lock(prt); -#endif if (!(prt->status & FREE_PORT_FLAGS) && prt->drv_ptr->handle == dh) { -#if DDLL_SMP erts_smp_atomic_inc_nob(&prt->refc); +#if DDLL_SMP /* Extremely rare spinlock */ while(prt->status & ERTS_PORT_SFLG_INITIALIZING) { erts_smp_port_state_unlock(prt); @@ -598,13 +596,11 @@ done: #endif for (j = 0; j < erts_max_ports; j++) { Port* prt = &erts_port[j]; -#if DDLL_SMP erts_smp_port_state_lock(prt); -#endif if (!(prt->status & FREE_PORT_FLAGS) && prt->drv_ptr->handle == dh) { -#if DDLL_SMP erts_smp_atomic_inc_nob(&prt->refc); +#if DDLL_SMP /* Extremely rare spinlock */ while(prt->status & ERTS_PORT_SFLG_INITIALIZING) { erts_smp_port_state_unlock(prt); @@ -1060,13 +1056,11 @@ void erts_ddll_proc_dead(Process *p, ErtsProcLocks plocks) #endif for (j = 0; j < erts_max_ports; j++) { Port* prt = &erts_port[j]; -#if DDLL_SMP erts_smp_port_state_lock(prt); -#endif if (!(prt->status & FREE_PORT_FLAGS) && prt->drv_ptr->handle == dh) { -#if DDLL_SMP erts_smp_atomic_inc_nob(&prt->refc); +#if DDLL_SMP while(prt->status & ERTS_PORT_SFLG_INITIALIZING) { erts_smp_port_state_unlock(prt); erts_smp_port_state_lock(prt); diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 8117b1bfaa..90ddaa4fac 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -1355,13 +1355,15 @@ process_info_aux(Process *BIF_P, hp = HAlloc(BIF_P, 3); break; - case am_trap_exit: + case am_trap_exit: { + erts_aint32_t state = erts_smp_atomic32_read_nob(&rp->state); hp = HAlloc(BIF_P, 3); - if (rp->flags & F_TRAPEXIT) + if (state & ERTS_PSFLG_TRAP_EXIT) res = am_true; else res = am_false; break; + } case am_error_handler: hp = HAlloc(BIF_P, 3); @@ -3144,14 +3146,8 @@ BIF_RETTYPE is_process_alive_1(BIF_ALIST_1) BIF_RET(am_false); } else { - int px; -#ifdef ERTS_SMP - px = (erts_atomic32_read_acqb(&rp->aflags) - & (ERTS_PROC_AFLG_PENDING_EXIT|ERTS_PROC_AFLG_EXITING)); -#else - px = 0; -#endif - if (px) + if (erts_smp_atomic32_read_acqb(&rp->state) + & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) ERTS_BIF_AWAIT_X_DATA_TRAP(BIF_P, BIF_ARG_1, am_false); else BIF_RET(am_true); diff --git a/erts/emulator/beam/erl_cpu_topology.c b/erts/emulator/beam/erl_cpu_topology.c index fe3693d0ca..41e71881a0 100644 --- a/erts/emulator/beam/erl_cpu_topology.c +++ b/erts/emulator/beam/erl_cpu_topology.c @@ -486,7 +486,7 @@ erts_sched_check_cpu_bind_post_suspend(ErtsSchedulerData *esdp) erts_thr_set_main_status(1, (int) esdp->no); /* Make sure we check if we should bind to a cpu or not... */ - esdp->run_queue->flags |= ERTS_RUNQ_FLG_CHK_CPU_BIND; + ERTS_RUNQ_FLGS_SET(esdp->run_queue, ERTS_RUNQ_FLG_CHK_CPU_BIND); } #endif @@ -498,9 +498,6 @@ erts_sched_check_cpu_bind(ErtsSchedulerData *esdp) erts_cpu_groups_map_t *cgm; erts_cpu_groups_callback_list_t *cgcl; erts_cpu_groups_callback_call_t *cgcc; -#ifdef ERTS_SMP - esdp->run_queue->flags &= ~ERTS_RUNQ_FLG_CHK_CPU_BIND; -#endif erts_smp_runq_unlock(esdp->run_queue); erts_smp_rwmtx_rwlock(&cpuinfo_rwmtx); cpu_id = scheduler2cpu_map[esdp->no].bind_id; diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c index 52a6e52e6c..6075a527c3 100644 --- a/erts/emulator/beam/erl_gc.c +++ b/erts/emulator/beam/erl_gc.c @@ -357,11 +357,7 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) trace_gc(p, am_gc_start); } - erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); - p->gcstatus = p->status; - p->status = P_GARBING; - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); - + erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC); if (erts_system_monitor_long_gc != 0) { get_now(&ms1, &s1, &us1); } @@ -404,9 +400,8 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) ErtsGcQuickSanityCheck(p); - erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); - p->status = p->gcstatus; - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); + erts_smp_atomic32_read_band_nob(&p->state, ~ERTS_PSFLG_GC); + if (IS_TRACED_FL(p, F_TRACE_GC)) { trace_gc(p, am_gc_end); } @@ -490,10 +485,7 @@ erts_garbage_collect_hibernate(Process* p) /* * Preliminaries. */ - erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); - p->gcstatus = p->status; - p->status = P_GARBING; - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); + erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC); ErtsGcQuickSanityCheck(p); ASSERT(p->mbuf_sz == 0); ASSERT(p->mbuf == 0); @@ -604,9 +596,7 @@ erts_garbage_collect_hibernate(Process* p) ErtsGcQuickSanityCheck(p); - erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); - p->status = p->gcstatus; - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); + erts_smp_atomic32_read_band_nob(&p->state, ~ERTS_PSFLG_GC); } @@ -630,10 +620,7 @@ erts_garbage_collect_literals(Process* p, Eterm* literals, /* * Set GC state. */ - erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); - p->gcstatus = p->status; - p->status = P_GARBING; - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); + erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC); /* * We assume that the caller has already done a major collection @@ -775,9 +762,7 @@ erts_garbage_collect_literals(Process* p, Eterm* literals, /* * Restore status. */ - erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); - p->status = p->gcstatus; - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); + erts_smp_atomic32_read_band_nob(&p->state, ~ERTS_PSFLG_GC); } static int diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c index 306cb652c4..175695e856 100644 --- a/erts/emulator/beam/erl_lock_check.c +++ b/erts/emulator/beam/erl_lock_check.c @@ -82,7 +82,6 @@ static erts_lc_lock_order_t erts_lock_order[] = { #ifdef ERTS_SMP { "bif_timers", NULL }, { "reg_tab", NULL }, - { "migration_info_update", NULL }, { "proc_main", "pid" }, #ifdef HIPE { "hipe_mfait_lock", NULL }, @@ -124,6 +123,7 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "removed_fd_pre_alloc_lock", "address" }, { "state_prealloc", NULL }, { "schdlr_sspnd", NULL }, + { "migration_info_update", NULL }, { "run_queue", "address" }, { "cpu_info", NULL }, { "pollset", "address" }, diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 4a797fbeae..b10964da52 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -297,38 +297,6 @@ erts_msg_distext2heap(Process *pp, return THE_NON_VALUE; } -static ERTS_INLINE void -notify_new_message(Process *receiver) -{ - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS - & erts_proc_lc_my_proc_locks(receiver)); - - ACTIVATE(receiver); - - switch (receiver->status) { - case P_GARBING: - switch (receiver->gcstatus) { - case P_SUSPENDED: - goto suspended; - case P_WAITING: - goto waiting; - default: - break; - } - break; - case P_SUSPENDED: - suspended: - receiver->rstatus = P_RUNABLE; - break; - case P_WAITING: - waiting: - erts_add_to_runq(receiver); - break; - default: - break; - } -} - void erts_queue_dist_message(Process *rcvr, ErtsProcLocks *rcvr_locks, @@ -342,7 +310,7 @@ erts_queue_dist_message(Process *rcvr, Sint tok_serial = 0; #endif #ifdef ERTS_SMP - ErtsProcLocks need_locks; + erts_aint_t state; #endif ERTS_SMP_LC_ASSERT(*rcvr_locks == erts_proc_lc_my_proc_locks(rcvr)); @@ -350,20 +318,21 @@ erts_queue_dist_message(Process *rcvr, mp = message_alloc(); #ifdef ERTS_SMP - need_locks = ~(*rcvr_locks) & (ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS); - if (need_locks) { - *rcvr_locks |= need_locks; - if (erts_smp_proc_trylock(rcvr, need_locks) == EBUSY) { - if (need_locks == ERTS_PROC_LOCK_MSGQ) { + if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) { + if (erts_smp_proc_trylock(rcvr, ERTS_PROC_LOCK_MSGQ) == EBUSY) { + ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ; + if (*rcvr_locks & ERTS_PROC_LOCK_STATUS) { erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_STATUS); - need_locks = (ERTS_PROC_LOCK_MSGQ - | ERTS_PROC_LOCK_STATUS); + need_locks |= ERTS_PROC_LOCK_STATUS; } erts_smp_proc_lock(rcvr, need_locks); } } - if (ERTS_PROC_IS_EXITING(rcvr) || ERTS_PROC_PENDING_EXIT(rcvr)) { + state = erts_smp_atomic32_read_acqb(&rcvr->state); + if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) { + if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) + erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); /* Drop message if receiver is exiting or has a pending exit ... */ if (is_not_nil(token)) { ErlHeapFragment *heap_frag; @@ -379,6 +348,8 @@ erts_queue_dist_message(Process *rcvr, /* Ahh... need to decode it in order to trace it... */ ErlHeapFragment *mbuf; Eterm msg; + if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) + erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); message_free(mp); msg = erts_msg_distext2heap(rcvr, rcvr_locks, &mbuf, &token, dist_ext); if (is_value(msg)) @@ -440,26 +411,32 @@ erts_queue_dist_message(Process *rcvr, mp->data.dist_ext = dist_ext; LINK_MESSAGE(rcvr, mp); - notify_new_message(rcvr); + if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) + erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); + + erts_proc_notify_new_message(rcvr); } } /* Add a message last in message queue */ -void -erts_queue_message(Process* receiver, - ErtsProcLocks *receiver_locks, - ErlHeapFragment* bp, - Eterm message, - Eterm seq_trace_token +static int +queue_message(Process *c_p, + Process* receiver, + ErtsProcLocks *receiver_locks, + erts_aint32_t *receiver_state, + ErlHeapFragment* bp, + Eterm message, + Eterm seq_trace_token #ifdef USE_VM_PROBES , Eterm dt_utag #endif -) + ) { ErlMessage* mp; -#ifdef ERTS_SMP - ErtsProcLocks need_locks; -#else + int locked_msgq = 0; + erts_aint_t state; + +#ifndef ERTS_SMP ASSERT(bp != NULL || receiver->mbuf == NULL); #endif @@ -467,31 +444,45 @@ erts_queue_message(Process* receiver, mp = message_alloc(); + if (receiver_state) + state = *receiver_state; + else + state = erts_smp_atomic32_read_acqb(&receiver->state); + #ifdef ERTS_SMP - need_locks = ~(*receiver_locks) & (ERTS_PROC_LOCK_MSGQ - | ERTS_PROC_LOCK_STATUS); - if (need_locks) { - *receiver_locks |= need_locks; - if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) { - if (need_locks == ERTS_PROC_LOCK_MSGQ) { + + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + goto exiting; + + if (!(*receiver_locks & ERTS_PROC_LOCK_MSGQ)) { + if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) { + ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ; + if (*receiver_locks & ERTS_PROC_LOCK_STATUS) { erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS); - need_locks = (ERTS_PROC_LOCK_MSGQ - | ERTS_PROC_LOCK_STATUS); + need_locks |= ERTS_PROC_LOCK_STATUS; } erts_smp_proc_lock(receiver, need_locks); } + locked_msgq = 1; + state = erts_smp_atomic32_read_nob(&receiver->state); + if (receiver_state) + *receiver_state = state; } - if (ERTS_PROC_IS_EXITING(receiver) || ERTS_PROC_PENDING_EXIT(receiver)) { - /* Drop message if receiver is exiting or has a pending - * exit ... - */ +#endif + + if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) { +#ifdef ERTS_SMP + exiting: +#endif + /* Drop message if receiver is exiting or has a pending exit... */ + if (locked_msgq) + erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); if (bp) free_message_buffer(bp); message_free(mp); - return; + return 0; } -#endif ERL_MESSAGE_TERM(mp) = message; ERL_MESSAGE_TOKEN(mp) = seq_trace_token; @@ -514,12 +505,11 @@ erts_queue_message(Process* receiver, ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); LINK_MESSAGE_PRIVQ(receiver, mp); } - else { + else +#endif + { LINK_MESSAGE(receiver, mp); } -#else - LINK_MESSAGE(receiver, mp); -#endif #ifdef USE_VM_PROBES if (DTRACE_ENABLED(message_queued)) { @@ -539,15 +529,43 @@ erts_queue_message(Process* receiver, tok_label, tok_lastcnt, tok_serial); } #endif - notify_new_message(receiver); - if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { + if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) trace_receive(receiver, message); - } + + if (locked_msgq) + erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); + + erts_proc_notify_new_message(receiver); #ifndef ERTS_SMP ERTS_HOLE_CHECK(receiver); #endif + return 0; +} + +void +erts_queue_message(Process* receiver, + ErtsProcLocks *receiver_locks, + ErlHeapFragment* bp, + Eterm message, + Eterm seq_trace_token +#ifdef USE_VM_PROBES + , Eterm dt_utag +#endif + ) +{ + queue_message(NULL, + receiver, + receiver_locks, + NULL, + bp, + message, + seq_trace_token +#ifdef USE_VM_PROBES + , dt_utag +#endif + ); } void @@ -1025,11 +1043,8 @@ erts_send_message(Process* sender, LINK_MESSAGE(receiver, mp); ACTIVATE(receiver); - if (receiver->status == P_WAITING) { - erts_add_to_runq(receiver); - } else if (receiver->status == P_SUSPENDED) { - receiver->rstatus = P_RUNABLE; - } + erts_proc_notify_new_message(receiver); + if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { trace_receive(receiver, message); } @@ -1089,21 +1104,34 @@ erts_send_message(Process* sender, #ifdef ERTS_SMP ErlOffHeap *ohp; Eterm *hp; + erts_aint32_t state; + BM_SWAP_TIMER(send,size); msize = size_object(message); BM_SWAP_TIMER(size,send); - hp = erts_alloc_message_heap(msize,&bp,&ohp,receiver,receiver_locks); + hp = erts_alloc_message_heap_state(msize, + &bp, + &ohp, + receiver, + receiver_locks, + &state); BM_SWAP_TIMER(send,copy); message = copy_struct(message, msize, &hp, ohp); BM_MESSAGE_COPIED(msz); BM_SWAP_TIMER(copy,send); DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); - erts_queue_message(receiver, receiver_locks, bp, message, token + queue_message(sender, + receiver, + receiver_locks, + &state, + bp, + message, + token #ifdef USE_VM_PROBES - , NIL + , NIL #endif - ); + ); BM_SWAP_TIMER(send,system); #else ErlMessage* mp = message_alloc(); @@ -1134,17 +1162,13 @@ erts_send_message(Process* sender, mp->data.attached = NULL; LINK_MESSAGE(receiver, mp); - if (receiver->status == P_WAITING) { - erts_add_to_runq(receiver); - } else if (receiver->status == P_SUSPENDED) { - receiver->rstatus = P_RUNABLE; - } + erts_proc_notify_new_message(receiver); + if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { trace_receive(receiver, message); } BM_SWAP_TIMER(send,system); #endif /* #ifndef ERTS_SMP */ - return; #endif /* HYBRID */ } } diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index 0f1a0d441a..86454fe1fa 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -56,12 +56,6 @@ #define ERTS_PORT_IS_IN_RUNQ(RQ, P) \ ((P)->sched.next || (P)->sched.prev || (RQ)->ports.start == (P)) -#define ERTS_PORT_NOT_IN_RUNQ(P) \ -do { \ - (P)->sched.prev = NULL; \ - (P)->sched.next = NULL; \ -} while (0) - #ifdef USE_VM_PROBES #define DTRACE_DRIVER(PROBE_NAME, PP) \ if (DTRACE_ENABLED(driver_ready_input)) { \ @@ -167,7 +161,7 @@ enqueue_port(ErtsRunQueue *runq, Port *pp) { ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); pp->sched.next = NULL; - pp->sched.prev = runq->ports.end; + pp->sched.in_runq = 1; if (runq->ports.end) { ASSERT(runq->ports.start); runq->ports.end->sched.next = pp; @@ -177,39 +171,10 @@ enqueue_port(ErtsRunQueue *runq, Port *pp) runq->ports.start = pp; } - runq->ports.info.len++; - if (runq->ports.info.max_len < runq->ports.info.len) - runq->ports.info.max_len = runq->ports.info.len; - runq->len++; - if (runq->max_len < runq->len) - runq->max_len = runq->len; runq->ports.end = pp; ASSERT(runq->ports.start && runq->ports.end); -} -static ERTS_INLINE void -dequeue_port(ErtsRunQueue *runq, Port *pp) -{ - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); - if (pp->sched.next) - pp->sched.next->sched.prev = pp->sched.prev; - else { - ASSERT(runq->ports.end == pp); - runq->ports.end = pp->sched.prev; - } - if (pp->sched.prev) - pp->sched.prev->sched.next = pp->sched.next; - else { - ASSERT(runq->ports.start == pp); - runq->ports.start = pp->sched.next; - } - - ASSERT(runq->ports.info.len > 0); - runq->ports.info.len--; - ASSERT(runq->len > 0); - runq->len--; - ASSERT(runq->ports.start || !runq->ports.end); - ASSERT(runq->ports.end || !runq->ports.start); + erts_smp_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL); } static ERTS_INLINE Port * @@ -222,16 +187,11 @@ pop_port(ErtsRunQueue *runq) } else { runq->ports.start = runq->ports.start->sched.next; - if (runq->ports.start) - runq->ports.start->sched.prev = NULL; - else { + if (!runq->ports.start) { ASSERT(runq->ports.end == pp); runq->ports.end = NULL; } - ASSERT(runq->ports.info.len > 0); - runq->ports.info.len--; - ASSERT(runq->len > 0); - runq->len--; + erts_smp_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL); } ASSERT(runq->ports.start || !runq->ports.end); @@ -285,7 +245,7 @@ check_port_queue(ErtsRunQueue *runq, Port *chk_pp, int inq) } ASSERT(no_forward == no_backward); } - ASSERT(no_forward == runq->ports.info.len); + ASSERT(no_forward == RUNQ_READ_LEN(&runq->ports.info.len)); if (chk_pp) { if (chk_pp->sched.taskq || chk_pp->sched.exe_taskq) { ASSERT(chk_pp->sched.taskq != chk_pp->sched.exe_taskq); @@ -467,10 +427,11 @@ erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp) ErtsPortTaskQueue *ptqp; ErtsPortTask *ptp; Port *pp; - int port_is_dequeued = 0; pp = &erts_port[internal_port_index(id)]; runq = erts_port_runq(pp); + if (!runq) + return 1; ptp = handle2task(pthp); @@ -505,22 +466,12 @@ erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp) if (ptqp->first || pp->sched.taskq != ptqp) ptqp = NULL; - else { + else pp->sched.taskq = NULL; - if (!pp->sched.exe_taskq) { - dequeue_port(runq, pp); - ERTS_PORT_NOT_IN_RUNQ(pp); - port_is_dequeued = 1; - } - } ERTS_PT_CHK_PRES_PORTQ(runq, pp); erts_smp_runq_unlock(runq); - - if (erts_system_profile_flags.runnable_ports && port_is_dequeued) { - profile_runnable_port(pp, am_inactive); - } port_task_free(ptp); if (ptqp) @@ -575,7 +526,7 @@ erts_port_task_schedule(Eterm id, if (!pp->sched.taskq) { pp->sched.taskq = port_taskq_init(port_taskq_alloc(), pp); - enq_port = !pp->sched.exe_taskq; + enq_port = !pp->sched.in_runq && !pp->sched.exe_taskq; } #ifdef ERTS_SMP @@ -585,13 +536,13 @@ erts_port_task_schedule(Eterm id, /* Port emigrated ... */ erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); erts_smp_runq_unlock(runq); - runq = xrunq; + runq = erts_port_runq(pp); + if (!runq) + return -1; } } #endif - ASSERT(!enq_port || !(runq->flags & ERTS_RUNQ_FLG_SUSPENDED)); - ASSERT(pp->sched.taskq); ASSERT(ptp); @@ -633,7 +584,7 @@ erts_port_task_schedule(Eterm id, #elif defined(DEBUG) if (!enq_port && !pp->sched.exe_taskq) { /* We should be in port run q */ - ASSERT(pp->sched.prev || runq->ports.start == pp); + ASSERT(pp->sched.in_runq); } #endif #endif @@ -661,63 +612,54 @@ void erts_port_task_free_port(Port *pp) { ErtsRunQueue *runq; - int port_is_dequeued = 0; + ErtsPortTaskQueue *ptqp; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); ASSERT(!(pp->status & ERTS_PORT_SFLGS_DEAD)); runq = erts_port_runq(pp); ASSERT(runq); ERTS_PT_CHK_PRES_PORTQ(runq, pp); - if (pp->sched.exe_taskq) { + ptqp = pp->sched.exe_taskq; + if (ptqp) { /* I (this thread) am currently executing this port, free it when scheduled out... */ - ErtsPortTask *ptp = port_task_alloc(); + ErtsPortTask *ptp; + enqueue_free: + ptp = port_task_alloc(); erts_smp_port_state_lock(pp); pp->status &= ~ERTS_PORT_SFLG_CLOSING; pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED; erts_may_save_closed_port(pp); erts_smp_port_state_unlock(pp); - ERTS_SMP_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 1); + ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 1); ptp->type = ERTS_PORT_TASK_FREE; ptp->event = (ErlDrvEvent) -1; ptp->event_data = NULL; set_handle(ptp, NULL); - push_task(pp->sched.exe_taskq, ptp); + push_task(ptqp, ptp); ERTS_PT_CHK_PRES_PORTQ(runq, pp); erts_smp_runq_unlock(runq); } else { - ErtsPortTaskQueue *ptqp = pp->sched.taskq; - if (ptqp) { - dequeue_port(runq, pp); - ERTS_PORT_NOT_IN_RUNQ(pp); - port_is_dequeued = 1; + if (pp->sched.in_runq) { + ptqp = pp->sched.taskq; + if (!ptqp) + pp->sched.taskq = ptqp = port_taskq_init(port_taskq_alloc(), pp); + goto enqueue_free; } + ASSERT(!pp->sched.taskq); erts_smp_port_state_lock(pp); pp->status &= ~ERTS_PORT_SFLG_CLOSING; pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED; erts_may_save_closed_port(pp); erts_smp_port_state_unlock(pp); -#ifdef ERTS_SMP erts_smp_atomic_dec_nob(&pp->refc); /* Not alive */ -#endif - ERTS_SMP_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 0); /* Lock */ + ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 0); /* Lock */ handle_remaining_tasks(runq, pp); /* May release runq lock */ ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first)); pp->sched.taskq = NULL; ERTS_PT_CHK_PRES_PORTQ(runq, pp); -#ifndef ERTS_SMP - ASSERT(pp->status & ERTS_PORT_SFLG_PORT_DEBUG); - erts_port_status_set(pp, ERTS_PORT_SFLG_FREE); -#endif erts_smp_runq_unlock(runq); - - if (erts_system_profile_flags.runnable_ports && port_is_dequeued) { - profile_runnable_port(pp, am_inactive); - } - - if (ptqp) - port_taskq_free(ptqp); } } @@ -738,7 +680,6 @@ typedef struct { int erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) { - int port_was_enqueued = 0; Port *pp; ErtsPortTaskQueue *ptqp; ErtsPortTask *ptp; @@ -757,11 +698,18 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) goto done; } - ERTS_PORT_NOT_IN_RUNQ(pp); + ASSERT(pp->sched.in_runq); + pp->sched.in_runq = 0; + if (!pp->sched.taskq) { + if (erts_system_profile_flags.runnable_ports) + profile_runnable_port(pp, am_inactive); + res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) + != (erts_aint_t) 0); + goto done; + } *curr_port_pp = pp; - ASSERT(pp->sched.taskq); ASSERT(pp->sched.taskq->first); ptqp = pp->sched.taskq; pp->sched.taskq = NULL; @@ -823,12 +771,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) handle_remaining_tasks(runq, pp); ASSERT(!ptqp->first && (!pp->sched.taskq || !pp->sched.taskq->first)); -#ifdef ERTS_SMP erts_smp_atomic_dec_nob(&pp->refc); /* Not alive */ - ERTS_SMP_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 0); /* Lock */ -#else - erts_port_status_bor_set(pp, ERTS_PORT_SFLG_FREE); -#endif + ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 0); /* Lock */ port_task_free(ptp); if (pp->sched.taskq) @@ -918,6 +862,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) if (!pp->sched.taskq) { ASSERT(pp->sched.exe_taskq); pp->sched.exe_taskq = NULL; + if (erts_system_profile_flags.runnable_ports) + profile_runnable_port(pp, am_inactive); } else { #ifdef ERTS_SMP @@ -940,14 +886,20 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) else { /* Port emigrated ... */ erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); - enqueue_port(xrunq, pp); - ASSERT(pp->sched.exe_taskq); - pp->sched.exe_taskq = NULL; - erts_smp_runq_unlock(xrunq); - erts_smp_notify_inc_runq(xrunq); + erts_smp_runq_unlock(runq); + + xrunq = erts_port_runq(pp); + if (xrunq) { + enqueue_port(xrunq, pp); + ASSERT(pp->sched.exe_taskq); + pp->sched.exe_taskq = NULL; + erts_smp_runq_unlock(xrunq); + erts_smp_notify_inc_runq(xrunq); + } + + erts_smp_runq_lock(runq); } #endif - port_was_enqueued = 1; } res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) @@ -957,10 +909,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) port_taskq_free(ptqp); - if (erts_system_profile_flags.runnable_ports && (port_was_enqueued != 1)) { - profile_runnable_port(pp, am_inactive); - } - /* trace port scheduling, out */ if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { trace_sched_ports(pp, am_out); @@ -1049,63 +997,33 @@ erts_port_is_scheduled(Port *pp) { int res; ErtsRunQueue *runq = erts_port_runq(pp); + if (!runq) + return 0; res = pp->sched.taskq || pp->sched.exe_taskq; erts_smp_runq_unlock(runq); return res; } #ifdef ERTS_SMP +void +erts_enqueue_port(ErtsRunQueue *rq, Port *pp) +{ + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); + ASSERT(rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue)); + ASSERT(pp->sched.in_runq); + enqueue_port(rq, pp); +} -ErtsMigrateResult -erts_port_migrate(Port *prt, int *prt_locked, - ErtsRunQueue *from_rq, int *from_locked, - ErtsRunQueue *to_rq, int *to_locked) +Port * +erts_dequeue_port(ErtsRunQueue *rq) { - ERTS_SMP_LC_ASSERT(*from_locked); - ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked); - ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked); - - if (!*from_locked || !*to_locked) { - if (from_rq < to_rq) { - if (!*to_locked) { - if (!*from_locked) - erts_smp_runq_lock(from_rq); - erts_smp_runq_lock(to_rq); - } - else if (erts_smp_runq_trylock(from_rq) == EBUSY) { - erts_smp_runq_unlock(to_rq); - erts_smp_runq_lock(from_rq); - erts_smp_runq_lock(to_rq); - } - } - else { - if (!*from_locked) { - if (!*to_locked) - erts_smp_runq_lock(to_rq); - erts_smp_runq_lock(from_rq); - } - else if (erts_smp_runq_trylock(to_rq) == EBUSY) { - erts_smp_runq_unlock(from_rq); - erts_smp_runq_lock(to_rq); - erts_smp_runq_lock(from_rq); - } - } - *to_locked = *from_locked = 1; - } - ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked); - ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked); - - /* Refuse to migrate to a suspended run queue */ - if (to_rq->flags & ERTS_RUNQ_FLG_SUSPENDED) - return ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED; - if (from_rq != (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue)) - return ERTS_MIGRATE_FAILED_RUNQ_CHANGED; - if (!ERTS_PORT_IS_IN_RUNQ(from_rq, prt)) - return ERTS_MIGRATE_FAILED_NOT_IN_RUNQ; - dequeue_port(from_rq, prt); - erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) to_rq); - enqueue_port(to_rq, prt); - return ERTS_MIGRATE_SUCCESS; + Port *pp; + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); + pp = pop_port(rq); + ASSERT(!pp + || rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue)); + ASSERT(!pp || pp->sched.in_runq); + return pp; } #endif diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h index d7104e1143..fd88b1c1ff 100644 --- a/erts/emulator/beam/erl_port_task.h +++ b/erts/emulator/beam/erl_port_task.h @@ -62,7 +62,7 @@ typedef struct ErtsPortTaskQueue_ ErtsPortTaskQueue; typedef struct { Port *next; - Port *prev; + int in_runq; ErtsPortTaskQueue *taskq; ErtsPortTaskQueue *exe_taskq; } ErtsPortTaskSched; @@ -92,7 +92,7 @@ ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp) { ptsp->next = NULL; - ptsp->prev = NULL; + ptsp->in_runq = 0; ptsp->taskq = NULL; ptsp->exe_taskq = NULL; } @@ -123,13 +123,10 @@ int erts_port_task_schedule(Eterm, ErlDrvEventData); void erts_port_task_free_port(Port *); int erts_port_is_scheduled(Port *); + #ifdef ERTS_SMP -ErtsMigrateResult erts_port_migrate(Port *, - int *, - ErtsRunQueue *, - int *, - ErtsRunQueue *, - int *); +void erts_enqueue_port(ErtsRunQueue *rq, Port *pp); +Port *erts_dequeue_port(ErtsRunQueue *rq); #endif #undef ERTS_INCLUDE_SCHEDULER_INTERNALS #endif /* ERL_PORT_TASK_H__ */ diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 7174991ade..ba46b9011d 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -94,12 +94,44 @@ #define HIGH_BIT (1 << PRIORITY_HIGH) #define NORMAL_BIT (1 << PRIORITY_NORMAL) #define LOW_BIT (1 << PRIORITY_LOW) +#define PORT_BIT (1 << ERTS_PORT_PRIO_LEVEL) -#define ERTS_EMPTY_RUNQ(RQ) \ - ((RQ)->len == 0 && (RQ)->misc.start == NULL) +#define ERTS_EMPTY_RUNQ(RQ) \ + ((ERTS_RUNQ_FLGS_GET_NOB((RQ)) & ERTS_RUNQ_FLGS_QMASK) == 0 \ + && (RQ)->misc.start == NULL) + +#undef RUNQ_READ_RQ +#undef RUNQ_SET_RQ +#define RUNQ_READ_RQ(X) ((ErtsRunQueue *) erts_smp_atomic_read_nob((X))) +#define RUNQ_SET_RQ(X, RQ) erts_smp_atomic_set_nob((X), (erts_aint_t) (RQ)) + +#ifdef DEBUG +# if defined(ARCH_64) && !HALFWORD_HEAP +# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \ + (RUNQ_SET_RQ((RQP), (0xdeadbeefdead0003LL | ((N) << 4))) +# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \ +do { \ + ASSERT((RQP) != NULL); \ + ASSERT(((((Uint) (RQP)) & ((Uint) 0x3))) == ((Uint) 0)); \ + ASSERT((((Uint) (RQP)) & ~((Uint) 0xffff)) != ((Uint) 0xdeadbeefdead0000LL));\ +} while (0) +# else +# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \ + (RUNQ_SET_RQ((RQP), (0xdead0003 | ((N) << 4)))) +# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \ +do { \ + ASSERT((RQP) != NULL); \ + ASSERT(((((UWord) (RQP)) & ((UWord) 1))) == ((UWord) 0)); \ + ASSERT((((UWord) (RQP)) & ~((UWord) 0xffff)) != ((UWord) 0xdead0000)); \ +} while (0) +# endif +#else +# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) +# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) +#endif #define ERTS_EMPTY_RUNQ_PORTS(RQ) \ - ((RQ)->ports.info.len == 0 && (RQ)->misc.start == NULL) + (RUNQ_READ_LEN(&(RQ)->ports.info.len) == 0 && (RQ)->misc.start == NULL) extern BeamInstr beam_apply[]; extern BeamInstr beam_exit[]; @@ -312,7 +344,7 @@ static struct { struct { int active_runqs; int reds; - int max_len; + erts_aint32_t max_len; } prev_rise; Uint n; } balance_info; @@ -1884,8 +1916,8 @@ sched_waiting_sys(Uint no, ErtsRunQueue *rq) { ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); ASSERT(rq->waiting >= 0); - rq->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK - | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK); + ERTS_RUNQ_FLGS_SET(rq, (ERTS_RUNQ_FLG_OUT_OF_WORK + | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK)); rq->waiting++; rq->waiting *= -1; rq->woken = 0; @@ -1961,8 +1993,8 @@ static ERTS_INLINE void sched_waiting(Uint no, ErtsRunQueue *rq) { ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); - rq->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK - | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK); + ERTS_RUNQ_FLGS_SET(rq, (ERTS_RUNQ_FLG_OUT_OF_WORK + | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK)); if (rq->waiting < 0) rq->waiting--; else @@ -1994,9 +2026,8 @@ ongoing_multi_scheduling_block(void) static ERTS_INLINE void empty_runq(ErtsRunQueue *rq) { - erts_aint32_t oifls = erts_smp_atomic32_read_band_nob(&rq->info_flags, - ~ERTS_RUNQ_IFLG_NONEMPTY); - if (oifls & ERTS_RUNQ_IFLG_NONEMPTY) { + Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED); + if (old_flags & ERTS_RUNQ_FLG_NONEMPTY) { #ifdef DEBUG erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues); /* @@ -2012,9 +2043,8 @@ empty_runq(ErtsRunQueue *rq) static ERTS_INLINE void non_empty_runq(ErtsRunQueue *rq) { - erts_aint32_t oifls = erts_smp_atomic32_read_bor_nob(&rq->info_flags, - ERTS_RUNQ_IFLG_NONEMPTY); - if (!(oifls & ERTS_RUNQ_IFLG_NONEMPTY)) { + Uint32 old_flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_NONEMPTY); + if (!(old_flags & ERTS_RUNQ_FLG_NONEMPTY)) { #ifdef DEBUG erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues); /* @@ -2627,19 +2657,16 @@ try_inc_no_active_runqs(int active) static ERTS_INLINE int chk_wake_sched(ErtsRunQueue *crq, int ix, int activate) { - erts_aint32_t iflgs; + Uint32 flags; ErtsRunQueue *wrq; if (crq->ix == ix) return 0; wrq = ERTS_RUNQ_IX(ix); - iflgs = erts_smp_atomic32_read_nob(&wrq->info_flags); - if (!(iflgs & (ERTS_RUNQ_IFLG_SUSPENDED|ERTS_RUNQ_IFLG_NONEMPTY))) { + flags = ERTS_RUNQ_FLGS_GET(wrq); + if (!(flags & (ERTS_RUNQ_FLG_SUSPENDED|ERTS_RUNQ_FLG_NONEMPTY))) { if (activate) { - if (try_inc_no_active_runqs(ix+1)) { - erts_smp_xrunq_lock(crq, wrq); - wrq->flags &= ~ERTS_RUNQ_FLG_INACTIVE; - erts_smp_xrunq_unlock(crq, wrq); - } + if (try_inc_no_active_runqs(ix+1)) + ERTS_RUNQ_FLGS_UNSET(wrq, ERTS_RUNQ_FLG_INACTIVE); } wake_scheduler(wrq, 0); return 1; @@ -2706,9 +2733,7 @@ erts_sched_notify_check_cpu_bind(void) int ix; for (ix = 0; ix < erts_no_run_queues; ix++) { ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); - erts_smp_runq_lock(rq); - rq->flags |= ERTS_RUNQ_FLG_CHK_CPU_BIND; - erts_smp_runq_unlock(rq); + ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_CHK_CPU_BIND); wake_scheduler(rq, 0); } #else @@ -2717,409 +2742,547 @@ erts_sched_notify_check_cpu_bind(void) } -#ifdef ERTS_SMP +static ERTS_INLINE void +enqueue_process(ErtsRunQueue *runq, int prio, Process *p) +{ + ErtsRunPrioQueue *rpq; -ErtsRunQueue * -erts_prepare_emigrate(ErtsRunQueue *c_rq, ErtsRunQueueInfo *c_rqi, int prio) + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); + + erts_smp_inc_runq_len(runq, &runq->procs.prio_info[prio], prio); + + if (prio == PRIORITY_LOW) { + p->schedule_count = RESCHEDULE_LOW; + rpq = &runq->procs.prio[PRIORITY_NORMAL]; + } + else { + p->schedule_count = 1; + rpq = &runq->procs.prio[prio]; + } + + p->next = NULL; + if (rpq->last) + rpq->last->next = p; + else + rpq->first = p; + rpq->last = p; +} + + +static ERTS_INLINE void +unqueue_process(ErtsRunQueue *runq, + ErtsRunPrioQueue *rpq, + ErtsRunQueueInfo *rqi, + int prio, + Process *prev_proc, + Process *proc) { - ASSERT(ERTS_CHK_RUNQ_FLG_EMIGRATE(c_rq->flags, prio)); - ASSERT(ERTS_CHK_RUNQ_FLG_EVACUATE(c_rq->flags, prio) - || c_rqi->len >= c_rqi->migrate.limit.this); + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); - while (1) { - ErtsRunQueue *n_rq = c_rqi->migrate.runq; - ERTS_DBG_VERIFY_VALID_RUNQP(n_rq); - erts_smp_xrunq_lock(c_rq, n_rq); - - /* - * erts_smp_xrunq_lock() may release lock on c_rq! We have - * to check that we still want to emigrate and emigrate - * to the same run queue as before. - */ + if (prev_proc) + prev_proc->next = proc->next; + else + rpq->first = proc->next; + if (!proc->next) + rpq->last = prev_proc; - if (ERTS_CHK_RUNQ_FLG_EMIGRATE(c_rq->flags, prio)) { - Uint32 force = (ERTS_CHK_RUNQ_FLG_EVACUATE(c_rq->flags, prio) - | (c_rq->flags & ERTS_RUNQ_FLG_INACTIVE)); - if (force || c_rqi->len > c_rqi->migrate.limit.this) { - ErtsRunQueueInfo *n_rqi; - /* We still want to emigrate */ - - if (n_rq != c_rqi->migrate.runq) { - /* Ahh... run queue changed; need to do it all over again... */ - erts_smp_runq_unlock(n_rq); - continue; - } - else { + if (!rpq->first) + rpq->last = NULL; - if (prio == ERTS_PORT_PRIO_LEVEL) - n_rqi = &n_rq->ports.info; - else - n_rqi = &n_rq->procs.prio_info[prio]; + erts_smp_dec_runq_len(runq, rqi, prio); +} - if (force || (n_rqi->len < c_rqi->migrate.limit.other)) { - /* emigrate ... */ - return n_rq; - } - } - } - } - ASSERT(n_rq != c_rq); - erts_smp_runq_unlock(n_rq); - if (!(c_rq->flags & ERTS_RUNQ_FLG_INACTIVE)) { - /* No more emigrations to this runq */ - ERTS_UNSET_RUNQ_FLG_EMIGRATE(c_rq->flags, prio); - ERTS_DBG_SET_INVALID_RUNQP(c_rqi->migrate.runq, 0x3); - } +static ERTS_INLINE Process * +dequeue_process(ErtsRunQueue *runq, int prio_q, erts_aint32_t *statep) +{ + erts_aint32_t state; + int prio; + ErtsRunPrioQueue *rpq; + ErtsRunQueueInfo *rqi; + Process *p; + + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); + + ASSERT(PRIORITY_NORMAL == prio_q + || PRIORITY_HIGH == prio_q + || PRIORITY_MAX == prio_q); + rpq = &runq->procs.prio[prio_q]; + p = rpq->first; + if (!p) return NULL; + + ERTS_SMP_DATA_DEPENDENCY_READ_MEMORY_BARRIER; + + state = erts_smp_atomic32_read_nob(&p->state); + if (statep) + *statep = state; + + prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + + rqi = &runq->procs.prio_info[prio]; + + if (p) + unqueue_process(runq, rpq, rqi, prio, NULL, p); + + return p; +} + +static ERTS_INLINE int +check_requeue_process(ErtsRunQueue *rq, int prio_q) +{ + ErtsRunPrioQueue *rpq = &rq->procs.prio[prio_q]; + Process *p = rpq->first; + if (--p->schedule_count > 0 && p != rpq->last) { + /* reschedule */ + rpq->first = p->next; + rpq->last->next = p; + rpq->last = p; + p->next = NULL; + return 1; + } + return 0; +} + +#ifdef ERTS_SMP + +static ErtsRunQueue * +check_immigration_need(ErtsRunQueue *c_rq, ErtsMigrationPath *mp, int prio) +{ + int len; + Uint32 f_flags, f_rq_flags; + ErtsRunQueue *f_rq; + + f_flags = mp->prio[prio].flags; + + ASSERT(ERTS_CHK_RUNQ_FLG_IMMIGRATE(mp->flags, prio)); + + f_rq = mp->prio[prio].runq; + if (!f_rq) + return NULL; + + f_rq_flags = ERTS_RUNQ_FLGS_GET(f_rq); + if (f_rq_flags & ERTS_RUNQ_FLG_PROTECTED) + return NULL; + + if (ERTS_CHK_RUNQ_FLG_EVACUATE(f_flags, prio)) + return f_rq; + + if (f_rq_flags & ERTS_RUNQ_FLG_INACTIVE) + return f_rq; + + if (prio == ERTS_PORT_PRIO_LEVEL) + len = RUNQ_READ_LEN(&c_rq->ports.info.len); + else + len = RUNQ_READ_LEN(&c_rq->procs.prio_info[prio].len); + + if (len < mp->prio[prio].limit.this) { + if (prio == ERTS_PORT_PRIO_LEVEL) + len = RUNQ_READ_LEN(&f_rq->ports.info.len); + else + len = RUNQ_READ_LEN(&f_rq->procs.prio_info[prio].len); + + if (len > mp->prio[prio].limit.other) + return f_rq; } + return NULL; } static void -immigrate(ErtsRunQueue *rq) +immigrate(ErtsRunQueue *c_rq, ErtsMigrationPath *mp) { - int prio; + Uint32 iflags, iflag; + erts_smp_runq_unlock(c_rq); - ASSERT(rq->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK); + ASSERT(erts_thr_progress_is_managed_thread()); - for (prio = 0; prio < ERTS_NO_PRIO_LEVELS; prio++) { - if (ERTS_CHK_RUNQ_FLG_IMMIGRATE(rq->flags, prio)) { - ErtsRunQueueInfo *rqi = (prio == ERTS_PORT_PRIO_LEVEL - ? &rq->ports.info - : &rq->procs.prio_info[prio]); - ErtsRunQueue *from_rq = rqi->migrate.runq; - int rq_locked, from_rq_locked; + iflags = mp->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK; - ERTS_DBG_VERIFY_VALID_RUNQP(from_rq); + iflag = iflags & -iflags; - rq_locked = 1; - from_rq_locked = 1; - erts_smp_xrunq_lock(rq, from_rq); - /* - * erts_smp_xrunq_lock() may release lock on rq! We have - * to check that we still want to immigrate from the same - * run queue as before. - */ - if (ERTS_CHK_RUNQ_FLG_IMMIGRATE(rq->flags, prio) - && from_rq == rqi->migrate.runq) { - ErtsRunQueueInfo *from_rqi = (prio == ERTS_PORT_PRIO_LEVEL - ? &from_rq->ports.info - : &from_rq->procs.prio_info[prio]); - if ((ERTS_CHK_RUNQ_FLG_EVACUATE(rq->flags, prio) - && ERTS_CHK_RUNQ_FLG_EVACUATE(from_rq->flags, prio) - && from_rqi->len) - || (from_rqi->len > rqi->migrate.limit.other - && rqi->len < rqi->migrate.limit.this)) { - if (prio == ERTS_PORT_PRIO_LEVEL) { - Port *prt = from_rq->ports.start; - if (prt) { - int prt_locked = 0; - (void) erts_port_migrate(prt, &prt_locked, - from_rq, &from_rq_locked, - rq, &rq_locked); - if (prt_locked) - erts_smp_port_unlock(prt); - } - } - else { - Process *proc; - ErtsRunPrioQueue *from_rpq; - from_rpq = (prio == PRIORITY_LOW - ? &from_rq->procs.prio[PRIORITY_NORMAL] - : &from_rq->procs.prio[prio]); - for (proc = from_rpq->first; proc; proc = proc->next) - if (proc->prio == prio && !proc->bound_runq) - break; - if (proc) { - ErtsProcLocks proc_locks = 0; - (void) erts_proc_migrate(proc, &proc_locks, - from_rq, &from_rq_locked, - rq, &rq_locked); - if (proc_locks) - erts_smp_proc_unlock(proc, proc_locks); - } + while (iflag) { + ErtsRunQueue *rq; + int prio; + + switch (iflag) { + case (MAX_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT): + prio = PRIORITY_MAX; + break; + case (HIGH_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT): + prio = PRIORITY_HIGH; + break; + case (NORMAL_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT): + prio = PRIORITY_NORMAL; + break; + case (LOW_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT): + prio = PRIORITY_LOW; + break; + case (PORT_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT): + prio = ERTS_PORT_PRIO_LEVEL; + break; + default: + erl_exit(ERTS_ABORT_EXIT, + "%s:%d:%s(): Invalid immigrate queue mask", + __FILE__, __LINE__, __func__); + prio = 0; + break; + } + + iflags &= ~iflag; + iflag = iflags & -iflags; + + rq = check_immigration_need(c_rq, mp, prio); + if (rq) { + erts_smp_runq_lock(rq); + if (prio == ERTS_PORT_PRIO_LEVEL) { + Port *prt; + prt = erts_dequeue_port(rq); + if (prt) + RUNQ_SET_RQ(&prt->run_queue, c_rq); + erts_smp_runq_unlock(rq); + if (prt) { + /* port might terminate while we have no lock... */ + rq = erts_port_runq(prt); + if (rq) { + if (rq != c_rq) + erl_exit(ERTS_ABORT_EXIT, + "%s:%d:%s(): Internal error", + __FILE__, __LINE__, __func__); + erts_enqueue_port(c_rq, prt); + if (!iflag) + return; /* done */ + erts_smp_runq_unlock(c_rq); } } - else { - ERTS_UNSET_RUNQ_FLG_IMMIGRATE(rq->flags, prio); - ERTS_DBG_SET_INVALID_RUNQP(rqi->migrate.runq, 0x1); + } + else { + ErtsRunPrioQueue *rpq = &rq->procs.prio[prio == PRIORITY_LOW + ? PRIORITY_NORMAL + : prio]; + Process *prev_proc = NULL; + Process *proc = rpq->first; + int rq_locked = 1; + + while (proc) { + erts_aint32_t state; + state = erts_smp_atomic32_read_acqb(&proc->state); + if (!(ERTS_PSFLG_BOUND & state) + && (prio == (int) (ERTS_PSFLG_PRIO_MASK & state))) { + ErtsRunQueueInfo *rqi = &rq->procs.prio_info[prio]; + unqueue_process(rq, rpq, rqi, prio, prev_proc, proc); + erts_smp_runq_unlock(rq); + RUNQ_SET_RQ(&proc->run_queue, c_rq); + rq_locked = 0; + + erts_smp_runq_lock(c_rq); + enqueue_process(c_rq, prio, proc); + if (!iflag) + return; /* done */ + erts_smp_runq_unlock(c_rq); + break; + } + prev_proc = proc; + proc = proc->next; } + if (rq_locked) + erts_smp_runq_unlock(rq); } - if (from_rq_locked) - erts_smp_runq_unlock(from_rq); - if (!rq_locked) - erts_smp_runq_lock(rq); } } + + erts_smp_runq_lock(c_rq); +} + +static ERTS_INLINE void +suspend_run_queue(ErtsRunQueue *rq) +{ + erts_smp_atomic32_read_bor_nob(&rq->scheduler->ssi->flags, + ERTS_SSI_FLG_SUSPENDED); + ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_SUSPENDED); + + wake_scheduler(rq, 0); +} + +static void scheduler_ix_resume_wake(Uint ix); + +static ERTS_INLINE void +resume_run_queue(ErtsRunQueue *rq) +{ + int pix; + + erts_smp_runq_lock(rq); + + (void) ERTS_RUNQ_FLGS_MASK_SET(rq, + (ERTS_RUNQ_FLG_OUT_OF_WORK + | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK + | ERTS_RUNQ_FLG_SUSPENDED), + (ERTS_RUNQ_FLG_OUT_OF_WORK + | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK)); + + rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; + for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) { + rq->procs.prio_info[pix].max_len = 0; + rq->procs.prio_info[pix].reds = 0; + } + rq->ports.info.max_len = 0; + rq->ports.info.reds = 0; + rq->max_len = 0; + + erts_smp_runq_unlock(rq); + + scheduler_ix_resume_wake(rq->ix); } +typedef struct { + Process *first; + Process *last; +} ErtsStuckBoundProcesses; + static void -evacuate_run_queue(ErtsRunQueue *evac_rq, ErtsRunQueue *rq) +schedule_bound_processes(ErtsRunQueue *rq, + ErtsStuckBoundProcesses *sbpp) { - Port *prt; - int notify_to_rq = 0; - int prio; - int prt_locked = 0; - int rq_locked = 0; - int evac_rq_locked = 1; - ErtsMigrateResult mres; + Process *proc, *next; + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); + + proc = sbpp->first; + while (proc) { + erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state); + next = proc->next; + enqueue_process(rq, (int) (ERTS_PSFLG_PRIO_MASK & state), proc); + proc = next; + } +} - erts_smp_runq_lock(evac_rq); +static void +evacuate_run_queue(ErtsRunQueue *rq, + ErtsStuckBoundProcesses *sbpp) +{ + int prio_q; + ErtsRunQueue *to_rq; + ErtsMigrationPaths *mps; + ErtsMigrationPath *mp; - erts_smp_atomic32_read_bor_nob(&evac_rq->scheduler->ssi->flags, - ERTS_SSI_FLG_SUSPENDED); + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); - evac_rq->flags &= ~ERTS_RUNQ_FLGS_IMMIGRATE_QMASK; - evac_rq->flags |= (ERTS_RUNQ_FLGS_EMIGRATE_QMASK - | ERTS_RUNQ_FLGS_EVACUATE_QMASK - | ERTS_RUNQ_FLG_SUSPENDED); + ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED); - erts_smp_atomic32_read_bor_nob(&evac_rq->info_flags, ERTS_RUNQ_IFLG_SUSPENDED); - /* - * Need to set up evacuation paths first since we - * may release the run queue lock on evac_rq - * when evacuating. - */ - evac_rq->misc.evac_runq = rq; - evac_rq->ports.info.migrate.runq = rq; - for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++) - evac_rq->procs.prio_info[prio].migrate.runq = rq; + mps = erts_get_migration_paths_managed(); + mp = &mps->mpath[rq->ix]; /* Evacuate scheduled misc ops */ - if (evac_rq->misc.start) { - rq_locked = 1; - erts_smp_xrunq_lock(evac_rq, rq); - if (rq->misc.end) - rq->misc.end->next = evac_rq->misc.start; + if (rq->misc.start) { + ErtsMiscOpList *start, *end; + + to_rq = mp->misc_evac_runq; + if (!to_rq) + return; + + start = rq->misc.start; + end = rq->misc.end; + rq->misc.start = NULL; + rq->misc.end = NULL; + erts_smp_runq_unlock(rq); + + erts_smp_runq_lock(to_rq); + if (to_rq->misc.end) + to_rq->misc.end->next = start; else - rq->misc.start = evac_rq->misc.start; - rq->misc.end = evac_rq->misc.end; - evac_rq->misc.start = NULL; - evac_rq->misc.end = NULL; + to_rq->misc.start = start; + + to_rq->misc.end = end; + erts_smp_runq_unlock(to_rq); + smp_notify_inc_runq(to_rq); + erts_smp_runq_lock(to_rq); } - /* Evacuate scheduled ports */ - prt = evac_rq->ports.start; - while (prt) { - mres = erts_port_migrate(prt, &prt_locked, - evac_rq, &evac_rq_locked, - rq, &rq_locked); - if (mres == ERTS_MIGRATE_SUCCESS) - notify_to_rq = 1; - if (prt_locked) - erts_smp_port_unlock(prt); - if (!evac_rq_locked) { - evac_rq_locked = 1; - erts_smp_runq_lock(evac_rq); + if (rq->ports.start) { + Port *prt; + + to_rq = mp->prio[ERTS_PORT_PRIO_LEVEL].runq; + if (!to_rq) + return; + + /* Evacuate scheduled ports */ + prt = rq->ports.start; + while (prt) { + ErtsRunQueue *prt_rq; + prt = erts_dequeue_port(rq); + RUNQ_SET_RQ(&prt->run_queue, to_rq); + erts_smp_runq_unlock(rq); + /* + * The port might terminate while + * we have no lock on it... + */ + prt_rq = erts_port_runq(prt); + if (prt_rq) { + if (prt_rq != to_rq) + erl_exit(ERTS_ABORT_EXIT, + "%s:%d:%s() internal error\n", + __FILE__, __LINE__, __func__); + erts_enqueue_port(to_rq, prt); + erts_smp_runq_unlock(to_rq); + } + erts_smp_runq_lock(rq); + prt = rq->ports.start; } - prt = evac_rq->ports.start; + smp_notify_inc_runq(to_rq); } /* Evacuate scheduled processes */ - for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++) { + for (prio_q = 0; prio_q < ERTS_NO_PROC_PRIO_QUEUES; prio_q++) { + erts_aint32_t state; Process *proc; + int notify = 0; + to_rq = NULL; - switch (prio) { - case PRIORITY_MAX: - case PRIORITY_HIGH: - case PRIORITY_NORMAL: - proc = evac_rq->procs.prio[prio].first; - while (proc) { - ErtsProcLocks proc_locks = 0; - - /* Bound processes are stuck... */ - while (proc->bound_runq) { - proc = proc->next; - if (!proc) - goto end_of_proc; - } - - mres = erts_proc_migrate(proc, &proc_locks, - evac_rq, &evac_rq_locked, - rq, &rq_locked); - if (mres == ERTS_MIGRATE_SUCCESS) - notify_to_rq = 1; - if (proc_locks) - erts_smp_proc_unlock(proc, proc_locks); - if (!evac_rq_locked) { - erts_smp_runq_lock(evac_rq); - evac_rq_locked = 1; - } + if (!mp->prio[prio_q].runq) + return; + if (prio_q == PRIORITY_NORMAL && !mp->prio[PRIORITY_LOW].runq) + return; - proc = evac_rq->procs.prio[prio].first; + proc = dequeue_process(rq, prio_q, &state); + while (proc) { + if (ERTS_PSFLG_BOUND & state) { + /* Bound processes get stuck here... */ + proc->next = NULL; + if (sbpp->last) + sbpp->last->next = proc; + else + sbpp->first = proc; + sbpp->last = proc; } + else { + int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + erts_smp_runq_unlock(rq); - end_of_proc: + to_rq = mp->prio[prio].runq; + RUNQ_SET_RQ(&proc->run_queue, to_rq); -#ifdef DEBUG - for (proc = evac_rq->procs.prio[prio].first; - proc; - proc = proc->next) { - ASSERT(proc->bound_runq); + erts_smp_runq_lock(to_rq); + enqueue_process(to_rq, prio, proc); + erts_smp_runq_unlock(to_rq); + notify = 1; + + erts_smp_runq_lock(rq); } -#endif - break; - case PRIORITY_LOW: - break; - default: - ASSERT(!"Invalid process priority"); - break; + proc = dequeue_process(rq, prio_q, &state); } + if (notify) + smp_notify_inc_runq(to_rq); } - if (rq_locked) - erts_smp_runq_unlock(rq); - - if (evac_rq_locked) - erts_smp_runq_unlock(evac_rq); - - if (notify_to_rq) - smp_notify_inc_runq(rq); - - wake_scheduler(evac_rq, 0); + if (ERTS_EMPTY_RUNQ(rq)) + empty_runq(rq); } static int -try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq) +try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq, Uint32 flags) { - Process *proc; - int vrq_locked; + Uint32 procs_qmask = flags & ERTS_RUNQ_FLGS_PROCS_QMASK; + int max_prio_bit; + ErtsRunPrioQueue *rpq; - if (*rq_lockedp) - erts_smp_xrunq_lock(rq, vrq); - else - erts_smp_runq_lock(vrq); - vrq_locked = 1; + if (*rq_lockedp) { + erts_smp_runq_unlock(rq); + *rq_lockedp = 0; + } + + ERTS_SMP_LC_ASSERT(!erts_smp_lc_runq_is_locked(rq)); - ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp); - ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked); + erts_smp_runq_lock(vrq); if (rq->halt_in_progress) - goto try_steal_port; + goto no_procs; /* * Check for a runnable process to steal... */ - switch (vrq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) { - case MAX_BIT: - case MAX_BIT|HIGH_BIT: - case MAX_BIT|NORMAL_BIT: - case MAX_BIT|LOW_BIT: - case MAX_BIT|HIGH_BIT|NORMAL_BIT: - case MAX_BIT|HIGH_BIT|LOW_BIT: - case MAX_BIT|NORMAL_BIT|LOW_BIT: - case MAX_BIT|HIGH_BIT|NORMAL_BIT|LOW_BIT: - for (proc = vrq->procs.prio[PRIORITY_MAX].last; - proc; - proc = proc->prev) { - if (!proc->bound_runq) - break; - } - if (proc) + while (procs_qmask) { + Process *prev_proc; + Process *proc; + + max_prio_bit = procs_qmask & -procs_qmask; + switch (max_prio_bit) { + case MAX_BIT: + rpq = &vrq->procs.prio[PRIORITY_MAX]; break; - case HIGH_BIT: - case HIGH_BIT|NORMAL_BIT: - case HIGH_BIT|LOW_BIT: - case HIGH_BIT|NORMAL_BIT|LOW_BIT: - for (proc = vrq->procs.prio[PRIORITY_HIGH].last; - proc; - proc = proc->prev) { - if (!proc->bound_runq) - break; - } - if (proc) + case HIGH_BIT: + rpq = &vrq->procs.prio[PRIORITY_HIGH]; break; - case NORMAL_BIT: - case LOW_BIT: - case NORMAL_BIT|LOW_BIT: - for (proc = vrq->procs.prio[PRIORITY_NORMAL].last; - proc; - proc = proc->prev) { - if (!proc->bound_runq) - break; - } - if (proc) + case NORMAL_BIT: + case LOW_BIT: + rpq = &vrq->procs.prio[PRIORITY_NORMAL]; break; - case 0: - proc = NULL; - break; - default: - ASSERT(!"Invalid queue mask"); - proc = NULL; - break; - } + case 0: + goto no_procs; + default: + ASSERT(!"Invalid queue mask"); + goto no_procs; + } - if (proc) { - ErtsProcLocks proc_locks = 0; - int res; - ErtsMigrateResult mres; - mres = erts_proc_migrate(proc, &proc_locks, - vrq, &vrq_locked, - rq, rq_lockedp); - if (proc_locks) - erts_smp_proc_unlock(proc, proc_locks); - res = !0; - switch (mres) { - case ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED: - res = 0; - case ERTS_MIGRATE_SUCCESS: - if (vrq_locked) + prev_proc = NULL; + proc = rpq->first; + + while (proc) { + erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state); + if (!(ERTS_PSFLG_BOUND & state)) { + /* Steal process */ + int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + ErtsRunQueueInfo *rqi = &vrq->procs.prio_info[prio]; + unqueue_process(vrq, rpq, rqi, prio, prev_proc, proc); erts_smp_runq_unlock(vrq); - return res; - default: /* Other failures */ - break; - } - } + RUNQ_SET_RQ(&proc->run_queue, rq); - ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp); - ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked); + erts_smp_runq_lock(rq); + *rq_lockedp = 1; + enqueue_process(rq, prio, proc); + return !0; + } + prev_proc = proc; + proc = proc->next; + } - if (!vrq_locked) { - if (*rq_lockedp) - erts_smp_xrunq_lock(rq, vrq); - else - erts_smp_runq_lock(vrq); - vrq_locked = 1; + procs_qmask &= ~max_prio_bit; } - try_steal_port: +no_procs: - ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp); - ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked); + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(vrq)); /* * Check for a runnable port to steal... */ - if (vrq->ports.info.len) { - Port *prt = vrq->ports.end; - int prt_locked = 0; - int res; - ErtsMigrateResult mres; - - mres = erts_port_migrate(prt, &prt_locked, - vrq, &vrq_locked, - rq, rq_lockedp); - if (prt_locked) - erts_smp_port_unlock(prt); - res = !0; - switch (mres) { - case ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED: - res = 0; - case ERTS_MIGRATE_SUCCESS: - if (vrq_locked) - erts_smp_runq_unlock(vrq); - return res; - default: /* Other failures */ - break; + if (vrq->ports.start) { + ErtsRunQueue *prt_rq; + Port *prt = erts_dequeue_port(vrq); + RUNQ_SET_RQ(&prt->run_queue, rq); + erts_smp_runq_unlock(vrq); + + /* + * The port might terminate while + * we have no lock on it... + */ + + prt_rq = erts_port_runq(prt); + if (!prt_rq) + return 0; + else { + if (prt_rq != rq) + erl_exit(ERTS_ABORT_EXIT, + "%s:%d:%s() internal error\n", + __FILE__, __LINE__, __func__); + *rq_lockedp = 1; + erts_enqueue_port(rq, prt); + return !0; } } - if (vrq_locked) - erts_smp_runq_unlock(vrq); + erts_smp_runq_unlock(vrq); return 0; } @@ -3129,9 +3292,10 @@ static ERTS_INLINE int check_possible_steal_victim(ErtsRunQueue *rq, int *rq_lockedp, int vix) { ErtsRunQueue *vrq = ERTS_RUNQ_IX(vix); - erts_aint32_t iflgs = erts_smp_atomic32_read_nob(&vrq->info_flags); - if (iflgs & ERTS_RUNQ_IFLG_NONEMPTY) - return try_steal_task_from_victim(rq, rq_lockedp, vrq); + Uint32 flags = ERTS_RUNQ_FLGS_GET(vrq); + if ((flags & (ERTS_RUNQ_FLG_NONEMPTY + | ERTS_RUNQ_FLG_PROTECTED)) == ERTS_RUNQ_FLG_NONEMPTY) + return try_steal_task_from_victim(rq, rq_lockedp, vrq, flags); else return 0; } @@ -3141,15 +3305,12 @@ static int try_steal_task(ErtsRunQueue *rq) { int res, rq_locked, vix, active_rqs, blnc_rqs; + Uint32 flags; - /* - * We are not allowed to steal jobs to this run queue - * if it is suspended. Note that it might get suspended - * at any time when we don't have the lock on the run - * queue. - */ - if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED) - return 0; + /* Protect jobs we steal from getting stolen from us... */ + flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_PROTECTED); + if (flags & ERTS_RUNQ_FLG_SUSPENDED) + return 0; /* go suspend instead... */ res = 0; rq_locked = 1; @@ -3268,12 +3429,123 @@ do { \ ASSERT(sum__ == (RQ)->full_reds_history_sum); \ } while (0); +#define ERTS_PRE_ALLOCED_MPATHS 8 + +erts_atomic_t erts_migration_paths; + +static struct { + size_t size; + ErtsMigrationPaths *freelist; + struct { + ErtsMigrationPaths *first; + ErtsMigrationPaths *last; + } retired; +} mpaths; + +static void +init_migration_paths(void) +{ + int qix, i; + char *p; + ErtsMigrationPaths *mps; + + mpaths.size = sizeof(ErtsMigrationPaths); + mpaths.size += sizeof(ErtsMigrationPath)*(erts_no_schedulers-1); + mpaths.size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(mpaths.size); + + p = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_LL_MPATHS, + (mpaths.size + * ERTS_PRE_ALLOCED_MPATHS)); + mpaths.freelist = NULL; + for (i = 0; i < ERTS_PRE_ALLOCED_MPATHS-1; i++) { + mps = (ErtsMigrationPaths *) p; + mps->next = mpaths.freelist; + mpaths.freelist = mps; + p += mpaths.size; + } + + mps = (ErtsMigrationPaths *) p; + mps->block = NULL; + for (qix = 0; qix < erts_no_run_queues; qix++) { + int pix; + mps->mpath[qix].flags = 0; + mps->mpath[qix].misc_evac_runq = NULL; + for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { + mps->mpath[qix].prio[pix].limit.this = -1; + mps->mpath[qix].prio[pix].limit.other = -1; + mps->mpath[qix].prio[pix].runq = NULL; + mps->mpath[qix].prio[pix].flags = 0; + } + } + erts_atomic_init_wb(&erts_migration_paths, (erts_aint_t) mps); +} + +static ERTS_INLINE ErtsMigrationPaths * +alloc_mpaths(void) +{ + void *block; + ErtsMigrationPaths *res; + ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&balance_info.update_mtx)); + + res = mpaths.freelist; + if (res) { + mpaths.freelist = res->next; + res->block = NULL; + return res; + } + res = erts_alloc(ERTS_ALC_T_SL_MPATHS, + mpaths.size+ERTS_CACHE_LINE_SIZE); + block = (void *) res; + if (((UWord) res) & ERTS_CACHE_LINE_MASK) + res = (ErtsMigrationPaths *) ((((UWord) res) & ~ERTS_CACHE_LINE_MASK) + + ERTS_CACHE_LINE_SIZE); + res->block = block; + return res; +} + +static ERTS_INLINE void +retire_mpaths(ErtsMigrationPaths *mps) +{ + ErtsThrPrgrVal current; + + ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&balance_info.update_mtx)); + + current = erts_thr_progress_current(); + + while (mpaths.retired.first) { + ErtsMigrationPaths *tmp = mpaths.retired.first; + if (!erts_thr_progress_has_reached_this(current, tmp->thr_prgr)) + break; + mpaths.retired.first = tmp->next; + if (tmp->block) { + erts_free(ERTS_ALC_T_SL_MPATHS, tmp->block); + } + else { + tmp->next = mpaths.freelist; + mpaths.freelist = tmp; + } + } + + if (!mpaths.retired.first) + mpaths.retired.last = NULL; + + mps->thr_prgr = erts_thr_progress_later_than(current); + mps->next = NULL; + + if (mpaths.retired.last) + mpaths.retired.last->next = mps; + else + mpaths.retired.first = mps; + mpaths.retired.last = mps; +} + static void check_balance(ErtsRunQueue *c_rq) { #if ERTS_MAX_PROCESSES >= (1 << 27) # error check_balance() assumes ERTS_MAX_PROCESS < (1 << 27) #endif + ErtsMigrationPaths *new_mpaths, *old_mpaths; ErtsRunQueueBalance avg = {0}; Sint64 scheds_reds, full_scheds_reds; int forced, active, current_active, oowc, half_full_scheds, full_scheds, @@ -3299,9 +3571,9 @@ check_balance(ErtsRunQueue *c_rq) ERTS_FOREACH_RUNQ(rq, { if (rq->waiting) - rq->flags |= ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK; + ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK); else - rq->flags &= ~ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK; + ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK); rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; }); @@ -3343,7 +3615,7 @@ check_balance(ErtsRunQueue *c_rq) ErtsRunQueue *rq = ERTS_RUNQ_IX(qix); erts_smp_runq_lock(rq); - run_queue_info[qix].flags = rq->flags; + run_queue_info[qix].flags = ERTS_RUNQ_FLGS_GET_NOB(rq); for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) { run_queue_info[qix].prio[pix].max_len = rq->procs.prio_info[pix].max_len; @@ -3677,47 +3949,27 @@ erts_fprintf(stderr, "--------------------------------\n"); set_no_active_runqs(active); balance_info.halftime = 1; - erts_smp_atomic32_set_nob(&balance_info.checking_balance, 0); + new_mpaths = alloc_mpaths(); + + /* Write migration paths */ - /* Write migration paths and reset balance statistics in all queues */ for (qix = 0; qix < blnc_no_rqs; qix++) { int mqix; - Uint32 flags; - ErtsRunQueue *rq = ERTS_RUNQ_IX(qix); - ErtsRunQueueInfo *rqi; - flags = run_queue_info[qix].flags; - erts_smp_runq_lock(rq); - flags |= (rq->flags & ~ERTS_RUNQ_FLGS_MIGRATION_INFO); - ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK)); - if (rq->waiting) - flags |= ERTS_RUNQ_FLG_OUT_OF_WORK; - - rq->full_reds_history_sum - = run_queue_info[qix].full_reds_history_sum; - rq->full_reds_history[freds_hist_ix] - = run_queue_info[qix].full_reds_history_change; + Uint32 flags = run_queue_info[qix].flags; + ErtsMigrationPath *mp = &new_mpaths->mpath[qix]; - ERTS_DBG_CHK_FULL_REDS_HISTORY(rq); + mp->flags = flags; + mp->misc_evac_runq = NULL; - rq->out_of_work_count = 0; - rq->flags = flags; - rq->max_len = rq->len; for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { - rqi = (pix == ERTS_PORT_PRIO_LEVEL - ? &rq->ports.info - : &rq->procs.prio_info[pix]); - rqi->max_len = rqi->len; - rqi->reds = 0; if (!(ERTS_CHK_RUNQ_FLG_EMIGRATE(flags, pix) | ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix))) { ASSERT(run_queue_info[qix].prio[pix].immigrate_from < 0); ASSERT(run_queue_info[qix].prio[pix].emigrate_to < 0); -#ifdef DEBUG - rqi->migrate.limit.this = -1; - rqi->migrate.limit.other = -1; - ERTS_DBG_SET_INVALID_RUNQP(rqi->migrate.runq, 0x2); -#endif - + mp->prio[pix].limit.this = -1; + mp->prio[pix].limit.other = -1; + mp->prio[pix].runq = NULL; + mp->prio[pix].flags = 0; } else if (ERTS_CHK_RUNQ_FLG_EMIGRATE(flags, pix)) { ASSERT(!ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix)); @@ -3725,11 +3977,12 @@ erts_fprintf(stderr, "--------------------------------\n"); ASSERT(run_queue_info[qix].prio[pix].emigrate_to >= 0); mqix = run_queue_info[qix].prio[pix].emigrate_to; - rqi->migrate.limit.this + mp->prio[pix].limit.this = run_queue_info[qix].prio[pix].migration_limit; - rqi->migrate.limit.other + mp->prio[pix].limit.other = run_queue_info[mqix].prio[pix].migration_limit; - rqi->migrate.runq = ERTS_RUNQ_IX(mqix); + mp->prio[pix].runq = ERTS_RUNQ_IX(mqix); + mp->prio[pix].flags = run_queue_info[mqix].flags; } else { ASSERT(ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix)); @@ -3737,24 +3990,142 @@ erts_fprintf(stderr, "--------------------------------\n"); ASSERT(run_queue_info[qix].prio[pix].immigrate_from >= 0); mqix = run_queue_info[qix].prio[pix].immigrate_from; - rqi->migrate.limit.this + mp->prio[pix].limit.this = run_queue_info[qix].prio[pix].migration_limit; - rqi->migrate.limit.other + mp->prio[pix].limit.other = run_queue_info[mqix].prio[pix].migration_limit; - rqi->migrate.runq = ERTS_RUNQ_IX(mqix); + mp->prio[pix].runq = ERTS_RUNQ_IX(mqix); + mp->prio[pix].flags = run_queue_info[mqix].flags; } } + } + + old_mpaths = erts_get_migration_paths_managed(); + + /* Keep offline run-queues as is */ + for (qix = blnc_no_rqs; qix < erts_no_schedulers; qix++) { + ErtsMigrationPath *nmp = &new_mpaths->mpath[qix]; + ErtsMigrationPath *omp = &old_mpaths->mpath[qix]; + + nmp->flags = omp->flags; + nmp->misc_evac_runq = omp->misc_evac_runq; + + for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { + nmp->prio[pix].limit.this = omp->prio[pix].limit.this; + nmp->prio[pix].limit.other = omp->prio[pix].limit.other; + nmp->prio[pix].runq = omp->prio[pix].runq; + nmp->prio[pix].flags = omp->prio[pix].flags; + } + } + + + /* Publish new migration paths... */ + erts_atomic_set_wb(&erts_migration_paths, (erts_aint_t) new_mpaths); + + /* Reset balance statistics in all online queues */ + for (qix = 0; qix < blnc_no_rqs; qix++) { + Uint32 flags = run_queue_info[qix].flags; + ErtsRunQueue *rq = ERTS_RUNQ_IX(qix); + + erts_smp_runq_lock(rq); + ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK)); + if (rq->waiting) + flags |= ERTS_RUNQ_FLG_OUT_OF_WORK; + + rq->full_reds_history_sum + = run_queue_info[qix].full_reds_history_sum; + rq->full_reds_history[freds_hist_ix] + = run_queue_info[qix].full_reds_history_change; + + ERTS_DBG_CHK_FULL_REDS_HISTORY(rq); + + rq->out_of_work_count = 0; + (void) ERTS_RUNQ_FLGS_MASK_SET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO, flags); + + rq->max_len = rq->len; + for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { + ErtsRunQueueInfo *rqi; + rqi = (pix == ERTS_PORT_PRIO_LEVEL + ? &rq->ports.info + : &rq->procs.prio_info[pix]); + erts_smp_reset_max_len(rq, rqi); + rqi->reds = 0; + } rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; erts_smp_runq_unlock(rq); } + erts_smp_atomic32_set_nob(&balance_info.checking_balance, 0); + balance_info.n++; + retire_mpaths(old_mpaths); erts_smp_mtx_unlock(&balance_info.update_mtx); erts_smp_runq_lock(c_rq); } +static void +change_no_used_runqs(int used) +{ + ErtsMigrationPaths *new_mpaths, *old_mpaths; + int active, qix; + erts_smp_mtx_lock(&balance_info.update_mtx); + get_no_runqs(&active, NULL); + set_no_used_runqs(used); + + old_mpaths = erts_get_migration_paths_managed(); + new_mpaths = alloc_mpaths(); + + /* Write migration paths... */ + + for (qix = 0; qix < used; qix++) { + int pix; + ErtsMigrationPath *omp = &old_mpaths->mpath[qix]; + ErtsMigrationPath *nmp = &new_mpaths->mpath[qix]; + + nmp->flags = omp->flags & ~ERTS_RUNQ_FLGS_MIGRATION_QMASKS; + nmp->misc_evac_runq = NULL; + + for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { + nmp->prio[pix].limit.this = -1; + nmp->prio[pix].limit.other = -1; + nmp->prio[pix].runq = NULL; + nmp->prio[pix].flags = 0; + } + } + for (qix = used; qix < erts_no_run_queues; qix++) { + int pix; + ErtsRunQueue *to_rq = ERTS_RUNQ_IX(qix % used); + ErtsMigrationPath *nmp = &new_mpaths->mpath[qix]; + + nmp->flags = (ERTS_RUNQ_FLGS_EMIGRATE_QMASK + | ERTS_RUNQ_FLGS_EVACUATE_QMASK); + nmp->misc_evac_runq = to_rq; + for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) { + nmp->prio[pix].limit.this = -1; + nmp->prio[pix].limit.other = -1; + nmp->prio[pix].runq = to_rq; + nmp->prio[pix].flags = 0; + } + } + + /* ... and publish them. */ + erts_atomic_set_wb(&erts_migration_paths, (erts_aint_t) new_mpaths); + + retire_mpaths(old_mpaths); + + /* Make sure that we balance soon... */ + balance_info.forced_check_balance = 1; + + erts_smp_mtx_unlock(&balance_info.update_mtx); + + erts_smp_runq_lock(ERTS_RUNQ_IX(0)); + ERTS_RUNQ_IX(0)->check_balance_reds = 0; + erts_smp_runq_unlock(ERTS_RUNQ_IX(0)); +} + + #endif /* #ifdef ERTS_SMP */ Uint @@ -3846,7 +4217,6 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); rq->ix = ix; - erts_smp_atomic32_init_nob(&rq->info_flags, ERTS_RUNQ_IFLG_NONEMPTY); /* make sure that the "extra" id correponds to the schedulers * id if the esdp->no <-> ix+1 mapping change. @@ -3857,7 +4227,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) rq->waiting = 0; rq->woken = 0; - rq->flags = 0; + ERTS_RUNQ_FLGS_INIT(rq, ERTS_RUNQ_FLG_NONEMPTY); rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; rq->full_reds_history_sum = 0; for (rix = 0; rix < ERTS_FULL_REDS_HISTORY_SIZE; rix++) { @@ -3871,19 +4241,14 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) rq->wakeup_other_reds = 0; rq->halt_in_progress = 0; - rq->procs.len = 0; rq->procs.pending_exiters = NULL; rq->procs.context_switches = 0; rq->procs.reductions = 0; for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) { - rq->procs.prio_info[pix].len = 0; + erts_smp_atomic32_init_nob(&rq->procs.prio_info[pix].len, 0); rq->procs.prio_info[pix].max_len = 0; rq->procs.prio_info[pix].reds = 0; - rq->procs.prio_info[pix].migrate.limit.this = 0; - rq->procs.prio_info[pix].migrate.limit.other = 0; - ERTS_DBG_SET_INVALID_RUNQP(rq->procs.prio_info[pix].migrate.runq, - 0x0); if (pix < ERTS_NO_PROC_PRIO_LEVELS - 1) { rq->procs.prio[pix].first = NULL; rq->procs.prio[pix].last = NULL; @@ -3892,14 +4257,10 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) rq->misc.start = NULL; rq->misc.end = NULL; - rq->misc.evac_runq = NULL; - rq->ports.info.len = 0; + erts_smp_atomic32_init_nob(&rq->ports.info.len, 0); rq->ports.info.max_len = 0; rq->ports.info.reds = 0; - rq->ports.info.migrate.limit.this = 0; - rq->ports.info.migrate.limit.other = 0; - rq->ports.info.migrate.runq = NULL; rq->ports.start = NULL; rq->ports.end = NULL; } @@ -4022,10 +4383,12 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) balance_info.prev_rise.reds = 0; balance_info.n = 0; + init_migration_paths(); + if (no_schedulers_online < no_schedulers) { + change_no_used_runqs(no_schedulers_online); for (ix = no_schedulers_online; ix < erts_no_run_queues; ix++) - evacuate_run_queue(ERTS_RUNQ_IX(ix), - ERTS_RUNQ_IX(ix % no_schedulers_online)); + suspend_run_queue(ERTS_RUNQ_IX(ix)); } schdlr_sspnd.wait_curr_online = no_schedulers_online; @@ -4088,91 +4451,208 @@ erts_get_scheduler_data(void) #endif -static int remove_proc_from_runq(ErtsRunQueue *rq, Process *p, int to_inactive); +/* + * scheduler_out_process() return with c_rq locked. + */ +static ERTS_INLINE int +schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p) +{ + erts_aint32_t a, e, n; + int res = 0; + + a = state; + + while (1) { + n = e = a; + + ASSERT(a & ERTS_PSFLG_RUNNING); + ASSERT(!(a & ERTS_PSFLG_IN_RUNQ)); + + n &= ~ERTS_PSFLG_RUNNING; + if ((a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) + n |= ERTS_PSFLG_IN_RUNQ; + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + if (a == e) + break; + } + + if (!(n & ERTS_PSFLG_IN_RUNQ)) { + if (erts_system_profile_flags.runnable_procs) + profile_runnable_proc(p, am_inactive); + } + else { + int prio = (int) (ERTS_PSFLG_PRIO_MASK & n); + ErtsRunQueue *runq = erts_get_runq_proc(p); + + ASSERT(!(n & ERTS_PSFLG_SUSPENDED)); + +#ifdef ERTS_SMP + if (!(ERTS_PSFLG_BOUND & n)) { + ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio); + if (new_runq) { + RUNQ_SET_RQ(&p->run_queue, new_runq); + runq = new_runq; + } + } +#endif + ASSERT(runq); + res = 1; + + erts_smp_runq_lock(runq); + + /* Enqueue the process */ + enqueue_process(runq, prio, p); + + if (runq == c_rq) + return res; + erts_smp_runq_unlock(runq); + smp_notify_inc_runq(runq); + } + erts_smp_runq_lock(c_rq); + return res; +} static ERTS_INLINE void -suspend_process(ErtsRunQueue *rq, Process *p) +add2runq(Process *p, erts_aint32_t state) { - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); - p->rcount++; /* count number of suspend */ -#ifdef ERTS_SMP - ASSERT(!(p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) - || p == erts_get_current_process()); - ASSERT(p->status != P_RUNNING - || p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING); - if (p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ) - goto runable; -#endif - switch(p->status) { - case P_SUSPENDED: - break; - case P_RUNABLE: + int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + ErtsRunQueue *runq = erts_get_runq_proc(p); + #ifdef ERTS_SMP - runable: - if (!ERTS_PROC_PENDING_EXIT(p)) + if (!(ERTS_PSFLG_BOUND & state)) { + ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio); + if (new_runq) { + RUNQ_SET_RQ(&p->run_queue, new_runq); + runq = new_runq; + } + } #endif - remove_proc_from_runq(rq, p, 1); - /* else: - * leave process in schedq so it will discover the pending exit - */ - p->rstatus = P_RUNABLE; /* wakeup as runnable */ - break; - case P_RUNNING: - p->rstatus = P_RUNABLE; /* wakeup as runnable */ - break; - case P_WAITING: - p->rstatus = P_WAITING; /* wakeup as waiting */ - break; - case P_EXITING: - return; /* ignore this */ - case P_GARBING: - case P_FREE: - erl_exit(1, "bad state in suspend_process()\n"); + ASSERT(runq); + + erts_smp_runq_lock(runq); + + /* Enqueue the process */ + enqueue_process(runq, prio, p); + + erts_smp_runq_unlock(runq); + smp_notify_inc_runq(runq); + +} + +static ERTS_INLINE void +schedule_process(Process *p, erts_aint32_t state, int active_enq) +{ + erts_aint32_t a = state, n; + + while (1) { + erts_aint32_t e; + n = e = a; + ASSERT(!(a & ERTS_PSFLG_FREE)); + n |= ERTS_PSFLG_ACTIVE; + if (!(a & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_RUNNING))) + n |= ERTS_PSFLG_IN_RUNQ; + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + if (a == e) + break; + if (!active_enq && (a & ERTS_PSFLG_ACTIVE)) + return; /* Someone else activated process ... */ + } + +#ifdef RRR_DEBUG + if (active_enq) + erts_fprintf(stderr, "! state=0x%x\n", n); +#endif + + if (erts_system_profile_flags.runnable_procs + && !(a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) { + profile_runnable_proc(p, am_active); } - if ((erts_system_profile_flags.runnable_procs) && (p->rcount == 1) && (p->status != P_WAITING)) { - profile_runnable_proc(p, am_inactive); + if ((n & ERTS_PSFLG_IN_RUNQ) && !(a & ERTS_PSFLG_IN_RUNQ)) { +#ifdef RRR_DEBUG + if (active_enq) + erts_fprintf(stderr, "-->\n"); +#endif + add2runq(p, n); } +} - p->status = P_SUSPENDED; - +void +erts_schedule_process(Process *p, erts_aint32_t state) +{ + schedule_process(p, state, 0); } -static ERTS_INLINE void -resume_process(Process *p) +static ERTS_INLINE int +suspend_process(Process *c_p, Process *p) { - Uint32 *statusp; + erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); + int suspended = 0; ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - switch (p->status) { - case P_SUSPENDED: - statusp = &p->status; - break; - case P_GARBING: - if (p->gcstatus == P_SUSPENDED) { - statusp = &p->gcstatus; - break; + + if ((state & ERTS_PSFLG_SUSPENDED)) + suspended = -1; + else { + if (c_p == p) { + state = erts_smp_atomic32_read_bor_relb(&p->state, + ERTS_PSFLG_SUSPENDED); + state |= ERTS_PSFLG_SUSPENDED; + ASSERT(state & ERTS_PSFLG_RUNNING); + suspended = 1; } - /* Fall through */ - default: - return; + else { + while (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_EXITING))) { + erts_aint32_t e, n; + n = e = state; + n |= ERTS_PSFLG_SUSPENDED; + state = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e); + if (state == e) { + state = n; + suspended = 1; + break; + } + } + } + } + + if (state & ERTS_PSFLG_SUSPENDED) { + + ASSERT(!(ERTS_PSFLG_RUNNING & state) + || p == erts_get_current_process()); + + if (erts_system_profile_flags.runnable_procs + && (p->rcount == 0) + && (state & ERTS_PSFLG_ACTIVE)) { + profile_runnable_proc(p, am_inactive); + } + + p->rcount++; /* count number of suspend */ } + return suspended; +} + +static ERTS_INLINE void +resume_process(Process *p) +{ + erts_aint32_t state; + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); ASSERT(p->rcount > 0); if (--p->rcount > 0) /* multiple suspend */ return; - switch(p->rstatus) { - case P_RUNABLE: - erts_add_to_runq(p); - break; - case P_WAITING: - *statusp = P_WAITING; - break; - default: - erl_exit(1, "bad state in resume_process()\n"); + + state = erts_smp_atomic32_read_band_mb(&p->state, ~ERTS_PSFLG_SUSPENDED); + state &= ~ERTS_PSFLG_SUSPENDED; +#ifdef RRR_DEBUG + erts_fprintf(stderr, "%T - state=0x%x\n", p->id, state); +#endif + if ((state & (ERTS_PSFLG_EXITING + | ERTS_PSFLG_ACTIVE + | ERTS_PSFLG_IN_RUNQ + | ERTS_PSFLG_RUNNING)) == ERTS_PSFLG_ACTIVE) { + schedule_process(p, state, 1); } - p->rstatus = P_FREE; } int @@ -4296,6 +4776,7 @@ suspend_scheduler(ErtsSchedulerData *esdp) int wake = 0; erts_aint32_t aux_work; int thr_prgr_active = 1; + ErtsStuckBoundProcesses sbp = {NULL, NULL}; /* * Schedulers may be suspended in two different ways: @@ -4310,6 +4791,8 @@ suspend_scheduler(ErtsSchedulerData *esdp) ASSERT(no != 1); + evacuate_run_queue(esdp->run_queue, &sbp); + erts_smp_runq_unlock(esdp->run_queue); erts_sched_check_cpu_bind_prep_suspend(esdp); @@ -4373,17 +4856,26 @@ suspend_scheduler(ErtsSchedulerData *esdp) erts_smp_mtx_unlock(&schdlr_sspnd.mtx); while (1) { + erts_aint32_t qmask; erts_aint32_t flgs; + qmask = (ERTS_RUNQ_FLGS_GET(esdp->run_queue) + & ERTS_RUNQ_FLGS_QMASK); aux_work = erts_atomic32_read_acqb(&ssi->aux_work); - if (aux_work) { + if (aux_work|qmask) { if (!thr_prgr_active) { erts_thr_progress_active(esdp, thr_prgr_active = 1); sched_wall_time_change(esdp, 1); } - aux_work = handle_aux_work(&esdp->aux_work_data, aux_work); + if (aux_work) + aux_work = handle_aux_work(&esdp->aux_work_data, aux_work); if (aux_work && erts_thr_progress_update(esdp)) erts_thr_progress_leader_update(esdp); + if (qmask) { + erts_smp_runq_lock(esdp->run_queue); + evacuate_run_queue(esdp->run_queue, &sbp); + erts_smp_runq_unlock(esdp->run_queue); + } } if (!aux_work) { @@ -4453,56 +4945,11 @@ suspend_scheduler(ErtsSchedulerData *esdp) erts_smp_runq_lock(esdp->run_queue); non_empty_runq(esdp->run_queue); + schedule_bound_processes(esdp->run_queue, &sbp); + erts_sched_check_cpu_bind_post_suspend(esdp); } -#define ERTS_RUNQ_RESET_SUSPEND_INFO(RQ, DBG_ID) \ -do { \ - int pix__; \ - (RQ)->misc.evac_runq = NULL; \ - (RQ)->ports.info.migrate.runq = NULL; \ - (RQ)->flags &= ~(ERTS_RUNQ_FLGS_IMMIGRATE_QMASK \ - | ERTS_RUNQ_FLGS_EMIGRATE_QMASK \ - | ERTS_RUNQ_FLGS_EVACUATE_QMASK \ - | ERTS_RUNQ_FLG_SUSPENDED); \ - (RQ)->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK \ - | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK); \ - (RQ)->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; \ - erts_smp_atomic32_read_band_nob(&(RQ)->info_flags, ~ERTS_RUNQ_IFLG_SUSPENDED);\ - for (pix__ = 0; pix__ < ERTS_NO_PROC_PRIO_LEVELS; pix__++) { \ - (RQ)->procs.prio_info[pix__].max_len = 0; \ - (RQ)->procs.prio_info[pix__].reds = 0; \ - ERTS_DBG_SET_INVALID_RUNQP((RQ)->procs.prio_info[pix__].migrate.runq,\ - (DBG_ID)); \ - } \ - (RQ)->ports.info.max_len = 0; \ - (RQ)->ports.info.reds = 0; \ -} while (0) - -#define ERTS_RUNQ_RESET_MIGRATION_PATHS__(RQ) \ -do { \ - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked((RQ))); \ - (RQ)->misc.evac_runq = NULL; \ - (RQ)->ports.info.migrate.runq = NULL; \ - (RQ)->flags &= ~(ERTS_RUNQ_FLGS_IMMIGRATE_QMASK \ - | ERTS_RUNQ_FLGS_EMIGRATE_QMASK \ - | ERTS_RUNQ_FLGS_EVACUATE_QMASK); \ -} while (0) - -#ifdef DEBUG -#define ERTS_RUNQ_RESET_MIGRATION_PATHS(RQ, DBG_ID) \ -do { \ - int pix__; \ - ERTS_RUNQ_RESET_MIGRATION_PATHS__((RQ)); \ - for (pix__ = 0; pix__ < ERTS_NO_PROC_PRIO_LEVELS; pix__++) \ - ERTS_DBG_SET_INVALID_RUNQP((RQ)->procs.prio_info[pix__].migrate.runq,\ - (DBG_ID)); \ -} while (0) -#else -#define ERTS_RUNQ_RESET_MIGRATION_PATHS(RQ, DBG_ID) \ - ERTS_RUNQ_RESET_MIGRATION_PATHS__((RQ)) -#endif - ErtsSchedSuspendResult erts_schedulers_state(Uint *total, Uint *online, @@ -4572,27 +5019,13 @@ erts_set_schedulers_online(Process *p, have_unlocked_plocks = 1; erts_smp_proc_unlock(p, plocks); } - erts_smp_mtx_unlock(&schdlr_sspnd.mtx); - erts_smp_mtx_lock(&balance_info.update_mtx); - for (ix = online; ix < no; ix++) { - ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); - erts_smp_runq_lock(rq); - ERTS_RUNQ_RESET_SUSPEND_INFO(rq, 0x5); - erts_smp_runq_unlock(rq); - scheduler_ix_resume_wake(ix); - } - /* - * Spread evacuation paths among all online - * run queues. - */ - for (ix = no; ix < erts_no_run_queues; ix++) { - ErtsRunQueue *from_rq = ERTS_RUNQ_IX(ix); - ErtsRunQueue *to_rq = ERTS_RUNQ_IX(ix % no); - evacuate_run_queue(from_rq, to_rq); - } - set_no_used_runqs(no); - erts_smp_mtx_unlock(&balance_info.update_mtx); - erts_smp_mtx_lock(&schdlr_sspnd.mtx); + change_no_used_runqs(no); + + for (ix = online; ix < no; ix++) + resume_run_queue(ERTS_RUNQ_IX(ix)); + + for (ix = no; ix < erts_no_run_queues; ix++) + suspend_run_queue(ERTS_RUNQ_IX(ix)); } res = ERTS_SCHDLR_SSPND_DONE; } @@ -4619,25 +5052,11 @@ erts_set_schedulers_online(Process *p, have_unlocked_plocks = 1; erts_smp_proc_unlock(p, plocks); } - erts_smp_mtx_unlock(&schdlr_sspnd.mtx); - erts_smp_mtx_lock(&balance_info.update_mtx); - - for (ix = 0; ix < online; ix++) { - ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); - erts_smp_runq_lock(rq); - ERTS_RUNQ_RESET_MIGRATION_PATHS(rq, 0x6); - erts_smp_runq_unlock(rq); - } - /* - * Evacutation order important! Newly suspended run queues - * has to be evacuated last. - */ - for (ix = erts_no_run_queues-1; ix >= no; ix--) - evacuate_run_queue(ERTS_RUNQ_IX(ix), - ERTS_RUNQ_IX(ix % no)); - set_no_used_runqs(no); - erts_smp_mtx_unlock(&balance_info.update_mtx); - erts_smp_mtx_lock(&schdlr_sspnd.mtx); + + change_no_used_runqs(no); + for (ix = no; ix < erts_no_run_queues; ix++) + suspend_run_queue(ERTS_RUNQ_IX(ix)); + for (ix = no; ix < online; ix++) { ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); wake_scheduler(rq, 0); @@ -4734,25 +5153,14 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all) schdlr_sspnd.msb.wait_active = 2; } - erts_smp_mtx_unlock(&schdlr_sspnd.mtx); - erts_smp_mtx_lock(&balance_info.update_mtx); - set_no_used_runqs(1); - for (ix = 0; ix < online; ix++) { + change_no_used_runqs(1); + for (ix = 1; ix < erts_no_run_queues; ix++) + suspend_run_queue(ERTS_RUNQ_IX(ix)); + + for (ix = 1; ix < online; ix++) { ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); - erts_smp_runq_lock(rq); - ASSERT(!(rq->flags & ERTS_RUNQ_FLG_SUSPENDED)); - ERTS_RUNQ_RESET_MIGRATION_PATHS(rq, 0x7); - erts_smp_runq_unlock(rq); + wake_scheduler(rq, 0); } - /* - * Evacuate all activities in all other run queues - * into the first run queue. Note order is important, - * online run queues has to be evacuated last. - */ - for (ix = erts_no_run_queues-1; ix >= 1; ix--) - evacuate_run_queue(ERTS_RUNQ_IX(ix), ERTS_RUNQ_IX(0)); - erts_smp_mtx_unlock(&balance_info.update_mtx); - erts_smp_mtx_lock(&schdlr_sspnd.mtx); if (erts_smp_atomic32_read_nob(&schdlr_sspnd.active) != schdlr_sspnd.msb.wait_active) { @@ -4796,15 +5204,6 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all) plp = proclist_create(p); plp->next = schdlr_sspnd.msb.procs; schdlr_sspnd.msb.procs = plp; -#ifdef DEBUG - ERTS_FOREACH_RUNQ(srq, - { - if (srq != ERTS_RUNQ_IX(0)) { - ASSERT(ERTS_EMPTY_RUNQ(srq)); - ASSERT(srq->flags & ERTS_RUNQ_FLG_SUSPENDED); - } - }); -#endif ASSERT(p->scheduler_data); } } @@ -4836,27 +5235,6 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all) res = ERTS_SCHDLR_SSPND_DONE_MSCHED_BLOCKED; else { ERTS_SCHDLR_SSPND_CHNG_SET(ERTS_SCHDLR_SSPND_CHNG_MSB, 0); -#ifdef DEBUG - ERTS_FOREACH_RUNQ(rq, - { - if (rq != p->scheduler_data->run_queue) { - if (!ERTS_EMPTY_RUNQ(rq)) { - Process *rp; - int pix; - ASSERT(rq->ports.info.len == 0); - for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) { - for (rp = rq->procs.prio[pix].first; - rp; - rp = rp->next) { - ASSERT(rp->bound_runq); - } - } - } - - ASSERT(rq->flags & ERTS_RUNQ_FLG_SUSPENDED); - } - }); -#endif p->flags &= ~F_HAVE_BLCKD_MSCHED; schdlr_sspnd.msb.ongoing = 0; if (schdlr_sspnd.online == 1) { @@ -4866,35 +5244,19 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all) } else { int online = schdlr_sspnd.online; - erts_smp_mtx_unlock(&schdlr_sspnd.mtx); if (plocks) { have_unlocked_plocks = 1; erts_smp_proc_unlock(p, plocks); } - erts_smp_mtx_lock(&balance_info.update_mtx); + + change_no_used_runqs(online); /* Resume all online run queues */ - for (ix = 1; ix < online; ix++) { - ErtsRunQueue *rq = ERTS_RUNQ_IX(ix); - erts_smp_runq_lock(rq); - ERTS_RUNQ_RESET_SUSPEND_INFO(rq, 0x4); - erts_smp_runq_unlock(rq); - scheduler_ix_resume_wake(ix); - } + for (ix = 1; ix < online; ix++) + resume_run_queue(ERTS_RUNQ_IX(ix)); - /* Spread evacuation paths among all online run queues */ for (ix = online; ix < erts_no_run_queues; ix++) - evacuate_run_queue(ERTS_RUNQ_IX(ix), - ERTS_RUNQ_IX(ix % online)); - - set_no_used_runqs(online); - /* Make sure that we balance soon... */ - balance_info.forced_check_balance = 1; - erts_smp_runq_lock(ERTS_RUNQ_IX(0)); - ERTS_RUNQ_IX(0)->check_balance_reds = 0; - erts_smp_runq_unlock(ERTS_RUNQ_IX(0)); - erts_smp_mtx_unlock(&balance_info.update_mtx); - erts_smp_mtx_lock(&schdlr_sspnd.mtx); + suspend_run_queue(ERTS_RUNQ_IX(ix)); } res = ERTS_SCHDLR_SSPND_DONE; } @@ -5201,10 +5563,7 @@ handle_pend_sync_suspend(Process *suspendee, if (suspender) { ASSERT(is_nil(suspender->suspendee)); if (suspendee_alive) { - ErtsRunQueue *rq = erts_get_runq_proc(suspendee); - erts_smp_runq_lock(rq); - suspend_process(rq, suspendee); - erts_smp_runq_unlock(rq); + erts_suspend(suspendee, suspendee_locks, NULL); suspender->suspendee = suspendee->id; } /* suspender is suspended waiting for suspendee to suspend; @@ -5247,10 +5606,9 @@ pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks, resume_process(rp); } else { - ErtsRunQueue *cp_rq, *rp_rq; rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS, - pid, ERTS_PROC_LOCK_STATUS); + pid, pid_locks|ERTS_PROC_LOCK_STATUS); if (!rp) { c_p->flags &= ~F_P2PNR_RESCHED; @@ -5259,58 +5617,36 @@ pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks, ASSERT(!(c_p->flags & F_P2PNR_RESCHED)); - cp_rq = erts_get_runq_proc(c_p); - rp_rq = erts_get_runq_proc(rp); - erts_smp_runqs_lock(cp_rq, rp_rq); - if (rp->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) { - running: - /* Phiu... */ - - /* - * If we got pending suspenders and suspend ourselves waiting - * to suspend another process we might deadlock. - * In this case we have to yield, be suspended by - * someone else and then do it all over again. - */ - if (!c_p->pending_suspenders) { - /* Mark rp pending for suspend by c_p */ - add_pend_suspend(rp, c_p->id, handle_pend_sync_suspend); - ASSERT(is_nil(c_p->suspendee)); - - /* Suspend c_p; when rp is suspended c_p will be resumed. */ - suspend_process(cp_rq, c_p); - c_p->flags |= F_P2PNR_RESCHED; - } - /* Yield (caller is assumed to yield immediately in bif). */ - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); - rp = ERTS_PROC_LOCK_BUSY; + if (suspend) { + if (suspend_process(c_p, rp)) + goto done; } else { - ErtsProcLocks need_locks = pid_locks & ~ERTS_PROC_LOCK_STATUS; - if (need_locks && erts_smp_proc_trylock(rp, need_locks) == EBUSY) { - erts_smp_runqs_unlock(cp_rq, rp_rq); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); - rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS, - pid, pid_locks|ERTS_PROC_LOCK_STATUS); - if (!rp) - goto done; - /* run-queues may have changed */ - cp_rq = erts_get_runq_proc(c_p); - rp_rq = erts_get_runq_proc(rp); - erts_smp_runqs_lock(cp_rq, rp_rq); - if (rp->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) { - /* Ahh... */ - erts_smp_proc_unlock(rp, - pid_locks & ~ERTS_PROC_LOCK_STATUS); - goto running; - } - } + if (!(ERTS_PSFLG_RUNNING & erts_smp_atomic32_read_acqb(&rp->state))) + goto done; - /* rp is not running and we got the locks we want... */ - if (suspend) - suspend_process(rp_rq, rp); } - erts_smp_runqs_unlock(cp_rq, rp_rq); + + /* Other process running */ + + /* + * If we got pending suspenders and suspend ourselves waiting + * to suspend another process we might deadlock. + * In this case we have to yield, be suspended by + * someone else and then do it all over again. + */ + if (!c_p->pending_suspenders) { + /* Mark rp pending for suspend by c_p */ + add_pend_suspend(rp, c_p->id, handle_pend_sync_suspend); + ASSERT(is_nil(c_p->suspendee)); + + /* Suspend c_p; when rp is suspended c_p will be resumed. */ + suspend_process(c_p, c_p); + c_p->flags |= F_P2PNR_RESCHED; + } + /* Yield (caller is assumed to yield immediately in bif). */ + erts_smp_proc_unlock(rp, pid_locks|ERTS_PROC_LOCK_STATUS); + rp = ERTS_PROC_LOCK_BUSY; } done: @@ -5367,10 +5703,10 @@ erts_pid2proc_nropt(Process *c_p, ErtsProcLocks c_p_locks, return erts_pid2proc_not_running(c_p, c_p_locks, pid, pid_locks); } -static ERTS_INLINE void -do_bif_suspend_process(ErtsSuspendMonitor *smon, - Process *suspendee, - ErtsRunQueue *locked_runq) +static ERTS_INLINE int +do_bif_suspend_process(Process *c_p, + ErtsSuspendMonitor *smon, + Process *suspendee) { ASSERT(suspendee); ASSERT(!ERTS_PROC_IS_EXITING(suspendee)); @@ -5378,25 +5714,15 @@ do_bif_suspend_process(ErtsSuspendMonitor *smon, & erts_proc_lc_my_proc_locks(suspendee)); if (smon) { if (!smon->active) { - ErtsRunQueue *rq; - - if (locked_runq) - rq = locked_runq; - else { - rq = erts_get_runq_proc(suspendee); - erts_smp_runq_lock(rq); - } - - suspend_process(rq, suspendee); - - if (!locked_runq) - erts_smp_runq_unlock(rq); + if (!suspend_process(c_p, suspendee)) + return 0; } smon->active += smon->pending; ASSERT(smon->active); smon->pending = 0; + return 1; } - + return 0; } static void @@ -5419,10 +5745,17 @@ handle_pend_bif_sync_suspend(Process *suspendee, erts_delete_suspend_monitor(&suspender->suspend_monitors, suspendee->id); else { +#ifdef DEBUG + int res; +#endif ErtsSuspendMonitor *smon; smon = erts_lookup_suspend_monitor(suspender->suspend_monitors, suspendee->id); - do_bif_suspend_process(smon, suspendee, NULL); +#ifdef DEBUG + res = +#endif + do_bif_suspend_process(suspendee, smon, suspendee); + ASSERT(!smon || res != 0); suspender->suspendee = suspendee->id; } /* suspender is suspended waiting for suspendee to suspend; @@ -5454,10 +5787,17 @@ handle_pend_bif_async_suspend(Process *suspendee, erts_delete_suspend_monitor(&suspender->suspend_monitors, suspendee->id); else { +#ifdef DEBUG + int res; +#endif ErtsSuspendMonitor *smon; smon = erts_lookup_suspend_monitor(suspender->suspend_monitors, suspendee->id); - do_bif_suspend_process(smon, suspendee, NULL); +#ifdef DEBUG + res = +#endif + do_bif_suspend_process(suspendee, smon, suspendee); + ASSERT(!smon || res != 0); } erts_smp_proc_unlock(suspender, ERTS_PROC_LOCK_LINK); } @@ -5542,7 +5882,8 @@ suspend_process_2(BIF_ALIST_2) /* This is really a piece of cake without SMP support... */ if (!smon->active) { - suspend_process(ERTS_RUNQ_IX(0), suspendee); + erts_smp_atomic32_read_bor_nob(&suspendee->state, ERTS_PSFLG_SUSPENDED); + suspend_process(BIF_P, suspendee); smon->active++; res = am_true; } @@ -5585,21 +5926,15 @@ suspend_process_2(BIF_ALIST_2) if (smon->pending && unless_suspending) res = am_false; else { - ErtsRunQueue *rq; if (smon->pending == INT_MAX) goto system_limit; smon->pending++; - rq = erts_get_runq_proc(suspendee); - erts_smp_runq_lock(rq); - if (suspendee->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) + if (!do_bif_suspend_process(BIF_P, smon, suspendee)) add_pend_suspend(suspendee, BIF_P->id, handle_pend_bif_async_suspend); - else - do_bif_suspend_process(smon, suspendee, rq); - erts_smp_runq_unlock(rq); res = am_true; } @@ -5636,7 +5971,6 @@ suspend_process_2(BIF_ALIST_2) /* done */ } else { - ErtsRunQueue *cp_rq, *s_rq; /* We haven't got any active suspends on the suspendee */ /* @@ -5653,12 +5987,7 @@ suspend_process_2(BIF_ALIST_2) if (!unless_suspending || smon->pending == 0) smon->pending++; - cp_rq = erts_get_runq_proc(BIF_P); - s_rq = erts_get_runq_proc(suspendee); - erts_smp_runqs_lock(cp_rq, s_rq); - if (!(suspendee->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING)) { - do_bif_suspend_process(smon, suspendee, s_rq); - erts_smp_runqs_unlock(cp_rq, s_rq); + if (do_bif_suspend_process(BIF_P, smon, suspendee)) { res = (!unless_suspending || smon->active == 1 ? am_true : am_false); @@ -5678,8 +6007,7 @@ suspend_process_2(BIF_ALIST_2) * This time with BIF_P->suspendee == BIF_ARG_1 (see * above). */ - suspend_process(cp_rq, BIF_P); - erts_smp_runqs_unlock(cp_rq, s_rq); + suspend_process(BIF_P, BIF_P); goto yield; } } @@ -5687,9 +6015,15 @@ suspend_process_2(BIF_ALIST_2) } #endif /* ERTS_SMP */ - - ASSERT(suspendee->status == P_SUSPENDED || (asynchronous && smon->pending)); - ASSERT(suspendee->status == P_SUSPENDED || !smon->active); +#ifdef DEBUG + { + erts_aint32_t state = erts_smp_atomic32_read_acqb(&suspendee->state); + ASSERT((state & ERTS_PSFLG_SUSPENDED) + || (asynchronous && smon->pending)); + ASSERT((state & ERTS_PSFLG_SUSPENDED) + || !smon->active); + } +#endif erts_smp_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS); erts_smp_proc_unlock(BIF_P, xlocks); @@ -5784,9 +6118,8 @@ resume_process_1(BIF_ALIST_1) if (!suspendee) goto no_suspendee; - ASSERT(suspendee->status == P_SUSPENDED - || (suspendee->status == P_GARBING - && suspendee->gcstatus == P_SUSPENDED)); + ASSERT(ERTS_PSFLG_SUSPENDED + & erts_smp_atomic32_read_nob(&suspendee->state)); resume_process(suspendee); erts_smp_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS); @@ -5815,449 +6148,46 @@ erts_run_queues_len(Uint *qlen) Uint len = 0; ERTS_ATOMIC_FOREACH_RUNQ(rq, { - if (qlen) - qlen[i++] = rq->procs.len; - len += rq->procs.len; + Sint pqlen = 0; + int pix; + for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) + pqlen += RUNQ_READ_LEN(&rq->procs.prio_info[pix].len); + + if (pqlen < 0) + pqlen = 0; + if (qlen) + qlen[i++] = pqlen; + len += pqlen; } ); return len; } -#ifdef HARDDEBUG_RUNQS -static void -check_procs_runq(ErtsRunQueue *runq, Process *p_in_q, Process *p_not_in_q) -{ - int len[ERTS_NO_PROC_PRIO_LEVELS] = {0}; - int tot_len; - int prioq, prio; - int found_p_in_q; - Process *p, *prevp; - - found_p_in_q = 0; - for (prioq = 0; prioq < ERTS_NO_PROC_PRIO_LEVELS - 1; prioq++) { - prevp = NULL; - for (p = runq->procs.prio[prioq].first; p; p = p->next) { - ASSERT(p != p_not_in_q); - if (p == p_in_q) - found_p_in_q = 1; - switch (p->prio) { - case PRIORITY_MAX: - case PRIORITY_HIGH: - case PRIORITY_NORMAL: - ASSERT(prioq == p->prio); - break; - case PRIORITY_LOW: - ASSERT(prioq == PRIORITY_NORMAL); - break; - default: - ASSERT(!"Bad prio on process"); - } - len[p->prio]++; - ASSERT(prevp == p->prev); - if (p->prev) { - ASSERT(p->prev->next == p); - } - else { - ASSERT(runq->procs.prio[prioq].first == p); - } - if (p->next) { - ASSERT(p->next->prev == p); - } - else { - ASSERT(runq->procs.prio[prioq].last == p); - } - ASSERT(p->run_queue == runq); - prevp = p; - } - } - - ASSERT(!p_in_q || found_p_in_q); - - tot_len = 0; - for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++) { - ASSERT(len[prio] == runq->procs.prio_info[prio].len); - if (len[prio]) { - ASSERT(runq->flags & (1 << prio)); - } - else { - ASSERT(!(runq->flags & (1 << prio))); - } - tot_len += len[prio]; - } - ASSERT(runq->procs.len == tot_len); -} -# define ERTS_DBG_CHK_PROCS_RUNQ(RQ) check_procs_runq((RQ), NULL, NULL) -# define ERTS_DBG_CHK_PROCS_RUNQ_PROC(RQ, P) check_procs_runq((RQ), (P), NULL) -# define ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(RQ, P) check_procs_runq((RQ), NULL, (P)) -#else -# define ERTS_DBG_CHK_PROCS_RUNQ(RQ) -# define ERTS_DBG_CHK_PROCS_RUNQ_PROC(RQ, P) -# define ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(RQ, P) -#endif - - -static ERTS_INLINE void -enqueue_process(ErtsRunQueue *runq, Process *p) -{ - ErtsRunPrioQueue *rpq; - ErtsRunQueueInfo *rqi; - - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - - ASSERT(p->bound_runq || !(runq->flags & ERTS_RUNQ_FLG_SUSPENDED)); - - rqi = &runq->procs.prio_info[p->prio]; - rqi->len++; - if (rqi->max_len < rqi->len) - rqi->max_len = rqi->len; - - runq->procs.len++; - runq->len++; - if (runq->max_len < runq->len) - runq->max_len = runq->len; - - runq->flags |= (1 << p->prio); - - rpq = (p->prio == PRIORITY_LOW - ? &runq->procs.prio[PRIORITY_NORMAL] - : &runq->procs.prio[p->prio]); - - p->next = NULL; - p->prev = rpq->last; - if (rpq->last) - rpq->last->next = p; - else - rpq->first = p; - rpq->last = p; - - switch (p->status) { - case P_EXITING: - break; - case P_GARBING: - p->gcstatus = P_RUNABLE; - break; - default: - p->status = P_RUNABLE; - break; - } - -#ifdef ERTS_SMP - p->status_flags |= ERTS_PROC_SFLG_INRUNQ; -#endif - - ERTS_DBG_CHK_PROCS_RUNQ_PROC(runq, p); -} - - -static ERTS_INLINE int -dequeue_process(ErtsRunQueue *runq, Process *p) -{ - ErtsRunPrioQueue *rpq; - int res = 1; - - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - - ERTS_DBG_CHK_PROCS_RUNQ(runq); - - rpq = &runq->procs.prio[p->prio == PRIORITY_LOW ? PRIORITY_NORMAL : p->prio]; - if (p->prev) { - p->prev->next = p->next; - } - else if (rpq->first == p) { - rpq->first = p->next; - } - else { - res = 0; - } - if (p->next) { - p->next->prev = p->prev; - } - else if (rpq->last == p) { - rpq->last = p->prev; - } - else { - ASSERT(res == 0); - } - - if (res) { - - if (--runq->procs.prio_info[p->prio].len == 0) - runq->flags &= ~(1 << p->prio); - runq->procs.len--; - runq->len--; - -#ifdef ERTS_SMP - p->status_flags &= ~ERTS_PROC_SFLG_INRUNQ; -#endif - } - - ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(runq, p); - return res; -} - -/* schedule a process */ -static ERTS_INLINE ErtsRunQueue * -internal_add_to_runq(ErtsRunQueue *runq, Process *p) -{ - Uint32 prev_status = p->status; - ErtsRunQueue *add_runq; -#ifdef ERTS_SMP - - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); - - if (p->status_flags & ERTS_PROC_SFLG_INRUNQ) - return NULL; - else if (p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) { - ASSERT(ERTS_PROC_IS_EXITING(p) || p->rcount == 0); - ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(runq, p); - p->status_flags |= ERTS_PROC_SFLG_PENDADD2SCHEDQ; - return NULL; - } - ASSERT(!p->scheduler_data); -#endif - - ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(runq, p); -#ifndef ERTS_SMP - /* Never schedule a suspended process (ok in smp case) */ - ASSERT(ERTS_PROC_IS_EXITING(p) || p->rcount == 0); - add_runq = runq; -#else - ASSERT(!p->bound_runq || p->bound_runq == p->run_queue); - if (p->bound_runq) { - if (p->bound_runq == runq) - add_runq = runq; - else { - add_runq = p->bound_runq; - erts_smp_xrunq_lock(runq, add_runq); - } - } - else { - add_runq = erts_check_emigration_need(runq, p->prio); - if (!add_runq) - add_runq = runq; - else /* Process emigrated */ - p->run_queue = add_runq; - } -#endif - - /* Enqueue the process */ - enqueue_process(add_runq, p); - - if ((erts_system_profile_flags.runnable_procs) - && (prev_status == P_WAITING - || prev_status == P_SUSPENDED)) { - profile_runnable_proc(p, am_active); - } - - if (add_runq != runq) - erts_smp_runq_unlock(add_runq); - - return add_runq; -} - - -void -erts_add_to_runq(Process *p) -{ - ErtsRunQueue *notify_runq; - ErtsRunQueue *runq = erts_get_runq_proc(p); - erts_smp_runq_lock(runq); - notify_runq = internal_add_to_runq(runq, p); - erts_smp_runq_unlock(runq); - smp_notify_inc_runq(notify_runq); - -} - -/* Possibly remove a scheduled process we need to suspend */ - -static int -remove_proc_from_runq(ErtsRunQueue *rq, Process *p, int to_inactive) -{ - int res; - - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - -#ifdef ERTS_SMP - if (p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ) { - p->status_flags &= ~ERTS_PROC_SFLG_PENDADD2SCHEDQ; - ASSERT(!remove_proc_from_runq(rq, p, 0)); - return 1; - } -#endif - - res = dequeue_process(rq, p); - - if (res && erts_system_profile_flags.runnable_procs && to_inactive) - profile_runnable_proc(p, am_inactive); - -#ifdef ERTS_SMP - ASSERT(!(p->status_flags & ERTS_PROC_SFLG_INRUNQ)); -#endif - - return res; -} - -#ifdef ERTS_SMP - -ErtsMigrateResult -erts_proc_migrate(Process *p, ErtsProcLocks *plcks, - ErtsRunQueue *from_rq, int *from_locked, - ErtsRunQueue *to_rq, int *to_locked) -{ - ERTS_SMP_LC_ASSERT(*plcks == erts_proc_lc_my_proc_locks(p)); - ERTS_SMP_LC_ASSERT((ERTS_PROC_LOCK_STATUS & *plcks) - || from_locked); - ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked); - ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked); - - /* - * If we have the lock on the run queue to migrate to, - * check that it isn't suspended. If it is suspended, - * we will refuse to migrate to it anyway. - */ - if (*to_locked && (to_rq->flags & ERTS_RUNQ_FLG_SUSPENDED)) - return ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED; - - /* We need status lock on process and locks on both run queues */ - - if (!(ERTS_PROC_LOCK_STATUS & *plcks)) { - if (erts_smp_proc_trylock(p, ERTS_PROC_LOCK_STATUS) == EBUSY) { - ErtsProcLocks lcks = *plcks; - Eterm pid = p->id; - Process *proc = *plcks ? p : NULL; - - if (*from_locked) { - *from_locked = 0; - erts_smp_runq_unlock(from_rq); - } - if (*to_locked) { - *to_locked = 0; - erts_smp_runq_unlock(to_rq); - } - - proc = erts_pid2proc_opt(proc, - lcks, - pid, - lcks|ERTS_PROC_LOCK_STATUS, - ERTS_P2P_FLG_ALLOW_OTHER_X); - if (!proc) { - *plcks = 0; - return ERTS_MIGRATE_FAILED_NOT_IN_RUNQ; - } - ASSERT(proc == p); - } - *plcks |= ERTS_PROC_LOCK_STATUS; - } - - ASSERT(!p->bound_runq); - - ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked); - ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked); - - if (p->run_queue != from_rq) - return ERTS_MIGRATE_FAILED_RUNQ_CHANGED; - - if (!*from_locked || !*to_locked) { - if (from_rq < to_rq) { - if (!*to_locked) { - if (!*from_locked) - erts_smp_runq_lock(from_rq); - erts_smp_runq_lock(to_rq); - } - else if (erts_smp_runq_trylock(from_rq) == EBUSY) { - erts_smp_runq_unlock(to_rq); - erts_smp_runq_lock(from_rq); - erts_smp_runq_lock(to_rq); - } - } - else { - if (!*from_locked) { - if (!*to_locked) - erts_smp_runq_lock(to_rq); - erts_smp_runq_lock(from_rq); - } - else if (erts_smp_runq_trylock(to_rq) == EBUSY) { - erts_smp_runq_unlock(from_rq); - erts_smp_runq_lock(to_rq); - erts_smp_runq_lock(from_rq); - } - } - *to_locked = *from_locked = 1; - } - - ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked); - ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked); - - /* Ok we now got all locks we need; do it... */ - - /* Refuse to migrate to a suspended run queue */ - if (to_rq->flags & ERTS_RUNQ_FLG_SUSPENDED) - return ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED; - - if ((p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) - || !(p->status_flags & ERTS_PROC_SFLG_INRUNQ)) - return ERTS_MIGRATE_FAILED_NOT_IN_RUNQ; - - dequeue_process(from_rq, p); - p->run_queue = to_rq; - enqueue_process(to_rq, p); - - return ERTS_MIGRATE_SUCCESS; -} -#endif /* ERTS_SMP */ - Eterm erts_process_status(Process *c_p, ErtsProcLocks c_p_locks, Process *rp, Eterm rpid) { Eterm res = am_undefined; - Process *p; - - if (rp) { - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS - & erts_proc_lc_my_proc_locks(rp)); - p = rp; - } - else { - p = erts_pid2proc_opt(c_p, c_p_locks, - rpid, ERTS_PROC_LOCK_STATUS, - ERTS_P2P_FLG_ALLOW_OTHER_X); - } + Process *p = rp ? rp : erts_proc_lookup_raw(rpid); if (p) { - switch (p->status) { - case P_RUNABLE: - res = am_runnable; - break; - case P_WAITING: - res = am_waiting; - break; - case P_RUNNING: - res = am_running; - break; - case P_EXITING: + erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); + if (state & ERTS_PSFLG_FREE) + res = am_free; + else if (state & ERTS_PSFLG_EXITING) res = am_exiting; - break; - case P_GARBING: + else if (state & ERTS_PSFLG_GC) res = am_garbage_collecting; - break; - case P_SUSPENDED: + else if (state & ERTS_PSFLG_SUSPENDED) res = am_suspended; - break; - case P_FREE: /* We cannot look up a process in P_FREE... */ - default: /* Not a valid status... */ - erl_exit(1, "Bad status (%b32u) found for process %T\n", - p->status, p->id); - break; - } - -#ifdef ERTS_SMP - if (!rp && (p != c_p || !(ERTS_PROC_LOCK_STATUS & c_p_locks))) - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); + else if (state & ERTS_PSFLG_RUNNING) + res = am_running; + else if (state & ERTS_PSFLG_ACTIVE) + res = am_runnable; + else + res = am_waiting; } +#ifdef ERTS_SMP else { int i; ErtsSchedulerData *esdp; @@ -6272,42 +6202,40 @@ erts_process_status(Process *c_p, ErtsProcLocks c_p_locks, } erts_smp_runq_unlock(esdp->run_queue); } - -#endif - } - +#endif return res; } /* -** Suspend a process +** Suspend a currently executing process ** If we are to suspend on a port the busy_port is the thing ** otherwise busy_port is NIL */ void -erts_suspend(Process* process, ErtsProcLocks process_locks, Port *busy_port) +erts_suspend(Process* c_p, ErtsProcLocks c_p_locks, Port *busy_port) { - ErtsRunQueue *rq; - - ERTS_SMP_LC_ASSERT(process_locks == erts_proc_lc_my_proc_locks(process)); - if (!(process_locks & ERTS_PROC_LOCK_STATUS)) - erts_smp_proc_lock(process, ERTS_PROC_LOCK_STATUS); - - rq = erts_get_runq_proc(process); - - erts_smp_runq_lock(rq); +#ifdef DEBUG + int res; +#endif + ASSERT(c_p == erts_get_current_process()); + ERTS_SMP_LC_ASSERT(c_p_locks == erts_proc_lc_my_proc_locks(c_p)); + if (!(c_p_locks & ERTS_PROC_LOCK_STATUS)) + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); - suspend_process(rq, process); +#ifdef DEBUG + res = +#endif + suspend_process(c_p, c_p); - erts_smp_runq_unlock(rq); + ASSERT(res); if (busy_port) - erts_wake_process_later(busy_port, process); + erts_wake_process_later(busy_port, c_p); - if (!(process_locks & ERTS_PROC_LOCK_STATUS)) - erts_smp_proc_unlock(process, ERTS_PROC_LOCK_STATUS); + if (!(c_p_locks & ERTS_PROC_LOCK_STATUS)) + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); } @@ -6348,57 +6276,54 @@ erts_resume_processes(ErtsProcList *plp) Eterm erts_get_process_priority(Process *p) { - ErtsRunQueue *rq; - Eterm value; - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - rq = erts_get_runq_proc(p); - erts_smp_runq_lock(rq); - switch(p->prio) { - case PRIORITY_MAX: value = am_max; break; - case PRIORITY_HIGH: value = am_high; break; - case PRIORITY_NORMAL: value = am_normal; break; - case PRIORITY_LOW: value = am_low; break; - default: ASSERT(0); value = am_undefined; break; + erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); + switch (state & ERTS_PSFLG_PRIO_MASK) { + case PRIORITY_MAX: return am_max; + case PRIORITY_HIGH: return am_high; + case PRIORITY_NORMAL: return am_normal; + case PRIORITY_LOW: return am_low; + default: ASSERT(0); return am_undefined; } - erts_smp_runq_unlock(rq); - return value; } Eterm -erts_set_process_priority(Process *p, Eterm new_value) +erts_set_process_priority(Process *p, Eterm value) { - ErtsRunQueue *rq; - Eterm old_value; - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - rq = erts_get_runq_proc(p); -#ifdef ERTS_SMP - ASSERT(!(p->status_flags & ERTS_PROC_SFLG_INRUNQ)); -#endif - erts_smp_runq_lock(rq); - switch(p->prio) { - case PRIORITY_MAX: old_value = am_max; break; - case PRIORITY_HIGH: old_value = am_high; break; - case PRIORITY_NORMAL: old_value = am_normal; break; - case PRIORITY_LOW: old_value = am_low; break; - default: ASSERT(0); old_value = am_undefined; break; - } - switch (new_value) { - case am_max: p->prio = PRIORITY_MAX; break; - case am_high: p->prio = PRIORITY_HIGH; break; - case am_normal: p->prio = PRIORITY_NORMAL; break; - case am_low: p->prio = PRIORITY_LOW; break; - default: old_value = THE_NON_VALUE; break; + erts_aint32_t a, oprio, nprio; + + switch (value) { + case am_max: nprio = (erts_aint32_t) PRIORITY_MAX; break; + case am_high: nprio = (erts_aint32_t) PRIORITY_HIGH; break; + case am_normal: nprio = (erts_aint32_t) PRIORITY_NORMAL; break; + case am_low: nprio = (erts_aint32_t) PRIORITY_LOW; break; + default: return THE_NON_VALUE; break; } - erts_smp_runq_unlock(rq); - return old_value; -} -/* note that P_RUNNING is only set so that we don't try to remove -** running processes from the schedule queue if they exit - a running -** process not being in the schedule queue!! -** Schedule for up to INPUT_REDUCTIONS context switches, -** return 1 if more to do. -*/ + a = erts_smp_atomic32_read_nob(&p->state); + if (nprio == (a & ERTS_PSFLG_PRIO_MASK)) + oprio = nprio; + else { + erts_aint32_t e, n; + do { + oprio = a & ERTS_PSFLG_PRIO_MASK; + n = e = a; + + ASSERT(!(a & ERTS_PSFLG_IN_RUNQ)); + + n &= ~ERTS_PSFLG_PRIO_MASK; + n |= nprio; + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + } while (a != e); + } + + switch (oprio) { + case PRIORITY_MAX: return am_max; + case PRIORITY_HIGH: return am_high; + case PRIORITY_NORMAL: return am_normal; + case PRIORITY_LOW: return am_low; + default: ASSERT(0); return am_undefined; + } +} /* * schedule() is called from BEAM (process_main()) or HiPE @@ -6421,7 +6346,6 @@ erts_set_process_priority(Process *p, Eterm new_value) Process *schedule(Process *p, int calls) { ErtsRunQueue *rq; - ErtsRunPrioQueue *rpq; erts_aint_t dt; ErtsSchedulerData *esdp; int context_reds; @@ -6429,6 +6353,8 @@ Process *schedule(Process *p, int calls) int input_reductions; int actual_reds; int reds; + Uint32 flags; + erts_aint32_t state = 0; /* Supress warning... */ #ifdef USE_VM_PROBES if (p != NULL && DTRACE_ENABLED(process_unscheduled)) { @@ -6484,68 +6410,49 @@ Process *schedule(Process *p, int calls) erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); - if ((erts_system_profile_flags.runnable_procs) - && (p->status == P_WAITING)) { - profile_runnable_proc(p, am_inactive); - } + state = erts_smp_atomic32_read_acqb(&p->state); if (IS_TRACED(p)) { - if (IS_TRACED_FL(p, F_TRACE_CALLS) && p->status != P_FREE) { + if (IS_TRACED_FL(p, F_TRACE_CALLS) && !(state & ERTS_PSFLG_FREE)) { erts_schedule_time_break(p, ERTS_BP_CALL_TIME_SCHEDULE_OUT); } - switch (p->status) { - case P_EXITING: - if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_EXIT)) - trace_sched(p, am_out_exiting); - break; - case P_FREE: + if (state & (ERTS_PSFLG_FREE|ERTS_PSFLG_EXITING)) { if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_EXIT)) - trace_sched(p, am_out_exited); - break; - default: + trace_sched(p, ((state & ERTS_PSFLG_FREE) + ? am_out_exited + : am_out_exiting)); + } + else { if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED)) trace_sched(p, am_out); else if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_PROCS)) trace_virtual_sched(p, am_out); - break; } } #ifdef ERTS_SMP - if (ERTS_PROC_PENDING_EXIT(p)) { - erts_handle_pending_exit(p, - ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); - p->status_flags |= ERTS_PROC_SFLG_PENDADD2SCHEDQ; - } - - if (p->pending_suspenders) { - handle_pending_suspend(p, - ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); - ASSERT(!(p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ) - || p->rcount == 0); - } + if (state & ERTS_PSFLG_PENDING_EXIT) + erts_handle_pending_exit(p, (ERTS_PROC_LOCK_MAIN + | ERTS_PROC_LOCK_STATUS)); + if (p->pending_suspenders) + handle_pending_suspend(p, (ERTS_PROC_LOCK_MAIN + | ERTS_PROC_LOCK_STATUS)); #endif - erts_smp_runq_lock(rq); - ERTS_PROC_REDUCTIONS_EXECUTED(rq, p->prio, reds, actual_reds); + schedule_out_process(rq, state, p); /* Returns with rq locked! */ + + ERTS_PROC_REDUCTIONS_EXECUTED(rq, + (int) (state & ERTS_PSFLG_PRIO_MASK), + reds, + actual_reds); esdp->current_process = NULL; #ifdef ERTS_SMP p->scheduler_data = NULL; - p->runq_flags &= ~ERTS_PROC_RUNQ_FLG_RUNNING; - p->status_flags &= ~ERTS_PROC_SFLG_RUNNING; - - if (p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ) { - ErtsRunQueue *notify_runq; - p->status_flags &= ~ERTS_PROC_SFLG_PENDADD2SCHEDQ; - notify_runq = internal_add_to_runq(rq, p); - if (notify_runq != rq) - smp_notify_inc_runq(notify_runq); - } #endif - if (p->status == P_FREE) { + if (state & ERTS_PSFLG_FREE) { #ifdef ERTS_SMP ASSERT(esdp->free_process == p); esdp->free_process = NULL; @@ -6576,8 +6483,9 @@ Process *schedule(Process *p, int calls) ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); check_activities_to_run: { - #ifdef ERTS_SMP + ErtsMigrationPaths *mps; + ErtsMigrationPath *mp; #ifdef ERTS_SMP { @@ -6599,20 +6507,28 @@ Process *schedule(Process *p, int calls) ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking()); ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); - if (rq->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK) - immigrate(rq); + mps = erts_get_migration_paths_managed(); + mp = &mps->mpath[rq->ix]; + + if (mp->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK) + immigrate(rq, mp); - continue_check_activities_to_run: + continue_check_activities_to_run: + flags = ERTS_RUNQ_FLGS_GET_NOB(rq); + continue_check_activities_to_run_known_flags: - if (rq->flags & (ERTS_RUNQ_FLG_CHK_CPU_BIND - | ERTS_RUNQ_FLG_SUSPENDED)) { - if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED) { - ASSERT(erts_smp_atomic32_read_nob(&esdp->ssi->flags) - & ERTS_SSI_FLG_SUSPENDED); + + if (flags & (ERTS_RUNQ_FLG_CHK_CPU_BIND|ERTS_RUNQ_FLG_SUSPENDED)) { + + if (flags & ERTS_RUNQ_FLG_SUSPENDED) { suspend_scheduler(esdp); + flags = ERTS_RUNQ_FLGS_GET_NOB(rq); } - if (rq->flags & ERTS_RUNQ_FLG_CHK_CPU_BIND) + if (flags & ERTS_RUNQ_FLG_CHK_CPU_BIND) { + flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_CHK_CPU_BIND); + flags &= ~ ERTS_RUNQ_FLG_CHK_CPU_BIND; erts_sched_check_cpu_bind(esdp); + } } { @@ -6641,14 +6557,12 @@ Process *schedule(Process *p, int calls) } #endif /* ERTS_SMP */ - ASSERT(rq->len == rq->procs.len + rq->ports.info.len); - - if ((rq->len == 0 && !rq->misc.start) - || (rq->halt_in_progress - && rq->ports.info.len == 0 && !rq->misc.start)) { + flags = ERTS_RUNQ_FLGS_GET_NOB(rq); + if ((!(flags & ERTS_RUNQ_FLGS_QMASK) && !rq->misc.start) + || (rq->halt_in_progress && ERTS_EMPTY_RUNQ_PORTS(rq))) { + /* Prepare for scheduler wait */ #ifdef ERTS_SMP - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); rq->wakeup_other = 0; @@ -6656,21 +6570,27 @@ Process *schedule(Process *p, int calls) empty_runq(rq); - if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED) { - ASSERT(erts_smp_atomic32_read_nob(&esdp->ssi->flags) - & ERTS_SSI_FLG_SUSPENDED); + flags = ERTS_RUNQ_FLGS_GET_NOB(rq); + if (flags & ERTS_RUNQ_FLG_SUSPENDED) { non_empty_runq(rq); - goto continue_check_activities_to_run; + goto continue_check_activities_to_run_known_flags; } - else if (!(rq->flags & ERTS_RUNQ_FLG_INACTIVE)) { + else if (!(flags & ERTS_RUNQ_FLG_INACTIVE)) { + if (try_steal_task(rq)) { + non_empty_runq(rq); + goto continue_check_activities_to_run; + } + + ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED); + /* * Check for ERTS_RUNQ_FLG_SUSPENDED has to be done * after trying to steal a task. */ - if (try_steal_task(rq) - || (rq->flags & ERTS_RUNQ_FLG_SUSPENDED)) { + flags = ERTS_RUNQ_FLGS_GET_NOB(rq); + if (flags & ERTS_RUNQ_FLG_SUSPENDED) { non_empty_runq(rq); - goto continue_check_activities_to_run; + goto continue_check_activities_to_run_known_flags; } } @@ -6701,6 +6621,7 @@ Process *schedule(Process *p, int calls) erl_sys_schedule(1); dt = erts_do_time_read_and_reset(); if (dt) erts_bump_timer(dt); + #ifdef ERTS_SMP erts_smp_runq_lock(rq); clear_sys_scheduling(); @@ -6717,14 +6638,17 @@ Process *schedule(Process *p, int calls) { int wo_reds = rq->wakeup_other_reds; if (wo_reds) { - if (rq->len < 2) { + erts_aint32_t len = rq->len; + if (len < 2) { rq->wakeup_other -= ERTS_WAKEUP_OTHER_DEC*wo_reds; if (rq->wakeup_other < 0) rq->wakeup_other = 0; } else if (rq->wakeup_other < wakeup_other_limit) - rq->wakeup_other += rq->len*wo_reds + ERTS_WAKEUP_OTHER_FIXED_INC; + rq->wakeup_other += len*wo_reds + ERTS_WAKEUP_OTHER_FIXED_INC; else { + if (flags & ERTS_RUNQ_FLG_PROTECTED) + ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED); if (erts_smp_atomic32_read_acqb(&no_empty_run_queues) != 0) { wake_scheduler_on_empty_runq(rq); rq->wakeup_other = 0; @@ -6740,7 +6664,7 @@ Process *schedule(Process *p, int calls) * Find a new port to run. */ - if (rq->ports.info.len) { + if (RUNQ_READ_LEN(&rq->ports.info.len)) { int have_outstanding_io; have_outstanding_io = erts_port_task_execute(rq, &esdp->current_port); if ((have_outstanding_io && fcalls > 2*input_reductions) @@ -6767,160 +6691,123 @@ Process *schedule(Process *p, int calls) /* * Find a new process to run. */ - pick_next_process: - - ERTS_DBG_CHK_PROCS_RUNQ(rq); - - switch (rq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) { - case MAX_BIT: - case MAX_BIT|HIGH_BIT: - case MAX_BIT|NORMAL_BIT: - case MAX_BIT|LOW_BIT: - case MAX_BIT|HIGH_BIT|NORMAL_BIT: - case MAX_BIT|HIGH_BIT|LOW_BIT: - case MAX_BIT|NORMAL_BIT|LOW_BIT: - case MAX_BIT|HIGH_BIT|NORMAL_BIT|LOW_BIT: - rpq = &rq->procs.prio[PRIORITY_MAX]; - break; - case HIGH_BIT: - case HIGH_BIT|NORMAL_BIT: - case HIGH_BIT|LOW_BIT: - case HIGH_BIT|NORMAL_BIT|LOW_BIT: - rpq = &rq->procs.prio[PRIORITY_HIGH]; - break; - case NORMAL_BIT: - rpq = &rq->procs.prio[PRIORITY_NORMAL]; - break; - case LOW_BIT: - rpq = &rq->procs.prio[PRIORITY_NORMAL]; - break; - case NORMAL_BIT|LOW_BIT: - rpq = &rq->procs.prio[PRIORITY_NORMAL]; - ASSERT(rpq->first != NULL); - p = rpq->first; - if (p->prio == PRIORITY_LOW) { - if (p == rpq->last || p->skipped >= RESCHEDULE_LOW-1) - p->skipped = 0; - else { - /* skip it */ - p->skipped++; - rpq->first = p->next; - rpq->first->prev = NULL; - rpq->last->next = p; - p->prev = rpq->last; - p->next = NULL; - rpq->last = p; + pick_next_process: { + int prio_q; + int qmask; + + flags = ERTS_RUNQ_FLGS_GET_NOB(rq); + qmask = (int) (flags & ERTS_RUNQ_FLGS_PROCS_QMASK); + switch (qmask & -qmask) { + case MAX_BIT: + prio_q = PRIORITY_MAX; + break; + case HIGH_BIT: + prio_q = PRIORITY_HIGH; + break; + case NORMAL_BIT: + case LOW_BIT: + prio_q = PRIORITY_NORMAL; + if (check_requeue_process(rq, PRIORITY_NORMAL)) goto pick_next_process; - } + break; + case 0: /* No process at all */ + default: + ASSERT(qmask == 0); + goto check_activities_to_run; } - break; - case 0: /* No process at all */ - default: - ASSERT((rq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) == 0); - ASSERT(rq->procs.len == 0); - goto check_activities_to_run; - } - - BM_START_TIMER(system); - - /* - * Take the chosen process out of the queue. - */ - ASSERT(rpq->first); /* Wrong qmask in rq->flags? */ - p = rpq->first; -#ifdef ERTS_SMP - ERTS_SMP_LC_ASSERT(rq == p->run_queue); -#endif - rpq->first = p->next; - if (!rpq->first) - rpq->last = NULL; - else - rpq->first->prev = NULL; - p->next = p->prev = NULL; + BM_START_TIMER(system); - if (--rq->procs.prio_info[p->prio].len == 0) - rq->flags &= ~(1 << p->prio); - ASSERT(rq->procs.len > 0); - rq->procs.len--; - ASSERT(rq->len > 0); - rq->len--; + /* + * Take the chosen process out of the queue. + */ + p = dequeue_process(rq, prio_q, &state); - { - Uint32 ee_flgs = (ERTS_RUNQ_FLG_EVACUATE(p->prio) - | ERTS_RUNQ_FLG_EMIGRATE(p->prio)); + ASSERT(p); /* Wrong qmask in rq->flags? */ - if ((rq->flags & (ERTS_RUNQ_FLG_SUSPENDED|ee_flgs)) == ee_flgs) - ERTS_UNSET_RUNQ_FLG_EVACUATE(rq->flags, p->prio); - } + while (1) { + erts_aint32_t exp, new, tmp; + tmp = new = exp = state; + new &= ~ERTS_PSFLG_IN_RUNQ; + tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); + if (tmp != ERTS_PSFLG_SUSPENDED) + new |= ERTS_PSFLG_RUNNING; + state = erts_smp_atomic32_cmpxchg_relb(&p->state, new, exp); + if (state == exp) { + tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); + if (tmp == ERTS_PSFLG_SUSPENDED) + goto pick_next_process; + state = new; + break; + } + } - ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(rq, p); + rq->procs.context_switches++; - rq->procs.context_switches++; + esdp->current_process = p; - esdp->current_process = p; + } #ifdef ERTS_SMP - p->runq_flags |= ERTS_PROC_RUNQ_FLG_RUNNING; erts_smp_runq_unlock(rq); + if (flags & ERTS_RUNQ_FLG_PROTECTED) + ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED); + ERTS_SMP_CHK_NO_PROC_LOCKS; erts_smp_proc_lock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); if (erts_sched_stat.enabled) { + int prio; UWord old = ERTS_PROC_SCHED_ID(p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_STATUS), (UWord) esdp->no); int migrated = old && old != esdp->no; + prio = (int) (state & ERTS_PSFLG_PRIO_MASK); + erts_smp_spin_lock(&erts_sched_stat.lock); - erts_sched_stat.prio[p->prio].total_executed++; - erts_sched_stat.prio[p->prio].executed++; + erts_sched_stat.prio[prio].total_executed++; + erts_sched_stat.prio[prio].executed++; if (migrated) { - erts_sched_stat.prio[p->prio].total_migrated++; - erts_sched_stat.prio[p->prio].migrated++; + erts_sched_stat.prio[prio].total_migrated++; + erts_sched_stat.prio[prio].migrated++; } erts_smp_spin_unlock(&erts_sched_stat.lock); } - p->status_flags |= ERTS_PROC_SFLG_RUNNING; - p->status_flags &= ~ERTS_PROC_SFLG_INRUNQ; if (ERTS_PROC_PENDING_EXIT(p)) { erts_handle_pending_exit(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); + state = erts_smp_atomic32_read_nob(&p->state); } ASSERT(!p->scheduler_data); p->scheduler_data = esdp; - #endif - ASSERT(p->status != P_SUSPENDED); /* Never run a suspended process */ + /* Never run a suspended process */ + ASSERT(!(ERTS_PSFLG_SUSPENDED & erts_smp_atomic32_read_nob(&p->state))); ACTIVATE(p); reds = context_reds; if (IS_TRACED(p)) { - switch (p->status) { - case P_EXITING: + if (state & ERTS_PSFLG_EXITING) { if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_EXIT)) trace_sched(p, am_in_exiting); - break; - default: + } + else { if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED)) trace_sched(p, am_in); else if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_PROCS)) trace_virtual_sched(p, am_in); - break; } if (IS_TRACED_FL(p, F_TRACE_CALLS)) { erts_schedule_time_break(p, ERTS_BP_CALL_TIME_SCHEDULE_IN); } } - if (p->status != P_EXITING) - p->status = P_RUNNING; - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); #ifdef ERTS_SMP @@ -6928,7 +6815,7 @@ Process *schedule(Process *p, int calls) erts_check_my_tracer_proc(p); #endif - if (!ERTS_PROC_IS_EXITING(p) + if (!(state & ERTS_PSFLG_EXITING) && ((FLAGS(p) & F_FORCE_GC) || (MSO(p).overhead > BIN_VHEAP_SZ(p)))) { reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity); @@ -7020,17 +6907,19 @@ erts_schedule_misc_op(void (*func)(void *), void *arg) ErtsSchedulerData *esdp = erts_get_scheduler_data(); ErtsRunQueue *rq = esdp ? esdp->run_queue : ERTS_RUNQ_IX(0); ErtsMiscOpList *molp = misc_op_list_alloc(); +#ifdef ERTS_SMP + ErtsMigrationPaths *mpaths = erts_get_migration_paths(); - erts_smp_runq_lock(rq); - - while (rq->misc.evac_runq) { - ErtsRunQueue *tmp_rq = rq->misc.evac_runq; - erts_smp_runq_unlock(rq); - rq = tmp_rq; - erts_smp_runq_lock(rq); + if (!mpaths) + rq = ERTS_RUNQ_IX(0); + else { + ErtsRunQueue *erq = mpaths->mpath[rq->ix].misc_evac_runq; + if (erq) + rq = erq; } +#endif - ASSERT(!(rq->flags & ERTS_RUNQ_FLG_SUSPENDED)); + erts_smp_runq_lock(rq); molp->next = NULL; molp->func = func; @@ -7040,7 +6929,9 @@ erts_schedule_misc_op(void (*func)(void *), void *arg) else rq->misc.start = molp; rq->misc.end = molp; + erts_smp_runq_unlock(rq); + smp_notify_inc_runq(rq); } @@ -7201,7 +7092,7 @@ erts_free_proc(Process *p) ** Allocate process and find out where to place next process. */ static Process* -alloc_process(void) +alloc_process(ErtsRunQueue *rq, erts_aint32_t state) { int pix; Process* p; @@ -7280,6 +7171,12 @@ alloc_process(void) exp_lpd = act_lpd; } +#ifdef ERTS_SMP + RUNQ_SET_RQ(&p->run_queue, rq); +#endif + + erts_smp_atomic32_init_relb(&p->state, state); + #ifdef DEBUG pid = p->id; #endif @@ -7303,7 +7200,6 @@ alloc_process(void) erts_smp_rwmtx_runlock(&erts_proc_tab_rwmtx); - p->rstatus = P_FREE; p->rcount = 0; ASSERT(p == (Process *) @@ -7317,6 +7213,7 @@ system_limit: erts_smp_rwmtx_runlock(&erts_proc_tab_rwmtx); return NULL; + } Eterm @@ -7326,7 +7223,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). Eterm args, /* Arguments for function (must be well-formed list). */ ErlSpawnOpts* so) /* Options for spawn. */ { - ErtsRunQueue *rq, *notify_runq; + ErtsRunQueue *rq = NULL; Process *p; Sint arity; /* Number of arguments. */ #ifndef HYBRID @@ -7335,6 +7232,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). Uint sz; /* Needed words on heap. */ Uint heap_need; /* Size needed on heap. */ Eterm res = THE_NON_VALUE; + erts_aint32_t state = 0; + erts_aint32_t prio = (erts_aint32_t) PRIORITY_NORMAL; #ifdef ERTS_SMP erts_smp_proc_lock(parent, ERTS_PROC_LOCKS_ALL_MINOR); @@ -7359,8 +7258,24 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). so->error_code = BADARG; goto error; } - p = alloc_process(); /* All proc locks are locked by this thread - on success */ + + if (so->flags & SPO_USE_ARGS) { + if (so->scheduler) { + int ix = so->scheduler-1; + ASSERT(0 <= ix && ix < erts_no_run_queues); + rq = ERTS_RUNQ_IX(ix); + state |= ERTS_PSFLG_BOUND; + } + prio = (erts_aint32_t) so->priority; + } + + state |= (prio & ERTS_PSFLG_PRIO_MASK); + + if (!rq) + rq = erts_get_runq_proc(parent); + + p = alloc_process(rq, state); /* All proc locks are locked by this thread + on success */ if (!p) { erts_send_error_to_logger_str(parent->group_leader, "Too many processes\n"); @@ -7382,22 +7297,16 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->flags = erts_default_process_flags; - /* Scheduler queue mutex should be locked when changeing - * prio. In this case we don't have to lock it, since - * noone except us has access to the process. - */ if (so->flags & SPO_USE_ARGS) { p->min_heap_size = so->min_heap_size; p->min_vheap_size = so->min_vheap_size; - p->prio = so->priority; p->max_gen_gcs = so->max_gen_gcs; } else { p->min_heap_size = H_MIN_SIZE; p->min_vheap_size = BIN_VH_MIN_SIZE; - p->prio = PRIORITY_NORMAL; p->max_gen_gcs = (Uint16) erts_smp_atomic32_read_nob(&erts_max_gen_gcs); } - p->skipped = 0; + p->schedule_count = 0; ASSERT(p->min_heap_size == erts_next_heap_size(p->min_heap_size, 0)); p->initial[INITIAL_MOD] = mod; @@ -7501,12 +7410,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). : STORE_NC(&p->htop, &p->off_heap, parent->group_leader); } -#if 1 - p->trace_flags = ~TRACEE_FLAGS; - p->tracer_proc = NIL; -#else erts_get_default_tracing(&p->trace_flags, &p->tracer_proc); -#endif p->msg.first = NULL; p->msg.last = &p->msg.first; @@ -7516,7 +7420,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->msg_inq.first = NULL; p->msg_inq.last = &p->msg_inq.first; p->msg_inq.len = 0; - p->bound_runq = NULL; #endif p->bif_timers = NULL; p->mbuf = NULL; @@ -7618,9 +7521,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). #ifdef ERTS_SMP p->scheduler_data = NULL; - erts_atomic32_init_nob(&p->aflags, 0); - p->status_flags = 0; - p->runq_flags = 0; p->suspendee = NIL; p->pending_suspenders = NULL; p->pending_exit.reason = THE_NON_VALUE; @@ -7631,34 +7531,15 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->fp_exception = 0; #endif + erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL); + + res = p->id; + /* * Schedule process for execution. */ - if (!((so->flags & SPO_USE_ARGS) && so->scheduler)) - rq = erts_get_runq_proc(parent); - else { - int ix = so->scheduler-1; - ASSERT(0 <= ix && ix < erts_no_run_queues); - rq = ERTS_RUNQ_IX(ix); - p->bound_runq = rq; - } - - erts_smp_runq_lock(rq); - -#ifdef ERTS_SMP - p->run_queue = rq; -#endif - - p->status = P_WAITING; - notify_runq = internal_add_to_runq(rq, p); - - erts_smp_runq_unlock(rq); - - smp_notify_inc_runq(notify_runq); - - res = p->id; - erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL); + schedule_process(p, state, 0); VERBOSE(DEBUG_PROCESSES, ("Created a new process: %T\n",p->id)); @@ -7694,12 +7575,8 @@ void erts_init_empty_process(Process *p) p->max_gen_gcs = 0; p->min_heap_size = 0; p->min_vheap_size = 0; - p->status = P_RUNABLE; - p->gcstatus = P_RUNABLE; - p->rstatus = P_RUNABLE; p->rcount = 0; p->id = ERTS_INVALID_PID; - p->prio = PRIORITY_NORMAL; p->reds = 0; p->tracer_proc = NIL; p->trace_flags = F_INITIAL_TRACE_FLAGS; @@ -7716,7 +7593,6 @@ void erts_init_empty_process(Process *p) p->bin_vheap_mature = 0; #ifdef ERTS_SMP p->u.ptimer = NULL; - p->bound_runq = NULL; #else memset(&(p->u.tm), 0, sizeof(ErlTimer)); #endif @@ -7793,12 +7669,10 @@ void erts_init_empty_process(Process *p) p->last_old_htop = NULL; #endif + erts_smp_atomic32_init_nob(&p->state, (erts_aint32_t) PRIORITY_NORMAL); #ifdef ERTS_SMP p->scheduler_data = NULL; - erts_atomic32_init_nob(&p->aflags, 0); - p->status_flags = 0; - p->runq_flags = 0; p->msg_inq.first = NULL; p->msg_inq.last = &p->msg_inq.first; p->msg_inq.len = 0; @@ -7808,7 +7682,7 @@ void erts_init_empty_process(Process *p) p->pending_exit.bp = NULL; erts_proc_lock_init(p); erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL); - p->run_queue = ERTS_RUNQ_IX(0); + RUNQ_SET_RQ(&p->run_queue, ERTS_RUNQ_IX(0)); #endif #if !defined(NO_FPE_SIGNALS) || defined(HIPE) @@ -8003,26 +7877,34 @@ delete_process(Process* p) } +static ERTS_INLINE erts_aint32_t +set_proc_exiting_state(Process *p, erts_aint32_t state) +{ + erts_aint32_t a, n, e; + a = state; + while (1) { + n = e = a; + n &= ~(ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); + n |= ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE; + if (!(a & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING))) + n |= ERTS_PSFLG_IN_RUNQ; + a = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e); + if (a == e) + break; + } + return a; +} + static ERTS_INLINE void -set_proc_exiting(Process *p, Eterm reason, ErlHeapFragment *bp) +set_proc_exiting(Process *p, + erts_aint32_t state, + Eterm reason, + ErlHeapFragment *bp) { -#ifdef ERTS_SMP - erts_pix_lock_t *pix_lock = ERTS_PID2PIXLOCK(p->id); ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(p) == ERTS_PROC_LOCKS_ALL); - /* - * You are required to have all proc locks and the pix lock when going - * to status P_EXITING. This makes it is enough to take any lock when - * looking up a process (pid2proc()) to prevent the looked up process - * from exiting until the lock has been released. - */ - erts_pix_lock(pix_lock); - erts_atomic32_read_bor_relb(&p->aflags, ERTS_PROC_AFLG_EXITING); -#endif - p->status = P_EXITING; -#ifdef ERTS_SMP - erts_pix_unlock(pix_lock); -#endif + state = set_proc_exiting_state(p, state); + p->fvalue = reason; if (bp) erts_link_mbuf_to_proc(p, bp); @@ -8035,6 +7917,14 @@ set_proc_exiting(Process *p, Eterm reason, ErlHeapFragment *bp) KILL_CATCHES(p); cancel_timer(p); p->i = (BeamInstr *) beam_exit; + + if (erts_system_profile_flags.runnable_procs + && !(state & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) { + profile_runnable_proc(p, am_active); + } + + if (!(state & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING))) + add2runq(p, state); } @@ -8047,8 +7937,8 @@ erts_handle_pending_exit(Process *c_p, ErtsProcLocks locks) ASSERT(is_value(c_p->pending_exit.reason)); ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == locks); ERTS_SMP_LC_ASSERT(locks & ERTS_PROC_LOCK_MAIN); - ERTS_SMP_LC_ASSERT(c_p->status != P_EXITING); - ERTS_SMP_LC_ASSERT(c_p->status != P_FREE); + ERTS_SMP_LC_ASSERT(!((ERTS_PSFLG_EXITING|ERTS_PSFLG_FREE) + & erts_smp_atomic32_read_nob(&c_p->state))); /* Ensure that all locks on c_p are locked before proceeding... */ if (locks == ERTS_PROC_LOCKS_ALL) @@ -8061,7 +7951,10 @@ erts_handle_pending_exit(Process *c_p, ErtsProcLocks locks) } } - set_proc_exiting(c_p, c_p->pending_exit.reason, c_p->pending_exit.bp); + set_proc_exiting(c_p, + erts_smp_atomic32_read_acqb(&c_p->state), + c_p->pending_exit.reason, + c_p->pending_exit.bp); c_p->pending_exit.reason = THE_NON_VALUE; c_p->pending_exit.bp = NULL; @@ -8077,11 +7970,12 @@ handle_pending_exiters(ErtsProcList *pnd_xtrs) while (plp) { Process *p = erts_pid2proc(NULL, 0, plp->pid, ERTS_PROC_LOCKS_ALL); if (p) { - if (proclist_same(plp, p) - && !(p->status_flags & ERTS_PROC_SFLG_RUNNING)) { - ASSERT(p->status_flags & ERTS_PROC_SFLG_INRUNQ); - ASSERT(ERTS_PROC_PENDING_EXIT(p)); - erts_handle_pending_exit(p, ERTS_PROC_LOCKS_ALL); + if (proclist_same(plp, p)) { + erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); + if (!(state & ERTS_PSFLG_RUNNING)) { + ASSERT(state & ERTS_PSFLG_PENDING_EXIT); + erts_handle_pending_exit(p, ERTS_PROC_LOCKS_ALL); + } } erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL); } @@ -8171,7 +8065,7 @@ send_exit_message(Process *to, ErtsProcLocks *to_locksp, * SMP emulator). When the signal is received the receiver receives an * 'EXIT' message if it is trapping exits; otherwise, it will either * ignore the signal if the exit reason is normal, or go into an - * exiting state (status P_EXITING). When a process has gone into the + * exiting state (ERTS_PSFLG_EXITING). When a process has gone into the * exiting state it will not execute any more Erlang code, but it might * take a while before it actually exits. The exit signal is being * received when the 'EXIT' message is put in the message queue, the @@ -8244,6 +8138,7 @@ send_exit_signal(Process *c_p, /* current process if and only Uint32 flags /* flags */ ) { + erts_aint32_t state = erts_smp_atomic32_read_nob(&rp->state); Eterm rsn = reason == am_kill ? am_killed : reason; ERTS_SMP_LC_ASSERT(*rp_locks == erts_proc_lc_my_proc_locks(rp)); @@ -8265,7 +8160,7 @@ send_exit_signal(Process *c_p, /* current process if and only } #endif - if (ERTS_PROC_IS_TRAPPING_EXITS(rp) + if ((state & ERTS_PSFLG_TRAP_EXIT) && (reason != am_kill || (flags & ERTS_XSIG_FLG_IGN_KILL))) { if (is_not_nil(token) #ifdef USE_VM_PROBES @@ -8281,9 +8176,7 @@ send_exit_signal(Process *c_p, /* current process if and only } else if (reason != am_normal || (flags & ERTS_XSIG_FLG_NO_IGN_NORMAL)) { #ifdef ERTS_SMP - if (!ERTS_PROC_PENDING_EXIT(rp) && !ERTS_PROC_IS_EXITING(rp)) { - ASSERT(rp->status != P_EXITING); - ASSERT(rp->status != P_FREE); + if (!(state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))) { ASSERT(!rp->pending_exit.bp); if (rp == c_p && (*rp_locks & ERTS_PROC_LOCK_MAIN)) { @@ -8299,9 +8192,9 @@ send_exit_signal(Process *c_p, /* current process if and only } *rp_locks = ERTS_PROC_LOCKS_ALL; } - set_proc_exiting(c_p, rsn, NULL); + set_proc_exiting(c_p, state, rsn, NULL); } - else if (!(rp->status_flags & ERTS_PROC_SFLG_RUNNING)) { + else if (!(state & ERTS_PSFLG_RUNNING)) { /* Process not running ... */ ErtsProcLocks need_locks = ~(*rp_locks) & ERTS_PROC_LOCKS_ALL; if (need_locks @@ -8318,6 +8211,7 @@ send_exit_signal(Process *c_p, /* current process if and only /* ...and we have all locks on it... */ *rp_locks = ERTS_PROC_LOCKS_ALL; set_proc_exiting(rp, + state, (is_immed(rsn) ? rsn : copy_object(rsn, rp)), @@ -8347,13 +8241,9 @@ send_exit_signal(Process *c_p, /* current process if and only &bp->off_heap); rp->pending_exit.bp = bp; } - erts_atomic32_read_bor_relb(&rp->aflags, - ERTS_PROC_AFLG_PENDING_EXIT); - ASSERT(ERTS_PROC_PENDING_EXIT(rp)); + erts_smp_atomic32_read_bor_relb(&rp->state, + ERTS_PSFLG_PENDING_EXIT); } - if (!(rp->status_flags - & (ERTS_PROC_SFLG_INRUNQ|ERTS_PROC_SFLG_RUNNING))) - erts_add_to_runq(rp); } /* else: * @@ -8365,18 +8255,15 @@ send_exit_signal(Process *c_p, /* current process if and only * exit or by itself before seeing the pending exit. */ #else /* !ERTS_SMP */ - if (c_p == rp) { - rp->status = P_EXITING; - c_p->fvalue = rsn; - } - else if (rp->status != P_EXITING) { /* No recursive process exits /PaN */ - Eterm old_status = rp->status; + erts_aint32_t state = erts_smp_atomic32_read_nob(&rp->state); + if (!(state & ERTS_PSFLG_EXITING)) { set_proc_exiting(rp, - is_immed(rsn) ? rsn : copy_object(rsn, rp), + state, + (is_immed(rsn) || c_p == rp + ? rsn + : copy_object(rsn, rp)), NULL); ACTIVATE(rp); - if (old_status != P_RUNABLE && old_status != P_RUNNING) - erts_add_to_runq(rp); } #endif return -1; /* Receiver will exit */ @@ -8688,22 +8575,14 @@ proc_dec_refc(void *vproc) #endif -static void -continue_exit_process(Process *p -#ifdef ERTS_SMP - , erts_pix_lock_t *pix_lock -#endif - ); - /* this function fishishes a process and propagates exit messages - called by process_main when a process dies */ void erts_do_exit_process(Process* p, Eterm reason) { #ifdef ERTS_SMP - erts_pix_lock_t *pix_lock = ERTS_PID2PIXLOCK(p->id); + erts_aint32_t state; #endif - p->arity = 0; /* No live registers */ p->fvalue = reason; @@ -8721,27 +8600,17 @@ erts_do_exit_process(Process* p, Eterm reason) #ifdef ERTS_SMP ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); /* By locking all locks (main lock is already locked) when going - to status P_EXITING, it is enough to take any lock when + to exiting state (ERTS_PSFLG_EXITING), it is enough to take any lock when looking up a process (erts_pid2proc()) to prevent the looked up process from exiting until the lock has been released. */ erts_smp_proc_lock(p, ERTS_PROC_LOCKS_ALL_MINOR); #endif - - if (erts_system_profile_flags.runnable_procs && (p->status != P_WAITING)) { - profile_runnable_proc(p, am_inactive); - } - -#ifdef ERTS_SMP - erts_pix_lock(pix_lock); - erts_atomic32_read_bor_relb(&p->aflags, ERTS_PROC_AFLG_EXITING); -#endif - - p->status = P_EXITING; -#ifdef ERTS_SMP - erts_pix_unlock(pix_lock); - - if (ERTS_PROC_PENDING_EXIT(p)) { +#ifndef ERTS_SMP + set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state)); +#else + state = set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state)); + if (state & ERTS_PSFLG_PENDING_EXIT) { /* Process exited before pending exit was received... */ p->pending_exit.reason = THE_NON_VALUE; if (p->pending_exit.bp) { @@ -8780,29 +8649,11 @@ erts_do_exit_process(Process* p, Eterm reason) erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL_MINOR); -#ifdef ERTS_SMP - continue_exit_process(p, pix_lock); -#else - continue_exit_process(p); -#endif + erts_continue_exit_process(p); } void -erts_continue_exit_process(Process *c_p) -{ -#ifdef ERTS_SMP - continue_exit_process(c_p, ERTS_PID2PIXLOCK(c_p->id)); -#else - continue_exit_process(c_p); -#endif -} - -static void -continue_exit_process(Process *p -#ifdef ERTS_SMP - , erts_pix_lock_t *pix_lock -#endif - ) +erts_continue_exit_process(Process *p) { ErtsLink* lnk; ErtsMonitor *mon; @@ -8818,11 +8669,7 @@ continue_exit_process(Process *p ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(p)); -#ifdef DEBUG - erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); - ASSERT(p->status == P_EXITING); - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); -#endif + ASSERT(ERTS_PROC_IS_EXITING(p)); #ifdef ERTS_SMP if (p->flags & F_HAVE_BLCKD_MSCHED) { @@ -8914,7 +8761,6 @@ continue_exit_process(Process *p p->scheduler_data->current_process = NULL; p->scheduler_data->free_process = p; - p->status_flags = 0; #endif /* Time of death! */ erts_smp_atomic_set_relb(&erts_proc.tab[pix], ERTS_AINT_NULL); @@ -8946,7 +8792,21 @@ continue_exit_process(Process *p lnk = p->nlinks; p->nlinks = NULL; - p->status = P_FREE; + + { + /* Inactivate and notify free */ + erts_aint32_t n, e, a = erts_smp_atomic32_read_nob(&p->state); + while (1) { + n = e = a; + ASSERT(a & ERTS_PSFLG_EXITING); + n |= ERTS_PSFLG_FREE; + n &= ~ERTS_PSFLG_ACTIVE; + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + if (a == e) + break; + } + } + dep = ((p->flags & F_DISTRIBUTION) ? ERTS_PROC_SET_DIST_ENTRY(p, ERTS_PROC_LOCKS_ALL, NULL) : NULL); @@ -9032,8 +8892,6 @@ continue_exit_process(Process *p ERTS_SMP_LC_ASSERT(curr_locks == erts_proc_lc_my_proc_locks(p)); ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN & curr_locks); - ASSERT(p->status == P_EXITING); - p->i = (BeamInstr *) beam_continue_exit; if (!(curr_locks & ERTS_PROC_LOCK_STATUS)) { @@ -9041,8 +8899,6 @@ continue_exit_process(Process *p curr_locks |= ERTS_PROC_LOCK_STATUS; } - erts_add_to_runq(p); - if (curr_locks != ERTS_PROC_LOCK_MAIN) erts_smp_proc_unlock(p, ~ERTS_PROC_LOCK_MAIN & curr_locks); @@ -9054,33 +8910,15 @@ continue_exit_process(Process *p static void timeout_proc(Process* p) { + erts_aint32_t state; BeamInstr** pi = (BeamInstr **) p->def_arg_reg; p->i = *pi; p->flags |= F_TIMO; p->flags &= ~F_INSLPQUEUE; - switch (p->status) { - case P_GARBING: - switch (p->gcstatus) { - case P_SUSPENDED: - goto suspended; - case P_WAITING: - goto waiting; - default: - break; - } - break; - case P_WAITING: - waiting: - erts_add_to_runq(p); - break; - case P_SUSPENDED: - suspended: - p->rstatus = P_RUNABLE; /* MUST set resume status to runnable */ - break; - default: - break; - } + state = erts_smp_atomic32_read_acqb(&p->state); + if (!(state & ERTS_PSFLG_ACTIVE)) + schedule_process(p, state, 0); } @@ -9148,6 +8986,7 @@ erts_stack_dump(int to, void *to_arg, Process *p) void erts_program_counter_info(int to, void *to_arg, Process *p) { + erts_aint32_t state; int i; erts_print(to, to_arg, "Program counter: %p (", p->i); @@ -9156,7 +8995,8 @@ erts_program_counter_info(int to, void *to_arg, Process *p) erts_print(to, to_arg, "CP: %p (", p->cp); print_function_from_pc(to, to_arg, p->cp); erts_print(to, to_arg, ")\n"); - if (!((p->status == P_RUNNING) || (p->status == P_GARBING))) { + state = erts_smp_atomic32_read_acqb(&p->state); + if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_GC))) { erts_print(to, to_arg, "arity = %d\n",p->arity); if (!ERTS_IS_CRASH_DUMPING) { /* diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index d40f50c0af..249f29e7e6 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -112,28 +112,29 @@ extern int erts_sched_thread_suggested_stack_size; #define PRIORITY_NORMAL 2 #define PRIORITY_LOW 3 #define ERTS_NO_PROC_PRIO_LEVELS 4 +#define ERTS_NO_PROC_PRIO_QUEUES 3 #define ERTS_PORT_PRIO_LEVEL ERTS_NO_PROC_PRIO_LEVELS +#define ERTS_NO_PRIO_LEVELS (ERTS_NO_PROC_PRIO_LEVELS + 1) #define ERTS_RUNQ_FLGS_PROCS_QMASK \ ((((Uint32) 1) << ERTS_NO_PROC_PRIO_LEVELS) - 1) -#define ERTS_NO_PRIO_LEVELS (ERTS_NO_PROC_PRIO_LEVELS + 1) -#define ERTS_RUNQ_FLGS_MIGRATE_QMASK \ +#define ERTS_RUNQ_FLGS_QMASK \ ((((Uint32) 1) << ERTS_NO_PRIO_LEVELS) - 1) #define ERTS_RUNQ_FLGS_EMIGRATE_SHFT \ - ERTS_NO_PROC_PRIO_LEVELS + ERTS_NO_PRIO_LEVELS #define ERTS_RUNQ_FLGS_IMMIGRATE_SHFT \ (ERTS_RUNQ_FLGS_EMIGRATE_SHFT + ERTS_NO_PRIO_LEVELS) #define ERTS_RUNQ_FLGS_EVACUATE_SHFT \ (ERTS_RUNQ_FLGS_IMMIGRATE_SHFT + ERTS_NO_PRIO_LEVELS) #define ERTS_RUNQ_FLGS_EMIGRATE_QMASK \ - (ERTS_RUNQ_FLGS_MIGRATE_QMASK << ERTS_RUNQ_FLGS_EMIGRATE_SHFT) + (ERTS_RUNQ_FLGS_QMASK << ERTS_RUNQ_FLGS_EMIGRATE_SHFT) #define ERTS_RUNQ_FLGS_IMMIGRATE_QMASK \ - (ERTS_RUNQ_FLGS_MIGRATE_QMASK << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT) + (ERTS_RUNQ_FLGS_QMASK << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT) #define ERTS_RUNQ_FLGS_EVACUATE_QMASK \ - (ERTS_RUNQ_FLGS_MIGRATE_QMASK << ERTS_RUNQ_FLGS_EVACUATE_SHFT) + (ERTS_RUNQ_FLGS_QMASK << ERTS_RUNQ_FLGS_EVACUATE_SHFT) #define ERTS_RUNQ_FLG_BASE2 \ (ERTS_RUNQ_FLGS_EVACUATE_SHFT + ERTS_NO_PRIO_LEVELS) @@ -148,14 +149,18 @@ extern int erts_sched_thread_suggested_stack_size; (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 3)) #define ERTS_RUNQ_FLG_INACTIVE \ (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 4)) +#define ERTS_RUNQ_FLG_NONEMPTY \ + (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 5)) +#define ERTS_RUNQ_FLG_PROTECTED \ + (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 6)) #define ERTS_RUNQ_FLGS_MIGRATION_QMASKS \ (ERTS_RUNQ_FLGS_EMIGRATE_QMASK \ | ERTS_RUNQ_FLGS_IMMIGRATE_QMASK \ | ERTS_RUNQ_FLGS_EVACUATE_QMASK) + #define ERTS_RUNQ_FLGS_MIGRATION_INFO \ - (ERTS_RUNQ_FLGS_MIGRATION_QMASKS \ - | ERTS_RUNQ_FLG_INACTIVE \ + (ERTS_RUNQ_FLG_INACTIVE \ | ERTS_RUNQ_FLG_OUT_OF_WORK \ | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK) @@ -186,33 +191,45 @@ extern int erts_sched_thread_suggested_stack_size; #define ERTS_UNSET_RUNQ_FLG_EVACUATE(FLGS, PRIO) \ ((FLGS) &= ~ERTS_RUNQ_FLG_EVACUATE((PRIO))) -#define ERTS_RUNQ_IFLG_SUSPENDED (((erts_aint32_t) 1) << 0) -#define ERTS_RUNQ_IFLG_NONEMPTY (((erts_aint32_t) 1) << 1) - - -#ifdef DEBUG -# if defined(ARCH_64) && !HALFWORD_HEAP -# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \ - (*((char **) &(RQP)) = (char *) (0xdeadbeefdead0003 | ((N) << 4))) -# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \ -do { \ - ASSERT((RQP) != NULL); \ - ASSERT(((((Uint) (RQP)) & ((Uint) 0x3))) == ((Uint) 0)); \ - ASSERT((((Uint) (RQP)) & ~((Uint) 0xffff)) != ((Uint) 0xdeadbeefdead0000));\ -} while (0) -# else -# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \ - (*((char **) &(RQP)) = (char *) (0xdead0003 | ((N) << 4))) -# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \ -do { \ - ASSERT((RQP) != NULL); \ - ASSERT(((((UWord) (RQP)) & ((UWord) 1))) == ((UWord) 0)); \ - ASSERT((((UWord) (RQP)) & ~((UWord) 0xffff)) != ((UWord) 0xdead0000)); \ -} while (0) -# endif -#else -# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) -# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) +#define ERTS_RUNQ_FLGS_INIT(RQ, INIT) \ + erts_smp_atomic32_init_nob(&(RQ)->flags, (erts_aint32_t) (INIT)) +#define ERTS_RUNQ_FLGS_SET(RQ, FLGS) \ + ((Uint32) erts_smp_atomic32_read_bor_relb(&(RQ)->flags, \ + (erts_aint32_t) (FLGS))) +#define ERTS_RUNQ_FLGS_UNSET(RQ, FLGS) \ + ((Uint32) erts_smp_atomic32_read_band_relb(&(RQ)->flags, \ + (erts_aint32_t) ~(FLGS))) +#define ERTS_RUNQ_FLGS_GET(RQ) \ + ((Uint32) erts_smp_atomic32_read_acqb(&(RQ)->flags)) +#define ERTS_RUNQ_FLGS_GET_NOB(RQ) \ + ((Uint32) erts_smp_atomic32_read_nob(&(RQ)->flags)) +#define ERTS_RUNQ_FLGS_GET_MB(RQ) \ + ((Uint32) erts_smp_atomic32_read_mb(&(RQ)->flags)) +#define ERTS_RUNQ_FLGS_MASK_SET(RQ, MSK, FLGS) \ + ((Uint32) erts_smp_atomic32_mask_set_relb(&(RQ)->flags, \ + (erts_aint32_t) (MSK), \ + (erts_aint32_t) (FLGS))) + +ERTS_GLB_INLINE erts_aint32_t +erts_smp_atomic32_mask_set_relb(erts_smp_atomic32_t *a32p, + erts_aint32_t mask, + erts_aint32_t set); +#if ERTS_GLB_INLINE_INCL_FUNC_DEF +ERTS_GLB_INLINE erts_aint32_t +erts_smp_atomic32_mask_set_relb(erts_smp_atomic32_t *a32p, + erts_aint32_t mask, + erts_aint32_t set) +{ + erts_aint32_t act = erts_smp_atomic32_read_nob(a32p); + while (1) { + erts_aint32_t exp = act; + erts_aint32_t new = exp & ~mask; + new |= (mask & set); + act = erts_smp_atomic32_cmpxchg_relb(a32p, new, exp); + if (act == exp) + return act; + } +} #endif typedef enum { @@ -311,21 +328,39 @@ typedef struct ErtsSchedulerData_ ErtsSchedulerData; typedef struct ErtsRunQueue_ ErtsRunQueue; typedef struct { - int len; - int max_len; + erts_smp_atomic32_t len; + erts_aint32_t max_len; int reds; +} ErtsRunQueueInfo; + +#ifdef ERTS_SMP + +typedef struct { + Uint32 flags; + ErtsRunQueue *misc_evac_runq; struct { struct { int this; int other; } limit; ErtsRunQueue *runq; - } migrate; -} ErtsRunQueueInfo; + Uint32 flags; + } prio[ERTS_NO_PRIO_LEVELS]; +} ErtsMigrationPath; + +typedef struct ErtsMigrationPaths_ ErtsMigrationPaths; + +struct ErtsMigrationPaths_ { + void *block; + ErtsMigrationPaths *next; + ErtsThrPrgrVal thr_prgr; + ErtsMigrationPath mpath[1]; +}; + +#endif /* ERTS_SMP */ struct ErtsRunQueue_ { int ix; - erts_smp_atomic32_t info_flags; erts_smp_mtx_t mtx; erts_smp_cnd_t cnd; @@ -333,19 +368,18 @@ struct ErtsRunQueue_ { ErtsSchedulerData *scheduler; int waiting; /* < 0 in sys schedule; > 0 on cnd variable */ int woken; - Uint32 flags; + erts_smp_atomic32_t flags; int check_balance_reds; int full_reds_history_sum; int full_reds_history[ERTS_FULL_REDS_HISTORY_SIZE]; int out_of_work_count; - int max_len; - int len; + erts_aint32_t max_len; + erts_aint32_t len; int wakeup_other; int wakeup_other_reds; int halt_in_progress; struct { - int len; ErtsProcList *pending_exiters; Uint context_switches; Uint reductions; @@ -360,7 +394,7 @@ struct ErtsRunQueue_ { struct { ErtsMiscOpList *start; ErtsMiscOpList *end; - ErtsRunQueue *evac_runq; + erts_smp_atomic_t evac_runq; } misc; struct { @@ -380,7 +414,7 @@ extern ErtsAlignedRunQueue *erts_aligned_run_queues; #define ERTS_PROC_REDUCTIONS_EXECUTED(RQ, PRIO, REDS, AREDS) \ do { \ (RQ)->procs.reductions += (AREDS); \ - (RQ)->procs.prio_info[p->prio].reds += (REDS); \ + (RQ)->procs.prio_info[(PRIO)].reds += (REDS); \ (RQ)->check_balance_reds -= (REDS); \ (RQ)->wakeup_other_reds += (AREDS); \ } while (0) @@ -488,6 +522,90 @@ extern ErtsAlignedSchedulerData *erts_aligned_scheduler_data; extern ErtsSchedulerData *erts_scheduler_data; #endif +#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK) +int erts_smp_lc_runq_is_locked(ErtsRunQueue *); +#endif + +#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS + +/* + * Run queue locked during modifications. We use atomic ops since + * other threads peek at values without run queue lock. + */ + +ERTS_GLB_INLINE void erts_smp_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio); +ERTS_GLB_INLINE void erts_smp_dec_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio); +ERTS_GLB_INLINE void erts_smp_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi); + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE void +erts_smp_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio) +{ + erts_aint32_t len; + + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); + + len = erts_smp_atomic32_read_nob(&rqi->len); + ASSERT(len >= 0); + if (len == 0) { + ASSERT((erts_smp_atomic32_read_nob(&rq->flags) + & ((erts_aint32_t) (1 << prio))) == 0); + erts_smp_atomic32_read_bor_nob(&rq->flags, + (erts_aint32_t) (1 << prio)); + } + len++; + if (rqi->max_len < len) + rqi->max_len = len; + + erts_smp_atomic32_set_relb(&rqi->len, len); + + rq->len++; + if (rq->max_len < rq->len) + rq->max_len = len; + ASSERT(rq->len > 0); +} + +ERTS_GLB_INLINE void +erts_smp_dec_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio) +{ + erts_aint32_t len; + + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); + + len = erts_smp_atomic32_read_nob(&rqi->len); + len--; + ASSERT(len >= 0); + if (len == 0) { + ASSERT((erts_smp_atomic32_read_nob(&rq->flags) + & ((erts_aint32_t) (1 << prio)))); + erts_smp_atomic32_read_band_nob(&rq->flags, + ~((erts_aint32_t) (1 << prio))); + } + erts_smp_atomic32_set_relb(&rqi->len, len); + + rq->len--; + ASSERT(rq->len >= 0); +} + +ERTS_GLB_INLINE void +erts_smp_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi) +{ + erts_aint32_t len; + + ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); + + len = erts_smp_atomic32_read_nob(&rqi->len); + ASSERT(rqi->max_len >= len); + rqi->max_len = len; +} + +#endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */ + +#define RUNQ_READ_LEN(X) erts_smp_atomic32_read_nob((X)) + +#endif /* ERTS_INCLUDE_SCHEDULER_INTERNALS */ + /* * Process Specific Data. * @@ -644,12 +762,9 @@ struct process { * Number of reductions left to execute. * Only valid for the current process. */ - Uint32 status; /* process STATE */ - Uint32 gcstatus; /* process gc STATE */ - Uint32 rstatus; /* process resume STATE */ Uint32 rcount; /* suspend count */ int prio; /* Priority of process */ - int skipped; /* Times a low prio process has been rescheduled */ + int schedule_count; /* Times left to reschedule a low prio process */ Uint reds; /* No of reductions for this process */ Eterm tracer_proc; /* If proc is traced, this is the tracer (can NOT be boxed) */ @@ -662,7 +777,6 @@ struct process { Eterm ftrace; /* Latest exception stack trace dump */ Process *next; /* Pointer to next process in run queue */ - Process *prev; /* Pointer to prev process in run queue */ struct reg_proc *reg; /* NULL iff not registered */ ErtsLink *nlinks; @@ -732,19 +846,16 @@ struct process { void *exit_data; /* Misc data referred during termination */ } u; - ErtsRunQueue *bound_runq; + erts_smp_atomic32_t state; /* Process state flags (see ERTS_PSFLG_*) */ #ifdef ERTS_SMP erts_proc_lock_t lock; - erts_atomic32_t aflags; ErtsSchedulerData *scheduler_data; - Uint32 runq_flags; - Uint32 status_flags; ErlMessageInQueue msg_inq; Eterm suspendee; ErtsPendingSuspend *pending_suspenders; ErtsPendExit pending_exit; - ErtsRunQueue *run_queue; + erts_smp_atomic_t run_queue; #ifdef HIPE struct hipe_process_state_smp hipe_smp; #endif @@ -816,8 +927,27 @@ void erts_check_for_holes(Process* p); #define SEQ_TRACE_TOKEN(p) ((p)->seq_trace_token) -#define ERTS_PROC_AFLG_EXITING (((erts_aint_t) 1) << 0) -#define ERTS_PROC_AFLG_PENDING_EXIT (((erts_aint_t) 1) << 1) +#if ERTS_NO_PROC_PRIO_LEVELS > 4 +# error "Need to increase ERTS_PSFLG_PRIO_SHIFT" +#endif + +#define ERTS_PSFLG_PRIO_SHIFT 2 + +#define ERTS_PSFLG_BIT(N) \ + (((erts_aint32_t) 1) << (ERTS_PSFLG_PRIO_SHIFT + (N))) + +#define ERTS_PSFLG_PRIO_MASK (ERTS_PSFLG_BIT(0) - 1) + +#define ERTS_PSFLG_FREE ERTS_PSFLG_BIT(0) +#define ERTS_PSFLG_EXITING ERTS_PSFLG_BIT(1) +#define ERTS_PSFLG_PENDING_EXIT ERTS_PSFLG_BIT(2) +#define ERTS_PSFLG_ACTIVE ERTS_PSFLG_BIT(3) +#define ERTS_PSFLG_IN_RUNQ ERTS_PSFLG_BIT(4) +#define ERTS_PSFLG_RUNNING ERTS_PSFLG_BIT(5) +#define ERTS_PSFLG_SUSPENDED ERTS_PSFLG_BIT(6) +#define ERTS_PSFLG_GC ERTS_PSFLG_BIT(7) +#define ERTS_PSFLG_BOUND ERTS_PSFLG_BIT(8) +#define ERTS_PSFLG_TRAP_EXIT ERTS_PSFLG_BIT(9) /* The sequential tracing token is a tuple of size 5: @@ -925,17 +1055,13 @@ struct erts_system_profile_flags_t { unsigned int exclusive : 1; }; extern struct erts_system_profile_flags_t erts_system_profile_flags; - -#define INVALID_PID(p, pid) ((p) == NULL \ - || (p)->id != (pid) \ - || (p)->status == P_EXITING) #define IS_TRACED(p) ( (p)->tracer_proc != NIL ) #define ARE_TRACE_FLAGS_ON(p,tf) ( ((p)->trace_flags & (tf|F_SENSITIVE)) == (tf) ) #define IS_TRACED_FL(p,tf) ( IS_TRACED(p) && ARE_TRACE_FLAGS_ON(p,tf) ) /* process flags */ -#define F_TRAPEXIT (1 << 0) +#define F_HIBERNATE_SCHED (1 << 0) /* Schedule out after hibernate op */ #define F_INSLPQUEUE (1 << 1) /* Set if in timer queue */ #define F_TIMO (1 << 2) /* Set if timeout */ #define F_HEAP_GROW (1 << 3) @@ -946,7 +1072,6 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags; #define F_HAVE_BLCKD_MSCHED (1 << 8) /* Process has blocked multi-scheduling */ #define F_P2PNR_RESCHED (1 << 9) /* Process has been rescheduled via erts_pid2proc_not_running() */ #define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */ -#define F_HIBERNATE_SCHED (1 << 11) /* Schedule out after hibernate op */ /* process trace_flags */ #define F_SENSITIVE (1 << 0) @@ -1013,67 +1138,10 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags; #define DT_UTAG_FLAGS(P) ((P)->dt_utag_flags) #endif - -#ifdef ERTS_SMP -/* Status flags ... */ -#define ERTS_PROC_SFLG_PENDADD2SCHEDQ (((Uint32) 1) << 0) /* Pending - add to - schedule q */ -#define ERTS_PROC_SFLG_INRUNQ (((Uint32) 1) << 1) /* Process is - in run q */ -#define ERTS_PROC_SFLG_TRAPEXIT (((Uint32) 1) << 2) /* Process is - trapping - exit */ -#define ERTS_PROC_SFLG_RUNNING (((Uint32) 1) << 3) /* Process is - running */ -/* Scheduler flags in process struct... */ -#define ERTS_PROC_RUNQ_FLG_RUNNING (((Uint32) 1) << 0) /* Process is - running */ - -#endif - - -#ifdef ERTS_SMP -#define ERTS_PROC_IS_TRAPPING_EXITS(P) \ - (ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks((P)) \ - & ERTS_PROC_LOCK_STATUS), \ - (P)->status_flags & ERTS_PROC_SFLG_TRAPEXIT) - -#define ERTS_PROC_SET_TRAP_EXIT(P) \ - (ERTS_SMP_LC_ASSERT(((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS) \ - & erts_proc_lc_my_proc_locks((P))) \ - == (ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS)), \ - (P)->status_flags |= ERTS_PROC_SFLG_TRAPEXIT, \ - (P)->flags |= F_TRAPEXIT, \ - 1) - -#define ERTS_PROC_UNSET_TRAP_EXIT(P) \ - (ERTS_SMP_LC_ASSERT(((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS) \ - & erts_proc_lc_my_proc_locks((P))) \ - == (ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS)), \ - (P)->status_flags &= ~ERTS_PROC_SFLG_TRAPEXIT, \ - (P)->flags &= ~F_TRAPEXIT, \ - 0) -#else -#define ERTS_PROC_IS_TRAPPING_EXITS(P) ((P)->flags & F_TRAPEXIT) -#define ERTS_PROC_SET_TRAP_EXIT(P) ((P)->flags |= F_TRAPEXIT, 1) -#define ERTS_PROC_UNSET_TRAP_EXIT(P) ((P)->flags &= ~F_TRAPEXIT, 0) -#endif - /* Option flags to erts_send_exit_signal() */ #define ERTS_XSIG_FLG_IGN_KILL (((Uint32) 1) << 0) #define ERTS_XSIG_FLG_NO_IGN_NORMAL (((Uint32) 1) << 1) - -/* Process status values */ -#define P_FREE 0 -#define P_RUNABLE 1 -#define P_WAITING 2 -#define P_RUNNING 3 -#define P_EXITING 4 -#define P_GARBING 5 -#define P_SUSPENDED 6 - #define CANCEL_TIMER(p) \ do { \ if ((p)->flags & (F_INSLPQUEUE)) \ @@ -1151,14 +1219,6 @@ Eterm erts_get_schedulers_binds(Process *c_p); Eterm erts_set_cpu_topology(Process *c_p, Eterm term); Eterm erts_bind_schedulers(Process *c_p, Eterm how); ErtsRunQueue *erts_schedid2runq(Uint); -#ifdef ERTS_SMP -ErtsMigrateResult erts_proc_migrate(Process *, - ErtsProcLocks *, - ErtsRunQueue *, - int *, - ErtsRunQueue *, - int *); -#endif Process *schedule(Process*, int); void erts_schedule_misc_op(void (*)(void *), void *); Eterm erl_create_process(Process*, Eterm, Eterm, Eterm, ErlSpawnOpts*); @@ -1207,8 +1267,7 @@ int erts_send_exit_signal(Process *, #ifdef ERTS_SMP void erts_handle_pending_exit(Process *, ErtsProcLocks); #define ERTS_PROC_PENDING_EXIT(P) \ - (ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks((P)) & ERTS_PROC_LOCK_STATUS),\ - (P)->pending_exit.reason != THE_NON_VALUE) + (ERTS_PSFLG_PENDING_EXIT & erts_smp_atomic32_read_acqb(&(P)->state)) #else #define ERTS_PROC_PENDING_EXIT(P) 0 #endif @@ -1260,13 +1319,26 @@ ErtsSchedulerData *erts_get_scheduler_data(void) #endif #endif +void erts_schedule_process(Process *, erts_aint32_t); + +ERTS_GLB_INLINE void erts_proc_notify_new_message(Process *p); +#if ERTS_GLB_INLINE_INCL_FUNC_DEF +ERTS_GLB_INLINE void +erts_proc_notify_new_message(Process *p) +{ + /* No barrier needed, due to msg lock */ + erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); + if (!(state & ERTS_PSFLG_ACTIVE)) + erts_schedule_process(p, state); +} +#endif + #if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK) #define ERTS_PROCESS_LOCK_ONLY_LOCK_CHECK_PROTO__ #include "erl_process_lock.h" #undef ERTS_PROCESS_LOCK_ONLY_LOCK_CHECK_PROTO__ -int erts_smp_lc_runq_is_locked(ErtsRunQueue *); #define ERTS_SMP_LC_CHK_RUNQ_LOCK(RQ, L) \ do { \ if ((L)) \ @@ -1392,13 +1464,91 @@ erts_proc_set_error_handler(Process *p, ErtsProcLocks plocks, Eterm handler) #endif +#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS + #ifdef ERTS_SMP -ErtsRunQueue *erts_prepare_emigrate(ErtsRunQueue *c_rq, - ErtsRunQueueInfo *c_rqi, - int prio); +#include "erl_thr_progress.h" + +extern erts_atomic_t erts_migration_paths; + +ERTS_GLB_INLINE ErtsMigrationPaths *erts_get_migration_paths_managed(void); +ERTS_GLB_INLINE ErtsMigrationPaths *erts_get_migration_paths(void); ERTS_GLB_INLINE ErtsRunQueue *erts_check_emigration_need(ErtsRunQueue *c_rq, int prio); +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE ErtsMigrationPaths * +erts_get_migration_paths_managed(void) +{ + return (ErtsMigrationPaths *) erts_atomic_read_ddrb(&erts_migration_paths); +} + +ERTS_GLB_INLINE ErtsMigrationPaths * +erts_get_migration_paths(void) +{ + if (erts_thr_progress_is_managed_thread()) + return erts_get_migration_paths_managed(); + else + return NULL; +} + +ERTS_GLB_INLINE ErtsRunQueue * +erts_check_emigration_need(ErtsRunQueue *c_rq, int prio) +{ + ErtsMigrationPaths *mps = erts_get_migration_paths(); + ErtsMigrationPath *mp; + Uint32 flags; + + if (!mps) + return NULL; + + mp = &mps->mpath[c_rq->ix]; + flags = mp->flags; + + if (ERTS_CHK_RUNQ_FLG_EMIGRATE(flags, prio)) { + int len; + + if (ERTS_CHK_RUNQ_FLG_EVACUATE(flags, prio)) { + /* force emigration */ + return mp->prio[prio].runq; + } + + if (flags & ERTS_RUNQ_FLG_INACTIVE) { + /* + * Run queue was inactive at last balance. Verify that + * it still is before forcing emigration. + */ + if (ERTS_RUNQ_FLGS_GET(c_rq) & ERTS_RUNQ_FLG_INACTIVE) + return mp->prio[prio].runq; + } + + + if (prio == ERTS_PORT_PRIO_LEVEL) + len = RUNQ_READ_LEN(&c_rq->ports.info.len); + else + len = RUNQ_READ_LEN(&c_rq->procs.prio_info[prio].len); + + if (len > mp->prio[prio].limit.this) { + ErtsRunQueue *n_rq = mp->prio[prio].runq; + if (n_rq) { + if (prio == ERTS_PORT_PRIO_LEVEL) + len = RUNQ_READ_LEN(&n_rq->ports.info.len); + else + len = RUNQ_READ_LEN(&n_rq->procs.prio_info[prio].len); + + if (len < mp->prio[prio].limit.other) + return n_rq; + } + } + } + return NULL; +} + +#endif + +#endif + #endif ERTS_GLB_INLINE int erts_is_scheduler_bound(ErtsSchedulerData *esdp); @@ -1417,29 +1567,6 @@ ERTS_GLB_INLINE void erts_smp_runqs_unlock(ErtsRunQueue *rq1, ErtsRunQueue *rq2) #if ERTS_GLB_INLINE_INCL_FUNC_DEF -#ifdef ERTS_SMP -ERTS_GLB_INLINE ErtsRunQueue * -erts_check_emigration_need(ErtsRunQueue *c_rq, int prio) -{ - ErtsRunQueueInfo *c_rqi; - - if (!ERTS_CHK_RUNQ_FLG_EMIGRATE(c_rq->flags, prio)) - return NULL; - - if (prio == ERTS_PORT_PRIO_LEVEL) - c_rqi = &c_rq->ports.info; - else - c_rqi = &c_rq->procs.prio_info[prio]; - - if (!ERTS_CHK_RUNQ_FLG_EVACUATE(c_rq->flags, prio) - && !(c_rq->flags & ERTS_RUNQ_FLG_INACTIVE) - && c_rqi->len <= c_rqi->migrate.limit.this) - return NULL; - - return erts_prepare_emigrate(c_rq, c_rqi, prio); -} -#endif - ERTS_GLB_INLINE int erts_is_scheduler_bound(ErtsSchedulerData *esdp) { @@ -1477,10 +1604,9 @@ Uint erts_get_scheduler_id(void) ERTS_GLB_INLINE ErtsRunQueue * erts_get_runq_proc(Process *p) { - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); #ifdef ERTS_SMP - ASSERT(p->run_queue); - return p->run_queue; + ASSERT(ERTS_AINT_NULL != erts_atomic_read_nob(&p->run_queue)); + return (ErtsRunQueue *) erts_atomic_read_nob(&p->run_queue); #else return ERTS_RUNQ_IX(0); #endif @@ -1614,10 +1740,6 @@ Process *erts_pid2proc_nropt(Process *c_p, ErtsProcLocks pid_locks); extern int erts_disable_proc_not_running_opt; - -#define ERTS_PROC_IS_EXITING(P) \ - (ERTS_PROC_AFLG_EXITING & erts_atomic32_read_acqb(&(P)->aflags)) - #ifdef DEBUG #define ERTS_SMP_ASSERT_IS_NOT_EXITING(P) \ do { ASSERT(!ERTS_PROC_IS_EXITING((P))); } while (0) @@ -1627,8 +1749,6 @@ extern int erts_disable_proc_not_running_opt; #else /* !ERTS_SMP */ -#define ERTS_PROC_IS_EXITING(P) ((P)->status == P_EXITING) - #define ERTS_SMP_ASSERT_IS_NOT_EXITING(P) #define erts_pid2proc_not_running erts_pid2proc @@ -1636,6 +1756,10 @@ extern int erts_disable_proc_not_running_opt; #endif +#define ERTS_PROC_IS_EXITING(P) \ + (ERTS_PSFLG_EXITING & erts_smp_atomic32_read_acqb(&(P)->state)) + + /* Minimum NUMBER of processes for a small system to start */ #ifdef ERTS_SMP #define ERTS_MIN_PROCESSES ERTS_NO_OF_PIX_LOCKS diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c index 37cc717156..964dc1ae3e 100644 --- a/erts/emulator/beam/erl_process_dump.c +++ b/erts/emulator/beam/erl_process_dump.c @@ -67,11 +67,9 @@ erts_deep_process_dump(int to, void *to_arg) for (i = 0; i < erts_max_processes; i++) { Process *p = erts_pix2proc(i); if (p && p->i != ENULL) { - if (p->status != P_EXITING) { - if (p->status != P_GARBING) { - dump_process_info(to, to_arg, p); - } - } + erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); + if (!(state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_GC))) + dump_process_info(to, to_arg, p); } } diff --git a/erts/emulator/beam/erl_process_lock.h b/erts/emulator/beam/erl_process_lock.h index 9981af7a8a..015fda583e 100644 --- a/erts/emulator/beam/erl_process_lock.h +++ b/erts/emulator/beam/erl_process_lock.h @@ -129,11 +129,9 @@ typedef struct erts_proc_lock_t_ { /* * Status lock: * Protects the following fields in the process structure: - * * status - * * rstatus - * * status_flags * * pending_suspenders * * suspendee + * * ... */ #define ERTS_PROC_LOCK_STATUS (((ErtsProcLocks) 1) << ERTS_PROC_LOCK_MAX_BIT) @@ -165,12 +163,11 @@ typedef struct erts_proc_lock_t_ { * Other rules regarding process locking: * * Exiting processes: - * When changing status to P_EXITING on a process, you are required - * to take all process locks (ERTS_PROC_LOCKS_ALL). Thus, by holding - * at least one process lock (whichever one doesn't matter) you - * are guaranteed that the process won't exit until the lock you are - * holding has been released. Appart from all process locks also - * the pix lock corresponding to the process has to be held. + * When changing state to exiting (ERTS_PSFLG_EXITING) on a process, + * you are required to take all process locks (ERTS_PROC_LOCKS_ALL). + * Thus, by holding at least one process lock (whichever one doesn't + * matter) you are guaranteed that the process won't exit until the + * lock you are holding has been released. * * Lock order: * Process locks with low numeric values has to be locked before @@ -946,11 +943,11 @@ erts_pid2proc_opt(Process *c_p_unused, int flags) { Process *proc = erts_proc_lookup_raw(pid); - if (!(flags & ERTS_P2P_FLG_ALLOW_OTHER_X) - && proc - && proc->status == P_EXITING) - return NULL; - return proc; + return ((!(flags & ERTS_P2P_FLG_ALLOW_OTHER_X) + && proc + && ERTS_PROC_IS_EXITING(proc)) + ? NULL + : proc); } #endif /* !ERTS_SMP */ diff --git a/erts/emulator/beam/erl_trace.c b/erts/emulator/beam/erl_trace.c index a321bc504b..bc988cd61b 100644 --- a/erts/emulator/beam/erl_trace.c +++ b/erts/emulator/beam/erl_trace.c @@ -64,7 +64,7 @@ int erts_cpu_timestamp; #endif static erts_smp_mtx_t smq_mtx; -static erts_smp_mtx_t sys_trace_mtx; +static erts_smp_rwmtx_t sys_trace_rwmtx; enum ErtsSysMsgType { SYS_MSG_TYPE_UNDEFINED, @@ -91,7 +91,12 @@ static void init_sys_msg_dispatcher(void); #endif void erts_init_trace(void) { - erts_smp_mtx_init(&sys_trace_mtx, "sys_tracers"); + erts_smp_rwmtx_opt_t rwmtx_opts = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER; + rwmtx_opts.type = ERTS_SMP_RWMTX_TYPE_EXTREMELY_FREQUENT_READ; + rwmtx_opts.lived = ERTS_SMP_RWMTX_LONG_LIVED; + + erts_smp_rwmtx_init_opt(&sys_trace_rwmtx, &rwmtx_opts, "sys_tracers"); + #ifdef HAVE_ERTS_NOW_CPU erts_cpu_timestamp = 0; #endif @@ -169,10 +174,10 @@ erts_system_profile_setup_active_schedulers(void) active_sched = erts_active_schedulers(); } -void -erts_trace_check_exiting(Eterm exiting) +static void +exiting_reset(Eterm exiting) { - erts_smp_mtx_lock(&sys_trace_mtx); + erts_smp_rwmtx_rwlock(&sys_trace_rwmtx); if (exiting == default_tracer) { default_tracer = NIL; default_trace_flags &= TRACEE_FLAGS; @@ -202,29 +207,49 @@ erts_trace_check_exiting(Eterm exiting) erts_system_profile_clear(NULL); #endif } - erts_smp_mtx_unlock(&sys_trace_mtx); + erts_smp_rwmtx_rwunlock(&sys_trace_rwmtx); +} + +void +erts_trace_check_exiting(Eterm exiting) +{ + int reset = 0; + erts_smp_rwmtx_rlock(&sys_trace_rwmtx); + if (exiting == default_tracer) + reset = 1; + else if (exiting == system_seq_tracer) + reset = 1; + else if (exiting == system_monitor) + reset = 1; + else if (exiting == system_profile) + reset = 1; + erts_smp_rwmtx_runlock(&sys_trace_rwmtx); + if (reset) + exiting_reset(exiting); +} + +static ERTS_INLINE int +is_valid_tracer(Eterm tracer) +{ + return erts_proc_lookup(tracer) || erts_is_valid_tracer_port(tracer); } Eterm erts_set_system_seq_tracer(Process *c_p, ErtsProcLocks c_p_locks, Eterm new) { - Eterm old = THE_NON_VALUE; + Eterm old; - if (new != am_false) { - if (!erts_proc_lookup(new) - && !erts_is_valid_tracer_port(new)) { - return old; - } - } + if (new != am_false && !is_valid_tracer(new)) + return THE_NON_VALUE; - erts_smp_mtx_lock(&sys_trace_mtx); + erts_smp_rwmtx_rwlock(&sys_trace_rwmtx); old = system_seq_tracer; system_seq_tracer = new; #ifdef DEBUG_PRINTOUTS erts_fprintf(stderr, "set seq tracer new=%T old=%T\n", new, old); #endif - erts_smp_mtx_unlock(&sys_trace_mtx); + erts_smp_rwmtx_rwunlock(&sys_trace_rwmtx); return old; } @@ -232,12 +257,12 @@ Eterm erts_get_system_seq_tracer(void) { Eterm st; - erts_smp_mtx_lock(&sys_trace_mtx); + erts_smp_rwmtx_rlock(&sys_trace_rwmtx); st = system_seq_tracer; #ifdef DEBUG_PRINTOUTS erts_fprintf(stderr, "get seq tracer %T\n", st); #endif - erts_smp_mtx_unlock(&sys_trace_mtx); + erts_smp_rwmtx_runlock(&sys_trace_rwmtx); return st; } @@ -270,7 +295,7 @@ get_default_tracing(Uint *flagsp, Eterm *tracerp) void erts_change_default_tracing(int setflags, Uint *flagsp, Eterm *tracerp) { - erts_smp_mtx_lock(&sys_trace_mtx); + erts_smp_rwmtx_rwlock(&sys_trace_rwmtx); if (flagsp) { if (setflags) default_trace_flags |= *flagsp; @@ -280,48 +305,48 @@ erts_change_default_tracing(int setflags, Uint *flagsp, Eterm *tracerp) if (tracerp) default_tracer = *tracerp; get_default_tracing(flagsp, tracerp); - erts_smp_mtx_unlock(&sys_trace_mtx); + erts_smp_rwmtx_rwunlock(&sys_trace_rwmtx); } void erts_get_default_tracing(Uint *flagsp, Eterm *tracerp) { - erts_smp_mtx_lock(&sys_trace_mtx); + erts_smp_rwmtx_rlock(&sys_trace_rwmtx); get_default_tracing(flagsp, tracerp); - erts_smp_mtx_unlock(&sys_trace_mtx); + erts_smp_rwmtx_runlock(&sys_trace_rwmtx); } void erts_set_system_monitor(Eterm monitor) { - erts_smp_mtx_lock(&sys_trace_mtx); + erts_smp_rwmtx_rwlock(&sys_trace_rwmtx); system_monitor = monitor; - erts_smp_mtx_unlock(&sys_trace_mtx); + erts_smp_rwmtx_rwunlock(&sys_trace_rwmtx); } Eterm erts_get_system_monitor(void) { Eterm monitor; - erts_smp_mtx_lock(&sys_trace_mtx); + erts_smp_rwmtx_rlock(&sys_trace_rwmtx); monitor = system_monitor; - erts_smp_mtx_unlock(&sys_trace_mtx); + erts_smp_rwmtx_runlock(&sys_trace_rwmtx); return monitor; } /* Performance monitoring */ void erts_set_system_profile(Eterm profile) { - erts_smp_mtx_lock(&sys_trace_mtx); + erts_smp_rwmtx_rwlock(&sys_trace_rwmtx); system_profile = profile; - erts_smp_mtx_unlock(&sys_trace_mtx); + erts_smp_rwmtx_rwunlock(&sys_trace_rwmtx); } Eterm erts_get_system_profile(void) { Eterm profile; - erts_smp_mtx_lock(&sys_trace_mtx); + erts_smp_rwmtx_rlock(&sys_trace_rwmtx); profile = system_profile; - erts_smp_mtx_unlock(&sys_trace_mtx); + erts_smp_rwmtx_runlock(&sys_trace_rwmtx); return profile; } @@ -384,13 +409,9 @@ WRITE_SYS_MSG_TO_PORT(Eterm unused_to, } #ifndef ERTS_SMP - if (!INVALID_TRACER_PORT(trace_port, trace_port->id)) { + if (!INVALID_TRACER_PORT(trace_port, trace_port->id)) #endif erts_raw_port_command(trace_port, buffer, ptr-buffer); -#ifndef ERTS_SMP - erts_port_release(trace_port); - } -#endif erts_free(ERTS_ALC_T_TMP, (void *) buffer); } @@ -465,13 +486,13 @@ send_to_port(Process *c_p, Eterm message, trace_port = NULL; #else - if (is_not_internal_port(*tracer_pid)) - goto invalid_tracer_port; - trace_port = &erts_port[internal_port_index(*tracer_pid)]; + trace_port = erts_id2port_sflgs(*tracer_pid, + NULL, + 0, + ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP); - if (INVALID_TRACER_PORT(trace_port, *tracer_pid)) { - invalid_tracer_port: + if (!trace_port) { *tracee_flags &= ~TRACEE_FLAGS; *tracer_pid = NIL; return; @@ -491,6 +512,7 @@ send_to_port(Process *c_p, Eterm message, SYS_MSG_TYPE_TRACE, message); #ifndef ERTS_SMP + erts_port_release(trace_port); return; } @@ -537,6 +559,9 @@ send_to_port(Process *c_p, Eterm message, */ do_send_schedfix_to_port(trace_port, c_p->id, ts); } + + erts_port_release(trace_port); + UnUseTmpHeapNoproc(LOCAL_HEAP_SIZE); #undef LOCAL_HEAP_SIZE #endif @@ -566,15 +591,19 @@ profile_send(Eterm from, Eterm message) { Port *profiler_port = NULL; /* not smp */ - - - profiler_port = &erts_port[internal_port_index(profiler)]; - - do_send_to_port(profiler, - profiler_port, - NIL, /* or current process->id */ - SYS_MSG_TYPE_SYSPROF, - message); + + profiler_port = erts_id2port_sflgs(profiler, + NULL, + 0, + ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP); + if (profiler_port) { + do_send_to_port(profiler, + profiler_port, + NIL, /* or current process->id */ + SYS_MSG_TYPE_SYSPROF, + message); + erts_port_release(profiler_port); + } } else { ASSERT(is_internal_pid(profiler) @@ -627,13 +656,11 @@ seq_trace_send_to_port(Process *c_p, trace_port = NULL; #else - if (is_not_internal_port(seq_tracer)) - goto invalid_tracer_port; - - trace_port = &erts_port[internal_port_index(seq_tracer)]; - - if (INVALID_TRACER_PORT(trace_port, seq_tracer)) { - invalid_tracer_port: + trace_port = erts_id2port_sflgs(seq_tracer, + NULL, + 0, + ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP); + if (!trace_port) { system_seq_tracer = am_false; #ifndef ERTS_SMP UnUseTmpHeapNoproc(LOCAL_HEAP_SIZE); @@ -651,6 +678,7 @@ seq_trace_send_to_port(Process *c_p, message); #ifndef ERTS_SMP + erts_port_release(trace_port); UnUseTmpHeapNoproc(LOCAL_HEAP_SIZE); return; } @@ -692,6 +720,9 @@ seq_trace_send_to_port(Process *c_p, */ do_send_schedfix_to_port(trace_port, c_p->id, ts); } + + erts_port_release(trace_port); + UnUseTmpHeapNoproc(LOCAL_HEAP_SIZE); #undef LOCAL_HEAP_SIZE #endif @@ -790,13 +821,8 @@ trace_sched_aux(Process *p, Eterm what, int never_fake_sched) ERTS_GET_TRACER_REF(tracer_ref, p->tracer_proc, p->trace_flags); } - if (ERTS_PROC_IS_EXITING(p) -#ifndef ERTS_SMP - || p->status == P_FREE -#endif - ) { + if (ERTS_PROC_IS_EXITING(p)) curr_func = 0; - } else { if (!p->current) p->current = find_function_from_pc(p->i); @@ -3148,10 +3174,10 @@ sys_msg_disp_failure(ErtsSysMsgQ *smqp, Eterm receiver) break; case SYS_MSG_TYPE_SEQTRACE: /* Reset seq_tracer if it hasn't changed */ - erts_smp_mtx_lock(&sys_trace_mtx); + erts_smp_rwmtx_rwlock(&sys_trace_rwmtx); if (system_seq_tracer == receiver) system_seq_tracer = am_false; - erts_smp_mtx_unlock(&sys_trace_mtx); + erts_smp_rwmtx_rwunlock(&sys_trace_rwmtx); break; case SYS_MSG_TYPE_SYSMON: if (receiver == NIL @@ -3406,12 +3432,12 @@ sys_msg_dispatcher_func(void *unused) goto queue_proc_msg; } else if (is_internal_port(receiver)) { - port = erts_id2port(receiver, NULL, 0); - if (INVALID_TRACER_PORT(port, receiver)) { - if (port) - erts_port_release(port); + port = erts_id2port_sflgs(receiver, + NULL, + 0, + ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP); + if (!port) goto failure; - } else { write_sys_msg_to_port(receiver, port, diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index ded9646800..e9dbba3d4a 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -143,8 +143,8 @@ typedef struct ErtsXPortsList_ ErtsXPortsList; struct port { ErtsPortTaskSched sched; ErtsPortTaskHandle timeout_task; -#ifdef ERTS_SMP erts_smp_atomic_t refc; +#ifdef ERTS_SMP erts_smp_mtx_t *lock; ErtsXPortsList *xports; erts_smp_atomic_t run_queue; @@ -196,6 +196,8 @@ erts_port_runq(Port *prt) #ifdef ERTS_SMP ErtsRunQueue *rq1, *rq2; rq1 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue); + if (!rq1) + return NULL; while (1) { erts_smp_runq_lock(rq1); rq2 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue); @@ -203,6 +205,8 @@ erts_port_runq(Port *prt) return rq1; erts_smp_runq_unlock(rq1); rq1 = rq2; + if (!rq1) + return NULL; } #else return ERTS_RUNQ_IX(0); @@ -1220,28 +1224,29 @@ erts_smp_port_state_unlock(Port *prt) ERTS_GLB_INLINE int erts_smp_port_trylock(Port *prt) { -#ifdef ERTS_SMP int res; ASSERT(erts_smp_atomic_read_nob(&prt->refc) > 0); erts_smp_atomic_inc_nob(&prt->refc); + +#ifdef ERTS_SMP res = erts_smp_mtx_trylock(prt->lock); if (res == EBUSY) { erts_smp_atomic_dec_nob(&prt->refc); } +#else + res = 0; +#endif return res; -#else /* !ERTS_SMP */ - return 0; -#endif } ERTS_GLB_INLINE void erts_smp_port_lock(Port *prt) { -#ifdef ERTS_SMP ASSERT(erts_smp_atomic_read_nob(&prt->refc) > 0); erts_smp_atomic_inc_nob(&prt->refc); +#ifdef ERTS_SMP erts_smp_mtx_lock(prt->lock); #endif } @@ -1249,14 +1254,14 @@ erts_smp_port_lock(Port *prt) ERTS_GLB_INLINE void erts_smp_port_unlock(Port *prt) { -#ifdef ERTS_SMP erts_aint_t refc; +#ifdef ERTS_SMP erts_smp_mtx_unlock(prt->lock); +#endif refc = erts_smp_atomic_dec_read_nob(&prt->refc); ASSERT(refc >= 0); if (refc == 0) erts_port_cleanup(prt); -#endif } #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ @@ -1319,12 +1324,12 @@ erts_id2port_sflgs(Eterm id, Process *c_p, ErtsProcLocks c_p_locks, Uint32 sflgs erts_smp_port_state_unlock(prt); prt = NULL; } -#ifdef ERTS_SMP else { erts_smp_atomic_inc_nob(&prt->refc); erts_smp_port_state_unlock(prt); - if (no_proc_locks) +#ifdef ERTS_SMP + if (no_proc_locks) erts_smp_mtx_lock(prt->lock); else if (erts_smp_mtx_trylock(prt->lock) == EBUSY) { /* Unlock process locks, and acquire locks in lock order... */ @@ -1340,21 +1345,17 @@ erts_id2port_sflgs(Eterm id, Process *c_p, ErtsProcLocks c_p_locks, Uint32 sflgs erts_smp_port_unlock(prt); /* Also decrements refc... */ prt = NULL; } - } #endif + } + return prt; } ERTS_GLB_INLINE void erts_port_release(Port *prt) { -#ifdef ERTS_SMP erts_smp_port_unlock(prt); -#else - if (prt->status & ERTS_PORT_SFLGS_DEAD) - erts_port_cleanup(prt); -#endif } ERTS_GLB_INLINE Port* @@ -1935,6 +1936,13 @@ extern erts_driver_t spawn_driver; extern erts_driver_t fd_driver; /* Should maybe be placed in erl_message.h, but then we get an include mess. */ +ERTS_GLB_INLINE Eterm * +erts_alloc_message_heap_state(Uint size, + ErlHeapFragment **bpp, + ErlOffHeap **ohpp, + Process *receiver, + ErtsProcLocks *receiver_locks, + erts_aint32_t *statep); ERTS_GLB_INLINE Eterm * erts_alloc_message_heap(Uint size, @@ -1957,16 +1965,22 @@ erts_alloc_message_heap(Uint size, */ ERTS_GLB_INLINE Eterm * -erts_alloc_message_heap(Uint size, - ErlHeapFragment **bpp, - ErlOffHeap **ohpp, - Process *receiver, - ErtsProcLocks *receiver_locks) +erts_alloc_message_heap_state(Uint size, + ErlHeapFragment **bpp, + ErlOffHeap **ohpp, + Process *receiver, + ErtsProcLocks *receiver_locks, + erts_aint32_t *statep) { Eterm *hp; + erts_aint32_t state; #ifdef ERTS_SMP int locked_main = 0; - ErtsProcLocks ulocks = *receiver_locks & ERTS_PROC_LOCKS_MSG_SEND; + state = erts_smp_atomic32_read_acqb(&receiver->state); + if (statep) + *statep = state; + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + goto allocate_in_mbuf; #endif if (size > (Uint) INT_MAX) @@ -1982,20 +1996,19 @@ erts_alloc_message_heap(Uint size, #ifdef ERTS_SMP try_allocate_on_heap: #endif - if (ERTS_PROC_IS_EXITING(receiver) + state = erts_smp_atomic32_read_nob(&receiver->state); + if (statep) + *statep = state; + if ((state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) || HEAP_LIMIT(receiver) - HEAP_TOP(receiver) <= size) { #ifdef ERTS_SMP - if (locked_main) - ulocks |= ERTS_PROC_LOCK_MAIN; + if (locked_main) { + *receiver_locks &= ~ERTS_PROC_LOCK_MAIN; + erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MAIN); + } #endif goto allocate_in_mbuf; } -#ifdef ERTS_SMP - if (ulocks) { - erts_smp_proc_unlock(receiver, ulocks); - *receiver_locks &= ~ulocks; - } -#endif hp = HEAP_TOP(receiver); HEAP_TOP(receiver) = hp + size; *bpp = NULL; @@ -2011,12 +2024,6 @@ erts_alloc_message_heap(Uint size, else { ErlHeapFragment *bp; allocate_in_mbuf: -#ifdef ERTS_SMP - if (ulocks) { - *receiver_locks &= ~ulocks; - erts_smp_proc_unlock(receiver, ulocks); - } -#endif bp = new_message_buffer(size); hp = bp->mem; *bpp = bp; @@ -2026,6 +2033,17 @@ erts_alloc_message_heap(Uint size, return hp; } +ERTS_GLB_INLINE Eterm * +erts_alloc_message_heap(Uint size, + ErlHeapFragment **bpp, + ErlOffHeap **ohpp, + Process *receiver, + ErtsProcLocks *receiver_locks) +{ + return erts_alloc_message_heap_state(size, bpp, ohpp, receiver, + receiver_locks, NULL); +} + #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ #if !HEAP_ON_C_STACK diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 8d20178307..4dd60b4d23 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -259,10 +259,8 @@ get_free_port(void) } } port->status = ERTS_PORT_SFLG_INITIALIZING; -#ifdef ERTS_SMP - ERTS_SMP_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 0); + ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 0); erts_smp_atomic_set_nob(&port->refc, 2); /* Port alive + lock */ -#endif erts_smp_port_state_unlock(port); return num & port_num_mask; } @@ -340,10 +338,14 @@ port_cleanup(Port *prt) prt->drv_ptr = NULL; ASSERT(driver); -#ifdef ERTS_SMP - ASSERT(prt->status & ERTS_PORT_SFLG_FREE_SCHEDULED); - ERTS_SMP_LC_ASSERT(erts_smp_atomic_read_nob(&prt->refc) == 0); + ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&prt->refc) == 0); + + ASSERT(prt->status & ERTS_PORT_SFLG_PORT_DEBUG); + ASSERT(!(prt->status & ERTS_PORT_SFLG_FREE)); + prt->status = ERTS_PORT_SFLG_FREE; + +#ifdef ERTS_SMP port_specific = (prt->status & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK); @@ -352,10 +354,6 @@ port_cleanup(Port *prt) prt->lock = NULL; - ASSERT(prt->status & ERTS_PORT_SFLG_PORT_DEBUG); - ASSERT(!(prt->status & ERTS_PORT_SFLG_FREE)); - prt->status = ERTS_PORT_SFLG_FREE; - erts_smp_port_state_unlock(prt); erts_smp_mtx_unlock(mtx); @@ -605,10 +603,8 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */ /* Need to mark the port as free again */ erts_smp_port_state_lock(port); port->status = ERTS_PORT_SFLG_FREE; -#ifdef ERTS_SMP - ERTS_SMP_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 2); + ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 2); erts_smp_atomic_set_nob(&port->refc, 0); -#endif erts_smp_port_state_unlock(port); return -3; } @@ -1343,8 +1339,8 @@ void init_io(void) for (i = 0; i < erts_max_ports; i++) { erts_port_task_init_sched(&erts_port[i].sched); -#ifdef ERTS_SMP erts_smp_atomic_init_nob(&erts_port[i].refc, 0); +#ifdef ERTS_SMP erts_port[i].lock = NULL; erts_port[i].xports = NULL; erts_smp_spinlock_init_x(&erts_port[i].state_lck, "port_state", make_small(i)); diff --git a/erts/emulator/beam/register.c b/erts/emulator/beam/register.c index 0ddae56322..c02872ef80 100644 --- a/erts/emulator/beam/register.c +++ b/erts/emulator/beam/register.c @@ -93,14 +93,14 @@ reg_safe_write_lock(Process *c_p, ErtsProcLocks *c_p_locks) reg_write_lock(); } +#endif + static ERTS_INLINE int is_proc_alive(Process *p) { return !ERTS_PROC_IS_EXITING(p); } -#endif - void register_info(int to, void *to_arg) { int lock = !ERTS_IS_CRASH_DUMPING; @@ -384,8 +384,7 @@ erts_whereis_name(Process *c_p, } #else if (rp->p - && ((flags & ERTS_P2P_FLG_ALLOW_OTHER_X) - || rp->p->status != P_EXITING)) + && ((flags & ERTS_P2P_FLG_ALLOW_OTHER_X) || is_proc_alive(rp->p))) *proc = rp->p; else *proc = NULL; @@ -397,7 +396,9 @@ erts_whereis_name(Process *c_p, if (!rp || !rp->pt) *port = NULL; else { -#ifdef ERTS_SMP +#ifndef ERTS_SMP + erts_smp_atomic_inc_nob(&rp->pt->refc); +#else if (pending_port == rp->pt) pending_port = NULL; else { @@ -504,9 +505,11 @@ int erts_unregister_name(Process *c_p, if ((rp = (RegProc*) hash_get(&process_reg, (void*) &r)) != NULL) { if (rp->pt) { if (port != rp->pt) { -#ifdef ERTS_SMP +#ifndef ERTS_SMP + erts_smp_atomic_inc_nob(&rp->pt->refc); +#else if (port) { - ERTS_SMP_LC_ASSERT(port != c_prt); + ASSERT(port != c_prt); erts_smp_port_unlock(port); port = NULL; } diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c index 245916bc0d..db6597dc7c 100644 --- a/erts/emulator/beam/utils.c +++ b/erts/emulator/beam/utils.c @@ -1666,12 +1666,20 @@ static int do_send_to_logger(Eterm tag, Eterm gleader, char *buf, int len) } #ifndef ERTS_SMP - if ( #ifdef USE_THREADS - !erts_get_scheduler_data() || /* Must be scheduler thread */ + p = NULL; + if (erts_get_scheduler_data()) /* Must be scheduler thread */ #endif - (p = erts_whereis_process(NULL, 0, am_error_logger, 0, 0)) == NULL - || p->status == P_RUNNING) { + { + p = erts_whereis_process(NULL, 0, am_error_logger, 0, 0); + if (p) { + erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); + if (state & ERTS_PSFLG_RUNNING) + p = NULL; + } + } + + if (!p) { /* buf *always* points to a null terminated string */ erts_fprintf(stderr, "(no error logger present) %T: \"%s\"\n", tag, buf); |