author     Rickard Green <[email protected]>    2013-12-07 00:19:05 +0100
committer  Rickard Green <[email protected]>    2013-12-07 00:19:05 +0100
commit     9232f50d4cbe7b051dfd3e83625de0b22536c4c4 (patch)
tree       f3c52ccc719a9df7a0714de05e2d1e9dde700df0
parent     3c00452a81dfde57f85c882029186cfa3c0d348d (diff)
parent     9f1f0bff7f98d62f8406e5ecd76f6eb7c1a66ff3 (diff)
download   otp-9232f50d4cbe7b051dfd3e83625de0b22536c4c4.tar.gz
           otp-9232f50d4cbe7b051dfd3e83625de0b22536c4c4.tar.bz2
           otp-9232f50d4cbe7b051dfd3e83625de0b22536c4c4.zip
Merge branch 'rickard/garbage_collect/OTP-11388'
* rickard/garbage_collect/OTP-11388:
Parallel check_process_code when code_server purges a module
Functionality for disabling garbage collection
Use asynchronous check_process_code in code_parallel_SUITE
Execution of system tasks in context of another process
Conflicts:
bootstrap/lib/kernel/ebin/hipe_unified_loader.beam
erts/preloaded/ebin/erlang.beam
erts/preloaded/ebin/erts_internal.beam
30 files changed, 2412 insertions, 359 deletions
diff --git a/bootstrap/lib/kernel/ebin/code_server.beam b/bootstrap/lib/kernel/ebin/code_server.beam Binary files differindex 89417b095b..896e3cb61b 100644 --- a/bootstrap/lib/kernel/ebin/code_server.beam +++ b/bootstrap/lib/kernel/ebin/code_server.beam diff --git a/bootstrap/lib/stdlib/ebin/erl_internal.beam b/bootstrap/lib/stdlib/ebin/erl_internal.beam Binary files differindex d8c77627f1..01c6598cdc 100644 --- a/bootstrap/lib/stdlib/ebin/erl_internal.beam +++ b/bootstrap/lib/stdlib/ebin/erl_internal.beam diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index bff544100f..711473afd2 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -501,16 +501,87 @@ <name name="check_process_code" arity="2"/> <fsummary>Check if a process is executing old code for a module</fsummary> <desc> - <p>Returns <c>true</c> if the process <c><anno>Pid</anno></c> is executing - old code for <c><anno>Module</anno></c>. That is, if the current call of - the process executes old code for this module, or if the - process has references to old code for this module, or if the - process contains funs that references old code for this - module. Otherwise, it returns <c>false</c>.</p> - <pre> -> <input>check_process_code(Pid, lists).</input> -false</pre> + <p>The same as + <seealso marker="#check_process_code/3"><c>erlang:check_process_code(<anno>Pid</anno>, + <anno>Module</anno>, [])</c></seealso>.</p> + </desc> + </func> + <func> + <name name="check_process_code" arity="3"/> + <fsummary>Check if a process is executing old code for a module</fsummary> + <desc> + <p>Check if the node local process identified by <c><anno>Pid</anno></c> + is executing old code for <c><anno>Module</anno></c>.</p> + <p>Currently available <c><anno>Option</anno>s</c>:</p> + <taglist> + <tag><c>{allow_gc, boolean()}</c></tag> + <item> + Determines if garbage collection is allowed when performing + the operation. If <c>{allow_gc, false}</c> is passed, and + a garbage collection is needed in order to determine the + result of the operation, the operation will be aborted + (see information on <c><anno>CheckResult</anno></c> below). + The default is to allow garbage collection, i.e., + <c>{allow_gc, true}</c>. + </item> + <tag><c>{async, RequestId}</c></tag> + <item> + The <c>check_process_code/3</c> function will return + the value <c>async</c> immediately after the request + has been sent. When the request has been processed, the + process that called this function will be passed a + message on the form:<br/> + <c>{check_process_code, <anno>RequestId</anno>, <anno>CheckResult</anno>}</c>. + </item> + </taglist> + <p>If <c><anno>Pid</anno></c> equals <c>self()</c>, and + no <c>async</c> option has been passed, the operation will + be performed at once. In all other cases a request for + the operation will be sent to the process identified by + <c><anno>Pid</anno></c>, and will be handled when + appropriate. If no <c>async</c> option has been passed, + the caller will block until <c><anno>CheckResult</anno></c> + is available and can be returned.</p> + <p><c><anno>CheckResult</anno></c> informs about the result of + the request:</p> + <taglist> + <tag><c>true</c></tag> + <item> + The process identified by <c><anno>Pid</anno></c> is + executing old code for <c><anno>Module</anno></c>. + That is, the current call of the process executes old + code for this module, or the process has references + to old code for this module, or the process contains + funs that references old code for this module. 
+ </item> + <tag><c>false</c></tag> + <item> + The process identified by <c><anno>Pid</anno></c> is + not executing old code for <c><anno>Module</anno></c>. + </item> + <tag><c>aborted</c></tag> + <item> + The operation was aborted since the process needed to + be garbage collected in order to determine the result + of the operation, and the operation was requested + by passing the <c>{allow_gc, false}</c> option.</item> + </taglist> <p>See also <seealso marker="kernel:code">code(3)</seealso>.</p> + <p>Failures:</p> + <taglist> + <tag><c>badarg</c></tag> + <item> + If <c><anno>Pid</anno></c> is not a node local process identifier. + </item> + <tag><c>badarg</c></tag> + <item> + If <c><anno>Module</anno></c> is not an atom. + </item> + <tag><c>badarg</c></tag> + <item> + If <c><anno>OptionList</anno></c> is not a valid list of options. + </item> + </taglist> </desc> </func> <func> @@ -1197,20 +1268,74 @@ true that the spontaneous garbage collection will occur too late or not at all. Improper use may seriously degrade system performance.</p> - <p>Compatibility note: In versions of OTP prior to R7, - the garbage collection took place at the next context switch, - not immediately. To force a context switch after a call to - <c>erlang:garbage_collect()</c>, it was sufficient to make - any function call.</p> </desc> </func> <func> <name name="garbage_collect" arity="1"/> - <fsummary>Force an immediate garbage collection of a process</fsummary> + <fsummary>Garbage collect a process</fsummary> + <desc> + <p>The same as + <seealso marker="#garbage_collect/2"><c>garbage_collect(<anno>Pid</anno>, [])</c></seealso>.</p> + </desc> + </func> + <func> + <name name="garbage_collect" arity="2"/> + <fsummary>Garbage collect a process</fsummary> <desc> - <p>Works like <c>erlang:garbage_collect()</c> but on any - process. The same caveats apply. Returns <c>false</c> if - <c><anno>Pid</anno></c> refers to a dead process; <c>true</c> otherwise.</p> + <p>Garbage collect the node local process identified by + <c><anno>Pid</anno></c>.</p> + <p>Currently available <c><anno>Option</anno></c>s:</p> + <taglist> + <tag><c>{async, RequestId}</c></tag> + <item> + The <c>garbage_collect/2</c> function will return + the value <c>async</c> immediately after the request + has been sent. When the request has been processed, the + process that called this function will be passed a + message on the form:<br/> + <c>{garbage_collect, <anno>RequestId</anno>, <anno>GCResult</anno>}</c>. + </item> + </taglist> + <p>If <c><anno>Pid</anno></c> equals <c>self()</c>, and + no <c>async</c> option has been passed, the garbage + collection will be performed at once, i.e. the same as + calling + <seealso marker="#garbage_collect/0">garbage_collect/0</seealso>. + In all other cases a request for garbage collection will + be sent to the process identified by <c><anno>Pid</anno></c>, + and will be handled when appropriate. If no <c>async</c> + option has been passed, the caller will block until + <c><anno>GCResult</anno></c> is available and can be + returned.</p> + <p><c><anno>GCResult</anno></c> informs about the result of + the garbage collection request:</p> + <taglist> + <tag><c>true</c></tag> + <item> + The process identified by <c><anno>Pid</anno></c> has + been garbage collected. + </item> + <tag><c>false</c></tag> + <item> + No garbage collection was performed. This since the + the process identified by <c><anno>Pid</anno></c> + terminated before the request could be satisfied. 
+ </item> + </taglist> + <p>Note that the same caveats as for + <seealso marker="#garbage_collect/0">garbage_collect/0</seealso> + apply.</p> + <p>Failures:</p> + <taglist> + <tag><c>badarg</c></tag> + <item> + If <c><anno>Pid</anno></c> is not a node local process identifier. + </item> + <tag><c>badarg</c></tag> + <item> + If <c><anno>OptionList</anno></c> is not a valid list of options. + </item> + </taglist> </desc> </func> <func> diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index e108e706c3..eee4badfb8 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -79,6 +79,7 @@ atom allocated_areas atom allocator atom allocator_sizes atom alloc_util_allocators +atom allow_gc atom allow_passive_connect atom already_loaded atom amd64 diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index 649594a334..3f92c5b025 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -36,7 +36,7 @@ #include "erl_thr_progress.h" static void set_default_trace_pattern(Eterm module); -static Eterm check_process_code(Process* rp, Module* modp); +static Eterm check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp); static void delete_code(Module* modp); static void decrement_refc(BeamInstr* code); static int is_native(BeamInstr* code); @@ -427,69 +427,82 @@ check_old_code_1(BIF_ALIST_1) } Eterm -check_process_code_2(BIF_ALIST_2) +erts_check_process_code(Process *c_p, Eterm module, int allow_gc, int *redsp) { - Process* rp; Module* modp; + Eterm res; + ErtsCodeIndex code_ix; - if (is_not_atom(BIF_ARG_2)) { - goto error; - } - if (is_internal_pid(BIF_ARG_1)) { - Eterm res; - ErtsCodeIndex code_ix; - - code_ix = erts_active_code_ix(); - modp = erts_get_module(BIF_ARG_2, code_ix); - if (modp == NULL) { /* Doesn't exist. */ - return am_false; - } - erts_rlock_old_code(code_ix); - if (modp->old.code == NULL) { /* No old code. */ - erts_runlock_old_code(code_ix); - return am_false; - } - erts_runlock_old_code(code_ix); - -#ifdef ERTS_SMP - rp = erts_pid2proc_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, - BIF_ARG_1, ERTS_PROC_LOCK_MAIN); -#else - rp = erts_pid2proc(BIF_P, 0, BIF_ARG_1, 0); -#endif - if (!rp) { - BIF_RET(am_false); - } - if (rp == ERTS_PROC_LOCK_BUSY) { - ERTS_BIF_YIELD2(bif_export[BIF_check_process_code_2], BIF_P, - BIF_ARG_1, BIF_ARG_2); - } - erts_rlock_old_code(code_ix); - if (modp->old.code != NULL) { /* must check again */ - res = check_process_code(rp, modp); - } - else { - res = am_false; - } - erts_runlock_old_code(code_ix); -#ifdef ERTS_SMP - if (BIF_P != rp) { - erts_resume(rp, ERTS_PROC_LOCK_MAIN); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); + (*redsp)++; + + ASSERT(is_atom(module)); + + code_ix = erts_active_code_ix(); + modp = erts_get_module(module, code_ix); + if (!modp) + return am_false; + erts_rlock_old_code(code_ix); + res = modp->old.code ? 
check_process_code(c_p, modp, allow_gc, redsp) : am_false; + erts_runlock_old_code(code_ix); + + return res; +} + +BIF_RETTYPE erts_internal_check_process_code_2(BIF_ALIST_2) +{ + int reds = 0; + Eterm res; + Eterm olist = BIF_ARG_2; + int allow_gc = 1; + + if (is_not_atom(BIF_ARG_1)) + goto badarg; + + while (is_list(olist)) { + Eterm *lp = list_val(olist); + Eterm opt = CAR(lp); + if (is_tuple(opt)) { + Eterm* tp = tuple_val(opt); + switch (arityval(tp[0])) { + case 2: + switch (tp[1]) { + case am_allow_gc: + switch (tp[2]) { + case am_false: + allow_gc = 0; + break; + case am_true: + allow_gc = 1; + break; + default: + goto badarg; + } + break; + default: + goto badarg; + } + break; + default: + goto badarg; + } } -#endif - BIF_RET(res); - } - else if (is_external_pid(BIF_ARG_1) - && external_pid_dist_entry(BIF_ARG_1) == erts_this_dist_entry) { - BIF_RET(am_false); + else + goto badarg; + olist = CDR(lp); } + if (is_not_nil(olist)) + goto badarg; + + res = erts_check_process_code(BIF_P, BIF_ARG_1, allow_gc, &reds); + + ASSERT(is_value(res)); + + BIF_RET2(res, reds); - error: +badarg: BIF_ERROR(BIF_P, BADARG); } - BIF_RETTYPE delete_module_1(BIF_ALIST_1) { ErtsCodeIndex code_ix; @@ -710,7 +723,7 @@ set_default_trace_pattern(Eterm module) } static Eterm -check_process_code(Process* rp, Module* modp) +check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp) { BeamInstr* start; char* mod_start; @@ -773,6 +786,16 @@ check_process_code(Process* rp, Module* modp) } } + if (rp->flags & F_DISABLE_GC) { + /* + * Cannot proceed. Process has disabled gc in order to + * safely leave inconsistent data on the heap and/or + * off heap lists. Need to wait for gc to be enabled + * again. + */ + return THE_NON_VALUE; + } + /* * See if there are funs that refer to the old version of the module. */ @@ -786,6 +809,8 @@ check_process_code(Process* rp, Module* modp) if (done_gc) { return am_true; } else { + if (!allow_gc) + return am_aborted; /* * Try to get rid of this fun by garbage collecting. * Clear both fvalue and ftrace to make sure they @@ -796,7 +821,7 @@ check_process_code(Process* rp, Module* modp) rp->ftrace = NIL; done_gc = 1; FLAGS(rp) |= F_NEED_FULLSWEEP; - (void) erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); + *redsp += erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); goto rescan; } } @@ -850,6 +875,9 @@ check_process_code(Process* rp, Module* modp) Uint lit_size; struct erl_off_heap_header* oh; + if (!allow_gc) + return am_aborted; + /* * Try to get rid of constants by by garbage collecting. * Clear both fvalue and ftrace. @@ -859,11 +887,12 @@ check_process_code(Process* rp, Module* modp) rp->ftrace = NIL; done_gc = 1; FLAGS(rp) |= F_NEED_FULLSWEEP; - (void) erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); + *redsp += erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); literals = (Eterm *) modp->old.code[MI_LITERALS_START]; lit_size = (Eterm *) modp->old.code[MI_LITERALS_END] - literals; oh = (struct erl_off_heap_header *) modp->old.code[MI_LITERALS_OFF_HEAP]; + *redsp += lit_size / 10; /* Need, better value... 
*/ erts_garbage_collect_literals(rp, literals, lit_size, oh); } } diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 13d31285b2..96666d98ed 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -3764,45 +3764,6 @@ BIF_RETTYPE now_0(BIF_ALIST_0) /**********************************************************************/ -BIF_RETTYPE garbage_collect_1(BIF_ALIST_1) -{ - int reds; - Process *rp; - - if (is_not_pid(BIF_ARG_1)) { - BIF_ERROR(BIF_P, BADARG); - } - - if (BIF_P->common.id == BIF_ARG_1) - rp = BIF_P; - else { -#ifdef ERTS_SMP - rp = erts_pid2proc_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, - BIF_ARG_1, ERTS_PROC_LOCK_MAIN); - if (rp == ERTS_PROC_LOCK_BUSY) - ERTS_BIF_YIELD1(bif_export[BIF_garbage_collect_1], BIF_P, BIF_ARG_1); -#else - rp = erts_proc_lookup(BIF_ARG_1); -#endif - if (!rp) - BIF_RET(am_false); - } - - /* The GC cost is taken for the process executing this BIF. */ - - FLAGS(rp) |= F_NEED_FULLSWEEP; - reds = erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); - -#ifdef ERTS_SMP - if (BIF_P != rp) { - erts_resume(rp, ERTS_PROC_LOCK_MAIN); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); - } -#endif - - BIF_RET2(am_true, reds); -} - BIF_RETTYPE garbage_collect_0(BIF_ALIST_0) { int reds; diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index b6cce84cdd..dd50df636c 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -46,7 +46,6 @@ bif erlang:atom_to_list/1 bif erlang:binary_to_list/1 bif erlang:binary_to_list/3 bif erlang:binary_to_term/1 -bif erlang:check_process_code/2 bif erlang:crc32/1 bif erlang:crc32/2 bif erlang:crc32_combine/3 @@ -67,7 +66,6 @@ bif erlang:float_to_list/1 bif erlang:float_to_list/2 bif erlang:fun_info/2 bif erlang:garbage_collect/0 -bif erlang:garbage_collect/1 bif erlang:get/0 bif erlang:get/1 bif erlang:get_keys/1 @@ -155,6 +153,10 @@ bif erts_internal:port_control/3 bif erts_internal:port_close/1 bif erts_internal:port_connect/2 +bif erts_internal:request_system_task/3 +bif erts_internal:check_process_code/2 + + # inet_db support bif erlang:port_set_data/2 bif erlang:port_get_data/1 diff --git a/erts/emulator/beam/break.c b/erts/emulator/beam/break.c index b7e1092907..7d4f52ee23 100644 --- a/erts/emulator/beam/break.c +++ b/erts/emulator/beam/break.c @@ -112,10 +112,12 @@ process_killer(void) erts_smp_proc_lock(rp, rp_locks); state = erts_smp_atomic32_read_acqb(&rp->state); if (state & (ERTS_PSFLG_FREE - | ERTS_PSFLG_EXITING - | ERTS_PSFLG_ACTIVE - | ERTS_PSFLG_IN_RUNQ - | ERTS_PSFLG_RUNNING)) { + | ERTS_PSFLG_EXITING + | ERTS_PSFLG_ACTIVE + | ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_IN_RUNQ + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS)) { erts_printf("Can only kill WAITING processes this way\n"); } else { diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index bb5eba80be..32308fae9b 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -268,6 +268,8 @@ type PROC_INTERVAL LONG_LIVED SYSTEM process_interval type BUSY_CALLER_TAB SHORT_LIVED SYSTEM busy_caller_table type BUSY_CALLER SHORT_LIVED SYSTEM busy_caller type PORT_DATA_HEAP STANDARD SYSTEM port_data_heap +type PROC_SYS_TSK SHORT_LIVED PROCESSES proc_sys_task +type PROC_SYS_TSK_QS SHORT_LIVED PROCESSES proc_sys_task_queues +if threads_no_smp # Need thread safe allocs, but std_alloc and fix_alloc are not; diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index d7f1e2d971..8fa3aa29eb 100755 --- 
a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -3603,6 +3603,19 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2) BIF_RET(am_true); } } + else if (ERTS_IS_ATOM_STR("gc_state", BIF_ARG_1)) { + /* Used by process_SUITE (emulator) */ + int res, enable; + + switch (BIF_ARG_2) { + case am_true: enable = 1; break; + case am_false: enable = 0; break; + default: BIF_ERROR(BIF_P, BADARG); break; + } + + res = erts_set_gc_state(BIF_P, enable); + BIF_RET(res ? am_true : am_false); + } else if (ERTS_IS_ATOM_STR("send_fake_exit_signal", BIF_ARG_1)) { /* Used by signal_SUITE (emulator) */ diff --git a/erts/emulator/beam/erl_debug.c b/erts/emulator/beam/erl_debug.c index b90d00f236..dc79d45be7 100644 --- a/erts/emulator/beam/erl_debug.c +++ b/erts/emulator/beam/erl_debug.c @@ -299,6 +299,9 @@ void erts_check_for_holes(Process* p) ErlHeapFragment* hf; Eterm* start; + if (p->flags & F_DISABLE_GC) + return; + start = p->last_htop ? p->last_htop : HEAP_START(p); check_memory(start, HEAP_TOP(p)); p->last_htop = HEAP_TOP(p); diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c index e89725c190..c5585d39e8 100644 --- a/erts/emulator/beam/erl_gc.c +++ b/erts/emulator/beam/erl_gc.c @@ -400,10 +400,16 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) Uint reclaimed_now = 0; int done = 0; Uint ms1, s1, us1; - ErtsSchedulerData *esdp = erts_get_scheduler_data(); + ErtsSchedulerData *esdp; #ifdef USE_VM_PROBES DTRACE_CHARBUF(pidbuf, DTRACE_TERM_BUF_SIZE); #endif + + if (p->flags & F_DISABLE_GC) + return 1; + + esdp = erts_get_scheduler_data(); + if (IS_TRACED_FL(p, F_TRACE_GC)) { trace_gc(p, am_gc_start); } @@ -532,6 +538,9 @@ erts_garbage_collect_hibernate(Process* p) Uint area_size; Sint offs; + if (p->flags & F_DISABLE_GC) + ERTS_INTERNAL_ERROR("GC disabled"); + /* * Preliminaries. */ @@ -667,6 +676,8 @@ erts_garbage_collect_literals(Process* p, Eterm* literals, Uint n; struct erl_off_heap_header** prev; + if (p->flags & F_DISABLE_GC) + return; /* * Set GC state. */ diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 771eba431f..0f3bb8d281 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -46,6 +46,11 @@ typedef struct erl_off_heap { Uint64 overhead; /* Administrative overhead (used to force GC). 
*/ } ErlOffHeap; +#define ERTS_INIT_OFF_HEAP(OHP) \ + do { \ + (OHP)->first = NULL; \ + (OHP)->overhead = 0; \ + } while (0) #include "external.h" #include "erl_process.h" diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 1efd070afd..0a41fb596d 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -283,6 +283,40 @@ struct erts_system_profile_flags_t erts_system_profile_flags; #error "Need to store process_count in another type" #endif +typedef enum { + ERTS_PSTT_GC, /* Garbage Collect */ + ERTS_PSTT_CPC /* Check Process Code */ +} ErtsProcSysTaskType; + +#define ERTS_MAX_PROC_SYS_TASK_ARGS 2 + +struct ErtsProcSysTask_ { + ErtsProcSysTask *next; + ErtsProcSysTask *prev; + ErtsProcSysTaskType type; + Eterm requester; + Eterm reply_tag; + Eterm req_id; + Uint req_id_sz; + Eterm arg[ERTS_MAX_PROC_SYS_TASK_ARGS]; + ErlOffHeap off_heap; + Eterm heap[1]; +}; + +#define ERTS_PROC_SYS_TASK_SIZE(HSz) \ + (sizeof(ErtsProcSysTask) - sizeof(Eterm) + sizeof(Eterm)*(HSz)) + +struct ErtsProcSysTaskQs_ { + int qmask; + int ncount; + ErtsProcSysTask *q[ERTS_NO_PROC_PRIO_LEVELS]; +}; + +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(proc_sys_task_queues, + ErtsProcSysTaskQs, + 50, + ERTS_ALC_T_PROC_SYS_TSK_QS) + ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_op_list, ErtsMiscOpList, 10, @@ -353,6 +387,14 @@ static void aux_work_timeout_early_init(int no_schedulers); static void aux_work_timeout_late_init(void); static void setup_aux_work_timer(void); +static int execute_sys_tasks(Process *c_p, + erts_aint32_t *statep, + int in_reds); +static int cleanup_sys_tasks(Process *c_p, + erts_aint32_t in_state, + int in_reds); + + #if defined(DEBUG) || 0 #define ERTS_DBG_CHK_AUX_WORK_VAL(V) dbg_chk_aux_work_val((V)) static void @@ -471,6 +513,11 @@ erts_pre_init_process(void) erts_psd_required_locks[ERTS_PSD_CALL_TIME_BP].set_locks = ERTS_PSD_CALL_TIME_BP_SET_LOCKS; + erts_psd_required_locks[ERTS_PSD_DELAYED_GC_TASK_QS].get_locks + = ERTS_PSD_DELAYED_GC_TASK_QS_GET_LOCKS; + erts_psd_required_locks[ERTS_PSD_DELAYED_GC_TASK_QS].set_locks + = ERTS_PSD_DELAYED_GC_TASK_QS_SET_LOCKS; + /* Check that we have locks for all entries */ for (ix = 0; ix < ERTS_PSD_SIZE; ix++) { ERTS_SMP_LC_ASSERT(erts_psd_required_locks[ix].get_locks); @@ -2848,7 +2895,7 @@ dequeue_process(ErtsRunQueue *runq, int prio_q, erts_aint32_t *statep) if (statep) *statep = state; - prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state); rqi = &runq->procs.prio_info[prio]; @@ -2997,7 +3044,7 @@ immigrate(ErtsRunQueue *c_rq, ErtsMigrationPath *mp) erts_aint32_t state; state = erts_smp_atomic32_read_acqb(&proc->state); if (!(ERTS_PSFLG_BOUND & state) - && (prio == (int) (ERTS_PSFLG_PRIO_MASK & state))) { + && (prio == (int) ERTS_PSFLGS_GET_PRQ_PRIO(state))) { ErtsRunQueueInfo *rqi = &rq->procs.prio_info[prio]; unqueue_process(rq, rpq, rqi, prio, prev_proc, proc); erts_smp_runq_unlock(rq); @@ -3079,7 +3126,7 @@ schedule_bound_processes(ErtsRunQueue *rq, while (proc) { erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state); next = proc->next; - enqueue_process(rq, (int) (ERTS_PSFLG_PRIO_MASK & state), proc); + enqueue_process(rq, (int) ERTS_PSFLGS_GET_PRQ_PRIO(state), proc); proc = next; } } @@ -3184,7 +3231,7 @@ evacuate_run_queue(ErtsRunQueue *rq, sbpp->last = proc; } else { - int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + int prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state); erts_smp_runq_unlock(rq); to_rq = mp->prio[prio].runq; @@ -3257,7 +3304,7 @@ 
try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq, erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state); if (!(ERTS_PSFLG_BOUND & state)) { /* Steal process */ - int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + int prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state); ErtsRunQueueInfo *rqi = &vrq->procs.prio_info[prio]; unqueue_process(vrq, rpq, rqi, prio, prev_proc, proc); erts_smp_runq_unlock(vrq); @@ -4575,6 +4622,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) #endif init_misc_op_list_alloc(); + init_proc_sys_task_queues_alloc(); #ifdef ERTS_SMP set_wakeup_other_data(); @@ -4858,46 +4906,166 @@ erts_get_scheduler_data(void) #endif +static Process * +make_proxy_proc(Process *prev_proxy, Process *proc, erts_aint32_t prio) +{ + erts_aint32_t state; + Process *proxy; +#ifdef ERTS_SMP + ErtsRunQueue *rq = RUNQ_READ_RQ(&proc->run_queue); +#endif + + state = (ERTS_PSFLG_PROXY + | ERTS_PSFLG_IN_RUNQ + | (((erts_aint32_t) 1) << (prio + ERTS_PSFLGS_IN_PRQ_MASK_OFFSET)) + | (prio << ERTS_PSFLGS_PRQ_PRIO_OFFSET) + | (prio << ERTS_PSFLGS_USR_PRIO_OFFSET) + | (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET)); + + if (prev_proxy) { + proxy = prev_proxy; + ASSERT(erts_smp_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY); + erts_smp_atomic32_set_nob(&proxy->state, state); +#ifdef ERTS_SMP + RUNQ_SET_RQ(&proc->run_queue, rq); +#endif + } + else { + proxy = erts_alloc(ERTS_ALC_T_PROC, sizeof(Process)); +#ifdef DEBUG + { + int i; + Uint32 *ui32 = (Uint32 *) (char *) proxy; + for (i = 0; i < sizeof(Process)/sizeof(Uint32); i++) + ui32[i] = (Uint32) 0xdeadbeef; + } +#endif + erts_smp_atomic32_init_nob(&proxy->state, state); +#ifdef ERTS_SMP + erts_smp_atomic_init_nob(&proxy->run_queue, + erts_smp_atomic_read_nob(&proc->run_queue)); +#endif + } + + proxy->common.id = proc->common.id; + + return proxy; +} + +static ERTS_INLINE void +free_proxy_proc(Process *proxy) +{ + ASSERT(erts_smp_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY); + erts_free(ERTS_ALC_T_PROC, proxy); +} + + +static ERTS_INLINE int +check_enqueue_in_prio_queue(erts_aint32_t *prq_prio_p, + erts_aint32_t *newp, + erts_aint32_t actual) +{ + erts_aint32_t aprio, qbit, max_qbit; + + aprio = (actual >> ERTS_PSFLGS_ACT_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK; + qbit = 1 << aprio; + + *prq_prio_p = aprio; + + max_qbit = (actual >> ERTS_PSFLGS_IN_PRQ_MASK_OFFSET) & ERTS_PSFLGS_QMASK; + max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS; + max_qbit &= -max_qbit; + /* + * max_qbit now either contain bit set for highest prio queue or a bit + * out of range (which will have a value larger than valid range). + */ + + if (qbit >= max_qbit) + return 0; /* Already queued in higher or equal prio */ + + /* Need to enqueue (if already enqueued, it is in lower prio) */ + *newp |= qbit << ERTS_PSFLGS_IN_PRQ_MASK_OFFSET; + + if ((actual & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLGS_USR_PRIO_MASK)) + != (aprio << ERTS_PSFLGS_USR_PRIO_OFFSET)) { + /* + * Process struct already enqueued, or actual prio not + * equal to user prio, i.e., enqueue using proxy. + */ + return -1; + } + + /* + * Enqueue using process struct. + */ + *newp &= ~ERTS_PSFLGS_PRQ_PRIO_MASK; + *newp |= ERTS_PSFLG_IN_RUNQ | (aprio << ERTS_PSFLGS_PRQ_PRIO_OFFSET); + return 1; +} + /* * scheduler_out_process() return with c_rq locked. 
*/ static ERTS_INLINE int -schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p) +schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Process *proxy) { - erts_aint32_t a, e, n; + erts_aint32_t a, e, n, enq_prio = -1; int res = 0; + int enqueue; /* < 0 -> use proxy */ a = state; while (1) { n = e = a; - ASSERT(a & ERTS_PSFLG_RUNNING); - ASSERT(!(a & ERTS_PSFLG_IN_RUNQ)); + ASSERT(a & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS)); - n &= ~ERTS_PSFLG_RUNNING; - if ((a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) - n |= ERTS_PSFLG_IN_RUNQ; + enqueue = 0; + + n &= ~(ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS); + if (a & ERTS_PSFLG_ACTIVE_SYS + || (a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) { + enqueue = check_enqueue_in_prio_queue(&enq_prio, &n, a); + } a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); if (a == e) break; } - if (!(n & ERTS_PSFLG_IN_RUNQ)) { - if (erts_system_profile_flags.runnable_procs) - profile_runnable_proc(p, am_inactive); + if (!enqueue) { + + if (erts_system_profile_flags.runnable_procs) { + + if (!(a & ERTS_PSFLG_ACTIVE_SYS) + && (!(a & ERTS_PSFLG_ACTIVE) + || (a & ERTS_PSFLG_SUSPENDED))) { + /* Process inactive */ + profile_runnable_proc(p, am_inactive); + } + } + + if (proxy) + free_proxy_proc(proxy); } else { - int prio = (int) (ERTS_PSFLG_PRIO_MASK & n); + Process *sched_p; ErtsRunQueue *runq = erts_get_runq_proc(p); - ASSERT(!(n & ERTS_PSFLG_SUSPENDED)); + ASSERT(!(n & ERTS_PSFLG_SUSPENDED) || (n & ERTS_PSFLG_ACTIVE_SYS)); + + if (enqueue < 0) + sched_p = make_proxy_proc(proxy, p, enq_prio); + else { + sched_p = p; + if (proxy) + free_proxy_proc(proxy); + } #ifdef ERTS_SMP if (!(ERTS_PSFLG_BOUND & n)) { - ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio); + ErtsRunQueue *new_runq = erts_check_emigration_need(runq, enq_prio); if (new_runq) { - RUNQ_SET_RQ(&p->run_queue, new_runq); + RUNQ_SET_RQ(&sched_p->run_queue, new_runq); runq = new_runq; } } @@ -4908,7 +5076,7 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p) erts_smp_runq_lock(runq); /* Enqueue the process */ - enqueue_process(runq, prio, p); + enqueue_process(runq, (int) enq_prio, sched_p); if (runq == c_rq) return res; @@ -4920,14 +5088,13 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p) } static ERTS_INLINE void -add2runq(Process *p, erts_aint32_t state) +add2runq(Process *p, erts_aint32_t state, erts_aint32_t prio) { - int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); ErtsRunQueue *runq = erts_get_runq_proc(p); #ifdef ERTS_SMP if (!(ERTS_PSFLG_BOUND & state)) { - ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio); + ErtsRunQueue *new_runq = erts_check_emigration_need(runq, (int) prio); if (new_runq) { RUNQ_SET_RQ(&p->run_queue, new_runq); runq = new_runq; @@ -4939,101 +5106,236 @@ add2runq(Process *p, erts_aint32_t state) erts_smp_runq_lock(runq); /* Enqueue the process */ - enqueue_process(runq, prio, p); + enqueue_process(runq, (int) prio, p); erts_smp_runq_unlock(runq); smp_notify_inc_runq(runq); } -static ERTS_INLINE void -schedule_process(Process *p, erts_aint32_t state, int active_enq) +static ERTS_INLINE int +change_proc_schedule_state(Process *p, + erts_aint32_t clear_state_flags, + erts_aint32_t set_state_flags, + erts_aint32_t *statep, + erts_aint32_t *enq_prio_p) { - erts_aint32_t a = state, n; + /* + * NOTE: ERTS_PSFLG_RUNNING, ERTS_PSFLG_RUNNING_SYS and + * ERTS_PSFLG_ACTIVE_SYS are not allowed to be + * altered by this 
function! + */ + erts_aint32_t a = *statep, n; + int enqueue; /* < 0 -> use proxy */ + + ASSERT(!(a & ERTS_PSFLG_PROXY)); + ASSERT((clear_state_flags & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_ACTIVE_SYS)) == 0); + ASSERT((set_state_flags & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_ACTIVE_SYS)) == 0); while (1) { erts_aint32_t e; n = e = a; + enqueue = 0; + if (a & ERTS_PSFLG_FREE) - return; /* We don't want to schedule free processes... */ - n |= ERTS_PSFLG_ACTIVE; - if (!(a & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_RUNNING))) - n |= ERTS_PSFLG_IN_RUNQ; + break; /* We don't want to schedule free processes... */ + + if (clear_state_flags) + n &= ~clear_state_flags; + + if (set_state_flags) + n |= set_state_flags; + + if ((n & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_IN_RUNQ + | ERTS_PSFLG_ACTIVE)) == ERTS_PSFLG_ACTIVE) { + /* + * Active and seemingly need to be enqueued, but + * process may be in a run queue via proxy, need + * further inspection... + */ + enqueue = check_enqueue_in_prio_queue(enq_prio_p, &n, a); + } + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); if (a == e) break; - if (!active_enq && (a & ERTS_PSFLG_ACTIVE)) - return; /* Someone else activated process ... */ + if (enqueue == 0 && n == a) + break; } - if (erts_system_profile_flags.runnable_procs - && !(a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) { - profile_runnable_proc(p, am_active); + if (erts_system_profile_flags.runnable_procs) { + + if (((n & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_ACTIVE)) == ERTS_PSFLG_ACTIVE) + && (!(a & (ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS) + && (!(a & ERTS_PSFLG_ACTIVE) + || (a & ERTS_PSFLG_SUSPENDED))))) { + /* We activated a prevously inactive process */ + profile_runnable_proc(p, am_active); + } + } - if ((n & ERTS_PSFLG_IN_RUNQ) && !(a & ERTS_PSFLG_IN_RUNQ)) - add2runq(p, n); + *statep = a; + + return enqueue; +} + +static ERTS_INLINE void +schedule_process(Process *p, erts_aint32_t in_state) +{ + erts_aint32_t enq_prio = -1; + erts_aint32_t state = in_state; + int enqueue = change_proc_schedule_state(p, + 0, + ERTS_PSFLG_ACTIVE, + &state, + &enq_prio); + if (enqueue) + add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), + state, + enq_prio); } void erts_schedule_process(Process *p, erts_aint32_t state) { - schedule_process(p, state, 0); + schedule_process(p, state); +} + +static void +schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy) +{ + erts_aint32_t a = state, n, enq_prio = -1; + int enqueue; /* < 0 -> use proxy */ + + ASSERT(!(state & ERTS_PSFLG_PROXY)); + + while (1) { + erts_aint32_t e; + n = e = a; + + if (a & ERTS_PSFLG_FREE) + return; /* We don't want to schedule free processes... 
*/ + + enqueue = 0; + n |= ERTS_PSFLG_ACTIVE_SYS; + if (!(a & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS))) + enqueue = check_enqueue_in_prio_queue(&enq_prio, &n, a); + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + if (a == e) + break; + if (a == n && !enqueue) + goto cleanup; + } + + if (erts_system_profile_flags.runnable_procs) { + + if (!(a & (ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS)) + && (!(a & ERTS_PSFLG_ACTIVE) || (a & ERTS_PSFLG_SUSPENDED))) { + /* We activated a prevously inactive process */ + profile_runnable_proc(p, am_active); + } + + } + + if (enqueue) { + Process *sched_p; + if (enqueue > 0) + sched_p = p; + else { + sched_p = make_proxy_proc(proxy, p, enq_prio); + proxy = NULL; + } + add2runq(sched_p, n, enq_prio); + } + +cleanup: + if (proxy) + free_proxy_proc(proxy); } static ERTS_INLINE int suspend_process(Process *c_p, Process *p) { - erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); + erts_aint32_t state; int suspended = 0; ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); + state = erts_smp_atomic32_read_acqb(&p->state); + if ((state & ERTS_PSFLG_SUSPENDED)) suspended = -1; else { if (c_p == p) { state = erts_smp_atomic32_read_bor_relb(&p->state, ERTS_PSFLG_SUSPENDED); - state |= ERTS_PSFLG_SUSPENDED; ASSERT(state & ERTS_PSFLG_RUNNING); - suspended = 1; + suspended = (state & ERTS_PSFLG_SUSPENDED) ? -1: 1; } else { while (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_EXITING))) { - erts_aint32_t e, n; + erts_aint32_t n, e; + n = e = state; n |= ERTS_PSFLG_SUSPENDED; state = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e); if (state == e) { - state = n; suspended = 1; break; } + if (state & ERTS_PSFLG_SUSPENDED) { + suspended = -1; + break; + } } } } - if (state & ERTS_PSFLG_SUSPENDED) { + if (suspended) { ASSERT(!(ERTS_PSFLG_RUNNING & state) || p == erts_get_current_process()); - if (erts_system_profile_flags.runnable_procs - && (p->rcount == 0) - && (state & ERTS_PSFLG_ACTIVE)) { - profile_runnable_proc(p, am_inactive); + if (suspended > 0 && erts_system_profile_flags.runnable_procs) { + + /* 'state' is before our change... */ + + if ((state & (ERTS_PSFLG_ACTIVE + | ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) { + /* We made process inactive */ + profile_runnable_proc(p, am_inactive); + } + } p->rcount++; /* count number of suspend */ } + return suspended; } static ERTS_INLINE void resume_process(Process *p) { - erts_aint32_t state; + erts_aint32_t state, enq_prio = -1; + int enqueue; + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); ASSERT(p->rcount > 0); @@ -5041,14 +5343,16 @@ resume_process(Process *p) if (--p->rcount > 0) /* multiple suspend */ return; - state = erts_smp_atomic32_read_band_mb(&p->state, ~ERTS_PSFLG_SUSPENDED); - state &= ~ERTS_PSFLG_SUSPENDED; - if ((state & (ERTS_PSFLG_EXITING - | ERTS_PSFLG_ACTIVE - | ERTS_PSFLG_IN_RUNQ - | ERTS_PSFLG_RUNNING)) == ERTS_PSFLG_ACTIVE) { - schedule_process(p, state, 1); - } + state = erts_smp_atomic32_read_nob(&p->state); + enqueue = change_proc_schedule_state(p, + ERTS_PSFLG_SUSPENDED, + 0, + &state, + &enq_prio); + if (enqueue) + add2runq(enqueue > 0 ? 
p : make_proxy_proc(NULL, p, enq_prio), + state, + enq_prio); } int @@ -6032,7 +6336,8 @@ pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks, goto done; } else { - if (!(ERTS_PSFLG_RUNNING & erts_smp_atomic32_read_acqb(&rp->state))) + if (!((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS) + & erts_smp_atomic32_read_acqb(&rp->state))) goto done; } @@ -6695,7 +7000,7 @@ Eterm erts_get_process_priority(Process *p) { erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); - switch (state & ERTS_PSFLG_PRIO_MASK) { + switch (ERTS_PSFLGS_GET_USR_PRIO(state)) { case PRIORITY_MAX: return am_max; case PRIORITY_HIGH: return am_high; case PRIORITY_NORMAL: return am_normal; @@ -6718,18 +7023,68 @@ erts_set_process_priority(Process *p, Eterm value) } a = erts_smp_atomic32_read_nob(&p->state); - if (nprio == (a & ERTS_PSFLG_PRIO_MASK)) + if (nprio == ERTS_PSFLGS_GET_USR_PRIO(a)) oprio = nprio; else { - erts_aint32_t e, n; + int slocked = 0; + erts_aint32_t e, n, aprio; + + if (a & ERTS_PSFLG_ACTIVE_SYS) { + erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); + slocked = 1; + } + do { - oprio = a & ERTS_PSFLG_PRIO_MASK; + oprio = ERTS_PSFLGS_GET_USR_PRIO(a); n = e = a; - ASSERT(!(a & ERTS_PSFLG_IN_RUNQ)); + if (!(a & (ERTS_PSFLG_ACTIVE_SYS|ERTS_PSFLG_DELAYED_SYS))) + aprio = nprio; + else { + int max_qbit; + + if (!slocked) { + erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); + slocked = 1; + } + + max_qbit = 0; + if (a & ERTS_PSFLG_ACTIVE_SYS) + max_qbit |= p->sys_task_qs->qmask; + if (a & ERTS_PSFLG_DELAYED_SYS) { + ErtsProcSysTaskQs *qs; + qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(p); + ASSERT(qs); + max_qbit |= qs->qmask; + } + max_qbit &= -max_qbit; + switch (max_qbit) { + case MAX_BIT: + aprio = PRIORITY_MAX; + break; + case HIGH_BIT: + aprio = PRIORITY_HIGH; + break; + case NORMAL_BIT: + aprio = PRIORITY_NORMAL; + break; + case LOW_BIT: + aprio = PRIORITY_LOW; + break; + default: + ERTS_INTERNAL_ERROR("Invalid qmask"); + aprio = -1; + } + + if (aprio > nprio) /* low value -> high prio */ + aprio = nprio; + } + + n &= ~(ERTS_PSFLGS_USR_PRIO_MASK + | ERTS_PSFLGS_ACT_PRIO_MASK); + n |= ((nprio << ERTS_PSFLGS_USR_PRIO_OFFSET) + | (aprio << ERTS_PSFLGS_ACT_PRIO_OFFSET)); - n &= ~ERTS_PSFLG_PRIO_MASK; - n |= nprio; a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); } while (a != e); } @@ -6763,6 +7118,7 @@ erts_set_process_priority(Process *p, Eterm value) Process *schedule(Process *p, int calls) { + Process *proxy_p = NULL; ErtsRunQueue *rq; erts_aint_t dt; ErtsSchedulerData *esdp; @@ -6805,6 +7161,8 @@ Process *schedule(Process *p, int calls) actual_reds = reds = 0; erts_smp_runq_lock(rq); } else { + sched_out_proc: + #ifdef ERTS_SMP ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); esdp = p->scheduler_data; @@ -6858,10 +7216,11 @@ Process *schedule(Process *p, int calls) esdp->reductions += reds; - schedule_out_process(rq, state, p); /* Returns with rq locked! */ + schedule_out_process(rq, state, p, proxy_p); /* Returns with rq locked! 
*/ + proxy_p = NULL; ERTS_PROC_REDUCTIONS_EXECUTED(rq, - (int) (state & ERTS_PSFLG_PRIO_MASK), + (int) ERTS_PSFLGS_GET_USR_PRIO(state), reds, actual_reds); @@ -6870,18 +7229,19 @@ Process *schedule(Process *p, int calls) p->scheduler_data = NULL; #endif + erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); if (state & ERTS_PSFLG_FREE) { #ifdef ERTS_SMP ASSERT(esdp->free_process == p); esdp->free_process = NULL; -#else - erts_free_proc(p); +#else + state = erts_smp_atomic32_read_nob(&p->state); + if (!(state & ERTS_PSFLG_IN_RUNQ)) + erts_free_proc(p); #endif } - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); - #ifdef ERTS_SMP ASSERT(!esdp->free_process); #endif @@ -7088,6 +7448,8 @@ Process *schedule(Process *p, int calls) * Find a new process to run. */ pick_next_process: { + erts_aint32_t psflg_band_mask; + erts_aint32_t running_flag; int prio_q; int qmask; @@ -7121,18 +7483,62 @@ Process *schedule(Process *p, int calls) ASSERT(p); /* Wrong qmask in rq->flags? */ + psflg_band_mask = ~(((erts_aint32_t) 1) << (ERTS_PSFLGS_GET_PRQ_PRIO(state) + + ERTS_PSFLGS_IN_PRQ_MASK_OFFSET)); + + if (!(state & ERTS_PSFLG_PROXY)) + psflg_band_mask &= ~ERTS_PSFLG_IN_RUNQ; + else { + proxy_p = p; + p = erts_proc_lookup_raw(proxy_p->common.id); + if (!p) { + free_proxy_proc(proxy_p); + proxy_p = NULL; + goto pick_next_process; + } + state = erts_smp_atomic32_read_nob(&p->state); + } + + + if (state & ERTS_PSFLG_ACTIVE_SYS) + running_flag = ERTS_PSFLG_RUNNING_SYS; + else + running_flag = ERTS_PSFLG_RUNNING; + while (1) { erts_aint32_t exp, new, tmp; tmp = new = exp = state; - new &= ~ERTS_PSFLG_IN_RUNQ; - tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); - if (tmp != ERTS_PSFLG_SUSPENDED) - new |= ERTS_PSFLG_RUNNING; + new &= psflg_band_mask; + if (!(state & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS))) { + tmp = state & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_PENDING_EXIT + | ERTS_PSFLG_ACTIVE_SYS); + if (tmp != ERTS_PSFLG_SUSPENDED) + new |= running_flag; + } state = erts_smp_atomic32_cmpxchg_relb(&p->state, new, exp); if (state == exp) { - tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); - if (tmp == ERTS_PSFLG_SUSPENDED) + if ((state & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_FREE)) + || ((state & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_PENDING_EXIT + | ERTS_PSFLG_ACTIVE_SYS)) + == ERTS_PSFLG_SUSPENDED)) { + if (state & ERTS_PSFLG_FREE) { +#ifdef ERTS_SMP + erts_smp_proc_dec_refc(p); +#else + erts_free_proc(p); +#endif + } + if (proxy_p) { + free_proxy_proc(proxy_p); + proxy_p = NULL; + } goto pick_next_process; + } state = new; break; } @@ -7162,7 +7568,7 @@ Process *schedule(Process *p, int calls) (UWord) esdp->no); int migrated = old && old != esdp->no; - prio = (int) (state & ERTS_PSFLG_PRIO_MASK); + prio = (int) ERTS_PSFLGS_GET_USR_PRIO(state); erts_smp_spin_lock(&erts_sched_stat.lock); erts_sched_stat.prio[prio].total_executed++; @@ -7182,9 +7588,6 @@ Process *schedule(Process *p, int calls) ASSERT(!p->scheduler_data); p->scheduler_data = esdp; #endif - /* Never run a suspended process */ - ASSERT(!(ERTS_PSFLG_SUSPENDED & erts_smp_atomic32_read_nob(&p->state))); - reds = context_reds; if (IS_TRACED(p)) { @@ -7210,21 +7613,783 @@ Process *schedule(Process *p, int calls) erts_check_my_tracer_proc(p); #endif + if (state & ERTS_PSFLG_RUNNING_SYS) { + reds -= execute_sys_tasks(p, &state, reds); + if (reds <= 0) { + p->fcalls = reds; + goto sched_out_proc; + } + + ASSERT(state & ERTS_PSFLG_RUNNING_SYS); + ASSERT(!(state & 
ERTS_PSFLG_RUNNING)); + + while (1) { + erts_aint32_t n, e; + + if (((state & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_ACTIVE)) != ERTS_PSFLG_ACTIVE) + && !(state & ERTS_PSFLG_EXITING)) + goto sched_out_proc; + + n = e = state; + n &= ~ERTS_PSFLG_RUNNING_SYS; + n |= ERTS_PSFLG_RUNNING; + + state = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + if (state == e) { + state = n; + break; + } + + ASSERT(state & ERTS_PSFLG_RUNNING_SYS); + ASSERT(!(state & ERTS_PSFLG_RUNNING)); + } + } + if (!(state & ERTS_PSFLG_EXITING) && ((FLAGS(p) & F_FORCE_GC) || (MSO(p).overhead > BIN_VHEAP_SZ(p)))) { reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity); - if (reds < 0) { - reds = 1; + if (reds <= 0) { + p->fcalls = reds; + goto sched_out_proc; } } + + if (proxy_p) { + free_proxy_proc(proxy_p); + proxy_p = NULL; + } p->fcalls = reds; ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); + + /* Never run a suspended process */ + ASSERT(!(ERTS_PSFLG_SUSPENDED & erts_smp_atomic32_read_nob(&p->state))); + return p; } } +static int +notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result) +{ + Process *rp = erts_proc_lookup(st->requester); + if (rp) { + ErtsProcLocks rp_locks; + ErlOffHeap *ohp; + ErlHeapFragment* bp; + Eterm *hp, msg, req_id, result; + Uint st_result_sz, hsz; +#ifdef DEBUG + Eterm *hp_start; +#endif + + rp_locks = (c_p == rp) ? ERTS_PROC_LOCK_MAIN : 0; + + st_result_sz = is_immed(st_result) ? 0 : size_object(st_result); + hsz = st->req_id_sz + st_result_sz + 4 /* 3-tuple */; + + hp = erts_alloc_message_heap(hsz, + &bp, + &ohp, + rp, + &rp_locks); + +#ifdef DEBUG + hp_start = hp; +#endif + + req_id = st->req_id_sz == 0 ? st->req_id : copy_struct(st->req_id, + st->req_id_sz, + &hp, + ohp); + + result = st_result_sz == 0 ? st_result : copy_struct(st_result, + st_result_sz, + &hp, + ohp); + + ASSERT(is_immed(st->reply_tag)); + + msg = TUPLE3(hp, st->reply_tag, req_id, result); + +#ifdef DEBUG + hp += 4; + ASSERT(hp_start + hsz == hp); +#endif + + erts_queue_message(rp, + &rp_locks, + bp, + msg, + NIL +#ifdef USE_VM_PROBES + , NIL +#endif + ); + + if (c_p == rp) + rp_locks &= ~ERTS_PROC_LOCK_MAIN; + + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); + } + + erts_cleanup_offheap(&st->off_heap); + + erts_free(ERTS_ALC_T_PROC_SYS_TSK, st); + + return rp ? 
1 : 0; +} + +static ERTS_INLINE ErtsProcSysTask * +fetch_sys_task(Process *c_p, erts_aint32_t state, int *qmaskp, int *priop) +{ + ErtsProcSysTaskQs *unused_qs = NULL; + int qbit, qmask; + ErtsProcSysTask *st, **qp; + + *priop = -1; /* Shut up annoying erroneous warning */ + + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); + + if (!c_p->sys_task_qs) { + qmask = 0; + st = NULL; + goto update_state; + } + + qmask = c_p->sys_task_qs->qmask; + + if ((state & (ERTS_PSFLG_ACTIVE + | ERTS_PSFLG_EXITING + | ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) { + /* No sys tasks if we got exclusively higher prio user work to do */ + st = NULL; + switch (ERTS_PSFLGS_GET_USR_PRIO(state)) { + case PRIORITY_MAX: + if (!(qmask & MAX_BIT)) + goto done; + break; + case PRIORITY_HIGH: + if (!(qmask & (MAX_BIT|HIGH_BIT))) + goto done; + break; + default: + break; + } + } + + qbit = qmask & -qmask; + switch (qbit) { + case MAX_BIT: + qp = &c_p->sys_task_qs->q[PRIORITY_MAX]; + *priop = PRIORITY_MAX; + break; + case HIGH_BIT: + qp = &c_p->sys_task_qs->q[PRIORITY_HIGH]; + *priop = PRIORITY_HIGH; + break; + case NORMAL_BIT: + if (!(qmask & PRIORITY_LOW) + || ++c_p->sys_task_qs->ncount <= RESCHEDULE_LOW) { + qp = &c_p->sys_task_qs->q[PRIORITY_NORMAL]; + *priop = PRIORITY_NORMAL; + break; + } + c_p->sys_task_qs->ncount = 0; + /* Fall through */ + case LOW_BIT: + qp = &c_p->sys_task_qs->q[PRIORITY_LOW]; + *priop = PRIORITY_LOW; + break; + default: + ERTS_INTERNAL_ERROR("Invalid qmask"); + } + + st = *qp; + ASSERT(st); + if (st->next != st) { + *qp = st->next; + st->next->prev = st->prev; + st->prev->next = st->next; + } + else { + erts_aint32_t a, e, n, st_prio, qmask2; + + *qp = NULL; + qmask &= ~qbit; + c_p->sys_task_qs->qmask = qmask; + + update_state: + + qmask2 = qmask; + + if (state & ERTS_PSFLG_DELAYED_SYS) { + ErtsProcSysTaskQs *qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p); + ASSERT(qs); + qmask2 |= qs->qmask; + } + + switch (qmask2 & -qmask2) { + case MAX_BIT: + st_prio = PRIORITY_MAX; + break; + case HIGH_BIT: + st_prio = PRIORITY_HIGH; + break; + case NORMAL_BIT: + st_prio = PRIORITY_NORMAL; + break; + case LOW_BIT: + case 0: + st_prio = PRIORITY_LOW; + break; + default: + ERTS_INTERNAL_ERROR("Invalid qmask"); + } + + if (!qmask) { + unused_qs = c_p->sys_task_qs; + c_p->sys_task_qs = NULL; + } + + a = state; + do { + erts_aint32_t prio = ERTS_PSFLGS_GET_USR_PRIO(a); + + if (prio > st_prio) + prio = st_prio; + + n = e = a; + + n &= ~ERTS_PSFLGS_ACT_PRIO_MASK; + n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET); + + if (!qmask) + n &= ~ERTS_PSFLG_ACTIVE_SYS; + + if (a == n) + break; + a = erts_smp_atomic32_cmpxchg_nob(&c_p->state, n, e); + } while (a != e); + } + +done: + + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); + + if (unused_qs) + proc_sys_task_queues_free(unused_qs); + + *qmaskp = qmask; + + return st; +} + +static void save_gc_task(Process *c_p, ErtsProcSysTask *st, int prio); + +static int +execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) +{ + int garbage_collected = 0; + erts_aint32_t state = *statep; + int max_reds = in_reds; + int reds = 0; + int qmask = 0; + + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); + + do { + ErtsProcSysTask *st; + int st_prio; + Eterm st_res; + + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { +#ifdef ERTS_SMP + if (state & ERTS_PSFLG_PENDING_EXIT) + erts_handle_pending_exit(c_p, ERTS_PROC_LOCK_MAIN); +#endif + ASSERT(ERTS_PROC_IS_EXITING(c_p)); + break; + } + + st = fetch_sys_task(c_p, state, &qmask, &st_prio); + if 
(!st) + break; + + switch (st->type) { + case ERTS_PSTT_GC: + if (c_p->flags & F_DISABLE_GC) { + save_gc_task(c_p, st, st_prio); + st = NULL; + reds++; + } + else { + if (!garbage_collected) { + FLAGS(c_p) |= F_NEED_FULLSWEEP; + reds += erts_garbage_collect(c_p, + 0, + c_p->arg_reg, + c_p->arity); + garbage_collected = 1; + } + st_res = am_true; + } + break; + case ERTS_PSTT_CPC: + st_res = erts_check_process_code(c_p, + st->arg[0], + st->arg[1] == am_true, + &reds); + if (is_non_value(st_res)) { + /* Needed gc, but gc was disabled */ + save_gc_task(c_p, st, st_prio); + st = NULL; + } + break; + default: + ERTS_INTERNAL_ERROR("Invalid process sys task type"); + st_res = am_false; + } + + if (st) + reds += notify_sys_task_executed(c_p, st, st_res); + + state = erts_smp_atomic32_read_acqb(&c_p->state); + } while (qmask && reds < max_reds); + + *statep = state; + + return reds; +} + +static int +cleanup_sys_tasks(Process *c_p, erts_aint32_t in_state, int in_reds) +{ + erts_aint32_t state = in_state; + int max_reds = in_reds; + int reds = 0; + int qmask = 0; + + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); + + do { + ErtsProcSysTask *st; + Eterm st_res; + int st_prio; + + st = fetch_sys_task(c_p, state, &qmask, &st_prio); + if (!st) + break; + + switch (st->type) { + case ERTS_PSTT_GC: + st_res = am_false; + break; + case ERTS_PSTT_CPC: + st_res = am_false; + break; + default: + ERTS_INTERNAL_ERROR("Invalid process sys task type"); + st_res = am_false; + break; + } + + reds += notify_sys_task_executed(c_p, st, st_res); + + state = erts_smp_atomic32_read_acqb(&c_p->state); + } while (qmask && reds < max_reds); + + return reds; +} + +BIF_RETTYPE +erts_internal_request_system_task_3(BIF_ALIST_3) +{ + Process *rp = erts_proc_lookup(BIF_ARG_1); + ErtsProcSysTaskQs *stqs, *free_stqs = NULL; + ErtsProcSysTask *st = NULL; + erts_aint32_t prio, rp_state; + int rp_locked; + Eterm noproc_res, req_type; + + if (!rp && !is_internal_pid(BIF_ARG_1)) { + if (!is_external_pid(BIF_ARG_1)) + goto badarg; + if (external_pid_dist_entry(BIF_ARG_1) != erts_this_dist_entry) + goto badarg; + } + + switch (BIF_ARG_2) { + case am_max: prio = PRIORITY_MAX; break; + case am_high: prio = PRIORITY_HIGH; break; + case am_normal: prio = PRIORITY_NORMAL; break; + case am_low: prio = PRIORITY_LOW; break; + default: goto badarg; + } + + if (is_not_tuple(BIF_ARG_3)) + goto badarg; + else { + int i; + Eterm *tp = tuple_val(BIF_ARG_3); + Uint arity = arityval(*tp); + Eterm req_id; + Uint req_id_sz; + Eterm arg[ERTS_MAX_PROC_SYS_TASK_ARGS]; + Uint arg_sz[ERTS_MAX_PROC_SYS_TASK_ARGS]; + Uint tot_sz; + Eterm *hp; + + if (arity < 2) + goto badarg; + if (arity > 2 + ERTS_MAX_PROC_SYS_TASK_ARGS) + goto badarg; + req_type = tp[1]; + req_id = tp[2]; + req_id_sz = is_immed(req_id) ? req_id : size_object(req_id); + tot_sz = req_id_sz; + for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) { + int tix = 3 + i; + if (tix > arity) { + arg[i] = THE_NON_VALUE; + arg_sz[i] = 0; + } + else { + arg[i] = tp[tix]; + if (is_immed(arg[i])) + arg_sz[i] = 0; + else { + arg_sz[i] = size_object(arg[i]); + tot_sz += arg_sz[i]; + } + } + } + st = erts_alloc(ERTS_ALC_T_PROC_SYS_TSK, + ERTS_PROC_SYS_TASK_SIZE(tot_sz)); + st->next = st->prev = st; /* Prep for empty prio queue */ + ERTS_INIT_OFF_HEAP(&st->off_heap); + hp = &st->heap[0]; + + st->requester = BIF_P->common.id; + st->reply_tag = req_type; + st->req_id_sz = req_id_sz; + st->req_id = req_id_sz == 0 ? 
req_id : copy_struct(req_id, + req_id_sz, + &hp, + &st->off_heap); + + for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) + st->arg[i] = arg_sz[i] == 0 ? arg[i] : copy_struct(arg[i], + arg_sz[i], + &hp, + &st->off_heap); + ASSERT(&st->heap[0] + tot_sz == hp); + } + + switch (req_type) { + + case am_garbage_collect: + st->type = ERTS_PSTT_GC; + noproc_res = am_false; + if (!rp) + goto noproc; + break; + + case am_check_process_code: + if (is_not_atom(st->arg[0])) + goto badarg; + if (st->arg[1] != am_true && st->arg[1] != am_false) + goto badarg; + noproc_res = am_false; + st->type = ERTS_PSTT_CPC; + if (!rp) + goto noproc; + break; + + default: + goto badarg; + } + + rp_state = erts_smp_atomic32_read_nob(&rp->state); + + rp_locked = 0; + + free_stqs = NULL; + if (rp_state & ERTS_PSFLG_ACTIVE_SYS) + stqs = NULL; + else { + alloc_qs: + stqs = proc_sys_task_queues_alloc(); + stqs->qmask = 1 << prio; + stqs->ncount = 0; + stqs->q[PRIORITY_MAX] = NULL; + stqs->q[PRIORITY_HIGH] = NULL; + stqs->q[PRIORITY_NORMAL] = NULL; + stqs->q[PRIORITY_LOW] = NULL; + stqs->q[prio] = st; + } + + if (!rp_locked) { + rp_locked = 1; + erts_smp_proc_lock(rp, ERTS_PROC_LOCK_STATUS); + + rp_state = erts_smp_atomic32_read_nob(&rp->state); + if (rp_state & ERTS_PSFLG_EXITING) { + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); + rp = NULL; + free_stqs = stqs; + goto noproc; + } + } + + if (!rp->sys_task_qs) { + if (stqs) + rp->sys_task_qs = stqs; + else + goto alloc_qs; + } + else { + if (stqs) + free_stqs = stqs; + stqs = rp->sys_task_qs; + if (!stqs->q[prio]) { + stqs->q[prio] = st; + stqs->qmask |= 1 << prio; + } + else { + st->next = stqs->q[prio]; + st->prev = stqs->q[prio]->prev; + st->next->prev = st; + st->prev->next = st; + ASSERT(stqs->qmask & (1 << prio)); + } + } + + if (ERTS_PSFLGS_GET_ACT_PRIO(rp_state) > prio) { + erts_aint32_t n, a, e; + /* Need to elevate actual prio */ + + a = rp_state; + do { + if (ERTS_PSFLGS_GET_ACT_PRIO(a) <= prio) { + n = a; + break; + } + n = e = a; + n &= ~ERTS_PSFLGS_ACT_PRIO_MASK; + n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET); + a = erts_smp_atomic32_cmpxchg_nob(&rp->state, n, e); + } while (a != e); + rp_state = n; + } + + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); + + schedule_process_sys_task(rp, rp_state, NULL); + + if (free_stqs) + proc_sys_task_queues_free(free_stqs); + + BIF_RET(am_ok); + +noproc: + + notify_sys_task_executed(BIF_P, st, noproc_res); + if (free_stqs) + proc_sys_task_queues_free(free_stqs); + BIF_RET(am_ok); + +badarg: + + if (st) { + erts_cleanup_offheap(&st->off_heap); + erts_free(ERTS_ALC_T_PROC_SYS_TSK, st); + } + if (free_stqs) + proc_sys_task_queues_free(free_stqs); + BIF_ERROR(BIF_P, BADARG); +} + +static void +save_gc_task(Process *c_p, ErtsProcSysTask *st, int prio) +{ + erts_aint32_t state; + ErtsProcSysTaskQs *qs; + + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + + qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p); + if (!qs) { + st->next = st->prev = st; + qs = proc_sys_task_queues_alloc(); + qs->qmask = 1 << prio; + qs->ncount = 0; + qs->q[PRIORITY_MAX] = NULL; + qs->q[PRIORITY_HIGH] = NULL; + qs->q[PRIORITY_NORMAL] = NULL; + qs->q[PRIORITY_LOW] = NULL; + qs->q[prio] = st; + (void) ERTS_PROC_SET_DELAYED_GC_TASK_QS(c_p, ERTS_PROC_LOCK_MAIN, qs); + } + else { + if (!qs->q[prio]) { + st->next = st->prev = st; + qs->q[prio] = st; + qs->qmask |= 1 << prio; + } + else { + st->next = qs->q[prio]; + st->prev = qs->q[prio]->prev; + st->next->prev = st; + st->prev->next = st; + ASSERT(qs->qmask & (1 << prio)); + } + } + 
+ state = erts_smp_atomic32_read_nob(&c_p->state); + ASSERT((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS) & state); + + while (!(state & ERTS_PSFLG_DELAYED_SYS) + || prio < ERTS_PSFLGS_GET_ACT_PRIO(state)) { + erts_aint32_t n, e; + + n = e = state; + n |= ERTS_PSFLG_DELAYED_SYS; + if (prio < ERTS_PSFLGS_GET_ACT_PRIO(state)) { + n &= ~ERTS_PSFLGS_ACT_PRIO_MASK; + n |= prio << ERTS_PSFLGS_ACT_PRIO_OFFSET; + } + state = erts_smp_atomic32_cmpxchg_relb(&c_p->state, n, e); + if (state == e) + break; + } +} + +int +erts_set_gc_state(Process *c_p, int enable) +{ + int res; + ErtsProcSysTaskQs *dgc_tsk_qs; + ASSERT(c_p == erts_get_current_process()); + ASSERT((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS) + & erts_smp_atomic32_read_nob(&c_p->state)); + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + + res = !(c_p->flags & F_DISABLE_GC); + + if (!enable) { + c_p->flags |= F_DISABLE_GC; + return res; + } + + c_p->flags &= ~F_DISABLE_GC; + + dgc_tsk_qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p); + if (!dgc_tsk_qs) + return res; + + /* Move delayed gc tasks into sys tasks queues. */ + + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); + + if (!c_p->sys_task_qs) { + c_p->sys_task_qs = dgc_tsk_qs; + dgc_tsk_qs = NULL; + } + else { + ErtsProcSysTaskQs *stsk_qs; + int prio; + + /* + * We push delayed tasks to the front of the queue + * since they have already made it to the front + * once and then been delayed after that. + */ + + stsk_qs = c_p->sys_task_qs; + + while (dgc_tsk_qs->qmask) { + int qbit = dgc_tsk_qs->qmask & -dgc_tsk_qs->qmask; + dgc_tsk_qs->qmask &= ~qbit; + switch (qbit) { + case MAX_BIT: + prio = PRIORITY_MAX; + break; + case HIGH_BIT: + prio = PRIORITY_HIGH; + break; + case NORMAL_BIT: + prio = PRIORITY_NORMAL; + break; + case LOW_BIT: + prio = PRIORITY_LOW; + break; + default: + ERTS_INTERNAL_ERROR("Invalid qmask"); + prio = -1; + break; + } + + ASSERT(dgc_tsk_qs->q[prio]); + + if (!stsk_qs->q[prio]) { + stsk_qs->q[prio] = dgc_tsk_qs->q[prio]; + stsk_qs->qmask |= 1 << prio; + } + else { + ErtsProcSysTask *first1, *last1, *first2, *last2; + + ASSERT(stsk_qs->qmask & (1 << prio)); + first1 = dgc_tsk_qs->q[prio]; + last1 = first1->prev; + first2 = stsk_qs->q[prio]; + last2 = first1->prev; + + last1->next = first2; + first2->prev = last1; + + first1->prev = last2; + last2->next = first1; + + stsk_qs->q[prio] = first1; + } + + } + } + +#ifdef DEBUG + { + int qmask; + erts_aint32_t aprio, state = +#endif + + erts_smp_atomic32_read_bset_nob(&c_p->state, + (ERTS_PSFLG_DELAYED_SYS + | ERTS_PSFLG_ACTIVE_SYS), + ERTS_PSFLG_ACTIVE_SYS); + +#ifdef DEBUG + ASSERT(state & ERTS_PSFLG_DELAYED_SYS); + qmask = c_p->sys_task_qs->qmask; + aprio = ERTS_PSFLGS_GET_ACT_PRIO(state); + ASSERT(ERTS_PSFLGS_GET_USR_PRIO(state) >= aprio); + ASSERT((qmask & -qmask) >= (1 << aprio)); + } +#endif + + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); + + (void) ERTS_PROC_SET_DELAYED_GC_TASK_QS(c_p, ERTS_PROC_LOCK_MAIN, NULL); + + if (dgc_tsk_qs) + proc_sys_task_queues_free(dgc_tsk_qs); + + return res; +} + void erts_sched_stat_modify(int what) { @@ -7521,7 +8686,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). 
prio = (erts_aint32_t) so->priority; } - state |= (prio & ERTS_PSFLG_PRIO_MASK); + state |= (((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_ACT_PRIO_OFFSET) + | ((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_USR_PRIO_OFFSET)); if (!rq) rq = erts_get_runq_proc(parent); @@ -7599,6 +8765,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->bin_old_vheap = 0; p->bin_vheap_mature = 0; + p->sys_task_qs = NULL; + /* No need to initialize p->fcalls. */ p->current = p->initial+INITIAL_MOD; @@ -7764,7 +8932,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). * Schedule process for execution. */ - schedule_process(p, state, 0); + schedule_process(p, state); VERBOSE(DEBUG_PROCESSES, ("Created a new process: %T\n",p->common.id)); @@ -7815,6 +8983,7 @@ void erts_init_empty_process(Process *p) p->bin_vheap_sz = BIN_VH_MIN_SIZE; p->bin_old_vheap_sz = BIN_VH_MIN_SIZE; p->bin_old_vheap = 0; + p->sys_task_qs = NULL; p->bin_vheap_mature = 0; #ifdef ERTS_SMP p->common.u.alive.ptimer = NULL; @@ -8070,33 +9239,21 @@ delete_process(Process* p) p->fvalue = NIL; } -static ERTS_INLINE erts_aint32_t -set_proc_exiting_state(Process *p, erts_aint32_t state) -{ - erts_aint32_t a, n, e; - a = state; - while (1) { - n = e = a; - n &= ~(ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); - n |= ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE; - if (!(a & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING))) - n |= ERTS_PSFLG_IN_RUNQ; - a = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e); - if (a == e) - break; - } - return a; -} - static ERTS_INLINE void set_proc_exiting(Process *p, - erts_aint32_t state, + erts_aint32_t in_state, Eterm reason, ErlHeapFragment *bp) { + erts_aint32_t state = in_state, enq_prio = -1; + int enqueue; ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(p) == ERTS_PROC_LOCKS_ALL); - state = set_proc_exiting_state(p, state); + enqueue = change_proc_schedule_state(p, + ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT, + ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE, + &state, + &enq_prio); p->fvalue = reason; if (bp) @@ -8111,15 +9268,37 @@ set_proc_exiting(Process *p, cancel_timer(p); p->i = (BeamInstr *) beam_exit; - if (erts_system_profile_flags.runnable_procs - && !(state & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) { - profile_runnable_proc(p, am_active); - } - - if (!(state & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING))) - add2runq(p, state); + if (enqueue) + add2runq(enqueue > 0 ? 
p : make_proxy_proc(NULL, p, enq_prio), + state, + enq_prio); } +static ERTS_INLINE erts_aint32_t +set_proc_self_exiting(Process *c_p) +{ +#ifdef DEBUG + int enqueue; +#endif + erts_aint32_t state, enq_prio = -1; + + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCKS_ALL); + + state = erts_smp_atomic32_read_nob(&c_p->state); + ASSERT(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS)); + +#ifdef DEBUG + enqueue = +#endif + change_proc_schedule_state(c_p, + ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT, + ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE, + &state, + &enq_prio); + + ASSERT(!enqueue); + return state; +} #ifdef ERTS_SMP @@ -8167,7 +9346,7 @@ handle_pending_exiters(ErtsProcList *pnd_xtrs) if (p) { if (erts_proclist_same(plp, p)) { erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); - if (!(state & ERTS_PSFLG_RUNNING)) { + if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS))) { ASSERT(state & ERTS_PSFLG_PENDING_EXIT); erts_handle_pending_exit(p, ERTS_PROC_LOCKS_ALL); } @@ -8388,7 +9567,7 @@ send_exit_signal(Process *c_p, /* current process if and only } set_proc_exiting(c_p, state, rsn, NULL); } - else if (!(state & ERTS_PSFLG_RUNNING)) { + else if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS))) { /* Process not running ... */ ErtsProcLocks need_locks = ~(*rp_locks) & ERTS_PROC_LOCKS_ALL; if (need_locks @@ -8765,9 +9944,6 @@ resume_suspend_monitor(ErtsSuspendMonitor *smon, void *vc_p) void erts_do_exit_process(Process* p, Eterm reason) { -#ifdef ERTS_SMP - erts_aint32_t state; -#endif p->arity = 0; /* No live registers */ p->fvalue = reason; @@ -8792,10 +9968,9 @@ erts_do_exit_process(Process* p, Eterm reason) #endif #ifndef ERTS_SMP - set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state)); + set_proc_self_exiting(p); #else - state = set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state)); - if (state & ERTS_PSFLG_PENDING_EXIT) { + if (ERTS_PSFLG_PENDING_EXIT & set_proc_self_exiting(p)) { /* Process exited before pending exit was received... */ p->pending_exit.reason = THE_NON_VALUE; if (p->pending_exit.bp) { @@ -8849,6 +10024,7 @@ erts_continue_exit_process(Process *p) DistEntry *dep; struct saved_calls *scb; process_breakpoint_time_t *pbt; + erts_aint32_t state; #ifdef DEBUG int yield_allowed = 1; @@ -8885,6 +10061,13 @@ erts_continue_exit_process(Process *p) p->flags &= ~F_USING_DB; } + erts_set_gc_state(p, 1); + state = erts_smp_atomic32_read_acqb(&p->state); + if (state & ERTS_PSFLG_ACTIVE_SYS) { + if (cleanup_sys_tasks(p, state, CONTEXT_REDS) >= CONTEXT_REDS/2) + goto yield; + } + if (p->flags & F_USING_DDLL) { erts_ddll_proc_dead(p, ERTS_PROC_LOCK_MAIN); p->flags &= ~F_USING_DDLL; @@ -8962,17 +10145,31 @@ erts_continue_exit_process(Process *p) { /* Inactivate and notify free */ erts_aint32_t n, e, a = erts_smp_atomic32_read_nob(&p->state); +#ifdef ERTS_SMP + int refc_inced = 0; +#endif while (1) { n = e = a; ASSERT(a & ERTS_PSFLG_EXITING); n |= ERTS_PSFLG_FREE; n &= ~ERTS_PSFLG_ACTIVE; +#ifdef ERTS_SMP + if ((n & ERTS_PSFLG_IN_RUNQ) && !refc_inced) { + erts_smp_proc_inc_refc(p); + refc_inced = 1; + } +#endif a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); if (a == e) break; } - } +#ifdef ERTS_SMP + if (refc_inced && !(n & ERTS_PSFLG_IN_RUNQ)) + erts_smp_proc_dec_refc(p); +#endif + } + dep = ((p->flags & F_DISTRIBUTION) ? 
ERTS_PROC_SET_DIST_ENTRY(p, ERTS_PROC_LOCKS_ALL, NULL) : NULL); @@ -9075,7 +10272,7 @@ timeout_proc(Process* p) state = erts_smp_atomic32_read_acqb(&p->state); if (!(state & ERTS_PSFLG_ACTIVE)) - schedule_process(p, state, 0); + schedule_process(p, state); } @@ -9153,7 +10350,9 @@ erts_program_counter_info(int to, void *to_arg, Process *p) print_function_from_pc(to, to_arg, p->cp); erts_print(to, to_arg, ")\n"); state = erts_smp_atomic32_read_acqb(&p->state); - if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_GC))) { + if (!(state & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_GC))) { erts_print(to, to_arg, "arity = %d\n",p->arity); if (!ERTS_IS_CRASH_DUMPING) { /* diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 8d136f6e8b..043621125c 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -631,8 +631,9 @@ erts_smp_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi) #define ERTS_PSD_SCHED_ID 2 #define ERTS_PSD_DIST_ENTRY 3 #define ERTS_PSD_CALL_TIME_BP 4 +#define ERTS_PSD_DELAYED_GC_TASK_QS 5 -#define ERTS_PSD_SIZE 5 +#define ERTS_PSD_SIZE 6 typedef struct { void *data[ERTS_PSD_SIZE]; @@ -656,6 +657,9 @@ typedef struct { #define ERTS_PSD_CALL_TIME_BP_GET_LOCKS ERTS_PROC_LOCK_MAIN #define ERTS_PSD_CALL_TIME_BP_SET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_DELAYED_GC_TASK_QS_GET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_DELAYED_GC_TASK_QS_SET_LOCKS ERTS_PROC_LOCK_MAIN + typedef struct { ErtsProcLocks get_locks; ErtsProcLocks set_locks; @@ -688,6 +692,9 @@ typedef struct { ErlHeapFragment *bp; } ErtsPendExit; +typedef struct ErtsProcSysTask_ ErtsProcSysTask; +typedef struct ErtsProcSysTaskQs_ ErtsProcSysTaskQs; + #ifdef ERTS_SMP typedef struct ErtsPendingSuspend_ ErtsPendingSuspend; @@ -855,6 +862,8 @@ struct process { Uint64 bin_old_vheap_sz; /* Virtual old heap block size for binaries */ Uint64 bin_old_vheap; /* Virtual old heap size for binaries */ + ErtsProcSysTaskQs *sys_task_qs; + erts_smp_atomic32_t state; /* Process state flags (see ERTS_PSFLG_*) */ #ifdef ERTS_SMP @@ -924,24 +933,67 @@ void erts_check_for_holes(Process* p); # error "Need to increase ERTS_PSFLG_PRIO_SHIFT" #endif -#define ERTS_PSFLG_PRIO_SHIFT 2 +#define ERTS_PSFLGS_PRIO_BITS 2 +#define ERTS_PSFLGS_PRIO_MASK \ + ((((erts_aint32_t) 1) << ERTS_PSFLGS_PRIO_BITS) - 1) -#define ERTS_PSFLG_BIT(N) \ - (((erts_aint32_t) 1) << (ERTS_PSFLG_PRIO_SHIFT + (N))) +#define ERTS_PSFLGS_ACT_PRIO_OFFSET (0*ERTS_PSFLGS_PRIO_BITS) +#define ERTS_PSFLGS_USR_PRIO_OFFSET (1*ERTS_PSFLGS_PRIO_BITS) +#define ERTS_PSFLGS_PRQ_PRIO_OFFSET (2*ERTS_PSFLGS_PRIO_BITS) +#define ERTS_PSFLGS_ZERO_BIT_OFFSET (3*ERTS_PSFLGS_PRIO_BITS) -#define ERTS_PSFLG_PRIO_MASK (ERTS_PSFLG_BIT(0) - 1) +#define ERTS_PSFLGS_QMASK_BITS 4 +#define ERTS_PSFLGS_QMASK \ + ((((erts_aint32_t) 1) << ERTS_PSFLGS_QMASK_BITS) - 1) +#define ERTS_PSFLGS_IN_PRQ_MASK_OFFSET \ + ERTS_PSFLGS_ZERO_BIT_OFFSET -#define ERTS_PSFLG_FREE ERTS_PSFLG_BIT(0) -#define ERTS_PSFLG_EXITING ERTS_PSFLG_BIT(1) -#define ERTS_PSFLG_PENDING_EXIT ERTS_PSFLG_BIT(2) -#define ERTS_PSFLG_ACTIVE ERTS_PSFLG_BIT(3) -#define ERTS_PSFLG_IN_RUNQ ERTS_PSFLG_BIT(4) -#define ERTS_PSFLG_RUNNING ERTS_PSFLG_BIT(5) -#define ERTS_PSFLG_SUSPENDED ERTS_PSFLG_BIT(6) -#define ERTS_PSFLG_GC ERTS_PSFLG_BIT(7) -#define ERTS_PSFLG_BOUND ERTS_PSFLG_BIT(8) -#define ERTS_PSFLG_TRAP_EXIT ERTS_PSFLG_BIT(9) +#define ERTS_PSFLG_BIT(N) \ + (((erts_aint32_t) 1) << (ERTS_PSFLGS_ZERO_BIT_OFFSET + (N))) +/* + * ACT_PRIO -> Active prio, i.e., currently 
active prio. This + * prio may be higher than user prio. + * USR_PRIO -> User prio. i.e., prio the user has set. + * PRQ_PRIO -> Prio queue prio, i.e., prio queue currently + * enqueued in. + */ +#define ERTS_PSFLGS_ACT_PRIO_MASK \ + (ERTS_PSFLGS_PRIO_MASK << ERTS_PSFLGS_ACT_PRIO_OFFSET) +#define ERTS_PSFLGS_USR_PRIO_MASK \ + (ERTS_PSFLGS_PRIO_MASK << ERTS_PSFLGS_USR_PRIO_OFFSET) +#define ERTS_PSFLGS_PRQ_PRIO_MASK \ + (ERTS_PSFLGS_PRIO_MASK << ERTS_PSFLGS_PRQ_PRIO_OFFSET) +#define ERTS_PSFLG_IN_PRQ_MAX ERTS_PSFLG_BIT(0) +#define ERTS_PSFLG_IN_PRQ_HIGH ERTS_PSFLG_BIT(1) +#define ERTS_PSFLG_IN_PRQ_NORMAL ERTS_PSFLG_BIT(2) +#define ERTS_PSFLG_IN_PRQ_LOW ERTS_PSFLG_BIT(3) +#define ERTS_PSFLG_FREE ERTS_PSFLG_BIT(4) +#define ERTS_PSFLG_EXITING ERTS_PSFLG_BIT(5) +#define ERTS_PSFLG_PENDING_EXIT ERTS_PSFLG_BIT(6) +#define ERTS_PSFLG_ACTIVE ERTS_PSFLG_BIT(7) +#define ERTS_PSFLG_IN_RUNQ ERTS_PSFLG_BIT(8) +#define ERTS_PSFLG_RUNNING ERTS_PSFLG_BIT(9) +#define ERTS_PSFLG_SUSPENDED ERTS_PSFLG_BIT(10) +#define ERTS_PSFLG_GC ERTS_PSFLG_BIT(11) +#define ERTS_PSFLG_BOUND ERTS_PSFLG_BIT(12) +#define ERTS_PSFLG_TRAP_EXIT ERTS_PSFLG_BIT(13) +#define ERTS_PSFLG_ACTIVE_SYS ERTS_PSFLG_BIT(14) +#define ERTS_PSFLG_RUNNING_SYS ERTS_PSFLG_BIT(15) +#define ERTS_PSFLG_PROXY ERTS_PSFLG_BIT(16) +#define ERTS_PSFLG_DELAYED_SYS ERTS_PSFLG_BIT(17) + +#define ERTS_PSFLGS_IN_PRQ_MASK (ERTS_PSFLG_IN_PRQ_MAX \ + | ERTS_PSFLG_IN_PRQ_HIGH \ + | ERTS_PSFLG_IN_PRQ_NORMAL \ + | ERTS_PSFLG_IN_PRQ_LOW) + +#define ERTS_PSFLGS_GET_ACT_PRIO(PSFLGS) \ + (((PSFLGS) >> ERTS_PSFLGS_ACT_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) +#define ERTS_PSFLGS_GET_USR_PRIO(PSFLGS) \ + (((PSFLGS) >> ERTS_PSFLGS_USR_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) +#define ERTS_PSFLGS_GET_PRQ_PRIO(PSFLGS) \ + (((PSFLGS) >> ERTS_PSFLGS_USR_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) /* The sequential tracing token is a tuple of size 5: * @@ -1056,6 +1108,7 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags; #define F_HAVE_BLCKD_MSCHED (1 << 8) /* Process has blocked multi-scheduling */ #define F_P2PNR_RESCHED (1 << 9) /* Process has been rescheduled via erts_pid2proc_not_running() */ #define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */ +#define F_DISABLE_GC (1 << 11) /* Disable GC */ /* process trace_flags */ #define F_SENSITIVE (1 << 0) @@ -1146,6 +1199,7 @@ void erts_late_init_process(void); void erts_early_init_scheduling(int); void erts_init_scheduling(int, int); +int erts_set_gc_state(Process *c_p, int enable); Eterm erts_sched_wall_time_request(Process *c_p, int set, int enable); Eterm erts_gc_info_request(Process *c_p); Uint64 erts_get_proc_interval(void); @@ -1591,6 +1645,11 @@ erts_psd_set(Process *p, ErtsProcLocks plocks, int ix, void *data) #define ERTS_PROC_SET_CALL_TIME(P, L, PBT) \ ((process_breakpoint_time_t *) erts_psd_set((P), (L), ERTS_PSD_CALL_TIME_BP, (void *) (PBT))) +#define ERTS_PROC_GET_DELAYED_GC_TASK_QS(P) \ + ((ErtsProcSysTaskQs *) erts_psd_get((P), ERTS_PSD_DELAYED_GC_TASK_QS)) +#define ERTS_PROC_SET_DELAYED_GC_TASK_QS(P, L, PBT) \ + ((ErtsProcSysTaskQs *) erts_psd_set((P), (L), ERTS_PSD_DELAYED_GC_TASK_QS, (void *) (PBT))) + ERTS_GLB_INLINE Eterm erts_proc_get_error_handler(Process *p); ERTS_GLB_INLINE Eterm erts_proc_set_error_handler(Process *p, diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index c1fda3f96c..94bc1b172a 100755 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -650,6 +650,10 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg); Eterm 
erl_is_function(Process* p, Eterm arg1, Eterm arg2); +/* beam_bif_load.c */ +Eterm erts_check_process_code(Process *c_p, Eterm module, int allow_gc, int *redsp); + + /* beam_load.c */ typedef struct { BeamInstr* current; /* Pointer to: Mod, Name, Arity */ @@ -1121,7 +1125,12 @@ erts_alloc_message_heap_state(Uint size, if (statep) *statep = state; if ((state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + || (receiver->flags & F_DISABLE_GC) || HEAP_LIMIT(receiver) - HEAP_TOP(receiver) <= size) { + /* + * The heap is either potentially in an inconsistent + * state, or not large enough. + */ #ifdef ERTS_SMP if (locked_main) { *receiver_locks &= ~ERTS_PROC_LOCK_MAIN; diff --git a/erts/emulator/beam/ops.tab b/erts/emulator/beam/ops.tab index 1e5ae46bfa..c29f3f9b1b 100644 --- a/erts/emulator/beam/ops.tab +++ b/erts/emulator/beam/ops.tab @@ -763,17 +763,17 @@ allocate_init t I y ################################################################# # -# The BIFs erlang:check_process_code/2 must be called like a function, +# The BIFs erts_internal:check_process_code/2 must be called like a function, # to ensure that c_p->i (program counter) is set correctly (an ordinary # BIF call doesn't set it). # -call_ext u==2 Bif=u$bif:erlang:check_process_code/2 => i_call_ext Bif -call_ext_last u==2 Bif=u$bif:erlang:check_process_code/2 D => i_call_ext_last Bif D -call_ext_only u==2 Bif=u$bif:erlang:check_process_code/2 => i_call_ext_only Bif +call_ext u==2 Bif=u$bif:erts_internal:check_process_code/2 => i_call_ext Bif +call_ext_last u==2 Bif=u$bif:erts_internal:check_process_code/2 D => i_call_ext_last Bif D +call_ext_only u==2 Bif=u$bif:erts_internal:check_process_code/2 => i_call_ext_only Bif # -# The BIFs erlang:garbage_collect/0,1 must be called like functions, +# The BIFs erlang:garbage_collect/0 must be called like a function, # to allow them to invoke the garbage collector. (The stack pointer must # be saved and p->arity must be zeroed, which is not done on ordinary BIF calls.) # @@ -782,10 +782,6 @@ call_ext u==0 Bif=u$bif:erlang:garbage_collect/0 => i_call_ext Bif call_ext_last u==0 Bif=u$bif:erlang:garbage_collect/0 D => i_call_ext_last Bif D call_ext_only u==0 Bif=u$bif:erlang:garbage_collect/0 => i_call_ext_only Bif -call_ext u==1 Bif=u$bif:erlang:garbage_collect/1 => i_call_ext Bif -call_ext_last u==1 Bif=u$bif:erlang:garbage_collect/1 D => i_call_ext_last Bif D -call_ext_only u==1 Bif=u$bif:erlang:garbage_collect/1 => i_call_ext_only Bif - # # put/2 and erase/1 must be able to do garbage collection, so we must call # them like functions. diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c index 605a625282..297c4bf439 100644 --- a/erts/emulator/beam/utils.c +++ b/erts/emulator/beam/utils.c @@ -1675,7 +1675,7 @@ static int do_send_to_logger(Eterm tag, Eterm gleader, char *buf, int len) p = erts_whereis_process(NULL, 0, am_error_logger, 0, 0); if (p) { erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); - if (state & ERTS_PSFLG_RUNNING) + if (state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS)) p = NULL; } } diff --git a/erts/emulator/hipe/hipe_bif_list.m4 b/erts/emulator/hipe/hipe_bif_list.m4 index 764b8d180c..b1fedf4838 100644 --- a/erts/emulator/hipe/hipe_bif_list.m4 +++ b/erts/emulator/hipe/hipe_bif_list.m4 @@ -151,10 +151,9 @@ standard_bif_interface_0(nbif_ports_0, ports_0) * BIFs and primops that may do a GC (change heap limit and walk the native stack). 
* XXX: erase/1 and put/2 cannot fail */ -gc_bif_interface_2(nbif_check_process_code_2, hipe_check_process_code_2) +gc_bif_interface_2(nbif_erts_internal_check_process_code_2, hipe_erts_internal_check_process_code_2) gc_bif_interface_1(nbif_erase_1, erase_1) gc_bif_interface_0(nbif_garbage_collect_0, garbage_collect_0) -gc_bif_interface_1(nbif_garbage_collect_1, hipe_garbage_collect_1) gc_nofail_primop_interface_1(nbif_gc_1, hipe_gc) gc_bif_interface_2(nbif_put_2, put_2) diff --git a/erts/emulator/hipe/hipe_native_bif.c b/erts/emulator/hipe/hipe_native_bif.c index 1f76268934..7d343dd91e 100644 --- a/erts/emulator/hipe/hipe_native_bif.c +++ b/erts/emulator/hipe/hipe_native_bif.c @@ -41,8 +41,7 @@ */ /* for -Wmissing-prototypes :-( */ -extern Eterm hipe_check_process_code_2(BIF_ALIST_2); -extern Eterm hipe_garbage_collect_1(BIF_ALIST_1); +extern Eterm hipe_erts_internal_check_process_code_2(BIF_ALIST_2); extern Eterm hipe_show_nstack_1(BIF_ALIST_1); /* Used when a BIF can trigger a stack walk. */ @@ -51,22 +50,12 @@ static __inline__ void hipe_set_narity(Process *p, unsigned int arity) p->hipe.narity = arity; } -Eterm hipe_check_process_code_2(BIF_ALIST_2) +Eterm hipe_erts_internal_check_process_code_2(BIF_ALIST_2) { Eterm ret; hipe_set_narity(BIF_P, 2); - ret = check_process_code_2(BIF_P, BIF__ARGS); - hipe_set_narity(BIF_P, 0); - return ret; -} - -Eterm hipe_garbage_collect_1(BIF_ALIST_1) -{ - Eterm ret; - - hipe_set_narity(BIF_P, 1); - ret = garbage_collect_1(BIF_P, BIF__ARGS); + ret = erts_internal_check_process_code_2(BIF_P, BIF__ARGS); hipe_set_narity(BIF_P, 0); return ret; } diff --git a/erts/emulator/test/code_parallel_load_SUITE.erl b/erts/emulator/test/code_parallel_load_SUITE.erl index 1cfe015ea6..428f1242ab 100644 --- a/erts/emulator/test/code_parallel_load_SUITE.erl +++ b/erts/emulator/test/code_parallel_load_SUITE.erl @@ -159,22 +159,34 @@ setup_checkers(_,0) -> []; setup_checkers(T,N) -> [spawn_link(fun() -> ?model:check(T) end) | setup_checkers(T, N-1)]. check_and_purge_processes_code(Pids, M) -> - check_and_purge_processes_code(Pids, M, []). -check_and_purge_processes_code([], M, []) -> + Tag = make_ref(), + N = request_cpc(Pids, M, Tag), + ok = handle_cpc_responses(N, Tag, M), erlang:purge_module(M), + ok. + +request_cpc(Pid, M, Tag) when is_pid(Pid) -> + erlang:check_process_code(Pid, M, [{async, {Tag, Pid}}]), + 1; +request_cpc(Pids, M, Tag) when is_list(Pids) -> + request_cpc(Pids, M, Tag, 0). + +request_cpc([], _M, _Tag, N) -> + N; +request_cpc([Pid|Pids], M, Tag, N) -> + request_cpc(Pids, M, Tag, N + request_cpc(Pid, M, Tag)). + +handle_cpc_responses(0, _Tag, _Module) -> ok; -check_and_purge_processes_code([], M, Pending) -> - io:format("Processes ~w are still executing old code - retrying.~n", [Pending]), - check_and_purge_processes_code(Pending, M, []); -check_and_purge_processes_code([Pid|Pids], M, Pending) -> - case erlang:check_process_code(Pid, M) of - false -> - check_and_purge_processes_code(Pids, M, Pending); - true -> - check_and_purge_processes_code(Pids, M, [Pid|Pending]) +handle_cpc_responses(N, Tag, Module) -> + receive + {check_process_code, {Tag, _Pid}, false} -> + handle_cpc_responses(N-1, Tag, Module); + {check_process_code, {Tag, Pid}, true} -> + 1 = request_cpc(Pid, Module, Tag), + handle_cpc_responses(N, Tag, Module) end. 
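
The rewritten test above drives the new asynchronous form of erlang:check_process_code/3. Reduced to its essentials, outside the test harness, the pattern looks like the following illustrative sketch (check_one/2 is an assumed helper name, not part of the patch); the reply tuple carries whatever term was passed as RequestId in the {async, RequestId} option:

    %% Fire an asynchronous check and wait for the matching reply.
    check_one(Pid, Module) ->
        ReqId = make_ref(),
        async = erlang:check_process_code(Pid, Module, [{async, ReqId}]),
        receive
            {check_process_code, ReqId, CheckResult} ->
                CheckResult   %% true | false
        end.
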
- generate(Module, Attributes, FunStrings) -> FunForms = function_forms(FunStrings), Forms = [ diff --git a/erts/emulator/test/process_SUITE.erl b/erts/emulator/test/process_SUITE.erl index e3aae17df4..bf31655066 100644 --- a/erts/emulator/test/process_SUITE.erl +++ b/erts/emulator/test/process_SUITE.erl @@ -51,7 +51,13 @@ processes_term_proc_list/1, otp_7738_waiting/1, otp_7738_suspended/1, otp_7738_resume/1, - garb_other_running/1]). + garb_other_running/1, + no_priority_inversion/1, + no_priority_inversion2/1, + system_task_blast/1, + system_task_on_suspended/1, + gc_request_when_gc_disabled/1, + gc_request_blast_when_gc_disabled/1]). -export([prio_server/2, prio_client/2]). -export([init_per_testcase/2, end_per_testcase/2]). @@ -73,7 +79,8 @@ all() -> bad_register, garbage_collect, process_info_messages, process_flag_badarg, process_flag_heap_size, spawn_opt_heap_size, otp_6237, {group, processes_bif}, - {group, otp_7738}, garb_other_running]. + {group, otp_7738}, garb_other_running, + {group, system_task}]. groups() -> [{t_exit_2, [], @@ -87,7 +94,11 @@ groups() -> processes_gc_trap, processes_term_proc_list]}, {otp_7738, [], [otp_7738_waiting, otp_7738_suspended, - otp_7738_resume]}]. + otp_7738_resume]}, + {system_task, [], + [no_priority_inversion, no_priority_inversion2, + system_task_blast, system_task_on_suspended, + gc_request_when_gc_disabled, gc_request_blast_when_gc_disabled]}]. init_per_suite(Config) -> A0 = case application:start(sasl) of @@ -2214,6 +2225,208 @@ garb_other_running(Config) when is_list(Config) -> receive {'DOWN', Mon, process, Pid, normal} -> ok end, ok. +no_priority_inversion(Config) when is_list(Config) -> + Prio = process_flag(priority, max), + HTLs = lists:map(fun (_) -> + spawn_opt(fun () -> + tok_loop() + end, + [{priority, high}, monitor, link]) + end, + lists:seq(1, 2*erlang:system_info(schedulers))), + receive after 500 -> ok end, + LTL = spawn_opt(fun () -> + tok_loop() + end, + [{priority, low}, monitor, link]), + false = erlang:check_process_code(element(1, LTL), nonexisting_module), + true = erlang:garbage_collect(element(1, LTL)), + lists:foreach(fun ({P, _}) -> + unlink(P), + exit(P, kill) + end, [LTL | HTLs]), + lists:foreach(fun ({P, M}) -> + receive + {'DOWN', M, process, P, killed} -> + ok + end + end, [LTL | HTLs]), + process_flag(priority, Prio), + ok. + +no_priority_inversion2(Config) when is_list(Config) -> + Prio = process_flag(priority, max), + MTLs = lists:map(fun (_) -> + spawn_opt(fun () -> + tok_loop() + end, + [{priority, max}, monitor, link]) + end, + lists:seq(1, 2*erlang:system_info(schedulers))), + receive after 500 -> ok end, + {PL, ML} = spawn_opt(fun () -> + tok_loop() + end, + [{priority, low}, monitor, link]), + RL = request_gc(PL, low), + RN = request_gc(PL, normal), + RH = request_gc(PL, high), + receive + {garbage_collect, _, _} -> + ?t:fail(unexpected_gc) + after 1000 -> + ok + end, + RM = request_gc(PL, max), + receive + {garbage_collect, RM, true} -> + ok + end, + lists:foreach(fun ({P, _}) -> + unlink(P), + exit(P, kill) + end, MTLs), + lists:foreach(fun ({P, M}) -> + receive + {'DOWN', M, process, P, killed} -> + ok + end + end, MTLs), + receive + {garbage_collect, RH, true} -> + ok + end, + receive + {garbage_collect, RN, true} -> + ok + end, + receive + {garbage_collect, RL, true} -> + ok + end, + unlink(PL), + exit(PL, kill), + receive + {'DOWN', ML, process, PL, killed} -> + ok + end, + process_flag(priority, Prio), + ok. 
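
The two tests above verify that a system task runs at the priority of the request rather than at the target process's priority. As the erlang.erl wrappers later in this diff show, erlang:garbage_collect/1,2 and erlang:check_process_code/2,3 use the calling process's own priority for the request, so a caller that needs the operation to be urgent can raise its priority around the call. A hedged sketch (urgent_gc/1 is an assumed helper, not part of the patch):

    %% Run the GC request as a max-priority system task by temporarily
    %% raising the caller's priority, which the wrappers pass along.
    urgent_gc(Pid) ->
        Old = process_flag(priority, max),
        try
            erlang:garbage_collect(Pid)
        after
            process_flag(priority, Old)
        end.
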
+ +request_gc(Pid, Prio) -> + Ref = make_ref(), + erts_internal:request_system_task(Pid, Prio, {garbage_collect, Ref}), + Ref. + +system_task_blast(Config) when is_list(Config) -> + Me = self(), + GCReq = fun () -> + RL = gc_req(Me, 100), + lists:foreach(fun (R) -> + receive + {garbage_collect, R, true} -> + ok + end + end, RL), + exit(it_worked) + end, + HTLs = lists:map(fun (_) -> spawn_monitor(GCReq) end, lists:seq(1, 1000)), + lists:foreach(fun ({P, M}) -> + receive + {'DOWN', M, process, P, it_worked} -> + ok + end + end, HTLs), + ok. + +gc_req(_Pid, 0) -> + []; +gc_req(Pid, N) -> + R0 = request_gc(Pid, low), + R1 = request_gc(Pid, normal), + R2 = request_gc(Pid, high), + R3 = request_gc(Pid, max), + [R0, R1, R2, R3 | gc_req(Pid, N-1)]. + +system_task_on_suspended(Config) when is_list(Config) -> + {P, M} = spawn_monitor(fun () -> + tok_loop() + end), + true = erlang:suspend_process(P), + {status, suspended} = process_info(P, status), + true = erlang:garbage_collect(P), + {status, suspended} = process_info(P, status), + true = erlang:resume_process(P), + false = ({status, suspended} == process_info(P, status)), + exit(P, kill), + receive + {'DOWN', M, process, P, killed} -> + ok + end. + +gc_request_when_gc_disabled(Config) when is_list(Config) -> + Master = self(), + AIS = erts_debug:set_internal_state(available_internal_state, true), + {P, M} = spawn_opt(fun () -> + true = erts_debug:set_internal_state(gc_state, + false), + Master ! {self(), gc_state, false}, + receive after 1000 -> ok end, + Master ! {self(), gc_state, true}, + false = erts_debug:set_internal_state(gc_state, + true), + receive after 100 -> ok end + end, [monitor, link]), + receive {P, gc_state, false} -> ok end, + ReqId = make_ref(), + async = garbage_collect(P, [{async, ReqId}]), + receive + {garbage_collect, ReqId, Result} -> + ?t:fail({unexpected_gc, Result}); + {P, gc_state, true} -> + ok + end, + receive {garbage_collect, ReqId, true} -> ok end, + erts_debug:set_internal_state(available_internal_state, AIS), + receive {'DOWN', M, process, P, _Reason} -> ok end, + ok. + +gc_request_blast_when_gc_disabled(Config) when is_list(Config) -> + Master = self(), + AIS = erts_debug:set_internal_state(available_internal_state, true), + {P, M} = spawn_opt(fun () -> + true = erts_debug:set_internal_state(gc_state, + false), + Master ! {self(), gc_state, false}, + receive after 1000 -> ok end, + false = erts_debug:set_internal_state(gc_state, + true), + receive after 100 -> ok end + end, [monitor, link]), + receive {P, gc_state, false} -> ok end, + PMs = lists:map(fun (N) -> + Prio = case N rem 4 of + 0 -> max; + 1 -> high; + 2 -> normal; + 3 -> low + end, + spawn_opt(fun () -> + erlang:garbage_collect(P) + end, [monitor, link, {priority, Prio}]) + end, lists:seq(1, 10000)), + lists:foreach(fun ({Proc, Mon}) -> + receive + {'DOWN', Mon, process, Proc, normal} -> + ok + end + end, + PMs), + erts_debug:set_internal_state(available_internal_state, AIS), + receive {'DOWN', M, process, P, _Reason} -> ok end, + ok. 
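
The two gc_state tests above also exercise the asynchronous form of the new erlang:garbage_collect/2. Outside the test harness the usage reduces to this sketch (gc_async/1 is an assumed helper name); as with check_process_code/3, the reply is tagged with the RequestId term from the {async, RequestId} option:

    %% Request a GC of Pid and wait for the completion message.
    gc_async(Pid) ->
        ReqId = make_ref(),
        async = erlang:garbage_collect(Pid, [{async, ReqId}]),
        receive
            {garbage_collect, ReqId, GCResult} -> GCResult   %% boolean()
        end.
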
+ + %% Internal functions wait_until(Fun) -> diff --git a/erts/etc/unix/etp-commands.in b/erts/etc/unix/etp-commands.in index 54ff7b3e3a..73887931cc 100644 --- a/erts/etc/unix/etp-commands.in +++ b/erts/etc/unix/etp-commands.in @@ -1316,49 +1316,102 @@ end define etp-proc-state-int # Args: int # - if ($arg0 & 0xfffff000) + if ($arg0 & 0xff000000) printf "GARBAGE | " end - if ($arg0 & 0x800) + if ($arg0 & 0x800000) + printf "delayed-sys | " + end + if ($arg0 & 0x400000) + printf "proxy | " + set $proxy_process = 1 + else + set $proxy_process = 0 + end + if ($arg0 & 0x200000) + printf "running-sys | " + end + if ($arg0 & 0x100000) + printf "active-sys | " + end + if ($arg0 & 0x80000) printf "trapping-exit | " end - if ($arg0 & 0x400) + if ($arg0 & 0x40000) printf "bound | " end - if ($arg0 & 0x200) + if ($arg0 & 0x20000) printf "garbage-collecting | " end - if ($arg0 & 0x100) + if ($arg0 & 0x10000) printf "suspended | " end - if ($arg0 & 0x80) + if ($arg0 & 0x8000) printf "running | " end - if ($arg0 & 0x40) + if ($arg0 & 0x4000) printf "in-run-queue | " end - if ($arg0 & 0x20) + if ($arg0 & 0x2000) printf "active | " end - if ($arg0 & 0x10) + if ($arg0 & 0x1000) printf "pending-exit | " end - if ($arg0 & 0x8) + if ($arg0 & 0x800) printf "exiting | " end - if ($arg0 & 0x4) + if ($arg0 & 0x400) printf "free | " end - if ($arg0 & 0x3) == 0 - printf "prio-max\n" + if ($arg0 & 0x200) + printf "in-prq-low | " + end + if ($arg0 & 0x100) + printf "in-prq-normal | " + end + if ($arg0 & 0x80) + printf "in-prq-high | " + end + if ($arg0 & 0x40) + printf "in-prq-max | " + end + if ($arg0 & 0x30) == 0x0 + printf "prq-prio-max | " + else + if ($arg0 & 0x30) == 0x10 + printf "prq-prio-high | " + else + if ($arg0 & 0x30) == 0x20 + printf "prq-prio-normal | " + else + printf "prq-prio-low | " + end + end + end + if ($arg0 & 0xc) == 0x0 + printf "usr-prio-max | " + else + if ($arg0 & 0xc) == 0x4 + printf "usr-prio-high | " + else + if ($arg0 & 0xc) == 0x8 + printf "usr-prio-normal | " + else + printf "usr-prio-low | " + end + end + end + if ($arg0 & 0x3) == 0x0 + printf "act-prio-max\n" else - if ($arg0 & 0x3) == 1 - printf "prio-high\n" + if ($arg0 & 0x3) == 0x1 + printf "act-prio-high\n" else - if ($arg0 & 0x3) == 2 - printf "prio-normal\n" + if ($arg0 & 0x3) == 0x2 + printf "act-prio-normal\n" else - printf "prio-low\n" + printf "act-prio-low\n" end end end @@ -1395,6 +1448,12 @@ define etp-process-info etp-1 $arg0->common.id printf "\n State: " etp-proc-state $arg0 + if $proxy_process != 0 + printf " Pointer: (Process *) %p\n", $arg0 + printf " *** PROXY process struct *** refer to: \n" + etp-pid2proc-1 $arg0->common.id + etp-process-info $proc + else if (*(((Uint32 *) &(((Process *) $arg0)->state))) & 0x4) == 0 if ($arg0->common.u.alive.reg) printf " Registered name: " @@ -1432,6 +1491,7 @@ define etp-process-info printf " Parent: " etp-1 $arg0->parent printf "\n Pointer: (Process *) %p\n", $arg0 + end end document etp-process-info diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam Binary files differindex ecb65af214..53891bd1d0 100644 --- a/erts/preloaded/ebin/erlang.beam +++ b/erts/preloaded/ebin/erlang.beam diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam Binary files differindex 881fea4665..0a0fb739aa 100644 --- a/erts/preloaded/ebin/erts_internal.beam +++ b/erts/preloaded/ebin/erts_internal.beam diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index aa50ae7f76..a21da2ecc9 100644 --- 
a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -81,7 +81,8 @@ -export([binary_to_list/3, binary_to_term/1, binary_to_term/2]). -export([bit_size/1, bitsize/1, bitstr_to_list/1, bitstring_to_list/1]). -export([bump_reductions/1, byte_size/1, call_on_load_function/1]). --export([cancel_timer/1, check_old_code/1, check_process_code/2, crc32/1]). +-export([cancel_timer/1, check_old_code/1, check_process_code/2, + check_process_code/3, crc32/1]). -export([crc32/2, crc32_combine/3, date/0, decode_packet/3]). -export([delete_element/2]). -export([delete_module/1, demonitor/1, demonitor/2, display/1]). @@ -91,7 +92,7 @@ -export([float_to_binary/1, float_to_binary/2, float_to_list/1, float_to_list/2]). -export([fun_info/2, fun_to_list/1, function_exported/3]). --export([garbage_collect/0, garbage_collect/1]). +-export([garbage_collect/0, garbage_collect/1, garbage_collect/2]). -export([garbage_collect_message_area/0, get/0, get/1, get_keys/1]). -export([get_module_info/1, get_stacktrace/0, group_leader/0]). -export([group_leader/2, halt/0, halt/1, halt/2, hash/2, hibernate/3]). @@ -429,11 +430,71 @@ check_old_code(_Module) -> erlang:nif_error(undefined). %% check_process_code/2 --spec check_process_code(Pid, Module) -> boolean() when +-spec check_process_code(Pid, Module) -> CheckResult when Pid :: pid(), - Module :: module(). -check_process_code(_Pid, _Module) -> - erlang:nif_error(undefined). + Module :: module(), + CheckResult :: boolean(). +check_process_code(Pid, Module) -> + try + erlang:check_process_code(Pid, Module, [{allow_gc, true}]) + catch + error:Error -> erlang:error(Error, [Pid, Module]) + end. + +%% check_process_code/3 +-spec check_process_code(Pid, Module, OptionList) -> CheckResult | async when + Pid :: pid(), + Module :: module(), + RequestId :: term(), + Option :: {async, RequestId} | {allow_gc, boolean()}, + OptionList :: [Option], + CheckResult :: boolean() | aborted. +check_process_code(Pid, Module, OptionList) -> + try + {Async, AllowGC} = get_cpc_opts(OptionList, sync, true), + case Async of + {async, ReqId} -> + {priority, Prio} = erlang:process_info(erlang:self(), + priority), + erts_internal:request_system_task(Pid, + Prio, + {check_process_code, + ReqId, + Module, + AllowGC}), + async; + sync -> + case Pid == erlang:self() of + true -> + erts_internal:check_process_code(Module, + [{allow_gc, AllowGC}]); + false -> + {priority, Prio} = erlang:process_info(erlang:self(), + priority), + ReqId = erlang:make_ref(), + erts_internal:request_system_task(Pid, + Prio, + {check_process_code, + ReqId, + Module, + AllowGC}), + receive + {check_process_code, ReqId, CheckResult} -> + CheckResult + end + end + end + catch + error:Error -> erlang:error(Error, [Pid, Module, OptionList]) + end. + +% gets async and allow_gc opts and verify valid option list +get_cpc_opts([{async, _ReqId} = AsyncTuple | Options], _OldAsync, AllowGC) -> + get_cpc_opts(Options, AsyncTuple, AllowGC); +get_cpc_opts([{allow_gc, AllowGC} | Options], Async, _OldAllowGC) -> + get_cpc_opts(Options, Async, AllowGC); +get_cpc_opts([], Async, AllowGC) -> + {Async, AllowGC}. %% crc32/1 -spec erlang:crc32(Data) -> non_neg_integer() when @@ -793,10 +854,61 @@ garbage_collect() -> erlang:nif_error(undefined). %% garbage_collect/1 --spec garbage_collect(Pid) -> boolean() when - Pid :: pid(). -garbage_collect(_Pid) -> - erlang:nif_error(undefined). +-spec garbage_collect(Pid) -> GCResult when + Pid :: pid(), + GCResult :: boolean(). 
+garbage_collect(Pid) -> + try + erlang:garbage_collect(Pid, []) + catch + error:Error -> erlang:error(Error, [Pid]) + end. + +%% garbage_collect/2 +-spec garbage_collect(Pid, OptionList) -> GCResult | async when + Pid :: pid(), + RequestId :: term(), + Option :: {async, RequestId}, + OptionList :: [Option], + GCResult :: boolean(). +garbage_collect(Pid, OptionList) -> + try + Async = get_gc_opts(OptionList, sync), + case Async of + {async, ReqId} -> + {priority, Prio} = erlang:process_info(erlang:self(), + priority), + erts_internal:request_system_task(Pid, + Prio, + {garbage_collect, ReqId}), + async; + sync -> + case Pid == erlang:self() of + true -> + erlang:garbage_collect(); + false -> + {priority, Prio} = erlang:process_info(erlang:self(), + priority), + ReqId = erlang:make_ref(), + erts_internal:request_system_task(Pid, + Prio, + {garbage_collect, + ReqId}), + receive + {garbage_collect, ReqId, GCResult} -> + GCResult + end + end + end + catch + error:Error -> erlang:error(Error, [Pid, OptionList]) + end. + +% gets async opt and verify valid option list +get_gc_opts([{async, _ReqId} = AsyncTuple | Options], _OldAsync) -> + get_gc_opts(Options, AsyncTuple); +get_gc_opts([], Async) -> + Async. %% garbage_collect_message_area/0 -spec erlang:garbage_collect_message_area() -> boolean(). diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index 8a8cd52d64..c8e8e7e069 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -33,6 +33,10 @@ -export([port_command/3, port_connect/2, port_close/1, port_control/3, port_call/3, port_info/1, port_info/2]). +-export([request_system_task/3]). + +-export([check_process_code/2]). + %% %% Await result of send to port %% @@ -139,3 +143,20 @@ port_info(_Result) -> port_info(_Result, _Item) -> erlang:nif_error(undefined). + +-spec request_system_task(Pid, Prio, Request) -> 'ok' when + Prio :: 'max' | 'high' | 'normal' | 'low', + Request :: {'garbage_collect', term()} + | {'check_process_code', term(), module(), boolean()}, + Pid :: pid(). + +request_system_task(_Pid, _Prio, _Request) -> + erlang:nif_error(undefined). + +-spec check_process_code(Module, OptionList) -> boolean() when + Module :: module(), + Option :: {allow_gc, boolean()}, + OptionList :: [Option]. +check_process_code(_Module, _OptionList) -> + erlang:nif_error(undefined). + diff --git a/lib/kernel/src/code_server.erl b/lib/kernel/src/code_server.erl index 9358e2201e..fc7ac08699 100644 --- a/lib/kernel/src/code_server.erl +++ b/lib/kernel/src/code_server.erl @@ -1410,45 +1410,236 @@ absname_vr([[X, $:]|Name], _, _AbsBase) -> do_purge(Mod0) -> Mod = to_atom(Mod0), case erlang:check_old_code(Mod) of - false -> false; - true -> do_purge(processes(), Mod, false) - end. - -do_purge([P|Ps], Mod, Purged) -> - case erlang:check_process_code(P, Mod) of + false -> + false; true -> - Ref = erlang:monitor(process, P), - exit(P, kill), - receive - {'DOWN',Ref,process,_Pid,_} -> ok + Res = check_proc_code(erlang:processes(), Mod, true), + try + erlang:purge_module(Mod) + catch + _:_ -> ignore end, - do_purge(Ps, Mod, true); - false -> - do_purge(Ps, Mod, Purged) - end; -do_purge([], Mod, Purged) -> - catch erlang:purge_module(Mod), - Purged. + Res + end. %% do_soft_purge(Module) %% Purge old code only if no procs remain that run old code. 
%% Return true in that case, false if procs remain (in this %% case old code is not purged) -do_soft_purge(Mod) -> +do_soft_purge(Mod0) -> + Mod = to_atom(Mod0), case erlang:check_old_code(Mod) of - false -> true; - true -> do_soft_purge(processes(), Mod) + false -> + true; + true -> + case check_proc_code(erlang:processes(), Mod, false) of + false -> + false; + true -> + try + erlang:purge_module(Mod) + catch + _:_ -> ignore + end, + true + end end. -do_soft_purge([P|Ps], Mod) -> - case erlang:check_process_code(P, Mod) of - true -> false; - false -> do_soft_purge(Ps, Mod) +%% +%% check_proc_code(Pids, Mod, Hard) - Send asynchronous +%% requests to all processes to perform a check_process_code +%% operation. Each process will check their own state and +%% reply with the result. If 'Hard' equals +%% - true, processes that refer 'Mod' will be killed. If +%% any processes were killed true is returned; otherwise, +%% false. +%% - false, and any processes refer 'Mod', false will +%% returned; otherwise, true. +%% +%% Requests will be sent to all processes identified by +%% Pids at once, but without allowing GC to be performed. +%% Check process code operations that are aborted due to +%% GC need, will be restarted allowing GC. However, only +%% ?MAX_CPC_GC_PROCS outstanding operation allowing GC at +%% a time will be allowed. This in order not to blow up +%% memory wise. +%% +%% We also only allow ?MAX_CPC_NO_OUTSTANDING_KILLS +%% outstanding kills. This both in order to avoid flooding +%% our message queue with 'DOWN' messages and limiting the +%% amount of memory used to keep references to all +%% outstanding kills. +%% + +%% We maybe should allow more than two outstanding +%% GC requests, but for now we play it safe... +-define(MAX_CPC_GC_PROCS, 2). +-define(MAX_CPC_NO_OUTSTANDING_KILLS, 10). + +-record(cpc_static, {hard, module, tag}). + +-record(cpc_kill, {outstanding = [], + no_outstanding = 0, + waiting = [], + killed = false}). + +check_proc_code(Pids, Mod, Hard) -> + Tag = erlang:make_ref(), + CpcS = #cpc_static{hard = Hard, + module = Mod, + tag = Tag}, + check_proc_code(CpcS, cpc_init(CpcS, Pids, 0), 0, [], #cpc_kill{}, true). + +check_proc_code(#cpc_static{hard = true}, 0, 0, [], + #cpc_kill{outstanding = [], waiting = [], killed = Killed}, + true) -> + %% No outstanding requests. We did a hard check, so result is whether or + %% not we killed any processes... + Killed; +check_proc_code(#cpc_static{hard = false}, 0, 0, [], _KillState, Success) -> + %% No outstanding requests and we did a soft check... + Success; +check_proc_code(#cpc_static{hard = false, tag = Tag} = CpcS, NoReq0, NoGcReq0, + [], _KillState, false) -> + %% Failed soft check; just cleanup the remaining replies corresponding + %% to the requests we've sent... 
+ {NoReq1, NoGcReq1} = receive + {check_process_code, {Tag, _P, GC}, _Res} -> + case GC of + false -> {NoReq0-1, NoGcReq0}; + true -> {NoReq0, NoGcReq0-1} + end + end, + check_proc_code(CpcS, NoReq1, NoGcReq1, [], _KillState, false); +check_proc_code(#cpc_static{tag = Tag} = CpcS, NoReq0, NoGcReq0, NeedGC0, + KillState0, Success) -> + + %% Check if we should request a GC operation + {NoGcReq1, NeedGC1} = case NoGcReq0 < ?MAX_CPC_GC_PROCS of + GcOpAllowed when GcOpAllowed == false; + NeedGC0 == [] -> + {NoGcReq0, NeedGC0}; + _ -> + {NoGcReq0+1, cpc_request_gc(CpcS,NeedGC0)} + end, + + %% Wait for a cpc reply or 'DOWN' message + {NoReq1, NoGcReq2, Pid, Result, KillState1} = cpc_recv(Tag, + NoReq0, + NoGcReq1, + KillState0), + + %% Check the result of the reply + case Result of + aborted -> + %% Operation aborted due to the need to GC in order to + %% determine if the process is referring the module. + %% Schedule the operation for restart allowing GC... + check_proc_code(CpcS, NoReq1, NoGcReq2, [Pid|NeedGC1], KillState1, + Success); + false -> + %% Process not referring the module; done with this process... + check_proc_code(CpcS, NoReq1, NoGcReq2, NeedGC1, KillState1, + Success); + true -> + %% Process referring the module... + case CpcS#cpc_static.hard of + false -> + %% ... and soft check. The whole operation failed so + %% no point continuing; clean up and fail... + check_proc_code(CpcS, NoReq1, NoGcReq2, [], KillState1, + false); + true -> + %% ... and hard check; schedule kill of it... + check_proc_code(CpcS, NoReq1, NoGcReq2, NeedGC1, + cpc_sched_kill(Pid, KillState1), Success) + end; + 'DOWN' -> + %% Handled 'DOWN' message + check_proc_code(CpcS, NoReq1, NoGcReq2, NeedGC1, + KillState1, Success) + end. + +cpc_recv(Tag, NoReq, NoGcReq, #cpc_kill{outstanding = []} = KillState) -> + receive + {check_process_code, {Tag, Pid, GC}, Res} -> + cpc_handle_cpc(NoReq, NoGcReq, GC, Pid, Res, KillState) end; -do_soft_purge([], Mod) -> - catch erlang:purge_module(Mod), - true. +cpc_recv(Tag, NoReq, NoGcReq, + #cpc_kill{outstanding = [R0, R1, R2, R3, R4 | _]} = KillState) -> + receive + {'DOWN', R, process, _, _} when R == R0; + R == R1; + R == R2; + R == R3; + R == R4 -> + cpc_handle_down(NoReq, NoGcReq, R, KillState); + {check_process_code, {Tag, Pid, GC}, Res} -> + cpc_handle_cpc(NoReq, NoGcReq, GC, Pid, Res, KillState) + end; +cpc_recv(Tag, NoReq, NoGcReq, #cpc_kill{outstanding = [R|_]} = KillState) -> + receive + {'DOWN', R, process, _, _} -> + cpc_handle_down(NoReq, NoGcReq, R, KillState); + {check_process_code, {Tag, Pid, GC}, Res} -> + cpc_handle_cpc(NoReq, NoGcReq, GC, Pid, Res, KillState) + end. + +cpc_handle_down(NoReq, NoGcReq, R, #cpc_kill{outstanding = Rs, + no_outstanding = N} = KillState) -> + {NoReq, NoGcReq, undefined, 'DOWN', + cpc_sched_kill_waiting(KillState#cpc_kill{outstanding = cpc_list_rm(R, Rs), + no_outstanding = N-1})}. + +cpc_list_rm(R, [R|Rs]) -> + Rs; +cpc_list_rm(R0, [R1|Rs]) -> + [R1|cpc_list_rm(R0, Rs)]. + +cpc_handle_cpc(NoReq, NoGcReq, false, Pid, Res, KillState) -> + {NoReq-1, NoGcReq, Pid, Res, KillState}; +cpc_handle_cpc(NoReq, NoGcReq, true, Pid, Res, KillState) -> + {NoReq, NoGcReq-1, Pid, Res, KillState}. + +cpc_sched_kill_waiting(#cpc_kill{waiting = []} = KillState) -> + KillState; +cpc_sched_kill_waiting(#cpc_kill{outstanding = Rs, + no_outstanding = N, + waiting = [P|Ps]} = KillState) -> + R = erlang:monitor(process, P), + exit(P, kill), + KillState#cpc_kill{outstanding = [R|Rs], + no_outstanding = N+1, + waiting = Ps, + killed = true}. 
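
The batching above implements the strategy described in the check_proc_code/3 comment: probe every process with GC disallowed, and re-issue only the checks that come back as aborted with GC allowed, at most ?MAX_CPC_GC_PROCS at a time. For a single process the two-phase idea reduces to this simplified sketch (check_one_proc/2 is an assumed helper; the real code keeps everything asynchronous and bounded):

    %% First probe without allowing GC; only if the emulator aborts
    %% because a GC would be needed, retry and allow it.
    check_one_proc(Pid, Mod) ->
        case erlang:check_process_code(Pid, Mod, [{allow_gc, false}]) of
            aborted -> erlang:check_process_code(Pid, Mod, [{allow_gc, true}]);
            Bool when is_boolean(Bool) -> Bool
        end.
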
+ +cpc_sched_kill(Pid, #cpc_kill{no_outstanding = N, waiting = Pids} = KillState) + when N >= ?MAX_CPC_NO_OUTSTANDING_KILLS -> + KillState#cpc_kill{waiting = [Pid|Pids]}; +cpc_sched_kill(Pid, + #cpc_kill{outstanding = Rs, no_outstanding = N} = KillState) -> + R = erlang:monitor(process, Pid), + exit(Pid, kill), + KillState#cpc_kill{outstanding = [R|Rs], + no_outstanding = N+1, + killed = true}. + +cpc_request(#cpc_static{tag = Tag, module = Mod}, Pid, AllowGc) -> + erlang:check_process_code(Pid, Mod, [{async, {Tag, Pid, AllowGc}}, + {allow_gc, AllowGc}]). + +cpc_request_gc(CpcS, [Pid|Pids]) -> + cpc_request(CpcS, Pid, true), + Pids. + +cpc_init(_CpcS, [], NoReqs) -> + NoReqs; +cpc_init(CpcS, [Pid|Pids], NoReqs) -> + cpc_request(CpcS, Pid, false), + cpc_init(CpcS, Pids, NoReqs+1). + +% end of check_proc_code() implementation. is_loaded(M, Db) -> case ets:lookup(Db, M) of diff --git a/lib/kernel/test/code_SUITE.erl b/lib/kernel/test/code_SUITE.erl index cd9359f2aa..17983e972d 100644 --- a/lib/kernel/test/code_SUITE.erl +++ b/lib/kernel/test/code_SUITE.erl @@ -23,7 +23,8 @@ -export([all/0, suite/0,groups/0,init_per_group/2,end_per_group/2]). -export([set_path/1, get_path/1, add_path/1, add_paths/1, del_path/1, replace_path/1, load_file/1, load_abs/1, ensure_loaded/1, - delete/1, purge/1, soft_purge/1, is_loaded/1, all_loaded/1, + delete/1, purge/1, purge_many_exits/1, soft_purge/1, is_loaded/1, + all_loaded/1, load_binary/1, dir_req/1, object_code/1, set_path_file/1, upgrade/1, sticky_dir/1, pa_pz_option/1, add_del_path/1, @@ -51,7 +52,7 @@ suite() -> [{ct_hooks,[ts_install_cth]}]. all() -> [set_path, get_path, add_path, add_paths, del_path, replace_path, load_file, load_abs, ensure_loaded, - delete, purge, soft_purge, is_loaded, all_loaded, + delete, purge, purge_many_exits, soft_purge, is_loaded, all_loaded, load_binary, dir_req, object_code, set_path_file, upgrade, pa_pz_option, add_del_path, dir_disappeared, @@ -369,6 +370,42 @@ purge(Config) when is_list(Config) -> process_flag(trap_exit, OldFlag), ok. +purge_many_exits(Config) when is_list(Config) -> + OldFlag = process_flag(trap_exit, true), + code:purge(code_b_test), + {'EXIT',_} = (catch code:purge({})), + false = code:purge(code_b_test), + TPids = lists:map(fun (_) -> + {code_b_test:do_spawn(), + spawn_link(fun () -> + receive + after infinity -> ok + end + end)} + end, + lists:seq(1, 1000)), + % Give them time to start... + receive after 1000 -> ok end, + true = code:delete(code_b_test), + lists:foreach(fun ({Pid1, Pid2}) -> + true = erlang:is_process_alive(Pid1), + false = code_b_test:check_exit(Pid1), + true = erlang:is_process_alive(Pid2) + end, TPids), + true = code:purge(code_b_test), + lists:foreach(fun ({Pid1, Pid2}) -> + false = erlang:is_process_alive(Pid1), + true = code_b_test:check_exit(Pid1), + true = erlang:is_process_alive(Pid2), + exit(Pid2, kill) + end, TPids), + lists:foreach(fun ({_Pid1, Pid2}) -> + receive {'EXIT', Pid2, _} -> ok end + end, TPids), + process_flag(trap_exit, OldFlag), + ok. 
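
The purge_many_exits case above exercises the rewritten do_purge/1, which now kills processes referring to old code via the parallel check and reports whether it had to. From a caller's point of view the two purge flavours behave as in this hedged sketch (old_mod is an assumed module name):

    %% soft_purge backs off if any process still runs old code for old_mod;
    %% purge kills such processes and returns true if it killed any.
    case code:soft_purge(old_mod) of
        true  -> ok;                            %% nothing referred old code
        false -> _Killed = code:purge(old_mod), ok
    end.
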
+ + soft_purge(suite) -> []; soft_purge(doc) -> []; soft_purge(Config) when is_list(Config) -> diff --git a/lib/stdlib/src/erl_internal.erl b/lib/stdlib/src/erl_internal.erl index 378e629ac9..28de7205ea 100644 --- a/lib/stdlib/src/erl_internal.erl +++ b/lib/stdlib/src/erl_internal.erl @@ -267,6 +267,7 @@ bif(bitstring_to_list, 1) -> true; bif(byte_size, 1) -> true; bif(check_old_code, 1) -> true; bif(check_process_code, 2) -> true; +bif(check_process_code, 3) -> true; bif(date, 0) -> true; bif(delete_module, 1) -> true; bif(demonitor, 1) -> true; @@ -286,6 +287,7 @@ bif(float_to_binary, 1) -> true; bif(float_to_binary, 2) -> true; bif(garbage_collect, 0) -> true; bif(garbage_collect, 1) -> true; +bif(garbage_collect, 2) -> true; bif(get, 0) -> true; bif(get, 1) -> true; bif(get_keys, 1) -> true; |
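
The erl_internal:bif/2 additions at the end make check_process_code/3 and garbage_collect/2 known to the compiler and erl_lint as auto-imported BIFs, so user code can call them without the erlang: prefix, just as the updated test suites do. A small illustrative sketch (demo/2 is an assumed wrapper):

    %% Both calls compile as plain auto-imported BIF calls.
    demo(Pid, Mod) ->
        Res = check_process_code(Pid, Mod, [{allow_gc, true}]),  %% boolean()
        async = garbage_collect(Pid, [{async, make_ref()}]),
        Res.
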