diff options
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r-- | erts/emulator/beam/erl_message.c | 2110 |
1 files changed, 1402 insertions, 708 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 325d77e911..71ab92937d 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -1,18 +1,19 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1997-2012. All Rights Reserved. + * Copyright Ericsson AB 1997-2016. All Rights Reserved. * - * The contents of this file are subject to the Erlang Public License, - * Version 1.1, (the "License"); you may not use this file except in - * compliance with the License. You should have received a copy of the - * Erlang Public License along with this software. If not, it can be - * retrieved online at http://www.erlang.org/. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * * %CopyrightEnd% */ @@ -31,9 +32,10 @@ #include "erl_process.h" #include "erl_binary.h" #include "dtrace-wrapper.h" +#include "beam_bp.h" -ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message, - ErlMessage, +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message_ref, + ErtsMessageRef, ERL_MESSAGE_BUF_SZ, ERTS_ALC_T_MSG_REF) @@ -43,25 +45,20 @@ ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message, #undef HARD_DEBUG #endif - - - -static ERTS_INLINE int in_heapfrag(const Eterm* ptr, const ErlHeapFragment *bp) +void +init_message(void) { - return ((unsigned)(ptr - bp->mem) < bp->used_size); + init_message_ref_alloc(); } - -void -init_message(void) +void *erts_alloc_message_ref(void) { - init_message_alloc(); + return (void *) message_ref_alloc(); } -void -free_message(ErlMessage* mp) +void erts_free_message_ref(void *mp) { - message_free(mp); + message_ref_free((ErtsMessageRef *) mp); } /* Allocate message buffer (size in words) */ @@ -71,7 +68,8 @@ new_message_buffer(Uint size) ErlHeapFragment* bp; bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG, ERTS_HEAP_FRAG_SIZE(size)); - ERTS_INIT_HEAP_FRAG(bp, size); + ERTS_INIT_HEAP_FRAG(bp, size, size); + VERBOSE(DEBUG_SHCOPY, ("[pid=%T] new message buffer %p\n", erts_get_current_pid(), bp->mem)); return bp; } @@ -91,9 +89,6 @@ erts_resize_message_buffer(ErlHeapFragment *bp, Uint size, #endif ErlHeapFragment* nbp; - /* ToDo: Make use of 'used_size' to avoid realloc - when shrinking just a few words */ - #ifdef DEBUG { Uint off_sz = size < bp->used_size ? size : bp->used_size; @@ -108,8 +103,10 @@ erts_resize_message_buffer(ErlHeapFragment *bp, Uint size, } #endif - if (size == bp->used_size) + if (size >= (bp->used_size - bp->used_size / 16)) { + bp->used_size = size; return bp; + } #ifdef HARD_DEBUG dbg_brefs = erts_alloc(ERTS_ALC_T_UNDEF, sizeof(Eterm *)*brefs_size); @@ -201,108 +198,66 @@ free_message_buffer(ErlHeapFragment* bp) }while (bp != NULL); } -static ERTS_INLINE void -link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp) +void +erts_cleanup_messages(ErtsMessage *msgp) { - if (bp) { - /* Link the message buffer */ - bp->next = MBUF(proc); - MBUF(proc) = bp; - MBUF_SIZE(proc) += bp->used_size; - FLAGS(proc) |= F_FORCE_GC; - - /* Move any off_heap's into the process */ - if (bp->off_heap.first != NULL) { - struct erl_off_heap_header** next_p = &bp->off_heap.first; - while (*next_p != NULL) { - next_p = &((*next_p)->next); + ErtsMessage *mp = msgp; + while (mp) { + ErtsMessage *fmp; + ErlHeapFragment *bp; + if (is_non_value(ERL_MESSAGE_TERM(mp))) { + if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) { + bp = (ErlHeapFragment *) mp->data.dist_ext->ext_endp; + erts_cleanup_offheap(&bp->off_heap); } - *next_p = MSO(proc).first; - MSO(proc).first = bp->off_heap.first; - bp->off_heap.first = NULL; - OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead); + if (mp->data.dist_ext) + erts_free_dist_ext_copy(mp->data.dist_ext); } + else { + if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) + bp = mp->data.heap_frag; + else { + bp = mp->hfrag.next; + erts_cleanup_offheap(&mp->hfrag.off_heap); + } + if (bp) + free_message_buffer(bp); + } + fmp = mp; + mp = mp->next; + erts_free_message(fmp); } } -Eterm -erts_msg_distext2heap(Process *pp, - ErtsProcLocks *plcksp, - ErlHeapFragment **bpp, - Eterm *tokenp, - ErtsDistExternal *dist_extp) +ErtsMessage * +erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_size) { - Eterm msg; - Uint tok_sz = 0; - Eterm *hp = NULL; - Eterm *hp_end = NULL; - ErlOffHeap *ohp; - Sint sz; - - *bpp = NULL; - sz = erts_decode_dist_ext_size(dist_extp); - if (sz < 0) - goto decode_error; - if (is_not_nil(*tokenp)) { - ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp); - tok_sz = heap_frag->used_size; - sz += tok_sz; - } - if (pp) - hp = erts_alloc_message_heap(sz, bpp, &ohp, pp, plcksp); - else { - *bpp = new_message_buffer(sz); - hp = (*bpp)->mem; - ohp = &(*bpp)->off_heap; - } - hp_end = hp + sz; - msg = erts_decode_dist_ext(&hp, ohp, dist_extp); - if (is_non_value(msg)) - goto decode_error; - if (is_not_nil(*tokenp)) { - ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp); - *tokenp = copy_struct(*tokenp, tok_sz, &hp, ohp); - erts_cleanup_offheap(&heap_frag->off_heap); - } - erts_free_dist_ext_copy(dist_extp); - if (hp_end != hp) { - if (!(*bpp)) { - HRelease(pp, hp_end, hp); - } - else { - Uint final_size = hp - &(*bpp)->mem[0]; - Eterm brefs[2] = {msg, *tokenp}; - ASSERT(sz - (hp_end - hp) == final_size); - *bpp = erts_resize_message_buffer(*bpp, final_size, &brefs[0], 2); - msg = brefs[0]; - *tokenp = brefs[1]; - } + ErtsMessage *nmp = erts_realloc(ERTS_ALC_T_MSG, mp, + sizeof(ErtsMessage) + (sz - 1)*sizeof(Eterm)); + if (nmp != mp) { + Eterm *sp = &mp->hfrag.mem[0]; + Eterm *ep = sp + sz; + Sint offs = &nmp->hfrag.mem[0] - sp; + erts_offset_off_heap(&nmp->hfrag.off_heap, offs, sp, ep); + erts_offset_heap(&nmp->hfrag.mem[0], sz, offs, sp, ep); + if (brefs && brefs_size) + erts_offset_heap_ptr(brefs, brefs_size, offs, sp, ep); } - return msg; - decode_error: - if (is_not_nil(*tokenp)) { - ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp); - erts_cleanup_offheap(&heap_frag->off_heap); - } - erts_free_dist_ext_copy(dist_extp); - if (*bpp) { - free_message_buffer(*bpp); - *bpp = NULL; - } - else if (hp) { - HRelease(pp, hp_end, hp); - } - return THE_NON_VALUE; - } + nmp->hfrag.used_size = sz; + nmp->hfrag.alloc_size = sz; + + return nmp; +} void erts_queue_dist_message(Process *rcvr, - ErtsProcLocks *rcvr_locks, + ErtsProcLocks rcvr_locks, ErtsDistExternal *dist_ext, - Eterm token) + Eterm token, + Eterm from) { - ErlMessage* mp; + ErtsMessage* mp; #ifdef USE_VM_PROBES Sint tok_label = 0; Sint tok_lastcnt = 0; @@ -312,15 +267,25 @@ erts_queue_dist_message(Process *rcvr, erts_aint_t state; #endif - ERTS_SMP_LC_ASSERT(*rcvr_locks == erts_proc_lc_my_proc_locks(rcvr)); + ERTS_SMP_LC_ASSERT(rcvr_locks == erts_proc_lc_my_proc_locks(rcvr)); - mp = message_alloc(); + mp = erts_alloc_message(0, NULL); + mp->data.dist_ext = dist_ext; + + ERL_MESSAGE_TERM(mp) = THE_NON_VALUE; +#ifdef USE_VM_PROBES + ERL_MESSAGE_DT_UTAG(mp) = NIL; + if (token == am_have_dt_utag) + ERL_MESSAGE_TOKEN(mp) = NIL; + else +#endif + ERL_MESSAGE_TOKEN(mp) = token; #ifdef ERTS_SMP - if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) { + if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) { if (erts_smp_proc_trylock(rcvr, ERTS_PROC_LOCK_MSGQ) == EBUSY) { ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ; - if (*rcvr_locks & ERTS_PROC_LOCK_STATUS) { + if (rcvr_locks & ERTS_PROC_LOCK_STATUS) { erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_STATUS); need_locks |= ERTS_PROC_LOCK_STATUS; } @@ -330,71 +295,52 @@ erts_queue_dist_message(Process *rcvr, state = erts_smp_atomic32_read_acqb(&rcvr->state); if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) { - if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) + if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); /* Drop message if receiver is exiting or has a pending exit ... */ - if (is_not_nil(token)) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(mp->data.dist_ext); - erts_cleanup_offheap(&heap_frag->off_heap); - } - erts_free_dist_ext_copy(dist_ext); - message_free(mp); + erts_cleanup_messages(mp); } else #endif if (IS_TRACED_FL(rcvr, F_TRACE_RECEIVE)) { + if (from == am_Empty) + from = dist_ext->dep->sysname; + /* Ahh... need to decode it in order to trace it... */ - ErlHeapFragment *mbuf; - Eterm msg; - if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) + if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); - message_free(mp); - msg = erts_msg_distext2heap(rcvr, rcvr_locks, &mbuf, &token, dist_ext); - if (is_value(msg)) -#ifdef USE_VM_PROBES - if (DTRACE_ENABLED(message_queued)) { - DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); - - dtrace_proc_str(rcvr, receiver_name); - if (token != NIL && token != am_have_dt_utag) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); - tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); - tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); - } - DTRACE6(message_queued, - receiver_name, size_object(msg), rcvr->msg.len, - tok_label, tok_lastcnt, tok_serial); - } -#endif - erts_queue_message(rcvr, rcvr_locks, mbuf, msg, token + if (!erts_decode_dist_message(rcvr, rcvr_locks, mp, 0)) + erts_free_message(mp); + else { + Eterm msg = ERL_MESSAGE_TERM(mp); + token = ERL_MESSAGE_TOKEN(mp); #ifdef USE_VM_PROBES - , NIL + if (DTRACE_ENABLED(message_queued)) { + DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); + + dtrace_proc_str(rcvr, receiver_name); + if (have_seqtrace(token)) { + tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); + tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); + tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); + } + DTRACE6(message_queued, + receiver_name, size_object(msg), rcvr->msg.len, + tok_label, tok_lastcnt, tok_serial); + } #endif - ); + erts_queue_message(rcvr, rcvr_locks, mp, msg, from); + } } else { /* Enqueue message on external format */ - ERL_MESSAGE_TERM(mp) = THE_NON_VALUE; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; - if (token == am_have_dt_utag) { - ERL_MESSAGE_TOKEN(mp) = NIL; - } else { -#endif - ERL_MESSAGE_TOKEN(mp) = token; -#ifdef USE_VM_PROBES - } -#endif - mp->next = NULL; - #ifdef USE_VM_PROBES if (DTRACE_ENABLED(message_queued)) { DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); dtrace_proc_str(rcvr, receiver_name); - if (token != NIL && token != am_have_dt_utag) { + if (have_seqtrace(token)) { tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); @@ -407,70 +353,73 @@ erts_queue_dist_message(Process *rcvr, tok_label, tok_lastcnt, tok_serial); } #endif - mp->data.dist_ext = dist_ext; - LINK_MESSAGE(rcvr, mp); - if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) + LINK_MESSAGE(rcvr, mp, &mp->next, 1); + + if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); - erts_proc_notify_new_message(rcvr); + erts_proc_notify_new_message(rcvr, +#ifdef ERTS_SMP + rcvr_locks +#else + 0 +#endif + ); } } -/* Add a message last in message queue */ +/* Add messages last in message queue */ static Sint -queue_message(Process *c_p, - Process* receiver, - ErtsProcLocks *receiver_locks, - erts_aint32_t *receiver_state, - ErlHeapFragment* bp, - Eterm message, - Eterm seq_trace_token -#ifdef USE_VM_PROBES - , Eterm dt_utag -#endif - ) +queue_messages(Process* receiver, + erts_aint32_t *receiver_state, + ErtsProcLocks receiver_locks, + ErtsMessage* first, + ErtsMessage** last, + Uint len, + Eterm from) { + ErtsTracingEvent* te; Sint res; - ErlMessage* mp; int locked_msgq = 0; - erts_aint_t state; + erts_aint32_t state; -#ifndef ERTS_SMP - ASSERT(bp != NULL || receiver->mbuf == NULL); -#endif - - ERTS_SMP_LC_ASSERT(*receiver_locks == erts_proc_lc_my_proc_locks(receiver)); - - mp = message_alloc(); - - if (receiver_state) - state = *receiver_state; - else - state = erts_smp_atomic32_read_acqb(&receiver->state); + ASSERT(is_value(ERL_MESSAGE_TERM(first))); + ASSERT(ERL_MESSAGE_TOKEN(first) == am_undefined || + ERL_MESSAGE_TOKEN(first) == NIL || + is_tuple(ERL_MESSAGE_TOKEN(first))); #ifdef ERTS_SMP +#ifdef ERTS_ENABLE_LOCK_CHECK + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(receiver) < ERTS_PROC_LOCK_MSGQ || + receiver_locks == erts_proc_lc_my_proc_locks(receiver)); +#endif - if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) - goto exiting; - - if (!(*receiver_locks & ERTS_PROC_LOCK_MSGQ)) { + if (!(receiver_locks & ERTS_PROC_LOCK_MSGQ)) { if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) { - ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ; - if (*receiver_locks & ERTS_PROC_LOCK_STATUS) { - erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS); - need_locks |= ERTS_PROC_LOCK_STATUS; + ErtsProcLocks need_locks; + + if (receiver_state) + state = *receiver_state; + else + state = erts_smp_atomic32_read_nob(&receiver->state); + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + goto exiting; + + need_locks = receiver_locks & (ERTS_PROC_LOCK_STATUS|ERTS_PROC_LOCK_TRACE); + if (need_locks) { + erts_smp_proc_unlock(receiver, need_locks); } + need_locks |= ERTS_PROC_LOCK_MSGQ; erts_smp_proc_lock(receiver, need_locks); } locked_msgq = 1; - state = erts_smp_atomic32_read_nob(&receiver->state); - if (receiver_state) - *receiver_state = state; } #endif + state = erts_smp_atomic32_read_nob(&receiver->state); + if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) { #ifdef ERTS_SMP exiting: @@ -478,25 +427,13 @@ queue_message(Process *c_p, /* Drop message if receiver is exiting or has a pending exit... */ if (locked_msgq) erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); - if (bp) - free_message_buffer(bp); - message_free(mp); + erts_cleanup_messages(first); return 0; } - ERL_MESSAGE_TERM(mp) = message; - ERL_MESSAGE_TOKEN(mp) = seq_trace_token; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = dt_utag; -#endif - mp->next = NULL; - mp->data.heap_frag = bp; - -#ifndef ERTS_SMP res = receiver->msg.len; -#else - res = receiver->msg_inq.len; - if (*receiver_locks & ERTS_PROC_LOCK_MAIN) { +#ifdef ERTS_SMP + if (receiver_locks & ERTS_PROC_LOCK_MAIN) { /* * We move 'in queue' to 'private queue' and place * message at the end of 'private queue' in order @@ -505,313 +442,120 @@ queue_message(Process *c_p, * we don't need to include the 'in queue' in * the root set when garbage collecting. */ - res += receiver->msg.len; + res += receiver->msg_inq.len; ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); - LINK_MESSAGE_PRIVQ(receiver, mp); + LINK_MESSAGE_PRIVQ(receiver, first, last, len); } else #endif { - LINK_MESSAGE(receiver, mp); + LINK_MESSAGE(receiver, first, last, len); } + if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE) + && (te = &erts_receive_tracing[erts_active_bp_ix()], + te->on)) { + + ErtsMessage *msg = first; + #ifdef USE_VM_PROBES - if (DTRACE_ENABLED(message_queued)) { - DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); - Sint tok_label = 0; - Sint tok_lastcnt = 0; - Sint tok_serial = 0; - - dtrace_proc_str(receiver, receiver_name); - if (seq_trace_token != NIL && is_tuple(seq_trace_token)) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(seq_trace_token)); - tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(seq_trace_token)); - tok_serial = signed_val(SEQ_TRACE_T_SERIAL(seq_trace_token)); + if (DTRACE_ENABLED(message_queued)) { + DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); + Sint tok_label = 0; + Sint tok_lastcnt = 0; + Sint tok_serial = 0; + Eterm seq_trace_token = ERL_MESSAGE_TOKEN(msg); + + dtrace_proc_str(receiver, receiver_name); + if (seq_trace_token != NIL && is_tuple(seq_trace_token)) { + tok_label = signed_val(SEQ_TRACE_T_LABEL(seq_trace_token)); + tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(seq_trace_token)); + tok_serial = signed_val(SEQ_TRACE_T_SERIAL(seq_trace_token)); + } + DTRACE6(message_queued, + receiver_name, size_object(ERL_MESSAGE_TERM(msg)), + receiver->msg.len, + tok_label, tok_lastcnt, tok_serial); } - DTRACE6(message_queued, - receiver_name, size_object(message), receiver->msg.len, - tok_label, tok_lastcnt, tok_serial); - } #endif + while (msg) { + trace_receive(receiver, from, ERL_MESSAGE_TERM(msg), te); + msg = msg->next; + } - if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) - trace_receive(receiver, message); - - if (locked_msgq) + } + if (locked_msgq) { erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); + } - erts_proc_notify_new_message(receiver); - -#ifndef ERTS_SMP - ERTS_HOLE_CHECK(receiver); +#ifdef ERTS_SMP + erts_proc_notify_new_message(receiver, receiver_locks); +#else + erts_proc_notify_new_message(receiver, 0); #endif return res; } -void -erts_queue_message(Process* receiver, - ErtsProcLocks *receiver_locks, - ErlHeapFragment* bp, - Eterm message, - Eterm seq_trace_token -#ifdef USE_VM_PROBES - , Eterm dt_utag -#endif - ) +static Sint +queue_message(Process* receiver, + erts_aint32_t *receiver_state, + ErtsProcLocks receiver_locks, + ErtsMessage* mp, Eterm msg, Eterm from) { - queue_message(NULL, - receiver, - receiver_locks, - NULL, - bp, - message, - seq_trace_token -#ifdef USE_VM_PROBES - , dt_utag -#endif - ); + ERL_MESSAGE_TERM(mp) = msg; + return queue_messages(receiver, receiver_state, receiver_locks, + mp, &mp->next, 1, from); } -void -erts_link_mbuf_to_proc(struct process *proc, ErlHeapFragment *bp) +Sint +erts_queue_message(Process* receiver, ErtsProcLocks receiver_locks, + ErtsMessage* mp, Eterm msg, Eterm from) { - Eterm* htop = HEAP_TOP(proc); + return queue_message(receiver, NULL, receiver_locks, mp, msg, from); +} - link_mbuf_to_proc(proc, bp); - if (htop < HEAP_LIMIT(proc)) { - *htop = make_pos_bignum_header(HEAP_LIMIT(proc)-htop-1); - HEAP_TOP(proc) = HEAP_LIMIT(proc); - } + +Sint +erts_queue_messages(Process* receiver, ErtsProcLocks receiver_locks, + ErtsMessage* first, ErtsMessage** last, Uint len, + Eterm from) +{ + return queue_messages(receiver, NULL, receiver_locks, + first, last, len, from); } -/* - * Moves content of message buffer attached to a message into a heap. - * The message buffer is deallocated. - */ void -erts_move_msg_mbuf_to_heap(Eterm** hpp, ErlOffHeap* off_heap, ErlMessage *msg) +erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp) { - struct erl_off_heap_header* oh; - Eterm term, token, *fhp, *hp; - Sint offs; - Uint sz; - ErlHeapFragment *bp; -#ifdef USE_VM_PROBES - Eterm utag; -#endif - -#ifdef HARD_DEBUG - struct erl_off_heap_header* dbg_oh_start = off_heap->first; - Eterm dbg_term, dbg_token; - ErlHeapFragment *dbg_bp; - Uint *dbg_hp, *dbg_thp_start; - Uint dbg_term_sz, dbg_token_sz; -#ifdef USE_VM_PROBES - Eterm dbg_utag; - Uint dbg_utag_sz; -#endif -#endif - - bp = msg->data.heap_frag; - term = ERL_MESSAGE_TERM(msg); - token = ERL_MESSAGE_TOKEN(msg); -#ifdef USE_VM_PROBES - utag = ERL_MESSAGE_DT_UTAG(msg); -#endif - if (!bp) { -#ifdef USE_VM_PROBES - ASSERT(is_immed(term) && is_immed(token) && is_immed(utag)); -#else - ASSERT(is_immed(term) && is_immed(token)); -#endif - return; - } - -#ifdef HARD_DEBUG - dbg_term_sz = size_object(term); - dbg_token_sz = size_object(token); - dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz); -#ifdef USE_VM_PROBES - dbg_utag_sz = size_object(utag); - dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz + dbg_utag_sz ); -#endif - /*ASSERT(dbg_term_sz + dbg_token_sz == erts_msg_used_frag_sz(msg)); - Copied size may be smaller due to removed SubBins's or garbage. - Copied size may be larger due to duplicated shared terms. - */ - dbg_hp = dbg_bp->mem; - dbg_term = copy_struct(term, dbg_term_sz, &dbg_hp, &dbg_bp->off_heap); - dbg_token = copy_struct(token, dbg_token_sz, &dbg_hp, &dbg_bp->off_heap); -#ifdef USE_VM_PROBES - dbg_utag = copy_struct(utag, dbg_utag_sz, &dbg_hp, &dbg_bp->off_heap); -#endif - dbg_thp_start = *hpp; -#endif - - if (bp->next != NULL) { - move_multi_frags(hpp, off_heap, bp, msg->m, -#ifdef USE_VM_PROBES - 3 -#else - 2 -#endif - ); - goto copy_done; - } - - OH_OVERHEAD(off_heap, bp->off_heap.overhead); - sz = bp->used_size; - - ASSERT(is_immed(term) || in_heapfrag(ptr_val(term),bp)); - ASSERT(is_immed(token) || in_heapfrag(ptr_val(token),bp)); - - fhp = bp->mem; - hp = *hpp; - offs = hp - fhp; - - oh = NULL; - while (sz--) { - Uint cpy_sz; - Eterm val = *fhp++; - - switch (primary_tag(val)) { - case TAG_PRIMARY_IMMED1: - *hp++ = val; - break; - case TAG_PRIMARY_LIST: - case TAG_PRIMARY_BOXED: - ASSERT(in_heapfrag(ptr_val(val), bp)); - *hp++ = offset_ptr(val, offs); - break; - case TAG_PRIMARY_HEADER: - *hp++ = val; - switch (val & _HEADER_SUBTAG_MASK) { - case ARITYVAL_SUBTAG: - break; - case REFC_BINARY_SUBTAG: - case FUN_SUBTAG: - case EXTERNAL_PID_SUBTAG: - case EXTERNAL_PORT_SUBTAG: - case EXTERNAL_REF_SUBTAG: - oh = (struct erl_off_heap_header*) (hp-1); - cpy_sz = thing_arityval(val); - goto cpy_words; - default: - cpy_sz = header_arity(val); - - cpy_words: - ASSERT(sz >= cpy_sz); - sz -= cpy_sz; - while (cpy_sz >= 8) { - cpy_sz -= 8; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - } - switch (cpy_sz) { - case 7: *hp++ = *fhp++; - case 6: *hp++ = *fhp++; - case 5: *hp++ = *fhp++; - case 4: *hp++ = *fhp++; - case 3: *hp++ = *fhp++; - case 2: *hp++ = *fhp++; - case 1: *hp++ = *fhp++; - default: break; - } - if (oh) { - /* Add to offheap list */ - oh->next = off_heap->first; - off_heap->first = oh; - ASSERT(*hpp <= (Eterm*)oh); - ASSERT(hp > (Eterm*)oh); - oh = NULL; + if (first_bp) { + ErlHeapFragment *bp = first_bp; + + while (1) { + /* Move any off_heap's into the process */ + if (bp->off_heap.first != NULL) { + struct erl_off_heap_header** next_p = &bp->off_heap.first; + while (*next_p != NULL) { + next_p = &((*next_p)->next); } - break; + *next_p = MSO(proc).first; + MSO(proc).first = bp->off_heap.first; + bp->off_heap.first = NULL; + OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead); } - break; + MBUF_SIZE(proc) += bp->used_size; + if (!bp->next) + break; + bp = bp->next; } - } - - ASSERT(bp->used_size == hp - *hpp); - *hpp = hp; - if (is_not_immed(token)) { - ASSERT(in_heapfrag(ptr_val(token), bp)); - ERL_MESSAGE_TOKEN(msg) = offset_ptr(token, offs); -#ifdef HARD_DEBUG - ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TOKEN(msg))); - ASSERT(hp > ptr_val(ERL_MESSAGE_TOKEN(msg))); -#endif - } - - if (is_not_immed(term)) { - ASSERT(in_heapfrag(ptr_val(term),bp)); - ERL_MESSAGE_TERM(msg) = offset_ptr(term, offs); -#ifdef HARD_DEBUG - ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TERM(msg))); - ASSERT(hp > ptr_val(ERL_MESSAGE_TERM(msg))); -#endif - } -#ifdef USE_VM_PROBES - if (is_not_immed(utag)) { - ASSERT(in_heapfrag(ptr_val(utag), bp)); - ERL_MESSAGE_DT_UTAG(msg) = offset_ptr(utag, offs); -#ifdef HARD_DEBUG - ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_DT_UTAG(msg))); - ASSERT(hp > ptr_val(ERL_MESSAGE_DT_UTAG(msg))); -#endif - } -#endif - -copy_done: - -#ifdef HARD_DEBUG - { - int i, j; - ErlHeapFragment* frag; - { - struct erl_off_heap_header* dbg_oh = off_heap->first; - i = j = 0; - while (dbg_oh != dbg_oh_start) { - dbg_oh = dbg_oh->next; - i++; - } - for (frag=bp; frag; frag=frag->next) { - dbg_oh = frag->off_heap.first; - while (dbg_oh) { - dbg_oh = dbg_oh->next; - j++; - } - } - ASSERT(i == j); - } + /* Link the message buffer */ + bp->next = MBUF(proc); + MBUF(proc) = first_bp; } -#endif - - - bp->off_heap.first = NULL; - free_message_buffer(bp); - msg->data.heap_frag = NULL; - -#ifdef HARD_DEBUG - ASSERT(eq(ERL_MESSAGE_TERM(msg), dbg_term)); - ASSERT(eq(ERL_MESSAGE_TOKEN(msg), dbg_token)); -#ifdef USE_VM_PROBES - ASSERT(eq(ERL_MESSAGE_DT_UTAG(msg), dbg_utag)); -#endif - free_message_buffer(dbg_bp); -#endif - } - Uint -erts_msg_attached_data_size_aux(ErlMessage *msg) +erts_msg_attached_data_size_aux(ErtsMessage *msg) { Sint sz; ASSERT(is_non_value(ERL_MESSAGE_TERM(msg))); @@ -840,29 +584,89 @@ erts_msg_attached_data_size_aux(ErlMessage *msg) return sz; } -void -erts_move_msg_attached_data_to_heap(Eterm **hpp, ErlOffHeap *ohp, ErlMessage *msg) +ErtsMessage * +erts_try_alloc_message_on_heap(Process *pp, + erts_aint32_t *psp, + ErtsProcLocks *plp, + Uint sz, + Eterm **hpp, + ErlOffHeap **ohpp, + int *on_heap_p) { - if (is_value(ERL_MESSAGE_TERM(msg))) - erts_move_msg_mbuf_to_heap(hpp, ohp, msg); - else if (msg->data.dist_ext) { - ASSERT(msg->data.dist_ext->heap_size >= 0); - if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); - ERL_MESSAGE_TOKEN(msg) = copy_struct(ERL_MESSAGE_TOKEN(msg), - heap_frag->used_size, - hpp, - ohp); - erts_cleanup_offheap(&heap_frag->off_heap); +#ifdef ERTS_SMP + int locked_main = 0; +#endif + ErtsMessage *mp; + + ASSERT(!(*psp & ERTS_PSFLG_OFF_HEAP_MSGQ)); + + if ((*psp) & ERTS_PSFLGS_VOLATILE_HEAP) + goto in_message_fragment; + else if ( +#if defined(ERTS_SMP) + *plp & ERTS_PROC_LOCK_MAIN +#else + pp +#endif + ) { +#ifdef ERTS_SMP + try_on_heap: +#endif + if (((*psp) & ERTS_PSFLGS_VOLATILE_HEAP) + || (pp->flags & F_DISABLE_GC) + || HEAP_LIMIT(pp) - HEAP_TOP(pp) <= sz) { + /* + * The heap is either potentially in an inconsistent + * state, or not large enough. + */ +#ifdef ERTS_SMP + if (locked_main) { + *plp &= ~ERTS_PROC_LOCK_MAIN; + erts_smp_proc_unlock(pp, ERTS_PROC_LOCK_MAIN); + } +#endif + goto in_message_fragment; } - ERL_MESSAGE_TERM(msg) = erts_decode_dist_ext(hpp, - ohp, - msg->data.dist_ext); - erts_free_dist_ext_copy(msg->data.dist_ext); - msg->data.dist_ext = NULL; + + *hpp = HEAP_TOP(pp); + HEAP_TOP(pp) = *hpp + sz; + *ohpp = &MSO(pp); + mp = erts_alloc_message(0, NULL); + mp->data.attached = NULL; + *on_heap_p = !0; } - /* else: bad external detected when calculating size */ +#ifdef ERTS_SMP + else if (pp && erts_smp_proc_trylock(pp, ERTS_PROC_LOCK_MAIN) == 0) { + locked_main = 1; + *psp = erts_smp_atomic32_read_nob(&pp->state); + *plp |= ERTS_PROC_LOCK_MAIN; + goto try_on_heap; + } +#endif + else { + in_message_fragment: + if (!((*psp) & ERTS_PSFLG_ON_HEAP_MSGQ)) { + mp = erts_alloc_message(sz, hpp); + *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; + } + else { + mp = erts_alloc_message(0, NULL); + if (!sz) { + *hpp = NULL; + *ohpp = NULL; + } + else { + ErlHeapFragment *bp; + bp = new_message_buffer(sz); + *hpp = &bp->mem[0]; + mp->data.heap_frag = bp; + *ohpp = &bp->off_heap; + } + } + *on_heap_p = 0; + } + + return mp; } /* @@ -877,7 +681,8 @@ erts_send_message(Process* sender, unsigned flags) { Uint msize; - ErlHeapFragment* bp = NULL; + ErtsMessage* mp; + ErlOffHeap *ohp; Eterm token = NIL; Sint res = 0; #ifdef USE_VM_PROBES @@ -886,78 +691,99 @@ erts_send_message(Process* sender, Sint tok_label = 0; Sint tok_lastcnt = 0; Sint tok_serial = 0; + Eterm utag = NIL; +#endif + erts_aint32_t receiver_state; +#ifdef SHCOPY_SEND + erts_shcopy_t info; #endif BM_STOP_TIMER(system); BM_MESSAGE(message,sender,receiver); BM_START_TIMER(send); - #ifdef USE_VM_PROBES +#ifdef USE_VM_PROBES *sender_name = *receiver_name = '\0'; - if (DTRACE_ENABLED(message_send)) { - erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), "%T", sender->common.id); - erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)), "%T", receiver->common.id); + if (DTRACE_ENABLED(message_send)) { + erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), + "%T", sender->common.id); + erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)), + "%T", receiver->common.id); } #endif + + receiver_state = erts_smp_atomic32_read_nob(&receiver->state); + if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) { Eterm* hp; Eterm stoken = SEQ_TRACE_TOKEN(sender); Uint seq_trace_size = 0; #ifdef USE_VM_PROBES Uint dt_utag_size = 0; - Eterm utag = NIL; -#endif - - BM_SWAP_TIMER(send,size); - msize = size_object(message); - BM_SWAP_TIMER(size,send); - -#ifdef USE_VM_PROBES - if (stoken != am_have_dt_utag) { #endif + BM_SWAP_TIMER(send,size); + /* SHCOPY corrupts the heap between + * copy_shared_calculate, and + * copy_shared_perform. (it inserts move_markers like the gc). + * Make sure we don't use the heap between those instances. + */ + if (have_seqtrace(stoken)) { seq_trace_update_send(sender); seq_trace_output(stoken, message, SEQ_TRACE_SEND, receiver->common.id, sender); seq_trace_size = 6; /* TUPLE5 */ -#ifdef USE_VM_PROBES - } - if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) { - dt_utag_size = size_object(DT_UTAG(sender)); - } else if (stoken == am_have_dt_utag ) { - stoken = NIL; } +#ifdef USE_VM_PROBES + if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) { + dt_utag_size = size_object(DT_UTAG(sender)); + } else if (stoken == am_have_dt_utag ) { + stoken = NIL; + } #endif - bp = new_message_buffer(msize + seq_trace_size +#ifdef SHCOPY_SEND + INITIALIZE_SHCOPY(info); + msize = copy_shared_calculate(message, &info); +#else + msize = size_object(message); +#endif + BM_SWAP_TIMER(size,send); + + mp = erts_alloc_message_heap_state(receiver, + &receiver_state, + receiver_locks, + (msize #ifdef USE_VM_PROBES - + dt_utag_size + + dt_utag_size #endif - ); - hp = bp->mem; + + seq_trace_size), + &hp, + &ohp); BM_SWAP_TIMER(send,copy); - token = copy_struct(stoken, - seq_trace_size, - &hp, - &bp->off_heap); - message = copy_struct(message, msize, &hp, &bp->off_heap); -#ifdef USE_VM_PROBES - if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) { - utag = copy_struct(DT_UTAG(sender), dt_utag_size, &hp, &bp->off_heap); -#ifdef DTRACE_TAG_HARDDEBUG - erts_fprintf(stderr, - "Dtrace -> (%T) Spreading tag (%T) with " - "message %T!\r\n",sender->common.id, utag, message); -#endif - } +#ifdef SHCOPY_SEND + if (is_not_immed(message)) + message = copy_shared_perform(message, msize, &info, &hp, ohp); + DESTROY_SHCOPY(info); +#else + if (is_not_immed(message)) + message = copy_struct(message, msize, &hp, ohp); #endif - BM_MESSAGE_COPIED(msize); - BM_SWAP_TIMER(copy,send); + if (is_immed(stoken)) + token = stoken; + else + token = copy_struct(stoken, seq_trace_size, &hp, ohp); #ifdef USE_VM_PROBES + if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) { + if (is_immed(DT_UTAG(sender))) + utag = DT_UTAG(sender); + else + utag = copy_struct(DT_UTAG(sender), dt_utag_size, &hp, ohp); + } if (DTRACE_ENABLED(message_send)) { - if (stoken != NIL && stoken != am_have_dt_utag) { + if (have_seqtrace(stoken)) { tok_label = signed_val(SEQ_TRACE_T_LABEL(stoken)); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(stoken)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(stoken)); @@ -966,140 +792,66 @@ erts_send_message(Process* sender, msize, tok_label, tok_lastcnt, tok_serial); } #endif - res = queue_message(NULL, - receiver, - receiver_locks, - NULL, - bp, - message, - token -#ifdef USE_VM_PROBES - , utag -#endif - ); - BM_SWAP_TIMER(send,system); - } else if (sender == receiver) { - /* Drop message if receiver has a pending exit ... */ -#ifdef ERTS_SMP - ErtsProcLocks need_locks = (~(*receiver_locks) - & (ERTS_PROC_LOCK_MSGQ - | ERTS_PROC_LOCK_STATUS)); - if (need_locks) { - *receiver_locks |= need_locks; - if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) { - if (need_locks == ERTS_PROC_LOCK_MSGQ) { - erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS); - need_locks = ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS; - } - erts_smp_proc_lock(receiver, need_locks); - } - } - if (!ERTS_PROC_PENDING_EXIT(receiver)) -#endif - { - ErlMessage* mp = message_alloc(); - - DTRACE6(message_send, sender_name, receiver_name, - size_object(message), tok_label, tok_lastcnt, tok_serial); - mp->data.attached = NULL; - ERL_MESSAGE_TERM(mp) = message; - ERL_MESSAGE_TOKEN(mp) = NIL; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; -#endif - mp->next = NULL; - /* - * We move 'in queue' to 'private queue' and place - * message at the end of 'private queue' in order - * to ensure that the 'in queue' doesn't contain - * references into the heap. By ensuring this, - * we don't need to include the 'in queue' in - * the root set when garbage collecting. - */ - - ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); - LINK_MESSAGE_PRIVQ(receiver, mp); - - res = receiver->msg.len; + BM_MESSAGE_COPIED(msize); + BM_SWAP_TIMER(copy,send); - if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { - trace_receive(receiver, message); - } - } - BM_SWAP_TIMER(send,system); } else { -#ifdef ERTS_SMP - ErlOffHeap *ohp; Eterm *hp; - erts_aint32_t state; - - BM_SWAP_TIMER(send,size); - msize = size_object(message); - BM_SWAP_TIMER(size,send); - hp = erts_alloc_message_heap_state(msize, - &bp, - &ohp, - receiver, - receiver_locks, - &state); - BM_SWAP_TIMER(send,copy); - message = copy_struct(message, msize, &hp, ohp); - BM_MESSAGE_COPIED(msz); - BM_SWAP_TIMER(copy,send); - DTRACE6(message_send, sender_name, receiver_name, - msize, tok_label, tok_lastcnt, tok_serial); - res = queue_message(sender, - receiver, - receiver_locks, - &state, - bp, - message, - token -#ifdef USE_VM_PROBES - , NIL + + if (receiver == sender && !(receiver_state & ERTS_PSFLG_OFF_HEAP_MSGQ)) { + mp = erts_alloc_message(0, NULL); + msize = 0; + } + else { + BM_SWAP_TIMER(send,size); +#ifdef SHCOPY_SEND + INITIALIZE_SHCOPY(info); + msize = copy_shared_calculate(message, &info); +#else + msize = size_object(message); #endif - ); - BM_SWAP_TIMER(send,system); + BM_SWAP_TIMER(size,send); + + mp = erts_alloc_message_heap_state(receiver, + &receiver_state, + receiver_locks, + msize, + &hp, + &ohp); + BM_SWAP_TIMER(send,copy); +#ifdef SHCOPY_SEND + if (is_not_immed(message)) + message = copy_shared_perform(message, msize, &info, &hp, ohp); + DESTROY_SHCOPY(info); #else - ErlMessage* mp = message_alloc(); - Eterm *hp; - BM_SWAP_TIMER(send,size); - msize = size_object(message); - BM_SWAP_TIMER(size,send); - - if (receiver->stop - receiver->htop <= msize) { - BM_SWAP_TIMER(send,system); - erts_garbage_collect(receiver, msize, receiver->arg_reg, receiver->arity); - BM_SWAP_TIMER(system,send); + if (is_not_immed(message)) + message = copy_struct(message, msize, &hp, ohp); +#endif + BM_MESSAGE_COPIED(msz); + BM_SWAP_TIMER(copy,send); } - hp = receiver->htop; - receiver->htop = hp + msize; - BM_SWAP_TIMER(send,copy); - message = copy_struct(message, msize, &hp, &receiver->off_heap); - BM_MESSAGE_COPIED(msize); - BM_SWAP_TIMER(copy,send); - DTRACE6(message_send, sender_name, receiver_name, - (uint32_t)msize, tok_label, tok_lastcnt, tok_serial); - ERL_MESSAGE_TERM(mp) = message; - ERL_MESSAGE_TOKEN(mp) = NIL; #ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; + DTRACE6(message_send, sender_name, receiver_name, + msize, tok_label, tok_lastcnt, tok_serial); #endif - mp->next = NULL; - mp->data.attached = NULL; - LINK_MESSAGE(receiver, mp); - res = receiver->msg.len; - erts_proc_notify_new_message(receiver); - - if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { - trace_receive(receiver, message); - } - BM_SWAP_TIMER(send,system); -#endif /* #ifndef ERTS_SMP */ } - return res; + + ERL_MESSAGE_TOKEN(mp) = token; +#ifdef USE_VM_PROBES + ERL_MESSAGE_DT_UTAG(mp) = utag; +#endif + res = queue_message(receiver, + &receiver_state, + *receiver_locks, + mp, message, + sender->common.id); + + BM_SWAP_TIMER(send,system); + + return res; } + /* * This function delivers an EXIT message to a process * which is trapping EXITs. @@ -1117,53 +869,995 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, Uint sz_from; Eterm* hp; Eterm temptoken; - ErlHeapFragment* bp = NULL; - - if (token != NIL -#ifdef USE_VM_PROBES - && token != am_have_dt_utag + ErtsMessage* mp; + ErlOffHeap *ohp; +#ifdef SHCOPY_SEND + erts_shcopy_t info; #endif - ) { + if (have_seqtrace(token)) { ASSERT(is_tuple(token)); - sz_reason = size_object(reason); sz_token = size_object(token); sz_from = size_object(from); - bp = new_message_buffer(sz_reason + sz_from + sz_token + 4); - hp = bp->mem; - mess = copy_struct(reason, sz_reason, &hp, &bp->off_heap); - from_copy = copy_struct(from, sz_from, &hp, &bp->off_heap); +#ifdef SHCOPY_SEND + INITIALIZE_SHCOPY(info); + sz_reason = copy_shared_calculate(reason, &info); +#else + sz_reason = size_object(reason); +#endif + mp = erts_alloc_message_heap(to, to_locksp, + sz_reason + sz_from + sz_token + 4, + &hp, &ohp); +#ifdef SHCOPY_SEND + mess = copy_shared_perform(reason, sz_reason, &info, &hp, ohp); + DESTROY_SHCOPY(info); +#else + mess = copy_struct(reason, sz_reason, &hp, ohp); +#endif + from_copy = copy_struct(from, sz_from, &hp, ohp); save = TUPLE3(hp, am_EXIT, from_copy, mess); hp += 4; /* the trace token must in this case be updated by the caller */ seq_trace_output(token, save, SEQ_TRACE_SEND, to->common.id, NULL); - temptoken = copy_struct(token, sz_token, &hp, &bp->off_heap); - erts_queue_message(to, to_locksp, bp, save, temptoken -#ifdef USE_VM_PROBES - , NIL -#endif - ); + temptoken = copy_struct(token, sz_token, &hp, ohp); + ERL_MESSAGE_TOKEN(mp) = temptoken; + erts_queue_message(to, *to_locksp, mp, save, am_system); } else { - ErlOffHeap *ohp; - sz_reason = size_object(reason); sz_from = IS_CONST(from) ? 0 : size_object(from); +#ifdef SHCOPY_SEND + INITIALIZE_SHCOPY(info); + sz_reason = copy_shared_calculate(reason, &info); +#else + sz_reason = size_object(reason); +#endif + mp = erts_alloc_message_heap(to, to_locksp, + sz_reason+sz_from+4, &hp, &ohp); - hp = erts_alloc_message_heap(sz_reason+sz_from+4, - &bp, - &ohp, - to, - to_locksp); - +#ifdef SHCOPY_SEND + mess = copy_shared_perform(reason, sz_reason, &info, &hp, ohp); + DESTROY_SHCOPY(info); +#else mess = copy_struct(reason, sz_reason, &hp, ohp); +#endif from_copy = (IS_CONST(from) ? from : copy_struct(from, sz_from, &hp, ohp)); save = TUPLE3(hp, am_EXIT, from_copy, mess); - erts_queue_message(to, to_locksp, bp, save, NIL + erts_queue_message(to, *to_locksp, mp, save, am_system); + } +} + +void erts_save_message_in_proc(Process *p, ErtsMessage *msgp) +{ + ErlHeapFragment *hfp; + + if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) + hfp = &msgp->hfrag; + else if (msgp->data.attached) { + hfp = msgp->data.heap_frag; + } + else { + erts_free_message(msgp); + return; /* Nothing to save */ + } + + while (1) { + struct erl_off_heap_header *ohhp = hfp->off_heap.first; + if (ohhp) { + for ( ; ohhp->next; ohhp = ohhp->next) + ; + ohhp->next = p->off_heap.first; + p->off_heap.first = hfp->off_heap.first; + hfp->off_heap.first = NULL; + } + p->off_heap.overhead += hfp->off_heap.overhead; + hfp->off_heap.overhead = 0; + p->mbuf_sz += hfp->used_size; + + if (!hfp->next) + break; + hfp = hfp->next; + } + + msgp->next = p->msg_frag; + p->msg_frag = msgp; +} + +Sint +erts_move_messages_off_heap(Process *c_p) +{ + int reds = 1; + /* + * Move all messages off heap. This *only* occurs when the + * process had off heap message disabled and just enabled + * it... + */ + ErtsMessage *mp; + + reds += c_p->msg.len / 10; + + ASSERT(erts_smp_atomic32_read_nob(&c_p->state) + & ERTS_PSFLG_OFF_HEAP_MSGQ); + ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG); + + for (mp = c_p->msg.first; mp; mp = mp->next) { + Uint msg_sz, token_sz; +#ifdef USE_VM_PROBES + Uint utag_sz; +#endif + Eterm *hp; + ErlHeapFragment *hfrag; + + if (mp->data.attached) + continue; + + if (is_immed(ERL_MESSAGE_TERM(mp)) +#ifdef USE_VM_PROBES + && is_immed(ERL_MESSAGE_DT_UTAG(mp)) +#endif + && is_not_immed(ERL_MESSAGE_TOKEN(mp))) + continue; + + /* + * The message refers into the heap. Copy the message + * from the heap into a heap fragment and attach + * it to the message... + */ + msg_sz = size_object(ERL_MESSAGE_TERM(mp)); +#ifdef USE_VM_PROBES + utag_sz = size_object(ERL_MESSAGE_DT_UTAG(mp)); +#endif + token_sz = size_object(ERL_MESSAGE_TOKEN(mp)); + + hfrag = new_message_buffer(msg_sz +#ifdef USE_VM_PROBES + + utag_sz +#endif + + token_sz); + hp = hfrag->mem; + if (is_not_immed(ERL_MESSAGE_TERM(mp))) + ERL_MESSAGE_TERM(mp) = copy_struct(ERL_MESSAGE_TERM(mp), + msg_sz, &hp, + &hfrag->off_heap); + if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) + ERL_MESSAGE_TOKEN(mp) = copy_struct(ERL_MESSAGE_TOKEN(mp), + token_sz, &hp, + &hfrag->off_heap); #ifdef USE_VM_PROBES - , NIL + if (is_not_immed(ERL_MESSAGE_DT_UTAG(mp))) + ERL_MESSAGE_DT_UTAG(mp) = copy_struct(ERL_MESSAGE_DT_UTAG(mp), + utag_sz, &hp, + &hfrag->off_heap); #endif - ); + mp->data.heap_frag = hfrag; + reds += 1; + } + + return reds; +} + +Sint +erts_complete_off_heap_message_queue_change(Process *c_p) +{ + int reds = 1; + + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG); + ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ); + + /* + * This job was first initiated when the process changed to off heap + * message queue management. Since then ERTS_PSFLG_OFF_HEAP_MSGQ + * has been set. However, the management state might have been changed + * again (multiple times) since then. Check users last requested state + * (the flags F_OFF_HEAP_MSGQ, and F_ON_HEAP_MSGQ), and make the state + * consistent with that. + */ + + if (!(c_p->flags & F_OFF_HEAP_MSGQ)) + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_OFF_HEAP_MSGQ); + else { + reds += 2; + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p); + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + reds += erts_move_messages_off_heap(c_p); } + c_p->flags &= ~F_OFF_HEAP_MSGQ_CHNG; + return reds; +} + +typedef struct { + Eterm pid; + ErtsThrPrgrLaterOp lop; +} ErtsChangeOffHeapMessageQueue; + +static void +change_off_heap_msgq(void *vcohmq) +{ + ErtsChangeOffHeapMessageQueue *cohmq; + /* + * Now we've waited thread progress which ensures that all + * messages to the process are enqueued off heap. Schedule + * completion of this change as a system task on the process + * itself. This in order to avoid lock contention on its + * main lock. We will be called in + * erts_complete_off_heap_message_queue_change() (above) when + * the system task has been selected for execution. + */ + cohmq = (ErtsChangeOffHeapMessageQueue *) vcohmq; + erts_schedule_complete_off_heap_message_queue_change(cohmq->pid); + erts_free(ERTS_ALC_T_MSGQ_CHNG, vcohmq); } +Eterm +erts_change_message_queue_management(Process *c_p, Eterm new_state) +{ + Eterm res; + +#ifdef DEBUG + if (c_p->flags & F_OFF_HEAP_MSGQ) { + ASSERT(erts_smp_atomic32_read_nob(&c_p->state) + & ERTS_PSFLG_OFF_HEAP_MSGQ); + } + else { + if (c_p->flags & F_OFF_HEAP_MSGQ_CHNG) { + ASSERT(erts_smp_atomic32_read_nob(&c_p->state) + & ERTS_PSFLG_OFF_HEAP_MSGQ); + } + else { + ASSERT(!(erts_smp_atomic32_read_nob(&c_p->state) + & ERTS_PSFLG_OFF_HEAP_MSGQ)); + } + } +#endif + + switch (c_p->flags & (F_OFF_HEAP_MSGQ|F_ON_HEAP_MSGQ)) { + + case F_OFF_HEAP_MSGQ: + res = am_off_heap; + + switch (new_state) { + case am_off_heap: + break; + case am_on_heap: + c_p->flags |= F_ON_HEAP_MSGQ; + c_p->flags &= ~F_OFF_HEAP_MSGQ; + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_ON_HEAP_MSGQ); + /* + * We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ + * if a off heap change is ongoing. It will be adjusted + * when the change completes... + */ + if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { + /* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... */ + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_OFF_HEAP_MSGQ); + } + break; + default: + res = THE_NON_VALUE; /* badarg */ + break; + } + break; + + case F_ON_HEAP_MSGQ: + res = am_on_heap; + + switch (new_state) { + case am_on_heap: + break; + case am_off_heap: + c_p->flags &= ~F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_ON_HEAP_MSGQ); + goto change_to_off_heap; + default: + res = THE_NON_VALUE; /* badarg */ + break; + } + break; + + default: + res = am_error; + ERTS_INTERNAL_ERROR("Inconsistent message queue management state"); + break; + } + + return res; + +change_to_off_heap: + + c_p->flags |= F_OFF_HEAP_MSGQ; + + /* + * We do not have to schedule a change if + * we have an ongoing off heap change... + */ + if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { + ErtsChangeOffHeapMessageQueue *cohmq; + /* + * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait + * thread progress before completing the change in + * order to ensure that all senders observe that + * messages should be passed off heap. When the + * change has completed, GC does not need to inspect + * the message queue at all. + */ + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_OFF_HEAP_MSGQ); + c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; + cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, + sizeof(ErtsChangeOffHeapMessageQueue)); + cohmq->pid = c_p->common.id; + erts_schedule_thr_prgr_later_op(change_off_heap_msgq, + (void *) cohmq, + &cohmq->lop); + } + + return res; +} + +int +erts_decode_dist_message(Process *proc, ErtsProcLocks proc_locks, + ErtsMessage *msgp, int force_off_heap) +{ + ErtsHeapFactory factory; + Eterm msg; + ErlHeapFragment *bp; + Sint need; + int decode_in_heap_frag; + + decode_in_heap_frag = (force_off_heap + || !(proc_locks & ERTS_PROC_LOCK_MAIN) + || (proc->flags & F_OFF_HEAP_MSGQ)); + + if (msgp->data.dist_ext->heap_size >= 0) + need = msgp->data.dist_ext->heap_size; + else { + need = erts_decode_dist_ext_size(msgp->data.dist_ext); + if (need < 0) { + /* bad msg; remove it... */ + if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { + bp = erts_dist_ext_trailer(msgp->data.dist_ext); + erts_cleanup_offheap(&bp->off_heap); + } + erts_free_dist_ext_copy(msgp->data.dist_ext); + msgp->data.dist_ext = NULL; + return 0; + } + + msgp->data.dist_ext->heap_size = need; + } + + if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { + bp = erts_dist_ext_trailer(msgp->data.dist_ext); + need += bp->used_size; + } + + if (decode_in_heap_frag) + erts_factory_heap_frag_init(&factory, new_message_buffer(need)); + else + erts_factory_proc_prealloc_init(&factory, proc, need); + + ASSERT(msgp->data.dist_ext->heap_size >= 0); + if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { + ErlHeapFragment *heap_frag; + heap_frag = erts_dist_ext_trailer(msgp->data.dist_ext); + ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp), + heap_frag->used_size, + &factory.hp, + factory.off_heap); + erts_cleanup_offheap(&heap_frag->off_heap); + } + + msg = erts_decode_dist_ext(&factory, msgp->data.dist_ext); + ERL_MESSAGE_TERM(msgp) = msg; + erts_free_dist_ext_copy(msgp->data.dist_ext); + msgp->data.attached = NULL; + + if (is_non_value(msg)) { + erts_factory_undo(&factory); + return 0; + } + + erts_factory_trim_and_close(&factory, msgp->m, + ERL_MESSAGE_REF_ARRAY_SZ); + + ASSERT(!msgp->data.heap_frag); + + if (decode_in_heap_frag) + msgp->data.heap_frag = factory.heap_frags; + + return 1; +} + +/* + * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0 will move off heap messages + * into the heap of the inspected process if off_heap_message_queue + * is false when process_info(_, messages) is called. That is, the + * following GC will have more data in the rootset compared to the + * scenario when process_info(_, messages) had not been called. + * + * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS != 0 will keep off heap messages + * off heap when process_info(_, messages) is called regardless of + * the off_heap_message_queue setting of the process. That is, it + * will change the following execution of the process as little as + * possible. + */ +#define ERTS_INSPECT_MSGQ_KEEP_OH_MSGS 1 + +Uint +erts_prep_msgq_for_inspection(Process *c_p, Process *rp, + ErtsProcLocks rp_locks, ErtsMessageInfo *mip) +{ + Uint tot_heap_size; + ErtsMessage* mp; + Sint i; + int self_on_heap; + + /* + * Prepare the message queue for inspection + * by process_info(). + * + * + * - Decode all messages on external format + * - Remove all corrupt dist messages from queue + * - Save pointer to, and heap size need of each + * message in the mip array. + * - Return total heap size need for all messages + * that needs to be copied. + * + * If ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0: + * - In case off heap messages is disabled and + * we are inspecting our own queue, move all + * off heap data into the heap. + */ + + self_on_heap = c_p == rp && !(c_p->flags & F_OFF_HEAP_MSGQ); + + tot_heap_size = 0; + i = 0; + mp = rp->msg.first; + while (mp) { + Eterm msg = ERL_MESSAGE_TERM(mp); + + mip[i].size = 0; + + if (is_non_value(msg)) { + /* Dist message on external format; decode it... */ + if (mp->data.attached) + erts_decode_dist_message(rp, rp_locks, mp, + ERTS_INSPECT_MSGQ_KEEP_OH_MSGS); + + msg = ERL_MESSAGE_TERM(mp); + + if (is_non_value(msg)) { + ErtsMessage **mpp; + ErtsMessage *bad_mp = mp; + /* + * Bad distribution message; remove + * it from the queue... + */ + ASSERT(!mp->data.attached); + + mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next; + + ASSERT(*mpp == bad_mp); + + erts_msgq_update_internal_pointers(&rp->msg, mpp, &bad_mp->next); + + mp = mp->next; + *mpp = mp; + rp->msg.len--; + bad_mp->next = NULL; + erts_cleanup_messages(bad_mp); + continue; + } + } + + ASSERT(is_value(msg)); + +#if ERTS_INSPECT_MSGQ_KEEP_OH_MSGS + if (is_not_immed(msg) && (!self_on_heap || mp->data.attached)) { + Uint sz = size_object(msg); + mip[i].size = sz; + tot_heap_size += sz; + } +#else + if (self_on_heap) { + if (mp->data.attached) { + ErtsMessage *tmp = NULL; + if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { + erts_link_mbuf_to_proc(rp, mp->data.heap_frag); + mp->data.attached = NULL; + } + else { + /* + * Need to replace the message reference since + * we will get references to the message data + * from the heap... + */ + ErtsMessage **mpp; + tmp = erts_alloc_message(0, NULL); + sys_memcpy((void *) tmp->m, (void *) mp->m, + sizeof(Eterm)*ERL_MESSAGE_REF_ARRAY_SZ); + mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next; + erts_msgq_replace_msg_ref(&rp->msg, tmp, mpp); + erts_save_message_in_proc(rp, mp); + mp = tmp; + } + } + } + else if (is_not_immed(msg)) { + Uint sz = size_object(msg); + mip[i].size = sz; + tot_heap_size += sz; + } + +#endif + + mip[i].msgp = mp; + i++; + mp = mp->next; + } + + return tot_heap_size; +} + +void erts_factory_proc_init(ErtsHeapFactory* factory, + Process* p) +{ + erts_factory_proc_prealloc_init(factory, p, HEAP_LIMIT(p) - HEAP_TOP(p)); +} + +void erts_factory_proc_prealloc_init(ErtsHeapFactory* factory, + Process* p, + Sint size) +{ + ErlHeapFragment *bp = p->mbuf; + factory->mode = FACTORY_HALLOC; + factory->p = p; + factory->hp_start = HAlloc(p, size); + factory->hp = factory->hp_start; + factory->hp_end = factory->hp_start + size; + factory->off_heap = &p->off_heap; + factory->message = NULL; + factory->off_heap_saved.first = p->off_heap.first; + factory->off_heap_saved.overhead = p->off_heap.overhead; + factory->heap_frags_saved = bp; + factory->heap_frags_saved_used = bp ? bp->used_size : 0; + factory->heap_frags = NULL; /* not used */ + factory->alloc_type = 0; /* not used */ +} + +void erts_factory_heap_frag_init(ErtsHeapFactory* factory, + ErlHeapFragment* bp) +{ + factory->mode = FACTORY_HEAP_FRAGS; + factory->p = NULL; + factory->hp_start = bp->mem; + factory->hp = bp->mem; + factory->hp_end = bp->mem + bp->alloc_size; + factory->off_heap = &bp->off_heap; + factory->message = NULL; + factory->heap_frags = bp; + factory->heap_frags_saved = NULL; + factory->heap_frags_saved_used = 0; + factory->alloc_type = ERTS_ALC_T_HEAP_FRAG; + ASSERT(!bp->next); + factory->off_heap_saved.first = factory->off_heap->first; + factory->off_heap_saved.overhead = factory->off_heap->overhead; + + ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end); +} + + +ErtsMessage * +erts_factory_message_create(ErtsHeapFactory* factory, + Process *proc, + ErtsProcLocks *proc_locksp, + Uint sz) +{ + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *msgp; + int on_heap; + erts_aint32_t state; + + state = proc ? erts_smp_atomic32_read_nob(&proc->state) : 0; + + if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) { + msgp = erts_alloc_message(sz, &hp); + ohp = sz == 0 ? NULL : &msgp->hfrag.off_heap; + on_heap = 0; + } + else { + msgp = erts_try_alloc_message_on_heap(proc, &state, + proc_locksp, + sz, &hp, &ohp, + &on_heap); + } + + if (on_heap) { + ERTS_SMP_ASSERT(*proc_locksp & ERTS_PROC_LOCK_MAIN); + ASSERT(ohp == &proc->off_heap); + factory->mode = FACTORY_HALLOC; + factory->p = proc; + factory->heap_frags_saved = proc->mbuf; + factory->heap_frags_saved_used = proc->mbuf ? proc->mbuf->used_size : 0; + } + else { + factory->mode = FACTORY_MESSAGE; + factory->p = NULL; + factory->heap_frags_saved = NULL; + factory->heap_frags_saved_used = 0; + + if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) { + ASSERT(!msgp->hfrag.next); + factory->heap_frags = NULL; + } + else { + ASSERT(!msgp->data.heap_frag + || !msgp->data.heap_frag->next); + factory->heap_frags = msgp->data.heap_frag; + } + } + factory->hp_start = hp; + factory->hp = hp; + factory->hp_end = hp + sz; + factory->message = msgp; + factory->off_heap = ohp; + factory->alloc_type = ERTS_ALC_T_HEAP_FRAG; + if (ohp) { + factory->off_heap_saved.first = ohp->first; + factory->off_heap_saved.overhead = ohp->overhead; + } + else { + factory->off_heap_saved.first = NULL; + factory->off_heap_saved.overhead = 0; + } + + ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end); + + return msgp; +} + +void erts_factory_selfcontained_message_init(ErtsHeapFactory* factory, + ErtsMessage *msgp, + Eterm *hp) +{ + ErlHeapFragment* bp; + if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) { + bp = &msgp->hfrag; + factory->heap_frags = NULL; + } + else { + bp = msgp->data.heap_frag; + factory->heap_frags = bp; + } + factory->mode = FACTORY_MESSAGE; + factory->p = NULL; + factory->hp_start = bp->mem; + factory->hp = hp; + factory->hp_end = bp->mem + bp->alloc_size; + factory->message = msgp; + factory->off_heap = &bp->off_heap; + factory->heap_frags_saved = NULL; + factory->heap_frags_saved_used = 0; + factory->alloc_type = ERTS_ALC_T_HEAP_FRAG; + ASSERT(!bp->next); + factory->off_heap_saved.first = factory->off_heap->first; + factory->off_heap_saved.overhead = factory->off_heap->overhead; + + ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end); +} + +/* One static sized heap that must suffice. + No extra heap fragments will be allocated. +*/ +void erts_factory_static_init(ErtsHeapFactory* factory, + Eterm* hp, + Uint size, + ErlOffHeap* off_heap) +{ + factory->mode = FACTORY_STATIC; + factory->hp_start = hp; + factory->hp = hp; + factory->hp_end = hp + size; + factory->off_heap = off_heap; + factory->off_heap_saved.first = factory->off_heap->first; + factory->off_heap_saved.overhead = factory->off_heap->overhead; +} + +/* A temporary heap with default buffer allocated/freed by client. + * factory_close is same as factory_undo + */ +void erts_factory_tmp_init(ErtsHeapFactory* factory, Eterm* hp, Uint size, + Uint32 atype) +{ + factory->mode = FACTORY_TMP; + factory->hp_start = hp; + factory->hp = hp; + factory->hp_end = hp + size; + factory->heap_frags = NULL; + factory->off_heap_saved.first = NULL; + factory->off_heap_saved.overhead = 0; + factory->off_heap = &factory->off_heap_saved; + factory->alloc_type = atype; +} + +/* When we know the term is an immediate and need no heap. +*/ +void erts_factory_dummy_init(ErtsHeapFactory* factory) +{ + factory->mode = FACTORY_CLOSED; +} + +static void reserve_heap(ErtsHeapFactory*, Uint need, Uint xtra); + +Eterm* erts_produce_heap(ErtsHeapFactory* factory, Uint need, Uint xtra) +{ + Eterm* res; + + ASSERT((unsigned int)factory->mode > (unsigned int)FACTORY_CLOSED); + if (factory->hp + need > factory->hp_end) { + reserve_heap(factory, need, xtra); + } + res = factory->hp; + factory->hp += need; + return res; +} + +Eterm* erts_reserve_heap(ErtsHeapFactory* factory, Uint need) +{ + ASSERT((unsigned int)factory->mode > (unsigned int)FACTORY_CLOSED); + if (factory->hp + need > factory->hp_end) { + reserve_heap(factory, need, 200); + } + return factory->hp; +} + +static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra) +{ + ErlHeapFragment* bp; + + switch (factory->mode) { + case FACTORY_HALLOC: + HRelease(factory->p, factory->hp_end, factory->hp); + factory->hp = HAllocX(factory->p, need, xtra); + factory->hp_end = factory->hp + need; + return; + + case FACTORY_MESSAGE: + if (!factory->heap_frags) { + ASSERT(factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG); + bp = &factory->message->hfrag; + } + else { + /* Fall through */ + case FACTORY_HEAP_FRAGS: + case FACTORY_TMP: + bp = factory->heap_frags; + } + + if (bp) { + ASSERT(factory->hp > bp->mem); + ASSERT(factory->hp <= factory->hp_end); + ASSERT(factory->hp_end == bp->mem + bp->alloc_size); + + bp->used_size = factory->hp - bp->mem; + } + bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(factory->alloc_type, + ERTS_HEAP_FRAG_SIZE(need+xtra)); + bp->next = factory->heap_frags; + factory->heap_frags = bp; + bp->alloc_size = need + xtra; + bp->used_size = need; + bp->off_heap.first = NULL; + bp->off_heap.overhead = 0; + + factory->hp = bp->mem; + factory->hp_end = bp->mem + bp->alloc_size; + return; + + case FACTORY_STATIC: + case FACTORY_CLOSED: + default: + ASSERT(!"Invalid factory mode"); + } +} + +void erts_factory_close(ErtsHeapFactory* factory) +{ + ErlHeapFragment* bp; + + switch (factory->mode) { + case FACTORY_HALLOC: + HRelease(factory->p, factory->hp_end, factory->hp); + break; + + case FACTORY_MESSAGE: + if (!factory->heap_frags) { + if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG) + bp = &factory->message->hfrag; + else + bp = NULL; + } + else { + if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG) + factory->message->hfrag.next = factory->heap_frags; + else + factory->message->data.heap_frag = factory->heap_frags; + + /* Fall through */ + case FACTORY_HEAP_FRAGS: + bp = factory->heap_frags; + } + + if (bp) { + ASSERT(factory->hp >= bp->mem); + ASSERT(factory->hp <= factory->hp_end); + ASSERT(factory->hp_end == bp->mem + bp->alloc_size); + + bp->used_size = factory->hp - bp->mem; + } + break; + case FACTORY_TMP: + erts_factory_undo(factory); + break; + case FACTORY_STATIC: break; + case FACTORY_CLOSED: break; + default: + ASSERT(!"Invalid factory mode"); + } + factory->mode = FACTORY_CLOSED; +} + +void erts_factory_trim_and_close(ErtsHeapFactory* factory, + Eterm *brefs, Uint brefs_size) +{ + ErlHeapFragment *bp; + + switch (factory->mode) { + case FACTORY_MESSAGE: { + ErtsMessage *mp = factory->message; + if (mp->data.attached == ERTS_MSG_COMBINED_HFRAG) { + if (!factory->heap_frags) { + Uint sz = factory->hp - factory->hp_start; + mp = erts_shrink_message(mp, sz, brefs, brefs_size); + factory->message = mp; + factory->mode = FACTORY_CLOSED; + return; + } + /*else we don't trim multi fragmented messages for now (off_heap...) */ + break; + } + /* Fall through... */ + } + case FACTORY_HEAP_FRAGS: + bp = factory->heap_frags; + if (!bp) + break; + if (bp->next == NULL) { + Uint used_sz = factory->hp - bp->mem; + ASSERT(used_sz <= bp->alloc_size); + if (used_sz > 0) + bp = erts_resize_message_buffer(bp, used_sz, + brefs, brefs_size); + else { + free_message_buffer(bp); + bp = NULL; + } + factory->heap_frags = bp; + if (factory->mode == FACTORY_MESSAGE) + factory->message->data.heap_frag = bp; + factory->mode = FACTORY_CLOSED; + return; + } + /*else we don't trim multi fragmented messages for now (off_heap...) */ + default: + break; + } + erts_factory_close(factory); +} + +void erts_factory_undo(ErtsHeapFactory* factory) +{ + ErlHeapFragment* bp; + struct erl_off_heap_header *hdr, **hdr_nextp; + + switch (factory->mode) { + case FACTORY_HALLOC: + case FACTORY_STATIC: + /* Cleanup off-heap + */ + hdr_nextp = NULL; + for (hdr = factory->off_heap->first; + hdr != factory->off_heap_saved.first; + hdr = hdr->next) { + + hdr_nextp = &hdr->next; + } + + if (hdr_nextp != NULL) { + *hdr_nextp = NULL; + erts_cleanup_offheap(factory->off_heap); + factory->off_heap->first = factory->off_heap_saved.first; + factory->off_heap->overhead = factory->off_heap_saved.overhead; + } + + if (factory->mode == FACTORY_HALLOC) { + /* Free heap frags + */ + bp = factory->p->mbuf; + if (bp != factory->heap_frags_saved) { + do { + ErlHeapFragment *next_bp = bp->next; + ASSERT(bp->off_heap.first == NULL); + ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, (void *) bp, + ERTS_HEAP_FRAG_SIZE(bp->alloc_size)); + bp = next_bp; + } while (bp != factory->heap_frags_saved); + + factory->p->mbuf = bp; + } + + /* Rollback heap top + */ + + if (HEAP_START(factory->p) <= factory->hp_start + && factory->hp_start <= HEAP_LIMIT(factory->p)) { + HEAP_TOP(factory->p) = factory->hp_start; + } + + /* Fix last heap frag */ + if (factory->heap_frags_saved) { + ASSERT(factory->heap_frags_saved == factory->p->mbuf); + if (factory->hp_start != factory->heap_frags_saved->mem) + factory->heap_frags_saved->used_size = factory->heap_frags_saved_used; + else { + factory->p->mbuf = factory->p->mbuf->next; + ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, factory->heap_frags_saved, + ERTS_HEAP_FRAG_SIZE(factory->heap_frags_saved->alloc_size)); + } + } + } + break; + + case FACTORY_MESSAGE: + if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG) + factory->message->hfrag.next = factory->heap_frags; + else + factory->message->data.heap_frag = factory->heap_frags; + erts_cleanup_messages(factory->message); + break; + case FACTORY_TMP: + case FACTORY_HEAP_FRAGS: + erts_cleanup_offheap(factory->off_heap); + factory->off_heap->first = NULL; + + bp = factory->heap_frags; + while (bp != NULL) { + ErlHeapFragment* next_bp = bp->next; + + ASSERT(bp->off_heap.first == NULL); + ERTS_HEAP_FREE(factory->alloc_type, (void *) bp, + ERTS_HEAP_FRAG_SIZE(bp->alloc_size)); + bp = next_bp; + } + break; + + case FACTORY_CLOSED: break; + default: + ASSERT(!"Invalid factory mode"); + } + factory->mode = FACTORY_CLOSED; +#ifdef DEBUG + factory->p = NULL; + factory->hp = NULL; + factory->heap_frags = NULL; +#endif +} |