diff options
Diffstat (limited to 'erts')
39 files changed, 999 insertions, 217 deletions
diff --git a/erts/doc/src/absform.xml b/erts/doc/src/absform.xml index 63e9abc210..1c0c3e1319 100644 --- a/erts/doc/src/absform.xml +++ b/erts/doc/src/absform.xml @@ -583,7 +583,7 @@ <list type="bulleted"> <item>If C is a constraint <c>is_subtype(V, T)</c> or <c>V :: T</c>, where <c>V</c> is a type variable and <c>T</c> is a type, then - Rep(C) = <c>{type,LINE,constraint,[Rep(F),[Rep(V),Rep(T)]]}</c>. + Rep(C) = <c>{type,LINE,constraint,[{atom,LINE,is_subtype},[Rep(V),Rep(T)]]}</c>. </item> </list> </section> diff --git a/erts/doc/src/erl.xml b/erts/doc/src/erl.xml index c4eb0e16ec..b6fa4c254c 100644 --- a/erts/doc/src/erl.xml +++ b/erts/doc/src/erl.xml @@ -1338,14 +1338,14 @@ <item> <p>Default process flag settings.</p> <taglist> - <tag><marker id="+xohmq"><c>+xohmq true|false</c></marker></tag> + <tag><marker id="+xmqd"><c>+xmqd off_heap|on_heap|mixed</c></marker></tag> <item><p> Sets the default value for the process flag - <c>off_heap_message_queue</c>. If <c>+xohmq</c> is not - passed, <c>false</c> will be the default. For more information, + <c>message_queue_data</c>. If <c>+xmqd</c> is not + passed, <c>mixed</c> will be the default. For more information, see the documentation of - <seealso marker="erlang#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue, - OHMQ)</c></seealso>. + <seealso marker="erlang#process_flag_message_queue_data"><c>process_flag(message_queue_data, + MQD)</c></seealso>. </p></item> </taglist> </item> diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index 2e82bb62a9..6ed03f3dfc 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -59,6 +59,12 @@ </datatype> <datatype> + <name name="message_queue_data"></name> + <desc><p>See <seealso marker="#process_flag_message_queue_data"><c>erlang:process_flag(message_queue_data, MQD)</c></seealso>.</p> + </desc> + </datatype> + + <datatype> <name name="timestamp"></name> <desc><p>See <seealso marker="#timestamp/0">erlang:timestamp/0</seealso>.</p> </desc> @@ -4280,39 +4286,52 @@ os_prompt% </pre> <p>Returns the old value of the flag.</p> </desc> </func> - <marker id="process_flag_off_heap_message_queue"/> + <marker id="process_flag_message_queue_data"/> <func> <name name="process_flag" arity="2" clause_i="5"/> - <fsummary>Set process flag <c>off_heap_message_queue</c> for the calling process</fsummary> + <fsummary>Set process flag <c>message_queue_data</c> for the calling process</fsummary> + <type name="message_queue_data"/> <desc> <p>This flag determines how messages in the message queue are stored. When the flag is:</p> <taglist> - <tag><c>true</c></tag> + <tag><c>off_heap</c></tag> <item><p> <em>All</em> messages in the message queue will be stored outside of the process heap. This implies that <em>no</em> messages in the message queue will be part of a garbage collection of the process. </p></item> - <tag><c>false</c></tag> + <tag><c>on_heap</c></tag> + <item><p> + All messages in the message queue will eventually be + placed on heap. They may however temporarily be stored + off heap. This is how messages always have been stored + up until ERTS version 8.0. + </p></item> + <tag><c>mixed</c></tag> <item><p> Messages may be placed either on the heap or outside of the heap. </p></item> </taglist> <p> + The default <c>message_queue_data</c> process flag is determined + by the <seealso marker="erl#+xmqd"><c>+xmqd</c></seealso> + <c>erl</c> command line argument. + </p> + <p> If the process potentially may get a hugh amount of messages, - you are recommended to set the flag to <c>true</c>. This since - a garbage collection with lots of messages placed on the heap - may become extremly expensive. Performance of the actual - message passing is however generally better when setting the - flag to <c>false</c>. + you are recommended to set the flag to <c>off_heap</c>. This + since a garbage collection with lots of messages placed on + the heap may become extremly expensive and the process may + consume large amounts of memory. Performance of the + actual message passing is however generally better when not + using the <c>off_heap</c> flag. </p> <p> - When changing this flag from <c>false</c> to <c>true</c>, - all messages in the message queue are moved off heap. This - work has been initiated but not completed when this function + When changing this flag messages will be moved. This work + has been initiated but not completed when this function call returns. </p> <p>Returns the old value of the flag.</p> @@ -4478,6 +4497,7 @@ os_prompt% </pre> <type name="process_info_result_item"/> <type name="priority_level"/> <type name="stack_item"/> + <type name="message_queue_data" /> <desc> <p>Returns a list containing <c><anno>InfoTuple</anno></c>s with miscellaneous information about the process identified by @@ -4530,6 +4550,7 @@ os_prompt% </pre> <type name="process_info_result_item"/> <type name="stack_item"/> <type name="priority_level"/> + <type name="message_queue_data" /> <desc> <p>Returns information about the process identified by <c><anno>Pid</anno></c>, as specified by @@ -4698,13 +4719,14 @@ os_prompt% </pre> monitor by name, the list item is <c>{process, {<anno>RegName</anno>, <anno>Node</anno>}}</c>.</p> </item> - <tag><c>{off_heap_message_queue, <anno>OHMQ</anno>}</c></tag> + <tag><c>{message_queue_data, <anno>MQD</anno>}</c></tag> <item> - <p>Returns the current state of the <c>off_heap_message_queue</c> - process flag. <c><anno>OHMQ</anno></c> is either <c>true</c>, or - <c>false</c>. For more information, see the documentation of - <seealso marker="#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue, - OHMQ)</c></seealso>.</p> + <p>Returns the current state of the <c>message_queue_data</c> + process flag. <c><anno>MQD</anno></c> is either <c>off_heap</c>, + <c>on_heap</c>, or <c>mixed</c>. For more information, see the + documentation of + <seealso marker="#process_flag_message_queue_data"><c>process_flag(message_queue_data, + MQD)</c></seealso>.</p> </item> <tag><c>{priority, <anno>Level</anno>}</c></tag> <item> @@ -5474,6 +5496,7 @@ true</pre> <name name="spawn_opt" arity="2"/> <fsummary>Creates a new process with a fun as entry point.</fsummary> <type name="priority_level"/> + <type name="message_queue_data" /> <type name="spawn_opt_option" /> <desc> <p>Returns the process identifier (pid) of a new process @@ -5490,6 +5513,7 @@ true</pre> <name name="spawn_opt" arity="3"/> <fsummary>Creates a new process with a fun as entry point on a given node.</fsummary> <type name="priority_level"/> + <type name="message_queue_data" /> <type name="spawn_opt_option" /> <desc> <p>Returns the process identifier (pid) of a new process started @@ -5505,6 +5529,7 @@ true</pre> <name name="spawn_opt" arity="4"/> <fsummary>Creates a new process with a function as entry point.</fsummary> <type name="priority_level"/> + <type name="message_queue_data" /> <type name="spawn_opt_option" /> <desc> <p>Works as @@ -5607,17 +5632,17 @@ true</pre> fine-tuning an application and to measure the execution time with various <c><anno>VSize</anno></c> values.</p> </item> - <tag><c>{off_heap_message_queue, <anno>OHMQ</anno>}</c></tag> + <tag><c>{message_queue_data, <anno>MQD</anno>}</c></tag> <item> - <p>Sets the state of the <c>off_heap_message_queue</c> process - flag. <c><anno>OHMQ</anno></c> should be either <c>true</c>, or - <c>false</c>. The default <c>off_heap_message_queue</c> process - flag is determined by the - <seealso marker="erl#+xohmq"><c>+xohmq</c></seealso> <c>erl</c> + <p>Sets the state of the <c>message_queue_data</c> process + flag. <c><anno>MQD</anno></c> should be either <c>off_heap</c>, + <c>on_heap</c>, or <c>mixed</c>. The default + <c>message_queue_data</c> process flag is determined by the + <seealso marker="erl#+xmqd"><c>+xmqd</c></seealso> <c>erl</c> command line argument. For more information, see the documentation of - <seealso marker="#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue, - <anno>OHMQ</anno>)</c></seealso>.</p> + <seealso marker="#process_flag_message_queue_data"><c>process_flag(message_queue_data, + <anno>MQD</anno>)</c></seealso>.</p> </item> </taglist> </desc> @@ -5627,6 +5652,7 @@ true</pre> <name name="spawn_opt" arity="5"/> <fsummary>Creates a new process with a function as entry point on a given node.</fsummary> <type name="priority_level"/> + <type name="message_queue_data" /> <type name="spawn_opt_option" /> <desc> <p>Returns the process identifier (pid) of a new process started @@ -7106,15 +7132,15 @@ ok used by the runtime system. It is on the form "<major ver>.<minor ver>".</p> </item> - <tag><marker id="system_info_off_heap_message_queue"><c>off_heap_message_queue</c></marker></tag> + <tag><marker id="system_info_message_queue_data"><c>message_queue_data</c></marker></tag> <item> - <p>Returns the default value of the <c>off_heap_message_queue</c> - process flag which is either <c>true</c> or <c>false</c>. This - default is set by the <c>erl</c> command line argument - <seealso marker="erl#+xohmq"><c>+xohmq</c></seealso>. For more information on the - <c>off_heap_message_queue</c> process flag, see documentation of - <seealso marker="#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue, - OHMQ)</c></seealso>.</p> + <p>Returns the default value of the <c>message_queue_data</c> + process flag which is either <c>off_heap</c>, <c>on_heap</c>, or <c>mixed</c>. + This default is set by the <c>erl</c> command line argument + <seealso marker="erl#+xmqd"><c>+xmqd</c></seealso>. For more information on the + <c>message_queue_data</c> process flag, see documentation of + <seealso marker="#process_flag_message_queue_data"><c>process_flag(message_queue_data, + MQD)</c></seealso>.</p> </item> <tag><marker id="system_info_otp_release"><c>otp_release</c></marker></tag> <item> diff --git a/erts/emulator/beam/atom.h b/erts/emulator/beam/atom.h index ead56c83d8..2c002ca92f 100644 --- a/erts/emulator/beam/atom.h +++ b/erts/emulator/beam/atom.h @@ -129,6 +129,7 @@ typedef enum { (erts_is_atom_utf8_bytes((byte *) LSTR, sizeof(LSTR) - 1, (TERM))) #define ERTS_DECL_AM(S) Eterm AM_ ## S = am_atom_put(#S, sizeof(#S) - 1) #define ERTS_INIT_AM(S) AM_ ## S = am_atom_put(#S, sizeof(#S) - 1) +#define ERTS_MAKE_AM(Str) am_atom_put(Str, sizeof(Str) - 1) int atom_table_size(void); /* number of elements */ int atom_table_sz(void); /* table size in bytes, excluding stored objects */ diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index ea04495574..7424e47ec3 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -350,6 +350,7 @@ atom memory_internal atom memory_types atom message atom message_binary +atom message_queue_data atom message_queue_len atom messages atom merge_trap @@ -361,6 +362,7 @@ atom min_heap_size atom min_bin_vheap_size atom minor_version atom Minus='-' +atom mixed atom module atom module_info atom monitored_by @@ -423,11 +425,12 @@ atom notify atom notsup atom nouse_stdio atom objects -atom off_heap_message_queue +atom off_heap atom offset atom ok atom old_heap_block_size atom old_heap_size +atom on_heap atom on_load atom open atom open_error diff --git a/erts/emulator/beam/beam_emu.c b/erts/emulator/beam/beam_emu.c index 1a4133bceb..4d7b00b032 100644 --- a/erts/emulator/beam/beam_emu.c +++ b/erts/emulator/beam/beam_emu.c @@ -1843,8 +1843,8 @@ void process_main(void) * in the queue. This since messages with data outside * the heap will be corrupted by a GC. */ - ASSERT(!(c_p->flags & F_DISABLE_GC)); - c_p->flags |= F_DISABLE_GC; + ASSERT(!(c_p->flags & F_DELAY_GC)); + c_p->flags |= F_DELAY_GC; loop_rec__: PROCESS_MAIN_CHK_LOCKS(c_p); @@ -1858,7 +1858,7 @@ void process_main(void) if (ERTS_PROC_PENDING_EXIT(c_p)) { erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); SWAPOUT; - c_p->flags &= ~F_DISABLE_GC; + c_p->flags &= ~F_DELAY_GC; goto do_schedule; /* Will be rescheduled for exit */ } ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p); @@ -1868,7 +1868,7 @@ void process_main(void) else #endif { - c_p->flags &= ~F_DISABLE_GC; + c_p->flags &= ~F_DELAY_GC; SET_I((BeamInstr *) Arg(0)); Goto(*I); /* Jump to a wait or wait_timeout instruction */ } @@ -1978,7 +1978,7 @@ void process_main(void) CANCEL_TIMER(c_p); erts_save_message_in_proc(c_p, msgp); - c_p->flags &= ~F_DISABLE_GC; + c_p->flags &= ~F_DELAY_GC; if (ERTS_IS_GC_DESIRED_INTERNAL(c_p, HTOP, E)) { /* @@ -2000,7 +2000,7 @@ void process_main(void) */ OpCase(loop_rec_end_f): { - ASSERT(c_p->flags & F_DISABLE_GC); + ASSERT(c_p->flags & F_DELAY_GC); SET_I((BeamInstr *) Arg(0)); SAVE_MESSAGE(c_p); @@ -2009,7 +2009,7 @@ void process_main(void) goto loop_rec__; } - c_p->flags &= ~F_DISABLE_GC; + c_p->flags &= ~F_DELAY_GC; c_p->i = I; SWAPOUT; c_p->arity = 0; @@ -3558,6 +3558,16 @@ do { \ StoreBifResult(1, result); } + OpCase(i_get_hash_cId): + { + Eterm arg; + Eterm result; + + GetArg1(0, arg); + result = erts_pd_hash_get_with_hx(c_p, Arg(1), arg); + StoreBifResult(2, result); + } + { Eterm case_end_val; diff --git a/erts/emulator/beam/beam_load.c b/erts/emulator/beam/beam_load.c index 5db971b6af..d367cce212 100644 --- a/erts/emulator/beam/beam_load.c +++ b/erts/emulator/beam/beam_load.c @@ -4284,6 +4284,53 @@ gen_get_map_element(LoaderState* stp, GenOpArg Fail, GenOpArg Src, return op; } +static int +hash_internal_genop_arg(LoaderState* stp, GenOpArg Key, Uint32* hx) +{ + switch (Key.type) { + case TAG_a: + *hx = atom_tab(atom_val(Key.val))->slot.bucket.hvalue; + return 1; + case TAG_i: + *hx = Key.val; + return 1; + case TAG_n: + *hx = make_internal_hash(NIL); + return 1; + case TAG_q: + *hx = make_internal_hash(stp->literals[Key.val].term); + return 1; + default: + return 0; + } +} + + +static GenOp* +gen_get(LoaderState* stp, GenOpArg Src, GenOpArg Dst) +{ + GenOp* op; + Uint32 hx = 0; + + NEW_GENOP(stp, op); + op->next = NULL; + if (hash_internal_genop_arg(stp, Src, &hx)) { + op->arity = 3; + op->op = genop_i_get_hash_3; + op->a[0] = Src; + op->a[1].type = TAG_u; + op->a[1].val = (BeamInstr) hx; + op->a[2] = Dst; + } else { + op->arity = 2; + op->op = genop_i_get_2; + op->a[0] = Src; + op->a[1] = Dst; + } + return op; +} + + static GenOp* gen_get_map_elements(LoaderState* stp, GenOpArg Fail, GenOpArg Src, GenOpArg Size, GenOpArg* Rest) diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 886b19fe6e..bb9165cd79 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -910,13 +910,22 @@ BIF_RETTYPE spawn_opt_1(BIF_ALIST_1) so.priority = PRIORITY_LOW; else goto error; - } else if (arg == am_off_heap_message_queue) { - if (val == am_true) - so.flags |= SPO_OFF_HEAP_MSGQ; - else if (val == am_false) + } else if (arg == am_message_queue_data) { + switch (val) { + case am_mixed: + so.flags &= ~(SPO_OFF_HEAP_MSGQ|SPO_ON_HEAP_MSGQ); + break; + case am_on_heap: so.flags &= ~SPO_OFF_HEAP_MSGQ; - else + so.flags |= SPO_ON_HEAP_MSGQ; + break; + case am_off_heap: + so.flags &= ~SPO_ON_HEAP_MSGQ; + so.flags |= SPO_OFF_HEAP_MSGQ; + break; + default: goto error; + } } else if (arg == am_min_heap_size && is_small(val)) { Sint min_heap_size = signed_val(val); if (min_heap_size < 0) { @@ -1695,15 +1704,10 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2) } BIF_RET(old_value); } - else if (BIF_ARG_1 == am_off_heap_message_queue) { - int enable; - if (BIF_ARG_2 == am_true) - enable = 1; - else if (BIF_ARG_2 == am_false) - enable = 0; - else + else if (BIF_ARG_1 == am_message_queue_data) { + old_value = erts_change_message_queue_management(BIF_P, BIF_ARG_2); + if (is_non_value(old_value)) goto error; - old_value = erts_change_off_heap_message_queue_state(BIF_P, enable); BIF_RET(old_value); } else if (BIF_ARG_1 == am_sensitive) { diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index c49a3ff313..07d4702b92 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -167,7 +167,7 @@ bif erts_internal:request_system_task/3 bif erts_internal:check_process_code/2 bif erts_internal:map_to_tuple_keys/1 -bif erts_internal:map_type/1 +bif erts_internal:term_type/1 bif erts_internal:map_hashmap_children/1 bif erts_internal:time_unit/0 diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c index 99458b4268..5544712e8d 100644 --- a/erts/emulator/beam/erl_alloc.c +++ b/erts/emulator/beam/erl_alloc.c @@ -292,7 +292,7 @@ static void set_default_literal_alloc_opts(struct au_init *ip) { SET_DEFAULT_ALLOC_OPTS(ip); - ip->enable = AU_ALLOC_DEFAULT_ENABLE(1); + ip->enable = 1; ip->thr_spec = 0; ip->atype = BESTFIT; ip->init.bf.ao = 1; diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 82c2aa4b9e..f952f937ce 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -592,7 +592,7 @@ static Eterm pi_args[] = { am_min_bin_vheap_size, am_current_location, am_current_stacktrace, - am_off_heap_message_queue + am_message_queue_data }; #define ERTS_PI_ARGS ((int) (sizeof(pi_args)/sizeof(Eterm))) @@ -640,7 +640,7 @@ pi_arg2ix(Eterm arg) case am_min_bin_vheap_size: return 28; case am_current_location: return 29; case am_current_stacktrace: return 30; - case am_off_heap_message_queue: return 31; + case am_message_queue_data: return 31; default: return -1; } } @@ -1499,8 +1499,22 @@ process_info_aux(Process *BIF_P, break; } - case am_off_heap_message_queue: - res = BIF_P->flags & F_OFF_HEAP_MSGQ ? am_true : am_false; + case am_message_queue_data: + switch (rp->flags & (F_OFF_HEAP_MSGQ|F_ON_HEAP_MSGQ)) { + case F_OFF_HEAP_MSGQ: + res = am_off_heap; + break; + case F_ON_HEAP_MSGQ: + res = am_on_heap; + break; + case 0: + res = am_mixed; + break; + default: + res = am_error; + ERTS_INTERNAL_ERROR("Inconsistent message queue management state"); + break; + } hp = HAlloc(BIF_P, 3); break; @@ -2665,9 +2679,18 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1) BIF_RET(am_true); } #endif - else if (BIF_ARG_1 == am_off_heap_message_queue) { - BIF_RET(erts_default_spo_flags & SPO_OFF_HEAP_MSGQ - ? am_true : am_false); + else if (BIF_ARG_1 == am_message_queue_data) { + switch (erts_default_spo_flags & (SPO_ON_HEAP_MSGQ|SPO_OFF_HEAP_MSGQ)) { + case SPO_OFF_HEAP_MSGQ: + BIF_RET(am_off_heap); + case SPO_ON_HEAP_MSGQ: + BIF_RET(am_on_heap); + case 0: + BIF_RET(am_mixed); + default: + ERTS_INTERNAL_ERROR("Inconsistent message queue management state"); + BIF_RET(am_error); + } } else if (ERTS_IS_ATOM_STR("compile_info",BIF_ARG_1)) { Uint sz; diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c index c50756d56b..15226074c3 100644 --- a/erts/emulator/beam/erl_gc.c +++ b/erts/emulator/beam/erl_gc.c @@ -145,6 +145,7 @@ static void offset_rootset(Process *p, Sint offs, char* area, Uint area_size, Eterm* objv, int nobj); static void offset_off_heap(Process* p, Sint offs, char* area, Uint area_size); static void offset_mqueue(Process *p, Sint offs, char* area, Uint area_size); +static void move_msgq_to_heap(Process *p); static void init_gc_info(ErtsGCInfo *gcip); @@ -440,8 +441,15 @@ delay_garbage_collection(Process *p, ErlHeapFragment *live_hf_end, int need) ERTS_HOLE_CHECK(p); - if (p->live_hf_end == ERTS_INVALID_HFRAG_PTR) + if ((p->flags & F_DISABLE_GC) + && p->live_hf_end == ERTS_INVALID_HFRAG_PTR) { + /* + * A BIF yielded with disabled GC. Remember + * heap fragments created by the BIF until we + * do next GC. + */ p->live_hf_end = live_hf_end; + } if (need == 0) return 1; @@ -513,6 +521,14 @@ young_gen_usage(Process *p) Eterm *aheap; hsz = p->mbuf_sz; + + if (p->flags & F_ON_HEAP_MSGQ) { + ErtsMessage *mp; + for (mp = p->msg.first; mp; mp = mp->next) + if (mp->data.attached) + hsz += erts_msg_attached_data_size(mp); + } + aheap = p->abandoned_heap; if (!aheap) hsz += p->htop - p->heap; @@ -564,10 +580,12 @@ garbage_collect(Process* p, ErlHeapFragment *live_hf_end, DTRACE_CHARBUF(pidbuf, DTRACE_TERM_BUF_SIZE); #endif - if (p->flags & F_DISABLE_GC) + if (p->flags & (F_DISABLE_GC|F_DELAY_GC)) return delay_garbage_collection(p, live_hf_end, need); - if (p->live_hf_end != ERTS_INVALID_HFRAG_PTR) + if (p->abandoned_heap) + live_hf_end = ERTS_INVALID_HFRAG_PTR; + else if (p->live_hf_end != ERTS_INVALID_HFRAG_PTR) live_hf_end = p->live_hf_end; esdp = erts_get_scheduler_data(); @@ -734,6 +752,12 @@ erts_garbage_collect_hibernate(Process* p) p->arg_reg, p->arity); + ERTS_HEAP_FREE(ERTS_ALC_T_HEAP, + (p->abandoned_heap + ? p->abandoned_heap + : p->heap), + p->heap_sz * sizeof(Eterm)); + p->heap = heap; p->high_water = htop; p->htop = htop; @@ -1025,10 +1049,13 @@ minor_collection(Process* p, ErlHeapFragment *live_hf_end, do_minor(p, live_hf_end, (char *) mature, mature_size*sizeof(Eterm), new_sz, objv, nobj); + if (p->flags & F_ON_HEAP_MSGQ) + move_msgq_to_heap(p); + new_mature = p->old_htop - prev_old_htop; size_after = new_mature; - size_after += HEAP_TOP(p) - HEAP_START(p); + size_after += HEAP_TOP(p) - HEAP_START(p) + p->mbuf_sz; *recl += (size_before - size_after); ErtsGcQuickSanityCheck(p); @@ -1441,7 +1468,7 @@ major_collection(Process* p, ErlHeapFragment *live_hf_end, (p->abandoned_heap ? p->abandoned_heap : HEAP_START(p)), - (HEAP_END(p) - HEAP_START(p)) * sizeof(Eterm)); + p->heap_sz * sizeof(Eterm)); p->abandoned_heap = NULL; p->flags &= ~F_ABANDONED_HEAP_USE; HEAP_START(p) = n_heap; @@ -1452,9 +1479,14 @@ major_collection(Process* p, ErlHeapFragment *live_hf_end, HIGH_WATER(p) = HEAP_TOP(p); + remove_message_buffers(p); + + if (p->flags & F_ON_HEAP_MSGQ) + move_msgq_to_heap(p); + ErtsGcQuickSanityCheck(p); - size_after = HEAP_TOP(p) - HEAP_START(p); + size_after = HEAP_TOP(p) - HEAP_START(p) + p->mbuf_sz; *recl += size_before - size_after; adjusted = adjust_after_fullsweep(p, need, objv, nobj); @@ -1462,8 +1494,6 @@ major_collection(Process* p, ErlHeapFragment *live_hf_end, #ifdef HARDDEBUG disallow_heap_frag_ref_in_heap(p); #endif - remove_message_buffers(p); - ErtsGcQuickSanityCheck(p); return gc_cost(size_after, adjusted ? size_after : 0); @@ -1991,6 +2021,173 @@ collect_live_heap_frags(Process* p, ErlHeapFragment *live_hf_end, return n_htop; } +static ERTS_INLINE void +copy_one_frag(Eterm** hpp, ErlOffHeap* off_heap, + ErlHeapFragment *bp, Eterm *refs, int nrefs) +{ + Uint sz; + int i; + Sint offs; + struct erl_off_heap_header* oh; + Eterm *fhp, *hp; + + OH_OVERHEAD(off_heap, bp->off_heap.overhead); + sz = bp->used_size; + + fhp = bp->mem; + hp = *hpp; + offs = hp - fhp; + + oh = NULL; + while (sz--) { + Uint cpy_sz; + Eterm val = *fhp++; + + switch (primary_tag(val)) { + case TAG_PRIMARY_IMMED1: + *hp++ = val; + break; + case TAG_PRIMARY_LIST: + case TAG_PRIMARY_BOXED: + *hp++ = offset_ptr(val, offs); + break; + case TAG_PRIMARY_HEADER: + *hp++ = val; + switch (val & _HEADER_SUBTAG_MASK) { + case ARITYVAL_SUBTAG: + break; + case REFC_BINARY_SUBTAG: + case FUN_SUBTAG: + case EXTERNAL_PID_SUBTAG: + case EXTERNAL_PORT_SUBTAG: + case EXTERNAL_REF_SUBTAG: + oh = (struct erl_off_heap_header*) (hp-1); + cpy_sz = thing_arityval(val); + goto cpy_words; + default: + cpy_sz = header_arity(val); + + cpy_words: + ASSERT(sz >= cpy_sz); + sz -= cpy_sz; + while (cpy_sz >= 8) { + cpy_sz -= 8; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + } + switch (cpy_sz) { + case 7: *hp++ = *fhp++; + case 6: *hp++ = *fhp++; + case 5: *hp++ = *fhp++; + case 4: *hp++ = *fhp++; + case 3: *hp++ = *fhp++; + case 2: *hp++ = *fhp++; + case 1: *hp++ = *fhp++; + default: break; + } + if (oh) { + /* Add to offheap list */ + oh->next = off_heap->first; + off_heap->first = oh; + ASSERT(*hpp <= (Eterm*)oh); + ASSERT(hp > (Eterm*)oh); + oh = NULL; + } + break; + } + break; + } + } + + ASSERT(bp->used_size == hp - *hpp); + *hpp = hp; + + for (i = 0; i < nrefs; i++) { + if (is_not_immed(refs[i])) + refs[i] = offset_ptr(refs[i], offs); + } + bp->off_heap.first = NULL; +} + +static void +move_msgq_to_heap(Process *p) +{ + ErtsMessage **mpp = &p->msg.first; + + while (*mpp) { + ErtsMessage *mp = *mpp; + + if (mp->data.attached) { + ErlHeapFragment *bp; + ErtsHeapFactory factory; + + erts_factory_proc_prealloc_init(&factory, p, + erts_msg_attached_data_size(mp)); + + if (is_non_value(ERL_MESSAGE_TERM(mp))) { + if (mp->data.dist_ext) { + ASSERT(mp->data.dist_ext->heap_size >= 0); + if (is_not_nil(ERL_MESSAGE_TOKEN(mp))) { + bp = erts_dist_ext_trailer(mp->data.dist_ext); + ERL_MESSAGE_TOKEN(mp) = copy_struct(ERL_MESSAGE_TOKEN(mp), + bp->used_size, + &factory.hp, + factory.off_heap); + erts_cleanup_offheap(&bp->off_heap); + } + ERL_MESSAGE_TERM(mp) = erts_decode_dist_ext(&factory, + mp->data.dist_ext); + erts_free_dist_ext_copy(mp->data.dist_ext); + mp->data.dist_ext = NULL; + } + } + else { + + if (mp->data.attached == ERTS_MSG_COMBINED_HFRAG) + bp = &mp->hfrag; + else + bp = mp->data.heap_frag; + + if (bp->next) + erts_move_multi_frags(&factory.hp, factory.off_heap, bp, + mp->m, ERL_MESSAGE_REF_ARRAY_SZ, 0); + else + copy_one_frag(&factory.hp, factory.off_heap, bp, + mp->m, ERL_MESSAGE_REF_ARRAY_SZ); + + if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { + mp->data.heap_frag = NULL; + free_message_buffer(bp); + } + else { + ErtsMessage *tmp = erts_alloc_message(0, NULL); + sys_memcpy((void *) tmp->m, (void *) mp->m, + sizeof(Eterm)*ERL_MESSAGE_REF_ARRAY_SZ); + tmp->next = mp->next; + if (p->msg.save == &mp->next) + p->msg.save = &tmp->next; + if (p->msg.last == &mp->next) + p->msg.last = &tmp->next; + *mpp = tmp; + mp->next = NULL; + erts_cleanup_messages(mp); + mp = tmp; + } + } + + erts_factory_close(&factory); + } + + mpp = &(*mpp)->next; + } +} + static Uint setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset) { @@ -2080,9 +2277,8 @@ setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset) case F_OFF_HEAP_MSGQ_CHNG: case 0: { /* - * Off heap message queue disabled, i.e. we may - * have references from the message queue to the - * heap... + * We do not have off heap message queue enabled, i.e. we + * need to add message queue to rootset... */ ErtsMessage *mp; diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index 296cfdabc3..99d8a2a987 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -630,7 +630,8 @@ void erts_usage(void) erts_fprintf(stderr, "-W<i|w|e> set error logger warnings mapping,\n"); erts_fprintf(stderr, " see error_logger documentation for details\n"); - erts_fprintf(stderr, "-xohmq bool set default off_heap_message_queue flag for processes\n"); + erts_fprintf(stderr, "-xmqd val set default message queue data flag for processes,\n"); + erts_fprintf(stderr, " valid values are: off_heap | on_heap | mixed\n"); erts_fprintf(stderr, "-zdbbl size set the distribution buffer busy limit in kilobytes\n"); erts_fprintf(stderr, " valid range is [1-%d]\n", INT_MAX/1024); erts_fprintf(stderr, "-zdntgc time set delayed node table gc in seconds\n"); @@ -2020,15 +2021,21 @@ erl_start(int argc, char **argv) case 'x': { char *sub_param = argv[i]+2; - if (has_prefix("ohmq", sub_param)) { - arg = get_arg(sub_param+4, argv[i+1], &i); - if (sys_strcmp(arg, "true") == 0) - erts_default_spo_flags |= SPO_OFF_HEAP_MSGQ; - else if (sys_strcmp(arg, "false") == 0) + if (has_prefix("mqd", sub_param)) { + arg = get_arg(sub_param+3, argv[i+1], &i); + if (sys_strcmp(arg, "mixed") == 0) + erts_default_spo_flags &= ~(SPO_ON_HEAP_MSGQ|SPO_OFF_HEAP_MSGQ); + else if (sys_strcmp(arg, "on_heap") == 0) { erts_default_spo_flags &= ~SPO_OFF_HEAP_MSGQ; + erts_default_spo_flags |= SPO_ON_HEAP_MSGQ; + } + else if (sys_strcmp(arg, "off_heap") == 0) { + erts_default_spo_flags &= ~SPO_ON_HEAP_MSGQ; + erts_default_spo_flags |= SPO_OFF_HEAP_MSGQ; + } else { erts_fprintf(stderr, - "Invalid off_heap_message_queue flag: %s\n", arg); + "Invalid message_queue_data flag: %s\n", arg); erts_usage(); } } else { diff --git a/erts/emulator/beam/erl_map.c b/erts/emulator/beam/erl_map.c index 29b3024644..d0ffb11e79 100644 --- a/erts/emulator/beam/erl_map.c +++ b/erts/emulator/beam/erl_map.c @@ -2698,32 +2698,88 @@ BIF_RETTYPE erts_internal_map_to_tuple_keys_1(BIF_ALIST_1) { } /* - * erts_internal:map_type/1 + * erts_internal:term_type/1 * * Used in erts_debug:size/1 */ -BIF_RETTYPE erts_internal_map_type_1(BIF_ALIST_1) { - DECL_AM(hashmap); - DECL_AM(hashmap_node); - DECL_AM(flatmap); - if (is_map(BIF_ARG_1)) { - Eterm hdr = *(boxed_val(BIF_ARG_1)); - ASSERT(is_header(hdr)); - switch (hdr & _HEADER_MAP_SUBTAG_MASK) { - case HAMT_SUBTAG_HEAD_FLATMAP: - BIF_RET(AM_flatmap); - case HAMT_SUBTAG_HEAD_ARRAY: - case HAMT_SUBTAG_HEAD_BITMAP: - BIF_RET(AM_hashmap); - case HAMT_SUBTAG_NODE_BITMAP: - BIF_RET(AM_hashmap_node); - default: - erl_exit(1, "bad header"); +BIF_RETTYPE erts_internal_term_type_1(BIF_ALIST_1) { + Eterm obj = BIF_ARG_1; + switch (primary_tag(obj)) { + case TAG_PRIMARY_LIST: + BIF_RET(ERTS_MAKE_AM("list")); + case TAG_PRIMARY_BOXED: { + Eterm hdr = *boxed_val(obj); + ASSERT(is_header(hdr)); + switch (hdr & _TAG_HEADER_MASK) { + case ARITYVAL_SUBTAG: + BIF_RET(ERTS_MAKE_AM("tuple")); + case EXPORT_SUBTAG: + BIF_RET(ERTS_MAKE_AM("export")); + case FUN_SUBTAG: + BIF_RET(ERTS_MAKE_AM("fun")); + case MAP_SUBTAG: + switch (MAP_HEADER_TYPE(hdr)) { + case MAP_HEADER_TAG_FLATMAP_HEAD : + BIF_RET(ERTS_MAKE_AM("flatmap")); + case MAP_HEADER_TAG_HAMT_HEAD_BITMAP : + case MAP_HEADER_TAG_HAMT_HEAD_ARRAY : + BIF_RET(ERTS_MAKE_AM("hashmap")); + case MAP_HEADER_TAG_HAMT_NODE_BITMAP : + BIF_RET(ERTS_MAKE_AM("hashmap_node")); + default: + erl_exit(ERTS_ABORT_EXIT, "term_type: bad map header type %d\n", MAP_HEADER_TYPE(hdr)); + } + case REFC_BINARY_SUBTAG: + BIF_RET(ERTS_MAKE_AM("refc_binary")); + case HEAP_BINARY_SUBTAG: + BIF_RET(ERTS_MAKE_AM("heap_binary")); + case SUB_BINARY_SUBTAG: + BIF_RET(ERTS_MAKE_AM("sub_binary")); + case BIN_MATCHSTATE_SUBTAG: + BIF_RET(ERTS_MAKE_AM("matchstate")); + case POS_BIG_SUBTAG: + case NEG_BIG_SUBTAG: + BIF_RET(ERTS_MAKE_AM("bignum")); + case REF_SUBTAG: + BIF_RET(ERTS_MAKE_AM("reference")); + case EXTERNAL_REF_SUBTAG: + BIF_RET(ERTS_MAKE_AM("external_reference")); + case EXTERNAL_PID_SUBTAG: + BIF_RET(ERTS_MAKE_AM("external_pid")); + case EXTERNAL_PORT_SUBTAG: + BIF_RET(ERTS_MAKE_AM("external_port")); + case FLOAT_SUBTAG: + BIF_RET(ERTS_MAKE_AM("hfloat")); + default: + erl_exit(ERTS_ABORT_EXIT, "term_type: Invalid tag (0x%X)\n", hdr); + } } + case TAG_PRIMARY_IMMED1: + switch (obj & _TAG_IMMED1_MASK) { + case _TAG_IMMED1_SMALL: + BIF_RET(ERTS_MAKE_AM("fixnum")); + case _TAG_IMMED1_PID: + BIF_RET(ERTS_MAKE_AM("pid")); + case _TAG_IMMED1_PORT: + BIF_RET(ERTS_MAKE_AM("port")); + case _TAG_IMMED1_IMMED2: + switch (obj & _TAG_IMMED2_MASK) { + case _TAG_IMMED2_ATOM: + BIF_RET(ERTS_MAKE_AM("atom")); + case _TAG_IMMED2_CATCH: + BIF_RET(ERTS_MAKE_AM("catch")); + case _TAG_IMMED2_NIL: + BIF_RET(ERTS_MAKE_AM("nil")); + default: + erl_exit(ERTS_ABORT_EXIT, "term_type: Invalid tag (0x%X)\n", obj); + } + default: + erl_exit(ERTS_ABORT_EXIT, "term_type: Invalid tag (0x%X)\n", obj); + } + default: + erl_exit(ERTS_ABORT_EXIT, "term_type: Invalid tag (0x%X)\n", obj); } - BIF_P->fvalue = BIF_ARG_1; - BIF_ERROR(BIF_P, BADMAP); } /* diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 3f515f5181..ac5ec79fe4 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -630,9 +630,24 @@ erts_try_alloc_message_on_heap(Process *pp, #endif else { in_message_fragment: - - mp = erts_alloc_message(sz, hpp); - *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; + if (!((*psp) & ERTS_PSFLG_ON_HEAP_MSGQ)) { + mp = erts_alloc_message(sz, hpp); + *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; + } + else { + mp = erts_alloc_message(0, NULL); + if (!sz) { + *hpp = NULL; + *ohpp = NULL; + } + else { + ErlHeapFragment *bp; + bp = new_message_buffer(sz); + *hpp = &bp->mem[0]; + mp->data.heap_frag = bp; + *ohpp = &bp->off_heap; + } + } *on_heap_p = 0; } @@ -1022,12 +1037,12 @@ erts_complete_off_heap_message_queue_change(Process *c_p) ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ); /* - * This job was first initiated when the process changed - * "off heap message queue" state from false to true. Since - * then ERTS_PSFLG_OFF_HEAP_MSGQ has been set. However, the - * state change might have been changed again (multiple times) - * since then. Check users last requested state (the flag - * F_OFF_HEAP_MSGQ), and make the state consistent with that. + * This job was first initiated when the process changed to off heap + * message queue management. Since then ERTS_PSFLG_OFF_HEAP_MSGQ + * has been set. However, the management state might have been changed + * again (multiple times) since then. Check users last requested state + * (the flags F_OFF_HEAP_MSGQ, and F_ON_HEAP_MSGQ), and make the state + * consistent with that. */ if (!(c_p->flags & F_OFF_HEAP_MSGQ)) @@ -1068,8 +1083,9 @@ change_off_heap_msgq(void *vcohmq) } Eterm -erts_change_off_heap_message_queue_state(Process *c_p, int enable) +erts_change_message_queue_management(Process *c_p, Eterm new_state) { + Eterm res; #ifdef DEBUG if (c_p->flags & F_OFF_HEAP_MSGQ) { @@ -1088,57 +1104,117 @@ erts_change_off_heap_message_queue_state(Process *c_p, int enable) } #endif - if (c_p->flags & F_OFF_HEAP_MSGQ) { - /* Off heap message queue is enabled */ + switch (c_p->flags & (F_OFF_HEAP_MSGQ|F_ON_HEAP_MSGQ)) { - if (!enable) { + case F_OFF_HEAP_MSGQ: + res = am_off_heap; + + switch (new_state) { + case am_off_heap: + break; + case am_on_heap: + c_p->flags |= F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_ON_HEAP_MSGQ); + /* fall through */ + case am_mixed: c_p->flags &= ~F_OFF_HEAP_MSGQ; /* * We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ - * if a change is ongoing. It will be adjusted when the - * change completes... + * if a off heap change is ongoing. It will be adjusted + * when the change completes... */ if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { /* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... */ erts_smp_atomic32_read_band_nob(&c_p->state, ~ERTS_PSFLG_OFF_HEAP_MSGQ); } + break; + default: + res = THE_NON_VALUE; /* badarg */ + break; } + break; + + case F_ON_HEAP_MSGQ: + res = am_on_heap; - return am_true; /* Old state */ + switch (new_state) { + case am_on_heap: + break; + case am_mixed: + c_p->flags &= ~F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_ON_HEAP_MSGQ); + break; + case am_off_heap: + c_p->flags &= ~F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_ON_HEAP_MSGQ); + goto change_to_off_heap; + default: + res = THE_NON_VALUE; /* badarg */ + break; + } + break; + + case 0: + res = am_mixed; + + switch (new_state) { + case am_mixed: + break; + case am_on_heap: + c_p->flags |= F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_ON_HEAP_MSGQ); + break; + case am_off_heap: + goto change_to_off_heap; + default: + res = THE_NON_VALUE; /* badarg */ + break; + } + break; + + default: + res = am_error; + ERTS_INTERNAL_ERROR("Inconsistent message queue management state"); + break; } - /* Off heap message queue is disabled */ + return res; + +change_to_off_heap: - if (enable) { - c_p->flags |= F_OFF_HEAP_MSGQ; + c_p->flags |= F_OFF_HEAP_MSGQ; + + /* + * We do not have to schedule a change if + * we have an ongoing off heap change... + */ + if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { + ErtsChangeOffHeapMessageQueue *cohmq; /* - * We do not have to schedule a change if - * we have an ongoing change... + * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait + * thread progress before completing the change in + * order to ensure that all senders observe that + * messages should be passed off heap. When the + * change has completed, GC does not need to inspect + * the message queue at all. */ - if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { - ErtsChangeOffHeapMessageQueue *cohmq; - /* - * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait - * thread progress before completing the change in - * order to ensure that all senders observe that - * messages should be passed off heap. When the - * change has completed, GC does not need to inspect - * the message queue at all. - */ - erts_smp_atomic32_read_bor_nob(&c_p->state, - ERTS_PSFLG_OFF_HEAP_MSGQ); - c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; - cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, - sizeof(ErtsChangeOffHeapMessageQueue)); - cohmq->pid = c_p->common.id; - erts_schedule_thr_prgr_later_op(change_off_heap_msgq, - (void *) cohmq, - &cohmq->lop); - } + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_OFF_HEAP_MSGQ); + c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; + cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, + sizeof(ErtsChangeOffHeapMessageQueue)); + cohmq->pid = c_p->common.id; + erts_schedule_thr_prgr_later_op(change_off_heap_msgq, + (void *) cohmq, + &cohmq->lop); } - return am_false; /* Old state */ + return res; } int diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 19528e6b4b..60035d15ae 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -286,7 +286,7 @@ void erts_cleanup_offheap(ErlOffHeap *offheap); void erts_save_message_in_proc(Process *p, ErtsMessage *msg); Sint erts_move_messages_off_heap(Process *c_p); Sint erts_complete_off_heap_message_queue_change(Process *c_p); -Eterm erts_change_off_heap_message_queue_state(Process *c_p, int enable); +Eterm erts_change_message_queue_management(Process *c_p, Eterm new_state); int erts_decode_dist_message(Process *, ErtsProcLocks, ErtsMessage *, int); diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index a714068314..6c132a7e3d 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -335,7 +335,6 @@ static ErtsAlignedSchedulerSleepInfo *aligned_dirty_io_sched_sleep_info; static Uint last_reductions; static Uint last_exact_reductions; -Uint erts_default_process_flags; Eterm erts_system_monitor; Eterm erts_system_monitor_long_gc; Uint erts_system_monitor_long_schedule; @@ -677,7 +676,6 @@ erts_init_process(int ncpu, int proc_tab_size, int legacy_proc_tab) last_reductions = 0; last_exact_reductions = 0; - erts_default_process_flags = 0; } void @@ -9259,6 +9257,8 @@ Process *schedule(Process *p, int calls) } else { sched_out_proc: + ASSERT(!(p->flags & F_DELAY_GC)); + #ifdef ERTS_SMP ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); esdp = p->scheduler_data; @@ -10732,7 +10732,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). Eterm args, /* Arguments for function (must be well-formed list). */ ErlSpawnOpts* so) /* Options for spawn. */ { - Uint flags = erts_default_process_flags; + Uint flags = 0; ErtsRunQueue *rq = NULL; Process *p; Sint arity; /* Number of arguments. */ @@ -10778,6 +10778,10 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). state |= ERTS_PSFLG_OFF_HEAP_MSGQ; flags |= F_OFF_HEAP_MSGQ; } + else if (so->flags & SPO_ON_HEAP_MSGQ) { + state |= ERTS_PSFLG_ON_HEAP_MSGQ; + flags |= F_ON_HEAP_MSGQ; + } if (!rq) rq = erts_get_runq_proc(parent); @@ -11267,6 +11271,7 @@ erts_cleanup_empty_process(Process* p) static void delete_process(Process* p) { + Eterm *heap; VERBOSE(DEBUG_PROCESSES, ("Removing process: %T\n",p->common.id)); VERBOSE(DEBUG_SHCOPY, ("[pid=%T] delete process: %p %p %p %p\n", p->common.id, HEAP_START(p), HEAP_END(p), OLD_HEAP(p), OLD_HEND(p))); @@ -11293,16 +11298,17 @@ delete_process(Process* p) * Release heaps. Clobber contents in DEBUG build. */ - -#ifdef DEBUG - sys_memset(p->heap, DEBUG_BAD_BYTE, p->heap_sz*sizeof(Eterm)); -#endif - #ifdef HIPE hipe_delete_process(&p->hipe); #endif - ERTS_HEAP_FREE(ERTS_ALC_T_HEAP, (void*) p->heap, p->heap_sz*sizeof(Eterm)); + heap = p->abandoned_heap ? p->abandoned_heap : p->heap; + +#ifdef DEBUG + sys_memset(heap, DEBUG_BAD_BYTE, p->heap_sz*sizeof(Eterm)); +#endif + + ERTS_HEAP_FREE(ERTS_ALC_T_HEAP, (void*) heap, p->heap_sz*sizeof(Eterm)); if (p->old_heap != NULL) { #ifdef DEBUG @@ -11321,6 +11327,9 @@ delete_process(Process* p) free_message_buffer(p->mbuf); } + if (p->msg_frag) + erts_cleanup_messages(p->msg_frag); + erts_erase_dicts(p); /* free all pending messages */ diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index a72c5b8ad4..71065d875a 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -1141,14 +1141,15 @@ void erts_check_for_holes(Process* p); #define ERTS_PSFLG_PROXY ERTS_PSFLG_BIT(16) #define ERTS_PSFLG_DELAYED_SYS ERTS_PSFLG_BIT(17) #define ERTS_PSFLG_OFF_HEAP_MSGQ ERTS_PSFLG_BIT(18) +#define ERTS_PSFLG_ON_HEAP_MSGQ ERTS_PSFLG_BIT(19) #ifdef ERTS_DIRTY_SCHEDULERS -#define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(19) -#define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(20) -#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(21) -#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(22) -#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 23) +#define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(20) +#define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(21) +#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(22) +#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(23) +#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 24) #else -#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 19) +#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 20) #endif #define ERTS_PSFLGS_IN_PRQ_MASK (ERTS_PSFLG_IN_PRQ_MAX \ @@ -1197,6 +1198,7 @@ void erts_check_for_holes(Process* p); #define SPO_MONITOR 4 #define SPO_SYSTEM_PROC 8 #define SPO_OFF_HEAP_MSGQ 16 +#define SPO_ON_HEAP_MSGQ 32 extern int erts_default_spo_flags; @@ -1244,7 +1246,6 @@ Eterm* erts_heap_alloc(Process* p, Uint need, Uint xtra); Eterm* erts_set_hole_marker(Eterm* ptr, Uint sz); #endif -extern Uint erts_default_process_flags; extern erts_smp_rwmtx_t erts_cpu_bind_rwmtx; /* If any of the erts_system_monitor_* variables are set (enabled), ** erts_system_monitor must be != NIL, to allow testing on just @@ -1285,10 +1286,23 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags; #define F_HAVE_BLCKD_MSCHED (1 << 8) /* Process has blocked multi-scheduling */ #define F_P2PNR_RESCHED (1 << 9) /* Process has been rescheduled via erts_pid2proc_not_running() */ #define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */ -#define F_DISABLE_GC (1 << 11) /* Disable GC */ +#define F_DISABLE_GC (1 << 11) /* Disable GC (see below) */ #define F_OFF_HEAP_MSGQ (1 << 12) /* Off heap msg queue */ -#define F_OFF_HEAP_MSGQ_CHNG (1 << 13) /* Off heap msg queue changing */ -#define F_ABANDONED_HEAP_USE (1 << 14) /* Have usage of abandoned heap */ +#define F_ON_HEAP_MSGQ (1 << 13) /* Off heap msg queue */ +#define F_OFF_HEAP_MSGQ_CHNG (1 << 14) /* Off heap msg queue changing */ +#define F_ABANDONED_HEAP_USE (1 << 15) /* Have usage of abandoned heap */ +#define F_DELAY_GC (1 << 16) /* Similar to disable GC (see below) */ + +/* + * F_DISABLE_GC and F_DELAY_GC are similar. Both will prevent + * GC of the process, but it is important to use the right + * one: + * - F_DISABLE_GC should *only* be used by BIFs. This when + * the BIF needs to yield while preventig a GC. + * - F_DELAY_GC should only be used when GC is temporarily + * disabled while the process is scheduled. A process must + * not be scheduled out while F_DELAY_GC is set. + */ /* process trace_flags */ #define F_SENSITIVE (1 << 0) diff --git a/erts/emulator/beam/erl_process_dict.c b/erts/emulator/beam/erl_process_dict.c index f82cad745a..da9ebd92ab 100644 --- a/erts/emulator/beam/erl_process_dict.c +++ b/erts/emulator/beam/erl_process_dict.c @@ -53,14 +53,17 @@ /* Hash utility macros */ #define HASH_RANGE(PDict) ((PDict)->homeSize + (PDict)->splitPosition) -#define MAKE_HASH(Term) \ -((is_small(Term)) ? unsigned_val(Term) : \ - ((is_atom(Term)) ? \ - (atom_tab(atom_val(term))->slot.bucket.hvalue) : \ - make_hash2(Term))) +#define MAKE_HASH(Term) \ + ((is_small(Term)) ? unsigned_val(Term) : \ + ((is_atom(Term)) ? \ + (atom_tab(atom_val(Term))->slot.bucket.hvalue) : \ + make_internal_hash(Term))) #define PD_SZ2BYTES(Sz) (sizeof(ProcDict) + ((Sz) - 1)*sizeof(Eterm)) +#define pd_hash_value(Pdict, Key) \ + pd_hash_value_to_ix(Pdict, MAKE_HASH((Key))) + /* Memory allocation macros */ #define PD_ALLOC(Sz) \ erts_alloc(ERTS_ALC_T_PROC_DICT, (Sz)) @@ -82,6 +85,7 @@ */ static void pd_hash_erase(Process *p, Eterm id, Eterm *ret); static void pd_hash_erase_all(Process *p); +static Eterm pd_hash_get_with_hval(Process *p, Eterm bucket, Eterm id); static Eterm pd_hash_get_keys(Process *p, Eterm value); static Eterm pd_hash_get_all_keys(Process *p, ProcDict *pd); static Eterm pd_hash_get_all(Process *p, ProcDict *pd); @@ -93,7 +97,7 @@ static void grow(Process *p); static void array_shrink(ProcDict **ppd, unsigned int need); static Eterm array_put(ProcDict **ppdict, unsigned int ndx, Eterm term); -static unsigned int pd_hash_value(ProcDict *pdict, Eterm term); +static unsigned int pd_hash_value_to_ix(ProcDict *pdict, Uint32 hx); static unsigned int next_array_size(unsigned int need); /* @@ -390,40 +394,55 @@ static void pd_hash_erase_all(Process *p) } } +Eterm erts_pd_hash_get_with_hx(Process *p, Uint32 hx, Eterm id) +{ + unsigned int hval; + ProcDict *pd = p->dictionary; + + if (pd == NULL) + return am_undefined; + hval = pd_hash_value_to_ix(pd, hx); + return pd_hash_get_with_hval(p, ARRAY_GET(pd, hval), id); +} + Eterm erts_pd_hash_get(Process *p, Eterm id) { unsigned int hval; - Eterm tmp; ProcDict *pd = p->dictionary; if (pd == NULL) return am_undefined; hval = pd_hash_value(pd, id); - tmp = ARRAY_GET(pd, hval); - if (is_boxed(tmp)) { /* Tuple */ - ASSERT(is_tuple(tmp)); - if (EQ(tuple_val(tmp)[1], id)) { - return tuple_val(tmp)[2]; + return pd_hash_get_with_hval(p, ARRAY_GET(pd, hval), id); +} + +Eterm pd_hash_get_with_hval(Process *p, Eterm bucket, Eterm id) +{ + if (is_boxed(bucket)) { /* Tuple */ + ASSERT(is_tuple(bucket)); + if (EQ(tuple_val(bucket)[1], id)) { + return tuple_val(bucket)[2]; } - } else if (is_list(tmp)) { - for (; tmp != NIL && !EQ(tuple_val(TCAR(tmp))[1], id); tmp = TCDR(tmp)) { + } else if (is_list(bucket)) { + for (; bucket != NIL && !EQ(tuple_val(TCAR(bucket))[1], id); bucket = TCDR(bucket)) { ; } - if (tmp != NIL) { - return tuple_val(TCAR(tmp))[2]; + if (bucket != NIL) { + return tuple_val(TCAR(bucket))[2]; } - } else if (is_not_nil(tmp)) { + } else if (is_not_nil(bucket)) { #ifdef DEBUG erts_fprintf(stderr, "Process dictionary for process %T is broken, trying to " "display term found in line %d:\n" - "%T\n", p->common.id, __LINE__, tmp); + "%T\n", p->common.id, __LINE__, bucket); #endif erl_exit(1, "Damaged process dictionary found during get/1."); } return am_undefined; } + #define PD_GET_TKEY(Dst,Src) \ do { \ ASSERT(is_tuple((Src))); \ @@ -932,17 +951,16 @@ static Eterm array_put(ProcDict **ppdict, unsigned int ndx, Eterm term) ** Basic utilities */ -static unsigned int pd_hash_value(ProcDict *pdict, Eterm term) +static unsigned int pd_hash_value_to_ix(ProcDict *pdict, Uint32 hx) { - Uint hash, high; - - hash = MAKE_HASH(term); - high = hash % (pdict->homeSize*2); + Uint high; + high = hx % (pdict->homeSize*2); if (high >= HASH_RANGE(pdict)) - return hash % pdict->homeSize; + return hx % pdict->homeSize; return high; } + static unsigned int next_array_size(unsigned int need) { static unsigned int tab[] = diff --git a/erts/emulator/beam/erl_process_dict.h b/erts/emulator/beam/erl_process_dict.h index cc53800eb5..9aa21b7c38 100644 --- a/erts/emulator/beam/erl_process_dict.h +++ b/erts/emulator/beam/erl_process_dict.h @@ -39,5 +39,6 @@ void erts_deep_dictionary_dump(int to, void *to_arg, Eterm erts_dictionary_copy(struct process *p, ProcDict *pd); Eterm erts_pd_hash_get(struct process *p, Eterm id); +Eterm erts_pd_hash_get_with_hx(Process *p, Uint32 hx, Eterm id); #endif diff --git a/erts/emulator/beam/ops.tab b/erts/emulator/beam/ops.tab index 46fefb88af..081c4108a0 100644 --- a/erts/emulator/beam/ops.tab +++ b/erts/emulator/beam/ops.tab @@ -1036,7 +1036,7 @@ call_bif e bif0 u$bif:erlang:self/0 Dst=d => self Dst bif0 u$bif:erlang:node/0 Dst=d => node Dst -bif1 Fail Bif=u$bif:erlang:get/1 Src=s Dst=d => i_get Src Dst +bif1 Fail Bif=u$bif:erlang:get/1 Src=s Dst=d => gen_get(Src, Dst) bif2 Jump=j u$bif:erlang:element/2 S1=s S2=xy Dst=d => gen_element(Jump, S1, S2, Dst) @@ -1045,6 +1045,7 @@ bif1 p Bif S1 Dst => bif1_body Bif S1 Dst bif2 p Bif S1 S2 Dst => i_bif2_body Bif S1 S2 Dst bif2 Fail Bif S1 S2 Dst => i_bif2 Fail Bif S1 S2 Dst +i_get_hash c I d i_get s d %macro: self Self diff --git a/erts/emulator/hipe/hipe_native_bif.c b/erts/emulator/hipe/hipe_native_bif.c index ad8fb685e5..1bfee94e9e 100644 --- a/erts/emulator/hipe/hipe_native_bif.c +++ b/erts/emulator/hipe/hipe_native_bif.c @@ -164,7 +164,7 @@ void hipe_select_msg(Process *p) JOIN_MESSAGE(p); CANCEL_TIMER(p); /* calls erts_cancel_proc_timer() */ erts_save_message_in_proc(p, msgp); - p->flags &= ~F_DISABLE_GC; + p->flags &= ~F_DELAY_GC; if (ERTS_IS_GC_DESIRED(p)) { /* * We want to GC soon but we leave a few @@ -519,7 +519,7 @@ Eterm hipe_check_get_msg(Process *c_p) { ErtsMessage *msgp; - c_p->flags |= F_DISABLE_GC; + c_p->flags |= F_DELAY_GC; next_message: @@ -541,7 +541,7 @@ Eterm hipe_check_get_msg(Process *c_p) /* XXX: BEAM doesn't need this */ c_p->hipe_smp.have_receive_locks = 1; #endif - c_p->flags &= ~F_DISABLE_GC; + c_p->flags &= ~F_DELAY_GC; return THE_NON_VALUE; #ifdef ERTS_SMP } diff --git a/erts/emulator/test/Makefile b/erts/emulator/test/Makefile index 77614d455c..8cc47937b7 100644 --- a/erts/emulator/test/Makefile +++ b/erts/emulator/test/Makefile @@ -79,6 +79,7 @@ MODULES= \ node_container_SUITE \ nofrag_SUITE \ num_bif_SUITE \ + message_queue_data_SUITE \ op_SUITE \ port_SUITE \ port_bif_SUITE \ diff --git a/erts/emulator/test/erts_debug_SUITE.erl b/erts/emulator/test/erts_debug_SUITE.erl index 35677f9953..bbba829501 100644 --- a/erts/emulator/test/erts_debug_SUITE.erl +++ b/erts/emulator/test/erts_debug_SUITE.erl @@ -24,13 +24,13 @@ -export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, init_per_group/2,end_per_group/2, init_per_testcase/2,end_per_testcase/2, - test_size/1,flat_size_big/1,df/1, + test_size/1,flat_size_big/1,df/1,term_type/1, instructions/1]). suite() -> [{ct_hooks,[ts_install_cth]}]. all() -> - [test_size, flat_size_big, df, instructions]. + [test_size, flat_size_big, df, instructions, term_type]. groups() -> []. @@ -138,6 +138,47 @@ flat_size_big_1(Term, Size0, Limit) when Size0 < Limit -> end; flat_size_big_1(_, _, _) -> ok. + +term_type(Config) when is_list(Config) -> + Ts = [{fixnum, 1}, + {fixnum, -1}, + {bignum, 1 bsl 300}, + {bignum, -(1 bsl 300)}, + {hfloat, 0.0}, + {hfloat, 0.0/-1}, + {hfloat, 1.0/(1 bsl 302)}, + {hfloat, 1.0*(1 bsl 302)}, + {hfloat, -1.0/(1 bsl 302)}, + {hfloat, -1.0*(1 bsl 302)}, + {hfloat, 3.1416}, + {hfloat, 1.0e18}, + {hfloat, -3.1416}, + {hfloat, -1.0e18}, + + {heap_binary, <<1,2,3>>}, + {refc_binary, <<0:(8*80)>>}, + {sub_binary, <<5:7>>}, + + {flatmap, #{ a => 1}}, + {hashmap, maps:from_list([{I,I}||I <- lists:seq(1,76)])}, + + {list, [1,2,3]}, + {nil, []}, + {tuple, {1,2,3}}, + {tuple, {}}, + + {export, fun lists:sort/1}, + {'fun', fun() -> ok end}, + {pid, self()}, + {atom, atom}], + lists:foreach(fun({E,Val}) -> + R = erts_internal:term_type(Val), + io:format("expecting term type ~w, got ~w (~p)~n", [E,R,Val]), + E = R + end, Ts), + ok. + + df(Config) when is_list(Config) -> P0 = pps(), PrivDir = ?config(priv_dir, Config), diff --git a/erts/emulator/test/map_SUITE.erl b/erts/emulator/test/map_SUITE.erl index a256cf4195..1d0dc9e9ae 100644 --- a/erts/emulator/test/map_SUITE.erl +++ b/erts/emulator/test/map_SUITE.erl @@ -2598,7 +2598,7 @@ hashmap_balance(KeyFun) -> F = fun(I, {M0,Max0}) -> Key = KeyFun(I), M1 = M0#{Key => Key}, - Max1 = case erts_internal:map_type(M1) of + Max1 = case erts_internal:term_type(M1) of hashmap -> Nodes = hashmap_nodes(M1), Avg = maps:size(M1) * 0.4, @@ -3006,7 +3006,7 @@ t_gc_rare_map_overflow(Config) -> Loop() end), FatMap = fatmap(34), - false = (flatmap =:= erts_internal:map_type(FatMap)), + false = (flatmap =:= erts_internal:term_type(FatMap)), t_gc_rare_map_overflow_do(Echo, FatMap, fun() -> erlang:garbage_collect() end), diff --git a/erts/emulator/test/message_queue_data_SUITE.erl b/erts/emulator/test/message_queue_data_SUITE.erl new file mode 100644 index 0000000000..11481409aa --- /dev/null +++ b/erts/emulator/test/message_queue_data_SUITE.erl @@ -0,0 +1,239 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2014. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(message_queue_data_SUITE). + +-export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, + init_per_group/2,end_per_group/2, + init_per_testcase/2,end_per_testcase/2]). +-export([basic/1, process_info_messages/1]). + +-export([basic_test/1]). + +-include_lib("test_server/include/test_server.hrl"). + +init_per_testcase(Case, Config) -> + ?line Dog=test_server:timetrap(test_server:minutes(2)), + [{watchdog, Dog}, {testcase, Case}|Config]. + +end_per_testcase(_, Config) -> + Dog=?config(watchdog, Config), + test_server:timetrap_cancel(Dog), + ok. + +suite() -> [{ct_hooks,[ts_install_cth]}]. + +all() -> + [basic, process_info_messages]. + +groups() -> + []. + +init_per_suite(Config) -> +%% erts_debug:set_internal_state(available_internal_state, true), + Config. + +end_per_suite(_Config) -> +%% erts_debug:set_internal_state(available_internal_state, false), + ok. + +init_per_group(_GroupName, Config) -> + Config. + +end_per_group(_GroupName, Config) -> + Config. + +%% +%% +%% Test cases +%% +%% + +basic(Config) when is_list(Config) -> + + basic_test(erlang:system_info(message_queue_data)), + + {ok, Node1} = start_node(Config, "+xmqd off_heap"), + ok = rpc:call(Node1, ?MODULE, basic_test, [off_heap]), + stop_node(Node1), + + {ok, Node2} = start_node(Config, "+xmqd on_heap"), + ok = rpc:call(Node2, ?MODULE, basic_test, [on_heap]), + stop_node(Node2), + + {ok, Node3} = start_node(Config, "+xmqd mixed"), + ok = rpc:call(Node3, ?MODULE, basic_test, [mixed]), + stop_node(Node3), + + ok. + +is_valid_mqd_value(off_heap) -> + true; +is_valid_mqd_value(on_heap) -> + true; +is_valid_mqd_value(mixed) -> + true; +is_valid_mqd_value(_) -> + false. + + +basic_test(Default) -> + + Default = erlang:system_info(message_queue_data), + true = is_valid_mqd_value(Default), + + {message_queue_data, Default} = process_info(self(), message_queue_data), + Default = process_flag(message_queue_data, off_heap), + {message_queue_data, off_heap} = process_info(self(), message_queue_data), + off_heap = process_flag(message_queue_data, on_heap), + {message_queue_data, on_heap} = process_info(self(), message_queue_data), + on_heap = process_flag(message_queue_data, mixed), + {message_queue_data, mixed} = process_info(self(), message_queue_data), + mixed = process_flag(message_queue_data, Default), + {'EXIT', _} = (catch process_flag(message_queue_data, blupp)), + + P1 = spawn_opt(fun () -> receive after infinity -> ok end end, + [link]), + {message_queue_data, Default} = process_info(P1, message_queue_data), + unlink(P1), + exit(P1, bye), + + P2 = spawn_opt(fun () -> receive after infinity -> ok end end, + [link, {message_queue_data, off_heap}]), + {message_queue_data, off_heap} = process_info(P2, message_queue_data), + unlink(P2), + exit(P2, bye), + + P3 = spawn_opt(fun () -> receive after infinity -> ok end end, + [link, {message_queue_data, on_heap}]), + {message_queue_data, on_heap} = process_info(P3, message_queue_data), + unlink(P3), + exit(P3, bye), + + P4 = spawn_opt(fun () -> receive after infinity -> ok end end, + [link, {message_queue_data, mixed}]), + {message_queue_data, mixed} = process_info(P4, message_queue_data), + unlink(P4), + exit(P4, bye), + + {'EXIT', _} = (catch spawn_opt(fun () -> receive after infinity -> ok end end, + [link, {message_queue_data, blapp}])), + + ok. + +process_info_messages(Config) when is_list(Config) -> + Tester = self(), + P1 = spawn_opt(fun () -> + receive after 500 -> ok end, + mixed = process_flag(message_queue_data, off_heap), + Tester ! first, + receive after 500 -> ok end, + off_heap = process_flag(message_queue_data, on_heap), + Tester ! second, + receive after 500 -> ok end, + on_heap = process_flag(message_queue_data, mixed), + Tester ! third, + receive after 500 -> ok end, + mixed = process_flag(message_queue_data, off_heap), + Tester ! fourth, + + receive after infinity -> ok end + end, + [link, {message_queue_data, mixed}]), + + P1 ! "A", + receive first -> ok end, + P1 ! "B", + receive second -> ok end, + P1 ! "C", + receive third -> ok end, + P1 ! "D", + receive fourth -> ok end, + P1 ! "E", + + {messages, ["A", "B", "C", "D", "E"]} = process_info(P1, messages), + + P2 = spawn_opt(fun () -> + receive after 500 -> ok end, + mixed = process_flag(message_queue_data, off_heap), + Tester ! first, + receive after 500 -> ok end, + off_heap = process_flag(message_queue_data, on_heap), + Tester ! second, + receive after 500 -> ok end, + on_heap = process_flag(message_queue_data, mixed), + Tester ! third, + receive after 500 -> ok end, + mixed = process_flag(message_queue_data, off_heap), + Tester ! fourth, + receive after 500 -> ok end, + + Tester ! process_info(self(), messages), + + receive M1 -> M1 = "A" end, + receive M2 -> M2 = "B" end, + receive M3 -> M3 = "C" end, + receive M4 -> M4 = "D" end, + receive M5 -> M5 = "E" end, + + Tester ! self() + end, + [link, {message_queue_data, mixed}]), + + P2 ! "A", + receive first -> ok end, + P2 ! "B", + receive second -> ok end, + P2 ! "C", + receive third -> ok end, + P2 ! "D", + receive fourth -> ok end, + P2 ! "E", + + receive + Msg -> + {messages, ["A", "B", "C", "D", "E"]} = Msg + end, + + receive P2 -> ok end, + + ok. + +%% +%% +%% helpers +%% +%% + +start_node(Config) -> + start_node(Config, []). +start_node(Config, Opts) when is_list(Config), is_list(Opts) -> + Pa = filename:dirname(code:which(?MODULE)), + Name = list_to_atom(atom_to_list(?MODULE) + ++ "-" + ++ atom_to_list(?config(testcase, Config)) + ++ "-" + ++ integer_to_list(erlang:system_time(seconds)) + ++ "-" + ++ integer_to_list(erlang:unique_integer([positive]))), + ?t:start_node(Name, slave, [{args, Opts++" -pa "++Pa}]). + +stop_node(Node) -> + ?t:stop_node(Node). diff --git a/erts/etc/common/erlexec.c b/erts/etc/common/erlexec.c index 7b0fe46a01..f21671e837 100644 --- a/erts/etc/common/erlexec.c +++ b/erts/etc/common/erlexec.c @@ -158,7 +158,7 @@ static char *plusr_val_switches[] = { /* +x arguments with values */ static char *plusx_val_switches[] = { - "ohmq", + "mqd", NULL }; diff --git a/erts/preloaded/ebin/erl_prim_loader.beam b/erts/preloaded/ebin/erl_prim_loader.beam Binary files differindex df12c6f8e0..e94a1ba796 100644 --- a/erts/preloaded/ebin/erl_prim_loader.beam +++ b/erts/preloaded/ebin/erl_prim_loader.beam diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam Binary files differindex 4f35928db2..632defdb46 100644 --- a/erts/preloaded/ebin/erlang.beam +++ b/erts/preloaded/ebin/erlang.beam diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam Binary files differindex dc8c711e1a..fd0a502d2c 100644 --- a/erts/preloaded/ebin/erts_internal.beam +++ b/erts/preloaded/ebin/erts_internal.beam diff --git a/erts/preloaded/ebin/init.beam b/erts/preloaded/ebin/init.beam Binary files differindex 73dfb3d351..60c08819eb 100644 --- a/erts/preloaded/ebin/init.beam +++ b/erts/preloaded/ebin/init.beam diff --git a/erts/preloaded/ebin/otp_ring0.beam b/erts/preloaded/ebin/otp_ring0.beam Binary files differindex 33c112f4de..04814c091b 100644 --- a/erts/preloaded/ebin/otp_ring0.beam +++ b/erts/preloaded/ebin/otp_ring0.beam diff --git a/erts/preloaded/ebin/prim_eval.beam b/erts/preloaded/ebin/prim_eval.beam Binary files differindex ebca6e7eea..7779c8374d 100644 --- a/erts/preloaded/ebin/prim_eval.beam +++ b/erts/preloaded/ebin/prim_eval.beam diff --git a/erts/preloaded/ebin/prim_file.beam b/erts/preloaded/ebin/prim_file.beam Binary files differindex e8817d183e..254b0e5b90 100644 --- a/erts/preloaded/ebin/prim_file.beam +++ b/erts/preloaded/ebin/prim_file.beam diff --git a/erts/preloaded/ebin/prim_inet.beam b/erts/preloaded/ebin/prim_inet.beam Binary files differindex 357bcd3d9a..b7cfe26462 100644 --- a/erts/preloaded/ebin/prim_inet.beam +++ b/erts/preloaded/ebin/prim_inet.beam diff --git a/erts/preloaded/ebin/prim_zip.beam b/erts/preloaded/ebin/prim_zip.beam Binary files differindex 969239be98..6b5c6195c8 100644 --- a/erts/preloaded/ebin/prim_zip.beam +++ b/erts/preloaded/ebin/prim_zip.beam diff --git a/erts/preloaded/ebin/zlib.beam b/erts/preloaded/ebin/zlib.beam Binary files differindex 281f668f8c..43d7b436be 100644 --- a/erts/preloaded/ebin/zlib.beam +++ b/erts/preloaded/ebin/zlib.beam diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 0d5176019f..8ebb92d5b2 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -2040,6 +2040,9 @@ open_port(_PortName,_PortSettings) -> -type priority_level() :: low | normal | high | max. +-type message_queue_data() :: + off_heap | on_heap | mixed. + -spec process_flag(trap_exit, Boolean) -> OldBoolean when Boolean :: boolean(), OldBoolean :: boolean(); @@ -2052,9 +2055,9 @@ open_port(_PortName,_PortSettings) -> (min_bin_vheap_size, MinBinVHeapSize) -> OldMinBinVHeapSize when MinBinVHeapSize :: non_neg_integer(), OldMinBinVHeapSize :: non_neg_integer(); - (off_heap_message_queue, OHMQ) -> OldOHMQ when - OHMQ :: boolean(), - OldOHMQ :: boolean(); + (message_queue_data, MQD) -> OldMQD when + MQD :: message_queue_data(), + OldMQD :: message_queue_data(); (priority, Level) -> OldLevel when Level :: priority_level(), OldLevel :: priority_level(); @@ -2093,7 +2096,7 @@ process_flag(_Flag, _Value) -> min_bin_vheap_size | monitored_by | monitors | - off_heap_message_queue | + message_queue_data | priority | reductions | registered_name | @@ -2135,7 +2138,7 @@ process_flag(_Flag, _Value) -> {monitors, Monitors :: [{process, Pid :: pid() | {RegName :: atom(), Node :: node()}}]} | - {off_heap_message_queue, OHMQ :: boolean()} | + {message_queue_data, MQD :: message_queue_data()} | {priority, Level :: priority_level()} | {reductions, Number :: non_neg_integer()} | {registered_name, Atom :: atom()} | @@ -2438,7 +2441,7 @@ tuple_to_list(_Tuple) -> (multi_scheduling) -> disabled | blocked | enabled; (multi_scheduling_blockers) -> [Pid :: pid()]; (nif_version) -> string(); - (off_heap_message_queue) -> boolean(); + (message_queue_data) -> message_queue_data(); (otp_release) -> string(); (os_monotonic_time_source) -> [{atom(),term()}]; (os_system_time_source) -> [{atom(),term()}]; @@ -2574,7 +2577,7 @@ spawn_monitor(M, F, A) -> | {fullsweep_after, Number :: non_neg_integer()} | {min_heap_size, Size :: non_neg_integer()} | {min_bin_vheap_size, VSize :: non_neg_integer()} - | {off_heap_message_queue, OHMQ :: boolean()}. + | {message_queue_data, MQD :: message_queue_data()}. -spec spawn_opt(Fun, Options) -> pid() | {pid(), reference()} when Fun :: function(), diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index 7ed4efea4b..ce0a6a1d9e 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -31,7 +31,7 @@ -export([await_port_send_result/3]). -export([cmp_term/2]). --export([map_to_tuple_keys/1, map_type/1, map_hashmap_children/1]). +-export([map_to_tuple_keys/1, term_type/1, map_hashmap_children/1]). -export([port_command/3, port_connect/2, port_close/1, port_control/3, port_call/3, port_info/1, port_info/2]). @@ -215,12 +215,18 @@ cmp_term(_A,_B) -> map_to_tuple_keys(_M) -> erlang:nif_error(undefined). -%% return the internal map type --spec map_type(M) -> Type when - M :: map(), - Type :: 'flatmap' | 'hashmap' | 'hashmap_node'. - -map_type(_M) -> +%% return the internal term type +-spec term_type(T) -> Type when + T :: term(), + Type :: 'flatmap' | 'hashmap' | 'hashmap_node' + | 'fixnum' | 'bignum' | 'hfloat' + | 'list' | 'tuple' | 'export' | 'fun' + | 'refc_binary' | 'heap_binary' | 'sub_binary' + | 'reference' | 'external_reference' + | 'pid' | 'external_pid' | 'port' | 'external_port' + | 'atom' | 'catch' | 'nil'. + +term_type(_T) -> erlang:nif_error(undefined). %% return the internal hashmap sub-nodes from |