diff options
author | Rickard Green <[email protected]> | 2013-12-07 00:19:05 +0100 |
---|---|---|
committer | Rickard Green <[email protected]> | 2013-12-07 00:19:05 +0100 |
commit | 9232f50d4cbe7b051dfd3e83625de0b22536c4c4 (patch) | |
tree | f3c52ccc719a9df7a0714de05e2d1e9dde700df0 /erts/emulator/beam | |
parent | 3c00452a81dfde57f85c882029186cfa3c0d348d (diff) | |
parent | 9f1f0bff7f98d62f8406e5ecd76f6eb7c1a66ff3 (diff) | |
download | otp-9232f50d4cbe7b051dfd3e83625de0b22536c4c4.tar.gz otp-9232f50d4cbe7b051dfd3e83625de0b22536c4c4.tar.bz2 otp-9232f50d4cbe7b051dfd3e83625de0b22536c4c4.zip |
Merge branch 'rickard/garbage_collect/OTP-11388'
* rickard/garbage_collect/OTP-11388:
Parallel check_process_code when code_server purge a module
Functionality for disabling garbage collection
Use asynchronous check_process_code in code_parallel_SUITE
Execution of system tasks in context of another process
Conflicts:
bootstrap/lib/kernel/ebin/hipe_unified_loader.beam
erts/preloaded/ebin/erlang.beam
erts/preloaded/ebin/erts_internal.beam
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r-- | erts/emulator/beam/atom.names | 1 | ||||
-rw-r--r-- | erts/emulator/beam/beam_bif_load.c | 145 | ||||
-rw-r--r-- | erts/emulator/beam/bif.c | 39 | ||||
-rw-r--r-- | erts/emulator/beam/bif.tab | 6 | ||||
-rw-r--r-- | erts/emulator/beam/break.c | 10 | ||||
-rw-r--r-- | erts/emulator/beam/erl_alloc.types | 2 | ||||
-rwxr-xr-x | erts/emulator/beam/erl_bif_info.c | 13 | ||||
-rw-r--r-- | erts/emulator/beam/erl_debug.c | 3 | ||||
-rw-r--r-- | erts/emulator/beam/erl_gc.c | 13 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.h | 5 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 1447 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 89 | ||||
-rwxr-xr-x | erts/emulator/beam/global.h | 9 | ||||
-rw-r--r-- | erts/emulator/beam/ops.tab | 14 | ||||
-rw-r--r-- | erts/emulator/beam/utils.c | 2 |
15 files changed, 1545 insertions, 253 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index e108e706c3..eee4badfb8 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -79,6 +79,7 @@ atom allocated_areas atom allocator atom allocator_sizes atom alloc_util_allocators +atom allow_gc atom allow_passive_connect atom already_loaded atom amd64 diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index 649594a334..3f92c5b025 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -36,7 +36,7 @@ #include "erl_thr_progress.h" static void set_default_trace_pattern(Eterm module); -static Eterm check_process_code(Process* rp, Module* modp); +static Eterm check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp); static void delete_code(Module* modp); static void decrement_refc(BeamInstr* code); static int is_native(BeamInstr* code); @@ -427,69 +427,82 @@ check_old_code_1(BIF_ALIST_1) } Eterm -check_process_code_2(BIF_ALIST_2) +erts_check_process_code(Process *c_p, Eterm module, int allow_gc, int *redsp) { - Process* rp; Module* modp; + Eterm res; + ErtsCodeIndex code_ix; - if (is_not_atom(BIF_ARG_2)) { - goto error; - } - if (is_internal_pid(BIF_ARG_1)) { - Eterm res; - ErtsCodeIndex code_ix; - - code_ix = erts_active_code_ix(); - modp = erts_get_module(BIF_ARG_2, code_ix); - if (modp == NULL) { /* Doesn't exist. */ - return am_false; - } - erts_rlock_old_code(code_ix); - if (modp->old.code == NULL) { /* No old code. */ - erts_runlock_old_code(code_ix); - return am_false; - } - erts_runlock_old_code(code_ix); - -#ifdef ERTS_SMP - rp = erts_pid2proc_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, - BIF_ARG_1, ERTS_PROC_LOCK_MAIN); -#else - rp = erts_pid2proc(BIF_P, 0, BIF_ARG_1, 0); -#endif - if (!rp) { - BIF_RET(am_false); - } - if (rp == ERTS_PROC_LOCK_BUSY) { - ERTS_BIF_YIELD2(bif_export[BIF_check_process_code_2], BIF_P, - BIF_ARG_1, BIF_ARG_2); - } - erts_rlock_old_code(code_ix); - if (modp->old.code != NULL) { /* must check again */ - res = check_process_code(rp, modp); - } - else { - res = am_false; - } - erts_runlock_old_code(code_ix); -#ifdef ERTS_SMP - if (BIF_P != rp) { - erts_resume(rp, ERTS_PROC_LOCK_MAIN); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); + (*redsp)++; + + ASSERT(is_atom(module)); + + code_ix = erts_active_code_ix(); + modp = erts_get_module(module, code_ix); + if (!modp) + return am_false; + erts_rlock_old_code(code_ix); + res = modp->old.code ? check_process_code(c_p, modp, allow_gc, redsp) : am_false; + erts_runlock_old_code(code_ix); + + return res; +} + +BIF_RETTYPE erts_internal_check_process_code_2(BIF_ALIST_2) +{ + int reds = 0; + Eterm res; + Eterm olist = BIF_ARG_2; + int allow_gc = 1; + + if (is_not_atom(BIF_ARG_1)) + goto badarg; + + while (is_list(olist)) { + Eterm *lp = list_val(olist); + Eterm opt = CAR(lp); + if (is_tuple(opt)) { + Eterm* tp = tuple_val(opt); + switch (arityval(tp[0])) { + case 2: + switch (tp[1]) { + case am_allow_gc: + switch (tp[2]) { + case am_false: + allow_gc = 0; + break; + case am_true: + allow_gc = 1; + break; + default: + goto badarg; + } + break; + default: + goto badarg; + } + break; + default: + goto badarg; + } } -#endif - BIF_RET(res); - } - else if (is_external_pid(BIF_ARG_1) - && external_pid_dist_entry(BIF_ARG_1) == erts_this_dist_entry) { - BIF_RET(am_false); + else + goto badarg; + olist = CDR(lp); } + if (is_not_nil(olist)) + goto badarg; + + res = erts_check_process_code(BIF_P, BIF_ARG_1, allow_gc, &reds); + + ASSERT(is_value(res)); + + BIF_RET2(res, reds); - error: +badarg: BIF_ERROR(BIF_P, BADARG); } - BIF_RETTYPE delete_module_1(BIF_ALIST_1) { ErtsCodeIndex code_ix; @@ -710,7 +723,7 @@ set_default_trace_pattern(Eterm module) } static Eterm -check_process_code(Process* rp, Module* modp) +check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp) { BeamInstr* start; char* mod_start; @@ -773,6 +786,16 @@ check_process_code(Process* rp, Module* modp) } } + if (rp->flags & F_DISABLE_GC) { + /* + * Cannot proceed. Process has disabled gc in order to + * safely leave inconsistent data on the heap and/or + * off heap lists. Need to wait for gc to be enabled + * again. + */ + return THE_NON_VALUE; + } + /* * See if there are funs that refer to the old version of the module. */ @@ -786,6 +809,8 @@ check_process_code(Process* rp, Module* modp) if (done_gc) { return am_true; } else { + if (!allow_gc) + return am_aborted; /* * Try to get rid of this fun by garbage collecting. * Clear both fvalue and ftrace to make sure they @@ -796,7 +821,7 @@ check_process_code(Process* rp, Module* modp) rp->ftrace = NIL; done_gc = 1; FLAGS(rp) |= F_NEED_FULLSWEEP; - (void) erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); + *redsp += erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); goto rescan; } } @@ -850,6 +875,9 @@ check_process_code(Process* rp, Module* modp) Uint lit_size; struct erl_off_heap_header* oh; + if (!allow_gc) + return am_aborted; + /* * Try to get rid of constants by by garbage collecting. * Clear both fvalue and ftrace. @@ -859,11 +887,12 @@ check_process_code(Process* rp, Module* modp) rp->ftrace = NIL; done_gc = 1; FLAGS(rp) |= F_NEED_FULLSWEEP; - (void) erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); + *redsp += erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); literals = (Eterm *) modp->old.code[MI_LITERALS_START]; lit_size = (Eterm *) modp->old.code[MI_LITERALS_END] - literals; oh = (struct erl_off_heap_header *) modp->old.code[MI_LITERALS_OFF_HEAP]; + *redsp += lit_size / 10; /* Need, better value... */ erts_garbage_collect_literals(rp, literals, lit_size, oh); } } diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 13d31285b2..96666d98ed 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -3764,45 +3764,6 @@ BIF_RETTYPE now_0(BIF_ALIST_0) /**********************************************************************/ -BIF_RETTYPE garbage_collect_1(BIF_ALIST_1) -{ - int reds; - Process *rp; - - if (is_not_pid(BIF_ARG_1)) { - BIF_ERROR(BIF_P, BADARG); - } - - if (BIF_P->common.id == BIF_ARG_1) - rp = BIF_P; - else { -#ifdef ERTS_SMP - rp = erts_pid2proc_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, - BIF_ARG_1, ERTS_PROC_LOCK_MAIN); - if (rp == ERTS_PROC_LOCK_BUSY) - ERTS_BIF_YIELD1(bif_export[BIF_garbage_collect_1], BIF_P, BIF_ARG_1); -#else - rp = erts_proc_lookup(BIF_ARG_1); -#endif - if (!rp) - BIF_RET(am_false); - } - - /* The GC cost is taken for the process executing this BIF. */ - - FLAGS(rp) |= F_NEED_FULLSWEEP; - reds = erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); - -#ifdef ERTS_SMP - if (BIF_P != rp) { - erts_resume(rp, ERTS_PROC_LOCK_MAIN); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); - } -#endif - - BIF_RET2(am_true, reds); -} - BIF_RETTYPE garbage_collect_0(BIF_ALIST_0) { int reds; diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index b6cce84cdd..dd50df636c 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -46,7 +46,6 @@ bif erlang:atom_to_list/1 bif erlang:binary_to_list/1 bif erlang:binary_to_list/3 bif erlang:binary_to_term/1 -bif erlang:check_process_code/2 bif erlang:crc32/1 bif erlang:crc32/2 bif erlang:crc32_combine/3 @@ -67,7 +66,6 @@ bif erlang:float_to_list/1 bif erlang:float_to_list/2 bif erlang:fun_info/2 bif erlang:garbage_collect/0 -bif erlang:garbage_collect/1 bif erlang:get/0 bif erlang:get/1 bif erlang:get_keys/1 @@ -155,6 +153,10 @@ bif erts_internal:port_control/3 bif erts_internal:port_close/1 bif erts_internal:port_connect/2 +bif erts_internal:request_system_task/3 +bif erts_internal:check_process_code/2 + + # inet_db support bif erlang:port_set_data/2 bif erlang:port_get_data/1 diff --git a/erts/emulator/beam/break.c b/erts/emulator/beam/break.c index b7e1092907..7d4f52ee23 100644 --- a/erts/emulator/beam/break.c +++ b/erts/emulator/beam/break.c @@ -112,10 +112,12 @@ process_killer(void) erts_smp_proc_lock(rp, rp_locks); state = erts_smp_atomic32_read_acqb(&rp->state); if (state & (ERTS_PSFLG_FREE - | ERTS_PSFLG_EXITING - | ERTS_PSFLG_ACTIVE - | ERTS_PSFLG_IN_RUNQ - | ERTS_PSFLG_RUNNING)) { + | ERTS_PSFLG_EXITING + | ERTS_PSFLG_ACTIVE + | ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_IN_RUNQ + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS)) { erts_printf("Can only kill WAITING processes this way\n"); } else { diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index bb5eba80be..32308fae9b 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -268,6 +268,8 @@ type PROC_INTERVAL LONG_LIVED SYSTEM process_interval type BUSY_CALLER_TAB SHORT_LIVED SYSTEM busy_caller_table type BUSY_CALLER SHORT_LIVED SYSTEM busy_caller type PORT_DATA_HEAP STANDARD SYSTEM port_data_heap +type PROC_SYS_TSK SHORT_LIVED PROCESSES proc_sys_task +type PROC_SYS_TSK_QS SHORT_LIVED PROCESSES proc_sys_task_queues +if threads_no_smp # Need thread safe allocs, but std_alloc and fix_alloc are not; diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index d7f1e2d971..8fa3aa29eb 100755 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -3603,6 +3603,19 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2) BIF_RET(am_true); } } + else if (ERTS_IS_ATOM_STR("gc_state", BIF_ARG_1)) { + /* Used by process_SUITE (emulator) */ + int res, enable; + + switch (BIF_ARG_2) { + case am_true: enable = 1; break; + case am_false: enable = 0; break; + default: BIF_ERROR(BIF_P, BADARG); break; + } + + res = erts_set_gc_state(BIF_P, enable); + BIF_RET(res ? am_true : am_false); + } else if (ERTS_IS_ATOM_STR("send_fake_exit_signal", BIF_ARG_1)) { /* Used by signal_SUITE (emulator) */ diff --git a/erts/emulator/beam/erl_debug.c b/erts/emulator/beam/erl_debug.c index b90d00f236..dc79d45be7 100644 --- a/erts/emulator/beam/erl_debug.c +++ b/erts/emulator/beam/erl_debug.c @@ -299,6 +299,9 @@ void erts_check_for_holes(Process* p) ErlHeapFragment* hf; Eterm* start; + if (p->flags & F_DISABLE_GC) + return; + start = p->last_htop ? p->last_htop : HEAP_START(p); check_memory(start, HEAP_TOP(p)); p->last_htop = HEAP_TOP(p); diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c index e89725c190..c5585d39e8 100644 --- a/erts/emulator/beam/erl_gc.c +++ b/erts/emulator/beam/erl_gc.c @@ -400,10 +400,16 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) Uint reclaimed_now = 0; int done = 0; Uint ms1, s1, us1; - ErtsSchedulerData *esdp = erts_get_scheduler_data(); + ErtsSchedulerData *esdp; #ifdef USE_VM_PROBES DTRACE_CHARBUF(pidbuf, DTRACE_TERM_BUF_SIZE); #endif + + if (p->flags & F_DISABLE_GC) + return 1; + + esdp = erts_get_scheduler_data(); + if (IS_TRACED_FL(p, F_TRACE_GC)) { trace_gc(p, am_gc_start); } @@ -532,6 +538,9 @@ erts_garbage_collect_hibernate(Process* p) Uint area_size; Sint offs; + if (p->flags & F_DISABLE_GC) + ERTS_INTERNAL_ERROR("GC disabled"); + /* * Preliminaries. */ @@ -667,6 +676,8 @@ erts_garbage_collect_literals(Process* p, Eterm* literals, Uint n; struct erl_off_heap_header** prev; + if (p->flags & F_DISABLE_GC) + return; /* * Set GC state. */ diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 771eba431f..0f3bb8d281 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -46,6 +46,11 @@ typedef struct erl_off_heap { Uint64 overhead; /* Administrative overhead (used to force GC). */ } ErlOffHeap; +#define ERTS_INIT_OFF_HEAP(OHP) \ + do { \ + (OHP)->first = NULL; \ + (OHP)->overhead = 0; \ + } while (0) #include "external.h" #include "erl_process.h" diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 1efd070afd..0a41fb596d 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -283,6 +283,40 @@ struct erts_system_profile_flags_t erts_system_profile_flags; #error "Need to store process_count in another type" #endif +typedef enum { + ERTS_PSTT_GC, /* Garbage Collect */ + ERTS_PSTT_CPC /* Check Process Code */ +} ErtsProcSysTaskType; + +#define ERTS_MAX_PROC_SYS_TASK_ARGS 2 + +struct ErtsProcSysTask_ { + ErtsProcSysTask *next; + ErtsProcSysTask *prev; + ErtsProcSysTaskType type; + Eterm requester; + Eterm reply_tag; + Eterm req_id; + Uint req_id_sz; + Eterm arg[ERTS_MAX_PROC_SYS_TASK_ARGS]; + ErlOffHeap off_heap; + Eterm heap[1]; +}; + +#define ERTS_PROC_SYS_TASK_SIZE(HSz) \ + (sizeof(ErtsProcSysTask) - sizeof(Eterm) + sizeof(Eterm)*(HSz)) + +struct ErtsProcSysTaskQs_ { + int qmask; + int ncount; + ErtsProcSysTask *q[ERTS_NO_PROC_PRIO_LEVELS]; +}; + +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(proc_sys_task_queues, + ErtsProcSysTaskQs, + 50, + ERTS_ALC_T_PROC_SYS_TSK_QS) + ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_op_list, ErtsMiscOpList, 10, @@ -353,6 +387,14 @@ static void aux_work_timeout_early_init(int no_schedulers); static void aux_work_timeout_late_init(void); static void setup_aux_work_timer(void); +static int execute_sys_tasks(Process *c_p, + erts_aint32_t *statep, + int in_reds); +static int cleanup_sys_tasks(Process *c_p, + erts_aint32_t in_state, + int in_reds); + + #if defined(DEBUG) || 0 #define ERTS_DBG_CHK_AUX_WORK_VAL(V) dbg_chk_aux_work_val((V)) static void @@ -471,6 +513,11 @@ erts_pre_init_process(void) erts_psd_required_locks[ERTS_PSD_CALL_TIME_BP].set_locks = ERTS_PSD_CALL_TIME_BP_SET_LOCKS; + erts_psd_required_locks[ERTS_PSD_DELAYED_GC_TASK_QS].get_locks + = ERTS_PSD_DELAYED_GC_TASK_QS_GET_LOCKS; + erts_psd_required_locks[ERTS_PSD_DELAYED_GC_TASK_QS].set_locks + = ERTS_PSD_DELAYED_GC_TASK_QS_SET_LOCKS; + /* Check that we have locks for all entries */ for (ix = 0; ix < ERTS_PSD_SIZE; ix++) { ERTS_SMP_LC_ASSERT(erts_psd_required_locks[ix].get_locks); @@ -2848,7 +2895,7 @@ dequeue_process(ErtsRunQueue *runq, int prio_q, erts_aint32_t *statep) if (statep) *statep = state; - prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state); rqi = &runq->procs.prio_info[prio]; @@ -2997,7 +3044,7 @@ immigrate(ErtsRunQueue *c_rq, ErtsMigrationPath *mp) erts_aint32_t state; state = erts_smp_atomic32_read_acqb(&proc->state); if (!(ERTS_PSFLG_BOUND & state) - && (prio == (int) (ERTS_PSFLG_PRIO_MASK & state))) { + && (prio == (int) ERTS_PSFLGS_GET_PRQ_PRIO(state))) { ErtsRunQueueInfo *rqi = &rq->procs.prio_info[prio]; unqueue_process(rq, rpq, rqi, prio, prev_proc, proc); erts_smp_runq_unlock(rq); @@ -3079,7 +3126,7 @@ schedule_bound_processes(ErtsRunQueue *rq, while (proc) { erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state); next = proc->next; - enqueue_process(rq, (int) (ERTS_PSFLG_PRIO_MASK & state), proc); + enqueue_process(rq, (int) ERTS_PSFLGS_GET_PRQ_PRIO(state), proc); proc = next; } } @@ -3184,7 +3231,7 @@ evacuate_run_queue(ErtsRunQueue *rq, sbpp->last = proc; } else { - int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + int prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state); erts_smp_runq_unlock(rq); to_rq = mp->prio[prio].runq; @@ -3257,7 +3304,7 @@ try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq, erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state); if (!(ERTS_PSFLG_BOUND & state)) { /* Steal process */ - int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + int prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state); ErtsRunQueueInfo *rqi = &vrq->procs.prio_info[prio]; unqueue_process(vrq, rpq, rqi, prio, prev_proc, proc); erts_smp_runq_unlock(vrq); @@ -4575,6 +4622,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) #endif init_misc_op_list_alloc(); + init_proc_sys_task_queues_alloc(); #ifdef ERTS_SMP set_wakeup_other_data(); @@ -4858,46 +4906,166 @@ erts_get_scheduler_data(void) #endif +static Process * +make_proxy_proc(Process *prev_proxy, Process *proc, erts_aint32_t prio) +{ + erts_aint32_t state; + Process *proxy; +#ifdef ERTS_SMP + ErtsRunQueue *rq = RUNQ_READ_RQ(&proc->run_queue); +#endif + + state = (ERTS_PSFLG_PROXY + | ERTS_PSFLG_IN_RUNQ + | (((erts_aint32_t) 1) << (prio + ERTS_PSFLGS_IN_PRQ_MASK_OFFSET)) + | (prio << ERTS_PSFLGS_PRQ_PRIO_OFFSET) + | (prio << ERTS_PSFLGS_USR_PRIO_OFFSET) + | (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET)); + + if (prev_proxy) { + proxy = prev_proxy; + ASSERT(erts_smp_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY); + erts_smp_atomic32_set_nob(&proxy->state, state); +#ifdef ERTS_SMP + RUNQ_SET_RQ(&proc->run_queue, rq); +#endif + } + else { + proxy = erts_alloc(ERTS_ALC_T_PROC, sizeof(Process)); +#ifdef DEBUG + { + int i; + Uint32 *ui32 = (Uint32 *) (char *) proxy; + for (i = 0; i < sizeof(Process)/sizeof(Uint32); i++) + ui32[i] = (Uint32) 0xdeadbeef; + } +#endif + erts_smp_atomic32_init_nob(&proxy->state, state); +#ifdef ERTS_SMP + erts_smp_atomic_init_nob(&proxy->run_queue, + erts_smp_atomic_read_nob(&proc->run_queue)); +#endif + } + + proxy->common.id = proc->common.id; + + return proxy; +} + +static ERTS_INLINE void +free_proxy_proc(Process *proxy) +{ + ASSERT(erts_smp_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY); + erts_free(ERTS_ALC_T_PROC, proxy); +} + + +static ERTS_INLINE int +check_enqueue_in_prio_queue(erts_aint32_t *prq_prio_p, + erts_aint32_t *newp, + erts_aint32_t actual) +{ + erts_aint32_t aprio, qbit, max_qbit; + + aprio = (actual >> ERTS_PSFLGS_ACT_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK; + qbit = 1 << aprio; + + *prq_prio_p = aprio; + + max_qbit = (actual >> ERTS_PSFLGS_IN_PRQ_MASK_OFFSET) & ERTS_PSFLGS_QMASK; + max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS; + max_qbit &= -max_qbit; + /* + * max_qbit now either contain bit set for highest prio queue or a bit + * out of range (which will have a value larger than valid range). + */ + + if (qbit >= max_qbit) + return 0; /* Already queued in higher or equal prio */ + + /* Need to enqueue (if already enqueued, it is in lower prio) */ + *newp |= qbit << ERTS_PSFLGS_IN_PRQ_MASK_OFFSET; + + if ((actual & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLGS_USR_PRIO_MASK)) + != (aprio << ERTS_PSFLGS_USR_PRIO_OFFSET)) { + /* + * Process struct already enqueued, or actual prio not + * equal to user prio, i.e., enqueue using proxy. + */ + return -1; + } + + /* + * Enqueue using process struct. + */ + *newp &= ~ERTS_PSFLGS_PRQ_PRIO_MASK; + *newp |= ERTS_PSFLG_IN_RUNQ | (aprio << ERTS_PSFLGS_PRQ_PRIO_OFFSET); + return 1; +} + /* * scheduler_out_process() return with c_rq locked. */ static ERTS_INLINE int -schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p) +schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Process *proxy) { - erts_aint32_t a, e, n; + erts_aint32_t a, e, n, enq_prio = -1; int res = 0; + int enqueue; /* < 0 -> use proxy */ a = state; while (1) { n = e = a; - ASSERT(a & ERTS_PSFLG_RUNNING); - ASSERT(!(a & ERTS_PSFLG_IN_RUNQ)); + ASSERT(a & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS)); - n &= ~ERTS_PSFLG_RUNNING; - if ((a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) - n |= ERTS_PSFLG_IN_RUNQ; + enqueue = 0; + + n &= ~(ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS); + if (a & ERTS_PSFLG_ACTIVE_SYS + || (a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) { + enqueue = check_enqueue_in_prio_queue(&enq_prio, &n, a); + } a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); if (a == e) break; } - if (!(n & ERTS_PSFLG_IN_RUNQ)) { - if (erts_system_profile_flags.runnable_procs) - profile_runnable_proc(p, am_inactive); + if (!enqueue) { + + if (erts_system_profile_flags.runnable_procs) { + + if (!(a & ERTS_PSFLG_ACTIVE_SYS) + && (!(a & ERTS_PSFLG_ACTIVE) + || (a & ERTS_PSFLG_SUSPENDED))) { + /* Process inactive */ + profile_runnable_proc(p, am_inactive); + } + } + + if (proxy) + free_proxy_proc(proxy); } else { - int prio = (int) (ERTS_PSFLG_PRIO_MASK & n); + Process *sched_p; ErtsRunQueue *runq = erts_get_runq_proc(p); - ASSERT(!(n & ERTS_PSFLG_SUSPENDED)); + ASSERT(!(n & ERTS_PSFLG_SUSPENDED) || (n & ERTS_PSFLG_ACTIVE_SYS)); + + if (enqueue < 0) + sched_p = make_proxy_proc(proxy, p, enq_prio); + else { + sched_p = p; + if (proxy) + free_proxy_proc(proxy); + } #ifdef ERTS_SMP if (!(ERTS_PSFLG_BOUND & n)) { - ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio); + ErtsRunQueue *new_runq = erts_check_emigration_need(runq, enq_prio); if (new_runq) { - RUNQ_SET_RQ(&p->run_queue, new_runq); + RUNQ_SET_RQ(&sched_p->run_queue, new_runq); runq = new_runq; } } @@ -4908,7 +5076,7 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p) erts_smp_runq_lock(runq); /* Enqueue the process */ - enqueue_process(runq, prio, p); + enqueue_process(runq, (int) enq_prio, sched_p); if (runq == c_rq) return res; @@ -4920,14 +5088,13 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p) } static ERTS_INLINE void -add2runq(Process *p, erts_aint32_t state) +add2runq(Process *p, erts_aint32_t state, erts_aint32_t prio) { - int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); ErtsRunQueue *runq = erts_get_runq_proc(p); #ifdef ERTS_SMP if (!(ERTS_PSFLG_BOUND & state)) { - ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio); + ErtsRunQueue *new_runq = erts_check_emigration_need(runq, (int) prio); if (new_runq) { RUNQ_SET_RQ(&p->run_queue, new_runq); runq = new_runq; @@ -4939,101 +5106,236 @@ add2runq(Process *p, erts_aint32_t state) erts_smp_runq_lock(runq); /* Enqueue the process */ - enqueue_process(runq, prio, p); + enqueue_process(runq, (int) prio, p); erts_smp_runq_unlock(runq); smp_notify_inc_runq(runq); } -static ERTS_INLINE void -schedule_process(Process *p, erts_aint32_t state, int active_enq) +static ERTS_INLINE int +change_proc_schedule_state(Process *p, + erts_aint32_t clear_state_flags, + erts_aint32_t set_state_flags, + erts_aint32_t *statep, + erts_aint32_t *enq_prio_p) { - erts_aint32_t a = state, n; + /* + * NOTE: ERTS_PSFLG_RUNNING, ERTS_PSFLG_RUNNING_SYS and + * ERTS_PSFLG_ACTIVE_SYS are not allowed to be + * altered by this function! + */ + erts_aint32_t a = *statep, n; + int enqueue; /* < 0 -> use proxy */ + + ASSERT(!(a & ERTS_PSFLG_PROXY)); + ASSERT((clear_state_flags & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_ACTIVE_SYS)) == 0); + ASSERT((set_state_flags & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_ACTIVE_SYS)) == 0); while (1) { erts_aint32_t e; n = e = a; + enqueue = 0; + if (a & ERTS_PSFLG_FREE) - return; /* We don't want to schedule free processes... */ - n |= ERTS_PSFLG_ACTIVE; - if (!(a & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_RUNNING))) - n |= ERTS_PSFLG_IN_RUNQ; + break; /* We don't want to schedule free processes... */ + + if (clear_state_flags) + n &= ~clear_state_flags; + + if (set_state_flags) + n |= set_state_flags; + + if ((n & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_IN_RUNQ + | ERTS_PSFLG_ACTIVE)) == ERTS_PSFLG_ACTIVE) { + /* + * Active and seemingly need to be enqueued, but + * process may be in a run queue via proxy, need + * further inspection... + */ + enqueue = check_enqueue_in_prio_queue(enq_prio_p, &n, a); + } + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); if (a == e) break; - if (!active_enq && (a & ERTS_PSFLG_ACTIVE)) - return; /* Someone else activated process ... */ + if (enqueue == 0 && n == a) + break; } - if (erts_system_profile_flags.runnable_procs - && !(a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) { - profile_runnable_proc(p, am_active); + if (erts_system_profile_flags.runnable_procs) { + + if (((n & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_ACTIVE)) == ERTS_PSFLG_ACTIVE) + && (!(a & (ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS) + && (!(a & ERTS_PSFLG_ACTIVE) + || (a & ERTS_PSFLG_SUSPENDED))))) { + /* We activated a prevously inactive process */ + profile_runnable_proc(p, am_active); + } + } - if ((n & ERTS_PSFLG_IN_RUNQ) && !(a & ERTS_PSFLG_IN_RUNQ)) - add2runq(p, n); + *statep = a; + + return enqueue; +} + +static ERTS_INLINE void +schedule_process(Process *p, erts_aint32_t in_state) +{ + erts_aint32_t enq_prio = -1; + erts_aint32_t state = in_state; + int enqueue = change_proc_schedule_state(p, + 0, + ERTS_PSFLG_ACTIVE, + &state, + &enq_prio); + if (enqueue) + add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), + state, + enq_prio); } void erts_schedule_process(Process *p, erts_aint32_t state) { - schedule_process(p, state, 0); + schedule_process(p, state); +} + +static void +schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy) +{ + erts_aint32_t a = state, n, enq_prio = -1; + int enqueue; /* < 0 -> use proxy */ + + ASSERT(!(state & ERTS_PSFLG_PROXY)); + + while (1) { + erts_aint32_t e; + n = e = a; + + if (a & ERTS_PSFLG_FREE) + return; /* We don't want to schedule free processes... */ + + enqueue = 0; + n |= ERTS_PSFLG_ACTIVE_SYS; + if (!(a & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS))) + enqueue = check_enqueue_in_prio_queue(&enq_prio, &n, a); + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + if (a == e) + break; + if (a == n && !enqueue) + goto cleanup; + } + + if (erts_system_profile_flags.runnable_procs) { + + if (!(a & (ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS)) + && (!(a & ERTS_PSFLG_ACTIVE) || (a & ERTS_PSFLG_SUSPENDED))) { + /* We activated a prevously inactive process */ + profile_runnable_proc(p, am_active); + } + + } + + if (enqueue) { + Process *sched_p; + if (enqueue > 0) + sched_p = p; + else { + sched_p = make_proxy_proc(proxy, p, enq_prio); + proxy = NULL; + } + add2runq(sched_p, n, enq_prio); + } + +cleanup: + if (proxy) + free_proxy_proc(proxy); } static ERTS_INLINE int suspend_process(Process *c_p, Process *p) { - erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); + erts_aint32_t state; int suspended = 0; ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); + state = erts_smp_atomic32_read_acqb(&p->state); + if ((state & ERTS_PSFLG_SUSPENDED)) suspended = -1; else { if (c_p == p) { state = erts_smp_atomic32_read_bor_relb(&p->state, ERTS_PSFLG_SUSPENDED); - state |= ERTS_PSFLG_SUSPENDED; ASSERT(state & ERTS_PSFLG_RUNNING); - suspended = 1; + suspended = (state & ERTS_PSFLG_SUSPENDED) ? -1: 1; } else { while (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_EXITING))) { - erts_aint32_t e, n; + erts_aint32_t n, e; + n = e = state; n |= ERTS_PSFLG_SUSPENDED; state = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e); if (state == e) { - state = n; suspended = 1; break; } + if (state & ERTS_PSFLG_SUSPENDED) { + suspended = -1; + break; + } } } } - if (state & ERTS_PSFLG_SUSPENDED) { + if (suspended) { ASSERT(!(ERTS_PSFLG_RUNNING & state) || p == erts_get_current_process()); - if (erts_system_profile_flags.runnable_procs - && (p->rcount == 0) - && (state & ERTS_PSFLG_ACTIVE)) { - profile_runnable_proc(p, am_inactive); + if (suspended > 0 && erts_system_profile_flags.runnable_procs) { + + /* 'state' is before our change... */ + + if ((state & (ERTS_PSFLG_ACTIVE + | ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) { + /* We made process inactive */ + profile_runnable_proc(p, am_inactive); + } + } p->rcount++; /* count number of suspend */ } + return suspended; } static ERTS_INLINE void resume_process(Process *p) { - erts_aint32_t state; + erts_aint32_t state, enq_prio = -1; + int enqueue; + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); ASSERT(p->rcount > 0); @@ -5041,14 +5343,16 @@ resume_process(Process *p) if (--p->rcount > 0) /* multiple suspend */ return; - state = erts_smp_atomic32_read_band_mb(&p->state, ~ERTS_PSFLG_SUSPENDED); - state &= ~ERTS_PSFLG_SUSPENDED; - if ((state & (ERTS_PSFLG_EXITING - | ERTS_PSFLG_ACTIVE - | ERTS_PSFLG_IN_RUNQ - | ERTS_PSFLG_RUNNING)) == ERTS_PSFLG_ACTIVE) { - schedule_process(p, state, 1); - } + state = erts_smp_atomic32_read_nob(&p->state); + enqueue = change_proc_schedule_state(p, + ERTS_PSFLG_SUSPENDED, + 0, + &state, + &enq_prio); + if (enqueue) + add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), + state, + enq_prio); } int @@ -6032,7 +6336,8 @@ pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks, goto done; } else { - if (!(ERTS_PSFLG_RUNNING & erts_smp_atomic32_read_acqb(&rp->state))) + if (!((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS) + & erts_smp_atomic32_read_acqb(&rp->state))) goto done; } @@ -6695,7 +7000,7 @@ Eterm erts_get_process_priority(Process *p) { erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); - switch (state & ERTS_PSFLG_PRIO_MASK) { + switch (ERTS_PSFLGS_GET_USR_PRIO(state)) { case PRIORITY_MAX: return am_max; case PRIORITY_HIGH: return am_high; case PRIORITY_NORMAL: return am_normal; @@ -6718,18 +7023,68 @@ erts_set_process_priority(Process *p, Eterm value) } a = erts_smp_atomic32_read_nob(&p->state); - if (nprio == (a & ERTS_PSFLG_PRIO_MASK)) + if (nprio == ERTS_PSFLGS_GET_USR_PRIO(a)) oprio = nprio; else { - erts_aint32_t e, n; + int slocked = 0; + erts_aint32_t e, n, aprio; + + if (a & ERTS_PSFLG_ACTIVE_SYS) { + erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); + slocked = 1; + } + do { - oprio = a & ERTS_PSFLG_PRIO_MASK; + oprio = ERTS_PSFLGS_GET_USR_PRIO(a); n = e = a; - ASSERT(!(a & ERTS_PSFLG_IN_RUNQ)); + if (!(a & (ERTS_PSFLG_ACTIVE_SYS|ERTS_PSFLG_DELAYED_SYS))) + aprio = nprio; + else { + int max_qbit; + + if (!slocked) { + erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); + slocked = 1; + } + + max_qbit = 0; + if (a & ERTS_PSFLG_ACTIVE_SYS) + max_qbit |= p->sys_task_qs->qmask; + if (a & ERTS_PSFLG_DELAYED_SYS) { + ErtsProcSysTaskQs *qs; + qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(p); + ASSERT(qs); + max_qbit |= qs->qmask; + } + max_qbit &= -max_qbit; + switch (max_qbit) { + case MAX_BIT: + aprio = PRIORITY_MAX; + break; + case HIGH_BIT: + aprio = PRIORITY_HIGH; + break; + case NORMAL_BIT: + aprio = PRIORITY_NORMAL; + break; + case LOW_BIT: + aprio = PRIORITY_LOW; + break; + default: + ERTS_INTERNAL_ERROR("Invalid qmask"); + aprio = -1; + } + + if (aprio > nprio) /* low value -> high prio */ + aprio = nprio; + } + + n &= ~(ERTS_PSFLGS_USR_PRIO_MASK + | ERTS_PSFLGS_ACT_PRIO_MASK); + n |= ((nprio << ERTS_PSFLGS_USR_PRIO_OFFSET) + | (aprio << ERTS_PSFLGS_ACT_PRIO_OFFSET)); - n &= ~ERTS_PSFLG_PRIO_MASK; - n |= nprio; a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); } while (a != e); } @@ -6763,6 +7118,7 @@ erts_set_process_priority(Process *p, Eterm value) Process *schedule(Process *p, int calls) { + Process *proxy_p = NULL; ErtsRunQueue *rq; erts_aint_t dt; ErtsSchedulerData *esdp; @@ -6805,6 +7161,8 @@ Process *schedule(Process *p, int calls) actual_reds = reds = 0; erts_smp_runq_lock(rq); } else { + sched_out_proc: + #ifdef ERTS_SMP ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); esdp = p->scheduler_data; @@ -6858,10 +7216,11 @@ Process *schedule(Process *p, int calls) esdp->reductions += reds; - schedule_out_process(rq, state, p); /* Returns with rq locked! */ + schedule_out_process(rq, state, p, proxy_p); /* Returns with rq locked! */ + proxy_p = NULL; ERTS_PROC_REDUCTIONS_EXECUTED(rq, - (int) (state & ERTS_PSFLG_PRIO_MASK), + (int) ERTS_PSFLGS_GET_USR_PRIO(state), reds, actual_reds); @@ -6870,18 +7229,19 @@ Process *schedule(Process *p, int calls) p->scheduler_data = NULL; #endif + erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); if (state & ERTS_PSFLG_FREE) { #ifdef ERTS_SMP ASSERT(esdp->free_process == p); esdp->free_process = NULL; -#else - erts_free_proc(p); +#else + state = erts_smp_atomic32_read_nob(&p->state); + if (!(state & ERTS_PSFLG_IN_RUNQ)) + erts_free_proc(p); #endif } - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); - #ifdef ERTS_SMP ASSERT(!esdp->free_process); #endif @@ -7088,6 +7448,8 @@ Process *schedule(Process *p, int calls) * Find a new process to run. */ pick_next_process: { + erts_aint32_t psflg_band_mask; + erts_aint32_t running_flag; int prio_q; int qmask; @@ -7121,18 +7483,62 @@ Process *schedule(Process *p, int calls) ASSERT(p); /* Wrong qmask in rq->flags? */ + psflg_band_mask = ~(((erts_aint32_t) 1) << (ERTS_PSFLGS_GET_PRQ_PRIO(state) + + ERTS_PSFLGS_IN_PRQ_MASK_OFFSET)); + + if (!(state & ERTS_PSFLG_PROXY)) + psflg_band_mask &= ~ERTS_PSFLG_IN_RUNQ; + else { + proxy_p = p; + p = erts_proc_lookup_raw(proxy_p->common.id); + if (!p) { + free_proxy_proc(proxy_p); + proxy_p = NULL; + goto pick_next_process; + } + state = erts_smp_atomic32_read_nob(&p->state); + } + + + if (state & ERTS_PSFLG_ACTIVE_SYS) + running_flag = ERTS_PSFLG_RUNNING_SYS; + else + running_flag = ERTS_PSFLG_RUNNING; + while (1) { erts_aint32_t exp, new, tmp; tmp = new = exp = state; - new &= ~ERTS_PSFLG_IN_RUNQ; - tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); - if (tmp != ERTS_PSFLG_SUSPENDED) - new |= ERTS_PSFLG_RUNNING; + new &= psflg_band_mask; + if (!(state & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS))) { + tmp = state & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_PENDING_EXIT + | ERTS_PSFLG_ACTIVE_SYS); + if (tmp != ERTS_PSFLG_SUSPENDED) + new |= running_flag; + } state = erts_smp_atomic32_cmpxchg_relb(&p->state, new, exp); if (state == exp) { - tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); - if (tmp == ERTS_PSFLG_SUSPENDED) + if ((state & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_FREE)) + || ((state & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_PENDING_EXIT + | ERTS_PSFLG_ACTIVE_SYS)) + == ERTS_PSFLG_SUSPENDED)) { + if (state & ERTS_PSFLG_FREE) { +#ifdef ERTS_SMP + erts_smp_proc_dec_refc(p); +#else + erts_free_proc(p); +#endif + } + if (proxy_p) { + free_proxy_proc(proxy_p); + proxy_p = NULL; + } goto pick_next_process; + } state = new; break; } @@ -7162,7 +7568,7 @@ Process *schedule(Process *p, int calls) (UWord) esdp->no); int migrated = old && old != esdp->no; - prio = (int) (state & ERTS_PSFLG_PRIO_MASK); + prio = (int) ERTS_PSFLGS_GET_USR_PRIO(state); erts_smp_spin_lock(&erts_sched_stat.lock); erts_sched_stat.prio[prio].total_executed++; @@ -7182,9 +7588,6 @@ Process *schedule(Process *p, int calls) ASSERT(!p->scheduler_data); p->scheduler_data = esdp; #endif - /* Never run a suspended process */ - ASSERT(!(ERTS_PSFLG_SUSPENDED & erts_smp_atomic32_read_nob(&p->state))); - reds = context_reds; if (IS_TRACED(p)) { @@ -7210,21 +7613,783 @@ Process *schedule(Process *p, int calls) erts_check_my_tracer_proc(p); #endif + if (state & ERTS_PSFLG_RUNNING_SYS) { + reds -= execute_sys_tasks(p, &state, reds); + if (reds <= 0) { + p->fcalls = reds; + goto sched_out_proc; + } + + ASSERT(state & ERTS_PSFLG_RUNNING_SYS); + ASSERT(!(state & ERTS_PSFLG_RUNNING)); + + while (1) { + erts_aint32_t n, e; + + if (((state & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_ACTIVE)) != ERTS_PSFLG_ACTIVE) + && !(state & ERTS_PSFLG_EXITING)) + goto sched_out_proc; + + n = e = state; + n &= ~ERTS_PSFLG_RUNNING_SYS; + n |= ERTS_PSFLG_RUNNING; + + state = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + if (state == e) { + state = n; + break; + } + + ASSERT(state & ERTS_PSFLG_RUNNING_SYS); + ASSERT(!(state & ERTS_PSFLG_RUNNING)); + } + } + if (!(state & ERTS_PSFLG_EXITING) && ((FLAGS(p) & F_FORCE_GC) || (MSO(p).overhead > BIN_VHEAP_SZ(p)))) { reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity); - if (reds < 0) { - reds = 1; + if (reds <= 0) { + p->fcalls = reds; + goto sched_out_proc; } } + + if (proxy_p) { + free_proxy_proc(proxy_p); + proxy_p = NULL; + } p->fcalls = reds; ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); + + /* Never run a suspended process */ + ASSERT(!(ERTS_PSFLG_SUSPENDED & erts_smp_atomic32_read_nob(&p->state))); + return p; } } +static int +notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result) +{ + Process *rp = erts_proc_lookup(st->requester); + if (rp) { + ErtsProcLocks rp_locks; + ErlOffHeap *ohp; + ErlHeapFragment* bp; + Eterm *hp, msg, req_id, result; + Uint st_result_sz, hsz; +#ifdef DEBUG + Eterm *hp_start; +#endif + + rp_locks = (c_p == rp) ? ERTS_PROC_LOCK_MAIN : 0; + + st_result_sz = is_immed(st_result) ? 0 : size_object(st_result); + hsz = st->req_id_sz + st_result_sz + 4 /* 3-tuple */; + + hp = erts_alloc_message_heap(hsz, + &bp, + &ohp, + rp, + &rp_locks); + +#ifdef DEBUG + hp_start = hp; +#endif + + req_id = st->req_id_sz == 0 ? st->req_id : copy_struct(st->req_id, + st->req_id_sz, + &hp, + ohp); + + result = st_result_sz == 0 ? st_result : copy_struct(st_result, + st_result_sz, + &hp, + ohp); + + ASSERT(is_immed(st->reply_tag)); + + msg = TUPLE3(hp, st->reply_tag, req_id, result); + +#ifdef DEBUG + hp += 4; + ASSERT(hp_start + hsz == hp); +#endif + + erts_queue_message(rp, + &rp_locks, + bp, + msg, + NIL +#ifdef USE_VM_PROBES + , NIL +#endif + ); + + if (c_p == rp) + rp_locks &= ~ERTS_PROC_LOCK_MAIN; + + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); + } + + erts_cleanup_offheap(&st->off_heap); + + erts_free(ERTS_ALC_T_PROC_SYS_TSK, st); + + return rp ? 1 : 0; +} + +static ERTS_INLINE ErtsProcSysTask * +fetch_sys_task(Process *c_p, erts_aint32_t state, int *qmaskp, int *priop) +{ + ErtsProcSysTaskQs *unused_qs = NULL; + int qbit, qmask; + ErtsProcSysTask *st, **qp; + + *priop = -1; /* Shut up annoying erroneous warning */ + + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); + + if (!c_p->sys_task_qs) { + qmask = 0; + st = NULL; + goto update_state; + } + + qmask = c_p->sys_task_qs->qmask; + + if ((state & (ERTS_PSFLG_ACTIVE + | ERTS_PSFLG_EXITING + | ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) { + /* No sys tasks if we got exclusively higher prio user work to do */ + st = NULL; + switch (ERTS_PSFLGS_GET_USR_PRIO(state)) { + case PRIORITY_MAX: + if (!(qmask & MAX_BIT)) + goto done; + break; + case PRIORITY_HIGH: + if (!(qmask & (MAX_BIT|HIGH_BIT))) + goto done; + break; + default: + break; + } + } + + qbit = qmask & -qmask; + switch (qbit) { + case MAX_BIT: + qp = &c_p->sys_task_qs->q[PRIORITY_MAX]; + *priop = PRIORITY_MAX; + break; + case HIGH_BIT: + qp = &c_p->sys_task_qs->q[PRIORITY_HIGH]; + *priop = PRIORITY_HIGH; + break; + case NORMAL_BIT: + if (!(qmask & PRIORITY_LOW) + || ++c_p->sys_task_qs->ncount <= RESCHEDULE_LOW) { + qp = &c_p->sys_task_qs->q[PRIORITY_NORMAL]; + *priop = PRIORITY_NORMAL; + break; + } + c_p->sys_task_qs->ncount = 0; + /* Fall through */ + case LOW_BIT: + qp = &c_p->sys_task_qs->q[PRIORITY_LOW]; + *priop = PRIORITY_LOW; + break; + default: + ERTS_INTERNAL_ERROR("Invalid qmask"); + } + + st = *qp; + ASSERT(st); + if (st->next != st) { + *qp = st->next; + st->next->prev = st->prev; + st->prev->next = st->next; + } + else { + erts_aint32_t a, e, n, st_prio, qmask2; + + *qp = NULL; + qmask &= ~qbit; + c_p->sys_task_qs->qmask = qmask; + + update_state: + + qmask2 = qmask; + + if (state & ERTS_PSFLG_DELAYED_SYS) { + ErtsProcSysTaskQs *qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p); + ASSERT(qs); + qmask2 |= qs->qmask; + } + + switch (qmask2 & -qmask2) { + case MAX_BIT: + st_prio = PRIORITY_MAX; + break; + case HIGH_BIT: + st_prio = PRIORITY_HIGH; + break; + case NORMAL_BIT: + st_prio = PRIORITY_NORMAL; + break; + case LOW_BIT: + case 0: + st_prio = PRIORITY_LOW; + break; + default: + ERTS_INTERNAL_ERROR("Invalid qmask"); + } + + if (!qmask) { + unused_qs = c_p->sys_task_qs; + c_p->sys_task_qs = NULL; + } + + a = state; + do { + erts_aint32_t prio = ERTS_PSFLGS_GET_USR_PRIO(a); + + if (prio > st_prio) + prio = st_prio; + + n = e = a; + + n &= ~ERTS_PSFLGS_ACT_PRIO_MASK; + n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET); + + if (!qmask) + n &= ~ERTS_PSFLG_ACTIVE_SYS; + + if (a == n) + break; + a = erts_smp_atomic32_cmpxchg_nob(&c_p->state, n, e); + } while (a != e); + } + +done: + + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); + + if (unused_qs) + proc_sys_task_queues_free(unused_qs); + + *qmaskp = qmask; + + return st; +} + +static void save_gc_task(Process *c_p, ErtsProcSysTask *st, int prio); + +static int +execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) +{ + int garbage_collected = 0; + erts_aint32_t state = *statep; + int max_reds = in_reds; + int reds = 0; + int qmask = 0; + + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); + + do { + ErtsProcSysTask *st; + int st_prio; + Eterm st_res; + + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { +#ifdef ERTS_SMP + if (state & ERTS_PSFLG_PENDING_EXIT) + erts_handle_pending_exit(c_p, ERTS_PROC_LOCK_MAIN); +#endif + ASSERT(ERTS_PROC_IS_EXITING(c_p)); + break; + } + + st = fetch_sys_task(c_p, state, &qmask, &st_prio); + if (!st) + break; + + switch (st->type) { + case ERTS_PSTT_GC: + if (c_p->flags & F_DISABLE_GC) { + save_gc_task(c_p, st, st_prio); + st = NULL; + reds++; + } + else { + if (!garbage_collected) { + FLAGS(c_p) |= F_NEED_FULLSWEEP; + reds += erts_garbage_collect(c_p, + 0, + c_p->arg_reg, + c_p->arity); + garbage_collected = 1; + } + st_res = am_true; + } + break; + case ERTS_PSTT_CPC: + st_res = erts_check_process_code(c_p, + st->arg[0], + st->arg[1] == am_true, + &reds); + if (is_non_value(st_res)) { + /* Needed gc, but gc was disabled */ + save_gc_task(c_p, st, st_prio); + st = NULL; + } + break; + default: + ERTS_INTERNAL_ERROR("Invalid process sys task type"); + st_res = am_false; + } + + if (st) + reds += notify_sys_task_executed(c_p, st, st_res); + + state = erts_smp_atomic32_read_acqb(&c_p->state); + } while (qmask && reds < max_reds); + + *statep = state; + + return reds; +} + +static int +cleanup_sys_tasks(Process *c_p, erts_aint32_t in_state, int in_reds) +{ + erts_aint32_t state = in_state; + int max_reds = in_reds; + int reds = 0; + int qmask = 0; + + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); + + do { + ErtsProcSysTask *st; + Eterm st_res; + int st_prio; + + st = fetch_sys_task(c_p, state, &qmask, &st_prio); + if (!st) + break; + + switch (st->type) { + case ERTS_PSTT_GC: + st_res = am_false; + break; + case ERTS_PSTT_CPC: + st_res = am_false; + break; + default: + ERTS_INTERNAL_ERROR("Invalid process sys task type"); + st_res = am_false; + break; + } + + reds += notify_sys_task_executed(c_p, st, st_res); + + state = erts_smp_atomic32_read_acqb(&c_p->state); + } while (qmask && reds < max_reds); + + return reds; +} + +BIF_RETTYPE +erts_internal_request_system_task_3(BIF_ALIST_3) +{ + Process *rp = erts_proc_lookup(BIF_ARG_1); + ErtsProcSysTaskQs *stqs, *free_stqs = NULL; + ErtsProcSysTask *st = NULL; + erts_aint32_t prio, rp_state; + int rp_locked; + Eterm noproc_res, req_type; + + if (!rp && !is_internal_pid(BIF_ARG_1)) { + if (!is_external_pid(BIF_ARG_1)) + goto badarg; + if (external_pid_dist_entry(BIF_ARG_1) != erts_this_dist_entry) + goto badarg; + } + + switch (BIF_ARG_2) { + case am_max: prio = PRIORITY_MAX; break; + case am_high: prio = PRIORITY_HIGH; break; + case am_normal: prio = PRIORITY_NORMAL; break; + case am_low: prio = PRIORITY_LOW; break; + default: goto badarg; + } + + if (is_not_tuple(BIF_ARG_3)) + goto badarg; + else { + int i; + Eterm *tp = tuple_val(BIF_ARG_3); + Uint arity = arityval(*tp); + Eterm req_id; + Uint req_id_sz; + Eterm arg[ERTS_MAX_PROC_SYS_TASK_ARGS]; + Uint arg_sz[ERTS_MAX_PROC_SYS_TASK_ARGS]; + Uint tot_sz; + Eterm *hp; + + if (arity < 2) + goto badarg; + if (arity > 2 + ERTS_MAX_PROC_SYS_TASK_ARGS) + goto badarg; + req_type = tp[1]; + req_id = tp[2]; + req_id_sz = is_immed(req_id) ? req_id : size_object(req_id); + tot_sz = req_id_sz; + for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) { + int tix = 3 + i; + if (tix > arity) { + arg[i] = THE_NON_VALUE; + arg_sz[i] = 0; + } + else { + arg[i] = tp[tix]; + if (is_immed(arg[i])) + arg_sz[i] = 0; + else { + arg_sz[i] = size_object(arg[i]); + tot_sz += arg_sz[i]; + } + } + } + st = erts_alloc(ERTS_ALC_T_PROC_SYS_TSK, + ERTS_PROC_SYS_TASK_SIZE(tot_sz)); + st->next = st->prev = st; /* Prep for empty prio queue */ + ERTS_INIT_OFF_HEAP(&st->off_heap); + hp = &st->heap[0]; + + st->requester = BIF_P->common.id; + st->reply_tag = req_type; + st->req_id_sz = req_id_sz; + st->req_id = req_id_sz == 0 ? req_id : copy_struct(req_id, + req_id_sz, + &hp, + &st->off_heap); + + for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) + st->arg[i] = arg_sz[i] == 0 ? arg[i] : copy_struct(arg[i], + arg_sz[i], + &hp, + &st->off_heap); + ASSERT(&st->heap[0] + tot_sz == hp); + } + + switch (req_type) { + + case am_garbage_collect: + st->type = ERTS_PSTT_GC; + noproc_res = am_false; + if (!rp) + goto noproc; + break; + + case am_check_process_code: + if (is_not_atom(st->arg[0])) + goto badarg; + if (st->arg[1] != am_true && st->arg[1] != am_false) + goto badarg; + noproc_res = am_false; + st->type = ERTS_PSTT_CPC; + if (!rp) + goto noproc; + break; + + default: + goto badarg; + } + + rp_state = erts_smp_atomic32_read_nob(&rp->state); + + rp_locked = 0; + + free_stqs = NULL; + if (rp_state & ERTS_PSFLG_ACTIVE_SYS) + stqs = NULL; + else { + alloc_qs: + stqs = proc_sys_task_queues_alloc(); + stqs->qmask = 1 << prio; + stqs->ncount = 0; + stqs->q[PRIORITY_MAX] = NULL; + stqs->q[PRIORITY_HIGH] = NULL; + stqs->q[PRIORITY_NORMAL] = NULL; + stqs->q[PRIORITY_LOW] = NULL; + stqs->q[prio] = st; + } + + if (!rp_locked) { + rp_locked = 1; + erts_smp_proc_lock(rp, ERTS_PROC_LOCK_STATUS); + + rp_state = erts_smp_atomic32_read_nob(&rp->state); + if (rp_state & ERTS_PSFLG_EXITING) { + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); + rp = NULL; + free_stqs = stqs; + goto noproc; + } + } + + if (!rp->sys_task_qs) { + if (stqs) + rp->sys_task_qs = stqs; + else + goto alloc_qs; + } + else { + if (stqs) + free_stqs = stqs; + stqs = rp->sys_task_qs; + if (!stqs->q[prio]) { + stqs->q[prio] = st; + stqs->qmask |= 1 << prio; + } + else { + st->next = stqs->q[prio]; + st->prev = stqs->q[prio]->prev; + st->next->prev = st; + st->prev->next = st; + ASSERT(stqs->qmask & (1 << prio)); + } + } + + if (ERTS_PSFLGS_GET_ACT_PRIO(rp_state) > prio) { + erts_aint32_t n, a, e; + /* Need to elevate actual prio */ + + a = rp_state; + do { + if (ERTS_PSFLGS_GET_ACT_PRIO(a) <= prio) { + n = a; + break; + } + n = e = a; + n &= ~ERTS_PSFLGS_ACT_PRIO_MASK; + n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET); + a = erts_smp_atomic32_cmpxchg_nob(&rp->state, n, e); + } while (a != e); + rp_state = n; + } + + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); + + schedule_process_sys_task(rp, rp_state, NULL); + + if (free_stqs) + proc_sys_task_queues_free(free_stqs); + + BIF_RET(am_ok); + +noproc: + + notify_sys_task_executed(BIF_P, st, noproc_res); + if (free_stqs) + proc_sys_task_queues_free(free_stqs); + BIF_RET(am_ok); + +badarg: + + if (st) { + erts_cleanup_offheap(&st->off_heap); + erts_free(ERTS_ALC_T_PROC_SYS_TSK, st); + } + if (free_stqs) + proc_sys_task_queues_free(free_stqs); + BIF_ERROR(BIF_P, BADARG); +} + +static void +save_gc_task(Process *c_p, ErtsProcSysTask *st, int prio) +{ + erts_aint32_t state; + ErtsProcSysTaskQs *qs; + + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + + qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p); + if (!qs) { + st->next = st->prev = st; + qs = proc_sys_task_queues_alloc(); + qs->qmask = 1 << prio; + qs->ncount = 0; + qs->q[PRIORITY_MAX] = NULL; + qs->q[PRIORITY_HIGH] = NULL; + qs->q[PRIORITY_NORMAL] = NULL; + qs->q[PRIORITY_LOW] = NULL; + qs->q[prio] = st; + (void) ERTS_PROC_SET_DELAYED_GC_TASK_QS(c_p, ERTS_PROC_LOCK_MAIN, qs); + } + else { + if (!qs->q[prio]) { + st->next = st->prev = st; + qs->q[prio] = st; + qs->qmask |= 1 << prio; + } + else { + st->next = qs->q[prio]; + st->prev = qs->q[prio]->prev; + st->next->prev = st; + st->prev->next = st; + ASSERT(qs->qmask & (1 << prio)); + } + } + + state = erts_smp_atomic32_read_nob(&c_p->state); + ASSERT((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS) & state); + + while (!(state & ERTS_PSFLG_DELAYED_SYS) + || prio < ERTS_PSFLGS_GET_ACT_PRIO(state)) { + erts_aint32_t n, e; + + n = e = state; + n |= ERTS_PSFLG_DELAYED_SYS; + if (prio < ERTS_PSFLGS_GET_ACT_PRIO(state)) { + n &= ~ERTS_PSFLGS_ACT_PRIO_MASK; + n |= prio << ERTS_PSFLGS_ACT_PRIO_OFFSET; + } + state = erts_smp_atomic32_cmpxchg_relb(&c_p->state, n, e); + if (state == e) + break; + } +} + +int +erts_set_gc_state(Process *c_p, int enable) +{ + int res; + ErtsProcSysTaskQs *dgc_tsk_qs; + ASSERT(c_p == erts_get_current_process()); + ASSERT((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS) + & erts_smp_atomic32_read_nob(&c_p->state)); + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + + res = !(c_p->flags & F_DISABLE_GC); + + if (!enable) { + c_p->flags |= F_DISABLE_GC; + return res; + } + + c_p->flags &= ~F_DISABLE_GC; + + dgc_tsk_qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p); + if (!dgc_tsk_qs) + return res; + + /* Move delayed gc tasks into sys tasks queues. */ + + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); + + if (!c_p->sys_task_qs) { + c_p->sys_task_qs = dgc_tsk_qs; + dgc_tsk_qs = NULL; + } + else { + ErtsProcSysTaskQs *stsk_qs; + int prio; + + /* + * We push delayed tasks to the front of the queue + * since they have already made it to the front + * once and then been delayed after that. + */ + + stsk_qs = c_p->sys_task_qs; + + while (dgc_tsk_qs->qmask) { + int qbit = dgc_tsk_qs->qmask & -dgc_tsk_qs->qmask; + dgc_tsk_qs->qmask &= ~qbit; + switch (qbit) { + case MAX_BIT: + prio = PRIORITY_MAX; + break; + case HIGH_BIT: + prio = PRIORITY_HIGH; + break; + case NORMAL_BIT: + prio = PRIORITY_NORMAL; + break; + case LOW_BIT: + prio = PRIORITY_LOW; + break; + default: + ERTS_INTERNAL_ERROR("Invalid qmask"); + prio = -1; + break; + } + + ASSERT(dgc_tsk_qs->q[prio]); + + if (!stsk_qs->q[prio]) { + stsk_qs->q[prio] = dgc_tsk_qs->q[prio]; + stsk_qs->qmask |= 1 << prio; + } + else { + ErtsProcSysTask *first1, *last1, *first2, *last2; + + ASSERT(stsk_qs->qmask & (1 << prio)); + first1 = dgc_tsk_qs->q[prio]; + last1 = first1->prev; + first2 = stsk_qs->q[prio]; + last2 = first1->prev; + + last1->next = first2; + first2->prev = last1; + + first1->prev = last2; + last2->next = first1; + + stsk_qs->q[prio] = first1; + } + + } + } + +#ifdef DEBUG + { + int qmask; + erts_aint32_t aprio, state = +#endif + + erts_smp_atomic32_read_bset_nob(&c_p->state, + (ERTS_PSFLG_DELAYED_SYS + | ERTS_PSFLG_ACTIVE_SYS), + ERTS_PSFLG_ACTIVE_SYS); + +#ifdef DEBUG + ASSERT(state & ERTS_PSFLG_DELAYED_SYS); + qmask = c_p->sys_task_qs->qmask; + aprio = ERTS_PSFLGS_GET_ACT_PRIO(state); + ASSERT(ERTS_PSFLGS_GET_USR_PRIO(state) >= aprio); + ASSERT((qmask & -qmask) >= (1 << aprio)); + } +#endif + + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); + + (void) ERTS_PROC_SET_DELAYED_GC_TASK_QS(c_p, ERTS_PROC_LOCK_MAIN, NULL); + + if (dgc_tsk_qs) + proc_sys_task_queues_free(dgc_tsk_qs); + + return res; +} + void erts_sched_stat_modify(int what) { @@ -7521,7 +8686,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). prio = (erts_aint32_t) so->priority; } - state |= (prio & ERTS_PSFLG_PRIO_MASK); + state |= (((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_ACT_PRIO_OFFSET) + | ((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_USR_PRIO_OFFSET)); if (!rq) rq = erts_get_runq_proc(parent); @@ -7599,6 +8765,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->bin_old_vheap = 0; p->bin_vheap_mature = 0; + p->sys_task_qs = NULL; + /* No need to initialize p->fcalls. */ p->current = p->initial+INITIAL_MOD; @@ -7764,7 +8932,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). * Schedule process for execution. */ - schedule_process(p, state, 0); + schedule_process(p, state); VERBOSE(DEBUG_PROCESSES, ("Created a new process: %T\n",p->common.id)); @@ -7815,6 +8983,7 @@ void erts_init_empty_process(Process *p) p->bin_vheap_sz = BIN_VH_MIN_SIZE; p->bin_old_vheap_sz = BIN_VH_MIN_SIZE; p->bin_old_vheap = 0; + p->sys_task_qs = NULL; p->bin_vheap_mature = 0; #ifdef ERTS_SMP p->common.u.alive.ptimer = NULL; @@ -8070,33 +9239,21 @@ delete_process(Process* p) p->fvalue = NIL; } -static ERTS_INLINE erts_aint32_t -set_proc_exiting_state(Process *p, erts_aint32_t state) -{ - erts_aint32_t a, n, e; - a = state; - while (1) { - n = e = a; - n &= ~(ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); - n |= ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE; - if (!(a & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING))) - n |= ERTS_PSFLG_IN_RUNQ; - a = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e); - if (a == e) - break; - } - return a; -} - static ERTS_INLINE void set_proc_exiting(Process *p, - erts_aint32_t state, + erts_aint32_t in_state, Eterm reason, ErlHeapFragment *bp) { + erts_aint32_t state = in_state, enq_prio = -1; + int enqueue; ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(p) == ERTS_PROC_LOCKS_ALL); - state = set_proc_exiting_state(p, state); + enqueue = change_proc_schedule_state(p, + ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT, + ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE, + &state, + &enq_prio); p->fvalue = reason; if (bp) @@ -8111,15 +9268,37 @@ set_proc_exiting(Process *p, cancel_timer(p); p->i = (BeamInstr *) beam_exit; - if (erts_system_profile_flags.runnable_procs - && !(state & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) { - profile_runnable_proc(p, am_active); - } - - if (!(state & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING))) - add2runq(p, state); + if (enqueue) + add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), + state, + enq_prio); } +static ERTS_INLINE erts_aint32_t +set_proc_self_exiting(Process *c_p) +{ +#ifdef DEBUG + int enqueue; +#endif + erts_aint32_t state, enq_prio = -1; + + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCKS_ALL); + + state = erts_smp_atomic32_read_nob(&c_p->state); + ASSERT(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS)); + +#ifdef DEBUG + enqueue = +#endif + change_proc_schedule_state(c_p, + ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT, + ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE, + &state, + &enq_prio); + + ASSERT(!enqueue); + return state; +} #ifdef ERTS_SMP @@ -8167,7 +9346,7 @@ handle_pending_exiters(ErtsProcList *pnd_xtrs) if (p) { if (erts_proclist_same(plp, p)) { erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); - if (!(state & ERTS_PSFLG_RUNNING)) { + if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS))) { ASSERT(state & ERTS_PSFLG_PENDING_EXIT); erts_handle_pending_exit(p, ERTS_PROC_LOCKS_ALL); } @@ -8388,7 +9567,7 @@ send_exit_signal(Process *c_p, /* current process if and only } set_proc_exiting(c_p, state, rsn, NULL); } - else if (!(state & ERTS_PSFLG_RUNNING)) { + else if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS))) { /* Process not running ... */ ErtsProcLocks need_locks = ~(*rp_locks) & ERTS_PROC_LOCKS_ALL; if (need_locks @@ -8765,9 +9944,6 @@ resume_suspend_monitor(ErtsSuspendMonitor *smon, void *vc_p) void erts_do_exit_process(Process* p, Eterm reason) { -#ifdef ERTS_SMP - erts_aint32_t state; -#endif p->arity = 0; /* No live registers */ p->fvalue = reason; @@ -8792,10 +9968,9 @@ erts_do_exit_process(Process* p, Eterm reason) #endif #ifndef ERTS_SMP - set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state)); + set_proc_self_exiting(p); #else - state = set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state)); - if (state & ERTS_PSFLG_PENDING_EXIT) { + if (ERTS_PSFLG_PENDING_EXIT & set_proc_self_exiting(p)) { /* Process exited before pending exit was received... */ p->pending_exit.reason = THE_NON_VALUE; if (p->pending_exit.bp) { @@ -8849,6 +10024,7 @@ erts_continue_exit_process(Process *p) DistEntry *dep; struct saved_calls *scb; process_breakpoint_time_t *pbt; + erts_aint32_t state; #ifdef DEBUG int yield_allowed = 1; @@ -8885,6 +10061,13 @@ erts_continue_exit_process(Process *p) p->flags &= ~F_USING_DB; } + erts_set_gc_state(p, 1); + state = erts_smp_atomic32_read_acqb(&p->state); + if (state & ERTS_PSFLG_ACTIVE_SYS) { + if (cleanup_sys_tasks(p, state, CONTEXT_REDS) >= CONTEXT_REDS/2) + goto yield; + } + if (p->flags & F_USING_DDLL) { erts_ddll_proc_dead(p, ERTS_PROC_LOCK_MAIN); p->flags &= ~F_USING_DDLL; @@ -8962,17 +10145,31 @@ erts_continue_exit_process(Process *p) { /* Inactivate and notify free */ erts_aint32_t n, e, a = erts_smp_atomic32_read_nob(&p->state); +#ifdef ERTS_SMP + int refc_inced = 0; +#endif while (1) { n = e = a; ASSERT(a & ERTS_PSFLG_EXITING); n |= ERTS_PSFLG_FREE; n &= ~ERTS_PSFLG_ACTIVE; +#ifdef ERTS_SMP + if ((n & ERTS_PSFLG_IN_RUNQ) && !refc_inced) { + erts_smp_proc_inc_refc(p); + refc_inced = 1; + } +#endif a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); if (a == e) break; } - } +#ifdef ERTS_SMP + if (refc_inced && !(n & ERTS_PSFLG_IN_RUNQ)) + erts_smp_proc_dec_refc(p); +#endif + } + dep = ((p->flags & F_DISTRIBUTION) ? ERTS_PROC_SET_DIST_ENTRY(p, ERTS_PROC_LOCKS_ALL, NULL) : NULL); @@ -9075,7 +10272,7 @@ timeout_proc(Process* p) state = erts_smp_atomic32_read_acqb(&p->state); if (!(state & ERTS_PSFLG_ACTIVE)) - schedule_process(p, state, 0); + schedule_process(p, state); } @@ -9153,7 +10350,9 @@ erts_program_counter_info(int to, void *to_arg, Process *p) print_function_from_pc(to, to_arg, p->cp); erts_print(to, to_arg, ")\n"); state = erts_smp_atomic32_read_acqb(&p->state); - if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_GC))) { + if (!(state & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_GC))) { erts_print(to, to_arg, "arity = %d\n",p->arity); if (!ERTS_IS_CRASH_DUMPING) { /* diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 8d136f6e8b..043621125c 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -631,8 +631,9 @@ erts_smp_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi) #define ERTS_PSD_SCHED_ID 2 #define ERTS_PSD_DIST_ENTRY 3 #define ERTS_PSD_CALL_TIME_BP 4 +#define ERTS_PSD_DELAYED_GC_TASK_QS 5 -#define ERTS_PSD_SIZE 5 +#define ERTS_PSD_SIZE 6 typedef struct { void *data[ERTS_PSD_SIZE]; @@ -656,6 +657,9 @@ typedef struct { #define ERTS_PSD_CALL_TIME_BP_GET_LOCKS ERTS_PROC_LOCK_MAIN #define ERTS_PSD_CALL_TIME_BP_SET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_DELAYED_GC_TASK_QS_GET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_DELAYED_GC_TASK_QS_SET_LOCKS ERTS_PROC_LOCK_MAIN + typedef struct { ErtsProcLocks get_locks; ErtsProcLocks set_locks; @@ -688,6 +692,9 @@ typedef struct { ErlHeapFragment *bp; } ErtsPendExit; +typedef struct ErtsProcSysTask_ ErtsProcSysTask; +typedef struct ErtsProcSysTaskQs_ ErtsProcSysTaskQs; + #ifdef ERTS_SMP typedef struct ErtsPendingSuspend_ ErtsPendingSuspend; @@ -855,6 +862,8 @@ struct process { Uint64 bin_old_vheap_sz; /* Virtual old heap block size for binaries */ Uint64 bin_old_vheap; /* Virtual old heap size for binaries */ + ErtsProcSysTaskQs *sys_task_qs; + erts_smp_atomic32_t state; /* Process state flags (see ERTS_PSFLG_*) */ #ifdef ERTS_SMP @@ -924,24 +933,67 @@ void erts_check_for_holes(Process* p); # error "Need to increase ERTS_PSFLG_PRIO_SHIFT" #endif -#define ERTS_PSFLG_PRIO_SHIFT 2 +#define ERTS_PSFLGS_PRIO_BITS 2 +#define ERTS_PSFLGS_PRIO_MASK \ + ((((erts_aint32_t) 1) << ERTS_PSFLGS_PRIO_BITS) - 1) -#define ERTS_PSFLG_BIT(N) \ - (((erts_aint32_t) 1) << (ERTS_PSFLG_PRIO_SHIFT + (N))) +#define ERTS_PSFLGS_ACT_PRIO_OFFSET (0*ERTS_PSFLGS_PRIO_BITS) +#define ERTS_PSFLGS_USR_PRIO_OFFSET (1*ERTS_PSFLGS_PRIO_BITS) +#define ERTS_PSFLGS_PRQ_PRIO_OFFSET (2*ERTS_PSFLGS_PRIO_BITS) +#define ERTS_PSFLGS_ZERO_BIT_OFFSET (3*ERTS_PSFLGS_PRIO_BITS) -#define ERTS_PSFLG_PRIO_MASK (ERTS_PSFLG_BIT(0) - 1) +#define ERTS_PSFLGS_QMASK_BITS 4 +#define ERTS_PSFLGS_QMASK \ + ((((erts_aint32_t) 1) << ERTS_PSFLGS_QMASK_BITS) - 1) +#define ERTS_PSFLGS_IN_PRQ_MASK_OFFSET \ + ERTS_PSFLGS_ZERO_BIT_OFFSET -#define ERTS_PSFLG_FREE ERTS_PSFLG_BIT(0) -#define ERTS_PSFLG_EXITING ERTS_PSFLG_BIT(1) -#define ERTS_PSFLG_PENDING_EXIT ERTS_PSFLG_BIT(2) -#define ERTS_PSFLG_ACTIVE ERTS_PSFLG_BIT(3) -#define ERTS_PSFLG_IN_RUNQ ERTS_PSFLG_BIT(4) -#define ERTS_PSFLG_RUNNING ERTS_PSFLG_BIT(5) -#define ERTS_PSFLG_SUSPENDED ERTS_PSFLG_BIT(6) -#define ERTS_PSFLG_GC ERTS_PSFLG_BIT(7) -#define ERTS_PSFLG_BOUND ERTS_PSFLG_BIT(8) -#define ERTS_PSFLG_TRAP_EXIT ERTS_PSFLG_BIT(9) +#define ERTS_PSFLG_BIT(N) \ + (((erts_aint32_t) 1) << (ERTS_PSFLGS_ZERO_BIT_OFFSET + (N))) +/* + * ACT_PRIO -> Active prio, i.e., currently active prio. This + * prio may be higher than user prio. + * USR_PRIO -> User prio. i.e., prio the user has set. + * PRQ_PRIO -> Prio queue prio, i.e., prio queue currently + * enqueued in. + */ +#define ERTS_PSFLGS_ACT_PRIO_MASK \ + (ERTS_PSFLGS_PRIO_MASK << ERTS_PSFLGS_ACT_PRIO_OFFSET) +#define ERTS_PSFLGS_USR_PRIO_MASK \ + (ERTS_PSFLGS_PRIO_MASK << ERTS_PSFLGS_USR_PRIO_OFFSET) +#define ERTS_PSFLGS_PRQ_PRIO_MASK \ + (ERTS_PSFLGS_PRIO_MASK << ERTS_PSFLGS_PRQ_PRIO_OFFSET) +#define ERTS_PSFLG_IN_PRQ_MAX ERTS_PSFLG_BIT(0) +#define ERTS_PSFLG_IN_PRQ_HIGH ERTS_PSFLG_BIT(1) +#define ERTS_PSFLG_IN_PRQ_NORMAL ERTS_PSFLG_BIT(2) +#define ERTS_PSFLG_IN_PRQ_LOW ERTS_PSFLG_BIT(3) +#define ERTS_PSFLG_FREE ERTS_PSFLG_BIT(4) +#define ERTS_PSFLG_EXITING ERTS_PSFLG_BIT(5) +#define ERTS_PSFLG_PENDING_EXIT ERTS_PSFLG_BIT(6) +#define ERTS_PSFLG_ACTIVE ERTS_PSFLG_BIT(7) +#define ERTS_PSFLG_IN_RUNQ ERTS_PSFLG_BIT(8) +#define ERTS_PSFLG_RUNNING ERTS_PSFLG_BIT(9) +#define ERTS_PSFLG_SUSPENDED ERTS_PSFLG_BIT(10) +#define ERTS_PSFLG_GC ERTS_PSFLG_BIT(11) +#define ERTS_PSFLG_BOUND ERTS_PSFLG_BIT(12) +#define ERTS_PSFLG_TRAP_EXIT ERTS_PSFLG_BIT(13) +#define ERTS_PSFLG_ACTIVE_SYS ERTS_PSFLG_BIT(14) +#define ERTS_PSFLG_RUNNING_SYS ERTS_PSFLG_BIT(15) +#define ERTS_PSFLG_PROXY ERTS_PSFLG_BIT(16) +#define ERTS_PSFLG_DELAYED_SYS ERTS_PSFLG_BIT(17) + +#define ERTS_PSFLGS_IN_PRQ_MASK (ERTS_PSFLG_IN_PRQ_MAX \ + | ERTS_PSFLG_IN_PRQ_HIGH \ + | ERTS_PSFLG_IN_PRQ_NORMAL \ + | ERTS_PSFLG_IN_PRQ_LOW) + +#define ERTS_PSFLGS_GET_ACT_PRIO(PSFLGS) \ + (((PSFLGS) >> ERTS_PSFLGS_ACT_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) +#define ERTS_PSFLGS_GET_USR_PRIO(PSFLGS) \ + (((PSFLGS) >> ERTS_PSFLGS_USR_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) +#define ERTS_PSFLGS_GET_PRQ_PRIO(PSFLGS) \ + (((PSFLGS) >> ERTS_PSFLGS_USR_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) /* The sequential tracing token is a tuple of size 5: * @@ -1056,6 +1108,7 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags; #define F_HAVE_BLCKD_MSCHED (1 << 8) /* Process has blocked multi-scheduling */ #define F_P2PNR_RESCHED (1 << 9) /* Process has been rescheduled via erts_pid2proc_not_running() */ #define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */ +#define F_DISABLE_GC (1 << 11) /* Disable GC */ /* process trace_flags */ #define F_SENSITIVE (1 << 0) @@ -1146,6 +1199,7 @@ void erts_late_init_process(void); void erts_early_init_scheduling(int); void erts_init_scheduling(int, int); +int erts_set_gc_state(Process *c_p, int enable); Eterm erts_sched_wall_time_request(Process *c_p, int set, int enable); Eterm erts_gc_info_request(Process *c_p); Uint64 erts_get_proc_interval(void); @@ -1591,6 +1645,11 @@ erts_psd_set(Process *p, ErtsProcLocks plocks, int ix, void *data) #define ERTS_PROC_SET_CALL_TIME(P, L, PBT) \ ((process_breakpoint_time_t *) erts_psd_set((P), (L), ERTS_PSD_CALL_TIME_BP, (void *) (PBT))) +#define ERTS_PROC_GET_DELAYED_GC_TASK_QS(P) \ + ((ErtsProcSysTaskQs *) erts_psd_get((P), ERTS_PSD_DELAYED_GC_TASK_QS)) +#define ERTS_PROC_SET_DELAYED_GC_TASK_QS(P, L, PBT) \ + ((ErtsProcSysTaskQs *) erts_psd_set((P), (L), ERTS_PSD_DELAYED_GC_TASK_QS, (void *) (PBT))) + ERTS_GLB_INLINE Eterm erts_proc_get_error_handler(Process *p); ERTS_GLB_INLINE Eterm erts_proc_set_error_handler(Process *p, diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index c1fda3f96c..94bc1b172a 100755 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -650,6 +650,10 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg); Eterm erl_is_function(Process* p, Eterm arg1, Eterm arg2); +/* beam_bif_load.c */ +Eterm erts_check_process_code(Process *c_p, Eterm module, int allow_gc, int *redsp); + + /* beam_load.c */ typedef struct { BeamInstr* current; /* Pointer to: Mod, Name, Arity */ @@ -1121,7 +1125,12 @@ erts_alloc_message_heap_state(Uint size, if (statep) *statep = state; if ((state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + || (receiver->flags & F_DISABLE_GC) || HEAP_LIMIT(receiver) - HEAP_TOP(receiver) <= size) { + /* + * The heap is either potentially in an inconsistent + * state, or not large enough. + */ #ifdef ERTS_SMP if (locked_main) { *receiver_locks &= ~ERTS_PROC_LOCK_MAIN; diff --git a/erts/emulator/beam/ops.tab b/erts/emulator/beam/ops.tab index 1e5ae46bfa..c29f3f9b1b 100644 --- a/erts/emulator/beam/ops.tab +++ b/erts/emulator/beam/ops.tab @@ -763,17 +763,17 @@ allocate_init t I y ################################################################# # -# The BIFs erlang:check_process_code/2 must be called like a function, +# The BIFs erts_internal:check_process_code/2 must be called like a function, # to ensure that c_p->i (program counter) is set correctly (an ordinary # BIF call doesn't set it). # -call_ext u==2 Bif=u$bif:erlang:check_process_code/2 => i_call_ext Bif -call_ext_last u==2 Bif=u$bif:erlang:check_process_code/2 D => i_call_ext_last Bif D -call_ext_only u==2 Bif=u$bif:erlang:check_process_code/2 => i_call_ext_only Bif +call_ext u==2 Bif=u$bif:erts_internal:check_process_code/2 => i_call_ext Bif +call_ext_last u==2 Bif=u$bif:erts_internal:check_process_code/2 D => i_call_ext_last Bif D +call_ext_only u==2 Bif=u$bif:erts_internal:check_process_code/2 => i_call_ext_only Bif # -# The BIFs erlang:garbage_collect/0,1 must be called like functions, +# The BIFs erlang:garbage_collect/0 must be called like a function, # to allow them to invoke the garbage collector. (The stack pointer must # be saved and p->arity must be zeroed, which is not done on ordinary BIF calls.) # @@ -782,10 +782,6 @@ call_ext u==0 Bif=u$bif:erlang:garbage_collect/0 => i_call_ext Bif call_ext_last u==0 Bif=u$bif:erlang:garbage_collect/0 D => i_call_ext_last Bif D call_ext_only u==0 Bif=u$bif:erlang:garbage_collect/0 => i_call_ext_only Bif -call_ext u==1 Bif=u$bif:erlang:garbage_collect/1 => i_call_ext Bif -call_ext_last u==1 Bif=u$bif:erlang:garbage_collect/1 D => i_call_ext_last Bif D -call_ext_only u==1 Bif=u$bif:erlang:garbage_collect/1 => i_call_ext_only Bif - # # put/2 and erase/1 must be able to do garbage collection, so we must call # them like functions. diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c index 605a625282..297c4bf439 100644 --- a/erts/emulator/beam/utils.c +++ b/erts/emulator/beam/utils.c @@ -1675,7 +1675,7 @@ static int do_send_to_logger(Eterm tag, Eterm gleader, char *buf, int len) p = erts_whereis_process(NULL, 0, am_error_logger, 0, 0); if (p) { erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); - if (state & ERTS_PSFLG_RUNNING) + if (state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS)) p = NULL; } } |