aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/doc/src/erl.xml15
-rw-r--r--erts/doc/src/erlang.xml77
-rw-r--r--erts/emulator/beam/atom.names1
-rw-r--r--erts/emulator/beam/beam_bif_load.c216
-rw-r--r--erts/emulator/beam/beam_emu.c142
-rw-r--r--erts/emulator/beam/beam_load.c6
-rw-r--r--erts/emulator/beam/bif.c39
-rw-r--r--erts/emulator/beam/bif.h48
-rw-r--r--erts/emulator/beam/break.c4
-rw-r--r--erts/emulator/beam/copy.c4
-rw-r--r--erts/emulator/beam/dist.c34
-rw-r--r--erts/emulator/beam/erl_alloc.c21
-rw-r--r--erts/emulator/beam/erl_alloc.h4
-rw-r--r--erts/emulator/beam/erl_alloc.types2
-rw-r--r--erts/emulator/beam/erl_bif_ddll.c17
-rw-r--r--erts/emulator/beam/erl_bif_info.c270
-rw-r--r--erts/emulator/beam/erl_debug.c6
-rw-r--r--erts/emulator/beam/erl_gc.c688
-rw-r--r--erts/emulator/beam/erl_gc.h27
-rw-r--r--erts/emulator/beam/erl_hl_timer.c85
-rw-r--r--erts/emulator/beam/erl_init.c26
-rw-r--r--erts/emulator/beam/erl_message.c1509
-rw-r--r--erts/emulator/beam/erl_message.h277
-rw-r--r--erts/emulator/beam/erl_nif.c7
-rw-r--r--erts/emulator/beam/erl_node_tables.c68
-rw-r--r--erts/emulator/beam/erl_process.c346
-rw-r--r--erts/emulator/beam/erl_process.h106
-rw-r--r--erts/emulator/beam/erl_process_dump.c13
-rw-r--r--erts/emulator/beam/erl_process_lock.h6
-rw-r--r--erts/emulator/beam/erl_time_sup.c7
-rw-r--r--erts/emulator/beam/erl_trace.c68
-rw-r--r--erts/emulator/beam/global.h123
-rw-r--r--erts/emulator/beam/io.c113
-rw-r--r--erts/emulator/beam/sys.h2
-rw-r--r--erts/emulator/beam/utils.c17
-rw-r--r--erts/emulator/hipe/hipe_gc.c36
-rw-r--r--erts/emulator/hipe/hipe_native_bif.c34
-rw-r--r--erts/etc/common/erlexec.c22
-rw-r--r--erts/preloaded/ebin/erlang.beambin101840 -> 101544 bytes
-rw-r--r--erts/preloaded/src/erlang.erl44
40 files changed, 2549 insertions, 1981 deletions
diff --git a/erts/doc/src/erl.xml b/erts/doc/src/erl.xml
index b0322b7d43..8c1be4dff5 100644
--- a/erts/doc/src/erl.xml
+++ b/erts/doc/src/erl.xml
@@ -1335,6 +1335,21 @@
<seealso marker="kernel:error_logger#warning_map/0">error_logger(3)</seealso>
for further information.</p>
</item>
+ <tag><c><![CDATA[+xFlag Value]]></c></tag>
+ <item>
+ <p>Default process flag settings.</p>
+ <taglist>
+ <tag><marker id="+xohmq"><c>+xohmq true|false</c></marker></tag>
+ <item><p>
+ Sets the default value for the process flag
+ <c>off_heap_message_queue</c>. If <c>+xohmq</c> is not
+ passed, <c>false</c> will be the default. For more information,
+ see the documentation of
+ <seealso marker="erlang#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue,
+ OHMQ)</c></seealso>.
+ </p></item>
+ </taglist>
+ </item>
<tag><c><![CDATA[+zFlag Value]]></c></tag>
<item>
<p>Miscellaneous flags.</p>
diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml
index e77532463e..9426d30390 100644
--- a/erts/doc/src/erlang.xml
+++ b/erts/doc/src/erlang.xml
@@ -4058,8 +4058,46 @@ os_prompt% </pre>
process.</p>
<p>Returns the old value of the flag.</p> </desc>
</func>
+ <marker id="process_flag_off_heap_message_queue"/>
<func>
<name name="process_flag" arity="2" clause_i="5"/>
+ <fsummary>Set process flag <c>off_heap_message_queue</c> for the calling process</fsummary>
+ <desc>
+ <p>This flag determines how messages in the message queue
+ are stored. When the flag is:</p>
+ <taglist>
+ <tag><c>true</c></tag>
+ <item><p>
+ <em>All</em> messages in the message queue will be stored
+ outside of the process heap. This implies that <em>no</em>
+ messages in the message queue will be part of a garbage
+ collection of the process.
+ </p></item>
+ <tag><c>false</c></tag>
+ <item><p>
+ Messages may be placed either on the heap or outside
+ of the heap.
+ </p></item>
+ </taglist>
+ <p>
+ If the process potentially may get a hugh amount of messages,
+ you are recommended to set the flag to <c>true</c>. This since
+ a garbage collection with lots of messages placed on the heap
+ may become extremly expensive. Performance of the actual
+ message passing is however generally better when setting the
+ flag to <c>false</c>.
+ </p>
+ <p>
+ When changing this flag from <c>false</c> to <c>true</c>,
+ all messages in the message queue are moved off heap. This
+ work has been initiated but not completed when this function
+ call returns.
+ </p>
+ <p>Returns the old value of the flag.</p>
+ </desc>
+ </func>
+ <func>
+ <name name="process_flag" arity="2" clause_i="6"/>
<type name="priority_level"/>
<fsummary>Set process flag priority for the calling process</fsummary>
<desc>
@@ -4138,7 +4176,7 @@ os_prompt% </pre>
</desc>
</func>
<func>
- <name name="process_flag" arity="2" clause_i="6"/>
+ <name name="process_flag" arity="2" clause_i="7"/>
<fsummary>Set process flag save_calls for the calling process</fsummary>
<desc>
<p><c><anno>N</anno></c> must be an integer in the interval 0..10000.
@@ -4162,7 +4200,7 @@ os_prompt% </pre>
</desc>
</func>
<func>
- <name name="process_flag" arity="2" clause_i="7"/>
+ <name name="process_flag" arity="2" clause_i="8"/>
<fsummary>Set process flag sensitive for the calling process</fsummary>
<desc>
<p>Set or clear the <c>sensitive</c> flag for the current process.
@@ -4408,6 +4446,14 @@ os_prompt% </pre>
monitor by name, the list item is
<c>{process, {<anno>RegName</anno>, <anno>Node</anno>}}</c>.</p>
</item>
+ <tag><c>{off_heap_message_queue, <anno>OHMQ</anno>}</c></tag>
+ <item>
+ <p>Returns the current state of the <c>off_heap_message_queue</c>
+ process flag. <c><anno>OHMQ</anno></c> is either <c>true</c>, or
+ <c>false</c>. For more information, see the documentation of
+ <seealso marker="#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue,
+ OHMQ)</c></seealso>.</p>
+ </item>
<tag><c>{priority, Level}</c></tag>
<item>
<p><c><anno>Level</anno></c> is the current priority level for
@@ -5067,6 +5113,7 @@ true</pre>
<func>
<name name="spawn_opt" arity="2"/>
<type name="priority_level" />
+ <type name="spawn_opt_option" />
<fsummary>Create a new process with a fun as entry point</fsummary>
<desc>
<p>Returns the pid of a new process started by the application
@@ -5081,6 +5128,7 @@ true</pre>
<func>
<name name="spawn_opt" arity="3"/>
<type name="priority_level" />
+ <type name="spawn_opt_option" />
<fsummary>Create a new process with a fun as entry point on a given node</fsummary>
<desc>
<p>Returns the pid of a new process started by the application
@@ -5093,6 +5141,7 @@ true</pre>
<func>
<name name="spawn_opt" arity="4"/>
<type name="priority_level" />
+ <type name="spawn_opt_option" />
<fsummary>Create a new process with a function as entry point</fsummary>
<desc>
<p>Works exactly like
@@ -5188,6 +5237,18 @@ true</pre>
fine-tuning an application and to measure the execution
time with various <c><anno>VSize</anno></c> values.</p>
</item>
+ <tag><c>{off_heap_message_queue, <anno>OHMQ</anno>}</c></tag>
+ <item>
+ <p>Sets the state of the <c>off_heap_message_queue</c> process
+ flag. <c><anno>OHMQ</anno></c> should be either <c>true</c>, or
+ <c>false</c>. The default <c>off_heap_message_queue</c> process
+ flag is determined by the
+ <seealso marker="erl#+xohmq"><c>+xohmq</c></seealso> <c>erl</c>
+ command line argument. For more information, see the
+ documentation of
+ <seealso marker="#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue,
+ <anno>OHMQ</anno>)</c></seealso>.</p>
+ </item>
</taglist>
</desc>
@@ -5195,6 +5256,7 @@ true</pre>
<func>
<name name="spawn_opt" arity="5"/>
<type name="priority_level" />
+ <type name="spawn_opt_option" />
<fsummary>Create a new process with a function as entry point on a given node</fsummary>
<desc>
<p>Returns the pid of a new process started by the application
@@ -6224,6 +6286,7 @@ ok
<name name="system_info" arity="1" clause_i="65"/>
<name name="system_info" arity="1" clause_i="66"/>
<name name="system_info" arity="1" clause_i="67"/>
+ <name name="system_info" arity="1" clause_i="68"/>
<fsummary>Information about the system</fsummary>
<desc>
<p>Returns various information about the current system
@@ -6614,6 +6677,16 @@ ok
<p>Returns a string containing the erlang NIF version
used by the runtime system. It will be on the form "&lt;major ver&gt;.&lt;minor ver&gt;".</p>
</item>
+ <tag><marker id="system_info_off_heap_message_queue"><c>off_heap_message_queue</c></marker></tag>
+ <item>
+ <p>Returns the default value of the <c>off_heap_message_queue</c>
+ process flag which is either <c>true</c> or <c>false</c>. This
+ default is set by the <c>erl</c> command line argument
+ <seealso marker="erl#+xohmq"><c>+xohmq</c></seealso>. For more information on the
+ <c>off_heap_message_queue</c> process flag, see documentation of
+ <seealso marker="#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue,
+ OHMQ)</c></seealso>.</p>
+ </item>
<tag><marker id="system_info_otp_release"><c>otp_release</c></marker></tag>
<item>
<p>Returns a string containing the OTP release number of the
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index 6328b3d18f..f142cf1142 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -423,6 +423,7 @@ atom notify
atom notsup
atom nouse_stdio
atom objects
+atom off_heap_message_queue
atom offset
atom ok
atom old_heap_block_size
diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c
index 2c275c4649..22b4e26c77 100644
--- a/erts/emulator/beam/beam_bif_load.c
+++ b/erts/emulator/beam/beam_bif_load.c
@@ -723,29 +723,50 @@ set_default_trace_pattern(Eterm module)
}
}
+static ERTS_INLINE int
+check_mod_funs(Process *p, ErlOffHeap *off_heap, char *area, size_t area_size)
+{
+ struct erl_off_heap_header* oh;
+ for (oh = off_heap->first; oh; oh = oh->next) {
+ if (thing_subtag(oh->thing_word) == FUN_SUBTAG) {
+ ErlFunThing* funp = (ErlFunThing*) oh;
+ if (ErtsInArea(funp->fe->address, area, area_size))
+ return !0;
+ }
+ }
+ return 0;
+}
+
+
static Eterm
check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp)
{
BeamInstr* start;
char* literals;
Uint lit_bsize;
- BeamInstr* end;
+ char* mod_start;
+ Uint mod_size;
Eterm* sp;
- struct erl_off_heap_header* oh;
int done_gc = 0;
+ int need_gc = 0;
+ ErtsMessage *msgp;
+ ErlHeapFragment *hfrag;
-#define INSIDE(a) (start <= (a) && (a) < end)
+#define ERTS_ORDINARY_GC__ (1 << 0)
+#define ERTS_LITERAL_GC__ (1 << 1)
/*
* Pick up limits for the module.
*/
start = (BeamInstr*) modp->old.code_hdr;
- end = (BeamInstr *)((char *)start + modp->old.code_length);
+ mod_start = (char *) start;
+ mod_size = modp->old.code_length;
/*
* Check if current instruction or continuation pointer points into module.
*/
- if (INSIDE(rp->i) || INSIDE(rp->cp)) {
+ if (ErtsInArea(rp->i, mod_start, mod_size)
+ || ErtsInArea(rp->cp, mod_start, mod_size)) {
return am_true;
}
@@ -753,7 +774,7 @@ check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp)
* Check all continuation pointers stored on the stack.
*/
for (sp = rp->stop; sp < STACK_START(rp); sp++) {
- if (is_CP(*sp) && INSIDE(cp_val(*sp))) {
+ if (is_CP(*sp) && ErtsInArea(cp_val(*sp), mod_start, mod_size)) {
return am_true;
}
}
@@ -767,15 +788,15 @@ check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp)
struct StackTrace *s;
ASSERT(is_list(rp->ftrace));
s = (struct StackTrace *) big_val(CDR(list_val(rp->ftrace)));
- if ((s->pc && INSIDE(s->pc)) ||
- (s->current && INSIDE(s->current))) {
+ if ((s->pc && ErtsInArea(s->pc, mod_start, mod_size)) ||
+ (s->current && ErtsInArea(s->current, mod_start, mod_size))) {
rp->freason = EXC_NULL;
rp->fvalue = NIL;
rp->ftrace = NIL;
} else {
int i;
for (i = 0; i < s->depth; i++) {
- if (INSIDE(s->trace[i])) {
+ if (ErtsInArea(s->trace[i], mod_start, mod_size)) {
rp->freason = EXC_NULL;
rp->fvalue = NIL;
rp->ftrace = NIL;
@@ -796,108 +817,141 @@ check_process_code(Process* rp, Module* modp, int allow_gc, int *redsp)
}
/*
- * See if there are funs that refer to the old version of the module.
+ * Message queue can contains funs, but (at least currently) no
+ * constants. If we got references to this module from the message
+ * queue, a GC cannot remove these...
*/
- rescan:
- for (oh = MSO(rp).first; oh; oh = oh->next) {
- if (thing_subtag(oh->thing_word) == FUN_SUBTAG) {
- ErlFunThing* funp = (ErlFunThing*) oh;
+ erts_smp_proc_lock(rp, ERTS_PROC_LOCK_MSGQ);
+ ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp);
+ erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MSGQ);
- if (INSIDE((BeamInstr *) funp->fe->address)) {
- if (done_gc) {
- return am_true;
- } else {
- if (!allow_gc)
- return am_aborted;
- /*
- * Try to get rid of this fun by garbage collecting.
- * Clear both fvalue and ftrace to make sure they
- * don't hold any funs.
- */
- rp->freason = EXC_NULL;
- rp->fvalue = NIL;
- rp->ftrace = NIL;
- done_gc = 1;
- FLAGS(rp) |= F_NEED_FULLSWEEP;
- *redsp += erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity);
- goto rescan;
- }
- }
+ for (msgp = rp->msg.first; msgp; msgp = msgp->next) {
+ if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ hfrag = &msgp->hfrag;
+ else if (is_value(ERL_MESSAGE_TERM(msgp)) && msgp->data.heap_frag)
+ hfrag = msgp->data.heap_frag;
+ else
+ continue;
+ for (; hfrag; hfrag = hfrag->next) {
+ if (check_mod_funs(rp, &hfrag->off_heap, mod_start, mod_size))
+ return am_true;
+ /* Should not contain any constants... */
+ ASSERT(!any_heap_ref_ptrs(&hfrag->mem[0],
+ &hfrag->mem[hfrag->used_size],
+ mod_start,
+ mod_size));
}
}
- /*
- * See if there are constants inside the module referenced by the process.
- */
- done_gc = 0;
literals = (char*) modp->old.code_hdr->literals_start;
lit_bsize = (char*) modp->old.code_hdr->literals_end - literals;
- for (;;) {
- ErlMessage* mp;
+ while (1) {
+
+ /* Check heap, stack etc... */
+ if (check_mod_funs(rp, &rp->off_heap, mod_start, mod_size))
+ goto try_gc;
if (any_heap_ref_ptrs(&rp->fvalue, &rp->fvalue+1, literals, lit_bsize)) {
rp->freason = EXC_NULL;
rp->fvalue = NIL;
rp->ftrace = NIL;
}
- if (any_heap_ref_ptrs(rp->stop, rp->hend, literals, lit_bsize)) {
- goto need_gc;
- }
- if (any_heap_refs(rp->heap, rp->htop, literals, lit_bsize)) {
- goto need_gc;
- }
+ if (any_heap_ref_ptrs(rp->stop, rp->hend, literals, lit_bsize))
+ goto try_literal_gc;
+ if (any_heap_refs(rp->heap, rp->htop, literals, lit_bsize))
+ goto try_literal_gc;
+ if (any_heap_refs(rp->old_heap, rp->old_htop, literals, lit_bsize))
+ goto try_literal_gc;
+
+ /* Check dictionary */
+ if (rp->dictionary) {
+ Eterm* start = rp->dictionary->data;
+ Eterm* end = start + rp->dictionary->used;
- if (any_heap_refs(rp->old_heap, rp->old_htop, literals, lit_bsize)) {
- goto need_gc;
+ if (any_heap_ref_ptrs(start, end, literals, lit_bsize))
+ goto try_literal_gc;
}
- if (rp->dictionary != NULL) {
- Eterm* start = rp->dictionary->data;
- Eterm* end = start + rp->dictionary->used;
+ /* Check heap fragments */
+ for (hfrag = rp->mbuf; hfrag; hfrag = hfrag->next) {
+ Eterm *hp, *hp_end;
+ /* Off heap lists should already have been moved into process */
+ ASSERT(!check_mod_funs(rp, &hfrag->off_heap, mod_start, mod_size));
- if (any_heap_ref_ptrs(start, end, literals, lit_bsize)) {
- goto need_gc;
- }
+ hp = &hfrag->mem[0];
+ hp_end = &hfrag->mem[hfrag->used_size];
+ if (any_heap_ref_ptrs(hp, hp_end, mod_start, lit_bsize))
+ goto try_literal_gc;
}
- for (mp = rp->msg.first; mp != NULL; mp = mp->next) {
- if (any_heap_ref_ptrs(mp->m, mp->m+2, literals, lit_bsize)) {
- goto need_gc;
+#ifdef DEBUG
+ /*
+ * Message buffer fragments should not have any references
+ * to constants, and off heap lists should already have
+ * been moved into process off heap structure.
+ */
+ for (msgp = rp->msg_frag; msgp; msgp = msgp->next) {
+ if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ hfrag = &msgp->hfrag;
+ else
+ hfrag = msgp->data.heap_frag;
+ for (; hfrag; hfrag = hfrag->next) {
+ Eterm *hp, *hp_end;
+ ASSERT(!check_mod_funs(rp, &hfrag->off_heap, mod_start, mod_size));
+
+ hp = &hfrag->mem[0];
+ hp_end = &hfrag->mem[hfrag->used_size];
+ ASSERT(!any_heap_ref_ptrs(hp, hp_end, mod_start, lit_bsize));
}
}
- break;
- need_gc:
- if (done_gc) {
+#endif
+
+ return am_false;
+
+ try_literal_gc:
+ need_gc |= ERTS_LITERAL_GC__;
+
+ try_gc:
+ need_gc |= ERTS_ORDINARY_GC__;
+
+ if ((done_gc & need_gc) == need_gc)
return am_true;
- } else {
- struct erl_off_heap_header* oh;
- if (!allow_gc)
- return am_aborted;
+ if (!allow_gc)
+ return am_aborted;
- /*
- * Try to get rid of constants by by garbage collecting.
- * Clear both fvalue and ftrace.
- */
- rp->freason = EXC_NULL;
- rp->fvalue = NIL;
- rp->ftrace = NIL;
- done_gc = 1;
+ need_gc &= ~done_gc;
+
+ /*
+ * Try to get rid of constants by by garbage collecting.
+ * Clear both fvalue and ftrace.
+ */
+
+ rp->freason = EXC_NULL;
+ rp->fvalue = NIL;
+ rp->ftrace = NIL;
+
+ if (need_gc & ERTS_ORDINARY_GC__) {
FLAGS(rp) |= F_NEED_FULLSWEEP;
*redsp += erts_garbage_collect(rp, 0, rp->arg_reg, rp->arity);
+ done_gc |= ERTS_ORDINARY_GC__;
+ }
+ if (need_gc & ERTS_LITERAL_GC__) {
+ struct erl_off_heap_header* oh;
oh = modp->old.code_hdr->literals_off_heap;
*redsp += lit_bsize / 64; /* Need, better value... */
erts_garbage_collect_literals(rp, (Eterm*)literals, lit_bsize, oh);
+ done_gc |= ERTS_LITERAL_GC__;
}
+ need_gc = 0;
}
- return am_false;
-#undef INSIDE
-}
-#define in_area(ptr,start,nbytes) \
- ((UWord)((char*)(ptr) - (char*)(start)) < (nbytes))
+#undef ERTS_ORDINARY_GC__
+#undef ERTS_LITERAL_GC__
+
+}
static int
any_heap_ref_ptrs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size)
@@ -910,7 +964,7 @@ any_heap_ref_ptrs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size)
switch (primary_tag(val)) {
case TAG_PRIMARY_BOXED:
case TAG_PRIMARY_LIST:
- if (in_area(val, mod_start, mod_size)) {
+ if (ErtsInArea(val, mod_start, mod_size)) {
return 1;
}
break;
@@ -930,7 +984,7 @@ any_heap_refs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size)
switch (primary_tag(val)) {
case TAG_PRIMARY_BOXED:
case TAG_PRIMARY_LIST:
- if (in_area(val, mod_start, mod_size)) {
+ if (ErtsInArea(val, mod_start, mod_size)) {
return 1;
}
break;
@@ -940,7 +994,7 @@ any_heap_refs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size)
if (header_is_bin_matchstate(val)) {
ErlBinMatchState *ms = (ErlBinMatchState*) p;
ErlBinMatchBuffer *mb = &(ms->mb);
- if (in_area(mb->orig, mod_start, mod_size)) {
+ if (ErtsInArea(mb->orig, mod_start, mod_size)) {
return 1;
}
}
@@ -953,8 +1007,6 @@ any_heap_refs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size)
return 0;
}
-#undef in_area
-
BIF_RETTYPE purge_module_1(BIF_ALIST_1)
{
ErtsCodeIndex code_ix;
diff --git a/erts/emulator/beam/beam_emu.c b/erts/emulator/beam/beam_emu.c
index 4d19f52a52..1dd56ff989 100644
--- a/erts/emulator/beam/beam_emu.c
+++ b/erts/emulator/beam/beam_emu.c
@@ -1654,10 +1654,6 @@ void process_main(void)
ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p);
PROCESS_MAIN_CHK_LOCKS(c_p);
- if (c_p->mbuf || MSO(c_p).overhead >= BIN_VHEAP_SZ(c_p)) {
- result = erts_gc_after_bif_call(c_p, result, reg, 2);
- E = c_p->stop;
- }
HTOP = HEAP_TOP(c_p);
FCALLS = c_p->fcalls;
if (is_value(result)) {
@@ -1745,8 +1741,7 @@ void process_main(void)
SWAPIN;
}
/* only x(2) is included in the rootset here */
- if (E - HTOP < 3 || c_p->mbuf) { /* Force GC in case add_stacktrace()
- * created heap fragments */
+ if (E - HTOP < 3) {
SWAPOUT;
PROCESS_MAIN_CHK_LOCKS(c_p);
FCALLS -= erts_garbage_collect(c_p, 3, reg+2, 1);
@@ -1833,10 +1828,17 @@ void process_main(void)
OpCase(i_loop_rec_f):
{
BeamInstr *next;
- ErlMessage* msgp;
+ ErtsMessage* msgp;
- loop_rec__:
+ /*
+ * We need to disable GC while matching messages
+ * in the queue. This since messages with data outside
+ * the heap will be corrupted by a GC.
+ */
+ ASSERT(!(c_p->flags & F_DISABLE_GC));
+ c_p->flags |= F_DISABLE_GC;
+ loop_rec__:
PROCESS_MAIN_CHK_LOCKS(c_p);
msgp = PEEK_MESSAGE(c_p);
@@ -1848,6 +1850,7 @@ void process_main(void)
if (ERTS_PROC_PENDING_EXIT(c_p)) {
erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
SWAPOUT;
+ c_p->flags &= ~F_DISABLE_GC;
goto do_schedule; /* Will be rescheduled for exit */
}
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
@@ -1857,30 +1860,27 @@ void process_main(void)
else
#endif
{
+ c_p->flags &= ~F_DISABLE_GC;
SET_I((BeamInstr *) Arg(0));
Goto(*I); /* Jump to a wait or wait_timeout instruction */
}
}
- ErtsMoveMsgAttachmentIntoProc(msgp, c_p, E, HTOP, FCALLS,
- {
- SWAPOUT;
- PROCESS_MAIN_CHK_LOCKS(c_p);
- },
- {
- ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
- PROCESS_MAIN_CHK_LOCKS(c_p);
- SWAPIN;
- });
if (is_non_value(ERL_MESSAGE_TERM(msgp))) {
- /*
- * A corrupt distribution message that we weren't able to decode;
- * remove it...
- */
- ASSERT(!msgp->data.attached);
- /* TODO: Add DTrace probe for this bad message situation? */
- UNLINK_MESSAGE(c_p, msgp);
- free_message(msgp);
- goto loop_rec__;
+ SWAPOUT; /* erts_decode_dist_message() may write to heap... */
+ if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) {
+ /*
+ * A corrupt distribution message that we weren't able to decode;
+ * remove it...
+ */
+ /* No swapin should be needed */
+ ASSERT(HTOP == c_p->htop && E == c_p->stop);
+ /* TODO: Add DTrace probe for this bad message situation? */
+ UNLINK_MESSAGE(c_p, msgp);
+ msgp->next = NULL;
+ erts_cleanup_messages(msgp);
+ goto loop_rec__;
+ }
+ SWAPIN;
}
PreFetch(1, next);
r(0) = ERL_MESSAGE_TERM(msgp);
@@ -1892,8 +1892,7 @@ void process_main(void)
*/
OpCase(remove_message): {
BeamInstr *next;
- ErlMessage* msgp;
-
+ ErtsMessage* msgp;
PROCESS_MAIN_CHK_LOCKS(c_p);
PreFetch(0, next);
@@ -1988,11 +1987,21 @@ void process_main(void)
UNLINK_MESSAGE(c_p, msgp);
JOIN_MESSAGE(c_p);
CANCEL_TIMER(c_p);
- free_message(msgp);
+
+ erts_save_message_in_proc(c_p, msgp);
+ c_p->flags &= ~F_DISABLE_GC;
+
+ if (ERTS_IS_GC_DESIRED_INTERNAL(c_p, HTOP, E)) {
+ /*
+ * We want to GC soon but we leave a few
+ * reductions giving the message some time
+ * to turn into garbage.
+ */
+ ERTS_VBUMP_LEAVE_REDS_INTERNAL(c_p, 5, FCALLS);
+ }
ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
PROCESS_MAIN_CHK_LOCKS(c_p);
-
NextPF(0, next);
}
@@ -2001,9 +2010,22 @@ void process_main(void)
* message didn't match), then jump to the loop_rec instruction.
*/
OpCase(loop_rec_end_f): {
+
+ ASSERT(c_p->flags & F_DISABLE_GC);
+
SET_I((BeamInstr *) Arg(0));
SAVE_MESSAGE(c_p);
- goto loop_rec__;
+ if (FCALLS > 0 || FCALLS > neg_o_reds) {
+ FCALLS--;
+ goto loop_rec__;
+ }
+
+ c_p->flags &= ~F_DISABLE_GC;
+ c_p->i = I;
+ SWAPOUT;
+ c_p->arity = 0;
+ c_p->current = NULL;
+ goto do_schedule;
}
/*
* Prepare to wait for a message or a timeout, whichever occurs first.
@@ -2733,6 +2755,7 @@ do { \
Eterm (*bf)(Process*, Eterm*, BeamInstr*) = GET_BIF_ADDRESS(Arg(0));
Eterm result;
BeamInstr *next;
+ ErlHeapFragment *live_hf_end;
PRE_BIF_SWAPOUT(c_p);
c_p->fcalls = FCALLS - 1;
@@ -2742,17 +2765,18 @@ do { \
PreFetch(1, next);
ASSERT(!ERTS_PROC_IS_EXITING(c_p));
ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
+ live_hf_end = c_p->mbuf;
result = (*bf)(c_p, reg, I);
ASSERT(!ERTS_PROC_IS_EXITING(c_p) || is_non_value(result));
ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
ERTS_HOLE_CHECK(c_p);
ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p);
- PROCESS_MAIN_CHK_LOCKS(c_p);
- if (c_p->mbuf || MSO(c_p).overhead >= BIN_VHEAP_SZ(c_p)) {
+ if (ERTS_IS_GC_DESIRED(c_p)) {
Uint arity = ((Export *)Arg(0))->code[2];
- result = erts_gc_after_bif_call(c_p, result, reg, arity);
+ result = erts_gc_after_bif_call_lhf(c_p, live_hf_end, result, reg, arity);
E = c_p->stop;
}
+ PROCESS_MAIN_CHK_LOCKS(c_p);
HTOP = HEAP_TOP(c_p);
FCALLS = c_p->fcalls;
if (is_value(result)) {
@@ -3414,9 +3438,6 @@ do { \
goto do_schedule;
} else {
ASSERT(!is_value(r(0)));
- if (c_p->mbuf) {
- erts_garbage_collect(c_p, 0, reg+1, 3);
- }
SWAPIN;
Goto(*I);
}
@@ -3440,6 +3461,7 @@ do { \
* I[3]: Function pointer to dirty NIF
*/
BifFunction vbf;
+ ErlHeapFragment *live_hf_end;
DTRACE_NIF_ENTRY(c_p, (Eterm)I[-3], (Eterm)I[-2], (Uint)I[-1]);
c_p->current = I-3; /* current and vbf set to please handle_error */
@@ -3455,6 +3477,7 @@ do { \
NifF* fp = vbf = (NifF*) I[1];
struct enif_environment_t env;
erts_pre_nif(&env, c_p, (struct erl_module_nif*)I[2]);
+ live_hf_end = c_p->mbuf;
nif_bif_result = (*fp)(&env, bif_nif_arity, reg);
if (env.exception_thrown)
nif_bif_result = THE_NON_VALUE;
@@ -3497,6 +3520,7 @@ do { \
{
Eterm (*bf)(Process*, Eterm*, BeamInstr*) = vbf;
ASSERT(!ERTS_PROC_IS_EXITING(c_p));
+ live_hf_end = c_p->mbuf;
nif_bif_result = (*bf)(c_p, reg, I);
ASSERT(!ERTS_PROC_IS_EXITING(c_p) ||
is_non_value(nif_bif_result));
@@ -3509,9 +3533,17 @@ do { \
apply_bif_or_nif_epilogue:
ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p);
ERTS_HOLE_CHECK(c_p);
- if (c_p->mbuf) {
- nif_bif_result = erts_gc_after_bif_call(c_p, nif_bif_result,
- reg, bif_nif_arity);
+ /*
+ * We want to test with ERTS_IS_GC_DESIRED(c_p) in order
+ * to trigger gc due to binaries based on same conditions
+ * regardless of how the bif is called. This change will
+ * however be introduced in a separate commit in order to
+ * easier identify why the characteristics changed.
+ */
+ if (c_p->stop - c_p->htop < c_p->mbuf_sz) {
+ nif_bif_result = erts_gc_after_bif_call_lhf(c_p, live_hf_end,
+ nif_bif_result,
+ reg, bif_nif_arity);
}
SWAPIN; /* There might have been a garbage collection. */
FCALLS = c_p->fcalls;
@@ -6340,13 +6372,6 @@ new_map(Process* p, Eterm* reg, BeamInstr* I)
erts_factory_proc_init(&factory, p);
res = erts_hashmap_from_array(&factory, thp, n/2, 0);
erts_factory_close(&factory);
- if (p->mbuf) {
- Uint live = Arg(2);
- reg[live] = res;
- erts_garbage_collect(p, 0, reg, live+1);
- res = reg[live];
- E = p->stop;
- }
return res;
}
@@ -6412,13 +6437,6 @@ update_map_assoc(Process* p, Eterm* reg, Eterm map, BeamInstr* I)
hx = hashmap_make_hash(new_key);
res = erts_hashmap_insert(p, hx, new_key, val, res, 0);
- if (p->mbuf) {
- Uint live = Arg(3);
- reg[live] = res;
- erts_garbage_collect(p, 0, reg, live+1);
- res = reg[live];
- E = p->stop;
- }
new_p += 2;
}
@@ -6578,12 +6596,6 @@ update_map_assoc(Process* p, Eterm* reg, Eterm map, BeamInstr* I)
/* The expensive case, need to build a hashmap */
if (n > MAP_SMALL_MAP_LIMIT) {
res = erts_hashmap_from_ks_and_vs(p,flatmap_get_keys(mp),flatmap_get_values(mp),n);
- if (p->mbuf) {
- Uint live = Arg(3);
- reg[live] = res;
- erts_garbage_collect(p, 0, reg, live+1);
- res = reg[live];
- }
}
return res;
}
@@ -6639,14 +6651,6 @@ update_map_exact(Process* p, Eterm* reg, Eterm map, BeamInstr* I)
return res;
}
- if (p->mbuf) {
- Uint live = Arg(3);
- reg[live] = res;
- erts_garbage_collect(p, 0, reg, live+1);
- res = reg[live];
- E = p->stop;
- }
-
new_p += 2;
}
return res;
diff --git a/erts/emulator/beam/beam_load.c b/erts/emulator/beam/beam_load.c
index 7a1a563be2..5db971b6af 100644
--- a/erts/emulator/beam/beam_load.c
+++ b/erts/emulator/beam/beam_load.c
@@ -902,7 +902,7 @@ static ErlHeapFragment* new_literal_fragment(Uint size)
ErlHeapFragment* bp;
bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_PREPARED_CODE,
ERTS_HEAP_FRAG_SIZE(size));
- ERTS_INIT_HEAP_FRAG(bp, size);
+ ERTS_INIT_HEAP_FRAG(bp, size, size);
return bp;
}
@@ -1528,8 +1528,8 @@ read_literal_table(LoaderState* stp)
}
if (heap_size > 0) {
- erts_factory_message_init(&factory, NULL, NULL,
- new_literal_fragment(heap_size));
+ erts_factory_heap_frag_init(&factory,
+ new_literal_fragment(heap_size));
factory.alloc_type = ERTS_ALC_T_PREPARED_CODE;
val = erts_decode_ext(&factory, &p);
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 5ec1840c7b..e4283ac945 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -72,7 +72,7 @@ BIF_RETTYPE spawn_3(BIF_ALIST_3)
ErlSpawnOpts so;
Eterm pid;
- so.flags = 0;
+ so.flags = erts_default_spo_flags;
pid = erl_create_process(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, &so);
if (is_non_value(pid)) {
BIF_ERROR(BIF_P, so.error_code);
@@ -589,7 +589,7 @@ erts_queue_monitor_message(Process *p,
Eterm reason_copy, ref_copy, item_copy;
Uint reason_size, ref_size, item_size, heap_size;
ErlOffHeap *ohp;
- ErlHeapFragment *bp;
+ ErtsMessage *msgp;
reason_size = IS_CONST(reason) ? 0 : size_object(reason);
item_size = IS_CONST(item) ? 0 : size_object(item);
@@ -597,11 +597,8 @@ erts_queue_monitor_message(Process *p,
heap_size = 6+reason_size+ref_size+item_size;
- hp = erts_alloc_message_heap(heap_size,
- &bp,
- &ohp,
- p,
- p_locksp);
+ msgp = erts_alloc_message_heap(p, p_locksp, heap_size,
+ &hp, &ohp);
reason_copy = (IS_CONST(reason)
? reason
@@ -612,7 +609,7 @@ erts_queue_monitor_message(Process *p,
ref_copy = copy_struct(ref, ref_size, &hp, ohp);
tup = TUPLE5(hp, am_DOWN, ref_copy, type, item_copy, reason_copy);
- erts_queue_message(p, p_locksp, bp, tup, NIL);
+ erts_queue_message(p, p_locksp, msgp, tup, NIL);
}
static BIF_RETTYPE
@@ -841,7 +838,7 @@ BIF_RETTYPE spawn_link_3(BIF_ALIST_3)
ErlSpawnOpts so;
Eterm pid;
- so.flags = SPO_LINK;
+ so.flags = erts_default_spo_flags|SPO_LINK;
pid = erl_create_process(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, &so);
if (is_non_value(pid)) {
BIF_ERROR(BIF_P, so.error_code);
@@ -878,7 +875,7 @@ BIF_RETTYPE spawn_opt_1(BIF_ALIST_1)
/*
* Store default values for options.
*/
- so.flags = SPO_USE_ARGS;
+ so.flags = erts_default_spo_flags|SPO_USE_ARGS;
so.min_heap_size = H_MIN_SIZE;
so.min_vheap_size = BIN_VH_MIN_SIZE;
so.priority = PRIORITY_NORMAL;
@@ -913,6 +910,13 @@ BIF_RETTYPE spawn_opt_1(BIF_ALIST_1)
so.priority = PRIORITY_LOW;
else
goto error;
+ } else if (arg == am_off_heap_message_queue) {
+ if (val == am_true)
+ so.flags |= SPO_OFF_HEAP_MSGQ;
+ else if (val == am_false)
+ so.flags &= ~SPO_OFF_HEAP_MSGQ;
+ else
+ goto error;
} else if (arg == am_min_heap_size && is_small(val)) {
Sint min_heap_size = signed_val(val);
if (min_heap_size < 0) {
@@ -1691,6 +1695,17 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2)
}
BIF_RET(old_value);
}
+ else if (BIF_ARG_1 == am_off_heap_message_queue) {
+ int enable;
+ if (BIF_ARG_2 == am_true)
+ enable = 1;
+ else if (BIF_ARG_2 == am_false)
+ enable = 0;
+ else
+ goto error;
+ old_value = erts_change_off_heap_message_queue_state(BIF_P, enable);
+ BIF_RET(old_value);
+ }
else if (BIF_ARG_1 == am_sensitive) {
Uint is_sensitive;
if (BIF_ARG_2 == am_true) {
@@ -1931,7 +1946,7 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext* ctx)
} else if (is_atom(to)) {
Eterm id = erts_whereis_name_to_id(p, to);
- rp = erts_proc_lookup(id);
+ rp = erts_proc_lookup_raw(id);
if (rp) {
if (IS_TRACED(p))
trace_send(p, to, msg);
@@ -4479,7 +4494,7 @@ BIF_RETTYPE system_flag_2(BIF_ALIST_2)
}
} else if (BIF_ARG_1 == make_small(1)) {
int i, max;
- ErlMessage* mp;
+ ErtsMessage* mp;
erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
erts_smp_thr_progress_block();
diff --git a/erts/emulator/beam/bif.h b/erts/emulator/beam/bif.h
index c6ed60376a..a62eddf36b 100644
--- a/erts/emulator/beam/bif.h
+++ b/erts/emulator/beam/bif.h
@@ -54,22 +54,24 @@ extern Export *erts_convert_time_unit_trap;
(p)->fcalls = -CONTEXT_REDS; \
} while(0)
-
-#define ERTS_VBUMP_ALL_REDS(p) \
+#define ERTS_VBUMP_ALL_REDS_INTERNAL(p, fcalls) \
do { \
if (!ERTS_PROC_GET_SAVED_CALLS_BUF((p))) { \
- if ((p)->fcalls > 0) \
- ERTS_PROC_GET_SCHDATA((p))->virtual_reds += (p)->fcalls; \
- (p)->fcalls = 0; \
+ if ((fcalls) > 0) \
+ ERTS_PROC_GET_SCHDATA((p))->virtual_reds += (fcalls); \
+ (fcalls) = 0; \
} \
else { \
- if ((p)->fcalls > -CONTEXT_REDS) \
+ if ((fcalls) > -CONTEXT_REDS) \
ERTS_PROC_GET_SCHDATA((p))->virtual_reds \
- += ((p)->fcalls - (-CONTEXT_REDS)); \
- (p)->fcalls = -CONTEXT_REDS; \
+ += ((fcalls) - (-CONTEXT_REDS)); \
+ (fcalls) = -CONTEXT_REDS; \
} \
} while(0)
+#define ERTS_VBUMP_ALL_REDS(p) \
+ ERTS_VBUMP_ALL_REDS_INTERNAL((p), (p)->fcalls)
+
#define BUMP_REDS(p, gc) do { \
ASSERT(p); \
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN & erts_proc_lc_my_proc_locks(p));\
@@ -110,10 +112,34 @@ do { \
} \
} while(0)
-#define ERTS_BIF_REDS_LEFT(p) \
+#define ERTS_VBUMP_LEAVE_REDS_INTERNAL(P, Reds, FCalls) \
+ do { \
+ if (ERTS_PROC_GET_SAVED_CALLS_BUF((P))) { \
+ int nreds__ = ((int)(Reds)) - CONTEXT_REDS; \
+ if ((FCalls) > nreds__) { \
+ ERTS_PROC_GET_SCHDATA((P))->virtual_reds \
+ += (FCalls) - nreds__; \
+ (FCalls) = nreds__; \
+ } \
+ } \
+ else { \
+ if ((FCalls) > (Reds)) { \
+ ERTS_PROC_GET_SCHDATA((P))->virtual_reds \
+ += (FCalls) - (Reds); \
+ (FCalls) = (Reds); \
+ } \
+ } \
+ } while (0)
+
+#define ERTS_VBUMP_LEAVE_REDS(P, Reds) \
+ ERTS_VBUMP_LEAVE_REDS_INTERNAL(P, Reds, (P)->fcalls)
+
+#define ERTS_REDS_LEFT(p, FCalls) \
(ERTS_PROC_GET_SAVED_CALLS_BUF((p)) \
- ? ((p)->fcalls > -CONTEXT_REDS ? ((p)->fcalls - (-CONTEXT_REDS)) : 0)\
- : ((p)->fcalls > 0 ? (p)->fcalls : 0))
+ ? ((FCalls) > -CONTEXT_REDS ? ((FCalls) - (-CONTEXT_REDS)) : 0) \
+ : ((FCalls) > 0 ? (FCalls) : 0))
+
+#define ERTS_BIF_REDS_LEFT(p) ERTS_REDS_LEFT(p, p->fcalls)
#define BIF_RET2(x, gc) do { \
BUMP_REDS(BIF_P, (gc)); \
diff --git a/erts/emulator/beam/break.c b/erts/emulator/beam/break.c
index c7e7411935..aa5ec123a7 100644
--- a/erts/emulator/beam/break.c
+++ b/erts/emulator/beam/break.c
@@ -252,7 +252,7 @@ print_process_info(int to, void *to_arg, Process *p)
/* display the message queue only if there is anything in it */
if (!ERTS_IS_CRASH_DUMPING && p->msg.first != NULL && !garbing) {
- ErlMessage* mp;
+ ErtsMessage* mp;
erts_print(to, to_arg, "Message queue: [");
for (mp = p->msg.first; mp; mp = mp->next)
erts_print(to, to_arg, mp->next ? "%T," : "%T", ERL_MESSAGE_TERM(mp));
@@ -323,7 +323,7 @@ print_process_info(int to, void *to_arg, Process *p)
erts_print(to, to_arg, "Heap unused: %bpu\n", (p->hend - p->htop));
erts_print(to, to_arg, "OldHeap unused: %bpu\n",
(OLD_HEAP(p) == NULL) ? 0 : (OLD_HEND(p) - OLD_HTOP(p)) );
- erts_print(to, to_arg, "Memory: %beu\n", erts_process_memory(p));
+ erts_print(to, to_arg, "Memory: %beu\n", erts_process_memory(p, !0));
if (garbing) {
print_garb_info(to, to_arg, p);
diff --git a/erts/emulator/beam/copy.c b/erts/emulator/beam/copy.c
index b185758b1d..f27c526413 100644
--- a/erts/emulator/beam/copy.c
+++ b/erts/emulator/beam/copy.c
@@ -279,7 +279,7 @@ Eterm copy_struct(Eterm obj, Uint sz, Eterm** hpp, ErlOffHeap* off_heap)
break;
case TAG_PRIMARY_LIST:
objp = list_val(obj);
- if (in_area(objp,hstart,hsize)) {
+ if (ErtsInArea(objp,hstart,hsize)) {
hp++;
break;
}
@@ -318,7 +318,7 @@ Eterm copy_struct(Eterm obj, Uint sz, Eterm** hpp, ErlOffHeap* off_heap)
}
case TAG_PRIMARY_BOXED:
- if (in_area(boxed_val(obj),hstart,hsize)) {
+ if (ErtsInArea(boxed_val(obj),hstart,hsize)) {
hp++;
break;
}
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index efd5109269..bfddcadca3 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -373,10 +373,11 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp)
ASSERT(lnk->type == LINK_NODE);
if (is_internal_pid(lnk->pid)) {
ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK;
- rp = erts_pid2proc(NULL, 0, lnk->pid, rp_locks);
- if (!rp) {
+ ErlOffHeap *ohp;
+ rp = erts_proc_lookup(lnk->pid);
+ if (!rp)
goto done;
- }
+ erts_smp_proc_lock(rp, rp_locks);
rlnk = erts_remove_link(&ERTS_P_LINKS(rp), name);
if (rlnk != NULL) {
ASSERT(is_atom(rlnk->pid) && (rlnk->type == LINK_NODE));
@@ -384,12 +385,14 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp)
}
n = ERTS_LINK_REFC(lnk);
for (i = 0; i < n; ++i) {
- ErlHeapFragment* bp;
- ErlOffHeap *ohp;
Eterm tup;
- Eterm *hp = erts_alloc_message_heap(3,&bp,&ohp,rp,&rp_locks);
+ Eterm *hp;
+ ErtsMessage *msgp;
+
+ msgp = erts_alloc_message_heap(rp, &rp_locks,
+ 3, &hp, &ohp);
tup = TUPLE2(hp, am_nodedown, name);
- erts_queue_message(rp, &rp_locks, bp, tup, NIL);
+ erts_queue_message(rp, &rp_locks, msgp, tup, NIL);
}
erts_smp_proc_unlock(rp, rp_locks);
}
@@ -1458,7 +1461,7 @@ int erts_net_message(Port *prt,
ErlOffHeap *ohp;
ASSERT(xsize);
heap_frag = erts_dist_ext_trailer(ede_copy);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size);
+ ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
hp = heap_frag->mem;
ohp = &heap_frag->off_heap;
token = tuple[5];
@@ -1507,7 +1510,7 @@ int erts_net_message(Port *prt,
ErlOffHeap *ohp;
ASSERT(xsize);
heap_frag = erts_dist_ext_trailer(ede_copy);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size);
+ ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
hp = heap_frag->mem;
ohp = &heap_frag->off_heap;
token = tuple[4];
@@ -3267,11 +3270,16 @@ send_nodes_mon_msg(Process *rp,
Uint sz)
{
Eterm msg;
- ErlHeapFragment* bp;
+ Eterm *hp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
- Eterm *hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, rp_locksp);
#ifdef DEBUG
- Eterm *hend = hp + sz;
+ Eterm *hend;
+#endif
+
+ mp = erts_alloc_message_heap(rp, rp_locksp, sz, &hp, &ohp);
+#ifdef DEBUG
+ hend = hp + sz;
#endif
if (!nmp->opts) {
@@ -3317,7 +3325,7 @@ send_nodes_mon_msg(Process *rp,
}
ASSERT(hend == hp);
- erts_queue_message(rp, rp_locksp, bp, msg, NIL);
+ erts_queue_message(rp, rp_locksp, mp, msg, NIL);
}
static void
diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c
index 3e300f88ea..019aa0f16c 100644
--- a/erts/emulator/beam/erl_alloc.c
+++ b/erts/emulator/beam/erl_alloc.c
@@ -582,7 +582,7 @@ erts_alloc_init(int *argc, char **argv, ErtsAllocInitOpts *eaiop)
fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_DRV_SEL_D_STATE)]
= sizeof(ErtsDrvSelectDataState);
fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_MSG_REF)]
- = sizeof(ErlMessage);
+ = sizeof(ErtsMessageRef);
#ifdef ERTS_SMP
fix_type_sizes[ERTS_ALC_FIX_TYPE_IX(ERTS_ALC_T_THR_Q_EL_SL)]
= sizeof(ErtsThrQElement_t);
@@ -2916,12 +2916,12 @@ reply_alloc_info(void *vair)
int global_instances = air->req_sched == sched_id;
ErtsProcLocks rp_locks;
Process *rp = air->proc;
- Eterm ref_copy = NIL, ai_list, msg;
- Eterm *hp = NULL, *hp_end = NULL, *hp_start = NULL;
+ Eterm ref_copy = NIL, ai_list, msg = NIL;
+ Eterm *hp = NULL, *hp_start = NULL, *hp_end = NULL;
Eterm **hpp;
Uint sz, *szp;
ErlOffHeap *ohp = NULL;
- ErlHeapFragment *bp = NULL;
+ ErtsMessage *mp = NULL;
struct erts_mmap_info_struct emis;
int i;
Eterm (*info_func)(Allctr_t *,
@@ -3123,20 +3123,17 @@ reply_alloc_info(void *vair)
if (hpp)
break;
- hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, sz, &hp, &ohp);
hp_start = hp;
hp_end = hp + sz;
szp = NULL;
hpp = &hp;
}
- if (bp)
- bp = erts_resize_message_buffer(bp, hp - hp_start, &msg, 1);
- else {
- ASSERT(hp);
- HRelease(rp, hp_end, hp);
- }
- erts_queue_message(rp, &rp_locks, bp, msg, NIL);
+ if (hp != hp_end)
+ erts_shrink_message_heap(&mp, rp, hp_start, hp, hp_end, &msg, 1);
+
+ erts_queue_message(rp, &rp_locks, mp, msg, NIL);
if (air->req_sched == sched_id)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
diff --git a/erts/emulator/beam/erl_alloc.h b/erts/emulator/beam/erl_alloc.h
index 9da9c823f7..14e80960f5 100644
--- a/erts/emulator/beam/erl_alloc.h
+++ b/erts/emulator/beam/erl_alloc.h
@@ -44,9 +44,11 @@
#if ERTS_CAN_INLINE && ERTS_ALC_WANT_INLINE
# define ERTS_ALC_DO_INLINE 1
# define ERTS_ALC_INLINE static ERTS_INLINE
+# define ERTS_ALC_FORCE_INLINE static ERTS_FORCE_INLINE
#else
# define ERTS_ALC_DO_INLINE 0
# define ERTS_ALC_INLINE
+# define ERTS_ALC_FORCE_INLINE
#endif
#define ERTS_ALC_NO_FIXED_SIZES \
@@ -293,7 +295,7 @@ int erts_is_allctr_wrapper_prelocked(void)
#ifdef ERTS_HAVE_IS_IN_LITERAL_RANGE
-ERTS_ALC_INLINE
+ERTS_ALC_FORCE_INLINE
int erts_is_in_literal_range(void* ptr)
{
#if defined(ARCH_32)
diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types
index 7d519c1be4..75b4913012 100644
--- a/erts/emulator/beam/erl_alloc.types
+++ b/erts/emulator/beam/erl_alloc.types
@@ -152,6 +152,8 @@ type OLD_HEAP EHEAP PROCESSES old_heap
type HEAP_FRAG EHEAP PROCESSES heap_frag
type TMP_HEAP TEMPORARY PROCESSES tmp_heap
type MSG_REF FIXED_SIZE PROCESSES msg_ref
+type MSG EHEAP PROCESSES message
+type MSGQ_CHNG SHORT_LIVED PROCESSES messages_queue_change
type MSG_ROOTS TEMPORARY PROCESSES msg_roots
type ROOTSET TEMPORARY PROCESSES root_set
type LOADER_TMP TEMPORARY CODE loader_tmp
diff --git a/erts/emulator/beam/erl_bif_ddll.c b/erts/emulator/beam/erl_bif_ddll.c
index 28bec6325c..2b1d875bfe 100644
--- a/erts/emulator/beam/erl_bif_ddll.c
+++ b/erts/emulator/beam/erl_bif_ddll.c
@@ -1707,18 +1707,19 @@ static void notify_proc(Process *proc, Eterm ref, Eterm driver_name, Eterm type,
Eterm mess;
Eterm r;
Eterm *hp;
- ErlHeapFragment *bp;
- ErlOffHeap *ohp;
+ ErtsMessage *mp;
ErtsProcLocks rp_locks = 0;
+ ErlOffHeap *ohp;
ERTS_SMP_CHK_NO_PROC_LOCKS;
assert_drv_list_rwlocked();
if (errcode != 0) {
int need = load_error_need(errcode);
Eterm e;
- hp = erts_alloc_message_heap(6 /* tuple */ + 3 /* Error tuple */ +
- REF_THING_SIZE + need, &bp, &ohp,
- proc, &rp_locks);
+ mp = erts_alloc_message_heap(proc, &rp_locks,
+ (6 /* tuple */ + 3 /* Error tuple */ +
+ REF_THING_SIZE + need),
+ &hp, &ohp);
r = copy_ref(ref,hp);
hp += REF_THING_SIZE;
e = build_load_error_hp(hp, errcode);
@@ -1727,12 +1728,14 @@ static void notify_proc(Process *proc, Eterm ref, Eterm driver_name, Eterm type,
hp += 3;
mess = TUPLE5(hp,type,r,am_driver,driver_name,mess);
} else {
- hp = erts_alloc_message_heap(6 /* tuple */ + REF_THING_SIZE, &bp, &ohp, proc, &rp_locks);
+ mp = erts_alloc_message_heap(proc, &rp_locks,
+ 6 /* tuple */ + REF_THING_SIZE,
+ &hp, &ohp);
r = copy_ref(ref,hp);
hp += REF_THING_SIZE;
mess = TUPLE5(hp,type,r,am_driver,driver_name,tag);
}
- erts_queue_message(proc, &rp_locks, bp, mess, am_undefined);
+ erts_queue_message(proc, &rp_locks, mp, mess, am_undefined);
erts_smp_proc_unlock(proc, rp_locks);
ERTS_SMP_CHK_NO_PROC_LOCKS;
}
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index a684c81445..1eb106a551 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -589,6 +589,7 @@ static Eterm pi_args[] = {
am_min_bin_vheap_size,
am_current_location,
am_current_stacktrace,
+ am_off_heap_message_queue
};
#define ERTS_PI_ARGS ((int) (sizeof(pi_args)/sizeof(Eterm)))
@@ -636,6 +637,7 @@ pi_arg2ix(Eterm arg)
case am_min_bin_vheap_size: return 28;
case am_current_location: return 29;
case am_current_stacktrace: return 30;
+ case am_off_heap_message_queue: return 31;
default: return -1;
}
}
@@ -718,9 +720,10 @@ pi_pid2proc(Process *c_p, Eterm pid, ErtsProcLocks info_locks)
-BIF_RETTYPE
+static BIF_RETTYPE
process_info_aux(Process *BIF_P,
Process *rp,
+ ErtsProcLocks rp_locks,
Eterm rpid,
Eterm item,
int always_wrap);
@@ -811,10 +814,31 @@ process_info_list(Process *c_p, Eterm pid, Eterm list, int always_wrap,
*fail_type = ERTS_PI_FAIL_TYPE_AWAIT_EXIT;
goto done;
}
- else if (!(locks & ERTS_PROC_LOCK_STATUS)) {
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
+ else {
+ ErtsProcLocks unlock_locks = 0;
+
+ if (c_p == rp)
+ locks |= ERTS_PROC_LOCK_MAIN;
+
+ if (!(locks & ERTS_PROC_LOCK_STATUS))
+ unlock_locks |= ERTS_PROC_LOCK_STATUS;
+
+ if (locks & ERTS_PROC_LOCK_MSGQ) {
+ /*
+ * Move in queue into private queue and
+ * release msgq lock, enabling others to
+ * send messages to the process while it
+ * is being inspected...
+ */
+ ASSERT(locks & ERTS_PROC_LOCK_MAIN);
+ ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp);
+ locks &= ~ERTS_PROC_LOCK_MSGQ;
+ unlock_locks |= ERTS_PROC_LOCK_MSGQ;
+ }
+
+ if (unlock_locks)
+ erts_smp_proc_unlock(rp, unlock_locks);
}
-
/*
* We always handle 'messages' first if it should be part
@@ -826,7 +850,7 @@ process_info_list(Process *c_p, Eterm pid, Eterm list, int always_wrap,
if (want_messages) {
ix = pi_arg2ix(am_messages);
ASSERT(part_res[ix] == THE_NON_VALUE);
- part_res[ix] = process_info_aux(c_p, rp, pid, am_messages, always_wrap);
+ part_res[ix] = process_info_aux(c_p, rp, locks, pid, am_messages, always_wrap);
ASSERT(part_res[ix] != THE_NON_VALUE);
}
@@ -834,7 +858,7 @@ process_info_list(Process *c_p, Eterm pid, Eterm list, int always_wrap,
ix = res_elem_ix[res_elem_ix_ix];
if (part_res[ix] == THE_NON_VALUE) {
arg = pi_ix2arg(ix);
- part_res[ix] = process_info_aux(c_p, rp, pid, arg, always_wrap);
+ part_res[ix] = process_info_aux(c_p, rp, locks, pid, arg, always_wrap);
ASSERT(part_res[ix] != THE_NON_VALUE);
}
}
@@ -965,9 +989,31 @@ BIF_RETTYPE process_info_2(BIF_ALIST_2)
ERTS_BIF_AWAIT_X_DATA_TRAP(BIF_P, BIF_ARG_1, am_undefined);
}
else {
+ ErtsProcLocks unlock_locks = 0;
+
+ if (BIF_P == rp)
+ info_locks |= ERTS_PROC_LOCK_MAIN;
+
if (!(info_locks & ERTS_PROC_LOCK_STATUS))
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
- res = process_info_aux(BIF_P, rp, pid, BIF_ARG_2, 0);
+ unlock_locks |= ERTS_PROC_LOCK_STATUS;
+
+ if (info_locks & ERTS_PROC_LOCK_MSGQ) {
+ /*
+ * Move in queue into private queue and
+ * release msgq lock, enabling others to
+ * send messages to the process while it
+ * is being inspected...
+ */
+ ASSERT(info_locks & ERTS_PROC_LOCK_MAIN);
+ ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp);
+ info_locks &= ~ERTS_PROC_LOCK_MSGQ;
+ unlock_locks |= ERTS_PROC_LOCK_MSGQ;
+ }
+
+ if (unlock_locks)
+ erts_smp_proc_unlock(rp, unlock_locks);
+
+ res = process_info_aux(BIF_P, rp, info_locks, pid, BIF_ARG_2, 0);
}
ASSERT(is_value(res));
@@ -985,6 +1031,7 @@ BIF_RETTYPE process_info_2(BIF_ALIST_2)
Eterm
process_info_aux(Process *BIF_P,
Process *rp,
+ ErtsProcLocks rp_locks,
Eterm rpid,
Eterm item,
int always_wrap)
@@ -1056,171 +1103,55 @@ process_info_aux(Process *BIF_P,
break;
case am_messages: {
- ErlMessage* mp;
- int n;
- ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp);
- n = rp->msg.len;
-
- if (n == 0 || ERTS_TRACE_FLAGS(rp) & F_SENSITIVE) {
+ if (rp->msg.len == 0 || ERTS_TRACE_FLAGS(rp) & F_SENSITIVE) {
hp = HAlloc(BIF_P, 3);
} else {
- int remove_bad_messages = 0;
- struct {
- Uint copy_struct_size;
- ErlMessage* msgp;
- } *mq = erts_alloc(ERTS_ALC_T_TMP, n*sizeof(*mq));
- Sint i = 0;
- Uint heap_need = 3;
+ ErtsMessageInfo *mip;
+ Sint i;
+ Uint heap_need;
+#ifdef DEBUG
Eterm *hp_end;
+#endif
- for (mp = rp->msg.first; mp; mp = mp->next) {
- heap_need += 2;
- mq[i].msgp = mp;
- if (rp != BIF_P) {
- Eterm msg = ERL_MESSAGE_TERM(mq[i].msgp);
- if (is_value(msg)) {
- mq[i].copy_struct_size = (is_immed(msg)? 0 :
- size_object(msg));
- }
- else if (mq[i].msgp->data.attached) {
- mq[i].copy_struct_size
- = erts_msg_attached_data_size(mq[i].msgp);
- }
- else {
- /* Bad distribution message; ignore */
- remove_bad_messages = 1;
- mq[i].copy_struct_size = 0;
- }
- heap_need += mq[i].copy_struct_size;
- }
- else {
- mq[i].copy_struct_size = mp->data.attached ?
- erts_msg_attached_data_size(mp) : 0;
- }
- i++;
- }
+ mip = erts_alloc(ERTS_ALC_T_TMP,
+ rp->msg.len*sizeof(ErtsMessageInfo));
- if (rp != BIF_P) {
- hp = HAlloc(BIF_P, heap_need);
- hp_end = hp + heap_need;
- ASSERT(i == n);
- for (i--; i >= 0; i--) {
- Eterm msg = ERL_MESSAGE_TERM(mq[i].msgp);
- if (is_value(msg)) {
- if (mq[i].copy_struct_size)
- msg = copy_struct(msg,
- mq[i].copy_struct_size,
- &hp,
- &MSO(BIF_P));
- }
- else if (mq[i].msgp->data.attached) {
- ErlHeapFragment *hfp;
- /*
- * Decode it into a message buffer and attach it
- * to the message instead of the attached external
- * term.
- *
- * Note that we may not pass a process pointer
- * to erts_msg_distext2heap(), since it would then
- * try to alter locks on that process.
- */
- msg = erts_msg_distext2heap(
- NULL, NULL, &hfp, &ERL_MESSAGE_TOKEN(mq[i].msgp),
- mq[i].msgp->data.dist_ext);
-
- ERL_MESSAGE_TERM(mq[i].msgp) = msg;
- mq[i].msgp->data.heap_frag = hfp;
-
- if (is_non_value(msg)) {
- ASSERT(!mq[i].msgp->data.heap_frag);
- /* Bad distribution message; ignore */
- remove_bad_messages = 1;
- continue;
- }
- else {
- /* Make our copy of the message */
- ASSERT(size_object(msg) == erts_used_frag_sz(hfp));
- msg = copy_struct(msg,
- erts_used_frag_sz(hfp),
- &hp,
- &MSO(BIF_P));
- }
- }
- else {
- /* Bad distribution message; ignore */
- remove_bad_messages = 1;
- continue;
- }
- res = CONS(hp, msg, res);
- hp += 2;
- }
- HRelease(BIF_P, hp_end, hp+3);
- }
- else {
- for (i--; i >= 0; i--) {
- ErtsHeapFactory factory;
- Eterm msg = ERL_MESSAGE_TERM(mq[i].msgp);
-
- erts_factory_proc_prealloc_init(&factory, BIF_P,
- mq[i].copy_struct_size+2);
- if (mq[i].msgp->data.attached) {
- /* Decode it on the heap */
- erts_move_msg_attached_data_to_heap(&factory,
- mq[i].msgp);
- msg = ERL_MESSAGE_TERM(mq[i].msgp);
- ASSERT(!mq[i].msgp->data.attached);
- }
- if (is_value(msg)) {
- hp = erts_produce_heap(&factory, 2, 0);
- res = CONS(hp, msg, res);
- }
- else {
- /* Bad distribution message; ignore */
- remove_bad_messages = 1;
- continue;
- }
- erts_factory_close(&factory);
- }
- hp = HAlloc(BIF_P, 3);
- }
- erts_free(ERTS_ALC_T_TMP, mq);
- if (remove_bad_messages) {
- ErlMessage **mpp;
- /*
- * We need to remove bad distribution messages from
- * the queue, so that the value returned for
- * 'message_queue_len' is consistent with the value
- * returned for 'messages'.
- */
- mpp = &rp->msg.first;
- mp = rp->msg.first;
- while (mp) {
- if (is_value(ERL_MESSAGE_TERM(mp))) {
- mpp = &mp->next;
- mp = mp->next;
- }
- else {
- ErlMessage* bad_mp = mp;
- ASSERT(!mp->data.attached);
- if (rp->msg.save == &mp->next)
- rp->msg.save = mpp;
- if (rp->msg.last == &mp->next)
- rp->msg.last = mpp;
- *mpp = mp->next;
- mp = mp->next;
- rp->msg.len--;
- free_message(bad_mp);
- }
- }
+ /*
+ * Note that message queue may shrink when calling
+ * erts_prep_msgq_for_inspection() since it removes
+ * corrupt distribution messages.
+ */
+ heap_need = erts_prep_msgq_for_inspection(BIF_P, rp, rp_locks, mip);
+ heap_need += 3; /* top 2-tuple */
+ heap_need += rp->msg.len*2; /* Cons cells */
+
+ hp = HAlloc(BIF_P, heap_need); /* heap_need is exact */
+#ifdef DEBUG
+ hp_end = hp + heap_need;
+#endif
+
+ /* Build list of messages... */
+ for (i = rp->msg.len - 1, res = NIL; i >= 0; i--) {
+ Eterm msg = ERL_MESSAGE_TERM(mip[i].msgp);
+ Uint sz = mip[i].size;
+
+ if (sz != 0)
+ msg = copy_struct(msg, sz, &hp, &BIF_P->off_heap);
+
+ res = CONS(hp, msg, res);
+ hp += 2;
}
+
+ ASSERT(hp_end == hp + 3);
+
+ erts_free(ERTS_ALC_T_TMP, mip);
}
break;
}
case am_message_queue_len:
hp = HAlloc(BIF_P, 3);
- ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp);
res = make_small(rp->msg.len);
break;
@@ -1408,7 +1339,7 @@ process_info_aux(Process *BIF_P,
}
case am_total_heap_size: {
- ErlMessage *mp;
+ ErtsMessage *mp;
Uint total_heap_size;
Uint hsz = 3;
@@ -1418,8 +1349,6 @@ process_info_aux(Process *BIF_P,
total_heap_size += rp->mbuf_sz;
- ERTS_SMP_MSGQ_MV_INQ2PRIVQ(rp);
-
for (mp = rp->msg.first; mp; mp = mp->next)
if (mp->data.attached)
total_heap_size += erts_msg_attached_data_size(mp);
@@ -1441,7 +1370,7 @@ process_info_aux(Process *BIF_P,
case am_memory: { /* Memory consumed in bytes */
Uint hsz = 3;
- Uint size = erts_process_memory(rp);
+ Uint size = erts_process_memory(rp, 0);
(void) erts_bld_uint(NULL, &hsz, size);
hp = HAlloc(BIF_P, hsz);
res = erts_bld_uint(&hp, NULL, size);
@@ -1567,6 +1496,11 @@ process_info_aux(Process *BIF_P,
break;
}
+ case am_off_heap_message_queue:
+ res = BIF_P->flags & F_OFF_HEAP_MSGQ ? am_true : am_false;
+ hp = HAlloc(BIF_P, 3);
+ break;
+
default:
return THE_NON_VALUE; /* will produce badarg */
@@ -2728,6 +2662,10 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1)
BIF_RET(am_true);
}
#endif
+ else if (BIF_ARG_1 == am_off_heap_message_queue) {
+ BIF_RET(erts_default_spo_flags & SPO_OFF_HEAP_MSGQ
+ ? am_true : am_false);
+ }
else if (ERTS_IS_ATOM_STR("compile_info",BIF_ARG_1)) {
Uint sz;
Eterm res = NIL, tup, text;
@@ -3865,9 +3803,7 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2)
BIF_RET(am_false);
}
else {
- FLAGS(rp) |= F_FORCE_GC;
- if (BIF_P != rp)
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
+ ERTS_FORCE_GC(BIF_P);
BIF_RET(am_true);
}
}
diff --git a/erts/emulator/beam/erl_debug.c b/erts/emulator/beam/erl_debug.c
index 434b6c6d7a..6652ef9bb3 100644
--- a/erts/emulator/beam/erl_debug.c
+++ b/erts/emulator/beam/erl_debug.c
@@ -309,6 +309,8 @@ void erts_check_for_holes(Process* p)
p->last_htop = HEAP_TOP(p);
for (hf = MBUF(p); hf != 0; hf = hf->next) {
+ if (hf == p->heap_hfrag)
+ continue;
if (hf == p->last_mbuf) {
break;
}
@@ -399,7 +401,7 @@ void verify_process(Process *p)
erl_exit(1,"Wild pointer found in " name " of %T!\n",p->common.id); }
- ErlMessage* mp = p->msg.first;
+ ErtsMessage* mp = p->msg.first;
VERBOSE(DEBUG_MEMORY,("Verify process: %T...\n",p->common.id));
@@ -528,7 +530,7 @@ static void print_process_memory(Process *p)
PTR_SIZE, "PCB", dashes, dashes, dashes, dashes);
if (p->msg.first != NULL) {
- ErlMessage* mp;
+ ErtsMessage* mp;
erts_printf(" Message Queue:\n");
mp = p->msg.first;
while (mp != NULL) {
diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c
index 2d7b7cafa4..6a52e1a890 100644
--- a/erts/emulator/beam/erl_gc.c
+++ b/erts/emulator/beam/erl_gc.c
@@ -42,8 +42,6 @@
#include "dtrace-wrapper.h"
#include "erl_bif_unique.h"
-#define ERTS_CONTINUOUS_NEW_HEAP
-
#define ERTS_INACT_WR_PB_LEAVE_MUCH_LIMIT 1
#define ERTS_INACT_WR_PB_LEAVE_MUCH_PERCENTAGE 20
#define ERTS_INACT_WR_PB_LEAVE_LIMIT 10
@@ -60,58 +58,6 @@
# define ERTS_GC_ASSERT(B) ((void) 1)
#endif
-#ifdef ERTS_CONTINUOUS_NEW_HEAP
-#define ERTS_IS_NEW_HEAP_PTR__(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \
- ErtsInArea((Ptr), (NhPtr), (NhSz))
-#define ERTS_IS_LITERAL_PTR__(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \
- (!ErtsInArea((Ptr), (NhPtr), (NhSz)) \
- && !ErtsInArea((Ptr), (OhPtr), (OhSz)))
-
-#ifdef ERTS_GC_DEBUG
-#define ERTS_IS_NEW_HEAP_PTR(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \
- (ERTS_IS_NEW_HEAP_PTR__((TPtr), (Ptr), (NhPtr), (NhSz), (OhPtr), (OhSz)) \
- ? (ERTS_GC_ASSERT(!erts_is_literal((TPtr), (Ptr)) \
- && !ErtsInArea((Ptr), (OhPtr), (OhSz))), 1) \
- : (ERTS_GC_ASSERT(erts_is_literal((TPtr), (Ptr)) \
- || ErtsInArea((Ptr), (OhPtr), (OhSz))), 0))
-#define ERTS_IS_LITERAL_PTR(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \
- (ERTS_IS_LITERAL_PTR__((TPtr), (Ptr), (NhPtr), (NhSz), (OhPtr), (OhSz)) \
- ? (ERTS_GC_ASSERT(erts_is_literal((TPtr), (Ptr))), 1) \
- : (ERTS_GC_ASSERT(!erts_is_literal((TPtr), (Ptr))), 0))
-#endif
-
-#else
-
-#define ERTS_IS_NEW_HEAP_PTR__(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \
- (!erts_is_literal((TPtr), (Ptr)) && !ErtsInArea((Ptr), (OhPtr), (OhSz)))
-#define ERTS_IS_LITERAL_PTR__(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \
- (erts_is_literal((TPtr), (Ptr)))
-
-#ifdef ERTS_GC_DEBUG
-#define ERTS_IS_NEW_HEAP_PTR(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \
- (ERTS_IS_NEW_HEAP_PTR__((TPtr), (Ptr), (NhPtr), (NhSz), (OhPtr), (OhSz)) \
- ? (ERTS_GC_ASSERT(ErtsInArea((Ptr), (NhPtr), (NhSz))), 1) \
- : (ERTS_GC_ASSERT(!ErtsInArea((Ptr), (NhPtr), (NhSz))), 0))
-#define ERTS_IS_LITERAL_PTR(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \
- (ERTS_IS_LITERAL_PTR__((TPtr), (Ptr), (NhPtr), (NhSz), (OhPtr), (OhSz)) \
- ? (ERTS_GC_ASSERT(!ErtsInArea((Ptr), (NhPtr), (NhSz)) \
- && !ErtsInArea((Ptr), (OhPtr), (OhSz))), 1) \
- : (ERTS_GC_ASSERT(ErtsInArea((Ptr), (NhPtr), (NhSz)) \
- || ErtsInArea((Ptr), (OhPtr), (OhSz))), 0))
-#endif
-
-#endif
-
-#ifndef ERTS_IS_NEW_HEAP_PTR
-#define ERTS_IS_NEW_HEAP_PTR(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \
- ERTS_IS_NEW_HEAP_PTR__((TPtr), (Ptr), (NhPtr), (NhSz), (OhPtr), (OhSz))
-#endif
-
-#ifndef ERTS_IS_LITERAL_PTR
-#define ERTS_IS_LITERAL_PTR(TPtr, Ptr, NhPtr, NhSz, OhPtr, OhSz) \
- ERTS_IS_LITERAL_PTR__((TPtr), (Ptr), (NhPtr), (NhSz), (OhPtr), (OhSz))
-#endif
-
/*
* Returns number of elements in an array.
*/
@@ -132,10 +78,10 @@
#define ErtsGcQuickSanityCheck(P) \
do { \
ASSERT((P)->heap < (P)->hend); \
- ASSERT((P)->heap_sz == (P)->hend - (P)->heap); \
+ ASSERT((p)->abandoned_heap || (P)->heap_sz == (P)->hend - (P)->heap); \
ASSERT((P)->heap <= (P)->htop && (P)->htop <= (P)->hend); \
ASSERT((P)->heap <= (P)->stop && (P)->stop <= (P)->hend); \
- ASSERT((P)->heap <= (P)->high_water && (P)->high_water <= (P)->hend);\
+ ASSERT((p)->abandoned_heap || ((P)->heap <= (P)->high_water && (P)->high_water <= (P)->hend)); \
OverRunCheck((P)); \
} while (0)
#else
@@ -163,31 +109,32 @@ typedef struct {
static Uint setup_rootset(Process*, Eterm*, int, Rootset*);
static void cleanup_rootset(Rootset *rootset);
-static Uint combined_message_size(Process* p);
static void remove_message_buffers(Process* p);
static Eterm *full_sweep_heaps(Process *p,
int hibernate,
Eterm *n_heap, Eterm* n_htop,
- char *h, Uint h_size,
char *oh, Uint oh_size,
Eterm *objv, int nobj);
-static int major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl);
-static int minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl);
-static void do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj);
+static int garbage_collect(Process* p, ErlHeapFragment *live_hf_end,
+ int need, Eterm* objv, int nobj);
+static int major_collection(Process* p, ErlHeapFragment *live_hf_end,
+ int need, Eterm* objv, int nobj, Uint *recl);
+static int minor_collection(Process* p, ErlHeapFragment *live_hf_end,
+ int need, Eterm* objv, int nobj, Uint *recl);
+static void do_minor(Process *p, ErlHeapFragment *live_hf_end,
+ char *mature, Uint mature_size,
+ Uint new_sz, Eterm* objv, int nobj);
static Eterm *sweep_new_heap(Eterm *n_hp, Eterm *n_htop,
- char* new_heap, Uint new_heap_size,
char* old_heap, Uint old_heap_size);
static Eterm *sweep_heaps(Eterm *n_hp, Eterm *n_htop,
- char* new_heap, Uint new_heap_size,
char* old_heap, Uint old_heap_size);
static Eterm* sweep_literal_area(Eterm* n_hp, Eterm* n_htop,
- char* new_heap, Uint new_heap_size,
- char* old_heap, Uint old_heap_size,
- char* src, Uint src_size);
+ char* old_heap, Uint old_heap_size,
+ char* src, Uint src_size);
static Eterm* sweep_literals_to_old_heap(Eterm* heap_ptr, Eterm* heap_end, Eterm* htop,
char* src, Uint src_size);
-static Eterm* collect_heap_frags(Process* p, Eterm* heap,
- Eterm* htop, Eterm* objv, int nobj);
+static Eterm* collect_live_heap_frags(Process* p, ErlHeapFragment *live_hf_end,
+ Eterm* heap, Eterm* htop, Eterm* objv, int nobj);
static void adjust_after_fullsweep(Process *p, int need, Eterm *objv, int nobj);
static void shrink_new_heap(Process *p, Uint new_sz, Eterm *objv, int nobj);
static void grow_new_heap(Process *p, Uint new_sz, Eterm* objv, int nobj);
@@ -204,7 +151,6 @@ static void init_gc_info(ErtsGCInfo *gcip);
#ifdef HARDDEBUG
static void disallow_heap_frag_ref_in_heap(Process* p);
static void disallow_heap_frag_ref_in_old_heap(Process* p);
-static void disallow_heap_frag_ref(Process* p, Eterm* n_htop, Eterm* objv, int nobj);
#endif
#if defined(ARCH_64)
@@ -411,10 +357,19 @@ erts_offset_off_heap(ErlOffHeap *ohp, Sint offs, Eterm* low, Eterm* high)
#undef ptr_within
Eterm
-erts_gc_after_bif_call(Process* p, Eterm result, Eterm* regs, Uint arity)
+erts_gc_after_bif_call_lhf(Process* p, ErlHeapFragment *live_hf_end,
+ Eterm result, Eterm* regs, Uint arity)
{
int cost;
+ if (p->flags & F_HIBERNATE_SCHED) {
+ /*
+ * We just hibernated. We do *not* want to mess
+ * up the hibernation by an ordinary GC...
+ */
+ return result;
+ }
+
if (is_non_value(result)) {
if (p->freason == TRAP) {
#if HIPE
@@ -422,21 +377,28 @@ erts_gc_after_bif_call(Process* p, Eterm result, Eterm* regs, Uint arity)
regs = ERTS_PROC_GET_SCHDATA(p)->x_reg_array;
}
#endif
- cost = erts_garbage_collect(p, 0, regs, p->arity);
+ cost = garbage_collect(p, live_hf_end, 0, regs, p->arity);
} else {
- cost = erts_garbage_collect(p, 0, regs, arity);
+ cost = garbage_collect(p, live_hf_end, 0, regs, arity);
}
} else {
Eterm val[1];
val[0] = result;
- cost = erts_garbage_collect(p, 0, val, 1);
+ cost = garbage_collect(p, live_hf_end, 0, val, 1);
result = val[0];
}
BUMP_REDS(p, cost);
return result;
}
+Eterm
+erts_gc_after_bif_call(Process* p, Eterm result, Eterm* regs, Uint arity)
+{
+ return erts_gc_after_bif_call_lhf(p, ERTS_INVALID_HFRAG_PTR,
+ result, regs, arity);
+}
+
static ERTS_INLINE void reset_active_writer(Process *p)
{
struct erl_off_heap_header* ptr;
@@ -450,6 +412,117 @@ static ERTS_INLINE void reset_active_writer(Process *p)
}
}
+#define ERTS_DELAY_GC_EXTRA_FREE 40
+
+static int
+delay_garbage_collection(Process *p, ErlHeapFragment *live_hf_end, int need)
+{
+ ErlHeapFragment *hfrag;
+ Eterm *orig_heap, *orig_hend, *orig_htop, *orig_stop;
+ Eterm *stop, *hend;
+ Uint hsz, ssz;
+
+ ERTS_HOLE_CHECK(p);
+
+ if (p->live_hf_end == ERTS_INVALID_HFRAG_PTR)
+ p->live_hf_end = live_hf_end;
+
+ if (need == 0)
+ return 1;
+
+ /*
+ * Satisfy need in a heap fragment...
+ */
+ ASSERT(need > 0);
+
+ orig_heap = p->heap;
+ orig_hend = p->hend;
+ orig_htop = p->htop;
+ orig_stop = p->stop;
+
+ ssz = orig_hend - orig_stop;
+ hsz = ssz + need + ERTS_DELAY_GC_EXTRA_FREE;
+
+ hfrag = new_message_buffer(hsz);
+ hfrag->next = p->mbuf;
+ p->mbuf = hfrag;
+ p->mbuf_sz += hsz;
+ p->heap = p->htop = &hfrag->mem[0];
+ p->hend = hend = &hfrag->mem[hsz];
+ p->stop = stop = hend - ssz;
+ sys_memcpy((void *) stop, (void *) orig_stop, ssz * sizeof(Eterm));
+
+ if (p->abandoned_heap) {
+ /* Active heap already in a fragment; adjust it... */
+ ErlHeapFragment *hfrag = ((ErlHeapFragment *)
+ (((char *) orig_heap)
+ - offsetof(ErlHeapFragment, mem)));
+ Uint unused = orig_hend - orig_htop;
+ ASSERT(hfrag->used_size == hfrag->alloc_size);
+ ASSERT(hfrag->used_size >= unused);
+ hfrag->used_size -= unused;
+ p->mbuf_sz -= unused;
+ }
+ else {
+ /* Do not leave a hole in the abandoned heap... */
+ if (orig_htop < orig_hend) {
+ *orig_htop = make_pos_bignum_header(orig_hend-orig_htop-1);
+ if (orig_htop + 1 < orig_hend) {
+ orig_hend[-1] = (Uint) (orig_htop - orig_heap);
+ p->flags |= F_ABANDONED_HEAP_USE;
+ }
+ }
+ p->abandoned_heap = orig_heap;
+ }
+
+#ifdef CHECK_FOR_HOLES
+ p->last_htop = p->htop;
+ p->heap_hfrag = hfrag;
+#endif
+
+ /* Make sure that we do a proper GC as soon as possible... */
+ p->flags |= F_FORCE_GC;
+ return CONTEXT_REDS;
+}
+
+static ERTS_FORCE_INLINE Uint
+young_gen_usage(Process *p)
+{
+ Uint hsz;
+ Eterm *aheap;
+
+ hsz = p->mbuf_sz;
+ aheap = p->abandoned_heap;
+ if (!aheap)
+ hsz += p->htop - p->heap;
+ else {
+ /* used in orig heap */
+ if (p->flags & F_ABANDONED_HEAP_USE)
+ hsz += aheap[p->heap_sz-1];
+ else
+ hsz += p->heap_sz;
+ /* Remove unused part in latest fragment */
+ hsz -= p->hend - p->htop;
+ }
+ return hsz;
+}
+
+#define ERTS_GET_ORIG_HEAP(Proc, Heap, HTop) \
+ do { \
+ Eterm *aheap__ = (Proc)->abandoned_heap; \
+ if (!aheap__) { \
+ (Heap) = (Proc)->heap; \
+ (HTop) = (Proc)->htop; \
+ } \
+ else { \
+ (Heap) = aheap__; \
+ if ((Proc)->flags & F_ABANDONED_HEAP_USE) \
+ (HTop) = aheap__ + aheap__[(Proc)->heap_sz-1]; \
+ else \
+ (HTop) = aheap__ + (Proc)->heap_sz; \
+ } \
+ } while (0)
+
/*
* Garbage collect a process.
*
@@ -458,8 +531,9 @@ static ERTS_INLINE void reset_active_writer(Process *p)
* objv: Array of terms to add to rootset; that is to preserve.
* nobj: Number of objects in objv.
*/
-int
-erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj)
+static int
+garbage_collect(Process* p, ErlHeapFragment *live_hf_end,
+ int need, Eterm* objv, int nobj)
{
Uint reclaimed_now = 0;
int done = 0;
@@ -469,10 +543,11 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj)
DTRACE_CHARBUF(pidbuf, DTRACE_TERM_BUF_SIZE);
#endif
- if (p->flags & F_DISABLE_GC) {
- ASSERT(need == 0);
- return 1;
- }
+ if (p->flags & F_DISABLE_GC)
+ return delay_garbage_collection(p, live_hf_end, need);
+
+ if (p->live_hf_end != ERTS_INVALID_HFRAG_PTR)
+ live_hf_end = p->live_hf_end;
esdp = erts_get_scheduler_data();
@@ -480,7 +555,7 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj)
trace_gc(p, am_gc_start);
}
- (void) erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC);
+ erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC);
if (erts_system_monitor_long_gc != 0)
start_time = erts_get_monotonic_time(esdp);
@@ -505,11 +580,11 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj)
while (!done) {
if ((FLAGS(p) & F_NEED_FULLSWEEP) != 0) {
DTRACE2(gc_major_start, pidbuf, need);
- done = major_collection(p, need, objv, nobj, &reclaimed_now);
+ done = major_collection(p, live_hf_end, need, objv, nobj, &reclaimed_now);
DTRACE2(gc_major_end, pidbuf, reclaimed_now);
} else {
DTRACE2(gc_minor_start, pidbuf, need);
- done = minor_collection(p, need, objv, nobj, &reclaimed_now);
+ done = minor_collection(p, live_hf_end, need, objv, nobj, &reclaimed_now);
DTRACE2(gc_minor_end, pidbuf, reclaimed_now);
}
}
@@ -551,6 +626,7 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj)
esdp->gc_info.reclaimed += reclaimed_now;
FLAGS(p) &= ~F_FORCE_GC;
+ p->live_hf_end = ERTS_INVALID_HFRAG_PTR;
#ifdef CHECK_FOR_HOLES
/*
@@ -583,6 +659,12 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj)
}
}
+int
+erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj)
+{
+ return garbage_collect(p, ERTS_INVALID_HFRAG_PTR, need, objv, nobj);
+}
+
/*
* Place all living data on a the new heap; deallocate any old heap.
* Meant to be used by hibernate/3.
@@ -606,16 +688,16 @@ erts_garbage_collect_hibernate(Process* p)
*/
erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC);
ErtsGcQuickSanityCheck(p);
- ASSERT(p->mbuf_sz == 0);
- ASSERT(p->mbuf == 0);
+ ASSERT(p->mbuf == NULL);
ASSERT(p->stop == p->hend); /* Stack must be empty. */
+ ASSERT(!p->abandoned_heap);
/*
* Do it.
*/
- heap_size = p->heap_sz + (p->old_htop - p->old_heap);
+ heap_size = p->heap_sz + (p->old_htop - p->old_heap) + p->mbuf_sz;
heap = (Eterm*) ERTS_HEAP_ALLOC(ERTS_ALC_T_TMP_HEAP,
sizeof(Eterm)*heap_size);
htop = heap;
@@ -624,8 +706,6 @@ erts_garbage_collect_hibernate(Process* p)
1,
heap,
htop,
- (char *) p->heap,
- (char *) p->htop - (char *) p->heap,
(char *) p->old_heap,
(char *) p->old_htop - (char *) p->old_heap,
p->arg_reg,
@@ -644,6 +724,7 @@ erts_garbage_collect_hibernate(Process* p)
}
FLAGS(p) &= ~F_FORCE_GC;
+ p->live_hf_end = ERTS_INVALID_HFRAG_PTR;
/*
* Move the heap to its final destination.
@@ -663,6 +744,8 @@ erts_garbage_collect_hibernate(Process* p)
sys_memcpy((void *) heap, (void *) p->heap, actual_size*sizeof(Eterm));
ERTS_HEAP_FREE(ERTS_ALC_T_TMP_HEAP, p->heap, p->heap_sz*sizeof(Eterm));
+ remove_message_buffers(p);
+
p->stop = p->hend = heap + heap_size;
offs = heap - p->heap;
@@ -808,7 +891,6 @@ erts_garbage_collect_literals(Process* p, Eterm* literals,
old_htop = sweep_literals_to_old_heap(p->heap, p->htop, old_htop, area, area_size);
old_htop = sweep_literal_area(p->old_heap, old_htop,
- (char *) p->heap, sizeof(Eterm)*p->heap_sz,
(char *) p->old_heap, sizeof(Eterm)*old_heap_size,
area, area_size);
ASSERT(p->old_htop <= old_htop && old_htop <= p->old_hend);
@@ -869,15 +951,18 @@ erts_garbage_collect_literals(Process* p, Eterm* literals,
}
static int
-minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl)
+minor_collection(Process* p, ErlHeapFragment *live_hf_end,
+ int need, Eterm* objv, int nobj, Uint *recl)
{
- Uint mature = HIGH_WATER(p) - HEAP_START(p);
+ Eterm *mature = p->abandoned_heap ? p->abandoned_heap : p->heap;
+ Uint mature_size = p->high_water - mature;
+ Uint size_before = young_gen_usage(p);
/*
* Allocate an old heap if we don't have one and if we'll need one.
*/
- if (OLD_HEAP(p) == NULL && mature != 0) {
+ if (OLD_HEAP(p) == NULL && mature_size != 0) {
Eterm* n_old;
/* Note: We choose a larger heap size than strictly needed,
@@ -885,7 +970,7 @@ minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl)
* This improved Estone by more than 1200 estones on my computer
* (Ultra Sparc 10).
*/
- Uint new_sz = erts_next_heap_size(HEAP_TOP(p) - HEAP_START(p), 1);
+ Uint new_sz = erts_next_heap_size(size_before, 1);
/* Create new, empty old_heap */
n_old = (Eterm *) ERTS_HEAP_ALLOC(ERTS_ALC_T_OLD_HEAP,
@@ -901,41 +986,25 @@ minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl)
*/
if (OLD_HEAP(p) &&
- ((mature <= OLD_HEND(p) - OLD_HTOP(p)) &&
- ((BIN_VHEAP_MATURE(p) < ( BIN_OLD_VHEAP_SZ(p) - BIN_OLD_VHEAP(p)))) &&
- ((BIN_OLD_VHEAP_SZ(p) > BIN_OLD_VHEAP(p))) ) ) {
- ErlMessage *msgp;
- Uint size_after;
- Uint need_after;
- const Uint stack_size = STACK_SZ_ON_HEAP(p);
- const Uint size_before = MBUF_SIZE(p) + (HEAP_TOP(p) - HEAP_START(p));
- Uint new_sz = HEAP_SIZE(p) + MBUF_SIZE(p) + combined_message_size(p);
+ ((mature_size <= OLD_HEND(p) - OLD_HTOP(p)) &&
+ ((BIN_VHEAP_MATURE(p) < ( BIN_OLD_VHEAP_SZ(p) - BIN_OLD_VHEAP(p)))) &&
+ ((BIN_OLD_VHEAP_SZ(p) > BIN_OLD_VHEAP(p))) ) ) {
+ Uint stack_size, size_after, need_after, new_sz;
+
+ stack_size = p->hend - p->stop;
+ new_sz = stack_size + size_before;
new_sz = next_heap_size(p, new_sz, 0);
- do_minor(p, new_sz, objv, nobj);
+ do_minor(p, live_hf_end, (char *) mature, mature_size*sizeof(Eterm),
+ new_sz, objv, nobj);
size_after = HEAP_TOP(p) - HEAP_START(p);
*recl += (size_before - size_after);
- /*
- * Copy newly received message onto the end of the new heap.
- */
- ErtsGcQuickSanityCheck(p);
- for (msgp = p->msg.first; msgp; msgp = msgp->next) {
- if (msgp->data.attached) {
- ErtsHeapFactory factory;
- erts_factory_proc_prealloc_init(&factory, p,
- erts_msg_attached_data_size(msgp));
- erts_move_msg_attached_data_to_heap(&factory, msgp);
- erts_factory_close(&factory);
- ErtsGcQuickSanityCheck(p);
- }
- }
ErtsGcQuickSanityCheck(p);
GEN_GCS(p)++;
need_after = ((HEAP_TOP(p) - HEAP_START(p))
- + erts_used_frag_sz(MBUF(p))
+ need
+ stack_size);
@@ -984,10 +1053,13 @@ minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl)
ASSERT(MBUF(p) == NULL);
return 1;
}
+
+ grow_new_heap(p, next_heap_size(p, need_after, 0), objv, nobj);
+ return 1;
}
/*
- * Still not enough room after minor collection. Must force a major collection.
+ * Not enough room for a minor collection. Must force a major collection.
*/
FLAGS(p) |= F_NEED_FULLSWEEP;
return 0;
@@ -1044,7 +1116,9 @@ static ERTS_INLINE void offset_nstack(Process* p, Sint offs,
#endif /* HIPE */
static void
-do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj)
+do_minor(Process *p, ErlHeapFragment *live_hf_end,
+ char *mature, Uint mature_size,
+ Uint new_sz, Eterm* objv, int nobj)
{
Rootset rootset; /* Rootset for GC (stack, dictionary, etc). */
Roots* roots;
@@ -1053,9 +1127,6 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj)
Eterm* ptr;
Eterm val;
Eterm gval;
- char* heap = (char *) HEAP_START(p);
- Uint heap_size = (char *) HEAP_TOP(p) - heap;
- Uint mature_size = (char *) HIGH_WATER(p) - heap;
Eterm* old_htop = OLD_HTOP(p);
Eterm* n_heap;
char* oh = (char *) OLD_HEAP(p);
@@ -1064,8 +1135,13 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj)
n_htop = n_heap = (Eterm*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP,
sizeof(Eterm)*new_sz);
- if (MBUF(p) != NULL) {
- n_htop = collect_heap_frags(p, n_heap, n_htop, objv, nobj);
+ if (live_hf_end != ERTS_INVALID_HFRAG_PTR) {
+ /*
+ * Move heap frags that we know are completely live
+ * directly into the new young heap generation.
+ */
+ n_htop = collect_live_heap_frags(p, live_hf_end, n_heap, n_htop,
+ objv, nobj);
}
n = setup_rootset(p, objv, nobj, &rootset);
@@ -1088,11 +1164,9 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj)
if (IS_MOVED_BOXED(val)) {
ASSERT(is_boxed(val));
*g_ptr++ = val;
- } else if (ErtsInArea(ptr, heap, mature_size)) {
+ } else if (ErtsInArea(ptr, mature, mature_size)) {
MOVE_BOXED(ptr,val,old_htop,g_ptr++);
- } else if (ERTS_IS_NEW_HEAP_PTR(gval, ptr,
- heap, heap_size,
- oh, oh_size)) {
+ } else if (ErtsInYoungGen(gval, ptr, oh, oh_size)) {
MOVE_BOXED(ptr,val,n_htop,g_ptr++);
} else {
g_ptr++;
@@ -1105,11 +1179,9 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj)
val = *ptr;
if (IS_MOVED_CONS(val)) { /* Moved */
*g_ptr++ = ptr[1];
- } else if (ErtsInArea(ptr, heap, mature_size)) {
+ } else if (ErtsInArea(ptr, mature, mature_size)) {
MOVE_CONS(ptr,val,old_htop,g_ptr++);
- } else if (ERTS_IS_NEW_HEAP_PTR(gval, ptr,
- heap, heap_size,
- oh, oh_size)) {
+ } else if (ErtsInYoungGen(gval, ptr, oh, oh_size)) {
MOVE_CONS(ptr,val,n_htop,g_ptr++);
} else {
g_ptr++;
@@ -1134,8 +1206,7 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj)
*/
if (mature_size == 0) {
- n_htop = sweep_new_heap(n_heap, n_htop, heap, heap_size,
- oh, oh_size);
+ n_htop = sweep_new_heap(n_heap, n_htop, oh, oh_size);
} else {
Eterm* n_hp = n_heap;
Eterm* ptr;
@@ -1152,11 +1223,9 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj)
if (IS_MOVED_BOXED(val)) {
ASSERT(is_boxed(val));
*n_hp++ = val;
- } else if (ErtsInArea(ptr, heap, mature_size)) {
+ } else if (ErtsInArea(ptr, mature, mature_size)) {
MOVE_BOXED(ptr,val,old_htop,n_hp++);
- } else if (ERTS_IS_NEW_HEAP_PTR(gval, ptr,
- heap, heap_size,
- oh, oh_size)) {
+ } else if (ErtsInYoungGen(gval, ptr, oh, oh_size)) {
MOVE_BOXED(ptr,val,n_htop,n_hp++);
} else {
n_hp++;
@@ -1168,11 +1237,9 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj)
val = *ptr;
if (IS_MOVED_CONS(val)) {
*n_hp++ = ptr[1];
- } else if (ErtsInArea(ptr, heap, mature_size)) {
+ } else if (ErtsInArea(ptr, mature, mature_size)) {
MOVE_CONS(ptr,val,old_htop,n_hp++);
- } else if (ERTS_IS_NEW_HEAP_PTR(gval, ptr,
- heap, heap_size,
- oh, oh_size)) {
+ } else if (ErtsInYoungGen(gval, ptr, oh, oh_size)) {
MOVE_CONS(ptr,val,n_htop,n_hp++);
} else {
n_hp++;
@@ -1192,12 +1259,10 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj)
if (IS_MOVED_BOXED(val)) {
*origptr = val;
mb->base = binary_bytes(val);
- } else if (ErtsInArea(ptr, heap, mature_size)) {
+ } else if (ErtsInArea(ptr, mature, mature_size)) {
MOVE_BOXED(ptr,val,old_htop,origptr);
mb->base = binary_bytes(mb->orig);
- } else if (ERTS_IS_NEW_HEAP_PTR(*origptr, ptr,
- heap, heap_size,
- oh, oh_size)) {
+ } else if (ErtsInYoungGen(*origptr, ptr, oh, oh_size)) {
MOVE_BOXED(ptr,val,n_htop,origptr);
mb->base = binary_bytes(mb->orig);
}
@@ -1218,11 +1283,8 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj)
* may point to the old (soon to be deleted) new_heap.
*/
- if (OLD_HTOP(p) < old_htop) {
- old_htop = sweep_new_heap(OLD_HTOP(p), old_htop,
- heap, heap_size,
- oh, oh_size);
- }
+ if (OLD_HTOP(p) < old_htop)
+ old_htop = sweep_new_heap(OLD_HTOP(p), old_htop, oh, oh_size);
OLD_HTOP(p) = old_htop;
HIGH_WATER(p) = n_htop;
@@ -1254,8 +1316,12 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj)
#endif
ERTS_HEAP_FREE(ERTS_ALC_T_HEAP,
- (void*)HEAP_START(p),
+ (p->abandoned_heap
+ ? p->abandoned_heap
+ : HEAP_START(p)),
HEAP_SIZE(p) * sizeof(Eterm));
+ p->abandoned_heap = NULL;
+ p->flags &= ~F_ABANDONED_HEAP_USE;
HEAP_START(p) = n_heap;
HEAP_TOP(p) = n_htop;
HEAP_SIZE(p) = new_sz;
@@ -1272,15 +1338,12 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj)
*/
static int
-major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl)
+major_collection(Process* p, ErlHeapFragment *live_hf_end,
+ int need, Eterm* objv, int nobj, Uint *recl)
{
- const Uint size_before = ((HEAP_TOP(p) - HEAP_START(p))
- + (OLD_HTOP(p) - OLD_HEAP(p))
- + MBUF_SIZE(p));
+ Uint size_before, stack_size;
Eterm* n_heap;
Eterm* n_htop;
- char* src = (char *) HEAP_START(p);
- Uint src_size = (char *) HEAP_TOP(p) - src;
char* oh = (char *) OLD_HEAP(p);
Uint oh_size = (char *) OLD_HTOP(p) - oh;
Uint new_sz, stk_sz;
@@ -1290,9 +1353,11 @@ major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl)
* to receive all live data.
*/
- new_sz = (HEAP_SIZE(p) + MBUF_SIZE(p)
- + combined_message_size(p)
- + (OLD_HTOP(p) - OLD_HEAP(p)));
+ size_before = young_gen_usage(p);
+ size_before += p->old_htop - p->old_heap;
+ stack_size = p->hend - p->stop;
+
+ new_sz = stack_size + size_before;
new_sz = next_heap_size(p, new_sz, 0);
/*
@@ -1306,16 +1371,16 @@ major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl)
n_htop = n_heap = (Eterm *) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP,
sizeof(Eterm)*new_sz);
- /*
- * Get rid of heap fragments.
- */
-
- if (MBUF(p) != NULL) {
- n_htop = collect_heap_frags(p, n_heap, n_htop, objv, nobj);
+ if (live_hf_end != ERTS_INVALID_HFRAG_PTR) {
+ /*
+ * Move heap frags that we know are completely live
+ * directly into the heap.
+ */
+ n_htop = collect_live_heap_frags(p, live_hf_end, n_heap, n_htop,
+ objv, nobj);
}
- n_htop = full_sweep_heaps(p, 0, n_heap, n_htop, src, src_size,
- oh, oh_size, objv, nobj);
+ n_htop = full_sweep_heaps(p, 0, n_heap, n_htop, oh, oh_size, objv, nobj);
/* Move the stack to the end of the heap */
stk_sz = HEAP_END(p) - p->stop;
@@ -1332,8 +1397,12 @@ major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl)
#endif
ERTS_HEAP_FREE(ERTS_ALC_T_HEAP,
- (void *) HEAP_START(p),
+ (p->abandoned_heap
+ ? p->abandoned_heap
+ : HEAP_START(p)),
(HEAP_END(p) - HEAP_START(p)) * sizeof(Eterm));
+ p->abandoned_heap = NULL;
+ p->flags &= ~F_ABANDONED_HEAP_USE;
HEAP_START(p) = n_heap;
HEAP_TOP(p) = n_htop;
HEAP_SIZE(p) = new_sz;
@@ -1346,24 +1415,6 @@ major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl)
*recl += size_before - (HEAP_TOP(p) - HEAP_START(p));
- {
- ErlMessage *msgp;
-
- /*
- * Copy newly received message onto the end of the new heap.
- */
- for (msgp = p->msg.first; msgp; msgp = msgp->next) {
- if (msgp->data.attached) {
- ErtsHeapFactory factory;
- erts_factory_proc_prealloc_init(&factory, p,
- erts_msg_attached_data_size(msgp));
- erts_move_msg_attached_data_to_heap(&factory, msgp);
- erts_factory_close(&factory);
- ErtsGcQuickSanityCheck(p);
- }
- }
- }
-
adjust_after_fullsweep(p, need, objv, nobj);
#ifdef HARDDEBUG
@@ -1379,7 +1430,6 @@ static Eterm *
full_sweep_heaps(Process *p,
int hibernate,
Eterm *n_heap, Eterm* n_htop,
- char *h, Uint h_size,
char *oh, Uint oh_size,
Eterm *objv, int nobj)
{
@@ -1420,9 +1470,7 @@ full_sweep_heaps(Process *p,
if (IS_MOVED_BOXED(val)) {
ASSERT(is_boxed(val));
*g_ptr++ = val;
- } else if (!ERTS_IS_LITERAL_PTR(gval, ptr,
- h, h_size,
- oh, oh_size)) {
+ } else if (!erts_is_literal(gval, ptr)) {
MOVE_BOXED(ptr,val,n_htop,g_ptr++);
} else {
g_ptr++;
@@ -1435,9 +1483,7 @@ full_sweep_heaps(Process *p,
val = *ptr;
if (IS_MOVED_CONS(val)) {
*g_ptr++ = ptr[1];
- } else if (!ERTS_IS_LITERAL_PTR(gval, ptr,
- h, h_size,
- oh, oh_size)) {
+ } else if (!erts_is_literal(gval, ptr)) {
MOVE_CONS(ptr,val,n_htop,g_ptr++);
} else {
g_ptr++;
@@ -1462,7 +1508,7 @@ full_sweep_heaps(Process *p,
* until all is copied.
*/
- n_htop = sweep_heaps(n_heap, n_htop, h, h_size, oh, oh_size);
+ n_htop = sweep_heaps(n_heap, n_htop, oh, oh_size);
if (MSO(p).first) {
sweep_off_heap(p, 1);
@@ -1514,23 +1560,6 @@ adjust_after_fullsweep(Process *p, int need, Eterm *objv, int nobj)
}
/*
- * Return the size of all message buffers that are NOT linked in the
- * mbuf list.
- */
-static Uint
-combined_message_size(Process* p)
-{
- Uint sz;
- ErlMessage *msgp;
-
- for (sz = 0, msgp = p->msg.first; msgp; msgp = msgp->next) {
- if (msgp->data.attached)
- sz += erts_msg_attached_data_size(msgp);
- }
- return sz;
-}
-
-/*
* Remove all message buffers.
*/
static void
@@ -1540,6 +1569,10 @@ remove_message_buffers(Process* p)
free_message_buffer(MBUF(p));
MBUF(p) = NULL;
}
+ if (p->msg_frag) {
+ erts_cleanup_messages(p->msg_frag);
+ p->msg_frag = NULL;
+ }
MBUF_SIZE(p) = 0;
}
#ifdef HARDDEBUG
@@ -1551,64 +1584,6 @@ remove_message_buffers(Process* p)
* For performance reasons, we use _unchecked_list_val(), _unchecked_boxed_val(),
* and so on to avoid a function call.
*/
-
-static void
-disallow_heap_frag_ref(Process* p, Eterm* n_htop, Eterm* objv, int nobj)
-{
- ErlHeapFragment* mbuf;
- ErlHeapFragment* qb;
- Eterm gval;
- Eterm* ptr;
- Eterm val;
-
- ASSERT(p->htop != NULL);
- mbuf = MBUF(p);
-
- while (nobj--) {
- gval = *objv;
-
- switch (primary_tag(gval)) {
-
- case TAG_PRIMARY_BOXED: {
- ptr = _unchecked_boxed_val(gval);
- val = *ptr;
- if (IS_MOVED_BOXED(val)) {
- ASSERT(is_boxed(val));
- objv++;
- } else {
- for (qb = mbuf; qb != NULL; qb = qb->next) {
- if (ErtsInArea(ptr, qb->mem, qb->alloc_size*sizeof(Eterm))) {
- abort();
- }
- }
- objv++;
- }
- break;
- }
-
- case TAG_PRIMARY_LIST: {
- ptr = _unchecked_list_val(gval);
- val = *ptr;
- if (IS_MOVED_CONS(val)) {
- objv++;
- } else {
- for (qb = mbuf; qb != NULL; qb = qb->next) {
- if (ErtsInArea(ptr, qb->mem, qb->alloc_size*sizeof(Eterm))) {
- abort();
- }
- }
- objv++;
- }
- break;
- }
-
- default: {
- objv++;
- break;
- }
- }
- }
-}
static void
disallow_heap_frag_ref_in_heap(Process* p)
@@ -1737,7 +1712,6 @@ typedef enum {
static ERTS_FORCE_INLINE Eterm *
sweep(Eterm *n_hp, Eterm *n_htop,
ErtsSweepType type,
- char *h, Uint hsz,
char *oh, Uint ohsz,
char *src, Uint src_size)
{
@@ -1749,14 +1723,10 @@ sweep(Eterm *n_hp, Eterm *n_htop,
#define ERTS_IS_IN_SWEEP_AREA(TPtr, Ptr) \
(type == ErtsSweepHeaps \
- ? !ERTS_IS_LITERAL_PTR((TPtr), (Ptr), h, hsz, oh, ohsz) \
+ ? !erts_is_literal((TPtr), (Ptr)) \
: (type == ErtsSweepNewHeap \
- ? ERTS_IS_NEW_HEAP_PTR((TPtr), (Ptr), h, hsz, oh, ohsz) \
- : (ErtsInArea((Ptr), src, src_size) \
- ? (ERTS_GC_ASSERT(erts_is_literal((TPtr), (Ptr)) \
- && !ErtsInArea((Ptr), h, hsz) \
- && !ErtsInArea((Ptr), oh, ohsz)), 1) \
- : 0)))
+ ? ErtsInYoungGen((TPtr), (Ptr), oh, ohsz) \
+ : ErtsInArea((Ptr), src, src_size)))
while (n_hp != n_htop) {
ASSERT(n_hp < n_htop);
@@ -1820,38 +1790,30 @@ sweep(Eterm *n_hp, Eterm *n_htop,
}
static Eterm *
-sweep_new_heap(Eterm *n_hp, Eterm *n_htop,
- char* new_heap, Uint new_heap_size,
- char* old_heap, Uint old_heap_size)
+sweep_new_heap(Eterm *n_hp, Eterm *n_htop, char* old_heap, Uint old_heap_size)
{
return sweep(n_hp, n_htop,
ErtsSweepNewHeap,
- new_heap, new_heap_size,
old_heap, old_heap_size,
NULL, 0);
}
static Eterm *
-sweep_heaps(Eterm *n_hp, Eterm *n_htop,
- char* new_heap, Uint new_heap_size,
- char* old_heap, Uint old_heap_size)
+sweep_heaps(Eterm *n_hp, Eterm *n_htop, char* old_heap, Uint old_heap_size)
{
return sweep(n_hp, n_htop,
ErtsSweepHeaps,
- new_heap, new_heap_size,
old_heap, old_heap_size,
NULL, 0);
}
static Eterm *
sweep_literal_area(Eterm *n_hp, Eterm *n_htop,
- char* new_heap, Uint new_heap_size,
char* old_heap, Uint old_heap_size,
char* src, Uint src_size)
{
return sweep(n_hp, n_htop,
ErtsSweepLiteralArea,
- new_heap, new_heap_size,
old_heap, old_heap_size,
src, src_size);
}
@@ -1956,32 +1918,21 @@ move_one_area(Eterm* n_htop, char* src, Uint src_size)
*/
static Eterm*
-collect_heap_frags(Process* p, Eterm* n_hstart, Eterm* n_htop,
- Eterm* objv, int nobj)
+collect_live_heap_frags(Process* p, ErlHeapFragment *live_hf_end,
+ Eterm* n_hstart, Eterm* n_htop,
+ Eterm* objv, int nobj)
{
ErlHeapFragment* qb;
char* frag_begin;
Uint frag_size;
/*
- * We don't allow references to a heap fragments from the stack, heap,
- * or process dictionary.
- */
-#ifdef HARDDEBUG
- disallow_heap_frag_ref(p, n_htop, p->stop, STACK_START(p) - p->stop);
- if (p->dictionary != NULL) {
- disallow_heap_frag_ref(p, n_htop, p->dictionary->data, p->dictionary->used);
- }
- disallow_heap_frag_ref_in_heap(p);
-#endif
-
- /*
* Move the heap fragments to the new heap. Note that no GC is done on
* the heap fragments. Any garbage will thus be moved as well and survive
* until next GC.
*/
qb = MBUF(p);
- while (qb != NULL) {
+ while (qb != live_hf_end) {
ASSERT(!qb->off_heap.first); /* process fragments use the MSO(p) list */
frag_size = qb->used_size * sizeof(Eterm);
if (frag_size != 0) {
@@ -1996,9 +1947,7 @@ collect_heap_frags(Process* p, Eterm* n_hstart, Eterm* n_htop,
static Uint
setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset)
{
- Uint avail;
Roots* roots;
- ErlMessage* mp;
Uint n;
n = 0;
@@ -2076,31 +2025,48 @@ setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset)
ASSERT(n <= rootset->size);
- mp = p->msg.first;
- avail = rootset->size - n;
- while (mp != NULL) {
- if (avail == 0) {
- Uint new_size = 2*rootset->size;
- if (roots == rootset->def) {
- roots = erts_alloc(ERTS_ALC_T_ROOTSET,
- new_size*sizeof(Roots));
- sys_memcpy(roots, rootset->def, sizeof(rootset->def));
- } else {
- roots = erts_realloc(ERTS_ALC_T_ROOTSET,
- (void *) roots,
- new_size*sizeof(Roots));
- }
+ switch (p->flags & (F_OFF_HEAP_MSGQ|F_OFF_HEAP_MSGQ_CHNG)) {
+ case F_OFF_HEAP_MSGQ|F_OFF_HEAP_MSGQ_CHNG:
+ (void) erts_move_messages_off_heap(p);
+ case F_OFF_HEAP_MSGQ:
+ break;
+ case F_OFF_HEAP_MSGQ_CHNG:
+ case 0: {
+ /*
+ * Off heap message queue disabled, i.e. we may
+ * have references from the message queue to the
+ * heap...
+ */
+ ErtsMessage *mp;
+
+ /* Ensure large enough rootset... */
+ if (n + p->msg.len > rootset->size) {
+ Uint new_size = n + p->msg.len;
+ ERTS_GC_ASSERT(roots == rootset->def);
+ roots = erts_alloc(ERTS_ALC_T_ROOTSET,
+ new_size*sizeof(Roots));
+ sys_memcpy(roots, rootset->def, n*sizeof(Roots));
rootset->size = new_size;
- avail = new_size - n;
}
- if (mp->data.attached == NULL) {
- roots[n].v = mp->m;
- roots[n].sz = 2;
- n++;
- avail--;
+
+ for (mp = p->msg.first; mp; mp = mp->next) {
+
+ if (!mp->data.attached) {
+ /*
+ * Message may refer data on heap;
+ * add it to rootset...
+ */
+ roots[n].v = mp->m;
+ roots[n].sz = ERL_MESSAGE_REF_ARRAY_SZ;
+ n++;
+ }
}
- mp = mp->next;
+ break;
+ }
}
+
+ ASSERT(rootset->size >= n);
+
rootset->roots = roots;
rootset->num_roots = n;
return n;
@@ -2569,7 +2535,7 @@ offset_off_heap(Process* p, Sint offs, char* area, Uint area_size)
static void
offset_mqueue(Process *p, Sint offs, char* area, Uint area_size)
{
- ErlMessage* mp = p->msg.first;
+ ErtsMessage* mp = p->msg.first;
while (mp != NULL) {
Eterm mesg = ERL_MESSAGE_TERM(mp);
@@ -2656,7 +2622,7 @@ reply_gc_info(void *vgcirp)
Eterm **hpp;
Uint sz, *szp;
ErlOffHeap *ohp = NULL;
- ErlHeapFragment *bp = NULL;
+ ErtsMessage *mp = NULL;
ASSERT(esdp);
@@ -2682,12 +2648,13 @@ reply_gc_info(void *vgcirp)
if (hpp)
break;
- hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, sz, &hp, &ohp);
+
szp = NULL;
hpp = &hp;
}
- erts_queue_message(rp, &rp_locks, bp, msg, NIL);
+ erts_queue_message(rp, &rp_locks, mp, msg, NIL);
if (gcirp->req_sched == esdp->no)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -2739,36 +2706,49 @@ erts_gc_info_request(Process *c_p)
static int
within2(Eterm *ptr, Process *p, Eterm *real_htop)
{
- ErlHeapFragment* bp = MBUF(p);
- ErlMessage* mp = p->msg.first;
- Eterm *htop = real_htop ? real_htop : HEAP_TOP(p);
+ ErlHeapFragment* bp;
+ ErtsMessage* mp;
+ Eterm *htop, *heap;
+
+ if (p->abandoned_heap)
+ ERTS_GET_ORIG_HEAP(p, heap, htop);
+ else {
+ heap = p->heap;
+ htop = real_htop ? real_htop : HEAP_TOP(p);
+ }
if (OLD_HEAP(p) && (OLD_HEAP(p) <= ptr && ptr < OLD_HEND(p))) {
return 1;
}
- if (HEAP_START(p) <= ptr && ptr < htop) {
+ if (heap <= ptr && ptr < htop) {
return 1;
}
- while (bp != NULL) {
- if (bp->mem <= ptr && ptr < bp->mem + bp->used_size) {
- return 1;
- }
- bp = bp->next;
- }
+
+ mp = p->msg_frag;
+ bp = p->mbuf;
+
+ if (bp)
+ goto search_heap_frags;
+
while (mp) {
- if (mp->data.attached) {
- ErlHeapFragment *hfp;
- if (is_value(ERL_MESSAGE_TERM(mp)))
- hfp = mp->data.heap_frag;
- else if (is_not_nil(ERL_MESSAGE_TOKEN(mp)))
- hfp = erts_dist_ext_trailer(mp->data.dist_ext);
- else
- hfp = NULL;
- if (hfp && hfp->mem <= ptr && ptr < hfp->mem + hfp->used_size)
+
+ if (mp->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ bp = &mp->hfrag;
+ else
+ bp = mp->data.heap_frag;
+
+ mp = mp->next;
+
+ search_heap_frags:
+
+ while (bp) {
+ if (bp->mem <= ptr && ptr < bp->mem + bp->used_size) {
return 1;
+ }
+ bp = bp->next;
}
- mp = mp->next;
}
+
return 0;
}
@@ -2790,11 +2770,11 @@ do { \
__FILE__, __LINE__, #EXP); \
} while (0)
+
#ifdef ERTS_OFFHEAP_DEBUG_CHK_CIRCULAR_LIST
# define ERTS_OFFHEAP_VISITED_BIT ((Eterm) 1 << 31)
#endif
-
void
erts_check_off_heap2(Process *p, Eterm *htop)
{
@@ -2823,7 +2803,7 @@ erts_check_off_heap2(Process *p, Eterm *htop)
}
ERTS_CHK_OFFHEAP_ASSERT(refc >= 1);
#ifdef ERTS_OFFHEAP_DEBUG_CHK_CIRCULAR_LIST
- ERTS_CHK_OFFHEAP_ASSERT(!(u.hdr->thing_word & ERTS_EXTERNAL_VISITED_BIT));
+ ERTS_CHK_OFFHEAP_ASSERT(!(u.hdr->thing_word & ERTS_OFFHEAP_VISITED_BIT));
u.hdr->thing_word |= ERTS_OFFHEAP_VISITED_BIT;
#endif
if (old) {
@@ -2836,7 +2816,7 @@ erts_check_off_heap2(Process *p, Eterm *htop)
}
}
-#ifdef ERTS_OFFHEAP_DEBUG_CHK_CIRCULAR_EXTERNAL_LIST
+#ifdef ERTS_OFFHEAP_DEBUG_CHK_CIRCULAR_LIST
for (u.hdr = MSO(p).first; u.hdr; u.hdr = u.hdr->next)
u.hdr->thing_word &= ~ERTS_OFFHEAP_VISITED_BIT;
#endif
diff --git a/erts/emulator/beam/erl_gc.h b/erts/emulator/beam/erl_gc.h
index fdbf948f9d..a496c5f008 100644
--- a/erts/emulator/beam/erl_gc.h
+++ b/erts/emulator/beam/erl_gc.h
@@ -69,13 +69,14 @@ do { \
while (nelts--) *HTOP++ = *PTR++; \
} while(0)
-#define in_area(ptr,start,nbytes) \
- ((UWord)((char*)(ptr) - (char*)(start)) < (nbytes))
-
#if defined(DEBUG) || defined(ERTS_OFFHEAP_DEBUG)
int within(Eterm *ptr, Process *p);
#endif
+#define ErtsInYoungGen(TPtr, Ptr, OldHeap, OldHeapSz) \
+ (!erts_is_literal((TPtr), (Ptr)) \
+ & !ErtsInArea((Ptr), (OldHeap), (OldHeapSz)))
+
ERTS_GLB_INLINE Eterm follow_moved(Eterm term, Eterm xptr_tag);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
@@ -98,6 +99,7 @@ ERTS_GLB_INLINE Eterm follow_moved(Eterm term, Eterm xptr_tag)
}
return term;
}
+
#endif
#endif /* ERL_GC_C__ || HIPE_GC_C__ */
@@ -106,6 +108,23 @@ ERTS_GLB_INLINE Eterm follow_moved(Eterm term, Eterm xptr_tag)
* Global exported
*/
+#define ERTS_IS_GC_DESIRED_INTERNAL(Proc, HTop, STop) \
+ ((((STop) - (HTop) < (Proc)->mbuf_sz)) \
+ | ((Proc)->off_heap.overhead > (Proc)->bin_vheap_sz) \
+ | !!((Proc)->flags & F_FORCE_GC))
+
+#define ERTS_IS_GC_DESIRED(Proc) \
+ ERTS_IS_GC_DESIRED_INTERNAL((Proc), (Proc)->htop, (Proc)->stop)
+
+#define ERTS_FORCE_GC_INTERNAL(Proc, FCalls) \
+ do { \
+ (Proc)->flags |= F_FORCE_GC; \
+ ERTS_VBUMP_ALL_REDS_INTERNAL((Proc), (FCalls)); \
+ } while (0)
+
+#define ERTS_FORCE_GC(Proc) \
+ ERTS_FORCE_GC_INTERNAL((Proc), (Proc)->fcalls)
+
extern Uint erts_test_long_gc_sleep;
typedef struct {
@@ -117,6 +136,8 @@ void erts_gc_info(ErtsGCInfo *gcip);
void erts_init_gc(void);
int erts_garbage_collect(struct process*, int, Eterm*, int);
void erts_garbage_collect_hibernate(struct process* p);
+Eterm erts_gc_after_bif_call_lhf(struct process* p, ErlHeapFragment *live_hf_end,
+ Eterm result, Eterm* regs, Uint arity);
Eterm erts_gc_after_bif_call(struct process* p, Eterm result, Eterm* regs, Uint arity);
void erts_garbage_collect_literals(struct process* p, Eterm* literals,
Uint lit_size,
diff --git a/erts/emulator/beam/erl_hl_timer.c b/erts/emulator/beam/erl_hl_timer.c
index 51a0d68247..6853278828 100644
--- a/erts/emulator/beam/erl_hl_timer.c
+++ b/erts/emulator/beam/erl_hl_timer.c
@@ -1245,7 +1245,9 @@ hlt_bif_timer_timeout(ErtsHLTimer *tmr, Uint32 roflgs)
* the middle of tree destruction).
*/
if (!ERTS_PROC_IS_EXITING(proc)) {
- erts_queue_message(proc, &proc_locks, tmr->btm.bp,
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = tmr->btm.bp;
+ erts_queue_message(proc, &proc_locks, mp,
tmr->btm.message, NIL);
erts_smp_proc_unlock(proc, ERTS_PROC_LOCKS_MSG_SEND);
queued_message = 1;
@@ -1926,36 +1928,31 @@ access_sched_local_btm(Process *c_p, Eterm pid,
if (proc) {
Uint hsz;
- ErlOffHeap *ohp;
- ErlHeapFragment* bp;
+ ErtsMessage *mp;
Eterm *hp, msg, ref, result;
+ ErlOffHeap *ohp;
+ Uint32 *refn;
#ifdef ERTS_HLT_DEBUG
Eterm *hp_end;
#endif
- hsz = 3; /* 2-tuple */
- if (!async)
- hsz += REF_THING_SIZE;
+ hsz = REF_THING_SIZE;
+ if (async) {
+ refn = trefn; /* timer ref */
+ hsz += 4; /* 3-tuple */
+ }
else {
- if (is_non_value(tref) || proc != c_p)
- hsz += REF_THING_SIZE;
- hsz += 1; /* upgrade to 3-tuple */
+ refn = rrefn; /* request ref */
+ hsz += 3; /* 2-tuple */
}
+
+ ERTS_HLT_ASSERT(refn);
+
if (time_left > (Sint64) MAX_SMALL)
hsz += ERTS_SINT64_HEAP_SIZE(time_left);
- if (proc == c_p) {
- bp = NULL;
- ohp = NULL;
- hp = HAlloc(c_p, hsz);
- }
- else {
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- proc,
- &proc_locks);
- }
+ mp = erts_alloc_message_heap(proc, &proc_locks,
+ hsz, &hp, &ohp);
#ifdef ERTS_HLT_DEBUG
hp_end = hp + hsz;
@@ -1968,35 +1965,22 @@ access_sched_local_btm(Process *c_p, Eterm pid,
else
result = erts_sint64_to_big(time_left, &hp);
- if (!async) {
- write_ref_thing(hp,
- rrefn[0],
- rrefn[1],
- rrefn[2]);
- ref = make_internal_ref(hp);
- hp += REF_THING_SIZE;
- msg = TUPLE2(hp, ref, result);
+ write_ref_thing(hp,
+ refn[0],
+ refn[1],
+ refn[2]);
+ ref = make_internal_ref(hp);
+ hp += REF_THING_SIZE;
- ERTS_HLT_ASSERT(hp + 3 == hp_end);
- }
- else {
- Eterm tag = cancel ? am_cancel_timer : am_read_timer;
- if (is_value(tref) && proc == c_p)
- ref = tref;
- else {
- write_ref_thing(hp,
- trefn[0],
- trefn[1],
- trefn[2]);
- ref = make_internal_ref(hp);
- hp += REF_THING_SIZE;
- }
- msg = TUPLE3(hp, tag, ref, result);
+ msg = (async
+ ? TUPLE3(hp, (cancel
+ ? am_cancel_timer
+ : am_read_timer), ref, result)
+ : TUPLE2(hp, ref, result));
- ERTS_HLT_ASSERT(hp + 4 == hp_end);
+ ERTS_HLT_ASSERT(hp + (async ? 4 : 3) == hp_end);
- }
- erts_queue_message(proc, &proc_locks, bp, msg, NIL);
+ erts_queue_message(proc, &proc_locks, mp, msg, NIL);
if (c_p)
proc_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -2093,16 +2077,19 @@ try_access_sched_remote_btm(ErtsSchedulerData *esdp,
}
}
else {
+ ErtsMessage *mp;
Eterm tag, res, msg;
Uint hsz;
Eterm *hp;
ErtsProcLocks proc_locks = ERTS_PROC_LOCK_MAIN;
+ ErlOffHeap *ohp;
hsz = 4;
if (time_left > (Sint64) MAX_SMALL)
hsz += ERTS_SINT64_HEAP_SIZE(time_left);
- hp = HAlloc(c_p, hsz);
+ mp = erts_alloc_message_heap(c_p, &proc_locks,
+ hsz, &hp, &ohp);
if (cancel)
tag = am_cancel_timer;
else
@@ -2117,7 +2104,7 @@ try_access_sched_remote_btm(ErtsSchedulerData *esdp,
msg = TUPLE3(hp, tag, tref, res);
- erts_queue_message(c_p, &proc_locks, NULL, msg, NIL);
+ erts_queue_message(c_p, &proc_locks, mp, msg, NIL);
proc_locks &= ~ERTS_PROC_LOCK_MAIN;
if (proc_locks)
diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c
index 5c209a4af2..f396a0a156 100644
--- a/erts/emulator/beam/erl_init.c
+++ b/erts/emulator/beam/erl_init.c
@@ -431,7 +431,7 @@ erl_first_process_otp(char* modname, void* code, unsigned size, int argc, char**
hp += 2;
args = CONS(hp, env, args);
- so.flags = SPO_SYSTEM_PROC;
+ so.flags = erts_default_spo_flags|SPO_SYSTEM_PROC;
res = erl_create_process(&parent, start_mod, am_start, args, &so);
erts_smp_proc_unlock(&parent, ERTS_PROC_LOCK_MAIN);
erts_cleanup_empty_process(&parent);
@@ -630,6 +630,7 @@ void erts_usage(void)
erts_fprintf(stderr, "-W<i|w|e> set error logger warnings mapping,\n");
erts_fprintf(stderr, " see error_logger documentation for details\n");
+ erts_fprintf(stderr, "-xohmq bool set default off_heap_message_queue flag for processes\n");
erts_fprintf(stderr, "-zdbbl size set the distribution buffer busy limit in kilobytes\n");
erts_fprintf(stderr, " valid range is [1-%d]\n", INT_MAX/1024);
erts_fprintf(stderr, "-zdntgc time set delayed node table gc in seconds\n");
@@ -2015,6 +2016,26 @@ erl_start(int argc, char **argv)
}
break;
+ case 'x': {
+ char *sub_param = argv[i]+2;
+ if (has_prefix("ohmq", sub_param)) {
+ arg = get_arg(sub_param+4, argv[i+1], &i);
+ if (sys_strcmp(arg, "true") == 0)
+ erts_default_spo_flags |= SPO_OFF_HEAP_MSGQ;
+ else if (sys_strcmp(arg, "false") == 0)
+ erts_default_spo_flags &= ~SPO_OFF_HEAP_MSGQ;
+ else {
+ erts_fprintf(stderr,
+ "Invalid off_heap_message_queue flag: %s\n", arg);
+ erts_usage();
+ }
+ } else {
+ erts_fprintf(stderr, "bad -x option %s\n", argv[i]);
+ erts_usage();
+ }
+ break;
+ }
+
case 'z': {
char *sub_param = argv[i]+2;
@@ -2068,7 +2089,8 @@ erl_start(int argc, char **argv)
"Invalid ets busy wait threshold: %s\n", arg);
erts_usage();
}
- } else {
+ }
+ else {
erts_fprintf(stderr, "bad -z option %s\n", argv[i]);
erts_usage();
}
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index e23c79d301..79739501a8 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -33,8 +33,8 @@
#include "erl_binary.h"
#include "dtrace-wrapper.h"
-ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message,
- ErlMessage,
+ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message_ref,
+ ErtsMessageRef,
ERL_MESSAGE_BUF_SZ,
ERTS_ALC_T_MSG_REF)
@@ -44,27 +44,20 @@ ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message,
#undef HARD_DEBUG
#endif
-
-
-
-#ifdef DEBUG
-static ERTS_INLINE int in_heapfrag(const Eterm* ptr, const ErlHeapFragment *bp)
+void
+init_message(void)
{
- return ((unsigned)(ptr - bp->mem) < bp->used_size);
+ init_message_ref_alloc();
}
-#endif
-
-void
-init_message(void)
+void *erts_alloc_message_ref(void)
{
- init_message_alloc();
+ return (void *) message_ref_alloc();
}
-void
-free_message(ErlMessage* mp)
+void erts_free_message_ref(void *mp)
{
- message_free(mp);
+ message_ref_free((ErtsMessageRef *) mp);
}
/* Allocate message buffer (size in words) */
@@ -74,7 +67,7 @@ new_message_buffer(Uint size)
ErlHeapFragment* bp;
bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG,
ERTS_HEAP_FRAG_SIZE(size));
- ERTS_INIT_HEAP_FRAG(bp, size);
+ ERTS_INIT_HEAP_FRAG(bp, size, size);
return bp;
}
@@ -203,83 +196,87 @@ free_message_buffer(ErlHeapFragment* bp)
}while (bp != NULL);
}
-static ERTS_INLINE void
-link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp)
+void
+erts_cleanup_messages(ErtsMessage *msgp)
{
- if (bp) {
- /* Link the message buffer */
- bp->next = MBUF(proc);
- MBUF(proc) = bp;
- MBUF_SIZE(proc) += bp->used_size;
- FLAGS(proc) |= F_FORCE_GC;
-
- /* Move any off_heap's into the process */
- if (bp->off_heap.first != NULL) {
- struct erl_off_heap_header** next_p = &bp->off_heap.first;
- while (*next_p != NULL) {
- next_p = &((*next_p)->next);
+ ErtsMessage *mp = msgp;
+ while (mp) {
+ ErtsMessage *fmp;
+ ErlHeapFragment *bp;
+ if (is_non_value(ERL_MESSAGE_TERM(mp))) {
+ if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) {
+ bp = (ErlHeapFragment *) mp->data.dist_ext->ext_endp;
+ erts_cleanup_offheap(&bp->off_heap);
}
- *next_p = MSO(proc).first;
- MSO(proc).first = bp->off_heap.first;
- bp->off_heap.first = NULL;
- OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead);
+ if (mp->data.dist_ext)
+ erts_free_dist_ext_copy(mp->data.dist_ext);
}
+ else {
+ if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG)
+ bp = mp->data.heap_frag;
+ else {
+ bp = mp->hfrag.next;
+ erts_cleanup_offheap(&mp->hfrag.off_heap);
+ }
+ if (bp)
+ free_message_buffer(bp);
+ }
+ fmp = mp;
+ mp = mp->next;
+ erts_free_message(fmp);
}
}
-Eterm
-erts_msg_distext2heap(Process *pp,
- ErtsProcLocks *plcksp,
- ErlHeapFragment **bpp,
- Eterm *tokenp,
- ErtsDistExternal *dist_extp)
+ErtsMessage *
+erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_size)
{
- Eterm msg;
- Uint tok_sz = 0;
- Eterm *hp = NULL;
- ErtsHeapFactory factory;
- Sint sz;
-
- *bpp = NULL;
- sz = erts_decode_dist_ext_size(dist_extp);
- if (sz < 0)
- goto decode_error;
- if (is_not_nil(*tokenp)) {
- ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
- tok_sz = heap_frag->used_size;
- sz += tok_sz;
- }
- if (pp) {
- ErlOffHeap *ohp;
- hp = erts_alloc_message_heap(sz, bpp, &ohp, pp, plcksp);
- }
- else {
- *bpp = new_message_buffer(sz);
- hp = (*bpp)->mem;
- }
- erts_factory_message_init(&factory, pp, hp, *bpp);
- msg = erts_decode_dist_ext(&factory, dist_extp);
- if (is_non_value(msg))
- goto decode_error;
- if (is_not_nil(*tokenp)) {
- ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
- hp = erts_produce_heap(&factory, tok_sz, 0);
- *tokenp = copy_struct(*tokenp, tok_sz, &hp, factory.off_heap);
- erts_cleanup_offheap(&heap_frag->off_heap);
+ ErtsMessage *nmp = erts_realloc(ERTS_ALC_T_MSG, mp,
+ sizeof(ErtsMessage) + (sz - 1)*sizeof(Eterm));
+ if (nmp != mp) {
+ Eterm *sp = &mp->hfrag.mem[0];
+ Eterm *ep = sp + sz;
+ Sint offs = &nmp->hfrag.mem[0] - sp;
+ erts_offset_off_heap(&nmp->hfrag.off_heap, offs, sp, ep);
+ erts_offset_heap(&nmp->hfrag.mem[0], sz, offs, sp, ep);
+ if (brefs && brefs_size)
+ erts_offset_heap_ptr(brefs, brefs_size, offs, sp, ep);
}
- erts_free_dist_ext_copy(dist_extp);
- erts_factory_close(&factory);
- return msg;
- decode_error:
- if (is_not_nil(*tokenp)) {
- ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
- erts_cleanup_offheap(&heap_frag->off_heap);
+ nmp->hfrag.used_size = sz;
+ nmp->hfrag.alloc_size = sz;
+
+ return nmp;
+}
+
+void
+erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp)
+{
+ if (first_bp) {
+ ErlHeapFragment *bp = first_bp;
+
+ while (1) {
+ /* Move any off_heap's into the process */
+ if (bp->off_heap.first != NULL) {
+ struct erl_off_heap_header** next_p = &bp->off_heap.first;
+ while (*next_p != NULL) {
+ next_p = &((*next_p)->next);
+ }
+ *next_p = MSO(proc).first;
+ MSO(proc).first = bp->off_heap.first;
+ bp->off_heap.first = NULL;
+ OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead);
+ }
+ MBUF_SIZE(proc) += bp->used_size;
+ if (!bp->next)
+ break;
+ bp = bp->next;
+ }
+
+ /* Link the message buffer */
+ bp->next = MBUF(proc);
+ MBUF(proc) = first_bp;
}
- erts_free_dist_ext_copy(dist_extp);
- *bpp = NULL;
- return THE_NON_VALUE;
- }
+}
void
erts_queue_dist_message(Process *rcvr,
@@ -287,7 +284,7 @@ erts_queue_dist_message(Process *rcvr,
ErtsDistExternal *dist_ext,
Eterm token)
{
- ErlMessage* mp;
+ ErtsMessage* mp;
#ifdef USE_VM_PROBES
Sint tok_label = 0;
Sint tok_lastcnt = 0;
@@ -299,7 +296,17 @@ erts_queue_dist_message(Process *rcvr,
ERTS_SMP_LC_ASSERT(*rcvr_locks == erts_proc_lc_my_proc_locks(rcvr));
- mp = message_alloc();
+ mp = erts_alloc_message(0, NULL);
+ mp->data.dist_ext = dist_ext;
+
+ ERL_MESSAGE_TERM(mp) = THE_NON_VALUE;
+#ifdef USE_VM_PROBES
+ ERL_MESSAGE_DT_UTAG(mp) = NIL;
+ if (token == am_have_dt_utag)
+ ERL_MESSAGE_TOKEN(mp) = NIL;
+ else
+#endif
+ ERL_MESSAGE_TOKEN(mp) = token;
#ifdef ERTS_SMP
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) {
@@ -318,58 +325,40 @@ erts_queue_dist_message(Process *rcvr,
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
/* Drop message if receiver is exiting or has a pending exit ... */
- if (is_not_nil(token)) {
- ErlHeapFragment *heap_frag;
- heap_frag = erts_dist_ext_trailer(mp->data.dist_ext);
- erts_cleanup_offheap(&heap_frag->off_heap);
- }
- erts_free_dist_ext_copy(dist_ext);
- message_free(mp);
+ erts_cleanup_messages(mp);
}
else
#endif
if (IS_TRACED_FL(rcvr, F_TRACE_RECEIVE)) {
/* Ahh... need to decode it in order to trace it... */
- ErlHeapFragment *mbuf;
- Eterm msg;
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
- message_free(mp);
- msg = erts_msg_distext2heap(rcvr, rcvr_locks, &mbuf, &token, dist_ext);
- if (is_value(msg))
+ if (!erts_decode_dist_message(rcvr, *rcvr_locks, mp, 0))
+ erts_free_message(mp);
+ else {
+ Eterm msg = ERL_MESSAGE_TERM(mp);
+ token = ERL_MESSAGE_TOKEN(mp);
#ifdef USE_VM_PROBES
- if (DTRACE_ENABLED(message_queued)) {
- DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
-
- dtrace_proc_str(rcvr, receiver_name);
- if (token != NIL && token != am_have_dt_utag) {
- tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
- tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
- tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
- }
- DTRACE6(message_queued,
- receiver_name, size_object(msg), rcvr->msg.len,
- tok_label, tok_lastcnt, tok_serial);
- }
+ if (DTRACE_ENABLED(message_queued)) {
+ DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
+
+ dtrace_proc_str(rcvr, receiver_name);
+ if (token != NIL && token != am_have_dt_utag) {
+ tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
+ tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
+ tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
+ }
+ DTRACE6(message_queued,
+ receiver_name, size_object(msg), rcvr->msg.len,
+ tok_label, tok_lastcnt, tok_serial);
+ }
#endif
- erts_queue_message(rcvr, rcvr_locks, mbuf, msg, token);
+ erts_queue_message(rcvr, rcvr_locks, mp, msg, token);
+ }
}
else {
/* Enqueue message on external format */
- ERL_MESSAGE_TERM(mp) = THE_NON_VALUE;
-#ifdef USE_VM_PROBES
- ERL_MESSAGE_DT_UTAG(mp) = NIL;
- if (token == am_have_dt_utag) {
- ERL_MESSAGE_TOKEN(mp) = NIL;
- } else {
-#endif
- ERL_MESSAGE_TOKEN(mp) = token;
-#ifdef USE_VM_PROBES
- }
-#endif
- mp->next = NULL;
-
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(message_queued)) {
DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
@@ -388,7 +377,7 @@ erts_queue_dist_message(Process *rcvr,
tok_label, tok_lastcnt, tok_serial);
}
#endif
- mp->data.dist_ext = dist_ext;
+
LINK_MESSAGE(rcvr, mp);
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
@@ -408,9 +397,9 @@ erts_queue_dist_message(Process *rcvr,
static Sint
queue_message(Process *c_p,
Process* receiver,
- ErtsProcLocks *receiver_locks,
erts_aint32_t *receiver_state,
- ErlHeapFragment* bp,
+ ErtsProcLocks *receiver_locks,
+ ErtsMessage* mp,
Eterm message,
Eterm seq_trace_token
#ifdef USE_VM_PROBES
@@ -419,31 +408,24 @@ queue_message(Process *c_p,
)
{
Sint res;
- ErlMessage* mp;
int locked_msgq = 0;
- erts_aint_t state;
-
-#ifndef ERTS_SMP
- ASSERT(bp != NULL || receiver->mbuf == NULL);
-#endif
+ erts_aint32_t state;
ERTS_SMP_LC_ASSERT(*receiver_locks == erts_proc_lc_my_proc_locks(receiver));
- mp = message_alloc();
-
- if (receiver_state)
- state = *receiver_state;
- else
- state = erts_smp_atomic32_read_acqb(&receiver->state);
-
#ifdef ERTS_SMP
- if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))
- goto exiting;
-
if (!(*receiver_locks & ERTS_PROC_LOCK_MSGQ)) {
if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ;
+
+ if (receiver_state)
+ state = *receiver_state;
+ else
+ state = erts_smp_atomic32_read_nob(&receiver->state);
+ if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))
+ goto exiting;
+
if (*receiver_locks & ERTS_PROC_LOCK_STATUS) {
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
need_locks |= ERTS_PROC_LOCK_STATUS;
@@ -451,13 +433,12 @@ queue_message(Process *c_p,
erts_smp_proc_lock(receiver, need_locks);
}
locked_msgq = 1;
- state = erts_smp_atomic32_read_nob(&receiver->state);
- if (receiver_state)
- *receiver_state = state;
}
#endif
+ state = erts_smp_atomic32_read_nob(&receiver->state);
+
if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) {
#ifdef ERTS_SMP
exiting:
@@ -465,9 +446,7 @@ queue_message(Process *c_p,
/* Drop message if receiver is exiting or has a pending exit... */
if (locked_msgq)
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
- if (bp)
- free_message_buffer(bp);
- message_free(mp);
+ erts_cleanup_messages(mp);
return 0;
}
@@ -476,13 +455,9 @@ queue_message(Process *c_p,
#ifdef USE_VM_PROBES
ERL_MESSAGE_DT_UTAG(mp) = dt_utag;
#endif
- mp->next = NULL;
- mp->data.heap_frag = bp;
-#ifndef ERTS_SMP
res = receiver->msg.len;
-#else
- res = receiver->msg_inq.len;
+#ifdef ERTS_SMP
if (*receiver_locks & ERTS_PROC_LOCK_MAIN) {
/*
* We move 'in queue' to 'private queue' and place
@@ -492,7 +467,7 @@ queue_message(Process *c_p,
* we don't need to include the 'in queue' in
* the root set when garbage collecting.
*/
- res += receiver->msg.len;
+ res += receiver->msg_inq.len;
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
LINK_MESSAGE_PRIVQ(receiver, mp);
}
@@ -544,19 +519,19 @@ queue_message(Process *c_p,
void
#ifdef USE_VM_PROBES
erts_queue_message_probe(Process* receiver, ErtsProcLocks *receiver_locks,
- ErlHeapFragment* bp,
+ ErtsMessage* mp,
Eterm message, Eterm seq_trace_token, Eterm dt_utag)
#else
erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks,
- ErlHeapFragment* bp,
+ ErtsMessage* mp,
Eterm message, Eterm seq_trace_token)
#endif
{
queue_message(NULL,
receiver,
- receiver_locks,
NULL,
- bp,
+ receiver_locks,
+ mp,
message,
seq_trace_token
#ifdef USE_VM_PROBES
@@ -565,246 +540,8 @@ erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks,
);
}
-void
-erts_link_mbuf_to_proc(struct process *proc, ErlHeapFragment *bp)
-{
- Eterm* htop = HEAP_TOP(proc);
-
- link_mbuf_to_proc(proc, bp);
- if (htop < HEAP_LIMIT(proc)) {
- *htop = make_pos_bignum_header(HEAP_LIMIT(proc)-htop-1);
- HEAP_TOP(proc) = HEAP_LIMIT(proc);
- }
-}
-
-/*
- * Moves content of message buffer attached to a message into a heap.
- * The message buffer is deallocated.
- */
-void
-erts_move_msg_mbuf_to_heap(Eterm** hpp, ErlOffHeap* off_heap, ErlMessage *msg)
-{
- struct erl_off_heap_header* oh;
- Eterm term, token, *fhp, *hp;
- Sint offs;
- Uint sz;
- ErlHeapFragment *bp;
-#ifdef USE_VM_PROBES
- Eterm utag;
-#endif
-
-#ifdef HARD_DEBUG
- struct erl_off_heap_header* dbg_oh_start = off_heap->first;
- Eterm dbg_term, dbg_token;
- ErlHeapFragment *dbg_bp;
- Uint *dbg_hp, *dbg_thp_start;
- Uint dbg_term_sz, dbg_token_sz;
-#ifdef USE_VM_PROBES
- Eterm dbg_utag;
- Uint dbg_utag_sz;
-#endif
-#endif
-
- bp = msg->data.heap_frag;
- term = ERL_MESSAGE_TERM(msg);
- token = ERL_MESSAGE_TOKEN(msg);
-#ifdef USE_VM_PROBES
- utag = ERL_MESSAGE_DT_UTAG(msg);
-#endif
- if (!bp) {
-#ifdef USE_VM_PROBES
- ASSERT(is_immed(term) && is_immed(token) && is_immed(utag));
-#else
- ASSERT(is_immed(term) && is_immed(token));
-#endif
- return;
- }
-
-#ifdef HARD_DEBUG
- dbg_term_sz = size_object(term);
- dbg_token_sz = size_object(token);
- dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz);
-#ifdef USE_VM_PROBES
- dbg_utag_sz = size_object(utag);
- dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz + dbg_utag_sz );
-#endif
- /*ASSERT(dbg_term_sz + dbg_token_sz == erts_msg_used_frag_sz(msg));
- Copied size may be smaller due to removed SubBins's or garbage.
- Copied size may be larger due to duplicated shared terms.
- */
- dbg_hp = dbg_bp->mem;
- dbg_term = copy_struct(term, dbg_term_sz, &dbg_hp, &dbg_bp->off_heap);
- dbg_token = copy_struct(token, dbg_token_sz, &dbg_hp, &dbg_bp->off_heap);
-#ifdef USE_VM_PROBES
- dbg_utag = copy_struct(utag, dbg_utag_sz, &dbg_hp, &dbg_bp->off_heap);
-#endif
- dbg_thp_start = *hpp;
-#endif
-
- if (bp->next != NULL) {
- erts_move_multi_frags(hpp, off_heap, bp, msg->m,
-#ifdef USE_VM_PROBES
- 3,
-#else
- 2,
-#endif
- 0);
- goto copy_done;
- }
-
- OH_OVERHEAD(off_heap, bp->off_heap.overhead);
- sz = bp->used_size;
-
- ASSERT(is_immed(term) || in_heapfrag(ptr_val(term),bp));
- ASSERT(is_immed(token) || in_heapfrag(ptr_val(token),bp));
-
- fhp = bp->mem;
- hp = *hpp;
- offs = hp - fhp;
-
- oh = NULL;
- while (sz--) {
- Uint cpy_sz;
- Eterm val = *fhp++;
-
- switch (primary_tag(val)) {
- case TAG_PRIMARY_IMMED1:
- *hp++ = val;
- break;
- case TAG_PRIMARY_LIST:
- case TAG_PRIMARY_BOXED:
- ASSERT(in_heapfrag(ptr_val(val), bp));
- *hp++ = offset_ptr(val, offs);
- break;
- case TAG_PRIMARY_HEADER:
- *hp++ = val;
- switch (val & _HEADER_SUBTAG_MASK) {
- case ARITYVAL_SUBTAG:
- break;
- case REFC_BINARY_SUBTAG:
- case FUN_SUBTAG:
- case EXTERNAL_PID_SUBTAG:
- case EXTERNAL_PORT_SUBTAG:
- case EXTERNAL_REF_SUBTAG:
- oh = (struct erl_off_heap_header*) (hp-1);
- cpy_sz = thing_arityval(val);
- goto cpy_words;
- default:
- cpy_sz = header_arity(val);
-
- cpy_words:
- ASSERT(sz >= cpy_sz);
- sz -= cpy_sz;
- while (cpy_sz >= 8) {
- cpy_sz -= 8;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- }
- switch (cpy_sz) {
- case 7: *hp++ = *fhp++;
- case 6: *hp++ = *fhp++;
- case 5: *hp++ = *fhp++;
- case 4: *hp++ = *fhp++;
- case 3: *hp++ = *fhp++;
- case 2: *hp++ = *fhp++;
- case 1: *hp++ = *fhp++;
- default: break;
- }
- if (oh) {
- /* Add to offheap list */
- oh->next = off_heap->first;
- off_heap->first = oh;
- ASSERT(*hpp <= (Eterm*)oh);
- ASSERT(hp > (Eterm*)oh);
- oh = NULL;
- }
- break;
- }
- break;
- }
- }
-
- ASSERT(bp->used_size == hp - *hpp);
- *hpp = hp;
-
- if (is_not_immed(token)) {
- ASSERT(in_heapfrag(ptr_val(token), bp));
- ERL_MESSAGE_TOKEN(msg) = offset_ptr(token, offs);
-#ifdef HARD_DEBUG
- ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TOKEN(msg)));
- ASSERT(hp > ptr_val(ERL_MESSAGE_TOKEN(msg)));
-#endif
- }
-
- if (is_not_immed(term)) {
- ASSERT(in_heapfrag(ptr_val(term),bp));
- ERL_MESSAGE_TERM(msg) = offset_ptr(term, offs);
-#ifdef HARD_DEBUG
- ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TERM(msg)));
- ASSERT(hp > ptr_val(ERL_MESSAGE_TERM(msg)));
-#endif
- }
-#ifdef USE_VM_PROBES
- if (is_not_immed(utag)) {
- ASSERT(in_heapfrag(ptr_val(utag), bp));
- ERL_MESSAGE_DT_UTAG(msg) = offset_ptr(utag, offs);
-#ifdef HARD_DEBUG
- ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_DT_UTAG(msg)));
- ASSERT(hp > ptr_val(ERL_MESSAGE_DT_UTAG(msg)));
-#endif
- }
-#endif
-
-copy_done:
-
-#ifdef HARD_DEBUG
- {
- int i, j;
- ErlHeapFragment* frag;
- {
- struct erl_off_heap_header* dbg_oh = off_heap->first;
- i = j = 0;
- while (dbg_oh != dbg_oh_start) {
- dbg_oh = dbg_oh->next;
- i++;
- }
- for (frag=bp; frag; frag=frag->next) {
- dbg_oh = frag->off_heap.first;
- while (dbg_oh) {
- dbg_oh = dbg_oh->next;
- j++;
- }
- }
- ASSERT(i == j);
- }
- }
-#endif
-
-
- bp->off_heap.first = NULL;
- free_message_buffer(bp);
- msg->data.heap_frag = NULL;
-
-#ifdef HARD_DEBUG
- ASSERT(eq(ERL_MESSAGE_TERM(msg), dbg_term));
- ASSERT(eq(ERL_MESSAGE_TOKEN(msg), dbg_token));
-#ifdef USE_VM_PROBES
- ASSERT(eq(ERL_MESSAGE_DT_UTAG(msg), dbg_utag));
-#endif
- free_message_buffer(dbg_bp);
-#endif
-
-}
-
-
Uint
-erts_msg_attached_data_size_aux(ErlMessage *msg)
+erts_msg_attached_data_size_aux(ErtsMessage *msg)
{
Sint sz;
ASSERT(is_non_value(ERL_MESSAGE_TERM(msg)));
@@ -833,29 +570,72 @@ erts_msg_attached_data_size_aux(ErlMessage *msg)
return sz;
}
-void
-erts_move_msg_attached_data_to_heap(ErtsHeapFactory* factory,
- ErlMessage *msg)
+ErtsMessage *
+erts_try_alloc_message_on_heap(Process *pp,
+ erts_aint32_t *psp,
+ ErtsProcLocks *plp,
+ Uint sz,
+ Eterm **hpp,
+ ErlOffHeap **ohpp,
+ int *on_heap_p)
{
- if (is_value(ERL_MESSAGE_TERM(msg)))
- erts_move_msg_mbuf_to_heap(&factory->hp, factory->off_heap, msg);
- else if (msg->data.dist_ext) {
- ASSERT(msg->data.dist_ext->heap_size >= 0);
- if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) {
- ErlHeapFragment *heap_frag;
- heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
- ERL_MESSAGE_TOKEN(msg) = copy_struct(ERL_MESSAGE_TOKEN(msg),
- heap_frag->used_size,
- &factory->hp,
- factory->off_heap);
- erts_cleanup_offheap(&heap_frag->off_heap);
+#ifdef ERTS_SMP
+ int locked_main = 0;
+#endif
+ ErtsMessage *mp;
+
+ ASSERT(!(*psp & ERTS_PSFLG_OFF_HEAP_MSGQ));
+
+ if (
+#if defined(ERTS_SMP)
+ *plp & ERTS_PROC_LOCK_MAIN
+#else
+ 1
+#endif
+ ) {
+#ifdef ERTS_SMP
+ try_on_heap:
+#endif
+ if ((*psp & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))
+ || (pp->flags & F_DISABLE_GC)
+ || HEAP_LIMIT(pp) - HEAP_TOP(pp) <= sz) {
+ /*
+ * The heap is either potentially in an inconsistent
+ * state, or not large enough.
+ */
+#ifdef ERTS_SMP
+ if (locked_main) {
+ *plp &= ~ERTS_PROC_LOCK_MAIN;
+ erts_smp_proc_unlock(pp, ERTS_PROC_LOCK_MAIN);
+ }
+#endif
+ goto in_message_fragment;
}
- ERL_MESSAGE_TERM(msg) = erts_decode_dist_ext(factory,
- msg->data.dist_ext);
- erts_free_dist_ext_copy(msg->data.dist_ext);
- msg->data.dist_ext = NULL;
+
+ *hpp = HEAP_TOP(pp);
+ HEAP_TOP(pp) = *hpp + sz;
+ *ohpp = &MSO(pp);
+ mp = erts_alloc_message(0, NULL);
+ mp->data.attached = NULL;
+ *on_heap_p = !0;
+ }
+#ifdef ERTS_SMP
+ else if (erts_smp_proc_trylock(pp, ERTS_PROC_LOCK_MAIN) == 0) {
+ locked_main = 1;
+ *psp = erts_smp_atomic32_read_nob(&pp->state);
+ *plp |= ERTS_PROC_LOCK_MAIN;
+ goto try_on_heap;
+ }
+#endif
+ else {
+ in_message_fragment:
+
+ mp = erts_alloc_message(sz, hpp);
+ *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap;
+ *on_heap_p = 0;
}
- /* else: bad external detected when calculating size */
+
+ return mp;
}
/*
@@ -870,7 +650,8 @@ erts_send_message(Process* sender,
unsigned flags)
{
Uint msize;
- ErlHeapFragment* bp = NULL;
+ ErtsMessage* mp;
+ ErlOffHeap *ohp;
Eterm token = NIL;
Sint res = 0;
#ifdef USE_VM_PROBES
@@ -879,27 +660,31 @@ erts_send_message(Process* sender,
Sint tok_label = 0;
Sint tok_lastcnt = 0;
Sint tok_serial = 0;
+ Eterm utag = NIL;
#endif
+ erts_aint32_t receiver_state;
BM_STOP_TIMER(system);
BM_MESSAGE(message,sender,receiver);
BM_START_TIMER(send);
#ifdef USE_VM_PROBES
*sender_name = *receiver_name = '\0';
- if (DTRACE_ENABLED(message_send)) {
+ if (DTRACE_ENABLED(message_send)) {
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
"%T", sender->common.id);
erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
"%T", receiver->common.id);
}
#endif
+
+ receiver_state = erts_smp_atomic32_read_nob(&receiver->state);
+
if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) {
Eterm* hp;
Eterm stoken = SEQ_TRACE_TOKEN(sender);
Uint seq_trace_size = 0;
#ifdef USE_VM_PROBES
Uint dt_utag_size = 0;
- Eterm utag = NIL;
#endif
BM_SWAP_TIMER(send,size);
@@ -923,23 +708,32 @@ erts_send_message(Process* sender,
}
#endif
- bp = new_message_buffer(msize + seq_trace_size
+ mp = erts_alloc_message_heap_state(receiver,
+ &receiver_state,
+ receiver_locks,
+ (msize
#ifdef USE_VM_PROBES
- + dt_utag_size
+ + dt_utag_size
#endif
- );
- hp = bp->mem;
+ + seq_trace_size),
+ &hp,
+ &ohp);
BM_SWAP_TIMER(send,copy);
- token = copy_struct(stoken,
- seq_trace_size,
- &hp,
- &bp->off_heap);
+ if (is_immed(stoken))
+ token = stoken;
+ else
+ token = copy_struct(stoken, seq_trace_size, &hp, ohp);
+
+ if (is_not_immed(message))
+ message = copy_struct(message, msize, &hp, ohp);
- message = copy_struct(message, msize, &hp, &bp->off_heap);
#ifdef USE_VM_PROBES
if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
- utag = copy_struct(DT_UTAG(sender), dt_utag_size, &hp, &bp->off_heap);
+ if (is_immed(DT_UTAG(sender)))
+ utag = DT_UTAG(sender);
+ else
+ utag = copy_struct(DT_UTAG(sender), dt_utag_size, ohp);
#ifdef DTRACE_TAG_HARDDEBUG
erts_fprintf(stderr,
"Dtrace -> (%T) Spreading tag (%T) with "
@@ -961,101 +755,49 @@ erts_send_message(Process* sender,
msize, tok_label, tok_lastcnt, tok_serial);
}
#endif
- res = queue_message(NULL,
- receiver,
- receiver_locks,
- NULL,
- bp,
- message,
- token
-#ifdef USE_VM_PROBES
- , utag
-#endif
- );
- BM_SWAP_TIMER(send,system);
- } else if (sender == receiver) {
- /* Drop message if receiver has a pending exit ... */
-#ifdef ERTS_SMP
- ErtsProcLocks need_locks = (~(*receiver_locks)
- & (ERTS_PROC_LOCK_MSGQ
- | ERTS_PROC_LOCK_STATUS));
- if (need_locks) {
- *receiver_locks |= need_locks;
- if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) {
- if (need_locks == ERTS_PROC_LOCK_MSGQ) {
- erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
- need_locks = ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS;
- }
- erts_smp_proc_lock(receiver, need_locks);
- }
- }
- if (!ERTS_PROC_PENDING_EXIT(receiver))
-#endif
- {
- ErlMessage* mp = message_alloc();
-
- DTRACE6(message_send, sender_name, receiver_name,
- size_object(message), tok_label, tok_lastcnt, tok_serial);
- mp->data.attached = NULL;
- ERL_MESSAGE_TERM(mp) = message;
- ERL_MESSAGE_TOKEN(mp) = NIL;
-#ifdef USE_VM_PROBES
- ERL_MESSAGE_DT_UTAG(mp) = NIL;
-#endif
- mp->next = NULL;
- /*
- * We move 'in queue' to 'private queue' and place
- * message at the end of 'private queue' in order
- * to ensure that the 'in queue' doesn't contain
- * references into the heap. By ensuring this,
- * we don't need to include the 'in queue' in
- * the root set when garbage collecting.
- */
-
- ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
- LINK_MESSAGE_PRIVQ(receiver, mp);
-
- res = receiver->msg.len;
-
- if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
- trace_receive(receiver, message);
- }
- }
- BM_SWAP_TIMER(send,system);
} else {
- ErlOffHeap *ohp;
Eterm *hp;
- erts_aint32_t state;
- BM_SWAP_TIMER(send,size);
- msize = size_object(message);
- BM_SWAP_TIMER(size,send);
- hp = erts_alloc_message_heap_state(msize,
- &bp,
- &ohp,
- receiver,
- receiver_locks,
- &state);
- BM_SWAP_TIMER(send,copy);
- message = copy_struct(message, msize, &hp, ohp);
- BM_MESSAGE_COPIED(msz);
- BM_SWAP_TIMER(copy,send);
+ if (receiver == sender && !(receiver_state & ERTS_PSFLG_OFF_HEAP_MSGQ)) {
+ mp = erts_alloc_message(0, NULL);
+ msize = 0;
+ }
+ else {
+ BM_SWAP_TIMER(send,size);
+ msize = size_object(message);
+ BM_SWAP_TIMER(size,send);
+
+ mp = erts_alloc_message_heap_state(receiver,
+ &receiver_state,
+ receiver_locks,
+ msize,
+ &hp,
+ &ohp);
+ BM_SWAP_TIMER(send,copy);
+ if (is_not_immed(message))
+ message = copy_struct(message, msize, &hp, ohp);
+ BM_MESSAGE_COPIED(msz);
+ BM_SWAP_TIMER(copy,send);
+ }
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
- res = queue_message(sender,
- receiver,
- receiver_locks,
- &state,
- bp,
- message,
- token
+ }
+
+ res = queue_message(sender,
+ receiver,
+ &receiver_state,
+ receiver_locks,
+ mp,
+ message,
+ token
#ifdef USE_VM_PROBES
- , NIL
+ , utag
#endif
- );
- BM_SWAP_TIMER(send,system);
- }
- return res;
+ );
+
+ BM_SWAP_TIMER(send,system);
+
+ return res;
}
/*
@@ -1075,7 +817,8 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
Uint sz_from;
Eterm* hp;
Eterm temptoken;
- ErlHeapFragment* bp = NULL;
+ ErtsMessage* mp;
+ ErlOffHeap *ohp;
if (token != NIL
#ifdef USE_VM_PROBES
@@ -1087,36 +830,483 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
sz_reason = size_object(reason);
sz_token = size_object(token);
sz_from = size_object(from);
- bp = new_message_buffer(sz_reason + sz_from + sz_token + 4);
- hp = bp->mem;
- mess = copy_struct(reason, sz_reason, &hp, &bp->off_heap);
- from_copy = copy_struct(from, sz_from, &hp, &bp->off_heap);
+ mp = erts_alloc_message_heap(to, to_locksp,
+ sz_reason + sz_from + sz_token + 4,
+ &hp, &ohp);
+ mess = copy_struct(reason, sz_reason, &hp, ohp);
+ from_copy = copy_struct(from, sz_from, &hp, ohp);
save = TUPLE3(hp, am_EXIT, from_copy, mess);
hp += 4;
/* the trace token must in this case be updated by the caller */
seq_trace_output(token, save, SEQ_TRACE_SEND, to->common.id, NULL);
- temptoken = copy_struct(token, sz_token, &hp, &bp->off_heap);
- erts_queue_message(to, to_locksp, bp, save, temptoken);
+ temptoken = copy_struct(token, sz_token, &hp, ohp);
+ erts_queue_message(to, to_locksp, mp, save, temptoken);
} else {
- ErlOffHeap *ohp;
sz_reason = size_object(reason);
sz_from = IS_CONST(from) ? 0 : size_object(from);
- hp = erts_alloc_message_heap(sz_reason+sz_from+4,
- &bp,
- &ohp,
- to,
- to_locksp);
+ mp = erts_alloc_message_heap(to, to_locksp,
+ sz_reason+sz_from+4, &hp, &ohp);
mess = copy_struct(reason, sz_reason, &hp, ohp);
from_copy = (IS_CONST(from)
? from
: copy_struct(from, sz_from, &hp, ohp));
save = TUPLE3(hp, am_EXIT, from_copy, mess);
- erts_queue_message(to, to_locksp, bp, save, NIL);
+ erts_queue_message(to, to_locksp, mp, save, NIL);
}
}
+void erts_save_message_in_proc(Process *p, ErtsMessage *msgp)
+{
+ ErlHeapFragment *hfp;
+
+ if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ hfp = &msgp->hfrag;
+ else if (msgp->data.attached) {
+ hfp = msgp->data.heap_frag;
+ }
+ else {
+ erts_free_message(msgp);
+ return; /* Nothing to save */
+ }
+
+ while (1) {
+ struct erl_off_heap_header *ohhp = hfp->off_heap.first;
+ if (ohhp) {
+ for ( ; ohhp->next; ohhp = ohhp->next)
+ ;
+ ohhp->next = p->off_heap.first;
+ p->off_heap.first = hfp->off_heap.first;
+ hfp->off_heap.first = NULL;
+ }
+ p->off_heap.overhead += hfp->off_heap.overhead;
+ hfp->off_heap.overhead = 0;
+ p->mbuf_sz += hfp->used_size;
+
+ if (!hfp->next)
+ break;
+ hfp = hfp->next;
+ }
+
+ msgp->next = p->msg_frag;
+ p->msg_frag = msgp;
+}
+
+Sint
+erts_move_messages_off_heap(Process *c_p)
+{
+ int reds = 1;
+ /*
+ * Move all messages off heap. This *only* occurs when the
+ * process had off heap message disabled and just enabled
+ * it...
+ */
+ ErtsMessage *mp;
+
+ reds += c_p->msg.len / 10;
+
+ ASSERT(erts_smp_atomic32_read_nob(&c_p->state)
+ & ERTS_PSFLG_OFF_HEAP_MSGQ);
+ ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG);
+
+ for (mp = c_p->msg.first; mp; mp = mp->next) {
+ Uint msg_sz, token_sz;
+#ifdef USE_VM_PROBES
+ Uint utag_sz;
+#endif
+ Eterm *hp;
+ ErlHeapFragment *hfrag;
+
+ if (mp->data.attached)
+ continue;
+
+ if (is_immed(ERL_MESSAGE_TERM(mp))
+#ifdef USE_VM_PROBES
+ && is_immed(ERL_MESSAGE_DT_UTAG(mp))
+#endif
+ && is_not_immed(ERL_MESSAGE_TOKEN(mp)))
+ continue;
+
+ /*
+ * The message refers into the heap. Copy the message
+ * from the heap into a heap fragment and attach
+ * it to the message...
+ */
+ msg_sz = size_object(ERL_MESSAGE_TERM(mp));
+#ifdef USE_VM_PROBES
+ utag_sz = size_object(ERL_MESSAGE_DT_UTAG(mp));
+#endif
+ token_sz = size_object(ERL_MESSAGE_TOKEN(mp));
+
+ hfrag = new_message_buffer(msg_sz
+#ifdef USE_VM_PROBES
+ + utag_sz
+#endif
+ + token_sz);
+ hp = hfrag->mem;
+ if (is_not_immed(ERL_MESSAGE_TERM(mp)))
+ ERL_MESSAGE_TERM(mp) = copy_struct(ERL_MESSAGE_TERM(mp),
+ msg_sz, &hp,
+ &hfrag->off_heap);
+ if (is_not_immed(ERL_MESSAGE_TOKEN(mp)))
+ ERL_MESSAGE_TOKEN(mp) = copy_struct(ERL_MESSAGE_TOKEN(mp),
+ token_sz, &hp,
+ &hfrag->off_heap);
+#ifdef USE_VM_PROBES
+ if (is_not_immed(ERL_MESSAGE_DT_UTAG(mp)))
+ ERL_MESSAGE_DT_UTAG(mp) = copy_struct(ERL_MESSAGE_DT_UTAG(mp),
+ utag_sz, &hp,
+ &hfrag->off_heap);
+#endif
+ mp->data.heap_frag = hfrag;
+ reds += 1;
+ }
+
+ return reds;
+}
+
+Sint
+erts_complete_off_heap_message_queue_change(Process *c_p)
+{
+ int reds = 1;
+
+ ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
+ ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG);
+ ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ);
+
+ /*
+ * This job was first initiated when the process changed
+ * "off heap message queue" state from false to true. Since
+ * then ERTS_PSFLG_OFF_HEAP_MSGQ has been set. However, the
+ * state change might have been changed again (multiple times)
+ * since then. Check users last requested state (the flag
+ * F_OFF_HEAP_MSGQ), and make the state consistent with that.
+ */
+
+ if (!(c_p->flags & F_OFF_HEAP_MSGQ))
+ erts_smp_atomic32_read_band_nob(&c_p->state,
+ ~ERTS_PSFLG_OFF_HEAP_MSGQ);
+ else {
+ reds += 2;
+ erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+ ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
+ erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+ reds += erts_move_messages_off_heap(c_p);
+ }
+ c_p->flags &= ~F_OFF_HEAP_MSGQ_CHNG;
+ return reds;
+}
+
+typedef struct {
+ Eterm pid;
+ ErtsThrPrgrLaterOp lop;
+} ErtsChangeOffHeapMessageQueue;
+
+static void
+change_off_heap_msgq(void *vcohmq)
+{
+ ErtsChangeOffHeapMessageQueue *cohmq;
+ /*
+ * Now we've waited thread progress which ensures that all
+ * messages to the process are enqueued off heap. Schedule
+ * completion of this change as a system task on the process
+ * itself. This in order to avoid lock contention on its
+ * main lock. We will be called in
+ * erts_complete_off_heap_message_queue_change() (above) when
+ * the system task has been selected for execution.
+ */
+ cohmq = (ErtsChangeOffHeapMessageQueue *) vcohmq;
+ erts_schedule_complete_off_heap_message_queue_change(cohmq->pid);
+ erts_free(ERTS_ALC_T_MSGQ_CHNG, vcohmq);
+}
+
+Eterm
+erts_change_off_heap_message_queue_state(Process *c_p, int enable)
+{
+
+#ifdef DEBUG
+ if (c_p->flags & F_OFF_HEAP_MSGQ) {
+ ASSERT(erts_smp_atomic32_read_nob(&c_p->state)
+ & ERTS_PSFLG_OFF_HEAP_MSGQ);
+ }
+ else {
+ if (c_p->flags & F_OFF_HEAP_MSGQ_CHNG) {
+ ASSERT(erts_smp_atomic32_read_nob(&c_p->state)
+ & ERTS_PSFLG_OFF_HEAP_MSGQ);
+ }
+ else {
+ ASSERT(!(erts_smp_atomic32_read_nob(&c_p->state)
+ & ERTS_PSFLG_OFF_HEAP_MSGQ));
+ }
+ }
+#endif
+
+ if (c_p->flags & F_OFF_HEAP_MSGQ) {
+ /* Off heap message queue is enabled */
+
+ if (!enable) {
+ c_p->flags &= ~F_OFF_HEAP_MSGQ;
+ /*
+ * We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ
+ * if a change is ongoing. It will be adjusted when the
+ * change completes...
+ */
+ if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) {
+ /* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... */
+ erts_smp_atomic32_read_band_nob(&c_p->state,
+ ~ERTS_PSFLG_OFF_HEAP_MSGQ);
+ }
+ }
+
+ return am_true; /* Old state */
+ }
+
+ /* Off heap message queue is disabled */
+
+ if (enable) {
+ c_p->flags |= F_OFF_HEAP_MSGQ;
+ /*
+ * We do not have to schedule a change if
+ * we have an ongoing change...
+ */
+ if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) {
+ ErtsChangeOffHeapMessageQueue *cohmq;
+ /*
+ * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait
+ * thread progress before completing the change in
+ * order to ensure that all senders observe that
+ * messages should be passed off heap. When the
+ * change has completed, GC does not need to inspect
+ * the message queue at all.
+ */
+ erts_smp_atomic32_read_bor_nob(&c_p->state,
+ ERTS_PSFLG_OFF_HEAP_MSGQ);
+ c_p->flags |= F_OFF_HEAP_MSGQ_CHNG;
+ cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG,
+ sizeof(ErtsChangeOffHeapMessageQueue));
+ cohmq->pid = c_p->common.id;
+ erts_schedule_thr_prgr_later_op(change_off_heap_msgq,
+ (void *) cohmq,
+ &cohmq->lop);
+ }
+ }
+
+ return am_false; /* Old state */
+}
+
+int
+erts_decode_dist_message(Process *proc, ErtsProcLocks proc_locks,
+ ErtsMessage *msgp, int force_off_heap)
+{
+ ErtsHeapFactory factory;
+ Eterm msg;
+ ErlHeapFragment *bp;
+ Sint need;
+ int decode_in_heap_frag;
+
+ decode_in_heap_frag = (force_off_heap
+ || !(proc_locks & ERTS_PROC_LOCK_MAIN)
+ || (proc->flags & F_OFF_HEAP_MSGQ));
+
+ if (msgp->data.dist_ext->heap_size >= 0)
+ need = msgp->data.dist_ext->heap_size;
+ else {
+ need = erts_decode_dist_ext_size(msgp->data.dist_ext);
+ if (need < 0) {
+ /* bad msg; remove it... */
+ if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
+ bp = erts_dist_ext_trailer(msgp->data.dist_ext);
+ erts_cleanup_offheap(&bp->off_heap);
+ }
+ erts_free_dist_ext_copy(msgp->data.dist_ext);
+ msgp->data.dist_ext = NULL;
+ return 0;
+ }
+
+ msgp->data.dist_ext->heap_size = need;
+ }
+
+ if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
+ bp = erts_dist_ext_trailer(msgp->data.dist_ext);
+ need += bp->used_size;
+ }
+
+ if (decode_in_heap_frag)
+ erts_factory_heap_frag_init(&factory, new_message_buffer(need));
+ else
+ erts_factory_proc_prealloc_init(&factory, proc, need);
+
+ ASSERT(msgp->data.dist_ext->heap_size >= 0);
+ if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
+ ErlHeapFragment *heap_frag;
+ heap_frag = erts_dist_ext_trailer(msgp->data.dist_ext);
+ ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp),
+ heap_frag->used_size,
+ &factory.hp,
+ factory.off_heap);
+ erts_cleanup_offheap(&heap_frag->off_heap);
+ }
+
+ msg = erts_decode_dist_ext(&factory, msgp->data.dist_ext);
+ ERL_MESSAGE_TERM(msgp) = msg;
+ erts_free_dist_ext_copy(msgp->data.dist_ext);
+ msgp->data.attached = NULL;
+
+ if (is_non_value(msg)) {
+ erts_factory_undo(&factory);
+ return 0;
+ }
+
+ erts_factory_trim_and_close(&factory, msgp->m,
+ ERL_MESSAGE_REF_ARRAY_SZ);
+
+ ASSERT(!msgp->data.heap_frag);
+
+ if (decode_in_heap_frag)
+ msgp->data.heap_frag = factory.heap_frags;
+
+ return 1;
+}
+
+/*
+ * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0 will move off heap messages
+ * into the heap of the inspected process if off_heap_message_queue
+ * is false when process_info(_, messages) is called. That is, the
+ * following GC will have more data in the rootset compared to the
+ * scenario when process_info(_, messages) had not been called.
+ *
+ * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS != 0 will keep off heap messages
+ * off heap when process_info(_, messages) is called regardless of
+ * the off_heap_message_queue setting of the process. That is, it
+ * will change the following execution of the process as little as
+ * possible.
+ */
+#define ERTS_INSPECT_MSGQ_KEEP_OH_MSGS 1
+
+Uint
+erts_prep_msgq_for_inspection(Process *c_p, Process *rp,
+ ErtsProcLocks rp_locks, ErtsMessageInfo *mip)
+{
+ Uint tot_heap_size;
+ ErtsMessage* mp;
+ Sint i;
+ int self_on_heap;
+
+ /*
+ * Prepare the message queue for inspection
+ * by process_info().
+ *
+ *
+ * - Decode all messages on external format
+ * - Remove all corrupt dist messages from queue
+ * - Save pointer to, and heap size need of each
+ * message in the mip array.
+ * - Return total heap size need for all messages
+ * that needs to be copied.
+ *
+ * If ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0:
+ * - In case off heap messages is disabled and
+ * we are inspecting our own queue, move all
+ * off heap data into the heap.
+ */
+
+ self_on_heap = c_p == rp && !(c_p->flags & F_OFF_HEAP_MSGQ);
+
+ tot_heap_size = 0;
+ i = 0;
+ mp = rp->msg.first;
+ while (mp) {
+ Eterm msg = ERL_MESSAGE_TERM(mp);
+
+ mip[i].size = 0;
+
+ if (is_non_value(msg)) {
+ /* Dist message on external format; decode it... */
+ if (mp->data.attached)
+ erts_decode_dist_message(rp, rp_locks, mp,
+ ERTS_INSPECT_MSGQ_KEEP_OH_MSGS);
+
+ msg = ERL_MESSAGE_TERM(mp);
+
+ if (is_non_value(msg)) {
+ ErtsMessage **mpp;
+ ErtsMessage *bad_mp = mp;
+ /*
+ * Bad distribution message; remove
+ * it from the queue...
+ */
+ ASSERT(!mp->data.attached);
+
+ mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next;
+
+ if (rp->msg.save == &bad_mp->next)
+ rp->msg.save = mpp;
+ if (rp->msg.last == &bad_mp->next)
+ rp->msg.last = mpp;
+ mp = mp->next;
+ *mpp = mp;
+ rp->msg.len--;
+ bad_mp->next = NULL;
+ erts_cleanup_messages(bad_mp);
+ continue;
+ }
+ }
+
+ ASSERT(is_value(msg));
+
+#if ERTS_INSPECT_MSGQ_KEEP_OH_MSGS
+ if (is_not_immed(msg) && (!self_on_heap || mp->data.attached)) {
+ Uint sz = size_object(msg);
+ mip[i].size = sz;
+ tot_heap_size += sz;
+ }
+#else
+ if (self_on_heap) {
+ if (mp->data.attached) {
+ ErtsMessage *tmp = NULL;
+ if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) {
+ erts_link_mbuf_to_proc(rp, mp->data.heap_frag);
+ mp->data.attached = NULL;
+ }
+ else {
+ /*
+ * Need to replace the message reference since
+ * we will get references to the message data
+ * from the heap...
+ */
+ ErtsMessage **mpp;
+ tmp = erts_alloc_message(0, NULL);
+ sys_memcpy((void *) tmp->m, (void *) mp->m,
+ sizeof(Eterm)*ERL_MESSAGE_REF_ARRAY_SZ);
+ mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next;
+ tmp->next = mp->next;
+ if (rp->msg.save == &mp->next)
+ rp->msg.save = &tmp->next;
+ if (rp->msg.last == &mp->next)
+ rp->msg.last = &tmp->next;
+ *mpp = tmp;
+ erts_save_message_in_proc(rp, mp);
+ mp = tmp;
+ }
+ }
+ }
+ else if (is_not_immed(msg)) {
+ Uint sz = size_object(msg);
+ mip[i].size = sz;
+ tot_heap_size += sz;
+ }
+
+#endif
+
+ mip[i].msgp = mp;
+ i++;
+ mp = mp->next;
+ }
+
+ return tot_heap_size;
+}
+
void erts_factory_proc_init(ErtsHeapFactory* factory,
Process* p)
{
@@ -1127,47 +1317,138 @@ void erts_factory_proc_prealloc_init(ErtsHeapFactory* factory,
Process* p,
Sint size)
{
+ ErlHeapFragment *bp = p->mbuf;
factory->mode = FACTORY_HALLOC;
factory->p = p;
factory->hp_start = HAlloc(p, size);
factory->hp = factory->hp_start;
factory->hp_end = factory->hp_start + size;
factory->off_heap = &p->off_heap;
+ factory->message = NULL;
factory->off_heap_saved.first = p->off_heap.first;
factory->off_heap_saved.overhead = p->off_heap.overhead;
- factory->heap_frags_saved = p->mbuf;
+ factory->heap_frags_saved = bp;
+ factory->heap_frags_saved_used = bp ? bp->used_size : 0;
factory->heap_frags = NULL; /* not used */
factory->alloc_type = 0; /* not used */
}
-void erts_factory_message_init(ErtsHeapFactory* factory,
- Process* rp,
- Eterm* hp,
- ErlHeapFragment* bp)
+void erts_factory_heap_frag_init(ErtsHeapFactory* factory,
+ ErlHeapFragment* bp)
+{
+ factory->mode = FACTORY_HEAP_FRAGS;
+ factory->p = NULL;
+ factory->hp_start = bp->mem;
+ factory->hp = bp->mem;
+ factory->hp_end = bp->mem + bp->alloc_size;
+ factory->off_heap = &bp->off_heap;
+ factory->message = NULL;
+ factory->heap_frags = bp;
+ factory->heap_frags_saved = NULL;
+ factory->heap_frags_saved_used = 0;
+ factory->alloc_type = ERTS_ALC_T_HEAP_FRAG;
+ ASSERT(!bp->next);
+ factory->off_heap_saved.first = factory->off_heap->first;
+ factory->off_heap_saved.overhead = factory->off_heap->overhead;
+
+ ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end);
+}
+
+
+ErtsMessage *
+erts_factory_message_create(ErtsHeapFactory* factory,
+ Process *proc,
+ ErtsProcLocks *proc_locksp,
+ Uint sz)
+{
+ Eterm *hp;
+ ErlOffHeap *ohp;
+ ErtsMessage *msgp;
+ int on_heap;
+ erts_aint32_t state;
+
+ state = erts_smp_atomic32_read_nob(&proc->state);
+
+ if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) {
+ msgp = erts_alloc_message(sz, &hp);
+ ohp = sz == 0 ? NULL : &msgp->hfrag.off_heap;
+ on_heap = 0;
+ }
+ else {
+ msgp = erts_try_alloc_message_on_heap(proc, &state,
+ proc_locksp,
+ sz, &hp, &ohp,
+ &on_heap);
+ }
+
+ if (on_heap) {
+ ASSERT(*proc_locksp & ERTS_PROC_LOCK_MAIN);
+ ASSERT(ohp == &proc->off_heap);
+ factory->mode = FACTORY_HALLOC;
+ factory->p = proc;
+ factory->heap_frags_saved = proc->mbuf;
+ factory->heap_frags_saved_used = proc->mbuf ? proc->mbuf->used_size : 0;
+ }
+ else {
+ factory->mode = FACTORY_MESSAGE;
+ factory->p = NULL;
+ factory->heap_frags_saved = NULL;
+ factory->heap_frags_saved_used = 0;
+
+ if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) {
+ ASSERT(!msgp->hfrag.next);
+ factory->heap_frags = NULL;
+ }
+ else {
+ ASSERT(!msgp->data.heap_frag
+ || !msgp->data.heap_frag->next);
+ factory->heap_frags = msgp->data.heap_frag;
+ }
+ }
+ factory->hp_start = hp;
+ factory->hp = hp;
+ factory->hp_end = hp + sz;
+ factory->message = msgp;
+ factory->off_heap = ohp;
+ factory->alloc_type = ERTS_ALC_T_HEAP_FRAG;
+ if (ohp) {
+ factory->off_heap_saved.first = ohp->first;
+ factory->off_heap_saved.overhead = ohp->overhead;
+ }
+ else {
+ factory->off_heap_saved.first = NULL;
+ factory->off_heap_saved.overhead = 0;
+ }
+
+ ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end);
+
+ return msgp;
+}
+
+void erts_factory_selfcontained_message_init(ErtsHeapFactory* factory,
+ ErtsMessage *msgp,
+ Eterm *hp)
{
- if (bp) {
- factory->mode = FACTORY_HEAP_FRAGS;
- factory->p = NULL;
- factory->hp_start = bp->mem;
- factory->hp = hp ? hp : bp->mem;
- factory->hp_end = bp->mem + bp->alloc_size;
- factory->off_heap = &bp->off_heap;
- factory->heap_frags = bp;
- factory->heap_frags_saved = bp;
- factory->alloc_type = ERTS_ALC_T_HEAP_FRAG;
- ASSERT(!bp->next);
+ ErlHeapFragment* bp;
+ if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) {
+ bp = &msgp->hfrag;
+ factory->heap_frags = NULL;
}
else {
- factory->mode = FACTORY_HALLOC;
- factory->p = rp;
- factory->hp_start = hp;
- factory->hp = hp;
- factory->hp_end = HEAP_TOP(rp);
- factory->off_heap = &rp->off_heap;
- factory->heap_frags_saved = rp->mbuf;
- factory->heap_frags = NULL; /* not used */
- factory->alloc_type = 0; /* not used */
+ bp = msgp->data.heap_frag;
+ factory->heap_frags = bp;
}
+ factory->mode = FACTORY_MESSAGE;
+ factory->p = NULL;
+ factory->hp_start = bp->mem;
+ factory->hp = hp;
+ factory->hp_end = bp->mem + bp->alloc_size;
+ factory->message = msgp;
+ factory->off_heap = &bp->off_heap;
+ factory->heap_frags_saved = NULL;
+ factory->heap_frags_saved_used = 0;
+ factory->alloc_type = ERTS_ALC_T_HEAP_FRAG;
+ ASSERT(!bp->next);
factory->off_heap_saved.first = factory->off_heap->first;
factory->off_heap_saved.overhead = factory->off_heap->overhead;
@@ -1230,8 +1511,16 @@ static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra)
factory->hp_end = factory->hp + need;
return;
- case FACTORY_HEAP_FRAGS:
- bp = factory->heap_frags;
+ case FACTORY_MESSAGE:
+ if (!factory->heap_frags) {
+ ASSERT(factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG);
+ bp = &factory->message->hfrag;
+ }
+ else {
+ /* Fall through */
+ case FACTORY_HEAP_FRAGS:
+ bp = factory->heap_frags;
+ }
if (bp) {
ASSERT(factory->hp > bp->mem);
@@ -1269,8 +1558,23 @@ void erts_factory_close(ErtsHeapFactory* factory)
HRelease(factory->p, factory->hp_end, factory->hp);
break;
- case FACTORY_HEAP_FRAGS:
- bp = factory->heap_frags;
+ case FACTORY_MESSAGE:
+ if (!factory->heap_frags) {
+ if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ bp = &factory->message->hfrag;
+ else
+ bp = NULL;
+ }
+ else {
+ if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ factory->message->hfrag.next = factory->heap_frags;
+ else
+ factory->message->data.heap_frag = factory->heap_frags;
+
+ /* Fall through */
+ case FACTORY_HEAP_FRAGS:
+ bp = factory->heap_frags;
+ }
if (bp) {
ASSERT(factory->hp >= bp->mem);
@@ -1291,17 +1595,47 @@ void erts_factory_close(ErtsHeapFactory* factory)
void erts_factory_trim_and_close(ErtsHeapFactory* factory,
Eterm *brefs, Uint brefs_size)
{
- if (factory->mode == FACTORY_HEAP_FRAGS) {
- ErlHeapFragment* bp = factory->heap_frags;
+ ErlHeapFragment *bp;
+
+ switch (factory->mode) {
+ case FACTORY_MESSAGE: {
+ ErtsMessage *mp = factory->message;
+ if (mp->data.attached == ERTS_MSG_COMBINED_HFRAG) {
+ if (!mp->hfrag.next) {
+ Uint sz = factory->hp - factory->hp_start;
+ mp = erts_shrink_message(mp, sz, brefs, brefs_size);
+ factory->message = mp;
+ factory->mode = FACTORY_CLOSED;
+ return;
+ }
+ /*else we don't trim multi fragmented messages for now (off_heap...) */
+ break;
+ }
+ /* Fall through... */
+ }
+ case FACTORY_HEAP_FRAGS:
+ bp = factory->heap_frags;
+ if (!bp)
+ break;
if (bp->next == NULL) {
Uint used_sz = factory->hp - bp->mem;
ASSERT(used_sz <= bp->alloc_size);
- factory->heap_frags = erts_resize_message_buffer(bp, used_sz,
- brefs, brefs_size);
+ if (used_sz > 0)
+ bp = erts_resize_message_buffer(bp, used_sz,
+ brefs, brefs_size);
+ else {
+ free_message_buffer(bp);
+ bp = NULL;
+ }
+ factory->heap_frags = bp;
+ if (factory->mode == FACTORY_MESSAGE)
+ factory->message->data.heap_frag = bp;
factory->mode = FACTORY_CLOSED;
return;
}
- /*else we don't trim multi fragmented messages for now */
+ /*else we don't trim multi fragmented messages for now (off_heap...) */
+ default:
+ break;
}
erts_factory_close(factory);
}
@@ -1349,38 +1683,35 @@ void erts_factory_undo(ErtsHeapFactory* factory)
/* Rollback heap top
*/
- if (factory->heap_frags_saved == NULL) { /* No heap frags when we started */
- ASSERT(factory->hp_start >= HEAP_START(factory->p));
- ASSERT(factory->hp_start <= HEAP_LIMIT(factory->p));
- HEAP_TOP(factory->p) = factory->hp_start;
- }
- else {
+ if (HEAP_START(factory->p) <= factory->hp_start
+ && factory->hp_start <= HEAP_LIMIT(factory->p)) {
+ HEAP_TOP(factory->p) = factory->hp_start;
+ }
+
+ /* Fix last heap frag */
+ if (factory->heap_frags_saved) {
ASSERT(factory->heap_frags_saved == factory->p->mbuf);
- if (factory->hp_start == factory->heap_frags_saved->mem) {
+ if (factory->hp_start != factory->heap_frags_saved->mem)
+ factory->heap_frags_saved->used_size = factory->heap_frags_saved_used;
+ else {
factory->p->mbuf = factory->p->mbuf->next;
ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, factory->heap_frags_saved,
ERTS_HEAP_FRAG_SIZE(factory->heap_frags_saved->alloc_size));
}
- else if (factory->hp_start != factory->hp_end) {
- unsigned remains = factory->hp_start - factory->heap_frags_saved->mem;
- ASSERT(remains > 0 && remains < factory->heap_frags_saved->used_size);
- factory->heap_frags_saved->used_size = remains;
- }
}
}
break;
+ case FACTORY_MESSAGE:
+ if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ factory->message->hfrag.next = factory->heap_frags;
+ else
+ factory->message->data.heap_frag = factory->heap_frags;
+ erts_cleanup_messages(factory->message);
+ break;
case FACTORY_HEAP_FRAGS:
- bp = factory->heap_frags;
- do {
- ErlHeapFragment* next_bp = bp->next;
-
- erts_cleanup_offheap(&bp->off_heap);
- ERTS_HEAP_FREE(factory->alloc_type, (void *) bp,
- ERTS_HEAP_FRAG_SIZE(bp->size));
- bp = next_bp;
- }while (bp != NULL);
+ free_message_buffer(factory->heap_frags);
break;
case FACTORY_CLOSED: break;
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h
index f37b430d27..740ae46a0f 100644
--- a/erts/emulator/beam/erl_message.h
+++ b/erts/emulator/beam/erl_message.h
@@ -24,6 +24,8 @@
struct proc_bin;
struct external_thing_;
+typedef struct erl_mesg ErtsMessage;
+
/*
* This struct represents data that must be updated by structure copy,
* but is stored outside of any heap.
@@ -54,6 +56,7 @@ typedef struct {
enum {
FACTORY_CLOSED = 0,
FACTORY_HALLOC,
+ FACTORY_MESSAGE,
FACTORY_HEAP_FRAGS,
FACTORY_STATIC
} mode;
@@ -61,8 +64,10 @@ typedef struct {
Eterm* hp_start;
Eterm* hp;
Eterm* hp_end;
+ ErtsMessage *message;
struct erl_heap_fragment* heap_frags;
struct erl_heap_fragment* heap_frags_saved;
+ Uint heap_frags_saved_used;
ErlOffHeap* off_heap;
ErlOffHeap off_heap_saved;
Uint32 alloc_type;
@@ -70,7 +75,10 @@ typedef struct {
void erts_factory_proc_init(ErtsHeapFactory*, Process*);
void erts_factory_proc_prealloc_init(ErtsHeapFactory*, Process*, Sint size);
-void erts_factory_message_init(ErtsHeapFactory*, Process*, Eterm* hp, struct erl_heap_fragment*);
+void erts_factory_heap_frag_init(ErtsHeapFactory*, struct erl_heap_fragment*);
+ErtsMessage *erts_factory_message_create(ErtsHeapFactory *, Process *,
+ ErtsProcLocks *, Uint sz);
+void erts_factory_selfcontained_message_init(ErtsHeapFactory*, ErtsMessage *, Eterm *);
void erts_factory_static_init(ErtsHeapFactory*, Eterm* hp, Uint size, ErlOffHeap*);
void erts_factory_dummy_init(ErtsHeapFactory*);
@@ -91,6 +99,8 @@ void erts_factory_undo(ErtsHeapFactory*);
#include "external.h"
#include "erl_process.h"
+#define ERTS_INVALID_HFRAG_PTR ((ErlHeapFragment *) ~((UWord) 7))
+
/*
* This struct represents a heap fragment, which is used when there
* isn't sufficient room in the process heap and we can't do a GC.
@@ -105,33 +115,46 @@ struct erl_heap_fragment {
Eterm mem[1]; /* Data */
};
-typedef struct erl_mesg {
- struct erl_mesg* next; /* Next message */
- union {
- ErtsDistExternal *dist_ext;
- ErlHeapFragment *heap_frag;
- void *attached;
- } data;
-#ifdef USE_VM_PROBES
- Eterm m[3]; /* m[0] = message, m[1] = seq trace token, m[3] = dynamic trace user tag */
-#else
- Eterm m[2]; /* m[0] = message, m[1] = seq trace token */
-#endif
-} ErlMessage;
-
+/* m[0] = message, m[1] = seq trace token */
+#define ERL_MESSAGE_REF_ARRAY_SZ 2
#define ERL_MESSAGE_TERM(mp) ((mp)->m[0])
#define ERL_MESSAGE_TOKEN(mp) ((mp)->m[1])
+
#ifdef USE_VM_PROBES
+/* m[2] = dynamic trace user tag */
+#undef ERL_MESSAGE_REF_ARRAY_SZ
+#define ERL_MESSAGE_REF_ARRAY_SZ 3
#define ERL_MESSAGE_DT_UTAG(mp) ((mp)->m[2])
+#else
#endif
+#define ERL_MESSAGE_REF_FIELDS__ \
+ ErtsMessage *next; /* Next message */ \
+ union { \
+ ErtsDistExternal *dist_ext; \
+ ErlHeapFragment *heap_frag; \
+ void *attached; \
+ } data; \
+ Eterm m[ERL_MESSAGE_REF_ARRAY_SZ]
+
+
+typedef struct erl_msg_ref__ {
+ ERL_MESSAGE_REF_FIELDS__;
+} ErtsMessageRef;
+
+struct erl_mesg {
+ ERL_MESSAGE_REF_FIELDS__;
+
+ ErlHeapFragment hfrag;
+};
+
/* Size of default message buffer (erl_message.c) */
#define ERL_MESSAGE_BUF_SZ 500
typedef struct {
- ErlMessage* first;
- ErlMessage** last; /* point to the last next pointer */
- ErlMessage** save;
+ ErtsMessage* first;
+ ErtsMessage** last; /* point to the last next pointer */
+ ErtsMessage** save;
Sint len; /* queue length */
/*
@@ -139,14 +162,14 @@ typedef struct {
* recv_set/1 instructions.
*/
BeamInstr* mark; /* address to rec_loop/2 instruction */
- ErlMessage** saved_last; /* saved last pointer */
+ ErtsMessage** saved_last; /* saved last pointer */
} ErlMessageQueue;
#ifdef ERTS_SMP
typedef struct {
- ErlMessage* first;
- ErlMessage** last; /* point to the last next pointer */
+ ErtsMessage* first;
+ ErtsMessage** last; /* point to the last next pointer */
Sint len; /* queue length */
} ErlMessageInQueue;
@@ -197,7 +220,7 @@ do { \
/* Unlink current message */
#define UNLINK_MESSAGE(p,msgp) do { \
- ErlMessage* __mp = (msgp)->next; \
+ ErtsMessage* __mp = (msgp)->next; \
*(p)->msg.save = __mp; \
(p)->msg.len--; \
if (__mp == NULL) \
@@ -213,76 +236,33 @@ do { \
#define SAVE_MESSAGE(p) \
(p)->msg.save = &(*(p)->msg.save)->next
-/*
- * ErtsMoveMsgAttachmentIntoProc() moves data attached to a message
- * onto the heap of a process. The attached data is the content of
- * the the message either on the internal format or on the external
- * format, and also possibly a seq trace token on the internal format.
- * If the message content is on the external format, the decode might
- * fail. If the decoding fails, ERL_MESSAGE_TERM(M) will contain
- * THE_NON_VALUE. That is, ERL_MESSAGE_TERM(M) *has* to be checked
- * afterwards and taken care of appropriately.
- *
- * ErtsMoveMsgAttachmentIntoProc() will shallow copy to heap if
- * possible; otherwise, move to heap via garbage collection.
- *
- * ErtsMoveMsgAttachmentIntoProc() is used when receiveing messages
- * in process_main() and in hipe_check_get_msg().
- */
-
-#define ErtsMoveMsgAttachmentIntoProc(M, P, ST, HT, FC, SWPO, SWPI) \
-do { \
- if ((M)->data.attached) { \
- Uint need__ = erts_msg_attached_data_size((M)); \
- { SWPO ; } \
- if ((ST) - (HT) >= need__) { \
- ErtsHeapFactory factory__; \
- erts_factory_proc_prealloc_init(&factory__, (P), need__); \
- erts_move_msg_attached_data_to_heap(&factory__, (M)); \
- erts_factory_close(&factory__); \
- if ((P)->mbuf != NULL) { \
- /* Heap was exhausted by messages. This is a rare case */ \
- /* that can currently (OTP 18) only happen if hamts are */ \
- /* far exceeding the estimated heap size. Do GC. */ \
- (FC) -= erts_garbage_collect((P), 0, NULL, 0); \
- } \
- } \
- else { \
- (FC) -= erts_garbage_collect((P), 0, NULL, 0); \
- } \
- { SWPI ; } \
- ASSERT(!(M)->data.attached); \
- } \
-} while (0)
-
#define ERTS_SND_FLG_NO_SEQ_TRACE (((unsigned) 1) << 0)
#define ERTS_HEAP_FRAG_SIZE(DATA_WORDS) \
(sizeof(ErlHeapFragment) - sizeof(Eterm) + (DATA_WORDS)*sizeof(Eterm))
-#define ERTS_INIT_HEAP_FRAG(HEAP_FRAG_P, DATA_WORDS) \
-do { \
- (HEAP_FRAG_P)->next = NULL; \
- (HEAP_FRAG_P)->alloc_size = (DATA_WORDS); \
- (HEAP_FRAG_P)->used_size = (DATA_WORDS); \
- (HEAP_FRAG_P)->off_heap.first = NULL; \
- (HEAP_FRAG_P)->off_heap.overhead = 0; \
-} while (0)
+#define ERTS_INIT_HEAP_FRAG(HEAP_FRAG_P, USED_WORDS, DATA_WORDS) \
+ do { \
+ (HEAP_FRAG_P)->next = NULL; \
+ (HEAP_FRAG_P)->alloc_size = (DATA_WORDS); \
+ (HEAP_FRAG_P)->used_size = (USED_WORDS); \
+ (HEAP_FRAG_P)->off_heap.first = NULL; \
+ (HEAP_FRAG_P)->off_heap.overhead = 0; \
+ } while (0)
void init_message(void);
-void free_message(ErlMessage *);
ErlHeapFragment* new_message_buffer(Uint);
ErlHeapFragment* erts_resize_message_buffer(ErlHeapFragment *, Uint,
Eterm *, Uint);
void free_message_buffer(ErlHeapFragment *);
void erts_queue_dist_message(Process*, ErtsProcLocks*, ErtsDistExternal *, Eterm);
#ifdef USE_VM_PROBES
-void erts_queue_message_probe(Process*, ErtsProcLocks*, ErlHeapFragment*,
+void erts_queue_message_probe(Process*, ErtsProcLocks*, ErtsMessage*,
Eterm message, Eterm seq_trace_token, Eterm dt_utag);
#define erts_queue_message(RP,RL,BP,Msg,SEQ) \
erts_queue_message_probe((RP),(RL),(BP),(Msg),(SEQ),NIL)
#else
-void erts_queue_message(Process*, ErtsProcLocks*, ErlHeapFragment*,
+void erts_queue_message(Process*, ErtsProcLocks*, ErtsMessage*,
Eterm message, Eterm seq_trace_token);
#define erts_queue_message_probe(RP,RL,BP,Msg,SEQ,TAG) \
erts_queue_message((RP),(RL),(BP),(Msg),(SEQ))
@@ -291,20 +271,141 @@ void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm);
Sint erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned);
void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp);
-void erts_move_msg_mbuf_to_heap(Eterm**, ErlOffHeap*, ErlMessage *);
-
-Uint erts_msg_attached_data_size_aux(ErlMessage *msg);
-void erts_move_msg_attached_data_to_heap(ErtsHeapFactory*, ErlMessage *);
-Eterm erts_msg_distext2heap(Process *, ErtsProcLocks *, ErlHeapFragment **,
- Eterm *, ErtsDistExternal *);
+Uint erts_msg_attached_data_size_aux(ErtsMessage *msg);
void erts_cleanup_offheap(ErlOffHeap *offheap);
+void erts_save_message_in_proc(Process *p, ErtsMessage *msg);
+Sint erts_move_messages_off_heap(Process *c_p);
+Sint erts_complete_off_heap_message_queue_change(Process *c_p);
+Eterm erts_change_off_heap_message_queue_state(Process *c_p, int enable);
+
+int erts_decode_dist_message(Process *, ErtsProcLocks, ErtsMessage *, int);
+
+void erts_cleanup_messages(ErtsMessage *mp);
+
+typedef struct {
+ Uint size;
+ ErtsMessage *msgp;
+} ErtsMessageInfo;
+
+Uint erts_prep_msgq_for_inspection(Process *c_p,
+ Process *rp,
+ ErtsProcLocks rp_locks,
+ ErtsMessageInfo *mip);
+void *erts_alloc_message_ref(void);
+void erts_free_message_ref(void *);
+#define ERTS_SMALL_FIX_MSG_SZ 10
+#define ERTS_MEDIUM_FIX_MSG_SZ 20
+#define ERTS_LARGE_FIX_MSG_SZ 30
+
+void *erts_alloc_small_message(void);
+void erts_free_small_message(void *mp);
+
+typedef struct {
+ ErtsMessage m;
+ Eterm data[ERTS_SMALL_FIX_MSG_SZ-1];
+} ErtsSmallFixSzMessage;
+
+typedef struct {
+ ErtsMessage m;
+ Eterm data[ERTS_MEDIUM_FIX_MSG_SZ-1];
+} ErtsMediumFixSzMessage;
+
+typedef struct {
+ ErtsMessage m;
+ Eterm data[ERTS_LARGE_FIX_MSG_SZ-1];
+} ErtsLargeFixSzMessage;
+
+ErtsMessage *erts_try_alloc_message_on_heap(Process *pp,
+ erts_aint32_t *psp,
+ ErtsProcLocks *plp,
+ Uint sz,
+ Eterm **hpp,
+ ErlOffHeap **ohpp,
+ int *on_heap_p);
+ErtsMessage *erts_realloc_shrink_message(ErtsMessage *mp, Uint sz,
+ Eterm *brefs, Uint brefs_size);
+
+ERTS_GLB_FORCE_INLINE ErtsMessage *erts_alloc_message(Uint sz, Eterm **hpp);
+ERTS_GLB_FORCE_INLINE ErtsMessage *erts_shrink_message(ErtsMessage *mp, Uint sz,
+ Eterm *brefs, Uint brefs_size);
+ERTS_GLB_FORCE_INLINE void erts_free_message(ErtsMessage *mp);
ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment*);
-ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErlMessage *msg);
+ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErtsMessage *msg);
+
+#define ERTS_MSG_COMBINED_HFRAG ((void *) 0x1)
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_FORCE_INLINE ErtsMessage *erts_alloc_message(Uint sz, Eterm **hpp)
+{
+ ErtsMessage *mp;
+
+ if (sz == 0) {
+ mp = erts_alloc_message_ref();
+ mp->next = NULL;
+ ERL_MESSAGE_TERM(mp) = NIL;
+ mp->data.attached = NULL;
+ if (hpp)
+ *hpp = NULL;
+ return mp;
+ }
+
+ mp = erts_alloc(ERTS_ALC_T_MSG,
+ sizeof(ErtsMessage) + (sz - 1)*sizeof(Eterm));
+
+ mp->next = NULL;
+ ERL_MESSAGE_TERM(mp) = NIL;
+ mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
+ ERTS_INIT_HEAP_FRAG(&mp->hfrag, sz, sz);
+
+ if (hpp)
+ *hpp = &mp->hfrag.mem[0];
+
+ return mp;
+}
+
+ERTS_GLB_FORCE_INLINE ErtsMessage *
+erts_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_size)
+{
+ if (sz == 0) {
+ ErtsMessage *nmp;
+ if (!mp->data.attached)
+ return mp;
+ ASSERT(mp->data.attached == ERTS_MSG_COMBINED_HFRAG);
+ nmp = erts_alloc_message_ref();
+#ifdef DEBUG
+ if (brefs && brefs_size) {
+ int i;
+ for (i = 0; i < brefs_size; i++)
+ ASSERT(is_non_value(brefs[i]) || is_immed(brefs[i]));
+ }
+#endif
+ erts_free(ERTS_ALC_T_MSG, mp);
+ return nmp;
+ }
+
+ ASSERT(mp->data.attached == ERTS_MSG_COMBINED_HFRAG);
+ ASSERT(mp->hfrag.used_size >= sz);
+
+ if (sz >= (mp->hfrag.alloc_size - mp->hfrag.alloc_size / 16)) {
+ mp->hfrag.used_size = sz;
+ return mp;
+ }
+
+ return erts_realloc_shrink_message(mp, sz, brefs, brefs_size);
+}
+
+ERTS_GLB_FORCE_INLINE void erts_free_message(ErtsMessage *mp)
+{
+ if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG)
+ erts_free_message_ref(mp);
+ else
+ erts_free(ERTS_ALC_T_MSG, mp);
+}
+
ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment* bp)
{
Uint sz = 0;
@@ -314,11 +415,17 @@ ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment* bp)
return sz;
}
-ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErlMessage *msg)
+ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErtsMessage *msg)
{
ASSERT(msg->data.attached);
- if (is_value(ERL_MESSAGE_TERM(msg)))
- return erts_used_frag_sz(msg->data.heap_frag);
+ if (is_value(ERL_MESSAGE_TERM(msg))) {
+ ErlHeapFragment *bp;
+ if (msg->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ bp = &msg->hfrag;
+ else
+ bp = msg->data.heap_frag;
+ return erts_used_frag_sz(bp);
+ }
else if (msg->data.dist_ext->heap_size < 0)
return erts_msg_attached_data_size_aux(msg);
else {
diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c
index 01414f326d..a37cda93ef 100644
--- a/erts/emulator/beam/erl_nif.c
+++ b/erts/emulator/beam/erl_nif.c
@@ -314,6 +314,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
ErtsProcLocks rp_locks = 0;
Process* rp;
Process* c_p;
+ ErtsMessage *mp;
ErlHeapFragment* frags;
Eterm receiver = to_pid->pid;
int flush_me = 0;
@@ -347,7 +348,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
ASSERT(frags == MBUF(&menv->phony_proc));
if (frags != NULL) {
/* Move all offheap's from phony proc to the first fragment.
- Quick and dirty, but erts_move_msg_mbuf_to_heap doesn't care. */
+ Quick and dirty... */
ASSERT(!is_offheap(&frags->off_heap));
frags->off_heap = MSO(&menv->phony_proc);
clear_offheap(&MSO(&menv->phony_proc));
@@ -359,7 +360,9 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
if (flush_me) {
flush_env(env); /* Needed for ERTS_HOLE_CHECK */
}
- erts_queue_message(rp, &rp_locks, frags, msg, am_undefined);
+ mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = frags;
+ erts_queue_message(rp, &rp_locks, mp, msg, am_undefined);
if (c_p == rp)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
if (rp_locks)
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index 62a44f7129..a4da288e79 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -1401,56 +1401,50 @@ setup_reference_table(void)
for (i = 0; i < max; i++) {
Process *proc = erts_pix2proc(i);
if (proc) {
- ErlMessage *msg;
+ int mli;
+ ErtsMessage *msg_list[] = {
+ proc->msg.first,
+#ifdef ERTS_SMP
+ proc->msg_inq.first,
+#endif
+ proc->msg_frag};
/* Insert Heap */
insert_offheap(&(proc->off_heap),
HEAP_REF,
proc->common.id);
- /* Insert message buffers */
+ /* Insert heap fragments buffers */
for(hfp = proc->mbuf; hfp; hfp = hfp->next)
insert_offheap(&(hfp->off_heap),
HEAP_REF,
proc->common.id);
- /* Insert msg msg buffers */
- for (msg = proc->msg.first; msg; msg = msg->next) {
- ErlHeapFragment *heap_frag = NULL;
- if (msg->data.attached) {
- if (is_value(ERL_MESSAGE_TERM(msg)))
- heap_frag = msg->data.heap_frag;
- else {
- if (msg->data.dist_ext->dep)
- insert_dist_entry(msg->data.dist_ext->dep,
- HEAP_REF, proc->common.id, 0);
- if (is_not_nil(ERL_MESSAGE_TOKEN(msg)))
- heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
+
+ /* Insert msg buffers */
+ for (mli = 0; mli < sizeof(msg_list)/sizeof(msg_list[0]); mli++) {
+ ErtsMessage *msg;
+ for (msg = msg_list[mli]; msg; msg = msg->next) {
+ ErlHeapFragment *heap_frag = NULL;
+ if (msg->data.attached) {
+ if (msg->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ heap_frag = &msg->hfrag;
+ else if (is_value(ERL_MESSAGE_TERM(msg)))
+ heap_frag = msg->data.heap_frag;
+ else {
+ if (msg->data.dist_ext->dep)
+ insert_dist_entry(msg->data.dist_ext->dep,
+ HEAP_REF, proc->common.id, 0);
+ if (is_not_nil(ERL_MESSAGE_TOKEN(msg)))
+ heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
+ }
}
- }
- if (heap_frag)
- insert_offheap(&(heap_frag->off_heap),
- HEAP_REF,
- proc->common.id);
- }
-#ifdef ERTS_SMP
- for (msg = proc->msg_inq.first; msg; msg = msg->next) {
- ErlHeapFragment *heap_frag = NULL;
- if (msg->data.attached) {
- if (is_value(ERL_MESSAGE_TERM(msg)))
- heap_frag = msg->data.heap_frag;
- else {
- if (msg->data.dist_ext->dep)
- insert_dist_entry(msg->data.dist_ext->dep,
- HEAP_REF, proc->common.id, 0);
- if (is_not_nil(ERL_MESSAGE_TOKEN(msg)))
- heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
+ while (heap_frag) {
+ insert_offheap(&(heap_frag->off_heap),
+ HEAP_REF,
+ proc->common.id);
+ heap_frag = heap_frag->next;
}
}
- if (heap_frag)
- insert_offheap(&(heap_frag->off_heap),
- HEAP_REF,
- proc->common.id);
}
-#endif
/* Insert links */
if (ERTS_P_LINKS(proc))
insert_links(ERTS_P_LINKS(proc), proc->common.id);
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index bad9da90ea..e490ffea5a 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -148,6 +148,7 @@ extern BeamInstr beam_apply[];
extern BeamInstr beam_exit[];
extern BeamInstr beam_continue_exit[];
+int erts_default_spo_flags = 0;
int erts_eager_check_io = 1;
int erts_sched_compact_load;
int erts_sched_balance_util = 0;
@@ -351,7 +352,8 @@ struct erts_system_profile_flags_t erts_system_profile_flags;
typedef enum {
ERTS_PSTT_GC, /* Garbage Collect */
- ERTS_PSTT_CPC /* Check Process Code */
+ ERTS_PSTT_CPC, /* Check Process Code */
+ ERTS_PSTT_COHMQ /* Change off heap message queue */
} ErtsProcSysTaskType;
#define ERTS_MAX_PROC_SYS_TASK_ARGS 2
@@ -982,7 +984,7 @@ reply_sched_wall_time(void *vswtrp)
Eterm **hpp;
Uint sz, *szp;
ErlOffHeap *ohp = NULL;
- ErlHeapFragment *bp = NULL;
+ ErtsMessage *mp = NULL;
ASSERT(esdp);
#ifdef ERTS_DIRTY_SCHEDULERS
@@ -1038,12 +1040,12 @@ reply_sched_wall_time(void *vswtrp)
if (hpp)
break;
- hp = erts_alloc_message_heap(sz, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, sz, &hp, &ohp);
szp = NULL;
hpp = &hp;
}
- erts_queue_message(rp, &rp_locks, bp, msg, NIL);
+ erts_queue_message(rp, &rp_locks, mp, msg, NIL);
if (swtrp->req_sched == esdp->no)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -6294,22 +6296,99 @@ erts_schedule_process(Process *p, erts_aint32_t state, ErtsProcLocks locks)
schedule_process(p, state, locks);
}
-static void
-schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy)
+static int
+schedule_process_sys_task(Process *p, erts_aint32_t prio, ErtsProcSysTask *st)
{
- /*
- * Expects status lock to be locked when called, and
- * returns with status lock unlocked...
- */
- erts_aint32_t a = state, n, enq_prio = -1;
+ int res;
+ int locked;
+ ErtsProcSysTaskQs *stqs, *free_stqs;
+ erts_aint32_t state, a, n, enq_prio;
int enqueue; /* < 0 -> use proxy */
- unsigned int prof_runnable_procs = erts_system_profile_flags.runnable_procs;
+ unsigned int prof_runnable_procs;
+
+ res = 1; /* prepare for success */
+ st->next = st->prev = st; /* Prep for empty prio queue */
+ state = erts_smp_atomic32_read_nob(&p->state);
+ prof_runnable_procs = erts_system_profile_flags.runnable_procs;
+ locked = 0;
+ free_stqs = NULL;
+ if (state & ERTS_PSFLG_ACTIVE_SYS)
+ stqs = NULL;
+ else {
+ alloc_qs:
+ stqs = proc_sys_task_queues_alloc();
+ stqs->qmask = 1 << prio;
+ stqs->ncount = 0;
+ stqs->q[PRIORITY_MAX] = NULL;
+ stqs->q[PRIORITY_HIGH] = NULL;
+ stqs->q[PRIORITY_NORMAL] = NULL;
+ stqs->q[PRIORITY_LOW] = NULL;
+ stqs->q[prio] = st;
+ }
+
+ if (!locked) {
+ locked = 1;
+ erts_smp_proc_lock(p, ERTS_PROC_LOCK_STATUS);
+
+ state = erts_smp_atomic32_read_nob(&p->state);
+ if (state & ERTS_PSFLG_EXITING) {
+ free_stqs = stqs;
+ res = 0;
+ goto cleanup;
+ }
+ }
+
+ if (!p->sys_task_qs) {
+ if (stqs)
+ p->sys_task_qs = stqs;
+ else
+ goto alloc_qs;
+ }
+ else {
+ free_stqs = stqs;
+ stqs = p->sys_task_qs;
+ if (!stqs->q[prio]) {
+ stqs->q[prio] = st;
+ stqs->qmask |= 1 << prio;
+ }
+ else {
+ st->next = stqs->q[prio];
+ st->prev = stqs->q[prio]->prev;
+ st->next->prev = st;
+ st->prev->next = st;
+ ASSERT(stqs->qmask & (1 << prio));
+ }
+ }
+
+ if (ERTS_PSFLGS_GET_ACT_PRIO(state) > prio) {
+ erts_aint32_t n, a, e;
+ /* Need to elevate actual prio */
+
+ a = state;
+ do {
+ if (ERTS_PSFLGS_GET_ACT_PRIO(a) <= prio) {
+ n = a;
+ break;
+ }
+ n = e = a;
+ n &= ~ERTS_PSFLGS_ACT_PRIO_MASK;
+ n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET);
+ a = erts_smp_atomic32_cmpxchg_nob(&p->state, n, e);
+ } while (a != e);
+ state = n;
+ }
+
+
+ a = state;
+ enq_prio = -1;
/* Status lock prevents out of order "runnable proc" trace msgs */
ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p));
- if (!prof_runnable_procs)
+ if (!prof_runnable_procs) {
erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
+ locked = 0;
+ }
ASSERT(!(state & ERTS_PSFLG_PROXY));
@@ -6317,8 +6396,10 @@ schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy)
erts_aint32_t e;
n = e = a;
- if (a & ERTS_PSFLG_FREE)
+ if (a & ERTS_PSFLG_FREE) {
+ res = 0;
goto cleanup; /* We don't want to schedule free processes... */
+ }
enqueue = ERTS_ENQUEUE_NOT;
n |= ERTS_PSFLG_ACTIVE_SYS;
@@ -6342,29 +6423,24 @@ schedule_process_sys_task(Process *p, erts_aint32_t state, Process *proxy)
}
erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
- prof_runnable_procs = 0;
+ locked = 0;
}
- if (enqueue != ERTS_ENQUEUE_NOT) {
- Process *sched_p;
- if (enqueue > 0)
- sched_p = p;
- else {
- sched_p = make_proxy_proc(proxy, p, enq_prio);
- proxy = NULL;
- }
- add2runq(sched_p, n, enq_prio);
- }
+ if (enqueue != ERTS_ENQUEUE_NOT)
+ add2runq(enqueue > 0 ? p : make_proxy_proc(NULL, p, enq_prio),
+ n, enq_prio);
cleanup:
- if (prof_runnable_procs)
+ if (locked)
erts_smp_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
- if (proxy)
- free_proxy_proc(proxy);
+ if (free_stqs)
+ proc_sys_task_queues_free(free_stqs);
ERTS_SMP_LC_ASSERT(!(ERTS_PROC_LOCK_STATUS & erts_proc_lc_my_proc_locks(p)));
+
+ return res;
}
static ERTS_INLINE int
@@ -9696,13 +9772,13 @@ Process *schedule(Process *p, int calls)
}
}
- if (!(state & ERTS_PSFLG_EXITING)
- && ((FLAGS(p) & F_FORCE_GC)
- || (MSO(p).overhead > BIN_VHEAP_SZ(p)))) {
- reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity);
- if (reds <= 0) {
- p->fcalls = reds;
- goto sched_out_proc;
+ if (ERTS_IS_GC_DESIRED(p)) {
+ if (!(state & ERTS_PSFLG_EXITING) && !(p->flags & F_DISABLE_GC)) {
+ reds -= erts_garbage_collect(p, 0, p->arg_reg, p->arity);
+ if (reds <= 0) {
+ p->fcalls = reds;
+ goto sched_out_proc;
+ }
}
}
@@ -9742,7 +9818,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result)
if (rp) {
ErtsProcLocks rp_locks;
ErlOffHeap *ohp;
- ErlHeapFragment* bp;
+ ErtsMessage *mp;
Eterm *hp, msg, req_id, result;
Uint st_result_sz, hsz;
#ifdef DEBUG
@@ -9754,11 +9830,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result)
st_result_sz = is_immed(st_result) ? 0 : size_object(st_result);
hsz = st->req_id_sz + st_result_sz + 4 /* 3-tuple */;
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp);
#ifdef DEBUG
hp_start = hp;
@@ -9783,7 +9855,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, Eterm st_result)
ASSERT(hp_start + hsz == hp);
#endif
- erts_queue_message(rp, &rp_locks, bp, msg, NIL);
+ erts_queue_message(rp, &rp_locks, mp, msg, NIL);
if (c_p == rp)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -10005,6 +10077,10 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds)
st = NULL;
}
break;
+ case ERTS_PSTT_COHMQ:
+ reds += erts_complete_off_heap_message_queue_change(c_p);
+ st_res = am_true;
+ break;
default:
ERTS_INTERNAL_ERROR("Invalid process sys task type");
st_res = am_false;
@@ -10047,6 +10123,9 @@ cleanup_sys_tasks(Process *c_p, erts_aint32_t in_state, int in_reds)
case ERTS_PSTT_CPC:
st_res = am_false;
break;
+ case ERTS_PSTT_COHMQ:
+ st_res = am_false;
+ break;
default:
ERTS_INTERNAL_ERROR("Invalid process sys task type");
st_res = am_false;
@@ -10065,10 +10144,8 @@ BIF_RETTYPE
erts_internal_request_system_task_3(BIF_ALIST_3)
{
Process *rp = erts_proc_lookup(BIF_ARG_1);
- ErtsProcSysTaskQs *stqs, *free_stqs = NULL;
ErtsProcSysTask *st = NULL;
- erts_aint32_t prio, rp_state;
- int rp_locked;
+ erts_aint32_t prio;
Eterm noproc_res, req_type;
if (!rp && !is_internal_pid(BIF_ARG_1)) {
@@ -10125,7 +10202,6 @@ erts_internal_request_system_task_3(BIF_ALIST_3)
}
st = erts_alloc(ERTS_ALC_T_PROC_SYS_TSK,
ERTS_PROC_SYS_TASK_SIZE(tot_sz));
- st->next = st->prev = st; /* Prep for empty prio queue */
ERTS_INIT_OFF_HEAP(&st->off_heap);
hp = &st->heap[0];
@@ -10169,95 +10245,11 @@ erts_internal_request_system_task_3(BIF_ALIST_3)
goto badarg;
}
- rp_state = erts_smp_atomic32_read_nob(&rp->state);
-
- rp_locked = 0;
-
- free_stqs = NULL;
- if (rp_state & ERTS_PSFLG_ACTIVE_SYS)
- stqs = NULL;
- else {
- alloc_qs:
- stqs = proc_sys_task_queues_alloc();
- stqs->qmask = 1 << prio;
- stqs->ncount = 0;
- stqs->q[PRIORITY_MAX] = NULL;
- stqs->q[PRIORITY_HIGH] = NULL;
- stqs->q[PRIORITY_NORMAL] = NULL;
- stqs->q[PRIORITY_LOW] = NULL;
- stqs->q[prio] = st;
- }
-
- if (!rp_locked) {
- rp_locked = 1;
- erts_smp_proc_lock(rp, ERTS_PROC_LOCK_STATUS);
-
- rp_state = erts_smp_atomic32_read_nob(&rp->state);
- if (rp_state & ERTS_PSFLG_EXITING) {
- erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
- rp = NULL;
- free_stqs = stqs;
- goto noproc;
- }
+ if (!schedule_process_sys_task(rp, prio, st)) {
+ noproc:
+ notify_sys_task_executed(BIF_P, st, noproc_res);
}
- if (!rp->sys_task_qs) {
- if (stqs)
- rp->sys_task_qs = stqs;
- else
- goto alloc_qs;
- }
- else {
- if (stqs)
- free_stqs = stqs;
- stqs = rp->sys_task_qs;
- if (!stqs->q[prio]) {
- stqs->q[prio] = st;
- stqs->qmask |= 1 << prio;
- }
- else {
- st->next = stqs->q[prio];
- st->prev = stqs->q[prio]->prev;
- st->next->prev = st;
- st->prev->next = st;
- ASSERT(stqs->qmask & (1 << prio));
- }
- }
-
- if (ERTS_PSFLGS_GET_ACT_PRIO(rp_state) > prio) {
- erts_aint32_t n, a, e;
- /* Need to elevate actual prio */
-
- a = rp_state;
- do {
- if (ERTS_PSFLGS_GET_ACT_PRIO(a) <= prio) {
- n = a;
- break;
- }
- n = e = a;
- n &= ~ERTS_PSFLGS_ACT_PRIO_MASK;
- n |= (prio << ERTS_PSFLGS_ACT_PRIO_OFFSET);
- a = erts_smp_atomic32_cmpxchg_nob(&rp->state, n, e);
- } while (a != e);
- rp_state = n;
- }
-
- /*
- * schedule_process_sys_task() unlocks status
- * lock on process.
- */
- schedule_process_sys_task(rp, rp_state, NULL);
-
- if (free_stqs)
- proc_sys_task_queues_free(free_stqs);
-
- BIF_RET(am_ok);
-
-noproc:
-
- notify_sys_task_executed(BIF_P, st, noproc_res);
- if (free_stqs)
- proc_sys_task_queues_free(free_stqs);
BIF_RET(am_ok);
badarg:
@@ -10266,11 +10258,35 @@ badarg:
erts_cleanup_offheap(&st->off_heap);
erts_free(ERTS_ALC_T_PROC_SYS_TSK, st);
}
- if (free_stqs)
- proc_sys_task_queues_free(free_stqs);
BIF_ERROR(BIF_P, BADARG);
}
+void
+erts_schedule_complete_off_heap_message_queue_change(Eterm pid)
+{
+ Process *rp = erts_proc_lookup(pid);
+ if (rp) {
+ ErtsProcSysTask *st;
+ erts_aint32_t state;
+ int i;
+
+ st = erts_alloc(ERTS_ALC_T_PROC_SYS_TSK,
+ ERTS_PROC_SYS_TASK_SIZE(0));
+ st->type = ERTS_PSTT_COHMQ;
+ st->requester = NIL;
+ st->reply_tag = NIL;
+ st->req_id = NIL;
+ st->req_id_sz = 0;
+ for (i = 0; i < ERTS_MAX_PROC_SYS_TASK_ARGS; i++)
+ st->arg[i] = NIL;
+ ERTS_INIT_OFF_HEAP(&st->off_heap);
+ state = erts_smp_atomic32_read_nob(&rp->state);
+
+ if (!schedule_process_sys_task(rp, ERTS_PSFLGS_GET_USR_PRIO(state), st))
+ erts_free(ERTS_ALC_T_PROC_SYS_TSK, st);
+ }
+}
+
static void
save_gc_task(Process *c_p, ErtsProcSysTask *st, int prio)
{
@@ -10716,6 +10732,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
Eterm args, /* Arguments for function (must be well-formed list). */
ErlSpawnOpts* so) /* Options for spawn. */
{
+ Uint flags = erts_default_process_flags;
ErtsRunQueue *rq = NULL;
Process *p;
Sint arity; /* Number of arguments. */
@@ -10753,6 +10770,11 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
state |= (((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_ACT_PRIO_OFFSET)
| ((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_USR_PRIO_OFFSET));
+ if (so->flags & SPO_OFF_HEAP_MSGQ) {
+ state |= ERTS_PSFLG_OFF_HEAP_MSGQ;
+ flags |= F_OFF_HEAP_MSGQ;
+ }
+
if (!rq)
rq = erts_get_runq_proc(parent);
@@ -10775,7 +10797,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
BM_SWAP_TIMER(size,system);
heap_need = arg_size;
- p->flags = erts_default_process_flags;
+ p->flags = flags;
p->static_flags = 0;
if (so->flags & SPO_SYSTEM_PROC)
@@ -10824,6 +10846,8 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->stop = p->hend = p->heap + sz;
p->htop = p->heap;
p->heap_sz = sz;
+ p->abandoned_heap = NULL;
+ p->live_hf_end = ERTS_INVALID_HFRAG_PTR;
p->catches = 0;
p->bin_vheap_sz = p->min_vheap_size;
@@ -10894,6 +10918,7 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->accessor_bif_timers = NULL;
#endif
p->mbuf = NULL;
+ p->msg_frag = NULL;
p->mbuf_sz = 0;
p->psd = NULL;
p->dictionary = NULL;
@@ -11029,6 +11054,8 @@ void erts_init_empty_process(Process *p)
p->stop = NULL;
p->hend = NULL;
p->heap = NULL;
+ p->abandoned_heap = NULL;
+ p->live_hf_end = ERTS_INVALID_HFRAG_PTR;
p->gen_gcs = 0;
p->max_gen_gcs = 0;
p->min_heap_size = 0;
@@ -11061,6 +11088,7 @@ void erts_init_empty_process(Process *p)
p->old_htop = NULL;
p->old_heap = NULL;
p->mbuf = NULL;
+ p->msg_frag = NULL;
p->mbuf_sz = 0;
p->psd = NULL;
ERTS_P_MONITORS(p) = NULL;
@@ -11149,6 +11177,8 @@ erts_debug_verify_clean_empty_process(Process* p)
ASSERT(p->htop == NULL);
ASSERT(p->stop == NULL);
ASSERT(p->hend == NULL);
+ ASSERT(p->abandoned_heap == NULL);
+ ASSERT(p->live_hf_end == ERTS_INVALID_HFRAG_PTR);
ASSERT(p->heap == NULL);
ASSERT(p->common.id == ERTS_INVALID_PID);
ASSERT(ERTS_TRACER_PROC(p) == NIL);
@@ -11226,8 +11256,6 @@ erts_cleanup_empty_process(Process* p)
static void
delete_process(Process* p)
{
- ErlMessage* mp;
-
VERBOSE(DEBUG_PROCESSES, ("Removing process: %T\n",p->common.id));
/* Cleanup psd */
@@ -11283,24 +11311,8 @@ delete_process(Process* p)
erts_erase_dicts(p);
/* free all pending messages */
- mp = p->msg.first;
- while(mp != NULL) {
- ErlMessage* next_mp = mp->next;
- if (mp->data.attached) {
- if (is_value(mp->m[0]))
- free_message_buffer(mp->data.heap_frag);
- else {
- if (is_not_nil(mp->m[1])) {
- ErlHeapFragment *heap_frag;
- heap_frag = (ErlHeapFragment *) mp->data.dist_ext->ext_endp;
- erts_cleanup_offheap(&heap_frag->off_heap);
- }
- erts_free_dist_ext_copy(mp->data.dist_ext);
- }
- }
- free_message(mp);
- mp = next_mp;
- }
+ erts_cleanup_messages(p->msg.first);
+ p->msg.first = NULL;
ASSERT(!p->nodes_monitors);
ASSERT(!p->suspend_monitors);
@@ -11488,6 +11500,9 @@ static ERTS_INLINE void
send_exit_message(Process *to, ErtsProcLocks *to_locksp,
Eterm exit_term, Uint term_size, Eterm token)
{
+ ErtsMessage *mp;
+ ErlOffHeap *ohp;
+
if (token == NIL
#ifdef USE_VM_PROBES
|| token == am_have_dt_utag
@@ -11495,14 +11510,12 @@ send_exit_message(Process *to, ErtsProcLocks *to_locksp,
) {
Eterm* hp;
Eterm mess;
- ErlHeapFragment* bp;
- ErlOffHeap *ohp;
- hp = erts_alloc_message_heap(term_size, &bp, &ohp, to, to_locksp);
+ mp = erts_alloc_message_heap(to, to_locksp,
+ term_size, &hp, &ohp);
mess = copy_struct(exit_term, term_size, &hp, ohp);
- erts_queue_message(to, to_locksp, bp, mess, NIL);
+ erts_queue_message(to, to_locksp, mp, mess, NIL);
} else {
- ErlHeapFragment* bp;
Eterm* hp;
Eterm mess;
Eterm temp_token;
@@ -11510,13 +11523,14 @@ send_exit_message(Process *to, ErtsProcLocks *to_locksp,
ASSERT(is_tuple(token));
sz_token = size_object(token);
- bp = new_message_buffer(term_size+sz_token);
- hp = bp->mem;
- mess = copy_struct(exit_term, term_size, &hp, &bp->off_heap);
+
+ mp = erts_alloc_message_heap(to, to_locksp,
+ term_size+sz_token, &hp, &ohp);
+ mess = copy_struct(exit_term, term_size, &hp, ohp);
/* the trace token must in this case be updated by the caller */
seq_trace_output(token, mess, SEQ_TRACE_SEND, to->common.id, NULL);
- temp_token = copy_struct(token, sz_token, &hp, &bp->off_heap);
- erts_queue_message(to, to_locksp, bp, mess, temp_token);
+ temp_token = copy_struct(token, sz_token, &hp, ohp);
+ erts_queue_message(to, to_locksp, mp, mess, temp_token);
}
}
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 65422b8c15..c6376c0166 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -916,6 +916,7 @@ struct process {
Eterm* stop; /* Stack top */
Eterm* heap; /* Heap start */
Eterm* hend; /* Heap end */
+ Eterm* abandoned_heap;
Uint heap_sz; /* Size of heap in words */
Uint min_heap_size; /* Minimum size of heap (in words). */
Uint min_vheap_size; /* Minimum size of virtual heap (in words). */
@@ -1012,8 +1013,10 @@ struct process {
Uint16 gen_gcs; /* Number of (minor) generational GCs. */
Uint16 max_gen_gcs; /* Max minor gen GCs before fullsweep. */
ErlOffHeap off_heap; /* Off-heap data updated by copy_struct(). */
- ErlHeapFragment* mbuf; /* Pointer to message buffer list */
- Uint mbuf_sz; /* Size of all message buffers */
+ ErlHeapFragment* mbuf; /* Pointer to heap fragment list */
+ ErlHeapFragment* live_hf_end;
+ ErtsMessage *msg_frag; /* Pointer to message fragment list */
+ Uint mbuf_sz; /* Total size of heap fragments and message fragments */
ErtsPSD *psd; /* Rarely used process specific data */
Uint64 bin_vheap_sz; /* Virtual heap block size for binaries */
@@ -1041,6 +1044,7 @@ struct process {
#ifdef CHECK_FOR_HOLES
Eterm* last_htop; /* No need to scan the heap below this point. */
ErlHeapFragment* last_mbuf; /* No need to scan beyond this mbuf. */
+ ErlHeapFragment* heap_hfrag; /* Heap abandoned, htop now lives in this frag */
#endif
#ifdef DEBUG
@@ -1064,6 +1068,7 @@ extern const Process erts_invalid_process;
do { \
(p)->last_htop = 0; \
(p)->last_mbuf = 0; \
+ (p)->heap_hfrag = NULL; \
} while (0)
# define ERTS_HOLE_CHECK(p) erts_check_for_holes((p))
@@ -1141,14 +1146,15 @@ void erts_check_for_holes(Process* p);
#define ERTS_PSFLG_RUNNING_SYS ERTS_PSFLG_BIT(15)
#define ERTS_PSFLG_PROXY ERTS_PSFLG_BIT(16)
#define ERTS_PSFLG_DELAYED_SYS ERTS_PSFLG_BIT(17)
+#define ERTS_PSFLG_OFF_HEAP_MSGQ ERTS_PSFLG_BIT(18)
#ifdef ERTS_DIRTY_SCHEDULERS
-#define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(18)
-#define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(19)
-#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(20)
-#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(21)
-#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 22)
+#define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(19)
+#define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(20)
+#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(21)
+#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(22)
+#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 23)
#else
-#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 18)
+#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 19)
#endif
#define ERTS_PSFLGS_IN_PRQ_MASK (ERTS_PSFLG_IN_PRQ_MAX \
@@ -1196,12 +1202,15 @@ void erts_check_for_holes(Process* p);
#define SPO_USE_ARGS 2
#define SPO_MONITOR 4
#define SPO_SYSTEM_PROC 8
+#define SPO_OFF_HEAP_MSGQ 16
+
+extern int erts_default_spo_flags;
/*
* The following struct contains options for a process to be spawned.
*/
typedef struct {
- Uint flags;
+ int flags;
int error_code; /* Error code returned from create_process(). */
Eterm mref; /* Monitor ref returned (if SPO_MONITOR was given). */
@@ -1283,6 +1292,9 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags;
#define F_P2PNR_RESCHED (1 << 9) /* Process has been rescheduled via erts_pid2proc_not_running() */
#define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */
#define F_DISABLE_GC (1 << 11) /* Disable GC */
+#define F_OFF_HEAP_MSGQ (1 << 12) /* Off heap msg queue */
+#define F_OFF_HEAP_MSGQ_CHNG (1 << 13) /* Off heap msg queue changing */
+#define F_ABANDONED_HEAP_USE (1 << 14) /* Have usage of abandoned heap */
/* process trace_flags */
#define F_SENSITIVE (1 << 0)
@@ -1616,6 +1628,7 @@ void erts_schedule_thr_prgr_later_cleanup_op(void (*)(void *),
void *,
ErtsThrPrgrLaterOp *,
UWord);
+void erts_schedule_complete_off_heap_message_queue_change(Eterm pid);
#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
int erts_dbg_check_halloc_lock(Process *p);
@@ -1743,7 +1756,7 @@ Uint erts_debug_nbalance(void);
int erts_debug_wait_completed(Process *c_p, int flags);
-Uint erts_process_memory(Process *c_p);
+Uint erts_process_memory(Process *c_p, int incl_msg_inq);
#ifdef ERTS_SMP
# define ERTS_GET_SCHEDULER_DATA_FROM_PROC(PROC) ((PROC)->scheduler_data)
@@ -2058,6 +2071,22 @@ ERTS_GLB_INLINE void erts_smp_xrunq_unlock(ErtsRunQueue *rq, ErtsRunQueue *xrq);
ERTS_GLB_INLINE void erts_smp_runqs_lock(ErtsRunQueue *rq1, ErtsRunQueue *rq2);
ERTS_GLB_INLINE void erts_smp_runqs_unlock(ErtsRunQueue *rq1, ErtsRunQueue *rq2);
+ERTS_GLB_INLINE ErtsMessage *erts_alloc_message_heap_state(Process *pp,
+ erts_aint32_t *psp,
+ ErtsProcLocks *plp,
+ Uint sz,
+ Eterm **hpp,
+ ErlOffHeap **ohpp);
+ERTS_GLB_INLINE ErtsMessage *erts_alloc_message_heap(Process *pp,
+ ErtsProcLocks *plp,
+ Uint sz,
+ Eterm **hpp,
+ ErlOffHeap **ohpp);
+
+ERTS_GLB_INLINE void erts_shrink_message_heap(ErtsMessage **msgpp, Process *pp,
+ Eterm *start_hp, Eterm *used_hp, Eterm *end_hp,
+ Eterm *brefs, Uint brefs_size);
+
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE
@@ -2206,6 +2235,63 @@ erts_smp_runqs_unlock(ErtsRunQueue *rq1, ErtsRunQueue *rq2)
#endif
}
+ERTS_GLB_INLINE ErtsMessage *
+erts_alloc_message_heap_state(Process *pp,
+ erts_aint32_t *psp,
+ ErtsProcLocks *plp,
+ Uint sz,
+ Eterm **hpp,
+ ErlOffHeap **ohpp)
+{
+ int on_heap;
+
+ if ((*psp) & ERTS_PSFLG_OFF_HEAP_MSGQ) {
+ ErtsMessage *mp = erts_alloc_message(sz, hpp);
+ *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap;
+ return mp;
+ }
+
+ return erts_try_alloc_message_on_heap(pp, psp, plp, sz, hpp, ohpp, &on_heap);
+}
+
+ERTS_GLB_INLINE ErtsMessage *
+erts_alloc_message_heap(Process *pp,
+ ErtsProcLocks *plp,
+ Uint sz,
+ Eterm **hpp,
+ ErlOffHeap **ohpp)
+{
+ erts_aint32_t state = erts_smp_atomic32_read_nob(&pp->state);
+ return erts_alloc_message_heap_state(pp, &state, plp, sz, hpp, ohpp);
+}
+
+ERTS_GLB_INLINE void
+erts_shrink_message_heap(ErtsMessage **msgpp, Process *pp,
+ Eterm *start_hp, Eterm *used_hp, Eterm *end_hp,
+ Eterm *brefs, Uint brefs_size)
+{
+ ASSERT(start_hp <= used_hp && used_hp <= end_hp);
+ if ((*msgpp)->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ *msgpp = erts_shrink_message(*msgpp, used_hp - start_hp,
+ brefs, brefs_size);
+ else if (!(*msgpp)->data.attached) {
+ ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN
+ & erts_proc_lc_my_proc_locks(pp));
+ HRelease(pp, end_hp, used_hp);
+ }
+ else {
+ ErlHeapFragment *hfrag = (*msgpp)->data.heap_frag;
+ if (start_hp != used_hp)
+ hfrag = erts_resize_message_buffer(hfrag, used_hp - start_hp,
+ brefs, brefs_size);
+ else {
+ free_message_buffer(hfrag);
+ hfrag = NULL;
+ }
+ (*msgpp)->data.heap_frag = hfrag;
+ }
+}
+
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
ERTS_GLB_INLINE ErtsAtomCacheMap *erts_get_atom_cache_map(Process *c_p);
diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c
index 3b8ae11e94..71396561a3 100644
--- a/erts/emulator/beam/erl_process_dump.c
+++ b/erts/emulator/beam/erl_process_dump.c
@@ -78,13 +78,14 @@ erts_deep_process_dump(int to, void *to_arg)
dump_binaries(to, to_arg, all_binaries);
}
-Uint erts_process_memory(Process *p) {
- ErlMessage *mp;
+Uint erts_process_memory(Process *p, int incl_msg_inq) {
+ ErtsMessage *mp;
Uint size = 0;
struct saved_calls *scb;
size += sizeof(Process);
- ERTS_SMP_MSGQ_MV_INQ2PRIVQ(p);
+ if (incl_msg_inq)
+ ERTS_SMP_MSGQ_MV_INQ2PRIVQ(p);
erts_doforall_links(ERTS_P_LINKS(p), &erts_one_link_size, &size);
erts_doforall_monitors(ERTS_P_MONITORS(p), &erts_one_mon_size, &size);
@@ -92,7 +93,7 @@ Uint erts_process_memory(Process *p) {
if (p->old_hend && p->old_heap)
size += (p->old_hend - p->old_heap) * sizeof(Eterm);
- size += p->msg.len * sizeof(ErlMessage);
+ size += p->msg.len * sizeof(ErtsMessage);
for (mp = p->msg.first; mp; mp = mp->next)
if (mp->data.attached)
@@ -119,7 +120,7 @@ static void
dump_process_info(int to, void *to_arg, Process *p)
{
Eterm* sp;
- ErlMessage* mp;
+ ErtsMessage* mp;
int yreg = -1;
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(p);
@@ -657,6 +658,8 @@ erts_dump_extended_process_state(int to, void *to_arg, erts_aint32_t psflg) {
erts_print(to, to_arg, "PROXY"); break;
case ERTS_PSFLG_DELAYED_SYS:
erts_print(to, to_arg, "DELAYED_SYS"); break;
+ case ERTS_PSFLG_OFF_HEAP_MSGQ:
+ erts_print(to, to_arg, "OFF_HEAP_MSGQ"); break;
#ifdef ERTS_DIRTY_SCHEDULERS
case ERTS_PSFLG_DIRTY_CPU_PROC:
erts_print(to, to_arg, "DIRTY_CPU_PROC"); break;
diff --git a/erts/emulator/beam/erl_process_lock.h b/erts/emulator/beam/erl_process_lock.h
index 788348e613..a64c993e8f 100644
--- a/erts/emulator/beam/erl_process_lock.h
+++ b/erts/emulator/beam/erl_process_lock.h
@@ -854,9 +854,6 @@ ERTS_GLB_INLINE void erts_proc_dec_refc(Process *p)
#endif
if (!referred) {
ASSERT(ERTS_PROC_IS_EXITING(p));
- ASSERT(ERTS_AINT_NULL
- == erts_ptab_pix2intptr_ddrb(&erts_proc,
- internal_pid_index(p->common.id)));
erts_free_proc(p);
}
}
@@ -872,9 +869,6 @@ ERTS_GLB_INLINE void erts_proc_add_refc(Process *p, Sint add_refc)
#endif
if (!referred) {
ASSERT(ERTS_PROC_IS_EXITING(p));
- ASSERT(ERTS_AINT_NULL
- == erts_ptab_pix2intptr_ddrb(&erts_proc,
- internal_pid_index(p->common.id)));
erts_free_proc(p);
}
}
diff --git a/erts/emulator/beam/erl_time_sup.c b/erts/emulator/beam/erl_time_sup.c
index 7327e0b48c..7ec64506e8 100644
--- a/erts/emulator/beam/erl_time_sup.c
+++ b/erts/emulator/beam/erl_time_sup.c
@@ -1919,15 +1919,16 @@ send_time_offset_changed_notifications(void *new_offsetp)
ErtsProcLocks rp_locks = ERTS_PROC_LOCK_LINK;
erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK);
if (erts_lookup_monitor(ERTS_P_MONITORS(rp), ref)) {
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
Eterm message;
- hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks,
+ hsz, &hp, &ohp);
*patch_refp = ref;
ASSERT(hsz == size_object(message_template));
message = copy_struct(message_template, hsz, &hp, ohp);
- erts_queue_message(rp, &rp_locks, bp, message, NIL);
+ erts_queue_message(rp, &rp_locks, mp, message, NIL);
}
erts_smp_proc_unlock(rp, rp_locks);
}
diff --git a/erts/emulator/beam/erl_trace.c b/erts/emulator/beam/erl_trace.c
index e9dd96efc4..d02f1f7213 100644
--- a/erts/emulator/beam/erl_trace.c
+++ b/erts/emulator/beam/erl_trace.c
@@ -114,15 +114,10 @@ void erts_init_trace(void) {
static Eterm system_seq_tracer;
-#ifdef ERTS_SMP
#define ERTS_ALLOC_SYSMSG_HEAP(SZ, BPP, OHPP, UNUSED) \
(*(BPP) = new_message_buffer((SZ)), \
*(OHPP) = &(*(BPP))->off_heap, \
(*(BPP))->mem)
-#else
-#define ERTS_ALLOC_SYSMSG_HEAP(SZ, BPP, OHPP, RPP) \
- erts_alloc_message_heap((SZ), (BPP), (OHPP), (RPP), 0)
-#endif
#ifdef ERTS_SMP
#define ERTS_ENQ_TRACE_MSG(FPID, TPID, MSG, BP) \
@@ -131,8 +126,12 @@ do { \
enqueue_sys_msg_unlocked(SYS_MSG_TYPE_TRACE, (FPID), (TPID), (MSG), (BP)); \
} while(0)
#else
-#define ERTS_ENQ_TRACE_MSG(FPID, TPROC, MSG, BP) \
- erts_queue_message((TPROC), NULL, (BP), (MSG), NIL)
+#define ERTS_ENQ_TRACE_MSG(FPID, TPROC, MSG, BP) \
+ do { \
+ ErtsMessage *mp__ = erts_alloc_message(0, NULL); \
+ mp__->data.heap_frag = (BP); \
+ erts_queue_message((TPROC), NULL, mp__, (MSG), NIL); \
+ } while (0)
#endif
/*
@@ -591,11 +590,9 @@ send_to_port(Process *c_p, Eterm message,
static void
profile_send(Eterm from, Eterm message) {
Uint sz = 0;
- ErlHeapFragment *bp = NULL;
Uint *hp = NULL;
Eterm msg = NIL;
Process *profile_p = NULL;
- ErlOffHeap *off_heap = NULL;
Eterm profiler = erts_get_system_profile();
@@ -621,6 +618,7 @@ profile_send(Eterm from, Eterm message) {
}
} else {
+ ErtsMessage *mp;
ASSERT(is_internal_pid(profiler));
profile_p = erts_proc_lookup(profiler);
@@ -629,10 +627,13 @@ profile_send(Eterm from, Eterm message) {
return;
sz = size_object(message);
- hp = erts_alloc_message_heap(sz, &bp, &off_heap, profile_p, 0);
- msg = copy_struct(message, sz, &hp, &bp->off_heap);
-
- erts_queue_message(profile_p, NULL, bp, msg, NIL);
+ mp = erts_alloc_message(sz, &hp);
+ if (sz == 0)
+ msg = message;
+ else
+ msg = copy_struct(message, sz, &hp, &mp->hfrag.off_heap);
+
+ erts_queue_message(profile_p, NULL, mp, msg, NIL);
}
}
@@ -1233,7 +1234,11 @@ seq_trace_output_generic(Eterm token, Eterm msg, Uint type,
erts_smp_mtx_unlock(&smq_mtx);
#else
/* trace_token must be NIL here */
- erts_queue_message(tracer, NULL, bp, mess, NIL);
+ {
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = bp;
+ erts_queue_message(tracer, NULL, mp, mess, NIL);
+ }
#endif
}
}
@@ -2308,7 +2313,11 @@ monitor_long_schedule_proc(Process *p, BeamInstr *in_fp, BeamInstr *out_fp, Uint
#ifdef ERTS_SMP
enqueue_sys_msg(SYS_MSG_TYPE_SYSMON, p->common.id, NIL, msg, bp);
#else
- erts_queue_message(monitor_p, NULL, bp, msg, NIL);
+ {
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = bp;
+ erts_queue_message(monitor_p, NULL, mp, msg, NIL);
+ }
#endif
}
void
@@ -2369,7 +2378,11 @@ monitor_long_schedule_port(Port *pp, ErtsPortTaskType type, Uint time)
#ifdef ERTS_SMP
enqueue_sys_msg(SYS_MSG_TYPE_SYSMON, pp->common.id, NIL, msg, bp);
#else
- erts_queue_message(monitor_p, NULL, bp, msg, NIL);
+ {
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = bp;
+ erts_queue_message(monitor_p, NULL, mp, msg, NIL);
+ }
#endif
}
@@ -2440,7 +2453,11 @@ monitor_long_gc(Process *p, Uint time) {
#ifdef ERTS_SMP
enqueue_sys_msg(SYS_MSG_TYPE_SYSMON, p->common.id, NIL, msg, bp);
#else
- erts_queue_message(monitor_p, NULL, bp, msg, NIL);
+ {
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = bp;
+ erts_queue_message(monitor_p, NULL, mp, msg, NIL);
+ }
#endif
}
@@ -2511,7 +2528,11 @@ monitor_large_heap(Process *p) {
#ifdef ERTS_SMP
enqueue_sys_msg(SYS_MSG_TYPE_SYSMON, p->common.id, NIL, msg, bp);
#else
- erts_queue_message(monitor_p, NULL, bp, msg, NIL);
+ {
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = bp;
+ erts_queue_message(monitor_p, NULL, mp, msg, NIL);
+ }
#endif
}
@@ -2539,7 +2560,11 @@ monitor_generic(Process *p, Eterm type, Eterm spec) {
#ifdef ERTS_SMP
enqueue_sys_msg(SYS_MSG_TYPE_SYSMON, p->common.id, NIL, msg, bp);
#else
- erts_queue_message(monitor_p, NULL, bp, msg, NIL);
+ {
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = bp;
+ erts_queue_message(monitor_p, NULL, mp, msg, NIL);
+ }
#endif
}
@@ -3331,8 +3356,11 @@ sys_msg_dispatcher_func(void *unused)
goto failure;
}
else {
+ ErtsMessage *mp;
queue_proc_msg:
- erts_queue_message(proc,&proc_locks,smqp->bp,smqp->msg,NIL);
+ mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = smqp->bp;
+ erts_queue_message(proc,&proc_locks,mp,smqp->msg,NIL);
#ifdef DEBUG_PRINTOUTS
erts_fprintf(stderr, "delivered\n");
#endif
diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h
index 052994b972..594c0ccf94 100644
--- a/erts/emulator/beam/global.h
+++ b/erts/emulator/beam/global.h
@@ -1274,11 +1274,11 @@ int erts_print_system_version(int to, void *arg, Process *c_p);
int erts_hibernate(Process* c_p, Eterm module, Eterm function, Eterm args, Eterm* reg);
-ERTS_GLB_INLINE int erts_is_literal(Eterm tptr, Eterm *ptr);
+ERTS_GLB_FORCE_INLINE int erts_is_literal(Eterm tptr, Eterm *ptr);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
-ERTS_GLB_INLINE int erts_is_literal(Eterm tptr, Eterm *ptr)
+ERTS_GLB_FORCE_INLINE int erts_is_literal(Eterm tptr, Eterm *ptr)
{
ASSERT(is_boxed(tptr) || is_list(tptr));
ASSERT(ptr == ptr_val(tptr));
@@ -1347,124 +1347,6 @@ extern erts_driver_t fd_driver;
int erts_beam_jump_table(void);
-/* Should maybe be placed in erl_message.h, but then we get an include mess. */
-ERTS_GLB_INLINE Eterm *
-erts_alloc_message_heap_state(Uint size,
- ErlHeapFragment **bpp,
- ErlOffHeap **ohpp,
- Process *receiver,
- ErtsProcLocks *receiver_locks,
- erts_aint32_t *statep);
-
-ERTS_GLB_INLINE Eterm *
-erts_alloc_message_heap(Uint size,
- ErlHeapFragment **bpp,
- ErlOffHeap **ohpp,
- Process *receiver,
- ErtsProcLocks *receiver_locks);
-
-#if ERTS_GLB_INLINE_INCL_FUNC_DEF
-
-/*
- * NOTE: erts_alloc_message_heap() releases msg q and status
- * lock on receiver without ensuring that other locks are
- * held. User is responsible to ensure that the receiver
- * pointer cannot become invalid until after message has
- * been passed. This is normal done either by increasing
- * reference count on process (preferred) or by holding
- * main or link lock over the whole message passing
- * operation.
- */
-
-ERTS_GLB_INLINE Eterm *
-erts_alloc_message_heap_state(Uint size,
- ErlHeapFragment **bpp,
- ErlOffHeap **ohpp,
- Process *receiver,
- ErtsProcLocks *receiver_locks,
- erts_aint32_t *statep)
-{
- Eterm *hp;
- erts_aint32_t state;
-#ifdef ERTS_SMP
- int locked_main = 0;
- state = erts_smp_atomic32_read_acqb(&receiver->state);
- if (statep)
- *statep = state;
- if (state & (ERTS_PSFLG_EXITING
- | ERTS_PSFLG_PENDING_EXIT))
- goto allocate_in_mbuf;
-#endif
-
- if (size > (Uint) INT_MAX)
- erl_exit(ERTS_ABORT_EXIT, "HUGE size (%beu)\n", size);
-
- if (
-#if defined(ERTS_SMP)
- *receiver_locks & ERTS_PROC_LOCK_MAIN
-#else
- 1
-#endif
- ) {
-#ifdef ERTS_SMP
- try_allocate_on_heap:
-#endif
- state = erts_smp_atomic32_read_nob(&receiver->state);
- if (statep)
- *statep = state;
- if ((state & (ERTS_PSFLG_EXITING
- | ERTS_PSFLG_PENDING_EXIT))
- || (receiver->flags & F_DISABLE_GC)
- || HEAP_LIMIT(receiver) - HEAP_TOP(receiver) <= size) {
- /*
- * The heap is either potentially in an inconsistent
- * state, or not large enough.
- */
-#ifdef ERTS_SMP
- if (locked_main) {
- *receiver_locks &= ~ERTS_PROC_LOCK_MAIN;
- erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MAIN);
- }
-#endif
- goto allocate_in_mbuf;
- }
- hp = HEAP_TOP(receiver);
- HEAP_TOP(receiver) = hp + size;
- *bpp = NULL;
- *ohpp = &MSO(receiver);
- }
-#ifdef ERTS_SMP
- else if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MAIN) == 0) {
- locked_main = 1;
- *receiver_locks |= ERTS_PROC_LOCK_MAIN;
- goto try_allocate_on_heap;
- }
-#endif
- else {
- ErlHeapFragment *bp;
- allocate_in_mbuf:
- bp = new_message_buffer(size);
- hp = bp->mem;
- *bpp = bp;
- *ohpp = &bp->off_heap;
- }
-
- return hp;
-}
-
-ERTS_GLB_INLINE Eterm *
-erts_alloc_message_heap(Uint size,
- ErlHeapFragment **bpp,
- ErlOffHeap **ohpp,
- Process *receiver,
- ErtsProcLocks *receiver_locks)
-{
- return erts_alloc_message_heap_state(size, bpp, ohpp, receiver,
- receiver_locks, NULL);
-}
-
-#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
-
#define DeclareTmpHeap(VariableName,Size,Process) \
Eterm VariableName[Size]
#define DeclareTypedTmpHeap(Type,VariableName,Process) \
@@ -1522,6 +1404,7 @@ dtrace_fun_decode(Process *process,
erts_snprintf(mfa_buf, DTRACE_TERM_BUF_SIZE, "%T:%T/%d",
module, function, arity);
}
+
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
#endif /* !__GLOBAL_H__ */
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index fdd26fcc4b..1b0c617632 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -1410,7 +1410,7 @@ queue_port_sched_op_reply(Process *rp,
erts_factory_trim_and_close(factory, &msg, 1);
- erts_queue_message(rp, rp_locksp, factory->heap_frags, msg, NIL);
+ erts_queue_message(rp, rp_locksp, factory->message, msg, NIL);
}
static void
@@ -1418,12 +1418,9 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
{
Process *rp = erts_proc_lookup_raw(to);
if (rp) {
- ErlOffHeap *ohp;
- ErlHeapFragment* bp;
ErtsHeapFactory factory;
Eterm msg_copy;
Uint hsz, msg_sz;
- Eterm *hp;
ErtsProcLocks rp_locks = 0;
hsz = ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
@@ -1434,18 +1431,13 @@ port_sched_op_reply(Eterm to, Uint32 *ref_num, Eterm msg)
hsz += msg_sz;
}
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
- erts_factory_message_init(&factory, rp, hp, bp);
- if (is_immed(msg))
- msg_copy = msg;
- else {
- msg_copy = copy_struct(msg, msg_sz, &hp, ohp);
- factory.hp = hp;
- }
+ (void) erts_factory_message_create(&factory, rp,
+ &rp_locks, hsz);
+ msg_copy = (is_immed(msg)
+ ? msg
+ : copy_struct(msg, msg_sz,
+ &factory.hp,
+ factory.off_heap));
queue_port_sched_op_reply(rp,
&rp_locks,
@@ -3050,16 +3042,17 @@ deliver_result(Eterm sender, Eterm pid, Eterm res)
if (rp) {
Eterm tuple;
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
Eterm* hp;
Uint sz_res;
sz_res = size_object(res);
- hp = erts_alloc_message_heap(sz_res + 3, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks,
+ sz_res + 3, &hp, &ohp);
res = copy_struct(res, sz_res, &hp, ohp);
tuple = TUPLE2(hp, sender, res);
- erts_queue_message(rp, &rp_locks, bp, tuple, NIL);
+ erts_queue_message(rp, &rp_locks, mp, tuple, NIL);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
@@ -3087,7 +3080,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
Eterm tuple;
Process* rp;
Eterm* hp;
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
@@ -3113,7 +3106,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
if (!rp)
return;
- hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, need, &hp, &ohp);
listp = NIL;
if ((state & ERTS_PORT_SFLG_BINARY_IO) == 0) {
@@ -3155,7 +3148,7 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -3229,7 +3222,7 @@ deliver_vec_message(Port* prt, /* Port */
Eterm tuple;
Process* rp;
Eterm* hp;
- ErlHeapFragment *bp;
+ ErtsMessage *mp;
ErlOffHeap *ohp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
@@ -3261,7 +3254,7 @@ deliver_vec_message(Port* prt, /* Port */
need += (hlen+csize)*2;
}
- hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, need, &hp, &ohp);
listp = NIL;
iov += vsize;
@@ -3322,7 +3315,7 @@ deliver_vec_message(Port* prt, /* Port */
tuple = TUPLE2(hp, prt->common.id, tuple);
hp += 3;
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
erts_proc_dec_refc(rp);
@@ -3813,7 +3806,6 @@ write_port_control_result(int control_flags,
ErlDrvSizeT resp_size,
char *pre_alloc_buf,
Eterm **hpp,
- ErlHeapFragment *bp,
ErlOffHeap *ohp)
{
Eterm res;
@@ -3887,9 +3879,6 @@ port_sig_control(Port *prt,
if (res == ERTS_PORT_OP_DONE) {
Eterm msg;
- Eterm *hp;
- ErlHeapFragment *bp;
- ErlOffHeap *ohp;
ErtsHeapFactory factory;
Process *rp;
ErtsProcLocks rp_locks = 0;
@@ -3909,22 +3898,15 @@ port_sig_control(Port *prt,
hsz = rsz + ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
- erts_factory_message_init(&factory, rp, hp, bp);
+ (void) erts_factory_message_create(&factory, rp,
+ &rp_locks, hsz);
msg = write_port_control_result(control_flags,
resp_bufp,
resp_size,
&resp_buf[0],
- &hp,
- bp,
- ohp);
- factory.hp = hp;
-
+ &factory.hp,
+ factory.off_heap);
queue_port_sched_op_reply(rp,
&rp_locks,
&factory,
@@ -4065,7 +4047,6 @@ erts_port_control(Process* c_p,
resp_size,
&resp_buf[0],
&hp,
- NULL,
&c_p->off_heap);
BUMP_REDS(c_p, ERTS_PORT_REDS_CONTROL);
return ERTS_PORT_OP_DONE;
@@ -4224,21 +4205,14 @@ port_sig_call(Port *prt,
hsz = erts_decode_ext_size((byte *) resp_bufp, resp_size);
if (hsz >= 0) {
- ErlHeapFragment* bp;
- ErlOffHeap* ohp;
ErtsHeapFactory factory;
byte *endp;
hsz += 3; /* ok tuple */
hsz += ERTS_QUEUE_PORT_SCHED_OP_REPLY_SIZE;
- hp = erts_alloc_message_heap(hsz,
- &bp,
- &ohp,
- rp,
- &rp_locks);
+ (void) erts_factory_message_create(&factory, rp, &rp_locks, hsz);
endp = (byte *) resp_bufp;
- erts_factory_message_init(&factory, rp, hp, bp);
msg = erts_decode_ext(&factory, &endp);
if (is_value(msg)) {
hp = erts_produce_heap(&factory,
@@ -4499,7 +4473,9 @@ port_sig_info(Port *prt,
sigdp->u.info.item);
if (is_value(value)) {
ErtsHeapFactory factory;
- erts_factory_message_init(&factory, NULL, hp, bp);
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = bp;
+ erts_factory_selfcontained_message_init(&factory, mp, hp);
queue_port_sched_op_reply(rp,
&rp_locks,
&factory,
@@ -4587,8 +4563,8 @@ reply_io_bytes(void *vreq)
rp = erts_proc_lookup(req->pid);
if (rp) {
- ErlOffHeap *ohp = NULL;
- ErlHeapFragment *bp = NULL;
+ ErlOffHeap *ohp;
+ ErtsMessage *mp;
ErtsProcLocks rp_locks;
Eterm ref, msg, ein, eout, *hp;
Uint64 in, out;
@@ -4610,7 +4586,7 @@ reply_io_bytes(void *vreq)
erts_bld_uint64(NULL, &hsz, in);
erts_bld_uint64(NULL, &hsz, out);
- hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, hsz, &hp, &ohp);
ref = make_internal_ref(hp);
write_ref_thing(hp, req->refn[0], req->refn[1], req->refn[2]);
@@ -4620,7 +4596,7 @@ reply_io_bytes(void *vreq)
eout = erts_bld_uint64(&hp, NULL, out);
msg = TUPLE4(hp, ref, make_small(sched_id), ein, eout);
- erts_queue_message(rp, &rp_locks, bp, msg, NIL);
+ erts_queue_message(rp, &rp_locks, mp, msg, NIL);
if (req->sched_id == sched_id)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -5065,11 +5041,11 @@ ErlDrvTermData driver_mk_term_nil(void)
void driver_report_exit(ErlDrvPort ix, int status)
{
Eterm* hp;
+ ErlOffHeap *ohp;
Eterm tuple;
Process *rp;
Eterm pid;
- ErlHeapFragment *bp = NULL;
- ErlOffHeap *ohp;
+ ErtsMessage *mp;
ErtsProcLocks rp_locks = 0;
int scheduler = erts_get_scheduler_id() != 0;
Port* prt = erts_drvport2port(ix);
@@ -5089,13 +5065,13 @@ void driver_report_exit(ErlDrvPort ix, int status)
if (!rp)
return;
- hp = erts_alloc_message_heap(3+3, &bp, &ohp, rp, &rp_locks);
+ mp = erts_alloc_message_heap(rp, &rp_locks, 3+3, &hp, &ohp);
tuple = TUPLE2(hp, am_exit_status, make_small(status));
hp += 3;
tuple = TUPLE2(hp, prt->common.id, tuple);
- erts_queue_message(rp, &rp_locks, bp, tuple, am_undefined);
+ erts_queue_message(rp, &rp_locks, mp, tuple, am_undefined);
erts_smp_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -5205,7 +5181,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
ErtsProcLocks rp_locks = 0;
struct b2t_states__ b2t;
int scheduler;
- int is_heap_need_limited = 1;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_UNDEF(mess,NIL);
@@ -5374,9 +5349,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
need += hsz;
ptr += 2;
depth++;
- if (size > MAP_SMALL_MAP_LIMIT*3) { /* may contain big map */
- is_heap_need_limited = 0;
- }
break;
}
case ERL_DRV_MAP: { /* int */
@@ -5384,7 +5356,6 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
if ((int) ptr[0] < 0) ERTS_DDT_FAIL;
if (ptr[0] > MAP_SMALL_MAP_LIMIT) {
need += HASHMAP_ESTIMATED_HEAP_SIZE(ptr[0]);
- is_heap_need_limited = 0;
} else {
need += MAP_HEADER_FLATMAP_SZ + 1 + 2*ptr[0];
}
@@ -5423,17 +5394,7 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
goto done;
}
- /* Try copy directly to destination heap if we know there are no big maps */
- if (is_heap_need_limited) {
- ErlOffHeap *ohp;
- ErlHeapFragment* bp;
- Eterm* hp = erts_alloc_message_heap(need, &bp, &ohp, rp, &rp_locks);
- erts_factory_message_init(&factory, rp, hp, bp);
- }
- else {
- erts_factory_message_init(&factory, NULL, NULL,
- new_message_buffer(need));
- }
+ (void) erts_factory_message_create(&factory, rp, &rp_locks, need);
/*
* Interpret the instructions and build the term.
@@ -5702,9 +5663,9 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
if (res > 0) {
mess = ESTACK_POP(stack); /* get resulting value */
- erts_factory_close(&factory);
+ erts_factory_trim_and_close(&factory, &mess, 1);
/* send message */
- erts_queue_message(rp, &rp_locks, factory.heap_frags, mess, am_undefined);
+ erts_queue_message(rp, &rp_locks, factory.message, mess, am_undefined);
}
else {
if (b2t.ix > b2t.used)
diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h
index 5db7ee6d7c..90e16ca14f 100644
--- a/erts/emulator/beam/sys.h
+++ b/erts/emulator/beam/sys.h
@@ -70,8 +70,10 @@
#endif
#if ERTS_CAN_INLINE
+#define ERTS_GLB_FORCE_INLINE static ERTS_FORCE_INLINE
#define ERTS_GLB_INLINE static ERTS_INLINE
#else
+#define ERTS_GLB_FORCE_INLINE
#define ERTS_GLB_INLINE
#endif
diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c
index a741e2e2e6..e03113b8cc 100644
--- a/erts/emulator/beam/utils.c
+++ b/erts/emulator/beam/utils.c
@@ -110,7 +110,6 @@ Eterm*
erts_heap_alloc(Process* p, Uint need, Uint xtra)
{
ErlHeapFragment* bp;
- Eterm* htop;
Uint n;
#if defined(DEBUG) || defined(CHECK_FOR_HOLES)
Uint i;
@@ -156,16 +155,6 @@ erts_heap_alloc(Process* p, Uint need, Uint xtra)
n--;
#endif
- /*
- * When we have created a heap fragment, we are no longer allowed
- * to store anything more on the heap.
- */
- htop = HEAP_TOP(p);
- if (htop < HEAP_LIMIT(p)) {
- *htop = make_pos_bignum_header(HEAP_LIMIT(p)-htop-1);
- HEAP_TOP(p) = HEAP_LIMIT(p);
- }
-
bp->next = MBUF(p);
MBUF(p) = bp;
bp->alloc_size = n;
@@ -2285,7 +2274,11 @@ static void do_send_logger_message(Eterm *hp, ErlOffHeap *ohp, ErlHeapFragment *
erts_queue_error_logger_message(from, message, bp);
}
#else
- erts_queue_message(p, NULL /* only used for smp build */, bp, message, NIL);
+ {
+ ErtsMessage *mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = bp;
+ erts_queue_message(p, NULL /* only used for smp build */, mp, message, NIL);
+ }
#endif
}
diff --git a/erts/emulator/hipe/hipe_gc.c b/erts/emulator/hipe/hipe_gc.c
index 2c747771ac..2e19bf88bf 100644
--- a/erts/emulator/hipe/hipe_gc.c
+++ b/erts/emulator/hipe/hipe_gc.c
@@ -46,10 +46,6 @@ Eterm *fullsweep_nstack(Process *p, Eterm *n_htop)
/* arch-specific nstack walk state */
struct nstack_walk_state walk_state;
- /* fullsweep-specific state */
- char *src, *oh;
- Uint src_size, oh_size;
-
if (!p->hipe.nstack) {
ASSERT(!p->hipe.nsp && !p->hipe.nstend);
return n_htop;
@@ -66,11 +62,6 @@ Eterm *fullsweep_nstack(Process *p, Eterm *n_htop)
sdesc = nstack_walk_init_sdesc(p, &walk_state);
- src = (char*)HEAP_START(p);
- src_size = (char*)HEAP_TOP(p) - src;
- oh = (char*)OLD_HEAP(p);
- oh_size = (char*)OLD_HTOP(p) - oh;
-
for (;;) {
if (nstack_walk_nsp_reached_end(nsp, nsp_end)) {
if (nsp == nsp_end) {
@@ -97,8 +88,7 @@ Eterm *fullsweep_nstack(Process *p, Eterm *n_htop)
if (IS_MOVED_BOXED(val)) {
ASSERT(is_boxed(val));
*nsp_i = val;
- } else if (in_area(ptr, src, src_size) ||
- in_area(ptr, oh, oh_size)) {
+ } else if (!erts_is_literal(gval, ptr)) {
MOVE_BOXED(ptr, val, n_htop, nsp_i);
}
} else if (is_list(gval)) {
@@ -106,8 +96,7 @@ Eterm *fullsweep_nstack(Process *p, Eterm *n_htop)
Eterm val = *ptr;
if (IS_MOVED_CONS(val)) {
*nsp_i = ptr[1];
- } else if (in_area(ptr, src, src_size) ||
- in_area(ptr, oh, oh_size)) {
+ } else if (!erts_is_literal(gval, ptr)) {
ASSERT(within(ptr, p));
MOVE_CONS(ptr, val, n_htop, nsp_i);
}
@@ -139,11 +128,13 @@ void gensweep_nstack(Process *p, Eterm **ptr_old_htop, Eterm **ptr_n_htop)
unsigned int mask;
/* arch-specific nstack walk state */
struct nstack_walk_state walk_state;
+ char *oh;
+ Uint oh_size;
/* gensweep-specific state */
Eterm *old_htop, *n_htop;
- char *heap;
- Uint heap_size, mature_size;
+ char *mature;
+ Uint mature_size;
if (!p->hipe.nstack) {
ASSERT(!p->hipe.nsp && !p->hipe.nstend);
@@ -168,9 +159,10 @@ void gensweep_nstack(Process *p, Eterm **ptr_old_htop, Eterm **ptr_n_htop)
old_htop = *ptr_old_htop;
n_htop = *ptr_n_htop;
- heap = (char*)HEAP_START(p);
- heap_size = (char*)HEAP_TOP(p) - heap;
- mature_size = (char*)HIGH_WATER(p) - heap;
+ mature = (char *) (p->abandoned_heap ? p->abandoned_heap : p->heap);
+ mature_size = (char*)HIGH_WATER(p) - mature;
+ oh = (char*)OLD_HEAP(p);
+ oh_size = (char*)OLD_HTOP(p) - oh;
for (;;) {
if (nstack_walk_nsp_reached_end(nsp, nsp_end)) {
@@ -209,9 +201,9 @@ void gensweep_nstack(Process *p, Eterm **ptr_old_htop, Eterm **ptr_n_htop)
if (IS_MOVED_BOXED(val)) {
ASSERT(is_boxed(val));
*nsp_i = val;
- } else if (in_area(ptr, heap, mature_size)) {
+ } else if (ErtsInArea(ptr, mature, mature_size)) {
MOVE_BOXED(ptr, val, old_htop, nsp_i);
- } else if (in_area(ptr, heap, heap_size)) {
+ } else if (ErtsInYoungGen(gval, ptr, oh, oh_size)) {
ASSERT(within(ptr, p));
MOVE_BOXED(ptr, val, n_htop, nsp_i);
}
@@ -220,9 +212,9 @@ void gensweep_nstack(Process *p, Eterm **ptr_old_htop, Eterm **ptr_n_htop)
Eterm val = *ptr;
if (IS_MOVED_CONS(val)) {
*nsp_i = ptr[1];
- } else if (in_area(ptr, heap, mature_size)) {
+ } else if (ErtsInArea(ptr, mature, mature_size)) {
MOVE_CONS(ptr, val, old_htop, nsp_i);
- } else if (in_area(ptr, heap, heap_size)) {
+ } else if (ErtsInYoungGen(gval, ptr, oh, oh_size)) {
ASSERT(within(ptr, p));
MOVE_CONS(ptr, val, n_htop, nsp_i);
}
diff --git a/erts/emulator/hipe/hipe_native_bif.c b/erts/emulator/hipe/hipe_native_bif.c
index 98bda43f0e..ceae3497c5 100644
--- a/erts/emulator/hipe/hipe_native_bif.c
+++ b/erts/emulator/hipe/hipe_native_bif.c
@@ -160,13 +160,22 @@ BIF_RETTYPE hipe_set_timeout(BIF_ALIST_1)
*/
void hipe_select_msg(Process *p)
{
- ErlMessage *msgp;
+ ErtsMessage *msgp;
msgp = PEEK_MESSAGE(p);
UNLINK_MESSAGE(p, msgp); /* decrements global 'erts_proc_tot_mem' variable */
JOIN_MESSAGE(p);
CANCEL_TIMER(p); /* calls erts_cancel_proc_timer() */
- free_message(msgp);
+ erts_save_message_in_proc(p, msgp);
+ p->flags &= ~F_DISABLE_GC;
+ if (ERTS_IS_GC_DESIRED(p)) {
+ /*
+ * We want to GC soon but we leave a few
+ * reductions giving the message some time
+ * to turn into garbage.
+ */
+ ERTS_VBUMP_LEAVE_REDS(p, 5);
+ }
}
void hipe_fclearerror_error(Process *p)
@@ -511,8 +520,9 @@ int hipe_bs_validate_unicode_retract(ErlBinMatchBuffer* mb, Eterm arg)
*/
Eterm hipe_check_get_msg(Process *c_p)
{
- Eterm ret;
- ErlMessage *msgp;
+ ErtsMessage *msgp;
+
+ c_p->flags |= F_DISABLE_GC;
next_message:
@@ -534,25 +544,29 @@ Eterm hipe_check_get_msg(Process *c_p)
/* XXX: BEAM doesn't need this */
c_p->hipe_smp.have_receive_locks = 1;
#endif
+ c_p->flags &= ~F_DISABLE_GC;
return THE_NON_VALUE;
#ifdef ERTS_SMP
}
#endif
}
- ErtsMoveMsgAttachmentIntoProc(msgp, c_p, c_p->stop, HEAP_TOP(c_p),
- c_p->fcalls, (void) 0, (void) 0);
- ret = ERL_MESSAGE_TERM(msgp);
- if (is_non_value(ret)) {
+
+ if (is_non_value(ERL_MESSAGE_TERM(msgp))
+ && !erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) {
/*
* A corrupt distribution message that we weren't able to decode;
* remove it...
*/
ASSERT(!msgp->data.attached);
UNLINK_MESSAGE(c_p, msgp);
- free_message(msgp);
+ msgp->next = NULL;
+ erts_cleanup_messages(msgp);
goto next_message;
}
- return ret;
+
+ ASSERT(is_value(ERL_MESSAGE_TERM(msgp)));
+
+ return ERL_MESSAGE_TERM(msgp);
}
/*
diff --git a/erts/etc/common/erlexec.c b/erts/etc/common/erlexec.c
index cde0b25a2a..461957be10 100644
--- a/erts/etc/common/erlexec.c
+++ b/erts/etc/common/erlexec.c
@@ -155,6 +155,12 @@ static char *plusr_val_switches[] = {
NULL
};
+/* +x arguments with values */
+static char *plusx_val_switches[] = {
+ "ohmq",
+ NULL
+};
+
/* +z arguments with values */
static char *plusz_val_switches[] = {
"dbbl",
@@ -975,6 +981,20 @@ int main(int argc, char **argv)
add_Eargs(argv[i+1]);
i++;
break;
+ case 'x':
+ if (!is_one_of_strings(&argv[i][2], plusx_val_switches)) {
+ goto the_default;
+ } else {
+ if (i+1 >= argc
+ || argv[i+1][0] == '-'
+ || argv[i+1][0] == '+')
+ usage(argv[i]);
+ argv[i][0] = '-';
+ add_Eargs(argv[i]);
+ add_Eargs(argv[i+1]);
+ i++;
+ }
+ break;
case 'z':
if (!is_one_of_strings(&argv[i][2], plusz_val_switches)) {
goto the_default;
@@ -1175,7 +1195,7 @@ usage_aux(void)
"[+S NO_SCHEDULERS:NO_SCHEDULERS_ONLINE] "
"[+SP PERCENTAGE_SCHEDULERS:PERCENTAGE_SCHEDULERS_ONLINE] "
"[+T LEVEL] [+V] [+v] "
- "[+W<i|w|e>] [+z MISC_OPTION] [args ...]\n");
+ "[+W<i|w|e>] [+x DEFAULT_PROC_FLAGS] [+z MISC_OPTION] [args ...]\n");
exit(1);
}
diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam
index 863a5e61ef..641fac2d26 100644
--- a/erts/preloaded/ebin/erlang.beam
+++ b/erts/preloaded/ebin/erlang.beam
Binary files differ
diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl
index 291356c7b1..e46d64eb0a 100644
--- a/erts/preloaded/src/erlang.erl
+++ b/erts/preloaded/src/erlang.erl
@@ -2044,6 +2044,9 @@ open_port(_PortName,_PortSettings) ->
(min_bin_vheap_size, MinBinVHeapSize) -> OldMinBinVHeapSize when
MinBinVHeapSize :: non_neg_integer(),
OldMinBinVHeapSize :: non_neg_integer();
+ (off_heap_message_queue, OHMQ) -> OldOHMQ when
+ OHMQ :: boolean(),
+ OldOHMQ :: boolean();
(priority, Level) -> OldLevel when
Level :: priority_level(),
OldLevel :: priority_level();
@@ -2082,6 +2085,7 @@ process_flag(_Flag, _Value) ->
min_bin_vheap_size |
monitored_by |
monitors |
+ off_heap_message_queue |
priority |
reductions |
registered_name |
@@ -2123,6 +2127,7 @@ process_flag(_Flag, _Value) ->
{monitors,
Monitors :: [{process, Pid :: pid() |
{RegName :: atom(), Node :: node()}}]} |
+ {off_heap_message_queue, OHMQ :: boolean()} |
{priority, Level :: priority_level()} |
{reductions, Number :: non_neg_integer()} |
{registered_name, Atom :: atom()} |
@@ -2425,6 +2430,7 @@ tuple_to_list(_Tuple) ->
(multi_scheduling) -> disabled | blocked | enabled;
(multi_scheduling_blockers) -> [PID :: pid()];
(nif_version) -> string();
+ (off_heap_message_queue) -> boolean();
(otp_release) -> string();
(os_monotonic_time_source) -> [{atom(),term()}];
(os_system_time_source) -> [{atom(),term()}];
@@ -2552,14 +2558,19 @@ spawn_monitor(M, F, A) when erlang:is_atom(M),
spawn_monitor(M, F, A) ->
erlang:error(badarg, [M,F,A]).
+
+-type spawn_opt_option() ::
+ link
+ | monitor
+ | {priority, Level :: priority_level()}
+ | {fullsweep_after, Number :: non_neg_integer()}
+ | {min_heap_size, Size :: non_neg_integer()}
+ | {min_bin_vheap_size, VSize :: non_neg_integer()}
+ | {off_heap_message_queue, OHMQ :: boolean()}.
+
-spec spawn_opt(Fun, Options) -> pid() | {pid(), reference()} when
Fun :: function(),
- Options :: [Option],
- Option :: link | monitor
- | {priority, Level :: priority_level()}
- | {fullsweep_after, Number :: non_neg_integer()}
- | {min_heap_size, Size :: non_neg_integer()}
- | {min_bin_vheap_size, VSize :: non_neg_integer()}.
+ Options :: [spawn_opt_option()].
spawn_opt(F, O) when erlang:is_function(F) ->
spawn_opt(erlang, apply, [F, []], O);
spawn_opt({M,F}=MF, O) when erlang:is_atom(M), erlang:is_atom(F) ->
@@ -2572,12 +2583,7 @@ spawn_opt(F, O) ->
-spec spawn_opt(Node, Fun, Options) -> pid() | {pid(), reference()} when
Node :: node(),
Fun :: function(),
- Options :: [Option],
- Option :: link | monitor
- | {priority, Level :: priority_level()}
- | {fullsweep_after, Number :: non_neg_integer()}
- | {min_heap_size, Size :: non_neg_integer()}
- | {min_bin_vheap_size, VSize :: non_neg_integer()}.
+ Options :: [spawn_opt_option()].
spawn_opt(N, F, O) when N =:= erlang:node() ->
spawn_opt(F, O);
spawn_opt(N, F, O) when erlang:is_function(F) ->
@@ -2664,12 +2670,7 @@ spawn_link(N,M,F,A) ->
Module :: module(),
Function :: atom(),
Args :: [term()],
- Options :: [Option],
- Option :: link | monitor
- | {priority, Level :: priority_level()}
- | {fullsweep_after, Number :: non_neg_integer()}
- | {min_heap_size, Size :: non_neg_integer()}
- | {min_bin_vheap_size, VSize :: non_neg_integer()}.
+ Options :: [spawn_opt_option()].
spawn_opt(M, F, A, Opts) ->
case catch erlang:spawn_opt({M,F,A,Opts}) of
{'EXIT',{Reason,_}} ->
@@ -2684,12 +2685,7 @@ spawn_opt(M, F, A, Opts) ->
Module :: module(),
Function :: atom(),
Args :: [term()],
- Options :: [Option],
- Option :: link | monitor
- | {priority, Level :: priority_level()}
- | {fullsweep_after, Number :: non_neg_integer()}
- | {min_heap_size, Size :: non_neg_integer()}
- | {min_bin_vheap_size, VSize :: non_neg_integer()}.
+ Options :: [spawn_opt_option()].
spawn_opt(N, M, F, A, O) when N =:= erlang:node(),
erlang:is_atom(M), erlang:is_atom(F),
erlang:is_list(A), erlang:is_list(O) ->