diff options
-rw-r--r-- | erts/emulator/beam/bif.c | 15 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.c | 6 | ||||
-rw-r--r-- | erts/emulator/beam/erl_port.h | 38 | ||||
-rw-r--r-- | erts/emulator/beam/erl_port_task.c | 25 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 161 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 152 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process_dump.c | 4 | ||||
-rw-r--r-- | erts/emulator/beam/io.c | 16 | ||||
-rw-r--r-- | erts/etc/unix/etp-commands.in | 4 |
9 files changed, 257 insertions, 164 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index d68ccc3028..bfd572335f 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -1744,7 +1744,6 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2) else if (BIF_ARG_1 == am_scheduler) { ErtsRunQueue *old, *new, *curr; Sint sched; - erts_aint32_t state; if (!is_small(BIF_ARG_2)) goto error; @@ -1753,23 +1752,23 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2) goto error; if (sched == 0) { + old = erts_bind_runq_proc(BIF_P, 0); new = NULL; - state = erts_atomic32_read_band_mb(&BIF_P->state, - ~ERTS_PSFLG_BOUND); } else { + int bound = !0; new = erts_schedid2runq(sched); - erts_atomic_set_nob(&BIF_P->run_queue, (erts_aint_t) new); - state = erts_atomic32_read_bor_mb(&BIF_P->state, - ERTS_PSFLG_BOUND); + old = erts_set_runq_proc(BIF_P, new, &bound); + if (!bound) + old = NULL; } + old_value = old ? make_small(old->ix+1) : make_small(0); + curr = erts_proc_sched_data(BIF_P)->run_queue; - old = (ERTS_PSFLG_BOUND & state) ? curr : NULL; ASSERT(!old || old == curr); - old_value = old ? make_small(old->ix+1) : make_small(0); if (new && new != curr) ERTS_BIF_YIELD_RETURN_X(BIF_P, old_value, am_scheduler); else diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index abf194cf94..6f7c71ef98 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -616,7 +616,7 @@ erts_try_alloc_message_on_heap(Process *pp, } else { in_message_fragment: - if (!((*psp) & ERTS_PSFLG_ON_HEAP_MSGQ)) { + if ((*psp) & ERTS_PSFLG_OFF_HEAP_MSGQ) { mp = erts_alloc_message(sz, hpp); *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; } @@ -1079,8 +1079,6 @@ erts_change_message_queue_management(Process *c_p, Eterm new_state) case am_on_heap: c_p->flags |= F_ON_HEAP_MSGQ; c_p->flags &= ~F_OFF_HEAP_MSGQ; - erts_atomic32_read_bor_nob(&c_p->state, - ERTS_PSFLG_ON_HEAP_MSGQ); /* * We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ * if a off heap change is ongoing. It will be adjusted @@ -1106,8 +1104,6 @@ erts_change_message_queue_management(Process *c_p, Eterm new_state) break; case am_off_heap: c_p->flags &= ~F_ON_HEAP_MSGQ; - erts_atomic32_read_band_nob(&c_p->state, - ~ERTS_PSFLG_ON_HEAP_MSGQ); goto change_to_off_heap; default: res = THE_NON_VALUE; /* badarg */ diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index 6b90187d60..0d148ee048 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -196,26 +196,52 @@ struct erl_drv_port_data_lock { Port *prt; }; +ERTS_GLB_INLINE void erts_init_runq_port(Port *prt, ErtsRunQueue *runq); +ERTS_GLB_INLINE void erts_set_runq_port(Port *prt, ErtsRunQueue *runq); +ERTS_GLB_INLINE ErtsRunQueue *erts_get_runq_port(Port *prt); ERTS_GLB_INLINE ErtsRunQueue *erts_port_runq(Port *prt); #if ERTS_GLB_INLINE_INCL_FUNC_DEF +ERTS_GLB_INLINE void +erts_init_runq_port(Port *prt, ErtsRunQueue *runq) +{ + if (!runq) + ERTS_INTERNAL_ERROR("Missing run-queue"); + erts_atomic_init_nob(&prt->run_queue, (erts_aint_t) runq); +} + +ERTS_GLB_INLINE void +erts_set_runq_port(Port *prt, ErtsRunQueue *runq) +{ + if (!runq) + ERTS_INTERNAL_ERROR("Missing run-queue"); + erts_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq); +} + +ERTS_GLB_INLINE ErtsRunQueue * +erts_get_runq_port(Port *prt) +{ + ErtsRunQueue *runq; + runq = (ErtsRunQueue *) erts_atomic_read_nob(&prt->run_queue); + if (!runq) + ERTS_INTERNAL_ERROR("Missing run-queue"); + return runq; +} + + ERTS_GLB_INLINE ErtsRunQueue * erts_port_runq(Port *prt) { ErtsRunQueue *rq1, *rq2; - rq1 = (ErtsRunQueue *) erts_atomic_read_nob(&prt->run_queue); - if (!rq1) - return NULL; + rq1 = erts_get_runq_port(prt); while (1) { erts_runq_lock(rq1); - rq2 = (ErtsRunQueue *) erts_atomic_read_nob(&prt->run_queue); + rq2 = erts_get_runq_port(prt); if (rq1 == rq2) return rq1; erts_runq_unlock(rq1); rq1 = rq2; - if (!rq1) - return NULL; } } diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index a588477320..4a3671df0c 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -84,11 +84,10 @@ static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_q #define LTTNG_DRIVER(TRACEPOINT, PP) do {} while(0) #endif -#define ERTS_LC_VERIFY_RQ(RQ, PP) \ - do { \ +#define ERTS_LC_VERIFY_RQ(RQ, PP) \ + do { \ ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq)); \ - ERTS_LC_ASSERT((RQ) == ((ErtsRunQueue *) \ - erts_atomic_read_nob(&(PP)->run_queue))); \ + ERTS_LC_ASSERT((RQ) == erts_get_runq_port((PP))); \ } while (0) #define ERTS_PT_STATE_SCHEDULED 0 @@ -1520,19 +1519,15 @@ erts_port_task_schedule(Eterm id, /* Enqueue port on run-queue */ runq = erts_port_runq(pp); - if (!runq) - ERTS_INTERNAL_ERROR("Missing run-queue"); xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL); ERTS_LC_ASSERT(runq != xrunq); ERTS_LC_VERIFY_RQ(runq, pp); if (xrunq) { /* Emigrate port ... */ - erts_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); + erts_set_runq_port(pp, xrunq); erts_runq_unlock(runq); runq = erts_port_runq(pp); - if (!runq) - ERTS_INTERNAL_ERROR("Missing run-queue"); } enqueue_port(runq, pp); @@ -1593,8 +1588,6 @@ erts_port_task_free_port(Port *pp) ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); runq = erts_port_runq(pp); - if (!runq) - ERTS_INTERNAL_ERROR("Missing run-queue"); erts_port_task_sched_lock(&pp->sched); flags = erts_atomic32_read_bor_relb(&pp->sched.flags, ERTS_PTS_FLG_EXIT); @@ -1805,7 +1798,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) erts_unblock_fpe(fpe_was_unmasked); ERTS_MSACC_POP_STATE_M(); - ASSERT(runq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue)); + ASSERT(runq == erts_get_runq_port(pp)); active = finalize_exec(pp, &execq, processing_busy_q); @@ -1831,11 +1824,10 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) } else { /* Emigrate port... */ - erts_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); + erts_set_runq_port(pp, xrunq); erts_runq_unlock(runq); xrunq = erts_port_runq(pp); - ASSERT(xrunq); enqueue_port(xrunq, pp); erts_runq_unlock(xrunq); erts_notify_inc_runq(xrunq); @@ -2069,7 +2061,7 @@ void erts_enqueue_port(ErtsRunQueue *rq, Port *pp) { ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq)); - ASSERT(rq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue)); + ASSERT(rq == erts_get_runq_port(pp)); ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ); enqueue_port(rq, pp); } @@ -2080,8 +2072,7 @@ erts_dequeue_port(ErtsRunQueue *rq) Port *pp; ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq)); pp = pop_port(rq); - ASSERT(!pp - || rq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue)); + ASSERT(!pp || rq == erts_get_runq_port(pp)); ASSERT(!pp || (erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ)); return pp; diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 3baca6ce79..b19659f496 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -137,36 +137,6 @@ runq_got_work_to_execute(ErtsRunQueue *rq) return runq_got_work_to_execute_flags(ERTS_RUNQ_FLGS_GET_NOB(rq)); } -#undef RUNQ_READ_RQ -#undef RUNQ_SET_RQ -#define RUNQ_READ_RQ(X) ((ErtsRunQueue *) erts_atomic_read_nob((X))) -#define RUNQ_SET_RQ(X, RQ) erts_atomic_set_nob((X), (erts_aint_t) (RQ)) - -#ifdef DEBUG -# if defined(ARCH_64) -# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \ - (RUNQ_SET_RQ((RQP), (0xdeadbeefdead0003LL | ((N) << 4))) -# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \ -do { \ - ASSERT((RQP) != NULL); \ - ASSERT(((((Uint) (RQP)) & ((Uint) 0x3))) == ((Uint) 0)); \ - ASSERT((((Uint) (RQP)) & ~((Uint) 0xffff)) != ((Uint) 0xdeadbeefdead0000LL));\ -} while (0) -# else -# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) \ - (RUNQ_SET_RQ((RQP), (0xdead0003 | ((N) << 4)))) -# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) \ -do { \ - ASSERT((RQP) != NULL); \ - ASSERT(((((UWord) (RQP)) & ((UWord) 1))) == ((UWord) 0)); \ - ASSERT((((UWord) (RQP)) & ~((UWord) 0xffff)) != ((UWord) 0xdead0000)); \ -} while (0) -# endif -#else -# define ERTS_DBG_SET_INVALID_RUNQP(RQP, N) -# define ERTS_DBG_VERIFY_VALID_RUNQP(RQP) -#endif - const Process erts_invalid_process = {{ERTS_INVALID_PID}}; extern BeamInstr beam_apply[]; @@ -3940,21 +3910,16 @@ immigrate(ErtsRunQueue *c_rq, ErtsMigrationPath *mp) Port *prt; prt = erts_dequeue_port(rq); if (prt) - RUNQ_SET_RQ(&prt->run_queue, c_rq); + erts_set_runq_port(prt, c_rq); erts_runq_unlock(rq); if (prt) { - /* port might terminate while we have no lock... */ rq = erts_port_runq(prt); - if (rq) { - if (rq != c_rq) - erts_exit(ERTS_ABORT_EXIT, - "%s:%d:%s(): Internal error", - __FILE__, __LINE__, __func__); - erts_enqueue_port(c_rq, prt); - if (!iflag) - return; /* done */ - erts_runq_unlock(c_rq); - } + if (rq != c_rq) + ERTS_INTERNAL_ERROR("Unexpected run-queue"); + erts_enqueue_port(c_rq, prt); + if (!iflag) + return; /* done */ + erts_runq_unlock(c_rq); } } else { @@ -3968,12 +3933,11 @@ immigrate(ErtsRunQueue *c_rq, ErtsMigrationPath *mp) while (proc) { erts_aint32_t state; state = erts_atomic32_read_acqb(&proc->state); - if (!(ERTS_PSFLG_BOUND & state) - && (prio == (int) ERTS_PSFLGS_GET_PRQ_PRIO(state))) { + if (prio == (int) ERTS_PSFLGS_GET_PRQ_PRIO(state) + && erts_try_change_runq_proc(proc, c_rq)) { ErtsRunQueueInfo *rqi = &rq->procs.prio_info[prio]; unqueue_process(rq, rpq, rqi, prio, prev_proc, proc); erts_runq_unlock(rq); - RUNQ_SET_RQ(&proc->run_queue, c_rq); rq_locked = 0; erts_runq_lock(c_rq); @@ -4154,21 +4118,13 @@ evacuate_run_queue(ErtsRunQueue *rq, while (prt) { ErtsRunQueue *prt_rq; prt = erts_dequeue_port(rq); - RUNQ_SET_RQ(&prt->run_queue, to_rq); + erts_set_runq_port(prt, to_rq); erts_runq_unlock(rq); - /* - * The port might terminate while - * we have no lock on it... - */ prt_rq = erts_port_runq(prt); - if (prt_rq) { - if (prt_rq != to_rq) - erts_exit(ERTS_ABORT_EXIT, - "%s:%d:%s() internal error\n", - __FILE__, __LINE__, __func__); - erts_enqueue_port(to_rq, prt); - erts_runq_unlock(to_rq); - } + if (prt_rq != to_rq) + ERTS_INTERNAL_ERROR("Unexpected run-queue"); + erts_enqueue_port(to_rq, prt); + erts_runq_unlock(to_rq); erts_runq_lock(rq); prt = rq->ports.start; } @@ -4191,14 +4147,13 @@ evacuate_run_queue(ErtsRunQueue *rq, while (proc) { Process *real_proc; int prio; - erts_aint32_t max_qbit, qbit, real_state; + erts_aint32_t max_qbit, qbit; prio = ERTS_PSFLGS_GET_PRQ_PRIO(state); qbit = ((erts_aint32_t) 1) << prio; if (!(state & ERTS_PSFLG_PROXY)) { real_proc = proc; - real_state = state; } else { real_proc = erts_proc_lookup_raw(proc->common.id); @@ -4206,7 +4161,6 @@ evacuate_run_queue(ErtsRunQueue *rq, free_proxy_proc(proc); goto handle_next_proc; } - real_state = erts_atomic32_read_acqb(&real_proc->state); } max_qbit = (state >> ERTS_PSFLGS_IN_PRQ_MASK_OFFSET); @@ -4241,7 +4195,7 @@ evacuate_run_queue(ErtsRunQueue *rq, goto handle_next_proc; } - if (ERTS_PSFLG_BOUND & real_state) { + if (!erts_try_change_runq_proc(proc, to_rq)) { /* Bound processes get stuck here... */ proc->next = NULL; if (sbpp->last) @@ -4255,7 +4209,6 @@ evacuate_run_queue(ErtsRunQueue *rq, erts_runq_unlock(rq); to_rq = mp->prio[prio].runq; - RUNQ_SET_RQ(&proc->run_queue, to_rq); erts_runq_lock(to_rq); enqueue_process(to_rq, prio, proc); @@ -4323,14 +4276,13 @@ try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq, proc = rpq->first; while (proc) { - erts_aint32_t state = erts_atomic32_read_acqb(&proc->state); - if (!(ERTS_PSFLG_BOUND & state)) { + if (erts_try_change_runq_proc(proc, rq)) { + erts_aint32_t state = erts_atomic32_read_acqb(&proc->state); /* Steal process */ int prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state); ErtsRunQueueInfo *rqi = &vrq->procs.prio_info[prio]; unqueue_process(vrq, rpq, rqi, prio, prev_proc, proc); erts_runq_unlock(vrq); - RUNQ_SET_RQ(&proc->run_queue, rq); erts_runq_lock(rq); *rq_lockedp = 1; @@ -4355,26 +4307,14 @@ no_procs: if (vrq->ports.start) { ErtsRunQueue *prt_rq; Port *prt = erts_dequeue_port(vrq); - RUNQ_SET_RQ(&prt->run_queue, rq); + erts_set_runq_port(prt, rq); erts_runq_unlock(vrq); - - /* - * The port might terminate while - * we have no lock on it... - */ - prt_rq = erts_port_runq(prt); - if (!prt_rq) - return 0; - else { - if (prt_rq != rq) - erts_exit(ERTS_ABORT_EXIT, - "%s:%d:%s() internal error\n", - __FILE__, __LINE__, __func__); - *rq_lockedp = 1; - erts_enqueue_port(rq, prt); - return !0; - } + if (prt_rq != rq) + ERTS_INTERNAL_ERROR("Unexpected run-queue"); + *rq_lockedp = 1; + erts_enqueue_port(rq, prt); + return !0; } erts_runq_unlock(vrq); @@ -6130,7 +6070,8 @@ make_proxy_proc(Process *prev_proxy, Process *proc, erts_aint32_t prio) { erts_aint32_t state; Process *proxy; - ErtsRunQueue *rq = RUNQ_READ_RQ(&proc->run_queue); + int bound; + ErtsRunQueue *rq = erts_get_runq_proc(proc, &bound); state = (ERTS_PSFLG_PROXY | ERTS_PSFLG_IN_RUNQ @@ -6143,7 +6084,7 @@ make_proxy_proc(Process *prev_proxy, Process *proc, erts_aint32_t prio) proxy = prev_proxy; ASSERT(erts_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY); erts_atomic32_set_nob(&proxy->state, state); - RUNQ_SET_RQ(&proc->run_queue, rq); + (void) erts_set_runq_proc(proc, rq, &bound); } else { proxy = erts_alloc(ERTS_ALC_T_PROC, sizeof(Process)); @@ -6156,8 +6097,7 @@ make_proxy_proc(Process *prev_proxy, Process *proc, erts_aint32_t prio) } #endif erts_atomic32_init_nob(&proxy->state, state); - erts_atomic_init_nob(&proxy->run_queue, - erts_atomic_read_nob(&proc->run_queue)); + erts_init_runq_proc(proc, rq, bound); } proxy->common.id = proc->common.id; @@ -6348,18 +6288,21 @@ select_enqueue_run_queue(int enqueue, int enq_prio, Process *p, erts_aint32_t st default: { ErtsRunQueue* runq; + int bound; ASSERT(enqueue == ERTS_ENQUEUE_NORMAL_QUEUE || enqueue == -ERTS_ENQUEUE_NORMAL_QUEUE); - runq = erts_get_runq_proc(p); + runq = erts_get_runq_proc(p, &bound); - if (!(ERTS_PSFLG_BOUND & state)) { + if (!bound) { ErtsRunQueue *new_runq = erts_check_emigration_need(runq, enq_prio); - if (new_runq) { - RUNQ_SET_RQ(&p->run_queue, new_runq); - runq = new_runq; - } + if (new_runq) { + if (erts_try_change_runq_proc(p, new_runq)) + runq = new_runq; + else + runq = erts_get_runq_proc(p, NULL); + } } ASSERT(runq); @@ -11476,6 +11419,7 @@ typedef struct { Process *proc; erts_aint32_t state; ErtsRunQueue *run_queue; + int bound; } ErtsEarlyProcInit; static void early_init_process_struct(void *varg, Eterm data) @@ -11486,10 +11430,9 @@ static void early_init_process_struct(void *varg, Eterm data) proc->common.id = make_internal_pid(data); erts_atomic32_init_nob(&proc->dirty_state, 0); proc->dirty_sys_tasks = NULL; + erts_init_runq_proc(proc, arg->run_queue, arg->bound); erts_atomic32_init_relb(&proc->state, arg->state); - RUNQ_SET_RQ(&proc->run_queue, arg->run_queue); - erts_proc_lock_init(proc); /* All locks locked */ } @@ -11498,7 +11441,7 @@ static void early_init_process_struct(void *varg, Eterm data) ** Allocate process and find out where to place next process. */ static Process* -alloc_process(ErtsRunQueue *rq, erts_aint32_t state) +alloc_process(ErtsRunQueue *rq, int bound, erts_aint32_t state) { ErtsEarlyProcInit init_arg; Process *p; @@ -11507,9 +11450,12 @@ alloc_process(ErtsRunQueue *rq, erts_aint32_t state) if (!p) return NULL; + ASSERT(rq); + init_arg.proc = (Process *) p; - init_arg.run_queue = rq; init_arg.state = state; + init_arg.run_queue = rq; + init_arg.bound = bound; ERTS_CT_ASSERT(offsetof(Process,common) == 0); @@ -11544,6 +11490,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). Eterm args, /* Arguments for function (must be well-formed list). */ ErlSpawnOpts* so) /* Options for spawn. */ { + int bound = 0; Uint flags = 0; ErtsRunQueue *rq = NULL; Process *p; @@ -11580,7 +11527,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). ASSERT(0 <= ix && ix < erts_no_run_queues); rq = ERTS_RUNQ_IX(ix); /* Unsupported feature... */ - state |= ERTS_PSFLG_BOUND; + bound = !0; } prio = (erts_aint32_t) so->priority; } @@ -11593,17 +11540,16 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). flags |= F_OFF_HEAP_MSGQ; } else if (so->flags & SPO_ON_HEAP_MSGQ) { - state |= ERTS_PSFLG_ON_HEAP_MSGQ; flags |= F_ON_HEAP_MSGQ; } ASSERT((flags & F_ON_HEAP_MSGQ) || (flags & F_OFF_HEAP_MSGQ)); if (!rq) - rq = erts_get_runq_proc(parent); + rq = erts_get_runq_proc(parent, NULL); - p = alloc_process(rq, state); /* All proc locks are locked by this thread - on success */ + p = alloc_process(rq, bound, state); /* All proc locks are locked by this thread + on success */ if (!p) { erts_send_error_to_logger_str(parent->group_leader, "Too many processes\n"); @@ -11611,11 +11557,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). goto error; } - ASSERT((erts_atomic32_read_nob(&p->state) - & ERTS_PSFLG_ON_HEAP_MSGQ) - || (erts_atomic32_read_nob(&p->state) - & ERTS_PSFLG_OFF_HEAP_MSGQ)); - #ifdef SHCOPY_SPAWN arg_size = copy_shared_calculate(args, &info); #else @@ -11990,7 +11931,7 @@ void erts_init_empty_process(Process *p) p->pending_exit.bp = NULL; erts_proc_lock_init(p); erts_proc_unlock(p, ERTS_PROC_LOCKS_ALL); - RUNQ_SET_RQ(&p->run_queue, ERTS_RUNQ_IX(0)); + erts_init_runq_proc(p, ERTS_RUNQ_IX(0), 0); #if !defined(NO_FPE_SIGNALS) || defined(HIPE) p->fp_exception = 0; @@ -12312,7 +12253,7 @@ save_pending_exiter(Process *p, ErtsProcList *plp) ERTS_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - rq = RUNQ_READ_RQ(&p->run_queue); + rq = erts_get_runq_proc(p, NULL); ASSERT(rq && !ERTS_RUNQ_IX_IS_DIRTY(rq->ix)); if (!plp) diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 4f200b13ec..e585fabb51 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -244,6 +244,9 @@ extern int erts_dio_sched_thread_suggested_stack_size; (erts_aint32_t) (MSK), \ (erts_aint32_t) (FLGS))) +#define ERTS_RUNQ_POINTER_MASK (~((erts_aint_t) 3)) +#define ERTS_RUNQ_BOUND_FLAG ((erts_aint_t) 1) + typedef enum { ERTS_SCHDLR_SSPND_DONE_MSCHED_BLOCKED, ERTS_SCHDLR_SSPND_DONE_NMSCHED_BLOCKED, @@ -1168,14 +1171,14 @@ void erts_check_for_holes(Process* p); #define ERTS_PSFLG_RUNNING ERTS_PSFLG_BIT(9) #define ERTS_PSFLG_SUSPENDED ERTS_PSFLG_BIT(10) #define ERTS_PSFLG_GC ERTS_PSFLG_BIT(11) -#define ERTS_PSFLG_BOUND ERTS_PSFLG_BIT(12) +/* #define ERTS_PSFLG_ ERTS_PSFLG_BIT(12) */ #define ERTS_PSFLG_TRAP_EXIT ERTS_PSFLG_BIT(13) #define ERTS_PSFLG_ACTIVE_SYS ERTS_PSFLG_BIT(14) #define ERTS_PSFLG_RUNNING_SYS ERTS_PSFLG_BIT(15) #define ERTS_PSFLG_PROXY ERTS_PSFLG_BIT(16) #define ERTS_PSFLG_DELAYED_SYS ERTS_PSFLG_BIT(17) #define ERTS_PSFLG_OFF_HEAP_MSGQ ERTS_PSFLG_BIT(18) -#define ERTS_PSFLG_ON_HEAP_MSGQ ERTS_PSFLG_BIT(19) +/* #define ERTS_PSFLG_ ERTS_PSFLG_BIT(19) */ #define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(20) #define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(21) #define ERTS_PSFLG_DIRTY_ACTIVE_SYS ERTS_PSFLG_BIT(22) @@ -2169,7 +2172,12 @@ ERTS_GLB_INLINE int erts_is_scheduler_bound(ErtsSchedulerData *esdp); ERTS_GLB_INLINE Process *erts_get_current_process(void); ERTS_GLB_INLINE Eterm erts_get_current_pid(void); ERTS_GLB_INLINE Uint erts_get_scheduler_id(void); -ERTS_GLB_INLINE ErtsRunQueue *erts_get_runq_proc(Process *p); +ERTS_GLB_INLINE void erts_init_runq_proc(Process *p, ErtsRunQueue *rq, int bnd); +ERTS_GLB_INLINE ErtsRunQueue *erts_set_runq_proc(Process *p, ErtsRunQueue *rq, int *boundp); +ERTS_GLB_INLINE int erts_try_change_runq_proc(Process *p, ErtsRunQueue *rq); +ERTS_GLB_INLINE ErtsRunQueue *erts_bind_runq_proc(Process *p, int bind); +ERTS_GLB_INLINE int erts_proc_runq_is_bound(Process *p); +ERTS_GLB_INLINE ErtsRunQueue *erts_get_runq_proc(Process *p, int *boundp); ERTS_GLB_INLINE ErtsRunQueue *erts_get_runq_current(ErtsSchedulerData *esdp); ERTS_GLB_INLINE void erts_runq_lock(ErtsRunQueue *rq); ERTS_GLB_INLINE int erts_runq_trylock(ErtsRunQueue *rq); @@ -2249,11 +2257,143 @@ Uint erts_get_scheduler_id(void) return esdp ? esdp->no : (Uint) 0; } +/** + * Init run-queue of process. + * + * @param p[in,out] Process + * @param rq[in] Run-queue that process will be assigned to + * @param bnd[in,out] If non-zero binds process to run-queue. + */ + +ERTS_GLB_INLINE void +erts_init_runq_proc(Process *p, ErtsRunQueue *rq, int bnd) +{ + erts_aint_t rqint = (erts_aint_t) rq; + if (bnd) + rqint |= ERTS_RUNQ_BOUND_FLAG; + erts_atomic_init_nob(&p->run_queue, rqint); +} + +/** + * Forcibly set run-queue of process. + * + * @param p[in,out] Process + * @param rq[in] Run-queue that process will be assigned to + * @param bndp[in,out] Pointer to integer. On input non-zero + * value causes the process to be bound to + * the run-queue. On output, indicating + * wether process previously was bound or + * not. + * @return Previous run-queue. + */ + +ERTS_GLB_INLINE ErtsRunQueue * +erts_set_runq_proc(Process *p, ErtsRunQueue *rq, int *bndp) +{ + erts_aint_t rqint = (erts_aint_t) rq; + ASSERT(bndp); + if (*bndp) + rqint |= ERTS_RUNQ_BOUND_FLAG; + rqint = erts_atomic_xchg_nob(&p->run_queue, rqint); + *bndp = (int) (rqint & ERTS_RUNQ_BOUND_FLAG); + return (ErtsRunQueue *) (rqint & ERTS_RUNQ_POINTER_MASK); +} + +/** + * Try to change run-queue assignment of a process. + * + * @param p[in,out] Process + * @param rq[int] Run-queue that process will be assigned to + * @return Non-zero if the run-queue assignment was + * successfully changed. + */ + +ERTS_GLB_INLINE int +erts_try_change_runq_proc(Process *p, ErtsRunQueue *rq) +{ + erts_aint_t old_rqint, new_rqint; + + new_rqint = (erts_aint_t) rq; + old_rqint = (erts_aint_t) erts_atomic_read_nob(&p->run_queue); + while (1) { + erts_aint_t act_rqint; + + if (old_rqint & ERTS_RUNQ_BOUND_FLAG) + return 0; + + act_rqint = erts_atomic_cmpxchg_nob(&p->run_queue, + new_rqint, + old_rqint); + if (act_rqint == old_rqint) + return !0; + } +} + +/** + * + * Bind or unbind process to/from currently used run-queue. + * + * @param p Process + * @param bind Bind if non-zero; otherwise unbind + * @return Pointer to previously bound run-queue, + * or NULL if previously unbound + */ + +ERTS_GLB_INLINE ErtsRunQueue * +erts_bind_runq_proc(Process *p, int bind) +{ + erts_aint_t rqint; + if (bind) + rqint = erts_atomic_read_bor_nob(&p->run_queue, + ERTS_RUNQ_BOUND_FLAG); + else + rqint = erts_atomic_read_band_nob(&p->run_queue, + ~ERTS_RUNQ_BOUND_FLAG); + if (rqint & ERTS_RUNQ_BOUND_FLAG) + return (ErtsRunQueue *) (rqint & ERTS_RUNQ_POINTER_MASK); + else + return NULL; +} + +/** + * Determine wether a process is bound to a run-queue or not. + * + * @return Returns a non-zero value if bound, + * and zero of not bound. + */ + +ERTS_GLB_INLINE int +erts_proc_runq_is_bound(Process *p) +{ + erts_aint_t rqint = erts_atomic_read_nob(&p->run_queue); + return (int) (rqint & ERTS_RUNQ_BOUND_FLAG); +} + +/** + * Set run-queue of process. + * + * @param p[in,out] Process + * @param bndp[out] Pointer to integer. If non-NULL pointer, + * the integer will be set to a non-zero + * value if the process is bound to the + * run-queue. + * @return Pointer to the normal run-queue that + * the process currently is assigend to. + * A process is always assigned to a + * normal run-queue. + */ + ERTS_GLB_INLINE ErtsRunQueue * -erts_get_runq_proc(Process *p) +erts_get_runq_proc(Process *p, int *bndp) { - ASSERT(ERTS_AINT_NULL != erts_atomic_read_nob(&p->run_queue)); - return (ErtsRunQueue *) erts_atomic_read_nob(&p->run_queue); + erts_aint_t rqint = erts_atomic_read_nob(&p->run_queue); + ErtsRunQueue *rq; + if (bndp) + *bndp = (int) (rqint & ERTS_RUNQ_BOUND_FLAG); + rqint &= ERTS_RUNQ_POINTER_MASK; + rq = (ErtsRunQueue *) rqint; + ASSERT(rq); + return rq; } ERTS_GLB_INLINE ErtsRunQueue * diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c index 0d76a4ef53..05e7bcdea2 100644 --- a/erts/emulator/beam/erl_process_dump.c +++ b/erts/emulator/beam/erl_process_dump.c @@ -1009,8 +1009,6 @@ erts_dump_extended_process_state(fmtfn_t to, void *to_arg, erts_aint32_t psflg) erts_print(to, to_arg, "SUSPENDED"); break; case ERTS_PSFLG_GC: erts_print(to, to_arg, "GC"); break; - case ERTS_PSFLG_BOUND: - erts_print(to, to_arg, "BOUND"); break; case ERTS_PSFLG_TRAP_EXIT: erts_print(to, to_arg, "TRAP_EXIT"); break; case ERTS_PSFLG_ACTIVE_SYS: @@ -1023,8 +1021,6 @@ erts_dump_extended_process_state(fmtfn_t to, void *to_arg, erts_aint32_t psflg) erts_print(to, to_arg, "DELAYED_SYS"); break; case ERTS_PSFLG_OFF_HEAP_MSGQ: erts_print(to, to_arg, "OFF_HEAP_MSGQ"); break; - case ERTS_PSFLG_ON_HEAP_MSGQ: - erts_print(to, to_arg, "ON_HEAP_MSGQ"); break; case ERTS_PSFLG_DIRTY_CPU_PROC: erts_print(to, to_arg, "DIRTY_CPU_PROC"); break; case ERTS_PSFLG_DIRTY_IO_PROC: diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 4bb2c0cb01..3e8f6263bb 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -298,7 +298,6 @@ static Port *create_port(char *name, erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED; erts_aint32_t x_pts_flgs = 0; - ErtsRunQueue *runq; if (!driver_lock) { /* Align size for mutex following port struct */ port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); @@ -347,11 +346,16 @@ static Port *create_port(char *name, p += sizeof(erts_mtx_t); state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; } - if (erts_get_scheduler_data()) - runq = erts_get_runq_current(NULL); - else - runq = ERTS_RUNQ_IX(0); - erts_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq); + + { + ErtsRunQueue *runq; + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + if (esdp) + runq = erts_get_runq_current(esdp); + else + runq = ERTS_RUNQ_IX(0); + erts_init_runq_port(prt, runq); + } prt->xports = NULL; diff --git a/erts/etc/unix/etp-commands.in b/erts/etc/unix/etp-commands.in index 95e065b156..02ace9126c 100644 --- a/erts/etc/unix/etp-commands.in +++ b/erts/etc/unix/etp-commands.in @@ -1693,7 +1693,7 @@ define etp-proc-state-int printf "dirty-cpu-proc | " end if ($arg0 & 0x2000000) - printf "on-heap-msgq | " + printf "GARBAGE<0x2000000> | " end if ($arg0 & 0x1000000) printf "off-heap-msgq | " @@ -1717,7 +1717,7 @@ define etp-proc-state-int printf "trapping-exit | " end if ($arg0 & 0x40000) - printf "bound | " + printf "GARBAGE<0x40000> | " end if ($arg0 & 0x20000) printf "garbage-collecting | " |