diff options
-rw-r--r-- | erts/doc/src/erl.xml | 10 | ||||
-rw-r--r-- | erts/doc/src/erlang.xml | 94 | ||||
-rw-r--r-- | erts/emulator/beam/atom.names | 5 | ||||
-rw-r--r-- | erts/emulator/beam/bif.c | 30 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_info.c | 37 | ||||
-rw-r--r-- | erts/emulator/beam/erl_gc.c | 195 | ||||
-rw-r--r-- | erts/emulator/beam/erl_init.c | 21 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.c | 160 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.h | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 4 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 21 | ||||
-rw-r--r-- | erts/emulator/test/Makefile | 2 | ||||
-rw-r--r-- | erts/emulator/test/message_queue_data_SUITE.erl (renamed from erts/emulator/test/off_heap_message_queue_SUITE.erl) | 85 | ||||
-rw-r--r-- | erts/etc/common/erlexec.c | 2 | ||||
-rw-r--r-- | erts/preloaded/src/erlang.erl | 17 |
15 files changed, 519 insertions, 166 deletions
diff --git a/erts/doc/src/erl.xml b/erts/doc/src/erl.xml index c4eb0e16ec..b6fa4c254c 100644 --- a/erts/doc/src/erl.xml +++ b/erts/doc/src/erl.xml @@ -1338,14 +1338,14 @@ <item> <p>Default process flag settings.</p> <taglist> - <tag><marker id="+xohmq"><c>+xohmq true|false</c></marker></tag> + <tag><marker id="+xmqd"><c>+xmqd off_heap|on_heap|mixed</c></marker></tag> <item><p> Sets the default value for the process flag - <c>off_heap_message_queue</c>. If <c>+xohmq</c> is not - passed, <c>false</c> will be the default. For more information, + <c>message_queue_data</c>. If <c>+xmqd</c> is not + passed, <c>mixed</c> will be the default. For more information, see the documentation of - <seealso marker="erlang#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue, - OHMQ)</c></seealso>. + <seealso marker="erlang#process_flag_message_queue_data"><c>process_flag(message_queue_data, + MQD)</c></seealso>. </p></item> </taglist> </item> diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index 2e82bb62a9..6ed03f3dfc 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -59,6 +59,12 @@ </datatype> <datatype> + <name name="message_queue_data"></name> + <desc><p>See <seealso marker="#process_flag_message_queue_data"><c>erlang:process_flag(message_queue_data, MQD)</c></seealso>.</p> + </desc> + </datatype> + + <datatype> <name name="timestamp"></name> <desc><p>See <seealso marker="#timestamp/0">erlang:timestamp/0</seealso>.</p> </desc> @@ -4280,39 +4286,52 @@ os_prompt% </pre> <p>Returns the old value of the flag.</p> </desc> </func> - <marker id="process_flag_off_heap_message_queue"/> + <marker id="process_flag_message_queue_data"/> <func> <name name="process_flag" arity="2" clause_i="5"/> - <fsummary>Set process flag <c>off_heap_message_queue</c> for the calling process</fsummary> + <fsummary>Set process flag <c>message_queue_data</c> for the calling process</fsummary> + <type name="message_queue_data"/> <desc> <p>This flag determines how messages in the message queue are stored. When the flag is:</p> <taglist> - <tag><c>true</c></tag> + <tag><c>off_heap</c></tag> <item><p> <em>All</em> messages in the message queue will be stored outside of the process heap. This implies that <em>no</em> messages in the message queue will be part of a garbage collection of the process. </p></item> - <tag><c>false</c></tag> + <tag><c>on_heap</c></tag> + <item><p> + All messages in the message queue will eventually be + placed on heap. They may however temporarily be stored + off heap. This is how messages always have been stored + up until ERTS version 8.0. + </p></item> + <tag><c>mixed</c></tag> <item><p> Messages may be placed either on the heap or outside of the heap. </p></item> </taglist> <p> + The default <c>message_queue_data</c> process flag is determined + by the <seealso marker="erl#+xmqd"><c>+xmqd</c></seealso> + <c>erl</c> command line argument. + </p> + <p> If the process potentially may get a hugh amount of messages, - you are recommended to set the flag to <c>true</c>. This since - a garbage collection with lots of messages placed on the heap - may become extremly expensive. Performance of the actual - message passing is however generally better when setting the - flag to <c>false</c>. + you are recommended to set the flag to <c>off_heap</c>. This + since a garbage collection with lots of messages placed on + the heap may become extremly expensive and the process may + consume large amounts of memory. Performance of the + actual message passing is however generally better when not + using the <c>off_heap</c> flag. </p> <p> - When changing this flag from <c>false</c> to <c>true</c>, - all messages in the message queue are moved off heap. This - work has been initiated but not completed when this function + When changing this flag messages will be moved. This work + has been initiated but not completed when this function call returns. </p> <p>Returns the old value of the flag.</p> @@ -4478,6 +4497,7 @@ os_prompt% </pre> <type name="process_info_result_item"/> <type name="priority_level"/> <type name="stack_item"/> + <type name="message_queue_data" /> <desc> <p>Returns a list containing <c><anno>InfoTuple</anno></c>s with miscellaneous information about the process identified by @@ -4530,6 +4550,7 @@ os_prompt% </pre> <type name="process_info_result_item"/> <type name="stack_item"/> <type name="priority_level"/> + <type name="message_queue_data" /> <desc> <p>Returns information about the process identified by <c><anno>Pid</anno></c>, as specified by @@ -4698,13 +4719,14 @@ os_prompt% </pre> monitor by name, the list item is <c>{process, {<anno>RegName</anno>, <anno>Node</anno>}}</c>.</p> </item> - <tag><c>{off_heap_message_queue, <anno>OHMQ</anno>}</c></tag> + <tag><c>{message_queue_data, <anno>MQD</anno>}</c></tag> <item> - <p>Returns the current state of the <c>off_heap_message_queue</c> - process flag. <c><anno>OHMQ</anno></c> is either <c>true</c>, or - <c>false</c>. For more information, see the documentation of - <seealso marker="#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue, - OHMQ)</c></seealso>.</p> + <p>Returns the current state of the <c>message_queue_data</c> + process flag. <c><anno>MQD</anno></c> is either <c>off_heap</c>, + <c>on_heap</c>, or <c>mixed</c>. For more information, see the + documentation of + <seealso marker="#process_flag_message_queue_data"><c>process_flag(message_queue_data, + MQD)</c></seealso>.</p> </item> <tag><c>{priority, <anno>Level</anno>}</c></tag> <item> @@ -5474,6 +5496,7 @@ true</pre> <name name="spawn_opt" arity="2"/> <fsummary>Creates a new process with a fun as entry point.</fsummary> <type name="priority_level"/> + <type name="message_queue_data" /> <type name="spawn_opt_option" /> <desc> <p>Returns the process identifier (pid) of a new process @@ -5490,6 +5513,7 @@ true</pre> <name name="spawn_opt" arity="3"/> <fsummary>Creates a new process with a fun as entry point on a given node.</fsummary> <type name="priority_level"/> + <type name="message_queue_data" /> <type name="spawn_opt_option" /> <desc> <p>Returns the process identifier (pid) of a new process started @@ -5505,6 +5529,7 @@ true</pre> <name name="spawn_opt" arity="4"/> <fsummary>Creates a new process with a function as entry point.</fsummary> <type name="priority_level"/> + <type name="message_queue_data" /> <type name="spawn_opt_option" /> <desc> <p>Works as @@ -5607,17 +5632,17 @@ true</pre> fine-tuning an application and to measure the execution time with various <c><anno>VSize</anno></c> values.</p> </item> - <tag><c>{off_heap_message_queue, <anno>OHMQ</anno>}</c></tag> + <tag><c>{message_queue_data, <anno>MQD</anno>}</c></tag> <item> - <p>Sets the state of the <c>off_heap_message_queue</c> process - flag. <c><anno>OHMQ</anno></c> should be either <c>true</c>, or - <c>false</c>. The default <c>off_heap_message_queue</c> process - flag is determined by the - <seealso marker="erl#+xohmq"><c>+xohmq</c></seealso> <c>erl</c> + <p>Sets the state of the <c>message_queue_data</c> process + flag. <c><anno>MQD</anno></c> should be either <c>off_heap</c>, + <c>on_heap</c>, or <c>mixed</c>. The default + <c>message_queue_data</c> process flag is determined by the + <seealso marker="erl#+xmqd"><c>+xmqd</c></seealso> <c>erl</c> command line argument. For more information, see the documentation of - <seealso marker="#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue, - <anno>OHMQ</anno>)</c></seealso>.</p> + <seealso marker="#process_flag_message_queue_data"><c>process_flag(message_queue_data, + <anno>MQD</anno>)</c></seealso>.</p> </item> </taglist> </desc> @@ -5627,6 +5652,7 @@ true</pre> <name name="spawn_opt" arity="5"/> <fsummary>Creates a new process with a function as entry point on a given node.</fsummary> <type name="priority_level"/> + <type name="message_queue_data" /> <type name="spawn_opt_option" /> <desc> <p>Returns the process identifier (pid) of a new process started @@ -7106,15 +7132,15 @@ ok used by the runtime system. It is on the form "<major ver>.<minor ver>".</p> </item> - <tag><marker id="system_info_off_heap_message_queue"><c>off_heap_message_queue</c></marker></tag> + <tag><marker id="system_info_message_queue_data"><c>message_queue_data</c></marker></tag> <item> - <p>Returns the default value of the <c>off_heap_message_queue</c> - process flag which is either <c>true</c> or <c>false</c>. This - default is set by the <c>erl</c> command line argument - <seealso marker="erl#+xohmq"><c>+xohmq</c></seealso>. For more information on the - <c>off_heap_message_queue</c> process flag, see documentation of - <seealso marker="#process_flag_off_heap_message_queue"><c>process_flag(off_heap_message_queue, - OHMQ)</c></seealso>.</p> + <p>Returns the default value of the <c>message_queue_data</c> + process flag which is either <c>off_heap</c>, <c>on_heap</c>, or <c>mixed</c>. + This default is set by the <c>erl</c> command line argument + <seealso marker="erl#+xmqd"><c>+xmqd</c></seealso>. For more information on the + <c>message_queue_data</c> process flag, see documentation of + <seealso marker="#process_flag_message_queue_data"><c>process_flag(message_queue_data, + MQD)</c></seealso>.</p> </item> <tag><marker id="system_info_otp_release"><c>otp_release</c></marker></tag> <item> diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index ea04495574..7424e47ec3 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -350,6 +350,7 @@ atom memory_internal atom memory_types atom message atom message_binary +atom message_queue_data atom message_queue_len atom messages atom merge_trap @@ -361,6 +362,7 @@ atom min_heap_size atom min_bin_vheap_size atom minor_version atom Minus='-' +atom mixed atom module atom module_info atom monitored_by @@ -423,11 +425,12 @@ atom notify atom notsup atom nouse_stdio atom objects -atom off_heap_message_queue +atom off_heap atom offset atom ok atom old_heap_block_size atom old_heap_size +atom on_heap atom on_load atom open atom open_error diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index f0340540cb..410a6cecac 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -910,13 +910,22 @@ BIF_RETTYPE spawn_opt_1(BIF_ALIST_1) so.priority = PRIORITY_LOW; else goto error; - } else if (arg == am_off_heap_message_queue) { - if (val == am_true) - so.flags |= SPO_OFF_HEAP_MSGQ; - else if (val == am_false) + } else if (arg == am_message_queue_data) { + switch (val) { + case am_mixed: + so.flags &= ~(SPO_OFF_HEAP_MSGQ|SPO_ON_HEAP_MSGQ); + break; + case am_on_heap: so.flags &= ~SPO_OFF_HEAP_MSGQ; - else + so.flags |= SPO_ON_HEAP_MSGQ; + break; + case am_off_heap: + so.flags &= ~SPO_ON_HEAP_MSGQ; + so.flags |= SPO_OFF_HEAP_MSGQ; + break; + default: goto error; + } } else if (arg == am_min_heap_size && is_small(val)) { Sint min_heap_size = signed_val(val); if (min_heap_size < 0) { @@ -1695,15 +1704,10 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2) } BIF_RET(old_value); } - else if (BIF_ARG_1 == am_off_heap_message_queue) { - int enable; - if (BIF_ARG_2 == am_true) - enable = 1; - else if (BIF_ARG_2 == am_false) - enable = 0; - else + else if (BIF_ARG_1 == am_message_queue_data) { + old_value = erts_change_message_queue_management(BIF_P, BIF_ARG_2); + if (is_non_value(old_value)) goto error; - old_value = erts_change_off_heap_message_queue_state(BIF_P, enable); BIF_RET(old_value); } else if (BIF_ARG_1 == am_sensitive) { diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 3fe8aed50e..bb75b8abb6 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -589,7 +589,7 @@ static Eterm pi_args[] = { am_min_bin_vheap_size, am_current_location, am_current_stacktrace, - am_off_heap_message_queue + am_message_queue_data }; #define ERTS_PI_ARGS ((int) (sizeof(pi_args)/sizeof(Eterm))) @@ -637,7 +637,7 @@ pi_arg2ix(Eterm arg) case am_min_bin_vheap_size: return 28; case am_current_location: return 29; case am_current_stacktrace: return 30; - case am_off_heap_message_queue: return 31; + case am_message_queue_data: return 31; default: return -1; } } @@ -1496,8 +1496,22 @@ process_info_aux(Process *BIF_P, break; } - case am_off_heap_message_queue: - res = rp->flags & F_OFF_HEAP_MSGQ ? am_true : am_false; + case am_message_queue_data: + switch (rp->flags & (F_OFF_HEAP_MSGQ|F_ON_HEAP_MSGQ)) { + case F_OFF_HEAP_MSGQ: + res = am_off_heap; + break; + case F_ON_HEAP_MSGQ: + res = am_on_heap; + break; + case 0: + res = am_mixed; + break; + default: + res = am_error; + ERTS_INTERNAL_ERROR("Inconsistent message queue management state"); + break; + } hp = HAlloc(BIF_P, 3); break; @@ -2662,9 +2676,18 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1) BIF_RET(am_true); } #endif - else if (BIF_ARG_1 == am_off_heap_message_queue) { - BIF_RET(erts_default_spo_flags & SPO_OFF_HEAP_MSGQ - ? am_true : am_false); + else if (BIF_ARG_1 == am_message_queue_data) { + switch (erts_default_spo_flags & (SPO_ON_HEAP_MSGQ|SPO_OFF_HEAP_MSGQ)) { + case SPO_OFF_HEAP_MSGQ: + BIF_RET(am_off_heap); + case SPO_ON_HEAP_MSGQ: + BIF_RET(am_on_heap); + case 0: + BIF_RET(am_mixed); + default: + ERTS_INTERNAL_ERROR("Inconsistent message queue management state"); + BIF_RET(am_error); + } } else if (ERTS_IS_ATOM_STR("compile_info",BIF_ARG_1)) { Uint sz; diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c index 3784367195..b63567e563 100644 --- a/erts/emulator/beam/erl_gc.c +++ b/erts/emulator/beam/erl_gc.c @@ -145,6 +145,7 @@ static void offset_rootset(Process *p, Sint offs, char* area, Uint area_size, Eterm* objv, int nobj); static void offset_off_heap(Process* p, Sint offs, char* area, Uint area_size); static void offset_mqueue(Process *p, Sint offs, char* area, Uint area_size); +static void move_msgq_to_heap(Process *p); static void init_gc_info(ErtsGCInfo *gcip); @@ -520,6 +521,14 @@ young_gen_usage(Process *p) Eterm *aheap; hsz = p->mbuf_sz; + + if (p->flags & F_ON_HEAP_MSGQ) { + ErtsMessage *mp; + for (mp = p->msg.first; mp; mp = mp->next) + if (mp->data.attached) + hsz += erts_msg_attached_data_size(mp); + } + aheap = p->abandoned_heap; if (!aheap) hsz += p->htop - p->heap; @@ -1040,10 +1049,13 @@ minor_collection(Process* p, ErlHeapFragment *live_hf_end, do_minor(p, live_hf_end, (char *) mature, mature_size*sizeof(Eterm), new_sz, objv, nobj); + if (p->flags & F_ON_HEAP_MSGQ) + move_msgq_to_heap(p); + new_mature = p->old_htop - prev_old_htop; size_after = new_mature; - size_after += HEAP_TOP(p) - HEAP_START(p); + size_after += HEAP_TOP(p) - HEAP_START(p) + p->mbuf_sz; *recl += (size_before - size_after); ErtsGcQuickSanityCheck(p); @@ -1461,9 +1473,14 @@ major_collection(Process* p, ErlHeapFragment *live_hf_end, HIGH_WATER(p) = HEAP_TOP(p); + remove_message_buffers(p); + + if (p->flags & F_ON_HEAP_MSGQ) + move_msgq_to_heap(p); + ErtsGcQuickSanityCheck(p); - size_after = HEAP_TOP(p) - HEAP_START(p); + size_after = HEAP_TOP(p) - HEAP_START(p) + p->mbuf_sz; *recl += size_before - size_after; adjusted = adjust_after_fullsweep(p, need, objv, nobj); @@ -1471,8 +1488,6 @@ major_collection(Process* p, ErlHeapFragment *live_hf_end, #ifdef HARDDEBUG disallow_heap_frag_ref_in_heap(p); #endif - remove_message_buffers(p); - ErtsGcQuickSanityCheck(p); return gc_cost(size_after, adjusted ? size_after : 0); @@ -2000,6 +2015,173 @@ collect_live_heap_frags(Process* p, ErlHeapFragment *live_hf_end, return n_htop; } +static ERTS_INLINE void +copy_one_frag(Eterm** hpp, ErlOffHeap* off_heap, + ErlHeapFragment *bp, Eterm *refs, int nrefs) +{ + Uint sz; + int i; + Sint offs; + struct erl_off_heap_header* oh; + Eterm *fhp, *hp; + + OH_OVERHEAD(off_heap, bp->off_heap.overhead); + sz = bp->used_size; + + fhp = bp->mem; + hp = *hpp; + offs = hp - fhp; + + oh = NULL; + while (sz--) { + Uint cpy_sz; + Eterm val = *fhp++; + + switch (primary_tag(val)) { + case TAG_PRIMARY_IMMED1: + *hp++ = val; + break; + case TAG_PRIMARY_LIST: + case TAG_PRIMARY_BOXED: + *hp++ = offset_ptr(val, offs); + break; + case TAG_PRIMARY_HEADER: + *hp++ = val; + switch (val & _HEADER_SUBTAG_MASK) { + case ARITYVAL_SUBTAG: + break; + case REFC_BINARY_SUBTAG: + case FUN_SUBTAG: + case EXTERNAL_PID_SUBTAG: + case EXTERNAL_PORT_SUBTAG: + case EXTERNAL_REF_SUBTAG: + oh = (struct erl_off_heap_header*) (hp-1); + cpy_sz = thing_arityval(val); + goto cpy_words; + default: + cpy_sz = header_arity(val); + + cpy_words: + ASSERT(sz >= cpy_sz); + sz -= cpy_sz; + while (cpy_sz >= 8) { + cpy_sz -= 8; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + *hp++ = *fhp++; + } + switch (cpy_sz) { + case 7: *hp++ = *fhp++; + case 6: *hp++ = *fhp++; + case 5: *hp++ = *fhp++; + case 4: *hp++ = *fhp++; + case 3: *hp++ = *fhp++; + case 2: *hp++ = *fhp++; + case 1: *hp++ = *fhp++; + default: break; + } + if (oh) { + /* Add to offheap list */ + oh->next = off_heap->first; + off_heap->first = oh; + ASSERT(*hpp <= (Eterm*)oh); + ASSERT(hp > (Eterm*)oh); + oh = NULL; + } + break; + } + break; + } + } + + ASSERT(bp->used_size == hp - *hpp); + *hpp = hp; + + for (i = 0; i < nrefs; i++) { + if (is_not_immed(refs[i])) + refs[i] = offset_ptr(refs[i], offs); + } + bp->off_heap.first = NULL; +} + +static void +move_msgq_to_heap(Process *p) +{ + ErtsMessage **mpp = &p->msg.first; + + while (*mpp) { + ErtsMessage *mp = *mpp; + + if (mp->data.attached) { + ErlHeapFragment *bp; + ErtsHeapFactory factory; + + erts_factory_proc_prealloc_init(&factory, p, + erts_msg_attached_data_size(mp)); + + if (is_non_value(ERL_MESSAGE_TERM(mp))) { + if (mp->data.dist_ext) { + ASSERT(mp->data.dist_ext->heap_size >= 0); + if (is_not_nil(ERL_MESSAGE_TOKEN(mp))) { + bp = erts_dist_ext_trailer(mp->data.dist_ext); + ERL_MESSAGE_TOKEN(mp) = copy_struct(ERL_MESSAGE_TOKEN(mp), + bp->used_size, + &factory.hp, + factory.off_heap); + erts_cleanup_offheap(&bp->off_heap); + } + ERL_MESSAGE_TERM(mp) = erts_decode_dist_ext(&factory, + mp->data.dist_ext); + erts_free_dist_ext_copy(mp->data.dist_ext); + mp->data.dist_ext = NULL; + } + } + else { + + if (mp->data.attached == ERTS_MSG_COMBINED_HFRAG) + bp = &mp->hfrag; + else + bp = mp->data.heap_frag; + + if (bp->next) + erts_move_multi_frags(&factory.hp, factory.off_heap, bp, + mp->m, ERL_MESSAGE_REF_ARRAY_SZ, 0); + else + copy_one_frag(&factory.hp, factory.off_heap, bp, + mp->m, ERL_MESSAGE_REF_ARRAY_SZ); + + if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { + mp->data.heap_frag = NULL; + free_message_buffer(bp); + } + else { + ErtsMessage *tmp = erts_alloc_message(0, NULL); + sys_memcpy((void *) tmp->m, (void *) mp->m, + sizeof(Eterm)*ERL_MESSAGE_REF_ARRAY_SZ); + tmp->next = mp->next; + if (p->msg.save == &mp->next) + p->msg.save = &tmp->next; + if (p->msg.last == &mp->next) + p->msg.last = &tmp->next; + *mpp = tmp; + mp->next = NULL; + erts_cleanup_messages(mp); + mp = tmp; + } + } + + erts_factory_close(&factory); + } + + mpp = &(*mpp)->next; + } +} + static Uint setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset) { @@ -2089,9 +2271,8 @@ setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset) case F_OFF_HEAP_MSGQ_CHNG: case 0: { /* - * Off heap message queue disabled, i.e. we may - * have references from the message queue to the - * heap... + * We do not have off heap message queue enabled, i.e. we + * need to add message queue to rootset... */ ErtsMessage *mp; diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index f396a0a156..628915eb23 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -630,7 +630,8 @@ void erts_usage(void) erts_fprintf(stderr, "-W<i|w|e> set error logger warnings mapping,\n"); erts_fprintf(stderr, " see error_logger documentation for details\n"); - erts_fprintf(stderr, "-xohmq bool set default off_heap_message_queue flag for processes\n"); + erts_fprintf(stderr, "-xmqd val set default message queue data flag for processes,\n"); + erts_fprintf(stderr, " valid values are: off_heap | on_heap | mixed\n"); erts_fprintf(stderr, "-zdbbl size set the distribution buffer busy limit in kilobytes\n"); erts_fprintf(stderr, " valid range is [1-%d]\n", INT_MAX/1024); erts_fprintf(stderr, "-zdntgc time set delayed node table gc in seconds\n"); @@ -2018,15 +2019,21 @@ erl_start(int argc, char **argv) case 'x': { char *sub_param = argv[i]+2; - if (has_prefix("ohmq", sub_param)) { - arg = get_arg(sub_param+4, argv[i+1], &i); - if (sys_strcmp(arg, "true") == 0) - erts_default_spo_flags |= SPO_OFF_HEAP_MSGQ; - else if (sys_strcmp(arg, "false") == 0) + if (has_prefix("mqd", sub_param)) { + arg = get_arg(sub_param+3, argv[i+1], &i); + if (sys_strcmp(arg, "mixed") == 0) + erts_default_spo_flags &= ~(SPO_ON_HEAP_MSGQ|SPO_OFF_HEAP_MSGQ); + else if (sys_strcmp(arg, "on_heap") == 0) { erts_default_spo_flags &= ~SPO_OFF_HEAP_MSGQ; + erts_default_spo_flags |= SPO_ON_HEAP_MSGQ; + } + else if (sys_strcmp(arg, "off_heap") == 0) { + erts_default_spo_flags &= ~SPO_ON_HEAP_MSGQ; + erts_default_spo_flags |= SPO_OFF_HEAP_MSGQ; + } else { erts_fprintf(stderr, - "Invalid off_heap_message_queue flag: %s\n", arg); + "Invalid message_queue_data flag: %s\n", arg); erts_usage(); } } else { diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 79739501a8..797212450c 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -629,9 +629,24 @@ erts_try_alloc_message_on_heap(Process *pp, #endif else { in_message_fragment: - - mp = erts_alloc_message(sz, hpp); - *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; + if (!((*psp) & ERTS_PSFLG_ON_HEAP_MSGQ)) { + mp = erts_alloc_message(sz, hpp); + *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; + } + else { + mp = erts_alloc_message(0, NULL); + if (!sz) { + *hpp = NULL; + *ohpp = NULL; + } + else { + ErlHeapFragment *bp; + bp = new_message_buffer(sz); + *hpp = &bp->mem[0]; + mp->data.heap_frag = bp; + *ohpp = &bp->off_heap; + } + } *on_heap_p = 0; } @@ -976,12 +991,12 @@ erts_complete_off_heap_message_queue_change(Process *c_p) ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ); /* - * This job was first initiated when the process changed - * "off heap message queue" state from false to true. Since - * then ERTS_PSFLG_OFF_HEAP_MSGQ has been set. However, the - * state change might have been changed again (multiple times) - * since then. Check users last requested state (the flag - * F_OFF_HEAP_MSGQ), and make the state consistent with that. + * This job was first initiated when the process changed to off heap + * message queue management. Since then ERTS_PSFLG_OFF_HEAP_MSGQ + * has been set. However, the management state might have been changed + * again (multiple times) since then. Check users last requested state + * (the flags F_OFF_HEAP_MSGQ, and F_ON_HEAP_MSGQ), and make the state + * consistent with that. */ if (!(c_p->flags & F_OFF_HEAP_MSGQ)) @@ -1022,8 +1037,9 @@ change_off_heap_msgq(void *vcohmq) } Eterm -erts_change_off_heap_message_queue_state(Process *c_p, int enable) +erts_change_message_queue_management(Process *c_p, Eterm new_state) { + Eterm res; #ifdef DEBUG if (c_p->flags & F_OFF_HEAP_MSGQ) { @@ -1042,57 +1058,117 @@ erts_change_off_heap_message_queue_state(Process *c_p, int enable) } #endif - if (c_p->flags & F_OFF_HEAP_MSGQ) { - /* Off heap message queue is enabled */ + switch (c_p->flags & (F_OFF_HEAP_MSGQ|F_ON_HEAP_MSGQ)) { - if (!enable) { + case F_OFF_HEAP_MSGQ: + res = am_off_heap; + + switch (new_state) { + case am_off_heap: + break; + case am_on_heap: + c_p->flags |= F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_ON_HEAP_MSGQ); + /* fall through */ + case am_mixed: c_p->flags &= ~F_OFF_HEAP_MSGQ; /* * We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ - * if a change is ongoing. It will be adjusted when the - * change completes... + * if a off heap change is ongoing. It will be adjusted + * when the change completes... */ if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { /* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... */ erts_smp_atomic32_read_band_nob(&c_p->state, ~ERTS_PSFLG_OFF_HEAP_MSGQ); } + break; + default: + res = THE_NON_VALUE; /* badarg */ + break; } + break; + + case F_ON_HEAP_MSGQ: + res = am_on_heap; - return am_true; /* Old state */ + switch (new_state) { + case am_on_heap: + break; + case am_mixed: + c_p->flags &= ~F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_ON_HEAP_MSGQ); + break; + case am_off_heap: + c_p->flags &= ~F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_ON_HEAP_MSGQ); + goto change_to_off_heap; + default: + res = THE_NON_VALUE; /* badarg */ + break; + } + break; + + case 0: + res = am_mixed; + + switch (new_state) { + case am_mixed: + break; + case am_on_heap: + c_p->flags |= F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_ON_HEAP_MSGQ); + break; + case am_off_heap: + goto change_to_off_heap; + default: + res = THE_NON_VALUE; /* badarg */ + break; + } + break; + + default: + res = am_error; + ERTS_INTERNAL_ERROR("Inconsistent message queue management state"); + break; } - /* Off heap message queue is disabled */ + return res; + +change_to_off_heap: - if (enable) { - c_p->flags |= F_OFF_HEAP_MSGQ; + c_p->flags |= F_OFF_HEAP_MSGQ; + + /* + * We do not have to schedule a change if + * we have an ongoing off heap change... + */ + if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { + ErtsChangeOffHeapMessageQueue *cohmq; /* - * We do not have to schedule a change if - * we have an ongoing change... + * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait + * thread progress before completing the change in + * order to ensure that all senders observe that + * messages should be passed off heap. When the + * change has completed, GC does not need to inspect + * the message queue at all. */ - if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { - ErtsChangeOffHeapMessageQueue *cohmq; - /* - * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait - * thread progress before completing the change in - * order to ensure that all senders observe that - * messages should be passed off heap. When the - * change has completed, GC does not need to inspect - * the message queue at all. - */ - erts_smp_atomic32_read_bor_nob(&c_p->state, - ERTS_PSFLG_OFF_HEAP_MSGQ); - c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; - cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, - sizeof(ErtsChangeOffHeapMessageQueue)); - cohmq->pid = c_p->common.id; - erts_schedule_thr_prgr_later_op(change_off_heap_msgq, - (void *) cohmq, - &cohmq->lop); - } + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_OFF_HEAP_MSGQ); + c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; + cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, + sizeof(ErtsChangeOffHeapMessageQueue)); + cohmq->pid = c_p->common.id; + erts_schedule_thr_prgr_later_op(change_off_heap_msgq, + (void *) cohmq, + &cohmq->lop); } - return am_false; /* Old state */ + return res; } int diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 740ae46a0f..e241926638 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -277,7 +277,7 @@ void erts_cleanup_offheap(ErlOffHeap *offheap); void erts_save_message_in_proc(Process *p, ErtsMessage *msg); Sint erts_move_messages_off_heap(Process *c_p); Sint erts_complete_off_heap_message_queue_change(Process *c_p); -Eterm erts_change_off_heap_message_queue_state(Process *c_p, int enable); +Eterm erts_change_message_queue_management(Process *c_p, Eterm new_state); int erts_decode_dist_message(Process *, ErtsProcLocks, ErtsMessage *, int); diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index d17355207b..7a31aa3e33 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -10779,6 +10779,10 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). state |= ERTS_PSFLG_OFF_HEAP_MSGQ; flags |= F_OFF_HEAP_MSGQ; } + else if (so->flags & SPO_ON_HEAP_MSGQ) { + state |= ERTS_PSFLG_ON_HEAP_MSGQ; + flags |= F_ON_HEAP_MSGQ; + } if (!rq) rq = erts_get_runq_proc(parent); diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 41b367435d..f0798d8c2d 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -1145,14 +1145,15 @@ void erts_check_for_holes(Process* p); #define ERTS_PSFLG_PROXY ERTS_PSFLG_BIT(16) #define ERTS_PSFLG_DELAYED_SYS ERTS_PSFLG_BIT(17) #define ERTS_PSFLG_OFF_HEAP_MSGQ ERTS_PSFLG_BIT(18) +#define ERTS_PSFLG_ON_HEAP_MSGQ ERTS_PSFLG_BIT(19) #ifdef ERTS_DIRTY_SCHEDULERS -#define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(19) -#define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(20) -#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(21) -#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(22) -#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 23) +#define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(20) +#define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(21) +#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(22) +#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(23) +#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 24) #else -#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 19) +#define ERTS_PSFLG_MAX (ERTS_PSFLGS_ZERO_BIT_OFFSET + 20) #endif #define ERTS_PSFLGS_IN_PRQ_MASK (ERTS_PSFLG_IN_PRQ_MAX \ @@ -1201,6 +1202,7 @@ void erts_check_for_holes(Process* p); #define SPO_MONITOR 4 #define SPO_SYSTEM_PROC 8 #define SPO_OFF_HEAP_MSGQ 16 +#define SPO_ON_HEAP_MSGQ 32 extern int erts_default_spo_flags; @@ -1290,9 +1292,10 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags; #define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */ #define F_DISABLE_GC (1 << 11) /* Disable GC (see below) */ #define F_OFF_HEAP_MSGQ (1 << 12) /* Off heap msg queue */ -#define F_OFF_HEAP_MSGQ_CHNG (1 << 13) /* Off heap msg queue changing */ -#define F_ABANDONED_HEAP_USE (1 << 14) /* Have usage of abandoned heap */ -#define F_DELAY_GC (1 << 15) /* Similar to disable GC (see below) */ +#define F_ON_HEAP_MSGQ (1 << 13) /* Off heap msg queue */ +#define F_OFF_HEAP_MSGQ_CHNG (1 << 14) /* Off heap msg queue changing */ +#define F_ABANDONED_HEAP_USE (1 << 15) /* Have usage of abandoned heap */ +#define F_DELAY_GC (1 << 16) /* Similar to disable GC (see below) */ /* * F_DISABLE_GC and F_DELAY_GC are similar. Both will prevent diff --git a/erts/emulator/test/Makefile b/erts/emulator/test/Makefile index 6519fd8982..8cc47937b7 100644 --- a/erts/emulator/test/Makefile +++ b/erts/emulator/test/Makefile @@ -79,7 +79,7 @@ MODULES= \ node_container_SUITE \ nofrag_SUITE \ num_bif_SUITE \ - off_heap_message_queue_SUITE \ + message_queue_data_SUITE \ op_SUITE \ port_SUITE \ port_bif_SUITE \ diff --git a/erts/emulator/test/off_heap_message_queue_SUITE.erl b/erts/emulator/test/message_queue_data_SUITE.erl index a667704942..11481409aa 100644 --- a/erts/emulator/test/off_heap_message_queue_SUITE.erl +++ b/erts/emulator/test/message_queue_data_SUITE.erl @@ -18,7 +18,7 @@ %% %CopyrightEnd% %% --module(off_heap_message_queue_SUITE). +-module(message_queue_data_SUITE). -export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, init_per_group/2,end_per_group/2, @@ -68,50 +68,73 @@ end_per_group(_GroupName, Config) -> basic(Config) when is_list(Config) -> - basic_test(erlang:system_info(off_heap_message_queue)), + basic_test(erlang:system_info(message_queue_data)), - {ok, Node1} = start_node(Config, "+xohmq true"), - ok = rpc:call(Node1, ?MODULE, basic_test, [true]), + {ok, Node1} = start_node(Config, "+xmqd off_heap"), + ok = rpc:call(Node1, ?MODULE, basic_test, [off_heap]), stop_node(Node1), - {ok, Node2} = start_node(Config, "+xohmq false"), - ok = rpc:call(Node2, ?MODULE, basic_test, [false]), + {ok, Node2} = start_node(Config, "+xmqd on_heap"), + ok = rpc:call(Node2, ?MODULE, basic_test, [on_heap]), stop_node(Node2), + + {ok, Node3} = start_node(Config, "+xmqd mixed"), + ok = rpc:call(Node3, ?MODULE, basic_test, [mixed]), + stop_node(Node3), + ok. +is_valid_mqd_value(off_heap) -> + true; +is_valid_mqd_value(on_heap) -> + true; +is_valid_mqd_value(mixed) -> + true; +is_valid_mqd_value(_) -> + false. + + basic_test(Default) -> - Default = erlang:system_info(off_heap_message_queue), - true = (Default == true) orelse (Default == false), + Default = erlang:system_info(message_queue_data), + true = is_valid_mqd_value(Default), - {off_heap_message_queue, Default} = process_info(self(), off_heap_message_queue), - Default = process_flag(off_heap_message_queue, true), - {off_heap_message_queue, true} = process_info(self(), off_heap_message_queue), - true = process_flag(off_heap_message_queue, false), - {off_heap_message_queue, false} = process_info(self(), off_heap_message_queue), - false = process_flag(off_heap_message_queue, Default), - {'EXIT', _} = (catch process_flag(off_heap_message_queue, blupp)), + {message_queue_data, Default} = process_info(self(), message_queue_data), + Default = process_flag(message_queue_data, off_heap), + {message_queue_data, off_heap} = process_info(self(), message_queue_data), + off_heap = process_flag(message_queue_data, on_heap), + {message_queue_data, on_heap} = process_info(self(), message_queue_data), + on_heap = process_flag(message_queue_data, mixed), + {message_queue_data, mixed} = process_info(self(), message_queue_data), + mixed = process_flag(message_queue_data, Default), + {'EXIT', _} = (catch process_flag(message_queue_data, blupp)), P1 = spawn_opt(fun () -> receive after infinity -> ok end end, [link]), - {off_heap_message_queue, Default} = process_info(P1, off_heap_message_queue), + {message_queue_data, Default} = process_info(P1, message_queue_data), unlink(P1), exit(P1, bye), P2 = spawn_opt(fun () -> receive after infinity -> ok end end, - [link, {off_heap_message_queue, false}]), - {off_heap_message_queue, false} = process_info(P2, off_heap_message_queue), + [link, {message_queue_data, off_heap}]), + {message_queue_data, off_heap} = process_info(P2, message_queue_data), unlink(P2), exit(P2, bye), P3 = spawn_opt(fun () -> receive after infinity -> ok end end, - [link, {off_heap_message_queue, true}]), - {off_heap_message_queue, true} = process_info(P3, off_heap_message_queue), + [link, {message_queue_data, on_heap}]), + {message_queue_data, on_heap} = process_info(P3, message_queue_data), unlink(P3), exit(P3, bye), + P4 = spawn_opt(fun () -> receive after infinity -> ok end end, + [link, {message_queue_data, mixed}]), + {message_queue_data, mixed} = process_info(P4, message_queue_data), + unlink(P4), + exit(P4, bye), + {'EXIT', _} = (catch spawn_opt(fun () -> receive after infinity -> ok end end, - [link, {off_heap_message_queue, blapp}])), + [link, {message_queue_data, blapp}])), ok. @@ -119,21 +142,21 @@ process_info_messages(Config) when is_list(Config) -> Tester = self(), P1 = spawn_opt(fun () -> receive after 500 -> ok end, - false = process_flag(off_heap_message_queue, true), + mixed = process_flag(message_queue_data, off_heap), Tester ! first, receive after 500 -> ok end, - true = process_flag(off_heap_message_queue, false), + off_heap = process_flag(message_queue_data, on_heap), Tester ! second, receive after 500 -> ok end, - false = process_flag(off_heap_message_queue, true), + on_heap = process_flag(message_queue_data, mixed), Tester ! third, receive after 500 -> ok end, - true = process_flag(off_heap_message_queue, false), + mixed = process_flag(message_queue_data, off_heap), Tester ! fourth, receive after infinity -> ok end end, - [link, {off_heap_message_queue, false}]), + [link, {message_queue_data, mixed}]), P1 ! "A", receive first -> ok end, @@ -149,16 +172,16 @@ process_info_messages(Config) when is_list(Config) -> P2 = spawn_opt(fun () -> receive after 500 -> ok end, - false = process_flag(off_heap_message_queue, true), + mixed = process_flag(message_queue_data, off_heap), Tester ! first, receive after 500 -> ok end, - true = process_flag(off_heap_message_queue, false), + off_heap = process_flag(message_queue_data, on_heap), Tester ! second, receive after 500 -> ok end, - false = process_flag(off_heap_message_queue, true), + on_heap = process_flag(message_queue_data, mixed), Tester ! third, receive after 500 -> ok end, - true = process_flag(off_heap_message_queue, false), + mixed = process_flag(message_queue_data, off_heap), Tester ! fourth, receive after 500 -> ok end, @@ -172,7 +195,7 @@ process_info_messages(Config) when is_list(Config) -> Tester ! self() end, - [link, {off_heap_message_queue, false}]), + [link, {message_queue_data, mixed}]), P2 ! "A", receive first -> ok end, diff --git a/erts/etc/common/erlexec.c b/erts/etc/common/erlexec.c index 461957be10..f1cabe5d0b 100644 --- a/erts/etc/common/erlexec.c +++ b/erts/etc/common/erlexec.c @@ -157,7 +157,7 @@ static char *plusr_val_switches[] = { /* +x arguments with values */ static char *plusx_val_switches[] = { - "ohmq", + "mqd", NULL }; diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 6a9ec9c915..7a76c95c53 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -2033,6 +2033,9 @@ open_port(_PortName,_PortSettings) -> -type priority_level() :: low | normal | high | max. +-type message_queue_data() :: + off_heap | on_heap | mixed. + -spec process_flag(trap_exit, Boolean) -> OldBoolean when Boolean :: boolean(), OldBoolean :: boolean(); @@ -2045,9 +2048,9 @@ open_port(_PortName,_PortSettings) -> (min_bin_vheap_size, MinBinVHeapSize) -> OldMinBinVHeapSize when MinBinVHeapSize :: non_neg_integer(), OldMinBinVHeapSize :: non_neg_integer(); - (off_heap_message_queue, OHMQ) -> OldOHMQ when - OHMQ :: boolean(), - OldOHMQ :: boolean(); + (message_queue_data, MQD) -> OldMQD when + MQD :: message_queue_data(), + OldMQD :: message_queue_data(); (priority, Level) -> OldLevel when Level :: priority_level(), OldLevel :: priority_level(); @@ -2086,7 +2089,7 @@ process_flag(_Flag, _Value) -> min_bin_vheap_size | monitored_by | monitors | - off_heap_message_queue | + message_queue_data | priority | reductions | registered_name | @@ -2128,7 +2131,7 @@ process_flag(_Flag, _Value) -> {monitors, Monitors :: [{process, Pid :: pid() | {RegName :: atom(), Node :: node()}}]} | - {off_heap_message_queue, OHMQ :: boolean()} | + {message_queue_data, MQD :: message_queue_data()} | {priority, Level :: priority_level()} | {reductions, Number :: non_neg_integer()} | {registered_name, Atom :: atom()} | @@ -2431,7 +2434,7 @@ tuple_to_list(_Tuple) -> (multi_scheduling) -> disabled | blocked | enabled; (multi_scheduling_blockers) -> [Pid :: pid()]; (nif_version) -> string(); - (off_heap_message_queue) -> boolean(); + (message_queue_data) -> message_queue_data(); (otp_release) -> string(); (os_monotonic_time_source) -> [{atom(),term()}]; (os_system_time_source) -> [{atom(),term()}]; @@ -2567,7 +2570,7 @@ spawn_monitor(M, F, A) -> | {fullsweep_after, Number :: non_neg_integer()} | {min_heap_size, Size :: non_neg_integer()} | {min_bin_vheap_size, VSize :: non_neg_integer()} - | {off_heap_message_queue, OHMQ :: boolean()}. + | {message_queue_data, MQD :: message_queue_data()}. -spec spawn_opt(Fun, Options) -> pid() | {pid(), reference()} when Fun :: function(), |