40 files changed, 2549 insertions, 1981 deletions
diff --git a/erts/doc/src/erl.xml b/erts/doc/src/erl.xml index b0322b7d43..8c1be4dff5 100644 --- a/erts/doc/src/erl.xml +++ b/erts/doc/src/erl.xml @@ -1335,6 +1335,21 @@ <seealso marker="kernel:error_logger#warning_map/0">error_logger(3)</seealso> for further information.</p> </item> + <tag><c><![CDATA[+xFlag Value]]></c></tag> + <item> + <p>Default process flag settings.</p> + <taglist> + <tag><marker id="+xohmq"><c>+xohmq true|false</c></marker></tag> + <item><p> + Sets the default value for the process flag + <c>off_heap_message_queue</c>. If <c>+xohmq</c> is not + passed, <c>false</c> will be the default. For more information, + see the documentation of + <seealso marker="erlang#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue, + OHMQ)</c></seealso>. + </p></item> + </taglist> + </item> <tag><c><![CDATA[+zFlag Value]]></c></tag> <item> <p>Miscellaneous flags.</p> diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index e77532463e..9426d30390 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -4058,8 +4058,46 @@ os_prompt% </pre> process.</p> <p>Returns the old value of the flag.</p> </desc> </func> + <marker id="process_flag_off_heap_message_queue"/> <func> <name name="process_flag" arity="2" clause_i="5"/> + <fsummary>Set process flag <c>off_heap_message_queue</c> for the calling process</fsummary> + <desc> + <p>This flag determines how messages in the message queue + are stored. When the flag is:</p> + <taglist> + <tag><c>true</c></tag> + <item><p> + <em>All</em> messages in the message queue will be stored + outside of the process heap. This implies that <em>no</em> + messages in the message queue will be part of a garbage + collection of the process. + </p></item> + <tag><c>false</c></tag> + <item><p> + Messages may be placed either on the heap or outside + of the heap. + </p></item> + </taglist> + <p> + If the process potentially can get a huge number of messages, + you are advised to set the flag to <c>true</c>, since + a garbage collection with many messages placed on the heap + can become extremely expensive. However, performance of the actual + message passing is generally better when the + flag is set to <c>false</c>. + </p> + <p> + When changing this flag from <c>false</c> to <c>true</c>, + all messages in the message queue are moved off heap. This + work is initiated, but not necessarily completed, when this function + call returns. + </p> + <p>Returns the old value of the flag.</p> + </desc> + </func> + <func> + <name name="process_flag" arity="2" clause_i="6"/> <type name="priority_level"/> <fsummary>Set process flag priority for the calling process</fsummary> <desc> @@ -4138,7 +4176,7 @@ os_prompt% </pre> </desc> </func> <func> - <name name="process_flag" arity="2" clause_i="6"/> + <name name="process_flag" arity="2" clause_i="7"/> <fsummary>Set process flag save_calls for the calling process</fsummary> <desc> <p><c><anno>N</anno></c> must be an integer in the interval 0..10000. @@ -4162,7 +4200,7 @@ os_prompt% </pre> </desc> </func> <func> - <name name="process_flag" arity="2" clause_i="7"/> + <name name="process_flag" arity="2" clause_i="8"/> <fsummary>Set process flag sensitive for the calling process</fsummary> <desc> <p>Set or clear the <c>sensitive</c> flag for the current process. 
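A minimal usage sketch of the API documented above, assuming an emulator built with this patch (the flag, spawn option, and +xohmq argument names are taken from the diff; the module name ohmq_example is illustrative only):

    -module(ohmq_example).
    -export([start/0]).

    start() ->
        %% Store all messages sent to this process outside the heap, so
        %% they are never part of a garbage collection of the process.
        OldValue = process_flag(off_heap_message_queue, true),
        io:format("off_heap_message_queue was ~p~n", [OldValue]),
        %% The same setting can be given at spawn time ...
        Pid = spawn_opt(fun() -> receive stop -> ok end end,
                        [{off_heap_message_queue, true}]),
        %% ... and read back with process_info/2.
        {off_heap_message_queue, true} =
            process_info(Pid, off_heap_message_queue),
        %% The system-wide default, set with "erl +xohmq true|false":
        Default = erlang:system_info(off_heap_message_queue),
        io:format("default is ~p~n", [Default]),
        Pid ! stop,
        ok.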
@@ -4408,6 +4446,14 @@ os_prompt% </pre> monitor by name, the list item is <c>{process, {<anno>RegName</anno>, <anno>Node</anno>}}</c>.</p> </item> + <tag><c>{off_heap_message_queue, <anno>OHMQ</anno>}</c></tag> + <item> + <p>Returns the current state of the <c>off_heap_message_queue</c> + process flag. <c><anno>OHMQ</anno></c> is either <c>true</c>, or + <c>false</c>. For more information, see the documentation of + <seealso marker="#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue, + OHMQ)</c></seealso>.</p> + </item> <tag><c>{priority, Level}</c></tag> <item> <p><c><anno>Level</anno></c> is the current priority level for @@ -5067,6 +5113,7 @@ true</pre> <func> <name name="spawn_opt" arity="2"/> <type name="priority_level" /> + <type name="spawn_opt_option" /> <fsummary>Create a new process with a fun as entry point</fsummary> <desc> <p>Returns the pid of a new process started by the application @@ -5081,6 +5128,7 @@ true</pre> <func> <name name="spawn_opt" arity="3"/> <type name="priority_level" /> + <type name="spawn_opt_option" /> <fsummary>Create a new process with a fun as entry point on a given node</fsummary> <desc> <p>Returns the pid of a new process started by the application @@ -5093,6 +5141,7 @@ true</pre> <func> <name name="spawn_opt" arity="4"/> <type name="priority_level" /> + <type name="spawn_opt_option" /> <fsummary>Create a new process with a function as entry point</fsummary> <desc> <p>Works exactly like @@ -5188,6 +5237,18 @@ true</pre> fine-tuning an application and to measure the execution time with various <c><anno>VSize</anno></c> values.</p> </item> + <tag><c>{off_heap_message_queue, <anno>OHMQ</anno>}</c></tag> + <item> + <p>Sets the state of the <c>off_heap_message_queue</c> process + flag. <c><anno>OHMQ</anno></c> should be either <c>true</c>, or + <c>false</c>. The default <c>off_heap_message_queue</c> process + flag is determined by the + <seealso marker="erl#+xohmq"><c>+xohmq</c></seealso> <c>erl</c> + command line argument. For more information, see the + documentation of + <seealso marker="#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue, + <anno>OHMQ</anno>)</c></seealso>.</p> + </item> </taglist> </desc> @@ -5195,6 +5256,7 @@ true</pre> <func> <name name="spawn_opt" arity="5"/> <type name="priority_level" /> + <type name="spawn_opt_option" /> <fsummary>Create a new process with a function as entry point on a given node</fsummary> <desc> <p>Returns the pid of a new process started by the application @@ -6224,6 +6286,7 @@ ok <name name="system_info" arity="1" clause_i="65"/> <name name="system_info" arity="1" clause_i="66"/> <name name="system_info" arity="1" clause_i="67"/> + <name name="system_info" arity="1" clause_i="68"/> <fsummary>Information about the system</fsummary> <desc> <p>Returns various information about the current system @@ -6614,6 +6677,16 @@ ok <p>Returns a string containing the erlang NIF version used by the runtime system. It will be on the form "<major ver>.<minor ver>".</p> </item> + <tag><marker id="system_info_off_heap_message_queue"><c>off_heap_message_queue</c></marker></tag> + <item> + <p>Returns the default value of the <c>off_heap_message_queue</c> + process flag which is either <c>true</c> or <c>false</c>. This + default is set by the <c>erl</c> command line argument + <seealso marker="erl#+xohmq"><c>+xohmq</c></seealso>. 
For more information on the + <c>off_heap_message_queue</c> process flag, see documentation of + <seealso marker="#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue, + OHMQ)</c></seealso>.</p> + </item> <tag><marker id="system_info_otp_release"><c>otp_release</c></marker></tag> <item> <p>Returns a string containing the OTP release number of the diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 6328b3d18f..f142cf1142 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -423,6 +423,7 @@ atom notify atom notsup atom nouse_stdio atom objects +atom off_heap_message_queue atom offset atom ok atom old_heap_block_size diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index 2c275c4649..22b4e26c77 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -723,29 +723,50 @@ set_default_trace_pattern(Eterm module) } } +static ERTS_INLINE int +check_mod_funs(Process *p, ErlOffHeap *off_heap, char *area, size_t area_size) +{ + struct erl_off_heap_header* oh; + for (oh = off_heap->first; oh; oh = oh->next) { + if (thing_subtag(oh->thing_word) == FUN_SUBTAG) { + ErlFunThing* funp = (ErlFunThing*) oh; + if (ErtsInArea(funp->fe->address, area, area_size)) + return !0; + } + } + return 0; +} + + static Eterm check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp) { BeamInstr* start; char* literals; Uint lit_bsize; - BeamInstr* end; + char* mod_start; + Uint mod_size; Eterm* sp; - struct erl_off_heap_header* oh; int done_gc = 0; + int need_gc = 0; + ErtsMessage *msgp; + ErlHeapFragment *hfrag; -#define INSIDE(a) (start <= (a) && (a) < end) +#define ERTS_ORDINARY_GC__ (1 << 0) +#define ERTS_LITERAL_GC__ (1 << 1) /* * Pick up limits for the module. */ start = (BeamInstr*) modp->old.code_hdr; - end = (BeamInstr *)((char *)start + modp->old.code_length); + mod_start = (char *) start; + mod_size = modp->old.code_length; /* * Check if current instruction or continuation pointer points into module. */ - if (INSIDE(rp->i) || INSIDE(rp->cp)) { + if (ErtsInArea(rp->i, mod_start, mod_size) + || ErtsInArea(rp->cp, mod_start, mod_size)) { return am_true; } @@ -753,7 +774,7 @@ check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp) * Check all continuation pointers stored on the stack. */ for (sp = rp->stop; sp < STACK_START(rp); sp++) { - if (is_CP(*sp) && INSIDE(cp_val(*sp))) { + if (is_CP(*sp) && ErtsInArea(cp_val(*sp), mod_start, mod_size)) { return am_true; } } @@ -767,15 +788,15 @@ check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp) struct StackTrace *s; ASSERT(is_list(rp->ftrace)); s = (struct StackTrace *) big_val(CDR(list_val(rp->ftrace))); - if ((s->pc && INSIDE(s->pc)) || - (s->current && INSIDE(s->current))) { + if ((s->pc && ErtsInArea(s->pc, mod_start, mod_size)) || + (s->current && ErtsInArea(s->current, mod_start, mod_size))) { rp->freason = EXC_NULL; rp->fvalue = NIL; rp->ftrace = NIL; } else { int i; for (i = 0; i < s->depth; i++) { - if (INSIDE(s->trace[i])) { + if (ErtsInArea(s->trace[i], mod_start, mod_size)) { rp->freason = EXC_NULL; rp->fvalue = NIL; rp->ftrace = NIL; @@ -796,108 +817,141 @@ check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp) } /* - * See if there are funs that refer to the old version of the module. + * Message queue can contains funs, but (at least currently) no + * constants. 
If we got references to this module from the message + * queue, a GC cannot remove these... */ - rescan: - for (oh = MSO(rp).first; oh; oh = oh->next) { - if (thing_subtag(oh->thing_word) == FUN_SUBTAG) { - ErlFunThing* funp = (ErlFunThing*) oh; + erts_smp_proc_lock(rp, ERTS_PROC_LOCK_MSGQ); + ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp); + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MSGQ); - if (INSIDE((BeamInstr *) funp->fe->address)) { - if (done_gc) { - return am_true; - } else { - if (!allow_gc) - return am_aborted; - /* - * Try to get rid of this fun by garbage collecting. - * Clear both fvalue and ftrace to make sure they - * don't hold any funs. - */ - rp->freason = EXC_NULL; - rp->fvalue = NIL; - rp->ftrace = NIL; - done_gc = 1; - FLAGS(rp) |= F_NEED_FULLSWEEP; - *redsp += erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); - goto rescan; - } - } + for (msgp = rp->msg.first; msgp; msgp = msgp->next) { + if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) + hfrag = &msgp->hfrag; + else if (is_value(ERL_MESSAGE_TERM(msgp)) && msgp->data.heap_frag) + hfrag = msgp->data.heap_frag; + else + continue; + for (; hfrag; hfrag = hfrag->next) { + if (check_mod_funs(rp, &hfrag->off_heap, mod_start, mod_size)) + return am_true; + /* Should not contain any constants... */ + ASSERT(!any_heap_ref_ptrs(&hfrag->mem[0], + &hfrag->mem[hfrag->used_size], + mod_start, + mod_size)); } } - /* - * See if there are constants inside the module referenced by the process. - */ - done_gc = 0; literals = (char*) modp->old.code_hdr->literals_start; lit_bsize = (char*) modp->old.code_hdr->literals_end - literals; - for (;;) { - ErlMessage* mp; + while (1) { + + /* Check heap, stack etc... */ + if (check_mod_funs(rp, &rp->off_heap, mod_start, mod_size)) + goto try_gc; if (any_heap_ref_ptrs(&rp->fvalue, &rp->fvalue+1, literals, lit_bsize)) { rp->freason = EXC_NULL; rp->fvalue = NIL; rp->ftrace = NIL; } - if (any_heap_ref_ptrs(rp->stop, rp->hend, literals, lit_bsize)) { - goto need_gc; - } - if (any_heap_refs(rp->heap, rp->htop, literals, lit_bsize)) { - goto need_gc; - } + if (any_heap_ref_ptrs(rp->stop, rp->hend, literals, lit_bsize)) + goto try_literal_gc; + if (any_heap_refs(rp->heap, rp->htop, literals, lit_bsize)) + goto try_literal_gc; + if (any_heap_refs(rp->old_heap, rp->old_htop, literals, lit_bsize)) + goto try_literal_gc; + + /* Check dictionary */ + if (rp->dictionary) { + Eterm* start = rp->dictionary->data; + Eterm* end = start + rp->dictionary->used; - if (any_heap_refs(rp->old_heap, rp->old_htop, literals, lit_bsize)) { - goto need_gc; + if (any_heap_ref_ptrs(start, end, literals, lit_bsize)) + goto try_literal_gc; } - if (rp->dictionary != NULL) { - Eterm* start = rp->dictionary->data; - Eterm* end = start + rp->dictionary->used; + /* Check heap fragments */ + for (hfrag = rp->mbuf; hfrag; hfrag = hfrag->next) { + Eterm *hp, *hp_end; + /* Off heap lists should already have been moved into process */ + ASSERT(!check_mod_funs(rp, &hfrag->off_heap, mod_start, mod_size)); - if (any_heap_ref_ptrs(start, end, literals, lit_bsize)) { - goto need_gc; - } + hp = &hfrag->mem[0]; + hp_end = &hfrag->mem[hfrag->used_size]; + if (any_heap_ref_ptrs(hp, hp_end, mod_start, lit_bsize)) + goto try_literal_gc; } - for (mp = rp->msg.first; mp != NULL; mp = mp->next) { - if (any_heap_ref_ptrs(mp->m, mp->m+2, literals, lit_bsize)) { - goto need_gc; +#ifdef DEBUG + /* + * Message buffer fragments should not have any references + * to constants, and off heap lists should already have + * been moved into process off heap structure. 
+ */ + for (msgp = rp->msg_frag; msgp; msgp = msgp->next) { + if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) + hfrag = &msgp->hfrag; + else + hfrag = msgp->data.heap_frag; + for (; hfrag; hfrag = hfrag->next) { + Eterm *hp, *hp_end; + ASSERT(!check_mod_funs(rp, &hfrag->off_heap, mod_start, mod_size)); + + hp = &hfrag->mem[0]; + hp_end = &hfrag->mem[hfrag->used_size]; + ASSERT(!any_heap_ref_ptrs(hp, hp_end, mod_start, lit_bsize)); } } - break; - need_gc: - if (done_gc) { +#endif + + return am_false; + + try_literal_gc: + need_gc |= ERTS_LITERAL_GC__; + + try_gc: + need_gc |= ERTS_ORDINARY_GC__; + + if ((done_gc & need_gc) == need_gc) return am_true; - } else { - struct erl_off_heap_header* oh; - if (!allow_gc) - return am_aborted; + if (!allow_gc) + return am_aborted; - /* - * Try to get rid of constants by by garbage collecting. - * Clear both fvalue and ftrace. - */ - rp->freason = EXC_NULL; - rp->fvalue = NIL; - rp->ftrace = NIL; - done_gc = 1; + need_gc &= ~done_gc; + + /* + * Try to get rid of constants by by garbage collecting. + * Clear both fvalue and ftrace. + */ + + rp->freason = EXC_NULL; + rp->fvalue = NIL; + rp->ftrace = NIL; + + if (need_gc & ERTS_ORDINARY_GC__) { FLAGS(rp) |= F_NEED_FULLSWEEP; *redsp += erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity); + done_gc |= ERTS_ORDINARY_GC__; + } + if (need_gc & ERTS_LITERAL_GC__) { + struct erl_off_heap_header* oh; oh = modp->old.code_hdr->literals_off_heap; *redsp += lit_bsize / 64; /* Need, better value... */ erts_garbage_collect_literals(rp, (Eterm*)literals, lit_bsize, oh); + done_gc |= ERTS_LITERAL_GC__; } + need_gc = 0; } - return am_false; -#undef INSIDE -} -#define in_area(ptr,start,nbytes) \ - ((UWord)((char*)(ptr) - (char*)(start)) < (nbytes)) +#undef ERTS_ORDINARY_GC__ +#undef ERTS_LITERAL_GC__ + +} static int any_heap_ref_ptrs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size) @@ -910,7 +964,7 @@ any_heap_ref_ptrs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size) switch (primary_tag(val)) { case TAG_PRIMARY_BOXED: case TAG_PRIMARY_LIST: - if (in_area(val, mod_start, mod_size)) { + if (ErtsInArea(val, mod_start, mod_size)) { return 1; } break; @@ -930,7 +984,7 @@ any_heap_refs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size) switch (primary_tag(val)) { case TAG_PRIMARY_BOXED: case TAG_PRIMARY_LIST: - if (in_area(val, mod_start, mod_size)) { + if (ErtsInArea(val, mod_start, mod_size)) { return 1; } break; @@ -940,7 +994,7 @@ any_heap_refs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size) if (header_is_bin_matchstate(val)) { ErlBinMatchState *ms = (ErlBinMatchState*) p; ErlBinMatchBuffer *mb = &(ms->mb); - if (in_area(mb->orig, mod_start, mod_size)) { + if (ErtsInArea(mb->orig, mod_start, mod_size)) { return 1; } } @@ -953,8 +1007,6 @@ any_heap_refs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size) return 0; } -#undef in_area - BIF_RETTYPE purge_module_1(BIF_ALIST_1) { ErtsCodeIndex code_ix; diff --git a/erts/emulator/beam/beam_emu.c b/erts/emulator/beam/beam_emu.c index 4d19f52a52..1dd56ff989 100644 --- a/erts/emulator/beam/beam_emu.c +++ b/erts/emulator/beam/beam_emu.c @@ -1654,10 +1654,6 @@ void process_main(void) ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p); PROCESS_MAIN_CHK_LOCKS(c_p); - if (c_p->mbuf || MSO(c_p).overhead >= BIN_VHEAP_SZ(c_p)) { - result = erts_gc_after_bif_call(c_p, result, reg, 2); - E = c_p->stop; - } HTOP = HEAP_TOP(c_p); FCALLS = c_p->fcalls; if (is_value(result)) { @@ -1745,8 +1741,7 @@ void process_main(void) SWAPIN; } /* 
only x(2) is included in the rootset here */ - if (E - HTOP < 3 || c_p->mbuf) { /* Force GC in case add_stacktrace() - * created heap fragments */ + if (E - HTOP < 3) { SWAPOUT; PROCESS_MAIN_CHK_LOCKS(c_p); FCALLS -= erts_garbage_collect(c_p, 3, reg+2, 1); @@ -1833,10 +1828,17 @@ void process_main(void) OpCase(i_loop_rec_f): { BeamInstr *next; - ErlMessage* msgp; + ErtsMessage* msgp; - loop_rec__: + /* + * We need to disable GC while matching messages + * in the queue. This since messages with data outside + * the heap will be corrupted by a GC. + */ + ASSERT(!(c_p->flags & F_DISABLE_GC)); + c_p->flags |= F_DISABLE_GC; + loop_rec__: PROCESS_MAIN_CHK_LOCKS(c_p); msgp = PEEK_MESSAGE(c_p); @@ -1848,6 +1850,7 @@ void process_main(void) if (ERTS_PROC_PENDING_EXIT(c_p)) { erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); SWAPOUT; + c_p->flags &= ~F_DISABLE_GC; goto do_schedule; /* Will be rescheduled for exit */ } ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p); @@ -1857,30 +1860,27 @@ void process_main(void) else #endif { + c_p->flags &= ~F_DISABLE_GC; SET_I((BeamInstr *) Arg(0)); Goto(*I); /* Jump to a wait or wait_timeout instruction */ } } - ErtsMoveMsgAttachmentIntoProc(msgp, c_p, E, HTOP, FCALLS, - { - SWAPOUT; - PROCESS_MAIN_CHK_LOCKS(c_p); - }, - { - ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); - PROCESS_MAIN_CHK_LOCKS(c_p); - SWAPIN; - }); if (is_non_value(ERL_MESSAGE_TERM(msgp))) { - /* - * A corrupt distribution message that we weren't able to decode; - * remove it... - */ - ASSERT(!msgp->data.attached); - /* TODO: Add DTrace probe for this bad message situation? */ - UNLINK_MESSAGE(c_p, msgp); - free_message(msgp); - goto loop_rec__; + SWAPOUT; /* erts_decode_dist_message() may write to heap... */ + if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) { + /* + * A corrupt distribution message that we weren't able to decode; + * remove it... + */ + /* No swapin should be needed */ + ASSERT(HTOP == c_p->htop && E == c_p->stop); + /* TODO: Add DTrace probe for this bad message situation? */ + UNLINK_MESSAGE(c_p, msgp); + msgp->next = NULL; + erts_cleanup_messages(msgp); + goto loop_rec__; + } + SWAPIN; } PreFetch(1, next); r(0) = ERL_MESSAGE_TERM(msgp); @@ -1892,8 +1892,7 @@ void process_main(void) */ OpCase(remove_message): { BeamInstr *next; - ErlMessage* msgp; - + ErtsMessage* msgp; PROCESS_MAIN_CHK_LOCKS(c_p); PreFetch(0, next); @@ -1988,11 +1987,21 @@ void process_main(void) UNLINK_MESSAGE(c_p, msgp); JOIN_MESSAGE(c_p); CANCEL_TIMER(c_p); - free_message(msgp); + + erts_save_message_in_proc(c_p, msgp); + c_p->flags &= ~F_DISABLE_GC; + + if (ERTS_IS_GC_DESIRED_INTERNAL(c_p, HTOP, E)) { + /* + * We want to GC soon but we leave a few + * reductions giving the message some time + * to turn into garbage. + */ + ERTS_VBUMP_LEAVE_REDS_INTERNAL(c_p, 5, FCALLS); + } ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); PROCESS_MAIN_CHK_LOCKS(c_p); - NextPF(0, next); } @@ -2001,9 +2010,22 @@ void process_main(void) * message didn't match), then jump to the loop_rec instruction. */ OpCase(loop_rec_end_f): { + + ASSERT(c_p->flags & F_DISABLE_GC); + SET_I((BeamInstr *) Arg(0)); SAVE_MESSAGE(c_p); - goto loop_rec__; + if (FCALLS > 0 || FCALLS > neg_o_reds) { + FCALLS--; + goto loop_rec__; + } + + c_p->flags &= ~F_DISABLE_GC; + c_p->i = I; + SWAPOUT; + c_p->arity = 0; + c_p->current = NULL; + goto do_schedule; } /* * Prepare to wait for a message or a timeout, whichever occurs first. 
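The receive-loop changes above disable garbage collection while the queue is being matched (off-heap message data must not be moved under the matcher) and yield when the reduction budget runs out. A hedged sketch, not part of the patch, of the workload this targets: a selective receive scanning a long mailbox, which with off_heap_message_queue set to true can be matched without the queued messages ever being copied by a GC of the process (mailbox_example and the message shapes are illustrative only):

    -module(mailbox_example).
    -export([run/1]).

    run(N) ->
        Pid = spawn_opt(fun server/0, [{off_heap_message_queue, true}]),
        [Pid ! {data, I} || I <- lists:seq(1, N)],
        Pid ! {sync, self()},
        receive done -> ok end.

    server() ->
        %% Selective receive: scans past every {data, _} message in the
        %% queue before the trailing {sync, From} message matches.
        receive
            {sync, From} -> From ! done
        end.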
@@ -2733,6 +2755,7 @@ do { \ Eterm (*bf)(Process*, Eterm*, BeamInstr*) = GET_BIF_ADDRESS(Arg(0)); Eterm result; BeamInstr *next; + ErlHeapFragment *live_hf_end; PRE_BIF_SWAPOUT(c_p); c_p->fcalls = FCALLS - 1; @@ -2742,17 +2765,18 @@ do { \ PreFetch(1, next); ASSERT(!ERTS_PROC_IS_EXITING(c_p)); ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); + live_hf_end = c_p->mbuf; result = (*bf)(c_p, reg, I); ASSERT(!ERTS_PROC_IS_EXITING(c_p) || is_non_value(result)); ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); ERTS_HOLE_CHECK(c_p); ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p); - PROCESS_MAIN_CHK_LOCKS(c_p); - if (c_p->mbuf || MSO(c_p).overhead >= BIN_VHEAP_SZ(c_p)) { + if (ERTS_IS_GC_DESIRED(c_p)) { Uint arity = ((Export *)Arg(0))->code[2]; - result = erts_gc_after_bif_call(c_p, result, reg, arity); + result = erts_gc_after_bif_call_lhf(c_p, live_hf_end, result, reg, arity); E = c_p->stop; } + PROCESS_MAIN_CHK_LOCKS(c_p); HTOP = HEAP_TOP(c_p); FCALLS = c_p->fcalls; if (is_value(result)) { @@ -3414,9 +3438,6 @@ do { \ goto do_schedule; } else { ASSERT(!is_value(r(0))); - if (c_p->mbuf) { - erts_garbage_collect(c_p, 0, reg+1, 3); - } SWAPIN; Goto(*I); } @@ -3440,6 +3461,7 @@ do { \ * I[3]: Function pointer to dirty NIF */ BifFunction vbf; + ErlHeapFragment *live_hf_end; DTRACE_NIF_ENTRY(c_p, (Eterm)I[-3], (Eterm)I[-2], (Uint)I[-1]); c_p->current = I-3; /* current and vbf set to please handle_error */ @@ -3455,6 +3477,7 @@ do { \ NifF* fp = vbf = (NifF*) I[1]; struct enif_environment_t env; erts_pre_nif(&env, c_p, (struct erl_module_nif*)I[2]); + live_hf_end = c_p->mbuf; nif_bif_result = (*fp)(&env, bif_nif_arity, reg); if (env.exception_thrown) nif_bif_result = THE_NON_VALUE; @@ -3497,6 +3520,7 @@ do { \ { Eterm (*bf)(Process*, Eterm*, BeamInstr*) = vbf; ASSERT(!ERTS_PROC_IS_EXITING(c_p)); + live_hf_end = c_p->mbuf; nif_bif_result = (*bf)(c_p, reg, I); ASSERT(!ERTS_PROC_IS_EXITING(c_p) || is_non_value(nif_bif_result)); @@ -3509,9 +3533,17 @@ do { \ apply_bif_or_nif_epilogue: ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p); ERTS_HOLE_CHECK(c_p); - if (c_p->mbuf) { - nif_bif_result = erts_gc_after_bif_call(c_p, nif_bif_result, - reg, bif_nif_arity); + /* + * We want to test with ERTS_IS_GC_DESIRED(c_p) in order + * to trigger gc due to binaries based on same conditions + * regardless of how the bif is called. This change will + * however be introduced in a separate commit in order to + * easier identify why the characteristics changed. + */ + if (c_p->stop - c_p->htop < c_p->mbuf_sz) { + nif_bif_result = erts_gc_after_bif_call_lhf(c_p, live_hf_end, + nif_bif_result, + reg, bif_nif_arity); } SWAPIN; /* There might have been a garbage collection. 
*/ FCALLS = c_p->fcalls; @@ -6340,13 +6372,6 @@ new_map(Process* p, Eterm* reg, BeamInstr* I) erts_factory_proc_init(&factory, p); res = erts_hashmap_from_array(&factory, thp, n/2, 0); erts_factory_close(&factory); - if (p->mbuf) { - Uint live = Arg(2); - reg[live] = res; - erts_garbage_collect(p, 0, reg, live+1); - res = reg[live]; - E = p->stop; - } return res; } @@ -6412,13 +6437,6 @@ update_map_assoc(Process* p, Eterm* reg, Eterm map, BeamInstr* I) hx = hashmap_make_hash(new_key); res = erts_hashmap_insert(p, hx, new_key, val, res, 0); - if (p->mbuf) { - Uint live = Arg(3); - reg[live] = res; - erts_garbage_collect(p, 0, reg, live+1); - res = reg[live]; - E = p->stop; - } new_p += 2; } @@ -6578,12 +6596,6 @@ update_map_assoc(Process* p, Eterm* reg, Eterm map, BeamInstr* I) /* The expensive case, need to build a hashmap */ if (n > MAP_SMALL_MAP_LIMIT) { res = erts_hashmap_from_ks_and_vs(p,flatmap_get_keys(mp),flatmap_get_values(mp),n); - if (p->mbuf) { - Uint live = Arg(3); - reg[live] = res; - erts_garbage_collect(p, 0, reg, live+1); - res = reg[live]; - } } return res; } @@ -6639,14 +6651,6 @@ update_map_exact(Process* p, Eterm* reg, Eterm map, BeamInstr* I) return res; } - if (p->mbuf) { - Uint live = Arg(3); - reg[live] = res; - erts_garbage_collect(p, 0, reg, live+1); - res = reg[live]; - E = p->stop; - } - new_p += 2; } return res; diff --git a/erts/emulator/beam/beam_load.c b/erts/emulator/beam/beam_load.c index 7a1a563be2..5db971b6af 100644 --- a/erts/emulator/beam/beam_load.c +++ b/erts/emulator/beam/beam_load.c @@ -902,7 +902,7 @@ static ErlHeapFragment* new_literal_fragment(Uint size) ErlHeapFragment* bp; bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_PREPARED_CODE, ERTS_HEAP_FRAG_SIZE(size)); - ERTS_INIT_HEAP_FRAG(bp, size); + ERTS_INIT_HEAP_FRAG(bp, size, size); return bp; } @@ -1528,8 +1528,8 @@ read_literal_table(LoaderState* stp) } if (heap_size > 0) { - erts_factory_message_init(&factory, NULL, NULL, - new_literal_fragment(heap_size)); + erts_factory_heap_frag_init(&factory, + new_literal_fragment(heap_size)); factory.alloc_type = ERTS_ALC_T_PREPARED_CODE; val = erts_decode_ext(&factory, &p); diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 5ec1840c7b..e4283ac945 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -72,7 +72,7 @@ BIF_RETTYPE spawn_3(BIF_ALIST_3) ErlSpawnOpts so; Eterm pid; - so.flags = 0; + so.flags = erts_default_spo_flags; pid = erl_create_process(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, &so); if (is_non_value(pid)) { BIF_ERROR(BIF_P, so.error_code); @@ -589,7 +589,7 @@ erts_queue_monitor_message(Process *p, Eterm reason_copy, ref_copy, item_copy; Uint reason_size, ref_size, item_size, heap_size; ErlOffHeap *ohp; - ErlHeapFragment *bp; + ErtsMessage *msgp; reason_size = IS_CONST(reason) ? 0 : size_object(reason); item_size = IS_CONST(item) ? 0 : size_object(item); @@ -597,11 +597,8 @@ erts_queue_monitor_message(Process *p, heap_size = 6+reason_size+ref_size+item_size; - hp = erts_alloc_message_heap(heap_size, - &bp, - &ohp, - p, - p_locksp); + msgp = erts_alloc_message_heap(p, p_locksp, heap_size, + &hp, &ohp); reason_copy = (IS_CONST(reason) ? 
reason @@ -612,7 +609,7 @@ erts_queue_monitor_message(Process *p, ref_copy = copy_struct(ref, ref_size, &hp, ohp); tup = TUPLE5(hp, am_DOWN, ref_copy, type, item_copy, reason_copy); - erts_queue_message(p, p_locksp, bp, tup, NIL); + erts_queue_message(p, p_locksp, msgp, tup, NIL); } static BIF_RETTYPE @@ -841,7 +838,7 @@ BIF_RETTYPE spawn_link_3(BIF_ALIST_3) ErlSpawnOpts so; Eterm pid; - so.flags = SPO_LINK; + so.flags = erts_default_spo_flags|SPO_LINK; pid = erl_create_process(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, &so); if (is_non_value(pid)) { BIF_ERROR(BIF_P, so.error_code); @@ -878,7 +875,7 @@ BIF_RETTYPE spawn_opt_1(BIF_ALIST_1) /* * Store default values for options. */ - so.flags = SPO_USE_ARGS; + so.flags = erts_default_spo_flags|SPO_USE_ARGS; so.min_heap_size = H_MIN_SIZE; so.min_vheap_size = BIN_VH_MIN_SIZE; so.priority = PRIORITY_NORMAL; @@ -913,6 +910,13 @@ BIF_RETTYPE spawn_opt_1(BIF_ALIST_1) so.priority = PRIORITY_LOW; else goto error; + } else if (arg == am_off_heap_message_queue) { + if (val == am_true) + so.flags |= SPO_OFF_HEAP_MSGQ; + else if (val == am_false) + so.flags &= ~SPO_OFF_HEAP_MSGQ; + else + goto error; } else if (arg == am_min_heap_size && is_small(val)) { Sint min_heap_size = signed_val(val); if (min_heap_size < 0) { @@ -1691,6 +1695,17 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2) } BIF_RET(old_value); } + else if (BIF_ARG_1 == am_off_heap_message_queue) { + int enable; + if (BIF_ARG_2 == am_true) + enable = 1; + else if (BIF_ARG_2 == am_false) + enable = 0; + else + goto error; + old_value = erts_change_off_heap_message_queue_state(BIF_P, enable); + BIF_RET(old_value); + } else if (BIF_ARG_1 == am_sensitive) { Uint is_sensitive; if (BIF_ARG_2 == am_true) { @@ -1931,7 +1946,7 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext* ctx) } else if (is_atom(to)) { Eterm id = erts_whereis_name_to_id(p, to); - rp = erts_proc_lookup(id); + rp = erts_proc_lookup_raw(id); if (rp) { if (IS_TRACED(p)) trace_send(p, to, msg); @@ -4479,7 +4494,7 @@ BIF_RETTYPE system_flag_2(BIF_ALIST_2) } } else if (BIF_ARG_1 == make_small(1)) { int i, max; - ErlMessage* mp; + ErtsMessage* mp; erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); erts_smp_thr_progress_block(); diff --git a/erts/emulator/beam/bif.h b/erts/emulator/beam/bif.h index c6ed60376a..a62eddf36b 100644 --- a/erts/emulator/beam/bif.h +++ b/erts/emulator/beam/bif.h @@ -54,22 +54,24 @@ extern Export *erts_convert_time_unit_trap; (p)->fcalls = -CONTEXT_REDS; \ } while(0) - -#define ERTS_VBUMP_ALL_REDS(p) \ +#define ERTS_VBUMP_ALL_REDS_INTERNAL(p, fcalls) \ do { \ if (!ERTS_PROC_GET_SAVED_CALLS_BUF((p))) { \ - if ((p)->fcalls > 0) \ - ERTS_PROC_GET_SCHDATA((p))->virtual_reds += (p)->fcalls; \ - (p)->fcalls = 0; \ + if ((fcalls) > 0) \ + ERTS_PROC_GET_SCHDATA((p))->virtual_reds += (fcalls); \ + (fcalls) = 0; \ } \ else { \ - if ((p)->fcalls > -CONTEXT_REDS) \ + if ((fcalls) > -CONTEXT_REDS) \ ERTS_PROC_GET_SCHDATA((p))->virtual_reds \ - += ((p)->fcalls - (-CONTEXT_REDS)); \ - (p)->fcalls = -CONTEXT_REDS; \ + += ((fcalls) - (-CONTEXT_REDS)); \ + (fcalls) = -CONTEXT_REDS; \ } \ } while(0) +#define ERTS_VBUMP_ALL_REDS(p) \ + ERTS_VBUMP_ALL_REDS_INTERNAL((p), (p)->fcalls) + #define BUMP_REDS(p, gc) do { \ ASSERT(p); \ ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN & erts_proc_lc_my_proc_locks(p));\ @@ -110,10 +112,34 @@ do { \ } \ } while(0) -#define ERTS_BIF_REDS_LEFT(p) \ +#define ERTS_VBUMP_LEAVE_REDS_INTERNAL(P, Reds, FCalls) \ + do { \ + if (ERTS_PROC_GET_SAVED_CALLS_BUF((P))) { \ + int nreds__ = ((int)(Reds)) - 
CONTEXT_REDS; \ + if ((FCalls) > nreds__) { \ + ERTS_PROC_GET_SCHDATA((P))->virtual_reds \ + += (FCalls) - nreds__; \ + (FCalls) = nreds__; \ + } \ + } \ + else { \ + if ((FCalls) > (Reds)) { \ + ERTS_PROC_GET_SCHDATA((P))->virtual_reds \ + += (FCalls) - (Reds); \ + (FCalls) = (Reds); \ + } \ + } \ + } while (0) + +#define ERTS_VBUMP_LEAVE_REDS(P, Reds) \ + ERTS_VBUMP_LEAVE_REDS_INTERNAL(P, Reds, (P)->fcalls) + +#define ERTS_REDS_LEFT(p, FCalls) \ (ERTS_PROC_GET_SAVED_CALLS_BUF((p)) \ - ? ((p)->fcalls > -CONTEXT_REDS ? ((p)->fcalls - (-CONTEXT_REDS)) : 0)\ - : ((p)->fcalls > 0 ? (p)->fcalls : 0)) + ? ((FCalls) > -CONTEXT_REDS ? ((FCalls) - (-CONTEXT_REDS)) : 0) \ + : ((FCalls) > 0 ? (FCalls) : 0)) + +#define ERTS_BIF_REDS_LEFT(p) ERTS_REDS_LEFT(p, p->fcalls) #define BIF_RET2(x, gc) do { \ BUMP_REDS(BIF_P, (gc)); \ diff --git a/erts/emulator/beam/break.c b/erts/emulator/beam/break.c index c7e7411935..aa5ec123a7 100644 --- a/erts/emulator/beam/break.c +++ b/erts/emulator/beam/break.c @@ -252,7 +252,7 @@ print_process_info(int to, void *to_arg, Process *p) /* display the message queue only if there is anything in it */ if (!ERTS_IS_CRASH_DUMPING && p->msg.first != NULL && !garbing) { - ErlMessage* mp; + ErtsMessage* mp; erts_print(to, to_arg, "Message queue: ["); for (mp = p->msg.first; mp; mp = mp->next) erts_print(to, to_arg, mp->next ? "%T," : "%T", ERL_MESSAGE_TERM(mp)); @@ -323,7 +323,7 @@ print_process_info(int to, void *to_arg, Process *p) erts_print(to, to_arg, "Heap unused: %bpu\n", (p->hend - p->htop)); erts_print(to, to_arg, "OldHeap unused: %bpu\n", (OLD_HEAP(p) == NULL) ? 0 : (OLD_HEND(p) - OLD_HTOP(p)) ); - erts_print(to, to_arg, "Memory: %beu\n", erts_process_memory(p)); + erts_print(to, to_arg, "Memory: %beu\n", erts_process_memory(p, !0)); if (garbing) { print_garb_info(to, to_arg, p); diff --git a/erts/emulator/beam/copy.c b/erts/emulator/beam/copy.c index b185758b1d..f27c526413 100644 --- a/erts/emulator/beam/copy.c +++ b/erts/emulator/beam/copy.c @@ -279,7 +279,7 @@ Eterm copy_struct(Eterm obj, Uint sz, Eterm** hpp, ErlOffHeap* off_heap) break; case TAG_PRIMARY_LIST: objp = list_val(obj); - if (in_area(objp,hstart,hsize)) { + if (ErtsInArea(objp,hstart,hsize)) { hp++; break; } @@ -318,7 +318,7 @@ Eterm copy_struct(Eterm obj, Uint sz, Eterm** hpp, ErlOffHeap* off_heap) } case TAG_PRIMARY_BOXED: - if (in_area(boxed_val(obj),hstart,hsize)) { + if (ErtsInArea(boxed_val(obj),hstart,hsize)) { hp++; break; } diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index efd5109269..bfddcadca3 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -373,10 +373,11 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp) ASSERT(lnk->type == LINK_NODE); if (is_internal_pid(lnk->pid)) { ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; - rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks); - if (!rp) { + ErlOffHeap *ohp; + rp = erts_proc_lookup(lnk->pid); + if (!rp) goto done; - } + erts_smp_proc_lock(rp, rp_locks); rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name); if (rlnk != NULL) { ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE)); @@ -384,12 +385,14 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp) } n = ERTS_LINK_REFC(lnk); for (i = 0; i < n; ++i) { - ErlHeapFragment* bp; - ErlOffHeap *ohp; Eterm tup; - Eterm *hp = erts_alloc_message_heap(3,&bp,&ohp,rp,&rp_locks); + Eterm *hp; + ErtsMessage *msgp; + + msgp = erts_alloc_message_heap(rp, &rp_locks, + 3, &hp, &ohp); tup = TUPLE2(hp, am_nodedown, name); - erts_queue_message(rp, 
&rp_locks, bp, tup, NIL); + erts_queue_message(rp, &rp_locks, msgp, tup, NIL); } erts_smp_proc_unlock(rp, rp_locks); } @@ -1458,7 +1461,7 @@ int erts_net_message(Port *prt, ErlOffHeap *ohp; ASSERT(xsize); heap_frag = erts_dist_ext_trailer(ede_copy); - ERTS_INIT_HEAP_FRAG(heap_frag, token_size); + ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); hp = heap_frag->mem; ohp = &heap_frag->off_heap; token = tuple[5]; @@ -1507,7 +1510,7 @@ int erts_net_message(Port *prt, ErlOffHeap *ohp; ASSERT(xsize); heap_frag = erts_dist_ext_trailer(ede_copy); - ERTS_INIT_HEAP_FRAG(heap_frag, token_size); + ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); hp = heap_frag->mem; ohp = &heap_frag->off_heap; token = tuple[4]; @@ -3267,11 +3270,16 @@ send_nodes_mon_msg(Process *rp, Uint sz) { Eterm msg; - ErlHeapFragment* bp; + Eterm *hp; + ErtsMessage *mp; ErlOffHeap *ohp; - Eterm *hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, rp_locksp); #ifdef DEBUG - Eterm *hend = hp + sz; + Eterm *hend; +#endif + + mp = erts_alloc_message_heap(rp, rp_locksp, sz, &hp, &ohp); +#ifdef DEBUG + hend = hp + sz; #endif if (!nmp->opts) { @@ -3317,7 +3325,7 @@ send_nodes_mon_msg(Process *rp, } ASSERT(hend == hp); - erts_queue_message(rp, rp_locksp, bp, msg, NIL); + erts_queue_message(rp, rp_locksp, mp, msg, NIL); } static void diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c index 3e300f88ea..019aa0f16c 100644 --- a/erts/emulator/beam/erl_alloc.c +++ b/erts/emulator/beam/erl_alloc.c @@ -582,7 +582,7 @@ erts_alloc_init(int *argc, char **argv, ErtsAllocInitOpts *eaiop) fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_DRV_SEL_D_STATE)] = sizeof(ErtsDrvSelectDataState); fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_MSG_REF)] - = sizeof(ErlMessage); + = sizeof(ErtsMessageRef); #ifdef ERTS_SMP fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_THR_Q_EL_SL)] = sizeof(ErtsThrQElement_t); @@ -2916,12 +2916,12 @@ reply_alloc_info(void *vair) int global_instances = air->req_sched == sched_id; ErtsProcLocks rp_locks; Process *rp = air->proc; - Eterm ref_copy = NIL, ai_list, msg; - Eterm *hp = NULL, *hp_end = NULL, *hp_start = NULL; + Eterm ref_copy = NIL, ai_list, msg = NIL; + Eterm *hp = NULL, *hp_start = NULL, *hp_end = NULL; Eterm **hpp; Uint sz, *szp; ErlOffHeap *ohp = NULL; - ErlHeapFragment *bp = NULL; + ErtsMessage *mp = NULL; struct erts_mmap_info_struct emis; int i; Eterm (*info_func)(Allctr_t *, @@ -3123,20 +3123,17 @@ reply_alloc_info(void *vair) if (hpp) break; - hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, sz, &hp, &ohp); hp_start = hp; hp_end = hp + sz; szp = NULL; hpp = &hp; } - if (bp) - bp = erts_resize_message_buffer(bp, hp - hp_start, &msg, 1); - else { - ASSERT(hp); - HRelease(rp, hp_end, hp); - } - erts_queue_message(rp, &rp_locks, bp, msg, NIL); + if (hp != hp_end) + erts_shrink_message_heap(&mp, rp, hp_start, hp, hp_end, &msg, 1); + + erts_queue_message(rp, &rp_locks, mp, msg, NIL); if (air->req_sched == sched_id) rp_locks &= ~ERTS_PROC_LOCK_MAIN; diff --git a/erts/emulator/beam/erl_alloc.h b/erts/emulator/beam/erl_alloc.h index 9da9c823f7..14e80960f5 100644 --- a/erts/emulator/beam/erl_alloc.h +++ b/erts/emulator/beam/erl_alloc.h @@ -44,9 +44,11 @@ #if ERTS_CAN_INLINE && ERTS_ALC_WANT_INLINE # define ERTS_ALC_DO_INLINE 1 # define ERTS_ALC_INLINE static ERTS_INLINE +# define ERTS_ALC_FORCE_INLINE static ERTS_FORCE_INLINE #else # define ERTS_ALC_DO_INLINE 0 # define ERTS_ALC_INLINE +# define ERTS_ALC_FORCE_INLINE #endif #define 
ERTS_ALC_NO_FIXED_SIZES \ @@ -293,7 +295,7 @@ int erts_is_allctr_wrapper_prelocked(void) #ifdef ERTS_HAVE_IS_IN_LITERAL_RANGE -ERTS_ALC_INLINE +ERTS_ALC_FORCE_INLINE int erts_is_in_literal_range(void* ptr) { #if defined(ARCH_32) diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 7d519c1be4..75b4913012 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -152,6 +152,8 @@ type OLD_HEAP EHEAP PROCESSES old_heap type HEAP_FRAG EHEAP PROCESSES heap_frag type TMP_HEAP TEMPORARY PROCESSES tmp_heap type MSG_REF FIXED_SIZE PROCESSES msg_ref +type MSG EHEAP PROCESSES message +type MSGQ_CHNG SHORT_LIVED PROCESSES messages_queue_change type MSG_ROOTS TEMPORARY PROCESSES msg_roots type ROOTSET TEMPORARY PROCESSES root_set type LOADER_TMP TEMPORARY CODE loader_tmp diff --git a/erts/emulator/beam/erl_bif_ddll.c b/erts/emulator/beam/erl_bif_ddll.c index 28bec6325c..2b1d875bfe 100644 --- a/erts/emulator/beam/erl_bif_ddll.c +++ b/erts/emulator/beam/erl_bif_ddll.c @@ -1707,18 +1707,19 @@ static void notify_proc(Process *proc, Eterm ref, Eterm driver_name, Eterm type, Eterm mess; Eterm r; Eterm *hp; - ErlHeapFragment *bp; - ErlOffHeap *ohp; + ErtsMessage *mp; ErtsProcLocks rp_locks = 0; + ErlOffHeap *ohp; ERTS_SMP_CHK_NO_PROC_LOCKS; assert_drv_list_rwlocked(); if (errcode != 0) { int need = load_error_need(errcode); Eterm e; - hp = erts_alloc_message_heap(6 /* tuple */ + 3 /* Error tuple */ + - REF_THING_SIZE + need, &bp, &ohp, - proc, &rp_locks); + mp = erts_alloc_message_heap(proc, &rp_locks, + (6 /* tuple */ + 3 /* Error tuple */ + + REF_THING_SIZE + need), + &hp, &ohp); r = copy_ref(ref,hp); hp += REF_THING_SIZE; e = build_load_error_hp(hp, errcode); @@ -1727,12 +1728,14 @@ static void notify_proc(Process *proc, Eterm ref, Eterm driver_name, Eterm type, hp += 3; mess = TUPLE5(hp,type,r,am_driver,driver_name,mess); } else { - hp = erts_alloc_message_heap(6 /* tuple */ + REF_THING_SIZE, &bp, &ohp, proc, &rp_locks); + mp = erts_alloc_message_heap(proc, &rp_locks, + 6 /* tuple */ + REF_THING_SIZE, + &hp, &ohp); r = copy_ref(ref,hp); hp += REF_THING_SIZE; mess = TUPLE5(hp,type,r,am_driver,driver_name,tag); } - erts_queue_message(proc, &rp_locks, bp, mess, am_undefined); + erts_queue_message(proc, &rp_locks, mp, mess, am_undefined); erts_smp_proc_unlock(proc, rp_locks); ERTS_SMP_CHK_NO_PROC_LOCKS; } diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index a684c81445..1eb106a551 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -589,6 +589,7 @@ static Eterm pi_args[] = { am_min_bin_vheap_size, am_current_location, am_current_stacktrace, + am_off_heap_message_queue }; #define ERTS_PI_ARGS ((int) (sizeof(pi_args)/sizeof(Eterm))) @@ -636,6 +637,7 @@ pi_arg2ix(Eterm arg) case am_min_bin_vheap_size: return 28; case am_current_location: return 29; case am_current_stacktrace: return 30; + case am_off_heap_message_queue: return 31; default: return -1; } } @@ -718,9 +720,10 @@ pi_pid2proc(Process *c_p, Eterm pid, ErtsProcLocks info_locks) -BIF_RETTYPE +static BIF_RETTYPE process_info_aux(Process *BIF_P, Process *rp, + ErtsProcLocks rp_locks, Eterm rpid, Eterm item, int always_wrap); @@ -811,10 +814,31 @@ process_info_list(Process *c_p, Eterm pid, Eterm list, int always_wrap, *fail_type = ERTS_PI_FAIL_TYPE_AWAIT_EXIT; goto done; } - else if (!(locks & ERTS_PROC_LOCK_STATUS)) { - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); + else { + ErtsProcLocks unlock_locks = 0; + + 
if (c_p == rp) + locks |= ERTS_PROC_LOCK_MAIN; + + if (!(locks & ERTS_PROC_LOCK_STATUS)) + unlock_locks |= ERTS_PROC_LOCK_STATUS; + + if (locks & ERTS_PROC_LOCK_MSGQ) { + /* + * Move in queue into private queue and + * release msgq lock, enabling others to + * send messages to the process while it + * is being inspected... + */ + ASSERT(locks & ERTS_PROC_LOCK_MAIN); + ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp); + locks &= ~ERTS_PROC_LOCK_MSGQ; + unlock_locks |= ERTS_PROC_LOCK_MSGQ; + } + + if (unlock_locks) + erts_smp_proc_unlock(rp, unlock_locks); } - /* * We always handle 'messages' first if it should be part @@ -826,7 +850,7 @@ process_info_list(Process *c_p, Eterm pid, Eterm list, int always_wrap, if (want_messages) { ix = pi_arg2ix(am_messages); ASSERT(part_res[ix] == THE_NON_VALUE); - part_res[ix] = process_info_aux(c_p, rp, pid, am_messages, always_wrap); + part_res[ix] = process_info_aux(c_p, rp, locks, pid, am_messages, always_wrap); ASSERT(part_res[ix] != THE_NON_VALUE); } @@ -834,7 +858,7 @@ process_info_list(Process *c_p, Eterm pid, Eterm list, int always_wrap, ix = res_elem_ix[res_elem_ix_ix]; if (part_res[ix] == THE_NON_VALUE) { arg = pi_ix2arg(ix); - part_res[ix] = process_info_aux(c_p, rp, pid, arg, always_wrap); + part_res[ix] = process_info_aux(c_p, rp, locks, pid, arg, always_wrap); ASSERT(part_res[ix] != THE_NON_VALUE); } } @@ -965,9 +989,31 @@ BIF_RETTYPE process_info_2(BIF_ALIST_2) ERTS_BIF_AWAIT_X_DATA_TRAP(BIF_P, BIF_ARG_1, am_undefined); } else { + ErtsProcLocks unlock_locks = 0; + + if (BIF_P == rp) + info_locks |= ERTS_PROC_LOCK_MAIN; + if (!(info_locks & ERTS_PROC_LOCK_STATUS)) - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); - res = process_info_aux(BIF_P, rp, pid, BIF_ARG_2, 0); + unlock_locks |= ERTS_PROC_LOCK_STATUS; + + if (info_locks & ERTS_PROC_LOCK_MSGQ) { + /* + * Move in queue into private queue and + * release msgq lock, enabling others to + * send messages to the process while it + * is being inspected... + */ + ASSERT(info_locks & ERTS_PROC_LOCK_MAIN); + ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp); + info_locks &= ~ERTS_PROC_LOCK_MSGQ; + unlock_locks |= ERTS_PROC_LOCK_MSGQ; + } + + if (unlock_locks) + erts_smp_proc_unlock(rp, unlock_locks); + + res = process_info_aux(BIF_P, rp, info_locks, pid, BIF_ARG_2, 0); } ASSERT(is_value(res)); @@ -985,6 +1031,7 @@ BIF_RETTYPE process_info_2(BIF_ALIST_2) Eterm process_info_aux(Process *BIF_P, Process *rp, + ErtsProcLocks rp_locks, Eterm rpid, Eterm item, int always_wrap) @@ -1056,171 +1103,55 @@ process_info_aux(Process *BIF_P, break; case am_messages: { - ErlMessage* mp; - int n; - ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp); - n = rp->msg.len; - - if (n == 0 || ERTS_TRACE_FLAGS(rp) & F_SENSITIVE) { + if (rp->msg.len == 0 || ERTS_TRACE_FLAGS(rp) & F_SENSITIVE) { hp = HAlloc(BIF_P, 3); } else { - int remove_bad_messages = 0; - struct { - Uint copy_struct_size; - ErlMessage* msgp; - } *mq = erts_alloc(ERTS_ALC_T_TMP, n*sizeof(*mq)); - Sint i = 0; - Uint heap_need = 3; + ErtsMessageInfo *mip; + Sint i; + Uint heap_need; +#ifdef DEBUG Eterm *hp_end; +#endif - for (mp = rp->msg.first; mp; mp = mp->next) { - heap_need += 2; - mq[i].msgp = mp; - if (rp != BIF_P) { - Eterm msg = ERL_MESSAGE_TERM(mq[i].msgp); - if (is_value(msg)) { - mq[i].copy_struct_size = (is_immed(msg)? 
0 : - size_object(msg)); - } - else if (mq[i].msgp->data.attached) { - mq[i].copy_struct_size - = erts_msg_attached_data_size(mq[i].msgp); - } - else { - /* Bad distribution message; ignore */ - remove_bad_messages = 1; - mq[i].copy_struct_size = 0; - } - heap_need += mq[i].copy_struct_size; - } - else { - mq[i].copy_struct_size = mp->data.attached ? - erts_msg_attached_data_size(mp) : 0; - } - i++; - } + mip = erts_alloc(ERTS_ALC_T_TMP, + rp->msg.len*sizeof(ErtsMessageInfo)); - if (rp != BIF_P) { - hp = HAlloc(BIF_P, heap_need); - hp_end = hp + heap_need; - ASSERT(i == n); - for (i--; i >= 0; i--) { - Eterm msg = ERL_MESSAGE_TERM(mq[i].msgp); - if (is_value(msg)) { - if (mq[i].copy_struct_size) - msg = copy_struct(msg, - mq[i].copy_struct_size, - &hp, - &MSO(BIF_P)); - } - else if (mq[i].msgp->data.attached) { - ErlHeapFragment *hfp; - /* - * Decode it into a message buffer and attach it - * to the message instead of the attached external - * term. - * - * Note that we may not pass a process pointer - * to erts_msg_distext2heap(), since it would then - * try to alter locks on that process. - */ - msg = erts_msg_distext2heap( - NULL, NULL, &hfp, &ERL_MESSAGE_TOKEN(mq[i].msgp), - mq[i].msgp->data.dist_ext); - - ERL_MESSAGE_TERM(mq[i].msgp) = msg; - mq[i].msgp->data.heap_frag = hfp; - - if (is_non_value(msg)) { - ASSERT(!mq[i].msgp->data.heap_frag); - /* Bad distribution message; ignore */ - remove_bad_messages = 1; - continue; - } - else { - /* Make our copy of the message */ - ASSERT(size_object(msg) == erts_used_frag_sz(hfp)); - msg = copy_struct(msg, - erts_used_frag_sz(hfp), - &hp, - &MSO(BIF_P)); - } - } - else { - /* Bad distribution message; ignore */ - remove_bad_messages = 1; - continue; - } - res = CONS(hp, msg, res); - hp += 2; - } - HRelease(BIF_P, hp_end, hp+3); - } - else { - for (i--; i >= 0; i--) { - ErtsHeapFactory factory; - Eterm msg = ERL_MESSAGE_TERM(mq[i].msgp); - - erts_factory_proc_prealloc_init(&factory, BIF_P, - mq[i].copy_struct_size+2); - if (mq[i].msgp->data.attached) { - /* Decode it on the heap */ - erts_move_msg_attached_data_to_heap(&factory, - mq[i].msgp); - msg = ERL_MESSAGE_TERM(mq[i].msgp); - ASSERT(!mq[i].msgp->data.attached); - } - if (is_value(msg)) { - hp = erts_produce_heap(&factory, 2, 0); - res = CONS(hp, msg, res); - } - else { - /* Bad distribution message; ignore */ - remove_bad_messages = 1; - continue; - } - erts_factory_close(&factory); - } - hp = HAlloc(BIF_P, 3); - } - erts_free(ERTS_ALC_T_TMP, mq); - if (remove_bad_messages) { - ErlMessage **mpp; - /* - * We need to remove bad distribution messages from - * the queue, so that the value returned for - * 'message_queue_len' is consistent with the value - * returned for 'messages'. - */ - mpp = &rp->msg.first; - mp = rp->msg.first; - while (mp) { - if (is_value(ERL_MESSAGE_TERM(mp))) { - mpp = &mp->next; - mp = mp->next; - } - else { - ErlMessage* bad_mp = mp; - ASSERT(!mp->data.attached); - if (rp->msg.save == &mp->next) - rp->msg.save = mpp; - if (rp->msg.last == &mp->next) - rp->msg.last = mpp; - *mpp = mp->next; - mp = mp->next; - rp->msg.len--; - free_message(bad_mp); - } - } + /* + * Note that message queue may shrink when calling + * erts_prep_msgq_for_inspection() since it removes + * corrupt distribution messages. 
+ */ + heap_need = erts_prep_msgq_for_inspection(BIF_P, rp, rp_locks, mip); + heap_need += 3; /* top 2-tuple */ + heap_need += rp->msg.len*2; /* Cons cells */ + + hp = HAlloc(BIF_P, heap_need); /* heap_need is exact */ +#ifdef DEBUG + hp_end = hp + heap_need; +#endif + + /* Build list of messages... */ + for (i = rp->msg.len - 1, res = NIL; i >= 0; i--) { + Eterm msg = ERL_MESSAGE_TERM(mip[i].msgp); + Uint sz = mip[i].size; + + if (sz != 0) + msg = copy_struct(msg, sz, &hp, &BIF_P->off_heap); + + res = CONS(hp, msg, res); + hp += 2; } + + ASSERT(hp_end == hp + 3); + + erts_free(ERTS_ALC_T_TMP, mip); } break; } case am_message_queue_len: hp = HAlloc(BIF_P, 3); - ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp); res = make_small(rp->msg.len); break; @@ -1408,7 +1339,7 @@ process_info_aux(Process *BIF_P, } case am_total_heap_size: { - ErlMessage *mp; + ErtsMessage *mp; Uint total_heap_size; Uint hsz = 3; @@ -1418,8 +1349,6 @@ process_info_aux(Process *BIF_P, total_heap_size += rp->mbuf_sz; - ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp); - for (mp = rp->msg.first; mp; mp = mp->next) if (mp->data.attached) total_heap_size += erts_msg_attached_data_size(mp); @@ -1441,7 +1370,7 @@ process_info_aux(Process *BIF_P, case am_memory: { /* Memory consumed in bytes */ Uint hsz = 3; - Uint size = erts_process_memory(rp); + Uint size = erts_process_memory(rp, 0); (void) erts_bld_uint(NULL, &hsz, size); hp = HAlloc(BIF_P, hsz); res = erts_bld_uint(&hp, NULL, size); @@ -1567,6 +1496,11 @@ process_info_aux(Process *BIF_P, break; } + case am_off_heap_message_queue: + res = BIF_P->flags & F_OFF_HEAP_MSGQ ? am_true : am_false; + hp = HAlloc(BIF_P, 3); + break; + default: return THE_NON_VALUE; /* will produce badarg */ @@ -2728,6 +2662,10 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1) BIF_RET(am_true); } #endif + else if (BIF_ARG_1 == am_off_heap_message_queue) { + BIF_RET(erts_default_spo_flags & SPO_OFF_HEAP_MSGQ + ? 
am_true : am_false); + } else if (ERTS_IS_ATOM_STR("compile_info",BIF_ARG_1)) { Uint sz; Eterm res = NIL, tup, text; @@ -3865,9 +3803,7 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2) BIF_RET(am_false); } else { - FLAGS(rp) |= F_FORCE_GC; - if (BIF_P != rp) - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); + ERTS_FORCE_GC(BIF_P); BIF_RET(am_true); } } diff --git a/erts/emulator/beam/erl_debug.c b/erts/emulator/beam/erl_debug.c index 434b6c6d7a..6652ef9bb3 100644 --- a/erts/emulator/beam/erl_debug.c +++ b/erts/emulator/beam/erl_debug.c @@ -309,6 +309,8 @@ void erts_check_for_holes(Process* p) p->last_htop = HEAP_TOP(p); for (hf = MBUF(p); hf != 0; hf = hf->next) { + if (hf == p->heap_hfrag) + continue; if (hf == p->last_mbuf) { break; } @@ -399,7 +401,7 @@ void verify_process(Process *p) erl_exit(1,"Wild pointer found in " name " of %T!\n",p->common.id); } - ErlMessage* mp = p->msg.first; + ErtsMessage* mp = p->msg.first; VERBOSE(DEBUG_MEMORY,("Verify process: %T...\n",p->common.id)); @@ -528,7 +530,7 @@ static void print_process_memory(Process *p) PTR_SIZE, "PCB", dashes, dashes, dashes, dashes); if (p->msg.first != NULL) { - ErlMessage* mp; + ErtsMessage* mp; erts_printf(" Message Queue:\n"); mp = p->msg.first; while (mp != NULL) { diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c index 2d7b7cafa4..6a52e1a890 100644 --- a/erts/emulator/beam/erl_gc.c +++ b/erts/emulator/beam/erl_gc.c @@ -42,8 +42,6 @@ #include "dtrace-wrapper.h" #include "erl_bif_unique.h" -#define ERTS_CONTINUOUS_NEW_HEAP - #define ERTS_INACT_WR_PB_LEAVE_MUCH_LIMIT 1 #define ERTS_INACT_WR_PB_LEAVE_MUCH_PERCENTAGE 20 #define ERTS_INACT_WR_PB_LEAVE_LIMIT 10 @@ -60,58 +58,6 @@ # define ERTS_GC_ASSERT(B) ((void) 1) #endif -#ifdef ERTS_CONTINUOUS_NEW_HEAP -#define ERTS_IS_NEW_HEAP_PTR__(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \ - ErtsInArea((Ptr), (NhPtr), (NhSz)) -#define ERTS_IS_LITERAL_PTR__(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \ - (!ErtsInArea((Ptr), (NhPtr), (NhSz)) \ - && !ErtsInArea((Ptr), (OhPtr), (OhSz))) - -#ifdef ERTS_GC_DEBUG -#define ERTS_IS_NEW_HEAP_PTR(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \ - (ERTS_IS_NEW_HEAP_PTR__((TPtr), (Ptr), (NhPtr), (NhSz), (OhPtr), (OhSz)) \ - ? (ERTS_GC_ASSERT(!erts_is_literal((TPtr), (Ptr)) \ - && !ErtsInArea((Ptr), (OhPtr), (OhSz))), 1) \ - : (ERTS_GC_ASSERT(erts_is_literal((TPtr), (Ptr)) \ - || ErtsInArea((Ptr), (OhPtr), (OhSz))), 0)) -#define ERTS_IS_LITERAL_PTR(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \ - (ERTS_IS_LITERAL_PTR__((TPtr), (Ptr), (NhPtr), (NhSz), (OhPtr), (OhSz)) \ - ? (ERTS_GC_ASSERT(erts_is_literal((TPtr), (Ptr))), 1) \ - : (ERTS_GC_ASSERT(!erts_is_literal((TPtr), (Ptr))), 0)) -#endif - -#else - -#define ERTS_IS_NEW_HEAP_PTR__(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \ - (!erts_is_literal((TPtr), (Ptr)) && !ErtsInArea((Ptr), (OhPtr), (OhSz))) -#define ERTS_IS_LITERAL_PTR__(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \ - (erts_is_literal((TPtr), (Ptr))) - -#ifdef ERTS_GC_DEBUG -#define ERTS_IS_NEW_HEAP_PTR(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \ - (ERTS_IS_NEW_HEAP_PTR__((TPtr), (Ptr), (NhPtr), (NhSz), (OhPtr), (OhSz)) \ - ? (ERTS_GC_ASSERT(ErtsInArea((Ptr), (NhPtr), (NhSz))), 1) \ - : (ERTS_GC_ASSERT(!ErtsInArea((Ptr), (NhPtr), (NhSz))), 0)) -#define ERTS_IS_LITERAL_PTR(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \ - (ERTS_IS_LITERAL_PTR__((TPtr), (Ptr), (NhPtr), (NhSz), (OhPtr), (OhSz)) \ - ? 
(ERTS_GC_ASSERT(!ErtsInArea((Ptr), (NhPtr), (NhSz)) \ - && !ErtsInArea((Ptr), (OhPtr), (OhSz))), 1) \ - : (ERTS_GC_ASSERT(ErtsInArea((Ptr), (NhPtr), (NhSz)) \ - || ErtsInArea((Ptr), (OhPtr), (OhSz))), 0)) -#endif - -#endif - -#ifndef ERTS_IS_NEW_HEAP_PTR -#define ERTS_IS_NEW_HEAP_PTR(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \ - ERTS_IS_NEW_HEAP_PTR__((TPtr), (Ptr), (NhPtr), (NhSz), (OhPtr), (OhSz)) -#endif - -#ifndef ERTS_IS_LITERAL_PTR -#define ERTS_IS_LITERAL_PTR(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \ - ERTS_IS_LITERAL_PTR__((TPtr), (Ptr), (NhPtr), (NhSz), (OhPtr), (OhSz)) -#endif - /* * Returns number of elements in an array. */ @@ -132,10 +78,10 @@ #define ErtsGcQuickSanityCheck(P) \ do { \ ASSERT((P)->heap < (P)->hend); \ - ASSERT((P)->heap_sz == (P)->hend - (P)->heap); \ + ASSERT((p)->abandoned_heap || (P)->heap_sz == (P)->hend - (P)->heap); \ ASSERT((P)->heap <= (P)->htop && (P)->htop <= (P)->hend); \ ASSERT((P)->heap <= (P)->stop && (P)->stop <= (P)->hend); \ - ASSERT((P)->heap <= (P)->high_water && (P)->high_water <= (P)->hend);\ + ASSERT((p)->abandoned_heap || ((P)->heap <= (P)->high_water && (P)->high_water <= (P)->hend)); \ OverRunCheck((P)); \ } while (0) #else @@ -163,31 +109,32 @@ typedef struct { static Uint setup_rootset(Process*, Eterm*, int, Rootset*); static void cleanup_rootset(Rootset *rootset); -static Uint combined_message_size(Process* p); static void remove_message_buffers(Process* p); static Eterm *full_sweep_heaps(Process *p, int hibernate, Eterm *n_heap, Eterm* n_htop, - char *h, Uint h_size, char *oh, Uint oh_size, Eterm *objv, int nobj); -static int major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl); -static int minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl); -static void do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj); +static int garbage_collect(Process* p, ErlHeapFragment *live_hf_end, + int need, Eterm* objv, int nobj); +static int major_collection(Process* p, ErlHeapFragment *live_hf_end, + int need, Eterm* objv, int nobj, Uint *recl); +static int minor_collection(Process* p, ErlHeapFragment *live_hf_end, + int need, Eterm* objv, int nobj, Uint *recl); +static void do_minor(Process *p, ErlHeapFragment *live_hf_end, + char *mature, Uint mature_size, + Uint new_sz, Eterm* objv, int nobj); static Eterm *sweep_new_heap(Eterm *n_hp, Eterm *n_htop, - char* new_heap, Uint new_heap_size, char* old_heap, Uint old_heap_size); static Eterm *sweep_heaps(Eterm *n_hp, Eterm *n_htop, - char* new_heap, Uint new_heap_size, char* old_heap, Uint old_heap_size); static Eterm* sweep_literal_area(Eterm* n_hp, Eterm* n_htop, - char* new_heap, Uint new_heap_size, - char* old_heap, Uint old_heap_size, - char* src, Uint src_size); + char* old_heap, Uint old_heap_size, + char* src, Uint src_size); static Eterm* sweep_literals_to_old_heap(Eterm* heap_ptr, Eterm* heap_end, Eterm* htop, char* src, Uint src_size); -static Eterm* collect_heap_frags(Process* p, Eterm* heap, - Eterm* htop, Eterm* objv, int nobj); +static Eterm* collect_live_heap_frags(Process* p, ErlHeapFragment *live_hf_end, + Eterm* heap, Eterm* htop, Eterm* objv, int nobj); static void adjust_after_fullsweep(Process *p, int need, Eterm *objv, int nobj); static void shrink_new_heap(Process *p, Uint new_sz, Eterm *objv, int nobj); static void grow_new_heap(Process *p, Uint new_sz, Eterm* objv, int nobj); @@ -204,7 +151,6 @@ static void init_gc_info(ErtsGCInfo *gcip); #ifdef HARDDEBUG static void disallow_heap_frag_ref_in_heap(Process* p); static void 
disallow_heap_frag_ref_in_old_heap(Process* p); -static void disallow_heap_frag_ref(Process* p, Eterm* n_htop, Eterm* objv, int nobj); #endif #if defined(ARCH_64) @@ -411,10 +357,19 @@ erts_offset_off_heap(ErlOffHeap *ohp, Sint offs, Eterm* low, Eterm* high) #undef ptr_within Eterm -erts_gc_after_bif_call(Process* p, Eterm result, Eterm* regs, Uint arity) +erts_gc_after_bif_call_lhf(Process* p, ErlHeapFragment *live_hf_end, + Eterm result, Eterm* regs, Uint arity) { int cost; + if (p->flags & F_HIBERNATE_SCHED) { + /* + * We just hibernated. We do *not* want to mess + * up the hibernation by an ordinary GC... + */ + return result; + } + if (is_non_value(result)) { if (p->freason == TRAP) { #if HIPE @@ -422,21 +377,28 @@ erts_gc_after_bif_call(Process* p, Eterm result, Eterm* regs, Uint arity) regs = ERTS_PROC_GET_SCHDATA(p)->x_reg_array; } #endif - cost = erts_garbage_collect(p, 0, regs, p->arity); + cost = garbage_collect(p, live_hf_end, 0, regs, p->arity); } else { - cost = erts_garbage_collect(p, 0, regs, arity); + cost = garbage_collect(p, live_hf_end, 0, regs, arity); } } else { Eterm val[1]; val[0] = result; - cost = erts_garbage_collect(p, 0, val, 1); + cost = garbage_collect(p, live_hf_end, 0, val, 1); result = val[0]; } BUMP_REDS(p, cost); return result; } +Eterm +erts_gc_after_bif_call(Process* p, Eterm result, Eterm* regs, Uint arity) +{ + return erts_gc_after_bif_call_lhf(p, ERTS_INVALID_HFRAG_PTR, + result, regs, arity); +} + static ERTS_INLINE void reset_active_writer(Process *p) { struct erl_off_heap_header* ptr; @@ -450,6 +412,117 @@ static ERTS_INLINE void reset_active_writer(Process *p) } } +#define ERTS_DELAY_GC_EXTRA_FREE 40 + +static int +delay_garbage_collection(Process *p, ErlHeapFragment *live_hf_end, int need) +{ + ErlHeapFragment *hfrag; + Eterm *orig_heap, *orig_hend, *orig_htop, *orig_stop; + Eterm *stop, *hend; + Uint hsz, ssz; + + ERTS_HOLE_CHECK(p); + + if (p->live_hf_end == ERTS_INVALID_HFRAG_PTR) + p->live_hf_end = live_hf_end; + + if (need == 0) + return 1; + + /* + * Satisfy need in a heap fragment... + */ + ASSERT(need > 0); + + orig_heap = p->heap; + orig_hend = p->hend; + orig_htop = p->htop; + orig_stop = p->stop; + + ssz = orig_hend - orig_stop; + hsz = ssz + need + ERTS_DELAY_GC_EXTRA_FREE; + + hfrag = new_message_buffer(hsz); + hfrag->next = p->mbuf; + p->mbuf = hfrag; + p->mbuf_sz += hsz; + p->heap = p->htop = &hfrag->mem[0]; + p->hend = hend = &hfrag->mem[hsz]; + p->stop = stop = hend - ssz; + sys_memcpy((void *) stop, (void *) orig_stop, ssz * sizeof(Eterm)); + + if (p->abandoned_heap) { + /* Active heap already in a fragment; adjust it... */ + ErlHeapFragment *hfrag = ((ErlHeapFragment *) + (((char *) orig_heap) + - offsetof(ErlHeapFragment, mem))); + Uint unused = orig_hend - orig_htop; + ASSERT(hfrag->used_size == hfrag->alloc_size); + ASSERT(hfrag->used_size >= unused); + hfrag->used_size -= unused; + p->mbuf_sz -= unused; + } + else { + /* Do not leave a hole in the abandoned heap... */ + if (orig_htop < orig_hend) { + *orig_htop = make_pos_bignum_header(orig_hend-orig_htop-1); + if (orig_htop + 1 < orig_hend) { + orig_hend[-1] = (Uint) (orig_htop - orig_heap); + p->flags |= F_ABANDONED_HEAP_USE; + } + } + p->abandoned_heap = orig_heap; + } + +#ifdef CHECK_FOR_HOLES + p->last_htop = p->htop; + p->heap_hfrag = hfrag; +#endif + + /* Make sure that we do a proper GC as soon as possible... 
*/ + p->flags |= F_FORCE_GC; + return CONTEXT_REDS; +} + +static ERTS_FORCE_INLINE Uint +young_gen_usage(Process *p) +{ + Uint hsz; + Eterm *aheap; + + hsz = p->mbuf_sz; + aheap = p->abandoned_heap; + if (!aheap) + hsz += p->htop - p->heap; + else { + /* used in orig heap */ + if (p->flags & F_ABANDONED_HEAP_USE) + hsz += aheap[p->heap_sz-1]; + else + hsz += p->heap_sz; + /* Remove unused part in latest fragment */ + hsz -= p->hend - p->htop; + } + return hsz; +} + +#define ERTS_GET_ORIG_HEAP(Proc, Heap, HTop) \ + do { \ + Eterm *aheap__ = (Proc)->abandoned_heap; \ + if (!aheap__) { \ + (Heap) = (Proc)->heap; \ + (HTop) = (Proc)->htop; \ + } \ + else { \ + (Heap) = aheap__; \ + if ((Proc)->flags & F_ABANDONED_HEAP_USE) \ + (HTop) = aheap__ + aheap__[(Proc)->heap_sz-1]; \ + else \ + (HTop) = aheap__ + (Proc)->heap_sz; \ + } \ + } while (0) + /* * Garbage collect a process. * @@ -458,8 +531,9 @@ static ERTS_INLINE void reset_active_writer(Process *p) * objv: Array of terms to add to rootset; that is to preserve. * nobj: Number of objects in objv. */ -int -erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) +static int +garbage_collect(Process* p, ErlHeapFragment *live_hf_end, + int need, Eterm* objv, int nobj) { Uint reclaimed_now = 0; int done = 0; @@ -469,10 +543,11 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) DTRACE_CHARBUF(pidbuf, DTRACE_TERM_BUF_SIZE); #endif - if (p->flags & F_DISABLE_GC) { - ASSERT(need == 0); - return 1; - } + if (p->flags & F_DISABLE_GC) + return delay_garbage_collection(p, live_hf_end, need); + + if (p->live_hf_end != ERTS_INVALID_HFRAG_PTR) + live_hf_end = p->live_hf_end; esdp = erts_get_scheduler_data(); @@ -480,7 +555,7 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) trace_gc(p, am_gc_start); } - (void) erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC); + erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC); if (erts_system_monitor_long_gc != 0) start_time = erts_get_monotonic_time(esdp); @@ -505,11 +580,11 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) while (!done) { if ((FLAGS(p) & F_NEED_FULLSWEEP) != 0) { DTRACE2(gc_major_start, pidbuf, need); - done = major_collection(p, need, objv, nobj, &reclaimed_now); + done = major_collection(p, live_hf_end, need, objv, nobj, &reclaimed_now); DTRACE2(gc_major_end, pidbuf, reclaimed_now); } else { DTRACE2(gc_minor_start, pidbuf, need); - done = minor_collection(p, need, objv, nobj, &reclaimed_now); + done = minor_collection(p, live_hf_end, need, objv, nobj, &reclaimed_now); DTRACE2(gc_minor_end, pidbuf, reclaimed_now); } } @@ -551,6 +626,7 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) esdp->gc_info.reclaimed += reclaimed_now; FLAGS(p) &= ~F_FORCE_GC; + p->live_hf_end = ERTS_INVALID_HFRAG_PTR; #ifdef CHECK_FOR_HOLES /* @@ -583,6 +659,12 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) } } +int +erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) +{ + return garbage_collect(p, ERTS_INVALID_HFRAG_PTR, need, objv, nobj); +} + /* * Place all living data on a the new heap; deallocate any old heap. * Meant to be used by hibernate/3. @@ -606,16 +688,16 @@ erts_garbage_collect_hibernate(Process* p) */ erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC); ErtsGcQuickSanityCheck(p); - ASSERT(p->mbuf_sz == 0); - ASSERT(p->mbuf == 0); + ASSERT(p->mbuf == NULL); ASSERT(p->stop == p->hend); /* Stack must be empty. */ + ASSERT(!p->abandoned_heap); /* * Do it. 
*/ - heap_size = p->heap_sz + (p->old_htop - p->old_heap); + heap_size = p->heap_sz + (p->old_htop - p->old_heap) + p->mbuf_sz; heap = (Eterm*) ERTS_HEAP_ALLOC(ERTS_ALC_T_TMP_HEAP, sizeof(Eterm)*heap_size); htop = heap; @@ -624,8 +706,6 @@ erts_garbage_collect_hibernate(Process* p) 1, heap, htop, - (char *) p->heap, - (char *) p->htop - (char *) p->heap, (char *) p->old_heap, (char *) p->old_htop - (char *) p->old_heap, p->arg_reg, @@ -644,6 +724,7 @@ erts_garbage_collect_hibernate(Process* p) } FLAGS(p) &= ~F_FORCE_GC; + p->live_hf_end = ERTS_INVALID_HFRAG_PTR; /* * Move the heap to its final destination. @@ -663,6 +744,8 @@ erts_garbage_collect_hibernate(Process* p) sys_memcpy((void *) heap, (void *) p->heap, actual_size*sizeof(Eterm)); ERTS_HEAP_FREE(ERTS_ALC_T_TMP_HEAP, p->heap, p->heap_sz*sizeof(Eterm)); + remove_message_buffers(p); + p->stop = p->hend = heap + heap_size; offs = heap - p->heap; @@ -808,7 +891,6 @@ erts_garbage_collect_literals(Process* p, Eterm* literals, old_htop = sweep_literals_to_old_heap(p->heap, p->htop, old_htop, area, area_size); old_htop = sweep_literal_area(p->old_heap, old_htop, - (char *) p->heap, sizeof(Eterm)*p->heap_sz, (char *) p->old_heap, sizeof(Eterm)*old_heap_size, area, area_size); ASSERT(p->old_htop <= old_htop && old_htop <= p->old_hend); @@ -869,15 +951,18 @@ erts_garbage_collect_literals(Process* p, Eterm* literals, } static int -minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) +minor_collection(Process* p, ErlHeapFragment *live_hf_end, + int need, Eterm* objv, int nobj, Uint *recl) { - Uint mature = HIGH_WATER(p) - HEAP_START(p); + Eterm *mature = p->abandoned_heap ? p->abandoned_heap : p->heap; + Uint mature_size = p->high_water - mature; + Uint size_before = young_gen_usage(p); /* * Allocate an old heap if we don't have one and if we'll need one. */ - if (OLD_HEAP(p) == NULL && mature != 0) { + if (OLD_HEAP(p) == NULL && mature_size != 0) { Eterm* n_old; /* Note: We choose a larger heap size than strictly needed, @@ -885,7 +970,7 @@ minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) * This improved Estone by more than 1200 estones on my computer * (Ultra Sparc 10). 
*/ - Uint new_sz = erts_next_heap_size(HEAP_TOP(p) - HEAP_START(p), 1); + Uint new_sz = erts_next_heap_size(size_before, 1); /* Create new, empty old_heap */ n_old = (Eterm *) ERTS_HEAP_ALLOC(ERTS_ALC_T_OLD_HEAP, @@ -901,41 +986,25 @@ minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) */ if (OLD_HEAP(p) && - ((mature <= OLD_HEND(p) - OLD_HTOP(p)) && - ((BIN_VHEAP_MATURE(p) < ( BIN_OLD_VHEAP_SZ(p) - BIN_OLD_VHEAP(p)))) && - ((BIN_OLD_VHEAP_SZ(p) > BIN_OLD_VHEAP(p))) ) ) { - ErlMessage *msgp; - Uint size_after; - Uint need_after; - const Uint stack_size = STACK_SZ_ON_HEAP(p); - const Uint size_before = MBUF_SIZE(p) + (HEAP_TOP(p) - HEAP_START(p)); - Uint new_sz = HEAP_SIZE(p) + MBUF_SIZE(p) + combined_message_size(p); + ((mature_size <= OLD_HEND(p) - OLD_HTOP(p)) && + ((BIN_VHEAP_MATURE(p) < ( BIN_OLD_VHEAP_SZ(p) - BIN_OLD_VHEAP(p)))) && + ((BIN_OLD_VHEAP_SZ(p) > BIN_OLD_VHEAP(p))) ) ) { + Uint stack_size, size_after, need_after, new_sz; + + stack_size = p->hend - p->stop; + new_sz = stack_size + size_before; new_sz = next_heap_size(p, new_sz, 0); - do_minor(p, new_sz, objv, nobj); + do_minor(p, live_hf_end, (char *) mature, mature_size*sizeof(Eterm), + new_sz, objv, nobj); size_after = HEAP_TOP(p) - HEAP_START(p); *recl += (size_before - size_after); - /* - * Copy newly received message onto the end of the new heap. - */ - ErtsGcQuickSanityCheck(p); - for (msgp = p->msg.first; msgp; msgp = msgp->next) { - if (msgp->data.attached) { - ErtsHeapFactory factory; - erts_factory_proc_prealloc_init(&factory, p, - erts_msg_attached_data_size(msgp)); - erts_move_msg_attached_data_to_heap(&factory, msgp); - erts_factory_close(&factory); - ErtsGcQuickSanityCheck(p); - } - } ErtsGcQuickSanityCheck(p); GEN_GCS(p)++; need_after = ((HEAP_TOP(p) - HEAP_START(p)) - + erts_used_frag_sz(MBUF(p)) + need + stack_size); @@ -984,10 +1053,13 @@ minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) ASSERT(MBUF(p) == NULL); return 1; } + + grow_new_heap(p, next_heap_size(p, need_after, 0), objv, nobj); + return 1; } /* - * Still not enough room after minor collection. Must force a major collection. + * Not enough room for a minor collection. Must force a major collection. */ FLAGS(p) |= F_NEED_FULLSWEEP; return 0; @@ -1044,7 +1116,9 @@ static ERTS_INLINE void offset_nstack(Process* p, Sint offs, #endif /* HIPE */ static void -do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) +do_minor(Process *p, ErlHeapFragment *live_hf_end, + char *mature, Uint mature_size, + Uint new_sz, Eterm* objv, int nobj) { Rootset rootset; /* Rootset for GC (stack, dictionary, etc). */ Roots* roots; @@ -1053,9 +1127,6 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) Eterm* ptr; Eterm val; Eterm gval; - char* heap = (char *) HEAP_START(p); - Uint heap_size = (char *) HEAP_TOP(p) - heap; - Uint mature_size = (char *) HIGH_WATER(p) - heap; Eterm* old_htop = OLD_HTOP(p); Eterm* n_heap; char* oh = (char *) OLD_HEAP(p); @@ -1064,8 +1135,13 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) n_htop = n_heap = (Eterm*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP, sizeof(Eterm)*new_sz); - if (MBUF(p) != NULL) { - n_htop = collect_heap_frags(p, n_heap, n_htop, objv, nobj); + if (live_hf_end != ERTS_INVALID_HFRAG_PTR) { + /* + * Move heap frags that we know are completely live + * directly into the new young heap generation. 
+ */ + n_htop = collect_live_heap_frags(p, live_hf_end, n_heap, n_htop, + objv, nobj); } n = setup_rootset(p, objv, nobj, &rootset); @@ -1088,11 +1164,9 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) if (IS_MOVED_BOXED(val)) { ASSERT(is_boxed(val)); *g_ptr++ = val; - } else if (ErtsInArea(ptr, heap, mature_size)) { + } else if (ErtsInArea(ptr, mature, mature_size)) { MOVE_BOXED(ptr,val,old_htop,g_ptr++); - } else if (ERTS_IS_NEW_HEAP_PTR(gval, ptr, - heap, heap_size, - oh, oh_size)) { + } else if (ErtsInYoungGen(gval, ptr, oh, oh_size)) { MOVE_BOXED(ptr,val,n_htop,g_ptr++); } else { g_ptr++; @@ -1105,11 +1179,9 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) val = *ptr; if (IS_MOVED_CONS(val)) { /* Moved */ *g_ptr++ = ptr[1]; - } else if (ErtsInArea(ptr, heap, mature_size)) { + } else if (ErtsInArea(ptr, mature, mature_size)) { MOVE_CONS(ptr,val,old_htop,g_ptr++); - } else if (ERTS_IS_NEW_HEAP_PTR(gval, ptr, - heap, heap_size, - oh, oh_size)) { + } else if (ErtsInYoungGen(gval, ptr, oh, oh_size)) { MOVE_CONS(ptr,val,n_htop,g_ptr++); } else { g_ptr++; @@ -1134,8 +1206,7 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) */ if (mature_size == 0) { - n_htop = sweep_new_heap(n_heap, n_htop, heap, heap_size, - oh, oh_size); + n_htop = sweep_new_heap(n_heap, n_htop, oh, oh_size); } else { Eterm* n_hp = n_heap; Eterm* ptr; @@ -1152,11 +1223,9 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) if (IS_MOVED_BOXED(val)) { ASSERT(is_boxed(val)); *n_hp++ = val; - } else if (ErtsInArea(ptr, heap, mature_size)) { + } else if (ErtsInArea(ptr, mature, mature_size)) { MOVE_BOXED(ptr,val,old_htop,n_hp++); - } else if (ERTS_IS_NEW_HEAP_PTR(gval, ptr, - heap, heap_size, - oh, oh_size)) { + } else if (ErtsInYoungGen(gval, ptr, oh, oh_size)) { MOVE_BOXED(ptr,val,n_htop,n_hp++); } else { n_hp++; @@ -1168,11 +1237,9 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) val = *ptr; if (IS_MOVED_CONS(val)) { *n_hp++ = ptr[1]; - } else if (ErtsInArea(ptr, heap, mature_size)) { + } else if (ErtsInArea(ptr, mature, mature_size)) { MOVE_CONS(ptr,val,old_htop,n_hp++); - } else if (ERTS_IS_NEW_HEAP_PTR(gval, ptr, - heap, heap_size, - oh, oh_size)) { + } else if (ErtsInYoungGen(gval, ptr, oh, oh_size)) { MOVE_CONS(ptr,val,n_htop,n_hp++); } else { n_hp++; @@ -1192,12 +1259,10 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) if (IS_MOVED_BOXED(val)) { *origptr = val; mb->base = binary_bytes(val); - } else if (ErtsInArea(ptr, heap, mature_size)) { + } else if (ErtsInArea(ptr, mature, mature_size)) { MOVE_BOXED(ptr,val,old_htop,origptr); mb->base = binary_bytes(mb->orig); - } else if (ERTS_IS_NEW_HEAP_PTR(*origptr, ptr, - heap, heap_size, - oh, oh_size)) { + } else if (ErtsInYoungGen(*origptr, ptr, oh, oh_size)) { MOVE_BOXED(ptr,val,n_htop,origptr); mb->base = binary_bytes(mb->orig); } @@ -1218,11 +1283,8 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) * may point to the old (soon to be deleted) new_heap. */ - if (OLD_HTOP(p) < old_htop) { - old_htop = sweep_new_heap(OLD_HTOP(p), old_htop, - heap, heap_size, - oh, oh_size); - } + if (OLD_HTOP(p) < old_htop) + old_htop = sweep_new_heap(OLD_HTOP(p), old_htop, oh, oh_size); OLD_HTOP(p) = old_htop; HIGH_WATER(p) = n_htop; @@ -1254,8 +1316,12 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) #endif ERTS_HEAP_FREE(ERTS_ALC_T_HEAP, - (void*)HEAP_START(p), + (p->abandoned_heap + ? 
p->abandoned_heap + : HEAP_START(p)), HEAP_SIZE(p) * sizeof(Eterm)); + p->abandoned_heap = NULL; + p->flags &= ~F_ABANDONED_HEAP_USE; HEAP_START(p) = n_heap; HEAP_TOP(p) = n_htop; HEAP_SIZE(p) = new_sz; @@ -1272,15 +1338,12 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) */ static int -major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) +major_collection(Process* p, ErlHeapFragment *live_hf_end, + int need, Eterm* objv, int nobj, Uint *recl) { - const Uint size_before = ((HEAP_TOP(p) - HEAP_START(p)) - + (OLD_HTOP(p) - OLD_HEAP(p)) - + MBUF_SIZE(p)); + Uint size_before, stack_size; Eterm* n_heap; Eterm* n_htop; - char* src = (char *) HEAP_START(p); - Uint src_size = (char *) HEAP_TOP(p) - src; char* oh = (char *) OLD_HEAP(p); Uint oh_size = (char *) OLD_HTOP(p) - oh; Uint new_sz, stk_sz; @@ -1290,9 +1353,11 @@ major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) * to receive all live data. */ - new_sz = (HEAP_SIZE(p) + MBUF_SIZE(p) - + combined_message_size(p) - + (OLD_HTOP(p) - OLD_HEAP(p))); + size_before = young_gen_usage(p); + size_before += p->old_htop - p->old_heap; + stack_size = p->hend - p->stop; + + new_sz = stack_size + size_before; new_sz = next_heap_size(p, new_sz, 0); /* @@ -1306,16 +1371,16 @@ major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) n_htop = n_heap = (Eterm *) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP, sizeof(Eterm)*new_sz); - /* - * Get rid of heap fragments. - */ - - if (MBUF(p) != NULL) { - n_htop = collect_heap_frags(p, n_heap, n_htop, objv, nobj); + if (live_hf_end != ERTS_INVALID_HFRAG_PTR) { + /* + * Move heap frags that we know are completely live + * directly into the heap. + */ + n_htop = collect_live_heap_frags(p, live_hf_end, n_heap, n_htop, + objv, nobj); } - n_htop = full_sweep_heaps(p, 0, n_heap, n_htop, src, src_size, - oh, oh_size, objv, nobj); + n_htop = full_sweep_heaps(p, 0, n_heap, n_htop, oh, oh_size, objv, nobj); /* Move the stack to the end of the heap */ stk_sz = HEAP_END(p) - p->stop; @@ -1332,8 +1397,12 @@ major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) #endif ERTS_HEAP_FREE(ERTS_ALC_T_HEAP, - (void *) HEAP_START(p), + (p->abandoned_heap + ? p->abandoned_heap + : HEAP_START(p)), (HEAP_END(p) - HEAP_START(p)) * sizeof(Eterm)); + p->abandoned_heap = NULL; + p->flags &= ~F_ABANDONED_HEAP_USE; HEAP_START(p) = n_heap; HEAP_TOP(p) = n_htop; HEAP_SIZE(p) = new_sz; @@ -1346,24 +1415,6 @@ major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) *recl += size_before - (HEAP_TOP(p) - HEAP_START(p)); - { - ErlMessage *msgp; - - /* - * Copy newly received message onto the end of the new heap. 
- */ - for (msgp = p->msg.first; msgp; msgp = msgp->next) { - if (msgp->data.attached) { - ErtsHeapFactory factory; - erts_factory_proc_prealloc_init(&factory, p, - erts_msg_attached_data_size(msgp)); - erts_move_msg_attached_data_to_heap(&factory, msgp); - erts_factory_close(&factory); - ErtsGcQuickSanityCheck(p); - } - } - } - adjust_after_fullsweep(p, need, objv, nobj); #ifdef HARDDEBUG @@ -1379,7 +1430,6 @@ static Eterm * full_sweep_heaps(Process *p, int hibernate, Eterm *n_heap, Eterm* n_htop, - char *h, Uint h_size, char *oh, Uint oh_size, Eterm *objv, int nobj) { @@ -1420,9 +1470,7 @@ full_sweep_heaps(Process *p, if (IS_MOVED_BOXED(val)) { ASSERT(is_boxed(val)); *g_ptr++ = val; - } else if (!ERTS_IS_LITERAL_PTR(gval, ptr, - h, h_size, - oh, oh_size)) { + } else if (!erts_is_literal(gval, ptr)) { MOVE_BOXED(ptr,val,n_htop,g_ptr++); } else { g_ptr++; @@ -1435,9 +1483,7 @@ full_sweep_heaps(Process *p, val = *ptr; if (IS_MOVED_CONS(val)) { *g_ptr++ = ptr[1]; - } else if (!ERTS_IS_LITERAL_PTR(gval, ptr, - h, h_size, - oh, oh_size)) { + } else if (!erts_is_literal(gval, ptr)) { MOVE_CONS(ptr,val,n_htop,g_ptr++); } else { g_ptr++; @@ -1462,7 +1508,7 @@ full_sweep_heaps(Process *p, * until all is copied. */ - n_htop = sweep_heaps(n_heap, n_htop, h, h_size, oh, oh_size); + n_htop = sweep_heaps(n_heap, n_htop, oh, oh_size); if (MSO(p).first) { sweep_off_heap(p, 1); @@ -1514,23 +1560,6 @@ adjust_after_fullsweep(Process *p, int need, Eterm *objv, int nobj) } /* - * Return the size of all message buffers that are NOT linked in the - * mbuf list. - */ -static Uint -combined_message_size(Process* p) -{ - Uint sz; - ErlMessage *msgp; - - for (sz = 0, msgp = p->msg.first; msgp; msgp = msgp->next) { - if (msgp->data.attached) - sz += erts_msg_attached_data_size(msgp); - } - return sz; -} - -/* * Remove all message buffers. */ static void @@ -1540,6 +1569,10 @@ remove_message_buffers(Process* p) free_message_buffer(MBUF(p)); MBUF(p) = NULL; } + if (p->msg_frag) { + erts_cleanup_messages(p->msg_frag); + p->msg_frag = NULL; + } MBUF_SIZE(p) = 0; } #ifdef HARDDEBUG @@ -1551,64 +1584,6 @@ remove_message_buffers(Process* p) * For performance reasons, we use _unchecked_list_val(), _unchecked_boxed_val(), * and so on to avoid a function call. 
*/ - -static void -disallow_heap_frag_ref(Process* p, Eterm* n_htop, Eterm* objv, int nobj) -{ - ErlHeapFragment* mbuf; - ErlHeapFragment* qb; - Eterm gval; - Eterm* ptr; - Eterm val; - - ASSERT(p->htop != NULL); - mbuf = MBUF(p); - - while (nobj--) { - gval = *objv; - - switch (primary_tag(gval)) { - - case TAG_PRIMARY_BOXED: { - ptr = _unchecked_boxed_val(gval); - val = *ptr; - if (IS_MOVED_BOXED(val)) { - ASSERT(is_boxed(val)); - objv++; - } else { - for (qb = mbuf; qb != NULL; qb = qb->next) { - if (ErtsInArea(ptr, qb->mem, qb->alloc_size*sizeof(Eterm))) { - abort(); - } - } - objv++; - } - break; - } - - case TAG_PRIMARY_LIST: { - ptr = _unchecked_list_val(gval); - val = *ptr; - if (IS_MOVED_CONS(val)) { - objv++; - } else { - for (qb = mbuf; qb != NULL; qb = qb->next) { - if (ErtsInArea(ptr, qb->mem, qb->alloc_size*sizeof(Eterm))) { - abort(); - } - } - objv++; - } - break; - } - - default: { - objv++; - break; - } - } - } -} static void disallow_heap_frag_ref_in_heap(Process* p) @@ -1737,7 +1712,6 @@ typedef enum { static ERTS_FORCE_INLINE Eterm * sweep(Eterm *n_hp, Eterm *n_htop, ErtsSweepType type, - char *h, Uint hsz, char *oh, Uint ohsz, char *src, Uint src_size) { @@ -1749,14 +1723,10 @@ sweep(Eterm *n_hp, Eterm *n_htop, #define ERTS_IS_IN_SWEEP_AREA(TPtr, Ptr) \ (type == ErtsSweepHeaps \ - ? !ERTS_IS_LITERAL_PTR((TPtr), (Ptr), h, hsz, oh, ohsz) \ + ? !erts_is_literal((TPtr), (Ptr)) \ : (type == ErtsSweepNewHeap \ - ? ERTS_IS_NEW_HEAP_PTR((TPtr), (Ptr), h, hsz, oh, ohsz) \ - : (ErtsInArea((Ptr), src, src_size) \ - ? (ERTS_GC_ASSERT(erts_is_literal((TPtr), (Ptr)) \ - && !ErtsInArea((Ptr), h, hsz) \ - && !ErtsInArea((Ptr), oh, ohsz)), 1) \ - : 0))) + ? ErtsInYoungGen((TPtr), (Ptr), oh, ohsz) \ + : ErtsInArea((Ptr), src, src_size))) while (n_hp != n_htop) { ASSERT(n_hp < n_htop); @@ -1820,38 +1790,30 @@ sweep(Eterm *n_hp, Eterm *n_htop, } static Eterm * -sweep_new_heap(Eterm *n_hp, Eterm *n_htop, - char* new_heap, Uint new_heap_size, - char* old_heap, Uint old_heap_size) +sweep_new_heap(Eterm *n_hp, Eterm *n_htop, char* old_heap, Uint old_heap_size) { return sweep(n_hp, n_htop, ErtsSweepNewHeap, - new_heap, new_heap_size, old_heap, old_heap_size, NULL, 0); } static Eterm * -sweep_heaps(Eterm *n_hp, Eterm *n_htop, - char* new_heap, Uint new_heap_size, - char* old_heap, Uint old_heap_size) +sweep_heaps(Eterm *n_hp, Eterm *n_htop, char* old_heap, Uint old_heap_size) { return sweep(n_hp, n_htop, ErtsSweepHeaps, - new_heap, new_heap_size, old_heap, old_heap_size, NULL, 0); } static Eterm * sweep_literal_area(Eterm *n_hp, Eterm *n_htop, - char* new_heap, Uint new_heap_size, char* old_heap, Uint old_heap_size, char* src, Uint src_size) { return sweep(n_hp, n_htop, ErtsSweepLiteralArea, - new_heap, new_heap_size, old_heap, old_heap_size, src, src_size); } @@ -1956,32 +1918,21 @@ move_one_area(Eterm* n_htop, char* src, Uint src_size) */ static Eterm* -collect_heap_frags(Process* p, Eterm* n_hstart, Eterm* n_htop, - Eterm* objv, int nobj) +collect_live_heap_frags(Process* p, ErlHeapFragment *live_hf_end, + Eterm* n_hstart, Eterm* n_htop, + Eterm* objv, int nobj) { ErlHeapFragment* qb; char* frag_begin; Uint frag_size; /* - * We don't allow references to a heap fragments from the stack, heap, - * or process dictionary. 
- */ -#ifdef HARDDEBUG - disallow_heap_frag_ref(p, n_htop, p->stop, STACK_START(p) - p->stop); - if (p->dictionary != NULL) { - disallow_heap_frag_ref(p, n_htop, p->dictionary->data, p->dictionary->used); - } - disallow_heap_frag_ref_in_heap(p); -#endif - - /* * Move the heap fragments to the new heap. Note that no GC is done on * the heap fragments. Any garbage will thus be moved as well and survive * until next GC. */ qb = MBUF(p); - while (qb != NULL) { + while (qb != live_hf_end) { ASSERT(!qb->off_heap.first); /* process fragments use the MSO(p) list */ frag_size = qb->used_size * sizeof(Eterm); if (frag_size != 0) { @@ -1996,9 +1947,7 @@ collect_heap_frags(Process* p, Eterm* n_hstart, Eterm* n_htop, static Uint setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset) { - Uint avail; Roots* roots; - ErlMessage* mp; Uint n; n = 0; @@ -2076,31 +2025,48 @@ setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset) ASSERT(n <= rootset->size); - mp = p->msg.first; - avail = rootset->size - n; - while (mp != NULL) { - if (avail == 0) { - Uint new_size = 2*rootset->size; - if (roots == rootset->def) { - roots = erts_alloc(ERTS_ALC_T_ROOTSET, - new_size*sizeof(Roots)); - sys_memcpy(roots, rootset->def, sizeof(rootset->def)); - } else { - roots = erts_realloc(ERTS_ALC_T_ROOTSET, - (void *) roots, - new_size*sizeof(Roots)); - } + switch (p->flags & (F_OFF_HEAP_MSGQ|F_OFF_HEAP_MSGQ_CHNG)) { + case F_OFF_HEAP_MSGQ|F_OFF_HEAP_MSGQ_CHNG: + (void) erts_move_messages_off_heap(p); + case F_OFF_HEAP_MSGQ: + break; + case F_OFF_HEAP_MSGQ_CHNG: + case 0: { + /* + * Off heap message queue disabled, i.e. we may + * have references from the message queue to the + * heap... + */ + ErtsMessage *mp; + + /* Ensure large enough rootset... */ + if (n + p->msg.len > rootset->size) { + Uint new_size = n + p->msg.len; + ERTS_GC_ASSERT(roots == rootset->def); + roots = erts_alloc(ERTS_ALC_T_ROOTSET, + new_size*sizeof(Roots)); + sys_memcpy(roots, rootset->def, n*sizeof(Roots)); rootset->size = new_size; - avail = new_size - n; } - if (mp->data.attached == NULL) { - roots[n].v = mp->m; - roots[n].sz = 2; - n++; - avail--; + + for (mp = p->msg.first; mp; mp = mp->next) { + + if (!mp->data.attached) { + /* + * Message may refer data on heap; + * add it to rootset... 
+ */ + roots[n].v = mp->m; + roots[n].sz = ERL_MESSAGE_REF_ARRAY_SZ; + n++; + } } - mp = mp->next; + break; + } } + + ASSERT(rootset->size >= n); + rootset->roots = roots; rootset->num_roots = n; return n; @@ -2569,7 +2535,7 @@ offset_off_heap(Process* p, Sint offs, char* area, Uint area_size) static void offset_mqueue(Process *p, Sint offs, char* area, Uint area_size) { - ErlMessage* mp = p->msg.first; + ErtsMessage* mp = p->msg.first; while (mp != NULL) { Eterm mesg = ERL_MESSAGE_TERM(mp); @@ -2656,7 +2622,7 @@ reply_gc_info(void *vgcirp) Eterm **hpp; Uint sz, *szp; ErlOffHeap *ohp = NULL; - ErlHeapFragment *bp = NULL; + ErtsMessage *mp = NULL; ASSERT(esdp); @@ -2682,12 +2648,13 @@ reply_gc_info(void *vgcirp) if (hpp) break; - hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, sz, &hp, &ohp); + szp = NULL; hpp = &hp; } - erts_queue_message(rp, &rp_locks, bp, msg, NIL); + erts_queue_message(rp, &rp_locks, mp, msg, NIL); if (gcirp->req_sched == esdp->no) rp_locks &= ~ERTS_PROC_LOCK_MAIN; @@ -2739,36 +2706,49 @@ erts_gc_info_request(Process *c_p) static int within2(Eterm *ptr, Process *p, Eterm *real_htop) { - ErlHeapFragment* bp = MBUF(p); - ErlMessage* mp = p->msg.first; - Eterm *htop = real_htop ? real_htop : HEAP_TOP(p); + ErlHeapFragment* bp; + ErtsMessage* mp; + Eterm *htop, *heap; + + if (p->abandoned_heap) + ERTS_GET_ORIG_HEAP(p, heap, htop); + else { + heap = p->heap; + htop = real_htop ? real_htop : HEAP_TOP(p); + } if (OLD_HEAP(p) && (OLD_HEAP(p) <= ptr && ptr < OLD_HEND(p))) { return 1; } - if (HEAP_START(p) <= ptr && ptr < htop) { + if (heap <= ptr && ptr < htop) { return 1; } - while (bp != NULL) { - if (bp->mem <= ptr && ptr < bp->mem + bp->used_size) { - return 1; - } - bp = bp->next; - } + + mp = p->msg_frag; + bp = p->mbuf; + + if (bp) + goto search_heap_frags; + while (mp) { - if (mp->data.attached) { - ErlHeapFragment *hfp; - if (is_value(ERL_MESSAGE_TERM(mp))) - hfp = mp->data.heap_frag; - else if (is_not_nil(ERL_MESSAGE_TOKEN(mp))) - hfp = erts_dist_ext_trailer(mp->data.dist_ext); - else - hfp = NULL; - if (hfp && hfp->mem <= ptr && ptr < hfp->mem + hfp->used_size) + + if (mp->data.attached == ERTS_MSG_COMBINED_HFRAG) + bp = &mp->hfrag; + else + bp = mp->data.heap_frag; + + mp = mp->next; + + search_heap_frags: + + while (bp) { + if (bp->mem <= ptr && ptr < bp->mem + bp->used_size) { return 1; + } + bp = bp->next; } - mp = mp->next; } + return 0; } @@ -2790,11 +2770,11 @@ do { \ __FILE__, __LINE__, #EXP); \ } while (0) + #ifdef ERTS_OFFHEAP_DEBUG_CHK_CIRCULAR_LIST # define ERTS_OFFHEAP_VISITED_BIT ((Eterm) 1 << 31) #endif - void erts_check_off_heap2(Process *p, Eterm *htop) { @@ -2823,7 +2803,7 @@ erts_check_off_heap2(Process *p, Eterm *htop) } ERTS_CHK_OFFHEAP_ASSERT(refc >= 1); #ifdef ERTS_OFFHEAP_DEBUG_CHK_CIRCULAR_LIST - ERTS_CHK_OFFHEAP_ASSERT(!(u.hdr->thing_word & ERTS_EXTERNAL_VISITED_BIT)); + ERTS_CHK_OFFHEAP_ASSERT(!(u.hdr->thing_word & ERTS_OFFHEAP_VISITED_BIT)); u.hdr->thing_word |= ERTS_OFFHEAP_VISITED_BIT; #endif if (old) { @@ -2836,7 +2816,7 @@ erts_check_off_heap2(Process *p, Eterm *htop) } } -#ifdef ERTS_OFFHEAP_DEBUG_CHK_CIRCULAR_EXTERNAL_LIST +#ifdef ERTS_OFFHEAP_DEBUG_CHK_CIRCULAR_LIST for (u.hdr = MSO(p).first; u.hdr; u.hdr = u.hdr->next) u.hdr->thing_word &= ~ERTS_OFFHEAP_VISITED_BIT; #endif diff --git a/erts/emulator/beam/erl_gc.h b/erts/emulator/beam/erl_gc.h index fdbf948f9d..a496c5f008 100644 --- a/erts/emulator/beam/erl_gc.h +++ b/erts/emulator/beam/erl_gc.h @@ -69,13 +69,14 @@ do { \ 
while (nelts--) *HTOP++ = *PTR++; \ } while(0) -#define in_area(ptr,start,nbytes) \ - ((UWord)((char*)(ptr) - (char*)(start)) < (nbytes)) - #if defined(DEBUG) || defined(ERTS_OFFHEAP_DEBUG) int within(Eterm *ptr, Process *p); #endif +#define ErtsInYoungGen(TPtr, Ptr, OldHeap, OldHeapSz) \ + (!erts_is_literal((TPtr), (Ptr)) \ + & !ErtsInArea((Ptr), (OldHeap), (OldHeapSz))) + ERTS_GLB_INLINE Eterm follow_moved(Eterm term, Eterm xptr_tag); #if ERTS_GLB_INLINE_INCL_FUNC_DEF @@ -98,6 +99,7 @@ ERTS_GLB_INLINE Eterm follow_moved(Eterm term, Eterm xptr_tag) } return term; } + #endif #endif /* ERL_GC_C__ || HIPE_GC_C__ */ @@ -106,6 +108,23 @@ ERTS_GLB_INLINE Eterm follow_moved(Eterm term, Eterm xptr_tag) * Global exported */ +#define ERTS_IS_GC_DESIRED_INTERNAL(Proc, HTop, STop) \ + ((((STop) - (HTop) < (Proc)->mbuf_sz)) \ + | ((Proc)->off_heap.overhead > (Proc)->bin_vheap_sz) \ + | !!((Proc)->flags & F_FORCE_GC)) + +#define ERTS_IS_GC_DESIRED(Proc) \ + ERTS_IS_GC_DESIRED_INTERNAL((Proc), (Proc)->htop, (Proc)->stop) + +#define ERTS_FORCE_GC_INTERNAL(Proc, FCalls) \ + do { \ + (Proc)->flags |= F_FORCE_GC; \ + ERTS_VBUMP_ALL_REDS_INTERNAL((Proc), (FCalls)); \ + } while (0) + +#define ERTS_FORCE_GC(Proc) \ + ERTS_FORCE_GC_INTERNAL((Proc), (Proc)->fcalls) + extern Uint erts_test_long_gc_sleep; typedef struct { @@ -117,6 +136,8 @@ void erts_gc_info(ErtsGCInfo *gcip); void erts_init_gc(void); int erts_garbage_collect(struct process*, int, Eterm*, int); void erts_garbage_collect_hibernate(struct process* p); +Eterm erts_gc_after_bif_call_lhf(struct process* p, ErlHeapFragment *live_hf_end, + Eterm result, Eterm* regs, Uint arity); Eterm erts_gc_after_bif_call(struct process* p, Eterm result, Eterm* regs, Uint arity); void erts_garbage_collect_literals(struct process* p, Eterm* literals, Uint lit_size, diff --git a/erts/emulator/beam/erl_hl_timer.c b/erts/emulator/beam/erl_hl_timer.c index 51a0d68247..6853278828 100644 --- a/erts/emulator/beam/erl_hl_timer.c +++ b/erts/emulator/beam/erl_hl_timer.c @@ -1245,7 +1245,9 @@ hlt_bif_timer_timeout(ErtsHLTimer *tmr, Uint32 roflgs) * the middle of tree destruction). 
*/ if (!ERTS_PROC_IS_EXITING(proc)) { - erts_queue_message(proc, &proc_locks, tmr->btm.bp, + ErtsMessage *mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = tmr->btm.bp; + erts_queue_message(proc, &proc_locks, mp, tmr->btm.message, NIL); erts_smp_proc_unlock(proc, ERTS_PROC_LOCKS_MSG_SEND); queued_message = 1; @@ -1926,36 +1928,31 @@ access_sched_local_btm(Process *c_p, Eterm pid, if (proc) { Uint hsz; - ErlOffHeap *ohp; - ErlHeapFragment* bp; + ErtsMessage *mp; Eterm *hp, msg, ref, result; + ErlOffHeap *ohp; + Uint32 *refn; #ifdef ERTS_HLT_DEBUG Eterm *hp_end; #endif - hsz = 3; /* 2-tuple */ - if (!async) - hsz += REF_THING_SIZE; + hsz = REF_THING_SIZE; + if (async) { + refn = trefn; /* timer ref */ + hsz += 4; /* 3-tuple */ + } else { - if (is_non_value(tref) || proc != c_p) - hsz += REF_THING_SIZE; - hsz += 1; /* upgrade to 3-tuple */ + refn = rrefn; /* request ref */ + hsz += 3; /* 2-tuple */ } + + ERTS_HLT_ASSERT(refn); + if (time_left > (Sint64) MAX_SMALL) hsz += ERTS_SINT64_HEAP_SIZE(time_left); - if (proc == c_p) { - bp = NULL; - ohp = NULL; - hp = HAlloc(c_p, hsz); - } - else { - hp = erts_alloc_message_heap(hsz, - &bp, - &ohp, - proc, - &proc_locks); - } + mp = erts_alloc_message_heap(proc, &proc_locks, + hsz, &hp, &ohp); #ifdef ERTS_HLT_DEBUG hp_end = hp + hsz; @@ -1968,35 +1965,22 @@ access_sched_local_btm(Process *c_p, Eterm pid, else result = erts_sint64_to_big(time_left, &hp); - if (!async) { - write_ref_thing(hp, - rrefn[0], - rrefn[1], - rrefn[2]); - ref = make_internal_ref(hp); - hp += REF_THING_SIZE; - msg = TUPLE2(hp, ref, result); + write_ref_thing(hp, + refn[0], + refn[1], + refn[2]); + ref = make_internal_ref(hp); + hp += REF_THING_SIZE; - ERTS_HLT_ASSERT(hp + 3 == hp_end); - } - else { - Eterm tag = cancel ? am_cancel_timer : am_read_timer; - if (is_value(tref) && proc == c_p) - ref = tref; - else { - write_ref_thing(hp, - trefn[0], - trefn[1], - trefn[2]); - ref = make_internal_ref(hp); - hp += REF_THING_SIZE; - } - msg = TUPLE3(hp, tag, ref, result); + msg = (async + ? TUPLE3(hp, (cancel + ? am_cancel_timer + : am_read_timer), ref, result) + : TUPLE2(hp, ref, result)); - ERTS_HLT_ASSERT(hp + 4 == hp_end); + ERTS_HLT_ASSERT(hp + (async ? 
4 : 3) == hp_end); - } - erts_queue_message(proc, &proc_locks, bp, msg, NIL); + erts_queue_message(proc, &proc_locks, mp, msg, NIL); if (c_p) proc_locks &= ~ERTS_PROC_LOCK_MAIN; @@ -2093,16 +2077,19 @@ try_access_sched_remote_btm(ErtsSchedulerData *esdp, } } else { + ErtsMessage *mp; Eterm tag, res, msg; Uint hsz; Eterm *hp; ErtsProcLocks proc_locks = ERTS_PROC_LOCK_MAIN; + ErlOffHeap *ohp; hsz = 4; if (time_left > (Sint64) MAX_SMALL) hsz += ERTS_SINT64_HEAP_SIZE(time_left); - hp = HAlloc(c_p, hsz); + mp = erts_alloc_message_heap(c_p, &proc_locks, + hsz, &hp, &ohp); if (cancel) tag = am_cancel_timer; else @@ -2117,7 +2104,7 @@ try_access_sched_remote_btm(ErtsSchedulerData *esdp, msg = TUPLE3(hp, tag, tref, res); - erts_queue_message(c_p, &proc_locks, NULL, msg, NIL); + erts_queue_message(c_p, &proc_locks, mp, msg, NIL); proc_locks &= ~ERTS_PROC_LOCK_MAIN; if (proc_locks) diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index 5c209a4af2..f396a0a156 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -431,7 +431,7 @@ erl_first_process_otp(char* modname, void* code, unsigned size, int argc, char** hp += 2; args = CONS(hp, env, args); - so.flags = SPO_SYSTEM_PROC; + so.flags = erts_default_spo_flags|SPO_SYSTEM_PROC; res = erl_create_process(&parent, start_mod, am_start, args, &so); erts_smp_proc_unlock(&parent, ERTS_PROC_LOCK_MAIN); erts_cleanup_empty_process(&parent); @@ -630,6 +630,7 @@ void erts_usage(void) erts_fprintf(stderr, "-W<i|w|e> set error logger warnings mapping,\n"); erts_fprintf(stderr, " see error_logger documentation for details\n"); + erts_fprintf(stderr, "-xohmq bool set default off_heap_message_queue flag for processes\n"); erts_fprintf(stderr, "-zdbbl size set the distribution buffer busy limit in kilobytes\n"); erts_fprintf(stderr, " valid range is [1-%d]\n", INT_MAX/1024); erts_fprintf(stderr, "-zdntgc time set delayed node table gc in seconds\n"); @@ -2015,6 +2016,26 @@ erl_start(int argc, char **argv) } break; + case 'x': { + char *sub_param = argv[i]+2; + if (has_prefix("ohmq", sub_param)) { + arg = get_arg(sub_param+4, argv[i+1], &i); + if (sys_strcmp(arg, "true") == 0) + erts_default_spo_flags |= SPO_OFF_HEAP_MSGQ; + else if (sys_strcmp(arg, "false") == 0) + erts_default_spo_flags &= ~SPO_OFF_HEAP_MSGQ; + else { + erts_fprintf(stderr, + "Invalid off_heap_message_queue flag: %s\n", arg); + erts_usage(); + } + } else { + erts_fprintf(stderr, "bad -x option %s\n", argv[i]); + erts_usage(); + } + break; + } + case 'z': { char *sub_param = argv[i]+2; @@ -2068,7 +2089,8 @@ erl_start(int argc, char **argv) "Invalid ets busy wait threshold: %s\n", arg); erts_usage(); } - } else { + } + else { erts_fprintf(stderr, "bad -z option %s\n", argv[i]); erts_usage(); } diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index e23c79d301..79739501a8 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -33,8 +33,8 @@ #include "erl_binary.h" #include "dtrace-wrapper.h" -ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message, - ErlMessage, +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message_ref, + ErtsMessageRef, ERL_MESSAGE_BUF_SZ, ERTS_ALC_T_MSG_REF) @@ -44,27 +44,20 @@ ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message, #undef HARD_DEBUG #endif - - - -#ifdef DEBUG -static ERTS_INLINE int in_heapfrag(const Eterm* ptr, const ErlHeapFragment *bp) +void +init_message(void) { - return ((unsigned)(ptr - bp->mem) < bp->used_size); + init_message_ref_alloc(); } -#endif - -void -init_message(void) +void 
*erts_alloc_message_ref(void) { - init_message_alloc(); + return (void *) message_ref_alloc(); } -void -free_message(ErlMessage* mp) +void erts_free_message_ref(void *mp) { - message_free(mp); + message_ref_free((ErtsMessageRef *) mp); } /* Allocate message buffer (size in words) */ @@ -74,7 +67,7 @@ new_message_buffer(Uint size) ErlHeapFragment* bp; bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG, ERTS_HEAP_FRAG_SIZE(size)); - ERTS_INIT_HEAP_FRAG(bp, size); + ERTS_INIT_HEAP_FRAG(bp, size, size); return bp; } @@ -203,83 +196,87 @@ free_message_buffer(ErlHeapFragment* bp) }while (bp != NULL); } -static ERTS_INLINE void -link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp) +void +erts_cleanup_messages(ErtsMessage *msgp) { - if (bp) { - /* Link the message buffer */ - bp->next = MBUF(proc); - MBUF(proc) = bp; - MBUF_SIZE(proc) += bp->used_size; - FLAGS(proc) |= F_FORCE_GC; - - /* Move any off_heap's into the process */ - if (bp->off_heap.first != NULL) { - struct erl_off_heap_header** next_p = &bp->off_heap.first; - while (*next_p != NULL) { - next_p = &((*next_p)->next); + ErtsMessage *mp = msgp; + while (mp) { + ErtsMessage *fmp; + ErlHeapFragment *bp; + if (is_non_value(ERL_MESSAGE_TERM(mp))) { + if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) { + bp = (ErlHeapFragment *) mp->data.dist_ext->ext_endp; + erts_cleanup_offheap(&bp->off_heap); } - *next_p = MSO(proc).first; - MSO(proc).first = bp->off_heap.first; - bp->off_heap.first = NULL; - OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead); + if (mp->data.dist_ext) + erts_free_dist_ext_copy(mp->data.dist_ext); } + else { + if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) + bp = mp->data.heap_frag; + else { + bp = mp->hfrag.next; + erts_cleanup_offheap(&mp->hfrag.off_heap); + } + if (bp) + free_message_buffer(bp); + } + fmp = mp; + mp = mp->next; + erts_free_message(fmp); } } -Eterm -erts_msg_distext2heap(Process *pp, - ErtsProcLocks *plcksp, - ErlHeapFragment **bpp, - Eterm *tokenp, - ErtsDistExternal *dist_extp) +ErtsMessage * +erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_size) { - Eterm msg; - Uint tok_sz = 0; - Eterm *hp = NULL; - ErtsHeapFactory factory; - Sint sz; - - *bpp = NULL; - sz = erts_decode_dist_ext_size(dist_extp); - if (sz < 0) - goto decode_error; - if (is_not_nil(*tokenp)) { - ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp); - tok_sz = heap_frag->used_size; - sz += tok_sz; - } - if (pp) { - ErlOffHeap *ohp; - hp = erts_alloc_message_heap(sz, bpp, &ohp, pp, plcksp); - } - else { - *bpp = new_message_buffer(sz); - hp = (*bpp)->mem; - } - erts_factory_message_init(&factory, pp, hp, *bpp); - msg = erts_decode_dist_ext(&factory, dist_extp); - if (is_non_value(msg)) - goto decode_error; - if (is_not_nil(*tokenp)) { - ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp); - hp = erts_produce_heap(&factory, tok_sz, 0); - *tokenp = copy_struct(*tokenp, tok_sz, &hp, factory.off_heap); - erts_cleanup_offheap(&heap_frag->off_heap); + ErtsMessage *nmp = erts_realloc(ERTS_ALC_T_MSG, mp, + sizeof(ErtsMessage) + (sz - 1)*sizeof(Eterm)); + if (nmp != mp) { + Eterm *sp = &mp->hfrag.mem[0]; + Eterm *ep = sp + sz; + Sint offs = &nmp->hfrag.mem[0] - sp; + erts_offset_off_heap(&nmp->hfrag.off_heap, offs, sp, ep); + erts_offset_heap(&nmp->hfrag.mem[0], sz, offs, sp, ep); + if (brefs && brefs_size) + erts_offset_heap_ptr(brefs, brefs_size, offs, sp, ep); } - erts_free_dist_ext_copy(dist_extp); - erts_factory_close(&factory); - return msg; - decode_error: - if (is_not_nil(*tokenp)) { - 
ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp); - erts_cleanup_offheap(&heap_frag->off_heap); + nmp->hfrag.used_size = sz; + nmp->hfrag.alloc_size = sz; + + return nmp; +} + +void +erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp) +{ + if (first_bp) { + ErlHeapFragment *bp = first_bp; + + while (1) { + /* Move any off_heap's into the process */ + if (bp->off_heap.first != NULL) { + struct erl_off_heap_header** next_p = &bp->off_heap.first; + while (*next_p != NULL) { + next_p = &((*next_p)->next); + } + *next_p = MSO(proc).first; + MSO(proc).first = bp->off_heap.first; + bp->off_heap.first = NULL; + OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead); + } + MBUF_SIZE(proc) += bp->used_size; + if (!bp->next) + break; + bp = bp->next; + } + + /* Link the message buffer */ + bp->next = MBUF(proc); + MBUF(proc) = first_bp; } - erts_free_dist_ext_copy(dist_extp); - *bpp = NULL; - return THE_NON_VALUE; - } +} void erts_queue_dist_message(Process *rcvr, @@ -287,7 +284,7 @@ erts_queue_dist_message(Process *rcvr, ErtsDistExternal *dist_ext, Eterm token) { - ErlMessage* mp; + ErtsMessage* mp; #ifdef USE_VM_PROBES Sint tok_label = 0; Sint tok_lastcnt = 0; @@ -299,7 +296,17 @@ erts_queue_dist_message(Process *rcvr, ERTS_SMP_LC_ASSERT(*rcvr_locks == erts_proc_lc_my_proc_locks(rcvr)); - mp = message_alloc(); + mp = erts_alloc_message(0, NULL); + mp->data.dist_ext = dist_ext; + + ERL_MESSAGE_TERM(mp) = THE_NON_VALUE; +#ifdef USE_VM_PROBES + ERL_MESSAGE_DT_UTAG(mp) = NIL; + if (token == am_have_dt_utag) + ERL_MESSAGE_TOKEN(mp) = NIL; + else +#endif + ERL_MESSAGE_TOKEN(mp) = token; #ifdef ERTS_SMP if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) { @@ -318,58 +325,40 @@ erts_queue_dist_message(Process *rcvr, if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); /* Drop message if receiver is exiting or has a pending exit ... */ - if (is_not_nil(token)) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(mp->data.dist_ext); - erts_cleanup_offheap(&heap_frag->off_heap); - } - erts_free_dist_ext_copy(dist_ext); - message_free(mp); + erts_cleanup_messages(mp); } else #endif if (IS_TRACED_FL(rcvr, F_TRACE_RECEIVE)) { /* Ahh... need to decode it in order to trace it... 
*/ - ErlHeapFragment *mbuf; - Eterm msg; if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); - message_free(mp); - msg = erts_msg_distext2heap(rcvr, rcvr_locks, &mbuf, &token, dist_ext); - if (is_value(msg)) + if (!erts_decode_dist_message(rcvr, *rcvr_locks, mp, 0)) + erts_free_message(mp); + else { + Eterm msg = ERL_MESSAGE_TERM(mp); + token = ERL_MESSAGE_TOKEN(mp); #ifdef USE_VM_PROBES - if (DTRACE_ENABLED(message_queued)) { - DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); - - dtrace_proc_str(rcvr, receiver_name); - if (token != NIL && token != am_have_dt_utag) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); - tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); - tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); - } - DTRACE6(message_queued, - receiver_name, size_object(msg), rcvr->msg.len, - tok_label, tok_lastcnt, tok_serial); - } + if (DTRACE_ENABLED(message_queued)) { + DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); + + dtrace_proc_str(rcvr, receiver_name); + if (token != NIL && token != am_have_dt_utag) { + tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); + tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); + tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); + } + DTRACE6(message_queued, + receiver_name, size_object(msg), rcvr->msg.len, + tok_label, tok_lastcnt, tok_serial); + } #endif - erts_queue_message(rcvr, rcvr_locks, mbuf, msg, token); + erts_queue_message(rcvr, rcvr_locks, mp, msg, token); + } } else { /* Enqueue message on external format */ - ERL_MESSAGE_TERM(mp) = THE_NON_VALUE; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; - if (token == am_have_dt_utag) { - ERL_MESSAGE_TOKEN(mp) = NIL; - } else { -#endif - ERL_MESSAGE_TOKEN(mp) = token; -#ifdef USE_VM_PROBES - } -#endif - mp->next = NULL; - #ifdef USE_VM_PROBES if (DTRACE_ENABLED(message_queued)) { DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); @@ -388,7 +377,7 @@ erts_queue_dist_message(Process *rcvr, tok_label, tok_lastcnt, tok_serial); } #endif - mp->data.dist_ext = dist_ext; + LINK_MESSAGE(rcvr, mp); if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) @@ -408,9 +397,9 @@ erts_queue_dist_message(Process *rcvr, static Sint queue_message(Process *c_p, Process* receiver, - ErtsProcLocks *receiver_locks, erts_aint32_t *receiver_state, - ErlHeapFragment* bp, + ErtsProcLocks *receiver_locks, + ErtsMessage* mp, Eterm message, Eterm seq_trace_token #ifdef USE_VM_PROBES @@ -419,31 +408,24 @@ queue_message(Process *c_p, ) { Sint res; - ErlMessage* mp; int locked_msgq = 0; - erts_aint_t state; - -#ifndef ERTS_SMP - ASSERT(bp != NULL || receiver->mbuf == NULL); -#endif + erts_aint32_t state; ERTS_SMP_LC_ASSERT(*receiver_locks == erts_proc_lc_my_proc_locks(receiver)); - mp = message_alloc(); - - if (receiver_state) - state = *receiver_state; - else - state = erts_smp_atomic32_read_acqb(&receiver->state); - #ifdef ERTS_SMP - if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) - goto exiting; - if (!(*receiver_locks & ERTS_PROC_LOCK_MSGQ)) { if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) { ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ; + + if (receiver_state) + state = *receiver_state; + else + state = erts_smp_atomic32_read_nob(&receiver->state); + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + goto exiting; + if (*receiver_locks & ERTS_PROC_LOCK_STATUS) { erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS); need_locks |= ERTS_PROC_LOCK_STATUS; @@ -451,13 +433,12 @@ queue_message(Process *c_p, 
erts_smp_proc_lock(receiver, need_locks); } locked_msgq = 1; - state = erts_smp_atomic32_read_nob(&receiver->state); - if (receiver_state) - *receiver_state = state; } #endif + state = erts_smp_atomic32_read_nob(&receiver->state); + if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) { #ifdef ERTS_SMP exiting: @@ -465,9 +446,7 @@ queue_message(Process *c_p, /* Drop message if receiver is exiting or has a pending exit... */ if (locked_msgq) erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); - if (bp) - free_message_buffer(bp); - message_free(mp); + erts_cleanup_messages(mp); return 0; } @@ -476,13 +455,9 @@ queue_message(Process *c_p, #ifdef USE_VM_PROBES ERL_MESSAGE_DT_UTAG(mp) = dt_utag; #endif - mp->next = NULL; - mp->data.heap_frag = bp; -#ifndef ERTS_SMP res = receiver->msg.len; -#else - res = receiver->msg_inq.len; +#ifdef ERTS_SMP if (*receiver_locks & ERTS_PROC_LOCK_MAIN) { /* * We move 'in queue' to 'private queue' and place @@ -492,7 +467,7 @@ queue_message(Process *c_p, * we don't need to include the 'in queue' in * the root set when garbage collecting. */ - res += receiver->msg.len; + res += receiver->msg_inq.len; ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); LINK_MESSAGE_PRIVQ(receiver, mp); } @@ -544,19 +519,19 @@ queue_message(Process *c_p, void #ifdef USE_VM_PROBES erts_queue_message_probe(Process* receiver, ErtsProcLocks *receiver_locks, - ErlHeapFragment* bp, + ErtsMessage* mp, Eterm message, Eterm seq_trace_token, Eterm dt_utag) #else erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks, - ErlHeapFragment* bp, + ErtsMessage* mp, Eterm message, Eterm seq_trace_token) #endif { queue_message(NULL, receiver, - receiver_locks, NULL, - bp, + receiver_locks, + mp, message, seq_trace_token #ifdef USE_VM_PROBES @@ -565,246 +540,8 @@ erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks, ); } -void -erts_link_mbuf_to_proc(struct process *proc, ErlHeapFragment *bp) -{ - Eterm* htop = HEAP_TOP(proc); - - link_mbuf_to_proc(proc, bp); - if (htop < HEAP_LIMIT(proc)) { - *htop = make_pos_bignum_header(HEAP_LIMIT(proc)-htop-1); - HEAP_TOP(proc) = HEAP_LIMIT(proc); - } -} - -/* - * Moves content of message buffer attached to a message into a heap. - * The message buffer is deallocated. - */ -void -erts_move_msg_mbuf_to_heap(Eterm** hpp, ErlOffHeap* off_heap, ErlMessage *msg) -{ - struct erl_off_heap_header* oh; - Eterm term, token, *fhp, *hp; - Sint offs; - Uint sz; - ErlHeapFragment *bp; -#ifdef USE_VM_PROBES - Eterm utag; -#endif - -#ifdef HARD_DEBUG - struct erl_off_heap_header* dbg_oh_start = off_heap->first; - Eterm dbg_term, dbg_token; - ErlHeapFragment *dbg_bp; - Uint *dbg_hp, *dbg_thp_start; - Uint dbg_term_sz, dbg_token_sz; -#ifdef USE_VM_PROBES - Eterm dbg_utag; - Uint dbg_utag_sz; -#endif -#endif - - bp = msg->data.heap_frag; - term = ERL_MESSAGE_TERM(msg); - token = ERL_MESSAGE_TOKEN(msg); -#ifdef USE_VM_PROBES - utag = ERL_MESSAGE_DT_UTAG(msg); -#endif - if (!bp) { -#ifdef USE_VM_PROBES - ASSERT(is_immed(term) && is_immed(token) && is_immed(utag)); -#else - ASSERT(is_immed(term) && is_immed(token)); -#endif - return; - } - -#ifdef HARD_DEBUG - dbg_term_sz = size_object(term); - dbg_token_sz = size_object(token); - dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz); -#ifdef USE_VM_PROBES - dbg_utag_sz = size_object(utag); - dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz + dbg_utag_sz ); -#endif - /*ASSERT(dbg_term_sz + dbg_token_sz == erts_msg_used_frag_sz(msg)); - Copied size may be smaller due to removed SubBins's or garbage. 
- Copied size may be larger due to duplicated shared terms. - */ - dbg_hp = dbg_bp->mem; - dbg_term = copy_struct(term, dbg_term_sz, &dbg_hp, &dbg_bp->off_heap); - dbg_token = copy_struct(token, dbg_token_sz, &dbg_hp, &dbg_bp->off_heap); -#ifdef USE_VM_PROBES - dbg_utag = copy_struct(utag, dbg_utag_sz, &dbg_hp, &dbg_bp->off_heap); -#endif - dbg_thp_start = *hpp; -#endif - - if (bp->next != NULL) { - erts_move_multi_frags(hpp, off_heap, bp, msg->m, -#ifdef USE_VM_PROBES - 3, -#else - 2, -#endif - 0); - goto copy_done; - } - - OH_OVERHEAD(off_heap, bp->off_heap.overhead); - sz = bp->used_size; - - ASSERT(is_immed(term) || in_heapfrag(ptr_val(term),bp)); - ASSERT(is_immed(token) || in_heapfrag(ptr_val(token),bp)); - - fhp = bp->mem; - hp = *hpp; - offs = hp - fhp; - - oh = NULL; - while (sz--) { - Uint cpy_sz; - Eterm val = *fhp++; - - switch (primary_tag(val)) { - case TAG_PRIMARY_IMMED1: - *hp++ = val; - break; - case TAG_PRIMARY_LIST: - case TAG_PRIMARY_BOXED: - ASSERT(in_heapfrag(ptr_val(val), bp)); - *hp++ = offset_ptr(val, offs); - break; - case TAG_PRIMARY_HEADER: - *hp++ = val; - switch (val & _HEADER_SUBTAG_MASK) { - case ARITYVAL_SUBTAG: - break; - case REFC_BINARY_SUBTAG: - case FUN_SUBTAG: - case EXTERNAL_PID_SUBTAG: - case EXTERNAL_PORT_SUBTAG: - case EXTERNAL_REF_SUBTAG: - oh = (struct erl_off_heap_header*) (hp-1); - cpy_sz = thing_arityval(val); - goto cpy_words; - default: - cpy_sz = header_arity(val); - - cpy_words: - ASSERT(sz >= cpy_sz); - sz -= cpy_sz; - while (cpy_sz >= 8) { - cpy_sz -= 8; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - } - switch (cpy_sz) { - case 7: *hp++ = *fhp++; - case 6: *hp++ = *fhp++; - case 5: *hp++ = *fhp++; - case 4: *hp++ = *fhp++; - case 3: *hp++ = *fhp++; - case 2: *hp++ = *fhp++; - case 1: *hp++ = *fhp++; - default: break; - } - if (oh) { - /* Add to offheap list */ - oh->next = off_heap->first; - off_heap->first = oh; - ASSERT(*hpp <= (Eterm*)oh); - ASSERT(hp > (Eterm*)oh); - oh = NULL; - } - break; - } - break; - } - } - - ASSERT(bp->used_size == hp - *hpp); - *hpp = hp; - - if (is_not_immed(token)) { - ASSERT(in_heapfrag(ptr_val(token), bp)); - ERL_MESSAGE_TOKEN(msg) = offset_ptr(token, offs); -#ifdef HARD_DEBUG - ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TOKEN(msg))); - ASSERT(hp > ptr_val(ERL_MESSAGE_TOKEN(msg))); -#endif - } - - if (is_not_immed(term)) { - ASSERT(in_heapfrag(ptr_val(term),bp)); - ERL_MESSAGE_TERM(msg) = offset_ptr(term, offs); -#ifdef HARD_DEBUG - ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TERM(msg))); - ASSERT(hp > ptr_val(ERL_MESSAGE_TERM(msg))); -#endif - } -#ifdef USE_VM_PROBES - if (is_not_immed(utag)) { - ASSERT(in_heapfrag(ptr_val(utag), bp)); - ERL_MESSAGE_DT_UTAG(msg) = offset_ptr(utag, offs); -#ifdef HARD_DEBUG - ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_DT_UTAG(msg))); - ASSERT(hp > ptr_val(ERL_MESSAGE_DT_UTAG(msg))); -#endif - } -#endif - -copy_done: - -#ifdef HARD_DEBUG - { - int i, j; - ErlHeapFragment* frag; - { - struct erl_off_heap_header* dbg_oh = off_heap->first; - i = j = 0; - while (dbg_oh != dbg_oh_start) { - dbg_oh = dbg_oh->next; - i++; - } - for (frag=bp; frag; frag=frag->next) { - dbg_oh = frag->off_heap.first; - while (dbg_oh) { - dbg_oh = dbg_oh->next; - j++; - } - } - ASSERT(i == j); - } - } -#endif - - - bp->off_heap.first = NULL; - free_message_buffer(bp); - msg->data.heap_frag = NULL; - -#ifdef HARD_DEBUG - ASSERT(eq(ERL_MESSAGE_TERM(msg), dbg_term)); - 
ASSERT(eq(ERL_MESSAGE_TOKEN(msg), dbg_token)); -#ifdef USE_VM_PROBES - ASSERT(eq(ERL_MESSAGE_DT_UTAG(msg), dbg_utag)); -#endif - free_message_buffer(dbg_bp); -#endif - -} - - Uint -erts_msg_attached_data_size_aux(ErlMessage *msg) +erts_msg_attached_data_size_aux(ErtsMessage *msg) { Sint sz; ASSERT(is_non_value(ERL_MESSAGE_TERM(msg))); @@ -833,29 +570,72 @@ erts_msg_attached_data_size_aux(ErlMessage *msg) return sz; } -void -erts_move_msg_attached_data_to_heap(ErtsHeapFactory* factory, - ErlMessage *msg) +ErtsMessage * +erts_try_alloc_message_on_heap(Process *pp, + erts_aint32_t *psp, + ErtsProcLocks *plp, + Uint sz, + Eterm **hpp, + ErlOffHeap **ohpp, + int *on_heap_p) { - if (is_value(ERL_MESSAGE_TERM(msg))) - erts_move_msg_mbuf_to_heap(&factory->hp, factory->off_heap, msg); - else if (msg->data.dist_ext) { - ASSERT(msg->data.dist_ext->heap_size >= 0); - if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); - ERL_MESSAGE_TOKEN(msg) = copy_struct(ERL_MESSAGE_TOKEN(msg), - heap_frag->used_size, - &factory->hp, - factory->off_heap); - erts_cleanup_offheap(&heap_frag->off_heap); +#ifdef ERTS_SMP + int locked_main = 0; +#endif + ErtsMessage *mp; + + ASSERT(!(*psp & ERTS_PSFLG_OFF_HEAP_MSGQ)); + + if ( +#if defined(ERTS_SMP) + *plp & ERTS_PROC_LOCK_MAIN +#else + 1 +#endif + ) { +#ifdef ERTS_SMP + try_on_heap: +#endif + if ((*psp & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + || (pp->flags & F_DISABLE_GC) + || HEAP_LIMIT(pp) - HEAP_TOP(pp) <= sz) { + /* + * The heap is either potentially in an inconsistent + * state, or not large enough. + */ +#ifdef ERTS_SMP + if (locked_main) { + *plp &= ~ERTS_PROC_LOCK_MAIN; + erts_smp_proc_unlock(pp, ERTS_PROC_LOCK_MAIN); + } +#endif + goto in_message_fragment; } - ERL_MESSAGE_TERM(msg) = erts_decode_dist_ext(factory, - msg->data.dist_ext); - erts_free_dist_ext_copy(msg->data.dist_ext); - msg->data.dist_ext = NULL; + + *hpp = HEAP_TOP(pp); + HEAP_TOP(pp) = *hpp + sz; + *ohpp = &MSO(pp); + mp = erts_alloc_message(0, NULL); + mp->data.attached = NULL; + *on_heap_p = !0; + } +#ifdef ERTS_SMP + else if (erts_smp_proc_trylock(pp, ERTS_PROC_LOCK_MAIN) == 0) { + locked_main = 1; + *psp = erts_smp_atomic32_read_nob(&pp->state); + *plp |= ERTS_PROC_LOCK_MAIN; + goto try_on_heap; + } +#endif + else { + in_message_fragment: + + mp = erts_alloc_message(sz, hpp); + *ohpp = sz == 0 ? 
NULL : &mp->hfrag.off_heap; + *on_heap_p = 0; } - /* else: bad external detected when calculating size */ + + return mp; } /* @@ -870,7 +650,8 @@ erts_send_message(Process* sender, unsigned flags) { Uint msize; - ErlHeapFragment* bp = NULL; + ErtsMessage* mp; + ErlOffHeap *ohp; Eterm token = NIL; Sint res = 0; #ifdef USE_VM_PROBES @@ -879,27 +660,31 @@ erts_send_message(Process* sender, Sint tok_label = 0; Sint tok_lastcnt = 0; Sint tok_serial = 0; + Eterm utag = NIL; #endif + erts_aint32_t receiver_state; BM_STOP_TIMER(system); BM_MESSAGE(message,sender,receiver); BM_START_TIMER(send); #ifdef USE_VM_PROBES *sender_name = *receiver_name = '\0'; - if (DTRACE_ENABLED(message_send)) { + if (DTRACE_ENABLED(message_send)) { erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), "%T", sender->common.id); erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)), "%T", receiver->common.id); } #endif + + receiver_state = erts_smp_atomic32_read_nob(&receiver->state); + if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) { Eterm* hp; Eterm stoken = SEQ_TRACE_TOKEN(sender); Uint seq_trace_size = 0; #ifdef USE_VM_PROBES Uint dt_utag_size = 0; - Eterm utag = NIL; #endif BM_SWAP_TIMER(send,size); @@ -923,23 +708,32 @@ erts_send_message(Process* sender, } #endif - bp = new_message_buffer(msize + seq_trace_size + mp = erts_alloc_message_heap_state(receiver, + &receiver_state, + receiver_locks, + (msize #ifdef USE_VM_PROBES - + dt_utag_size + + dt_utag_size #endif - ); - hp = bp->mem; + + seq_trace_size), + &hp, + &ohp); BM_SWAP_TIMER(send,copy); - token = copy_struct(stoken, - seq_trace_size, - &hp, - &bp->off_heap); + if (is_immed(stoken)) + token = stoken; + else + token = copy_struct(stoken, seq_trace_size, &hp, ohp); + + if (is_not_immed(message)) + message = copy_struct(message, msize, &hp, ohp); - message = copy_struct(message, msize, &hp, &bp->off_heap); #ifdef USE_VM_PROBES if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) { - utag = copy_struct(DT_UTAG(sender), dt_utag_size, &hp, &bp->off_heap); + if (is_immed(DT_UTAG(sender))) + utag = DT_UTAG(sender); + else + utag = copy_struct(DT_UTAG(sender), dt_utag_size, &hp, ohp); #ifdef DTRACE_TAG_HARDDEBUG erts_fprintf(stderr, "Dtrace -> (%T) Spreading tag (%T) with " @@ -961,101 +755,49 @@ erts_send_message(Process* sender, msize, tok_label, tok_lastcnt, tok_serial); } #endif - res = queue_message(NULL, - receiver, - receiver_locks, - NULL, - bp, - message, - token -#ifdef USE_VM_PROBES - , utag -#endif - ); - BM_SWAP_TIMER(send,system); - } else if (sender == receiver) { - /* Drop message if receiver has a pending exit ... 
*/ -#ifdef ERTS_SMP - ErtsProcLocks need_locks = (~(*receiver_locks) - & (ERTS_PROC_LOCK_MSGQ - | ERTS_PROC_LOCK_STATUS)); - if (need_locks) { - *receiver_locks |= need_locks; - if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) { - if (need_locks == ERTS_PROC_LOCK_MSGQ) { - erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS); - need_locks = ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS; - } - erts_smp_proc_lock(receiver, need_locks); - } - } - if (!ERTS_PROC_PENDING_EXIT(receiver)) -#endif - { - ErlMessage* mp = message_alloc(); - - DTRACE6(message_send, sender_name, receiver_name, - size_object(message), tok_label, tok_lastcnt, tok_serial); - mp->data.attached = NULL; - ERL_MESSAGE_TERM(mp) = message; - ERL_MESSAGE_TOKEN(mp) = NIL; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; -#endif - mp->next = NULL; - /* - * We move 'in queue' to 'private queue' and place - * message at the end of 'private queue' in order - * to ensure that the 'in queue' doesn't contain - * references into the heap. By ensuring this, - * we don't need to include the 'in queue' in - * the root set when garbage collecting. - */ - - ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); - LINK_MESSAGE_PRIVQ(receiver, mp); - - res = receiver->msg.len; - - if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { - trace_receive(receiver, message); - } - } - BM_SWAP_TIMER(send,system); } else { - ErlOffHeap *ohp; Eterm *hp; - erts_aint32_t state; - BM_SWAP_TIMER(send,size); - msize = size_object(message); - BM_SWAP_TIMER(size,send); - hp = erts_alloc_message_heap_state(msize, - &bp, - &ohp, - receiver, - receiver_locks, - &state); - BM_SWAP_TIMER(send,copy); - message = copy_struct(message, msize, &hp, ohp); - BM_MESSAGE_COPIED(msz); - BM_SWAP_TIMER(copy,send); + if (receiver == sender && !(receiver_state & ERTS_PSFLG_OFF_HEAP_MSGQ)) { + mp = erts_alloc_message(0, NULL); + msize = 0; + } + else { + BM_SWAP_TIMER(send,size); + msize = size_object(message); + BM_SWAP_TIMER(size,send); + + mp = erts_alloc_message_heap_state(receiver, + &receiver_state, + receiver_locks, + msize, + &hp, + &ohp); + BM_SWAP_TIMER(send,copy); + if (is_not_immed(message)) + message = copy_struct(message, msize, &hp, ohp); + BM_MESSAGE_COPIED(msz); + BM_SWAP_TIMER(copy,send); + } DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); - res = queue_message(sender, - receiver, - receiver_locks, - &state, - bp, - message, - token + } + + res = queue_message(sender, + receiver, + &receiver_state, + receiver_locks, + mp, + message, + token #ifdef USE_VM_PROBES - , NIL + , utag #endif - ); - BM_SWAP_TIMER(send,system); - } - return res; + ); + + BM_SWAP_TIMER(send,system); + + return res; } /* @@ -1075,7 +817,8 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, Uint sz_from; Eterm* hp; Eterm temptoken; - ErlHeapFragment* bp = NULL; + ErtsMessage* mp; + ErlOffHeap *ohp; if (token != NIL #ifdef USE_VM_PROBES @@ -1087,36 +830,483 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, sz_reason = size_object(reason); sz_token = size_object(token); sz_from = size_object(from); - bp = new_message_buffer(sz_reason + sz_from + sz_token + 4); - hp = bp->mem; - mess = copy_struct(reason, sz_reason, &hp, &bp->off_heap); - from_copy = copy_struct(from, sz_from, &hp, &bp->off_heap); + mp = erts_alloc_message_heap(to, to_locksp, + sz_reason + sz_from + sz_token + 4, + &hp, &ohp); + mess = copy_struct(reason, sz_reason, &hp, ohp); + from_copy = copy_struct(from, sz_from, &hp, ohp); save = 
TUPLE3(hp, am_EXIT, from_copy, mess); hp += 4; /* the trace token must in this case be updated by the caller */ seq_trace_output(token, save, SEQ_TRACE_SEND, to->common.id, NULL); - temptoken = copy_struct(token, sz_token, &hp, &bp->off_heap); - erts_queue_message(to, to_locksp, bp, save, temptoken); + temptoken = copy_struct(token, sz_token, &hp, ohp); + erts_queue_message(to, to_locksp, mp, save, temptoken); } else { - ErlOffHeap *ohp; sz_reason = size_object(reason); sz_from = IS_CONST(from) ? 0 : size_object(from); - hp = erts_alloc_message_heap(sz_reason+sz_from+4, - &bp, - &ohp, - to, - to_locksp); + mp = erts_alloc_message_heap(to, to_locksp, + sz_reason+sz_from+4, &hp, &ohp); mess = copy_struct(reason, sz_reason, &hp, ohp); from_copy = (IS_CONST(from) ? from : copy_struct(from, sz_from, &hp, ohp)); save = TUPLE3(hp, am_EXIT, from_copy, mess); - erts_queue_message(to, to_locksp, bp, save, NIL); + erts_queue_message(to, to_locksp, mp, save, NIL); } } +void erts_save_message_in_proc(Process *p, ErtsMessage *msgp) +{ + ErlHeapFragment *hfp; + + if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) + hfp = &msgp->hfrag; + else if (msgp->data.attached) { + hfp = msgp->data.heap_frag; + } + else { + erts_free_message(msgp); + return; /* Nothing to save */ + } + + while (1) { + struct erl_off_heap_header *ohhp = hfp->off_heap.first; + if (ohhp) { + for ( ; ohhp->next; ohhp = ohhp->next) + ; + ohhp->next = p->off_heap.first; + p->off_heap.first = hfp->off_heap.first; + hfp->off_heap.first = NULL; + } + p->off_heap.overhead += hfp->off_heap.overhead; + hfp->off_heap.overhead = 0; + p->mbuf_sz += hfp->used_size; + + if (!hfp->next) + break; + hfp = hfp->next; + } + + msgp->next = p->msg_frag; + p->msg_frag = msgp; +} + +Sint +erts_move_messages_off_heap(Process *c_p) +{ + int reds = 1; + /* + * Move all messages off heap. This *only* occurs when the + * process had off heap message disabled and just enabled + * it... + */ + ErtsMessage *mp; + + reds += c_p->msg.len / 10; + + ASSERT(erts_smp_atomic32_read_nob(&c_p->state) + & ERTS_PSFLG_OFF_HEAP_MSGQ); + ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG); + + for (mp = c_p->msg.first; mp; mp = mp->next) { + Uint msg_sz, token_sz; +#ifdef USE_VM_PROBES + Uint utag_sz; +#endif + Eterm *hp; + ErlHeapFragment *hfrag; + + if (mp->data.attached) + continue; + + if (is_immed(ERL_MESSAGE_TERM(mp)) +#ifdef USE_VM_PROBES + && is_immed(ERL_MESSAGE_DT_UTAG(mp)) +#endif + && is_not_immed(ERL_MESSAGE_TOKEN(mp))) + continue; + + /* + * The message refers into the heap. Copy the message + * from the heap into a heap fragment and attach + * it to the message... 
+ */ + msg_sz = size_object(ERL_MESSAGE_TERM(mp)); +#ifdef USE_VM_PROBES + utag_sz = size_object(ERL_MESSAGE_DT_UTAG(mp)); +#endif + token_sz = size_object(ERL_MESSAGE_TOKEN(mp)); + + hfrag = new_message_buffer(msg_sz +#ifdef USE_VM_PROBES + + utag_sz +#endif + + token_sz); + hp = hfrag->mem; + if (is_not_immed(ERL_MESSAGE_TERM(mp))) + ERL_MESSAGE_TERM(mp) = copy_struct(ERL_MESSAGE_TERM(mp), + msg_sz, &hp, + &hfrag->off_heap); + if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) + ERL_MESSAGE_TOKEN(mp) = copy_struct(ERL_MESSAGE_TOKEN(mp), + token_sz, &hp, + &hfrag->off_heap); +#ifdef USE_VM_PROBES + if (is_not_immed(ERL_MESSAGE_DT_UTAG(mp))) + ERL_MESSAGE_DT_UTAG(mp) = copy_struct(ERL_MESSAGE_DT_UTAG(mp), + utag_sz, &hp, + &hfrag->off_heap); +#endif + mp->data.heap_frag = hfrag; + reds += 1; + } + + return reds; +} + +Sint +erts_complete_off_heap_message_queue_change(Process *c_p) +{ + int reds = 1; + + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG); + ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ); + + /* + * This job was first initiated when the process changed + * "off heap message queue" state from false to true. Since + * then ERTS_PSFLG_OFF_HEAP_MSGQ has been set. However, the + * state change might have been changed again (multiple times) + * since then. Check users last requested state (the flag + * F_OFF_HEAP_MSGQ), and make the state consistent with that. + */ + + if (!(c_p->flags & F_OFF_HEAP_MSGQ)) + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_OFF_HEAP_MSGQ); + else { + reds += 2; + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p); + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + reds += erts_move_messages_off_heap(c_p); + } + c_p->flags &= ~F_OFF_HEAP_MSGQ_CHNG; + return reds; +} + +typedef struct { + Eterm pid; + ErtsThrPrgrLaterOp lop; +} ErtsChangeOffHeapMessageQueue; + +static void +change_off_heap_msgq(void *vcohmq) +{ + ErtsChangeOffHeapMessageQueue *cohmq; + /* + * Now we've waited thread progress which ensures that all + * messages to the process are enqueued off heap. Schedule + * completion of this change as a system task on the process + * itself. This in order to avoid lock contention on its + * main lock. We will be called in + * erts_complete_off_heap_message_queue_change() (above) when + * the system task has been selected for execution. + */ + cohmq = (ErtsChangeOffHeapMessageQueue *) vcohmq; + erts_schedule_complete_off_heap_message_queue_change(cohmq->pid); + erts_free(ERTS_ALC_T_MSGQ_CHNG, vcohmq); +} + +Eterm +erts_change_off_heap_message_queue_state(Process *c_p, int enable) +{ + +#ifdef DEBUG + if (c_p->flags & F_OFF_HEAP_MSGQ) { + ASSERT(erts_smp_atomic32_read_nob(&c_p->state) + & ERTS_PSFLG_OFF_HEAP_MSGQ); + } + else { + if (c_p->flags & F_OFF_HEAP_MSGQ_CHNG) { + ASSERT(erts_smp_atomic32_read_nob(&c_p->state) + & ERTS_PSFLG_OFF_HEAP_MSGQ); + } + else { + ASSERT(!(erts_smp_atomic32_read_nob(&c_p->state) + & ERTS_PSFLG_OFF_HEAP_MSGQ)); + } + } +#endif + + if (c_p->flags & F_OFF_HEAP_MSGQ) { + /* Off heap message queue is enabled */ + + if (!enable) { + c_p->flags &= ~F_OFF_HEAP_MSGQ; + /* + * We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ + * if a change is ongoing. It will be adjusted when the + * change completes... + */ + if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { + /* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... 
*/ + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_OFF_HEAP_MSGQ); + } + } + + return am_true; /* Old state */ + } + + /* Off heap message queue is disabled */ + + if (enable) { + c_p->flags |= F_OFF_HEAP_MSGQ; + /* + * We do not have to schedule a change if + * we have an ongoing change... + */ + if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { + ErtsChangeOffHeapMessageQueue *cohmq; + /* + * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait + * thread progress before completing the change in + * order to ensure that all senders observe that + * messages should be passed off heap. When the + * change has completed, GC does not need to inspect + * the message queue at all. + */ + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_OFF_HEAP_MSGQ); + c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; + cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, + sizeof(ErtsChangeOffHeapMessageQueue)); + cohmq->pid = c_p->common.id; + erts_schedule_thr_prgr_later_op(change_off_heap_msgq, + (void *) cohmq, + &cohmq->lop); + } + } + + return am_false; /* Old state */ +} + +int +erts_decode_dist_message(Process *proc, ErtsProcLocks proc_locks, + ErtsMessage *msgp, int force_off_heap) +{ + ErtsHeapFactory factory; + Eterm msg; + ErlHeapFragment *bp; + Sint need; + int decode_in_heap_frag; + + decode_in_heap_frag = (force_off_heap + || !(proc_locks & ERTS_PROC_LOCK_MAIN) + || (proc->flags & F_OFF_HEAP_MSGQ)); + + if (msgp->data.dist_ext->heap_size >= 0) + need = msgp->data.dist_ext->heap_size; + else { + need = erts_decode_dist_ext_size(msgp->data.dist_ext); + if (need < 0) { + /* bad msg; remove it... */ + if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { + bp = erts_dist_ext_trailer(msgp->data.dist_ext); + erts_cleanup_offheap(&bp->off_heap); + } + erts_free_dist_ext_copy(msgp->data.dist_ext); + msgp->data.dist_ext = NULL; + return 0; + } + + msgp->data.dist_ext->heap_size = need; + } + + if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { + bp = erts_dist_ext_trailer(msgp->data.dist_ext); + need += bp->used_size; + } + + if (decode_in_heap_frag) + erts_factory_heap_frag_init(&factory, new_message_buffer(need)); + else + erts_factory_proc_prealloc_init(&factory, proc, need); + + ASSERT(msgp->data.dist_ext->heap_size >= 0); + if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { + ErlHeapFragment *heap_frag; + heap_frag = erts_dist_ext_trailer(msgp->data.dist_ext); + ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp), + heap_frag->used_size, + &factory.hp, + factory.off_heap); + erts_cleanup_offheap(&heap_frag->off_heap); + } + + msg = erts_decode_dist_ext(&factory, msgp->data.dist_ext); + ERL_MESSAGE_TERM(msgp) = msg; + erts_free_dist_ext_copy(msgp->data.dist_ext); + msgp->data.attached = NULL; + + if (is_non_value(msg)) { + erts_factory_undo(&factory); + return 0; + } + + erts_factory_trim_and_close(&factory, msgp->m, + ERL_MESSAGE_REF_ARRAY_SZ); + + ASSERT(!msgp->data.heap_frag); + + if (decode_in_heap_frag) + msgp->data.heap_frag = factory.heap_frags; + + return 1; +} + +/* + * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0 will move off heap messages + * into the heap of the inspected process if off_heap_message_queue + * is false when process_info(_, messages) is called. That is, the + * following GC will have more data in the rootset compared to the + * scenario when process_info(_, messages) had not been called. + * + * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS != 0 will keep off heap messages + * off heap when process_info(_, messages) is called regardless of + * the off_heap_message_queue setting of the process. 
That is, it + * will change the following execution of the process as little as + * possible. + */ +#define ERTS_INSPECT_MSGQ_KEEP_OH_MSGS 1 + +Uint +erts_prep_msgq_for_inspection(Process *c_p, Process *rp, + ErtsProcLocks rp_locks, ErtsMessageInfo *mip) +{ + Uint tot_heap_size; + ErtsMessage* mp; + Sint i; + int self_on_heap; + + /* + * Prepare the message queue for inspection + * by process_info(). + * + * + * - Decode all messages on external format + * - Remove all corrupt dist messages from queue + * - Save pointer to, and heap size need of each + * message in the mip array. + * - Return total heap size need for all messages + * that needs to be copied. + * + * If ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0: + * - In case off heap messages is disabled and + * we are inspecting our own queue, move all + * off heap data into the heap. + */ + + self_on_heap = c_p == rp && !(c_p->flags & F_OFF_HEAP_MSGQ); + + tot_heap_size = 0; + i = 0; + mp = rp->msg.first; + while (mp) { + Eterm msg = ERL_MESSAGE_TERM(mp); + + mip[i].size = 0; + + if (is_non_value(msg)) { + /* Dist message on external format; decode it... */ + if (mp->data.attached) + erts_decode_dist_message(rp, rp_locks, mp, + ERTS_INSPECT_MSGQ_KEEP_OH_MSGS); + + msg = ERL_MESSAGE_TERM(mp); + + if (is_non_value(msg)) { + ErtsMessage **mpp; + ErtsMessage *bad_mp = mp; + /* + * Bad distribution message; remove + * it from the queue... + */ + ASSERT(!mp->data.attached); + + mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next; + + if (rp->msg.save == &bad_mp->next) + rp->msg.save = mpp; + if (rp->msg.last == &bad_mp->next) + rp->msg.last = mpp; + mp = mp->next; + *mpp = mp; + rp->msg.len--; + bad_mp->next = NULL; + erts_cleanup_messages(bad_mp); + continue; + } + } + + ASSERT(is_value(msg)); + +#if ERTS_INSPECT_MSGQ_KEEP_OH_MSGS + if (is_not_immed(msg) && (!self_on_heap || mp->data.attached)) { + Uint sz = size_object(msg); + mip[i].size = sz; + tot_heap_size += sz; + } +#else + if (self_on_heap) { + if (mp->data.attached) { + ErtsMessage *tmp = NULL; + if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { + erts_link_mbuf_to_proc(rp, mp->data.heap_frag); + mp->data.attached = NULL; + } + else { + /* + * Need to replace the message reference since + * we will get references to the message data + * from the heap... + */ + ErtsMessage **mpp; + tmp = erts_alloc_message(0, NULL); + sys_memcpy((void *) tmp->m, (void *) mp->m, + sizeof(Eterm)*ERL_MESSAGE_REF_ARRAY_SZ); + mpp = i == 0 ? 
&rp->msg.first : &mip[i-1].msgp->next; + tmp->next = mp->next; + if (rp->msg.save == &mp->next) + rp->msg.save = &tmp->next; + if (rp->msg.last == &mp->next) + rp->msg.last = &tmp->next; + *mpp = tmp; + erts_save_message_in_proc(rp, mp); + mp = tmp; + } + } + } + else if (is_not_immed(msg)) { + Uint sz = size_object(msg); + mip[i].size = sz; + tot_heap_size += sz; + } + +#endif + + mip[i].msgp = mp; + i++; + mp = mp->next; + } + + return tot_heap_size; +} + void erts_factory_proc_init(ErtsHeapFactory* factory, Process* p) { @@ -1127,47 +1317,138 @@ void erts_factory_proc_prealloc_init(ErtsHeapFactory* factory, Process* p, Sint size) { + ErlHeapFragment *bp = p->mbuf; factory->mode = FACTORY_HALLOC; factory->p = p; factory->hp_start = HAlloc(p, size); factory->hp = factory->hp_start; factory->hp_end = factory->hp_start + size; factory->off_heap = &p->off_heap; + factory->message = NULL; factory->off_heap_saved.first = p->off_heap.first; factory->off_heap_saved.overhead = p->off_heap.overhead; - factory->heap_frags_saved = p->mbuf; + factory->heap_frags_saved = bp; + factory->heap_frags_saved_used = bp ? bp->used_size : 0; factory->heap_frags = NULL; /* not used */ factory->alloc_type = 0; /* not used */ } -void erts_factory_message_init(ErtsHeapFactory* factory, - Process* rp, - Eterm* hp, - ErlHeapFragment* bp) +void erts_factory_heap_frag_init(ErtsHeapFactory* factory, + ErlHeapFragment* bp) +{ + factory->mode = FACTORY_HEAP_FRAGS; + factory->p = NULL; + factory->hp_start = bp->mem; + factory->hp = bp->mem; + factory->hp_end = bp->mem + bp->alloc_size; + factory->off_heap = &bp->off_heap; + factory->message = NULL; + factory->heap_frags = bp; + factory->heap_frags_saved = NULL; + factory->heap_frags_saved_used = 0; + factory->alloc_type = ERTS_ALC_T_HEAP_FRAG; + ASSERT(!bp->next); + factory->off_heap_saved.first = factory->off_heap->first; + factory->off_heap_saved.overhead = factory->off_heap->overhead; + + ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end); +} + + +ErtsMessage * +erts_factory_message_create(ErtsHeapFactory* factory, + Process *proc, + ErtsProcLocks *proc_locksp, + Uint sz) +{ + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *msgp; + int on_heap; + erts_aint32_t state; + + state = erts_smp_atomic32_read_nob(&proc->state); + + if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) { + msgp = erts_alloc_message(sz, &hp); + ohp = sz == 0 ? NULL : &msgp->hfrag.off_heap; + on_heap = 0; + } + else { + msgp = erts_try_alloc_message_on_heap(proc, &state, + proc_locksp, + sz, &hp, &ohp, + &on_heap); + } + + if (on_heap) { + ASSERT(*proc_locksp & ERTS_PROC_LOCK_MAIN); + ASSERT(ohp == &proc->off_heap); + factory->mode = FACTORY_HALLOC; + factory->p = proc; + factory->heap_frags_saved = proc->mbuf; + factory->heap_frags_saved_used = proc->mbuf ? 
proc->mbuf->used_size : 0; + } + else { + factory->mode = FACTORY_MESSAGE; + factory->p = NULL; + factory->heap_frags_saved = NULL; + factory->heap_frags_saved_used = 0; + + if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) { + ASSERT(!msgp->hfrag.next); + factory->heap_frags = NULL; + } + else { + ASSERT(!msgp->data.heap_frag + || !msgp->data.heap_frag->next); + factory->heap_frags = msgp->data.heap_frag; + } + } + factory->hp_start = hp; + factory->hp = hp; + factory->hp_end = hp + sz; + factory->message = msgp; + factory->off_heap = ohp; + factory->alloc_type = ERTS_ALC_T_HEAP_FRAG; + if (ohp) { + factory->off_heap_saved.first = ohp->first; + factory->off_heap_saved.overhead = ohp->overhead; + } + else { + factory->off_heap_saved.first = NULL; + factory->off_heap_saved.overhead = 0; + } + + ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end); + + return msgp; +} + +void erts_factory_selfcontained_message_init(ErtsHeapFactory* factory, + ErtsMessage *msgp, + Eterm *hp) { - if (bp) { - factory->mode = FACTORY_HEAP_FRAGS; - factory->p = NULL; - factory->hp_start = bp->mem; - factory->hp = hp ? hp : bp->mem; - factory->hp_end = bp->mem + bp->alloc_size; - factory->off_heap = &bp->off_heap; - factory->heap_frags = bp; - factory->heap_frags_saved = bp; - factory->alloc_type = ERTS_ALC_T_HEAP_FRAG; - ASSERT(!bp->next); + ErlHeapFragment* bp; + if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) { + bp = &msgp->hfrag; + factory->heap_frags = NULL; } else { - factory->mode = FACTORY_HALLOC; - factory->p = rp; - factory->hp_start = hp; - factory->hp = hp; - factory->hp_end = HEAP_TOP(rp); - factory->off_heap = &rp->off_heap; - factory->heap_frags_saved = rp->mbuf; - factory->heap_frags = NULL; /* not used */ - factory->alloc_type = 0; /* not used */ + bp = msgp->data.heap_frag; + factory->heap_frags = bp; } + factory->mode = FACTORY_MESSAGE; + factory->p = NULL; + factory->hp_start = bp->mem; + factory->hp = hp; + factory->hp_end = bp->mem + bp->alloc_size; + factory->message = msgp; + factory->off_heap = &bp->off_heap; + factory->heap_frags_saved = NULL; + factory->heap_frags_saved_used = 0; + factory->alloc_type = ERTS_ALC_T_HEAP_FRAG; + ASSERT(!bp->next); factory->off_heap_saved.first = factory->off_heap->first; factory->off_heap_saved.overhead = factory->off_heap->overhead; @@ -1230,8 +1511,16 @@ static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra) factory->hp_end = factory->hp + need; return; - case FACTORY_HEAP_FRAGS: - bp = factory->heap_frags; + case FACTORY_MESSAGE: + if (!factory->heap_frags) { + ASSERT(factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG); + bp = &factory->message->hfrag; + } + else { + /* Fall through */ + case FACTORY_HEAP_FRAGS: + bp = factory->heap_frags; + } if (bp) { ASSERT(factory->hp > bp->mem); @@ -1269,8 +1558,23 @@ void erts_factory_close(ErtsHeapFactory* factory) HRelease(factory->p, factory->hp_end, factory->hp); break; - case FACTORY_HEAP_FRAGS: - bp = factory->heap_frags; + case FACTORY_MESSAGE: + if (!factory->heap_frags) { + if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG) + bp = &factory->message->hfrag; + else + bp = NULL; + } + else { + if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG) + factory->message->hfrag.next = factory->heap_frags; + else + factory->message->data.heap_frag = factory->heap_frags; + + /* Fall through */ + case FACTORY_HEAP_FRAGS: + bp = factory->heap_frags; + } if (bp) { ASSERT(factory->hp >= bp->mem); @@ -1291,17 +1595,47 @@ void 
erts_factory_close(ErtsHeapFactory* factory) void erts_factory_trim_and_close(ErtsHeapFactory* factory, Eterm *brefs, Uint brefs_size) { - if (factory->mode == FACTORY_HEAP_FRAGS) { - ErlHeapFragment* bp = factory->heap_frags; + ErlHeapFragment *bp; + + switch (factory->mode) { + case FACTORY_MESSAGE: { + ErtsMessage *mp = factory->message; + if (mp->data.attached == ERTS_MSG_COMBINED_HFRAG) { + if (!mp->hfrag.next) { + Uint sz = factory->hp - factory->hp_start; + mp = erts_shrink_message(mp, sz, brefs, brefs_size); + factory->message = mp; + factory->mode = FACTORY_CLOSED; + return; + } + /*else we don't trim multi fragmented messages for now (off_heap...) */ + break; + } + /* Fall through... */ + } + case FACTORY_HEAP_FRAGS: + bp = factory->heap_frags; + if (!bp) + break; if (bp->next == NULL) { Uint used_sz = factory->hp - bp->mem; ASSERT(used_sz <= bp->alloc_size); - factory->heap_frags = erts_resize_message_buffer(bp, used_sz, - brefs, brefs_size); + if (used_sz > 0) + bp = erts_resize_message_buffer(bp, used_sz, + brefs, brefs_size); + else { + free_message_buffer(bp); + bp = NULL; + } + factory->heap_frags = bp; + if (factory->mode == FACTORY_MESSAGE) + factory->message->data.heap_frag = bp; factory->mode = FACTORY_CLOSED; return; } - /*else we don't trim multi fragmented messages for now */ + /*else we don't trim multi fragmented messages for now (off_heap...) */ + default: + break; } erts_factory_close(factory); } @@ -1349,38 +1683,35 @@ void erts_factory_undo(ErtsHeapFactory* factory) /* Rollback heap top */ - if (factory->heap_frags_saved == NULL) { /* No heap frags when we started */ - ASSERT(factory->hp_start >= HEAP_START(factory->p)); - ASSERT(factory->hp_start <= HEAP_LIMIT(factory->p)); - HEAP_TOP(factory->p) = factory->hp_start; - } - else { + if (HEAP_START(factory->p) <= factory->hp_start + && factory->hp_start <= HEAP_LIMIT(factory->p)) { + HEAP_TOP(factory->p) = factory->hp_start; + } + + /* Fix last heap frag */ + if (factory->heap_frags_saved) { ASSERT(factory->heap_frags_saved == factory->p->mbuf); - if (factory->hp_start == factory->heap_frags_saved->mem) { + if (factory->hp_start != factory->heap_frags_saved->mem) + factory->heap_frags_saved->used_size = factory->heap_frags_saved_used; + else { factory->p->mbuf = factory->p->mbuf->next; ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, factory->heap_frags_saved, ERTS_HEAP_FRAG_SIZE(factory->heap_frags_saved->alloc_size)); } - else if (factory->hp_start != factory->hp_end) { - unsigned remains = factory->hp_start - factory->heap_frags_saved->mem; - ASSERT(remains > 0 && remains < factory->heap_frags_saved->used_size); - factory->heap_frags_saved->used_size = remains; - } } } break; + case FACTORY_MESSAGE: + if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG) + factory->message->hfrag.next = factory->heap_frags; + else + factory->message->data.heap_frag = factory->heap_frags; + erts_cleanup_messages(factory->message); + break; case FACTORY_HEAP_FRAGS: - bp = factory->heap_frags; - do { - ErlHeapFragment* next_bp = bp->next; - - erts_cleanup_offheap(&bp->off_heap); - ERTS_HEAP_FREE(factory->alloc_type, (void *) bp, - ERTS_HEAP_FRAG_SIZE(bp->size)); - bp = next_bp; - }while (bp != NULL); + free_message_buffer(factory->heap_frags); break; case FACTORY_CLOSED: break; diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index f37b430d27..740ae46a0f 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -24,6 +24,8 @@ struct proc_bin; struct external_thing_; 
+typedef struct erl_mesg ErtsMessage; + /* * This struct represents data that must be updated by structure copy, * but is stored outside of any heap. @@ -54,6 +56,7 @@ typedef struct { enum { FACTORY_CLOSED = 0, FACTORY_HALLOC, + FACTORY_MESSAGE, FACTORY_HEAP_FRAGS, FACTORY_STATIC } mode; @@ -61,8 +64,10 @@ typedef struct { Eterm* hp_start; Eterm* hp; Eterm* hp_end; + ErtsMessage *message; struct erl_heap_fragment* heap_frags; struct erl_heap_fragment* heap_frags_saved; + Uint heap_frags_saved_used; ErlOffHeap* off_heap; ErlOffHeap off_heap_saved; Uint32 alloc_type; @@ -70,7 +75,10 @@ typedef struct { void erts_factory_proc_init(ErtsHeapFactory*, Process*); void erts_factory_proc_prealloc_init(ErtsHeapFactory*, Process*, Sint size); -void erts_factory_message_init(ErtsHeapFactory*, Process*, Eterm* hp, struct erl_heap_fragment*); +void erts_factory_heap_frag_init(ErtsHeapFactory*, struct erl_heap_fragment*); +ErtsMessage *erts_factory_message_create(ErtsHeapFactory *, Process *, + ErtsProcLocks *, Uint sz); +void erts_factory_selfcontained_message_init(ErtsHeapFactory*, ErtsMessage *, Eterm *); void erts_factory_static_init(ErtsHeapFactory*, Eterm* hp, Uint size, ErlOffHeap*); void erts_factory_dummy_init(ErtsHeapFactory*); @@ -91,6 +99,8 @@ void erts_factory_undo(ErtsHeapFactory*); #include "external.h" #include "erl_process.h" +#define ERTS_INVALID_HFRAG_PTR ((ErlHeapFragment *) ~((UWord) 7)) + /* * This struct represents a heap fragment, which is used when there * isn't sufficient room in the process heap and we can't do a GC. @@ -105,33 +115,46 @@ struct erl_heap_fragment { Eterm mem[1]; /* Data */ }; -typedef struct erl_mesg { - struct erl_mesg* next; /* Next message */ - union { - ErtsDistExternal *dist_ext; - ErlHeapFragment *heap_frag; - void *attached; - } data; -#ifdef USE_VM_PROBES - Eterm m[3]; /* m[0] = message, m[1] = seq trace token, m[3] = dynamic trace user tag */ -#else - Eterm m[2]; /* m[0] = message, m[1] = seq trace token */ -#endif -} ErlMessage; - +/* m[0] = message, m[1] = seq trace token */ +#define ERL_MESSAGE_REF_ARRAY_SZ 2 #define ERL_MESSAGE_TERM(mp) ((mp)->m[0]) #define ERL_MESSAGE_TOKEN(mp) ((mp)->m[1]) + #ifdef USE_VM_PROBES +/* m[2] = dynamic trace user tag */ +#undef ERL_MESSAGE_REF_ARRAY_SZ +#define ERL_MESSAGE_REF_ARRAY_SZ 3 #define ERL_MESSAGE_DT_UTAG(mp) ((mp)->m[2]) +#else #endif +#define ERL_MESSAGE_REF_FIELDS__ \ + ErtsMessage *next; /* Next message */ \ + union { \ + ErtsDistExternal *dist_ext; \ + ErlHeapFragment *heap_frag; \ + void *attached; \ + } data; \ + Eterm m[ERL_MESSAGE_REF_ARRAY_SZ] + + +typedef struct erl_msg_ref__ { + ERL_MESSAGE_REF_FIELDS__; +} ErtsMessageRef; + +struct erl_mesg { + ERL_MESSAGE_REF_FIELDS__; + + ErlHeapFragment hfrag; +}; + /* Size of default message buffer (erl_message.c) */ #define ERL_MESSAGE_BUF_SZ 500 typedef struct { - ErlMessage* first; - ErlMessage** last; /* point to the last next pointer */ - ErlMessage** save; + ErtsMessage* first; + ErtsMessage** last; /* point to the last next pointer */ + ErtsMessage** save; Sint len; /* queue length */ /* @@ -139,14 +162,14 @@ typedef struct { * recv_set/1 instructions. 
*/ BeamInstr* mark; /* address to rec_loop/2 instruction */ - ErlMessage** saved_last; /* saved last pointer */ + ErtsMessage** saved_last; /* saved last pointer */ } ErlMessageQueue; #ifdef ERTS_SMP typedef struct { - ErlMessage* first; - ErlMessage** last; /* point to the last next pointer */ + ErtsMessage* first; + ErtsMessage** last; /* point to the last next pointer */ Sint len; /* queue length */ } ErlMessageInQueue; @@ -197,7 +220,7 @@ do { \ /* Unlink current message */ #define UNLINK_MESSAGE(p,msgp) do { \ - ErlMessage* __mp = (msgp)->next; \ + ErtsMessage* __mp = (msgp)->next; \ *(p)->msg.save = __mp; \ (p)->msg.len--; \ if (__mp == NULL) \ @@ -213,76 +236,33 @@ do { \ #define SAVE_MESSAGE(p) \ (p)->msg.save = &(*(p)->msg.save)->next -/* - * ErtsMoveMsgAttachmentIntoProc() moves data attached to a message - * onto the heap of a process. The attached data is the content of - * the the message either on the internal format or on the external - * format, and also possibly a seq trace token on the internal format. - * If the message content is on the external format, the decode might - * fail. If the decoding fails, ERL_MESSAGE_TERM(M) will contain - * THE_NON_VALUE. That is, ERL_MESSAGE_TERM(M) *has* to be checked - * afterwards and taken care of appropriately. - * - * ErtsMoveMsgAttachmentIntoProc() will shallow copy to heap if - * possible; otherwise, move to heap via garbage collection. - * - * ErtsMoveMsgAttachmentIntoProc() is used when receiveing messages - * in process_main() and in hipe_check_get_msg(). - */ - -#define ErtsMoveMsgAttachmentIntoProc(M, P, ST, HT, FC, SWPO, SWPI) \ -do { \ - if ((M)->data.attached) { \ - Uint need__ = erts_msg_attached_data_size((M)); \ - { SWPO ; } \ - if ((ST) - (HT) >= need__) { \ - ErtsHeapFactory factory__; \ - erts_factory_proc_prealloc_init(&factory__, (P), need__); \ - erts_move_msg_attached_data_to_heap(&factory__, (M)); \ - erts_factory_close(&factory__); \ - if ((P)->mbuf != NULL) { \ - /* Heap was exhausted by messages. This is a rare case */ \ - /* that can currently (OTP 18) only happen if hamts are */ \ - /* far exceeding the estimated heap size. Do GC. 
*/ \ - (FC) -= erts_garbage_collect((P), 0, NULL, 0); \ - } \ - } \ - else { \ - (FC) -= erts_garbage_collect((P), 0, NULL, 0); \ - } \ - { SWPI ; } \ - ASSERT(!(M)->data.attached); \ - } \ -} while (0) - #define ERTS_SND_FLG_NO_SEQ_TRACE (((unsigned) 1) << 0) #define ERTS_HEAP_FRAG_SIZE(DATA_WORDS) \ (sizeof(ErlHeapFragment) - sizeof(Eterm) + (DATA_WORDS)*sizeof(Eterm)) -#define ERTS_INIT_HEAP_FRAG(HEAP_FRAG_P, DATA_WORDS) \ -do { \ - (HEAP_FRAG_P)->next = NULL; \ - (HEAP_FRAG_P)->alloc_size = (DATA_WORDS); \ - (HEAP_FRAG_P)->used_size = (DATA_WORDS); \ - (HEAP_FRAG_P)->off_heap.first = NULL; \ - (HEAP_FRAG_P)->off_heap.overhead = 0; \ -} while (0) +#define ERTS_INIT_HEAP_FRAG(HEAP_FRAG_P, USED_WORDS, DATA_WORDS) \ + do { \ + (HEAP_FRAG_P)->next = NULL; \ + (HEAP_FRAG_P)->alloc_size = (DATA_WORDS); \ + (HEAP_FRAG_P)->used_size = (USED_WORDS); \ + (HEAP_FRAG_P)->off_heap.first = NULL; \ + (HEAP_FRAG_P)->off_heap.overhead = 0; \ + } while (0) void init_message(void); -void free_message(ErlMessage *); ErlHeapFragment* new_message_buffer(Uint); ErlHeapFragment* erts_resize_message_buffer(ErlHeapFragment *, Uint, Eterm *, Uint); void free_message_buffer(ErlHeapFragment *); void erts_queue_dist_message(Process*, ErtsProcLocks*, ErtsDistExternal *, Eterm); #ifdef USE_VM_PROBES -void erts_queue_message_probe(Process*, ErtsProcLocks*, ErlHeapFragment*, +void erts_queue_message_probe(Process*, ErtsProcLocks*, ErtsMessage*, Eterm message, Eterm seq_trace_token, Eterm dt_utag); #define erts_queue_message(RP,RL,BP,Msg,SEQ) \ erts_queue_message_probe((RP),(RL),(BP),(Msg),(SEQ),NIL) #else -void erts_queue_message(Process*, ErtsProcLocks*, ErlHeapFragment*, +void erts_queue_message(Process*, ErtsProcLocks*, ErtsMessage*, Eterm message, Eterm seq_trace_token); #define erts_queue_message_probe(RP,RL,BP,Msg,SEQ,TAG) \ erts_queue_message((RP),(RL),(BP),(Msg),(SEQ)) @@ -291,20 +271,141 @@ void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm); Sint erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned); void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp); -void erts_move_msg_mbuf_to_heap(Eterm**, ErlOffHeap*, ErlMessage *); - -Uint erts_msg_attached_data_size_aux(ErlMessage *msg); -void erts_move_msg_attached_data_to_heap(ErtsHeapFactory*, ErlMessage *); -Eterm erts_msg_distext2heap(Process *, ErtsProcLocks *, ErlHeapFragment **, - Eterm *, ErtsDistExternal *); +Uint erts_msg_attached_data_size_aux(ErtsMessage *msg); void erts_cleanup_offheap(ErlOffHeap *offheap); +void erts_save_message_in_proc(Process *p, ErtsMessage *msg); +Sint erts_move_messages_off_heap(Process *c_p); +Sint erts_complete_off_heap_message_queue_change(Process *c_p); +Eterm erts_change_off_heap_message_queue_state(Process *c_p, int enable); + +int erts_decode_dist_message(Process *, ErtsProcLocks, ErtsMessage *, int); + +void erts_cleanup_messages(ErtsMessage *mp); + +typedef struct { + Uint size; + ErtsMessage *msgp; +} ErtsMessageInfo; + +Uint erts_prep_msgq_for_inspection(Process *c_p, + Process *rp, + ErtsProcLocks rp_locks, + ErtsMessageInfo *mip); +void *erts_alloc_message_ref(void); +void erts_free_message_ref(void *); +#define ERTS_SMALL_FIX_MSG_SZ 10 +#define ERTS_MEDIUM_FIX_MSG_SZ 20 +#define ERTS_LARGE_FIX_MSG_SZ 30 + +void *erts_alloc_small_message(void); +void erts_free_small_message(void *mp); + +typedef struct { + ErtsMessage m; + Eterm data[ERTS_SMALL_FIX_MSG_SZ-1]; +} ErtsSmallFixSzMessage; + +typedef struct { + ErtsMessage m; + Eterm data[ERTS_MEDIUM_FIX_MSG_SZ-1]; +} 
ErtsMediumFixSzMessage; + +typedef struct { + ErtsMessage m; + Eterm data[ERTS_LARGE_FIX_MSG_SZ-1]; +} ErtsLargeFixSzMessage; + +ErtsMessage *erts_try_alloc_message_on_heap(Process *pp, + erts_aint32_t *psp, + ErtsProcLocks *plp, + Uint sz, + Eterm **hpp, + ErlOffHeap **ohpp, + int *on_heap_p); +ErtsMessage *erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, + Eterm *brefs, Uint brefs_size); + +ERTS_GLB_FORCE_INLINE ErtsMessage *erts_alloc_message(Uint sz, Eterm **hpp); +ERTS_GLB_FORCE_INLINE ErtsMessage *erts_shrink_message(ErtsMessage *mp, Uint sz, + Eterm *brefs, Uint brefs_size); +ERTS_GLB_FORCE_INLINE void erts_free_message(ErtsMessage *mp); ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment*); -ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErlMessage *msg); +ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErtsMessage *msg); + +#define ERTS_MSG_COMBINED_HFRAG ((void *) 0x1) #if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_FORCE_INLINE ErtsMessage *erts_alloc_message(Uint sz, Eterm **hpp) +{ + ErtsMessage *mp; + + if (sz == 0) { + mp = erts_alloc_message_ref(); + mp->next = NULL; + ERL_MESSAGE_TERM(mp) = NIL; + mp->data.attached = NULL; + if (hpp) + *hpp = NULL; + return mp; + } + + mp = erts_alloc(ERTS_ALC_T_MSG, + sizeof(ErtsMessage) + (sz - 1)*sizeof(Eterm)); + + mp->next = NULL; + ERL_MESSAGE_TERM(mp) = NIL; + mp->data.attached = ERTS_MSG_COMBINED_HFRAG; + ERTS_INIT_HEAP_FRAG(&mp->hfrag, sz, sz); + + if (hpp) + *hpp = &mp->hfrag.mem[0]; + + return mp; +} + +ERTS_GLB_FORCE_INLINE ErtsMessage * +erts_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_size) +{ + if (sz == 0) { + ErtsMessage *nmp; + if (!mp->data.attached) + return mp; + ASSERT(mp->data.attached == ERTS_MSG_COMBINED_HFRAG); + nmp = erts_alloc_message_ref(); +#ifdef DEBUG + if (brefs && brefs_size) { + int i; + for (i = 0; i < brefs_size; i++) + ASSERT(is_non_value(brefs[i]) || is_immed(brefs[i])); + } +#endif + erts_free(ERTS_ALC_T_MSG, mp); + return nmp; + } + + ASSERT(mp->data.attached == ERTS_MSG_COMBINED_HFRAG); + ASSERT(mp->hfrag.used_size >= sz); + + if (sz >= (mp->hfrag.alloc_size - mp->hfrag.alloc_size / 16)) { + mp->hfrag.used_size = sz; + return mp; + } + + return erts_realloc_shrink_message(mp, sz, brefs, brefs_size); +} + +ERTS_GLB_FORCE_INLINE void erts_free_message(ErtsMessage *mp) +{ + if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) + erts_free_message_ref(mp); + else + erts_free(ERTS_ALC_T_MSG, mp); +} + ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment* bp) { Uint sz = 0; @@ -314,11 +415,17 @@ ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment* bp) return sz; } -ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErlMessage *msg) +ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErtsMessage *msg) { ASSERT(msg->data.attached); - if (is_value(ERL_MESSAGE_TERM(msg))) - return erts_used_frag_sz(msg->data.heap_frag); + if (is_value(ERL_MESSAGE_TERM(msg))) { + ErlHeapFragment *bp; + if (msg->data.attached == ERTS_MSG_COMBINED_HFRAG) + bp = &msg->hfrag; + else + bp = msg->data.heap_frag; + return erts_used_frag_sz(bp); + } else if (msg->data.dist_ext->heap_size < 0) return erts_msg_attached_data_size_aux(msg); else { diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c index 01414f326d..a37cda93ef 100644 --- a/erts/emulator/beam/erl_nif.c +++ b/erts/emulator/beam/erl_nif.c @@ -314,6 +314,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid, ErtsProcLocks rp_locks = 0; Process* rp; Process* c_p; + ErtsMessage *mp; ErlHeapFragment* 
frags; Eterm receiver = to_pid->pid; int flush_me = 0; @@ -347,7 +348,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid, ASSERT(frags == MBUF(&menv->phony_proc)); if (frags != NULL) { /* Move all offheap's from phony proc to the first fragment. - Quick and dirty, but erts_move_msg_mbuf_to_heap doesn't care. */ + Quick and dirty... */ ASSERT(!is_offheap(&frags->off_heap)); frags->off_heap = MSO(&menv->phony_proc); clear_offheap(&MSO(&menv->phony_proc)); @@ -359,7 +360,9 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid, if (flush_me) { flush_env(env); /* Needed for ERTS_HOLE_CHECK */ } - erts_queue_message(rp, &rp_locks, frags, msg, am_undefined); + mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = frags; + erts_queue_message(rp, &rp_locks, mp, msg, am_undefined); if (c_p == rp) rp_locks &= ~ERTS_PROC_LOCK_MAIN; if (rp_locks) diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 62a44f7129..a4da288e79 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -1401,56 +1401,50 @@ setup_reference_table(void) for (i = 0; i < max; i++) { Process *proc = erts_pix2proc(i); if (proc) { - ErlMessage *msg; + int mli; + ErtsMessage *msg_list[] = { + proc->msg.first, +#ifdef ERTS_SMP + proc->msg_inq.first, +#endif + proc->msg_frag}; /* Insert Heap */ insert_offheap(&(proc->off_heap), HEAP_REF, proc->common.id); - /* Insert message buffers */ + /* Insert heap fragments buffers */ for(hfp = proc->mbuf; hfp; hfp = hfp->next) insert_offheap(&(hfp->off_heap), HEAP_REF, proc->common.id); - /* Insert msg msg buffers */ - for (msg = proc->msg.first; msg; msg = msg->next) { - ErlHeapFragment *heap_frag = NULL; - if (msg->data.attached) { - if (is_value(ERL_MESSAGE_TERM(msg))) - heap_frag = msg->data.heap_frag; - else { - if (msg->data.dist_ext->dep) - insert_dist_entry(msg->data.dist_ext->dep, - HEAP_REF, proc->common.id, 0); - if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) - heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); + + /* Insert msg buffers */ + for (mli = 0; mli < sizeof(msg_list)/sizeof(msg_list[0]); mli++) { + ErtsMessage *msg; + for (msg = msg_list[mli]; msg; msg = msg->next) { + ErlHeapFragment *heap_frag = NULL; + if (msg->data.attached) { + if (msg->data.attached == ERTS_MSG_COMBINED_HFRAG) + heap_frag = &msg->hfrag; + else if (is_value(ERL_MESSAGE_TERM(msg))) + heap_frag = msg->data.heap_frag; + else { + if (msg->data.dist_ext->dep) + insert_dist_entry(msg->data.dist_ext->dep, + HEAP_REF, proc->common.id, 0); + if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) + heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); + } } - } - if (heap_frag) - insert_offheap(&(heap_frag->off_heap), - HEAP_REF, - proc->common.id); - } -#ifdef ERTS_SMP - for (msg = proc->msg_inq.first; msg; msg = msg->next) { - ErlHeapFragment *heap_frag = NULL; - if (msg->data.attached) { - if (is_value(ERL_MESSAGE_TERM(msg))) - heap_frag = msg->data.heap_frag; - else { - if (msg->data.dist_ext->dep) - insert_dist_entry(msg->data.dist_ext->dep, - HEAP_REF, proc->common.id, 0); - if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) - heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); + while (heap_frag) { + insert_offheap(&(heap_frag->off_heap), + HEAP_REF, + proc->common.id); + heap_frag = heap_frag->next; } } - if (heap_frag) - insert_offheap(&(heap_frag->off_heap), - HEAP_REF, - proc->common.id); } -#endif /* Insert links */ if (ERTS_P_LINKS(proc)) insert_links(ERTS_P_LINKS(proc), proc->common.id); diff --git a/erts/emulator/beam/erl_process.c 
b/erts/emulator/beam/erl_process.c index bad9da90ea..e490ffea5a 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -148,6 +148,7 @@ extern BeamInstr beam_apply[]; extern BeamInstr beam_exit[]; extern BeamInstr beam_continue_exit[]; +int erts_default_spo_flags = 0; int erts_eager_check_io = 1; int erts_sched_compact_load; int erts_sched_balance_util = 0; @@ -351,7 +352,8 @@ struct erts_system_profile_flags_t erts_system_profile_flags; typedef enum { ERTS_PSTT_GC, /* Garbage Collect */ - ERTS_PSTT_CPC /* Check Process Code */ + ERTS_PSTT_CPC, /* Check Process Code */ + ERTS_PSTT_COHMQ /* Change off heap message queue */ } ErtsProcSysTaskType; #define ERTS_MAX_PROC_SYS_TASK_ARGS 2 @@ -982,7 +984,7 @@ reply_sched_wall_time(void *vswtrp) Eterm **hpp; Uint sz, *szp; ErlOffHeap *ohp = NULL; - ErlHeapFragment *bp = NULL; + ErtsMessage *mp = NULL; ASSERT(esdp); #ifdef ERTS_DIRTY_SCHEDULERS @@ -1038,12 +1040,12 @@ reply_sched_wall_time(void *vswtrp) if (hpp) break; - hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, sz, &hp, &ohp); szp = NULL; hpp = &hp; } - erts_queue_message(rp, &rp_locks, bp, msg, NIL); + erts_queue_message(rp, &rp_locks, mp, msg, NIL); if (swtrp->req_sched == esdp->no) rp_locks &= ~ERTS_PROC_LOCK_MAIN; @@ -6294,22 +6296,99 @@ erts_schedule_process(Process *p, erts_aint32_t state, ErtsProcLocks locks) schedule_process(p, state, locks); } -static void -schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy) +static int +schedule_process_sys_task(Process *p, erts_aint32_t prio, ErtsProcSysTask *st) { - /* - * Expects status lock to be locked when called, and - * returns with status lock unlocked... - */ - erts_aint32_t a = state, n, enq_prio = -1; + int res; + int locked; + ErtsProcSysTaskQs *stqs, *free_stqs; + erts_aint32_t state, a, n, enq_prio; int enqueue; /* < 0 -> use proxy */ - unsigned int prof_runnable_procs = erts_system_profile_flags.runnable_procs; + unsigned int prof_runnable_procs; + + res = 1; /* prepare for success */ + st->next = st->prev = st; /* Prep for empty prio queue */ + state = erts_smp_atomic32_read_nob(&p->state); + prof_runnable_procs = erts_system_profile_flags.runnable_procs; + locked = 0; + free_stqs = NULL; + if (state & ERTS_PSFLG_ACTIVE_SYS) + stqs = NULL; + else { + alloc_qs: + stqs = proc_sys_task_queues_alloc(); + stqs->qmask = 1 << prio; + stqs->ncount = 0; + stqs->q[PRIORITY_MAX] = NULL; + stqs->q[PRIORITY_HIGH] = NULL; + stqs->q[PRIORITY_NORMAL] = NULL; + stqs->q[PRIORITY_LOW] = NULL; + stqs->q[prio] = st; + } + + if (!locked) { + locked = 1; + erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS); + + state = erts_smp_atomic32_read_nob(&p->state); + if (state & ERTS_PSFLG_EXITING) { + free_stqs = stqs; + res = 0; + goto cleanup; + } + } + + if (!p->sys_task_qs) { + if (stqs) + p->sys_task_qs = stqs; + else + goto alloc_qs; + } + else { + free_stqs = stqs; + stqs = p->sys_task_qs; + if (!stqs->q[prio]) { + stqs->q[prio] = st; + stqs->qmask |= 1 << prio; + } + else { + st->next = stqs->q[prio]; + st->prev = stqs->q[prio]->prev; + st->next->prev = st; + st->prev->next = st; + ASSERT(stqs->qmask & (1 << prio)); + } + } + + if (ERTS_PSFLGS_GET_ACT_PRIO(state) > prio) { + erts_aint32_t n, a, e; + /* Need to elevate actual prio */ + + a = state; + do { + if (ERTS_PSFLGS_GET_ACT_PRIO(a) <= prio) { + n = a; + break; + } + n = e = a; + n &= ~ERTS_PSFLGS_ACT_PRIO_MASK; + n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET); + a = 
erts_smp_atomic32_cmpxchg_nob(&p->state, n, e); + } while (a != e); + state = n; + } + + + a = state; + enq_prio = -1; /* Status lock prevents out of order "runnable proc" trace msgs */ ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)); - if (!prof_runnable_procs) + if (!prof_runnable_procs) { erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); + locked = 0; + } ASSERT(!(state & ERTS_PSFLG_PROXY)); @@ -6317,8 +6396,10 @@ schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy) erts_aint32_t e; n = e = a; - if (a & ERTS_PSFLG_FREE) + if (a & ERTS_PSFLG_FREE) { + res = 0; goto cleanup; /* We don't want to schedule free processes... */ + } enqueue = ERTS_ENQUEUE_NOT; n |= ERTS_PSFLG_ACTIVE_SYS; @@ -6342,29 +6423,24 @@ schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy) } erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); - prof_runnable_procs = 0; + locked = 0; } - if (enqueue != ERTS_ENQUEUE_NOT) { - Process *sched_p; - if (enqueue > 0) - sched_p = p; - else { - sched_p = make_proxy_proc(proxy, p, enq_prio); - proxy = NULL; - } - add2runq(sched_p, n, enq_prio); - } + if (enqueue != ERTS_ENQUEUE_NOT) + add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio), + n, enq_prio); cleanup: - if (prof_runnable_procs) + if (locked) erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS); - if (proxy) - free_proxy_proc(proxy); + if (free_stqs) + proc_sys_task_queues_free(free_stqs); ERTS_SMP_LC_ASSERT(!(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p))); + + return res; } static ERTS_INLINE int @@ -9696,13 +9772,13 @@ Process *schedule(Process *p, int calls) } } - if (!(state & ERTS_PSFLG_EXITING) - && ((FLAGS(p) & F_FORCE_GC) - || (MSO(p).overhead > BIN_VHEAP_SZ(p)))) { - reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity); - if (reds <= 0) { - p->fcalls = reds; - goto sched_out_proc; + if (ERTS_IS_GC_DESIRED(p)) { + if (!(state & ERTS_PSFLG_EXITING) && !(p->flags & F_DISABLE_GC)) { + reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity); + if (reds <= 0) { + p->fcalls = reds; + goto sched_out_proc; + } } } @@ -9742,7 +9818,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result) if (rp) { ErtsProcLocks rp_locks; ErlOffHeap *ohp; - ErlHeapFragment* bp; + ErtsMessage *mp; Eterm *hp, msg, req_id, result; Uint st_result_sz, hsz; #ifdef DEBUG @@ -9754,11 +9830,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result) st_result_sz = is_immed(st_result) ? 
0 : size_object(st_result); hsz = st->req_id_sz + st_result_sz + 4 /* 3-tuple */; - hp = erts_alloc_message_heap(hsz, - &bp, - &ohp, - rp, - &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp); #ifdef DEBUG hp_start = hp; @@ -9783,7 +9855,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result) ASSERT(hp_start + hsz == hp); #endif - erts_queue_message(rp, &rp_locks, bp, msg, NIL); + erts_queue_message(rp, &rp_locks, mp, msg, NIL); if (c_p == rp) rp_locks &= ~ERTS_PROC_LOCK_MAIN; @@ -10005,6 +10077,10 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) st = NULL; } break; + case ERTS_PSTT_COHMQ: + reds += erts_complete_off_heap_message_queue_change(c_p); + st_res = am_true; + break; default: ERTS_INTERNAL_ERROR("Invalid process sys task type"); st_res = am_false; @@ -10047,6 +10123,9 @@ cleanup_sys_tasks(Process *c_p, erts_aint32_t in_state, int in_reds) case ERTS_PSTT_CPC: st_res = am_false; break; + case ERTS_PSTT_COHMQ: + st_res = am_false; + break; default: ERTS_INTERNAL_ERROR("Invalid process sys task type"); st_res = am_false; @@ -10065,10 +10144,8 @@ BIF_RETTYPE erts_internal_request_system_task_3(BIF_ALIST_3) { Process *rp = erts_proc_lookup(BIF_ARG_1); - ErtsProcSysTaskQs *stqs, *free_stqs = NULL; ErtsProcSysTask *st = NULL; - erts_aint32_t prio, rp_state; - int rp_locked; + erts_aint32_t prio; Eterm noproc_res, req_type; if (!rp && !is_internal_pid(BIF_ARG_1)) { @@ -10125,7 +10202,6 @@ erts_internal_request_system_task_3(BIF_ALIST_3) } st = erts_alloc(ERTS_ALC_T_PROC_SYS_TSK, ERTS_PROC_SYS_TASK_SIZE(tot_sz)); - st->next = st->prev = st; /* Prep for empty prio queue */ ERTS_INIT_OFF_HEAP(&st->off_heap); hp = &st->heap[0]; @@ -10169,95 +10245,11 @@ erts_internal_request_system_task_3(BIF_ALIST_3) goto badarg; } - rp_state = erts_smp_atomic32_read_nob(&rp->state); - - rp_locked = 0; - - free_stqs = NULL; - if (rp_state & ERTS_PSFLG_ACTIVE_SYS) - stqs = NULL; - else { - alloc_qs: - stqs = proc_sys_task_queues_alloc(); - stqs->qmask = 1 << prio; - stqs->ncount = 0; - stqs->q[PRIORITY_MAX] = NULL; - stqs->q[PRIORITY_HIGH] = NULL; - stqs->q[PRIORITY_NORMAL] = NULL; - stqs->q[PRIORITY_LOW] = NULL; - stqs->q[prio] = st; - } - - if (!rp_locked) { - rp_locked = 1; - erts_smp_proc_lock(rp, ERTS_PROC_LOCK_STATUS); - - rp_state = erts_smp_atomic32_read_nob(&rp->state); - if (rp_state & ERTS_PSFLG_EXITING) { - erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); - rp = NULL; - free_stqs = stqs; - goto noproc; - } + if (!schedule_process_sys_task(rp, prio, st)) { + noproc: + notify_sys_task_executed(BIF_P, st, noproc_res); } - if (!rp->sys_task_qs) { - if (stqs) - rp->sys_task_qs = stqs; - else - goto alloc_qs; - } - else { - if (stqs) - free_stqs = stqs; - stqs = rp->sys_task_qs; - if (!stqs->q[prio]) { - stqs->q[prio] = st; - stqs->qmask |= 1 << prio; - } - else { - st->next = stqs->q[prio]; - st->prev = stqs->q[prio]->prev; - st->next->prev = st; - st->prev->next = st; - ASSERT(stqs->qmask & (1 << prio)); - } - } - - if (ERTS_PSFLGS_GET_ACT_PRIO(rp_state) > prio) { - erts_aint32_t n, a, e; - /* Need to elevate actual prio */ - - a = rp_state; - do { - if (ERTS_PSFLGS_GET_ACT_PRIO(a) <= prio) { - n = a; - break; - } - n = e = a; - n &= ~ERTS_PSFLGS_ACT_PRIO_MASK; - n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET); - a = erts_smp_atomic32_cmpxchg_nob(&rp->state, n, e); - } while (a != e); - rp_state = n; - } - - /* - * schedule_process_sys_task() unlocks status - * lock on process. 
- */ - schedule_process_sys_task(rp, rp_state, NULL); - - if (free_stqs) - proc_sys_task_queues_free(free_stqs); - - BIF_RET(am_ok); - -noproc: - - notify_sys_task_executed(BIF_P, st, noproc_res); - if (free_stqs) - proc_sys_task_queues_free(free_stqs); BIF_RET(am_ok); badarg: @@ -10266,11 +10258,35 @@ badarg: erts_cleanup_offheap(&st->off_heap); erts_free(ERTS_ALC_T_PROC_SYS_TSK, st); } - if (free_stqs) - proc_sys_task_queues_free(free_stqs); BIF_ERROR(BIF_P, BADARG); } +void +erts_schedule_complete_off_heap_message_queue_change(Eterm pid) +{ + Process *rp = erts_proc_lookup(pid); + if (rp) { + ErtsProcSysTask *st; + erts_aint32_t state; + int i; + + st = erts_alloc(ERTS_ALC_T_PROC_SYS_TSK, + ERTS_PROC_SYS_TASK_SIZE(0)); + st->type = ERTS_PSTT_COHMQ; + st->requester = NIL; + st->reply_tag = NIL; + st->req_id = NIL; + st->req_id_sz = 0; + for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++) + st->arg[i] = NIL; + ERTS_INIT_OFF_HEAP(&st->off_heap); + state = erts_smp_atomic32_read_nob(&rp->state); + + if (!schedule_process_sys_task(rp, ERTS_PSFLGS_GET_USR_PRIO(state), st)) + erts_free(ERTS_ALC_T_PROC_SYS_TSK, st); + } +} + static void save_gc_task(Process *c_p, ErtsProcSysTask *st, int prio) { @@ -10716,6 +10732,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). Eterm args, /* Arguments for function (must be well-formed list). */ ErlSpawnOpts* so) /* Options for spawn. */ { + Uint flags = erts_default_process_flags; ErtsRunQueue *rq = NULL; Process *p; Sint arity; /* Number of arguments. */ @@ -10753,6 +10770,11 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). state |= (((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_ACT_PRIO_OFFSET) | ((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_USR_PRIO_OFFSET)); + if (so->flags & SPO_OFF_HEAP_MSGQ) { + state |= ERTS_PSFLG_OFF_HEAP_MSGQ; + flags |= F_OFF_HEAP_MSGQ; + } + if (!rq) rq = erts_get_runq_proc(parent); @@ -10775,7 +10797,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). BM_SWAP_TIMER(size,system); heap_need = arg_size; - p->flags = erts_default_process_flags; + p->flags = flags; p->static_flags = 0; if (so->flags & SPO_SYSTEM_PROC) @@ -10824,6 +10846,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->stop = p->hend = p->heap + sz; p->htop = p->heap; p->heap_sz = sz; + p->abandoned_heap = NULL; + p->live_hf_end = ERTS_INVALID_HFRAG_PTR; p->catches = 0; p->bin_vheap_sz = p->min_vheap_size; @@ -10894,6 +10918,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). 
p->accessor_bif_timers = NULL; #endif p->mbuf = NULL; + p->msg_frag = NULL; p->mbuf_sz = 0; p->psd = NULL; p->dictionary = NULL; @@ -11029,6 +11054,8 @@ void erts_init_empty_process(Process *p) p->stop = NULL; p->hend = NULL; p->heap = NULL; + p->abandoned_heap = NULL; + p->live_hf_end = ERTS_INVALID_HFRAG_PTR; p->gen_gcs = 0; p->max_gen_gcs = 0; p->min_heap_size = 0; @@ -11061,6 +11088,7 @@ void erts_init_empty_process(Process *p) p->old_htop = NULL; p->old_heap = NULL; p->mbuf = NULL; + p->msg_frag = NULL; p->mbuf_sz = 0; p->psd = NULL; ERTS_P_MONITORS(p) = NULL; @@ -11149,6 +11177,8 @@ erts_debug_verify_clean_empty_process(Process* p) ASSERT(p->htop == NULL); ASSERT(p->stop == NULL); ASSERT(p->hend == NULL); + ASSERT(p->abandoned_heap == NULL); + ASSERT(p->live_hf_end == ERTS_INVALID_HFRAG_PTR); ASSERT(p->heap == NULL); ASSERT(p->common.id == ERTS_INVALID_PID); ASSERT(ERTS_TRACER_PROC(p) == NIL); @@ -11226,8 +11256,6 @@ erts_cleanup_empty_process(Process* p) static void delete_process(Process* p) { - ErlMessage* mp; - VERBOSE(DEBUG_PROCESSES, ("Removing process: %T\n",p->common.id)); /* Cleanup psd */ @@ -11283,24 +11311,8 @@ delete_process(Process* p) erts_erase_dicts(p); /* free all pending messages */ - mp = p->msg.first; - while(mp != NULL) { - ErlMessage* next_mp = mp->next; - if (mp->data.attached) { - if (is_value(mp->m[0])) - free_message_buffer(mp->data.heap_frag); - else { - if (is_not_nil(mp->m[1])) { - ErlHeapFragment *heap_frag; - heap_frag = (ErlHeapFragment *) mp->data.dist_ext->ext_endp; - erts_cleanup_offheap(&heap_frag->off_heap); - } - erts_free_dist_ext_copy(mp->data.dist_ext); - } - } - free_message(mp); - mp = next_mp; - } + erts_cleanup_messages(p->msg.first); + p->msg.first = NULL; ASSERT(!p->nodes_monitors); ASSERT(!p->suspend_monitors); @@ -11488,6 +11500,9 @@ static ERTS_INLINE void send_exit_message(Process *to, ErtsProcLocks *to_locksp, Eterm exit_term, Uint term_size, Eterm token) { + ErtsMessage *mp; + ErlOffHeap *ohp; + if (token == NIL #ifdef USE_VM_PROBES || token == am_have_dt_utag @@ -11495,14 +11510,12 @@ send_exit_message(Process *to, ErtsProcLocks *to_locksp, ) { Eterm* hp; Eterm mess; - ErlHeapFragment* bp; - ErlOffHeap *ohp; - hp = erts_alloc_message_heap(term_size, &bp, &ohp, to, to_locksp); + mp = erts_alloc_message_heap(to, to_locksp, + term_size, &hp, &ohp); mess = copy_struct(exit_term, term_size, &hp, ohp); - erts_queue_message(to, to_locksp, bp, mess, NIL); + erts_queue_message(to, to_locksp, mp, mess, NIL); } else { - ErlHeapFragment* bp; Eterm* hp; Eterm mess; Eterm temp_token; @@ -11510,13 +11523,14 @@ send_exit_message(Process *to, ErtsProcLocks *to_locksp, ASSERT(is_tuple(token)); sz_token = size_object(token); - bp = new_message_buffer(term_size+sz_token); - hp = bp->mem; - mess = copy_struct(exit_term, term_size, &hp, &bp->off_heap); + + mp = erts_alloc_message_heap(to, to_locksp, + term_size+sz_token, &hp, &ohp); + mess = copy_struct(exit_term, term_size, &hp, ohp); /* the trace token must in this case be updated by the caller */ seq_trace_output(token, mess, SEQ_TRACE_SEND, to->common.id, NULL); - temp_token = copy_struct(token, sz_token, &hp, &bp->off_heap); - erts_queue_message(to, to_locksp, bp, mess, temp_token); + temp_token = copy_struct(token, sz_token, &hp, ohp); + erts_queue_message(to, to_locksp, mp, mess, temp_token); } } diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 65422b8c15..c6376c0166 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ 
-916,6 +916,7 @@ struct process { Eterm* stop; /* Stack top */ Eterm* heap; /* Heap start */ Eterm* hend; /* Heap end */ + Eterm* abandoned_heap; Uint heap_sz; /* Size of heap in words */ Uint min_heap_size; /* Minimum size of heap (in words). */ Uint min_vheap_size; /* Minimum size of virtual heap (in words). */ @@ -1012,8 +1013,10 @@ struct process { Uint16 gen_gcs; /* Number of (minor) generational GCs. */ Uint16 max_gen_gcs; /* Max minor gen GCs before fullsweep. */ ErlOffHeap off_heap; /* Off-heap data updated by copy_struct(). */ - ErlHeapFragment* mbuf; /* Pointer to message buffer list */ - Uint mbuf_sz; /* Size of all message buffers */ + ErlHeapFragment* mbuf; /* Pointer to heap fragment list */ + ErlHeapFragment* live_hf_end; + ErtsMessage *msg_frag; /* Pointer to message fragment list */ + Uint mbuf_sz; /* Total size of heap fragments and message fragments */ ErtsPSD *psd; /* Rarely used process specific data */ Uint64 bin_vheap_sz; /* Virtual heap block size for binaries */ @@ -1041,6 +1044,7 @@ struct process { #ifdef CHECK_FOR_HOLES Eterm* last_htop; /* No need to scan the heap below this point. */ ErlHeapFragment* last_mbuf; /* No need to scan beyond this mbuf. */ + ErlHeapFragment* heap_hfrag; /* Heap abandoned, htop now lives in this frag */ #endif #ifdef DEBUG @@ -1064,6 +1068,7 @@ extern const Process erts_invalid_process; do { \ (p)->last_htop = 0; \ (p)->last_mbuf = 0; \ + (p)->heap_hfrag = NULL; \ } while (0) # define ERTS_HOLE_CHECK(p) erts_check_for_holes((p)) @@ -1141,14 +1146,15 @@ void erts_check_for_holes(Process* p); #define ERTS_PSFLG_RUNNING_SYS ERTS_PSFLG_BIT(15) #define ERTS_PSFLG_PROXY ERTS_PSFLG_BIT(16) #define ERTS_PSFLG_DELAYED_SYS ERTS_PSFLG_BIT(17) +#define ERTS_PSFLG_OFF_HEAP_MSGQ ERTS_PSFLG_BIT(18) #ifdef ERTS_DIRTY_SCHEDULERS -#define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(18) -#define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(19) -#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(20) -#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(21) -#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 22) +#define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(19) +#define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(20) +#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(21) +#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(22) +#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 23) #else -#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 18) +#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 19) #endif #define ERTS_PSFLGS_IN_PRQ_MASK (ERTS_PSFLG_IN_PRQ_MAX \ @@ -1196,12 +1202,15 @@ void erts_check_for_holes(Process* p); #define SPO_USE_ARGS 2 #define SPO_MONITOR 4 #define SPO_SYSTEM_PROC 8 +#define SPO_OFF_HEAP_MSGQ 16 + +extern int erts_default_spo_flags; /* * The following struct contains options for a process to be spawned. */ typedef struct { - Uint flags; + int flags; int error_code; /* Error code returned from create_process(). */ Eterm mref; /* Monitor ref returned (if SPO_MONITOR was given). 
*/ @@ -1283,6 +1292,9 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags; #define F_P2PNR_RESCHED (1 << 9) /* Process has been rescheduled via erts_pid2proc_not_running() */ #define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */ #define F_DISABLE_GC (1 << 11) /* Disable GC */ +#define F_OFF_HEAP_MSGQ (1 << 12) /* Off heap msg queue */ +#define F_OFF_HEAP_MSGQ_CHNG (1 << 13) /* Off heap msg queue changing */ +#define F_ABANDONED_HEAP_USE (1 << 14) /* Have usage of abandoned heap */ /* process trace_flags */ #define F_SENSITIVE (1 << 0) @@ -1616,6 +1628,7 @@ void erts_schedule_thr_prgr_later_cleanup_op(void (*)(void *), void *, ErtsThrPrgrLaterOp *, UWord); +void erts_schedule_complete_off_heap_message_queue_change(Eterm pid); #if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK) int erts_dbg_check_halloc_lock(Process *p); @@ -1743,7 +1756,7 @@ Uint erts_debug_nbalance(void); int erts_debug_wait_completed(Process *c_p, int flags); -Uint erts_process_memory(Process *c_p); +Uint erts_process_memory(Process *c_p, int incl_msg_inq); #ifdef ERTS_SMP # define ERTS_GET_SCHEDULER_DATA_FROM_PROC(PROC) ((PROC)->scheduler_data) @@ -2058,6 +2071,22 @@ ERTS_GLB_INLINE void erts_smp_xrunq_unlock(ErtsRunQueue *rq, ErtsRunQueue *xrq); ERTS_GLB_INLINE void erts_smp_runqs_lock(ErtsRunQueue *rq1, ErtsRunQueue *rq2); ERTS_GLB_INLINE void erts_smp_runqs_unlock(ErtsRunQueue *rq1, ErtsRunQueue *rq2); +ERTS_GLB_INLINE ErtsMessage *erts_alloc_message_heap_state(Process *pp, + erts_aint32_t *psp, + ErtsProcLocks *plp, + Uint sz, + Eterm **hpp, + ErlOffHeap **ohpp); +ERTS_GLB_INLINE ErtsMessage *erts_alloc_message_heap(Process *pp, + ErtsProcLocks *plp, + Uint sz, + Eterm **hpp, + ErlOffHeap **ohpp); + +ERTS_GLB_INLINE void erts_shrink_message_heap(ErtsMessage **msgpp, Process *pp, + Eterm *start_hp, Eterm *used_hp, Eterm *end_hp, + Eterm *brefs, Uint brefs_size); + #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE @@ -2206,6 +2235,63 @@ erts_smp_runqs_unlock(ErtsRunQueue *rq1, ErtsRunQueue *rq2) #endif } +ERTS_GLB_INLINE ErtsMessage * +erts_alloc_message_heap_state(Process *pp, + erts_aint32_t *psp, + ErtsProcLocks *plp, + Uint sz, + Eterm **hpp, + ErlOffHeap **ohpp) +{ + int on_heap; + + if ((*psp) & ERTS_PSFLG_OFF_HEAP_MSGQ) { + ErtsMessage *mp = erts_alloc_message(sz, hpp); + *ohpp = sz == 0 ? 
NULL : &mp->hfrag.off_heap; + return mp; + } + + return erts_try_alloc_message_on_heap(pp, psp, plp, sz, hpp, ohpp, &on_heap); +} + +ERTS_GLB_INLINE ErtsMessage * +erts_alloc_message_heap(Process *pp, + ErtsProcLocks *plp, + Uint sz, + Eterm **hpp, + ErlOffHeap **ohpp) +{ + erts_aint32_t state = erts_smp_atomic32_read_nob(&pp->state); + return erts_alloc_message_heap_state(pp, &state, plp, sz, hpp, ohpp); +} + +ERTS_GLB_INLINE void +erts_shrink_message_heap(ErtsMessage **msgpp, Process *pp, + Eterm *start_hp, Eterm *used_hp, Eterm *end_hp, + Eterm *brefs, Uint brefs_size) +{ + ASSERT(start_hp <= used_hp && used_hp <= end_hp); + if ((*msgpp)->data.attached == ERTS_MSG_COMBINED_HFRAG) + *msgpp = erts_shrink_message(*msgpp, used_hp - start_hp, + brefs, brefs_size); + else if (!(*msgpp)->data.attached) { + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN + & erts_proc_lc_my_proc_locks(pp)); + HRelease(pp, end_hp, used_hp); + } + else { + ErlHeapFragment *hfrag = (*msgpp)->data.heap_frag; + if (start_hp != used_hp) + hfrag = erts_resize_message_buffer(hfrag, used_hp - start_hp, + brefs, brefs_size); + else { + free_message_buffer(hfrag); + hfrag = NULL; + } + (*msgpp)->data.heap_frag = hfrag; + } +} + #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ ERTS_GLB_INLINE ErtsAtomCacheMap *erts_get_atom_cache_map(Process *c_p); diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c index 3b8ae11e94..71396561a3 100644 --- a/erts/emulator/beam/erl_process_dump.c +++ b/erts/emulator/beam/erl_process_dump.c @@ -78,13 +78,14 @@ erts_deep_process_dump(int to, void *to_arg) dump_binaries(to, to_arg, all_binaries); } -Uint erts_process_memory(Process *p) { - ErlMessage *mp; +Uint erts_process_memory(Process *p, int incl_msg_inq) { + ErtsMessage *mp; Uint size = 0; struct saved_calls *scb; size += sizeof(Process); - ERTS_SMP_MSGQ_MV_INQ2PRIVQ(p); + if (incl_msg_inq) + ERTS_SMP_MSGQ_MV_INQ2PRIVQ(p); erts_doforall_links(ERTS_P_LINKS(p), &erts_one_link_size, &size); erts_doforall_monitors(ERTS_P_MONITORS(p), &erts_one_mon_size, &size); @@ -92,7 +93,7 @@ Uint erts_process_memory(Process *p) { if (p->old_hend && p->old_heap) size += (p->old_hend - p->old_heap) * sizeof(Eterm); - size += p->msg.len * sizeof(ErlMessage); + size += p->msg.len * sizeof(ErtsMessage); for (mp = p->msg.first; mp; mp = mp->next) if (mp->data.attached) @@ -119,7 +120,7 @@ static void dump_process_info(int to, void *to_arg, Process *p) { Eterm* sp; - ErlMessage* mp; + ErtsMessage* mp; int yreg = -1; ERTS_SMP_MSGQ_MV_INQ2PRIVQ(p); @@ -657,6 +658,8 @@ erts_dump_extended_process_state(int to, void *to_arg, erts_aint32_t psflg) { erts_print(to, to_arg, "PROXY"); break; case ERTS_PSFLG_DELAYED_SYS: erts_print(to, to_arg, "DELAYED_SYS"); break; + case ERTS_PSFLG_OFF_HEAP_MSGQ: + erts_print(to, to_arg, "OFF_HEAP_MSGQ"); break; #ifdef ERTS_DIRTY_SCHEDULERS case ERTS_PSFLG_DIRTY_CPU_PROC: erts_print(to, to_arg, "DIRTY_CPU_PROC"); break; diff --git a/erts/emulator/beam/erl_process_lock.h b/erts/emulator/beam/erl_process_lock.h index 788348e613..a64c993e8f 100644 --- a/erts/emulator/beam/erl_process_lock.h +++ b/erts/emulator/beam/erl_process_lock.h @@ -854,9 +854,6 @@ ERTS_GLB_INLINE void erts_proc_dec_refc(Process *p) #endif if (!referred) { ASSERT(ERTS_PROC_IS_EXITING(p)); - ASSERT(ERTS_AINT_NULL - == erts_ptab_pix2intptr_ddrb(&erts_proc, - internal_pid_index(p->common.id))); erts_free_proc(p); } } @@ -872,9 +869,6 @@ ERTS_GLB_INLINE void erts_proc_add_refc(Process *p, Sint add_refc) #endif if (!referred) { 
ASSERT(ERTS_PROC_IS_EXITING(p)); - ASSERT(ERTS_AINT_NULL - == erts_ptab_pix2intptr_ddrb(&erts_proc, - internal_pid_index(p->common.id))); erts_free_proc(p); } } diff --git a/erts/emulator/beam/erl_time_sup.c b/erts/emulator/beam/erl_time_sup.c index 7327e0b48c..7ec64506e8 100644 --- a/erts/emulator/beam/erl_time_sup.c +++ b/erts/emulator/beam/erl_time_sup.c @@ -1919,15 +1919,16 @@ send_time_offset_changed_notifications(void *new_offsetp) ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK; erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK); if (erts_lookup_monitor(ERTS_P_MONITORS(rp), ref)) { - ErlHeapFragment *bp; + ErtsMessage *mp; ErlOffHeap *ohp; Eterm message; - hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, + hsz, &hp, &ohp); *patch_refp = ref; ASSERT(hsz == size_object(message_template)); message = copy_struct(message_template, hsz, &hp, ohp); - erts_queue_message(rp, &rp_locks, bp, message, NIL); + erts_queue_message(rp, &rp_locks, mp, message, NIL); } erts_smp_proc_unlock(rp, rp_locks); } diff --git a/erts/emulator/beam/erl_trace.c b/erts/emulator/beam/erl_trace.c index e9dd96efc4..d02f1f7213 100644 --- a/erts/emulator/beam/erl_trace.c +++ b/erts/emulator/beam/erl_trace.c @@ -114,15 +114,10 @@ void erts_init_trace(void) { static Eterm system_seq_tracer; -#ifdef ERTS_SMP #define ERTS_ALLOC_SYSMSG_HEAP(SZ, BPP, OHPP, UNUSED) \ (*(BPP) = new_message_buffer((SZ)), \ *(OHPP) = &(*(BPP))->off_heap, \ (*(BPP))->mem) -#else -#define ERTS_ALLOC_SYSMSG_HEAP(SZ, BPP, OHPP, RPP) \ - erts_alloc_message_heap((SZ), (BPP), (OHPP), (RPP), 0) -#endif #ifdef ERTS_SMP #define ERTS_ENQ_TRACE_MSG(FPID, TPID, MSG, BP) \ @@ -131,8 +126,12 @@ do { \ enqueue_sys_msg_unlocked(SYS_MSG_TYPE_TRACE, (FPID), (TPID), (MSG), (BP)); \ } while(0) #else -#define ERTS_ENQ_TRACE_MSG(FPID, TPROC, MSG, BP) \ - erts_queue_message((TPROC), NULL, (BP), (MSG), NIL) +#define ERTS_ENQ_TRACE_MSG(FPID, TPROC, MSG, BP) \ + do { \ + ErtsMessage *mp__ = erts_alloc_message(0, NULL); \ + mp__->data.heap_frag = (BP); \ + erts_queue_message((TPROC), NULL, mp__, (MSG), NIL); \ + } while (0) #endif /* @@ -591,11 +590,9 @@ send_to_port(Process *c_p, Eterm message, static void profile_send(Eterm from, Eterm message) { Uint sz = 0; - ErlHeapFragment *bp = NULL; Uint *hp = NULL; Eterm msg = NIL; Process *profile_p = NULL; - ErlOffHeap *off_heap = NULL; Eterm profiler = erts_get_system_profile(); @@ -621,6 +618,7 @@ profile_send(Eterm from, Eterm message) { } } else { + ErtsMessage *mp; ASSERT(is_internal_pid(profiler)); profile_p = erts_proc_lookup(profiler); @@ -629,10 +627,13 @@ profile_send(Eterm from, Eterm message) { return; sz = size_object(message); - hp = erts_alloc_message_heap(sz, &bp, &off_heap, profile_p, 0); - msg = copy_struct(message, sz, &hp, &bp->off_heap); - - erts_queue_message(profile_p, NULL, bp, msg, NIL); + mp = erts_alloc_message(sz, &hp); + if (sz == 0) + msg = message; + else + msg = copy_struct(message, sz, &hp, &mp->hfrag.off_heap); + + erts_queue_message(profile_p, NULL, mp, msg, NIL); } } @@ -1233,7 +1234,11 @@ seq_trace_output_generic(Eterm token, Eterm msg, Uint type, erts_smp_mtx_unlock(&smq_mtx); #else /* trace_token must be NIL here */ - erts_queue_message(tracer, NULL, bp, mess, NIL); + { + ErtsMessage *mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = bp; + erts_queue_message(tracer, NULL, mp, mess, NIL); + } #endif } } @@ -2308,7 +2313,11 @@ monitor_long_schedule_proc(Process *p, BeamInstr *in_fp, BeamInstr *out_fp, Uint #ifdef ERTS_SMP 
enqueue_sys_msg(SYS_MSG_TYPE_SYSMON, p->common.id, NIL, msg, bp); #else - erts_queue_message(monitor_p, NULL, bp, msg, NIL); + { + ErtsMessage *mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = bp; + erts_queue_message(monitor_p, NULL, mp, msg, NIL); + } #endif } void @@ -2369,7 +2378,11 @@ monitor_long_schedule_port(Port *pp, ErtsPortTaskType type, Uint time) #ifdef ERTS_SMP enqueue_sys_msg(SYS_MSG_TYPE_SYSMON, pp->common.id, NIL, msg, bp); #else - erts_queue_message(monitor_p, NULL, bp, msg, NIL); + { + ErtsMessage *mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = bp; + erts_queue_message(monitor_p, NULL, mp, msg, NIL); + } #endif } @@ -2440,7 +2453,11 @@ monitor_long_gc(Process *p, Uint time) { #ifdef ERTS_SMP enqueue_sys_msg(SYS_MSG_TYPE_SYSMON, p->common.id, NIL, msg, bp); #else - erts_queue_message(monitor_p, NULL, bp, msg, NIL); + { + ErtsMessage *mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = bp; + erts_queue_message(monitor_p, NULL, mp, msg, NIL); + } #endif } @@ -2511,7 +2528,11 @@ monitor_large_heap(Process *p) { #ifdef ERTS_SMP enqueue_sys_msg(SYS_MSG_TYPE_SYSMON, p->common.id, NIL, msg, bp); #else - erts_queue_message(monitor_p, NULL, bp, msg, NIL); + { + ErtsMessage *mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = bp; + erts_queue_message(monitor_p, NULL, mp, msg, NIL); + } #endif } @@ -2539,7 +2560,11 @@ monitor_generic(Process *p, Eterm type, Eterm spec) { #ifdef ERTS_SMP enqueue_sys_msg(SYS_MSG_TYPE_SYSMON, p->common.id, NIL, msg, bp); #else - erts_queue_message(monitor_p, NULL, bp, msg, NIL); + { + ErtsMessage *mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = bp; + erts_queue_message(monitor_p, NULL, mp, msg, NIL); + } #endif } @@ -3331,8 +3356,11 @@ sys_msg_dispatcher_func(void *unused) goto failure; } else { + ErtsMessage *mp; queue_proc_msg: - erts_queue_message(proc,&proc_locks,smqp->bp,smqp->msg,NIL); + mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = smqp->bp; + erts_queue_message(proc,&proc_locks,mp,smqp->msg,NIL); #ifdef DEBUG_PRINTOUTS erts_fprintf(stderr, "delivered\n"); #endif diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index 052994b972..594c0ccf94 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -1274,11 +1274,11 @@ int erts_print_system_version(int to, void *arg, Process *c_p); int erts_hibernate(Process* c_p, Eterm module, Eterm function, Eterm args, Eterm* reg); -ERTS_GLB_INLINE int erts_is_literal(Eterm tptr, Eterm *ptr); +ERTS_GLB_FORCE_INLINE int erts_is_literal(Eterm tptr, Eterm *ptr); #if ERTS_GLB_INLINE_INCL_FUNC_DEF -ERTS_GLB_INLINE int erts_is_literal(Eterm tptr, Eterm *ptr) +ERTS_GLB_FORCE_INLINE int erts_is_literal(Eterm tptr, Eterm *ptr) { ASSERT(is_boxed(tptr) || is_list(tptr)); ASSERT(ptr == ptr_val(tptr)); @@ -1347,124 +1347,6 @@ extern erts_driver_t fd_driver; int erts_beam_jump_table(void); -/* Should maybe be placed in erl_message.h, but then we get an include mess. */ -ERTS_GLB_INLINE Eterm * -erts_alloc_message_heap_state(Uint size, - ErlHeapFragment **bpp, - ErlOffHeap **ohpp, - Process *receiver, - ErtsProcLocks *receiver_locks, - erts_aint32_t *statep); - -ERTS_GLB_INLINE Eterm * -erts_alloc_message_heap(Uint size, - ErlHeapFragment **bpp, - ErlOffHeap **ohpp, - Process *receiver, - ErtsProcLocks *receiver_locks); - -#if ERTS_GLB_INLINE_INCL_FUNC_DEF - -/* - * NOTE: erts_alloc_message_heap() releases msg q and status - * lock on receiver without ensuring that other locks are - * held. 
User is responsible to ensure that the receiver - * pointer cannot become invalid until after message has - * been passed. This is normal done either by increasing - * reference count on process (preferred) or by holding - * main or link lock over the whole message passing - * operation. - */ - -ERTS_GLB_INLINE Eterm * -erts_alloc_message_heap_state(Uint size, - ErlHeapFragment **bpp, - ErlOffHeap **ohpp, - Process *receiver, - ErtsProcLocks *receiver_locks, - erts_aint32_t *statep) -{ - Eterm *hp; - erts_aint32_t state; -#ifdef ERTS_SMP - int locked_main = 0; - state = erts_smp_atomic32_read_acqb(&receiver->state); - if (statep) - *statep = state; - if (state & (ERTS_PSFLG_EXITING - | ERTS_PSFLG_PENDING_EXIT)) - goto allocate_in_mbuf; -#endif - - if (size > (Uint) INT_MAX) - erl_exit(ERTS_ABORT_EXIT, "HUGE size (%beu)\n", size); - - if ( -#if defined(ERTS_SMP) - *receiver_locks & ERTS_PROC_LOCK_MAIN -#else - 1 -#endif - ) { -#ifdef ERTS_SMP - try_allocate_on_heap: -#endif - state = erts_smp_atomic32_read_nob(&receiver->state); - if (statep) - *statep = state; - if ((state & (ERTS_PSFLG_EXITING - | ERTS_PSFLG_PENDING_EXIT)) - || (receiver->flags & F_DISABLE_GC) - || HEAP_LIMIT(receiver) - HEAP_TOP(receiver) <= size) { - /* - * The heap is either potentially in an inconsistent - * state, or not large enough. - */ -#ifdef ERTS_SMP - if (locked_main) { - *receiver_locks &= ~ERTS_PROC_LOCK_MAIN; - erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MAIN); - } -#endif - goto allocate_in_mbuf; - } - hp = HEAP_TOP(receiver); - HEAP_TOP(receiver) = hp + size; - *bpp = NULL; - *ohpp = &MSO(receiver); - } -#ifdef ERTS_SMP - else if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MAIN) == 0) { - locked_main = 1; - *receiver_locks |= ERTS_PROC_LOCK_MAIN; - goto try_allocate_on_heap; - } -#endif - else { - ErlHeapFragment *bp; - allocate_in_mbuf: - bp = new_message_buffer(size); - hp = bp->mem; - *bpp = bp; - *ohpp = &bp->off_heap; - } - - return hp; -} - -ERTS_GLB_INLINE Eterm * -erts_alloc_message_heap(Uint size, - ErlHeapFragment **bpp, - ErlOffHeap **ohpp, - Process *receiver, - ErtsProcLocks *receiver_locks) -{ - return erts_alloc_message_heap_state(size, bpp, ohpp, receiver, - receiver_locks, NULL); -} - -#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ - #define DeclareTmpHeap(VariableName,Size,Process) \ Eterm VariableName[Size] #define DeclareTypedTmpHeap(Type,VariableName,Process) \ @@ -1522,6 +1404,7 @@ dtrace_fun_decode(Process *process, erts_snprintf(mfa_buf, DTRACE_TERM_BUF_SIZE, "%T:%T/%d", module, function, arity); } + #endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */ #endif /* !__GLOBAL_H__ */ diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index fdd26fcc4b..1b0c617632 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -1410,7 +1410,7 @@ queue_port_sched_op_reply(Process *rp, erts_factory_trim_and_close(factory, &msg, 1); - erts_queue_message(rp, rp_locksp, factory->heap_frags, msg, NIL); + erts_queue_message(rp, rp_locksp, factory->message, msg, NIL); } static void @@ -1418,12 +1418,9 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg) { Process *rp = erts_proc_lookup_raw(to); if (rp) { - ErlOffHeap *ohp; - ErlHeapFragment* bp; ErtsHeapFactory factory; Eterm msg_copy; Uint hsz, msg_sz; - Eterm *hp; ErtsProcLocks rp_locks = 0; hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; @@ -1434,18 +1431,13 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg) hsz += msg_sz; } - hp = erts_alloc_message_heap(hsz, - &bp, - &ohp, - rp, - &rp_locks); - 
erts_factory_message_init(&factory, rp, hp, bp); - if (is_immed(msg)) - msg_copy = msg; - else { - msg_copy = copy_struct(msg, msg_sz, &hp, ohp); - factory.hp = hp; - } + (void) erts_factory_message_create(&factory, rp, + &rp_locks, hsz); + msg_copy = (is_immed(msg) + ? msg + : copy_struct(msg, msg_sz, + &factory.hp, + factory.off_heap)); queue_port_sched_op_reply(rp, &rp_locks, @@ -3050,16 +3042,17 @@ deliver_result(Eterm sender, Eterm pid, Eterm res) if (rp) { Eterm tuple; - ErlHeapFragment *bp; + ErtsMessage *mp; ErlOffHeap *ohp; Eterm* hp; Uint sz_res; sz_res = size_object(res); - hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, + sz_res + 3, &hp, &ohp); res = copy_struct(res, sz_res, &hp, ohp); tuple = TUPLE2(hp, sender, res); - erts_queue_message(rp, &rp_locks, bp, tuple, NIL); + erts_queue_message(rp, &rp_locks, mp, tuple, NIL); if (rp_locks) erts_smp_proc_unlock(rp, rp_locks); @@ -3087,7 +3080,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, Eterm tuple; Process* rp; Eterm* hp; - ErlHeapFragment *bp; + ErtsMessage *mp; ErlOffHeap *ohp; ErtsProcLocks rp_locks = 0; int scheduler = erts_get_scheduler_id() != 0; @@ -3113,7 +3106,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, if (!rp) return; - hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, need, &hp, &ohp); listp = NIL; if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) { @@ -3155,7 +3148,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to, tuple = TUPLE2(hp, prt->common.id, tuple); hp += 3; - erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined); + erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined); if (rp_locks) erts_smp_proc_unlock(rp, rp_locks); if (!scheduler) @@ -3229,7 +3222,7 @@ deliver_vec_message(Port* prt, /* Port */ Eterm tuple; Process* rp; Eterm* hp; - ErlHeapFragment *bp; + ErtsMessage *mp; ErlOffHeap *ohp; ErtsProcLocks rp_locks = 0; int scheduler = erts_get_scheduler_id() != 0; @@ -3261,7 +3254,7 @@ deliver_vec_message(Port* prt, /* Port */ need += (hlen+csize)*2; } - hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, need, &hp, &ohp); listp = NIL; iov += vsize; @@ -3322,7 +3315,7 @@ deliver_vec_message(Port* prt, /* Port */ tuple = TUPLE2(hp, prt->common.id, tuple); hp += 3; - erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined); + erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined); erts_smp_proc_unlock(rp, rp_locks); if (!scheduler) erts_proc_dec_refc(rp); @@ -3813,7 +3806,6 @@ write_port_control_result(int control_flags, ErlDrvSizeT resp_size, char *pre_alloc_buf, Eterm **hpp, - ErlHeapFragment *bp, ErlOffHeap *ohp) { Eterm res; @@ -3887,9 +3879,6 @@ port_sig_control(Port *prt, if (res == ERTS_PORT_OP_DONE) { Eterm msg; - Eterm *hp; - ErlHeapFragment *bp; - ErlOffHeap *ohp; ErtsHeapFactory factory; Process *rp; ErtsProcLocks rp_locks = 0; @@ -3909,22 +3898,15 @@ port_sig_control(Port *prt, hsz = rsz + ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; - hp = erts_alloc_message_heap(hsz, - &bp, - &ohp, - rp, - &rp_locks); - erts_factory_message_init(&factory, rp, hp, bp); + (void) erts_factory_message_create(&factory, rp, + &rp_locks, hsz); msg = write_port_control_result(control_flags, resp_bufp, resp_size, &resp_buf[0], - &hp, - bp, - ohp); - factory.hp = hp; - + &factory.hp, + factory.off_heap); queue_port_sched_op_reply(rp, &rp_locks, 
&factory, @@ -4065,7 +4047,6 @@ erts_port_control(Process* c_p, resp_size, &resp_buf[0], &hp, - NULL, &c_p->off_heap); BUMP_REDS(c_p, ERTS_PORT_REDS_CONTROL); return ERTS_PORT_OP_DONE; @@ -4224,21 +4205,14 @@ port_sig_call(Port *prt, hsz = erts_decode_ext_size((byte *) resp_bufp, resp_size); if (hsz >= 0) { - ErlHeapFragment* bp; - ErlOffHeap* ohp; ErtsHeapFactory factory; byte *endp; hsz += 3; /* ok tuple */ hsz += ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE; - hp = erts_alloc_message_heap(hsz, - &bp, - &ohp, - rp, - &rp_locks); + (void) erts_factory_message_create(&factory, rp, &rp_locks, hsz); endp = (byte *) resp_bufp; - erts_factory_message_init(&factory, rp, hp, bp); msg = erts_decode_ext(&factory, &endp); if (is_value(msg)) { hp = erts_produce_heap(&factory, @@ -4499,7 +4473,9 @@ port_sig_info(Port *prt, sigdp->u.info.item); if (is_value(value)) { ErtsHeapFactory factory; - erts_factory_message_init(&factory, NULL, hp, bp); + ErtsMessage *mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = bp; + erts_factory_selfcontained_message_init(&factory, mp, hp); queue_port_sched_op_reply(rp, &rp_locks, &factory, @@ -4587,8 +4563,8 @@ reply_io_bytes(void *vreq) rp = erts_proc_lookup(req->pid); if (rp) { - ErlOffHeap *ohp = NULL; - ErlHeapFragment *bp = NULL; + ErlOffHeap *ohp; + ErtsMessage *mp; ErtsProcLocks rp_locks; Eterm ref, msg, ein, eout, *hp; Uint64 in, out; @@ -4610,7 +4586,7 @@ reply_io_bytes(void *vreq) erts_bld_uint64(NULL, &hsz, in); erts_bld_uint64(NULL, &hsz, out); - hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp); ref = make_internal_ref(hp); write_ref_thing(hp, req->refn[0], req->refn[1], req->refn[2]); @@ -4620,7 +4596,7 @@ reply_io_bytes(void *vreq) eout = erts_bld_uint64(&hp, NULL, out); msg = TUPLE4(hp, ref, make_small(sched_id), ein, eout); - erts_queue_message(rp, &rp_locks, bp, msg, NIL); + erts_queue_message(rp, &rp_locks, mp, msg, NIL); if (req->sched_id == sched_id) rp_locks &= ~ERTS_PROC_LOCK_MAIN; @@ -5065,11 +5041,11 @@ ErlDrvTermData driver_mk_term_nil(void) void driver_report_exit(ErlDrvPort ix, int status) { Eterm* hp; + ErlOffHeap *ohp; Eterm tuple; Process *rp; Eterm pid; - ErlHeapFragment *bp = NULL; - ErlOffHeap *ohp; + ErtsMessage *mp; ErtsProcLocks rp_locks = 0; int scheduler = erts_get_scheduler_id() != 0; Port* prt = erts_drvport2port(ix); @@ -5089,13 +5065,13 @@ void driver_report_exit(ErlDrvPort ix, int status) if (!rp) return; - hp = erts_alloc_message_heap(3+3, &bp, &ohp, rp, &rp_locks); + mp = erts_alloc_message_heap(rp, &rp_locks, 3+3, &hp, &ohp); tuple = TUPLE2(hp, am_exit_status, make_small(status)); hp += 3; tuple = TUPLE2(hp, prt->common.id, tuple); - erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined); + erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined); erts_smp_proc_unlock(rp, rp_locks); if (!scheduler) @@ -5205,7 +5181,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) ErtsProcLocks rp_locks = 0; struct b2t_states__ b2t; int scheduler; - int is_heap_need_limited = 1; ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_UNDEF(mess,NIL); @@ -5374,9 +5349,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) need += hsz; ptr += 2; depth++; - if (size > MAP_SMALL_MAP_LIMIT*3) { /* may contain big map */ - is_heap_need_limited = 0; - } break; } case ERL_DRV_MAP: { /* int */ @@ -5384,7 +5356,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) if ((int) ptr[0] < 0) ERTS_DDT_FAIL; if (ptr[0] > MAP_SMALL_MAP_LIMIT) 
{ need += HASHMAP_ESTIMATED_HEAP_SIZE(ptr[0]); - is_heap_need_limited = 0; } else { need += MAP_HEADER_FLATMAP_SZ + 1 + 2*ptr[0]; } @@ -5423,17 +5394,7 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) goto done; } - /* Try copy directly to destination heap if we know there are no big maps */ - if (is_heap_need_limited) { - ErlOffHeap *ohp; - ErlHeapFragment* bp; - Eterm* hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks); - erts_factory_message_init(&factory, rp, hp, bp); - } - else { - erts_factory_message_init(&factory, NULL, NULL, - new_message_buffer(need)); - } + (void) erts_factory_message_create(&factory, rp, &rp_locks, need); /* * Interpret the instructions and build the term. @@ -5702,9 +5663,9 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) if (res > 0) { mess = ESTACK_POP(stack); /* get resulting value */ - erts_factory_close(&factory); + erts_factory_trim_and_close(&factory, &mess, 1); /* send message */ - erts_queue_message(rp, &rp_locks, factory.heap_frags, mess, am_undefined); + erts_queue_message(rp, &rp_locks, factory.message, mess, am_undefined); } else { if (b2t.ix > b2t.used) diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h index 5db7ee6d7c..90e16ca14f 100644 --- a/erts/emulator/beam/sys.h +++ b/erts/emulator/beam/sys.h @@ -70,8 +70,10 @@ #endif #if ERTS_CAN_INLINE +#define ERTS_GLB_FORCE_INLINE static ERTS_FORCE_INLINE #define ERTS_GLB_INLINE static ERTS_INLINE #else +#define ERTS_GLB_FORCE_INLINE #define ERTS_GLB_INLINE #endif diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c index a741e2e2e6..e03113b8cc 100644 --- a/erts/emulator/beam/utils.c +++ b/erts/emulator/beam/utils.c @@ -110,7 +110,6 @@ Eterm* erts_heap_alloc(Process* p, Uint need, Uint xtra) { ErlHeapFragment* bp; - Eterm* htop; Uint n; #if defined(DEBUG) || defined(CHECK_FOR_HOLES) Uint i; @@ -156,16 +155,6 @@ erts_heap_alloc(Process* p, Uint need, Uint xtra) n--; #endif - /* - * When we have created a heap fragment, we are no longer allowed - * to store anything more on the heap. 
- */ - htop = HEAP_TOP(p); - if (htop < HEAP_LIMIT(p)) { - *htop = make_pos_bignum_header(HEAP_LIMIT(p)-htop-1); - HEAP_TOP(p) = HEAP_LIMIT(p); - } - bp->next = MBUF(p); MBUF(p) = bp; bp->alloc_size = n; @@ -2285,7 +2274,11 @@ static void do_send_logger_message(Eterm *hp, ErlOffHeap *ohp, ErlHeapFragment * erts_queue_error_logger_message(from, message, bp); } #else - erts_queue_message(p, NULL /* only used for smp build */, bp, message, NIL); + { + ErtsMessage *mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = bp; + erts_queue_message(p, NULL /* only used for smp build */, mp, message, NIL); + } #endif } diff --git a/erts/emulator/hipe/hipe_gc.c b/erts/emulator/hipe/hipe_gc.c index 2c747771ac..2e19bf88bf 100644 --- a/erts/emulator/hipe/hipe_gc.c +++ b/erts/emulator/hipe/hipe_gc.c @@ -46,10 +46,6 @@ Eterm *fullsweep_nstack(Process *p, Eterm *n_htop) /* arch-specific nstack walk state */ struct nstack_walk_state walk_state; - /* fullsweep-specific state */ - char *src, *oh; - Uint src_size, oh_size; - if (!p->hipe.nstack) { ASSERT(!p->hipe.nsp && !p->hipe.nstend); return n_htop; @@ -66,11 +62,6 @@ Eterm *fullsweep_nstack(Process *p, Eterm *n_htop) sdesc = nstack_walk_init_sdesc(p, &walk_state); - src = (char*)HEAP_START(p); - src_size = (char*)HEAP_TOP(p) - src; - oh = (char*)OLD_HEAP(p); - oh_size = (char*)OLD_HTOP(p) - oh; - for (;;) { if (nstack_walk_nsp_reached_end(nsp, nsp_end)) { if (nsp == nsp_end) { @@ -97,8 +88,7 @@ Eterm *fullsweep_nstack(Process *p, Eterm *n_htop) if (IS_MOVED_BOXED(val)) { ASSERT(is_boxed(val)); *nsp_i = val; - } else if (in_area(ptr, src, src_size) || - in_area(ptr, oh, oh_size)) { + } else if (!erts_is_literal(gval, ptr)) { MOVE_BOXED(ptr, val, n_htop, nsp_i); } } else if (is_list(gval)) { @@ -106,8 +96,7 @@ Eterm *fullsweep_nstack(Process *p, Eterm *n_htop) Eterm val = *ptr; if (IS_MOVED_CONS(val)) { *nsp_i = ptr[1]; - } else if (in_area(ptr, src, src_size) || - in_area(ptr, oh, oh_size)) { + } else if (!erts_is_literal(gval, ptr)) { ASSERT(within(ptr, p)); MOVE_CONS(ptr, val, n_htop, nsp_i); } @@ -139,11 +128,13 @@ void gensweep_nstack(Process *p, Eterm **ptr_old_htop, Eterm **ptr_n_htop) unsigned int mask; /* arch-specific nstack walk state */ struct nstack_walk_state walk_state; + char *oh; + Uint oh_size; /* gensweep-specific state */ Eterm *old_htop, *n_htop; - char *heap; - Uint heap_size, mature_size; + char *mature; + Uint mature_size; if (!p->hipe.nstack) { ASSERT(!p->hipe.nsp && !p->hipe.nstend); @@ -168,9 +159,10 @@ void gensweep_nstack(Process *p, Eterm **ptr_old_htop, Eterm **ptr_n_htop) old_htop = *ptr_old_htop; n_htop = *ptr_n_htop; - heap = (char*)HEAP_START(p); - heap_size = (char*)HEAP_TOP(p) - heap; - mature_size = (char*)HIGH_WATER(p) - heap; + mature = (char *) (p->abandoned_heap ? 
p->abandoned_heap : p->heap); + mature_size = (char*)HIGH_WATER(p) - mature; + oh = (char*)OLD_HEAP(p); + oh_size = (char*)OLD_HTOP(p) - oh; for (;;) { if (nstack_walk_nsp_reached_end(nsp, nsp_end)) { @@ -209,9 +201,9 @@ void gensweep_nstack(Process *p, Eterm **ptr_old_htop, Eterm **ptr_n_htop) if (IS_MOVED_BOXED(val)) { ASSERT(is_boxed(val)); *nsp_i = val; - } else if (in_area(ptr, heap, mature_size)) { + } else if (ErtsInArea(ptr, mature, mature_size)) { MOVE_BOXED(ptr, val, old_htop, nsp_i); - } else if (in_area(ptr, heap, heap_size)) { + } else if (ErtsInYoungGen(gval, ptr, oh, oh_size)) { ASSERT(within(ptr, p)); MOVE_BOXED(ptr, val, n_htop, nsp_i); } @@ -220,9 +212,9 @@ void gensweep_nstack(Process *p, Eterm **ptr_old_htop, Eterm **ptr_n_htop) Eterm val = *ptr; if (IS_MOVED_CONS(val)) { *nsp_i = ptr[1]; - } else if (in_area(ptr, heap, mature_size)) { + } else if (ErtsInArea(ptr, mature, mature_size)) { MOVE_CONS(ptr, val, old_htop, nsp_i); - } else if (in_area(ptr, heap, heap_size)) { + } else if (ErtsInYoungGen(gval, ptr, oh, oh_size)) { ASSERT(within(ptr, p)); MOVE_CONS(ptr, val, n_htop, nsp_i); } diff --git a/erts/emulator/hipe/hipe_native_bif.c b/erts/emulator/hipe/hipe_native_bif.c index 98bda43f0e..ceae3497c5 100644 --- a/erts/emulator/hipe/hipe_native_bif.c +++ b/erts/emulator/hipe/hipe_native_bif.c @@ -160,13 +160,22 @@ BIF_RETTYPE hipe_set_timeout(BIF_ALIST_1) */ void hipe_select_msg(Process *p) { - ErlMessage *msgp; + ErtsMessage *msgp; msgp = PEEK_MESSAGE(p); UNLINK_MESSAGE(p, msgp); /* decrements global 'erts_proc_tot_mem' variable */ JOIN_MESSAGE(p); CANCEL_TIMER(p); /* calls erts_cancel_proc_timer() */ - free_message(msgp); + erts_save_message_in_proc(p, msgp); + p->flags &= ~F_DISABLE_GC; + if (ERTS_IS_GC_DESIRED(p)) { + /* + * We want to GC soon but we leave a few + * reductions giving the message some time + * to turn into garbage. + */ + ERTS_VBUMP_LEAVE_REDS(p, 5); + } } void hipe_fclearerror_error(Process *p) @@ -511,8 +520,9 @@ int hipe_bs_validate_unicode_retract(ErlBinMatchBuffer* mb, Eterm arg) */ Eterm hipe_check_get_msg(Process *c_p) { - Eterm ret; - ErlMessage *msgp; + ErtsMessage *msgp; + + c_p->flags |= F_DISABLE_GC; next_message: @@ -534,25 +544,29 @@ Eterm hipe_check_get_msg(Process *c_p) /* XXX: BEAM doesn't need this */ c_p->hipe_smp.have_receive_locks = 1; #endif + c_p->flags &= ~F_DISABLE_GC; return THE_NON_VALUE; #ifdef ERTS_SMP } #endif } - ErtsMoveMsgAttachmentIntoProc(msgp, c_p, c_p->stop, HEAP_TOP(c_p), - c_p->fcalls, (void) 0, (void) 0); - ret = ERL_MESSAGE_TERM(msgp); - if (is_non_value(ret)) { + + if (is_non_value(ERL_MESSAGE_TERM(msgp)) + && !erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) { /* * A corrupt distribution message that we weren't able to decode; * remove it... 
*/ ASSERT(!msgp->data.attached); UNLINK_MESSAGE(c_p, msgp); - free_message(msgp); + msgp->next = NULL; + erts_cleanup_messages(msgp); goto next_message; } - return ret; + + ASSERT(is_value(ERL_MESSAGE_TERM(msgp))); + + return ERL_MESSAGE_TERM(msgp); } /* diff --git a/erts/etc/common/erlexec.c b/erts/etc/common/erlexec.c index cde0b25a2a..461957be10 100644 --- a/erts/etc/common/erlexec.c +++ b/erts/etc/common/erlexec.c @@ -155,6 +155,12 @@ static char *plusr_val_switches[] = { NULL }; +/* +x arguments with values */ +static char *plusx_val_switches[] = { + "ohmq", + NULL +}; + /* +z arguments with values */ static char *plusz_val_switches[] = { "dbbl", @@ -975,6 +981,20 @@ int main(int argc, char **argv) add_Eargs(argv[i+1]); i++; break; + case 'x': + if (!is_one_of_strings(&argv[i][2], plusx_val_switches)) { + goto the_default; + } else { + if (i+1 >= argc + || argv[i+1][0] == '-' + || argv[i+1][0] == '+') + usage(argv[i]); + argv[i][0] = '-'; + add_Eargs(argv[i]); + add_Eargs(argv[i+1]); + i++; + } + break; case 'z': if (!is_one_of_strings(&argv[i][2], plusz_val_switches)) { goto the_default; @@ -1175,7 +1195,7 @@ usage_aux(void) "[+S NO_SCHEDULERS:NO_SCHEDULERS_ONLINE] " "[+SP PERCENTAGE_SCHEDULERS:PERCENTAGE_SCHEDULERS_ONLINE] " "[+T LEVEL] [+V] [+v] " - "[+W<i|w|e>] [+z MISC_OPTION] [args ...]\n"); + "[+W<i|w|e>] [+x DEFAULT_PROC_FLAGS] [+z MISC_OPTION] [args ...]\n"); exit(1); } diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam Binary files differindex 863a5e61ef..641fac2d26 100644 --- a/erts/preloaded/ebin/erlang.beam +++ b/erts/preloaded/ebin/erlang.beam diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 291356c7b1..e46d64eb0a 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -2044,6 +2044,9 @@ open_port(_PortName,_PortSettings) -> (min_bin_vheap_size, MinBinVHeapSize) -> OldMinBinVHeapSize when MinBinVHeapSize :: non_neg_integer(), OldMinBinVHeapSize :: non_neg_integer(); + (off_heap_message_queue, OHMQ) -> OldOHMQ when + OHMQ :: boolean(), + OldOHMQ :: boolean(); (priority, Level) -> OldLevel when Level :: priority_level(), OldLevel :: priority_level(); @@ -2082,6 +2085,7 @@ process_flag(_Flag, _Value) -> min_bin_vheap_size | monitored_by | monitors | + off_heap_message_queue | priority | reductions | registered_name | @@ -2123,6 +2127,7 @@ process_flag(_Flag, _Value) -> {monitors, Monitors :: [{process, Pid :: pid() | {RegName :: atom(), Node :: node()}}]} | + {off_heap_message_queue, OHMQ :: boolean()} | {priority, Level :: priority_level()} | {reductions, Number :: non_neg_integer()} | {registered_name, Atom :: atom()} | @@ -2425,6 +2430,7 @@ tuple_to_list(_Tuple) -> (multi_scheduling) -> disabled | blocked | enabled; (multi_scheduling_blockers) -> [PID :: pid()]; (nif_version) -> string(); + (off_heap_message_queue) -> boolean(); (otp_release) -> string(); (os_monotonic_time_source) -> [{atom(),term()}]; (os_system_time_source) -> [{atom(),term()}]; @@ -2552,14 +2558,19 @@ spawn_monitor(M, F, A) when erlang:is_atom(M), spawn_monitor(M, F, A) -> erlang:error(badarg, [M,F,A]). + +-type spawn_opt_option() :: + link + | monitor + | {priority, Level :: priority_level()} + | {fullsweep_after, Number :: non_neg_integer()} + | {min_heap_size, Size :: non_neg_integer()} + | {min_bin_vheap_size, VSize :: non_neg_integer()} + | {off_heap_message_queue, OHMQ :: boolean()}. 
+ -spec spawn_opt(Fun, Options) -> pid() | {pid(), reference()} when Fun :: function(), - Options :: [Option], - Option :: link | monitor - | {priority, Level :: priority_level()} - | {fullsweep_after, Number :: non_neg_integer()} - | {min_heap_size, Size :: non_neg_integer()} - | {min_bin_vheap_size, VSize :: non_neg_integer()}. + Options :: [spawn_opt_option()]. spawn_opt(F, O) when erlang:is_function(F) -> spawn_opt(erlang, apply, [F, []], O); spawn_opt({M,F}=MF, O) when erlang:is_atom(M), erlang:is_atom(F) -> @@ -2572,12 +2583,7 @@ spawn_opt(F, O) -> -spec spawn_opt(Node, Fun, Options) -> pid() | {pid(), reference()} when Node :: node(), Fun :: function(), - Options :: [Option], - Option :: link | monitor - | {priority, Level :: priority_level()} - | {fullsweep_after, Number :: non_neg_integer()} - | {min_heap_size, Size :: non_neg_integer()} - | {min_bin_vheap_size, VSize :: non_neg_integer()}. + Options :: [spawn_opt_option()]. spawn_opt(N, F, O) when N =:= erlang:node() -> spawn_opt(F, O); spawn_opt(N, F, O) when erlang:is_function(F) -> @@ -2664,12 +2670,7 @@ spawn_link(N,M,F,A) -> Module :: module(), Function :: atom(), Args :: [term()], - Options :: [Option], - Option :: link | monitor - | {priority, Level :: priority_level()} - | {fullsweep_after, Number :: non_neg_integer()} - | {min_heap_size, Size :: non_neg_integer()} - | {min_bin_vheap_size, VSize :: non_neg_integer()}. + Options :: [spawn_opt_option()]. spawn_opt(M, F, A, Opts) -> case catch erlang:spawn_opt({M,F,A,Opts}) of {'EXIT',{Reason,_}} -> @@ -2684,12 +2685,7 @@ spawn_opt(M, F, A, Opts) -> Module :: module(), Function :: atom(), Args :: [term()], - Options :: [Option], - Option :: link | monitor - | {priority, Level :: priority_level()} - | {fullsweep_after, Number :: non_neg_integer()} - | {min_heap_size, Size :: non_neg_integer()} - | {min_bin_vheap_size, VSize :: non_neg_integer()}. + Options :: [spawn_opt_option()]. spawn_opt(N, M, F, A, O) when N =:= erlang:node(), erlang:is_atom(M), erlang:is_atom(F), erlang:is_list(A), erlang:is_list(O) -> |
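
For reference, a minimal sketch (not part of the patch) of how the options introduced above are expected to be used from Erlang code. It assumes only what the specs added to erlang.erl in this patch provide: the {off_heap_message_queue, boolean()} spawn_opt option, the process_flag(off_heap_message_queue, OHMQ) clause, and the corresponding process_info/2 item. The module and function names are illustrative only.

    %% Illustrative only -- not part of this patch.
    -module(ohmq_example).
    -export([start/0, toggle/0]).

    %% Spawn a process whose message queue is kept entirely off
    %% the process heap, so queued messages are not scanned when
    %% that process is garbage collected.
    start() ->
        spawn_opt(fun loop/0, [{off_heap_message_queue, true}]).

    loop() ->
        receive
            stop -> ok;
            _Msg -> loop()
        end.

    %% The flag can also be switched at runtime; process_flag/2
    %% returns the old value, and process_info/2 reads the
    %% current one.
    toggle() ->
        Old = process_flag(off_heap_message_queue, true),
        New = process_info(self(), off_heap_message_queue),
        {Old, New}.

The system-wide default reported by system_info(off_heap_message_queue) corresponds to the +xohmq true|false emulator flag whose argument handling is added to erlexec.c above, e.g. starting the node as "erl +xohmq true".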