aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r--erts/emulator/beam/beam_bp.c2
-rw-r--r--erts/emulator/beam/beam_emu.c28
-rw-r--r--erts/emulator/beam/bif.c93
-rw-r--r--erts/emulator/beam/break.c69
-rw-r--r--erts/emulator/beam/erl_alloc.types2
-rw-r--r--erts/emulator/beam/erl_alloc_util.c11
-rw-r--r--erts/emulator/beam/erl_bif_ddll.c12
-rw-r--r--erts/emulator/beam/erl_bif_info.c16
-rw-r--r--erts/emulator/beam/erl_cpu_topology.c5
-rw-r--r--erts/emulator/beam/erl_gc.c29
-rw-r--r--erts/emulator/beam/erl_lock_check.c2
-rw-r--r--erts/emulator/beam/erl_message.c200
-rw-r--r--erts/emulator/beam/erl_port_task.c224
-rw-r--r--erts/emulator/beam/erl_port_task.h13
-rw-r--r--erts/emulator/beam/erl_process.c2992
-rw-r--r--erts/emulator/beam/erl_process.h458
-rw-r--r--erts/emulator/beam/erl_process_dump.c8
-rw-r--r--erts/emulator/beam/erl_process_lock.h25
-rw-r--r--erts/emulator/beam/erl_trace.c162
-rw-r--r--erts/emulator/beam/global.h92
-rw-r--r--erts/emulator/beam/io.c24
-rw-r--r--erts/emulator/beam/register.c17
-rw-r--r--erts/emulator/beam/utils.c16
23 files changed, 2206 insertions, 2294 deletions
diff --git a/erts/emulator/beam/beam_bp.c b/erts/emulator/beam/beam_bp.c
index f4e7119d4d..a6e00f87e0 100644
--- a/erts/emulator/beam/beam_bp.c
+++ b/erts/emulator/beam/beam_bp.c
@@ -713,7 +713,7 @@ void erts_trace_time_break(Process *p, BeamInstr *pc, BpDataTime *bdt, Uint type
BpDataTime *pbdt = NULL;
ASSERT(p);
- ASSERT(p->status == P_RUNNING);
+ ASSERT(ERTS_PSFLG_RUNNING & erts_smp_atomic32_read_acqb(&p->state));
/* get previous timestamp and breakpoint
* from the process psd */
diff --git a/erts/emulator/beam/beam_emu.c b/erts/emulator/beam/beam_emu.c
index 5ca3e8060b..b831147295 100644
--- a/erts/emulator/beam/beam_emu.c
+++ b/erts/emulator/beam/beam_emu.c
@@ -1875,13 +1875,12 @@ void process_main(void)
msgp = PEEK_MESSAGE(c_p);
if (msgp)
erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
- else {
+ else
#endif
+ {
SET_I((BeamInstr *) Arg(0));
Goto(*I); /* Jump to a wait or wait_timeout instruction */
-#ifdef ERTS_SMP
}
-#endif
}
ErtsMoveMsgAttachmentIntoProc(msgp, c_p, E, HTOP, FCALLS,
{
@@ -2110,11 +2109,11 @@ void process_main(void)
OpCase(wait_f):
wait2: {
- ASSERT(!ERTS_PROC_IS_EXITING(c_p));
c_p->i = (BeamInstr *) Arg(0); /* L1 */
SWAPOUT;
c_p->arity = 0;
- c_p->status = P_WAITING;
+ erts_smp_atomic32_read_band_relb(&c_p->state, ~ERTS_PSFLG_ACTIVE);
+ ASSERT(!ERTS_PROC_IS_EXITING(c_p));
erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
c_p->current = NULL;
goto do_schedule;
@@ -3193,10 +3192,6 @@ void process_main(void)
c_p->arg_reg[0] = r(0);
SWAPOUT;
c_p->i = I;
- erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);
- if (c_p->status != P_SUSPENDED)
- erts_add_to_runq(c_p);
- erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);
goto do_schedule1;
}
@@ -5159,9 +5154,6 @@ void process_main(void)
c_p->arity = 1; /* One living register (the 'true' return value) */
SWAPOUT;
c_p->i = I + 1; /* Next instruction */
- erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);
- erts_add_to_runq(c_p);
- erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);
c_p->current = NULL;
goto do_schedule;
}
@@ -6255,9 +6247,7 @@ erts_hibernate(Process* c_p, Eterm module, Eterm function, Eterm args, Eterm* re
*/
erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS);
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
- if (c_p->msg.len > 0) {
- erts_add_to_runq(c_p);
- } else {
+ if (!c_p->msg.len) {
erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS);
c_p->fvalue = NIL;
PROCESS_MAIN_CHK_LOCKS(c_p);
@@ -6265,14 +6255,12 @@ erts_hibernate(Process* c_p, Eterm module, Eterm function, Eterm args, Eterm* re
ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
PROCESS_MAIN_CHK_LOCKS(c_p);
erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS);
- ASSERT(!ERTS_PROC_IS_EXITING(c_p));
#ifdef ERTS_SMP
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
- if (c_p->msg.len > 0)
- erts_add_to_runq(c_p);
- else
+ if (!c_p->msg.len)
#endif
- c_p->status = P_WAITING;
+ erts_smp_atomic32_read_band_relb(&c_p->state, ~ERTS_PSFLG_ACTIVE);
+ ASSERT(!ERTS_PROC_IS_EXITING(c_p));
}
erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS);
c_p->current = bif_export[BIF_hibernate_3]->code;
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index dd9488afa6..160b1ef0fc 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -233,15 +233,17 @@ BIF_RETTYPE link_1(BIF_ALIST_1)
BIF_ERROR(BIF_P, BADARG);
- res_no_proc:
- if (BIF_P->flags & F_TRAPEXIT) {
- ErtsProcLocks locks = ERTS_PROC_LOCK_MAIN;
- erts_deliver_exit_message(BIF_ARG_1, BIF_P, &locks, am_noproc, NIL);
- erts_smp_proc_unlock(BIF_P, ~ERTS_PROC_LOCK_MAIN & locks);
- BIF_RET(am_true);
+res_no_proc: {
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&BIF_P->state);
+ if (state & ERTS_PSFLG_TRAP_EXIT) {
+ ErtsProcLocks locks = ERTS_PROC_LOCK_MAIN;
+ erts_deliver_exit_message(BIF_ARG_1, BIF_P, &locks, am_noproc, NIL);
+ erts_smp_proc_unlock(BIF_P, ~ERTS_PROC_LOCK_MAIN & locks);
+ BIF_RET(am_true);
+ }
+ else
+ BIF_ERROR(BIF_P, EXC_NOPROC);
}
- else
- BIF_ERROR(BIF_P, EXC_NOPROC);
}
#define ERTS_DEMONITOR_FALSE 2
@@ -1103,8 +1105,9 @@ BIF_RETTYPE hibernate_3(BIF_ALIST_3)
if (erts_hibernate(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, reg)) {
/*
- * If hibernate succeeded, TRAP. The process will be suspended
- * if status is P_WAITING or continue (if any message was in the queue).
+ * If hibernate succeeded, TRAP. The process will be wait in a
+ * hibernated state if its state is inactive (!ERTS_PSFLG_ACTIVE);
+ * otherwise, continue executing (if any message was in the queue).
*/
BIF_TRAP_CODE_PTR_(BIF_P, BIF_P->i);
}
@@ -1499,14 +1502,13 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2)
BIF_RET(old_value);
}
else if (BIF_ARG_1 == am_priority) {
- erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_STATUS);
old_value = erts_set_process_priority(BIF_P, BIF_ARG_2);
- erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_STATUS);
if (old_value == THE_NON_VALUE)
goto error;
BIF_RET(old_value);
}
else if (BIF_ARG_1 == am_trap_exit) {
+ erts_aint32_t state;
Uint trap_exit;
if (BIF_ARG_2 == am_true) {
trap_exit = 1;
@@ -1521,59 +1523,52 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2)
* For more info, see implementation of erts_send_exit_signal().
*/
erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_STATUS);
+ ERTS_SMP_LC_ASSERT((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS)
+ & erts_proc_lc_my_proc_locks(BIF_P));
ERTS_SMP_BIF_CHK_PENDING_EXIT(BIF_P,
ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
- old_value = ERTS_PROC_IS_TRAPPING_EXITS(BIF_P) ? am_true : am_false;
- if (trap_exit) {
- ERTS_PROC_SET_TRAP_EXIT(BIF_P);
- } else {
- ERTS_PROC_UNSET_TRAP_EXIT(BIF_P);
- }
+ if (trap_exit)
+ state = erts_smp_atomic32_read_bor_nob(&BIF_P->state,
+ ERTS_PSFLG_TRAP_EXIT);
+ else
+ state = erts_smp_atomic32_read_band_nob(&BIF_P->state,
+ ~ERTS_PSFLG_TRAP_EXIT);
+ old_value = (state & ERTS_PSFLG_TRAP_EXIT) ? am_true : am_false;
erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_STATUS);
BIF_RET(old_value);
}
else if (BIF_ARG_1 == am_scheduler) {
- int yield;
- ErtsRunQueue *old;
- ErtsRunQueue *new;
+ ErtsRunQueue *old, *new, *curr;
Sint sched;
+ erts_aint32_t state;
+
if (!is_small(BIF_ARG_2))
goto error;
sched = signed_val(BIF_ARG_2);
if (sched < 0 || erts_no_schedulers < sched)
goto error;
- erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_STATUS);
- old = BIF_P->bound_runq;
-#ifdef ERTS_SMP
- ASSERT(!old || old == BIF_P->run_queue);
-#endif
- new = !sched ? NULL : erts_schedid2runq(sched);
-#ifndef ERTS_SMP
- yield = 0;
-#else
- if (new == old)
- yield = 0;
+
+ if (sched == 0) {
+ new = NULL;
+ state = erts_smp_atomic32_read_band_mb(&BIF_P->state,
+ ~ERTS_PSFLG_BOUND);
+ }
else {
- ErtsRunQueue *curr = BIF_P->run_queue;
- if (!new)
- erts_smp_runq_lock(curr);
- else
- erts_smp_runqs_lock(curr, new);
- yield = new && BIF_P->run_queue != new;
-#endif
- BIF_P->bound_runq = new;
+ new = erts_schedid2runq(sched);
#ifdef ERTS_SMP
- if (new)
- BIF_P->run_queue = new;
- if (!new)
- erts_smp_runq_unlock(curr);
- else
- erts_smp_runqs_unlock(curr, new);
- }
+ erts_atomic_set_nob(&BIF_P->run_queue, (erts_aint_t) new);
#endif
- erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_STATUS);
+ state = erts_smp_atomic32_read_bor_mb(&BIF_P->state,
+ ERTS_PSFLG_BOUND);
+ }
+
+ curr = ERTS_GET_SCHEDULER_DATA_FROM_PROC(BIF_P)->run_queue;
+ old = (ERTS_PSFLG_BOUND & state) ? curr : NULL;
+
+ ASSERT(!old || old == curr);
+
old_value = old ? make_small(old->ix+1) : make_small(0);
- if (yield)
+ if (new && new != curr)
ERTS_BIF_YIELD_RETURN_X(BIF_P, old_value, am_scheduler);
else
BIF_RET(old_value);
diff --git a/erts/emulator/beam/break.c b/erts/emulator/beam/break.c
index 0a5fc26ee8..9cb5f2cc16 100644
--- a/erts/emulator/beam/break.c
+++ b/erts/emulator/beam/break.c
@@ -73,8 +73,8 @@ process_info(int to, void *to_arg)
for (i = 0; i < erts_max_processes; i++) {
Process *p = erts_pix2proc(i);
if (p && p->i != ENULL) {
- if (p->status != P_EXITING)
- print_process_info(to, to_arg, p);
+ if (!ERTS_PROC_IS_EXITING(p))
+ print_process_info(to, to_arg, p);
}
}
@@ -99,11 +99,20 @@ process_killer(void)
if ((j = sys_get_key(0)) <= 0)
erl_exit(0, "");
switch(j) {
- case 'k':
- if (rp->status == P_WAITING) {
- ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
- erts_smp_proc_inc_refc(rp);
- erts_smp_proc_lock(rp, rp_locks);
+ case 'k': {
+ ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
+ erts_aint32_t state;
+ erts_smp_proc_inc_refc(rp);
+ erts_smp_proc_lock(rp, rp_locks);
+ state = erts_smp_atomic32_read_acqb(&rp->state);
+ if (state & (ERTS_PSFLG_FREE
+ | ERTS_PSFLG_EXITING
+ | ERTS_PSFLG_ACTIVE
+ | ERTS_PSFLG_IN_RUNQ
+ | ERTS_PSFLG_RUNNING)) {
+ erts_printf("Can only kill WAITING processes this way\n");
+ }
+ else {
(void) erts_send_exit_signal(NULL,
NIL,
rp,
@@ -112,12 +121,10 @@ process_killer(void)
NIL,
NULL,
0);
- erts_smp_proc_unlock(rp, rp_locks);
- erts_smp_proc_dec_refc(rp);
}
- else
- erts_printf("Can only kill WAITING processes this way\n");
-
+ erts_smp_proc_unlock(rp, rp_locks);
+ erts_smp_proc_dec_refc(rp);
+ }
case 'n': br = 1; break;
case 'r': return;
default: return;
@@ -186,38 +193,34 @@ print_process_info(int to, void *to_arg, Process *p)
int garbing = 0;
int running = 0;
struct saved_calls *scb;
+ erts_aint32_t state;
/* display the PID */
erts_print(to, to_arg, "=proc:%T\n", p->id);
/* Display the state */
erts_print(to, to_arg, "State: ");
- switch (p->status) {
- case P_FREE:
+
+ state = erts_smp_atomic32_read_acqb(&p->state);
+ if (state & ERTS_PSFLG_FREE)
erts_print(to, to_arg, "Non Existing\n"); /* Should never happen */
- break;
- case P_RUNABLE:
- erts_print(to, to_arg, "Scheduled\n");
- break;
- case P_WAITING:
- erts_print(to, to_arg, "Waiting\n");
- break;
- case P_SUSPENDED:
- erts_print(to, to_arg, "Suspended\n");
- break;
- case P_RUNNING:
- erts_print(to, to_arg, "Running\n");
- running = 1;
- break;
- case P_EXITING:
+ else if (state & ERTS_PSFLG_EXITING)
erts_print(to, to_arg, "Exiting\n");
- break;
- case P_GARBING:
- erts_print(to, to_arg, "Garbing\n");
+ else if (state & ERTS_PSFLG_GC) {
garbing = 1;
running = 1;
- break;
+ erts_print(to, to_arg, "Garbing\n");
+ }
+ else if (state & ERTS_PSFLG_SUSPENDED)
+ erts_print(to, to_arg, "Suspended\n");
+ else if (state & ERTS_PSFLG_RUNNING) {
+ running = 1;
+ erts_print(to, to_arg, "Running\n");
}
+ else if (state & ERTS_PSFLG_ACTIVE)
+ erts_print(to, to_arg, "Scheduled\n");
+ else
+ erts_print(to, to_arg, "Waiting\n");
/*
* If the process is registered as a global process, display the
diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types
index b30e6dc018..d1e3b4b0ef 100644
--- a/erts/emulator/beam/erl_alloc.types
+++ b/erts/emulator/beam/erl_alloc.types
@@ -336,6 +336,8 @@ type SL_PTIMER SHORT_LIVED SYSTEM ptimer_sl
type LL_PTIMER STANDARD SYSTEM ptimer_ll
type SYS_MSG_Q SHORT_LIVED PROCESSES system_messages_queue
type FP_EXCEPTION LONG_LIVED SYSTEM fp_exception
+type LL_MPATHS LONG_LIVED SYSTEM ll_migration_paths
+type SL_MPATHS SHORT_LIVED SYSTEM sl_migration_paths
+endif
+if hipe
diff --git a/erts/emulator/beam/erl_alloc_util.c b/erts/emulator/beam/erl_alloc_util.c
index e0d525bdde..62225d3572 100644
--- a/erts/emulator/beam/erl_alloc_util.c
+++ b/erts/emulator/beam/erl_alloc_util.c
@@ -832,14 +832,14 @@ init_dd_queue(ErtsAllctrDDQueue_t *ddq)
static ERTS_INLINE erts_aint_t
ddq_managed_thread_enqueue(ErtsAllctrDDQueue_t *ddq, void *ptr)
{
- erts_aint_t ilast, itmp;
+ erts_aint_t first_ilast, ilast, itmp;
ErtsAllctrDDBlock_t *this = ptr;
erts_atomic_init_nob(&this->atmc_next, ERTS_AINT_NULL);
/* Enqueue at end of list... */
- ilast = erts_atomic_read_nob(&ddq->tail.data.last);
+ first_ilast = ilast = erts_atomic_read_nob(&ddq->tail.data.last);
while (1) {
ErtsAllctrDDBlock_t *last = (ErtsAllctrDDBlock_t *) ilast;
itmp = erts_atomic_cmpxchg_mb(&last->atmc_next,
@@ -853,8 +853,11 @@ ddq_managed_thread_enqueue(ErtsAllctrDDQueue_t *ddq, void *ptr)
/* Move last pointer forward... */
while (1) {
if (erts_atomic_read_rb(&this->atmc_next) != ERTS_AINT_NULL) {
- /* Someone else will move it forward */
- return erts_atomic_read_rb(&ddq->tail.data.last);
+ ilast = erts_atomic_read_rb(&ddq->tail.data.last);
+ if (first_ilast != ilast) {
+ /* Someone else will move it forward */
+ return ilast;
+ }
}
itmp = erts_atomic_cmpxchg_mb(&ddq->tail.data.last,
(erts_aint_t) this,
diff --git a/erts/emulator/beam/erl_bif_ddll.c b/erts/emulator/beam/erl_bif_ddll.c
index c338ee1c4b..7f7c975e78 100644
--- a/erts/emulator/beam/erl_bif_ddll.c
+++ b/erts/emulator/beam/erl_bif_ddll.c
@@ -368,13 +368,11 @@ BIF_RETTYPE erl_ddll_try_load_3(BIF_ALIST_3)
#endif
for (j = 0; j < erts_max_ports; j++) {
Port* prt = &erts_port[j];
-#ifdef DDLL_SMP
erts_smp_port_state_lock(prt);
-#endif
if (!(prt->status & FREE_PORT_FLAGS) &&
prt->drv_ptr->handle == dh) {
-#if DDLL_SMP
erts_smp_atomic_inc_nob(&prt->refc);
+#if DDLL_SMP
/* Extremely rare spinlock */
while(prt->status & ERTS_PORT_SFLG_INITIALIZING) {
erts_smp_port_state_unlock(prt);
@@ -598,13 +596,11 @@ done:
#endif
for (j = 0; j < erts_max_ports; j++) {
Port* prt = &erts_port[j];
-#if DDLL_SMP
erts_smp_port_state_lock(prt);
-#endif
if (!(prt->status & FREE_PORT_FLAGS)
&& prt->drv_ptr->handle == dh) {
-#if DDLL_SMP
erts_smp_atomic_inc_nob(&prt->refc);
+#if DDLL_SMP
/* Extremely rare spinlock */
while(prt->status & ERTS_PORT_SFLG_INITIALIZING) {
erts_smp_port_state_unlock(prt);
@@ -1060,13 +1056,11 @@ void erts_ddll_proc_dead(Process *p, ErtsProcLocks plocks)
#endif
for (j = 0; j < erts_max_ports; j++) {
Port* prt = &erts_port[j];
-#if DDLL_SMP
erts_smp_port_state_lock(prt);
-#endif
if (!(prt->status & FREE_PORT_FLAGS) &&
prt->drv_ptr->handle == dh) {
-#if DDLL_SMP
erts_smp_atomic_inc_nob(&prt->refc);
+#if DDLL_SMP
while(prt->status & ERTS_PORT_SFLG_INITIALIZING) {
erts_smp_port_state_unlock(prt);
erts_smp_port_state_lock(prt);
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 8117b1bfaa..90ddaa4fac 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -1355,13 +1355,15 @@ process_info_aux(Process *BIF_P,
hp = HAlloc(BIF_P, 3);
break;
- case am_trap_exit:
+ case am_trap_exit: {
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&rp->state);
hp = HAlloc(BIF_P, 3);
- if (rp->flags & F_TRAPEXIT)
+ if (state & ERTS_PSFLG_TRAP_EXIT)
res = am_true;
else
res = am_false;
break;
+ }
case am_error_handler:
hp = HAlloc(BIF_P, 3);
@@ -3144,14 +3146,8 @@ BIF_RETTYPE is_process_alive_1(BIF_ALIST_1)
BIF_RET(am_false);
}
else {
- int px;
-#ifdef ERTS_SMP
- px = (erts_atomic32_read_acqb(&rp->aflags)
- & (ERTS_PROC_AFLG_PENDING_EXIT|ERTS_PROC_AFLG_EXITING));
-#else
- px = 0;
-#endif
- if (px)
+ if (erts_smp_atomic32_read_acqb(&rp->state)
+ & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING))
ERTS_BIF_AWAIT_X_DATA_TRAP(BIF_P, BIF_ARG_1, am_false);
else
BIF_RET(am_true);
diff --git a/erts/emulator/beam/erl_cpu_topology.c b/erts/emulator/beam/erl_cpu_topology.c
index fe3693d0ca..41e71881a0 100644
--- a/erts/emulator/beam/erl_cpu_topology.c
+++ b/erts/emulator/beam/erl_cpu_topology.c
@@ -486,7 +486,7 @@ erts_sched_check_cpu_bind_post_suspend(ErtsSchedulerData *esdp)
erts_thr_set_main_status(1, (int) esdp->no);
/* Make sure we check if we should bind to a cpu or not... */
- esdp->run_queue->flags |= ERTS_RUNQ_FLG_CHK_CPU_BIND;
+ ERTS_RUNQ_FLGS_SET(esdp->run_queue, ERTS_RUNQ_FLG_CHK_CPU_BIND);
}
#endif
@@ -498,9 +498,6 @@ erts_sched_check_cpu_bind(ErtsSchedulerData *esdp)
erts_cpu_groups_map_t *cgm;
erts_cpu_groups_callback_list_t *cgcl;
erts_cpu_groups_callback_call_t *cgcc;
-#ifdef ERTS_SMP
- esdp->run_queue->flags &= ~ERTS_RUNQ_FLG_CHK_CPU_BIND;
-#endif
erts_smp_runq_unlock(esdp->run_queue);
erts_smp_rwmtx_rwlock(&cpuinfo_rwmtx);
cpu_id = scheduler2cpu_map[esdp->no].bind_id;
diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c
index 52a6e52e6c..6075a527c3 100644
--- a/erts/emulator/beam/erl_gc.c
+++ b/erts/emulator/beam/erl_gc.c
@@ -357,11 +357,7 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj)
trace_gc(p, am_gc_start);
}
- erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
- p->gcstatus = p->status;
- p->status = P_GARBING;
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
-
+ erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC);
if (erts_system_monitor_long_gc != 0) {
get_now(&ms1, &s1, &us1);
}
@@ -404,9 +400,8 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj)
ErtsGcQuickSanityCheck(p);
- erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
- p->status = p->gcstatus;
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
+ erts_smp_atomic32_read_band_nob(&p->state, ~ERTS_PSFLG_GC);
+
if (IS_TRACED_FL(p, F_TRACE_GC)) {
trace_gc(p, am_gc_end);
}
@@ -490,10 +485,7 @@ erts_garbage_collect_hibernate(Process* p)
/*
* Preliminaries.
*/
- erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
- p->gcstatus = p->status;
- p->status = P_GARBING;
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
+ erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC);
ErtsGcQuickSanityCheck(p);
ASSERT(p->mbuf_sz == 0);
ASSERT(p->mbuf == 0);
@@ -604,9 +596,7 @@ erts_garbage_collect_hibernate(Process* p)
ErtsGcQuickSanityCheck(p);
- erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
- p->status = p->gcstatus;
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
+ erts_smp_atomic32_read_band_nob(&p->state, ~ERTS_PSFLG_GC);
}
@@ -630,10 +620,7 @@ erts_garbage_collect_literals(Process* p, Eterm* literals,
/*
* Set GC state.
*/
- erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
- p->gcstatus = p->status;
- p->status = P_GARBING;
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
+ erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC);
/*
* We assume that the caller has already done a major collection
@@ -775,9 +762,7 @@ erts_garbage_collect_literals(Process* p, Eterm* literals,
/*
* Restore status.
*/
- erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
- p->status = p->gcstatus;
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
+ erts_smp_atomic32_read_band_nob(&p->state, ~ERTS_PSFLG_GC);
}
static int
diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c
index 306cb652c4..175695e856 100644
--- a/erts/emulator/beam/erl_lock_check.c
+++ b/erts/emulator/beam/erl_lock_check.c
@@ -82,7 +82,6 @@ static erts_lc_lock_order_t erts_lock_order[] = {
#ifdef ERTS_SMP
{ "bif_timers", NULL },
{ "reg_tab", NULL },
- { "migration_info_update", NULL },
{ "proc_main", "pid" },
#ifdef HIPE
{ "hipe_mfait_lock", NULL },
@@ -124,6 +123,7 @@ static erts_lc_lock_order_t erts_lock_order[] = {
{ "removed_fd_pre_alloc_lock", "address" },
{ "state_prealloc", NULL },
{ "schdlr_sspnd", NULL },
+ { "migration_info_update", NULL },
{ "run_queue", "address" },
{ "cpu_info", NULL },
{ "pollset", "address" },
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 4a797fbeae..b10964da52 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -297,38 +297,6 @@ erts_msg_distext2heap(Process *pp,
return THE_NON_VALUE;
}
-static ERTS_INLINE void
-notify_new_message(Process *receiver)
-{
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS
- & erts_proc_lc_my_proc_locks(receiver));
-
- ACTIVATE(receiver);
-
- switch (receiver->status) {
- case P_GARBING:
- switch (receiver->gcstatus) {
- case P_SUSPENDED:
- goto suspended;
- case P_WAITING:
- goto waiting;
- default:
- break;
- }
- break;
- case P_SUSPENDED:
- suspended:
- receiver->rstatus = P_RUNABLE;
- break;
- case P_WAITING:
- waiting:
- erts_add_to_runq(receiver);
- break;
- default:
- break;
- }
-}
-
void
erts_queue_dist_message(Process *rcvr,
ErtsProcLocks *rcvr_locks,
@@ -342,7 +310,7 @@ erts_queue_dist_message(Process *rcvr,
Sint tok_serial = 0;
#endif
#ifdef ERTS_SMP
- ErtsProcLocks need_locks;
+ erts_aint_t state;
#endif
ERTS_SMP_LC_ASSERT(*rcvr_locks == erts_proc_lc_my_proc_locks(rcvr));
@@ -350,20 +318,21 @@ erts_queue_dist_message(Process *rcvr,
mp = message_alloc();
#ifdef ERTS_SMP
- need_locks = ~(*rcvr_locks) & (ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS);
- if (need_locks) {
- *rcvr_locks |= need_locks;
- if (erts_smp_proc_trylock(rcvr, need_locks) == EBUSY) {
- if (need_locks == ERTS_PROC_LOCK_MSGQ) {
+ if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) {
+ if (erts_smp_proc_trylock(rcvr, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
+ ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ;
+ if (*rcvr_locks & ERTS_PROC_LOCK_STATUS) {
erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_STATUS);
- need_locks = (ERTS_PROC_LOCK_MSGQ
- | ERTS_PROC_LOCK_STATUS);
+ need_locks |= ERTS_PROC_LOCK_STATUS;
}
erts_smp_proc_lock(rcvr, need_locks);
}
}
- if (ERTS_PROC_IS_EXITING(rcvr) || ERTS_PROC_PENDING_EXIT(rcvr)) {
+ state = erts_smp_atomic32_read_acqb(&rcvr->state);
+ if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) {
+ if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
+ erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
/* Drop message if receiver is exiting or has a pending exit ... */
if (is_not_nil(token)) {
ErlHeapFragment *heap_frag;
@@ -379,6 +348,8 @@ erts_queue_dist_message(Process *rcvr,
/* Ahh... need to decode it in order to trace it... */
ErlHeapFragment *mbuf;
Eterm msg;
+ if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
+ erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
message_free(mp);
msg = erts_msg_distext2heap(rcvr, rcvr_locks, &mbuf, &token, dist_ext);
if (is_value(msg))
@@ -440,26 +411,32 @@ erts_queue_dist_message(Process *rcvr,
mp->data.dist_ext = dist_ext;
LINK_MESSAGE(rcvr, mp);
- notify_new_message(rcvr);
+ if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
+ erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
+
+ erts_proc_notify_new_message(rcvr);
}
}
/* Add a message last in message queue */
-void
-erts_queue_message(Process* receiver,
- ErtsProcLocks *receiver_locks,
- ErlHeapFragment* bp,
- Eterm message,
- Eterm seq_trace_token
+static int
+queue_message(Process *c_p,
+ Process* receiver,
+ ErtsProcLocks *receiver_locks,
+ erts_aint32_t *receiver_state,
+ ErlHeapFragment* bp,
+ Eterm message,
+ Eterm seq_trace_token
#ifdef USE_VM_PROBES
, Eterm dt_utag
#endif
-)
+ )
{
ErlMessage* mp;
-#ifdef ERTS_SMP
- ErtsProcLocks need_locks;
-#else
+ int locked_msgq = 0;
+ erts_aint_t state;
+
+#ifndef ERTS_SMP
ASSERT(bp != NULL || receiver->mbuf == NULL);
#endif
@@ -467,31 +444,45 @@ erts_queue_message(Process* receiver,
mp = message_alloc();
+ if (receiver_state)
+ state = *receiver_state;
+ else
+ state = erts_smp_atomic32_read_acqb(&receiver->state);
+
#ifdef ERTS_SMP
- need_locks = ~(*receiver_locks) & (ERTS_PROC_LOCK_MSGQ
- | ERTS_PROC_LOCK_STATUS);
- if (need_locks) {
- *receiver_locks |= need_locks;
- if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) {
- if (need_locks == ERTS_PROC_LOCK_MSGQ) {
+
+ if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))
+ goto exiting;
+
+ if (!(*receiver_locks & ERTS_PROC_LOCK_MSGQ)) {
+ if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
+ ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ;
+ if (*receiver_locks & ERTS_PROC_LOCK_STATUS) {
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
- need_locks = (ERTS_PROC_LOCK_MSGQ
- | ERTS_PROC_LOCK_STATUS);
+ need_locks |= ERTS_PROC_LOCK_STATUS;
}
erts_smp_proc_lock(receiver, need_locks);
}
+ locked_msgq = 1;
+ state = erts_smp_atomic32_read_nob(&receiver->state);
+ if (receiver_state)
+ *receiver_state = state;
}
- if (ERTS_PROC_IS_EXITING(receiver) || ERTS_PROC_PENDING_EXIT(receiver)) {
- /* Drop message if receiver is exiting or has a pending
- * exit ...
- */
+#endif
+
+ if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) {
+#ifdef ERTS_SMP
+ exiting:
+#endif
+ /* Drop message if receiver is exiting or has a pending exit... */
+ if (locked_msgq)
+ erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
if (bp)
free_message_buffer(bp);
message_free(mp);
- return;
+ return 0;
}
-#endif
ERL_MESSAGE_TERM(mp) = message;
ERL_MESSAGE_TOKEN(mp) = seq_trace_token;
@@ -514,12 +505,11 @@ erts_queue_message(Process* receiver,
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
LINK_MESSAGE_PRIVQ(receiver, mp);
}
- else {
+ else
+#endif
+ {
LINK_MESSAGE(receiver, mp);
}
-#else
- LINK_MESSAGE(receiver, mp);
-#endif
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(message_queued)) {
@@ -539,15 +529,43 @@ erts_queue_message(Process* receiver,
tok_label, tok_lastcnt, tok_serial);
}
#endif
- notify_new_message(receiver);
- if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
+ if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE))
trace_receive(receiver, message);
- }
+
+ if (locked_msgq)
+ erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
+
+ erts_proc_notify_new_message(receiver);
#ifndef ERTS_SMP
ERTS_HOLE_CHECK(receiver);
#endif
+ return 0;
+}
+
+void
+erts_queue_message(Process* receiver,
+ ErtsProcLocks *receiver_locks,
+ ErlHeapFragment* bp,
+ Eterm message,
+ Eterm seq_trace_token
+#ifdef USE_VM_PROBES
+ , Eterm dt_utag
+#endif
+ )
+{
+ queue_message(NULL,
+ receiver,
+ receiver_locks,
+ NULL,
+ bp,
+ message,
+ seq_trace_token
+#ifdef USE_VM_PROBES
+ , dt_utag
+#endif
+ );
}
void
@@ -1025,11 +1043,8 @@ erts_send_message(Process* sender,
LINK_MESSAGE(receiver, mp);
ACTIVATE(receiver);
- if (receiver->status == P_WAITING) {
- erts_add_to_runq(receiver);
- } else if (receiver->status == P_SUSPENDED) {
- receiver->rstatus = P_RUNABLE;
- }
+ erts_proc_notify_new_message(receiver);
+
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
trace_receive(receiver, message);
}
@@ -1089,21 +1104,34 @@ erts_send_message(Process* sender,
#ifdef ERTS_SMP
ErlOffHeap *ohp;
Eterm *hp;
+ erts_aint32_t state;
+
BM_SWAP_TIMER(send,size);
msize = size_object(message);
BM_SWAP_TIMER(size,send);
- hp = erts_alloc_message_heap(msize,&bp,&ohp,receiver,receiver_locks);
+ hp = erts_alloc_message_heap_state(msize,
+ &bp,
+ &ohp,
+ receiver,
+ receiver_locks,
+ &state);
BM_SWAP_TIMER(send,copy);
message = copy_struct(message, msize, &hp, ohp);
BM_MESSAGE_COPIED(msz);
BM_SWAP_TIMER(copy,send);
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
- erts_queue_message(receiver, receiver_locks, bp, message, token
+ queue_message(sender,
+ receiver,
+ receiver_locks,
+ &state,
+ bp,
+ message,
+ token
#ifdef USE_VM_PROBES
- , NIL
+ , NIL
#endif
- );
+ );
BM_SWAP_TIMER(send,system);
#else
ErlMessage* mp = message_alloc();
@@ -1134,17 +1162,13 @@ erts_send_message(Process* sender,
mp->data.attached = NULL;
LINK_MESSAGE(receiver, mp);
- if (receiver->status == P_WAITING) {
- erts_add_to_runq(receiver);
- } else if (receiver->status == P_SUSPENDED) {
- receiver->rstatus = P_RUNABLE;
- }
+ erts_proc_notify_new_message(receiver);
+
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
trace_receive(receiver, message);
}
BM_SWAP_TIMER(send,system);
#endif /* #ifndef ERTS_SMP */
- return;
#endif /* HYBRID */
}
}
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index 0f1a0d441a..86454fe1fa 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -56,12 +56,6 @@
#define ERTS_PORT_IS_IN_RUNQ(RQ, P) \
((P)->sched.next || (P)->sched.prev || (RQ)->ports.start == (P))
-#define ERTS_PORT_NOT_IN_RUNQ(P) \
-do { \
- (P)->sched.prev = NULL; \
- (P)->sched.next = NULL; \
-} while (0)
-
#ifdef USE_VM_PROBES
#define DTRACE_DRIVER(PROBE_NAME, PP) \
if (DTRACE_ENABLED(driver_ready_input)) { \
@@ -167,7 +161,7 @@ enqueue_port(ErtsRunQueue *runq, Port *pp)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
pp->sched.next = NULL;
- pp->sched.prev = runq->ports.end;
+ pp->sched.in_runq = 1;
if (runq->ports.end) {
ASSERT(runq->ports.start);
runq->ports.end->sched.next = pp;
@@ -177,39 +171,10 @@ enqueue_port(ErtsRunQueue *runq, Port *pp)
runq->ports.start = pp;
}
- runq->ports.info.len++;
- if (runq->ports.info.max_len < runq->ports.info.len)
- runq->ports.info.max_len = runq->ports.info.len;
- runq->len++;
- if (runq->max_len < runq->len)
- runq->max_len = runq->len;
runq->ports.end = pp;
ASSERT(runq->ports.start && runq->ports.end);
-}
-static ERTS_INLINE void
-dequeue_port(ErtsRunQueue *runq, Port *pp)
-{
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
- if (pp->sched.next)
- pp->sched.next->sched.prev = pp->sched.prev;
- else {
- ASSERT(runq->ports.end == pp);
- runq->ports.end = pp->sched.prev;
- }
- if (pp->sched.prev)
- pp->sched.prev->sched.next = pp->sched.next;
- else {
- ASSERT(runq->ports.start == pp);
- runq->ports.start = pp->sched.next;
- }
-
- ASSERT(runq->ports.info.len > 0);
- runq->ports.info.len--;
- ASSERT(runq->len > 0);
- runq->len--;
- ASSERT(runq->ports.start || !runq->ports.end);
- ASSERT(runq->ports.end || !runq->ports.start);
+ erts_smp_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
}
static ERTS_INLINE Port *
@@ -222,16 +187,11 @@ pop_port(ErtsRunQueue *runq)
}
else {
runq->ports.start = runq->ports.start->sched.next;
- if (runq->ports.start)
- runq->ports.start->sched.prev = NULL;
- else {
+ if (!runq->ports.start) {
ASSERT(runq->ports.end == pp);
runq->ports.end = NULL;
}
- ASSERT(runq->ports.info.len > 0);
- runq->ports.info.len--;
- ASSERT(runq->len > 0);
- runq->len--;
+ erts_smp_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
}
ASSERT(runq->ports.start || !runq->ports.end);
@@ -285,7 +245,7 @@ check_port_queue(ErtsRunQueue *runq, Port *chk_pp, int inq)
}
ASSERT(no_forward == no_backward);
}
- ASSERT(no_forward == runq->ports.info.len);
+ ASSERT(no_forward == RUNQ_READ_LEN(&runq->ports.info.len));
if (chk_pp) {
if (chk_pp->sched.taskq || chk_pp->sched.exe_taskq) {
ASSERT(chk_pp->sched.taskq != chk_pp->sched.exe_taskq);
@@ -467,10 +427,11 @@ erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp)
ErtsPortTaskQueue *ptqp;
ErtsPortTask *ptp;
Port *pp;
- int port_is_dequeued = 0;
pp = &erts_port[internal_port_index(id)];
runq = erts_port_runq(pp);
+ if (!runq)
+ return 1;
ptp = handle2task(pthp);
@@ -505,22 +466,12 @@ erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp)
if (ptqp->first || pp->sched.taskq != ptqp)
ptqp = NULL;
- else {
+ else
pp->sched.taskq = NULL;
- if (!pp->sched.exe_taskq) {
- dequeue_port(runq, pp);
- ERTS_PORT_NOT_IN_RUNQ(pp);
- port_is_dequeued = 1;
- }
- }
ERTS_PT_CHK_PRES_PORTQ(runq, pp);
erts_smp_runq_unlock(runq);
-
- if (erts_system_profile_flags.runnable_ports && port_is_dequeued) {
- profile_runnable_port(pp, am_inactive);
- }
port_task_free(ptp);
if (ptqp)
@@ -575,7 +526,7 @@ erts_port_task_schedule(Eterm id,
if (!pp->sched.taskq) {
pp->sched.taskq = port_taskq_init(port_taskq_alloc(), pp);
- enq_port = !pp->sched.exe_taskq;
+ enq_port = !pp->sched.in_runq && !pp->sched.exe_taskq;
}
#ifdef ERTS_SMP
@@ -585,13 +536,13 @@ erts_port_task_schedule(Eterm id,
/* Port emigrated ... */
erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
erts_smp_runq_unlock(runq);
- runq = xrunq;
+ runq = erts_port_runq(pp);
+ if (!runq)
+ return -1;
}
}
#endif
- ASSERT(!enq_port || !(runq->flags & ERTS_RUNQ_FLG_SUSPENDED));
-
ASSERT(pp->sched.taskq);
ASSERT(ptp);
@@ -633,7 +584,7 @@ erts_port_task_schedule(Eterm id,
#elif defined(DEBUG)
if (!enq_port && !pp->sched.exe_taskq) {
/* We should be in port run q */
- ASSERT(pp->sched.prev || runq->ports.start == pp);
+ ASSERT(pp->sched.in_runq);
}
#endif
#endif
@@ -661,63 +612,54 @@ void
erts_port_task_free_port(Port *pp)
{
ErtsRunQueue *runq;
- int port_is_dequeued = 0;
+ ErtsPortTaskQueue *ptqp;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
ASSERT(!(pp->status & ERTS_PORT_SFLGS_DEAD));
runq = erts_port_runq(pp);
ASSERT(runq);
ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- if (pp->sched.exe_taskq) {
+ ptqp = pp->sched.exe_taskq;
+ if (ptqp) {
/* I (this thread) am currently executing this port, free it
when scheduled out... */
- ErtsPortTask *ptp = port_task_alloc();
+ ErtsPortTask *ptp;
+ enqueue_free:
+ ptp = port_task_alloc();
erts_smp_port_state_lock(pp);
pp->status &= ~ERTS_PORT_SFLG_CLOSING;
pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED;
erts_may_save_closed_port(pp);
erts_smp_port_state_unlock(pp);
- ERTS_SMP_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 1);
+ ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 1);
ptp->type = ERTS_PORT_TASK_FREE;
ptp->event = (ErlDrvEvent) -1;
ptp->event_data = NULL;
set_handle(ptp, NULL);
- push_task(pp->sched.exe_taskq, ptp);
+ push_task(ptqp, ptp);
ERTS_PT_CHK_PRES_PORTQ(runq, pp);
erts_smp_runq_unlock(runq);
}
else {
- ErtsPortTaskQueue *ptqp = pp->sched.taskq;
- if (ptqp) {
- dequeue_port(runq, pp);
- ERTS_PORT_NOT_IN_RUNQ(pp);
- port_is_dequeued = 1;
+ if (pp->sched.in_runq) {
+ ptqp = pp->sched.taskq;
+ if (!ptqp)
+ pp->sched.taskq = ptqp = port_taskq_init(port_taskq_alloc(), pp);
+ goto enqueue_free;
}
+ ASSERT(!pp->sched.taskq);
erts_smp_port_state_lock(pp);
pp->status &= ~ERTS_PORT_SFLG_CLOSING;
pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED;
erts_may_save_closed_port(pp);
erts_smp_port_state_unlock(pp);
-#ifdef ERTS_SMP
erts_smp_atomic_dec_nob(&pp->refc); /* Not alive */
-#endif
- ERTS_SMP_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 0); /* Lock */
+ ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 0); /* Lock */
handle_remaining_tasks(runq, pp); /* May release runq lock */
ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first));
pp->sched.taskq = NULL;
ERTS_PT_CHK_PRES_PORTQ(runq, pp);
-#ifndef ERTS_SMP
- ASSERT(pp->status & ERTS_PORT_SFLG_PORT_DEBUG);
- erts_port_status_set(pp, ERTS_PORT_SFLG_FREE);
-#endif
erts_smp_runq_unlock(runq);
-
- if (erts_system_profile_flags.runnable_ports && port_is_dequeued) {
- profile_runnable_port(pp, am_inactive);
- }
-
- if (ptqp)
- port_taskq_free(ptqp);
}
}
@@ -738,7 +680,6 @@ typedef struct {
int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
- int port_was_enqueued = 0;
Port *pp;
ErtsPortTaskQueue *ptqp;
ErtsPortTask *ptp;
@@ -757,11 +698,18 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
goto done;
}
- ERTS_PORT_NOT_IN_RUNQ(pp);
+ ASSERT(pp->sched.in_runq);
+ pp->sched.in_runq = 0;
+ if (!pp->sched.taskq) {
+ if (erts_system_profile_flags.runnable_ports)
+ profile_runnable_port(pp, am_inactive);
+ res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
+ != (erts_aint_t) 0);
+ goto done;
+ }
*curr_port_pp = pp;
- ASSERT(pp->sched.taskq);
ASSERT(pp->sched.taskq->first);
ptqp = pp->sched.taskq;
pp->sched.taskq = NULL;
@@ -823,12 +771,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
handle_remaining_tasks(runq, pp);
ASSERT(!ptqp->first
&& (!pp->sched.taskq || !pp->sched.taskq->first));
-#ifdef ERTS_SMP
erts_smp_atomic_dec_nob(&pp->refc); /* Not alive */
- ERTS_SMP_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 0); /* Lock */
-#else
- erts_port_status_bor_set(pp, ERTS_PORT_SFLG_FREE);
-#endif
+ ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 0); /* Lock */
port_task_free(ptp);
if (pp->sched.taskq)
@@ -918,6 +862,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
if (!pp->sched.taskq) {
ASSERT(pp->sched.exe_taskq);
pp->sched.exe_taskq = NULL;
+ if (erts_system_profile_flags.runnable_ports)
+ profile_runnable_port(pp, am_inactive);
}
else {
#ifdef ERTS_SMP
@@ -940,14 +886,20 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
else {
/* Port emigrated ... */
erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
- enqueue_port(xrunq, pp);
- ASSERT(pp->sched.exe_taskq);
- pp->sched.exe_taskq = NULL;
- erts_smp_runq_unlock(xrunq);
- erts_smp_notify_inc_runq(xrunq);
+ erts_smp_runq_unlock(runq);
+
+ xrunq = erts_port_runq(pp);
+ if (xrunq) {
+ enqueue_port(xrunq, pp);
+ ASSERT(pp->sched.exe_taskq);
+ pp->sched.exe_taskq = NULL;
+ erts_smp_runq_unlock(xrunq);
+ erts_smp_notify_inc_runq(xrunq);
+ }
+
+ erts_smp_runq_lock(runq);
}
#endif
- port_was_enqueued = 1;
}
res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
@@ -957,10 +909,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
port_taskq_free(ptqp);
- if (erts_system_profile_flags.runnable_ports && (port_was_enqueued != 1)) {
- profile_runnable_port(pp, am_inactive);
- }
-
/* trace port scheduling, out */
if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
trace_sched_ports(pp, am_out);
@@ -1049,63 +997,33 @@ erts_port_is_scheduled(Port *pp)
{
int res;
ErtsRunQueue *runq = erts_port_runq(pp);
+ if (!runq)
+ return 0;
res = pp->sched.taskq || pp->sched.exe_taskq;
erts_smp_runq_unlock(runq);
return res;
}
#ifdef ERTS_SMP
+void
+erts_enqueue_port(ErtsRunQueue *rq, Port *pp)
+{
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+ ASSERT(rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
+ ASSERT(pp->sched.in_runq);
+ enqueue_port(rq, pp);
+}
-ErtsMigrateResult
-erts_port_migrate(Port *prt, int *prt_locked,
- ErtsRunQueue *from_rq, int *from_locked,
- ErtsRunQueue *to_rq, int *to_locked)
+Port *
+erts_dequeue_port(ErtsRunQueue *rq)
{
- ERTS_SMP_LC_ASSERT(*from_locked);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked);
-
- if (!*from_locked || !*to_locked) {
- if (from_rq < to_rq) {
- if (!*to_locked) {
- if (!*from_locked)
- erts_smp_runq_lock(from_rq);
- erts_smp_runq_lock(to_rq);
- }
- else if (erts_smp_runq_trylock(from_rq) == EBUSY) {
- erts_smp_runq_unlock(to_rq);
- erts_smp_runq_lock(from_rq);
- erts_smp_runq_lock(to_rq);
- }
- }
- else {
- if (!*from_locked) {
- if (!*to_locked)
- erts_smp_runq_lock(to_rq);
- erts_smp_runq_lock(from_rq);
- }
- else if (erts_smp_runq_trylock(to_rq) == EBUSY) {
- erts_smp_runq_unlock(from_rq);
- erts_smp_runq_lock(to_rq);
- erts_smp_runq_lock(from_rq);
- }
- }
- *to_locked = *from_locked = 1;
- }
- ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked);
-
- /* Refuse to migrate to a suspended run queue */
- if (to_rq->flags & ERTS_RUNQ_FLG_SUSPENDED)
- return ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED;
- if (from_rq != (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue))
- return ERTS_MIGRATE_FAILED_RUNQ_CHANGED;
- if (!ERTS_PORT_IS_IN_RUNQ(from_rq, prt))
- return ERTS_MIGRATE_FAILED_NOT_IN_RUNQ;
- dequeue_port(from_rq, prt);
- erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) to_rq);
- enqueue_port(to_rq, prt);
- return ERTS_MIGRATE_SUCCESS;
+ Port *pp;
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+ pp = pop_port(rq);
+ ASSERT(!pp
+ || rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
+ ASSERT(!pp || pp->sched.in_runq);
+ return pp;
}
#endif
diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h
index d7104e1143..fd88b1c1ff 100644
--- a/erts/emulator/beam/erl_port_task.h
+++ b/erts/emulator/beam/erl_port_task.h
@@ -62,7 +62,7 @@ typedef struct ErtsPortTaskQueue_ ErtsPortTaskQueue;
typedef struct {
Port *next;
- Port *prev;
+ int in_runq;
ErtsPortTaskQueue *taskq;
ErtsPortTaskQueue *exe_taskq;
} ErtsPortTaskSched;
@@ -92,7 +92,7 @@ ERTS_GLB_INLINE void
erts_port_task_init_sched(ErtsPortTaskSched *ptsp)
{
ptsp->next = NULL;
- ptsp->prev = NULL;
+ ptsp->in_runq = 0;
ptsp->taskq = NULL;
ptsp->exe_taskq = NULL;
}
@@ -123,13 +123,10 @@ int erts_port_task_schedule(Eterm,
ErlDrvEventData);
void erts_port_task_free_port(Port *);
int erts_port_is_scheduled(Port *);
+
#ifdef ERTS_SMP
-ErtsMigrateResult erts_port_migrate(Port *,
- int *,
- ErtsRunQueue *,
- int *,
- ErtsRunQueue *,
- int *);
+void erts_enqueue_port(ErtsRunQueue *rq, Port *pp);
+Port *erts_dequeue_port(ErtsRunQueue *rq);
#endif
#undef ERTS_INCLUDE_SCHEDULER_INTERNALS
#endif /* ERL_PORT_TASK_H__ */
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 7174991ade..ba46b9011d 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -94,12 +94,44 @@
#define HIGH_BIT (1 << PRIORITY_HIGH)
#define NORMAL_BIT (1 << PRIORITY_NORMAL)
#define LOW_BIT (1 << PRIORITY_LOW)
+#define PORT_BIT (1 << ERTS_PORT_PRIO_LEVEL)
-#define ERTS_EMPTY_RUNQ(RQ) \
- ((RQ)->len == 0 && (RQ)->misc.start == NULL)
+#define ERTS_EMPTY_RUNQ(RQ) \
+ ((ERTS_RUNQ_FLGS_GET_NOB((RQ)) & ERTS_RUNQ_FLGS_QMASK) == 0 \
+ && (RQ)->misc.start == NULL)
+
+#undef RUNQ_READ_RQ
+#undef RUNQ_SET_RQ
+#define RUNQ_READ_RQ(X) ((ErtsRunQueue *) erts_smp_atomic_read_nob((X)))
+#define RUNQ_SET_RQ(X, RQ) erts_smp_atomic_set_nob((X), (erts_aint_t) (RQ))
+
+#ifdef DEBUG
+# if defined(ARCH_64) && !HALFWORD_HEAP
+# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \
+ (RUNQ_SET_RQ((RQP), (0xdeadbeefdead0003LL | ((N) << 4)))
+# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \
+do { \
+ ASSERT((RQP) != NULL); \
+ ASSERT(((((Uint) (RQP)) & ((Uint) 0x3))) == ((Uint) 0)); \
+ ASSERT((((Uint) (RQP)) & ~((Uint) 0xffff)) != ((Uint) 0xdeadbeefdead0000LL));\
+} while (0)
+# else
+# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \
+ (RUNQ_SET_RQ((RQP), (0xdead0003 | ((N) << 4))))
+# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \
+do { \
+ ASSERT((RQP) != NULL); \
+ ASSERT(((((UWord) (RQP)) & ((UWord) 1))) == ((UWord) 0)); \
+ ASSERT((((UWord) (RQP)) & ~((UWord) 0xffff)) != ((UWord) 0xdead0000)); \
+} while (0)
+# endif
+#else
+# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N)
+# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP)
+#endif
#define ERTS_EMPTY_RUNQ_PORTS(RQ) \
- ((RQ)->ports.info.len == 0 && (RQ)->misc.start == NULL)
+ (RUNQ_READ_LEN(&(RQ)->ports.info.len) == 0 && (RQ)->misc.start == NULL)
extern BeamInstr beam_apply[];
extern BeamInstr beam_exit[];
@@ -312,7 +344,7 @@ static struct {
struct {
int active_runqs;
int reds;
- int max_len;
+ erts_aint32_t max_len;
} prev_rise;
Uint n;
} balance_info;
@@ -1884,8 +1916,8 @@ sched_waiting_sys(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
ASSERT(rq->waiting >= 0);
- rq->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK
- | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK);
+ ERTS_RUNQ_FLGS_SET(rq, (ERTS_RUNQ_FLG_OUT_OF_WORK
+ | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK));
rq->waiting++;
rq->waiting *= -1;
rq->woken = 0;
@@ -1961,8 +1993,8 @@ static ERTS_INLINE void
sched_waiting(Uint no, ErtsRunQueue *rq)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
- rq->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK
- | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK);
+ ERTS_RUNQ_FLGS_SET(rq, (ERTS_RUNQ_FLG_OUT_OF_WORK
+ | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK));
if (rq->waiting < 0)
rq->waiting--;
else
@@ -1994,9 +2026,8 @@ ongoing_multi_scheduling_block(void)
static ERTS_INLINE void
empty_runq(ErtsRunQueue *rq)
{
- erts_aint32_t oifls = erts_smp_atomic32_read_band_nob(&rq->info_flags,
- ~ERTS_RUNQ_IFLG_NONEMPTY);
- if (oifls & ERTS_RUNQ_IFLG_NONEMPTY) {
+ Uint32 old_flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_NONEMPTY|ERTS_RUNQ_FLG_PROTECTED);
+ if (old_flags & ERTS_RUNQ_FLG_NONEMPTY) {
#ifdef DEBUG
erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues);
/*
@@ -2012,9 +2043,8 @@ empty_runq(ErtsRunQueue *rq)
static ERTS_INLINE void
non_empty_runq(ErtsRunQueue *rq)
{
- erts_aint32_t oifls = erts_smp_atomic32_read_bor_nob(&rq->info_flags,
- ERTS_RUNQ_IFLG_NONEMPTY);
- if (!(oifls & ERTS_RUNQ_IFLG_NONEMPTY)) {
+ Uint32 old_flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_NONEMPTY);
+ if (!(old_flags & ERTS_RUNQ_FLG_NONEMPTY)) {
#ifdef DEBUG
erts_aint32_t empty = erts_smp_atomic32_read_nob(&no_empty_run_queues);
/*
@@ -2627,19 +2657,16 @@ try_inc_no_active_runqs(int active)
static ERTS_INLINE int
chk_wake_sched(ErtsRunQueue *crq, int ix, int activate)
{
- erts_aint32_t iflgs;
+ Uint32 flags;
ErtsRunQueue *wrq;
if (crq->ix == ix)
return 0;
wrq = ERTS_RUNQ_IX(ix);
- iflgs = erts_smp_atomic32_read_nob(&wrq->info_flags);
- if (!(iflgs & (ERTS_RUNQ_IFLG_SUSPENDED|ERTS_RUNQ_IFLG_NONEMPTY))) {
+ flags = ERTS_RUNQ_FLGS_GET(wrq);
+ if (!(flags & (ERTS_RUNQ_FLG_SUSPENDED|ERTS_RUNQ_FLG_NONEMPTY))) {
if (activate) {
- if (try_inc_no_active_runqs(ix+1)) {
- erts_smp_xrunq_lock(crq, wrq);
- wrq->flags &= ~ERTS_RUNQ_FLG_INACTIVE;
- erts_smp_xrunq_unlock(crq, wrq);
- }
+ if (try_inc_no_active_runqs(ix+1))
+ ERTS_RUNQ_FLGS_UNSET(wrq, ERTS_RUNQ_FLG_INACTIVE);
}
wake_scheduler(wrq, 0);
return 1;
@@ -2706,9 +2733,7 @@ erts_sched_notify_check_cpu_bind(void)
int ix;
for (ix = 0; ix < erts_no_run_queues; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- erts_smp_runq_lock(rq);
- rq->flags |= ERTS_RUNQ_FLG_CHK_CPU_BIND;
- erts_smp_runq_unlock(rq);
+ ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_CHK_CPU_BIND);
wake_scheduler(rq, 0);
}
#else
@@ -2717,409 +2742,547 @@ erts_sched_notify_check_cpu_bind(void)
}
-#ifdef ERTS_SMP
+static ERTS_INLINE void
+enqueue_process(ErtsRunQueue *runq, int prio, Process *p)
+{
+ ErtsRunPrioQueue *rpq;
-ErtsRunQueue *
-erts_prepare_emigrate(ErtsRunQueue *c_rq, ErtsRunQueueInfo *c_rqi, int prio)
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
+
+ erts_smp_inc_runq_len(runq, &runq->procs.prio_info[prio], prio);
+
+ if (prio == PRIORITY_LOW) {
+ p->schedule_count = RESCHEDULE_LOW;
+ rpq = &runq->procs.prio[PRIORITY_NORMAL];
+ }
+ else {
+ p->schedule_count = 1;
+ rpq = &runq->procs.prio[prio];
+ }
+
+ p->next = NULL;
+ if (rpq->last)
+ rpq->last->next = p;
+ else
+ rpq->first = p;
+ rpq->last = p;
+}
+
+
+static ERTS_INLINE void
+unqueue_process(ErtsRunQueue *runq,
+ ErtsRunPrioQueue *rpq,
+ ErtsRunQueueInfo *rqi,
+ int prio,
+ Process *prev_proc,
+ Process *proc)
{
- ASSERT(ERTS_CHK_RUNQ_FLG_EMIGRATE(c_rq->flags, prio));
- ASSERT(ERTS_CHK_RUNQ_FLG_EVACUATE(c_rq->flags, prio)
- || c_rqi->len >= c_rqi->migrate.limit.this);
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
- while (1) {
- ErtsRunQueue *n_rq = c_rqi->migrate.runq;
- ERTS_DBG_VERIFY_VALID_RUNQP(n_rq);
- erts_smp_xrunq_lock(c_rq, n_rq);
-
- /*
- * erts_smp_xrunq_lock() may release lock on c_rq! We have
- * to check that we still want to emigrate and emigrate
- * to the same run queue as before.
- */
+ if (prev_proc)
+ prev_proc->next = proc->next;
+ else
+ rpq->first = proc->next;
+ if (!proc->next)
+ rpq->last = prev_proc;
- if (ERTS_CHK_RUNQ_FLG_EMIGRATE(c_rq->flags, prio)) {
- Uint32 force = (ERTS_CHK_RUNQ_FLG_EVACUATE(c_rq->flags, prio)
- | (c_rq->flags & ERTS_RUNQ_FLG_INACTIVE));
- if (force || c_rqi->len > c_rqi->migrate.limit.this) {
- ErtsRunQueueInfo *n_rqi;
- /* We still want to emigrate */
-
- if (n_rq != c_rqi->migrate.runq) {
- /* Ahh... run queue changed; need to do it all over again... */
- erts_smp_runq_unlock(n_rq);
- continue;
- }
- else {
+ if (!rpq->first)
+ rpq->last = NULL;
- if (prio == ERTS_PORT_PRIO_LEVEL)
- n_rqi = &n_rq->ports.info;
- else
- n_rqi = &n_rq->procs.prio_info[prio];
+ erts_smp_dec_runq_len(runq, rqi, prio);
+}
- if (force || (n_rqi->len < c_rqi->migrate.limit.other)) {
- /* emigrate ... */
- return n_rq;
- }
- }
- }
- }
- ASSERT(n_rq != c_rq);
- erts_smp_runq_unlock(n_rq);
- if (!(c_rq->flags & ERTS_RUNQ_FLG_INACTIVE)) {
- /* No more emigrations to this runq */
- ERTS_UNSET_RUNQ_FLG_EMIGRATE(c_rq->flags, prio);
- ERTS_DBG_SET_INVALID_RUNQP(c_rqi->migrate.runq, 0x3);
- }
+static ERTS_INLINE Process *
+dequeue_process(ErtsRunQueue *runq, int prio_q, erts_aint32_t *statep)
+{
+ erts_aint32_t state;
+ int prio;
+ ErtsRunPrioQueue *rpq;
+ ErtsRunQueueInfo *rqi;
+ Process *p;
+
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
+
+ ASSERT(PRIORITY_NORMAL == prio_q
+ || PRIORITY_HIGH == prio_q
+ || PRIORITY_MAX == prio_q);
+ rpq = &runq->procs.prio[prio_q];
+ p = rpq->first;
+ if (!p)
return NULL;
+
+ ERTS_SMP_DATA_DEPENDENCY_READ_MEMORY_BARRIER;
+
+ state = erts_smp_atomic32_read_nob(&p->state);
+ if (statep)
+ *statep = state;
+
+ prio = (int) (ERTS_PSFLG_PRIO_MASK & state);
+
+ rqi = &runq->procs.prio_info[prio];
+
+ if (p)
+ unqueue_process(runq, rpq, rqi, prio, NULL, p);
+
+ return p;
+}
+
+static ERTS_INLINE int
+check_requeue_process(ErtsRunQueue *rq, int prio_q)
+{
+ ErtsRunPrioQueue *rpq = &rq->procs.prio[prio_q];
+ Process *p = rpq->first;
+ if (--p->schedule_count > 0 && p != rpq->last) {
+ /* reschedule */
+ rpq->first = p->next;
+ rpq->last->next = p;
+ rpq->last = p;
+ p->next = NULL;
+ return 1;
+ }
+ return 0;
+}
+
+#ifdef ERTS_SMP
+
+static ErtsRunQueue *
+check_immigration_need(ErtsRunQueue *c_rq, ErtsMigrationPath *mp, int prio)
+{
+ int len;
+ Uint32 f_flags, f_rq_flags;
+ ErtsRunQueue *f_rq;
+
+ f_flags = mp->prio[prio].flags;
+
+ ASSERT(ERTS_CHK_RUNQ_FLG_IMMIGRATE(mp->flags, prio));
+
+ f_rq = mp->prio[prio].runq;
+ if (!f_rq)
+ return NULL;
+
+ f_rq_flags = ERTS_RUNQ_FLGS_GET(f_rq);
+ if (f_rq_flags & ERTS_RUNQ_FLG_PROTECTED)
+ return NULL;
+
+ if (ERTS_CHK_RUNQ_FLG_EVACUATE(f_flags, prio))
+ return f_rq;
+
+ if (f_rq_flags & ERTS_RUNQ_FLG_INACTIVE)
+ return f_rq;
+
+ if (prio == ERTS_PORT_PRIO_LEVEL)
+ len = RUNQ_READ_LEN(&c_rq->ports.info.len);
+ else
+ len = RUNQ_READ_LEN(&c_rq->procs.prio_info[prio].len);
+
+ if (len < mp->prio[prio].limit.this) {
+ if (prio == ERTS_PORT_PRIO_LEVEL)
+ len = RUNQ_READ_LEN(&f_rq->ports.info.len);
+ else
+ len = RUNQ_READ_LEN(&f_rq->procs.prio_info[prio].len);
+
+ if (len > mp->prio[prio].limit.other)
+ return f_rq;
}
+ return NULL;
}
static void
-immigrate(ErtsRunQueue *rq)
+immigrate(ErtsRunQueue *c_rq, ErtsMigrationPath *mp)
{
- int prio;
+ Uint32 iflags, iflag;
+ erts_smp_runq_unlock(c_rq);
- ASSERT(rq->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK);
+ ASSERT(erts_thr_progress_is_managed_thread());
- for (prio = 0; prio < ERTS_NO_PRIO_LEVELS; prio++) {
- if (ERTS_CHK_RUNQ_FLG_IMMIGRATE(rq->flags, prio)) {
- ErtsRunQueueInfo *rqi = (prio == ERTS_PORT_PRIO_LEVEL
- ? &rq->ports.info
- : &rq->procs.prio_info[prio]);
- ErtsRunQueue *from_rq = rqi->migrate.runq;
- int rq_locked, from_rq_locked;
+ iflags = mp->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK;
- ERTS_DBG_VERIFY_VALID_RUNQP(from_rq);
+ iflag = iflags & -iflags;
- rq_locked = 1;
- from_rq_locked = 1;
- erts_smp_xrunq_lock(rq, from_rq);
- /*
- * erts_smp_xrunq_lock() may release lock on rq! We have
- * to check that we still want to immigrate from the same
- * run queue as before.
- */
- if (ERTS_CHK_RUNQ_FLG_IMMIGRATE(rq->flags, prio)
- && from_rq == rqi->migrate.runq) {
- ErtsRunQueueInfo *from_rqi = (prio == ERTS_PORT_PRIO_LEVEL
- ? &from_rq->ports.info
- : &from_rq->procs.prio_info[prio]);
- if ((ERTS_CHK_RUNQ_FLG_EVACUATE(rq->flags, prio)
- && ERTS_CHK_RUNQ_FLG_EVACUATE(from_rq->flags, prio)
- && from_rqi->len)
- || (from_rqi->len > rqi->migrate.limit.other
- && rqi->len < rqi->migrate.limit.this)) {
- if (prio == ERTS_PORT_PRIO_LEVEL) {
- Port *prt = from_rq->ports.start;
- if (prt) {
- int prt_locked = 0;
- (void) erts_port_migrate(prt, &prt_locked,
- from_rq, &from_rq_locked,
- rq, &rq_locked);
- if (prt_locked)
- erts_smp_port_unlock(prt);
- }
- }
- else {
- Process *proc;
- ErtsRunPrioQueue *from_rpq;
- from_rpq = (prio == PRIORITY_LOW
- ? &from_rq->procs.prio[PRIORITY_NORMAL]
- : &from_rq->procs.prio[prio]);
- for (proc = from_rpq->first; proc; proc = proc->next)
- if (proc->prio == prio && !proc->bound_runq)
- break;
- if (proc) {
- ErtsProcLocks proc_locks = 0;
- (void) erts_proc_migrate(proc, &proc_locks,
- from_rq, &from_rq_locked,
- rq, &rq_locked);
- if (proc_locks)
- erts_smp_proc_unlock(proc, proc_locks);
- }
+ while (iflag) {
+ ErtsRunQueue *rq;
+ int prio;
+
+ switch (iflag) {
+ case (MAX_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT):
+ prio = PRIORITY_MAX;
+ break;
+ case (HIGH_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT):
+ prio = PRIORITY_HIGH;
+ break;
+ case (NORMAL_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT):
+ prio = PRIORITY_NORMAL;
+ break;
+ case (LOW_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT):
+ prio = PRIORITY_LOW;
+ break;
+ case (PORT_BIT << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT):
+ prio = ERTS_PORT_PRIO_LEVEL;
+ break;
+ default:
+ erl_exit(ERTS_ABORT_EXIT,
+ "%s:%d:%s(): Invalid immigrate queue mask",
+ __FILE__, __LINE__, __func__);
+ prio = 0;
+ break;
+ }
+
+ iflags &= ~iflag;
+ iflag = iflags & -iflags;
+
+ rq = check_immigration_need(c_rq, mp, prio);
+ if (rq) {
+ erts_smp_runq_lock(rq);
+ if (prio == ERTS_PORT_PRIO_LEVEL) {
+ Port *prt;
+ prt = erts_dequeue_port(rq);
+ if (prt)
+ RUNQ_SET_RQ(&prt->run_queue, c_rq);
+ erts_smp_runq_unlock(rq);
+ if (prt) {
+ /* port might terminate while we have no lock... */
+ rq = erts_port_runq(prt);
+ if (rq) {
+ if (rq != c_rq)
+ erl_exit(ERTS_ABORT_EXIT,
+ "%s:%d:%s(): Internal error",
+ __FILE__, __LINE__, __func__);
+ erts_enqueue_port(c_rq, prt);
+ if (!iflag)
+ return; /* done */
+ erts_smp_runq_unlock(c_rq);
}
}
- else {
- ERTS_UNSET_RUNQ_FLG_IMMIGRATE(rq->flags, prio);
- ERTS_DBG_SET_INVALID_RUNQP(rqi->migrate.runq, 0x1);
+ }
+ else {
+ ErtsRunPrioQueue *rpq = &rq->procs.prio[prio == PRIORITY_LOW
+ ? PRIORITY_NORMAL
+ : prio];
+ Process *prev_proc = NULL;
+ Process *proc = rpq->first;
+ int rq_locked = 1;
+
+ while (proc) {
+ erts_aint32_t state;
+ state = erts_smp_atomic32_read_acqb(&proc->state);
+ if (!(ERTS_PSFLG_BOUND & state)
+ && (prio == (int) (ERTS_PSFLG_PRIO_MASK & state))) {
+ ErtsRunQueueInfo *rqi = &rq->procs.prio_info[prio];
+ unqueue_process(rq, rpq, rqi, prio, prev_proc, proc);
+ erts_smp_runq_unlock(rq);
+ RUNQ_SET_RQ(&proc->run_queue, c_rq);
+ rq_locked = 0;
+
+ erts_smp_runq_lock(c_rq);
+ enqueue_process(c_rq, prio, proc);
+ if (!iflag)
+ return; /* done */
+ erts_smp_runq_unlock(c_rq);
+ break;
+ }
+ prev_proc = proc;
+ proc = proc->next;
}
+ if (rq_locked)
+ erts_smp_runq_unlock(rq);
}
- if (from_rq_locked)
- erts_smp_runq_unlock(from_rq);
- if (!rq_locked)
- erts_smp_runq_lock(rq);
}
}
+
+ erts_smp_runq_lock(c_rq);
+}
+
+static ERTS_INLINE void
+suspend_run_queue(ErtsRunQueue *rq)
+{
+ erts_smp_atomic32_read_bor_nob(&rq->scheduler->ssi->flags,
+ ERTS_SSI_FLG_SUSPENDED);
+ ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_SUSPENDED);
+
+ wake_scheduler(rq, 0);
+}
+
+static void scheduler_ix_resume_wake(Uint ix);
+
+static ERTS_INLINE void
+resume_run_queue(ErtsRunQueue *rq)
+{
+ int pix;
+
+ erts_smp_runq_lock(rq);
+
+ (void) ERTS_RUNQ_FLGS_MASK_SET(rq,
+ (ERTS_RUNQ_FLG_OUT_OF_WORK
+ | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK
+ | ERTS_RUNQ_FLG_SUSPENDED),
+ (ERTS_RUNQ_FLG_OUT_OF_WORK
+ | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK));
+
+ rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS;
+ for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) {
+ rq->procs.prio_info[pix].max_len = 0;
+ rq->procs.prio_info[pix].reds = 0;
+ }
+ rq->ports.info.max_len = 0;
+ rq->ports.info.reds = 0;
+ rq->max_len = 0;
+
+ erts_smp_runq_unlock(rq);
+
+ scheduler_ix_resume_wake(rq->ix);
}
+typedef struct {
+ Process *first;
+ Process *last;
+} ErtsStuckBoundProcesses;
+
static void
-evacuate_run_queue(ErtsRunQueue *evac_rq, ErtsRunQueue *rq)
+schedule_bound_processes(ErtsRunQueue *rq,
+ ErtsStuckBoundProcesses *sbpp)
{
- Port *prt;
- int notify_to_rq = 0;
- int prio;
- int prt_locked = 0;
- int rq_locked = 0;
- int evac_rq_locked = 1;
- ErtsMigrateResult mres;
+ Process *proc, *next;
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+
+ proc = sbpp->first;
+ while (proc) {
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state);
+ next = proc->next;
+ enqueue_process(rq, (int) (ERTS_PSFLG_PRIO_MASK & state), proc);
+ proc = next;
+ }
+}
- erts_smp_runq_lock(evac_rq);
+static void
+evacuate_run_queue(ErtsRunQueue *rq,
+ ErtsStuckBoundProcesses *sbpp)
+{
+ int prio_q;
+ ErtsRunQueue *to_rq;
+ ErtsMigrationPaths *mps;
+ ErtsMigrationPath *mp;
- erts_smp_atomic32_read_bor_nob(&evac_rq->scheduler->ssi->flags,
- ERTS_SSI_FLG_SUSPENDED);
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
- evac_rq->flags &= ~ERTS_RUNQ_FLGS_IMMIGRATE_QMASK;
- evac_rq->flags |= (ERTS_RUNQ_FLGS_EMIGRATE_QMASK
- | ERTS_RUNQ_FLGS_EVACUATE_QMASK
- | ERTS_RUNQ_FLG_SUSPENDED);
+ ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
- erts_smp_atomic32_read_bor_nob(&evac_rq->info_flags, ERTS_RUNQ_IFLG_SUSPENDED);
- /*
- * Need to set up evacuation paths first since we
- * may release the run queue lock on evac_rq
- * when evacuating.
- */
- evac_rq->misc.evac_runq = rq;
- evac_rq->ports.info.migrate.runq = rq;
- for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++)
- evac_rq->procs.prio_info[prio].migrate.runq = rq;
+ mps = erts_get_migration_paths_managed();
+ mp = &mps->mpath[rq->ix];
/* Evacuate scheduled misc ops */
- if (evac_rq->misc.start) {
- rq_locked = 1;
- erts_smp_xrunq_lock(evac_rq, rq);
- if (rq->misc.end)
- rq->misc.end->next = evac_rq->misc.start;
+ if (rq->misc.start) {
+ ErtsMiscOpList *start, *end;
+
+ to_rq = mp->misc_evac_runq;
+ if (!to_rq)
+ return;
+
+ start = rq->misc.start;
+ end = rq->misc.end;
+ rq->misc.start = NULL;
+ rq->misc.end = NULL;
+ erts_smp_runq_unlock(rq);
+
+ erts_smp_runq_lock(to_rq);
+ if (to_rq->misc.end)
+ to_rq->misc.end->next = start;
else
- rq->misc.start = evac_rq->misc.start;
- rq->misc.end = evac_rq->misc.end;
- evac_rq->misc.start = NULL;
- evac_rq->misc.end = NULL;
+ to_rq->misc.start = start;
+
+ to_rq->misc.end = end;
+ erts_smp_runq_unlock(to_rq);
+ smp_notify_inc_runq(to_rq);
+ erts_smp_runq_lock(to_rq);
}
- /* Evacuate scheduled ports */
- prt = evac_rq->ports.start;
- while (prt) {
- mres = erts_port_migrate(prt, &prt_locked,
- evac_rq, &evac_rq_locked,
- rq, &rq_locked);
- if (mres == ERTS_MIGRATE_SUCCESS)
- notify_to_rq = 1;
- if (prt_locked)
- erts_smp_port_unlock(prt);
- if (!evac_rq_locked) {
- evac_rq_locked = 1;
- erts_smp_runq_lock(evac_rq);
+ if (rq->ports.start) {
+ Port *prt;
+
+ to_rq = mp->prio[ERTS_PORT_PRIO_LEVEL].runq;
+ if (!to_rq)
+ return;
+
+ /* Evacuate scheduled ports */
+ prt = rq->ports.start;
+ while (prt) {
+ ErtsRunQueue *prt_rq;
+ prt = erts_dequeue_port(rq);
+ RUNQ_SET_RQ(&prt->run_queue, to_rq);
+ erts_smp_runq_unlock(rq);
+ /*
+ * The port might terminate while
+ * we have no lock on it...
+ */
+ prt_rq = erts_port_runq(prt);
+ if (prt_rq) {
+ if (prt_rq != to_rq)
+ erl_exit(ERTS_ABORT_EXIT,
+ "%s:%d:%s() internal error\n",
+ __FILE__, __LINE__, __func__);
+ erts_enqueue_port(to_rq, prt);
+ erts_smp_runq_unlock(to_rq);
+ }
+ erts_smp_runq_lock(rq);
+ prt = rq->ports.start;
}
- prt = evac_rq->ports.start;
+ smp_notify_inc_runq(to_rq);
}
/* Evacuate scheduled processes */
- for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++) {
+ for (prio_q = 0; prio_q < ERTS_NO_PROC_PRIO_QUEUES; prio_q++) {
+ erts_aint32_t state;
Process *proc;
+ int notify = 0;
+ to_rq = NULL;
- switch (prio) {
- case PRIORITY_MAX:
- case PRIORITY_HIGH:
- case PRIORITY_NORMAL:
- proc = evac_rq->procs.prio[prio].first;
- while (proc) {
- ErtsProcLocks proc_locks = 0;
-
- /* Bound processes are stuck... */
- while (proc->bound_runq) {
- proc = proc->next;
- if (!proc)
- goto end_of_proc;
- }
-
- mres = erts_proc_migrate(proc, &proc_locks,
- evac_rq, &evac_rq_locked,
- rq, &rq_locked);
- if (mres == ERTS_MIGRATE_SUCCESS)
- notify_to_rq = 1;
- if (proc_locks)
- erts_smp_proc_unlock(proc, proc_locks);
- if (!evac_rq_locked) {
- erts_smp_runq_lock(evac_rq);
- evac_rq_locked = 1;
- }
+ if (!mp->prio[prio_q].runq)
+ return;
+ if (prio_q == PRIORITY_NORMAL && !mp->prio[PRIORITY_LOW].runq)
+ return;
- proc = evac_rq->procs.prio[prio].first;
+ proc = dequeue_process(rq, prio_q, &state);
+ while (proc) {
+ if (ERTS_PSFLG_BOUND & state) {
+ /* Bound processes get stuck here... */
+ proc->next = NULL;
+ if (sbpp->last)
+ sbpp->last->next = proc;
+ else
+ sbpp->first = proc;
+ sbpp->last = proc;
}
+ else {
+ int prio = (int) (ERTS_PSFLG_PRIO_MASK & state);
+ erts_smp_runq_unlock(rq);
- end_of_proc:
+ to_rq = mp->prio[prio].runq;
+ RUNQ_SET_RQ(&proc->run_queue, to_rq);
-#ifdef DEBUG
- for (proc = evac_rq->procs.prio[prio].first;
- proc;
- proc = proc->next) {
- ASSERT(proc->bound_runq);
+ erts_smp_runq_lock(to_rq);
+ enqueue_process(to_rq, prio, proc);
+ erts_smp_runq_unlock(to_rq);
+ notify = 1;
+
+ erts_smp_runq_lock(rq);
}
-#endif
- break;
- case PRIORITY_LOW:
- break;
- default:
- ASSERT(!"Invalid process priority");
- break;
+ proc = dequeue_process(rq, prio_q, &state);
}
+ if (notify)
+ smp_notify_inc_runq(to_rq);
}
- if (rq_locked)
- erts_smp_runq_unlock(rq);
-
- if (evac_rq_locked)
- erts_smp_runq_unlock(evac_rq);
-
- if (notify_to_rq)
- smp_notify_inc_runq(rq);
-
- wake_scheduler(evac_rq, 0);
+ if (ERTS_EMPTY_RUNQ(rq))
+ empty_runq(rq);
}
static int
-try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq)
+try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq, Uint32 flags)
{
- Process *proc;
- int vrq_locked;
+ Uint32 procs_qmask = flags & ERTS_RUNQ_FLGS_PROCS_QMASK;
+ int max_prio_bit;
+ ErtsRunPrioQueue *rpq;
- if (*rq_lockedp)
- erts_smp_xrunq_lock(rq, vrq);
- else
- erts_smp_runq_lock(vrq);
- vrq_locked = 1;
+ if (*rq_lockedp) {
+ erts_smp_runq_unlock(rq);
+ *rq_lockedp = 0;
+ }
+
+ ERTS_SMP_LC_ASSERT(!erts_smp_lc_runq_is_locked(rq));
- ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked);
+ erts_smp_runq_lock(vrq);
if (rq->halt_in_progress)
- goto try_steal_port;
+ goto no_procs;
/*
* Check for a runnable process to steal...
*/
- switch (vrq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) {
- case MAX_BIT:
- case MAX_BIT|HIGH_BIT:
- case MAX_BIT|NORMAL_BIT:
- case MAX_BIT|LOW_BIT:
- case MAX_BIT|HIGH_BIT|NORMAL_BIT:
- case MAX_BIT|HIGH_BIT|LOW_BIT:
- case MAX_BIT|NORMAL_BIT|LOW_BIT:
- case MAX_BIT|HIGH_BIT|NORMAL_BIT|LOW_BIT:
- for (proc = vrq->procs.prio[PRIORITY_MAX].last;
- proc;
- proc = proc->prev) {
- if (!proc->bound_runq)
- break;
- }
- if (proc)
+ while (procs_qmask) {
+ Process *prev_proc;
+ Process *proc;
+
+ max_prio_bit = procs_qmask & -procs_qmask;
+ switch (max_prio_bit) {
+ case MAX_BIT:
+ rpq = &vrq->procs.prio[PRIORITY_MAX];
break;
- case HIGH_BIT:
- case HIGH_BIT|NORMAL_BIT:
- case HIGH_BIT|LOW_BIT:
- case HIGH_BIT|NORMAL_BIT|LOW_BIT:
- for (proc = vrq->procs.prio[PRIORITY_HIGH].last;
- proc;
- proc = proc->prev) {
- if (!proc->bound_runq)
- break;
- }
- if (proc)
+ case HIGH_BIT:
+ rpq = &vrq->procs.prio[PRIORITY_HIGH];
break;
- case NORMAL_BIT:
- case LOW_BIT:
- case NORMAL_BIT|LOW_BIT:
- for (proc = vrq->procs.prio[PRIORITY_NORMAL].last;
- proc;
- proc = proc->prev) {
- if (!proc->bound_runq)
- break;
- }
- if (proc)
+ case NORMAL_BIT:
+ case LOW_BIT:
+ rpq = &vrq->procs.prio[PRIORITY_NORMAL];
break;
- case 0:
- proc = NULL;
- break;
- default:
- ASSERT(!"Invalid queue mask");
- proc = NULL;
- break;
- }
+ case 0:
+ goto no_procs;
+ default:
+ ASSERT(!"Invalid queue mask");
+ goto no_procs;
+ }
- if (proc) {
- ErtsProcLocks proc_locks = 0;
- int res;
- ErtsMigrateResult mres;
- mres = erts_proc_migrate(proc, &proc_locks,
- vrq, &vrq_locked,
- rq, rq_lockedp);
- if (proc_locks)
- erts_smp_proc_unlock(proc, proc_locks);
- res = !0;
- switch (mres) {
- case ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED:
- res = 0;
- case ERTS_MIGRATE_SUCCESS:
- if (vrq_locked)
+ prev_proc = NULL;
+ proc = rpq->first;
+
+ while (proc) {
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state);
+ if (!(ERTS_PSFLG_BOUND & state)) {
+ /* Steal process */
+ int prio = (int) (ERTS_PSFLG_PRIO_MASK & state);
+ ErtsRunQueueInfo *rqi = &vrq->procs.prio_info[prio];
+ unqueue_process(vrq, rpq, rqi, prio, prev_proc, proc);
erts_smp_runq_unlock(vrq);
- return res;
- default: /* Other failures */
- break;
- }
- }
+ RUNQ_SET_RQ(&proc->run_queue, rq);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked);
+ erts_smp_runq_lock(rq);
+ *rq_lockedp = 1;
+ enqueue_process(rq, prio, proc);
+ return !0;
+ }
+ prev_proc = proc;
+ proc = proc->next;
+ }
- if (!vrq_locked) {
- if (*rq_lockedp)
- erts_smp_xrunq_lock(rq, vrq);
- else
- erts_smp_runq_lock(vrq);
- vrq_locked = 1;
+ procs_qmask &= ~max_prio_bit;
}
- try_steal_port:
+no_procs:
- ERTS_SMP_LC_CHK_RUNQ_LOCK(rq, *rq_lockedp);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(vrq, vrq_locked);
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(vrq));
/*
* Check for a runnable port to steal...
*/
- if (vrq->ports.info.len) {
- Port *prt = vrq->ports.end;
- int prt_locked = 0;
- int res;
- ErtsMigrateResult mres;
-
- mres = erts_port_migrate(prt, &prt_locked,
- vrq, &vrq_locked,
- rq, rq_lockedp);
- if (prt_locked)
- erts_smp_port_unlock(prt);
- res = !0;
- switch (mres) {
- case ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED:
- res = 0;
- case ERTS_MIGRATE_SUCCESS:
- if (vrq_locked)
- erts_smp_runq_unlock(vrq);
- return res;
- default: /* Other failures */
- break;
+ if (vrq->ports.start) {
+ ErtsRunQueue *prt_rq;
+ Port *prt = erts_dequeue_port(vrq);
+ RUNQ_SET_RQ(&prt->run_queue, rq);
+ erts_smp_runq_unlock(vrq);
+
+ /*
+ * The port might terminate while
+ * we have no lock on it...
+ */
+
+ prt_rq = erts_port_runq(prt);
+ if (!prt_rq)
+ return 0;
+ else {
+ if (prt_rq != rq)
+ erl_exit(ERTS_ABORT_EXIT,
+ "%s:%d:%s() internal error\n",
+ __FILE__, __LINE__, __func__);
+ *rq_lockedp = 1;
+ erts_enqueue_port(rq, prt);
+ return !0;
}
}
- if (vrq_locked)
- erts_smp_runq_unlock(vrq);
+ erts_smp_runq_unlock(vrq);
return 0;
}
@@ -3129,9 +3292,10 @@ static ERTS_INLINE int
check_possible_steal_victim(ErtsRunQueue *rq, int *rq_lockedp, int vix)
{
ErtsRunQueue *vrq = ERTS_RUNQ_IX(vix);
- erts_aint32_t iflgs = erts_smp_atomic32_read_nob(&vrq->info_flags);
- if (iflgs & ERTS_RUNQ_IFLG_NONEMPTY)
- return try_steal_task_from_victim(rq, rq_lockedp, vrq);
+ Uint32 flags = ERTS_RUNQ_FLGS_GET(vrq);
+ if ((flags & (ERTS_RUNQ_FLG_NONEMPTY
+ | ERTS_RUNQ_FLG_PROTECTED)) == ERTS_RUNQ_FLG_NONEMPTY)
+ return try_steal_task_from_victim(rq, rq_lockedp, vrq, flags);
else
return 0;
}
@@ -3141,15 +3305,12 @@ static int
try_steal_task(ErtsRunQueue *rq)
{
int res, rq_locked, vix, active_rqs, blnc_rqs;
+ Uint32 flags;
- /*
- * We are not allowed to steal jobs to this run queue
- * if it is suspended. Note that it might get suspended
- * at any time when we don't have the lock on the run
- * queue.
- */
- if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED)
- return 0;
+ /* Protect jobs we steal from getting stolen from us... */
+ flags = ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_PROTECTED);
+ if (flags & ERTS_RUNQ_FLG_SUSPENDED)
+ return 0; /* go suspend instead... */
res = 0;
rq_locked = 1;
@@ -3268,12 +3429,123 @@ do { \
ASSERT(sum__ == (RQ)->full_reds_history_sum); \
} while (0);
+#define ERTS_PRE_ALLOCED_MPATHS 8
+
+erts_atomic_t erts_migration_paths;
+
+static struct {
+ size_t size;
+ ErtsMigrationPaths *freelist;
+ struct {
+ ErtsMigrationPaths *first;
+ ErtsMigrationPaths *last;
+ } retired;
+} mpaths;
+
+static void
+init_migration_paths(void)
+{
+ int qix, i;
+ char *p;
+ ErtsMigrationPaths *mps;
+
+ mpaths.size = sizeof(ErtsMigrationPaths);
+ mpaths.size += sizeof(ErtsMigrationPath)*(erts_no_schedulers-1);
+ mpaths.size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(mpaths.size);
+
+ p = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_LL_MPATHS,
+ (mpaths.size
+ * ERTS_PRE_ALLOCED_MPATHS));
+ mpaths.freelist = NULL;
+ for (i = 0; i < ERTS_PRE_ALLOCED_MPATHS-1; i++) {
+ mps = (ErtsMigrationPaths *) p;
+ mps->next = mpaths.freelist;
+ mpaths.freelist = mps;
+ p += mpaths.size;
+ }
+
+ mps = (ErtsMigrationPaths *) p;
+ mps->block = NULL;
+ for (qix = 0; qix < erts_no_run_queues; qix++) {
+ int pix;
+ mps->mpath[qix].flags = 0;
+ mps->mpath[qix].misc_evac_runq = NULL;
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ mps->mpath[qix].prio[pix].limit.this = -1;
+ mps->mpath[qix].prio[pix].limit.other = -1;
+ mps->mpath[qix].prio[pix].runq = NULL;
+ mps->mpath[qix].prio[pix].flags = 0;
+ }
+ }
+ erts_atomic_init_wb(&erts_migration_paths, (erts_aint_t) mps);
+}
+
+static ERTS_INLINE ErtsMigrationPaths *
+alloc_mpaths(void)
+{
+ void *block;
+ ErtsMigrationPaths *res;
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&balance_info.update_mtx));
+
+ res = mpaths.freelist;
+ if (res) {
+ mpaths.freelist = res->next;
+ res->block = NULL;
+ return res;
+ }
+ res = erts_alloc(ERTS_ALC_T_SL_MPATHS,
+ mpaths.size+ERTS_CACHE_LINE_SIZE);
+ block = (void *) res;
+ if (((UWord) res) & ERTS_CACHE_LINE_MASK)
+ res = (ErtsMigrationPaths *) ((((UWord) res) & ~ERTS_CACHE_LINE_MASK)
+ + ERTS_CACHE_LINE_SIZE);
+ res->block = block;
+ return res;
+}
+
+static ERTS_INLINE void
+retire_mpaths(ErtsMigrationPaths *mps)
+{
+ ErtsThrPrgrVal current;
+
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&balance_info.update_mtx));
+
+ current = erts_thr_progress_current();
+
+ while (mpaths.retired.first) {
+ ErtsMigrationPaths *tmp = mpaths.retired.first;
+ if (!erts_thr_progress_has_reached_this(current, tmp->thr_prgr))
+ break;
+ mpaths.retired.first = tmp->next;
+ if (tmp->block) {
+ erts_free(ERTS_ALC_T_SL_MPATHS, tmp->block);
+ }
+ else {
+ tmp->next = mpaths.freelist;
+ mpaths.freelist = tmp;
+ }
+ }
+
+ if (!mpaths.retired.first)
+ mpaths.retired.last = NULL;
+
+ mps->thr_prgr = erts_thr_progress_later_than(current);
+ mps->next = NULL;
+
+ if (mpaths.retired.last)
+ mpaths.retired.last->next = mps;
+ else
+ mpaths.retired.first = mps;
+ mpaths.retired.last = mps;
+}
+
static void
check_balance(ErtsRunQueue *c_rq)
{
#if ERTS_MAX_PROCESSES >= (1 << 27)
# error check_balance() assumes ERTS_MAX_PROCESS < (1 << 27)
#endif
+ ErtsMigrationPaths *new_mpaths, *old_mpaths;
ErtsRunQueueBalance avg = {0};
Sint64 scheds_reds, full_scheds_reds;
int forced, active, current_active, oowc, half_full_scheds, full_scheds,
@@ -3299,9 +3571,9 @@ check_balance(ErtsRunQueue *c_rq)
ERTS_FOREACH_RUNQ(rq,
{
if (rq->waiting)
- rq->flags |= ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK;
+ ERTS_RUNQ_FLGS_SET(rq, ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK);
else
- rq->flags &= ~ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK;
+ ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK);
rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS;
});
@@ -3343,7 +3615,7 @@ check_balance(ErtsRunQueue *c_rq)
ErtsRunQueue *rq = ERTS_RUNQ_IX(qix);
erts_smp_runq_lock(rq);
- run_queue_info[qix].flags = rq->flags;
+ run_queue_info[qix].flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) {
run_queue_info[qix].prio[pix].max_len
= rq->procs.prio_info[pix].max_len;
@@ -3677,47 +3949,27 @@ erts_fprintf(stderr, "--------------------------------\n");
set_no_active_runqs(active);
balance_info.halftime = 1;
- erts_smp_atomic32_set_nob(&balance_info.checking_balance, 0);
+ new_mpaths = alloc_mpaths();
+
+ /* Write migration paths */
- /* Write migration paths and reset balance statistics in all queues */
for (qix = 0; qix < blnc_no_rqs; qix++) {
int mqix;
- Uint32 flags;
- ErtsRunQueue *rq = ERTS_RUNQ_IX(qix);
- ErtsRunQueueInfo *rqi;
- flags = run_queue_info[qix].flags;
- erts_smp_runq_lock(rq);
- flags |= (rq->flags & ~ERTS_RUNQ_FLGS_MIGRATION_INFO);
- ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK));
- if (rq->waiting)
- flags |= ERTS_RUNQ_FLG_OUT_OF_WORK;
-
- rq->full_reds_history_sum
- = run_queue_info[qix].full_reds_history_sum;
- rq->full_reds_history[freds_hist_ix]
- = run_queue_info[qix].full_reds_history_change;
+ Uint32 flags = run_queue_info[qix].flags;
+ ErtsMigrationPath *mp = &new_mpaths->mpath[qix];
- ERTS_DBG_CHK_FULL_REDS_HISTORY(rq);
+ mp->flags = flags;
+ mp->misc_evac_runq = NULL;
- rq->out_of_work_count = 0;
- rq->flags = flags;
- rq->max_len = rq->len;
for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
- rqi = (pix == ERTS_PORT_PRIO_LEVEL
- ? &rq->ports.info
- : &rq->procs.prio_info[pix]);
- rqi->max_len = rqi->len;
- rqi->reds = 0;
if (!(ERTS_CHK_RUNQ_FLG_EMIGRATE(flags, pix)
| ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix))) {
ASSERT(run_queue_info[qix].prio[pix].immigrate_from < 0);
ASSERT(run_queue_info[qix].prio[pix].emigrate_to < 0);
-#ifdef DEBUG
- rqi->migrate.limit.this = -1;
- rqi->migrate.limit.other = -1;
- ERTS_DBG_SET_INVALID_RUNQP(rqi->migrate.runq, 0x2);
-#endif
-
+ mp->prio[pix].limit.this = -1;
+ mp->prio[pix].limit.other = -1;
+ mp->prio[pix].runq = NULL;
+ mp->prio[pix].flags = 0;
}
else if (ERTS_CHK_RUNQ_FLG_EMIGRATE(flags, pix)) {
ASSERT(!ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix));
@@ -3725,11 +3977,12 @@ erts_fprintf(stderr, "--------------------------------\n");
ASSERT(run_queue_info[qix].prio[pix].emigrate_to >= 0);
mqix = run_queue_info[qix].prio[pix].emigrate_to;
- rqi->migrate.limit.this
+ mp->prio[pix].limit.this
= run_queue_info[qix].prio[pix].migration_limit;
- rqi->migrate.limit.other
+ mp->prio[pix].limit.other
= run_queue_info[mqix].prio[pix].migration_limit;
- rqi->migrate.runq = ERTS_RUNQ_IX(mqix);
+ mp->prio[pix].runq = ERTS_RUNQ_IX(mqix);
+ mp->prio[pix].flags = run_queue_info[mqix].flags;
}
else {
ASSERT(ERTS_CHK_RUNQ_FLG_IMMIGRATE(flags, pix));
@@ -3737,24 +3990,142 @@ erts_fprintf(stderr, "--------------------------------\n");
ASSERT(run_queue_info[qix].prio[pix].immigrate_from >= 0);
mqix = run_queue_info[qix].prio[pix].immigrate_from;
- rqi->migrate.limit.this
+ mp->prio[pix].limit.this
= run_queue_info[qix].prio[pix].migration_limit;
- rqi->migrate.limit.other
+ mp->prio[pix].limit.other
= run_queue_info[mqix].prio[pix].migration_limit;
- rqi->migrate.runq = ERTS_RUNQ_IX(mqix);
+ mp->prio[pix].runq = ERTS_RUNQ_IX(mqix);
+ mp->prio[pix].flags = run_queue_info[mqix].flags;
}
}
+ }
+
+ old_mpaths = erts_get_migration_paths_managed();
+
+ /* Keep offline run-queues as is */
+ for (qix = blnc_no_rqs; qix < erts_no_schedulers; qix++) {
+ ErtsMigrationPath *nmp = &new_mpaths->mpath[qix];
+ ErtsMigrationPath *omp = &old_mpaths->mpath[qix];
+
+ nmp->flags = omp->flags;
+ nmp->misc_evac_runq = omp->misc_evac_runq;
+
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ nmp->prio[pix].limit.this = omp->prio[pix].limit.this;
+ nmp->prio[pix].limit.other = omp->prio[pix].limit.other;
+ nmp->prio[pix].runq = omp->prio[pix].runq;
+ nmp->prio[pix].flags = omp->prio[pix].flags;
+ }
+ }
+
+
+ /* Publish new migration paths... */
+ erts_atomic_set_wb(&erts_migration_paths, (erts_aint_t) new_mpaths);
+
+ /* Reset balance statistics in all online queues */
+ for (qix = 0; qix < blnc_no_rqs; qix++) {
+ Uint32 flags = run_queue_info[qix].flags;
+ ErtsRunQueue *rq = ERTS_RUNQ_IX(qix);
+
+ erts_smp_runq_lock(rq);
+ ASSERT(!(flags & ERTS_RUNQ_FLG_OUT_OF_WORK));
+ if (rq->waiting)
+ flags |= ERTS_RUNQ_FLG_OUT_OF_WORK;
+
+ rq->full_reds_history_sum
+ = run_queue_info[qix].full_reds_history_sum;
+ rq->full_reds_history[freds_hist_ix]
+ = run_queue_info[qix].full_reds_history_change;
+
+ ERTS_DBG_CHK_FULL_REDS_HISTORY(rq);
+
+ rq->out_of_work_count = 0;
+ (void) ERTS_RUNQ_FLGS_MASK_SET(rq, ERTS_RUNQ_FLGS_MIGRATION_INFO, flags);
+
+ rq->max_len = rq->len;
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ ErtsRunQueueInfo *rqi;
+ rqi = (pix == ERTS_PORT_PRIO_LEVEL
+ ? &rq->ports.info
+ : &rq->procs.prio_info[pix]);
+ erts_smp_reset_max_len(rq, rqi);
+ rqi->reds = 0;
+ }
rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS;
erts_smp_runq_unlock(rq);
}
+ erts_smp_atomic32_set_nob(&balance_info.checking_balance, 0);
+
balance_info.n++;
+ retire_mpaths(old_mpaths);
erts_smp_mtx_unlock(&balance_info.update_mtx);
erts_smp_runq_lock(c_rq);
}
+static void
+change_no_used_runqs(int used)
+{
+ ErtsMigrationPaths *new_mpaths, *old_mpaths;
+ int active, qix;
+ erts_smp_mtx_lock(&balance_info.update_mtx);
+ get_no_runqs(&active, NULL);
+ set_no_used_runqs(used);
+
+ old_mpaths = erts_get_migration_paths_managed();
+ new_mpaths = alloc_mpaths();
+
+ /* Write migration paths... */
+
+ for (qix = 0; qix < used; qix++) {
+ int pix;
+ ErtsMigrationPath *omp = &old_mpaths->mpath[qix];
+ ErtsMigrationPath *nmp = &new_mpaths->mpath[qix];
+
+ nmp->flags = omp->flags & ~ERTS_RUNQ_FLGS_MIGRATION_QMASKS;
+ nmp->misc_evac_runq = NULL;
+
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ nmp->prio[pix].limit.this = -1;
+ nmp->prio[pix].limit.other = -1;
+ nmp->prio[pix].runq = NULL;
+ nmp->prio[pix].flags = 0;
+ }
+ }
+ for (qix = used; qix < erts_no_run_queues; qix++) {
+ int pix;
+ ErtsRunQueue *to_rq = ERTS_RUNQ_IX(qix % used);
+ ErtsMigrationPath *nmp = &new_mpaths->mpath[qix];
+
+ nmp->flags = (ERTS_RUNQ_FLGS_EMIGRATE_QMASK
+ | ERTS_RUNQ_FLGS_EVACUATE_QMASK);
+ nmp->misc_evac_runq = to_rq;
+ for (pix = 0; pix < ERTS_NO_PRIO_LEVELS; pix++) {
+ nmp->prio[pix].limit.this = -1;
+ nmp->prio[pix].limit.other = -1;
+ nmp->prio[pix].runq = to_rq;
+ nmp->prio[pix].flags = 0;
+ }
+ }
+
+ /* ... and publish them. */
+ erts_atomic_set_wb(&erts_migration_paths, (erts_aint_t) new_mpaths);
+
+ retire_mpaths(old_mpaths);
+
+ /* Make sure that we balance soon... */
+ balance_info.forced_check_balance = 1;
+
+ erts_smp_mtx_unlock(&balance_info.update_mtx);
+
+ erts_smp_runq_lock(ERTS_RUNQ_IX(0));
+ ERTS_RUNQ_IX(0)->check_balance_reds = 0;
+ erts_smp_runq_unlock(ERTS_RUNQ_IX(0));
+}
+
+
#endif /* #ifdef ERTS_SMP */
Uint
@@ -3846,7 +4217,6 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
rq->ix = ix;
- erts_smp_atomic32_init_nob(&rq->info_flags, ERTS_RUNQ_IFLG_NONEMPTY);
/* make sure that the "extra" id correponds to the schedulers
* id if the esdp->no <-> ix+1 mapping change.
@@ -3857,7 +4227,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
rq->waiting = 0;
rq->woken = 0;
- rq->flags = 0;
+ ERTS_RUNQ_FLGS_INIT(rq, ERTS_RUNQ_FLG_NONEMPTY);
rq->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS;
rq->full_reds_history_sum = 0;
for (rix = 0; rix < ERTS_FULL_REDS_HISTORY_SIZE; rix++) {
@@ -3871,19 +4241,14 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
rq->wakeup_other_reds = 0;
rq->halt_in_progress = 0;
- rq->procs.len = 0;
rq->procs.pending_exiters = NULL;
rq->procs.context_switches = 0;
rq->procs.reductions = 0;
for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) {
- rq->procs.prio_info[pix].len = 0;
+ erts_smp_atomic32_init_nob(&rq->procs.prio_info[pix].len, 0);
rq->procs.prio_info[pix].max_len = 0;
rq->procs.prio_info[pix].reds = 0;
- rq->procs.prio_info[pix].migrate.limit.this = 0;
- rq->procs.prio_info[pix].migrate.limit.other = 0;
- ERTS_DBG_SET_INVALID_RUNQP(rq->procs.prio_info[pix].migrate.runq,
- 0x0);
if (pix < ERTS_NO_PROC_PRIO_LEVELS - 1) {
rq->procs.prio[pix].first = NULL;
rq->procs.prio[pix].last = NULL;
@@ -3892,14 +4257,10 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
rq->misc.start = NULL;
rq->misc.end = NULL;
- rq->misc.evac_runq = NULL;
- rq->ports.info.len = 0;
+ erts_smp_atomic32_init_nob(&rq->ports.info.len, 0);
rq->ports.info.max_len = 0;
rq->ports.info.reds = 0;
- rq->ports.info.migrate.limit.this = 0;
- rq->ports.info.migrate.limit.other = 0;
- rq->ports.info.migrate.runq = NULL;
rq->ports.start = NULL;
rq->ports.end = NULL;
}
@@ -4022,10 +4383,12 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
balance_info.prev_rise.reds = 0;
balance_info.n = 0;
+ init_migration_paths();
+
if (no_schedulers_online < no_schedulers) {
+ change_no_used_runqs(no_schedulers_online);
for (ix = no_schedulers_online; ix < erts_no_run_queues; ix++)
- evacuate_run_queue(ERTS_RUNQ_IX(ix),
- ERTS_RUNQ_IX(ix % no_schedulers_online));
+ suspend_run_queue(ERTS_RUNQ_IX(ix));
}
schdlr_sspnd.wait_curr_online = no_schedulers_online;
@@ -4088,91 +4451,208 @@ erts_get_scheduler_data(void)
#endif
-static int remove_proc_from_runq(ErtsRunQueue *rq, Process *p, int to_inactive);
+/*
+ * scheduler_out_process() return with c_rq locked.
+ */
+static ERTS_INLINE int
+schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p)
+{
+ erts_aint32_t a, e, n;
+ int res = 0;
+
+ a = state;
+
+ while (1) {
+ n = e = a;
+
+ ASSERT(a & ERTS_PSFLG_RUNNING);
+ ASSERT(!(a & ERTS_PSFLG_IN_RUNQ));
+
+ n &= ~ERTS_PSFLG_RUNNING;
+ if ((a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE)
+ n |= ERTS_PSFLG_IN_RUNQ;
+ a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
+ if (a == e)
+ break;
+ }
+
+ if (!(n & ERTS_PSFLG_IN_RUNQ)) {
+ if (erts_system_profile_flags.runnable_procs)
+ profile_runnable_proc(p, am_inactive);
+ }
+ else {
+ int prio = (int) (ERTS_PSFLG_PRIO_MASK & n);
+ ErtsRunQueue *runq = erts_get_runq_proc(p);
+
+ ASSERT(!(n & ERTS_PSFLG_SUSPENDED));
+
+#ifdef ERTS_SMP
+ if (!(ERTS_PSFLG_BOUND & n)) {
+ ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio);
+ if (new_runq) {
+ RUNQ_SET_RQ(&p->run_queue, new_runq);
+ runq = new_runq;
+ }
+ }
+#endif
+ ASSERT(runq);
+ res = 1;
+
+ erts_smp_runq_lock(runq);
+
+ /* Enqueue the process */
+ enqueue_process(runq, prio, p);
+
+ if (runq == c_rq)
+ return res;
+ erts_smp_runq_unlock(runq);
+ smp_notify_inc_runq(runq);
+ }
+ erts_smp_runq_lock(c_rq);
+ return res;
+}
static ERTS_INLINE void
-suspend_process(ErtsRunQueue *rq, Process *p)
+add2runq(Process *p, erts_aint32_t state)
{
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
- p->rcount++; /* count number of suspend */
-#ifdef ERTS_SMP
- ASSERT(!(p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING)
- || p == erts_get_current_process());
- ASSERT(p->status != P_RUNNING
- || p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING);
- if (p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ)
- goto runable;
-#endif
- switch(p->status) {
- case P_SUSPENDED:
- break;
- case P_RUNABLE:
+ int prio = (int) (ERTS_PSFLG_PRIO_MASK & state);
+ ErtsRunQueue *runq = erts_get_runq_proc(p);
+
#ifdef ERTS_SMP
- runable:
- if (!ERTS_PROC_PENDING_EXIT(p))
+ if (!(ERTS_PSFLG_BOUND & state)) {
+ ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio);
+ if (new_runq) {
+ RUNQ_SET_RQ(&p->run_queue, new_runq);
+ runq = new_runq;
+ }
+ }
#endif
- remove_proc_from_runq(rq, p, 1);
- /* else:
- * leave process in schedq so it will discover the pending exit
- */
- p->rstatus = P_RUNABLE; /* wakeup as runnable */
- break;
- case P_RUNNING:
- p->rstatus = P_RUNABLE; /* wakeup as runnable */
- break;
- case P_WAITING:
- p->rstatus = P_WAITING; /* wakeup as waiting */
- break;
- case P_EXITING:
- return; /* ignore this */
- case P_GARBING:
- case P_FREE:
- erl_exit(1, "bad state in suspend_process()\n");
+ ASSERT(runq);
+
+ erts_smp_runq_lock(runq);
+
+ /* Enqueue the process */
+ enqueue_process(runq, prio, p);
+
+ erts_smp_runq_unlock(runq);
+ smp_notify_inc_runq(runq);
+
+}
+
+static ERTS_INLINE void
+schedule_process(Process *p, erts_aint32_t state, int active_enq)
+{
+ erts_aint32_t a = state, n;
+
+ while (1) {
+ erts_aint32_t e;
+ n = e = a;
+ ASSERT(!(a & ERTS_PSFLG_FREE));
+ n |= ERTS_PSFLG_ACTIVE;
+ if (!(a & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_RUNNING)))
+ n |= ERTS_PSFLG_IN_RUNQ;
+ a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
+ if (a == e)
+ break;
+ if (!active_enq && (a & ERTS_PSFLG_ACTIVE))
+ return; /* Someone else activated process ... */
+ }
+
+#ifdef RRR_DEBUG
+ if (active_enq)
+ erts_fprintf(stderr, "! state=0x%x\n", n);
+#endif
+
+ if (erts_system_profile_flags.runnable_procs
+ && !(a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) {
+ profile_runnable_proc(p, am_active);
}
- if ((erts_system_profile_flags.runnable_procs) && (p->rcount == 1) && (p->status != P_WAITING)) {
- profile_runnable_proc(p, am_inactive);
+ if ((n & ERTS_PSFLG_IN_RUNQ) && !(a & ERTS_PSFLG_IN_RUNQ)) {
+#ifdef RRR_DEBUG
+ if (active_enq)
+ erts_fprintf(stderr, "-->\n");
+#endif
+ add2runq(p, n);
}
+}
- p->status = P_SUSPENDED;
-
+void
+erts_schedule_process(Process *p, erts_aint32_t state)
+{
+ schedule_process(p, state, 0);
}
-static ERTS_INLINE void
-resume_process(Process *p)
+static ERTS_INLINE int
+suspend_process(Process *c_p, Process *p)
{
- Uint32 *statusp;
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state);
+ int suspended = 0;
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
- switch (p->status) {
- case P_SUSPENDED:
- statusp = &p->status;
- break;
- case P_GARBING:
- if (p->gcstatus == P_SUSPENDED) {
- statusp = &p->gcstatus;
- break;
+
+ if ((state & ERTS_PSFLG_SUSPENDED))
+ suspended = -1;
+ else {
+ if (c_p == p) {
+ state = erts_smp_atomic32_read_bor_relb(&p->state,
+ ERTS_PSFLG_SUSPENDED);
+ state |= ERTS_PSFLG_SUSPENDED;
+ ASSERT(state & ERTS_PSFLG_RUNNING);
+ suspended = 1;
}
- /* Fall through */
- default:
- return;
+ else {
+ while (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_EXITING))) {
+ erts_aint32_t e, n;
+ n = e = state;
+ n |= ERTS_PSFLG_SUSPENDED;
+ state = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e);
+ if (state == e) {
+ state = n;
+ suspended = 1;
+ break;
+ }
+ }
+ }
+ }
+
+ if (state & ERTS_PSFLG_SUSPENDED) {
+
+ ASSERT(!(ERTS_PSFLG_RUNNING & state)
+ || p == erts_get_current_process());
+
+ if (erts_system_profile_flags.runnable_procs
+ && (p->rcount == 0)
+ && (state & ERTS_PSFLG_ACTIVE)) {
+ profile_runnable_proc(p, am_inactive);
+ }
+
+ p->rcount++; /* count number of suspend */
}
+ return suspended;
+}
+
+static ERTS_INLINE void
+resume_process(Process *p)
+{
+ erts_aint32_t state;
+ ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
ASSERT(p->rcount > 0);
if (--p->rcount > 0) /* multiple suspend */
return;
- switch(p->rstatus) {
- case P_RUNABLE:
- erts_add_to_runq(p);
- break;
- case P_WAITING:
- *statusp = P_WAITING;
- break;
- default:
- erl_exit(1, "bad state in resume_process()\n");
+
+ state = erts_smp_atomic32_read_band_mb(&p->state, ~ERTS_PSFLG_SUSPENDED);
+ state &= ~ERTS_PSFLG_SUSPENDED;
+#ifdef RRR_DEBUG
+ erts_fprintf(stderr, "%T - state=0x%x\n", p->id, state);
+#endif
+ if ((state & (ERTS_PSFLG_EXITING
+ | ERTS_PSFLG_ACTIVE
+ | ERTS_PSFLG_IN_RUNQ
+ | ERTS_PSFLG_RUNNING)) == ERTS_PSFLG_ACTIVE) {
+ schedule_process(p, state, 1);
}
- p->rstatus = P_FREE;
}
int
@@ -4296,6 +4776,7 @@ suspend_scheduler(ErtsSchedulerData *esdp)
int wake = 0;
erts_aint32_t aux_work;
int thr_prgr_active = 1;
+ ErtsStuckBoundProcesses sbp = {NULL, NULL};
/*
* Schedulers may be suspended in two different ways:
@@ -4310,6 +4791,8 @@ suspend_scheduler(ErtsSchedulerData *esdp)
ASSERT(no != 1);
+ evacuate_run_queue(esdp->run_queue, &sbp);
+
erts_smp_runq_unlock(esdp->run_queue);
erts_sched_check_cpu_bind_prep_suspend(esdp);
@@ -4373,17 +4856,26 @@ suspend_scheduler(ErtsSchedulerData *esdp)
erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
while (1) {
+ erts_aint32_t qmask;
erts_aint32_t flgs;
+ qmask = (ERTS_RUNQ_FLGS_GET(esdp->run_queue)
+ & ERTS_RUNQ_FLGS_QMASK);
aux_work = erts_atomic32_read_acqb(&ssi->aux_work);
- if (aux_work) {
+ if (aux_work|qmask) {
if (!thr_prgr_active) {
erts_thr_progress_active(esdp, thr_prgr_active = 1);
sched_wall_time_change(esdp, 1);
}
- aux_work = handle_aux_work(&esdp->aux_work_data, aux_work);
+ if (aux_work)
+ aux_work = handle_aux_work(&esdp->aux_work_data, aux_work);
if (aux_work && erts_thr_progress_update(esdp))
erts_thr_progress_leader_update(esdp);
+ if (qmask) {
+ erts_smp_runq_lock(esdp->run_queue);
+ evacuate_run_queue(esdp->run_queue, &sbp);
+ erts_smp_runq_unlock(esdp->run_queue);
+ }
}
if (!aux_work) {
@@ -4453,56 +4945,11 @@ suspend_scheduler(ErtsSchedulerData *esdp)
erts_smp_runq_lock(esdp->run_queue);
non_empty_runq(esdp->run_queue);
+ schedule_bound_processes(esdp->run_queue, &sbp);
+
erts_sched_check_cpu_bind_post_suspend(esdp);
}
-#define ERTS_RUNQ_RESET_SUSPEND_INFO(RQ, DBG_ID) \
-do { \
- int pix__; \
- (RQ)->misc.evac_runq = NULL; \
- (RQ)->ports.info.migrate.runq = NULL; \
- (RQ)->flags &= ~(ERTS_RUNQ_FLGS_IMMIGRATE_QMASK \
- | ERTS_RUNQ_FLGS_EMIGRATE_QMASK \
- | ERTS_RUNQ_FLGS_EVACUATE_QMASK \
- | ERTS_RUNQ_FLG_SUSPENDED); \
- (RQ)->flags |= (ERTS_RUNQ_FLG_OUT_OF_WORK \
- | ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK); \
- (RQ)->check_balance_reds = ERTS_RUNQ_CALL_CHECK_BALANCE_REDS; \
- erts_smp_atomic32_read_band_nob(&(RQ)->info_flags, ~ERTS_RUNQ_IFLG_SUSPENDED);\
- for (pix__ = 0; pix__ < ERTS_NO_PROC_PRIO_LEVELS; pix__++) { \
- (RQ)->procs.prio_info[pix__].max_len = 0; \
- (RQ)->procs.prio_info[pix__].reds = 0; \
- ERTS_DBG_SET_INVALID_RUNQP((RQ)->procs.prio_info[pix__].migrate.runq,\
- (DBG_ID)); \
- } \
- (RQ)->ports.info.max_len = 0; \
- (RQ)->ports.info.reds = 0; \
-} while (0)
-
-#define ERTS_RUNQ_RESET_MIGRATION_PATHS__(RQ) \
-do { \
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked((RQ))); \
- (RQ)->misc.evac_runq = NULL; \
- (RQ)->ports.info.migrate.runq = NULL; \
- (RQ)->flags &= ~(ERTS_RUNQ_FLGS_IMMIGRATE_QMASK \
- | ERTS_RUNQ_FLGS_EMIGRATE_QMASK \
- | ERTS_RUNQ_FLGS_EVACUATE_QMASK); \
-} while (0)
-
-#ifdef DEBUG
-#define ERTS_RUNQ_RESET_MIGRATION_PATHS(RQ, DBG_ID) \
-do { \
- int pix__; \
- ERTS_RUNQ_RESET_MIGRATION_PATHS__((RQ)); \
- for (pix__ = 0; pix__ < ERTS_NO_PROC_PRIO_LEVELS; pix__++) \
- ERTS_DBG_SET_INVALID_RUNQP((RQ)->procs.prio_info[pix__].migrate.runq,\
- (DBG_ID)); \
-} while (0)
-#else
-#define ERTS_RUNQ_RESET_MIGRATION_PATHS(RQ, DBG_ID) \
- ERTS_RUNQ_RESET_MIGRATION_PATHS__((RQ))
-#endif
-
ErtsSchedSuspendResult
erts_schedulers_state(Uint *total,
Uint *online,
@@ -4572,27 +5019,13 @@ erts_set_schedulers_online(Process *p,
have_unlocked_plocks = 1;
erts_smp_proc_unlock(p, plocks);
}
- erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
- erts_smp_mtx_lock(&balance_info.update_mtx);
- for (ix = online; ix < no; ix++) {
- ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- erts_smp_runq_lock(rq);
- ERTS_RUNQ_RESET_SUSPEND_INFO(rq, 0x5);
- erts_smp_runq_unlock(rq);
- scheduler_ix_resume_wake(ix);
- }
- /*
- * Spread evacuation paths among all online
- * run queues.
- */
- for (ix = no; ix < erts_no_run_queues; ix++) {
- ErtsRunQueue *from_rq = ERTS_RUNQ_IX(ix);
- ErtsRunQueue *to_rq = ERTS_RUNQ_IX(ix % no);
- evacuate_run_queue(from_rq, to_rq);
- }
- set_no_used_runqs(no);
- erts_smp_mtx_unlock(&balance_info.update_mtx);
- erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+ change_no_used_runqs(no);
+
+ for (ix = online; ix < no; ix++)
+ resume_run_queue(ERTS_RUNQ_IX(ix));
+
+ for (ix = no; ix < erts_no_run_queues; ix++)
+ suspend_run_queue(ERTS_RUNQ_IX(ix));
}
res = ERTS_SCHDLR_SSPND_DONE;
}
@@ -4619,25 +5052,11 @@ erts_set_schedulers_online(Process *p,
have_unlocked_plocks = 1;
erts_smp_proc_unlock(p, plocks);
}
- erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
- erts_smp_mtx_lock(&balance_info.update_mtx);
-
- for (ix = 0; ix < online; ix++) {
- ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- erts_smp_runq_lock(rq);
- ERTS_RUNQ_RESET_MIGRATION_PATHS(rq, 0x6);
- erts_smp_runq_unlock(rq);
- }
- /*
- * Evacutation order important! Newly suspended run queues
- * has to be evacuated last.
- */
- for (ix = erts_no_run_queues-1; ix >= no; ix--)
- evacuate_run_queue(ERTS_RUNQ_IX(ix),
- ERTS_RUNQ_IX(ix % no));
- set_no_used_runqs(no);
- erts_smp_mtx_unlock(&balance_info.update_mtx);
- erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+
+ change_no_used_runqs(no);
+ for (ix = no; ix < erts_no_run_queues; ix++)
+ suspend_run_queue(ERTS_RUNQ_IX(ix));
+
for (ix = no; ix < online; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
wake_scheduler(rq, 0);
@@ -4734,25 +5153,14 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
schdlr_sspnd.msb.wait_active = 2;
}
- erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
- erts_smp_mtx_lock(&balance_info.update_mtx);
- set_no_used_runqs(1);
- for (ix = 0; ix < online; ix++) {
+ change_no_used_runqs(1);
+ for (ix = 1; ix < erts_no_run_queues; ix++)
+ suspend_run_queue(ERTS_RUNQ_IX(ix));
+
+ for (ix = 1; ix < online; ix++) {
ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- erts_smp_runq_lock(rq);
- ASSERT(!(rq->flags & ERTS_RUNQ_FLG_SUSPENDED));
- ERTS_RUNQ_RESET_MIGRATION_PATHS(rq, 0x7);
- erts_smp_runq_unlock(rq);
+ wake_scheduler(rq, 0);
}
- /*
- * Evacuate all activities in all other run queues
- * into the first run queue. Note order is important,
- * online run queues has to be evacuated last.
- */
- for (ix = erts_no_run_queues-1; ix >= 1; ix--)
- evacuate_run_queue(ERTS_RUNQ_IX(ix), ERTS_RUNQ_IX(0));
- erts_smp_mtx_unlock(&balance_info.update_mtx);
- erts_smp_mtx_lock(&schdlr_sspnd.mtx);
if (erts_smp_atomic32_read_nob(&schdlr_sspnd.active)
!= schdlr_sspnd.msb.wait_active) {
@@ -4796,15 +5204,6 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
plp = proclist_create(p);
plp->next = schdlr_sspnd.msb.procs;
schdlr_sspnd.msb.procs = plp;
-#ifdef DEBUG
- ERTS_FOREACH_RUNQ(srq,
- {
- if (srq != ERTS_RUNQ_IX(0)) {
- ASSERT(ERTS_EMPTY_RUNQ(srq));
- ASSERT(srq->flags & ERTS_RUNQ_FLG_SUSPENDED);
- }
- });
-#endif
ASSERT(p->scheduler_data);
}
}
@@ -4836,27 +5235,6 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
res = ERTS_SCHDLR_SSPND_DONE_MSCHED_BLOCKED;
else {
ERTS_SCHDLR_SSPND_CHNG_SET(ERTS_SCHDLR_SSPND_CHNG_MSB, 0);
-#ifdef DEBUG
- ERTS_FOREACH_RUNQ(rq,
- {
- if (rq != p->scheduler_data->run_queue) {
- if (!ERTS_EMPTY_RUNQ(rq)) {
- Process *rp;
- int pix;
- ASSERT(rq->ports.info.len == 0);
- for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++) {
- for (rp = rq->procs.prio[pix].first;
- rp;
- rp = rp->next) {
- ASSERT(rp->bound_runq);
- }
- }
- }
-
- ASSERT(rq->flags & ERTS_RUNQ_FLG_SUSPENDED);
- }
- });
-#endif
p->flags &= ~F_HAVE_BLCKD_MSCHED;
schdlr_sspnd.msb.ongoing = 0;
if (schdlr_sspnd.online == 1) {
@@ -4866,35 +5244,19 @@ erts_block_multi_scheduling(Process *p, ErtsProcLocks plocks, int on, int all)
}
else {
int online = schdlr_sspnd.online;
- erts_smp_mtx_unlock(&schdlr_sspnd.mtx);
if (plocks) {
have_unlocked_plocks = 1;
erts_smp_proc_unlock(p, plocks);
}
- erts_smp_mtx_lock(&balance_info.update_mtx);
+
+ change_no_used_runqs(online);
/* Resume all online run queues */
- for (ix = 1; ix < online; ix++) {
- ErtsRunQueue *rq = ERTS_RUNQ_IX(ix);
- erts_smp_runq_lock(rq);
- ERTS_RUNQ_RESET_SUSPEND_INFO(rq, 0x4);
- erts_smp_runq_unlock(rq);
- scheduler_ix_resume_wake(ix);
- }
+ for (ix = 1; ix < online; ix++)
+ resume_run_queue(ERTS_RUNQ_IX(ix));
- /* Spread evacuation paths among all online run queues */
for (ix = online; ix < erts_no_run_queues; ix++)
- evacuate_run_queue(ERTS_RUNQ_IX(ix),
- ERTS_RUNQ_IX(ix % online));
-
- set_no_used_runqs(online);
- /* Make sure that we balance soon... */
- balance_info.forced_check_balance = 1;
- erts_smp_runq_lock(ERTS_RUNQ_IX(0));
- ERTS_RUNQ_IX(0)->check_balance_reds = 0;
- erts_smp_runq_unlock(ERTS_RUNQ_IX(0));
- erts_smp_mtx_unlock(&balance_info.update_mtx);
- erts_smp_mtx_lock(&schdlr_sspnd.mtx);
+ suspend_run_queue(ERTS_RUNQ_IX(ix));
}
res = ERTS_SCHDLR_SSPND_DONE;
}
@@ -5201,10 +5563,7 @@ handle_pend_sync_suspend(Process *suspendee,
if (suspender) {
ASSERT(is_nil(suspender->suspendee));
if (suspendee_alive) {
- ErtsRunQueue *rq = erts_get_runq_proc(suspendee);
- erts_smp_runq_lock(rq);
- suspend_process(rq, suspendee);
- erts_smp_runq_unlock(rq);
+ erts_suspend(suspendee, suspendee_locks, NULL);
suspender->suspendee = suspendee->id;
}
/* suspender is suspended waiting for suspendee to suspend;
@@ -5247,10 +5606,9 @@ pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks,
resume_process(rp);
}
else {
- ErtsRunQueue *cp_rq, *rp_rq;
rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS,
- pid, ERTS_PROC_LOCK_STATUS);
+ pid, pid_locks|ERTS_PROC_LOCK_STATUS);
if (!rp) {
c_p->flags &= ~F_P2PNR_RESCHED;
@@ -5259,58 +5617,36 @@ pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks,
ASSERT(!(c_p->flags & F_P2PNR_RESCHED));
- cp_rq = erts_get_runq_proc(c_p);
- rp_rq = erts_get_runq_proc(rp);
- erts_smp_runqs_lock(cp_rq, rp_rq);
- if (rp->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) {
- running:
- /* Phiu... */
-
- /*
- * If we got pending suspenders and suspend ourselves waiting
- * to suspend another process we might deadlock.
- * In this case we have to yield, be suspended by
- * someone else and then do it all over again.
- */
- if (!c_p->pending_suspenders) {
- /* Mark rp pending for suspend by c_p */
- add_pend_suspend(rp, c_p->id, handle_pend_sync_suspend);
- ASSERT(is_nil(c_p->suspendee));
-
- /* Suspend c_p; when rp is suspended c_p will be resumed. */
- suspend_process(cp_rq, c_p);
- c_p->flags |= F_P2PNR_RESCHED;
- }
- /* Yield (caller is assumed to yield immediately in bif). */
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
- rp = ERTS_PROC_LOCK_BUSY;
+ if (suspend) {
+ if (suspend_process(c_p, rp))
+ goto done;
}
else {
- ErtsProcLocks need_locks = pid_locks & ~ERTS_PROC_LOCK_STATUS;
- if (need_locks && erts_smp_proc_trylock(rp, need_locks) == EBUSY) {
- erts_smp_runqs_unlock(cp_rq, rp_rq);
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
- rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS,
- pid, pid_locks|ERTS_PROC_LOCK_STATUS);
- if (!rp)
- goto done;
- /* run-queues may have changed */
- cp_rq = erts_get_runq_proc(c_p);
- rp_rq = erts_get_runq_proc(rp);
- erts_smp_runqs_lock(cp_rq, rp_rq);
- if (rp->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) {
- /* Ahh... */
- erts_smp_proc_unlock(rp,
- pid_locks & ~ERTS_PROC_LOCK_STATUS);
- goto running;
- }
- }
+ if (!(ERTS_PSFLG_RUNNING & erts_smp_atomic32_read_acqb(&rp->state)))
+ goto done;
- /* rp is not running and we got the locks we want... */
- if (suspend)
- suspend_process(rp_rq, rp);
}
- erts_smp_runqs_unlock(cp_rq, rp_rq);
+
+ /* Other process running */
+
+ /*
+ * If we got pending suspenders and suspend ourselves waiting
+ * to suspend another process we might deadlock.
+ * In this case we have to yield, be suspended by
+ * someone else and then do it all over again.
+ */
+ if (!c_p->pending_suspenders) {
+ /* Mark rp pending for suspend by c_p */
+ add_pend_suspend(rp, c_p->id, handle_pend_sync_suspend);
+ ASSERT(is_nil(c_p->suspendee));
+
+ /* Suspend c_p; when rp is suspended c_p will be resumed. */
+ suspend_process(c_p, c_p);
+ c_p->flags |= F_P2PNR_RESCHED;
+ }
+ /* Yield (caller is assumed to yield immediately in bif). */
+ erts_smp_proc_unlock(rp, pid_locks|ERTS_PROC_LOCK_STATUS);
+ rp = ERTS_PROC_LOCK_BUSY;
}
done:
@@ -5367,10 +5703,10 @@ erts_pid2proc_nropt(Process *c_p, ErtsProcLocks c_p_locks,
return erts_pid2proc_not_running(c_p, c_p_locks, pid, pid_locks);
}
-static ERTS_INLINE void
-do_bif_suspend_process(ErtsSuspendMonitor *smon,
- Process *suspendee,
- ErtsRunQueue *locked_runq)
+static ERTS_INLINE int
+do_bif_suspend_process(Process *c_p,
+ ErtsSuspendMonitor *smon,
+ Process *suspendee)
{
ASSERT(suspendee);
ASSERT(!ERTS_PROC_IS_EXITING(suspendee));
@@ -5378,25 +5714,15 @@ do_bif_suspend_process(ErtsSuspendMonitor *smon,
& erts_proc_lc_my_proc_locks(suspendee));
if (smon) {
if (!smon->active) {
- ErtsRunQueue *rq;
-
- if (locked_runq)
- rq = locked_runq;
- else {
- rq = erts_get_runq_proc(suspendee);
- erts_smp_runq_lock(rq);
- }
-
- suspend_process(rq, suspendee);
-
- if (!locked_runq)
- erts_smp_runq_unlock(rq);
+ if (!suspend_process(c_p, suspendee))
+ return 0;
}
smon->active += smon->pending;
ASSERT(smon->active);
smon->pending = 0;
+ return 1;
}
-
+ return 0;
}
static void
@@ -5419,10 +5745,17 @@ handle_pend_bif_sync_suspend(Process *suspendee,
erts_delete_suspend_monitor(&suspender->suspend_monitors,
suspendee->id);
else {
+#ifdef DEBUG
+ int res;
+#endif
ErtsSuspendMonitor *smon;
smon = erts_lookup_suspend_monitor(suspender->suspend_monitors,
suspendee->id);
- do_bif_suspend_process(smon, suspendee, NULL);
+#ifdef DEBUG
+ res =
+#endif
+ do_bif_suspend_process(suspendee, smon, suspendee);
+ ASSERT(!smon || res != 0);
suspender->suspendee = suspendee->id;
}
/* suspender is suspended waiting for suspendee to suspend;
@@ -5454,10 +5787,17 @@ handle_pend_bif_async_suspend(Process *suspendee,
erts_delete_suspend_monitor(&suspender->suspend_monitors,
suspendee->id);
else {
+#ifdef DEBUG
+ int res;
+#endif
ErtsSuspendMonitor *smon;
smon = erts_lookup_suspend_monitor(suspender->suspend_monitors,
suspendee->id);
- do_bif_suspend_process(smon, suspendee, NULL);
+#ifdef DEBUG
+ res =
+#endif
+ do_bif_suspend_process(suspendee, smon, suspendee);
+ ASSERT(!smon || res != 0);
}
erts_smp_proc_unlock(suspender, ERTS_PROC_LOCK_LINK);
}
@@ -5542,7 +5882,8 @@ suspend_process_2(BIF_ALIST_2)
/* This is really a piece of cake without SMP support... */
if (!smon->active) {
- suspend_process(ERTS_RUNQ_IX(0), suspendee);
+ erts_smp_atomic32_read_bor_nob(&suspendee->state, ERTS_PSFLG_SUSPENDED);
+ suspend_process(BIF_P, suspendee);
smon->active++;
res = am_true;
}
@@ -5585,21 +5926,15 @@ suspend_process_2(BIF_ALIST_2)
if (smon->pending && unless_suspending)
res = am_false;
else {
- ErtsRunQueue *rq;
if (smon->pending == INT_MAX)
goto system_limit;
smon->pending++;
- rq = erts_get_runq_proc(suspendee);
- erts_smp_runq_lock(rq);
- if (suspendee->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING)
+ if (!do_bif_suspend_process(BIF_P, smon, suspendee))
add_pend_suspend(suspendee,
BIF_P->id,
handle_pend_bif_async_suspend);
- else
- do_bif_suspend_process(smon, suspendee, rq);
- erts_smp_runq_unlock(rq);
res = am_true;
}
@@ -5636,7 +5971,6 @@ suspend_process_2(BIF_ALIST_2)
/* done */
}
else {
- ErtsRunQueue *cp_rq, *s_rq;
/* We haven't got any active suspends on the suspendee */
/*
@@ -5653,12 +5987,7 @@ suspend_process_2(BIF_ALIST_2)
if (!unless_suspending || smon->pending == 0)
smon->pending++;
- cp_rq = erts_get_runq_proc(BIF_P);
- s_rq = erts_get_runq_proc(suspendee);
- erts_smp_runqs_lock(cp_rq, s_rq);
- if (!(suspendee->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING)) {
- do_bif_suspend_process(smon, suspendee, s_rq);
- erts_smp_runqs_unlock(cp_rq, s_rq);
+ if (do_bif_suspend_process(BIF_P, smon, suspendee)) {
res = (!unless_suspending || smon->active == 1
? am_true
: am_false);
@@ -5678,8 +6007,7 @@ suspend_process_2(BIF_ALIST_2)
* This time with BIF_P->suspendee == BIF_ARG_1 (see
* above).
*/
- suspend_process(cp_rq, BIF_P);
- erts_smp_runqs_unlock(cp_rq, s_rq);
+ suspend_process(BIF_P, BIF_P);
goto yield;
}
}
@@ -5687,9 +6015,15 @@ suspend_process_2(BIF_ALIST_2)
}
#endif /* ERTS_SMP */
-
- ASSERT(suspendee->status == P_SUSPENDED || (asynchronous && smon->pending));
- ASSERT(suspendee->status == P_SUSPENDED || !smon->active);
+#ifdef DEBUG
+ {
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&suspendee->state);
+ ASSERT((state & ERTS_PSFLG_SUSPENDED)
+ || (asynchronous && smon->pending));
+ ASSERT((state & ERTS_PSFLG_SUSPENDED)
+ || !smon->active);
+ }
+#endif
erts_smp_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS);
erts_smp_proc_unlock(BIF_P, xlocks);
@@ -5784,9 +6118,8 @@ resume_process_1(BIF_ALIST_1)
if (!suspendee)
goto no_suspendee;
- ASSERT(suspendee->status == P_SUSPENDED
- || (suspendee->status == P_GARBING
- && suspendee->gcstatus == P_SUSPENDED));
+ ASSERT(ERTS_PSFLG_SUSPENDED
+ & erts_smp_atomic32_read_nob(&suspendee->state));
resume_process(suspendee);
erts_smp_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS);
@@ -5815,449 +6148,46 @@ erts_run_queues_len(Uint *qlen)
Uint len = 0;
ERTS_ATOMIC_FOREACH_RUNQ(rq,
{
- if (qlen)
- qlen[i++] = rq->procs.len;
- len += rq->procs.len;
+ Sint pqlen = 0;
+ int pix;
+ for (pix = 0; pix < ERTS_NO_PROC_PRIO_LEVELS; pix++)
+ pqlen += RUNQ_READ_LEN(&rq->procs.prio_info[pix].len);
+
+ if (pqlen < 0)
+ pqlen = 0;
+ if (qlen)
+ qlen[i++] = pqlen;
+ len += pqlen;
}
);
return len;
}
-#ifdef HARDDEBUG_RUNQS
-static void
-check_procs_runq(ErtsRunQueue *runq, Process *p_in_q, Process *p_not_in_q)
-{
- int len[ERTS_NO_PROC_PRIO_LEVELS] = {0};
- int tot_len;
- int prioq, prio;
- int found_p_in_q;
- Process *p, *prevp;
-
- found_p_in_q = 0;
- for (prioq = 0; prioq < ERTS_NO_PROC_PRIO_LEVELS - 1; prioq++) {
- prevp = NULL;
- for (p = runq->procs.prio[prioq].first; p; p = p->next) {
- ASSERT(p != p_not_in_q);
- if (p == p_in_q)
- found_p_in_q = 1;
- switch (p->prio) {
- case PRIORITY_MAX:
- case PRIORITY_HIGH:
- case PRIORITY_NORMAL:
- ASSERT(prioq == p->prio);
- break;
- case PRIORITY_LOW:
- ASSERT(prioq == PRIORITY_NORMAL);
- break;
- default:
- ASSERT(!"Bad prio on process");
- }
- len[p->prio]++;
- ASSERT(prevp == p->prev);
- if (p->prev) {
- ASSERT(p->prev->next == p);
- }
- else {
- ASSERT(runq->procs.prio[prioq].first == p);
- }
- if (p->next) {
- ASSERT(p->next->prev == p);
- }
- else {
- ASSERT(runq->procs.prio[prioq].last == p);
- }
- ASSERT(p->run_queue == runq);
- prevp = p;
- }
- }
-
- ASSERT(!p_in_q || found_p_in_q);
-
- tot_len = 0;
- for (prio = 0; prio < ERTS_NO_PROC_PRIO_LEVELS; prio++) {
- ASSERT(len[prio] == runq->procs.prio_info[prio].len);
- if (len[prio]) {
- ASSERT(runq->flags & (1 << prio));
- }
- else {
- ASSERT(!(runq->flags & (1 << prio)));
- }
- tot_len += len[prio];
- }
- ASSERT(runq->procs.len == tot_len);
-}
-# define ERTS_DBG_CHK_PROCS_RUNQ(RQ) check_procs_runq((RQ), NULL, NULL)
-# define ERTS_DBG_CHK_PROCS_RUNQ_PROC(RQ, P) check_procs_runq((RQ), (P), NULL)
-# define ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(RQ, P) check_procs_runq((RQ), NULL, (P))
-#else
-# define ERTS_DBG_CHK_PROCS_RUNQ(RQ)
-# define ERTS_DBG_CHK_PROCS_RUNQ_PROC(RQ, P)
-# define ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(RQ, P)
-#endif
-
-
-static ERTS_INLINE void
-enqueue_process(ErtsRunQueue *runq, Process *p)
-{
- ErtsRunPrioQueue *rpq;
- ErtsRunQueueInfo *rqi;
-
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
-
- ASSERT(p->bound_runq || !(runq->flags & ERTS_RUNQ_FLG_SUSPENDED));
-
- rqi = &runq->procs.prio_info[p->prio];
- rqi->len++;
- if (rqi->max_len < rqi->len)
- rqi->max_len = rqi->len;
-
- runq->procs.len++;
- runq->len++;
- if (runq->max_len < runq->len)
- runq->max_len = runq->len;
-
- runq->flags |= (1 << p->prio);
-
- rpq = (p->prio == PRIORITY_LOW
- ? &runq->procs.prio[PRIORITY_NORMAL]
- : &runq->procs.prio[p->prio]);
-
- p->next = NULL;
- p->prev = rpq->last;
- if (rpq->last)
- rpq->last->next = p;
- else
- rpq->first = p;
- rpq->last = p;
-
- switch (p->status) {
- case P_EXITING:
- break;
- case P_GARBING:
- p->gcstatus = P_RUNABLE;
- break;
- default:
- p->status = P_RUNABLE;
- break;
- }
-
-#ifdef ERTS_SMP
- p->status_flags |= ERTS_PROC_SFLG_INRUNQ;
-#endif
-
- ERTS_DBG_CHK_PROCS_RUNQ_PROC(runq, p);
-}
-
-
-static ERTS_INLINE int
-dequeue_process(ErtsRunQueue *runq, Process *p)
-{
- ErtsRunPrioQueue *rpq;
- int res = 1;
-
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
-
- ERTS_DBG_CHK_PROCS_RUNQ(runq);
-
- rpq = &runq->procs.prio[p->prio == PRIORITY_LOW ? PRIORITY_NORMAL : p->prio];
- if (p->prev) {
- p->prev->next = p->next;
- }
- else if (rpq->first == p) {
- rpq->first = p->next;
- }
- else {
- res = 0;
- }
- if (p->next) {
- p->next->prev = p->prev;
- }
- else if (rpq->last == p) {
- rpq->last = p->prev;
- }
- else {
- ASSERT(res == 0);
- }
-
- if (res) {
-
- if (--runq->procs.prio_info[p->prio].len == 0)
- runq->flags &= ~(1 << p->prio);
- runq->procs.len--;
- runq->len--;
-
-#ifdef ERTS_SMP
- p->status_flags &= ~ERTS_PROC_SFLG_INRUNQ;
-#endif
- }
-
- ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(runq, p);
- return res;
-}
-
-/* schedule a process */
-static ERTS_INLINE ErtsRunQueue *
-internal_add_to_runq(ErtsRunQueue *runq, Process *p)
-{
- Uint32 prev_status = p->status;
- ErtsRunQueue *add_runq;
-#ifdef ERTS_SMP
-
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
-
- if (p->status_flags & ERTS_PROC_SFLG_INRUNQ)
- return NULL;
- else if (p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING) {
- ASSERT(ERTS_PROC_IS_EXITING(p) || p->rcount == 0);
- ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(runq, p);
- p->status_flags |= ERTS_PROC_SFLG_PENDADD2SCHEDQ;
- return NULL;
- }
- ASSERT(!p->scheduler_data);
-#endif
-
- ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(runq, p);
-#ifndef ERTS_SMP
- /* Never schedule a suspended process (ok in smp case) */
- ASSERT(ERTS_PROC_IS_EXITING(p) || p->rcount == 0);
- add_runq = runq;
-#else
- ASSERT(!p->bound_runq || p->bound_runq == p->run_queue);
- if (p->bound_runq) {
- if (p->bound_runq == runq)
- add_runq = runq;
- else {
- add_runq = p->bound_runq;
- erts_smp_xrunq_lock(runq, add_runq);
- }
- }
- else {
- add_runq = erts_check_emigration_need(runq, p->prio);
- if (!add_runq)
- add_runq = runq;
- else /* Process emigrated */
- p->run_queue = add_runq;
- }
-#endif
-
- /* Enqueue the process */
- enqueue_process(add_runq, p);
-
- if ((erts_system_profile_flags.runnable_procs)
- && (prev_status == P_WAITING
- || prev_status == P_SUSPENDED)) {
- profile_runnable_proc(p, am_active);
- }
-
- if (add_runq != runq)
- erts_smp_runq_unlock(add_runq);
-
- return add_runq;
-}
-
-
-void
-erts_add_to_runq(Process *p)
-{
- ErtsRunQueue *notify_runq;
- ErtsRunQueue *runq = erts_get_runq_proc(p);
- erts_smp_runq_lock(runq);
- notify_runq = internal_add_to_runq(runq, p);
- erts_smp_runq_unlock(runq);
- smp_notify_inc_runq(notify_runq);
-
-}
-
-/* Possibly remove a scheduled process we need to suspend */
-
-static int
-remove_proc_from_runq(ErtsRunQueue *rq, Process *p, int to_inactive)
-{
- int res;
-
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
-
-#ifdef ERTS_SMP
- if (p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ) {
- p->status_flags &= ~ERTS_PROC_SFLG_PENDADD2SCHEDQ;
- ASSERT(!remove_proc_from_runq(rq, p, 0));
- return 1;
- }
-#endif
-
- res = dequeue_process(rq, p);
-
- if (res && erts_system_profile_flags.runnable_procs && to_inactive)
- profile_runnable_proc(p, am_inactive);
-
-#ifdef ERTS_SMP
- ASSERT(!(p->status_flags & ERTS_PROC_SFLG_INRUNQ));
-#endif
-
- return res;
-}
-
-#ifdef ERTS_SMP
-
-ErtsMigrateResult
-erts_proc_migrate(Process *p, ErtsProcLocks *plcks,
- ErtsRunQueue *from_rq, int *from_locked,
- ErtsRunQueue *to_rq, int *to_locked)
-{
- ERTS_SMP_LC_ASSERT(*plcks == erts_proc_lc_my_proc_locks(p));
- ERTS_SMP_LC_ASSERT((ERTS_PROC_LOCK_STATUS & *plcks)
- || from_locked);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked);
-
- /*
- * If we have the lock on the run queue to migrate to,
- * check that it isn't suspended. If it is suspended,
- * we will refuse to migrate to it anyway.
- */
- if (*to_locked && (to_rq->flags & ERTS_RUNQ_FLG_SUSPENDED))
- return ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED;
-
- /* We need status lock on process and locks on both run queues */
-
- if (!(ERTS_PROC_LOCK_STATUS & *plcks)) {
- if (erts_smp_proc_trylock(p, ERTS_PROC_LOCK_STATUS) == EBUSY) {
- ErtsProcLocks lcks = *plcks;
- Eterm pid = p->id;
- Process *proc = *plcks ? p : NULL;
-
- if (*from_locked) {
- *from_locked = 0;
- erts_smp_runq_unlock(from_rq);
- }
- if (*to_locked) {
- *to_locked = 0;
- erts_smp_runq_unlock(to_rq);
- }
-
- proc = erts_pid2proc_opt(proc,
- lcks,
- pid,
- lcks|ERTS_PROC_LOCK_STATUS,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- if (!proc) {
- *plcks = 0;
- return ERTS_MIGRATE_FAILED_NOT_IN_RUNQ;
- }
- ASSERT(proc == p);
- }
- *plcks |= ERTS_PROC_LOCK_STATUS;
- }
-
- ASSERT(!p->bound_runq);
-
- ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked);
-
- if (p->run_queue != from_rq)
- return ERTS_MIGRATE_FAILED_RUNQ_CHANGED;
-
- if (!*from_locked || !*to_locked) {
- if (from_rq < to_rq) {
- if (!*to_locked) {
- if (!*from_locked)
- erts_smp_runq_lock(from_rq);
- erts_smp_runq_lock(to_rq);
- }
- else if (erts_smp_runq_trylock(from_rq) == EBUSY) {
- erts_smp_runq_unlock(to_rq);
- erts_smp_runq_lock(from_rq);
- erts_smp_runq_lock(to_rq);
- }
- }
- else {
- if (!*from_locked) {
- if (!*to_locked)
- erts_smp_runq_lock(to_rq);
- erts_smp_runq_lock(from_rq);
- }
- else if (erts_smp_runq_trylock(to_rq) == EBUSY) {
- erts_smp_runq_unlock(from_rq);
- erts_smp_runq_lock(to_rq);
- erts_smp_runq_lock(from_rq);
- }
- }
- *to_locked = *from_locked = 1;
- }
-
- ERTS_SMP_LC_CHK_RUNQ_LOCK(from_rq, *from_locked);
- ERTS_SMP_LC_CHK_RUNQ_LOCK(to_rq, *to_locked);
-
- /* Ok we now got all locks we need; do it... */
-
- /* Refuse to migrate to a suspended run queue */
- if (to_rq->flags & ERTS_RUNQ_FLG_SUSPENDED)
- return ERTS_MIGRATE_FAILED_RUNQ_SUSPENDED;
-
- if ((p->runq_flags & ERTS_PROC_RUNQ_FLG_RUNNING)
- || !(p->status_flags & ERTS_PROC_SFLG_INRUNQ))
- return ERTS_MIGRATE_FAILED_NOT_IN_RUNQ;
-
- dequeue_process(from_rq, p);
- p->run_queue = to_rq;
- enqueue_process(to_rq, p);
-
- return ERTS_MIGRATE_SUCCESS;
-}
-#endif /* ERTS_SMP */
-
Eterm
erts_process_status(Process *c_p, ErtsProcLocks c_p_locks,
Process *rp, Eterm rpid)
{
Eterm res = am_undefined;
- Process *p;
-
- if (rp) {
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS
- & erts_proc_lc_my_proc_locks(rp));
- p = rp;
- }
- else {
- p = erts_pid2proc_opt(c_p, c_p_locks,
- rpid, ERTS_PROC_LOCK_STATUS,
- ERTS_P2P_FLG_ALLOW_OTHER_X);
- }
+ Process *p = rp ? rp : erts_proc_lookup_raw(rpid);
if (p) {
- switch (p->status) {
- case P_RUNABLE:
- res = am_runnable;
- break;
- case P_WAITING:
- res = am_waiting;
- break;
- case P_RUNNING:
- res = am_running;
- break;
- case P_EXITING:
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state);
+ if (state & ERTS_PSFLG_FREE)
+ res = am_free;
+ else if (state & ERTS_PSFLG_EXITING)
res = am_exiting;
- break;
- case P_GARBING:
+ else if (state & ERTS_PSFLG_GC)
res = am_garbage_collecting;
- break;
- case P_SUSPENDED:
+ else if (state & ERTS_PSFLG_SUSPENDED)
res = am_suspended;
- break;
- case P_FREE: /* We cannot look up a process in P_FREE... */
- default: /* Not a valid status... */
- erl_exit(1, "Bad status (%b32u) found for process %T\n",
- p->status, p->id);
- break;
- }
-
-#ifdef ERTS_SMP
- if (!rp && (p != c_p || !(ERTS_PROC_LOCK_STATUS & c_p_locks)))
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
+ else if (state & ERTS_PSFLG_RUNNING)
+ res = am_running;
+ else if (state & ERTS_PSFLG_ACTIVE)
+ res = am_runnable;
+ else
+ res = am_waiting;
}
+#ifdef ERTS_SMP
else {
int i;
ErtsSchedulerData *esdp;
@@ -6272,42 +6202,40 @@ erts_process_status(Process *c_p, ErtsProcLocks c_p_locks,
}
erts_smp_runq_unlock(esdp->run_queue);
}
-
-#endif
-
}
-
+#endif
return res;
}
/*
-** Suspend a process
+** Suspend a currently executing process
** If we are to suspend on a port the busy_port is the thing
** otherwise busy_port is NIL
*/
void
-erts_suspend(Process* process, ErtsProcLocks process_locks, Port *busy_port)
+erts_suspend(Process* c_p, ErtsProcLocks c_p_locks, Port *busy_port)
{
- ErtsRunQueue *rq;
-
- ERTS_SMP_LC_ASSERT(process_locks == erts_proc_lc_my_proc_locks(process));
- if (!(process_locks & ERTS_PROC_LOCK_STATUS))
- erts_smp_proc_lock(process, ERTS_PROC_LOCK_STATUS);
-
- rq = erts_get_runq_proc(process);
-
- erts_smp_runq_lock(rq);
+#ifdef DEBUG
+ int res;
+#endif
+ ASSERT(c_p == erts_get_current_process());
+ ERTS_SMP_LC_ASSERT(c_p_locks == erts_proc_lc_my_proc_locks(c_p));
+ if (!(c_p_locks & ERTS_PROC_LOCK_STATUS))
+ erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);
- suspend_process(rq, process);
+#ifdef DEBUG
+ res =
+#endif
+ suspend_process(c_p, c_p);
- erts_smp_runq_unlock(rq);
+ ASSERT(res);
if (busy_port)
- erts_wake_process_later(busy_port, process);
+ erts_wake_process_later(busy_port, c_p);
- if (!(process_locks & ERTS_PROC_LOCK_STATUS))
- erts_smp_proc_unlock(process, ERTS_PROC_LOCK_STATUS);
+ if (!(c_p_locks & ERTS_PROC_LOCK_STATUS))
+ erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);
}
@@ -6348,57 +6276,54 @@ erts_resume_processes(ErtsProcList *plp)
Eterm
erts_get_process_priority(Process *p)
{
- ErtsRunQueue *rq;
- Eterm value;
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
- rq = erts_get_runq_proc(p);
- erts_smp_runq_lock(rq);
- switch(p->prio) {
- case PRIORITY_MAX: value = am_max; break;
- case PRIORITY_HIGH: value = am_high; break;
- case PRIORITY_NORMAL: value = am_normal; break;
- case PRIORITY_LOW: value = am_low; break;
- default: ASSERT(0); value = am_undefined; break;
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state);
+ switch (state & ERTS_PSFLG_PRIO_MASK) {
+ case PRIORITY_MAX: return am_max;
+ case PRIORITY_HIGH: return am_high;
+ case PRIORITY_NORMAL: return am_normal;
+ case PRIORITY_LOW: return am_low;
+ default: ASSERT(0); return am_undefined;
}
- erts_smp_runq_unlock(rq);
- return value;
}
Eterm
-erts_set_process_priority(Process *p, Eterm new_value)
+erts_set_process_priority(Process *p, Eterm value)
{
- ErtsRunQueue *rq;
- Eterm old_value;
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
- rq = erts_get_runq_proc(p);
-#ifdef ERTS_SMP
- ASSERT(!(p->status_flags & ERTS_PROC_SFLG_INRUNQ));
-#endif
- erts_smp_runq_lock(rq);
- switch(p->prio) {
- case PRIORITY_MAX: old_value = am_max; break;
- case PRIORITY_HIGH: old_value = am_high; break;
- case PRIORITY_NORMAL: old_value = am_normal; break;
- case PRIORITY_LOW: old_value = am_low; break;
- default: ASSERT(0); old_value = am_undefined; break;
- }
- switch (new_value) {
- case am_max: p->prio = PRIORITY_MAX; break;
- case am_high: p->prio = PRIORITY_HIGH; break;
- case am_normal: p->prio = PRIORITY_NORMAL; break;
- case am_low: p->prio = PRIORITY_LOW; break;
- default: old_value = THE_NON_VALUE; break;
+ erts_aint32_t a, oprio, nprio;
+
+ switch (value) {
+ case am_max: nprio = (erts_aint32_t) PRIORITY_MAX; break;
+ case am_high: nprio = (erts_aint32_t) PRIORITY_HIGH; break;
+ case am_normal: nprio = (erts_aint32_t) PRIORITY_NORMAL; break;
+ case am_low: nprio = (erts_aint32_t) PRIORITY_LOW; break;
+ default: return THE_NON_VALUE; break;
}
- erts_smp_runq_unlock(rq);
- return old_value;
-}
-/* note that P_RUNNING is only set so that we don't try to remove
-** running processes from the schedule queue if they exit - a running
-** process not being in the schedule queue!!
-** Schedule for up to INPUT_REDUCTIONS context switches,
-** return 1 if more to do.
-*/
+ a = erts_smp_atomic32_read_nob(&p->state);
+ if (nprio == (a & ERTS_PSFLG_PRIO_MASK))
+ oprio = nprio;
+ else {
+ erts_aint32_t e, n;
+ do {
+ oprio = a & ERTS_PSFLG_PRIO_MASK;
+ n = e = a;
+
+ ASSERT(!(a & ERTS_PSFLG_IN_RUNQ));
+
+ n &= ~ERTS_PSFLG_PRIO_MASK;
+ n |= nprio;
+ a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
+ } while (a != e);
+ }
+
+ switch (oprio) {
+ case PRIORITY_MAX: return am_max;
+ case PRIORITY_HIGH: return am_high;
+ case PRIORITY_NORMAL: return am_normal;
+ case PRIORITY_LOW: return am_low;
+ default: ASSERT(0); return am_undefined;
+ }
+}
/*
* schedule() is called from BEAM (process_main()) or HiPE
@@ -6421,7 +6346,6 @@ erts_set_process_priority(Process *p, Eterm new_value)
Process *schedule(Process *p, int calls)
{
ErtsRunQueue *rq;
- ErtsRunPrioQueue *rpq;
erts_aint_t dt;
ErtsSchedulerData *esdp;
int context_reds;
@@ -6429,6 +6353,8 @@ Process *schedule(Process *p, int calls)
int input_reductions;
int actual_reds;
int reds;
+ Uint32 flags;
+ erts_aint32_t state = 0; /* Supress warning... */
#ifdef USE_VM_PROBES
if (p != NULL && DTRACE_ENABLED(process_unscheduled)) {
@@ -6484,68 +6410,49 @@ Process *schedule(Process *p, int calls)
erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
- if ((erts_system_profile_flags.runnable_procs)
- && (p->status == P_WAITING)) {
- profile_runnable_proc(p, am_inactive);
- }
+ state = erts_smp_atomic32_read_acqb(&p->state);
if (IS_TRACED(p)) {
- if (IS_TRACED_FL(p, F_TRACE_CALLS) && p->status != P_FREE) {
+ if (IS_TRACED_FL(p, F_TRACE_CALLS) && !(state & ERTS_PSFLG_FREE)) {
erts_schedule_time_break(p, ERTS_BP_CALL_TIME_SCHEDULE_OUT);
}
- switch (p->status) {
- case P_EXITING:
- if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_EXIT))
- trace_sched(p, am_out_exiting);
- break;
- case P_FREE:
+ if (state & (ERTS_PSFLG_FREE|ERTS_PSFLG_EXITING)) {
if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_EXIT))
- trace_sched(p, am_out_exited);
- break;
- default:
+ trace_sched(p, ((state & ERTS_PSFLG_FREE)
+ ? am_out_exited
+ : am_out_exiting));
+ }
+ else {
if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED))
trace_sched(p, am_out);
else if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_PROCS))
trace_virtual_sched(p, am_out);
- break;
}
}
#ifdef ERTS_SMP
- if (ERTS_PROC_PENDING_EXIT(p)) {
- erts_handle_pending_exit(p,
- ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
- p->status_flags |= ERTS_PROC_SFLG_PENDADD2SCHEDQ;
- }
-
- if (p->pending_suspenders) {
- handle_pending_suspend(p,
- ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
- ASSERT(!(p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ)
- || p->rcount == 0);
- }
+ if (state & ERTS_PSFLG_PENDING_EXIT)
+ erts_handle_pending_exit(p, (ERTS_PROC_LOCK_MAIN
+ | ERTS_PROC_LOCK_STATUS));
+ if (p->pending_suspenders)
+ handle_pending_suspend(p, (ERTS_PROC_LOCK_MAIN
+ | ERTS_PROC_LOCK_STATUS));
#endif
- erts_smp_runq_lock(rq);
- ERTS_PROC_REDUCTIONS_EXECUTED(rq, p->prio, reds, actual_reds);
+ schedule_out_process(rq, state, p); /* Returns with rq locked! */
+
+ ERTS_PROC_REDUCTIONS_EXECUTED(rq,
+ (int) (state & ERTS_PSFLG_PRIO_MASK),
+ reds,
+ actual_reds);
esdp->current_process = NULL;
#ifdef ERTS_SMP
p->scheduler_data = NULL;
- p->runq_flags &= ~ERTS_PROC_RUNQ_FLG_RUNNING;
- p->status_flags &= ~ERTS_PROC_SFLG_RUNNING;
-
- if (p->status_flags & ERTS_PROC_SFLG_PENDADD2SCHEDQ) {
- ErtsRunQueue *notify_runq;
- p->status_flags &= ~ERTS_PROC_SFLG_PENDADD2SCHEDQ;
- notify_runq = internal_add_to_runq(rq, p);
- if (notify_runq != rq)
- smp_notify_inc_runq(notify_runq);
- }
#endif
- if (p->status == P_FREE) {
+ if (state & ERTS_PSFLG_FREE) {
#ifdef ERTS_SMP
ASSERT(esdp->free_process == p);
esdp->free_process = NULL;
@@ -6576,8 +6483,9 @@ Process *schedule(Process *p, int calls)
ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
check_activities_to_run: {
-
#ifdef ERTS_SMP
+ ErtsMigrationPaths *mps;
+ ErtsMigrationPath *mp;
#ifdef ERTS_SMP
{
@@ -6599,20 +6507,28 @@ Process *schedule(Process *p, int calls)
ERTS_SMP_LC_ASSERT(!erts_thr_progress_is_blocking());
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
- if (rq->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK)
- immigrate(rq);
+ mps = erts_get_migration_paths_managed();
+ mp = &mps->mpath[rq->ix];
+
+ if (mp->flags & ERTS_RUNQ_FLGS_IMMIGRATE_QMASK)
+ immigrate(rq, mp);
- continue_check_activities_to_run:
+ continue_check_activities_to_run:
+ flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
+ continue_check_activities_to_run_known_flags:
- if (rq->flags & (ERTS_RUNQ_FLG_CHK_CPU_BIND
- | ERTS_RUNQ_FLG_SUSPENDED)) {
- if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED) {
- ASSERT(erts_smp_atomic32_read_nob(&esdp->ssi->flags)
- & ERTS_SSI_FLG_SUSPENDED);
+
+ if (flags & (ERTS_RUNQ_FLG_CHK_CPU_BIND|ERTS_RUNQ_FLG_SUSPENDED)) {
+
+ if (flags & ERTS_RUNQ_FLG_SUSPENDED) {
suspend_scheduler(esdp);
+ flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
}
- if (rq->flags & ERTS_RUNQ_FLG_CHK_CPU_BIND)
+ if (flags & ERTS_RUNQ_FLG_CHK_CPU_BIND) {
+ flags = ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_CHK_CPU_BIND);
+ flags &= ~ ERTS_RUNQ_FLG_CHK_CPU_BIND;
erts_sched_check_cpu_bind(esdp);
+ }
}
{
@@ -6641,14 +6557,12 @@ Process *schedule(Process *p, int calls)
}
#endif /* ERTS_SMP */
- ASSERT(rq->len == rq->procs.len + rq->ports.info.len);
-
- if ((rq->len == 0 && !rq->misc.start)
- || (rq->halt_in_progress
- && rq->ports.info.len == 0 && !rq->misc.start)) {
+ flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
+ if ((!(flags & ERTS_RUNQ_FLGS_QMASK) && !rq->misc.start)
+ || (rq->halt_in_progress && ERTS_EMPTY_RUNQ_PORTS(rq))) {
+ /* Prepare for scheduler wait */
#ifdef ERTS_SMP
-
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
rq->wakeup_other = 0;
@@ -6656,21 +6570,27 @@ Process *schedule(Process *p, int calls)
empty_runq(rq);
- if (rq->flags & ERTS_RUNQ_FLG_SUSPENDED) {
- ASSERT(erts_smp_atomic32_read_nob(&esdp->ssi->flags)
- & ERTS_SSI_FLG_SUSPENDED);
+ flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
+ if (flags & ERTS_RUNQ_FLG_SUSPENDED) {
non_empty_runq(rq);
- goto continue_check_activities_to_run;
+ goto continue_check_activities_to_run_known_flags;
}
- else if (!(rq->flags & ERTS_RUNQ_FLG_INACTIVE)) {
+ else if (!(flags & ERTS_RUNQ_FLG_INACTIVE)) {
+ if (try_steal_task(rq)) {
+ non_empty_runq(rq);
+ goto continue_check_activities_to_run;
+ }
+
+ ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
+
/*
* Check for ERTS_RUNQ_FLG_SUSPENDED has to be done
* after trying to steal a task.
*/
- if (try_steal_task(rq)
- || (rq->flags & ERTS_RUNQ_FLG_SUSPENDED)) {
+ flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
+ if (flags & ERTS_RUNQ_FLG_SUSPENDED) {
non_empty_runq(rq);
- goto continue_check_activities_to_run;
+ goto continue_check_activities_to_run_known_flags;
}
}
@@ -6701,6 +6621,7 @@ Process *schedule(Process *p, int calls)
erl_sys_schedule(1);
dt = erts_do_time_read_and_reset();
if (dt) erts_bump_timer(dt);
+
#ifdef ERTS_SMP
erts_smp_runq_lock(rq);
clear_sys_scheduling();
@@ -6717,14 +6638,17 @@ Process *schedule(Process *p, int calls)
{
int wo_reds = rq->wakeup_other_reds;
if (wo_reds) {
- if (rq->len < 2) {
+ erts_aint32_t len = rq->len;
+ if (len < 2) {
rq->wakeup_other -= ERTS_WAKEUP_OTHER_DEC*wo_reds;
if (rq->wakeup_other < 0)
rq->wakeup_other = 0;
}
else if (rq->wakeup_other < wakeup_other_limit)
- rq->wakeup_other += rq->len*wo_reds + ERTS_WAKEUP_OTHER_FIXED_INC;
+ rq->wakeup_other += len*wo_reds + ERTS_WAKEUP_OTHER_FIXED_INC;
else {
+ if (flags & ERTS_RUNQ_FLG_PROTECTED)
+ ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
if (erts_smp_atomic32_read_acqb(&no_empty_run_queues) != 0) {
wake_scheduler_on_empty_runq(rq);
rq->wakeup_other = 0;
@@ -6740,7 +6664,7 @@ Process *schedule(Process *p, int calls)
* Find a new port to run.
*/
- if (rq->ports.info.len) {
+ if (RUNQ_READ_LEN(&rq->ports.info.len)) {
int have_outstanding_io;
have_outstanding_io = erts_port_task_execute(rq, &esdp->current_port);
if ((have_outstanding_io && fcalls > 2*input_reductions)
@@ -6767,160 +6691,123 @@ Process *schedule(Process *p, int calls)
/*
* Find a new process to run.
*/
- pick_next_process:
-
- ERTS_DBG_CHK_PROCS_RUNQ(rq);
-
- switch (rq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) {
- case MAX_BIT:
- case MAX_BIT|HIGH_BIT:
- case MAX_BIT|NORMAL_BIT:
- case MAX_BIT|LOW_BIT:
- case MAX_BIT|HIGH_BIT|NORMAL_BIT:
- case MAX_BIT|HIGH_BIT|LOW_BIT:
- case MAX_BIT|NORMAL_BIT|LOW_BIT:
- case MAX_BIT|HIGH_BIT|NORMAL_BIT|LOW_BIT:
- rpq = &rq->procs.prio[PRIORITY_MAX];
- break;
- case HIGH_BIT:
- case HIGH_BIT|NORMAL_BIT:
- case HIGH_BIT|LOW_BIT:
- case HIGH_BIT|NORMAL_BIT|LOW_BIT:
- rpq = &rq->procs.prio[PRIORITY_HIGH];
- break;
- case NORMAL_BIT:
- rpq = &rq->procs.prio[PRIORITY_NORMAL];
- break;
- case LOW_BIT:
- rpq = &rq->procs.prio[PRIORITY_NORMAL];
- break;
- case NORMAL_BIT|LOW_BIT:
- rpq = &rq->procs.prio[PRIORITY_NORMAL];
- ASSERT(rpq->first != NULL);
- p = rpq->first;
- if (p->prio == PRIORITY_LOW) {
- if (p == rpq->last || p->skipped >= RESCHEDULE_LOW-1)
- p->skipped = 0;
- else {
- /* skip it */
- p->skipped++;
- rpq->first = p->next;
- rpq->first->prev = NULL;
- rpq->last->next = p;
- p->prev = rpq->last;
- p->next = NULL;
- rpq->last = p;
+ pick_next_process: {
+ int prio_q;
+ int qmask;
+
+ flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
+ qmask = (int) (flags & ERTS_RUNQ_FLGS_PROCS_QMASK);
+ switch (qmask & -qmask) {
+ case MAX_BIT:
+ prio_q = PRIORITY_MAX;
+ break;
+ case HIGH_BIT:
+ prio_q = PRIORITY_HIGH;
+ break;
+ case NORMAL_BIT:
+ case LOW_BIT:
+ prio_q = PRIORITY_NORMAL;
+ if (check_requeue_process(rq, PRIORITY_NORMAL))
goto pick_next_process;
- }
+ break;
+ case 0: /* No process at all */
+ default:
+ ASSERT(qmask == 0);
+ goto check_activities_to_run;
}
- break;
- case 0: /* No process at all */
- default:
- ASSERT((rq->flags & ERTS_RUNQ_FLGS_PROCS_QMASK) == 0);
- ASSERT(rq->procs.len == 0);
- goto check_activities_to_run;
- }
-
- BM_START_TIMER(system);
-
- /*
- * Take the chosen process out of the queue.
- */
- ASSERT(rpq->first); /* Wrong qmask in rq->flags? */
- p = rpq->first;
-#ifdef ERTS_SMP
- ERTS_SMP_LC_ASSERT(rq == p->run_queue);
-#endif
- rpq->first = p->next;
- if (!rpq->first)
- rpq->last = NULL;
- else
- rpq->first->prev = NULL;
- p->next = p->prev = NULL;
+ BM_START_TIMER(system);
- if (--rq->procs.prio_info[p->prio].len == 0)
- rq->flags &= ~(1 << p->prio);
- ASSERT(rq->procs.len > 0);
- rq->procs.len--;
- ASSERT(rq->len > 0);
- rq->len--;
+ /*
+ * Take the chosen process out of the queue.
+ */
+ p = dequeue_process(rq, prio_q, &state);
- {
- Uint32 ee_flgs = (ERTS_RUNQ_FLG_EVACUATE(p->prio)
- | ERTS_RUNQ_FLG_EMIGRATE(p->prio));
+ ASSERT(p); /* Wrong qmask in rq->flags? */
- if ((rq->flags & (ERTS_RUNQ_FLG_SUSPENDED|ee_flgs)) == ee_flgs)
- ERTS_UNSET_RUNQ_FLG_EVACUATE(rq->flags, p->prio);
- }
+ while (1) {
+ erts_aint32_t exp, new, tmp;
+ tmp = new = exp = state;
+ new &= ~ERTS_PSFLG_IN_RUNQ;
+ tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT);
+ if (tmp != ERTS_PSFLG_SUSPENDED)
+ new |= ERTS_PSFLG_RUNNING;
+ state = erts_smp_atomic32_cmpxchg_relb(&p->state, new, exp);
+ if (state == exp) {
+ tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT);
+ if (tmp == ERTS_PSFLG_SUSPENDED)
+ goto pick_next_process;
+ state = new;
+ break;
+ }
+ }
- ERTS_DBG_CHK_PROCS_RUNQ_NOPROC(rq, p);
+ rq->procs.context_switches++;
- rq->procs.context_switches++;
+ esdp->current_process = p;
- esdp->current_process = p;
+ }
#ifdef ERTS_SMP
- p->runq_flags |= ERTS_PROC_RUNQ_FLG_RUNNING;
erts_smp_runq_unlock(rq);
+ if (flags & ERTS_RUNQ_FLG_PROTECTED)
+ ERTS_RUNQ_FLGS_UNSET(rq, ERTS_RUNQ_FLG_PROTECTED);
+
ERTS_SMP_CHK_NO_PROC_LOCKS;
erts_smp_proc_lock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
if (erts_sched_stat.enabled) {
+ int prio;
UWord old = ERTS_PROC_SCHED_ID(p,
(ERTS_PROC_LOCK_MAIN
| ERTS_PROC_LOCK_STATUS),
(UWord) esdp->no);
int migrated = old && old != esdp->no;
+ prio = (int) (state & ERTS_PSFLG_PRIO_MASK);
+
erts_smp_spin_lock(&erts_sched_stat.lock);
- erts_sched_stat.prio[p->prio].total_executed++;
- erts_sched_stat.prio[p->prio].executed++;
+ erts_sched_stat.prio[prio].total_executed++;
+ erts_sched_stat.prio[prio].executed++;
if (migrated) {
- erts_sched_stat.prio[p->prio].total_migrated++;
- erts_sched_stat.prio[p->prio].migrated++;
+ erts_sched_stat.prio[prio].total_migrated++;
+ erts_sched_stat.prio[prio].migrated++;
}
erts_smp_spin_unlock(&erts_sched_stat.lock);
}
- p->status_flags |= ERTS_PROC_SFLG_RUNNING;
- p->status_flags &= ~ERTS_PROC_SFLG_INRUNQ;
if (ERTS_PROC_PENDING_EXIT(p)) {
erts_handle_pending_exit(p,
ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
+ state = erts_smp_atomic32_read_nob(&p->state);
}
ASSERT(!p->scheduler_data);
p->scheduler_data = esdp;
-
#endif
- ASSERT(p->status != P_SUSPENDED); /* Never run a suspended process */
+ /* Never run a suspended process */
+ ASSERT(!(ERTS_PSFLG_SUSPENDED & erts_smp_atomic32_read_nob(&p->state)));
ACTIVATE(p);
reds = context_reds;
if (IS_TRACED(p)) {
- switch (p->status) {
- case P_EXITING:
+ if (state & ERTS_PSFLG_EXITING) {
if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_EXIT))
trace_sched(p, am_in_exiting);
- break;
- default:
+ }
+ else {
if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED))
trace_sched(p, am_in);
else if (ARE_TRACE_FLAGS_ON(p, F_TRACE_SCHED_PROCS))
trace_virtual_sched(p, am_in);
- break;
}
if (IS_TRACED_FL(p, F_TRACE_CALLS)) {
erts_schedule_time_break(p, ERTS_BP_CALL_TIME_SCHEDULE_IN);
}
}
- if (p->status != P_EXITING)
- p->status = P_RUNNING;
-
erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
#ifdef ERTS_SMP
@@ -6928,7 +6815,7 @@ Process *schedule(Process *p, int calls)
erts_check_my_tracer_proc(p);
#endif
- if (!ERTS_PROC_IS_EXITING(p)
+ if (!(state & ERTS_PSFLG_EXITING)
&& ((FLAGS(p) & F_FORCE_GC)
|| (MSO(p).overhead > BIN_VHEAP_SZ(p)))) {
reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity);
@@ -7020,17 +6907,19 @@ erts_schedule_misc_op(void (*func)(void *), void *arg)
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ErtsRunQueue *rq = esdp ? esdp->run_queue : ERTS_RUNQ_IX(0);
ErtsMiscOpList *molp = misc_op_list_alloc();
+#ifdef ERTS_SMP
+ ErtsMigrationPaths *mpaths = erts_get_migration_paths();
- erts_smp_runq_lock(rq);
-
- while (rq->misc.evac_runq) {
- ErtsRunQueue *tmp_rq = rq->misc.evac_runq;
- erts_smp_runq_unlock(rq);
- rq = tmp_rq;
- erts_smp_runq_lock(rq);
+ if (!mpaths)
+ rq = ERTS_RUNQ_IX(0);
+ else {
+ ErtsRunQueue *erq = mpaths->mpath[rq->ix].misc_evac_runq;
+ if (erq)
+ rq = erq;
}
+#endif
- ASSERT(!(rq->flags & ERTS_RUNQ_FLG_SUSPENDED));
+ erts_smp_runq_lock(rq);
molp->next = NULL;
molp->func = func;
@@ -7040,7 +6929,9 @@ erts_schedule_misc_op(void (*func)(void *), void *arg)
else
rq->misc.start = molp;
rq->misc.end = molp;
+
erts_smp_runq_unlock(rq);
+
smp_notify_inc_runq(rq);
}
@@ -7201,7 +7092,7 @@ erts_free_proc(Process *p)
** Allocate process and find out where to place next process.
*/
static Process*
-alloc_process(void)
+alloc_process(ErtsRunQueue *rq, erts_aint32_t state)
{
int pix;
Process* p;
@@ -7280,6 +7171,12 @@ alloc_process(void)
exp_lpd = act_lpd;
}
+#ifdef ERTS_SMP
+ RUNQ_SET_RQ(&p->run_queue, rq);
+#endif
+
+ erts_smp_atomic32_init_relb(&p->state, state);
+
#ifdef DEBUG
pid = p->id;
#endif
@@ -7303,7 +7200,6 @@ alloc_process(void)
erts_smp_rwmtx_runlock(&erts_proc_tab_rwmtx);
- p->rstatus = P_FREE;
p->rcount = 0;
ASSERT(p == (Process *)
@@ -7317,6 +7213,7 @@ system_limit:
erts_smp_rwmtx_runlock(&erts_proc_tab_rwmtx);
return NULL;
+
}
Eterm
@@ -7326,7 +7223,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
Eterm args, /* Arguments for function (must be well-formed list). */
ErlSpawnOpts* so) /* Options for spawn. */
{
- ErtsRunQueue *rq, *notify_runq;
+ ErtsRunQueue *rq = NULL;
Process *p;
Sint arity; /* Number of arguments. */
#ifndef HYBRID
@@ -7335,6 +7232,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
Uint sz; /* Needed words on heap. */
Uint heap_need; /* Size needed on heap. */
Eterm res = THE_NON_VALUE;
+ erts_aint32_t state = 0;
+ erts_aint32_t prio = (erts_aint32_t) PRIORITY_NORMAL;
#ifdef ERTS_SMP
erts_smp_proc_lock(parent, ERTS_PROC_LOCKS_ALL_MINOR);
@@ -7359,8 +7258,24 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
so->error_code = BADARG;
goto error;
}
- p = alloc_process(); /* All proc locks are locked by this thread
- on success */
+
+ if (so->flags & SPO_USE_ARGS) {
+ if (so->scheduler) {
+ int ix = so->scheduler-1;
+ ASSERT(0 <= ix && ix < erts_no_run_queues);
+ rq = ERTS_RUNQ_IX(ix);
+ state |= ERTS_PSFLG_BOUND;
+ }
+ prio = (erts_aint32_t) so->priority;
+ }
+
+ state |= (prio & ERTS_PSFLG_PRIO_MASK);
+
+ if (!rq)
+ rq = erts_get_runq_proc(parent);
+
+ p = alloc_process(rq, state); /* All proc locks are locked by this thread
+ on success */
if (!p) {
erts_send_error_to_logger_str(parent->group_leader,
"Too many processes\n");
@@ -7382,22 +7297,16 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->flags = erts_default_process_flags;
- /* Scheduler queue mutex should be locked when changeing
- * prio. In this case we don't have to lock it, since
- * noone except us has access to the process.
- */
if (so->flags & SPO_USE_ARGS) {
p->min_heap_size = so->min_heap_size;
p->min_vheap_size = so->min_vheap_size;
- p->prio = so->priority;
p->max_gen_gcs = so->max_gen_gcs;
} else {
p->min_heap_size = H_MIN_SIZE;
p->min_vheap_size = BIN_VH_MIN_SIZE;
- p->prio = PRIORITY_NORMAL;
p->max_gen_gcs = (Uint16) erts_smp_atomic32_read_nob(&erts_max_gen_gcs);
}
- p->skipped = 0;
+ p->schedule_count = 0;
ASSERT(p->min_heap_size == erts_next_heap_size(p->min_heap_size, 0));
p->initial[INITIAL_MOD] = mod;
@@ -7501,12 +7410,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
: STORE_NC(&p->htop, &p->off_heap, parent->group_leader);
}
-#if 1
- p->trace_flags = ~TRACEE_FLAGS;
- p->tracer_proc = NIL;
-#else
erts_get_default_tracing(&p->trace_flags, &p->tracer_proc);
-#endif
p->msg.first = NULL;
p->msg.last = &p->msg.first;
@@ -7516,7 +7420,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->msg_inq.first = NULL;
p->msg_inq.last = &p->msg_inq.first;
p->msg_inq.len = 0;
- p->bound_runq = NULL;
#endif
p->bif_timers = NULL;
p->mbuf = NULL;
@@ -7618,9 +7521,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
#ifdef ERTS_SMP
p->scheduler_data = NULL;
- erts_atomic32_init_nob(&p->aflags, 0);
- p->status_flags = 0;
- p->runq_flags = 0;
p->suspendee = NIL;
p->pending_suspenders = NULL;
p->pending_exit.reason = THE_NON_VALUE;
@@ -7631,34 +7531,15 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->fp_exception = 0;
#endif
+ erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL);
+
+ res = p->id;
+
/*
* Schedule process for execution.
*/
- if (!((so->flags & SPO_USE_ARGS) && so->scheduler))
- rq = erts_get_runq_proc(parent);
- else {
- int ix = so->scheduler-1;
- ASSERT(0 <= ix && ix < erts_no_run_queues);
- rq = ERTS_RUNQ_IX(ix);
- p->bound_runq = rq;
- }
-
- erts_smp_runq_lock(rq);
-
-#ifdef ERTS_SMP
- p->run_queue = rq;
-#endif
-
- p->status = P_WAITING;
- notify_runq = internal_add_to_runq(rq, p);
-
- erts_smp_runq_unlock(rq);
-
- smp_notify_inc_runq(notify_runq);
-
- res = p->id;
- erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL);
+ schedule_process(p, state, 0);
VERBOSE(DEBUG_PROCESSES, ("Created a new process: %T\n",p->id));
@@ -7694,12 +7575,8 @@ void erts_init_empty_process(Process *p)
p->max_gen_gcs = 0;
p->min_heap_size = 0;
p->min_vheap_size = 0;
- p->status = P_RUNABLE;
- p->gcstatus = P_RUNABLE;
- p->rstatus = P_RUNABLE;
p->rcount = 0;
p->id = ERTS_INVALID_PID;
- p->prio = PRIORITY_NORMAL;
p->reds = 0;
p->tracer_proc = NIL;
p->trace_flags = F_INITIAL_TRACE_FLAGS;
@@ -7716,7 +7593,6 @@ void erts_init_empty_process(Process *p)
p->bin_vheap_mature = 0;
#ifdef ERTS_SMP
p->u.ptimer = NULL;
- p->bound_runq = NULL;
#else
memset(&(p->u.tm), 0, sizeof(ErlTimer));
#endif
@@ -7793,12 +7669,10 @@ void erts_init_empty_process(Process *p)
p->last_old_htop = NULL;
#endif
+ erts_smp_atomic32_init_nob(&p->state, (erts_aint32_t) PRIORITY_NORMAL);
#ifdef ERTS_SMP
p->scheduler_data = NULL;
- erts_atomic32_init_nob(&p->aflags, 0);
- p->status_flags = 0;
- p->runq_flags = 0;
p->msg_inq.first = NULL;
p->msg_inq.last = &p->msg_inq.first;
p->msg_inq.len = 0;
@@ -7808,7 +7682,7 @@ void erts_init_empty_process(Process *p)
p->pending_exit.bp = NULL;
erts_proc_lock_init(p);
erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL);
- p->run_queue = ERTS_RUNQ_IX(0);
+ RUNQ_SET_RQ(&p->run_queue, ERTS_RUNQ_IX(0));
#endif
#if !defined(NO_FPE_SIGNALS) || defined(HIPE)
@@ -8003,26 +7877,34 @@ delete_process(Process* p)
}
+static ERTS_INLINE erts_aint32_t
+set_proc_exiting_state(Process *p, erts_aint32_t state)
+{
+ erts_aint32_t a, n, e;
+ a = state;
+ while (1) {
+ n = e = a;
+ n &= ~(ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT);
+ n |= ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE;
+ if (!(a & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING)))
+ n |= ERTS_PSFLG_IN_RUNQ;
+ a = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e);
+ if (a == e)
+ break;
+ }
+ return a;
+}
+
static ERTS_INLINE void
-set_proc_exiting(Process *p, Eterm reason, ErlHeapFragment *bp)
+set_proc_exiting(Process *p,
+ erts_aint32_t state,
+ Eterm reason,
+ ErlHeapFragment *bp)
{
-#ifdef ERTS_SMP
- erts_pix_lock_t *pix_lock = ERTS_PID2PIXLOCK(p->id);
ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(p) == ERTS_PROC_LOCKS_ALL);
- /*
- * You are required to have all proc locks and the pix lock when going
- * to status P_EXITING. This makes it is enough to take any lock when
- * looking up a process (pid2proc()) to prevent the looked up process
- * from exiting until the lock has been released.
- */
- erts_pix_lock(pix_lock);
- erts_atomic32_read_bor_relb(&p->aflags, ERTS_PROC_AFLG_EXITING);
-#endif
- p->status = P_EXITING;
-#ifdef ERTS_SMP
- erts_pix_unlock(pix_lock);
-#endif
+ state = set_proc_exiting_state(p, state);
+
p->fvalue = reason;
if (bp)
erts_link_mbuf_to_proc(p, bp);
@@ -8035,6 +7917,14 @@ set_proc_exiting(Process *p, Eterm reason, ErlHeapFragment *bp)
KILL_CATCHES(p);
cancel_timer(p);
p->i = (BeamInstr *) beam_exit;
+
+ if (erts_system_profile_flags.runnable_procs
+ && !(state & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) {
+ profile_runnable_proc(p, am_active);
+ }
+
+ if (!(state & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING)))
+ add2runq(p, state);
}
@@ -8047,8 +7937,8 @@ erts_handle_pending_exit(Process *c_p, ErtsProcLocks locks)
ASSERT(is_value(c_p->pending_exit.reason));
ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == locks);
ERTS_SMP_LC_ASSERT(locks & ERTS_PROC_LOCK_MAIN);
- ERTS_SMP_LC_ASSERT(c_p->status != P_EXITING);
- ERTS_SMP_LC_ASSERT(c_p->status != P_FREE);
+ ERTS_SMP_LC_ASSERT(!((ERTS_PSFLG_EXITING|ERTS_PSFLG_FREE)
+ & erts_smp_atomic32_read_nob(&c_p->state)));
/* Ensure that all locks on c_p are locked before proceeding... */
if (locks == ERTS_PROC_LOCKS_ALL)
@@ -8061,7 +7951,10 @@ erts_handle_pending_exit(Process *c_p, ErtsProcLocks locks)
}
}
- set_proc_exiting(c_p, c_p->pending_exit.reason, c_p->pending_exit.bp);
+ set_proc_exiting(c_p,
+ erts_smp_atomic32_read_acqb(&c_p->state),
+ c_p->pending_exit.reason,
+ c_p->pending_exit.bp);
c_p->pending_exit.reason = THE_NON_VALUE;
c_p->pending_exit.bp = NULL;
@@ -8077,11 +7970,12 @@ handle_pending_exiters(ErtsProcList *pnd_xtrs)
while (plp) {
Process *p = erts_pid2proc(NULL, 0, plp->pid, ERTS_PROC_LOCKS_ALL);
if (p) {
- if (proclist_same(plp, p)
- && !(p->status_flags & ERTS_PROC_SFLG_RUNNING)) {
- ASSERT(p->status_flags & ERTS_PROC_SFLG_INRUNQ);
- ASSERT(ERTS_PROC_PENDING_EXIT(p));
- erts_handle_pending_exit(p, ERTS_PROC_LOCKS_ALL);
+ if (proclist_same(plp, p)) {
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state);
+ if (!(state & ERTS_PSFLG_RUNNING)) {
+ ASSERT(state & ERTS_PSFLG_PENDING_EXIT);
+ erts_handle_pending_exit(p, ERTS_PROC_LOCKS_ALL);
+ }
}
erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL);
}
@@ -8171,7 +8065,7 @@ send_exit_message(Process *to, ErtsProcLocks *to_locksp,
* SMP emulator). When the signal is received the receiver receives an
* 'EXIT' message if it is trapping exits; otherwise, it will either
* ignore the signal if the exit reason is normal, or go into an
- * exiting state (status P_EXITING). When a process has gone into the
+ * exiting state (ERTS_PSFLG_EXITING). When a process has gone into the
* exiting state it will not execute any more Erlang code, but it might
* take a while before it actually exits. The exit signal is being
* received when the 'EXIT' message is put in the message queue, the
@@ -8244,6 +8138,7 @@ send_exit_signal(Process *c_p, /* current process if and only
Uint32 flags /* flags */
)
{
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&rp->state);
Eterm rsn = reason == am_kill ? am_killed : reason;
ERTS_SMP_LC_ASSERT(*rp_locks == erts_proc_lc_my_proc_locks(rp));
@@ -8265,7 +8160,7 @@ send_exit_signal(Process *c_p, /* current process if and only
}
#endif
- if (ERTS_PROC_IS_TRAPPING_EXITS(rp)
+ if ((state & ERTS_PSFLG_TRAP_EXIT)
&& (reason != am_kill || (flags & ERTS_XSIG_FLG_IGN_KILL))) {
if (is_not_nil(token)
#ifdef USE_VM_PROBES
@@ -8281,9 +8176,7 @@ send_exit_signal(Process *c_p, /* current process if and only
}
else if (reason != am_normal || (flags & ERTS_XSIG_FLG_NO_IGN_NORMAL)) {
#ifdef ERTS_SMP
- if (!ERTS_PROC_PENDING_EXIT(rp) && !ERTS_PROC_IS_EXITING(rp)) {
- ASSERT(rp->status != P_EXITING);
- ASSERT(rp->status != P_FREE);
+ if (!(state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))) {
ASSERT(!rp->pending_exit.bp);
if (rp == c_p && (*rp_locks & ERTS_PROC_LOCK_MAIN)) {
@@ -8299,9 +8192,9 @@ send_exit_signal(Process *c_p, /* current process if and only
}
*rp_locks = ERTS_PROC_LOCKS_ALL;
}
- set_proc_exiting(c_p, rsn, NULL);
+ set_proc_exiting(c_p, state, rsn, NULL);
}
- else if (!(rp->status_flags & ERTS_PROC_SFLG_RUNNING)) {
+ else if (!(state & ERTS_PSFLG_RUNNING)) {
/* Process not running ... */
ErtsProcLocks need_locks = ~(*rp_locks) & ERTS_PROC_LOCKS_ALL;
if (need_locks
@@ -8318,6 +8211,7 @@ send_exit_signal(Process *c_p, /* current process if and only
/* ...and we have all locks on it... */
*rp_locks = ERTS_PROC_LOCKS_ALL;
set_proc_exiting(rp,
+ state,
(is_immed(rsn)
? rsn
: copy_object(rsn, rp)),
@@ -8347,13 +8241,9 @@ send_exit_signal(Process *c_p, /* current process if and only
&bp->off_heap);
rp->pending_exit.bp = bp;
}
- erts_atomic32_read_bor_relb(&rp->aflags,
- ERTS_PROC_AFLG_PENDING_EXIT);
- ASSERT(ERTS_PROC_PENDING_EXIT(rp));
+ erts_smp_atomic32_read_bor_relb(&rp->state,
+ ERTS_PSFLG_PENDING_EXIT);
}
- if (!(rp->status_flags
- & (ERTS_PROC_SFLG_INRUNQ|ERTS_PROC_SFLG_RUNNING)))
- erts_add_to_runq(rp);
}
/* else:
*
@@ -8365,18 +8255,15 @@ send_exit_signal(Process *c_p, /* current process if and only
* exit or by itself before seeing the pending exit.
*/
#else /* !ERTS_SMP */
- if (c_p == rp) {
- rp->status = P_EXITING;
- c_p->fvalue = rsn;
- }
- else if (rp->status != P_EXITING) { /* No recursive process exits /PaN */
- Eterm old_status = rp->status;
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&rp->state);
+ if (!(state & ERTS_PSFLG_EXITING)) {
set_proc_exiting(rp,
- is_immed(rsn) ? rsn : copy_object(rsn, rp),
+ state,
+ (is_immed(rsn) || c_p == rp
+ ? rsn
+ : copy_object(rsn, rp)),
NULL);
ACTIVATE(rp);
- if (old_status != P_RUNABLE && old_status != P_RUNNING)
- erts_add_to_runq(rp);
}
#endif
return -1; /* Receiver will exit */
@@ -8688,22 +8575,14 @@ proc_dec_refc(void *vproc)
#endif
-static void
-continue_exit_process(Process *p
-#ifdef ERTS_SMP
- , erts_pix_lock_t *pix_lock
-#endif
- );
-
/* this function fishishes a process and propagates exit messages - called
by process_main when a process dies */
void
erts_do_exit_process(Process* p, Eterm reason)
{
#ifdef ERTS_SMP
- erts_pix_lock_t *pix_lock = ERTS_PID2PIXLOCK(p->id);
+ erts_aint32_t state;
#endif
-
p->arity = 0; /* No live registers */
p->fvalue = reason;
@@ -8721,27 +8600,17 @@ erts_do_exit_process(Process* p, Eterm reason)
#ifdef ERTS_SMP
ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p);
/* By locking all locks (main lock is already locked) when going
- to status P_EXITING, it is enough to take any lock when
+ to exiting state (ERTS_PSFLG_EXITING), it is enough to take any lock when
looking up a process (erts_pid2proc()) to prevent the looked up
process from exiting until the lock has been released. */
erts_smp_proc_lock(p, ERTS_PROC_LOCKS_ALL_MINOR);
#endif
-
- if (erts_system_profile_flags.runnable_procs && (p->status != P_WAITING)) {
- profile_runnable_proc(p, am_inactive);
- }
-
-#ifdef ERTS_SMP
- erts_pix_lock(pix_lock);
- erts_atomic32_read_bor_relb(&p->aflags, ERTS_PROC_AFLG_EXITING);
-#endif
-
- p->status = P_EXITING;
-#ifdef ERTS_SMP
- erts_pix_unlock(pix_lock);
-
- if (ERTS_PROC_PENDING_EXIT(p)) {
+#ifndef ERTS_SMP
+ set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state));
+#else
+ state = set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state));
+ if (state & ERTS_PSFLG_PENDING_EXIT) {
/* Process exited before pending exit was received... */
p->pending_exit.reason = THE_NON_VALUE;
if (p->pending_exit.bp) {
@@ -8780,29 +8649,11 @@ erts_do_exit_process(Process* p, Eterm reason)
erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL_MINOR);
-#ifdef ERTS_SMP
- continue_exit_process(p, pix_lock);
-#else
- continue_exit_process(p);
-#endif
+ erts_continue_exit_process(p);
}
void
-erts_continue_exit_process(Process *c_p)
-{
-#ifdef ERTS_SMP
- continue_exit_process(c_p, ERTS_PID2PIXLOCK(c_p->id));
-#else
- continue_exit_process(c_p);
-#endif
-}
-
-static void
-continue_exit_process(Process *p
-#ifdef ERTS_SMP
- , erts_pix_lock_t *pix_lock
-#endif
- )
+erts_continue_exit_process(Process *p)
{
ErtsLink* lnk;
ErtsMonitor *mon;
@@ -8818,11 +8669,7 @@ continue_exit_process(Process *p
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(p));
-#ifdef DEBUG
- erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
- ASSERT(p->status == P_EXITING);
- erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
-#endif
+ ASSERT(ERTS_PROC_IS_EXITING(p));
#ifdef ERTS_SMP
if (p->flags & F_HAVE_BLCKD_MSCHED) {
@@ -8914,7 +8761,6 @@ continue_exit_process(Process *p
p->scheduler_data->current_process = NULL;
p->scheduler_data->free_process = p;
- p->status_flags = 0;
#endif
/* Time of death! */
erts_smp_atomic_set_relb(&erts_proc.tab[pix], ERTS_AINT_NULL);
@@ -8946,7 +8792,21 @@ continue_exit_process(Process *p
lnk = p->nlinks;
p->nlinks = NULL;
- p->status = P_FREE;
+
+ {
+ /* Inactivate and notify free */
+ erts_aint32_t n, e, a = erts_smp_atomic32_read_nob(&p->state);
+ while (1) {
+ n = e = a;
+ ASSERT(a & ERTS_PSFLG_EXITING);
+ n |= ERTS_PSFLG_FREE;
+ n &= ~ERTS_PSFLG_ACTIVE;
+ a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e);
+ if (a == e)
+ break;
+ }
+ }
+
dep = ((p->flags & F_DISTRIBUTION)
? ERTS_PROC_SET_DIST_ENTRY(p, ERTS_PROC_LOCKS_ALL, NULL)
: NULL);
@@ -9032,8 +8892,6 @@ continue_exit_process(Process *p
ERTS_SMP_LC_ASSERT(curr_locks == erts_proc_lc_my_proc_locks(p));
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN & curr_locks);
- ASSERT(p->status == P_EXITING);
-
p->i = (BeamInstr *) beam_continue_exit;
if (!(curr_locks & ERTS_PROC_LOCK_STATUS)) {
@@ -9041,8 +8899,6 @@ continue_exit_process(Process *p
curr_locks |= ERTS_PROC_LOCK_STATUS;
}
- erts_add_to_runq(p);
-
if (curr_locks != ERTS_PROC_LOCK_MAIN)
erts_smp_proc_unlock(p, ~ERTS_PROC_LOCK_MAIN & curr_locks);
@@ -9054,33 +8910,15 @@ continue_exit_process(Process *p
static void
timeout_proc(Process* p)
{
+ erts_aint32_t state;
BeamInstr** pi = (BeamInstr **) p->def_arg_reg;
p->i = *pi;
p->flags |= F_TIMO;
p->flags &= ~F_INSLPQUEUE;
- switch (p->status) {
- case P_GARBING:
- switch (p->gcstatus) {
- case P_SUSPENDED:
- goto suspended;
- case P_WAITING:
- goto waiting;
- default:
- break;
- }
- break;
- case P_WAITING:
- waiting:
- erts_add_to_runq(p);
- break;
- case P_SUSPENDED:
- suspended:
- p->rstatus = P_RUNABLE; /* MUST set resume status to runnable */
- break;
- default:
- break;
- }
+ state = erts_smp_atomic32_read_acqb(&p->state);
+ if (!(state & ERTS_PSFLG_ACTIVE))
+ schedule_process(p, state, 0);
}
@@ -9148,6 +8986,7 @@ erts_stack_dump(int to, void *to_arg, Process *p)
void
erts_program_counter_info(int to, void *to_arg, Process *p)
{
+ erts_aint32_t state;
int i;
erts_print(to, to_arg, "Program counter: %p (", p->i);
@@ -9156,7 +8995,8 @@ erts_program_counter_info(int to, void *to_arg, Process *p)
erts_print(to, to_arg, "CP: %p (", p->cp);
print_function_from_pc(to, to_arg, p->cp);
erts_print(to, to_arg, ")\n");
- if (!((p->status == P_RUNNING) || (p->status == P_GARBING))) {
+ state = erts_smp_atomic32_read_acqb(&p->state);
+ if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_GC))) {
erts_print(to, to_arg, "arity = %d\n",p->arity);
if (!ERTS_IS_CRASH_DUMPING) {
/*
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index d40f50c0af..249f29e7e6 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -112,28 +112,29 @@ extern int erts_sched_thread_suggested_stack_size;
#define PRIORITY_NORMAL 2
#define PRIORITY_LOW 3
#define ERTS_NO_PROC_PRIO_LEVELS 4
+#define ERTS_NO_PROC_PRIO_QUEUES 3
#define ERTS_PORT_PRIO_LEVEL ERTS_NO_PROC_PRIO_LEVELS
+#define ERTS_NO_PRIO_LEVELS (ERTS_NO_PROC_PRIO_LEVELS + 1)
#define ERTS_RUNQ_FLGS_PROCS_QMASK \
((((Uint32) 1) << ERTS_NO_PROC_PRIO_LEVELS) - 1)
-#define ERTS_NO_PRIO_LEVELS (ERTS_NO_PROC_PRIO_LEVELS + 1)
-#define ERTS_RUNQ_FLGS_MIGRATE_QMASK \
+#define ERTS_RUNQ_FLGS_QMASK \
((((Uint32) 1) << ERTS_NO_PRIO_LEVELS) - 1)
#define ERTS_RUNQ_FLGS_EMIGRATE_SHFT \
- ERTS_NO_PROC_PRIO_LEVELS
+ ERTS_NO_PRIO_LEVELS
#define ERTS_RUNQ_FLGS_IMMIGRATE_SHFT \
(ERTS_RUNQ_FLGS_EMIGRATE_SHFT + ERTS_NO_PRIO_LEVELS)
#define ERTS_RUNQ_FLGS_EVACUATE_SHFT \
(ERTS_RUNQ_FLGS_IMMIGRATE_SHFT + ERTS_NO_PRIO_LEVELS)
#define ERTS_RUNQ_FLGS_EMIGRATE_QMASK \
- (ERTS_RUNQ_FLGS_MIGRATE_QMASK << ERTS_RUNQ_FLGS_EMIGRATE_SHFT)
+ (ERTS_RUNQ_FLGS_QMASK << ERTS_RUNQ_FLGS_EMIGRATE_SHFT)
#define ERTS_RUNQ_FLGS_IMMIGRATE_QMASK \
- (ERTS_RUNQ_FLGS_MIGRATE_QMASK << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT)
+ (ERTS_RUNQ_FLGS_QMASK << ERTS_RUNQ_FLGS_IMMIGRATE_SHFT)
#define ERTS_RUNQ_FLGS_EVACUATE_QMASK \
- (ERTS_RUNQ_FLGS_MIGRATE_QMASK << ERTS_RUNQ_FLGS_EVACUATE_SHFT)
+ (ERTS_RUNQ_FLGS_QMASK << ERTS_RUNQ_FLGS_EVACUATE_SHFT)
#define ERTS_RUNQ_FLG_BASE2 \
(ERTS_RUNQ_FLGS_EVACUATE_SHFT + ERTS_NO_PRIO_LEVELS)
@@ -148,14 +149,18 @@ extern int erts_sched_thread_suggested_stack_size;
(((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 3))
#define ERTS_RUNQ_FLG_INACTIVE \
(((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 4))
+#define ERTS_RUNQ_FLG_NONEMPTY \
+ (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 5))
+#define ERTS_RUNQ_FLG_PROTECTED \
+ (((Uint32) 1) << (ERTS_RUNQ_FLG_BASE2 + 6))
#define ERTS_RUNQ_FLGS_MIGRATION_QMASKS \
(ERTS_RUNQ_FLGS_EMIGRATE_QMASK \
| ERTS_RUNQ_FLGS_IMMIGRATE_QMASK \
| ERTS_RUNQ_FLGS_EVACUATE_QMASK)
+
#define ERTS_RUNQ_FLGS_MIGRATION_INFO \
- (ERTS_RUNQ_FLGS_MIGRATION_QMASKS \
- | ERTS_RUNQ_FLG_INACTIVE \
+ (ERTS_RUNQ_FLG_INACTIVE \
| ERTS_RUNQ_FLG_OUT_OF_WORK \
| ERTS_RUNQ_FLG_HALFTIME_OUT_OF_WORK)
@@ -186,33 +191,45 @@ extern int erts_sched_thread_suggested_stack_size;
#define ERTS_UNSET_RUNQ_FLG_EVACUATE(FLGS, PRIO) \
((FLGS) &= ~ERTS_RUNQ_FLG_EVACUATE((PRIO)))
-#define ERTS_RUNQ_IFLG_SUSPENDED (((erts_aint32_t) 1) << 0)
-#define ERTS_RUNQ_IFLG_NONEMPTY (((erts_aint32_t) 1) << 1)
-
-
-#ifdef DEBUG
-# if defined(ARCH_64) && !HALFWORD_HEAP
-# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \
- (*((char **) &(RQP)) = (char *) (0xdeadbeefdead0003 | ((N) << 4)))
-# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \
-do { \
- ASSERT((RQP) != NULL); \
- ASSERT(((((Uint) (RQP)) & ((Uint) 0x3))) == ((Uint) 0)); \
- ASSERT((((Uint) (RQP)) & ~((Uint) 0xffff)) != ((Uint) 0xdeadbeefdead0000));\
-} while (0)
-# else
-# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \
- (*((char **) &(RQP)) = (char *) (0xdead0003 | ((N) << 4)))
-# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \
-do { \
- ASSERT((RQP) != NULL); \
- ASSERT(((((UWord) (RQP)) & ((UWord) 1))) == ((UWord) 0)); \
- ASSERT((((UWord) (RQP)) & ~((UWord) 0xffff)) != ((UWord) 0xdead0000)); \
-} while (0)
-# endif
-#else
-# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N)
-# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP)
+#define ERTS_RUNQ_FLGS_INIT(RQ, INIT) \
+ erts_smp_atomic32_init_nob(&(RQ)->flags, (erts_aint32_t) (INIT))
+#define ERTS_RUNQ_FLGS_SET(RQ, FLGS) \
+ ((Uint32) erts_smp_atomic32_read_bor_relb(&(RQ)->flags, \
+ (erts_aint32_t) (FLGS)))
+#define ERTS_RUNQ_FLGS_UNSET(RQ, FLGS) \
+ ((Uint32) erts_smp_atomic32_read_band_relb(&(RQ)->flags, \
+ (erts_aint32_t) ~(FLGS)))
+#define ERTS_RUNQ_FLGS_GET(RQ) \
+ ((Uint32) erts_smp_atomic32_read_acqb(&(RQ)->flags))
+#define ERTS_RUNQ_FLGS_GET_NOB(RQ) \
+ ((Uint32) erts_smp_atomic32_read_nob(&(RQ)->flags))
+#define ERTS_RUNQ_FLGS_GET_MB(RQ) \
+ ((Uint32) erts_smp_atomic32_read_mb(&(RQ)->flags))
+#define ERTS_RUNQ_FLGS_MASK_SET(RQ, MSK, FLGS) \
+ ((Uint32) erts_smp_atomic32_mask_set_relb(&(RQ)->flags, \
+ (erts_aint32_t) (MSK), \
+ (erts_aint32_t) (FLGS)))
+
+ERTS_GLB_INLINE erts_aint32_t
+erts_smp_atomic32_mask_set_relb(erts_smp_atomic32_t *a32p,
+ erts_aint32_t mask,
+ erts_aint32_t set);
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+ERTS_GLB_INLINE erts_aint32_t
+erts_smp_atomic32_mask_set_relb(erts_smp_atomic32_t *a32p,
+ erts_aint32_t mask,
+ erts_aint32_t set)
+{
+ erts_aint32_t act = erts_smp_atomic32_read_nob(a32p);
+ while (1) {
+ erts_aint32_t exp = act;
+ erts_aint32_t new = exp & ~mask;
+ new |= (mask & set);
+ act = erts_smp_atomic32_cmpxchg_relb(a32p, new, exp);
+ if (act == exp)
+ return act;
+ }
+}
#endif
typedef enum {
@@ -311,21 +328,39 @@ typedef struct ErtsSchedulerData_ ErtsSchedulerData;
typedef struct ErtsRunQueue_ ErtsRunQueue;
typedef struct {
- int len;
- int max_len;
+ erts_smp_atomic32_t len;
+ erts_aint32_t max_len;
int reds;
+} ErtsRunQueueInfo;
+
+#ifdef ERTS_SMP
+
+typedef struct {
+ Uint32 flags;
+ ErtsRunQueue *misc_evac_runq;
struct {
struct {
int this;
int other;
} limit;
ErtsRunQueue *runq;
- } migrate;
-} ErtsRunQueueInfo;
+ Uint32 flags;
+ } prio[ERTS_NO_PRIO_LEVELS];
+} ErtsMigrationPath;
+
+typedef struct ErtsMigrationPaths_ ErtsMigrationPaths;
+
+struct ErtsMigrationPaths_ {
+ void *block;
+ ErtsMigrationPaths *next;
+ ErtsThrPrgrVal thr_prgr;
+ ErtsMigrationPath mpath[1];
+};
+
+#endif /* ERTS_SMP */
struct ErtsRunQueue_ {
int ix;
- erts_smp_atomic32_t info_flags;
erts_smp_mtx_t mtx;
erts_smp_cnd_t cnd;
@@ -333,19 +368,18 @@ struct ErtsRunQueue_ {
ErtsSchedulerData *scheduler;
int waiting; /* < 0 in sys schedule; > 0 on cnd variable */
int woken;
- Uint32 flags;
+ erts_smp_atomic32_t flags;
int check_balance_reds;
int full_reds_history_sum;
int full_reds_history[ERTS_FULL_REDS_HISTORY_SIZE];
int out_of_work_count;
- int max_len;
- int len;
+ erts_aint32_t max_len;
+ erts_aint32_t len;
int wakeup_other;
int wakeup_other_reds;
int halt_in_progress;
struct {
- int len;
ErtsProcList *pending_exiters;
Uint context_switches;
Uint reductions;
@@ -360,7 +394,7 @@ struct ErtsRunQueue_ {
struct {
ErtsMiscOpList *start;
ErtsMiscOpList *end;
- ErtsRunQueue *evac_runq;
+ erts_smp_atomic_t evac_runq;
} misc;
struct {
@@ -380,7 +414,7 @@ extern ErtsAlignedRunQueue *erts_aligned_run_queues;
#define ERTS_PROC_REDUCTIONS_EXECUTED(RQ, PRIO, REDS, AREDS) \
do { \
(RQ)->procs.reductions += (AREDS); \
- (RQ)->procs.prio_info[p->prio].reds += (REDS); \
+ (RQ)->procs.prio_info[(PRIO)].reds += (REDS); \
(RQ)->check_balance_reds -= (REDS); \
(RQ)->wakeup_other_reds += (AREDS); \
} while (0)
@@ -488,6 +522,90 @@ extern ErtsAlignedSchedulerData *erts_aligned_scheduler_data;
extern ErtsSchedulerData *erts_scheduler_data;
#endif
+#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
+int erts_smp_lc_runq_is_locked(ErtsRunQueue *);
+#endif
+
+#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
+
+/*
+ * Run queue locked during modifications. We use atomic ops since
+ * other threads peek at values without run queue lock.
+ */
+
+ERTS_GLB_INLINE void erts_smp_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio);
+ERTS_GLB_INLINE void erts_smp_dec_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio);
+ERTS_GLB_INLINE void erts_smp_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi);
+
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE void
+erts_smp_inc_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio)
+{
+ erts_aint32_t len;
+
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+
+ len = erts_smp_atomic32_read_nob(&rqi->len);
+ ASSERT(len >= 0);
+ if (len == 0) {
+ ASSERT((erts_smp_atomic32_read_nob(&rq->flags)
+ & ((erts_aint32_t) (1 << prio))) == 0);
+ erts_smp_atomic32_read_bor_nob(&rq->flags,
+ (erts_aint32_t) (1 << prio));
+ }
+ len++;
+ if (rqi->max_len < len)
+ rqi->max_len = len;
+
+ erts_smp_atomic32_set_relb(&rqi->len, len);
+
+ rq->len++;
+ if (rq->max_len < rq->len)
+ rq->max_len = len;
+ ASSERT(rq->len > 0);
+}
+
+ERTS_GLB_INLINE void
+erts_smp_dec_runq_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi, int prio)
+{
+ erts_aint32_t len;
+
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+
+ len = erts_smp_atomic32_read_nob(&rqi->len);
+ len--;
+ ASSERT(len >= 0);
+ if (len == 0) {
+ ASSERT((erts_smp_atomic32_read_nob(&rq->flags)
+ & ((erts_aint32_t) (1 << prio))));
+ erts_smp_atomic32_read_band_nob(&rq->flags,
+ ~((erts_aint32_t) (1 << prio)));
+ }
+ erts_smp_atomic32_set_relb(&rqi->len, len);
+
+ rq->len--;
+ ASSERT(rq->len >= 0);
+}
+
+ERTS_GLB_INLINE void
+erts_smp_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi)
+{
+ erts_aint32_t len;
+
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+
+ len = erts_smp_atomic32_read_nob(&rqi->len);
+ ASSERT(rqi->max_len >= len);
+ rqi->max_len = len;
+}
+
+#endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */
+
+#define RUNQ_READ_LEN(X) erts_smp_atomic32_read_nob((X))
+
+#endif /* ERTS_INCLUDE_SCHEDULER_INTERNALS */
+
/*
* Process Specific Data.
*
@@ -644,12 +762,9 @@ struct process {
* Number of reductions left to execute.
* Only valid for the current process.
*/
- Uint32 status; /* process STATE */
- Uint32 gcstatus; /* process gc STATE */
- Uint32 rstatus; /* process resume STATE */
Uint32 rcount; /* suspend count */
int prio; /* Priority of process */
- int skipped; /* Times a low prio process has been rescheduled */
+ int schedule_count; /* Times left to reschedule a low prio process */
Uint reds; /* No of reductions for this process */
Eterm tracer_proc; /* If proc is traced, this is the tracer
(can NOT be boxed) */
@@ -662,7 +777,6 @@ struct process {
Eterm ftrace; /* Latest exception stack trace dump */
Process *next; /* Pointer to next process in run queue */
- Process *prev; /* Pointer to prev process in run queue */
struct reg_proc *reg; /* NULL iff not registered */
ErtsLink *nlinks;
@@ -732,19 +846,16 @@ struct process {
void *exit_data; /* Misc data referred during termination */
} u;
- ErtsRunQueue *bound_runq;
+ erts_smp_atomic32_t state; /* Process state flags (see ERTS_PSFLG_*) */
#ifdef ERTS_SMP
erts_proc_lock_t lock;
- erts_atomic32_t aflags;
ErtsSchedulerData *scheduler_data;
- Uint32 runq_flags;
- Uint32 status_flags;
ErlMessageInQueue msg_inq;
Eterm suspendee;
ErtsPendingSuspend *pending_suspenders;
ErtsPendExit pending_exit;
- ErtsRunQueue *run_queue;
+ erts_smp_atomic_t run_queue;
#ifdef HIPE
struct hipe_process_state_smp hipe_smp;
#endif
@@ -816,8 +927,27 @@ void erts_check_for_holes(Process* p);
#define SEQ_TRACE_TOKEN(p) ((p)->seq_trace_token)
-#define ERTS_PROC_AFLG_EXITING (((erts_aint_t) 1) << 0)
-#define ERTS_PROC_AFLG_PENDING_EXIT (((erts_aint_t) 1) << 1)
+#if ERTS_NO_PROC_PRIO_LEVELS > 4
+# error "Need to increase ERTS_PSFLG_PRIO_SHIFT"
+#endif
+
+#define ERTS_PSFLG_PRIO_SHIFT 2
+
+#define ERTS_PSFLG_BIT(N) \
+ (((erts_aint32_t) 1) << (ERTS_PSFLG_PRIO_SHIFT + (N)))
+
+#define ERTS_PSFLG_PRIO_MASK (ERTS_PSFLG_BIT(0) - 1)
+
+#define ERTS_PSFLG_FREE ERTS_PSFLG_BIT(0)
+#define ERTS_PSFLG_EXITING ERTS_PSFLG_BIT(1)
+#define ERTS_PSFLG_PENDING_EXIT ERTS_PSFLG_BIT(2)
+#define ERTS_PSFLG_ACTIVE ERTS_PSFLG_BIT(3)
+#define ERTS_PSFLG_IN_RUNQ ERTS_PSFLG_BIT(4)
+#define ERTS_PSFLG_RUNNING ERTS_PSFLG_BIT(5)
+#define ERTS_PSFLG_SUSPENDED ERTS_PSFLG_BIT(6)
+#define ERTS_PSFLG_GC ERTS_PSFLG_BIT(7)
+#define ERTS_PSFLG_BOUND ERTS_PSFLG_BIT(8)
+#define ERTS_PSFLG_TRAP_EXIT ERTS_PSFLG_BIT(9)
/* The sequential tracing token is a tuple of size 5:
@@ -925,17 +1055,13 @@ struct erts_system_profile_flags_t {
unsigned int exclusive : 1;
};
extern struct erts_system_profile_flags_t erts_system_profile_flags;
-
-#define INVALID_PID(p, pid) ((p) == NULL \
- || (p)->id != (pid) \
- || (p)->status == P_EXITING)
#define IS_TRACED(p) ( (p)->tracer_proc != NIL )
#define ARE_TRACE_FLAGS_ON(p,tf) ( ((p)->trace_flags & (tf|F_SENSITIVE)) == (tf) )
#define IS_TRACED_FL(p,tf) ( IS_TRACED(p) && ARE_TRACE_FLAGS_ON(p,tf) )
/* process flags */
-#define F_TRAPEXIT (1 << 0)
+#define F_HIBERNATE_SCHED (1 << 0) /* Schedule out after hibernate op */
#define F_INSLPQUEUE (1 << 1) /* Set if in timer queue */
#define F_TIMO (1 << 2) /* Set if timeout */
#define F_HEAP_GROW (1 << 3)
@@ -946,7 +1072,6 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags;
#define F_HAVE_BLCKD_MSCHED (1 << 8) /* Process has blocked multi-scheduling */
#define F_P2PNR_RESCHED (1 << 9) /* Process has been rescheduled via erts_pid2proc_not_running() */
#define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */
-#define F_HIBERNATE_SCHED (1 << 11) /* Schedule out after hibernate op */
/* process trace_flags */
#define F_SENSITIVE (1 << 0)
@@ -1013,67 +1138,10 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags;
#define DT_UTAG_FLAGS(P) ((P)->dt_utag_flags)
#endif
-
-#ifdef ERTS_SMP
-/* Status flags ... */
-#define ERTS_PROC_SFLG_PENDADD2SCHEDQ (((Uint32) 1) << 0) /* Pending
- add to
- schedule q */
-#define ERTS_PROC_SFLG_INRUNQ (((Uint32) 1) << 1) /* Process is
- in run q */
-#define ERTS_PROC_SFLG_TRAPEXIT (((Uint32) 1) << 2) /* Process is
- trapping
- exit */
-#define ERTS_PROC_SFLG_RUNNING (((Uint32) 1) << 3) /* Process is
- running */
-/* Scheduler flags in process struct... */
-#define ERTS_PROC_RUNQ_FLG_RUNNING (((Uint32) 1) << 0) /* Process is
- running */
-
-#endif
-
-
-#ifdef ERTS_SMP
-#define ERTS_PROC_IS_TRAPPING_EXITS(P) \
- (ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks((P)) \
- & ERTS_PROC_LOCK_STATUS), \
- (P)->status_flags & ERTS_PROC_SFLG_TRAPEXIT)
-
-#define ERTS_PROC_SET_TRAP_EXIT(P) \
- (ERTS_SMP_LC_ASSERT(((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS) \
- & erts_proc_lc_my_proc_locks((P))) \
- == (ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS)), \
- (P)->status_flags |= ERTS_PROC_SFLG_TRAPEXIT, \
- (P)->flags |= F_TRAPEXIT, \
- 1)
-
-#define ERTS_PROC_UNSET_TRAP_EXIT(P) \
- (ERTS_SMP_LC_ASSERT(((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS) \
- & erts_proc_lc_my_proc_locks((P))) \
- == (ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS)), \
- (P)->status_flags &= ~ERTS_PROC_SFLG_TRAPEXIT, \
- (P)->flags &= ~F_TRAPEXIT, \
- 0)
-#else
-#define ERTS_PROC_IS_TRAPPING_EXITS(P) ((P)->flags & F_TRAPEXIT)
-#define ERTS_PROC_SET_TRAP_EXIT(P) ((P)->flags |= F_TRAPEXIT, 1)
-#define ERTS_PROC_UNSET_TRAP_EXIT(P) ((P)->flags &= ~F_TRAPEXIT, 0)
-#endif
-
/* Option flags to erts_send_exit_signal() */
#define ERTS_XSIG_FLG_IGN_KILL (((Uint32) 1) << 0)
#define ERTS_XSIG_FLG_NO_IGN_NORMAL (((Uint32) 1) << 1)
-
-/* Process status values */
-#define P_FREE 0
-#define P_RUNABLE 1
-#define P_WAITING 2
-#define P_RUNNING 3
-#define P_EXITING 4
-#define P_GARBING 5
-#define P_SUSPENDED 6
-
#define CANCEL_TIMER(p) \
do { \
if ((p)->flags & (F_INSLPQUEUE)) \
@@ -1151,14 +1219,6 @@ Eterm erts_get_schedulers_binds(Process *c_p);
Eterm erts_set_cpu_topology(Process *c_p, Eterm term);
Eterm erts_bind_schedulers(Process *c_p, Eterm how);
ErtsRunQueue *erts_schedid2runq(Uint);
-#ifdef ERTS_SMP
-ErtsMigrateResult erts_proc_migrate(Process *,
- ErtsProcLocks *,
- ErtsRunQueue *,
- int *,
- ErtsRunQueue *,
- int *);
-#endif
Process *schedule(Process*, int);
void erts_schedule_misc_op(void (*)(void *), void *);
Eterm erl_create_process(Process*, Eterm, Eterm, Eterm, ErlSpawnOpts*);
@@ -1207,8 +1267,7 @@ int erts_send_exit_signal(Process *,
#ifdef ERTS_SMP
void erts_handle_pending_exit(Process *, ErtsProcLocks);
#define ERTS_PROC_PENDING_EXIT(P) \
- (ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks((P)) & ERTS_PROC_LOCK_STATUS),\
- (P)->pending_exit.reason != THE_NON_VALUE)
+ (ERTS_PSFLG_PENDING_EXIT & erts_smp_atomic32_read_acqb(&(P)->state))
#else
#define ERTS_PROC_PENDING_EXIT(P) 0
#endif
@@ -1260,13 +1319,26 @@ ErtsSchedulerData *erts_get_scheduler_data(void)
#endif
#endif
+void erts_schedule_process(Process *, erts_aint32_t);
+
+ERTS_GLB_INLINE void erts_proc_notify_new_message(Process *p);
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+ERTS_GLB_INLINE void
+erts_proc_notify_new_message(Process *p)
+{
+ /* No barrier needed, due to msg lock */
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state);
+ if (!(state & ERTS_PSFLG_ACTIVE))
+ erts_schedule_process(p, state);
+}
+#endif
+
#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
#define ERTS_PROCESS_LOCK_ONLY_LOCK_CHECK_PROTO__
#include "erl_process_lock.h"
#undef ERTS_PROCESS_LOCK_ONLY_LOCK_CHECK_PROTO__
-int erts_smp_lc_runq_is_locked(ErtsRunQueue *);
#define ERTS_SMP_LC_CHK_RUNQ_LOCK(RQ, L) \
do { \
if ((L)) \
@@ -1392,13 +1464,91 @@ erts_proc_set_error_handler(Process *p, ErtsProcLocks plocks, Eterm handler)
#endif
+#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
+
#ifdef ERTS_SMP
-ErtsRunQueue *erts_prepare_emigrate(ErtsRunQueue *c_rq,
- ErtsRunQueueInfo *c_rqi,
- int prio);
+#include "erl_thr_progress.h"
+
+extern erts_atomic_t erts_migration_paths;
+
+ERTS_GLB_INLINE ErtsMigrationPaths *erts_get_migration_paths_managed(void);
+ERTS_GLB_INLINE ErtsMigrationPaths *erts_get_migration_paths(void);
ERTS_GLB_INLINE ErtsRunQueue *erts_check_emigration_need(ErtsRunQueue *c_rq,
int prio);
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE ErtsMigrationPaths *
+erts_get_migration_paths_managed(void)
+{
+ return (ErtsMigrationPaths *) erts_atomic_read_ddrb(&erts_migration_paths);
+}
+
+ERTS_GLB_INLINE ErtsMigrationPaths *
+erts_get_migration_paths(void)
+{
+ if (erts_thr_progress_is_managed_thread())
+ return erts_get_migration_paths_managed();
+ else
+ return NULL;
+}
+
+ERTS_GLB_INLINE ErtsRunQueue *
+erts_check_emigration_need(ErtsRunQueue *c_rq, int prio)
+{
+ ErtsMigrationPaths *mps = erts_get_migration_paths();
+ ErtsMigrationPath *mp;
+ Uint32 flags;
+
+ if (!mps)
+ return NULL;
+
+ mp = &mps->mpath[c_rq->ix];
+ flags = mp->flags;
+
+ if (ERTS_CHK_RUNQ_FLG_EMIGRATE(flags, prio)) {
+ int len;
+
+ if (ERTS_CHK_RUNQ_FLG_EVACUATE(flags, prio)) {
+ /* force emigration */
+ return mp->prio[prio].runq;
+ }
+
+ if (flags & ERTS_RUNQ_FLG_INACTIVE) {
+ /*
+ * Run queue was inactive at last balance. Verify that
+ * it still is before forcing emigration.
+ */
+ if (ERTS_RUNQ_FLGS_GET(c_rq) & ERTS_RUNQ_FLG_INACTIVE)
+ return mp->prio[prio].runq;
+ }
+
+
+ if (prio == ERTS_PORT_PRIO_LEVEL)
+ len = RUNQ_READ_LEN(&c_rq->ports.info.len);
+ else
+ len = RUNQ_READ_LEN(&c_rq->procs.prio_info[prio].len);
+
+ if (len > mp->prio[prio].limit.this) {
+ ErtsRunQueue *n_rq = mp->prio[prio].runq;
+ if (n_rq) {
+ if (prio == ERTS_PORT_PRIO_LEVEL)
+ len = RUNQ_READ_LEN(&n_rq->ports.info.len);
+ else
+ len = RUNQ_READ_LEN(&n_rq->procs.prio_info[prio].len);
+
+ if (len < mp->prio[prio].limit.other)
+ return n_rq;
+ }
+ }
+ }
+ return NULL;
+}
+
+#endif
+
+#endif
+
#endif
ERTS_GLB_INLINE int erts_is_scheduler_bound(ErtsSchedulerData *esdp);
@@ -1417,29 +1567,6 @@ ERTS_GLB_INLINE void erts_smp_runqs_unlock(ErtsRunQueue *rq1, ErtsRunQueue *rq2)
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
-#ifdef ERTS_SMP
-ERTS_GLB_INLINE ErtsRunQueue *
-erts_check_emigration_need(ErtsRunQueue *c_rq, int prio)
-{
- ErtsRunQueueInfo *c_rqi;
-
- if (!ERTS_CHK_RUNQ_FLG_EMIGRATE(c_rq->flags, prio))
- return NULL;
-
- if (prio == ERTS_PORT_PRIO_LEVEL)
- c_rqi = &c_rq->ports.info;
- else
- c_rqi = &c_rq->procs.prio_info[prio];
-
- if (!ERTS_CHK_RUNQ_FLG_EVACUATE(c_rq->flags, prio)
- && !(c_rq->flags & ERTS_RUNQ_FLG_INACTIVE)
- && c_rqi->len <= c_rqi->migrate.limit.this)
- return NULL;
-
- return erts_prepare_emigrate(c_rq, c_rqi, prio);
-}
-#endif
-
ERTS_GLB_INLINE
int erts_is_scheduler_bound(ErtsSchedulerData *esdp)
{
@@ -1477,10 +1604,9 @@ Uint erts_get_scheduler_id(void)
ERTS_GLB_INLINE ErtsRunQueue *
erts_get_runq_proc(Process *p)
{
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
#ifdef ERTS_SMP
- ASSERT(p->run_queue);
- return p->run_queue;
+ ASSERT(ERTS_AINT_NULL != erts_atomic_read_nob(&p->run_queue));
+ return (ErtsRunQueue *) erts_atomic_read_nob(&p->run_queue);
#else
return ERTS_RUNQ_IX(0);
#endif
@@ -1614,10 +1740,6 @@ Process *erts_pid2proc_nropt(Process *c_p,
ErtsProcLocks pid_locks);
extern int erts_disable_proc_not_running_opt;
-
-#define ERTS_PROC_IS_EXITING(P) \
- (ERTS_PROC_AFLG_EXITING & erts_atomic32_read_acqb(&(P)->aflags))
-
#ifdef DEBUG
#define ERTS_SMP_ASSERT_IS_NOT_EXITING(P) \
do { ASSERT(!ERTS_PROC_IS_EXITING((P))); } while (0)
@@ -1627,8 +1749,6 @@ extern int erts_disable_proc_not_running_opt;
#else /* !ERTS_SMP */
-#define ERTS_PROC_IS_EXITING(P) ((P)->status == P_EXITING)
-
#define ERTS_SMP_ASSERT_IS_NOT_EXITING(P)
#define erts_pid2proc_not_running erts_pid2proc
@@ -1636,6 +1756,10 @@ extern int erts_disable_proc_not_running_opt;
#endif
+#define ERTS_PROC_IS_EXITING(P) \
+ (ERTS_PSFLG_EXITING & erts_smp_atomic32_read_acqb(&(P)->state))
+
+
/* Minimum NUMBER of processes for a small system to start */
#ifdef ERTS_SMP
#define ERTS_MIN_PROCESSES ERTS_NO_OF_PIX_LOCKS
diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c
index 37cc717156..964dc1ae3e 100644
--- a/erts/emulator/beam/erl_process_dump.c
+++ b/erts/emulator/beam/erl_process_dump.c
@@ -67,11 +67,9 @@ erts_deep_process_dump(int to, void *to_arg)
for (i = 0; i < erts_max_processes; i++) {
Process *p = erts_pix2proc(i);
if (p && p->i != ENULL) {
- if (p->status != P_EXITING) {
- if (p->status != P_GARBING) {
- dump_process_info(to, to_arg, p);
- }
- }
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state);
+ if (!(state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_GC)))
+ dump_process_info(to, to_arg, p);
}
}
diff --git a/erts/emulator/beam/erl_process_lock.h b/erts/emulator/beam/erl_process_lock.h
index 9981af7a8a..015fda583e 100644
--- a/erts/emulator/beam/erl_process_lock.h
+++ b/erts/emulator/beam/erl_process_lock.h
@@ -129,11 +129,9 @@ typedef struct erts_proc_lock_t_ {
/*
* Status lock:
* Protects the following fields in the process structure:
- * * status
- * * rstatus
- * * status_flags
* * pending_suspenders
* * suspendee
+ * * ...
*/
#define ERTS_PROC_LOCK_STATUS (((ErtsProcLocks) 1) << ERTS_PROC_LOCK_MAX_BIT)
@@ -165,12 +163,11 @@ typedef struct erts_proc_lock_t_ {
* Other rules regarding process locking:
*
* Exiting processes:
- * When changing status to P_EXITING on a process, you are required
- * to take all process locks (ERTS_PROC_LOCKS_ALL). Thus, by holding
- * at least one process lock (whichever one doesn't matter) you
- * are guaranteed that the process won't exit until the lock you are
- * holding has been released. Appart from all process locks also
- * the pix lock corresponding to the process has to be held.
+ * When changing state to exiting (ERTS_PSFLG_EXITING) on a process,
+ * you are required to take all process locks (ERTS_PROC_LOCKS_ALL).
+ * Thus, by holding at least one process lock (whichever one doesn't
+ * matter) you are guaranteed that the process won't exit until the
+ * lock you are holding has been released.
*
* Lock order:
* Process locks with low numeric values has to be locked before
@@ -946,11 +943,11 @@ erts_pid2proc_opt(Process *c_p_unused,
int flags)
{
Process *proc = erts_proc_lookup_raw(pid);
- if (!(flags & ERTS_P2P_FLG_ALLOW_OTHER_X)
- && proc
- && proc->status == P_EXITING)
- return NULL;
- return proc;
+ return ((!(flags & ERTS_P2P_FLG_ALLOW_OTHER_X)
+ && proc
+ && ERTS_PROC_IS_EXITING(proc))
+ ? NULL
+ : proc);
}
#endif /* !ERTS_SMP */
diff --git a/erts/emulator/beam/erl_trace.c b/erts/emulator/beam/erl_trace.c
index a321bc504b..bc988cd61b 100644
--- a/erts/emulator/beam/erl_trace.c
+++ b/erts/emulator/beam/erl_trace.c
@@ -64,7 +64,7 @@ int erts_cpu_timestamp;
#endif
static erts_smp_mtx_t smq_mtx;
-static erts_smp_mtx_t sys_trace_mtx;
+static erts_smp_rwmtx_t sys_trace_rwmtx;
enum ErtsSysMsgType {
SYS_MSG_TYPE_UNDEFINED,
@@ -91,7 +91,12 @@ static void init_sys_msg_dispatcher(void);
#endif
void erts_init_trace(void) {
- erts_smp_mtx_init(&sys_trace_mtx, "sys_tracers");
+ erts_smp_rwmtx_opt_t rwmtx_opts = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER;
+ rwmtx_opts.type = ERTS_SMP_RWMTX_TYPE_EXTREMELY_FREQUENT_READ;
+ rwmtx_opts.lived = ERTS_SMP_RWMTX_LONG_LIVED;
+
+ erts_smp_rwmtx_init_opt(&sys_trace_rwmtx, &rwmtx_opts, "sys_tracers");
+
#ifdef HAVE_ERTS_NOW_CPU
erts_cpu_timestamp = 0;
#endif
@@ -169,10 +174,10 @@ erts_system_profile_setup_active_schedulers(void)
active_sched = erts_active_schedulers();
}
-void
-erts_trace_check_exiting(Eterm exiting)
+static void
+exiting_reset(Eterm exiting)
{
- erts_smp_mtx_lock(&sys_trace_mtx);
+ erts_smp_rwmtx_rwlock(&sys_trace_rwmtx);
if (exiting == default_tracer) {
default_tracer = NIL;
default_trace_flags &= TRACEE_FLAGS;
@@ -202,29 +207,49 @@ erts_trace_check_exiting(Eterm exiting)
erts_system_profile_clear(NULL);
#endif
}
- erts_smp_mtx_unlock(&sys_trace_mtx);
+ erts_smp_rwmtx_rwunlock(&sys_trace_rwmtx);
+}
+
+void
+erts_trace_check_exiting(Eterm exiting)
+{
+ int reset = 0;
+ erts_smp_rwmtx_rlock(&sys_trace_rwmtx);
+ if (exiting == default_tracer)
+ reset = 1;
+ else if (exiting == system_seq_tracer)
+ reset = 1;
+ else if (exiting == system_monitor)
+ reset = 1;
+ else if (exiting == system_profile)
+ reset = 1;
+ erts_smp_rwmtx_runlock(&sys_trace_rwmtx);
+ if (reset)
+ exiting_reset(exiting);
+}
+
+static ERTS_INLINE int
+is_valid_tracer(Eterm tracer)
+{
+ return erts_proc_lookup(tracer) || erts_is_valid_tracer_port(tracer);
}
Eterm
erts_set_system_seq_tracer(Process *c_p, ErtsProcLocks c_p_locks, Eterm new)
{
- Eterm old = THE_NON_VALUE;
+ Eterm old;
- if (new != am_false) {
- if (!erts_proc_lookup(new)
- && !erts_is_valid_tracer_port(new)) {
- return old;
- }
- }
+ if (new != am_false && !is_valid_tracer(new))
+ return THE_NON_VALUE;
- erts_smp_mtx_lock(&sys_trace_mtx);
+ erts_smp_rwmtx_rwlock(&sys_trace_rwmtx);
old = system_seq_tracer;
system_seq_tracer = new;
#ifdef DEBUG_PRINTOUTS
erts_fprintf(stderr, "set seq tracer new=%T old=%T\n", new, old);
#endif
- erts_smp_mtx_unlock(&sys_trace_mtx);
+ erts_smp_rwmtx_rwunlock(&sys_trace_rwmtx);
return old;
}
@@ -232,12 +257,12 @@ Eterm
erts_get_system_seq_tracer(void)
{
Eterm st;
- erts_smp_mtx_lock(&sys_trace_mtx);
+ erts_smp_rwmtx_rlock(&sys_trace_rwmtx);
st = system_seq_tracer;
#ifdef DEBUG_PRINTOUTS
erts_fprintf(stderr, "get seq tracer %T\n", st);
#endif
- erts_smp_mtx_unlock(&sys_trace_mtx);
+ erts_smp_rwmtx_runlock(&sys_trace_rwmtx);
return st;
}
@@ -270,7 +295,7 @@ get_default_tracing(Uint *flagsp, Eterm *tracerp)
void
erts_change_default_tracing(int setflags, Uint *flagsp, Eterm *tracerp)
{
- erts_smp_mtx_lock(&sys_trace_mtx);
+ erts_smp_rwmtx_rwlock(&sys_trace_rwmtx);
if (flagsp) {
if (setflags)
default_trace_flags |= *flagsp;
@@ -280,48 +305,48 @@ erts_change_default_tracing(int setflags, Uint *flagsp, Eterm *tracerp)
if (tracerp)
default_tracer = *tracerp;
get_default_tracing(flagsp, tracerp);
- erts_smp_mtx_unlock(&sys_trace_mtx);
+ erts_smp_rwmtx_rwunlock(&sys_trace_rwmtx);
}
void
erts_get_default_tracing(Uint *flagsp, Eterm *tracerp)
{
- erts_smp_mtx_lock(&sys_trace_mtx);
+ erts_smp_rwmtx_rlock(&sys_trace_rwmtx);
get_default_tracing(flagsp, tracerp);
- erts_smp_mtx_unlock(&sys_trace_mtx);
+ erts_smp_rwmtx_runlock(&sys_trace_rwmtx);
}
void
erts_set_system_monitor(Eterm monitor)
{
- erts_smp_mtx_lock(&sys_trace_mtx);
+ erts_smp_rwmtx_rwlock(&sys_trace_rwmtx);
system_monitor = monitor;
- erts_smp_mtx_unlock(&sys_trace_mtx);
+ erts_smp_rwmtx_rwunlock(&sys_trace_rwmtx);
}
Eterm
erts_get_system_monitor(void)
{
Eterm monitor;
- erts_smp_mtx_lock(&sys_trace_mtx);
+ erts_smp_rwmtx_rlock(&sys_trace_rwmtx);
monitor = system_monitor;
- erts_smp_mtx_unlock(&sys_trace_mtx);
+ erts_smp_rwmtx_runlock(&sys_trace_rwmtx);
return monitor;
}
/* Performance monitoring */
void erts_set_system_profile(Eterm profile) {
- erts_smp_mtx_lock(&sys_trace_mtx);
+ erts_smp_rwmtx_rwlock(&sys_trace_rwmtx);
system_profile = profile;
- erts_smp_mtx_unlock(&sys_trace_mtx);
+ erts_smp_rwmtx_rwunlock(&sys_trace_rwmtx);
}
Eterm
erts_get_system_profile(void) {
Eterm profile;
- erts_smp_mtx_lock(&sys_trace_mtx);
+ erts_smp_rwmtx_rlock(&sys_trace_rwmtx);
profile = system_profile;
- erts_smp_mtx_unlock(&sys_trace_mtx);
+ erts_smp_rwmtx_runlock(&sys_trace_rwmtx);
return profile;
}
@@ -384,13 +409,9 @@ WRITE_SYS_MSG_TO_PORT(Eterm unused_to,
}
#ifndef ERTS_SMP
- if (!INVALID_TRACER_PORT(trace_port, trace_port->id)) {
+ if (!INVALID_TRACER_PORT(trace_port, trace_port->id))
#endif
erts_raw_port_command(trace_port, buffer, ptr-buffer);
-#ifndef ERTS_SMP
- erts_port_release(trace_port);
- }
-#endif
erts_free(ERTS_ALC_T_TMP, (void *) buffer);
}
@@ -465,13 +486,13 @@ send_to_port(Process *c_p, Eterm message,
trace_port = NULL;
#else
- if (is_not_internal_port(*tracer_pid))
- goto invalid_tracer_port;
- trace_port = &erts_port[internal_port_index(*tracer_pid)];
+ trace_port = erts_id2port_sflgs(*tracer_pid,
+ NULL,
+ 0,
+ ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP);
- if (INVALID_TRACER_PORT(trace_port, *tracer_pid)) {
- invalid_tracer_port:
+ if (!trace_port) {
*tracee_flags &= ~TRACEE_FLAGS;
*tracer_pid = NIL;
return;
@@ -491,6 +512,7 @@ send_to_port(Process *c_p, Eterm message,
SYS_MSG_TYPE_TRACE,
message);
#ifndef ERTS_SMP
+ erts_port_release(trace_port);
return;
}
@@ -537,6 +559,9 @@ send_to_port(Process *c_p, Eterm message,
*/
do_send_schedfix_to_port(trace_port, c_p->id, ts);
}
+
+ erts_port_release(trace_port);
+
UnUseTmpHeapNoproc(LOCAL_HEAP_SIZE);
#undef LOCAL_HEAP_SIZE
#endif
@@ -566,15 +591,19 @@ profile_send(Eterm from, Eterm message) {
Port *profiler_port = NULL;
/* not smp */
-
-
- profiler_port = &erts_port[internal_port_index(profiler)];
-
- do_send_to_port(profiler,
- profiler_port,
- NIL, /* or current process->id */
- SYS_MSG_TYPE_SYSPROF,
- message);
+
+ profiler_port = erts_id2port_sflgs(profiler,
+ NULL,
+ 0,
+ ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP);
+ if (profiler_port) {
+ do_send_to_port(profiler,
+ profiler_port,
+ NIL, /* or current process->id */
+ SYS_MSG_TYPE_SYSPROF,
+ message);
+ erts_port_release(profiler_port);
+ }
} else {
ASSERT(is_internal_pid(profiler)
@@ -627,13 +656,11 @@ seq_trace_send_to_port(Process *c_p,
trace_port = NULL;
#else
- if (is_not_internal_port(seq_tracer))
- goto invalid_tracer_port;
-
- trace_port = &erts_port[internal_port_index(seq_tracer)];
-
- if (INVALID_TRACER_PORT(trace_port, seq_tracer)) {
- invalid_tracer_port:
+ trace_port = erts_id2port_sflgs(seq_tracer,
+ NULL,
+ 0,
+ ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP);
+ if (!trace_port) {
system_seq_tracer = am_false;
#ifndef ERTS_SMP
UnUseTmpHeapNoproc(LOCAL_HEAP_SIZE);
@@ -651,6 +678,7 @@ seq_trace_send_to_port(Process *c_p,
message);
#ifndef ERTS_SMP
+ erts_port_release(trace_port);
UnUseTmpHeapNoproc(LOCAL_HEAP_SIZE);
return;
}
@@ -692,6 +720,9 @@ seq_trace_send_to_port(Process *c_p,
*/
do_send_schedfix_to_port(trace_port, c_p->id, ts);
}
+
+ erts_port_release(trace_port);
+
UnUseTmpHeapNoproc(LOCAL_HEAP_SIZE);
#undef LOCAL_HEAP_SIZE
#endif
@@ -790,13 +821,8 @@ trace_sched_aux(Process *p, Eterm what, int never_fake_sched)
ERTS_GET_TRACER_REF(tracer_ref, p->tracer_proc, p->trace_flags);
}
- if (ERTS_PROC_IS_EXITING(p)
-#ifndef ERTS_SMP
- || p->status == P_FREE
-#endif
- ) {
+ if (ERTS_PROC_IS_EXITING(p))
curr_func = 0;
- }
else {
if (!p->current)
p->current = find_function_from_pc(p->i);
@@ -3148,10 +3174,10 @@ sys_msg_disp_failure(ErtsSysMsgQ *smqp, Eterm receiver)
break;
case SYS_MSG_TYPE_SEQTRACE:
/* Reset seq_tracer if it hasn't changed */
- erts_smp_mtx_lock(&sys_trace_mtx);
+ erts_smp_rwmtx_rwlock(&sys_trace_rwmtx);
if (system_seq_tracer == receiver)
system_seq_tracer = am_false;
- erts_smp_mtx_unlock(&sys_trace_mtx);
+ erts_smp_rwmtx_rwunlock(&sys_trace_rwmtx);
break;
case SYS_MSG_TYPE_SYSMON:
if (receiver == NIL
@@ -3406,12 +3432,12 @@ sys_msg_dispatcher_func(void *unused)
goto queue_proc_msg;
}
else if (is_internal_port(receiver)) {
- port = erts_id2port(receiver, NULL, 0);
- if (INVALID_TRACER_PORT(port, receiver)) {
- if (port)
- erts_port_release(port);
+ port = erts_id2port_sflgs(receiver,
+ NULL,
+ 0,
+ ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP);
+ if (!port)
goto failure;
- }
else {
write_sys_msg_to_port(receiver,
port,
diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h
index ded9646800..e9dbba3d4a 100644
--- a/erts/emulator/beam/global.h
+++ b/erts/emulator/beam/global.h
@@ -143,8 +143,8 @@ typedef struct ErtsXPortsList_ ErtsXPortsList;
struct port {
ErtsPortTaskSched sched;
ErtsPortTaskHandle timeout_task;
-#ifdef ERTS_SMP
erts_smp_atomic_t refc;
+#ifdef ERTS_SMP
erts_smp_mtx_t *lock;
ErtsXPortsList *xports;
erts_smp_atomic_t run_queue;
@@ -196,6 +196,8 @@ erts_port_runq(Port *prt)
#ifdef ERTS_SMP
ErtsRunQueue *rq1, *rq2;
rq1 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue);
+ if (!rq1)
+ return NULL;
while (1) {
erts_smp_runq_lock(rq1);
rq2 = (ErtsRunQueue *) erts_smp_atomic_read_nob(&prt->run_queue);
@@ -203,6 +205,8 @@ erts_port_runq(Port *prt)
return rq1;
erts_smp_runq_unlock(rq1);
rq1 = rq2;
+ if (!rq1)
+ return NULL;
}
#else
return ERTS_RUNQ_IX(0);
@@ -1220,28 +1224,29 @@ erts_smp_port_state_unlock(Port *prt)
ERTS_GLB_INLINE int
erts_smp_port_trylock(Port *prt)
{
-#ifdef ERTS_SMP
int res;
ASSERT(erts_smp_atomic_read_nob(&prt->refc) > 0);
erts_smp_atomic_inc_nob(&prt->refc);
+
+#ifdef ERTS_SMP
res = erts_smp_mtx_trylock(prt->lock);
if (res == EBUSY) {
erts_smp_atomic_dec_nob(&prt->refc);
}
+#else
+ res = 0;
+#endif
return res;
-#else /* !ERTS_SMP */
- return 0;
-#endif
}
ERTS_GLB_INLINE void
erts_smp_port_lock(Port *prt)
{
-#ifdef ERTS_SMP
ASSERT(erts_smp_atomic_read_nob(&prt->refc) > 0);
erts_smp_atomic_inc_nob(&prt->refc);
+#ifdef ERTS_SMP
erts_smp_mtx_lock(prt->lock);
#endif
}
@@ -1249,14 +1254,14 @@ erts_smp_port_lock(Port *prt)
ERTS_GLB_INLINE void
erts_smp_port_unlock(Port *prt)
{
-#ifdef ERTS_SMP
erts_aint_t refc;
+#ifdef ERTS_SMP
erts_smp_mtx_unlock(prt->lock);
+#endif
refc = erts_smp_atomic_dec_read_nob(&prt->refc);
ASSERT(refc >= 0);
if (refc == 0)
erts_port_cleanup(prt);
-#endif
}
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
@@ -1319,12 +1324,12 @@ erts_id2port_sflgs(Eterm id, Process *c_p, ErtsProcLocks c_p_locks, Uint32 sflgs
erts_smp_port_state_unlock(prt);
prt = NULL;
}
-#ifdef ERTS_SMP
else {
erts_smp_atomic_inc_nob(&prt->refc);
erts_smp_port_state_unlock(prt);
- if (no_proc_locks)
+#ifdef ERTS_SMP
+ if (no_proc_locks)
erts_smp_mtx_lock(prt->lock);
else if (erts_smp_mtx_trylock(prt->lock) == EBUSY) {
/* Unlock process locks, and acquire locks in lock order... */
@@ -1340,21 +1345,17 @@ erts_id2port_sflgs(Eterm id, Process *c_p, ErtsProcLocks c_p_locks, Uint32 sflgs
erts_smp_port_unlock(prt); /* Also decrements refc... */
prt = NULL;
}
- }
#endif
+ }
+
return prt;
}
ERTS_GLB_INLINE void
erts_port_release(Port *prt)
{
-#ifdef ERTS_SMP
erts_smp_port_unlock(prt);
-#else
- if (prt->status & ERTS_PORT_SFLGS_DEAD)
- erts_port_cleanup(prt);
-#endif
}
ERTS_GLB_INLINE Port*
@@ -1935,6 +1936,13 @@ extern erts_driver_t spawn_driver;
extern erts_driver_t fd_driver;
/* Should maybe be placed in erl_message.h, but then we get an include mess. */
+ERTS_GLB_INLINE Eterm *
+erts_alloc_message_heap_state(Uint size,
+ ErlHeapFragment **bpp,
+ ErlOffHeap **ohpp,
+ Process *receiver,
+ ErtsProcLocks *receiver_locks,
+ erts_aint32_t *statep);
ERTS_GLB_INLINE Eterm *
erts_alloc_message_heap(Uint size,
@@ -1957,16 +1965,22 @@ erts_alloc_message_heap(Uint size,
*/
ERTS_GLB_INLINE Eterm *
-erts_alloc_message_heap(Uint size,
- ErlHeapFragment **bpp,
- ErlOffHeap **ohpp,
- Process *receiver,
- ErtsProcLocks *receiver_locks)
+erts_alloc_message_heap_state(Uint size,
+ ErlHeapFragment **bpp,
+ ErlOffHeap **ohpp,
+ Process *receiver,
+ ErtsProcLocks *receiver_locks,
+ erts_aint32_t *statep)
{
Eterm *hp;
+ erts_aint32_t state;
#ifdef ERTS_SMP
int locked_main = 0;
- ErtsProcLocks ulocks = *receiver_locks & ERTS_PROC_LOCKS_MSG_SEND;
+ state = erts_smp_atomic32_read_acqb(&receiver->state);
+ if (statep)
+ *statep = state;
+ if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))
+ goto allocate_in_mbuf;
#endif
if (size > (Uint) INT_MAX)
@@ -1982,20 +1996,19 @@ erts_alloc_message_heap(Uint size,
#ifdef ERTS_SMP
try_allocate_on_heap:
#endif
- if (ERTS_PROC_IS_EXITING(receiver)
+ state = erts_smp_atomic32_read_nob(&receiver->state);
+ if (statep)
+ *statep = state;
+ if ((state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))
|| HEAP_LIMIT(receiver) - HEAP_TOP(receiver) <= size) {
#ifdef ERTS_SMP
- if (locked_main)
- ulocks |= ERTS_PROC_LOCK_MAIN;
+ if (locked_main) {
+ *receiver_locks &= ~ERTS_PROC_LOCK_MAIN;
+ erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MAIN);
+ }
#endif
goto allocate_in_mbuf;
}
-#ifdef ERTS_SMP
- if (ulocks) {
- erts_smp_proc_unlock(receiver, ulocks);
- *receiver_locks &= ~ulocks;
- }
-#endif
hp = HEAP_TOP(receiver);
HEAP_TOP(receiver) = hp + size;
*bpp = NULL;
@@ -2011,12 +2024,6 @@ erts_alloc_message_heap(Uint size,
else {
ErlHeapFragment *bp;
allocate_in_mbuf:
-#ifdef ERTS_SMP
- if (ulocks) {
- *receiver_locks &= ~ulocks;
- erts_smp_proc_unlock(receiver, ulocks);
- }
-#endif
bp = new_message_buffer(size);
hp = bp->mem;
*bpp = bp;
@@ -2026,6 +2033,17 @@ erts_alloc_message_heap(Uint size,
return hp;
}
+ERTS_GLB_INLINE Eterm *
+erts_alloc_message_heap(Uint size,
+ ErlHeapFragment **bpp,
+ ErlOffHeap **ohpp,
+ Process *receiver,
+ ErtsProcLocks *receiver_locks)
+{
+ return erts_alloc_message_heap_state(size, bpp, ohpp, receiver,
+ receiver_locks, NULL);
+}
+
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
#if !HEAP_ON_C_STACK
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 8d20178307..4dd60b4d23 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -259,10 +259,8 @@ get_free_port(void)
}
}
port->status = ERTS_PORT_SFLG_INITIALIZING;
-#ifdef ERTS_SMP
- ERTS_SMP_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 0);
+ ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 0);
erts_smp_atomic_set_nob(&port->refc, 2); /* Port alive + lock */
-#endif
erts_smp_port_state_unlock(port);
return num & port_num_mask;
}
@@ -340,10 +338,14 @@ port_cleanup(Port *prt)
prt->drv_ptr = NULL;
ASSERT(driver);
-#ifdef ERTS_SMP
-
ASSERT(prt->status & ERTS_PORT_SFLG_FREE_SCHEDULED);
- ERTS_SMP_LC_ASSERT(erts_smp_atomic_read_nob(&prt->refc) == 0);
+ ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&prt->refc) == 0);
+
+ ASSERT(prt->status & ERTS_PORT_SFLG_PORT_DEBUG);
+ ASSERT(!(prt->status & ERTS_PORT_SFLG_FREE));
+ prt->status = ERTS_PORT_SFLG_FREE;
+
+#ifdef ERTS_SMP
port_specific = (prt->status & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK);
@@ -352,10 +354,6 @@ port_cleanup(Port *prt)
prt->lock = NULL;
- ASSERT(prt->status & ERTS_PORT_SFLG_PORT_DEBUG);
- ASSERT(!(prt->status & ERTS_PORT_SFLG_FREE));
- prt->status = ERTS_PORT_SFLG_FREE;
-
erts_smp_port_state_unlock(prt);
erts_smp_mtx_unlock(mtx);
@@ -605,10 +603,8 @@ erts_open_driver(erts_driver_t* driver, /* Pointer to driver. */
/* Need to mark the port as free again */
erts_smp_port_state_lock(port);
port->status = ERTS_PORT_SFLG_FREE;
-#ifdef ERTS_SMP
- ERTS_SMP_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 2);
+ ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&port->refc) == 2);
erts_smp_atomic_set_nob(&port->refc, 0);
-#endif
erts_smp_port_state_unlock(port);
return -3;
}
@@ -1343,8 +1339,8 @@ void init_io(void)
for (i = 0; i < erts_max_ports; i++) {
erts_port_task_init_sched(&erts_port[i].sched);
-#ifdef ERTS_SMP
erts_smp_atomic_init_nob(&erts_port[i].refc, 0);
+#ifdef ERTS_SMP
erts_port[i].lock = NULL;
erts_port[i].xports = NULL;
erts_smp_spinlock_init_x(&erts_port[i].state_lck, "port_state", make_small(i));
diff --git a/erts/emulator/beam/register.c b/erts/emulator/beam/register.c
index 0ddae56322..c02872ef80 100644
--- a/erts/emulator/beam/register.c
+++ b/erts/emulator/beam/register.c
@@ -93,14 +93,14 @@ reg_safe_write_lock(Process *c_p, ErtsProcLocks *c_p_locks)
reg_write_lock();
}
+#endif
+
static ERTS_INLINE int
is_proc_alive(Process *p)
{
return !ERTS_PROC_IS_EXITING(p);
}
-#endif
-
void register_info(int to, void *to_arg)
{
int lock = !ERTS_IS_CRASH_DUMPING;
@@ -384,8 +384,7 @@ erts_whereis_name(Process *c_p,
}
#else
if (rp->p
- && ((flags & ERTS_P2P_FLG_ALLOW_OTHER_X)
- || rp->p->status != P_EXITING))
+ && ((flags & ERTS_P2P_FLG_ALLOW_OTHER_X) || is_proc_alive(rp->p)))
*proc = rp->p;
else
*proc = NULL;
@@ -397,7 +396,9 @@ erts_whereis_name(Process *c_p,
if (!rp || !rp->pt)
*port = NULL;
else {
-#ifdef ERTS_SMP
+#ifndef ERTS_SMP
+ erts_smp_atomic_inc_nob(&rp->pt->refc);
+#else
if (pending_port == rp->pt)
pending_port = NULL;
else {
@@ -504,9 +505,11 @@ int erts_unregister_name(Process *c_p,
if ((rp = (RegProc*) hash_get(&process_reg, (void*) &r)) != NULL) {
if (rp->pt) {
if (port != rp->pt) {
-#ifdef ERTS_SMP
+#ifndef ERTS_SMP
+ erts_smp_atomic_inc_nob(&rp->pt->refc);
+#else
if (port) {
- ERTS_SMP_LC_ASSERT(port != c_prt);
+ ASSERT(port != c_prt);
erts_smp_port_unlock(port);
port = NULL;
}
diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c
index 245916bc0d..db6597dc7c 100644
--- a/erts/emulator/beam/utils.c
+++ b/erts/emulator/beam/utils.c
@@ -1666,12 +1666,20 @@ static int do_send_to_logger(Eterm tag, Eterm gleader, char *buf, int len)
}
#ifndef ERTS_SMP
- if (
#ifdef USE_THREADS
- !erts_get_scheduler_data() || /* Must be scheduler thread */
+ p = NULL;
+ if (erts_get_scheduler_data()) /* Must be scheduler thread */
#endif
- (p = erts_whereis_process(NULL, 0, am_error_logger, 0, 0)) == NULL
- || p->status == P_RUNNING) {
+ {
+ p = erts_whereis_process(NULL, 0, am_error_logger, 0, 0);
+ if (p) {
+ erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state);
+ if (state & ERTS_PSFLG_RUNNING)
+ p = NULL;
+ }
+ }
+
+ if (!p) {
/* buf *always* points to a null terminated string */
erts_fprintf(stderr, "(no error logger present) %T: \"%s\"\n",
tag, buf);