From b21b604137c5cb5f5039a40994e429871e5b707b Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Wed, 26 Aug 2015 19:47:10 +0200 Subject: Introduce literal tag --- erts/emulator/beam/erl_message.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'erts/emulator/beam/erl_message.c') diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index ef52823287..e23c79d301 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -642,13 +642,13 @@ erts_move_msg_mbuf_to_heap(Eterm** hpp, ErlOffHeap* off_heap, ErlMessage *msg) #endif if (bp->next != NULL) { - move_multi_frags(hpp, off_heap, bp, msg->m, + erts_move_multi_frags(hpp, off_heap, bp, msg->m, #ifdef USE_VM_PROBES - 3 + 3, #else - 2 + 2, #endif - ); + 0); goto copy_done; } -- cgit v1.2.3 From 3ac08f9b668613a4292436979eacc61863c2ab94 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Wed, 16 Sep 2015 15:02:50 +0200 Subject: Fragmented young heap generation and off_heap_message_queue option * The youngest generation of the heap can now consist of multiple blocks. Heap fragments and message fragments are added to the youngest generation when needed without triggering a GC. After a GC the youngest generation is contained in one single block. * The off_heap_message_queue process flag has been added. When enabled all message data in the queue is kept off heap. When a message is selected from the queue, the message fragment (or heap fragment) containing the actual message is attached to the youngest generation. Messages stored off heap is not part of GC. --- erts/emulator/beam/erl_message.c | 1509 +++++++++++++++++++++++--------------- 1 file changed, 920 insertions(+), 589 deletions(-) (limited to 'erts/emulator/beam/erl_message.c') diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index e23c79d301..79739501a8 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -33,8 +33,8 @@ #include "erl_binary.h" #include "dtrace-wrapper.h" -ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message, - ErlMessage, +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message_ref, + ErtsMessageRef, ERL_MESSAGE_BUF_SZ, ERTS_ALC_T_MSG_REF) @@ -44,27 +44,20 @@ ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message, #undef HARD_DEBUG #endif - - - -#ifdef DEBUG -static ERTS_INLINE int in_heapfrag(const Eterm* ptr, const ErlHeapFragment *bp) +void +init_message(void) { - return ((unsigned)(ptr - bp->mem) < bp->used_size); + init_message_ref_alloc(); } -#endif - -void -init_message(void) +void *erts_alloc_message_ref(void) { - init_message_alloc(); + return (void *) message_ref_alloc(); } -void -free_message(ErlMessage* mp) +void erts_free_message_ref(void *mp) { - message_free(mp); + message_ref_free((ErtsMessageRef *) mp); } /* Allocate message buffer (size in words) */ @@ -74,7 +67,7 @@ new_message_buffer(Uint size) ErlHeapFragment* bp; bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG, ERTS_HEAP_FRAG_SIZE(size)); - ERTS_INIT_HEAP_FRAG(bp, size); + ERTS_INIT_HEAP_FRAG(bp, size, size); return bp; } @@ -203,83 +196,87 @@ free_message_buffer(ErlHeapFragment* bp) }while (bp != NULL); } -static ERTS_INLINE void -link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp) +void +erts_cleanup_messages(ErtsMessage *msgp) { - if (bp) { - /* Link the message buffer */ - bp->next = MBUF(proc); - MBUF(proc) = bp; - MBUF_SIZE(proc) += bp->used_size; - FLAGS(proc) |= F_FORCE_GC; - - /* Move any off_heap's into the process */ - if (bp->off_heap.first != NULL) { - struct erl_off_heap_header** next_p = &bp->off_heap.first; - while (*next_p != NULL) { - next_p = &((*next_p)->next); + ErtsMessage *mp = msgp; + while (mp) { + ErtsMessage *fmp; + ErlHeapFragment *bp; + if (is_non_value(ERL_MESSAGE_TERM(mp))) { + if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) { + bp = (ErlHeapFragment *) mp->data.dist_ext->ext_endp; + erts_cleanup_offheap(&bp->off_heap); } - *next_p = MSO(proc).first; - MSO(proc).first = bp->off_heap.first; - bp->off_heap.first = NULL; - OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead); + if (mp->data.dist_ext) + erts_free_dist_ext_copy(mp->data.dist_ext); } + else { + if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) + bp = mp->data.heap_frag; + else { + bp = mp->hfrag.next; + erts_cleanup_offheap(&mp->hfrag.off_heap); + } + if (bp) + free_message_buffer(bp); + } + fmp = mp; + mp = mp->next; + erts_free_message(fmp); } } -Eterm -erts_msg_distext2heap(Process *pp, - ErtsProcLocks *plcksp, - ErlHeapFragment **bpp, - Eterm *tokenp, - ErtsDistExternal *dist_extp) +ErtsMessage * +erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_size) { - Eterm msg; - Uint tok_sz = 0; - Eterm *hp = NULL; - ErtsHeapFactory factory; - Sint sz; - - *bpp = NULL; - sz = erts_decode_dist_ext_size(dist_extp); - if (sz < 0) - goto decode_error; - if (is_not_nil(*tokenp)) { - ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp); - tok_sz = heap_frag->used_size; - sz += tok_sz; - } - if (pp) { - ErlOffHeap *ohp; - hp = erts_alloc_message_heap(sz, bpp, &ohp, pp, plcksp); - } - else { - *bpp = new_message_buffer(sz); - hp = (*bpp)->mem; - } - erts_factory_message_init(&factory, pp, hp, *bpp); - msg = erts_decode_dist_ext(&factory, dist_extp); - if (is_non_value(msg)) - goto decode_error; - if (is_not_nil(*tokenp)) { - ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp); - hp = erts_produce_heap(&factory, tok_sz, 0); - *tokenp = copy_struct(*tokenp, tok_sz, &hp, factory.off_heap); - erts_cleanup_offheap(&heap_frag->off_heap); + ErtsMessage *nmp = erts_realloc(ERTS_ALC_T_MSG, mp, + sizeof(ErtsMessage) + (sz - 1)*sizeof(Eterm)); + if (nmp != mp) { + Eterm *sp = &mp->hfrag.mem[0]; + Eterm *ep = sp + sz; + Sint offs = &nmp->hfrag.mem[0] - sp; + erts_offset_off_heap(&nmp->hfrag.off_heap, offs, sp, ep); + erts_offset_heap(&nmp->hfrag.mem[0], sz, offs, sp, ep); + if (brefs && brefs_size) + erts_offset_heap_ptr(brefs, brefs_size, offs, sp, ep); } - erts_free_dist_ext_copy(dist_extp); - erts_factory_close(&factory); - return msg; - decode_error: - if (is_not_nil(*tokenp)) { - ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp); - erts_cleanup_offheap(&heap_frag->off_heap); + nmp->hfrag.used_size = sz; + nmp->hfrag.alloc_size = sz; + + return nmp; +} + +void +erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp) +{ + if (first_bp) { + ErlHeapFragment *bp = first_bp; + + while (1) { + /* Move any off_heap's into the process */ + if (bp->off_heap.first != NULL) { + struct erl_off_heap_header** next_p = &bp->off_heap.first; + while (*next_p != NULL) { + next_p = &((*next_p)->next); + } + *next_p = MSO(proc).first; + MSO(proc).first = bp->off_heap.first; + bp->off_heap.first = NULL; + OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead); + } + MBUF_SIZE(proc) += bp->used_size; + if (!bp->next) + break; + bp = bp->next; + } + + /* Link the message buffer */ + bp->next = MBUF(proc); + MBUF(proc) = first_bp; } - erts_free_dist_ext_copy(dist_extp); - *bpp = NULL; - return THE_NON_VALUE; - } +} void erts_queue_dist_message(Process *rcvr, @@ -287,7 +284,7 @@ erts_queue_dist_message(Process *rcvr, ErtsDistExternal *dist_ext, Eterm token) { - ErlMessage* mp; + ErtsMessage* mp; #ifdef USE_VM_PROBES Sint tok_label = 0; Sint tok_lastcnt = 0; @@ -299,7 +296,17 @@ erts_queue_dist_message(Process *rcvr, ERTS_SMP_LC_ASSERT(*rcvr_locks == erts_proc_lc_my_proc_locks(rcvr)); - mp = message_alloc(); + mp = erts_alloc_message(0, NULL); + mp->data.dist_ext = dist_ext; + + ERL_MESSAGE_TERM(mp) = THE_NON_VALUE; +#ifdef USE_VM_PROBES + ERL_MESSAGE_DT_UTAG(mp) = NIL; + if (token == am_have_dt_utag) + ERL_MESSAGE_TOKEN(mp) = NIL; + else +#endif + ERL_MESSAGE_TOKEN(mp) = token; #ifdef ERTS_SMP if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) { @@ -318,58 +325,40 @@ erts_queue_dist_message(Process *rcvr, if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); /* Drop message if receiver is exiting or has a pending exit ... */ - if (is_not_nil(token)) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(mp->data.dist_ext); - erts_cleanup_offheap(&heap_frag->off_heap); - } - erts_free_dist_ext_copy(dist_ext); - message_free(mp); + erts_cleanup_messages(mp); } else #endif if (IS_TRACED_FL(rcvr, F_TRACE_RECEIVE)) { /* Ahh... need to decode it in order to trace it... */ - ErlHeapFragment *mbuf; - Eterm msg; if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); - message_free(mp); - msg = erts_msg_distext2heap(rcvr, rcvr_locks, &mbuf, &token, dist_ext); - if (is_value(msg)) + if (!erts_decode_dist_message(rcvr, *rcvr_locks, mp, 0)) + erts_free_message(mp); + else { + Eterm msg = ERL_MESSAGE_TERM(mp); + token = ERL_MESSAGE_TOKEN(mp); #ifdef USE_VM_PROBES - if (DTRACE_ENABLED(message_queued)) { - DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); - - dtrace_proc_str(rcvr, receiver_name); - if (token != NIL && token != am_have_dt_utag) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); - tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); - tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); - } - DTRACE6(message_queued, - receiver_name, size_object(msg), rcvr->msg.len, - tok_label, tok_lastcnt, tok_serial); - } + if (DTRACE_ENABLED(message_queued)) { + DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); + + dtrace_proc_str(rcvr, receiver_name); + if (token != NIL && token != am_have_dt_utag) { + tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); + tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); + tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); + } + DTRACE6(message_queued, + receiver_name, size_object(msg), rcvr->msg.len, + tok_label, tok_lastcnt, tok_serial); + } #endif - erts_queue_message(rcvr, rcvr_locks, mbuf, msg, token); + erts_queue_message(rcvr, rcvr_locks, mp, msg, token); + } } else { /* Enqueue message on external format */ - ERL_MESSAGE_TERM(mp) = THE_NON_VALUE; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; - if (token == am_have_dt_utag) { - ERL_MESSAGE_TOKEN(mp) = NIL; - } else { -#endif - ERL_MESSAGE_TOKEN(mp) = token; -#ifdef USE_VM_PROBES - } -#endif - mp->next = NULL; - #ifdef USE_VM_PROBES if (DTRACE_ENABLED(message_queued)) { DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); @@ -388,7 +377,7 @@ erts_queue_dist_message(Process *rcvr, tok_label, tok_lastcnt, tok_serial); } #endif - mp->data.dist_ext = dist_ext; + LINK_MESSAGE(rcvr, mp); if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) @@ -408,9 +397,9 @@ erts_queue_dist_message(Process *rcvr, static Sint queue_message(Process *c_p, Process* receiver, - ErtsProcLocks *receiver_locks, erts_aint32_t *receiver_state, - ErlHeapFragment* bp, + ErtsProcLocks *receiver_locks, + ErtsMessage* mp, Eterm message, Eterm seq_trace_token #ifdef USE_VM_PROBES @@ -419,31 +408,24 @@ queue_message(Process *c_p, ) { Sint res; - ErlMessage* mp; int locked_msgq = 0; - erts_aint_t state; - -#ifndef ERTS_SMP - ASSERT(bp != NULL || receiver->mbuf == NULL); -#endif + erts_aint32_t state; ERTS_SMP_LC_ASSERT(*receiver_locks == erts_proc_lc_my_proc_locks(receiver)); - mp = message_alloc(); - - if (receiver_state) - state = *receiver_state; - else - state = erts_smp_atomic32_read_acqb(&receiver->state); - #ifdef ERTS_SMP - if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) - goto exiting; - if (!(*receiver_locks & ERTS_PROC_LOCK_MSGQ)) { if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) { ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ; + + if (receiver_state) + state = *receiver_state; + else + state = erts_smp_atomic32_read_nob(&receiver->state); + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + goto exiting; + if (*receiver_locks & ERTS_PROC_LOCK_STATUS) { erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS); need_locks |= ERTS_PROC_LOCK_STATUS; @@ -451,13 +433,12 @@ queue_message(Process *c_p, erts_smp_proc_lock(receiver, need_locks); } locked_msgq = 1; - state = erts_smp_atomic32_read_nob(&receiver->state); - if (receiver_state) - *receiver_state = state; } #endif + state = erts_smp_atomic32_read_nob(&receiver->state); + if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) { #ifdef ERTS_SMP exiting: @@ -465,9 +446,7 @@ queue_message(Process *c_p, /* Drop message if receiver is exiting or has a pending exit... */ if (locked_msgq) erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); - if (bp) - free_message_buffer(bp); - message_free(mp); + erts_cleanup_messages(mp); return 0; } @@ -476,13 +455,9 @@ queue_message(Process *c_p, #ifdef USE_VM_PROBES ERL_MESSAGE_DT_UTAG(mp) = dt_utag; #endif - mp->next = NULL; - mp->data.heap_frag = bp; -#ifndef ERTS_SMP res = receiver->msg.len; -#else - res = receiver->msg_inq.len; +#ifdef ERTS_SMP if (*receiver_locks & ERTS_PROC_LOCK_MAIN) { /* * We move 'in queue' to 'private queue' and place @@ -492,7 +467,7 @@ queue_message(Process *c_p, * we don't need to include the 'in queue' in * the root set when garbage collecting. */ - res += receiver->msg.len; + res += receiver->msg_inq.len; ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); LINK_MESSAGE_PRIVQ(receiver, mp); } @@ -544,19 +519,19 @@ queue_message(Process *c_p, void #ifdef USE_VM_PROBES erts_queue_message_probe(Process* receiver, ErtsProcLocks *receiver_locks, - ErlHeapFragment* bp, + ErtsMessage* mp, Eterm message, Eterm seq_trace_token, Eterm dt_utag) #else erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks, - ErlHeapFragment* bp, + ErtsMessage* mp, Eterm message, Eterm seq_trace_token) #endif { queue_message(NULL, receiver, - receiver_locks, NULL, - bp, + receiver_locks, + mp, message, seq_trace_token #ifdef USE_VM_PROBES @@ -565,246 +540,8 @@ erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks, ); } -void -erts_link_mbuf_to_proc(struct process *proc, ErlHeapFragment *bp) -{ - Eterm* htop = HEAP_TOP(proc); - - link_mbuf_to_proc(proc, bp); - if (htop < HEAP_LIMIT(proc)) { - *htop = make_pos_bignum_header(HEAP_LIMIT(proc)-htop-1); - HEAP_TOP(proc) = HEAP_LIMIT(proc); - } -} - -/* - * Moves content of message buffer attached to a message into a heap. - * The message buffer is deallocated. - */ -void -erts_move_msg_mbuf_to_heap(Eterm** hpp, ErlOffHeap* off_heap, ErlMessage *msg) -{ - struct erl_off_heap_header* oh; - Eterm term, token, *fhp, *hp; - Sint offs; - Uint sz; - ErlHeapFragment *bp; -#ifdef USE_VM_PROBES - Eterm utag; -#endif - -#ifdef HARD_DEBUG - struct erl_off_heap_header* dbg_oh_start = off_heap->first; - Eterm dbg_term, dbg_token; - ErlHeapFragment *dbg_bp; - Uint *dbg_hp, *dbg_thp_start; - Uint dbg_term_sz, dbg_token_sz; -#ifdef USE_VM_PROBES - Eterm dbg_utag; - Uint dbg_utag_sz; -#endif -#endif - - bp = msg->data.heap_frag; - term = ERL_MESSAGE_TERM(msg); - token = ERL_MESSAGE_TOKEN(msg); -#ifdef USE_VM_PROBES - utag = ERL_MESSAGE_DT_UTAG(msg); -#endif - if (!bp) { -#ifdef USE_VM_PROBES - ASSERT(is_immed(term) && is_immed(token) && is_immed(utag)); -#else - ASSERT(is_immed(term) && is_immed(token)); -#endif - return; - } - -#ifdef HARD_DEBUG - dbg_term_sz = size_object(term); - dbg_token_sz = size_object(token); - dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz); -#ifdef USE_VM_PROBES - dbg_utag_sz = size_object(utag); - dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz + dbg_utag_sz ); -#endif - /*ASSERT(dbg_term_sz + dbg_token_sz == erts_msg_used_frag_sz(msg)); - Copied size may be smaller due to removed SubBins's or garbage. - Copied size may be larger due to duplicated shared terms. - */ - dbg_hp = dbg_bp->mem; - dbg_term = copy_struct(term, dbg_term_sz, &dbg_hp, &dbg_bp->off_heap); - dbg_token = copy_struct(token, dbg_token_sz, &dbg_hp, &dbg_bp->off_heap); -#ifdef USE_VM_PROBES - dbg_utag = copy_struct(utag, dbg_utag_sz, &dbg_hp, &dbg_bp->off_heap); -#endif - dbg_thp_start = *hpp; -#endif - - if (bp->next != NULL) { - erts_move_multi_frags(hpp, off_heap, bp, msg->m, -#ifdef USE_VM_PROBES - 3, -#else - 2, -#endif - 0); - goto copy_done; - } - - OH_OVERHEAD(off_heap, bp->off_heap.overhead); - sz = bp->used_size; - - ASSERT(is_immed(term) || in_heapfrag(ptr_val(term),bp)); - ASSERT(is_immed(token) || in_heapfrag(ptr_val(token),bp)); - - fhp = bp->mem; - hp = *hpp; - offs = hp - fhp; - - oh = NULL; - while (sz--) { - Uint cpy_sz; - Eterm val = *fhp++; - - switch (primary_tag(val)) { - case TAG_PRIMARY_IMMED1: - *hp++ = val; - break; - case TAG_PRIMARY_LIST: - case TAG_PRIMARY_BOXED: - ASSERT(in_heapfrag(ptr_val(val), bp)); - *hp++ = offset_ptr(val, offs); - break; - case TAG_PRIMARY_HEADER: - *hp++ = val; - switch (val & _HEADER_SUBTAG_MASK) { - case ARITYVAL_SUBTAG: - break; - case REFC_BINARY_SUBTAG: - case FUN_SUBTAG: - case EXTERNAL_PID_SUBTAG: - case EXTERNAL_PORT_SUBTAG: - case EXTERNAL_REF_SUBTAG: - oh = (struct erl_off_heap_header*) (hp-1); - cpy_sz = thing_arityval(val); - goto cpy_words; - default: - cpy_sz = header_arity(val); - - cpy_words: - ASSERT(sz >= cpy_sz); - sz -= cpy_sz; - while (cpy_sz >= 8) { - cpy_sz -= 8; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - *hp++ = *fhp++; - } - switch (cpy_sz) { - case 7: *hp++ = *fhp++; - case 6: *hp++ = *fhp++; - case 5: *hp++ = *fhp++; - case 4: *hp++ = *fhp++; - case 3: *hp++ = *fhp++; - case 2: *hp++ = *fhp++; - case 1: *hp++ = *fhp++; - default: break; - } - if (oh) { - /* Add to offheap list */ - oh->next = off_heap->first; - off_heap->first = oh; - ASSERT(*hpp <= (Eterm*)oh); - ASSERT(hp > (Eterm*)oh); - oh = NULL; - } - break; - } - break; - } - } - - ASSERT(bp->used_size == hp - *hpp); - *hpp = hp; - - if (is_not_immed(token)) { - ASSERT(in_heapfrag(ptr_val(token), bp)); - ERL_MESSAGE_TOKEN(msg) = offset_ptr(token, offs); -#ifdef HARD_DEBUG - ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TOKEN(msg))); - ASSERT(hp > ptr_val(ERL_MESSAGE_TOKEN(msg))); -#endif - } - - if (is_not_immed(term)) { - ASSERT(in_heapfrag(ptr_val(term),bp)); - ERL_MESSAGE_TERM(msg) = offset_ptr(term, offs); -#ifdef HARD_DEBUG - ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TERM(msg))); - ASSERT(hp > ptr_val(ERL_MESSAGE_TERM(msg))); -#endif - } -#ifdef USE_VM_PROBES - if (is_not_immed(utag)) { - ASSERT(in_heapfrag(ptr_val(utag), bp)); - ERL_MESSAGE_DT_UTAG(msg) = offset_ptr(utag, offs); -#ifdef HARD_DEBUG - ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_DT_UTAG(msg))); - ASSERT(hp > ptr_val(ERL_MESSAGE_DT_UTAG(msg))); -#endif - } -#endif - -copy_done: - -#ifdef HARD_DEBUG - { - int i, j; - ErlHeapFragment* frag; - { - struct erl_off_heap_header* dbg_oh = off_heap->first; - i = j = 0; - while (dbg_oh != dbg_oh_start) { - dbg_oh = dbg_oh->next; - i++; - } - for (frag=bp; frag; frag=frag->next) { - dbg_oh = frag->off_heap.first; - while (dbg_oh) { - dbg_oh = dbg_oh->next; - j++; - } - } - ASSERT(i == j); - } - } -#endif - - - bp->off_heap.first = NULL; - free_message_buffer(bp); - msg->data.heap_frag = NULL; - -#ifdef HARD_DEBUG - ASSERT(eq(ERL_MESSAGE_TERM(msg), dbg_term)); - ASSERT(eq(ERL_MESSAGE_TOKEN(msg), dbg_token)); -#ifdef USE_VM_PROBES - ASSERT(eq(ERL_MESSAGE_DT_UTAG(msg), dbg_utag)); -#endif - free_message_buffer(dbg_bp); -#endif - -} - - Uint -erts_msg_attached_data_size_aux(ErlMessage *msg) +erts_msg_attached_data_size_aux(ErtsMessage *msg) { Sint sz; ASSERT(is_non_value(ERL_MESSAGE_TERM(msg))); @@ -833,29 +570,72 @@ erts_msg_attached_data_size_aux(ErlMessage *msg) return sz; } -void -erts_move_msg_attached_data_to_heap(ErtsHeapFactory* factory, - ErlMessage *msg) +ErtsMessage * +erts_try_alloc_message_on_heap(Process *pp, + erts_aint32_t *psp, + ErtsProcLocks *plp, + Uint sz, + Eterm **hpp, + ErlOffHeap **ohpp, + int *on_heap_p) { - if (is_value(ERL_MESSAGE_TERM(msg))) - erts_move_msg_mbuf_to_heap(&factory->hp, factory->off_heap, msg); - else if (msg->data.dist_ext) { - ASSERT(msg->data.dist_ext->heap_size >= 0); - if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); - ERL_MESSAGE_TOKEN(msg) = copy_struct(ERL_MESSAGE_TOKEN(msg), - heap_frag->used_size, - &factory->hp, - factory->off_heap); - erts_cleanup_offheap(&heap_frag->off_heap); +#ifdef ERTS_SMP + int locked_main = 0; +#endif + ErtsMessage *mp; + + ASSERT(!(*psp & ERTS_PSFLG_OFF_HEAP_MSGQ)); + + if ( +#if defined(ERTS_SMP) + *plp & ERTS_PROC_LOCK_MAIN +#else + 1 +#endif + ) { +#ifdef ERTS_SMP + try_on_heap: +#endif + if ((*psp & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + || (pp->flags & F_DISABLE_GC) + || HEAP_LIMIT(pp) - HEAP_TOP(pp) <= sz) { + /* + * The heap is either potentially in an inconsistent + * state, or not large enough. + */ +#ifdef ERTS_SMP + if (locked_main) { + *plp &= ~ERTS_PROC_LOCK_MAIN; + erts_smp_proc_unlock(pp, ERTS_PROC_LOCK_MAIN); + } +#endif + goto in_message_fragment; } - ERL_MESSAGE_TERM(msg) = erts_decode_dist_ext(factory, - msg->data.dist_ext); - erts_free_dist_ext_copy(msg->data.dist_ext); - msg->data.dist_ext = NULL; + + *hpp = HEAP_TOP(pp); + HEAP_TOP(pp) = *hpp + sz; + *ohpp = &MSO(pp); + mp = erts_alloc_message(0, NULL); + mp->data.attached = NULL; + *on_heap_p = !0; + } +#ifdef ERTS_SMP + else if (erts_smp_proc_trylock(pp, ERTS_PROC_LOCK_MAIN) == 0) { + locked_main = 1; + *psp = erts_smp_atomic32_read_nob(&pp->state); + *plp |= ERTS_PROC_LOCK_MAIN; + goto try_on_heap; + } +#endif + else { + in_message_fragment: + + mp = erts_alloc_message(sz, hpp); + *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; + *on_heap_p = 0; } - /* else: bad external detected when calculating size */ + + return mp; } /* @@ -870,7 +650,8 @@ erts_send_message(Process* sender, unsigned flags) { Uint msize; - ErlHeapFragment* bp = NULL; + ErtsMessage* mp; + ErlOffHeap *ohp; Eterm token = NIL; Sint res = 0; #ifdef USE_VM_PROBES @@ -879,27 +660,31 @@ erts_send_message(Process* sender, Sint tok_label = 0; Sint tok_lastcnt = 0; Sint tok_serial = 0; + Eterm utag = NIL; #endif + erts_aint32_t receiver_state; BM_STOP_TIMER(system); BM_MESSAGE(message,sender,receiver); BM_START_TIMER(send); #ifdef USE_VM_PROBES *sender_name = *receiver_name = '\0'; - if (DTRACE_ENABLED(message_send)) { + if (DTRACE_ENABLED(message_send)) { erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), "%T", sender->common.id); erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)), "%T", receiver->common.id); } #endif + + receiver_state = erts_smp_atomic32_read_nob(&receiver->state); + if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) { Eterm* hp; Eterm stoken = SEQ_TRACE_TOKEN(sender); Uint seq_trace_size = 0; #ifdef USE_VM_PROBES Uint dt_utag_size = 0; - Eterm utag = NIL; #endif BM_SWAP_TIMER(send,size); @@ -923,23 +708,32 @@ erts_send_message(Process* sender, } #endif - bp = new_message_buffer(msize + seq_trace_size + mp = erts_alloc_message_heap_state(receiver, + &receiver_state, + receiver_locks, + (msize #ifdef USE_VM_PROBES - + dt_utag_size + + dt_utag_size #endif - ); - hp = bp->mem; + + seq_trace_size), + &hp, + &ohp); BM_SWAP_TIMER(send,copy); - token = copy_struct(stoken, - seq_trace_size, - &hp, - &bp->off_heap); + if (is_immed(stoken)) + token = stoken; + else + token = copy_struct(stoken, seq_trace_size, &hp, ohp); + + if (is_not_immed(message)) + message = copy_struct(message, msize, &hp, ohp); - message = copy_struct(message, msize, &hp, &bp->off_heap); #ifdef USE_VM_PROBES if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) { - utag = copy_struct(DT_UTAG(sender), dt_utag_size, &hp, &bp->off_heap); + if (is_immed(DT_UTAG(sender))) + utag = DT_UTAG(sender); + else + utag = copy_struct(DT_UTAG(sender), dt_utag_size, ohp); #ifdef DTRACE_TAG_HARDDEBUG erts_fprintf(stderr, "Dtrace -> (%T) Spreading tag (%T) with " @@ -961,101 +755,49 @@ erts_send_message(Process* sender, msize, tok_label, tok_lastcnt, tok_serial); } #endif - res = queue_message(NULL, - receiver, - receiver_locks, - NULL, - bp, - message, - token -#ifdef USE_VM_PROBES - , utag -#endif - ); - BM_SWAP_TIMER(send,system); - } else if (sender == receiver) { - /* Drop message if receiver has a pending exit ... */ -#ifdef ERTS_SMP - ErtsProcLocks need_locks = (~(*receiver_locks) - & (ERTS_PROC_LOCK_MSGQ - | ERTS_PROC_LOCK_STATUS)); - if (need_locks) { - *receiver_locks |= need_locks; - if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) { - if (need_locks == ERTS_PROC_LOCK_MSGQ) { - erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS); - need_locks = ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS; - } - erts_smp_proc_lock(receiver, need_locks); - } - } - if (!ERTS_PROC_PENDING_EXIT(receiver)) -#endif - { - ErlMessage* mp = message_alloc(); - - DTRACE6(message_send, sender_name, receiver_name, - size_object(message), tok_label, tok_lastcnt, tok_serial); - mp->data.attached = NULL; - ERL_MESSAGE_TERM(mp) = message; - ERL_MESSAGE_TOKEN(mp) = NIL; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = NIL; -#endif - mp->next = NULL; - /* - * We move 'in queue' to 'private queue' and place - * message at the end of 'private queue' in order - * to ensure that the 'in queue' doesn't contain - * references into the heap. By ensuring this, - * we don't need to include the 'in queue' in - * the root set when garbage collecting. - */ - - ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); - LINK_MESSAGE_PRIVQ(receiver, mp); - - res = receiver->msg.len; - - if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { - trace_receive(receiver, message); - } - } - BM_SWAP_TIMER(send,system); } else { - ErlOffHeap *ohp; Eterm *hp; - erts_aint32_t state; - BM_SWAP_TIMER(send,size); - msize = size_object(message); - BM_SWAP_TIMER(size,send); - hp = erts_alloc_message_heap_state(msize, - &bp, - &ohp, - receiver, - receiver_locks, - &state); - BM_SWAP_TIMER(send,copy); - message = copy_struct(message, msize, &hp, ohp); - BM_MESSAGE_COPIED(msz); - BM_SWAP_TIMER(copy,send); + if (receiver == sender && !(receiver_state & ERTS_PSFLG_OFF_HEAP_MSGQ)) { + mp = erts_alloc_message(0, NULL); + msize = 0; + } + else { + BM_SWAP_TIMER(send,size); + msize = size_object(message); + BM_SWAP_TIMER(size,send); + + mp = erts_alloc_message_heap_state(receiver, + &receiver_state, + receiver_locks, + msize, + &hp, + &ohp); + BM_SWAP_TIMER(send,copy); + if (is_not_immed(message)) + message = copy_struct(message, msize, &hp, ohp); + BM_MESSAGE_COPIED(msz); + BM_SWAP_TIMER(copy,send); + } DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); - res = queue_message(sender, - receiver, - receiver_locks, - &state, - bp, - message, - token + } + + res = queue_message(sender, + receiver, + &receiver_state, + receiver_locks, + mp, + message, + token #ifdef USE_VM_PROBES - , NIL + , utag #endif - ); - BM_SWAP_TIMER(send,system); - } - return res; + ); + + BM_SWAP_TIMER(send,system); + + return res; } /* @@ -1075,7 +817,8 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, Uint sz_from; Eterm* hp; Eterm temptoken; - ErlHeapFragment* bp = NULL; + ErtsMessage* mp; + ErlOffHeap *ohp; if (token != NIL #ifdef USE_VM_PROBES @@ -1087,36 +830,483 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, sz_reason = size_object(reason); sz_token = size_object(token); sz_from = size_object(from); - bp = new_message_buffer(sz_reason + sz_from + sz_token + 4); - hp = bp->mem; - mess = copy_struct(reason, sz_reason, &hp, &bp->off_heap); - from_copy = copy_struct(from, sz_from, &hp, &bp->off_heap); + mp = erts_alloc_message_heap(to, to_locksp, + sz_reason + sz_from + sz_token + 4, + &hp, &ohp); + mess = copy_struct(reason, sz_reason, &hp, ohp); + from_copy = copy_struct(from, sz_from, &hp, ohp); save = TUPLE3(hp, am_EXIT, from_copy, mess); hp += 4; /* the trace token must in this case be updated by the caller */ seq_trace_output(token, save, SEQ_TRACE_SEND, to->common.id, NULL); - temptoken = copy_struct(token, sz_token, &hp, &bp->off_heap); - erts_queue_message(to, to_locksp, bp, save, temptoken); + temptoken = copy_struct(token, sz_token, &hp, ohp); + erts_queue_message(to, to_locksp, mp, save, temptoken); } else { - ErlOffHeap *ohp; sz_reason = size_object(reason); sz_from = IS_CONST(from) ? 0 : size_object(from); - hp = erts_alloc_message_heap(sz_reason+sz_from+4, - &bp, - &ohp, - to, - to_locksp); + mp = erts_alloc_message_heap(to, to_locksp, + sz_reason+sz_from+4, &hp, &ohp); mess = copy_struct(reason, sz_reason, &hp, ohp); from_copy = (IS_CONST(from) ? from : copy_struct(from, sz_from, &hp, ohp)); save = TUPLE3(hp, am_EXIT, from_copy, mess); - erts_queue_message(to, to_locksp, bp, save, NIL); + erts_queue_message(to, to_locksp, mp, save, NIL); } } +void erts_save_message_in_proc(Process *p, ErtsMessage *msgp) +{ + ErlHeapFragment *hfp; + + if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) + hfp = &msgp->hfrag; + else if (msgp->data.attached) { + hfp = msgp->data.heap_frag; + } + else { + erts_free_message(msgp); + return; /* Nothing to save */ + } + + while (1) { + struct erl_off_heap_header *ohhp = hfp->off_heap.first; + if (ohhp) { + for ( ; ohhp->next; ohhp = ohhp->next) + ; + ohhp->next = p->off_heap.first; + p->off_heap.first = hfp->off_heap.first; + hfp->off_heap.first = NULL; + } + p->off_heap.overhead += hfp->off_heap.overhead; + hfp->off_heap.overhead = 0; + p->mbuf_sz += hfp->used_size; + + if (!hfp->next) + break; + hfp = hfp->next; + } + + msgp->next = p->msg_frag; + p->msg_frag = msgp; +} + +Sint +erts_move_messages_off_heap(Process *c_p) +{ + int reds = 1; + /* + * Move all messages off heap. This *only* occurs when the + * process had off heap message disabled and just enabled + * it... + */ + ErtsMessage *mp; + + reds += c_p->msg.len / 10; + + ASSERT(erts_smp_atomic32_read_nob(&c_p->state) + & ERTS_PSFLG_OFF_HEAP_MSGQ); + ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG); + + for (mp = c_p->msg.first; mp; mp = mp->next) { + Uint msg_sz, token_sz; +#ifdef USE_VM_PROBES + Uint utag_sz; +#endif + Eterm *hp; + ErlHeapFragment *hfrag; + + if (mp->data.attached) + continue; + + if (is_immed(ERL_MESSAGE_TERM(mp)) +#ifdef USE_VM_PROBES + && is_immed(ERL_MESSAGE_DT_UTAG(mp)) +#endif + && is_not_immed(ERL_MESSAGE_TOKEN(mp))) + continue; + + /* + * The message refers into the heap. Copy the message + * from the heap into a heap fragment and attach + * it to the message... + */ + msg_sz = size_object(ERL_MESSAGE_TERM(mp)); +#ifdef USE_VM_PROBES + utag_sz = size_object(ERL_MESSAGE_DT_UTAG(mp)); +#endif + token_sz = size_object(ERL_MESSAGE_TOKEN(mp)); + + hfrag = new_message_buffer(msg_sz +#ifdef USE_VM_PROBES + + utag_sz +#endif + + token_sz); + hp = hfrag->mem; + if (is_not_immed(ERL_MESSAGE_TERM(mp))) + ERL_MESSAGE_TERM(mp) = copy_struct(ERL_MESSAGE_TERM(mp), + msg_sz, &hp, + &hfrag->off_heap); + if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) + ERL_MESSAGE_TOKEN(mp) = copy_struct(ERL_MESSAGE_TOKEN(mp), + token_sz, &hp, + &hfrag->off_heap); +#ifdef USE_VM_PROBES + if (is_not_immed(ERL_MESSAGE_DT_UTAG(mp))) + ERL_MESSAGE_DT_UTAG(mp) = copy_struct(ERL_MESSAGE_DT_UTAG(mp), + utag_sz, &hp, + &hfrag->off_heap); +#endif + mp->data.heap_frag = hfrag; + reds += 1; + } + + return reds; +} + +Sint +erts_complete_off_heap_message_queue_change(Process *c_p) +{ + int reds = 1; + + ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); + ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG); + ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ); + + /* + * This job was first initiated when the process changed + * "off heap message queue" state from false to true. Since + * then ERTS_PSFLG_OFF_HEAP_MSGQ has been set. However, the + * state change might have been changed again (multiple times) + * since then. Check users last requested state (the flag + * F_OFF_HEAP_MSGQ), and make the state consistent with that. + */ + + if (!(c_p->flags & F_OFF_HEAP_MSGQ)) + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_OFF_HEAP_MSGQ); + else { + reds += 2; + erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p); + erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + reds += erts_move_messages_off_heap(c_p); + } + c_p->flags &= ~F_OFF_HEAP_MSGQ_CHNG; + return reds; +} + +typedef struct { + Eterm pid; + ErtsThrPrgrLaterOp lop; +} ErtsChangeOffHeapMessageQueue; + +static void +change_off_heap_msgq(void *vcohmq) +{ + ErtsChangeOffHeapMessageQueue *cohmq; + /* + * Now we've waited thread progress which ensures that all + * messages to the process are enqueued off heap. Schedule + * completion of this change as a system task on the process + * itself. This in order to avoid lock contention on its + * main lock. We will be called in + * erts_complete_off_heap_message_queue_change() (above) when + * the system task has been selected for execution. + */ + cohmq = (ErtsChangeOffHeapMessageQueue *) vcohmq; + erts_schedule_complete_off_heap_message_queue_change(cohmq->pid); + erts_free(ERTS_ALC_T_MSGQ_CHNG, vcohmq); +} + +Eterm +erts_change_off_heap_message_queue_state(Process *c_p, int enable) +{ + +#ifdef DEBUG + if (c_p->flags & F_OFF_HEAP_MSGQ) { + ASSERT(erts_smp_atomic32_read_nob(&c_p->state) + & ERTS_PSFLG_OFF_HEAP_MSGQ); + } + else { + if (c_p->flags & F_OFF_HEAP_MSGQ_CHNG) { + ASSERT(erts_smp_atomic32_read_nob(&c_p->state) + & ERTS_PSFLG_OFF_HEAP_MSGQ); + } + else { + ASSERT(!(erts_smp_atomic32_read_nob(&c_p->state) + & ERTS_PSFLG_OFF_HEAP_MSGQ)); + } + } +#endif + + if (c_p->flags & F_OFF_HEAP_MSGQ) { + /* Off heap message queue is enabled */ + + if (!enable) { + c_p->flags &= ~F_OFF_HEAP_MSGQ; + /* + * We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ + * if a change is ongoing. It will be adjusted when the + * change completes... + */ + if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { + /* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... */ + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_OFF_HEAP_MSGQ); + } + } + + return am_true; /* Old state */ + } + + /* Off heap message queue is disabled */ + + if (enable) { + c_p->flags |= F_OFF_HEAP_MSGQ; + /* + * We do not have to schedule a change if + * we have an ongoing change... + */ + if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { + ErtsChangeOffHeapMessageQueue *cohmq; + /* + * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait + * thread progress before completing the change in + * order to ensure that all senders observe that + * messages should be passed off heap. When the + * change has completed, GC does not need to inspect + * the message queue at all. + */ + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_OFF_HEAP_MSGQ); + c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; + cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, + sizeof(ErtsChangeOffHeapMessageQueue)); + cohmq->pid = c_p->common.id; + erts_schedule_thr_prgr_later_op(change_off_heap_msgq, + (void *) cohmq, + &cohmq->lop); + } + } + + return am_false; /* Old state */ +} + +int +erts_decode_dist_message(Process *proc, ErtsProcLocks proc_locks, + ErtsMessage *msgp, int force_off_heap) +{ + ErtsHeapFactory factory; + Eterm msg; + ErlHeapFragment *bp; + Sint need; + int decode_in_heap_frag; + + decode_in_heap_frag = (force_off_heap + || !(proc_locks & ERTS_PROC_LOCK_MAIN) + || (proc->flags & F_OFF_HEAP_MSGQ)); + + if (msgp->data.dist_ext->heap_size >= 0) + need = msgp->data.dist_ext->heap_size; + else { + need = erts_decode_dist_ext_size(msgp->data.dist_ext); + if (need < 0) { + /* bad msg; remove it... */ + if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { + bp = erts_dist_ext_trailer(msgp->data.dist_ext); + erts_cleanup_offheap(&bp->off_heap); + } + erts_free_dist_ext_copy(msgp->data.dist_ext); + msgp->data.dist_ext = NULL; + return 0; + } + + msgp->data.dist_ext->heap_size = need; + } + + if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { + bp = erts_dist_ext_trailer(msgp->data.dist_ext); + need += bp->used_size; + } + + if (decode_in_heap_frag) + erts_factory_heap_frag_init(&factory, new_message_buffer(need)); + else + erts_factory_proc_prealloc_init(&factory, proc, need); + + ASSERT(msgp->data.dist_ext->heap_size >= 0); + if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { + ErlHeapFragment *heap_frag; + heap_frag = erts_dist_ext_trailer(msgp->data.dist_ext); + ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp), + heap_frag->used_size, + &factory.hp, + factory.off_heap); + erts_cleanup_offheap(&heap_frag->off_heap); + } + + msg = erts_decode_dist_ext(&factory, msgp->data.dist_ext); + ERL_MESSAGE_TERM(msgp) = msg; + erts_free_dist_ext_copy(msgp->data.dist_ext); + msgp->data.attached = NULL; + + if (is_non_value(msg)) { + erts_factory_undo(&factory); + return 0; + } + + erts_factory_trim_and_close(&factory, msgp->m, + ERL_MESSAGE_REF_ARRAY_SZ); + + ASSERT(!msgp->data.heap_frag); + + if (decode_in_heap_frag) + msgp->data.heap_frag = factory.heap_frags; + + return 1; +} + +/* + * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0 will move off heap messages + * into the heap of the inspected process if off_heap_message_queue + * is false when process_info(_, messages) is called. That is, the + * following GC will have more data in the rootset compared to the + * scenario when process_info(_, messages) had not been called. + * + * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS != 0 will keep off heap messages + * off heap when process_info(_, messages) is called regardless of + * the off_heap_message_queue setting of the process. That is, it + * will change the following execution of the process as little as + * possible. + */ +#define ERTS_INSPECT_MSGQ_KEEP_OH_MSGS 1 + +Uint +erts_prep_msgq_for_inspection(Process *c_p, Process *rp, + ErtsProcLocks rp_locks, ErtsMessageInfo *mip) +{ + Uint tot_heap_size; + ErtsMessage* mp; + Sint i; + int self_on_heap; + + /* + * Prepare the message queue for inspection + * by process_info(). + * + * + * - Decode all messages on external format + * - Remove all corrupt dist messages from queue + * - Save pointer to, and heap size need of each + * message in the mip array. + * - Return total heap size need for all messages + * that needs to be copied. + * + * If ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0: + * - In case off heap messages is disabled and + * we are inspecting our own queue, move all + * off heap data into the heap. + */ + + self_on_heap = c_p == rp && !(c_p->flags & F_OFF_HEAP_MSGQ); + + tot_heap_size = 0; + i = 0; + mp = rp->msg.first; + while (mp) { + Eterm msg = ERL_MESSAGE_TERM(mp); + + mip[i].size = 0; + + if (is_non_value(msg)) { + /* Dist message on external format; decode it... */ + if (mp->data.attached) + erts_decode_dist_message(rp, rp_locks, mp, + ERTS_INSPECT_MSGQ_KEEP_OH_MSGS); + + msg = ERL_MESSAGE_TERM(mp); + + if (is_non_value(msg)) { + ErtsMessage **mpp; + ErtsMessage *bad_mp = mp; + /* + * Bad distribution message; remove + * it from the queue... + */ + ASSERT(!mp->data.attached); + + mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next; + + if (rp->msg.save == &bad_mp->next) + rp->msg.save = mpp; + if (rp->msg.last == &bad_mp->next) + rp->msg.last = mpp; + mp = mp->next; + *mpp = mp; + rp->msg.len--; + bad_mp->next = NULL; + erts_cleanup_messages(bad_mp); + continue; + } + } + + ASSERT(is_value(msg)); + +#if ERTS_INSPECT_MSGQ_KEEP_OH_MSGS + if (is_not_immed(msg) && (!self_on_heap || mp->data.attached)) { + Uint sz = size_object(msg); + mip[i].size = sz; + tot_heap_size += sz; + } +#else + if (self_on_heap) { + if (mp->data.attached) { + ErtsMessage *tmp = NULL; + if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { + erts_link_mbuf_to_proc(rp, mp->data.heap_frag); + mp->data.attached = NULL; + } + else { + /* + * Need to replace the message reference since + * we will get references to the message data + * from the heap... + */ + ErtsMessage **mpp; + tmp = erts_alloc_message(0, NULL); + sys_memcpy((void *) tmp->m, (void *) mp->m, + sizeof(Eterm)*ERL_MESSAGE_REF_ARRAY_SZ); + mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next; + tmp->next = mp->next; + if (rp->msg.save == &mp->next) + rp->msg.save = &tmp->next; + if (rp->msg.last == &mp->next) + rp->msg.last = &tmp->next; + *mpp = tmp; + erts_save_message_in_proc(rp, mp); + mp = tmp; + } + } + } + else if (is_not_immed(msg)) { + Uint sz = size_object(msg); + mip[i].size = sz; + tot_heap_size += sz; + } + +#endif + + mip[i].msgp = mp; + i++; + mp = mp->next; + } + + return tot_heap_size; +} + void erts_factory_proc_init(ErtsHeapFactory* factory, Process* p) { @@ -1127,47 +1317,138 @@ void erts_factory_proc_prealloc_init(ErtsHeapFactory* factory, Process* p, Sint size) { + ErlHeapFragment *bp = p->mbuf; factory->mode = FACTORY_HALLOC; factory->p = p; factory->hp_start = HAlloc(p, size); factory->hp = factory->hp_start; factory->hp_end = factory->hp_start + size; factory->off_heap = &p->off_heap; + factory->message = NULL; factory->off_heap_saved.first = p->off_heap.first; factory->off_heap_saved.overhead = p->off_heap.overhead; - factory->heap_frags_saved = p->mbuf; + factory->heap_frags_saved = bp; + factory->heap_frags_saved_used = bp ? bp->used_size : 0; factory->heap_frags = NULL; /* not used */ factory->alloc_type = 0; /* not used */ } -void erts_factory_message_init(ErtsHeapFactory* factory, - Process* rp, - Eterm* hp, - ErlHeapFragment* bp) +void erts_factory_heap_frag_init(ErtsHeapFactory* factory, + ErlHeapFragment* bp) +{ + factory->mode = FACTORY_HEAP_FRAGS; + factory->p = NULL; + factory->hp_start = bp->mem; + factory->hp = bp->mem; + factory->hp_end = bp->mem + bp->alloc_size; + factory->off_heap = &bp->off_heap; + factory->message = NULL; + factory->heap_frags = bp; + factory->heap_frags_saved = NULL; + factory->heap_frags_saved_used = 0; + factory->alloc_type = ERTS_ALC_T_HEAP_FRAG; + ASSERT(!bp->next); + factory->off_heap_saved.first = factory->off_heap->first; + factory->off_heap_saved.overhead = factory->off_heap->overhead; + + ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end); +} + + +ErtsMessage * +erts_factory_message_create(ErtsHeapFactory* factory, + Process *proc, + ErtsProcLocks *proc_locksp, + Uint sz) +{ + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *msgp; + int on_heap; + erts_aint32_t state; + + state = erts_smp_atomic32_read_nob(&proc->state); + + if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) { + msgp = erts_alloc_message(sz, &hp); + ohp = sz == 0 ? NULL : &msgp->hfrag.off_heap; + on_heap = 0; + } + else { + msgp = erts_try_alloc_message_on_heap(proc, &state, + proc_locksp, + sz, &hp, &ohp, + &on_heap); + } + + if (on_heap) { + ASSERT(*proc_locksp & ERTS_PROC_LOCK_MAIN); + ASSERT(ohp == &proc->off_heap); + factory->mode = FACTORY_HALLOC; + factory->p = proc; + factory->heap_frags_saved = proc->mbuf; + factory->heap_frags_saved_used = proc->mbuf ? proc->mbuf->used_size : 0; + } + else { + factory->mode = FACTORY_MESSAGE; + factory->p = NULL; + factory->heap_frags_saved = NULL; + factory->heap_frags_saved_used = 0; + + if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) { + ASSERT(!msgp->hfrag.next); + factory->heap_frags = NULL; + } + else { + ASSERT(!msgp->data.heap_frag + || !msgp->data.heap_frag->next); + factory->heap_frags = msgp->data.heap_frag; + } + } + factory->hp_start = hp; + factory->hp = hp; + factory->hp_end = hp + sz; + factory->message = msgp; + factory->off_heap = ohp; + factory->alloc_type = ERTS_ALC_T_HEAP_FRAG; + if (ohp) { + factory->off_heap_saved.first = ohp->first; + factory->off_heap_saved.overhead = ohp->overhead; + } + else { + factory->off_heap_saved.first = NULL; + factory->off_heap_saved.overhead = 0; + } + + ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end); + + return msgp; +} + +void erts_factory_selfcontained_message_init(ErtsHeapFactory* factory, + ErtsMessage *msgp, + Eterm *hp) { - if (bp) { - factory->mode = FACTORY_HEAP_FRAGS; - factory->p = NULL; - factory->hp_start = bp->mem; - factory->hp = hp ? hp : bp->mem; - factory->hp_end = bp->mem + bp->alloc_size; - factory->off_heap = &bp->off_heap; - factory->heap_frags = bp; - factory->heap_frags_saved = bp; - factory->alloc_type = ERTS_ALC_T_HEAP_FRAG; - ASSERT(!bp->next); + ErlHeapFragment* bp; + if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) { + bp = &msgp->hfrag; + factory->heap_frags = NULL; } else { - factory->mode = FACTORY_HALLOC; - factory->p = rp; - factory->hp_start = hp; - factory->hp = hp; - factory->hp_end = HEAP_TOP(rp); - factory->off_heap = &rp->off_heap; - factory->heap_frags_saved = rp->mbuf; - factory->heap_frags = NULL; /* not used */ - factory->alloc_type = 0; /* not used */ + bp = msgp->data.heap_frag; + factory->heap_frags = bp; } + factory->mode = FACTORY_MESSAGE; + factory->p = NULL; + factory->hp_start = bp->mem; + factory->hp = hp; + factory->hp_end = bp->mem + bp->alloc_size; + factory->message = msgp; + factory->off_heap = &bp->off_heap; + factory->heap_frags_saved = NULL; + factory->heap_frags_saved_used = 0; + factory->alloc_type = ERTS_ALC_T_HEAP_FRAG; + ASSERT(!bp->next); factory->off_heap_saved.first = factory->off_heap->first; factory->off_heap_saved.overhead = factory->off_heap->overhead; @@ -1230,8 +1511,16 @@ static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra) factory->hp_end = factory->hp + need; return; - case FACTORY_HEAP_FRAGS: - bp = factory->heap_frags; + case FACTORY_MESSAGE: + if (!factory->heap_frags) { + ASSERT(factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG); + bp = &factory->message->hfrag; + } + else { + /* Fall through */ + case FACTORY_HEAP_FRAGS: + bp = factory->heap_frags; + } if (bp) { ASSERT(factory->hp > bp->mem); @@ -1269,8 +1558,23 @@ void erts_factory_close(ErtsHeapFactory* factory) HRelease(factory->p, factory->hp_end, factory->hp); break; - case FACTORY_HEAP_FRAGS: - bp = factory->heap_frags; + case FACTORY_MESSAGE: + if (!factory->heap_frags) { + if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG) + bp = &factory->message->hfrag; + else + bp = NULL; + } + else { + if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG) + factory->message->hfrag.next = factory->heap_frags; + else + factory->message->data.heap_frag = factory->heap_frags; + + /* Fall through */ + case FACTORY_HEAP_FRAGS: + bp = factory->heap_frags; + } if (bp) { ASSERT(factory->hp >= bp->mem); @@ -1291,17 +1595,47 @@ void erts_factory_close(ErtsHeapFactory* factory) void erts_factory_trim_and_close(ErtsHeapFactory* factory, Eterm *brefs, Uint brefs_size) { - if (factory->mode == FACTORY_HEAP_FRAGS) { - ErlHeapFragment* bp = factory->heap_frags; + ErlHeapFragment *bp; + + switch (factory->mode) { + case FACTORY_MESSAGE: { + ErtsMessage *mp = factory->message; + if (mp->data.attached == ERTS_MSG_COMBINED_HFRAG) { + if (!mp->hfrag.next) { + Uint sz = factory->hp - factory->hp_start; + mp = erts_shrink_message(mp, sz, brefs, brefs_size); + factory->message = mp; + factory->mode = FACTORY_CLOSED; + return; + } + /*else we don't trim multi fragmented messages for now (off_heap...) */ + break; + } + /* Fall through... */ + } + case FACTORY_HEAP_FRAGS: + bp = factory->heap_frags; + if (!bp) + break; if (bp->next == NULL) { Uint used_sz = factory->hp - bp->mem; ASSERT(used_sz <= bp->alloc_size); - factory->heap_frags = erts_resize_message_buffer(bp, used_sz, - brefs, brefs_size); + if (used_sz > 0) + bp = erts_resize_message_buffer(bp, used_sz, + brefs, brefs_size); + else { + free_message_buffer(bp); + bp = NULL; + } + factory->heap_frags = bp; + if (factory->mode == FACTORY_MESSAGE) + factory->message->data.heap_frag = bp; factory->mode = FACTORY_CLOSED; return; } - /*else we don't trim multi fragmented messages for now */ + /*else we don't trim multi fragmented messages for now (off_heap...) */ + default: + break; } erts_factory_close(factory); } @@ -1349,38 +1683,35 @@ void erts_factory_undo(ErtsHeapFactory* factory) /* Rollback heap top */ - if (factory->heap_frags_saved == NULL) { /* No heap frags when we started */ - ASSERT(factory->hp_start >= HEAP_START(factory->p)); - ASSERT(factory->hp_start <= HEAP_LIMIT(factory->p)); - HEAP_TOP(factory->p) = factory->hp_start; - } - else { + if (HEAP_START(factory->p) <= factory->hp_start + && factory->hp_start <= HEAP_LIMIT(factory->p)) { + HEAP_TOP(factory->p) = factory->hp_start; + } + + /* Fix last heap frag */ + if (factory->heap_frags_saved) { ASSERT(factory->heap_frags_saved == factory->p->mbuf); - if (factory->hp_start == factory->heap_frags_saved->mem) { + if (factory->hp_start != factory->heap_frags_saved->mem) + factory->heap_frags_saved->used_size = factory->heap_frags_saved_used; + else { factory->p->mbuf = factory->p->mbuf->next; ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, factory->heap_frags_saved, ERTS_HEAP_FRAG_SIZE(factory->heap_frags_saved->alloc_size)); } - else if (factory->hp_start != factory->hp_end) { - unsigned remains = factory->hp_start - factory->heap_frags_saved->mem; - ASSERT(remains > 0 && remains < factory->heap_frags_saved->used_size); - factory->heap_frags_saved->used_size = remains; - } } } break; + case FACTORY_MESSAGE: + if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG) + factory->message->hfrag.next = factory->heap_frags; + else + factory->message->data.heap_frag = factory->heap_frags; + erts_cleanup_messages(factory->message); + break; case FACTORY_HEAP_FRAGS: - bp = factory->heap_frags; - do { - ErlHeapFragment* next_bp = bp->next; - - erts_cleanup_offheap(&bp->off_heap); - ERTS_HEAP_FREE(factory->alloc_type, (void *) bp, - ERTS_HEAP_FRAG_SIZE(bp->size)); - bp = next_bp; - }while (bp != NULL); + free_message_buffer(factory->heap_frags); break; case FACTORY_CLOSED: break; -- cgit v1.2.3 From f3ae60838a5e8c83cfdd3000e25562dfcbbd438d Mon Sep 17 00:00:00 2001 From: "Nikolaos S. Papaspyrou" Date: Sat, 9 Jun 2012 01:12:50 +0300 Subject: Enable shcopy for sending messages --- erts/emulator/beam/erl_message.c | 114 +++++++++++++++++++++++++-------------- 1 file changed, 73 insertions(+), 41 deletions(-) (limited to 'erts/emulator/beam/erl_message.c') diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 79739501a8..b1fca1df0c 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -68,6 +68,9 @@ new_message_buffer(Uint size) bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG, ERTS_HEAP_FRAG_SIZE(size)); ERTS_INIT_HEAP_FRAG(bp, size, size); +#ifdef SHCOPY_DEBUG + VERBOSE_DEBUG("[pid=%T] new message buffer %p\n", erts_get_current_pid(), bp->mem); +#endif return bp; } @@ -248,36 +251,6 @@ erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_s return nmp; } -void -erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp) -{ - if (first_bp) { - ErlHeapFragment *bp = first_bp; - - while (1) { - /* Move any off_heap's into the process */ - if (bp->off_heap.first != NULL) { - struct erl_off_heap_header** next_p = &bp->off_heap.first; - while (*next_p != NULL) { - next_p = &((*next_p)->next); - } - *next_p = MSO(proc).first; - MSO(proc).first = bp->off_heap.first; - bp->off_heap.first = NULL; - OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead); - } - MBUF_SIZE(proc) += bp->used_size; - if (!bp->next) - break; - bp = bp->next; - } - - /* Link the message buffer */ - bp->next = MBUF(proc); - MBUF(proc) = first_bp; - } -} - void erts_queue_dist_message(Process *rcvr, ErtsProcLocks *rcvr_locks, @@ -540,6 +513,36 @@ erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks, ); } +void +erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp) +{ + if (first_bp) { + ErlHeapFragment *bp = first_bp; + + while (1) { + /* Move any off_heap's into the process */ + if (bp->off_heap.first != NULL) { + struct erl_off_heap_header** next_p = &bp->off_heap.first; + while (*next_p != NULL) { + next_p = &((*next_p)->next); + } + *next_p = MSO(proc).first; + MSO(proc).first = bp->off_heap.first; + bp->off_heap.first = NULL; + OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead); + } + MBUF_SIZE(proc) += bp->used_size; + if (!bp->next) + break; + bp = bp->next; + } + + /* Link the message buffer */ + bp->next = MBUF(proc); + MBUF(proc) = first_bp; + } +} + Uint erts_msg_attached_data_size_aux(ErtsMessage *msg) { @@ -663,11 +666,15 @@ erts_send_message(Process* sender, Eterm utag = NIL; #endif erts_aint32_t receiver_state; +#ifdef SHCOPY_SEND + unsigned shflags = (flags & ERTS_SND_FLG_SHCOPY_MASK) >> ERTS_SND_FLG_SHCOPY_SHIFT; + erts_shcopy_t info; +#endif BM_STOP_TIMER(system); BM_MESSAGE(message,sender,receiver); BM_START_TIMER(send); - #ifdef USE_VM_PROBES +#ifdef USE_VM_PROBES *sender_name = *receiver_name = '\0'; if (DTRACE_ENABLED(message_send)) { erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), @@ -686,15 +693,23 @@ erts_send_message(Process* sender, #ifdef USE_VM_PROBES Uint dt_utag_size = 0; #endif - +#ifdef SHCOPY_SEND + unsigned shflags = (flags & ERTS_SND_FLG_SHCOPY_MASK) >> ERTS_SND_FLG_SHCOPY_SHIFT; + erts_shcopy_t info; + INITIALIZE_SHCOPY(info); +#endif BM_SWAP_TIMER(send,size); - msize = size_object(message); +#ifdef SHCOPY_SEND + INITIALIZE_SHCOPY(info); + msize = copy_shared_calculate(message, &info, shflags); +#else + msize = size_object(message); +#endif BM_SWAP_TIMER(size,send); #ifdef USE_VM_PROBES if (stoken != am_have_dt_utag) { #endif - seq_trace_update_send(sender); seq_trace_output(stoken, message, SEQ_TRACE_SEND, receiver->common.id, sender); @@ -720,14 +735,20 @@ erts_send_message(Process* sender, &ohp); BM_SWAP_TIMER(send,copy); + +#ifdef SHCOPY_SEND + if (is_not_immed(message)) + message = copy_shared_perform(message, msize, &info, &hp, ohp, shflags); + DESTROY_SHCOPY(info); +#else + if (is_not_immed(message)) + message = copy_struct(message, msize, &hp, ohp); +#endif if (is_immed(stoken)) token = stoken; else token = copy_struct(stoken, seq_trace_size, &hp, ohp); - if (is_not_immed(message)) - message = copy_struct(message, msize, &hp, ohp); - #ifdef USE_VM_PROBES if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) { if (is_immed(DT_UTAG(sender))) @@ -764,7 +785,12 @@ erts_send_message(Process* sender, } else { BM_SWAP_TIMER(send,size); - msize = size_object(message); +#ifdef SHCOPY_SEND + INITIALIZE_SHCOPY(info); + msize = copy_shared_calculate(message, &info, shflags); +#else + msize = size_object(message); +#endif BM_SWAP_TIMER(size,send); mp = erts_alloc_message_heap_state(receiver, @@ -774,8 +800,14 @@ erts_send_message(Process* sender, &hp, &ohp); BM_SWAP_TIMER(send,copy); - if (is_not_immed(message)) - message = copy_struct(message, msize, &hp, ohp); +#ifdef SHCOPY_SEND + if (is_not_immed(message)) + message = copy_shared_perform(message, msize, &info, &hp, ohp, shflags); + DESTROY_SHCOPY(info); +#else + if (is_not_immed(message)) + message = copy_struct(message, msize, &hp, ohp); +#endif BM_MESSAGE_COPIED(msz); BM_SWAP_TIMER(copy,send); } @@ -800,6 +832,7 @@ erts_send_message(Process* sender, return res; } + /* * This function delivers an EXIT message to a process * which is trapping EXITs. @@ -1725,4 +1758,3 @@ void erts_factory_undo(ErtsHeapFactory* factory) factory->heap_frags = NULL; #endif } - -- cgit v1.2.3 From 277e8e77384ed6628009243e63d62f0555d10c69 Mon Sep 17 00:00:00 2001 From: "Nikolaos S. Papaspyrou" Date: Mon, 11 Jun 2012 15:17:01 +0300 Subject: Add -debug +vc flag for debuging SHCOPY This is very verbose, you have been warned. It should work with the copy-spy.py script, which may be a bit outdated. --- erts/emulator/beam/erl_message.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'erts/emulator/beam/erl_message.c') diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index b1fca1df0c..11890a756d 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -68,9 +68,7 @@ new_message_buffer(Uint size) bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG, ERTS_HEAP_FRAG_SIZE(size)); ERTS_INIT_HEAP_FRAG(bp, size, size); -#ifdef SHCOPY_DEBUG - VERBOSE_DEBUG("[pid=%T] new message buffer %p\n", erts_get_current_pid(), bp->mem); -#endif + VERBOSE(DEBUG_SHCOPY, ("[pid=%T] new message buffer %p\n", erts_get_current_pid(), bp->mem)); return bp; } -- cgit v1.2.3 From 3f29cf1f72f50d20ce1864f76e1a298602429ca4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn-Egil=20Dahlberg?= Date: Wed, 18 Nov 2015 16:59:08 +0100 Subject: Fix rebase of SHCOPY seq_tokens --- erts/emulator/beam/erl_message.c | 55 ++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 27 deletions(-) (limited to 'erts/emulator/beam/erl_message.c') diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 11890a756d..66b337402d 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -691,22 +691,15 @@ erts_send_message(Process* sender, #ifdef USE_VM_PROBES Uint dt_utag_size = 0; #endif -#ifdef SHCOPY_SEND - unsigned shflags = (flags & ERTS_SND_FLG_SHCOPY_MASK) >> ERTS_SND_FLG_SHCOPY_SHIFT; - erts_shcopy_t info; - INITIALIZE_SHCOPY(info); -#endif - BM_SWAP_TIMER(send,size); -#ifdef SHCOPY_SEND - INITIALIZE_SHCOPY(info); - msize = copy_shared_calculate(message, &info, shflags); -#else - msize = size_object(message); -#endif - BM_SWAP_TIMER(size,send); + BM_SWAP_TIMER(send,size); + /* SHCOPY corrupts the heap between + * copy_shared_calculate, and + * copy_shared_perform. (it inserts move_markers like the gc). + * Make sure we don't use the heap between those instances. + */ #ifdef USE_VM_PROBES - if (stoken != am_have_dt_utag) { + if (stoken != am_have_dt_utag) { #endif seq_trace_update_send(sender); seq_trace_output(stoken, message, SEQ_TRACE_SEND, @@ -714,23 +707,31 @@ erts_send_message(Process* sender, seq_trace_size = 6; /* TUPLE5 */ #ifdef USE_VM_PROBES } - if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) { - dt_utag_size = size_object(DT_UTAG(sender)); - } else if (stoken == am_have_dt_utag ) { - stoken = NIL; - } + if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) { + dt_utag_size = size_object(DT_UTAG(sender)); + } else if (stoken == am_have_dt_utag ) { + stoken = NIL; + } +#endif + +#ifdef SHCOPY_SEND + INITIALIZE_SHCOPY(info); + msize = copy_shared_calculate(message, &info, shflags); +#else + msize = size_object(message); #endif + BM_SWAP_TIMER(size,send); - mp = erts_alloc_message_heap_state(receiver, - &receiver_state, - receiver_locks, - (msize + mp = erts_alloc_message_heap_state(receiver, + &receiver_state, + receiver_locks, + (msize #ifdef USE_VM_PROBES - + dt_utag_size + + dt_utag_size #endif - + seq_trace_size), - &hp, - &ohp); + + seq_trace_size), + &hp, + &ohp); BM_SWAP_TIMER(send,copy); -- cgit v1.2.3 From 5d764f988ab09326d24e39a172083b09ab364c6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn-Egil=20Dahlberg?= Date: Wed, 18 Nov 2015 17:58:14 +0100 Subject: Refactor sharing preserved copy flags The TMPBUF option is no longer needed due to is_literal test and NONE was only used for initial debugging. So we remove the entire option. --- erts/emulator/beam/erl_message.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'erts/emulator/beam/erl_message.c') diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 66b337402d..a964f1968b 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -665,7 +665,6 @@ erts_send_message(Process* sender, #endif erts_aint32_t receiver_state; #ifdef SHCOPY_SEND - unsigned shflags = (flags & ERTS_SND_FLG_SHCOPY_MASK) >> ERTS_SND_FLG_SHCOPY_SHIFT; erts_shcopy_t info; #endif BM_STOP_TIMER(system); @@ -716,7 +715,7 @@ erts_send_message(Process* sender, #ifdef SHCOPY_SEND INITIALIZE_SHCOPY(info); - msize = copy_shared_calculate(message, &info, shflags); + msize = copy_shared_calculate(message, &info); #else msize = size_object(message); #endif @@ -737,7 +736,7 @@ erts_send_message(Process* sender, #ifdef SHCOPY_SEND if (is_not_immed(message)) - message = copy_shared_perform(message, msize, &info, &hp, ohp, shflags); + message = copy_shared_perform(message, msize, &info, &hp, ohp); DESTROY_SHCOPY(info); #else if (is_not_immed(message)) @@ -786,7 +785,7 @@ erts_send_message(Process* sender, BM_SWAP_TIMER(send,size); #ifdef SHCOPY_SEND INITIALIZE_SHCOPY(info); - msize = copy_shared_calculate(message, &info, shflags); + msize = copy_shared_calculate(message, &info); #else msize = size_object(message); #endif @@ -801,7 +800,7 @@ erts_send_message(Process* sender, BM_SWAP_TIMER(send,copy); #ifdef SHCOPY_SEND if (is_not_immed(message)) - message = copy_shared_perform(message, msize, &info, &hp, ohp, shflags); + message = copy_shared_perform(message, msize, &info, &hp, ohp); DESTROY_SHCOPY(info); #else if (is_not_immed(message)) -- cgit v1.2.3 From 02f038355d16e9b6474837727878f58e4ca669c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn-Egil=20Dahlberg?= Date: Wed, 18 Nov 2015 20:12:38 +0100 Subject: Use sharing preserving copy error messages and exceptions --- erts/emulator/beam/erl_message.c | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) (limited to 'erts/emulator/beam/erl_message.c') diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index a964f1968b..f64385d92b 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -850,6 +850,9 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, Eterm temptoken; ErtsMessage* mp; ErlOffHeap *ohp; +#ifdef SHCOPY_SEND + erts_shcopy_t info; +#endif if (token != NIL #ifdef USE_VM_PROBES @@ -858,13 +861,23 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, ) { ASSERT(is_tuple(token)); - sz_reason = size_object(reason); sz_token = size_object(token); sz_from = size_object(from); +#ifdef SHCOPY_SEND + INITIALIZE_SHCOPY(info); + sz_reason = copy_shared_calculate(reason, &info); +#else + sz_reason = size_object(reason); +#endif mp = erts_alloc_message_heap(to, to_locksp, sz_reason + sz_from + sz_token + 4, &hp, &ohp); +#ifdef SHCOPY_SEND + mess = copy_shared_perform(reason, sz_reason, &info, &hp, ohp); + DESTROY_SHCOPY(info); +#else mess = copy_struct(reason, sz_reason, &hp, ohp); +#endif from_copy = copy_struct(from, sz_from, &hp, ohp); save = TUPLE3(hp, am_EXIT, from_copy, mess); hp += 4; @@ -873,13 +886,22 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, temptoken = copy_struct(token, sz_token, &hp, ohp); erts_queue_message(to, to_locksp, mp, save, temptoken); } else { - sz_reason = size_object(reason); sz_from = IS_CONST(from) ? 0 : size_object(from); - +#ifdef SHCOPY_SEND + INITIALIZE_SHCOPY(info); + sz_reason = copy_shared_calculate(reason, &info); +#else + sz_reason = size_object(reason); +#endif mp = erts_alloc_message_heap(to, to_locksp, sz_reason+sz_from+4, &hp, &ohp); +#ifdef SHCOPY_SEND + mess = copy_shared_perform(reason, sz_reason, &info, &hp, ohp); + DESTROY_SHCOPY(info); +#else mess = copy_struct(reason, sz_reason, &hp, ohp); +#endif from_copy = (IS_CONST(from) ? from : copy_struct(from, sz_from, &hp, ohp)); -- cgit v1.2.3 From 6ff15f23c68db356bbad6ab5f939c191b58d453d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn-Egil=20Dahlberg?= Date: Thu, 19 Nov 2015 19:36:08 +0100 Subject: Refactor have seq_trace token test --- erts/emulator/beam/erl_message.c | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) (limited to 'erts/emulator/beam/erl_message.c') diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index f64385d92b..1811651a58 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -314,7 +314,7 @@ erts_queue_dist_message(Process *rcvr, DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); dtrace_proc_str(rcvr, receiver_name); - if (token != NIL && token != am_have_dt_utag) { + if (have_seqtrace(token)) { tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); @@ -335,7 +335,7 @@ erts_queue_dist_message(Process *rcvr, DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); dtrace_proc_str(rcvr, receiver_name); - if (token != NIL && token != am_have_dt_utag) { + if (have_seqtrace(token)) { tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); @@ -697,15 +697,13 @@ erts_send_message(Process* sender, * copy_shared_perform. (it inserts move_markers like the gc). * Make sure we don't use the heap between those instances. */ -#ifdef USE_VM_PROBES - if (stoken != am_have_dt_utag) { -#endif + if (have_seqtrace(stoken)) { seq_trace_update_send(sender); seq_trace_output(stoken, message, SEQ_TRACE_SEND, receiver->common.id, sender); seq_trace_size = 6; /* TUPLE5 */ -#ifdef USE_VM_PROBES } +#ifdef USE_VM_PROBES if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) { dt_utag_size = size_object(DT_UTAG(sender)); } else if (stoken == am_have_dt_utag ) { @@ -765,7 +763,7 @@ erts_send_message(Process* sender, #ifdef USE_VM_PROBES if (DTRACE_ENABLED(message_send)) { - if (stoken != NIL && stoken != am_have_dt_utag) { + if (have_seqtrace(stoken)) { tok_label = signed_val(SEQ_TRACE_T_LABEL(stoken)); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(stoken)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(stoken)); @@ -854,12 +852,7 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, erts_shcopy_t info; #endif - if (token != NIL -#ifdef USE_VM_PROBES - && token != am_have_dt_utag -#endif - ) { - + if (have_seqtrace(token)) { ASSERT(is_tuple(token)); sz_token = size_object(token); sz_from = size_object(from); -- cgit v1.2.3 From bc40e21224e871c9bcbb5c2f27854c308bc4fe01 Mon Sep 17 00:00:00 2001 From: Luca Favatella Date: Sun, 6 Dec 2015 21:37:42 +0000 Subject: Fix compilation with `--enable-vm-probes` ... broken by 3ac08f9b. Compilation error: ``` beam/erl_message.c: In function 'erts_send_message': beam/erl_message.c:753:56: error: macro "copy_struct" requires 4 arguments, but only 3 given utag = copy_struct(DT_UTAG(sender), dt_utag_size, ohp); ^ beam/erl_message.c:753:10: error: 'copy_struct' undeclared (first use in this function) utag = copy_struct(DT_UTAG(sender), dt_utag_size, ohp); ^ beam/erl_message.c:753:10: note: each undeclared identifier is reported only once for each function it appears in ``` --- erts/emulator/beam/erl_message.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'erts/emulator/beam/erl_message.c') diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 1811651a58..d593108f8e 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -750,7 +750,7 @@ erts_send_message(Process* sender, if (is_immed(DT_UTAG(sender))) utag = DT_UTAG(sender); else - utag = copy_struct(DT_UTAG(sender), dt_utag_size, ohp); + utag = copy_struct(DT_UTAG(sender), dt_utag_size, &hp, ohp); #ifdef DTRACE_TAG_HARDDEBUG erts_fprintf(stderr, "Dtrace -> (%T) Spreading tag (%T) with " -- cgit v1.2.3 From 19c4689eea86f26c5af9b8f712c227ce4f62310b Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Tue, 24 Nov 2015 15:57:55 +0100 Subject: Replace off_heap_message_queue option with message_queue_data option The message_queue_data option can have the values - off_heap - on_heap - mixed --- erts/emulator/beam/erl_message.c | 160 +++++++++++++++++++++++++++++---------- 1 file changed, 118 insertions(+), 42 deletions(-) (limited to 'erts/emulator/beam/erl_message.c') diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 79739501a8..797212450c 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -629,9 +629,24 @@ erts_try_alloc_message_on_heap(Process *pp, #endif else { in_message_fragment: - - mp = erts_alloc_message(sz, hpp); - *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; + if (!((*psp) & ERTS_PSFLG_ON_HEAP_MSGQ)) { + mp = erts_alloc_message(sz, hpp); + *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; + } + else { + mp = erts_alloc_message(0, NULL); + if (!sz) { + *hpp = NULL; + *ohpp = NULL; + } + else { + ErlHeapFragment *bp; + bp = new_message_buffer(sz); + *hpp = &bp->mem[0]; + mp->data.heap_frag = bp; + *ohpp = &bp->off_heap; + } + } *on_heap_p = 0; } @@ -976,12 +991,12 @@ erts_complete_off_heap_message_queue_change(Process *c_p) ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ); /* - * This job was first initiated when the process changed - * "off heap message queue" state from false to true. Since - * then ERTS_PSFLG_OFF_HEAP_MSGQ has been set. However, the - * state change might have been changed again (multiple times) - * since then. Check users last requested state (the flag - * F_OFF_HEAP_MSGQ), and make the state consistent with that. + * This job was first initiated when the process changed to off heap + * message queue management. Since then ERTS_PSFLG_OFF_HEAP_MSGQ + * has been set. However, the management state might have been changed + * again (multiple times) since then. Check users last requested state + * (the flags F_OFF_HEAP_MSGQ, and F_ON_HEAP_MSGQ), and make the state + * consistent with that. */ if (!(c_p->flags & F_OFF_HEAP_MSGQ)) @@ -1022,8 +1037,9 @@ change_off_heap_msgq(void *vcohmq) } Eterm -erts_change_off_heap_message_queue_state(Process *c_p, int enable) +erts_change_message_queue_management(Process *c_p, Eterm new_state) { + Eterm res; #ifdef DEBUG if (c_p->flags & F_OFF_HEAP_MSGQ) { @@ -1042,57 +1058,117 @@ erts_change_off_heap_message_queue_state(Process *c_p, int enable) } #endif - if (c_p->flags & F_OFF_HEAP_MSGQ) { - /* Off heap message queue is enabled */ + switch (c_p->flags & (F_OFF_HEAP_MSGQ|F_ON_HEAP_MSGQ)) { - if (!enable) { + case F_OFF_HEAP_MSGQ: + res = am_off_heap; + + switch (new_state) { + case am_off_heap: + break; + case am_on_heap: + c_p->flags |= F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_ON_HEAP_MSGQ); + /* fall through */ + case am_mixed: c_p->flags &= ~F_OFF_HEAP_MSGQ; /* * We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ - * if a change is ongoing. It will be adjusted when the - * change completes... + * if a off heap change is ongoing. It will be adjusted + * when the change completes... */ if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { /* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... */ erts_smp_atomic32_read_band_nob(&c_p->state, ~ERTS_PSFLG_OFF_HEAP_MSGQ); } + break; + default: + res = THE_NON_VALUE; /* badarg */ + break; } + break; + + case F_ON_HEAP_MSGQ: + res = am_on_heap; - return am_true; /* Old state */ + switch (new_state) { + case am_on_heap: + break; + case am_mixed: + c_p->flags &= ~F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_ON_HEAP_MSGQ); + break; + case am_off_heap: + c_p->flags &= ~F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_ON_HEAP_MSGQ); + goto change_to_off_heap; + default: + res = THE_NON_VALUE; /* badarg */ + break; + } + break; + + case 0: + res = am_mixed; + + switch (new_state) { + case am_mixed: + break; + case am_on_heap: + c_p->flags |= F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_ON_HEAP_MSGQ); + break; + case am_off_heap: + goto change_to_off_heap; + default: + res = THE_NON_VALUE; /* badarg */ + break; + } + break; + + default: + res = am_error; + ERTS_INTERNAL_ERROR("Inconsistent message queue management state"); + break; } - /* Off heap message queue is disabled */ + return res; + +change_to_off_heap: - if (enable) { - c_p->flags |= F_OFF_HEAP_MSGQ; + c_p->flags |= F_OFF_HEAP_MSGQ; + + /* + * We do not have to schedule a change if + * we have an ongoing off heap change... + */ + if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { + ErtsChangeOffHeapMessageQueue *cohmq; /* - * We do not have to schedule a change if - * we have an ongoing change... + * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait + * thread progress before completing the change in + * order to ensure that all senders observe that + * messages should be passed off heap. When the + * change has completed, GC does not need to inspect + * the message queue at all. */ - if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { - ErtsChangeOffHeapMessageQueue *cohmq; - /* - * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait - * thread progress before completing the change in - * order to ensure that all senders observe that - * messages should be passed off heap. When the - * change has completed, GC does not need to inspect - * the message queue at all. - */ - erts_smp_atomic32_read_bor_nob(&c_p->state, - ERTS_PSFLG_OFF_HEAP_MSGQ); - c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; - cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, - sizeof(ErtsChangeOffHeapMessageQueue)); - cohmq->pid = c_p->common.id; - erts_schedule_thr_prgr_later_op(change_off_heap_msgq, - (void *) cohmq, - &cohmq->lop); - } + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_OFF_HEAP_MSGQ); + c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; + cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, + sizeof(ErtsChangeOffHeapMessageQueue)); + cohmq->pid = c_p->common.id; + erts_schedule_thr_prgr_later_op(change_off_heap_msgq, + (void *) cohmq, + &cohmq->lop); } - return am_false; /* Old state */ + return res; } int -- cgit v1.2.3 From e9d6797e15e687828e5ef0d33fb790181d657779 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 21 Jan 2016 19:54:46 +0100 Subject: erts: Fix faulty assert for non-smp --- erts/emulator/beam/erl_message.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'erts/emulator/beam/erl_message.c') diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index b3e74e3e6a..88efb2c59f 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -1504,7 +1504,7 @@ erts_factory_message_create(ErtsHeapFactory* factory, } if (on_heap) { - ASSERT(*proc_locksp & ERTS_PROC_LOCK_MAIN); + ERTS_SMP_ASSERT(*proc_locksp & ERTS_PROC_LOCK_MAIN); ASSERT(ohp == &proc->off_heap); factory->mode = FACTORY_HALLOC; factory->p = proc; -- cgit v1.2.3