From ca0425c6ff85262bc15367f5fd9cbc51cde52b20 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Wed, 2 Oct 2013 10:07:27 +0200 Subject: Execution of system tasks in context of another process A process requesting a system task to be executed in the context of another process will be notified by a message when the task has executed. This message will be on the form: {RequestType, RequestId, Pid, Result}. A process requesting a system task to be executed can set priority on the system task. The requester typically set the same priority on the task as its own process priority, and by this avoiding priority inversion. A request for execution of a system task is made by calling the statically linked in NIF erts_internal:request_system_task(Pid, Prio, Request). This is an undocumented ERTS internal function that should remain so. It should *only* be called from BIF implementations. Currently defined system tasks are: * garbage_collect * check_process_code Further system tasks can and will be implemented in the future. The erlang:garbage_collect/[1,2] and erlang:check_process_code/[2,3] BIFs are now implemented using system tasks. Both the 'garbage_collect' and the 'check_process_code' operations perform or may perform garbage_collections. By doing these via the system task functionality all garbage collect operations in the system will be performed solely in the context of the process being garbage collected. This makes it possible to later implement functionality for disabling garbage collection of a process over context switches. Newly introduced BIFs: * erlang:garbage_collect/2 - The new second argument is an option list. Introduced option: * {async, RequestId} - making it possible for users to issue asynchronous garbage collect requests. * erlang:check_process_code/3 - The new third argument is an option list. Introduced options: * {async, RequestId} - making it possible for users to issue asynchronous check process code requests. * {allow_gc, boolean()} - making it possible to issue requests that aren't allowed to garbage collect (operation will abort if gc should be needed). These options have been introduced as a preparation for parallelization of check_process_code operations when the code_server is about to purge a module. --- erts/doc/src/erlang.xml | 161 ++++- erts/emulator/beam/atom.names | 1 + erts/emulator/beam/beam_bif_load.c | 133 ++-- erts/emulator/beam/bif.c | 39 - erts/emulator/beam/bif.tab | 6 +- erts/emulator/beam/break.c | 10 +- erts/emulator/beam/erl_alloc.types | 2 + erts/emulator/beam/erl_message.h | 5 + erts/emulator/beam/erl_process.c | 1224 ++++++++++++++++++++++++++++---- erts/emulator/beam/erl_process.h | 74 +- erts/emulator/beam/global.h | 4 + erts/emulator/beam/ops.tab | 14 +- erts/emulator/beam/utils.c | 2 +- erts/emulator/hipe/hipe_bif_list.m4 | 3 +- erts/emulator/hipe/hipe_native_bif.c | 17 +- erts/emulator/test/process_SUITE.erl | 162 ++++- erts/etc/unix/etp-commands.in | 93 ++- erts/preloaded/ebin/erlang.beam | Bin 94152 -> 97500 bytes erts/preloaded/ebin/erts_internal.beam | Bin 3260 -> 3780 bytes erts/preloaded/src/erlang.erl | 132 +++- erts/preloaded/src/erts_internal.erl | 21 + 21 files changed, 1787 insertions(+), 316 deletions(-) (limited to 'erts') diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index 5ee40823bc..a9642be6fa 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -501,16 +501,87 @@ Check if a process is executing old code for a module -

Returns true if the process Pid is executing - old code for Module. That is, if the current call of - the process executes old code for this module, or if the - process has references to old code for this module, or if the - process contains funs that references old code for this - module. Otherwise, it returns false.

-
-> check_process_code(Pid, lists).
-false
+

The same as + erlang:check_process_code(Pid, + Module, []).

+
+ + + + Check if a process is executing old code for a module + +

Check if the node local process identified by Pid + is executing old code for Module.

+

Currently available Options:

+ + {allow_gc, boolean()} + + Determines if garbage collection is allowed when performing + the operation. If {allow_gc, false} is passed, and + a garbage collection is needed in order to determine the + result of the operation, the operation will be aborted + (see information on CheckResult below). + The default is to allow garbage collection, i.e., + {allow_gc, true}. + + {async, RequestId} + + The check_process_code/3 function will return + the value async immediately after the request + has been sent. When the request has been processed, the + process that called this function will be passed a + message on the form:
+ {check_process_code, RequestId, CheckResult}. +
+
+

If Pid equals self(), and + no async option has been passed, the operation will + be performed at once. In all other cases a request for + the operation will be sent to the process identified by + Pid, and will be handled when + appropriate. If no async option has been passed, + the caller will block until CheckResult + is available and can be returned.

+

CheckResult informs about the result of + the request:

+ + true + + The process identified by Pid is + executing old code for Module. + That is, the current call of the process executes old + code for this module, or the process has references + to old code for this module, or the process contains + funs that references old code for this module. + + false + + The process identified by Pid is + not executing old code for Module. + + aborted + + The operation was aborted since the process needed to + be garbage collected in order to determine the result + of the operation, and the operation was requested + by passing the {allow_gc, false} option. +

See also code(3).

+

Failures:

+ + badarg + + If Pid is not a node local process identifier. + + badarg + + If Module is not an atom. + + badarg + + If OptionList is not a valid list of options. + +
@@ -1197,20 +1268,74 @@ true that the spontaneous garbage collection will occur too late or not at all. Improper use may seriously degrade system performance.

-

Compatibility note: In versions of OTP prior to R7, - the garbage collection took place at the next context switch, - not immediately. To force a context switch after a call to - erlang:garbage_collect(), it was sufficient to make - any function call.

- Force an immediate garbage collection of a process + Garbage collect a process + +

The same as + garbage_collect(Pid, []).

+
+
+ + + Garbage collect a process -

Works like erlang:garbage_collect() but on any - process. The same caveats apply. Returns false if - Pid refers to a dead process; true otherwise.

+

Garbage collect the node local process identified by + Pid.

+

Currently available Options:

+ + {async, RequestId} + + The garbage_collect/2 function will return + the value async immediately after the request + has been sent. When the request has been processed, the + process that called this function will be passed a + message on the form:
+ {garbage_collect, RequestId, GCResult}. +
+
+

If Pid equals self(), and + no async option has been passed, the garbage + collection will be performed at once, i.e. the same as + calling + garbage_collect/0. + In all other cases a request for garbage collection will + be sent to the process identified by Pid, + and will be handled when appropriate. If no async + option has been passed, the caller will block until + GCResult is available and can be + returned.

+

GCResult informs about the result of + the garbage collection request:

+ + true + + The process identified by Pid has + been garbage collected. + + false + + No garbage collection was performed. This since the + the process identified by Pid + terminated before the request could be satisfied. + + +

Note that the same caveats as for + garbage_collect/0 + apply.

+

Failures:

+ + badarg + + If Pid is not a node local process identifier. + + badarg + + If OptionList is not a valid list of options. + +
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index c2f32ba089..8433fd826a 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -78,6 +78,7 @@ atom allocated_areas atom allocator atom allocator_sizes atom alloc_util_allocators +atom allow_gc atom allow_passive_connect atom already_loaded atom amd64 diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index 649594a334..5239940a50 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -36,7 +36,7 @@ #include "erl_thr_progress.h" static void set_default_trace_pattern(Eterm module); -static Eterm check_process_code(Process* rp, Module* modp); +static Eterm check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp); static void delete_code(Module* modp); static void decrement_refc(BeamInstr* code); static int is_native(BeamInstr* code); @@ -427,69 +427,80 @@ check_old_code_1(BIF_ALIST_1) } Eterm -check_process_code_2(BIF_ALIST_2) +erts_check_process_code(Process *c_p, Eterm module, int allow_gc, int *redsp) { - Process* rp; Module* modp; + Eterm res; + ErtsCodeIndex code_ix; - if (is_not_atom(BIF_ARG_2)) { - goto error; - } - if (is_internal_pid(BIF_ARG_1)) { - Eterm res; - ErtsCodeIndex code_ix; - - code_ix = erts_active_code_ix(); - modp = erts_get_module(BIF_ARG_2, code_ix); - if (modp == NULL) { /* Doesn't exist. */ - return am_false; - } - erts_rlock_old_code(code_ix); - if (modp->old.code == NULL) { /* No old code. */ - erts_runlock_old_code(code_ix); - return am_false; - } - erts_runlock_old_code(code_ix); - -#ifdef ERTS_SMP - rp = erts_pid2proc_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, - BIF_ARG_1, ERTS_PROC_LOCK_MAIN); -#else - rp = erts_pid2proc(BIF_P, 0, BIF_ARG_1, 0); -#endif - if (!rp) { - BIF_RET(am_false); - } - if (rp == ERTS_PROC_LOCK_BUSY) { - ERTS_BIF_YIELD2(bif_export[BIF_check_process_code_2], BIF_P, - BIF_ARG_1, BIF_ARG_2); - } - erts_rlock_old_code(code_ix); - if (modp->old.code != NULL) { /* must check again */ - res = check_process_code(rp, modp); - } - else { - res = am_false; - } - erts_runlock_old_code(code_ix); -#ifdef ERTS_SMP - if (BIF_P != rp) { - erts_resume(rp, ERTS_PROC_LOCK_MAIN); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); + (*redsp)++; + + ASSERT(is_atom(module)); + + code_ix = erts_active_code_ix(); + modp = erts_get_module(module, code_ix); + if (!modp) + return am_false; + erts_rlock_old_code(code_ix); + res = modp->old.code ? check_process_code(c_p, modp, allow_gc, redsp) : am_false; + erts_runlock_old_code(code_ix); + + return res; +} + +BIF_RETTYPE erts_internal_check_process_code_2(BIF_ALIST_2) +{ + int reds = 0; + Eterm res; + Eterm olist = BIF_ARG_2; + int allow_gc = 1; + + if (is_not_atom(BIF_ARG_1)) + goto badarg; + + while (is_list(olist)) { + Eterm *lp = list_val(olist); + Eterm opt = CAR(lp); + if (is_tuple(opt)) { + Eterm* tp = tuple_val(opt); + switch (arityval(tp[0])) { + case 2: + switch (tp[1]) { + case am_allow_gc: + switch (tp[2]) { + case am_false: + allow_gc = 0; + break; + case am_true: + allow_gc = 1; + break; + default: + goto badarg; + } + break; + default: + goto badarg; + } + break; + default: + goto badarg; + } } -#endif - BIF_RET(res); - } - else if (is_external_pid(BIF_ARG_1) - && external_pid_dist_entry(BIF_ARG_1) == erts_this_dist_entry) { - BIF_RET(am_false); + else + goto badarg; + olist = CDR(lp); } + if (is_not_nil(olist)) + goto badarg; - error: + res = erts_check_process_code(BIF_P, BIF_ARG_1, allow_gc, &reds); + + BIF_RET2(res, reds); + +badarg: BIF_ERROR(BIF_P, BADARG); } - BIF_RETTYPE delete_module_1(BIF_ALIST_1) { ErtsCodeIndex code_ix; @@ -710,7 +721,7 @@ set_default_trace_pattern(Eterm module) } static Eterm -check_process_code(Process* rp, Module* modp) +check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp) { BeamInstr* start; char* mod_start; @@ -786,6 +797,8 @@ check_process_code(Process* rp, Module* modp) if (done_gc) { return am_true; } else { + if (!allow_gc) + return am_aborted; /* * Try to get rid of this fun by garbage collecting. * Clear both fvalue and ftrace to make sure they @@ -796,7 +809,7 @@ check_process_code(Process* rp, Module* modp) rp->ftrace = NIL; done_gc = 1; FLAGS(rp) |= F_NEED_FULLSWEEP; - (void) erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); + *redsp += erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); goto rescan; } } @@ -850,6 +863,9 @@ check_process_code(Process* rp, Module* modp) Uint lit_size; struct erl_off_heap_header* oh; + if (!allow_gc) + return am_aborted; + /* * Try to get rid of constants by by garbage collecting. * Clear both fvalue and ftrace. @@ -859,11 +875,12 @@ check_process_code(Process* rp, Module* modp) rp->ftrace = NIL; done_gc = 1; FLAGS(rp) |= F_NEED_FULLSWEEP; - (void) erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); + *redsp += erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); literals = (Eterm *) modp->old.code[MI_LITERALS_START]; lit_size = (Eterm *) modp->old.code[MI_LITERALS_END] - literals; oh = (struct erl_off_heap_header *) modp->old.code[MI_LITERALS_OFF_HEAP]; + *redsp += lit_size / 10; /* Need, better value... */ erts_garbage_collect_literals(rp, literals, lit_size, oh); } } diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 755c5e6882..58bd6aac0b 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -3741,45 +3741,6 @@ BIF_RETTYPE now_0(BIF_ALIST_0) /**********************************************************************/ -BIF_RETTYPE garbage_collect_1(BIF_ALIST_1) -{ - int reds; - Process *rp; - - if (is_not_pid(BIF_ARG_1)) { - BIF_ERROR(BIF_P, BADARG); - } - - if (BIF_P->common.id == BIF_ARG_1) - rp = BIF_P; - else { -#ifdef ERTS_SMP - rp = erts_pid2proc_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, - BIF_ARG_1, ERTS_PROC_LOCK_MAIN); - if (rp == ERTS_PROC_LOCK_BUSY) - ERTS_BIF_YIELD1(bif_export[BIF_garbage_collect_1], BIF_P, BIF_ARG_1); -#else - rp = erts_proc_lookup(BIF_ARG_1); -#endif - if (!rp) - BIF_RET(am_false); - } - - /* The GC cost is taken for the process executing this BIF. */ - - FLAGS(rp) |= F_NEED_FULLSWEEP; - reds = erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); - -#ifdef ERTS_SMP - if (BIF_P != rp) { - erts_resume(rp, ERTS_PROC_LOCK_MAIN); - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); - } -#endif - - BIF_RET2(am_true, reds); -} - BIF_RETTYPE garbage_collect_0(BIF_ALIST_0) { int reds; diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index dc8e9101de..3b7bb56885 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -46,7 +46,6 @@ bif erlang:atom_to_list/1 bif erlang:binary_to_list/1 bif erlang:binary_to_list/3 bif erlang:binary_to_term/1 -bif erlang:check_process_code/2 bif erlang:crc32/1 bif erlang:crc32/2 bif erlang:crc32_combine/3 @@ -67,7 +66,6 @@ bif erlang:float_to_list/1 bif erlang:float_to_list/2 bif erlang:fun_info/2 bif erlang:garbage_collect/0 -bif erlang:garbage_collect/1 bif erlang:get/0 bif erlang:get/1 bif erlang:get_keys/1 @@ -155,6 +153,10 @@ bif erts_internal:port_control/3 bif erts_internal:port_close/1 bif erts_internal:port_connect/2 +bif erts_internal:request_system_task/3 +bif erts_internal:check_process_code/2 + + # inet_db support bif erlang:port_set_data/2 bif erlang:port_get_data/1 diff --git a/erts/emulator/beam/break.c b/erts/emulator/beam/break.c index ad9a89b642..acbd700cc4 100644 --- a/erts/emulator/beam/break.c +++ b/erts/emulator/beam/break.c @@ -109,10 +109,12 @@ process_killer(void) erts_smp_proc_lock(rp, rp_locks); state = erts_smp_atomic32_read_acqb(&rp->state); if (state & (ERTS_PSFLG_FREE - | ERTS_PSFLG_EXITING - | ERTS_PSFLG_ACTIVE - | ERTS_PSFLG_IN_RUNQ - | ERTS_PSFLG_RUNNING)) { + | ERTS_PSFLG_EXITING + | ERTS_PSFLG_ACTIVE + | ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_IN_RUNQ + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS)) { erts_printf("Can only kill WAITING processes this way\n"); } else { diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index bb5eba80be..32308fae9b 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -268,6 +268,8 @@ type PROC_INTERVAL LONG_LIVED SYSTEM process_interval type BUSY_CALLER_TAB SHORT_LIVED SYSTEM busy_caller_table type BUSY_CALLER SHORT_LIVED SYSTEM busy_caller type PORT_DATA_HEAP STANDARD SYSTEM port_data_heap +type PROC_SYS_TSK SHORT_LIVED PROCESSES proc_sys_task +type PROC_SYS_TSK_QS SHORT_LIVED PROCESSES proc_sys_task_queues +if threads_no_smp # Need thread safe allocs, but std_alloc and fix_alloc are not; diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 771eba431f..0f3bb8d281 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -46,6 +46,11 @@ typedef struct erl_off_heap { Uint64 overhead; /* Administrative overhead (used to force GC). */ } ErlOffHeap; +#define ERTS_INIT_OFF_HEAP(OHP) \ + do { \ + (OHP)->first = NULL; \ + (OHP)->overhead = 0; \ + } while (0) #include "external.h" #include "erl_process.h" diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 434d5ca147..ad806da660 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -283,6 +283,40 @@ struct erts_system_profile_flags_t erts_system_profile_flags; #error "Need to store process_count in another type" #endif +typedef enum { + ERTS_PSTT_GC, /* Garbage Collect */ + ERTS_PSTT_CPC /* Check Process Code */ +} ErtsProcSysTaskType; + +#define ERTS_MAX_PROC_SYS_TASK_ARGS 2 + +struct ErtsProcSysTask_ { + ErtsProcSysTask *next; + ErtsProcSysTask *prev; + ErtsProcSysTaskType type; + Eterm requester; + Eterm reply_tag; + Eterm req_id; + Uint req_id_sz; + Eterm arg[ERTS_MAX_PROC_SYS_TASK_ARGS]; + ErlOffHeap off_heap; + Eterm heap[1]; +}; + +#define ERTS_PROC_SYS_TASK_SIZE(HSz) \ + (sizeof(ErtsProcSysTask) - sizeof(Eterm) + sizeof(Eterm)*(HSz)) + +struct ErtsProcSysTaskQs_ { + int qmask; + int ncount; + ErtsProcSysTask *q[ERTS_NO_PROC_PRIO_LEVELS]; +}; + +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(proc_sys_task_queues, + ErtsProcSysTaskQs, + 50, + ERTS_ALC_T_PROC_SYS_TSK_QS) + ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_op_list, ErtsMiscOpList, 10, @@ -353,6 +387,14 @@ static void aux_work_timeout_early_init(int no_schedulers); static void aux_work_timeout_late_init(void); static void setup_aux_work_timer(void); +static int execute_sys_tasks(Process *c_p, + erts_aint32_t *statep, + int in_reds); +static int cleanup_sys_tasks(Process *c_p, + erts_aint32_t in_state, + int in_reds); + + #if defined(DEBUG) || 0 #define ERTS_DBG_CHK_AUX_WORK_VAL(V) dbg_chk_aux_work_val((V)) static void @@ -2848,7 +2890,7 @@ dequeue_process(ErtsRunQueue *runq, int prio_q, erts_aint32_t *statep) if (statep) *statep = state; - prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state); rqi = &runq->procs.prio_info[prio]; @@ -2997,7 +3039,7 @@ immigrate(ErtsRunQueue *c_rq, ErtsMigrationPath *mp) erts_aint32_t state; state = erts_smp_atomic32_read_acqb(&proc->state); if (!(ERTS_PSFLG_BOUND & state) - && (prio == (int) (ERTS_PSFLG_PRIO_MASK & state))) { + && (prio == (int) ERTS_PSFLGS_GET_PRQ_PRIO(state))) { ErtsRunQueueInfo *rqi = &rq->procs.prio_info[prio]; unqueue_process(rq, rpq, rqi, prio, prev_proc, proc); erts_smp_runq_unlock(rq); @@ -3079,7 +3121,7 @@ schedule_bound_processes(ErtsRunQueue *rq, while (proc) { erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state); next = proc->next; - enqueue_process(rq, (int) (ERTS_PSFLG_PRIO_MASK & state), proc); + enqueue_process(rq, (int) ERTS_PSFLGS_GET_PRQ_PRIO(state), proc); proc = next; } } @@ -3184,7 +3226,7 @@ evacuate_run_queue(ErtsRunQueue *rq, sbpp->last = proc; } else { - int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + int prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state); erts_smp_runq_unlock(rq); to_rq = mp->prio[prio].runq; @@ -3257,7 +3299,7 @@ try_steal_task_from_victim(ErtsRunQueue *rq, int *rq_lockedp, ErtsRunQueue *vrq, erts_aint32_t state = erts_smp_atomic32_read_acqb(&proc->state); if (!(ERTS_PSFLG_BOUND & state)) { /* Steal process */ - int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); + int prio = (int) ERTS_PSFLGS_GET_PRQ_PRIO(state); ErtsRunQueueInfo *rqi = &vrq->procs.prio_info[prio]; unqueue_process(vrq, rpq, rqi, prio, prev_proc, proc); erts_smp_runq_unlock(vrq); @@ -4575,6 +4617,7 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online) #endif init_misc_op_list_alloc(); + init_proc_sys_task_queues_alloc(); #ifdef ERTS_SMP set_wakeup_other_data(); @@ -4858,46 +4901,166 @@ erts_get_scheduler_data(void) #endif +static Process * +make_proxy_proc(Process *prev_proxy, Process *proc, erts_aint32_t prio) +{ + erts_aint32_t state; + Process *proxy; +#ifdef ERTS_SMP + ErtsRunQueue *rq = RUNQ_READ_RQ(&proc->run_queue); +#endif + + state = (ERTS_PSFLG_PROXY + | ERTS_PSFLG_IN_RUNQ + | (((erts_aint32_t) 1) << (prio + ERTS_PSFLGS_IN_PRQ_MASK_OFFSET)) + | (prio << ERTS_PSFLGS_PRQ_PRIO_OFFSET) + | (prio << ERTS_PSFLGS_USR_PRIO_OFFSET) + | (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET)); + + if (prev_proxy) { + proxy = prev_proxy; + ASSERT(erts_smp_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY); + erts_smp_atomic32_set_nob(&proxy->state, state); +#ifdef ERTS_SMP + RUNQ_SET_RQ(&proc->run_queue, rq); +#endif + } + else { + proxy = erts_alloc(ERTS_ALC_T_PROC, sizeof(Process)); +#ifdef DEBUG + { + int i; + Uint32 *ui32 = (Uint32 *) (char *) proxy; + for (i = 0; i < sizeof(Process)/sizeof(Uint32); i++) + ui32[i] = (Uint32) 0xdeadbeef; + } +#endif + erts_smp_atomic32_init_nob(&proxy->state, state); +#ifdef ERTS_SMP + erts_smp_atomic_init_nob(&proxy->run_queue, + erts_smp_atomic_read_nob(&proc->run_queue)); +#endif + } + + proxy->common.id = proc->common.id; + + return proxy; +} + +static ERTS_INLINE void +free_proxy_proc(Process *proxy) +{ + ASSERT(erts_smp_atomic32_read_nob(&proxy->state) & ERTS_PSFLG_PROXY); + erts_free(ERTS_ALC_T_PROC, proxy); +} + + +static ERTS_INLINE int +check_enqueue_in_prio_queue(erts_aint32_t *prq_prio_p, + erts_aint32_t *newp, + erts_aint32_t actual) +{ + erts_aint32_t aprio, qbit, max_qbit; + + aprio = (actual >> ERTS_PSFLGS_ACT_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK; + qbit = 1 << aprio; + + *prq_prio_p = aprio; + + max_qbit = (actual >> ERTS_PSFLGS_IN_PRQ_MASK_OFFSET) & ERTS_PSFLGS_QMASK; + max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS; + max_qbit &= -max_qbit; + /* + * max_qbit now either contain bit set for highest prio queue or a bit + * out of range (which will have a value larger than valid range). + */ + + if (qbit >= max_qbit) + return 0; /* Already queued in higher or equal prio */ + + /* Need to enqueue (if already enqueued, it is in lower prio) */ + *newp |= qbit << ERTS_PSFLGS_IN_PRQ_MASK_OFFSET; + + if ((actual & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLGS_USR_PRIO_MASK)) + != (aprio << ERTS_PSFLGS_USR_PRIO_OFFSET)) { + /* + * Process struct already enqueued, or actual prio not + * equal to user prio, i.e., enqueue using proxy. + */ + return -1; + } + + /* + * Enqueue using process struct. + */ + *newp &= ~ERTS_PSFLGS_PRQ_PRIO_MASK; + *newp |= ERTS_PSFLG_IN_RUNQ | (aprio << ERTS_PSFLGS_PRQ_PRIO_OFFSET); + return 1; +} + /* * scheduler_out_process() return with c_rq locked. */ static ERTS_INLINE int -schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p) +schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Process *proxy) { - erts_aint32_t a, e, n; + erts_aint32_t a, e, n, enq_prio = -1; int res = 0; + int enqueue; /* < 0 -> use proxy */ a = state; while (1) { n = e = a; - ASSERT(a & ERTS_PSFLG_RUNNING); - ASSERT(!(a & ERTS_PSFLG_IN_RUNQ)); + ASSERT(a & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS)); - n &= ~ERTS_PSFLG_RUNNING; - if ((a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) - n |= ERTS_PSFLG_IN_RUNQ; + enqueue = 0; + + n &= ~(ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS); + if (a & ERTS_PSFLG_ACTIVE_SYS + || (a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) { + enqueue = check_enqueue_in_prio_queue(&enq_prio, &n, a); + } a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); if (a == e) break; } - if (!(n & ERTS_PSFLG_IN_RUNQ)) { - if (erts_system_profile_flags.runnable_procs) - profile_runnable_proc(p, am_inactive); + if (!enqueue) { + + if (erts_system_profile_flags.runnable_procs) { + + if (!(a & ERTS_PSFLG_ACTIVE_SYS) + && (!(a & ERTS_PSFLG_ACTIVE) + || (a & ERTS_PSFLG_SUSPENDED))) { + /* Process inactive */ + profile_runnable_proc(p, am_inactive); + } + } + + if (proxy) + free_proxy_proc(proxy); } else { - int prio = (int) (ERTS_PSFLG_PRIO_MASK & n); + Process *sched_p; ErtsRunQueue *runq = erts_get_runq_proc(p); - ASSERT(!(n & ERTS_PSFLG_SUSPENDED)); + ASSERT(!(n & ERTS_PSFLG_SUSPENDED) || (n & ERTS_PSFLG_ACTIVE_SYS)); + + if (enqueue < 0) + sched_p = make_proxy_proc(proxy, p, enq_prio); + else { + sched_p = p; + if (proxy) + free_proxy_proc(proxy); + } #ifdef ERTS_SMP if (!(ERTS_PSFLG_BOUND & n)) { - ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio); + ErtsRunQueue *new_runq = erts_check_emigration_need(runq, enq_prio); if (new_runq) { - RUNQ_SET_RQ(&p->run_queue, new_runq); + RUNQ_SET_RQ(&sched_p->run_queue, new_runq); runq = new_runq; } } @@ -4908,7 +5071,7 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p) erts_smp_runq_lock(runq); /* Enqueue the process */ - enqueue_process(runq, prio, p); + enqueue_process(runq, (int) enq_prio, sched_p); if (runq == c_rq) return res; @@ -4920,14 +5083,13 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p) } static ERTS_INLINE void -add2runq(Process *p, erts_aint32_t state) +add2runq(Process *p, erts_aint32_t state, erts_aint32_t prio) { - int prio = (int) (ERTS_PSFLG_PRIO_MASK & state); ErtsRunQueue *runq = erts_get_runq_proc(p); #ifdef ERTS_SMP if (!(ERTS_PSFLG_BOUND & state)) { - ErtsRunQueue *new_runq = erts_check_emigration_need(runq, prio); + ErtsRunQueue *new_runq = erts_check_emigration_need(runq, (int) prio); if (new_runq) { RUNQ_SET_RQ(&p->run_queue, new_runq); runq = new_runq; @@ -4939,101 +5101,236 @@ add2runq(Process *p, erts_aint32_t state) erts_smp_runq_lock(runq); /* Enqueue the process */ - enqueue_process(runq, prio, p); + enqueue_process(runq, (int) prio, p); erts_smp_runq_unlock(runq); smp_notify_inc_runq(runq); } -static ERTS_INLINE void -schedule_process(Process *p, erts_aint32_t state, int active_enq) +static ERTS_INLINE int +change_proc_schedule_state(Process *p, + erts_aint32_t clear_state_flags, + erts_aint32_t set_state_flags, + erts_aint32_t *statep, + erts_aint32_t *enq_prio_p) { - erts_aint32_t a = state, n; + /* + * NOTE: ERTS_PSFLG_RUNNING, ERTS_PSFLG_RUNNING_SYS and + * ERTS_PSFLG_ACTIVE_SYS are not allowed to be + * altered by this function! + */ + erts_aint32_t a = *statep, n; + int enqueue; /* < 0 -> use proxy */ + + ASSERT(!(a & ERTS_PSFLG_PROXY)); + ASSERT((clear_state_flags & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_ACTIVE_SYS)) == 0); + ASSERT((set_state_flags & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_ACTIVE_SYS)) == 0); while (1) { erts_aint32_t e; n = e = a; + enqueue = 0; + if (a & ERTS_PSFLG_FREE) - return; /* We don't want to schedule free processes... */ - n |= ERTS_PSFLG_ACTIVE; - if (!(a & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_RUNNING))) - n |= ERTS_PSFLG_IN_RUNQ; + break; /* We don't want to schedule free processes... */ + + if (clear_state_flags) + n &= ~clear_state_flags; + + if (set_state_flags) + n |= set_state_flags; + + if ((n & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_IN_RUNQ + | ERTS_PSFLG_ACTIVE)) == ERTS_PSFLG_ACTIVE) { + /* + * Active and seemingly need to be enqueued, but + * process may be in a run queue via proxy, need + * further inspection... + */ + enqueue = check_enqueue_in_prio_queue(enq_prio_p, &n, a); + } + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); if (a == e) break; - if (!active_enq && (a & ERTS_PSFLG_ACTIVE)) - return; /* Someone else activated process ... */ + if (enqueue == 0 && n == a) + break; } - if (erts_system_profile_flags.runnable_procs - && !(a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) { - profile_runnable_proc(p, am_active); + if (erts_system_profile_flags.runnable_procs) { + + if (((n & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_ACTIVE)) == ERTS_PSFLG_ACTIVE) + && (!(a & (ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS) + && (!(a & ERTS_PSFLG_ACTIVE) + || (a & ERTS_PSFLG_SUSPENDED))))) { + /* We activated a prevously inactive process */ + profile_runnable_proc(p, am_active); + } + } - if ((n & ERTS_PSFLG_IN_RUNQ) && !(a & ERTS_PSFLG_IN_RUNQ)) - add2runq(p, n); + *statep = a; + + return enqueue; +} + +static ERTS_INLINE void +schedule_process(Process *p, erts_aint32_t in_state) +{ + erts_aint32_t enq_prio = -1; + erts_aint32_t state = in_state; + int enqueue = change_proc_schedule_state(p, + 0, + ERTS_PSFLG_ACTIVE, + &state, + &enq_prio); + if (enqueue) + add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), + state, + enq_prio); } void erts_schedule_process(Process *p, erts_aint32_t state) { - schedule_process(p, state, 0); + schedule_process(p, state); +} + +static void +schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy) +{ + erts_aint32_t a = state, n, enq_prio = -1; + int enqueue; /* < 0 -> use proxy */ + + ASSERT(!(state & ERTS_PSFLG_PROXY)); + + while (1) { + erts_aint32_t e; + n = e = a; + + if (a & ERTS_PSFLG_FREE) + return; /* We don't want to schedule free processes... */ + + enqueue = 0; + n |= ERTS_PSFLG_ACTIVE_SYS; + if (!(a & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS))) + enqueue = check_enqueue_in_prio_queue(&enq_prio, &n, a); + a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + if (a == e) + break; + if (a == n && !enqueue) + goto cleanup; + } + + if (erts_system_profile_flags.runnable_procs) { + + if (!(a & (ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS)) + && (!(a & ERTS_PSFLG_ACTIVE) || (a & ERTS_PSFLG_SUSPENDED))) { + /* We activated a prevously inactive process */ + profile_runnable_proc(p, am_active); + } + + } + + if (enqueue) { + Process *sched_p; + if (enqueue > 0) + sched_p = p; + else { + sched_p = make_proxy_proc(proxy, p, enq_prio); + proxy = NULL; + } + add2runq(sched_p, n, enq_prio); + } + +cleanup: + if (proxy) + free_proxy_proc(proxy); } static ERTS_INLINE int suspend_process(Process *c_p, Process *p) { - erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); + erts_aint32_t state; int suspended = 0; ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); + state = erts_smp_atomic32_read_acqb(&p->state); + if ((state & ERTS_PSFLG_SUSPENDED)) suspended = -1; else { if (c_p == p) { state = erts_smp_atomic32_read_bor_relb(&p->state, ERTS_PSFLG_SUSPENDED); - state |= ERTS_PSFLG_SUSPENDED; ASSERT(state & ERTS_PSFLG_RUNNING); - suspended = 1; + suspended = (state & ERTS_PSFLG_SUSPENDED) ? -1: 1; } else { while (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_EXITING))) { - erts_aint32_t e, n; + erts_aint32_t n, e; + n = e = state; n |= ERTS_PSFLG_SUSPENDED; state = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e); if (state == e) { - state = n; suspended = 1; break; } + if (state & ERTS_PSFLG_SUSPENDED) { + suspended = -1; + break; + } } } } - if (state & ERTS_PSFLG_SUSPENDED) { + if (suspended) { ASSERT(!(ERTS_PSFLG_RUNNING & state) || p == erts_get_current_process()); - if (erts_system_profile_flags.runnable_procs - && (p->rcount == 0) - && (state & ERTS_PSFLG_ACTIVE)) { - profile_runnable_proc(p, am_inactive); + if (suspended > 0 && erts_system_profile_flags.runnable_procs) { + + /* 'state' is before our change... */ + + if ((state & (ERTS_PSFLG_ACTIVE + | ERTS_PSFLG_ACTIVE_SYS + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) { + /* We made process inactive */ + profile_runnable_proc(p, am_inactive); + } + } p->rcount++; /* count number of suspend */ } + return suspended; } static ERTS_INLINE void resume_process(Process *p) { - erts_aint32_t state; + erts_aint32_t state, enq_prio = -1; + int enqueue; + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); ASSERT(p->rcount > 0); @@ -5041,14 +5338,16 @@ resume_process(Process *p) if (--p->rcount > 0) /* multiple suspend */ return; - state = erts_smp_atomic32_read_band_mb(&p->state, ~ERTS_PSFLG_SUSPENDED); - state &= ~ERTS_PSFLG_SUSPENDED; - if ((state & (ERTS_PSFLG_EXITING - | ERTS_PSFLG_ACTIVE - | ERTS_PSFLG_IN_RUNQ - | ERTS_PSFLG_RUNNING)) == ERTS_PSFLG_ACTIVE) { - schedule_process(p, state, 1); - } + state = erts_smp_atomic32_read_nob(&p->state); + enqueue = change_proc_schedule_state(p, + ERTS_PSFLG_SUSPENDED, + 0, + &state, + &enq_prio); + if (enqueue) + add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), + state, + enq_prio); } int @@ -6032,7 +6331,8 @@ pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks, goto done; } else { - if (!(ERTS_PSFLG_RUNNING & erts_smp_atomic32_read_acqb(&rp->state))) + if (!((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS) + & erts_smp_atomic32_read_acqb(&rp->state))) goto done; } @@ -6695,7 +6995,7 @@ Eterm erts_get_process_priority(Process *p) { erts_aint32_t state = erts_smp_atomic32_read_nob(&p->state); - switch (state & ERTS_PSFLG_PRIO_MASK) { + switch (ERTS_PSFLGS_GET_USR_PRIO(state)) { case PRIORITY_MAX: return am_max; case PRIORITY_HIGH: return am_high; case PRIORITY_NORMAL: return am_normal; @@ -6718,18 +7018,60 @@ erts_set_process_priority(Process *p, Eterm value) } a = erts_smp_atomic32_read_nob(&p->state); - if (nprio == (a & ERTS_PSFLG_PRIO_MASK)) + if (nprio == ERTS_PSFLGS_GET_USR_PRIO(a)) oprio = nprio; else { - erts_aint32_t e, n; + int slocked = 0; + erts_aint32_t e, n, aprio; + + if (a & ERTS_PSFLG_ACTIVE_SYS) { + erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); + slocked = 1; + } + do { - oprio = a & ERTS_PSFLG_PRIO_MASK; + oprio = ERTS_PSFLGS_GET_USR_PRIO(a); n = e = a; - ASSERT(!(a & ERTS_PSFLG_IN_RUNQ)); + if (!(a & ERTS_PSFLG_ACTIVE_SYS)) + aprio = nprio; + else { + int max_qbit; + + if (!slocked) { + erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); + slocked = 1; + } + + max_qbit = p->sys_task_qs->qmask; + max_qbit &= -max_qbit; + switch (max_qbit) { + case MAX_BIT: + aprio = PRIORITY_MAX; + break; + case HIGH_BIT: + aprio = PRIORITY_HIGH; + break; + case NORMAL_BIT: + aprio = PRIORITY_NORMAL; + break; + case LOW_BIT: + aprio = PRIORITY_LOW; + break; + default: + ERTS_INTERNAL_ERROR("Invalid qmask"); + aprio = -1; + } + + if (aprio > nprio) /* low value -> high prio */ + aprio = nprio; + } + + n &= ~(ERTS_PSFLGS_USR_PRIO_MASK + | ERTS_PSFLGS_ACT_PRIO_MASK); + n |= ((nprio << ERTS_PSFLGS_USR_PRIO_OFFSET) + | (aprio << ERTS_PSFLGS_ACT_PRIO_OFFSET)); - n &= ~ERTS_PSFLG_PRIO_MASK; - n |= nprio; a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); } while (a != e); } @@ -6763,6 +7105,7 @@ erts_set_process_priority(Process *p, Eterm value) Process *schedule(Process *p, int calls) { + Process *proxy_p = NULL; ErtsRunQueue *rq; erts_aint_t dt; ErtsSchedulerData *esdp; @@ -6805,6 +7148,8 @@ Process *schedule(Process *p, int calls) actual_reds = reds = 0; erts_smp_runq_lock(rq); } else { + sched_out_proc: + #ifdef ERTS_SMP ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); esdp = p->scheduler_data; @@ -6858,10 +7203,11 @@ Process *schedule(Process *p, int calls) esdp->reductions += reds; - schedule_out_process(rq, state, p); /* Returns with rq locked! */ + schedule_out_process(rq, state, p, proxy_p); /* Returns with rq locked! */ + proxy_p = NULL; ERTS_PROC_REDUCTIONS_EXECUTED(rq, - (int) (state & ERTS_PSFLG_PRIO_MASK), + (int) ERTS_PSFLGS_GET_USR_PRIO(state), reds, actual_reds); @@ -6870,18 +7216,19 @@ Process *schedule(Process *p, int calls) p->scheduler_data = NULL; #endif + erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); if (state & ERTS_PSFLG_FREE) { #ifdef ERTS_SMP ASSERT(esdp->free_process == p); esdp->free_process = NULL; -#else - erts_free_proc(p); +#else + state = erts_smp_atomic32_read_nob(&p->state); + if (!(state & ERTS_PSFLG_IN_RUNQ)) + erts_free_proc(p); #endif } - erts_smp_proc_unlock(p, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); - #ifdef ERTS_SMP ASSERT(!esdp->free_process); #endif @@ -7088,6 +7435,8 @@ Process *schedule(Process *p, int calls) * Find a new process to run. */ pick_next_process: { + erts_aint32_t psflg_band_mask; + erts_aint32_t running_flag; int prio_q; int qmask; @@ -7121,18 +7470,62 @@ Process *schedule(Process *p, int calls) ASSERT(p); /* Wrong qmask in rq->flags? */ + psflg_band_mask = ~(((erts_aint32_t) 1) << (ERTS_PSFLGS_GET_PRQ_PRIO(state) + + ERTS_PSFLGS_IN_PRQ_MASK_OFFSET)); + + if (!(state & ERTS_PSFLG_PROXY)) + psflg_band_mask &= ~ERTS_PSFLG_IN_RUNQ; + else { + proxy_p = p; + p = erts_proc_lookup_raw(proxy_p->common.id); + if (!p) { + free_proxy_proc(proxy_p); + proxy_p = NULL; + goto pick_next_process; + } + state = erts_smp_atomic32_read_nob(&p->state); + } + + + if (state & ERTS_PSFLG_ACTIVE_SYS) + running_flag = ERTS_PSFLG_RUNNING_SYS; + else + running_flag = ERTS_PSFLG_RUNNING; + while (1) { erts_aint32_t exp, new, tmp; tmp = new = exp = state; - new &= ~ERTS_PSFLG_IN_RUNQ; - tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); - if (tmp != ERTS_PSFLG_SUSPENDED) - new |= ERTS_PSFLG_RUNNING; + new &= psflg_band_mask; + if (!(state & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS))) { + tmp = state & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_PENDING_EXIT + | ERTS_PSFLG_ACTIVE_SYS); + if (tmp != ERTS_PSFLG_SUSPENDED) + new |= running_flag; + } state = erts_smp_atomic32_cmpxchg_relb(&p->state, new, exp); if (state == exp) { - tmp = state & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); - if (tmp == ERTS_PSFLG_SUSPENDED) + if ((state & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_FREE)) + || ((state & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_PENDING_EXIT + | ERTS_PSFLG_ACTIVE_SYS)) + == ERTS_PSFLG_SUSPENDED)) { + if (state & ERTS_PSFLG_FREE) { +#ifdef ERTS_SMP + erts_smp_proc_dec_refc(p); +#else + erts_free_proc(p); +#endif + } + if (proxy_p) { + free_proxy_proc(proxy_p); + proxy_p = NULL; + } goto pick_next_process; + } state = new; break; } @@ -7162,7 +7555,7 @@ Process *schedule(Process *p, int calls) (UWord) esdp->no); int migrated = old && old != esdp->no; - prio = (int) (state & ERTS_PSFLG_PRIO_MASK); + prio = (int) ERTS_PSFLGS_GET_USR_PRIO(state); erts_smp_spin_lock(&erts_sched_stat.lock); erts_sched_stat.prio[prio].total_executed++; @@ -7182,9 +7575,6 @@ Process *schedule(Process *p, int calls) ASSERT(!p->scheduler_data); p->scheduler_data = esdp; #endif - /* Never run a suspended process */ - ASSERT(!(ERTS_PSFLG_SUSPENDED & erts_smp_atomic32_read_nob(&p->state))); - reds = context_reds; if (IS_TRACED(p)) { @@ -7210,21 +7600,574 @@ Process *schedule(Process *p, int calls) erts_check_my_tracer_proc(p); #endif + if (state & ERTS_PSFLG_RUNNING_SYS) { + reds -= execute_sys_tasks(p, &state, reds); + if (reds <= 0) { + p->fcalls = reds; + goto sched_out_proc; + } + + ASSERT(state & ERTS_PSFLG_RUNNING_SYS); + ASSERT(!(state & ERTS_PSFLG_RUNNING)); + + while (1) { + erts_aint32_t n, e; + + if (((state & (ERTS_PSFLG_SUSPENDED + | ERTS_PSFLG_ACTIVE)) != ERTS_PSFLG_ACTIVE) + && !(state & ERTS_PSFLG_EXITING)) + goto sched_out_proc; + + n = e = state; + n &= ~ERTS_PSFLG_RUNNING_SYS; + n |= ERTS_PSFLG_RUNNING; + + state = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); + if (state == e) { + state = n; + break; + } + + ASSERT(state & ERTS_PSFLG_RUNNING_SYS); + ASSERT(!(state & ERTS_PSFLG_RUNNING)); + } + } + if (!(state & ERTS_PSFLG_EXITING) && ((FLAGS(p) & F_FORCE_GC) || (MSO(p).overhead > BIN_VHEAP_SZ(p)))) { reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity); - if (reds < 0) { - reds = 1; + if (reds <= 0) { + p->fcalls = reds; + goto sched_out_proc; } } + + if (proxy_p) { + free_proxy_proc(proxy_p); + proxy_p = NULL; + } p->fcalls = reds; ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); + + /* Never run a suspended process */ + ASSERT(!(ERTS_PSFLG_SUSPENDED & erts_smp_atomic32_read_nob(&p->state))); + return p; } } +static int +notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result) +{ + Process *rp = erts_proc_lookup(st->requester); + if (rp) { + ErtsProcLocks rp_locks; + ErlOffHeap *ohp; + ErlHeapFragment* bp; + Eterm *hp, msg, req_id, result; + Uint st_result_sz, hsz; +#ifdef DEBUG + Eterm *hp_start; +#endif + + rp_locks = (c_p == rp) ? ERTS_PROC_LOCK_MAIN : 0; + + st_result_sz = is_immed(st_result) ? 0 : size_object(st_result); + hsz = st->req_id_sz + st_result_sz + 4 /* 3-tuple */; + + hp = erts_alloc_message_heap(hsz, + &bp, + &ohp, + rp, + &rp_locks); + +#ifdef DEBUG + hp_start = hp; +#endif + + req_id = st->req_id_sz == 0 ? st->req_id : copy_struct(st->req_id, + st->req_id_sz, + &hp, + ohp); + + result = st_result_sz == 0 ? st_result : copy_struct(st_result, + st_result_sz, + &hp, + ohp); + + ASSERT(is_immed(st->reply_tag)); + + msg = TUPLE3(hp, st->reply_tag, req_id, result); + +#ifdef DEBUG + hp += 4; + ASSERT(hp_start + hsz == hp); +#endif + + erts_queue_message(rp, + &rp_locks, + bp, + msg, + NIL +#ifdef USE_VM_PROBES + , NIL +#endif + ); + + if (c_p == rp) + rp_locks &= ~ERTS_PROC_LOCK_MAIN; + + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); + } + + erts_cleanup_offheap(&st->off_heap); + + erts_free(ERTS_ALC_T_PROC_SYS_TSK, st); + + return rp ? 1 : 0; +} + +static ERTS_INLINE ErtsProcSysTask * +fetch_sys_task(Process *c_p, erts_aint32_t state, int *qmaskp) +{ + ErtsProcSysTaskQs *unused_qs = NULL; + int qbit, qmask; + ErtsProcSysTask *st, **qp; + + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); + + if (!c_p->sys_task_qs) { + qmask = 0; + st = NULL; + goto update_state; + } + + qmask = c_p->sys_task_qs->qmask; + + if ((state & (ERTS_PSFLG_ACTIVE + | ERTS_PSFLG_EXITING + | ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE) { + /* No sys tasks if we got exclusively higher prio user work to do */ + st = NULL; + switch (ERTS_PSFLGS_GET_USR_PRIO(state)) { + case PRIORITY_MAX: + if (!(qmask & MAX_BIT)) + goto done; + break; + case PRIORITY_HIGH: + if (!(qmask & (MAX_BIT|HIGH_BIT))) + goto done; + break; + default: + break; + } + } + + qbit = qmask & -qmask; + switch (qbit) { + case MAX_BIT: + qp = &c_p->sys_task_qs->q[PRIORITY_MAX]; + break; + case HIGH_BIT: + qp = &c_p->sys_task_qs->q[PRIORITY_HIGH]; + break; + case NORMAL_BIT: + if (!(qmask & PRIORITY_LOW) + || ++c_p->sys_task_qs->ncount <= RESCHEDULE_LOW) { + qp = &c_p->sys_task_qs->q[PRIORITY_NORMAL]; + break; + } + c_p->sys_task_qs->ncount = 0; + /* Fall through */ + case LOW_BIT: + qp = &c_p->sys_task_qs->q[PRIORITY_LOW]; + break; + default: + ERTS_INTERNAL_ERROR("Invalid qmask"); + } + + st = *qp; + ASSERT(st); + if (st->next != st) { + *qp = st->next; + st->next->prev = st->prev; + st->prev->next = st->next; + } + else { + erts_aint32_t a, e, n, st_prio, qmask2; + + *qp = NULL; + qmask &= ~qbit; + c_p->sys_task_qs->qmask = qmask; + + update_state: + + qmask2 = qmask; + + switch (qmask2 & -qmask2) { + case MAX_BIT: + st_prio = PRIORITY_MAX; + break; + case HIGH_BIT: + st_prio = PRIORITY_HIGH; + break; + case NORMAL_BIT: + st_prio = PRIORITY_NORMAL; + break; + case LOW_BIT: + st_prio = PRIORITY_LOW; + break; + case 0: + st_prio = PRIORITY_LOW; + unused_qs = c_p->sys_task_qs; + c_p->sys_task_qs = NULL; + break; + default: + ERTS_INTERNAL_ERROR("Invalid qmask"); + } + + a = state; + do { + erts_aint32_t prio = ERTS_PSFLGS_GET_USR_PRIO(a); + + if (prio > st_prio) + prio = st_prio; + + n = e = a; + + n &= ~ERTS_PSFLGS_ACT_PRIO_MASK; + n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET); + + if (!qmask) + n &= ~ERTS_PSFLG_ACTIVE_SYS; + + if (a == n) + break; + a = erts_smp_atomic32_cmpxchg_nob(&c_p->state, n, e); + } while (a != e); + } + +done: + + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); + + if (unused_qs) + proc_sys_task_queues_free(unused_qs); + + *qmaskp = qmask; + + return st; +} + +static int +execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) +{ + int garbage_collected = 0; + erts_aint32_t state = *statep; + int max_reds = in_reds; + int reds = 0; + int qmask = 0; + + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); + + do { + ErtsProcSysTask *st; + Eterm st_res; + + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { +#ifdef ERTS_SMP + if (state & ERTS_PSFLG_PENDING_EXIT) + erts_handle_pending_exit(c_p, ERTS_PROC_LOCK_MAIN); +#endif + ASSERT(ERTS_PROC_IS_EXITING(c_p)); + break; + } + + st = fetch_sys_task(c_p, state, &qmask); + if (!st) + break; + + switch (st->type) { + case ERTS_PSTT_GC: + if (!garbage_collected) { + FLAGS(c_p) |= F_NEED_FULLSWEEP; + reds += erts_garbage_collect(c_p, 0, c_p->arg_reg, c_p->arity); + garbage_collected = 1; + } + st_res = am_true; + break; + case ERTS_PSTT_CPC: + st_res = erts_check_process_code(c_p, + st->arg[0], + st->arg[1] == am_true, + &reds); + break; + default: + ERTS_INTERNAL_ERROR("Invalid process sys task type"); + st_res = am_false; + } + + if (st) + reds += notify_sys_task_executed(c_p, st, st_res); + + state = erts_smp_atomic32_read_acqb(&c_p->state); + } while (qmask && reds < max_reds); + + *statep = state; + + return reds; +} + +static int +cleanup_sys_tasks(Process *c_p, erts_aint32_t in_state, int in_reds) +{ + erts_aint32_t state = in_state; + int max_reds = in_reds; + int reds = 0; + int qmask = 0; + + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); + + do { + ErtsProcSysTask *st; + Eterm st_res; + + st = fetch_sys_task(c_p, state, &qmask); + if (!st) + break; + + switch (st->type) { + case ERTS_PSTT_GC: + st_res = am_false; + break; + case ERTS_PSTT_CPC: + st_res = am_false; + break; + default: + ERTS_INTERNAL_ERROR("Invalid process sys task type"); + st_res = am_false; + break; + } + + reds += notify_sys_task_executed(c_p, st, st_res); + + state = erts_smp_atomic32_read_acqb(&c_p->state); + } while (qmask && reds < max_reds); + + return reds; +} + +BIF_RETTYPE +erts_internal_request_system_task_3(BIF_ALIST_3) +{ + Process *rp = erts_proc_lookup(BIF_ARG_1); + ErtsProcSysTaskQs *stqs, *free_stqs = NULL; + ErtsProcSysTask *st = NULL; + erts_aint32_t prio, rp_state; + int rp_locked; + Eterm noproc_res, req_type; + + if (!rp && !is_internal_pid(BIF_ARG_1)) { + if (!is_external_pid(BIF_ARG_1)) + goto badarg; + if (external_pid_dist_entry(BIF_ARG_1) != erts_this_dist_entry) + goto badarg; + } + + switch (BIF_ARG_2) { + case am_max: prio = PRIORITY_MAX; break; + case am_high: prio = PRIORITY_HIGH; break; + case am_normal: prio = PRIORITY_NORMAL; break; + case am_low: prio = PRIORITY_LOW; break; + default: goto badarg; + } + + if (is_not_tuple(BIF_ARG_3)) + goto badarg; + else { + int i; + Eterm *tp = tuple_val(BIF_ARG_3); + Uint arity = arityval(*tp); + Eterm req_id; + Uint req_id_sz; + Eterm arg[ERTS_MAX_PROC_SYS_TASK_ARGS]; + Uint arg_sz[ERTS_MAX_PROC_SYS_TASK_ARGS]; + Uint tot_sz; + Eterm *hp; + + if (arity < 2) + goto badarg; + if (arity > 2 + ERTS_MAX_PROC_SYS_TASK_ARGS) + goto badarg; + req_type = tp[1]; + req_id = tp[2]; + req_id_sz = is_immed(req_id) ? req_id : size_object(req_id); + tot_sz = req_id_sz; + for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) { + int tix = 3 + i; + if (tix > arity) { + arg[i] = THE_NON_VALUE; + arg_sz[i] = 0; + } + else { + arg[i] = tp[tix]; + if (is_immed(arg[i])) + arg_sz[i] = 0; + else { + arg_sz[i] = size_object(arg[i]); + tot_sz += arg_sz[i]; + } + } + } + st = erts_alloc(ERTS_ALC_T_PROC_SYS_TSK, + ERTS_PROC_SYS_TASK_SIZE(tot_sz)); + st->next = st->prev = st; /* Prep for empty prio queue */ + ERTS_INIT_OFF_HEAP(&st->off_heap); + hp = &st->heap[0]; + + st->requester = BIF_P->common.id; + st->reply_tag = req_type; + st->req_id_sz = req_id_sz; + st->req_id = req_id_sz == 0 ? req_id : copy_struct(req_id, + req_id_sz, + &hp, + &st->off_heap); + + for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) + st->arg[i] = arg_sz[i] == 0 ? arg[i] : copy_struct(arg[i], + arg_sz[i], + &hp, + &st->off_heap); + ASSERT(&st->heap[0] + tot_sz == hp); + } + + switch (req_type) { + + case am_garbage_collect: + st->type = ERTS_PSTT_GC; + noproc_res = am_false; + if (!rp) + goto noproc; + break; + + case am_check_process_code: + if (is_not_atom(st->arg[0])) + goto badarg; + if (st->arg[1] != am_true && st->arg[1] != am_false) + goto badarg; + noproc_res = am_false; + st->type = ERTS_PSTT_CPC; + if (!rp) + goto noproc; + break; + + default: + goto badarg; + } + + rp_state = erts_smp_atomic32_read_nob(&rp->state); + + rp_locked = 0; + + free_stqs = NULL; + if (rp_state & ERTS_PSFLG_ACTIVE_SYS) + stqs = NULL; + else { + alloc_qs: + stqs = proc_sys_task_queues_alloc(); + stqs->qmask = 1 << prio; + stqs->ncount = 0; + stqs->q[PRIORITY_MAX] = NULL; + stqs->q[PRIORITY_HIGH] = NULL; + stqs->q[PRIORITY_NORMAL] = NULL; + stqs->q[PRIORITY_LOW] = NULL; + stqs->q[prio] = st; + } + + if (!rp_locked) { + rp_locked = 1; + erts_smp_proc_lock(rp, ERTS_PROC_LOCK_STATUS); + + rp_state = erts_smp_atomic32_read_nob(&rp->state); + if (rp_state & ERTS_PSFLG_EXITING) { + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); + rp = NULL; + free_stqs = stqs; + goto noproc; + } + } + + if (!rp->sys_task_qs) { + if (stqs) + rp->sys_task_qs = stqs; + else + goto alloc_qs; + } + else { + if (stqs) + free_stqs = stqs; + stqs = rp->sys_task_qs; + if (!stqs->q[prio]) { + stqs->q[prio] = st; + stqs->qmask |= 1 << prio; + } + else { + st->next = stqs->q[prio]; + st->prev = stqs->q[prio]->prev; + st->next->prev = st; + st->prev->next = st; + ASSERT(stqs->qmask & (1 << prio)); + } + } + + if (ERTS_PSFLGS_GET_ACT_PRIO(rp_state) > prio) { + erts_aint32_t n, a, e; + /* Need to elevate actual prio */ + + a = rp_state; + do { + if (ERTS_PSFLGS_GET_ACT_PRIO(a) <= prio) { + n = a; + break; + } + n = e = a; + n &= ~ERTS_PSFLGS_ACT_PRIO_MASK; + n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET); + a = erts_smp_atomic32_cmpxchg_nob(&rp->state, n, e); + } while (a != e); + rp_state = n; + } + + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); + + schedule_process_sys_task(rp, rp_state, NULL); + + if (free_stqs) + proc_sys_task_queues_free(free_stqs); + + BIF_RET(am_ok); + +noproc: + + notify_sys_task_executed(BIF_P, st, noproc_res); + if (free_stqs) + proc_sys_task_queues_free(free_stqs); + BIF_RET(am_ok); + +badarg: + + if (st) { + erts_cleanup_offheap(&st->off_heap); + erts_free(ERTS_ALC_T_PROC_SYS_TSK, st); + } + if (free_stqs) + proc_sys_task_queues_free(free_stqs); + BIF_ERROR(BIF_P, BADARG); +} + void erts_sched_stat_modify(int what) { @@ -7520,7 +8463,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). prio = (erts_aint32_t) so->priority; } - state |= (prio & ERTS_PSFLG_PRIO_MASK); + state |= (((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_ACT_PRIO_OFFSET) + | ((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_USR_PRIO_OFFSET)); if (!rq) rq = erts_get_runq_proc(parent); @@ -7599,6 +8543,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->bin_old_vheap = 0; p->bin_vheap_mature = 0; + p->sys_task_qs = NULL; + /* No need to initialize p->fcalls. */ p->current = p->initial+INITIAL_MOD; @@ -7764,7 +8710,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). * Schedule process for execution. */ - schedule_process(p, state, 0); + schedule_process(p, state); VERBOSE(DEBUG_PROCESSES, ("Created a new process: %T\n",p->common.id)); @@ -7815,6 +8761,7 @@ void erts_init_empty_process(Process *p) p->bin_vheap_sz = BIN_VH_MIN_SIZE; p->bin_old_vheap_sz = BIN_VH_MIN_SIZE; p->bin_old_vheap = 0; + p->sys_task_qs = NULL; p->bin_vheap_mature = 0; #ifdef ERTS_SMP p->common.u.alive.ptimer = NULL; @@ -8070,33 +9017,21 @@ delete_process(Process* p) p->fvalue = NIL; } -static ERTS_INLINE erts_aint32_t -set_proc_exiting_state(Process *p, erts_aint32_t state) -{ - erts_aint32_t a, n, e; - a = state; - while (1) { - n = e = a; - n &= ~(ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT); - n |= ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE; - if (!(a & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING))) - n |= ERTS_PSFLG_IN_RUNQ; - a = erts_smp_atomic32_cmpxchg_relb(&p->state, n, e); - if (a == e) - break; - } - return a; -} - static ERTS_INLINE void set_proc_exiting(Process *p, - erts_aint32_t state, + erts_aint32_t in_state, Eterm reason, ErlHeapFragment *bp) { + erts_aint32_t state = in_state, enq_prio = -1; + int enqueue; ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(p) == ERTS_PROC_LOCKS_ALL); - state = set_proc_exiting_state(p, state); + enqueue = change_proc_schedule_state(p, + ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT, + ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE, + &state, + &enq_prio); p->fvalue = reason; if (bp) @@ -8111,15 +9046,37 @@ set_proc_exiting(Process *p, cancel_timer(p); p->i = (BeamInstr *) beam_exit; - if (erts_system_profile_flags.runnable_procs - && !(state & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) { - profile_runnable_proc(p, am_active); - } - - if (!(state & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLG_RUNNING))) - add2runq(p, state); + if (enqueue) + add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), + state, + enq_prio); } +static ERTS_INLINE erts_aint32_t +set_proc_self_exiting(Process *c_p) +{ +#ifdef DEBUG + int enqueue; +#endif + erts_aint32_t state, enq_prio = -1; + + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCKS_ALL); + + state = erts_smp_atomic32_read_nob(&c_p->state); + ASSERT(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS)); + +#ifdef DEBUG + enqueue = +#endif + change_proc_schedule_state(c_p, + ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_PENDING_EXIT, + ERTS_PSFLG_EXITING|ERTS_PSFLG_ACTIVE, + &state, + &enq_prio); + + ASSERT(!enqueue); + return state; +} #ifdef ERTS_SMP @@ -8167,7 +9124,7 @@ handle_pending_exiters(ErtsProcList *pnd_xtrs) if (p) { if (erts_proclist_same(plp, p)) { erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); - if (!(state & ERTS_PSFLG_RUNNING)) { + if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS))) { ASSERT(state & ERTS_PSFLG_PENDING_EXIT); erts_handle_pending_exit(p, ERTS_PROC_LOCKS_ALL); } @@ -8388,7 +9345,7 @@ send_exit_signal(Process *c_p, /* current process if and only } set_proc_exiting(c_p, state, rsn, NULL); } - else if (!(state & ERTS_PSFLG_RUNNING)) { + else if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS))) { /* Process not running ... */ ErtsProcLocks need_locks = ~(*rp_locks) & ERTS_PROC_LOCKS_ALL; if (need_locks @@ -8765,9 +9722,6 @@ resume_suspend_monitor(ErtsSuspendMonitor *smon, void *vc_p) void erts_do_exit_process(Process* p, Eterm reason) { -#ifdef ERTS_SMP - erts_aint32_t state; -#endif p->arity = 0; /* No live registers */ p->fvalue = reason; @@ -8792,10 +9746,9 @@ erts_do_exit_process(Process* p, Eterm reason) #endif #ifndef ERTS_SMP - set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state)); + set_proc_self_exiting(p); #else - state = set_proc_exiting_state(p, erts_smp_atomic32_read_nob(&p->state)); - if (state & ERTS_PSFLG_PENDING_EXIT) { + if (ERTS_PSFLG_PENDING_EXIT & set_proc_self_exiting(p)) { /* Process exited before pending exit was received... */ p->pending_exit.reason = THE_NON_VALUE; if (p->pending_exit.bp) { @@ -8849,6 +9802,7 @@ erts_continue_exit_process(Process *p) DistEntry *dep; struct saved_calls *scb; process_breakpoint_time_t *pbt; + erts_aint32_t state; #ifdef DEBUG int yield_allowed = 1; @@ -8885,6 +9839,12 @@ erts_continue_exit_process(Process *p) p->flags &= ~F_USING_DB; } + state = erts_smp_atomic32_read_acqb(&p->state); + if (state & ERTS_PSFLG_ACTIVE_SYS) { + if (cleanup_sys_tasks(p, state, CONTEXT_REDS) >= CONTEXT_REDS/2) + goto yield; + } + if (p->flags & F_USING_DDLL) { erts_ddll_proc_dead(p, ERTS_PROC_LOCK_MAIN); p->flags &= ~F_USING_DDLL; @@ -8962,17 +9922,31 @@ erts_continue_exit_process(Process *p) { /* Inactivate and notify free */ erts_aint32_t n, e, a = erts_smp_atomic32_read_nob(&p->state); +#ifdef ERTS_SMP + int refc_inced = 0; +#endif while (1) { n = e = a; ASSERT(a & ERTS_PSFLG_EXITING); n |= ERTS_PSFLG_FREE; n &= ~ERTS_PSFLG_ACTIVE; +#ifdef ERTS_SMP + if ((n & ERTS_PSFLG_IN_RUNQ) && !refc_inced) { + erts_smp_proc_inc_refc(p); + refc_inced = 1; + } +#endif a = erts_smp_atomic32_cmpxchg_mb(&p->state, n, e); if (a == e) break; } - } +#ifdef ERTS_SMP + if (refc_inced && !(n & ERTS_PSFLG_IN_RUNQ)) + erts_smp_proc_dec_refc(p); +#endif + } + dep = ((p->flags & F_DISTRIBUTION) ? ERTS_PROC_SET_DIST_ENTRY(p, ERTS_PROC_LOCKS_ALL, NULL) : NULL); @@ -9075,7 +10049,7 @@ timeout_proc(Process* p) state = erts_smp_atomic32_read_acqb(&p->state); if (!(state & ERTS_PSFLG_ACTIVE)) - schedule_process(p, state, 0); + schedule_process(p, state); } @@ -9153,7 +10127,9 @@ erts_program_counter_info(int to, void *to_arg, Process *p) print_function_from_pc(to, to_arg, p->cp); erts_print(to, to_arg, ")\n"); state = erts_smp_atomic32_read_acqb(&p->state); - if (!(state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_GC))) { + if (!(state & (ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_GC))) { erts_print(to, to_arg, "arity = %d\n",p->arity); if (!ERTS_IS_CRASH_DUMPING) { /* diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 8e5467f196..814e3d34df 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -688,6 +688,9 @@ typedef struct { ErlHeapFragment *bp; } ErtsPendExit; +typedef struct ErtsProcSysTask_ ErtsProcSysTask; +typedef struct ErtsProcSysTaskQs_ ErtsProcSysTaskQs; + #ifdef ERTS_SMP typedef struct ErtsPendingSuspend_ ErtsPendingSuspend; @@ -855,6 +858,7 @@ struct process { Uint64 bin_old_vheap_sz; /* Virtual old heap block size for binaries */ Uint64 bin_old_vheap; /* Virtual old heap size for binaries */ + ErtsProcSysTaskQs *sys_task_qs; erts_smp_atomic32_t state; /* Process state flags (see ERTS_PSFLG_*) */ #ifdef ERTS_SMP @@ -924,24 +928,66 @@ void erts_check_for_holes(Process* p); # error "Need to increase ERTS_PSFLG_PRIO_SHIFT" #endif -#define ERTS_PSFLG_PRIO_SHIFT 2 +#define ERTS_PSFLGS_PRIO_BITS 2 +#define ERTS_PSFLGS_PRIO_MASK \ + ((((erts_aint32_t) 1) << ERTS_PSFLGS_PRIO_BITS) - 1) -#define ERTS_PSFLG_BIT(N) \ - (((erts_aint32_t) 1) << (ERTS_PSFLG_PRIO_SHIFT + (N))) +#define ERTS_PSFLGS_ACT_PRIO_OFFSET (0*ERTS_PSFLGS_PRIO_BITS) +#define ERTS_PSFLGS_USR_PRIO_OFFSET (1*ERTS_PSFLGS_PRIO_BITS) +#define ERTS_PSFLGS_PRQ_PRIO_OFFSET (2*ERTS_PSFLGS_PRIO_BITS) +#define ERTS_PSFLGS_ZERO_BIT_OFFSET (3*ERTS_PSFLGS_PRIO_BITS) -#define ERTS_PSFLG_PRIO_MASK (ERTS_PSFLG_BIT(0) - 1) +#define ERTS_PSFLGS_QMASK_BITS 4 +#define ERTS_PSFLGS_QMASK \ + ((((erts_aint32_t) 1) << ERTS_PSFLGS_QMASK_BITS) - 1) +#define ERTS_PSFLGS_IN_PRQ_MASK_OFFSET \ + ERTS_PSFLGS_ZERO_BIT_OFFSET -#define ERTS_PSFLG_FREE ERTS_PSFLG_BIT(0) -#define ERTS_PSFLG_EXITING ERTS_PSFLG_BIT(1) -#define ERTS_PSFLG_PENDING_EXIT ERTS_PSFLG_BIT(2) -#define ERTS_PSFLG_ACTIVE ERTS_PSFLG_BIT(3) -#define ERTS_PSFLG_IN_RUNQ ERTS_PSFLG_BIT(4) -#define ERTS_PSFLG_RUNNING ERTS_PSFLG_BIT(5) -#define ERTS_PSFLG_SUSPENDED ERTS_PSFLG_BIT(6) -#define ERTS_PSFLG_GC ERTS_PSFLG_BIT(7) -#define ERTS_PSFLG_BOUND ERTS_PSFLG_BIT(8) -#define ERTS_PSFLG_TRAP_EXIT ERTS_PSFLG_BIT(9) +#define ERTS_PSFLG_BIT(N) \ + (((erts_aint32_t) 1) << (ERTS_PSFLGS_ZERO_BIT_OFFSET + (N))) +/* + * ACT_PRIO -> Active prio, i.e., currently active prio. This + * prio may be higher than user prio. + * USR_PRIO -> User prio. i.e., prio the user has set. + * PRQ_PRIO -> Prio queue prio, i.e., prio queue currently + * enqueued in. + */ +#define ERTS_PSFLGS_ACT_PRIO_MASK \ + (ERTS_PSFLGS_PRIO_MASK << ERTS_PSFLGS_ACT_PRIO_OFFSET) +#define ERTS_PSFLGS_USR_PRIO_MASK \ + (ERTS_PSFLGS_PRIO_MASK << ERTS_PSFLGS_USR_PRIO_OFFSET) +#define ERTS_PSFLGS_PRQ_PRIO_MASK \ + (ERTS_PSFLGS_PRIO_MASK << ERTS_PSFLGS_PRQ_PRIO_OFFSET) +#define ERTS_PSFLG_IN_PRQ_MAX ERTS_PSFLG_BIT(0) +#define ERTS_PSFLG_IN_PRQ_HIGH ERTS_PSFLG_BIT(1) +#define ERTS_PSFLG_IN_PRQ_NORMAL ERTS_PSFLG_BIT(2) +#define ERTS_PSFLG_IN_PRQ_LOW ERTS_PSFLG_BIT(3) +#define ERTS_PSFLG_FREE ERTS_PSFLG_BIT(4) +#define ERTS_PSFLG_EXITING ERTS_PSFLG_BIT(5) +#define ERTS_PSFLG_PENDING_EXIT ERTS_PSFLG_BIT(6) +#define ERTS_PSFLG_ACTIVE ERTS_PSFLG_BIT(7) +#define ERTS_PSFLG_IN_RUNQ ERTS_PSFLG_BIT(8) +#define ERTS_PSFLG_RUNNING ERTS_PSFLG_BIT(9) +#define ERTS_PSFLG_SUSPENDED ERTS_PSFLG_BIT(10) +#define ERTS_PSFLG_GC ERTS_PSFLG_BIT(11) +#define ERTS_PSFLG_BOUND ERTS_PSFLG_BIT(12) +#define ERTS_PSFLG_TRAP_EXIT ERTS_PSFLG_BIT(13) +#define ERTS_PSFLG_ACTIVE_SYS ERTS_PSFLG_BIT(14) +#define ERTS_PSFLG_RUNNING_SYS ERTS_PSFLG_BIT(15) +#define ERTS_PSFLG_PROXY ERTS_PSFLG_BIT(16) + +#define ERTS_PSFLGS_IN_PRQ_MASK (ERTS_PSFLG_IN_PRQ_MAX \ + | ERTS_PSFLG_IN_PRQ_HIGH \ + | ERTS_PSFLG_IN_PRQ_NORMAL \ + | ERTS_PSFLG_IN_PRQ_LOW) + +#define ERTS_PSFLGS_GET_ACT_PRIO(PSFLGS) \ + (((PSFLGS) >> ERTS_PSFLGS_ACT_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) +#define ERTS_PSFLGS_GET_USR_PRIO(PSFLGS) \ + (((PSFLGS) >> ERTS_PSFLGS_USR_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) +#define ERTS_PSFLGS_GET_PRQ_PRIO(PSFLGS) \ + (((PSFLGS) >> ERTS_PSFLGS_USR_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) /* The sequential tracing token is a tuple of size 5: * diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index bacd5a5752..885f092aca 100755 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -655,6 +655,10 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg); Eterm erl_is_function(Process* p, Eterm arg1, Eterm arg2); +/* beam_bif_load.c */ +Eterm erts_check_process_code(Process *c_p, Eterm module, int allow_gc, int *redsp); + + /* beam_load.c */ typedef struct { BeamInstr* current; /* Pointer to: Mod, Name, Arity */ diff --git a/erts/emulator/beam/ops.tab b/erts/emulator/beam/ops.tab index 1e5ae46bfa..c29f3f9b1b 100644 --- a/erts/emulator/beam/ops.tab +++ b/erts/emulator/beam/ops.tab @@ -763,17 +763,17 @@ allocate_init t I y ################################################################# # -# The BIFs erlang:check_process_code/2 must be called like a function, +# The BIFs erts_internal:check_process_code/2 must be called like a function, # to ensure that c_p->i (program counter) is set correctly (an ordinary # BIF call doesn't set it). # -call_ext u==2 Bif=u$bif:erlang:check_process_code/2 => i_call_ext Bif -call_ext_last u==2 Bif=u$bif:erlang:check_process_code/2 D => i_call_ext_last Bif D -call_ext_only u==2 Bif=u$bif:erlang:check_process_code/2 => i_call_ext_only Bif +call_ext u==2 Bif=u$bif:erts_internal:check_process_code/2 => i_call_ext Bif +call_ext_last u==2 Bif=u$bif:erts_internal:check_process_code/2 D => i_call_ext_last Bif D +call_ext_only u==2 Bif=u$bif:erts_internal:check_process_code/2 => i_call_ext_only Bif # -# The BIFs erlang:garbage_collect/0,1 must be called like functions, +# The BIFs erlang:garbage_collect/0 must be called like a function, # to allow them to invoke the garbage collector. (The stack pointer must # be saved and p->arity must be zeroed, which is not done on ordinary BIF calls.) # @@ -782,10 +782,6 @@ call_ext u==0 Bif=u$bif:erlang:garbage_collect/0 => i_call_ext Bif call_ext_last u==0 Bif=u$bif:erlang:garbage_collect/0 D => i_call_ext_last Bif D call_ext_only u==0 Bif=u$bif:erlang:garbage_collect/0 => i_call_ext_only Bif -call_ext u==1 Bif=u$bif:erlang:garbage_collect/1 => i_call_ext Bif -call_ext_last u==1 Bif=u$bif:erlang:garbage_collect/1 D => i_call_ext_last Bif D -call_ext_only u==1 Bif=u$bif:erlang:garbage_collect/1 => i_call_ext_only Bif - # # put/2 and erase/1 must be able to do garbage collection, so we must call # them like functions. diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c index bd2be7afca..a5b2cc589c 100644 --- a/erts/emulator/beam/utils.c +++ b/erts/emulator/beam/utils.c @@ -1675,7 +1675,7 @@ static int do_send_to_logger(Eterm tag, Eterm gleader, char *buf, int len) p = erts_whereis_process(NULL, 0, am_error_logger, 0, 0); if (p) { erts_aint32_t state = erts_smp_atomic32_read_acqb(&p->state); - if (state & ERTS_PSFLG_RUNNING) + if (state & (ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS)) p = NULL; } } diff --git a/erts/emulator/hipe/hipe_bif_list.m4 b/erts/emulator/hipe/hipe_bif_list.m4 index 764b8d180c..b1fedf4838 100644 --- a/erts/emulator/hipe/hipe_bif_list.m4 +++ b/erts/emulator/hipe/hipe_bif_list.m4 @@ -151,10 +151,9 @@ standard_bif_interface_0(nbif_ports_0, ports_0) * BIFs and primops that may do a GC (change heap limit and walk the native stack). * XXX: erase/1 and put/2 cannot fail */ -gc_bif_interface_2(nbif_check_process_code_2, hipe_check_process_code_2) +gc_bif_interface_2(nbif_erts_internal_check_process_code_2, hipe_erts_internal_check_process_code_2) gc_bif_interface_1(nbif_erase_1, erase_1) gc_bif_interface_0(nbif_garbage_collect_0, garbage_collect_0) -gc_bif_interface_1(nbif_garbage_collect_1, hipe_garbage_collect_1) gc_nofail_primop_interface_1(nbif_gc_1, hipe_gc) gc_bif_interface_2(nbif_put_2, put_2) diff --git a/erts/emulator/hipe/hipe_native_bif.c b/erts/emulator/hipe/hipe_native_bif.c index 1f76268934..7d343dd91e 100644 --- a/erts/emulator/hipe/hipe_native_bif.c +++ b/erts/emulator/hipe/hipe_native_bif.c @@ -41,8 +41,7 @@ */ /* for -Wmissing-prototypes :-( */ -extern Eterm hipe_check_process_code_2(BIF_ALIST_2); -extern Eterm hipe_garbage_collect_1(BIF_ALIST_1); +extern Eterm hipe_erts_internal_check_process_code_2(BIF_ALIST_2); extern Eterm hipe_show_nstack_1(BIF_ALIST_1); /* Used when a BIF can trigger a stack walk. */ @@ -51,22 +50,12 @@ static __inline__ void hipe_set_narity(Process *p, unsigned int arity) p->hipe.narity = arity; } -Eterm hipe_check_process_code_2(BIF_ALIST_2) +Eterm hipe_erts_internal_check_process_code_2(BIF_ALIST_2) { Eterm ret; hipe_set_narity(BIF_P, 2); - ret = check_process_code_2(BIF_P, BIF__ARGS); - hipe_set_narity(BIF_P, 0); - return ret; -} - -Eterm hipe_garbage_collect_1(BIF_ALIST_1) -{ - Eterm ret; - - hipe_set_narity(BIF_P, 1); - ret = garbage_collect_1(BIF_P, BIF__ARGS); + ret = erts_internal_check_process_code_2(BIF_P, BIF__ARGS); hipe_set_narity(BIF_P, 0); return ret; } diff --git a/erts/emulator/test/process_SUITE.erl b/erts/emulator/test/process_SUITE.erl index e3aae17df4..e66c6f09b6 100644 --- a/erts/emulator/test/process_SUITE.erl +++ b/erts/emulator/test/process_SUITE.erl @@ -51,7 +51,11 @@ processes_term_proc_list/1, otp_7738_waiting/1, otp_7738_suspended/1, otp_7738_resume/1, - garb_other_running/1]). + garb_other_running/1, + no_priority_inversion/1, + no_priority_inversion2/1, + system_task_blast/1, + system_task_on_suspended/1]). -export([prio_server/2, prio_client/2]). -export([init_per_testcase/2, end_per_testcase/2]). @@ -73,7 +77,8 @@ all() -> bad_register, garbage_collect, process_info_messages, process_flag_badarg, process_flag_heap_size, spawn_opt_heap_size, otp_6237, {group, processes_bif}, - {group, otp_7738}, garb_other_running]. + {group, otp_7738}, garb_other_running, + {group, system_task}]. groups() -> [{t_exit_2, [], @@ -87,7 +92,10 @@ groups() -> processes_gc_trap, processes_term_proc_list]}, {otp_7738, [], [otp_7738_waiting, otp_7738_suspended, - otp_7738_resume]}]. + otp_7738_resume]}, + {system_task, [], + [no_priority_inversion, no_priority_inversion2, + system_task_blast, system_task_on_suspended]}]. init_per_suite(Config) -> A0 = case application:start(sasl) of @@ -2214,6 +2222,154 @@ garb_other_running(Config) when is_list(Config) -> receive {'DOWN', Mon, process, Pid, normal} -> ok end, ok. +no_priority_inversion(Config) when is_list(Config) -> + Prio = process_flag(priority, max), + HTLs = lists:map(fun (_) -> + spawn_opt(fun () -> + tok_loop() + end, + [{priority, high}, monitor, link]) + end, + lists:seq(1, 2*erlang:system_info(schedulers))), + receive after 500 -> ok end, + LTL = spawn_opt(fun () -> + tok_loop() + end, + [{priority, low}, monitor, link]), + false = erlang:check_process_code(element(1, LTL), nonexisting_module), + true = erlang:garbage_collect(element(1, LTL)), + lists:foreach(fun ({P, _}) -> + unlink(P), + exit(P, kill) + end, [LTL | HTLs]), + lists:foreach(fun ({P, M}) -> + receive + {'DOWN', M, process, P, killed} -> + ok + end + end, [LTL | HTLs]), + process_flag(priority, Prio), + ok. + +no_priority_inversion2(Config) when is_list(Config) -> + Prio = process_flag(priority, max), + MTLs = lists:map(fun (_) -> + spawn_opt(fun () -> + tok_loop() + end, + [{priority, max}, monitor, link]) + end, + lists:seq(1, 2*erlang:system_info(schedulers))), + receive after 500 -> ok end, + {PL, ML} = spawn_opt(fun () -> + tok_loop() + end, + [{priority, low}, monitor, link]), + RL = request_gc(PL, low), + RN = request_gc(PL, normal), + RH = request_gc(PL, high), + receive + {garbage_collect, _, _} -> + ?t:fail(unexpected_gc) + after 1000 -> + ok + end, + RM = request_gc(PL, max), + receive + {garbage_collect, RM, true} -> + ok + end, + lists:foreach(fun ({P, _}) -> + unlink(P), + exit(P, kill) + end, MTLs), + lists:foreach(fun ({P, M}) -> + receive + {'DOWN', M, process, P, killed} -> + ok + end + end, MTLs), + receive + {garbage_collect, RH, true} -> + ok + end, + receive + {garbage_collect, RN, true} -> + ok + end, + receive + {garbage_collect, RL, true} -> + ok + end, + unlink(PL), + exit(PL, kill), + receive + {'DOWN', ML, process, PL, killed} -> + ok + end, + process_flag(priority, Prio), + ok. + +request_gc(Pid, Prio) -> + Ref = make_ref(), + erts_internal:request_system_task(Pid, Prio, {garbage_collect, Ref}), + Ref. + +system_task_blast(Config) when is_list(Config) -> + Me = self(), + GCReq = fun () -> + RL = gc_req(Me, 100), + lists:foreach(fun (R) -> + receive + {garbage_collect, R, true} -> + ok + end + end, RL), + exit(it_worked) + end, + HTLs = lists:map(fun (_) -> spawn_monitor(GCReq) end, lists:seq(1, 1000)), + lists:foreach(fun ({P, M}) -> + receive + {'DOWN', M, process, P, it_worked} -> + ok + end + end, HTLs), + ok. + +gc_req(_Pid, 0) -> + []; +gc_req(Pid, N) -> + R0 = request_gc(Pid, low), + R1 = request_gc(Pid, normal), + R2 = request_gc(Pid, high), + R3 = request_gc(Pid, max), + [R0, R1, R2, R3 | gc_req(Pid, N-1)]. + +system_task_on_suspended(Config) when is_list(Config) -> + {P, M} = spawn_monitor(fun () -> + tok_loop() + end), + true = erlang:suspend_process(P), + {status, suspended} = process_info(P, status), + true = erlang:garbage_collect(P), + {status, suspended} = process_info(P, status), + true = erlang:resume_process(P), + false = ({status, suspended} == process_info(P, status)), + exit(P, kill), + receive + {'DOWN', M, process, P, killed} -> + ok + end. + +gc_req(_Pid, 0) -> + []; +gc_req(Pid, N) -> + R0 = erts_internal:request_system_task(Pid, low, garbage_collect), + R1 = erts_internal:request_system_task(Pid, normal, garbage_collect), + R2 = erts_internal:request_system_task(Pid, high, garbage_collect), + R3 = erts_internal:request_system_task(Pid, max, garbage_collect), + [R0, R1, R2, R3 | gc_req(Pid, N-1)]. + %% Internal functions wait_until(Fun) -> diff --git a/erts/etc/unix/etp-commands.in b/erts/etc/unix/etp-commands.in index 54ff7b3e3a..6e3466b8c0 100644 --- a/erts/etc/unix/etp-commands.in +++ b/erts/etc/unix/etp-commands.in @@ -1316,49 +1316,99 @@ end define etp-proc-state-int # Args: int # - if ($arg0 & 0xfffff000) + if ($arg0 & 0xff800000) printf "GARBAGE | " end - if ($arg0 & 0x800) + if ($arg0 & 0x400000) + printf "proxy | " + set $proxy_process = 1 + else + set $proxy_process = 0 + end + if ($arg0 & 0x200000) + printf "running-sys | " + end + if ($arg0 & 0x100000) + printf "active-sys | " + end + if ($arg0 & 0x80000) printf "trapping-exit | " end - if ($arg0 & 0x400) + if ($arg0 & 0x40000) printf "bound | " end - if ($arg0 & 0x200) + if ($arg0 & 0x20000) printf "garbage-collecting | " end - if ($arg0 & 0x100) + if ($arg0 & 0x10000) printf "suspended | " end - if ($arg0 & 0x80) + if ($arg0 & 0x8000) printf "running | " end - if ($arg0 & 0x40) + if ($arg0 & 0x4000) printf "in-run-queue | " end - if ($arg0 & 0x20) + if ($arg0 & 0x2000) printf "active | " end - if ($arg0 & 0x10) + if ($arg0 & 0x1000) printf "pending-exit | " end - if ($arg0 & 0x8) + if ($arg0 & 0x800) printf "exiting | " end - if ($arg0 & 0x4) + if ($arg0 & 0x400) printf "free | " end - if ($arg0 & 0x3) == 0 - printf "prio-max\n" + if ($arg0 & 0x200) + printf "in-prq-low | " + end + if ($arg0 & 0x100) + printf "in-prq-normal | " + end + if ($arg0 & 0x80) + printf "in-prq-high | " + end + if ($arg0 & 0x40) + printf "in-prq-max | " + end + if ($arg0 & 0x30) == 0x0 + printf "prq-prio-max | " else - if ($arg0 & 0x3) == 1 - printf "prio-high\n" + if ($arg0 & 0x30) == 0x10 + printf "prq-prio-high | " else - if ($arg0 & 0x3) == 2 - printf "prio-normal\n" + if ($arg0 & 0x30) == 0x20 + printf "prq-prio-normal | " else - printf "prio-low\n" + printf "prq-prio-low | " + end + end + end + if ($arg0 & 0xc) == 0x0 + printf "usr-prio-max | " + else + if ($arg0 & 0xc) == 0x4 + printf "usr-prio-high | " + else + if ($arg0 & 0xc) == 0x8 + printf "usr-prio-normal | " + else + printf "usr-prio-low | " + end + end + end + if ($arg0 & 0x3) == 0x0 + printf "act-prio-max\n" + else + if ($arg0 & 0x3) == 0x1 + printf "act-prio-high\n" + else + if ($arg0 & 0x3) == 0x2 + printf "act-prio-normal\n" + else + printf "act-prio-low\n" end end end @@ -1395,6 +1445,12 @@ define etp-process-info etp-1 $arg0->common.id printf "\n State: " etp-proc-state $arg0 + if $proxy_process != 0 + printf " Pointer: (Process *) %p\n", $arg0 + printf " *** PROXY process struct *** refer to: \n" + etp-pid2proc-1 $arg0->common.id + etp-process-info $proc + else if (*(((Uint32 *) &(((Process *) $arg0)->state))) & 0x4) == 0 if ($arg0->common.u.alive.reg) printf " Registered name: " @@ -1432,6 +1488,7 @@ define etp-process-info printf " Parent: " etp-1 $arg0->parent printf "\n Pointer: (Process *) %p\n", $arg0 + end end document etp-process-info diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam index 9da4f3cd00..159c91e37c 100644 Binary files a/erts/preloaded/ebin/erlang.beam and b/erts/preloaded/ebin/erlang.beam differ diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam index 3ab52615ab..9d8a4cfd00 100644 Binary files a/erts/preloaded/ebin/erts_internal.beam and b/erts/preloaded/ebin/erts_internal.beam differ diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index a969ef91dc..e521c6fc3d 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -81,7 +81,8 @@ -export([binary_to_list/3, binary_to_term/1, binary_to_term/2]). -export([bit_size/1, bitsize/1, bitstr_to_list/1, bitstring_to_list/1]). -export([bump_reductions/1, byte_size/1, call_on_load_function/1]). --export([cancel_timer/1, check_old_code/1, check_process_code/2, crc32/1]). +-export([cancel_timer/1, check_old_code/1, check_process_code/2, + check_process_code/3, crc32/1]). -export([crc32/2, crc32_combine/3, date/0, decode_packet/3]). -export([delete_element/2]). -export([delete_module/1, demonitor/1, demonitor/2, display/1]). @@ -91,7 +92,7 @@ -export([float_to_binary/1, float_to_binary/2, float_to_list/1, float_to_list/2]). -export([fun_info/2, fun_to_list/1, function_exported/3]). --export([garbage_collect/0, garbage_collect/1]). +-export([garbage_collect/0, garbage_collect/1, garbage_collect/2]). -export([garbage_collect_message_area/0, get/0, get/1, get_keys/1]). -export([get_module_info/1, get_stacktrace/0, group_leader/0]). -export([group_leader/2, halt/0, halt/1, halt/2, hash/2, hibernate/3]). @@ -429,11 +430,71 @@ check_old_code(_Module) -> erlang:nif_error(undefined). %% check_process_code/2 --spec check_process_code(Pid, Module) -> boolean() when +-spec check_process_code(Pid, Module) -> CheckResult when Pid :: pid(), - Module :: module(). -check_process_code(_Pid, _Module) -> - erlang:nif_error(undefined). + Module :: module(), + CheckResult :: boolean(). +check_process_code(Pid, Module) -> + try + erlang:check_process_code(Pid, Module, [{allow_gc, true}]) + catch + error:Error -> erlang:error(Error, [Pid, Module]) + end. + +%% check_process_code/3 +-spec check_process_code(Pid, Module, OptionList) -> CheckResult | async when + Pid :: pid(), + Module :: module(), + RequestId :: term(), + Option :: {async, RequestId} | {allow_gc, boolean()}, + OptionList :: [Option], + CheckResult :: boolean() | aborted. +check_process_code(Pid, Module, OptionList) -> + try + {Async, AllowGC} = get_cpc_opts(OptionList, sync, true), + case Async of + {async, ReqId} -> + {priority, Prio} = erlang:process_info(erlang:self(), + priority), + erts_internal:request_system_task(Pid, + Prio, + {check_process_code, + ReqId, + Module, + AllowGC}), + async; + sync -> + case Pid == erlang:self() of + true -> + erts_internal:check_process_code(Module, + [{allow_gc, AllowGC}]); + false -> + {priority, Prio} = erlang:process_info(erlang:self(), + priority), + ReqId = erlang:make_ref(), + erts_internal:request_system_task(Pid, + Prio, + {check_process_code, + ReqId, + Module, + AllowGC}), + receive + {check_process_code, ReqId, CheckResult} -> + CheckResult + end + end + end + catch + error:Error -> erlang:error(Error, [Pid, Module, OptionList]) + end. + +% gets async and allow_gc opts and verify valid option list +get_cpc_opts([{async, _ReqId} = AsyncTuple | Options], _OldAsync, AllowGC) -> + get_cpc_opts(Options, AsyncTuple, AllowGC); +get_cpc_opts([{allow_gc, AllowGC} | Options], Async, _OldAllowGC) -> + get_cpc_opts(Options, Async, AllowGC); +get_cpc_opts([], Async, AllowGC) -> + {Async, AllowGC}. %% crc32/1 -spec erlang:crc32(Data) -> non_neg_integer() when @@ -793,10 +854,61 @@ garbage_collect() -> erlang:nif_error(undefined). %% garbage_collect/1 --spec garbage_collect(Pid) -> boolean() when - Pid :: pid(). -garbage_collect(_Pid) -> - erlang:nif_error(undefined). +-spec garbage_collect(Pid) -> GCResult when + Pid :: pid(), + GCResult :: boolean(). +garbage_collect(Pid) -> + try + erlang:garbage_collect(Pid, []) + catch + error:Error -> erlang:error(Error, [Pid]) + end. + +%% garbage_collect/2 +-spec garbage_collect(Pid, OptionList) -> GCResult | async when + Pid :: pid(), + RequestId :: term(), + Option :: {async, RequestId}, + OptionList :: [Option], + GCResult :: boolean(). +garbage_collect(Pid, OptionList) -> + try + Async = get_gc_opts(OptionList, sync), + case Async of + {async, ReqId} -> + {priority, Prio} = erlang:process_info(erlang:self(), + priority), + erts_internal:request_system_task(Pid, + Prio, + {garbage_collect, ReqId}), + async; + sync -> + case Pid == erlang:self() of + true -> + erlang:garbage_collect(); + false -> + {priority, Prio} = erlang:process_info(erlang:self(), + priority), + ReqId = erlang:make_ref(), + erts_internal:request_system_task(Pid, + Prio, + {garbage_collect, + ReqId}), + receive + {garbage_collect, ReqId, GCResult} -> + GCResult + end + end + end + catch + error:Error -> erlang:error(Error, [Pid, OptionList]) + end. + +% gets async opt and verify valid option list +get_gc_opts([{async, _ReqId} = AsyncTuple | Options], _OldAsync) -> + get_gc_opts(Options, AsyncTuple); +get_gc_opts([], Async) -> + Async. %% garbage_collect_message_area/0 -spec erlang:garbage_collect_message_area() -> boolean(). diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index 8a8cd52d64..c8e8e7e069 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -33,6 +33,10 @@ -export([port_command/3, port_connect/2, port_close/1, port_control/3, port_call/3, port_info/1, port_info/2]). +-export([request_system_task/3]). + +-export([check_process_code/2]). + %% %% Await result of send to port %% @@ -139,3 +143,20 @@ port_info(_Result) -> port_info(_Result, _Item) -> erlang:nif_error(undefined). + +-spec request_system_task(Pid, Prio, Request) -> 'ok' when + Prio :: 'max' | 'high' | 'normal' | 'low', + Request :: {'garbage_collect', term()} + | {'check_process_code', term(), module(), boolean()}, + Pid :: pid(). + +request_system_task(_Pid, _Prio, _Request) -> + erlang:nif_error(undefined). + +-spec check_process_code(Module, OptionList) -> boolean() when + Module :: module(), + Option :: {allow_gc, boolean()}, + OptionList :: [Option]. +check_process_code(_Module, _OptionList) -> + erlang:nif_error(undefined). + -- cgit v1.2.3 From 406fd5c773b5ae73dbfc6b305a502ffbe236a9bb Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Fri, 8 Nov 2013 17:41:20 +0100 Subject: Use asynchronous check_process_code in code_parallel_SUITE --- erts/emulator/test/code_parallel_load_SUITE.erl | 36 ++++++++++++++++--------- 1 file changed, 24 insertions(+), 12 deletions(-) (limited to 'erts') diff --git a/erts/emulator/test/code_parallel_load_SUITE.erl b/erts/emulator/test/code_parallel_load_SUITE.erl index 1cfe015ea6..428f1242ab 100644 --- a/erts/emulator/test/code_parallel_load_SUITE.erl +++ b/erts/emulator/test/code_parallel_load_SUITE.erl @@ -159,22 +159,34 @@ setup_checkers(_,0) -> []; setup_checkers(T,N) -> [spawn_link(fun() -> ?model:check(T) end) | setup_checkers(T, N-1)]. check_and_purge_processes_code(Pids, M) -> - check_and_purge_processes_code(Pids, M, []). -check_and_purge_processes_code([], M, []) -> + Tag = make_ref(), + N = request_cpc(Pids, M, Tag), + ok = handle_cpc_responses(N, Tag, M), erlang:purge_module(M), + ok. + +request_cpc(Pid, M, Tag) when is_pid(Pid) -> + erlang:check_process_code(Pid, M, [{async, {Tag, Pid}}]), + 1; +request_cpc(Pids, M, Tag) when is_list(Pids) -> + request_cpc(Pids, M, Tag, 0). + +request_cpc([], _M, _Tag, N) -> + N; +request_cpc([Pid|Pids], M, Tag, N) -> + request_cpc(Pids, M, Tag, N + request_cpc(Pid, M, Tag)). + +handle_cpc_responses(0, _Tag, _Module) -> ok; -check_and_purge_processes_code([], M, Pending) -> - io:format("Processes ~w are still executing old code - retrying.~n", [Pending]), - check_and_purge_processes_code(Pending, M, []); -check_and_purge_processes_code([Pid|Pids], M, Pending) -> - case erlang:check_process_code(Pid, M) of - false -> - check_and_purge_processes_code(Pids, M, Pending); - true -> - check_and_purge_processes_code(Pids, M, [Pid|Pending]) +handle_cpc_responses(N, Tag, Module) -> + receive + {check_process_code, {Tag, _Pid}, false} -> + handle_cpc_responses(N-1, Tag, Module); + {check_process_code, {Tag, Pid}, true} -> + 1 = request_cpc(Pid, Module, Tag), + handle_cpc_responses(N, Tag, Module) end. - generate(Module, Attributes, FunStrings) -> FunForms = function_forms(FunStrings), Forms = [ -- cgit v1.2.3 From c6cb0293ba33e1671f8ed670d5add082e5ee674a Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Wed, 30 Oct 2013 17:29:18 +0100 Subject: Functionality for disabling garbage collection Being able to disable garbage collection over context switches vastly simplifies implementation of yielding native code that builds large or complex data structures on the heap. This since the heap can be left in an inconsistent state over the context switch. --- erts/emulator/beam/beam_bif_load.c | 12 ++ erts/emulator/beam/erl_bif_info.c | 13 ++ erts/emulator/beam/erl_debug.c | 3 + erts/emulator/beam/erl_gc.c | 13 +- erts/emulator/beam/erl_process.c | 251 +++++++++++++++++++++++++++++++++-- erts/emulator/beam/erl_process.h | 15 ++- erts/emulator/beam/global.h | 5 + erts/emulator/test/process_SUITE.erl | 79 +++++++++-- erts/etc/unix/etp-commands.in | 5 +- 9 files changed, 368 insertions(+), 28 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index 5239940a50..3f92c5b025 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -495,6 +495,8 @@ BIF_RETTYPE erts_internal_check_process_code_2(BIF_ALIST_2) res = erts_check_process_code(BIF_P, BIF_ARG_1, allow_gc, &reds); + ASSERT(is_value(res)); + BIF_RET2(res, reds); badarg: @@ -784,6 +786,16 @@ check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp) } } + if (rp->flags & F_DISABLE_GC) { + /* + * Cannot proceed. Process has disabled gc in order to + * safely leave inconsistent data on the heap and/or + * off heap lists. Need to wait for gc to be enabled + * again. + */ + return THE_NON_VALUE; + } + /* * See if there are funs that refer to the old version of the module. */ diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 673dfc658c..f5893f9291 100755 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -3593,6 +3593,19 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2) BIF_RET(am_true); } } + else if (ERTS_IS_ATOM_STR("gc_state", BIF_ARG_1)) { + /* Used by process_SUITE (emulator) */ + int res, enable; + + switch (BIF_ARG_2) { + case am_true: enable = 1; break; + case am_false: enable = 0; break; + default: BIF_ERROR(BIF_P, BADARG); break; + } + + res = erts_set_gc_state(BIF_P, enable); + BIF_RET(res ? am_true : am_false); + } else if (ERTS_IS_ATOM_STR("send_fake_exit_signal", BIF_ARG_1)) { /* Used by signal_SUITE (emulator) */ diff --git a/erts/emulator/beam/erl_debug.c b/erts/emulator/beam/erl_debug.c index b90d00f236..dc79d45be7 100644 --- a/erts/emulator/beam/erl_debug.c +++ b/erts/emulator/beam/erl_debug.c @@ -299,6 +299,9 @@ void erts_check_for_holes(Process* p) ErlHeapFragment* hf; Eterm* start; + if (p->flags & F_DISABLE_GC) + return; + start = p->last_htop ? p->last_htop : HEAP_START(p); check_memory(start, HEAP_TOP(p)); p->last_htop = HEAP_TOP(p); diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c index 8ba94d89e9..da254286c4 100644 --- a/erts/emulator/beam/erl_gc.c +++ b/erts/emulator/beam/erl_gc.c @@ -400,10 +400,16 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) Uint reclaimed_now = 0; int done = 0; Uint ms1, s1, us1; - ErtsSchedulerData *esdp = erts_get_scheduler_data(); + ErtsSchedulerData *esdp; #ifdef USE_VM_PROBES DTRACE_CHARBUF(pidbuf, DTRACE_TERM_BUF_SIZE); #endif + + if (p->flags & F_DISABLE_GC) + return 1; + + esdp = erts_get_scheduler_data(); + if (IS_TRACED_FL(p, F_TRACE_GC)) { trace_gc(p, am_gc_start); } @@ -532,6 +538,9 @@ erts_garbage_collect_hibernate(Process* p) Uint area_size; Sint offs; + if (p->flags & F_DISABLE_GC) + ERTS_INTERNAL_ERROR("GC disabled"); + /* * Preliminaries. */ @@ -667,6 +676,8 @@ erts_garbage_collect_literals(Process* p, Eterm* literals, Uint n; struct erl_off_heap_header** prev; + if (p->flags & F_DISABLE_GC) + return; /* * Set GC state. */ diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index ad806da660..13b18e9e0e 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -513,6 +513,11 @@ erts_pre_init_process(void) erts_psd_required_locks[ERTS_PSD_CALL_TIME_BP].set_locks = ERTS_PSD_CALL_TIME_BP_SET_LOCKS; + erts_psd_required_locks[ERTS_PSD_DELAYED_GC_TASK_QS].get_locks + = ERTS_PSD_DELAYED_GC_TASK_QS_GET_LOCKS; + erts_psd_required_locks[ERTS_PSD_DELAYED_GC_TASK_QS].set_locks + = ERTS_PSD_DELAYED_GC_TASK_QS_SET_LOCKS; + /* Check that we have locks for all entries */ for (ix = 0; ix < ERTS_PSD_SIZE; ix++) { ERTS_SMP_LC_ASSERT(erts_psd_required_locks[ix].get_locks); @@ -7033,7 +7038,7 @@ erts_set_process_priority(Process *p, Eterm value) oprio = ERTS_PSFLGS_GET_USR_PRIO(a); n = e = a; - if (!(a & ERTS_PSFLG_ACTIVE_SYS)) + if (!(a & (ERTS_PSFLG_ACTIVE_SYS|ERTS_PSFLG_DELAYED_SYS))) aprio = nprio; else { int max_qbit; @@ -7043,7 +7048,15 @@ erts_set_process_priority(Process *p, Eterm value) slocked = 1; } - max_qbit = p->sys_task_qs->qmask; + max_qbit = 0; + if (a & ERTS_PSFLG_ACTIVE_SYS) + max_qbit |= p->sys_task_qs->qmask; + if (a & ERTS_PSFLG_DELAYED_SYS) { + ErtsProcSysTaskQs *qs; + qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(p); + ASSERT(qs); + max_qbit |= qs->qmask; + } max_qbit &= -max_qbit; switch (max_qbit) { case MAX_BIT: @@ -7731,12 +7744,14 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result) } static ERTS_INLINE ErtsProcSysTask * -fetch_sys_task(Process *c_p, erts_aint32_t state, int *qmaskp) +fetch_sys_task(Process *c_p, erts_aint32_t state, int *qmaskp, int *priop) { ErtsProcSysTaskQs *unused_qs = NULL; int qbit, qmask; ErtsProcSysTask *st, **qp; + *priop = -1; /* Shut up annoying erroneous warning */ + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); if (!c_p->sys_task_qs) { @@ -7770,20 +7785,24 @@ fetch_sys_task(Process *c_p, erts_aint32_t state, int *qmaskp) switch (qbit) { case MAX_BIT: qp = &c_p->sys_task_qs->q[PRIORITY_MAX]; + *priop = PRIORITY_MAX; break; case HIGH_BIT: qp = &c_p->sys_task_qs->q[PRIORITY_HIGH]; + *priop = PRIORITY_HIGH; break; case NORMAL_BIT: if (!(qmask & PRIORITY_LOW) || ++c_p->sys_task_qs->ncount <= RESCHEDULE_LOW) { qp = &c_p->sys_task_qs->q[PRIORITY_NORMAL]; + *priop = PRIORITY_NORMAL; break; } c_p->sys_task_qs->ncount = 0; /* Fall through */ case LOW_BIT: qp = &c_p->sys_task_qs->q[PRIORITY_LOW]; + *priop = PRIORITY_LOW; break; default: ERTS_INTERNAL_ERROR("Invalid qmask"); @@ -7807,6 +7826,12 @@ fetch_sys_task(Process *c_p, erts_aint32_t state, int *qmaskp) qmask2 = qmask; + if (state & ERTS_PSFLG_DELAYED_SYS) { + ErtsProcSysTaskQs *qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p); + ASSERT(qs); + qmask2 |= qs->qmask; + } + switch (qmask2 & -qmask2) { case MAX_BIT: st_prio = PRIORITY_MAX; @@ -7818,17 +7843,18 @@ fetch_sys_task(Process *c_p, erts_aint32_t state, int *qmaskp) st_prio = PRIORITY_NORMAL; break; case LOW_BIT: - st_prio = PRIORITY_LOW; - break; case 0: st_prio = PRIORITY_LOW; - unused_qs = c_p->sys_task_qs; - c_p->sys_task_qs = NULL; break; default: ERTS_INTERNAL_ERROR("Invalid qmask"); } + if (!qmask) { + unused_qs = c_p->sys_task_qs; + c_p->sys_task_qs = NULL; + } + a = state; do { erts_aint32_t prio = ERTS_PSFLGS_GET_USR_PRIO(a); @@ -7862,6 +7888,8 @@ done: return st; } +static void save_gc_task(Process *c_p, ErtsProcSysTask *st, int prio); + static int execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) { @@ -7875,6 +7903,7 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) do { ErtsProcSysTask *st; + int st_prio; Eterm st_res; if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { @@ -7886,24 +7915,39 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) break; } - st = fetch_sys_task(c_p, state, &qmask); + st = fetch_sys_task(c_p, state, &qmask, &st_prio); if (!st) break; switch (st->type) { case ERTS_PSTT_GC: - if (!garbage_collected) { - FLAGS(c_p) |= F_NEED_FULLSWEEP; - reds += erts_garbage_collect(c_p, 0, c_p->arg_reg, c_p->arity); - garbage_collected = 1; + if (c_p->flags & F_DISABLE_GC) { + save_gc_task(c_p, st, st_prio); + st = NULL; + reds++; + } + else { + if (!garbage_collected) { + FLAGS(c_p) |= F_NEED_FULLSWEEP; + reds += erts_garbage_collect(c_p, + 0, + c_p->arg_reg, + c_p->arity); + garbage_collected = 1; + } + st_res = am_true; } - st_res = am_true; break; case ERTS_PSTT_CPC: st_res = erts_check_process_code(c_p, st->arg[0], st->arg[1] == am_true, &reds); + if (is_non_value(st_res)) { + /* Needed gc, but gc was disabled */ + save_gc_task(c_p, st, st_prio); + st = NULL; + } break; default: ERTS_INTERNAL_ERROR("Invalid process sys task type"); @@ -7934,8 +7978,9 @@ cleanup_sys_tasks(Process *c_p, erts_aint32_t in_state, int in_reds) do { ErtsProcSysTask *st; Eterm st_res; + int st_prio; - st = fetch_sys_task(c_p, state, &qmask); + st = fetch_sys_task(c_p, state, &qmask, &st_prio); if (!st) break; @@ -8168,6 +8213,183 @@ badarg: BIF_ERROR(BIF_P, BADARG); } +static void +save_gc_task(Process *c_p, ErtsProcSysTask *st, int prio) +{ + erts_aint32_t state; + ErtsProcSysTaskQs *qs; + + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + + qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p); + if (!qs) { + st->next = st->prev = st; + qs = proc_sys_task_queues_alloc(); + qs->qmask = 1 << prio; + qs->ncount = 0; + qs->q[PRIORITY_MAX] = NULL; + qs->q[PRIORITY_HIGH] = NULL; + qs->q[PRIORITY_NORMAL] = NULL; + qs->q[PRIORITY_LOW] = NULL; + qs->q[prio] = st; + (void) ERTS_PROC_SET_DELAYED_GC_TASK_QS(c_p, ERTS_PROC_LOCK_MAIN, qs); + } + else { + if (!qs->q[prio]) { + st->next = st->prev = st; + qs->q[prio] = st; + qs->qmask |= 1 << prio; + } + else { + st->next = qs->q[prio]; + st->prev = qs->q[prio]->prev; + st->next->prev = st; + st->prev->next = st; + ASSERT(qs->qmask & (1 << prio)); + } + } + + state = erts_smp_atomic32_read_nob(&c_p->state); + ASSERT((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS) & state); + + while (!(state & ERTS_PSFLG_DELAYED_SYS) + || prio < ERTS_PSFLGS_GET_ACT_PRIO(state)) { + erts_aint32_t n, e; + + n = e = state; + n |= ERTS_PSFLG_DELAYED_SYS; + if (prio < ERTS_PSFLGS_GET_ACT_PRIO(state)) { + n &= ~ERTS_PSFLGS_ACT_PRIO_MASK; + n |= prio << ERTS_PSFLGS_ACT_PRIO_OFFSET; + } + state = erts_smp_atomic32_cmpxchg_relb(&c_p->state, n, e); + if (state == e) + break; + } +} + +int +erts_set_gc_state(Process *c_p, int enable) +{ + int res; + ErtsProcSysTaskQs *dgc_tsk_qs; + ASSERT(c_p == erts_get_current_process()); + ASSERT((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS) + & erts_smp_atomic32_read_nob(&c_p->state)); + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + + res = !(c_p->flags & F_DISABLE_GC); + + if (!enable) { + c_p->flags |= F_DISABLE_GC; + return res; + } + + c_p->flags &= ~F_DISABLE_GC; + + dgc_tsk_qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p); + if (!dgc_tsk_qs) + return res; + + /* Move delayed gc tasks into sys tasks queues. */ + + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); + + if (!c_p->sys_task_qs) { + c_p->sys_task_qs = dgc_tsk_qs; + dgc_tsk_qs = NULL; + } + else { + ErtsProcSysTaskQs *stsk_qs; + int prio; + + /* + * We push delayed tasks to the front of the queue + * since they have already made it to the front + * once and then been delayed after that. + */ + + stsk_qs = c_p->sys_task_qs; + + while (dgc_tsk_qs->qmask) { + int qbit = dgc_tsk_qs->qmask & -dgc_tsk_qs->qmask; + dgc_tsk_qs->qmask &= ~qbit; + switch (qbit) { + case MAX_BIT: + prio = PRIORITY_MAX; + break; + case HIGH_BIT: + prio = PRIORITY_HIGH; + break; + case NORMAL_BIT: + prio = PRIORITY_NORMAL; + break; + case LOW_BIT: + prio = PRIORITY_LOW; + break; + default: + ERTS_INTERNAL_ERROR("Invalid qmask"); + prio = -1; + break; + } + + ASSERT(dgc_tsk_qs->q[prio]); + + if (!stsk_qs->q[prio]) { + stsk_qs->q[prio] = dgc_tsk_qs->q[prio]; + stsk_qs->qmask |= 1 << prio; + } + else { + ErtsProcSysTask *first1, *last1, *first2, *last2; + + ASSERT(stsk_qs->qmask & (1 << prio)); + first1 = dgc_tsk_qs->q[prio]; + last1 = first1->prev; + first2 = stsk_qs->q[prio]; + last2 = first1->prev; + + last1->next = first2; + first2->prev = last1; + + first1->prev = last2; + last2->next = first1; + + stsk_qs->q[prio] = first1; + } + + } + } + +#ifdef DEBUG + { + int qmask; + erts_aint32_t aprio, state = +#endif + + erts_smp_atomic32_read_bset_nob(&c_p->state, + (ERTS_PSFLG_DELAYED_SYS + | ERTS_PSFLG_ACTIVE_SYS), + ERTS_PSFLG_ACTIVE_SYS); + +#ifdef DEBUG + ASSERT(state & ERTS_PSFLG_DELAYED_SYS); + qmask = c_p->sys_task_qs->qmask; + aprio = ERTS_PSFLGS_GET_ACT_PRIO(state); + ASSERT(ERTS_PSFLGS_GET_USR_PRIO(state) >= aprio); + ASSERT((qmask & -qmask) >= (1 << aprio)); + } +#endif + + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); + + (void) ERTS_PROC_SET_DELAYED_GC_TASK_QS(c_p, ERTS_PROC_LOCK_MAIN, NULL); + + if (dgc_tsk_qs) + proc_sys_task_queues_free(dgc_tsk_qs); + + return res; +} + void erts_sched_stat_modify(int what) { @@ -9839,6 +10061,7 @@ erts_continue_exit_process(Process *p) p->flags &= ~F_USING_DB; } + erts_set_gc_state(p, 1); state = erts_smp_atomic32_read_acqb(&p->state); if (state & ERTS_PSFLG_ACTIVE_SYS) { if (cleanup_sys_tasks(p, state, CONTEXT_REDS) >= CONTEXT_REDS/2) diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 814e3d34df..0b8d2976ed 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -631,8 +631,9 @@ erts_smp_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi) #define ERTS_PSD_SCHED_ID 2 #define ERTS_PSD_DIST_ENTRY 3 #define ERTS_PSD_CALL_TIME_BP 4 +#define ERTS_PSD_DELAYED_GC_TASK_QS 5 -#define ERTS_PSD_SIZE 5 +#define ERTS_PSD_SIZE 6 typedef struct { void *data[ERTS_PSD_SIZE]; @@ -656,6 +657,9 @@ typedef struct { #define ERTS_PSD_CALL_TIME_BP_GET_LOCKS ERTS_PROC_LOCK_MAIN #define ERTS_PSD_CALL_TIME_BP_SET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_DELAYED_GC_TASK_QS_GET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_DELAYED_GC_TASK_QS_SET_LOCKS ERTS_PROC_LOCK_MAIN + typedef struct { ErtsProcLocks get_locks; ErtsProcLocks set_locks; @@ -859,6 +863,7 @@ struct process { Uint64 bin_old_vheap; /* Virtual old heap size for binaries */ ErtsProcSysTaskQs *sys_task_qs; + erts_smp_atomic32_t state; /* Process state flags (see ERTS_PSFLG_*) */ #ifdef ERTS_SMP @@ -976,6 +981,7 @@ void erts_check_for_holes(Process* p); #define ERTS_PSFLG_ACTIVE_SYS ERTS_PSFLG_BIT(14) #define ERTS_PSFLG_RUNNING_SYS ERTS_PSFLG_BIT(15) #define ERTS_PSFLG_PROXY ERTS_PSFLG_BIT(16) +#define ERTS_PSFLG_DELAYED_SYS ERTS_PSFLG_BIT(17) #define ERTS_PSFLGS_IN_PRQ_MASK (ERTS_PSFLG_IN_PRQ_MAX \ | ERTS_PSFLG_IN_PRQ_HIGH \ @@ -1102,6 +1108,7 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags; #define F_HAVE_BLCKD_MSCHED (1 << 8) /* Process has blocked multi-scheduling */ #define F_P2PNR_RESCHED (1 << 9) /* Process has been rescheduled via erts_pid2proc_not_running() */ #define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */ +#define F_DISABLE_GC (1 << 11) /* Disable GC */ /* process trace_flags */ #define F_SENSITIVE (1 << 0) @@ -1192,6 +1199,7 @@ void erts_late_init_process(void); void erts_early_init_scheduling(int); void erts_init_scheduling(int, int); +int erts_set_gc_state(Process *c_p, int enable); Eterm erts_sched_wall_time_request(Process *c_p, int set, int enable); Eterm erts_gc_info_request(Process *c_p); Uint64 erts_get_proc_interval(void); @@ -1637,6 +1645,11 @@ erts_psd_set(Process *p, ErtsProcLocks plocks, int ix, void *data) #define ERTS_PROC_SET_CALL_TIME(P, L, PBT) \ ((process_breakpoint_time_t *) erts_psd_set((P), (L), ERTS_PSD_CALL_TIME_BP, (void *) (PBT))) +#define ERTS_PROC_GET_DELAYED_GC_TASK_QS(P) \ + ((ErtsProcSysTaskQs *) erts_psd_get((P), ERTS_PSD_DELAYED_GC_TASK_QS)) +#define ERTS_PROC_SET_DELAYED_GC_TASK_QS(P, L, PBT) \ + ((ErtsProcSysTaskQs *) erts_psd_set((P), (L), ERTS_PSD_DELAYED_GC_TASK_QS, (void *) (PBT))) + ERTS_GLB_INLINE Eterm erts_proc_get_error_handler(Process *p); ERTS_GLB_INLINE Eterm erts_proc_set_error_handler(Process *p, diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index 885f092aca..6b720b53d8 100755 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -1114,7 +1114,12 @@ erts_alloc_message_heap_state(Uint size, if (statep) *statep = state; if ((state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + || (receiver->flags & F_DISABLE_GC) || HEAP_LIMIT(receiver) - HEAP_TOP(receiver) <= size) { + /* + * The heap is either potentially in an inconsistent + * state, or not large enough. + */ #ifdef ERTS_SMP if (locked_main) { *receiver_locks &= ~ERTS_PROC_LOCK_MAIN; diff --git a/erts/emulator/test/process_SUITE.erl b/erts/emulator/test/process_SUITE.erl index e66c6f09b6..bf31655066 100644 --- a/erts/emulator/test/process_SUITE.erl +++ b/erts/emulator/test/process_SUITE.erl @@ -55,7 +55,9 @@ no_priority_inversion/1, no_priority_inversion2/1, system_task_blast/1, - system_task_on_suspended/1]). + system_task_on_suspended/1, + gc_request_when_gc_disabled/1, + gc_request_blast_when_gc_disabled/1]). -export([prio_server/2, prio_client/2]). -export([init_per_testcase/2, end_per_testcase/2]). @@ -95,7 +97,8 @@ groups() -> otp_7738_resume]}, {system_task, [], [no_priority_inversion, no_priority_inversion2, - system_task_blast, system_task_on_suspended]}]. + system_task_blast, system_task_on_suspended, + gc_request_when_gc_disabled, gc_request_blast_when_gc_disabled]}]. init_per_suite(Config) -> A0 = case application:start(sasl) of @@ -2361,15 +2364,69 @@ system_task_on_suspended(Config) when is_list(Config) -> ok end. -gc_req(_Pid, 0) -> - []; -gc_req(Pid, N) -> - R0 = erts_internal:request_system_task(Pid, low, garbage_collect), - R1 = erts_internal:request_system_task(Pid, normal, garbage_collect), - R2 = erts_internal:request_system_task(Pid, high, garbage_collect), - R3 = erts_internal:request_system_task(Pid, max, garbage_collect), - [R0, R1, R2, R3 | gc_req(Pid, N-1)]. - +gc_request_when_gc_disabled(Config) when is_list(Config) -> + Master = self(), + AIS = erts_debug:set_internal_state(available_internal_state, true), + {P, M} = spawn_opt(fun () -> + true = erts_debug:set_internal_state(gc_state, + false), + Master ! {self(), gc_state, false}, + receive after 1000 -> ok end, + Master ! {self(), gc_state, true}, + false = erts_debug:set_internal_state(gc_state, + true), + receive after 100 -> ok end + end, [monitor, link]), + receive {P, gc_state, false} -> ok end, + ReqId = make_ref(), + async = garbage_collect(P, [{async, ReqId}]), + receive + {garbage_collect, ReqId, Result} -> + ?t:fail({unexpected_gc, Result}); + {P, gc_state, true} -> + ok + end, + receive {garbage_collect, ReqId, true} -> ok end, + erts_debug:set_internal_state(available_internal_state, AIS), + receive {'DOWN', M, process, P, _Reason} -> ok end, + ok. + +gc_request_blast_when_gc_disabled(Config) when is_list(Config) -> + Master = self(), + AIS = erts_debug:set_internal_state(available_internal_state, true), + {P, M} = spawn_opt(fun () -> + true = erts_debug:set_internal_state(gc_state, + false), + Master ! {self(), gc_state, false}, + receive after 1000 -> ok end, + false = erts_debug:set_internal_state(gc_state, + true), + receive after 100 -> ok end + end, [monitor, link]), + receive {P, gc_state, false} -> ok end, + PMs = lists:map(fun (N) -> + Prio = case N rem 4 of + 0 -> max; + 1 -> high; + 2 -> normal; + 3 -> low + end, + spawn_opt(fun () -> + erlang:garbage_collect(P) + end, [monitor, link, {priority, Prio}]) + end, lists:seq(1, 10000)), + lists:foreach(fun ({Proc, Mon}) -> + receive + {'DOWN', Mon, process, Proc, normal} -> + ok + end + end, + PMs), + erts_debug:set_internal_state(available_internal_state, AIS), + receive {'DOWN', M, process, P, _Reason} -> ok end, + ok. + + %% Internal functions wait_until(Fun) -> diff --git a/erts/etc/unix/etp-commands.in b/erts/etc/unix/etp-commands.in index 6e3466b8c0..73887931cc 100644 --- a/erts/etc/unix/etp-commands.in +++ b/erts/etc/unix/etp-commands.in @@ -1316,9 +1316,12 @@ end define etp-proc-state-int # Args: int # - if ($arg0 & 0xff800000) + if ($arg0 & 0xff000000) printf "GARBAGE | " end + if ($arg0 & 0x800000) + printf "delayed-sys | " + end if ($arg0 & 0x400000) printf "proxy | " set $proxy_process = 1 -- cgit v1.2.3 From 7f17d16335231b4bedd9d2cdd7507ab88a5bf021 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 8 Oct 2013 12:18:46 +0200 Subject: trapping binary_to_term passing binary_SUITE --- erts/emulator/beam/atom.names | 1 + erts/emulator/beam/erl_debug.c | 2 +- erts/emulator/beam/external.c | 299 ++++++++++++++++++++++++++++++++++------- erts/emulator/beam/external.h | 1 + erts/emulator/beam/sys.h | 6 + 5 files changed, 256 insertions(+), 53 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 8433fd826a..71addedfdc 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -115,6 +115,7 @@ atom binary_longest_prefix_trap atom binary_longest_suffix_trap atom binary_match_trap atom binary_matches_trap +atom binary_to_term_trap atom block atom blocked atom bm diff --git a/erts/emulator/beam/erl_debug.c b/erts/emulator/beam/erl_debug.c index dc79d45be7..873a9860da 100644 --- a/erts/emulator/beam/erl_debug.c +++ b/erts/emulator/beam/erl_debug.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1998-2012. All Rights Reserved. + * Copyright Ericsson AB 1998-2013. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 1c88765381..b1cdad1343 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -61,6 +61,9 @@ */ # define ERTS_DEBUG_USE_DIST_SEP # endif +# define IF_DEBUG(X) X +#else +# define IF_DEBUG(X) #endif /* Does Sint fit in Sint32? @@ -89,7 +92,8 @@ static int enc_term_int(Process *p,ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, static Uint is_external_string(Eterm obj, int* p_is_string); static byte* enc_atom(ErtsAtomCacheMap *, Eterm, byte*, Uint32); static byte* enc_pid(ErtsAtomCacheMap *, Eterm, byte*, Uint32); -static byte* dec_term(ErtsDistExternal *, Eterm**, byte*, ErlOffHeap*, Eterm*); +struct B2TContext_t; +static byte* dec_term(ErtsDistExternal *, Eterm**, byte*, ErlOffHeap*, Eterm*, struct B2TContext_t*); static byte* dec_atom(ErtsDistExternal *, byte*, Eterm*); static byte* dec_pid(ErtsDistExternal *, Eterm**, byte*, ErlOffHeap*, Eterm*); static Sint decoded_size(byte *ep, byte* endp, int internal_tags); @@ -102,11 +106,19 @@ static Uint encode_size_struct2(ErtsAtomCacheMap *, Eterm, unsigned); static int encode_size_struct_int(Process *p, ErtsAtomCacheMap *acmp, Eterm obj, unsigned dflags, Sint *reds, Uint *res); +static Export binary_to_term_trap_export; +static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1); +static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b); + void erts_init_external(void) { #if 1 /* In R16 */ erts_init_trap_export(&term_to_binary_trap_export, am_erlang, am_term_to_binary_trap, 1, &term_to_binary_trap_1); + + erts_init_trap_export(&binary_to_term_trap_export, + am_erlang, am_binary_to_term_trap, 1, + &binary_to_term_trap_1); #else sys_memset((void *) &term_to_binary_trap_export, 0, sizeof(Export)); term_to_binary_trap_export.address = &term_to_binary_trap_export.code[3]; @@ -927,7 +939,7 @@ erts_decode_dist_ext(Eterm** hpp, goto error; ep++; } - ep = dec_term(edep, hpp, ep, off_heap, &obj); + ep = dec_term(edep, hpp, ep, off_heap, &obj, NULL); if (!ep) goto error; @@ -948,7 +960,7 @@ Eterm erts_decode_ext(Eterm **hpp, ErlOffHeap *off_heap, byte **ext) byte *ep = *ext; if (*ep++ != VERSION_MAGIC) return THE_NON_VALUE; - ep = dec_term(NULL, hpp, ep, off_heap, &obj); + ep = dec_term(NULL, hpp, ep, off_heap, &obj, NULL); if (!ep) { #ifdef DEBUG bin_write(ERTS_PRINT_STDERR,NULL,*ext,500); @@ -962,7 +974,7 @@ Eterm erts_decode_ext(Eterm **hpp, ErlOffHeap *off_heap, byte **ext) Eterm erts_decode_ext_ets(Eterm **hpp, ErlOffHeap *off_heap, byte *ext) { Eterm obj; - ext = dec_term(NULL, hpp, ext, off_heap, &obj); + ext = dec_term(NULL, hpp, ext, off_heap, &obj, NULL); ASSERT(ext); return obj; } @@ -1112,6 +1124,46 @@ BIF_RETTYPE term_to_binary_2(BIF_ALIST_2) } } + +enum B2TState { /*B2TUncompress,*/ B2TSize, B2TDecodeInit, B2TDecode, B2TDecodeFail, B2TBadArg, B2TDone }; + +typedef struct { +} B2TSizeContext; + +typedef struct { + byte* ep; + Eterm res; + Eterm* next; + Eterm* hp_start; + Eterm* hp; + Eterm* hp_end; +} B2TDecodeContext; + +typedef struct { + /*Uint real_size; + Uint dest_len; + byte *dbytes; + Binary *result_bin; + Binary *destination_bin; + z_stream stream;*/ +} B2TUncompressContext; + +typedef struct B2TContext_t { + Sint heap_size; + byte* aligned_alloc; + ErtsBinary2TermState b2ts; + //Uint ext_size; + Uint reds; + Eterm trap_bin; + enum B2TState state; + union { + B2TSizeContext sc; + B2TDecodeContext dc; + B2TUncompressContext uc; + } u; +} B2TContext; + + static uLongf binary2term_uncomp_size(byte* data, Sint size) { z_stream stream; @@ -1153,7 +1205,7 @@ binary2term_prepare(ErtsBinary2TermState *state, byte *data, Sint data_size) if (size < 1 || *bytes != VERSION_MAGIC) { error: if (state->exttmp) - erts_free(ERTS_ALC_T_TMP, state->extp); + erts_free(ERTS_ALC_T_EXT_TERM_DATA, state->extp); state->extp = NULL; state->exttmp = 0; return -1; @@ -1168,17 +1220,18 @@ binary2term_prepare(ErtsBinary2TermState *state, byte *data, Sint data_size) bytes += 5; size -= 5; if (dest_len > 32*1024*1024 - || (state->extp = erts_alloc_fnf(ERTS_ALC_T_TMP, dest_len)) == NULL) { + || (state->extp = erts_alloc_fnf(ERTS_ALC_T_EXT_TERM_DATA, dest_len)) == NULL) { if (dest_len != binary2term_uncomp_size(bytes, size)) { goto error; } - state->extp = erts_alloc(ERTS_ALC_T_TMP, dest_len); + state->extp = erts_alloc(ERTS_ALC_T_EXT_TERM_DATA, dest_len); } state->exttmp = 1; if (erl_zlib_uncompress(state->extp, &dest_len, bytes, size) != Z_OK) goto error; size = (Sint) dest_len; } + state->extsize = size; res = decoded_size(state->extp, state->extp + size, 0); if (res < 0) goto error; @@ -1190,7 +1243,7 @@ binary2term_abort(ErtsBinary2TermState *state) { if (state->exttmp) { state->exttmp = 0; - erts_free(ERTS_ALC_T_TMP, state->extp); + erts_free(ERTS_ALC_T_EXT_TERM_DATA, state->extp); } } @@ -1198,11 +1251,11 @@ static ERTS_INLINE Eterm binary2term_create(ErtsDistExternal *edep, ErtsBinary2TermState *state, Eterm **hpp, ErlOffHeap *ohp) { Eterm res; - if (!dec_term(edep, hpp, state->extp, ohp, &res)) + if (!dec_term(edep, hpp, state->extp, ohp, &res, NULL)) res = THE_NON_VALUE; if (state->exttmp) { state->exttmp = 0; - erts_free(ERTS_ALC_T_TMP, state->extp); + erts_free(ERTS_ALC_T_EXT_TERM_DATA, state->extp); } return res; } @@ -1225,46 +1278,155 @@ erts_binary2term_create(ErtsBinary2TermState *state, Eterm **hpp, ErlOffHeap *oh return binary2term_create(NULL,state, hpp, ohp); } +static void b2t_destroy_context(B2TContext* context) +{ + erts_free_aligned_binary_bytes_extra(context->aligned_alloc, + ERTS_ALC_T_EXT_TERM_DATA); + context->aligned_alloc = NULL; + binary2term_abort(&context->b2ts); +} + +static void b2t_context_destructor(Binary *context_bin) +{ + B2TContext* ctx = (B2TContext*) ERTS_MAGIC_BIN_DATA(context_bin); + ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(context_bin) == b2t_context_destructor); + + b2t_destroy_context(ctx); +} + +static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1) +{ + Binary *context_bin = ((ProcBin *) binary_val(BIF_ARG_1))->val; + ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(context_bin) == b2t_context_destructor); + + return binary_to_term_int(BIF_P, THE_NON_VALUE, context_bin); +} + BIF_RETTYPE binary_to_term_1(BIF_ALIST_1) { - Sint heap_size; - Eterm res; - Eterm* hp; - Eterm* endp; - Sint size; + return binary_to_term_int(BIF_P, BIF_ARG_1, NULL); +} + +#define B2T_BYTES_PER_REDUCTION 100 + + +static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) +{ byte* bytes; - byte* temp_alloc = NULL; - ErtsBinary2TermState b2ts; + B2TContext c_buff; + B2TContext *ctx; + Eterm* magic_space = NULL; - if ((bytes = erts_get_aligned_binary_bytes(BIF_ARG_1, &temp_alloc)) == NULL) { - error: - erts_free_aligned_binary_bytes(temp_alloc); - BIF_ERROR(BIF_P, BADARG); - } - size = binary_size(BIF_ARG_1); + if (context_b == NULL) { + /* Setup enough to get started */ + ctx = &c_buff; + ctx->state = B2TSize; + ctx->aligned_alloc = NULL; + IF_DEBUG(ctx->trap_bin = THE_NON_VALUE;) + } else { + ctx = ERTS_MAGIC_BIN_DATA(context_b); + } + ctx->reds = 1; /*(Uint)(ERTS_BIF_REDS_LEFT(p) * B2T_BYTES_PER_REDUCTION);*/ + + do { + switch (ctx->state) { + case B2TSize: + bytes = erts_get_aligned_binary_bytes_extra(bin, + &ctx->aligned_alloc, + ERTS_ALC_T_EXT_TERM_DATA, + 0); + if (bytes == NULL) { + ctx->b2ts.exttmp = 0; + ctx->state = B2TBadArg; + break; + } - heap_size = binary2term_prepare(&b2ts, bytes, size); - if (heap_size < 0) - goto error; + ctx->heap_size = binary2term_prepare(&ctx->b2ts, bytes, + binary_size(bin)); + if (ctx->heap_size < 0) { + ctx->state = B2TBadArg; + break; + } - hp = HAlloc(BIF_P, heap_size); - endp = hp + heap_size; - res = binary2term_create(NULL, &b2ts, &hp, &MSO(BIF_P)); - erts_free_aligned_binary_bytes(temp_alloc); + ctx->state = B2TDecodeInit; - if (hp > endp) { - erl_exit(1, ":%s, line %d: heap overrun by %d words(s)\n", - __FILE__, __LINE__, hp-endp); - } + if (ctx->b2ts.extsize >= ctx->reds) { + ctx->reds = 0; + break; + } + ctx->reds -= ctx->b2ts.extsize; + /*fall through*/ + case B2TDecodeInit: + if (context_b == NULL && ctx->b2ts.extsize > ctx->reds) { + /* dec_term will probably trap, allocate space for magic bin + before result term to make it easy to trim with HRelease. + */ + magic_space = HAlloc(p, PROC_BIN_SIZE); + *magic_space = make_pos_bignum_header(PROC_BIN_SIZE-1); + } + ctx->u.dc.ep = ctx->b2ts.extp; + ctx->u.dc.res = (Eterm) (UWord) NULL; + ctx->u.dc.next = &ctx->u.dc.res; + ctx->u.dc.hp_start = HAlloc(p, ctx->heap_size); + ctx->u.dc.hp = ctx->u.dc.hp_start; + ctx->u.dc.hp_end = ctx->u.dc.hp_start + ctx->heap_size; + ctx->state = B2TDecode; + /*fall through*/ + case B2TDecode: + dec_term(NULL, NULL, NULL, &MSO(p), NULL, ctx); + break; + + case B2TDecodeFail: + HRelease(p, ctx->u.dc.hp_end, ctx->u.dc.hp_start); + /*fall through*/ + case B2TBadArg: + b2t_destroy_context(ctx); + if (context_b) { + erts_set_gc_state(p, 1); + } + BIF_ERROR(p, BADARG); - HRelease(BIF_P, endp, hp); + case B2TDone: + b2t_destroy_context(ctx); - if (res == THE_NON_VALUE) - goto error; + if (ctx->u.dc.hp > ctx->u.dc.hp_end) { + erl_exit(1, ":%s, line %d: heap overrun by %d words(s)\n", + __FILE__, __LINE__, ctx->u.dc.hp - ctx->u.dc.hp_end); + } + HRelease(p, ctx->u.dc.hp_end, ctx->u.dc.hp); - return res; + if (context_b) { + erts_set_gc_state(p, 1); + } + + return ctx->u.dc.res; + + } + }while (ctx->reds); + + if (context_b == NULL) { + ASSERT(ctx->trap_bin == THE_NON_VALUE); + + context_b = erts_create_magic_binary(sizeof(B2TContext), + b2t_context_destructor); + ctx = ERTS_MAGIC_BIN_DATA(context_b); + sys_memcpy(ctx, &c_buff, sizeof(B2TContext)); + + if (!magic_space) { + ASSERT(ctx->state != B2TDecode); + magic_space = HAlloc(p, PROC_BIN_SIZE); + } + ctx->trap_bin = erts_mk_magic_binary_term(&magic_space, &MSO(p), context_b); + + erts_set_gc_state(p, 0); + } + ASSERT(context_b); + ASSERT(ctx->trap_bin != THE_NON_VALUE); + + BUMP_ALL_REDS(p); + BIF_TRAP1(&binary_to_term_trap_export, p, ctx->trap_bin); } BIF_RETTYPE binary_to_term_2(BIF_ALIST_2) @@ -1514,7 +1676,7 @@ typedef struct { } s; } TTBContext; -static void context_destructor(Binary *context_bin) +static void ttb_context_destructor(Binary *context_bin) { TTBContext *context = ERTS_MAGIC_BIN_DATA(context_bin); if (context->alive) { @@ -1567,7 +1729,7 @@ static Eterm erts_term_to_binary_int(Process* p, Eterm Term, int level, Uint fla do { \ if (context_b == NULL) { \ context_b = erts_create_magic_binary(sizeof(TTBContext), \ - context_destructor); \ + ttb_context_destructor); \ context = ERTS_MAGIC_BIN_DATA(context_b); \ memcpy(context,&c_buff,sizeof(TTBContext)); \ } \ @@ -2608,15 +2770,34 @@ undo_offheap_in_area(ErlOffHeap* off_heap, Eterm* start, Eterm* end) ** On failure return NULL and (R13B04) *hpp will be unchanged. */ static byte* -dec_term(ErtsDistExternal *edep, Eterm** hpp, byte* ep, ErlOffHeap* off_heap, Eterm* objp) +dec_term(ErtsDistExternal *edep, Eterm** hpp, byte* ep, ErlOffHeap* off_heap, + Eterm* objp, B2TContext* ctx) { - Eterm* hp_saved = *hpp; + Eterm* hp_saved; int n; ErtsAtomEncoding char_enc; - register Eterm* hp = *hpp; /* Please don't take the address of hp */ - Eterm* next = objp; - - *next = (Eterm) (UWord) NULL; + register Eterm* hp; /* Please don't take the address of hp */ + Eterm* next; + byte* ep_trap_limit; + + if (ctx) { + hp_saved = ctx->u.dc.hp_start; + ep = ctx->u.dc.ep; + ep_trap_limit = ep + ctx->reds; + if (ep_trap_limit < ep) { + ep_trap_limit = (byte*)ERTS_UWORD_MAX; /*SVERK Is there a safe way to create a "largest ptr" */ + } + next = ctx->u.dc.next; + hpp = &ctx->u.dc.hp; + } + else { + hp_saved = *hpp; + ep_trap_limit = (byte*)ERTS_UWORD_MAX; /*SVERK - " - */ + next = objp; + *next = (Eterm) (UWord) NULL; + } + ASSERT(ep < ep_trap_limit); + hp = *hpp; while (next != NULL) { objp = next; @@ -3065,7 +3246,7 @@ dec_term_atom_common: goto error; } *hpp = hp; - ep = dec_term(edep, hpp, ep, off_heap, &temp); + ep = dec_term(edep, hpp, ep, off_heap, &temp, NULL); hp = *hpp; if (ep == NULL) { goto error; @@ -3125,7 +3306,7 @@ dec_term_atom_common: } *hpp = hp; /* Index */ - if ((ep = dec_term(edep, hpp, ep, off_heap, &temp)) == NULL) { + if ((ep = dec_term(edep, hpp, ep, off_heap, &temp, NULL)) == NULL) { goto error; } if (!is_small(temp)) { @@ -3134,7 +3315,7 @@ dec_term_atom_common: old_index = unsigned_val(temp); /* Uniq */ - if ((ep = dec_term(edep, hpp, ep, off_heap, &temp)) == NULL) { + if ((ep = dec_term(edep, hpp, ep, off_heap, &temp, NULL)) == NULL) { goto error; } if (!is_small(temp)) { @@ -3202,7 +3383,7 @@ dec_term_atom_common: } /* Index */ - if ((ep = dec_term(edep, hpp, ep, off_heap, &temp)) == NULL) { + if ((ep = dec_term(edep, hpp, ep, off_heap, &temp, NULL)) == NULL) { goto error; } if (!is_small(temp)) { @@ -3211,7 +3392,7 @@ dec_term_atom_common: old_index = unsigned_val(temp); /* Uniq */ - if ((ep = dec_term(edep, hpp, ep, off_heap, &temp)) == NULL) { + if ((ep = dec_term(edep, hpp, ep, off_heap, &temp, NULL)) == NULL) { goto error; } if (!is_small(temp)) { @@ -3311,8 +3492,22 @@ dec_term_atom_common: } undo_offheap_in_area(off_heap, hp_saved, hp); *hpp = hp_saved; - return NULL; + if (ctx) { + ctx->state = B2TDecodeFail; + } + return NULL; } + if (ep > ep_trap_limit) { + ASSERT(ctx); + ctx->u.dc.ep = ep; + ctx->u.dc.next = next; + ctx->u.dc.hp = hp; + ctx->reds = 0; + return NULL; + } + } + if (ctx) { + ctx->state = B2TDone; } *hpp = hp; return ep; diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index e37d47919e..9afdb52329 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -146,6 +146,7 @@ typedef struct { typedef struct { byte *extp; int exttmp; + Uint extsize; } ErtsBinary2TermState; /* -------------------------------------------------------------------------- */ diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h index 05bff430e3..11ba554a3d 100644 --- a/erts/emulator/beam/sys.h +++ b/erts/emulator/beam/sys.h @@ -282,16 +282,19 @@ typedef unsigned long UWord; typedef long SWord; #define SWORD_CONSTANT(Const) Const##L #define UWORD_CONSTANT(Const) Const##UL +#define ERTS_UWORD_MAX ULONG_MAX #elif SIZEOF_VOID_P == SIZEOF_INT typedef unsigned int UWord; typedef int SWord; #define SWORD_CONSTANT(Const) Const #define UWORD_CONSTANT(Const) Const##U +#define ERTS_UWORD_MAX UINT_MAX #elif SIZEOF_VOID_P == SIZEOF_LONG_LONG typedef unsigned long long UWord; typedef long long SWord; #define SWORD_CONSTANT(Const) Const##LL #define UWORD_CONSTANT(Const) Const##ULL +#define ERTS_UWORD_MAX ULLONG_MAX #else #error Found no appropriate type to use for 'Eterm', 'Uint' and 'Sint' #endif @@ -304,6 +307,7 @@ typedef unsigned long Uint; typedef long Sint; #define SWORD_CONSTANT(Const) Const##L #define UWORD_CONSTANT(Const) Const##UL +#define ERTS_UWORD_MAX ULONG_MAX #define ERTS_SIZEOF_ETERM SIZEOF_LONG #define ErtsStrToSint strtol #elif SIZEOF_VOID_P == SIZEOF_INT @@ -312,6 +316,7 @@ typedef unsigned int Uint; typedef int Sint; #define SWORD_CONSTANT(Const) Const #define UWORD_CONSTANT(Const) Const##U +#define ERTS_UWORD_MAX UINT_MAX #define ERTS_SIZEOF_ETERM SIZEOF_INT #define ErtsStrToSint strtol #elif SIZEOF_VOID_P == SIZEOF_LONG_LONG @@ -320,6 +325,7 @@ typedef unsigned long long Uint; typedef long long Sint; #define SWORD_CONSTANT(Const) Const##LL #define UWORD_CONSTANT(Const) Const##ULL +#define ERTS_UWORD_MAX ULLONG_MAX #define ERTS_SIZEOF_ETERM SIZEOF_LONG_LONG #if defined(__WIN32__) #define ErtsStrToSint _strtoi64 -- cgit v1.2.3 From 78a5dd69c9ed1749f2ad4fd24151e4aa784b1cba Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 10 Oct 2013 12:11:29 +0200 Subject: trapping lists and tuples --- erts/emulator/beam/external.c | 153 +++++++++++++++++++++++++++++++++++------- erts/emulator/beam/sys.h | 6 ++ 2 files changed, 133 insertions(+), 26 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index b1cdad1343..48536d610b 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -1125,7 +1125,18 @@ BIF_RETTYPE term_to_binary_2(BIF_ALIST_2) } -enum B2TState { /*B2TUncompress,*/ B2TSize, B2TDecodeInit, B2TDecode, B2TDecodeFail, B2TBadArg, B2TDone }; +enum B2TState { + /*B2TUncompress,*/ + B2TSize, + B2TDecodeInit, + B2TDecode, + B2TDecodeList, + B2TDecodeTuple, + + B2TDone, + B2TDecodeFail, + B2TBadArg +}; typedef struct { } B2TSizeContext; @@ -1137,6 +1148,10 @@ typedef struct { Eterm* hp_start; Eterm* hp; Eterm* hp_end; + int remaining_n; +#ifdef DEBUG + Eterm* container_start; +#endif } B2TDecodeContext; typedef struct { @@ -1153,7 +1168,7 @@ typedef struct B2TContext_t { byte* aligned_alloc; ErtsBinary2TermState b2ts; //Uint ext_size; - Uint reds; + SWord reds; Eterm trap_bin; enum B2TState state; union { @@ -1309,6 +1324,12 @@ BIF_RETTYPE binary_to_term_1(BIF_ALIST_1) #define B2T_BYTES_PER_REDUCTION 100 +static unsigned sverk_rand(void) +{ + static unsigned prev = 17; + prev = (prev * 214013 + 2531011); + return prev; +} static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) { @@ -1326,7 +1347,7 @@ static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) } else { ctx = ERTS_MAGIC_BIN_DATA(context_b); } - ctx->reds = 1; /*(Uint)(ERTS_BIF_REDS_LEFT(p) * B2T_BYTES_PER_REDUCTION);*/ + ctx->reds = 1 + sverk_rand() % 4; /*(Uint)(ERTS_BIF_REDS_LEFT(p) * B2T_BYTES_PER_REDUCTION);*/ do { switch (ctx->state) { @@ -1360,6 +1381,10 @@ static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) /*fall through*/ case B2TDecodeInit: if (context_b == NULL && ctx->b2ts.extsize > ctx->reds) { + /*SVERK Can we do a better prediction that is still safe + OR is there a way to do HAlloc after result some how... + ... support for mutiple HReleases maybe? + */ /* dec_term will probably trap, allocate space for magic bin before result term to make it easy to trim with HRelease. */ @@ -1374,7 +1399,9 @@ static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) ctx->u.dc.hp_end = ctx->u.dc.hp_start + ctx->heap_size; ctx->state = B2TDecode; /*fall through*/ - case B2TDecode: + case B2TDecode: + case B2TDecodeList: + case B2TDecodeTuple: dec_term(NULL, NULL, NULL, &MSO(p), NULL, ctx); break; @@ -1404,7 +1431,7 @@ static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) return ctx->u.dc.res; } - }while (ctx->reds); + }while (ctx->reds || ctx->state >= B2TDone); if (context_b == NULL) { ASSERT(ctx->trap_bin == THE_NON_VALUE); @@ -1415,7 +1442,7 @@ static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) sys_memcpy(ctx, &c_buff, sizeof(B2TContext)); if (!magic_space) { - ASSERT(ctx->state != B2TDecode); + ASSERT(ctx->state < B2TDecode); magic_space = HAlloc(p, PROC_BIN_SIZE); } ctx->trap_bin = erts_mk_magic_binary_term(&magic_space, &MSO(p), context_b); @@ -2778,28 +2805,81 @@ dec_term(ErtsDistExternal *edep, Eterm** hpp, byte* ep, ErlOffHeap* off_heap, ErtsAtomEncoding char_enc; register Eterm* hp; /* Please don't take the address of hp */ Eterm* next; - byte* ep_trap_limit; + SWord reds; if (ctx) { + reds = ctx->reds; + next = ctx->u.dc.next; + + if (ctx->state != B2TDecode) { + n = ctx->u.dc.remaining_n; + if (reds < n) { + ctx->u.dc.remaining_n -= reds; + n = reds; + } + else { + ctx->u.dc.remaining_n = 0; + } + reds -= n; + + switch (ctx->state) { + case B2TDecodeList: + objp = next - 2; + while (n > 0) { + objp[0] = (Eterm) COMPRESS_POINTER(next); + objp[1] = make_list(next); + next = objp; + objp -= 2; + n--; + } + break; + + case B2TDecodeTuple: + objp = next - 1; + while (n-- > 0) { + objp[0] = (Eterm) COMPRESS_POINTER(next); + next = objp; + objp--; + } + break; + + default: + ASSERT(!"Unknown state"); + } + if (ctx->u.dc.remaining_n) { + ctx->u.dc.next = next; + ctx->reds = 0; + return NULL; + } + ASSERT(next == ctx->u.dc.container_start); + ctx->state = B2TDecode; + } + hp_saved = ctx->u.dc.hp_start; ep = ctx->u.dc.ep; - ep_trap_limit = ep + ctx->reds; - if (ep_trap_limit < ep) { - ep_trap_limit = (byte*)ERTS_UWORD_MAX; /*SVERK Is there a safe way to create a "largest ptr" */ - } - next = ctx->u.dc.next; hpp = &ctx->u.dc.hp; } else { hp_saved = *hpp; - ep_trap_limit = (byte*)ERTS_UWORD_MAX; /*SVERK - " - */ + reds = ERTS_SWORD_MAX; next = objp; *next = (Eterm) (UWord) NULL; } - ASSERT(ep < ep_trap_limit); hp = *hpp; while (next != NULL) { + + if (reds <= 0) { + if (ctx) { + ctx->u.dc.ep = ep; + ctx->u.dc.next = next; + ctx->u.dc.hp = hp; + ctx->reds = 0; + return NULL; + } + reds = ERTS_SWORD_MAX; + } + objp = next; next = (Eterm *) EXPAND_POINTER(*objp); @@ -2918,8 +2998,20 @@ dec_term_atom_common: tuple_loop: *objp = make_tuple(hp); *hp++ = make_arityval(n); + #ifdef DEBUG + if (ctx) ctx->u.dc.container_start = hp; + #endif hp += n; - objp = hp - 1; + objp = hp - 1; + if (ctx) { + if (reds < n) { + ASSERT(reds > 0); + ctx->state = B2TDecodeTuple; + ctx->u.dc.remaining_n = n - reds; + n = reds; + } + reds -= n; + } while (n-- > 0) { objp[0] = (Eterm) COMPRESS_POINTER(next); next = objp; @@ -2937,17 +3029,30 @@ dec_term_atom_common: break; } *objp = make_list(hp); - hp += 2*n; + #ifdef DEBUG + if (ctx) ctx->u.dc.container_start = hp; + #endif + hp += 2 * n; objp = hp - 2; objp[0] = (Eterm) COMPRESS_POINTER((objp+1)); objp[1] = (Eterm) COMPRESS_POINTER(next); next = objp; objp -= 2; - while (--n > 0) { + n--; + if (ctx) { + if (reds < n) { + ctx->state = B2TDecodeList; + ctx->u.dc.remaining_n = n - reds; + n = reds; + } + reds -= n; + } + while (n > 0) { objp[0] = (Eterm) COMPRESS_POINTER(next); - objp[1] = make_list(objp + 2); + objp[1] = make_list(next); next = objp; objp -= 2; + n--; } break; case STRING_EXT: @@ -3494,20 +3599,16 @@ dec_term_atom_common: *hpp = hp_saved; if (ctx) { ctx->state = B2TDecodeFail; + ctx->reds = reds; } return NULL; } - if (ep > ep_trap_limit) { - ASSERT(ctx); - ctx->u.dc.ep = ep; - ctx->u.dc.next = next; - ctx->u.dc.hp = hp; - ctx->reds = 0; - return NULL; - } + + --reds; } if (ctx) { ctx->state = B2TDone; + ctx->reds = reds; } *hpp = hp; return ep; diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h index 11ba554a3d..0f4169c81d 100644 --- a/erts/emulator/beam/sys.h +++ b/erts/emulator/beam/sys.h @@ -283,18 +283,21 @@ typedef long SWord; #define SWORD_CONSTANT(Const) Const##L #define UWORD_CONSTANT(Const) Const##UL #define ERTS_UWORD_MAX ULONG_MAX +#define ERTS_SWORD_MAX LONG_MAX #elif SIZEOF_VOID_P == SIZEOF_INT typedef unsigned int UWord; typedef int SWord; #define SWORD_CONSTANT(Const) Const #define UWORD_CONSTANT(Const) Const##U #define ERTS_UWORD_MAX UINT_MAX +#define ERTS_SWORD_MAX INT_MAX #elif SIZEOF_VOID_P == SIZEOF_LONG_LONG typedef unsigned long long UWord; typedef long long SWord; #define SWORD_CONSTANT(Const) Const##LL #define UWORD_CONSTANT(Const) Const##ULL #define ERTS_UWORD_MAX ULLONG_MAX +#define ERTS_SWORD_MAX LLONG_MAX #else #error Found no appropriate type to use for 'Eterm', 'Uint' and 'Sint' #endif @@ -308,6 +311,7 @@ typedef long Sint; #define SWORD_CONSTANT(Const) Const##L #define UWORD_CONSTANT(Const) Const##UL #define ERTS_UWORD_MAX ULONG_MAX +#define ERTS_SWORD_MAX LONG_MAX #define ERTS_SIZEOF_ETERM SIZEOF_LONG #define ErtsStrToSint strtol #elif SIZEOF_VOID_P == SIZEOF_INT @@ -317,6 +321,7 @@ typedef int Sint; #define SWORD_CONSTANT(Const) Const #define UWORD_CONSTANT(Const) Const##U #define ERTS_UWORD_MAX UINT_MAX +#define ERTS_SWORD_MAX INT_MAX #define ERTS_SIZEOF_ETERM SIZEOF_INT #define ErtsStrToSint strtol #elif SIZEOF_VOID_P == SIZEOF_LONG_LONG @@ -326,6 +331,7 @@ typedef long long Sint; #define SWORD_CONSTANT(Const) Const##LL #define UWORD_CONSTANT(Const) Const##ULL #define ERTS_UWORD_MAX ULLONG_MAX +#define ERTS_SWORD_MAX LLONG_MAX #define ERTS_SIZEOF_ETERM SIZEOF_LONG_LONG #if defined(__WIN32__) #define ErtsStrToSint _strtoi64 -- cgit v1.2.3 From d42bd3b37d3b741d546d94fd7611afda81bba419 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 10 Oct 2013 12:11:59 +0200 Subject: trapping STRING_EXT --- erts/emulator/beam/external.c | 84 ++++++++++++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 29 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 48536d610b..2116c25cd2 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -1125,13 +1125,15 @@ BIF_RETTYPE term_to_binary_2(BIF_ALIST_2) } -enum B2TState { +enum B2TState { /* order is somewhat significant */ /*B2TUncompress,*/ B2TSize, B2TDecodeInit, + B2TDecode, B2TDecodeList, B2TDecodeTuple, + B2TDecodeString, B2TDone, B2TDecodeFail, @@ -1150,7 +1152,7 @@ typedef struct { Eterm* hp_end; int remaining_n; #ifdef DEBUG - Eterm* container_start; + Eterm* container_end; #endif } B2TDecodeContext; @@ -1402,6 +1404,7 @@ static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) case B2TDecode: case B2TDecodeList: case B2TDecodeTuple: + case B2TDecodeString: dec_term(NULL, NULL, NULL, &MSO(p), NULL, ctx); break; @@ -2808,8 +2811,11 @@ dec_term(ErtsDistExternal *edep, Eterm** hpp, byte* ep, ErlOffHeap* off_heap, SWord reds; if (ctx) { - reds = ctx->reds; - next = ctx->u.dc.next; + hp_saved = ctx->u.dc.hp_start; + reds = ctx->reds; + next = ctx->u.dc.next; + ep = ctx->u.dc.ep; + hpp = &ctx->u.dc.hp; if (ctx->state != B2TDecode) { n = ctx->u.dc.remaining_n; @@ -2832,6 +2838,7 @@ dec_term(ErtsDistExternal *edep, Eterm** hpp, byte* ep, ErlOffHeap* off_heap, objp -= 2; n--; } + ASSERT(ctx->u.dc.remaining_n || next == ctx->u.dc.container_end); break; case B2TDecodeTuple: @@ -2841,23 +2848,35 @@ dec_term(ErtsDistExternal *edep, Eterm** hpp, byte* ep, ErlOffHeap* off_heap, next = objp; objp--; } + ASSERT(ctx->u.dc.remaining_n || next == ctx->u.dc.container_end); + break; + + case B2TDecodeString: + hp = *hpp; + hp[-1] = make_list(hp); /* overwrite the premature NIL */ + while (n-- > 0) { + hp[0] = make_small(*ep++); + hp[1] = make_list(hp+2); + hp += 2; + } + hp[-1] = NIL; + *hpp = hp; + ASSERT(ctx->u.dc.remaining_n || hp == ctx->u.dc.container_end); break; default: ASSERT(!"Unknown state"); } - if (ctx->u.dc.remaining_n) { + if (!ctx->u.dc.remaining_n) { + ctx->state = B2TDecode; + } + if (reds <= 0) { ctx->u.dc.next = next; + ctx->u.dc.ep = ep; ctx->reds = 0; return NULL; } - ASSERT(next == ctx->u.dc.container_start); - ctx->state = B2TDecode; } - - hp_saved = ctx->u.dc.hp_start; - ep = ctx->u.dc.ep; - hpp = &ctx->u.dc.hp; } else { hp_saved = *hpp; @@ -2869,17 +2888,6 @@ dec_term(ErtsDistExternal *edep, Eterm** hpp, byte* ep, ErlOffHeap* off_heap, while (next != NULL) { - if (reds <= 0) { - if (ctx) { - ctx->u.dc.ep = ep; - ctx->u.dc.next = next; - ctx->u.dc.hp = hp; - ctx->reds = 0; - return NULL; - } - reds = ERTS_SWORD_MAX; - } - objp = next; next = (Eterm *) EXPAND_POINTER(*objp); @@ -2998,13 +3006,11 @@ dec_term_atom_common: tuple_loop: *objp = make_tuple(hp); *hp++ = make_arityval(n); - #ifdef DEBUG - if (ctx) ctx->u.dc.container_start = hp; - #endif hp += n; objp = hp - 1; if (ctx) { if (reds < n) { + IF_DEBUG(ctx->u.dc.container_end = hp - n;) ASSERT(reds > 0); ctx->state = B2TDecodeTuple; ctx->u.dc.remaining_n = n - reds; @@ -3029,9 +3035,6 @@ dec_term_atom_common: break; } *objp = make_list(hp); - #ifdef DEBUG - if (ctx) ctx->u.dc.container_start = hp; - #endif hp += 2 * n; objp = hp - 2; objp[0] = (Eterm) COMPRESS_POINTER((objp+1)); @@ -3041,6 +3044,7 @@ dec_term_atom_common: n--; if (ctx) { if (reds < n) { + IF_DEBUG(ctx->u.dc.container_end = hp - 2*(n+1);) ctx->state = B2TDecodeList; ctx->u.dc.remaining_n = n - reds; n = reds; @@ -3063,6 +3067,15 @@ dec_term_atom_common: break; } *objp = make_list(hp); + if (ctx) { + if (reds < n) { + IF_DEBUG(ctx->u.dc.container_end = hp + n*2;) + ctx->state = B2TDecodeString; + ctx->u.dc.remaining_n = n - reds; + n = reds; + } + reds -= n; + } while (n-- > 0) { hp[0] = make_small(*ep++); hp[1] = make_list(hp+2); @@ -3604,7 +3617,20 @@ dec_term_atom_common: return NULL; } - --reds; + if (--reds <= 0) { + if (ctx) { + if (next || ctx->state != B2TDecode) { + ctx->u.dc.ep = ep; + ctx->u.dc.next = next; + ctx->u.dc.hp = hp; + ctx->reds = 0; + return NULL; + } + } + else { + reds = ERTS_SWORD_MAX; + } + } } if (ctx) { ctx->state = B2TDone; -- cgit v1.2.3 From 74c5c0314d4cf85365d89c1fdf41a5d286a7191a Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 10 Oct 2013 19:59:09 +0200 Subject: trapping binary_to_term/2 --- erts/emulator/beam/external.c | 73 +++++++++++++++---------------------------- 1 file changed, 26 insertions(+), 47 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 2116c25cd2..38280b7a99 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -108,7 +108,7 @@ static int encode_size_struct_int(Process *p, ErtsAtomCacheMap *acmp, Eterm obj, static Export binary_to_term_trap_export; static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1); -static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b); +static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* context_b); void erts_init_external(void) { #if 1 /* In R16 */ @@ -1169,7 +1169,7 @@ typedef struct B2TContext_t { Sint heap_size; byte* aligned_alloc; ErtsBinary2TermState b2ts; - //Uint ext_size; + Uint32 flags; SWord reds; Eterm trap_bin; enum B2TState state; @@ -1316,15 +1316,22 @@ static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1) Binary *context_bin = ((ProcBin *) binary_val(BIF_ARG_1))->val; ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(context_bin) == b2t_context_destructor); - return binary_to_term_int(BIF_P, THE_NON_VALUE, context_bin); + return binary_to_term_int(BIF_P, 0, THE_NON_VALUE, context_bin); } BIF_RETTYPE binary_to_term_1(BIF_ALIST_1) { - return binary_to_term_int(BIF_P, BIF_ARG_1, NULL); +/*SVERK if (++sverk_cnt % 1000 == 0) { + erts_fprintf(stderr, "Call #%u to binary_to_term_int()\n", sverk_cnt); + } + if (sverk_cnt == 1767) { + sverk_break(); + } +*/ + return binary_to_term_int(BIF_P, 0, BIF_ARG_1, NULL); } -#define B2T_BYTES_PER_REDUCTION 100 +#define B2T_BYTES_PER_REDUCTION 128 static unsigned sverk_rand(void) { @@ -1333,11 +1340,13 @@ static unsigned sverk_rand(void) return prev; } -static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) +static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* context_b) { + SWord initial_reds = 1 + sverk_rand() % 4; /*(Uint)(ERTS_BIF_REDS_LEFT(p) * B2T_BYTES_PER_REDUCTION);*/ byte* bytes; B2TContext c_buff; B2TContext *ctx; + ErtsDistExternal fakedep; Eterm* magic_space = NULL; if (context_b == NULL) { @@ -1345,11 +1354,12 @@ static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) ctx = &c_buff; ctx->state = B2TSize; ctx->aligned_alloc = NULL; + ctx->flags = flags; IF_DEBUG(ctx->trap_bin = THE_NON_VALUE;) } else { ctx = ERTS_MAGIC_BIN_DATA(context_b); } - ctx->reds = 1 + sverk_rand() % 4; /*(Uint)(ERTS_BIF_REDS_LEFT(p) * B2T_BYTES_PER_REDUCTION);*/ + ctx->reds = initial_reds; do { switch (ctx->state) { @@ -1405,7 +1415,8 @@ static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) case B2TDecodeList: case B2TDecodeTuple: case B2TDecodeString: - dec_term(NULL, NULL, NULL, &MSO(p), NULL, ctx); + fakedep.flags = ctx->flags; + dec_term(&fakedep, NULL, NULL, &MSO(p), NULL, ctx); break; case B2TDecodeFail: @@ -1416,6 +1427,7 @@ static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) if (context_b) { erts_set_gc_state(p, 1); } + BUMP_REDS(p, (initial_reds - ctx->reds) / B2T_BYTES_PER_REDUCTION); BIF_ERROR(p, BADARG); case B2TDone: @@ -1431,6 +1443,7 @@ static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) erts_set_gc_state(p, 1); } + BUMP_REDS(p, (initial_reds - ctx->reds) / B2T_BYTES_PER_REDUCTION); return ctx->u.dc.res; } @@ -1461,24 +1474,15 @@ static Eterm binary_to_term_int(Process* p, Eterm bin, Binary* context_b) BIF_RETTYPE binary_to_term_2(BIF_ALIST_2) { - Sint heap_size; - Eterm res; Eterm opts; Eterm opt; - Eterm* hp; - Eterm* endp; - Sint size; - byte* bytes; - byte* temp_alloc = NULL; - ErtsBinary2TermState b2ts; - ErtsDistExternal fakedep; + Uint32 flags = 0; - fakedep.flags = 0; opts = BIF_ARG_2; while (is_list(opts)) { opt = CAR(list_val(opts)); if (opt == am_safe) { - fakedep.flags |= ERTS_DIST_EXT_BTT_SAFE; + flags |= ERTS_DIST_EXT_BTT_SAFE; } else { goto error; @@ -1489,35 +1493,10 @@ BIF_RETTYPE binary_to_term_2(BIF_ALIST_2) if (is_not_nil(opts)) goto error; - if ((bytes = erts_get_aligned_binary_bytes(BIF_ARG_1, &temp_alloc)) == NULL) { - error: - erts_free_aligned_binary_bytes(temp_alloc); - BIF_ERROR(BIF_P, BADARG); - } - size = binary_size(BIF_ARG_1); - - heap_size = binary2term_prepare(&b2ts, bytes, size); - if (heap_size < 0) - goto error; - - hp = HAlloc(BIF_P, heap_size); - endp = hp + heap_size; - - res = binary2term_create(&fakedep, &b2ts, &hp, &MSO(BIF_P)); - - erts_free_aligned_binary_bytes(temp_alloc); - - if (hp > endp) { - erl_exit(1, ":%s, line %d: heap overrun by %d words(s)\n", - __FILE__, __LINE__, hp-endp); - } - - HRelease(BIF_P, endp, hp); - - if (res == THE_NON_VALUE) - goto error; + return binary_to_term_int(BIF_P, flags, BIF_ARG_1, NULL); - return res; +error: + BIF_ERROR(BIF_P, BADARG); } Eterm -- cgit v1.2.3 From 38f1140d197e14f6cb4d0040859b0b19876d2a14 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 11 Oct 2013 16:28:59 +0200 Subject: trapping size calculation --- erts/emulator/beam/external.c | 175 +++++++++++++++++++++++++++--------------- 1 file changed, 114 insertions(+), 61 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 38280b7a99..d4ffc23536 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -96,7 +96,7 @@ struct B2TContext_t; static byte* dec_term(ErtsDistExternal *, Eterm**, byte*, ErlOffHeap*, Eterm*, struct B2TContext_t*); static byte* dec_atom(ErtsDistExternal *, byte*, Eterm*); static byte* dec_pid(ErtsDistExternal *, Eterm**, byte*, ErlOffHeap*, Eterm*); -static Sint decoded_size(byte *ep, byte* endp, int internal_tags); +static Sint decoded_size(byte *ep, byte* endp, int internal_tags, struct B2TContext_t*); static BIF_RETTYPE term_to_binary_trap_1(BIF_ALIST_1); static Eterm erts_term_to_binary_int(Process* p, Eterm Term, int level, Uint flags, @@ -889,7 +889,7 @@ erts_decode_dist_ext_size(ErtsDistExternal *edep) goto fail; ep = edep->extp+1; } - res = decoded_size(ep, edep->ext_endp, 0); + res = decoded_size(ep, edep->ext_endp, 0, NULL); if (res >= 0) return res; fail: @@ -901,12 +901,12 @@ Sint erts_decode_ext_size(byte *ext, Uint size) { if (size == 0 || *ext != VERSION_MAGIC) return -1; - return decoded_size(ext+1, ext+size, 0); + return decoded_size(ext+1, ext+size, 0, NULL); } Sint erts_decode_ext_size_ets(byte *ext, Uint size) { - Sint sz = decoded_size(ext, ext+size, 1); + Sint sz = decoded_size(ext, ext+size, 1, NULL); ASSERT(sz >= 0); return sz; } @@ -1126,7 +1126,7 @@ BIF_RETTYPE term_to_binary_2(BIF_ALIST_2) enum B2TState { /* order is somewhat significant */ - /*B2TUncompress,*/ + B2TPrepare, B2TSize, B2TDecodeInit, @@ -1141,6 +1141,10 @@ enum B2TState { /* order is somewhat significant */ }; typedef struct { + int heap_size; + int terms; + byte* ep; + int atom_extra_skip; } B2TSizeContext; typedef struct { @@ -1210,21 +1214,15 @@ static uLongf binary2term_uncomp_size(byte* data, Sint size) return err == Z_STREAM_END ? uncomp_size : 0; } -static ERTS_INLINE Sint +static ERTS_INLINE int binary2term_prepare(ErtsBinary2TermState *state, byte *data, Sint data_size) { - Sint res; byte *bytes = data; Sint size = data_size; state->exttmp = 0; if (size < 1 || *bytes != VERSION_MAGIC) { - error: - if (state->exttmp) - erts_free(ERTS_ALC_T_EXT_TERM_DATA, state->extp); - state->extp = NULL; - state->exttmp = 0; return -1; } bytes++; @@ -1239,20 +1237,17 @@ binary2term_prepare(ErtsBinary2TermState *state, byte *data, Sint data_size) if (dest_len > 32*1024*1024 || (state->extp = erts_alloc_fnf(ERTS_ALC_T_EXT_TERM_DATA, dest_len)) == NULL) { if (dest_len != binary2term_uncomp_size(bytes, size)) { - goto error; + return -1; } state->extp = erts_alloc(ERTS_ALC_T_EXT_TERM_DATA, dest_len); } state->exttmp = 1; if (erl_zlib_uncompress(state->extp, &dest_len, bytes, size) != Z_OK) - goto error; + return -1; size = (Sint) dest_len; } state->extsize = size; - res = decoded_size(state->extp, state->extp + size, 0); - if (res < 0) - goto error; - return res; + return 0; } static ERTS_INLINE void @@ -1280,7 +1275,18 @@ binary2term_create(ErtsDistExternal *edep, ErtsBinary2TermState *state, Eterm ** Sint erts_binary2term_prepare(ErtsBinary2TermState *state, byte *data, Sint data_size) { - return binary2term_prepare(state, data, data_size); + Sint res; + + if (binary2term_prepare(state, data, data_size) < 0 || + (res=decoded_size(state->extp, state->extp + state->extsize, 0, NULL)) < 0) { + + if (state->exttmp) + erts_free(ERTS_ALC_T_EXT_TERM_DATA, state->extp); + state->extp = NULL; + state->exttmp = 0; + return -1; + } + return res; } void @@ -1319,17 +1325,6 @@ static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1) return binary_to_term_int(BIF_P, 0, THE_NON_VALUE, context_bin); } -BIF_RETTYPE binary_to_term_1(BIF_ALIST_1) -{ -/*SVERK if (++sverk_cnt % 1000 == 0) { - erts_fprintf(stderr, "Call #%u to binary_to_term_int()\n", sverk_cnt); - } - if (sverk_cnt == 1767) { - sverk_break(); - } -*/ - return binary_to_term_int(BIF_P, 0, BIF_ARG_1, NULL); -} #define B2T_BYTES_PER_REDUCTION 128 @@ -1352,18 +1347,19 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con if (context_b == NULL) { /* Setup enough to get started */ ctx = &c_buff; - ctx->state = B2TSize; + ctx->state = B2TPrepare; ctx->aligned_alloc = NULL; ctx->flags = flags; IF_DEBUG(ctx->trap_bin = THE_NON_VALUE;) } else { ctx = ERTS_MAGIC_BIN_DATA(context_b); + ASSERT(ctx->state != B2TPrepare); } ctx->reds = initial_reds; do { switch (ctx->state) { - case B2TSize: + case B2TPrepare: bytes = erts_get_aligned_binary_bytes_extra(bin, &ctx->aligned_alloc, ERTS_ALC_T_EXT_TERM_DATA, @@ -1381,16 +1377,19 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con break; } + ctx->reds = 0; /*SVERK*/ + ctx->u.sc.heap_size = 0; + ctx->u.sc.terms = 1; + ctx->u.sc.ep = ctx->b2ts.extp; + ctx->u.sc.atom_extra_skip = 0; + ctx->state = B2TSize; + break; + case B2TSize: + ctx->heap_size = decoded_size(NULL, ctx->b2ts.extp + ctx->b2ts.extsize, + 0, ctx); + break; - ctx->state = B2TDecodeInit; - - if (ctx->b2ts.extsize >= ctx->reds) { - ctx->reds = 0; - break; - } - ctx->reds -= ctx->b2ts.extsize; - /*fall through*/ case B2TDecodeInit: if (context_b == NULL && ctx->b2ts.extsize > ctx->reds) { /*SVERK Can we do a better prediction that is still safe @@ -1472,6 +1471,18 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con BIF_TRAP1(&binary_to_term_trap_export, p, ctx->trap_bin); } +BIF_RETTYPE binary_to_term_1(BIF_ALIST_1) +{ +/*SVERK if (++sverk_cnt % 1000 == 0) { + erts_fprintf(stderr, "Call #%u to binary_to_term_int()\n", sverk_cnt); + } + if (sverk_cnt == 1767) { + sverk_break(); + } +*/ + return binary_to_term_int(BIF_P, 0, BIF_ARG_1, NULL); +} + BIF_RETTYPE binary_to_term_2(BIF_ALIST_2) { Eterm opts; @@ -3901,18 +3912,32 @@ encode_size_struct_int(Process *p, ErtsAtomCacheMap *acmp, Eterm obj, } static Sint -decoded_size(byte *ep, byte* endp, int internal_tags) +decoded_size(byte *ep, byte* endp, int internal_tags, B2TContext* ctx) { int heap_size = 0; int terms; int atom_extra_skip = 0; Uint n; + SWord reds; + if (ctx) { + heap_size = ctx->u.sc.heap_size; + terms = ctx->u.sc.terms; + ep = ctx->u.sc.ep; + atom_extra_skip = ctx->u.sc.atom_extra_skip; + reds = ctx->reds; + } + else { + heap_size = 0; + terms = 1; + atom_extra_skip = 0; + reds = ERTS_SWORD_MAX; + } #define SKIP(sz) \ do { \ if ((sz) <= endp-ep) { \ ep += (sz); \ - } else { return -1; }; \ + } else { goto error; }; \ } while (0) #define SKIP2(sz1, sz2) \ @@ -3920,25 +3945,27 @@ decoded_size(byte *ep, byte* endp, int internal_tags) Uint sz = (sz1) + (sz2); \ if (sz1 < sz && (sz) <= endp-ep) { \ ep += (sz); \ - } else { return -1; } \ + } else { goto error; } \ } while (0) #define CHKSIZE(sz) \ do { \ - if ((sz) > endp-ep) { return -1; } \ + if ((sz) > endp-ep) { goto error; } \ } while (0) #define ADDTERMS(n) \ do { \ int before = terms; \ terms += (n); \ - if (terms < before) return -1; \ + if (terms < before) goto error; \ } while (0) - - for (terms=1; terms > 0; terms--) { - int tag; - + ASSERT(terms > 0); + do { + int tag; + /*SVERK + erts_fprintf(stderr, "SVERK tag=%d ep=%p terms=%d heap_size=%d endp=%p\n", + ep[0], ep, terms, heap_size, endp); */ CHKSIZE(1); tag = ep++[0]; switch (tag) { @@ -3959,7 +3986,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags) CHKSIZE(4); n = get_int32(ep); if (n > BIG_ARITY_MAX*sizeof(ErtsDigit)) { - return -1; + goto error; } SKIP2(n,4+1); /* skip, size,sign,digits */ heap_size += 1+1+(n+sizeof(Eterm)-1)/sizeof(Eterm); /* XXX: 1 too much? */ @@ -3968,7 +3995,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags) CHKSIZE(2); n = get_int16(ep); if (n > MAX_ATOM_CHARACTERS) { - return -1; + goto error; } SKIP(n+2+atom_extra_skip); atom_extra_skip = 0; @@ -3978,7 +4005,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags) n = get_int16(ep); ep += 2; if (n > MAX_ATOM_SZ_LIMIT) { - return -1; + goto error; } SKIP(n+atom_extra_skip); atom_extra_skip = 0; @@ -3987,7 +4014,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags) CHKSIZE(1); n = get_int8(ep); if (n > MAX_ATOM_CHARACTERS) { - return -1; + goto error; } SKIP(n+1+atom_extra_skip); atom_extra_skip = 0; @@ -3997,7 +4024,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags) n = get_int8(ep); ep++; if (n > MAX_ATOM_SZ_LIMIT) { - return -1; + goto error; } SKIP(n+atom_extra_skip); atom_extra_skip = 0; @@ -4026,7 +4053,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags) id_words = get_int16(ep); if (id_words > ERTS_MAX_REF_NUMBERS) - return -1; + goto error; ep += 2; atom_extra_skip = 1 + 4*id_words; @@ -4128,7 +4155,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags) num_free = get_int32(ep); ep += 4; if (num_free > MAX_ARG) { - return -1; + goto error; } terms += 4 + num_free; heap_size += ERL_FUN_SIZE + num_free; @@ -4145,24 +4172,50 @@ decoded_size(byte *ep, byte* endp, int internal_tags) case BINARY_INTERNAL_REF: if (!internal_tags) { - return -1; + goto error; } SKIP(sizeof(ProcBin)); heap_size += PROC_BIN_SIZE; break; case BIT_BINARY_INTERNAL_REF: if (!internal_tags) { - return -1; + goto error; } SKIP(2+sizeof(ProcBin)); heap_size += PROC_BIN_SIZE + ERL_SUB_BIN_SIZE; break; default: - return -1; + goto error; } - } + terms--; + + if (--reds <= 0) { + if (ctx && terms > 0) { + ctx->u.sc.heap_size = heap_size; + ctx->u.sc.terms = terms; + ctx->u.sc.ep = ep; + ctx->u.sc.atom_extra_skip = atom_extra_skip; + ctx->reds = 0; + return 0; + } + reds = ERTS_SWORD_MAX; + } + }while (terms > 0); + /* 'terms' may be non-zero if it has wrapped around */ - return terms==0 ? heap_size : -1; + if (terms == 0) { + if (ctx) { + ctx->state = B2TDecodeInit; + ctx->reds = reds; + } + return heap_size; + } + +error: + if (ctx) { + ctx->state = B2TBadArg; + } + return -1; #undef SKIP #undef SKIP2 #undef CHKSIZE -- cgit v1.2.3 From f10ea68ce28e9b93ce614b5f829b1ca7f4cc753f Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 14 Oct 2013 17:46:28 +0200 Subject: trapping uncompress --- erts/emulator/beam/erl_zlib.c | 40 ++++++++++++ erts/emulator/beam/erl_zlib.h | 6 ++ erts/emulator/beam/external.c | 146 ++++++++++++++++++++++++++++-------------- 3 files changed, 144 insertions(+), 48 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/erl_zlib.c b/erts/emulator/beam/erl_zlib.c index 47fd92988e..8e33144f96 100644 --- a/erts/emulator/beam/erl_zlib.c +++ b/erts/emulator/beam/erl_zlib.c @@ -87,6 +87,46 @@ int ZEXPORT erl_zlib_deflate_finish(z_stream *streamp) return deflateEnd(streamp); } +int ZEXPORT erl_zlib_inflate_start(z_stream *streamp, const Bytef* source, + uLong sourceLen) +{ + streamp->next_in = (Bytef*)source; + streamp->avail_in = (uInt)sourceLen; + streamp->total_out = streamp->avail_out = 0; + streamp->next_out = NULL; + erl_zlib_alloc_init(streamp); + return inflateInit(streamp); +} +/* + * Inflate a chunk, The destination length is the limit. + * Returns Z_OK if more to process, Z_STREAM_END if we are done. + */ +int ZEXPORT erl_zlib_inflate_chunk(z_stream *streamp, Bytef* dest, uLongf* destLen) +{ + int err; + uLongf last_tot = streamp->total_out; + + streamp->next_out = dest; + streamp->avail_out = (uInt)*destLen; + + if ((uLong)streamp->avail_out != *destLen) return Z_BUF_ERROR; + + err = inflate(streamp, Z_NO_FLUSH); + ASSERT(err != Z_STREAM_ERROR); + *destLen = streamp->total_out - last_tot; + return err; +} + +/* + * When we are done, free up the inflate structure + * Retyurns Z_OK or Error + */ +int ZEXPORT erl_zlib_inflate_finish(z_stream *streamp) +{ + return inflateEnd(streamp); +} + + int ZEXPORT erl_zlib_compress2 (Bytef* dest, uLongf* destLen, const Bytef* source, uLong sourceLen, int level) diff --git a/erts/emulator/beam/erl_zlib.h b/erts/emulator/beam/erl_zlib.h index 5ac849d21c..160166c66b 100644 --- a/erts/emulator/beam/erl_zlib.h +++ b/erts/emulator/beam/erl_zlib.h @@ -39,6 +39,12 @@ int ZEXPORT erl_zlib_deflate_start(z_stream *streamp, const Bytef* source, int ZEXPORT erl_zlib_deflate_chunk(z_stream *streamp, Bytef* dest, uLongf* destLen); int ZEXPORT erl_zlib_deflate_finish(z_stream *streamp); +int ZEXPORT erl_zlib_inflate_start(z_stream *streamp, const Bytef* source, + uLong sourceLen); +int ZEXPORT erl_zlib_inflate_chunk(z_stream *streamp, Bytef* dest, uLongf* destLen); +int ZEXPORT erl_zlib_inflate_finish(z_stream *streamp); + + /* Use instead of compress */ #define erl_zlib_compress(dest,destLen,source,sourceLen) \ diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index d4ffc23536..0d2de1b199 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -1127,6 +1127,8 @@ BIF_RETTYPE term_to_binary_2(BIF_ALIST_2) enum B2TState { /* order is somewhat significant */ B2TPrepare, + B2TUncompressChunk, + B2TSizeInit, B2TSize, B2TDecodeInit, @@ -1161,12 +1163,9 @@ typedef struct { } B2TDecodeContext; typedef struct { - /*Uint real_size; - Uint dest_len; - byte *dbytes; - Binary *result_bin; - Binary *destination_bin; - z_stream stream;*/ + z_stream stream; + byte* dbytes; + Uint dleft; } B2TUncompressContext; typedef struct B2TContext_t { @@ -1215,7 +1214,8 @@ static uLongf binary2term_uncomp_size(byte* data, Sint size) } static ERTS_INLINE int -binary2term_prepare(ErtsBinary2TermState *state, byte *data, Sint data_size) +binary2term_prepare(ErtsBinary2TermState *state, byte *data, Sint data_size, + B2TContext* ctx) { byte *bytes = data; Sint size = data_size; @@ -1229,6 +1229,8 @@ binary2term_prepare(ErtsBinary2TermState *state, byte *data, Sint data_size) size--; if (size < 5 || *bytes != COMPRESSED) { state->extp = bytes; + if (ctx) + ctx->state = B2TSizeInit; } else { uLongf dest_len = (Uint32) get_int32(bytes+1); @@ -1236,14 +1238,33 @@ binary2term_prepare(ErtsBinary2TermState *state, byte *data, Sint data_size) size -= 5; if (dest_len > 32*1024*1024 || (state->extp = erts_alloc_fnf(ERTS_ALC_T_EXT_TERM_DATA, dest_len)) == NULL) { + /* + * Try avoid out-of-memory crash due to corrupted 'dest_len' + * by checking the actual length of the uncompressed data. + * The only way to do that is to uncompress it. Sad but true. + */ if (dest_len != binary2term_uncomp_size(bytes, size)) { return -1; } state->extp = erts_alloc(ERTS_ALC_T_EXT_TERM_DATA, dest_len); + ctx->reds -= dest_len; } state->exttmp = 1; - if (erl_zlib_uncompress(state->extp, &dest_len, bytes, size) != Z_OK) - return -1; + if (ctx) { + if (erl_zlib_inflate_start(&ctx->u.uc.stream, bytes, size) != Z_OK) + return -1; + + ctx->u.uc.dbytes = state->extp; + ctx->u.uc.dleft = dest_len; + ctx->state = B2TUncompressChunk; + } + else { + uLongf dlen = dest_len; + if (erl_zlib_uncompress(state->extp, &dlen, bytes, size) != Z_OK + || dlen != dest_len) { + return -1; + } + } size = (Sint) dest_len; } state->extsize = size; @@ -1277,7 +1298,7 @@ erts_binary2term_prepare(ErtsBinary2TermState *state, byte *data, Sint data_size { Sint res; - if (binary2term_prepare(state, data, data_size) < 0 || + if (binary2term_prepare(state, data, data_size, NULL) < 0 || (res=decoded_size(state->extp, state->extp + state->extsize, 0, NULL)) < 0) { if (state->exttmp) @@ -1307,6 +1328,9 @@ static void b2t_destroy_context(B2TContext* context) ERTS_ALC_T_EXT_TERM_DATA); context->aligned_alloc = NULL; binary2term_abort(&context->b2ts); + if (context->state == B2TUncompressChunk) { + erl_zlib_inflate_finish(&context->u.uc.stream); + } } static void b2t_context_destructor(Binary *context_bin) @@ -1359,7 +1383,8 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con do { switch (ctx->state) { - case B2TPrepare: + case B2TPrepare: { + Uint bin_size; bytes = erts_get_aligned_binary_bytes_extra(bin, &ctx->aligned_alloc, ERTS_ALC_T_EXT_TERM_DATA, @@ -1369,24 +1394,48 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con ctx->state = B2TBadArg; break; } - - ctx->heap_size = binary2term_prepare(&ctx->b2ts, bytes, - binary_size(bin)); - if (ctx->heap_size < 0) { - ctx->state = B2TBadArg; - break; + bin_size = binary_size(bin); + if (ctx->aligned_alloc) { + ctx->reds -= bin_size / 8; } - - ctx->reds = 0; /*SVERK*/ - ctx->u.sc.heap_size = 0; - ctx->u.sc.terms = 1; - ctx->u.sc.ep = ctx->b2ts.extp; - ctx->u.sc.atom_extra_skip = 0; - ctx->state = B2TSize; + if (binary2term_prepare(&ctx->b2ts, bytes, bin_size, ctx) < 0) { + ctx->state = B2TBadArg; + } break; + } + case B2TUncompressChunk: + { + Uint chunk = ctx->reds; + int zret; + if (chunk > ctx->u.uc.dleft) + chunk = ctx->u.uc.dleft; + + zret = erl_zlib_inflate_chunk(&ctx->u.uc.stream, + ctx->u.uc.dbytes, &chunk); + ctx->u.uc.dbytes += chunk; + ctx->u.uc.dleft -= chunk; + if (zret == Z_OK && ctx->u.uc.dleft > 0) { + ctx->reds = 0; + } + else if (erl_zlib_inflate_finish(&ctx->u.uc.stream) == Z_OK + && zret == Z_STREAM_END + && ctx->u.uc.dleft == 0) { + ctx->reds -= chunk; + ctx->state = B2TSizeInit; + } + else { + ctx->state = B2TBadArg; + } + break; + } + case B2TSizeInit: + ctx->u.sc.ep = NULL; + ctx->state = B2TSize; + /*fall through*/ case B2TSize: - ctx->heap_size = decoded_size(NULL, ctx->b2ts.extp + ctx->b2ts.extsize, + ctx->heap_size = decoded_size(ctx->b2ts.extp, + ctx->b2ts.extp + ctx->b2ts.extsize, 0, ctx); break; @@ -1455,7 +1504,9 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con b2t_context_destructor); ctx = ERTS_MAGIC_BIN_DATA(context_b); sys_memcpy(ctx, &c_buff, sizeof(B2TContext)); - + if (ctx->state >= B2TDecode && ctx->u.dc.next == &c_buff.u.dc.res) { + ctx->u.dc.next = &ctx->u.dc.res; + } if (!magic_space) { ASSERT(ctx->state < B2TDecode); magic_space = HAlloc(p, PROC_BIN_SIZE); @@ -3914,25 +3965,27 @@ encode_size_struct_int(Process *p, ErtsAtomCacheMap *acmp, Eterm obj, static Sint decoded_size(byte *ep, byte* endp, int internal_tags, B2TContext* ctx) { - int heap_size = 0; + int heap_size; int terms; - int atom_extra_skip = 0; + int atom_extra_skip; Uint n; SWord reds; if (ctx) { - heap_size = ctx->u.sc.heap_size; - terms = ctx->u.sc.terms; - ep = ctx->u.sc.ep; - atom_extra_skip = ctx->u.sc.atom_extra_skip; reds = ctx->reds; + if (ctx->u.sc.ep) { + heap_size = ctx->u.sc.heap_size; + terms = ctx->u.sc.terms; + ep = ctx->u.sc.ep; + atom_extra_skip = ctx->u.sc.atom_extra_skip; + goto init_done; + } } - else { - heap_size = 0; - terms = 1; - atom_extra_skip = 0; - reds = ERTS_SWORD_MAX; - } + heap_size = 0; + terms = 1; + atom_extra_skip = 0; +init_done: + #define SKIP(sz) \ do { \ if ((sz) <= endp-ep) { \ @@ -4189,16 +4242,13 @@ decoded_size(byte *ep, byte* endp, int internal_tags, B2TContext* ctx) } terms--; - if (--reds <= 0) { - if (ctx && terms > 0) { - ctx->u.sc.heap_size = heap_size; - ctx->u.sc.terms = terms; - ctx->u.sc.ep = ep; - ctx->u.sc.atom_extra_skip = atom_extra_skip; - ctx->reds = 0; - return 0; - } - reds = ERTS_SWORD_MAX; + if (ctx && --reds <= 0 && terms > 0) { + ctx->u.sc.heap_size = heap_size; + ctx->u.sc.terms = terms; + ctx->u.sc.ep = ep; + ctx->u.sc.atom_extra_skip = atom_extra_skip; + ctx->reds = 0; + return 0; } }while (terms > 0); -- cgit v1.2.3 From d5b6c6f0bd96108d788cdfb9be15059125b3d87f Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 16 Oct 2013 16:28:21 +0200 Subject: erts: Add erlang wrappers to binary_to_term to not expose the trapping BIF in the stacktrace when it throws badarg. --- erts/emulator/beam/bif.tab | 8 ++------ erts/emulator/beam/external.c | 6 +++--- erts/preloaded/ebin/erlang.beam | Bin 97500 -> 97740 bytes erts/preloaded/ebin/erts_internal.beam | Bin 3780 -> 4084 bytes erts/preloaded/src/erlang.erl | 18 ++++++++++++++---- erts/preloaded/src/erts_internal.erl | 12 +++++++++++- 6 files changed, 30 insertions(+), 14 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 3b7bb56885..9f3de5f780 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -45,7 +45,6 @@ bif erlang:apply/3 bif erlang:atom_to_list/1 bif erlang:binary_to_list/1 bif erlang:binary_to_list/3 -bif erlang:binary_to_term/1 bif erlang:crc32/1 bif erlang:crc32/2 bif erlang:crc32_combine/3 @@ -152,6 +151,8 @@ bif erts_internal:port_command/3 bif erts_internal:port_control/3 bif erts_internal:port_close/1 bif erts_internal:port_connect/2 +bif erts_internal:binary_to_term/1 +bif erts_internal:binary_to_term/2 bif erts_internal:request_system_task/3 bif erts_internal:check_process_code/2 @@ -478,11 +479,6 @@ bif erlang:load_nif/2 bif erlang:call_on_load_function/1 bif erlang:finish_after_on_load/2 -# -# New Bifs in R13B4 -# -bif erlang:binary_to_term/2 - # # The binary match bifs (New in R14A - EEP9) # diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 0d2de1b199..e3e199b198 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -1476,7 +1476,7 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con erts_set_gc_state(p, 1); } BUMP_REDS(p, (initial_reds - ctx->reds) / B2T_BYTES_PER_REDUCTION); - BIF_ERROR(p, BADARG); + BIF_ERROR(p, BADARG & ~EXF_SAVETRACE); case B2TDone: b2t_destroy_context(ctx); @@ -1522,7 +1522,7 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con BIF_TRAP1(&binary_to_term_trap_export, p, ctx->trap_bin); } -BIF_RETTYPE binary_to_term_1(BIF_ALIST_1) +BIF_RETTYPE erts_internal_binary_to_term_1(BIF_ALIST_1) { /*SVERK if (++sverk_cnt % 1000 == 0) { erts_fprintf(stderr, "Call #%u to binary_to_term_int()\n", sverk_cnt); @@ -1534,7 +1534,7 @@ BIF_RETTYPE binary_to_term_1(BIF_ALIST_1) return binary_to_term_int(BIF_P, 0, BIF_ARG_1, NULL); } -BIF_RETTYPE binary_to_term_2(BIF_ALIST_2) +BIF_RETTYPE erts_internal_binary_to_term_2(BIF_ALIST_2) { Eterm opts; Eterm opt; diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam index 159c91e37c..63fa535427 100644 Binary files a/erts/preloaded/ebin/erlang.beam and b/erts/preloaded/ebin/erlang.beam differ diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam index 9d8a4cfd00..6c7dcff420 100644 Binary files a/erts/preloaded/ebin/erts_internal.beam and b/erts/preloaded/ebin/erts_internal.beam differ diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index e521c6fc3d..7b59f6c8b4 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -362,15 +362,25 @@ binary_to_list(_Binary, _Start, _Stop) -> %% binary_to_term/1 -spec binary_to_term(Binary) -> term() when Binary :: ext_binary(). -binary_to_term(_Binary) -> - erlang:nif_error(undefined). +binary_to_term(Binary) -> + %% This BIF may throw badarg while trapping + try + erts_internal:binary_to_term(Binary) + catch + error:Reason -> erlang:error(Reason,[Binary]) + end. %% binary_to_term/2 -spec binary_to_term(Binary, Opts) -> term() when Binary :: ext_binary(), Opts :: [safe]. -binary_to_term(_Binary, _Opts) -> - erlang:nif_error(undefined). +binary_to_term(Binary, Opts) -> + %% This BIF may throw badarg while trapping + try + erts_internal:binary_to_term(Binary,Opts) + catch + error:Reason -> erlang:error(Reason,[Binary,Opts]) + end. %% bit_size/1 %% Shadowed by erl_bif_types: erlang:bit_size/1 diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index c8e8e7e069..d6a185482e 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -29,7 +29,7 @@ -module(erts_internal). -export([await_port_send_result/3]). - +-export([binary_to_term/1, binary_to_term/2]). -export([port_command/3, port_connect/2, port_close/1, port_control/3, port_call/3, port_info/1, port_info/2]). @@ -160,3 +160,13 @@ request_system_task(_Pid, _Prio, _Request) -> check_process_code(_Module, _OptionList) -> erlang:nif_error(undefined). +-spec binary_to_term(Binary) -> term() when + Binary :: binary(). +binary_to_term(_Binary) -> + erlang:nif_error(undefined). + +-spec binary_to_term(Binary, Opts) -> term() when + Binary :: binary(), + Opts :: [safe]. +binary_to_term(_Binary, _Opts) -> + erlang:nif_error(undefined). -- cgit v1.2.3 From abbc9c2755396d4050db8f15e02c21032a528049 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 16 Oct 2013 16:28:40 +0200 Subject: erts: Cleanup code for trapping binary_to_term --- erts/emulator/beam/external.c | 103 ++++++++++++++++++------------------------ 1 file changed, 45 insertions(+), 58 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index e3e199b198..651aac03a2 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -1131,7 +1131,6 @@ enum B2TState { /* order is somewhat significant */ B2TSizeInit, B2TSize, B2TDecodeInit, - B2TDecode, B2TDecodeList, B2TDecodeTuple, @@ -1157,9 +1156,6 @@ typedef struct { Eterm* hp; Eterm* hp_end; int remaining_n; -#ifdef DEBUG - Eterm* container_end; -#endif } B2TDecodeContext; typedef struct { @@ -1352,20 +1348,28 @@ static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1) #define B2T_BYTES_PER_REDUCTION 128 -static unsigned sverk_rand(void) +/* Define for testing */ +/*#define EXTREME_B2T_TRAPPING 1*/ + +#ifdef EXTREME_B2T_TRAPPING +static unsigned b2t_rand(void) { static unsigned prev = 17; prev = (prev * 214013 + 2531011); return prev; } +#endif + static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* context_b) { - SWord initial_reds = 1 + sverk_rand() % 4; /*(Uint)(ERTS_BIF_REDS_LEFT(p) * B2T_BYTES_PER_REDUCTION);*/ - byte* bytes; +#ifdef EXTREME_B2T_TRAPPING + SWord initial_reds = 1 + b2t_rand() % 4; +#else + SWord initial_reds = (Uint)(ERTS_BIF_REDS_LEFT(p) * B2T_BYTES_PER_REDUCTION); +#endif B2TContext c_buff; B2TContext *ctx; - ErtsDistExternal fakedep; Eterm* magic_space = NULL; if (context_b == NULL) { @@ -1384,6 +1388,7 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con do { switch (ctx->state) { case B2TPrepare: { + byte* bytes; Uint bin_size; bytes = erts_get_aligned_binary_bytes_extra(bin, &ctx->aligned_alloc, @@ -1403,32 +1408,30 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con } break; } - case B2TUncompressChunk: - { - Uint chunk = ctx->reds; - int zret; - if (chunk > ctx->u.uc.dleft) - chunk = ctx->u.uc.dleft; - - zret = erl_zlib_inflate_chunk(&ctx->u.uc.stream, - ctx->u.uc.dbytes, &chunk); - ctx->u.uc.dbytes += chunk; - ctx->u.uc.dleft -= chunk; - if (zret == Z_OK && ctx->u.uc.dleft > 0) { - ctx->reds = 0; - } - else if (erl_zlib_inflate_finish(&ctx->u.uc.stream) == Z_OK - && zret == Z_STREAM_END - && ctx->u.uc.dleft == 0) { - ctx->reds -= chunk; - ctx->state = B2TSizeInit; - } - else { - ctx->state = B2TBadArg; - } - break; - } - + case B2TUncompressChunk: { + Uint chunk = ctx->reds; + int zret; + + if (chunk > ctx->u.uc.dleft) + chunk = ctx->u.uc.dleft; + zret = erl_zlib_inflate_chunk(&ctx->u.uc.stream, + ctx->u.uc.dbytes, &chunk); + ctx->u.uc.dbytes += chunk; + ctx->u.uc.dleft -= chunk; + if (zret == Z_OK && ctx->u.uc.dleft > 0) { + ctx->reds = 0; + } + else if (erl_zlib_inflate_finish(&ctx->u.uc.stream) == Z_OK + && zret == Z_STREAM_END + && ctx->u.uc.dleft == 0) { + ctx->reds -= chunk; + ctx->state = B2TSizeInit; + } + else { + ctx->state = B2TBadArg; + } + break; + } case B2TSizeInit: ctx->u.sc.ep = NULL; ctx->state = B2TSize; @@ -1441,11 +1444,7 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con case B2TDecodeInit: if (context_b == NULL && ctx->b2ts.extsize > ctx->reds) { - /*SVERK Can we do a better prediction that is still safe - OR is there a way to do HAlloc after result some how... - ... support for mutiple HReleases maybe? - */ - /* dec_term will probably trap, allocate space for magic bin + /* dec_term will maybe trap, allocate space for magic bin before result term to make it easy to trim with HRelease. */ magic_space = HAlloc(p, PROC_BIN_SIZE); @@ -1462,11 +1461,12 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con case B2TDecode: case B2TDecodeList: case B2TDecodeTuple: - case B2TDecodeString: + case B2TDecodeString: { + ErtsDistExternal fakedep; fakedep.flags = ctx->flags; dec_term(&fakedep, NULL, NULL, &MSO(p), NULL, ctx); break; - + } case B2TDecodeFail: HRelease(p, ctx->u.dc.hp_end, ctx->u.dc.hp_start); /*fall through*/ @@ -1495,7 +1495,7 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con return ctx->u.dc.res; } - }while (ctx->reds || ctx->state >= B2TDone); + }while (ctx->reds > 0 || ctx->state >= B2TDone); if (context_b == NULL) { ASSERT(ctx->trap_bin == THE_NON_VALUE); @@ -1524,13 +1524,6 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con BIF_RETTYPE erts_internal_binary_to_term_1(BIF_ALIST_1) { -/*SVERK if (++sverk_cnt % 1000 == 0) { - erts_fprintf(stderr, "Call #%u to binary_to_term_int()\n", sverk_cnt); - } - if (sverk_cnt == 1767) { - sverk_break(); - } -*/ return binary_to_term_int(BIF_P, 0, BIF_ARG_1, NULL); } @@ -2879,7 +2872,6 @@ dec_term(ErtsDistExternal *edep, Eterm** hpp, byte* ep, ErlOffHeap* off_heap, objp -= 2; n--; } - ASSERT(ctx->u.dc.remaining_n || next == ctx->u.dc.container_end); break; case B2TDecodeTuple: @@ -2889,7 +2881,6 @@ dec_term(ErtsDistExternal *edep, Eterm** hpp, byte* ep, ErlOffHeap* off_heap, next = objp; objp--; } - ASSERT(ctx->u.dc.remaining_n || next == ctx->u.dc.container_end); break; case B2TDecodeString: @@ -2902,7 +2893,6 @@ dec_term(ErtsDistExternal *edep, Eterm** hpp, byte* ep, ErlOffHeap* off_heap, } hp[-1] = NIL; *hpp = hp; - ASSERT(ctx->u.dc.remaining_n || hp == ctx->u.dc.container_end); break; default: @@ -3051,7 +3041,6 @@ dec_term_atom_common: objp = hp - 1; if (ctx) { if (reds < n) { - IF_DEBUG(ctx->u.dc.container_end = hp - n;) ASSERT(reds > 0); ctx->state = B2TDecodeTuple; ctx->u.dc.remaining_n = n - reds; @@ -3085,7 +3074,6 @@ dec_term_atom_common: n--; if (ctx) { if (reds < n) { - IF_DEBUG(ctx->u.dc.container_end = hp - 2*(n+1);) ctx->state = B2TDecodeList; ctx->u.dc.remaining_n = n - reds; n = reds; @@ -3110,7 +3098,6 @@ dec_term_atom_common: *objp = make_list(hp); if (ctx) { if (reds < n) { - IF_DEBUG(ctx->u.dc.container_end = hp + n*2;) ctx->state = B2TDecodeString; ctx->u.dc.remaining_n = n - reds; n = reds; @@ -3981,6 +3968,9 @@ decoded_size(byte *ep, byte* endp, int internal_tags, B2TContext* ctx) goto init_done; } } + else + reds = 0; /* not used but compiler warns anyway */ + heap_size = 0; terms = 1; atom_extra_skip = 0; @@ -4016,9 +4006,6 @@ init_done: ASSERT(terms > 0); do { int tag; - /*SVERK - erts_fprintf(stderr, "SVERK tag=%d ep=%p terms=%d heap_size=%d endp=%p\n", - ep[0], ep, terms, heap_size, endp); */ CHKSIZE(1); tag = ep++[0]; switch (tag) { -- cgit v1.2.3 From bcb227d1f467439fa5f901233e8ad382e75373cb Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 4 Nov 2013 17:53:43 +0100 Subject: erts: Trapping memcpy in binary_to_term --- erts/emulator/beam/external.c | 73 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 11 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 651aac03a2..a73734d487 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -1135,6 +1135,7 @@ enum B2TState { /* order is somewhat significant */ B2TDecodeList, B2TDecodeTuple, B2TDecodeString, + B2TDecodeBinary, B2TDone, B2TDecodeFail, @@ -1156,6 +1157,7 @@ typedef struct { Eterm* hp; Eterm* hp_end; int remaining_n; + char* remaining_bytes; } B2TDecodeContext; typedef struct { @@ -1347,6 +1349,7 @@ static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1) #define B2T_BYTES_PER_REDUCTION 128 +#define B2T_MEMCPY_FACTOR 8 /* Define for testing */ /*#define EXTREME_B2T_TRAPPING 1*/ @@ -1461,7 +1464,8 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con case B2TDecode: case B2TDecodeList: case B2TDecodeTuple: - case B2TDecodeString: { + case B2TDecodeString: + case B2TDecodeBinary: { ErtsDistExternal fakedep; fakedep.flags = ctx->flags; dec_term(&fakedep, NULL, NULL, &MSO(p), NULL, ctx); @@ -1494,6 +1498,8 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con BUMP_REDS(p, (initial_reds - ctx->reds) / B2T_BYTES_PER_REDUCTION); return ctx->u.dc.res; + default: + ASSERT(!"Unknown state in binary_to_term"); } }while (ctx->reds > 0 || ctx->state >= B2TDone); @@ -2830,6 +2836,7 @@ undo_offheap_in_area(ErlOffHeap* off_heap, Eterm* start, Eterm* end) #endif /* DEBUG */ } + /* Decode term from external format into *objp. ** On failure return NULL and (R13B04) *hpp will be unchanged. */ @@ -2852,15 +2859,25 @@ dec_term(ErtsDistExternal *edep, Eterm** hpp, byte* ep, ErlOffHeap* off_heap, hpp = &ctx->u.dc.hp; if (ctx->state != B2TDecode) { - n = ctx->u.dc.remaining_n; - if (reds < n) { - ctx->u.dc.remaining_n -= reds; - n = reds; + int n_limit = reds; + + n = ctx->u.dc.remaining_n; + if (ctx->state == B2TDecodeBinary) { + n_limit *= B2T_MEMCPY_FACTOR; + ASSERT(n_limit >= reds); + reds -= n / B2T_MEMCPY_FACTOR; + } + else + reds -= n; + + if (n > n_limit) { + ctx->u.dc.remaining_n -= n_limit; + n = n_limit; + reds = 0; } else { ctx->u.dc.remaining_n = 0; } - reds -= n; switch (ctx->state) { case B2TDecodeList: @@ -2895,6 +2912,12 @@ dec_term(ErtsDistExternal *edep, Eterm** hpp, byte* ep, ErlOffHeap* off_heap, *hpp = hp; break; + case B2TDecodeBinary: + sys_memcpy(ctx->u.dc.remaining_bytes, ep, n); + ctx->u.dc.remaining_bytes += n; + ep += n; + break; + default: ASSERT(!"Unknown state"); } @@ -3311,7 +3334,6 @@ dec_term_atom_common: dbin->flags = 0; dbin->orig_size = n; erts_refc_init(&dbin->refc, 1); - sys_memcpy(dbin->orig_bytes, ep, n); pb = (ProcBin *) hp; hp += PROC_BIN_SIZE; pb->thing_word = HEADER_PROC_BIN; @@ -3322,7 +3344,20 @@ dec_term_atom_common: pb->bytes = (byte*) dbin->orig_bytes; pb->flags = 0; *objp = make_binary(pb); - } + if (ctx) { + int n_limit = reds * B2T_MEMCPY_FACTOR; + if (n > n_limit) { + ctx->state = B2TDecodeBinary; + ctx->u.dc.remaining_n = n - n_limit; + ctx->u.dc.remaining_bytes = dbin->orig_bytes + n_limit; + n = n_limit; + reds = 0; + } + else + reds -= n / B2T_MEMCPY_FACTOR; + } + sys_memcpy(dbin->orig_bytes, ep, n); + } ep += n; break; } @@ -3343,13 +3378,14 @@ dec_term_atom_common: sys_memcpy(hb->data, ep, n); bin = make_binary(hb); hp += heap_bin_size(n); + ep += n; } else { Binary* dbin = erts_bin_nrml_alloc(n); ProcBin* pb; + dbin->flags = 0; dbin->orig_size = n; erts_refc_init(&dbin->refc, 1); - sys_memcpy(dbin->orig_bytes, ep, n); pb = (ProcBin *) hp; pb->thing_word = HEADER_PROC_BIN; pb->size = n; @@ -3360,8 +3396,23 @@ dec_term_atom_common: pb->flags = 0; bin = make_binary(pb); hp += PROC_BIN_SIZE; - } - ep += n; + if (ctx) { + int n_limit = reds * B2T_MEMCPY_FACTOR; + if (n > n_limit) { + ctx->state = B2TDecodeBinary; + ctx->u.dc.remaining_n = n - n_limit; + ctx->u.dc.remaining_bytes = dbin->orig_bytes + n_limit; + n = n_limit; + reds = 0; + } + else + reds -= n / B2T_MEMCPY_FACTOR; + } + sys_memcpy(dbin->orig_bytes, ep, n); + ep += n; + n = pb->size; + } + if (bitsize == 0) { *objp = bin; } else { -- cgit v1.2.3 From adcdcb2c7985ad6076a2bedebdfce08d14465b43 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 5 Nov 2013 18:44:39 +0100 Subject: erts: Fix crash when binary_to_term throws badarg after it has built off_heap data and then done at least one trap call. The undo mechanism in dec_term does not work if we build the magic binary after any other off_heap data. --- erts/emulator/beam/external.c | 51 +++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 24 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index a73734d487..6a7d15cc88 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -1364,6 +1364,21 @@ static unsigned b2t_rand(void) #endif +static B2TContext* b2t_export_context(Process* p, B2TContext* src) +{ + Binary* context_b = erts_create_magic_binary(sizeof(B2TContext), + b2t_context_destructor); + B2TContext* ctx = ERTS_MAGIC_BIN_DATA(context_b); + Eterm* hp; + sys_memcpy(ctx, src, sizeof(B2TContext)); + if (ctx->state >= B2TDecode && ctx->u.dc.next == &src->u.dc.res) { + ctx->u.dc.next = &ctx->u.dc.res; + } + hp = HAlloc(p, PROC_BIN_SIZE); + ctx->trap_bin = erts_mk_magic_binary_term(&hp, &MSO(p), context_b); + return ctx; +} + static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* context_b) { #ifdef EXTREME_B2T_TRAPPING @@ -1373,16 +1388,18 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con #endif B2TContext c_buff; B2TContext *ctx; - Eterm* magic_space = NULL; + int is_first_call; if (context_b == NULL) { /* Setup enough to get started */ + is_first_call = 1; ctx = &c_buff; ctx->state = B2TPrepare; ctx->aligned_alloc = NULL; ctx->flags = flags; IF_DEBUG(ctx->trap_bin = THE_NON_VALUE;) } else { + is_first_call = 0; ctx = ERTS_MAGIC_BIN_DATA(context_b); ASSERT(ctx->state != B2TPrepare); } @@ -1446,12 +1463,11 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con break; case B2TDecodeInit: - if (context_b == NULL && ctx->b2ts.extsize > ctx->reds) { + if (ctx == &c_buff && ctx->b2ts.extsize > ctx->reds) { /* dec_term will maybe trap, allocate space for magic bin before result term to make it easy to trim with HRelease. */ - magic_space = HAlloc(p, PROC_BIN_SIZE); - *magic_space = make_pos_bignum_header(PROC_BIN_SIZE-1); + ctx = b2t_export_context(p, &c_buff); } ctx->u.dc.ep = ctx->b2ts.extp; ctx->u.dc.res = (Eterm) (UWord) NULL; @@ -1476,7 +1492,7 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con /*fall through*/ case B2TBadArg: b2t_destroy_context(ctx); - if (context_b) { + if (!is_first_call) { erts_set_gc_state(p, 1); } BUMP_REDS(p, (initial_reds - ctx->reds) / B2T_BYTES_PER_REDUCTION); @@ -1491,10 +1507,9 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con } HRelease(p, ctx->u.dc.hp_end, ctx->u.dc.hp); - if (context_b) { + if (!is_first_call) { erts_set_gc_state(p, 1); } - BUMP_REDS(p, (initial_reds - ctx->reds) / B2T_BYTES_PER_REDUCTION); return ctx->u.dc.res; @@ -1503,27 +1518,15 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con } }while (ctx->reds > 0 || ctx->state >= B2TDone); - if (context_b == NULL) { + if (ctx == &c_buff) { ASSERT(ctx->trap_bin == THE_NON_VALUE); - - context_b = erts_create_magic_binary(sizeof(B2TContext), - b2t_context_destructor); - ctx = ERTS_MAGIC_BIN_DATA(context_b); - sys_memcpy(ctx, &c_buff, sizeof(B2TContext)); - if (ctx->state >= B2TDecode && ctx->u.dc.next == &c_buff.u.dc.res) { - ctx->u.dc.next = &ctx->u.dc.res; - } - if (!magic_space) { - ASSERT(ctx->state < B2TDecode); - magic_space = HAlloc(p, PROC_BIN_SIZE); - } - ctx->trap_bin = erts_mk_magic_binary_term(&magic_space, &MSO(p), context_b); - - erts_set_gc_state(p, 0); + ctx = b2t_export_context(p, &c_buff); } - ASSERT(context_b); ASSERT(ctx->trap_bin != THE_NON_VALUE); + if (is_first_call) { + erts_set_gc_state(p, 0); + } BUMP_ALL_REDS(p); BIF_TRAP1(&binary_to_term_trap_export, p, ctx->trap_bin); } -- cgit v1.2.3 From 53c26db17f6bf89783e0dd48f7777cfeff18f756 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 5 Nov 2013 21:59:53 +0100 Subject: erts: Fix bug in binary_to_term for compressed on halfword --- erts/emulator/beam/external.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'erts') diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 6a7d15cc88..28c6caf5fe 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -1429,7 +1429,7 @@ static Eterm binary_to_term_int(Process* p, Uint32 flags, Eterm bin, Binary* con break; } case B2TUncompressChunk: { - Uint chunk = ctx->reds; + uLongf chunk = ctx->reds; int zret; if (chunk > ctx->u.uc.dleft) -- cgit v1.2.3 From 6817539f36b0f5b4fcc3164d812e9535ee4be9ff Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 8 Nov 2013 17:51:56 +0100 Subject: erts: Improve stress of binary_to_term in binary_SUITE by doing repeated calls with different reduction count. --- erts/emulator/test/binary_SUITE.erl | 143 ++++++++++++++++++++++++------------ 1 file changed, 96 insertions(+), 47 deletions(-) (limited to 'erts') diff --git a/erts/emulator/test/binary_SUITE.erl b/erts/emulator/test/binary_SUITE.erl index 08ab094019..2e5f8ce64d 100644 --- a/erts/emulator/test/binary_SUITE.erl +++ b/erts/emulator/test/binary_SUITE.erl @@ -447,26 +447,26 @@ terms(Config) when is_list(Config) -> Sz1 when is_integer(Sz1), size(Bin1) =< Sz1 -> ok end, - Term = binary_to_term(Bin), - Term = binary_to_term(Bin, [safe]), + Term = binary_to_term_stress(Bin), + Term = binary_to_term_stress(Bin, [safe]), Unaligned = make_unaligned_sub_binary(Bin), - Term = binary_to_term(Unaligned), - Term = binary_to_term(Unaligned, []), - Term = binary_to_term(Bin, [safe]), + Term = binary_to_term_stress(Unaligned), + Term = binary_to_term_stress(Unaligned, []), + Term = binary_to_term_stress(Bin, [safe]), BinC = erlang:term_to_binary(Term, [compressed]), - Term = binary_to_term(BinC), + Term = binary_to_term_stress(BinC), true = size(BinC) =< size(Bin), Bin = term_to_binary(Term, [{compressed,0}]), terms_compression_levels(Term, size(Bin), 1), UnalignedC = make_unaligned_sub_binary(BinC), - Term = binary_to_term(UnalignedC) + Term = binary_to_term_stress(UnalignedC) end, ?line test_terms(TestFun), ok. terms_compression_levels(Term, UncompressedSz, Level) when Level < 10 -> BinC = erlang:term_to_binary(Term, [{compressed,Level}]), - Term = binary_to_term(BinC), + Term = binary_to_term_stress(BinC), Sz = byte_size(BinC), true = Sz =< UncompressedSz, terms_compression_levels(Term, UncompressedSz, Level+1); @@ -476,9 +476,9 @@ terms_float(Config) when is_list(Config) -> ?line test_floats(fun(Term) -> Bin0 = term_to_binary(Term), Bin0 = term_to_binary(Term, [{minor_version,0}]), - Term = binary_to_term(Bin0), + Term = binary_to_term_stress(Bin0), Bin1 = term_to_binary(Term, [{minor_version,1}]), - Term = binary_to_term(Bin1), + Term = binary_to_term_stress(Bin1), true = size(Bin1) < size(Bin0), Size0 = erlang:external_size(Term), Size00 = erlang:external_size(Term, [{minor_version, 0}]), @@ -490,7 +490,7 @@ terms_float(Config) when is_list(Config) -> float_middle_endian(Config) when is_list(Config) -> %% Testing for roundtrip is not enough. ?line <<131,70,63,240,0,0,0,0,0,0>> = term_to_binary(1.0, [{minor_version,1}]), - ?line 1.0 = binary_to_term(<<131,70,63,240,0,0,0,0,0,0>>). + ?line 1.0 = binary_to_term_stress(<<131,70,63,240,0,0,0,0,0,0>>). external_size(Config) when is_list(Config) -> %% Build a term whose external size only fits in a big num (on 32-bit CPU). @@ -608,10 +608,10 @@ bad_binary_to_term(Config) when is_list(Config) -> ok. bad_bin_to_term(BadBin) -> - {'EXIT',{badarg,_}} = (catch binary_to_term(BadBin)). + {'EXIT',{badarg,_}} = (catch binary_to_term_stress(BadBin)). bad_bin_to_term(BadBin,Opts) -> - {'EXIT',{badarg,_}} = (catch binary_to_term(BadBin,Opts)). + {'EXIT',{badarg,_}} = (catch binary_to_term_stress(BadBin,Opts)). safe_binary_to_term2(doc) -> "Test safety options for binary_to_term/2"; safe_binary_to_term2(Config) when is_list(Config) -> @@ -622,7 +622,7 @@ safe_binary_to_term2(Config) when is_list(Config) -> BadRef = <<131,114,0,3,BadHostAtom/binary,0,<<0,0,0,255>>/binary, Empty/binary,Empty/binary>>, ?line bad_bin_to_term(BadRef, [safe]), % good ref, with a bad atom - ?line fullsweep_after = binary_to_term(<<131,100,0,15,"fullsweep_after">>, [safe]), % should be a good atom + ?line fullsweep_after = binary_to_term_stress(<<131,100,0,15,"fullsweep_after">>, [safe]), % should be a good atom BadExtFun = <<131,113,100,0,4,98,108,117,101,100,0,4,109,111,111,110,97,3>>, ?line bad_bin_to_term(BadExtFun, [safe]), ok. @@ -673,14 +673,14 @@ corrupter0(Term) -> corrupter(Bin, Pos) when Pos >= 0 -> ?line {ShorterBin, Rest} = split_binary(Bin, Pos), - ?line catch binary_to_term(ShorterBin), %% emulator shouldn't crash + ?line catch binary_to_term_stress(ShorterBin), %% emulator shouldn't crash ?line MovedBin = list_to_binary([ShorterBin]), - ?line catch binary_to_term(MovedBin), %% emulator shouldn't crash + ?line catch binary_to_term_stress(MovedBin), %% emulator shouldn't crash %% Bit faults, shouldn't crash <> = Rest, Fun = fun(M) -> FaultyByte = Byte bxor M, - catch binary_to_term(<>) end, ?line lists:foreach(Fun,[1,2,4,8,16,32,64,128,255]), ?line corrupter(Bin, Pos-1); @@ -694,7 +694,7 @@ more_bad_terms(Config) when is_list(Config) -> ?line ok = io:format("File: ~s\n", [BadFile]), ?line case file:read_file(BadFile) of {ok,Bin} -> - ?line {'EXIT',{badarg,_}} = (catch binary_to_term(Bin)), + ?line {'EXIT',{badarg,_}} = (catch binary_to_term_stress(Bin)), ok; Other -> ?line ?t:fail(Other) @@ -703,7 +703,7 @@ more_bad_terms(Config) when is_list(Config) -> otp_5484(Config) when is_list(Config) -> ?line {'EXIT',_} = (catch - binary_to_term( + binary_to_term_stress( <<131, 104,2, %Tuple, 2 elements 103, %Pid @@ -716,7 +716,7 @@ otp_5484(Config) when is_list(Config) -> ?line {'EXIT',_} = (catch - binary_to_term( + binary_to_term_stress( <<131, 104,2, %Tuple, 2 elements 103, %Pid @@ -728,13 +728,13 @@ otp_5484(Config) when is_list(Config) -> ?line {'EXIT',_} = (catch - binary_to_term( + binary_to_term_stress( %% A old-type fun in a list containing a bad creator pid. <<131,108,0,0,0,1,117,0,0,0,0,103,100,0,13,110,111,110,111,100,101,64,110,111,104,111,115,116,255,255,0,25,255,0,0,0,0,100,0,1,116,97,0,98,6,142,121,72,106>>)), ?line {'EXIT',_} = (catch - binary_to_term( + binary_to_term_stress( %% A new-type fun in a list containing a bad creator pid. %% <<131, @@ -746,7 +746,7 @@ otp_5484(Config) when is_list(Config) -> ?line {'EXIT',_} = (catch - binary_to_term( + binary_to_term_stress( %% A new-type fun in a list containing a bad module. <<131, 108,0,0,0,1, %List, 1 element @@ -757,7 +757,7 @@ otp_5484(Config) when is_list(Config) -> ?line {'EXIT',_} = (catch - binary_to_term( + binary_to_term_stress( %% A new-type fun in a list containing a bad index. <<131, 108,0,0,0,1, %List, 1 element @@ -769,7 +769,7 @@ otp_5484(Config) when is_list(Config) -> ?line {'EXIT',_} = (catch - binary_to_term( + binary_to_term_stress( %% A new-type fun in a list containing a bad unique value. <<131, 108,0,0,0,1, %List, 1 element @@ -782,46 +782,46 @@ otp_5484(Config) when is_list(Config) -> %% An absurdly large atom. ?line {'EXIT',_} = - (catch binary_to_term(iolist_to_binary([<<131,100,65000:16>>| + (catch binary_to_term_stress(iolist_to_binary([<<131,100,65000:16>>| lists:duplicate(65000, 42)]))), %% Longer than 255 characters. ?line {'EXIT',_} = - (catch binary_to_term(iolist_to_binary([<<131,100,256:16>>| + (catch binary_to_term_stress(iolist_to_binary([<<131,100,256:16>>| lists:duplicate(256, 42)]))), %% OTP-7218. Thanks to Matthew Dempsky. Also make sure that we %% cover the other error cases for external funs (EXPORT_EXT). ?line {'EXIT',_} = - (catch binary_to_term( + (catch binary_to_term_stress( <<131, 113, %EXPORT_EXP 97,13, %Integer: 13 97,13, %Integer: 13 97,13>>)), %Integer: 13 ?line {'EXIT',_} = - (catch binary_to_term( + (catch binary_to_term_stress( <<131, 113, %EXPORT_EXP 100,0,1,64, %Atom: '@' 97,13, %Integer: 13 97,13>>)), %Integer: 13 ?line {'EXIT',_} = - (catch binary_to_term( + (catch binary_to_term_stress( <<131, 113, %EXPORT_EXP 100,0,1,64, %Atom: '@' 100,0,1,64, %Atom: '@' 106>>)), %NIL ?line {'EXIT',_} = - (catch binary_to_term( + (catch binary_to_term_stress( <<131, 113, %EXPORT_EXP 100,0,1,64, %Atom: '@' 100,0,1,64, %Atom: '@' 98,255,255,255,255>>)), %Integer: -1 ?line {'EXIT',_} = - (catch binary_to_term( + (catch binary_to_term_stress( <<131, 113, %EXPORT_EXP 100,0,1,64, %Atom: '@' @@ -829,7 +829,7 @@ otp_5484(Config) when is_list(Config) -> 113,97,13,97,13,97,13>>)), %fun 13:13/13 %% Bad funs. - ?line {'EXIT',_} = (catch binary_to_term(fake_fun(0, lists:seq(0, 256)))), + ?line {'EXIT',_} = (catch binary_to_term_stress(fake_fun(0, lists:seq(0, 256)))), ok. fake_fun(Arity, Env0) -> @@ -863,7 +863,7 @@ try_bad_lengths(B) -> try_bad_lengths(B, L) when L > 16#FFFFFFF0 -> Bin = <>, io:format("~p\n", [Bin]), - {'EXIT',_} = (catch binary_to_term(Bin)), + {'EXIT',_} = (catch binary_to_term_stress(Bin)), try_bad_lengths(B, L-1); try_bad_lengths(_, _) -> ok. @@ -917,7 +917,7 @@ otp_6817_try_bin(Bin) -> %% If the bug is present, the heap pointer will moved when the invalid term %% is found and we will have a linked list passing through the limbo area %% between the heap top and the stack pointer. - catch binary_to_term(Bin), + catch binary_to_term_stress(Bin), %% If the bug is present, we will overwrite the pointers in the limbo area. Filler = erlang:make_tuple(1024, 16#3FA), @@ -1227,12 +1227,12 @@ bit_sized_binary_sizes(Config) when is_list(Config) -> bsbs_1(0) -> BinSize = 32+8, io:format("A: ~p BinSize: ~p", [0,BinSize]), - Bin = binary_to_term(<<131,$M,5:32,0,0,0,0,0,0>>), + Bin = binary_to_term_stress(<<131,$M,5:32,0,0,0,0,0,0>>), BinSize = bit_size(Bin); bsbs_1(A) -> BinSize = 32+A, io:format("A: ~p BinSize: ~p", [A,BinSize]), - Bin = binary_to_term(<<131,$M,5:32,A,0,0,0,0,0>>), + Bin = binary_to_term_stress(<<131,$M,5:32,A,0,0,0,0,0>>), BinSize = bit_size(Bin). deep(Config) when is_list(Config) -> @@ -1249,7 +1249,7 @@ deep(Config) when is_list(Config) -> deep_roundtrip(T) -> B = term_to_binary(T), - T = binary_to_term(B). + T = binary_to_term_stress(B). obsolete_funs(Config) when is_list(Config) -> erts_debug:set_internal_state(available_internal_state, true), @@ -1284,29 +1284,29 @@ obsolete_fun(Fun) -> Tuple = no_fun_roundtrip(Fun). no_fun_roundtrip(Term) -> - binary_to_term(erts_debug:get_internal_state({term_to_binary_no_funs,Term})). + binary_to_term_stress(erts_debug:get_internal_state({term_to_binary_no_funs,Term})). %% Test non-standard encodings never generated by term_to_binary/1 %% but recognized by binary_to_term/1. robustness(Config) when is_list(Config) -> - ?line [] = binary_to_term(<<131,107,0,0>>), %Empty string. - ?line [] = binary_to_term(<<131,108,0,0,0,0,106>>), %Zero-length list. + ?line [] = binary_to_term_stress(<<131,107,0,0>>), %Empty string. + ?line [] = binary_to_term_stress(<<131,108,0,0,0,0,106>>), %Zero-length list. %% {[],a} where [] is a zero-length list. - ?line {[],a} = binary_to_term(<<131,104,2,108,0,0,0,0,106,100,0,1,97>>), + ?line {[],a} = binary_to_term_stress(<<131,104,2,108,0,0,0,0,106,100,0,1,97>>), %% {42,a} where 42 is a zero-length list with 42 in the tail. - ?line {42,a} = binary_to_term(<<131,104,2,108,0,0,0,0,97,42,100,0,1,97>>), + ?line {42,a} = binary_to_term_stress(<<131,104,2,108,0,0,0,0,97,42,100,0,1,97>>), %% {{x,y},a} where {x,y} is a zero-length list with {x,y} in the tail. - ?line {{x,y},a} = binary_to_term(<<131,104,2,108,0,0,0,0, + ?line {{x,y},a} = binary_to_term_stress(<<131,104,2,108,0,0,0,0, 104,2,100,0,1,120,100,0,1, 121,100,0,1,97>>), %% Bignums fitting in 32 bits. - ?line 16#7FFFFFFF = binary_to_term(<<131,98,127,255,255,255>>), - ?line -1 = binary_to_term(<<131,98,255,255,255,255>>), + ?line 16#7FFFFFFF = binary_to_term_stress(<<131,98,127,255,255,255>>), + ?line -1 = binary_to_term_stress(<<131,98,255,255,255,255>>), ok. @@ -1324,7 +1324,7 @@ run_otp_8180(Name) -> ?line {ok,Bins} = file:consult(Name), [begin io:format("~p\n", [Bin]), - ?line {'EXIT',{badarg,_}} = (catch binary_to_term(Bin)) + ?line {'EXIT',{badarg,_}} = (catch binary_to_term_stress(Bin)) end || Bin <- Bins], ok. @@ -1393,3 +1393,52 @@ unaligned_sub_bin(Bin0, Offs) -> Bin. id(I) -> I. + + +%% Stress binary_to_term with different initial reductions +binary_to_term_stress(Bin) -> + binary_to_term_stress(Bin, no_opts). + +binary_to_term_stress(Bin, Opts) -> + Reds = get_reds(), + T = b2t(erlang:system_info(context_reductions), + Bin, Opts, catch_binary_to_term(Bin, Opts)), + set_reds(Reds), + T = case Opts of + no_opts -> binary_to_term(Bin); + _ -> binary_to_term(Bin,Opts) + end. + +catch_binary_to_term(Bin, no_opts) -> + try binary_to_term(Bin) + catch + error:badarg -> binary_to_term_throws_badarg + end; +catch_binary_to_term(Bin, Opts) -> + try binary_to_term(Bin, Opts) + catch + error:badarg -> binary_to_term_throws_badarg + end. + +b2t(0, _Bin, _Opts, Term) -> + Term; +b2t(Reds, Bin, Opts, Term) -> + set_reds(Reds), + Term = catch_binary_to_term(Bin,Opts), + b2t(Reds div 3, Bin, Opts, Term). + +set_reds(Reds) -> + try erts_debug:set_internal_state(reds_left, Reds) + catch + error:undef -> + erts_debug:set_internal_state(available_internal_state, true), + set_reds(Reds) + end. + +get_reds() -> + try erts_debug:get_internal_state(reds_left) + catch + error:undef -> + erts_debug:set_internal_state(available_internal_state, true), + get_reds() + end. -- cgit v1.2.3 From 522a29666088d5d96956d2752ffb1596d778cffd Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 13 Nov 2013 18:07:33 +0100 Subject: erts: Let term_to_binary disable gc while trapping as an attempt to improve performance --- erts/emulator/beam/external.c | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'erts') diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 28c6caf5fe..d5f3b19b82 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -1055,8 +1055,10 @@ static BIF_RETTYPE term_to_binary_trap_1(BIF_ALIST_1) Binary *bin = ((ProcBin *) binary_val(bt))->val; Eterm res = erts_term_to_binary_int(BIF_P, Term, 0, 0,bin); if (is_tuple(res)) { + ASSERT(BIF_P->flags & F_DISABLE_GC); BIF_TRAP1(&term_to_binary_trap_export,BIF_P,res); } else { + erts_set_gc_state(BIF_P, 1); BIF_RET(res); } } @@ -1065,8 +1067,10 @@ BIF_RETTYPE term_to_binary_1(BIF_ALIST_1) { Eterm res = erts_term_to_binary_int(BIF_P, BIF_ARG_1, 0, TERM_TO_BINARY_DFLAGS, NULL); if (is_tuple(res)) { + erts_set_gc_state(BIF_P, 0); BIF_TRAP1(&term_to_binary_trap_export,BIF_P,res); } else { + ASSERT(!(BIF_P->flags & F_DISABLE_GC)); BIF_RET(res); } } @@ -1118,8 +1122,10 @@ BIF_RETTYPE term_to_binary_2(BIF_ALIST_2) res = erts_term_to_binary_int(p, Term, level, flags, bin); if (is_tuple(res)) { + erts_set_gc_state(p, 0); BIF_TRAP1(&term_to_binary_trap_export,BIF_P,res); } else { + ASSERT(!(BIF_P->flags & F_DISABLE_GC)); BIF_RET(res); } } -- cgit v1.2.3 From 1f09936f34f5daee534bbfde4f16e5bbb434b6c4 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 13 Nov 2013 18:46:17 +0100 Subject: erts: Yield after trapping term_to_binary if gc has been ordered or if "too much" offheap binaries has been built --- erts/emulator/beam/erl_bif_info.c | 5 +++-- erts/emulator/beam/erl_process.c | 9 +++------ erts/emulator/beam/external.c | 7 +++++-- 3 files changed, 11 insertions(+), 10 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index f5893f9291..39bbd6b182 100755 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -3603,8 +3603,9 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2) default: BIF_ERROR(BIF_P, BADARG); break; } - res = erts_set_gc_state(BIF_P, enable); - BIF_RET(res ? am_true : am_false); + res = (BIF_P->flags & F_DISABLE_GC) ? am_false : am_true; + erts_set_gc_state(BIF_P, enable); + BIF_RET(res); } else if (ERTS_IS_ATOM_STR("send_fake_exit_signal", BIF_ARG_1)) { /* Used by signal_SUITE (emulator) */ diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 13b18e9e0e..b88c871ffc 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -8271,25 +8271,22 @@ save_gc_task(Process *c_p, ErtsProcSysTask *st, int prio) int erts_set_gc_state(Process *c_p, int enable) { - int res; ErtsProcSysTaskQs *dgc_tsk_qs; ASSERT(c_p == erts_get_current_process()); ASSERT((ERTS_PSFLG_RUNNING|ERTS_PSFLG_RUNNING_SYS) & erts_smp_atomic32_read_nob(&c_p->state)); ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); - res = !(c_p->flags & F_DISABLE_GC); - if (!enable) { c_p->flags |= F_DISABLE_GC; - return res; + return 0; } c_p->flags &= ~F_DISABLE_GC; dgc_tsk_qs = ERTS_PROC_GET_DELAYED_GC_TASK_QS(c_p); if (!dgc_tsk_qs) - return res; + return 0; /* Move delayed gc tasks into sys tasks queues. */ @@ -8387,7 +8384,7 @@ erts_set_gc_state(Process *c_p, int enable) if (dgc_tsk_qs) proc_sys_task_queues_free(dgc_tsk_qs); - return res; + return 1; } void diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index d5f3b19b82..7dc7ba6f98 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -1058,8 +1058,11 @@ static BIF_RETTYPE term_to_binary_trap_1(BIF_ALIST_1) ASSERT(BIF_P->flags & F_DISABLE_GC); BIF_TRAP1(&term_to_binary_trap_export,BIF_P,res); } else { - erts_set_gc_state(BIF_P, 1); - BIF_RET(res); + if (erts_set_gc_state(BIF_P, 1) + || MSO(BIF_P).overhead > BIN_VHEAP_SZ(BIF_P)) + ERTS_BIF_YIELD_RETURN(BIF_P, res); + else + BIF_RET(res); } } -- cgit v1.2.3 From 6cff38512b753172a7dfa2bedd60e8987156736d Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 10 Oct 2013 20:54:12 +0200 Subject: erts: Adjust term_to_binary reduction factors Made them powers of 2 for faster calculations. 500 encoded terms per reductions seemed a bit much, lowered to 32. TERM_TO_BINARY_SIZE_FACTOR was not used in practice as it was only applied to small binaries. Lowered from 500kb to 256kb compressed bytes per trap call. --- erts/emulator/beam/external.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 7dc7ba6f98..4600d691c7 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -1717,12 +1717,10 @@ erts_term_to_binary(Process* p, Eterm Term, int level, Uint flags) { /* #define EXTREME_TTB_TRAPPING 1 */ #ifndef EXTREME_TTB_TRAPPING -#define TERM_TO_BINARY_LOOP_FACTOR 500 -#define TERM_TO_BINARY_SIZE_FACTOR 500000 -#define TERM_TO_BINARY_COMPRESS_CHUNK 500000 +#define TERM_TO_BINARY_LOOP_FACTOR 32 +#define TERM_TO_BINARY_COMPRESS_CHUNK (1 << 18) #else #define TERM_TO_BINARY_LOOP_FACTOR 1 -#define TERM_TO_BINARY_SIZE_FACTOR 10 #define TERM_TO_BINARY_COMPRESS_CHUNK 10 #endif @@ -1859,7 +1857,7 @@ static Eterm erts_term_to_binary_int(Process* p, Eterm Term, int level, Uint fla /* Finish in one go */ res = erts_term_to_binary_simple(p, Term, size, level, flags); - BUMP_REDS(p, size / TERM_TO_BINARY_SIZE_FACTOR); + BUMP_REDS(p, 1); return res; } -- cgit v1.2.3