aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_message.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r--erts/emulator/beam/erl_message.c1590
1 files changed, 983 insertions, 607 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index ef52823287..1811651a58 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -33,8 +33,8 @@
#include "erl_binary.h"
#include "dtrace-wrapper.h"
-ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message,
- ErlMessage,
+ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message_ref,
+ ErtsMessageRef,
ERL_MESSAGE_BUF_SZ,
ERTS_ALC_T_MSG_REF)
@@ -44,27 +44,20 @@ ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message,
#undef HARD_DEBUG
#endif
-
-
-
-#ifdef DEBUG
-static ERTS_INLINE int in_heapfrag(const Eterm* ptr, const ErlHeapFragment *bp)
+void
+init_message(void)
{
- return ((unsigned)(ptr - bp->mem) < bp->used_size);
+ init_message_ref_alloc();
}
-#endif
-
-void
-init_message(void)
+void *erts_alloc_message_ref(void)
{
- init_message_alloc();
+ return (void *) message_ref_alloc();
}
-void
-free_message(ErlMessage* mp)
+void erts_free_message_ref(void *mp)
{
- message_free(mp);
+ message_ref_free((ErtsMessageRef *) mp);
}
/* Allocate message buffer (size in words) */
@@ -74,7 +67,8 @@ new_message_buffer(Uint size)
ErlHeapFragment* bp;
bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG,
ERTS_HEAP_FRAG_SIZE(size));
- ERTS_INIT_HEAP_FRAG(bp, size);
+ ERTS_INIT_HEAP_FRAG(bp, size, size);
+ VERBOSE(DEBUG_SHCOPY, ("[pid=%T] new message buffer %p\n", erts_get_current_pid(), bp->mem));
return bp;
}
@@ -203,83 +197,57 @@ free_message_buffer(ErlHeapFragment* bp)
}while (bp != NULL);
}
-static ERTS_INLINE void
-link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp)
+void
+erts_cleanup_messages(ErtsMessage *msgp)
{
- if (bp) {
- /* Link the message buffer */
- bp->next = MBUF(proc);
- MBUF(proc) = bp;
- MBUF_SIZE(proc) += bp->used_size;
- FLAGS(proc) |= F_FORCE_GC;
-
- /* Move any off_heap's into the process */
- if (bp->off_heap.first != NULL) {
- struct erl_off_heap_header** next_p = &bp->off_heap.first;
- while (*next_p != NULL) {
- next_p = &((*next_p)->next);
+ ErtsMessage *mp = msgp;
+ while (mp) {
+ ErtsMessage *fmp;
+ ErlHeapFragment *bp;
+ if (is_non_value(ERL_MESSAGE_TERM(mp))) {
+ if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) {
+ bp = (ErlHeapFragment *) mp->data.dist_ext->ext_endp;
+ erts_cleanup_offheap(&bp->off_heap);
}
- *next_p = MSO(proc).first;
- MSO(proc).first = bp->off_heap.first;
- bp->off_heap.first = NULL;
- OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead);
+ if (mp->data.dist_ext)
+ erts_free_dist_ext_copy(mp->data.dist_ext);
}
+ else {
+ if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG)
+ bp = mp->data.heap_frag;
+ else {
+ bp = mp->hfrag.next;
+ erts_cleanup_offheap(&mp->hfrag.off_heap);
+ }
+ if (bp)
+ free_message_buffer(bp);
+ }
+ fmp = mp;
+ mp = mp->next;
+ erts_free_message(fmp);
}
}
-Eterm
-erts_msg_distext2heap(Process *pp,
- ErtsProcLocks *plcksp,
- ErlHeapFragment **bpp,
- Eterm *tokenp,
- ErtsDistExternal *dist_extp)
+ErtsMessage *
+erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_size)
{
- Eterm msg;
- Uint tok_sz = 0;
- Eterm *hp = NULL;
- ErtsHeapFactory factory;
- Sint sz;
-
- *bpp = NULL;
- sz = erts_decode_dist_ext_size(dist_extp);
- if (sz < 0)
- goto decode_error;
- if (is_not_nil(*tokenp)) {
- ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
- tok_sz = heap_frag->used_size;
- sz += tok_sz;
- }
- if (pp) {
- ErlOffHeap *ohp;
- hp = erts_alloc_message_heap(sz, bpp, &ohp, pp, plcksp);
- }
- else {
- *bpp = new_message_buffer(sz);
- hp = (*bpp)->mem;
- }
- erts_factory_message_init(&factory, pp, hp, *bpp);
- msg = erts_decode_dist_ext(&factory, dist_extp);
- if (is_non_value(msg))
- goto decode_error;
- if (is_not_nil(*tokenp)) {
- ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
- hp = erts_produce_heap(&factory, tok_sz, 0);
- *tokenp = copy_struct(*tokenp, tok_sz, &hp, factory.off_heap);
- erts_cleanup_offheap(&heap_frag->off_heap);
+ ErtsMessage *nmp = erts_realloc(ERTS_ALC_T_MSG, mp,
+ sizeof(ErtsMessage) + (sz - 1)*sizeof(Eterm));
+ if (nmp != mp) {
+ Eterm *sp = &mp->hfrag.mem[0];
+ Eterm *ep = sp + sz;
+ Sint offs = &nmp->hfrag.mem[0] - sp;
+ erts_offset_off_heap(&nmp->hfrag.off_heap, offs, sp, ep);
+ erts_offset_heap(&nmp->hfrag.mem[0], sz, offs, sp, ep);
+ if (brefs && brefs_size)
+ erts_offset_heap_ptr(brefs, brefs_size, offs, sp, ep);
}
- erts_free_dist_ext_copy(dist_extp);
- erts_factory_close(&factory);
- return msg;
- decode_error:
- if (is_not_nil(*tokenp)) {
- ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
- erts_cleanup_offheap(&heap_frag->off_heap);
- }
- erts_free_dist_ext_copy(dist_extp);
- *bpp = NULL;
- return THE_NON_VALUE;
- }
+ nmp->hfrag.used_size = sz;
+ nmp->hfrag.alloc_size = sz;
+
+ return nmp;
+}
void
erts_queue_dist_message(Process *rcvr,
@@ -287,7 +255,7 @@ erts_queue_dist_message(Process *rcvr,
ErtsDistExternal *dist_ext,
Eterm token)
{
- ErlMessage* mp;
+ ErtsMessage* mp;
#ifdef USE_VM_PROBES
Sint tok_label = 0;
Sint tok_lastcnt = 0;
@@ -299,7 +267,17 @@ erts_queue_dist_message(Process *rcvr,
ERTS_SMP_LC_ASSERT(*rcvr_locks == erts_proc_lc_my_proc_locks(rcvr));
- mp = message_alloc();
+ mp = erts_alloc_message(0, NULL);
+ mp->data.dist_ext = dist_ext;
+
+ ERL_MESSAGE_TERM(mp) = THE_NON_VALUE;
+#ifdef USE_VM_PROBES
+ ERL_MESSAGE_DT_UTAG(mp) = NIL;
+ if (token == am_have_dt_utag)
+ ERL_MESSAGE_TOKEN(mp) = NIL;
+ else
+#endif
+ ERL_MESSAGE_TOKEN(mp) = token;
#ifdef ERTS_SMP
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) {
@@ -318,64 +296,46 @@ erts_queue_dist_message(Process *rcvr,
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
/* Drop message if receiver is exiting or has a pending exit ... */
- if (is_not_nil(token)) {
- ErlHeapFragment *heap_frag;
- heap_frag = erts_dist_ext_trailer(mp->data.dist_ext);
- erts_cleanup_offheap(&heap_frag->off_heap);
- }
- erts_free_dist_ext_copy(dist_ext);
- message_free(mp);
+ erts_cleanup_messages(mp);
}
else
#endif
if (IS_TRACED_FL(rcvr, F_TRACE_RECEIVE)) {
/* Ahh... need to decode it in order to trace it... */
- ErlHeapFragment *mbuf;
- Eterm msg;
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
- message_free(mp);
- msg = erts_msg_distext2heap(rcvr, rcvr_locks, &mbuf, &token, dist_ext);
- if (is_value(msg))
+ if (!erts_decode_dist_message(rcvr, *rcvr_locks, mp, 0))
+ erts_free_message(mp);
+ else {
+ Eterm msg = ERL_MESSAGE_TERM(mp);
+ token = ERL_MESSAGE_TOKEN(mp);
#ifdef USE_VM_PROBES
- if (DTRACE_ENABLED(message_queued)) {
- DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
-
- dtrace_proc_str(rcvr, receiver_name);
- if (token != NIL && token != am_have_dt_utag) {
- tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
- tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
- tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
- }
- DTRACE6(message_queued,
- receiver_name, size_object(msg), rcvr->msg.len,
- tok_label, tok_lastcnt, tok_serial);
- }
+ if (DTRACE_ENABLED(message_queued)) {
+ DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
+
+ dtrace_proc_str(rcvr, receiver_name);
+ if (have_seqtrace(token)) {
+ tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
+ tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
+ tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
+ }
+ DTRACE6(message_queued,
+ receiver_name, size_object(msg), rcvr->msg.len,
+ tok_label, tok_lastcnt, tok_serial);
+ }
#endif
- erts_queue_message(rcvr, rcvr_locks, mbuf, msg, token);
+ erts_queue_message(rcvr, rcvr_locks, mp, msg, token);
+ }
}
else {
/* Enqueue message on external format */
- ERL_MESSAGE_TERM(mp) = THE_NON_VALUE;
-#ifdef USE_VM_PROBES
- ERL_MESSAGE_DT_UTAG(mp) = NIL;
- if (token == am_have_dt_utag) {
- ERL_MESSAGE_TOKEN(mp) = NIL;
- } else {
-#endif
- ERL_MESSAGE_TOKEN(mp) = token;
-#ifdef USE_VM_PROBES
- }
-#endif
- mp->next = NULL;
-
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(message_queued)) {
DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
dtrace_proc_str(rcvr, receiver_name);
- if (token != NIL && token != am_have_dt_utag) {
+ if (have_seqtrace(token)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
@@ -388,7 +348,7 @@ erts_queue_dist_message(Process *rcvr,
tok_label, tok_lastcnt, tok_serial);
}
#endif
- mp->data.dist_ext = dist_ext;
+
LINK_MESSAGE(rcvr, mp);
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
@@ -408,9 +368,9 @@ erts_queue_dist_message(Process *rcvr,
static Sint
queue_message(Process *c_p,
Process* receiver,
- ErtsProcLocks *receiver_locks,
erts_aint32_t *receiver_state,
- ErlHeapFragment* bp,
+ ErtsProcLocks *receiver_locks,
+ ErtsMessage* mp,
Eterm message,
Eterm seq_trace_token
#ifdef USE_VM_PROBES
@@ -419,31 +379,24 @@ queue_message(Process *c_p,
)
{
Sint res;
- ErlMessage* mp;
int locked_msgq = 0;
- erts_aint_t state;
-
-#ifndef ERTS_SMP
- ASSERT(bp != NULL || receiver->mbuf == NULL);
-#endif
+ erts_aint32_t state;
ERTS_SMP_LC_ASSERT(*receiver_locks == erts_proc_lc_my_proc_locks(receiver));
- mp = message_alloc();
-
- if (receiver_state)
- state = *receiver_state;
- else
- state = erts_smp_atomic32_read_acqb(&receiver->state);
-
#ifdef ERTS_SMP
- if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))
- goto exiting;
-
if (!(*receiver_locks & ERTS_PROC_LOCK_MSGQ)) {
if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ;
+
+ if (receiver_state)
+ state = *receiver_state;
+ else
+ state = erts_smp_atomic32_read_nob(&receiver->state);
+ if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))
+ goto exiting;
+
if (*receiver_locks & ERTS_PROC_LOCK_STATUS) {
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
need_locks |= ERTS_PROC_LOCK_STATUS;
@@ -451,13 +404,12 @@ queue_message(Process *c_p,
erts_smp_proc_lock(receiver, need_locks);
}
locked_msgq = 1;
- state = erts_smp_atomic32_read_nob(&receiver->state);
- if (receiver_state)
- *receiver_state = state;
}
#endif
+ state = erts_smp_atomic32_read_nob(&receiver->state);
+
if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) {
#ifdef ERTS_SMP
exiting:
@@ -465,9 +417,7 @@ queue_message(Process *c_p,
/* Drop message if receiver is exiting or has a pending exit... */
if (locked_msgq)
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
- if (bp)
- free_message_buffer(bp);
- message_free(mp);
+ erts_cleanup_messages(mp);
return 0;
}
@@ -476,13 +426,9 @@ queue_message(Process *c_p,
#ifdef USE_VM_PROBES
ERL_MESSAGE_DT_UTAG(mp) = dt_utag;
#endif
- mp->next = NULL;
- mp->data.heap_frag = bp;
-#ifndef ERTS_SMP
res = receiver->msg.len;
-#else
- res = receiver->msg_inq.len;
+#ifdef ERTS_SMP
if (*receiver_locks & ERTS_PROC_LOCK_MAIN) {
/*
* We move 'in queue' to 'private queue' and place
@@ -492,7 +438,7 @@ queue_message(Process *c_p,
* we don't need to include the 'in queue' in
* the root set when garbage collecting.
*/
- res += receiver->msg.len;
+ res += receiver->msg_inq.len;
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
LINK_MESSAGE_PRIVQ(receiver, mp);
}
@@ -544,19 +490,19 @@ queue_message(Process *c_p,
void
#ifdef USE_VM_PROBES
erts_queue_message_probe(Process* receiver, ErtsProcLocks *receiver_locks,
- ErlHeapFragment* bp,
+ ErtsMessage* mp,
Eterm message, Eterm seq_trace_token, Eterm dt_utag)
#else
erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks,
- ErlHeapFragment* bp,
+ ErtsMessage* mp,
Eterm message, Eterm seq_trace_token)
#endif
{
queue_message(NULL,
receiver,
- receiver_locks,
NULL,
- bp,
+ receiver_locks,
+ mp,
message,
seq_trace_token
#ifdef USE_VM_PROBES
@@ -566,245 +512,37 @@ erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks,
}
void
-erts_link_mbuf_to_proc(struct process *proc, ErlHeapFragment *bp)
+erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp)
{
- Eterm* htop = HEAP_TOP(proc);
-
- link_mbuf_to_proc(proc, bp);
- if (htop < HEAP_LIMIT(proc)) {
- *htop = make_pos_bignum_header(HEAP_LIMIT(proc)-htop-1);
- HEAP_TOP(proc) = HEAP_LIMIT(proc);
- }
-}
-
-/*
- * Moves content of message buffer attached to a message into a heap.
- * The message buffer is deallocated.
- */
-void
-erts_move_msg_mbuf_to_heap(Eterm** hpp, ErlOffHeap* off_heap, ErlMessage *msg)
-{
- struct erl_off_heap_header* oh;
- Eterm term, token, *fhp, *hp;
- Sint offs;
- Uint sz;
- ErlHeapFragment *bp;
-#ifdef USE_VM_PROBES
- Eterm utag;
-#endif
-
-#ifdef HARD_DEBUG
- struct erl_off_heap_header* dbg_oh_start = off_heap->first;
- Eterm dbg_term, dbg_token;
- ErlHeapFragment *dbg_bp;
- Uint *dbg_hp, *dbg_thp_start;
- Uint dbg_term_sz, dbg_token_sz;
-#ifdef USE_VM_PROBES
- Eterm dbg_utag;
- Uint dbg_utag_sz;
-#endif
-#endif
-
- bp = msg->data.heap_frag;
- term = ERL_MESSAGE_TERM(msg);
- token = ERL_MESSAGE_TOKEN(msg);
-#ifdef USE_VM_PROBES
- utag = ERL_MESSAGE_DT_UTAG(msg);
-#endif
- if (!bp) {
-#ifdef USE_VM_PROBES
- ASSERT(is_immed(term) && is_immed(token) && is_immed(utag));
-#else
- ASSERT(is_immed(term) && is_immed(token));
-#endif
- return;
- }
-
-#ifdef HARD_DEBUG
- dbg_term_sz = size_object(term);
- dbg_token_sz = size_object(token);
- dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz);
-#ifdef USE_VM_PROBES
- dbg_utag_sz = size_object(utag);
- dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz + dbg_utag_sz );
-#endif
- /*ASSERT(dbg_term_sz + dbg_token_sz == erts_msg_used_frag_sz(msg));
- Copied size may be smaller due to removed SubBins's or garbage.
- Copied size may be larger due to duplicated shared terms.
- */
- dbg_hp = dbg_bp->mem;
- dbg_term = copy_struct(term, dbg_term_sz, &dbg_hp, &dbg_bp->off_heap);
- dbg_token = copy_struct(token, dbg_token_sz, &dbg_hp, &dbg_bp->off_heap);
-#ifdef USE_VM_PROBES
- dbg_utag = copy_struct(utag, dbg_utag_sz, &dbg_hp, &dbg_bp->off_heap);
-#endif
- dbg_thp_start = *hpp;
-#endif
-
- if (bp->next != NULL) {
- move_multi_frags(hpp, off_heap, bp, msg->m,
-#ifdef USE_VM_PROBES
- 3
-#else
- 2
-#endif
- );
- goto copy_done;
- }
-
- OH_OVERHEAD(off_heap, bp->off_heap.overhead);
- sz = bp->used_size;
-
- ASSERT(is_immed(term) || in_heapfrag(ptr_val(term),bp));
- ASSERT(is_immed(token) || in_heapfrag(ptr_val(token),bp));
-
- fhp = bp->mem;
- hp = *hpp;
- offs = hp - fhp;
-
- oh = NULL;
- while (sz--) {
- Uint cpy_sz;
- Eterm val = *fhp++;
-
- switch (primary_tag(val)) {
- case TAG_PRIMARY_IMMED1:
- *hp++ = val;
- break;
- case TAG_PRIMARY_LIST:
- case TAG_PRIMARY_BOXED:
- ASSERT(in_heapfrag(ptr_val(val), bp));
- *hp++ = offset_ptr(val, offs);
- break;
- case TAG_PRIMARY_HEADER:
- *hp++ = val;
- switch (val & _HEADER_SUBTAG_MASK) {
- case ARITYVAL_SUBTAG:
- break;
- case REFC_BINARY_SUBTAG:
- case FUN_SUBTAG:
- case EXTERNAL_PID_SUBTAG:
- case EXTERNAL_PORT_SUBTAG:
- case EXTERNAL_REF_SUBTAG:
- oh = (struct erl_off_heap_header*) (hp-1);
- cpy_sz = thing_arityval(val);
- goto cpy_words;
- default:
- cpy_sz = header_arity(val);
-
- cpy_words:
- ASSERT(sz >= cpy_sz);
- sz -= cpy_sz;
- while (cpy_sz >= 8) {
- cpy_sz -= 8;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- *hp++ = *fhp++;
- }
- switch (cpy_sz) {
- case 7: *hp++ = *fhp++;
- case 6: *hp++ = *fhp++;
- case 5: *hp++ = *fhp++;
- case 4: *hp++ = *fhp++;
- case 3: *hp++ = *fhp++;
- case 2: *hp++ = *fhp++;
- case 1: *hp++ = *fhp++;
- default: break;
+ if (first_bp) {
+ ErlHeapFragment *bp = first_bp;
+
+ while (1) {
+ /* Move any off_heap's into the process */
+ if (bp->off_heap.first != NULL) {
+ struct erl_off_heap_header** next_p = &bp->off_heap.first;
+ while (*next_p != NULL) {
+ next_p = &((*next_p)->next);
}
- if (oh) {
- /* Add to offheap list */
- oh->next = off_heap->first;
- off_heap->first = oh;
- ASSERT(*hpp <= (Eterm*)oh);
- ASSERT(hp > (Eterm*)oh);
- oh = NULL;
- }
- break;
+ *next_p = MSO(proc).first;
+ MSO(proc).first = bp->off_heap.first;
+ bp->off_heap.first = NULL;
+ OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead);
}
- break;
+ MBUF_SIZE(proc) += bp->used_size;
+ if (!bp->next)
+ break;
+ bp = bp->next;
}
- }
-
- ASSERT(bp->used_size == hp - *hpp);
- *hpp = hp;
-
- if (is_not_immed(token)) {
- ASSERT(in_heapfrag(ptr_val(token), bp));
- ERL_MESSAGE_TOKEN(msg) = offset_ptr(token, offs);
-#ifdef HARD_DEBUG
- ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TOKEN(msg)));
- ASSERT(hp > ptr_val(ERL_MESSAGE_TOKEN(msg)));
-#endif
- }
-
- if (is_not_immed(term)) {
- ASSERT(in_heapfrag(ptr_val(term),bp));
- ERL_MESSAGE_TERM(msg) = offset_ptr(term, offs);
-#ifdef HARD_DEBUG
- ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TERM(msg)));
- ASSERT(hp > ptr_val(ERL_MESSAGE_TERM(msg)));
-#endif
- }
-#ifdef USE_VM_PROBES
- if (is_not_immed(utag)) {
- ASSERT(in_heapfrag(ptr_val(utag), bp));
- ERL_MESSAGE_DT_UTAG(msg) = offset_ptr(utag, offs);
-#ifdef HARD_DEBUG
- ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_DT_UTAG(msg)));
- ASSERT(hp > ptr_val(ERL_MESSAGE_DT_UTAG(msg)));
-#endif
- }
-#endif
-
-copy_done:
-#ifdef HARD_DEBUG
- {
- int i, j;
- ErlHeapFragment* frag;
- {
- struct erl_off_heap_header* dbg_oh = off_heap->first;
- i = j = 0;
- while (dbg_oh != dbg_oh_start) {
- dbg_oh = dbg_oh->next;
- i++;
- }
- for (frag=bp; frag; frag=frag->next) {
- dbg_oh = frag->off_heap.first;
- while (dbg_oh) {
- dbg_oh = dbg_oh->next;
- j++;
- }
- }
- ASSERT(i == j);
- }
+ /* Link the message buffer */
+ bp->next = MBUF(proc);
+ MBUF(proc) = first_bp;
}
-#endif
-
-
- bp->off_heap.first = NULL;
- free_message_buffer(bp);
- msg->data.heap_frag = NULL;
-
-#ifdef HARD_DEBUG
- ASSERT(eq(ERL_MESSAGE_TERM(msg), dbg_term));
- ASSERT(eq(ERL_MESSAGE_TOKEN(msg), dbg_token));
-#ifdef USE_VM_PROBES
- ASSERT(eq(ERL_MESSAGE_DT_UTAG(msg), dbg_utag));
-#endif
- free_message_buffer(dbg_bp);
-#endif
-
}
-
Uint
-erts_msg_attached_data_size_aux(ErlMessage *msg)
+erts_msg_attached_data_size_aux(ErtsMessage *msg)
{
Sint sz;
ASSERT(is_non_value(ERL_MESSAGE_TERM(msg)));
@@ -833,29 +571,72 @@ erts_msg_attached_data_size_aux(ErlMessage *msg)
return sz;
}
-void
-erts_move_msg_attached_data_to_heap(ErtsHeapFactory* factory,
- ErlMessage *msg)
+ErtsMessage *
+erts_try_alloc_message_on_heap(Process *pp,
+ erts_aint32_t *psp,
+ ErtsProcLocks *plp,
+ Uint sz,
+ Eterm **hpp,
+ ErlOffHeap **ohpp,
+ int *on_heap_p)
{
- if (is_value(ERL_MESSAGE_TERM(msg)))
- erts_move_msg_mbuf_to_heap(&factory->hp, factory->off_heap, msg);
- else if (msg->data.dist_ext) {
- ASSERT(msg->data.dist_ext->heap_size >= 0);
- if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) {
- ErlHeapFragment *heap_frag;
- heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
- ERL_MESSAGE_TOKEN(msg) = copy_struct(ERL_MESSAGE_TOKEN(msg),
- heap_frag->used_size,
- &factory->hp,
- factory->off_heap);
- erts_cleanup_offheap(&heap_frag->off_heap);
+#ifdef ERTS_SMP
+ int locked_main = 0;
+#endif
+ ErtsMessage *mp;
+
+ ASSERT(!(*psp & ERTS_PSFLG_OFF_HEAP_MSGQ));
+
+ if (
+#if defined(ERTS_SMP)
+ *plp & ERTS_PROC_LOCK_MAIN
+#else
+ 1
+#endif
+ ) {
+#ifdef ERTS_SMP
+ try_on_heap:
+#endif
+ if ((*psp & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))
+ || (pp->flags & F_DISABLE_GC)
+ || HEAP_LIMIT(pp) - HEAP_TOP(pp) <= sz) {
+ /*
+ * The heap is either potentially in an inconsistent
+ * state, or not large enough.
+ */
+#ifdef ERTS_SMP
+ if (locked_main) {
+ *plp &= ~ERTS_PROC_LOCK_MAIN;
+ erts_smp_proc_unlock(pp, ERTS_PROC_LOCK_MAIN);
+ }
+#endif
+ goto in_message_fragment;
}
- ERL_MESSAGE_TERM(msg) = erts_decode_dist_ext(factory,
- msg->data.dist_ext);
- erts_free_dist_ext_copy(msg->data.dist_ext);
- msg->data.dist_ext = NULL;
+
+ *hpp = HEAP_TOP(pp);
+ HEAP_TOP(pp) = *hpp + sz;
+ *ohpp = &MSO(pp);
+ mp = erts_alloc_message(0, NULL);
+ mp->data.attached = NULL;
+ *on_heap_p = !0;
}
- /* else: bad external detected when calculating size */
+#ifdef ERTS_SMP
+ else if (erts_smp_proc_trylock(pp, ERTS_PROC_LOCK_MAIN) == 0) {
+ locked_main = 1;
+ *psp = erts_smp_atomic32_read_nob(&pp->state);
+ *plp |= ERTS_PROC_LOCK_MAIN;
+ goto try_on_heap;
+ }
+#endif
+ else {
+ in_message_fragment:
+
+ mp = erts_alloc_message(sz, hpp);
+ *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap;
+ *on_heap_p = 0;
+ }
+
+ return mp;
}
/*
@@ -870,7 +651,8 @@ erts_send_message(Process* sender,
unsigned flags)
{
Uint msize;
- ErlHeapFragment* bp = NULL;
+ ErtsMessage* mp;
+ ErlOffHeap *ohp;
Eterm token = NIL;
Sint res = 0;
#ifdef USE_VM_PROBES
@@ -879,67 +661,96 @@ erts_send_message(Process* sender,
Sint tok_label = 0;
Sint tok_lastcnt = 0;
Sint tok_serial = 0;
+ Eterm utag = NIL;
+#endif
+ erts_aint32_t receiver_state;
+#ifdef SHCOPY_SEND
+ erts_shcopy_t info;
#endif
BM_STOP_TIMER(system);
BM_MESSAGE(message,sender,receiver);
BM_START_TIMER(send);
- #ifdef USE_VM_PROBES
+#ifdef USE_VM_PROBES
*sender_name = *receiver_name = '\0';
- if (DTRACE_ENABLED(message_send)) {
+ if (DTRACE_ENABLED(message_send)) {
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
"%T", sender->common.id);
erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
"%T", receiver->common.id);
}
#endif
+
+ receiver_state = erts_smp_atomic32_read_nob(&receiver->state);
+
if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) {
Eterm* hp;
Eterm stoken = SEQ_TRACE_TOKEN(sender);
Uint seq_trace_size = 0;
#ifdef USE_VM_PROBES
Uint dt_utag_size = 0;
- Eterm utag = NIL;
-#endif
-
- BM_SWAP_TIMER(send,size);
- msize = size_object(message);
- BM_SWAP_TIMER(size,send);
-
-#ifdef USE_VM_PROBES
- if (stoken != am_have_dt_utag) {
#endif
-
+ BM_SWAP_TIMER(send,size);
+
+ /* SHCOPY corrupts the heap between
+ * copy_shared_calculate, and
+ * copy_shared_perform. (it inserts move_markers like the gc).
+ * Make sure we don't use the heap between those instances.
+ */
+ if (have_seqtrace(stoken)) {
seq_trace_update_send(sender);
seq_trace_output(stoken, message, SEQ_TRACE_SEND,
receiver->common.id, sender);
seq_trace_size = 6; /* TUPLE5 */
-#ifdef USE_VM_PROBES
- }
- if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
- dt_utag_size = size_object(DT_UTAG(sender));
- } else if (stoken == am_have_dt_utag ) {
- stoken = NIL;
}
+#ifdef USE_VM_PROBES
+ if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
+ dt_utag_size = size_object(DT_UTAG(sender));
+ } else if (stoken == am_have_dt_utag ) {
+ stoken = NIL;
+ }
#endif
- bp = new_message_buffer(msize + seq_trace_size
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ msize = copy_shared_calculate(message, &info);
+#else
+ msize = size_object(message);
+#endif
+ BM_SWAP_TIMER(size,send);
+
+ mp = erts_alloc_message_heap_state(receiver,
+ &receiver_state,
+ receiver_locks,
+ (msize
#ifdef USE_VM_PROBES
- + dt_utag_size
+ + dt_utag_size
#endif
- );
- hp = bp->mem;
+ + seq_trace_size),
+ &hp,
+ &ohp);
BM_SWAP_TIMER(send,copy);
- token = copy_struct(stoken,
- seq_trace_size,
- &hp,
- &bp->off_heap);
- message = copy_struct(message, msize, &hp, &bp->off_heap);
+#ifdef SHCOPY_SEND
+ if (is_not_immed(message))
+ message = copy_shared_perform(message, msize, &info, &hp, ohp);
+ DESTROY_SHCOPY(info);
+#else
+ if (is_not_immed(message))
+ message = copy_struct(message, msize, &hp, ohp);
+#endif
+ if (is_immed(stoken))
+ token = stoken;
+ else
+ token = copy_struct(stoken, seq_trace_size, &hp, ohp);
+
#ifdef USE_VM_PROBES
if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
- utag = copy_struct(DT_UTAG(sender), dt_utag_size, &hp, &bp->off_heap);
+ if (is_immed(DT_UTAG(sender)))
+ utag = DT_UTAG(sender);
+ else
+ utag = copy_struct(DT_UTAG(sender), dt_utag_size, ohp);
#ifdef DTRACE_TAG_HARDDEBUG
erts_fprintf(stderr,
"Dtrace -> (%T) Spreading tag (%T) with "
@@ -952,7 +763,7 @@ erts_send_message(Process* sender,
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(message_send)) {
- if (stoken != NIL && stoken != am_have_dt_utag) {
+ if (have_seqtrace(stoken)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(stoken));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(stoken));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(stoken));
@@ -961,103 +772,63 @@ erts_send_message(Process* sender,
msize, tok_label, tok_lastcnt, tok_serial);
}
#endif
- res = queue_message(NULL,
- receiver,
- receiver_locks,
- NULL,
- bp,
- message,
- token
-#ifdef USE_VM_PROBES
- , utag
-#endif
- );
- BM_SWAP_TIMER(send,system);
- } else if (sender == receiver) {
- /* Drop message if receiver has a pending exit ... */
-#ifdef ERTS_SMP
- ErtsProcLocks need_locks = (~(*receiver_locks)
- & (ERTS_PROC_LOCK_MSGQ
- | ERTS_PROC_LOCK_STATUS));
- if (need_locks) {
- *receiver_locks |= need_locks;
- if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) {
- if (need_locks == ERTS_PROC_LOCK_MSGQ) {
- erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
- need_locks = ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS;
- }
- erts_smp_proc_lock(receiver, need_locks);
- }
+ } else {
+ Eterm *hp;
+
+ if (receiver == sender && !(receiver_state & ERTS_PSFLG_OFF_HEAP_MSGQ)) {
+ mp = erts_alloc_message(0, NULL);
+ msize = 0;
}
- if (!ERTS_PROC_PENDING_EXIT(receiver))
+ else {
+ BM_SWAP_TIMER(send,size);
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ msize = copy_shared_calculate(message, &info);
+#else
+ msize = size_object(message);
#endif
- {
- ErlMessage* mp = message_alloc();
-
- DTRACE6(message_send, sender_name, receiver_name,
- size_object(message), tok_label, tok_lastcnt, tok_serial);
- mp->data.attached = NULL;
- ERL_MESSAGE_TERM(mp) = message;
- ERL_MESSAGE_TOKEN(mp) = NIL;
-#ifdef USE_VM_PROBES
- ERL_MESSAGE_DT_UTAG(mp) = NIL;
+ BM_SWAP_TIMER(size,send);
+
+ mp = erts_alloc_message_heap_state(receiver,
+ &receiver_state,
+ receiver_locks,
+ msize,
+ &hp,
+ &ohp);
+ BM_SWAP_TIMER(send,copy);
+#ifdef SHCOPY_SEND
+ if (is_not_immed(message))
+ message = copy_shared_perform(message, msize, &info, &hp, ohp);
+ DESTROY_SHCOPY(info);
+#else
+ if (is_not_immed(message))
+ message = copy_struct(message, msize, &hp, ohp);
#endif
- mp->next = NULL;
- /*
- * We move 'in queue' to 'private queue' and place
- * message at the end of 'private queue' in order
- * to ensure that the 'in queue' doesn't contain
- * references into the heap. By ensuring this,
- * we don't need to include the 'in queue' in
- * the root set when garbage collecting.
- */
-
- ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
- LINK_MESSAGE_PRIVQ(receiver, mp);
-
- res = receiver->msg.len;
-
- if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
- trace_receive(receiver, message);
- }
+ BM_MESSAGE_COPIED(msz);
+ BM_SWAP_TIMER(copy,send);
}
- BM_SWAP_TIMER(send,system);
- } else {
- ErlOffHeap *ohp;
- Eterm *hp;
- erts_aint32_t state;
-
- BM_SWAP_TIMER(send,size);
- msize = size_object(message);
- BM_SWAP_TIMER(size,send);
- hp = erts_alloc_message_heap_state(msize,
- &bp,
- &ohp,
- receiver,
- receiver_locks,
- &state);
- BM_SWAP_TIMER(send,copy);
- message = copy_struct(message, msize, &hp, ohp);
- BM_MESSAGE_COPIED(msz);
- BM_SWAP_TIMER(copy,send);
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
- res = queue_message(sender,
- receiver,
- receiver_locks,
- &state,
- bp,
- message,
- token
+ }
+
+ res = queue_message(sender,
+ receiver,
+ &receiver_state,
+ receiver_locks,
+ mp,
+ message,
+ token
#ifdef USE_VM_PROBES
- , NIL
+ , utag
#endif
- );
- BM_SWAP_TIMER(send,system);
- }
- return res;
+ );
+
+ BM_SWAP_TIMER(send,system);
+
+ return res;
}
+
/*
* This function delivers an EXIT message to a process
* which is trapping EXITs.
@@ -1075,46 +846,511 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
Uint sz_from;
Eterm* hp;
Eterm temptoken;
- ErlHeapFragment* bp = NULL;
-
- if (token != NIL
-#ifdef USE_VM_PROBES
- && token != am_have_dt_utag
+ ErtsMessage* mp;
+ ErlOffHeap *ohp;
+#ifdef SHCOPY_SEND
+ erts_shcopy_t info;
#endif
- ) {
+ if (have_seqtrace(token)) {
ASSERT(is_tuple(token));
- sz_reason = size_object(reason);
sz_token = size_object(token);
sz_from = size_object(from);
- bp = new_message_buffer(sz_reason + sz_from + sz_token + 4);
- hp = bp->mem;
- mess = copy_struct(reason, sz_reason, &hp, &bp->off_heap);
- from_copy = copy_struct(from, sz_from, &hp, &bp->off_heap);
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ sz_reason = copy_shared_calculate(reason, &info);
+#else
+ sz_reason = size_object(reason);
+#endif
+ mp = erts_alloc_message_heap(to, to_locksp,
+ sz_reason + sz_from + sz_token + 4,
+ &hp, &ohp);
+#ifdef SHCOPY_SEND
+ mess = copy_shared_perform(reason, sz_reason, &info, &hp, ohp);
+ DESTROY_SHCOPY(info);
+#else
+ mess = copy_struct(reason, sz_reason, &hp, ohp);
+#endif
+ from_copy = copy_struct(from, sz_from, &hp, ohp);
save = TUPLE3(hp, am_EXIT, from_copy, mess);
hp += 4;
/* the trace token must in this case be updated by the caller */
seq_trace_output(token, save, SEQ_TRACE_SEND, to->common.id, NULL);
- temptoken = copy_struct(token, sz_token, &hp, &bp->off_heap);
- erts_queue_message(to, to_locksp, bp, save, temptoken);
+ temptoken = copy_struct(token, sz_token, &hp, ohp);
+ erts_queue_message(to, to_locksp, mp, save, temptoken);
} else {
- ErlOffHeap *ohp;
- sz_reason = size_object(reason);
sz_from = IS_CONST(from) ? 0 : size_object(from);
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ sz_reason = copy_shared_calculate(reason, &info);
+#else
+ sz_reason = size_object(reason);
+#endif
+ mp = erts_alloc_message_heap(to, to_locksp,
+ sz_reason+sz_from+4, &hp, &ohp);
- hp = erts_alloc_message_heap(sz_reason+sz_from+4,
- &bp,
- &ohp,
- to,
- to_locksp);
-
+#ifdef SHCOPY_SEND
+ mess = copy_shared_perform(reason, sz_reason, &info, &hp, ohp);
+ DESTROY_SHCOPY(info);
+#else
mess = copy_struct(reason, sz_reason, &hp, ohp);
+#endif
from_copy = (IS_CONST(from)
? from
: copy_struct(from, sz_from, &hp, ohp));
save = TUPLE3(hp, am_EXIT, from_copy, mess);
- erts_queue_message(to, to_locksp, bp, save, NIL);
+ erts_queue_message(to, to_locksp, mp, save, NIL);
+ }
+}
+
+void erts_save_message_in_proc(Process *p, ErtsMessage *msgp)
+{
+ ErlHeapFragment *hfp;
+
+ if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ hfp = &msgp->hfrag;
+ else if (msgp->data.attached) {
+ hfp = msgp->data.heap_frag;
+ }
+ else {
+ erts_free_message(msgp);
+ return; /* Nothing to save */
+ }
+
+ while (1) {
+ struct erl_off_heap_header *ohhp = hfp->off_heap.first;
+ if (ohhp) {
+ for ( ; ohhp->next; ohhp = ohhp->next)
+ ;
+ ohhp->next = p->off_heap.first;
+ p->off_heap.first = hfp->off_heap.first;
+ hfp->off_heap.first = NULL;
+ }
+ p->off_heap.overhead += hfp->off_heap.overhead;
+ hfp->off_heap.overhead = 0;
+ p->mbuf_sz += hfp->used_size;
+
+ if (!hfp->next)
+ break;
+ hfp = hfp->next;
+ }
+
+ msgp->next = p->msg_frag;
+ p->msg_frag = msgp;
+}
+
+Sint
+erts_move_messages_off_heap(Process *c_p)
+{
+ int reds = 1;
+ /*
+ * Move all messages off heap. This *only* occurs when the
+ * process had off heap message disabled and just enabled
+ * it...
+ */
+ ErtsMessage *mp;
+
+ reds += c_p->msg.len / 10;
+
+ ASSERT(erts_smp_atomic32_read_nob(&c_p->state)
+ & ERTS_PSFLG_OFF_HEAP_MSGQ);
+ ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG);
+
+ for (mp = c_p->msg.first; mp; mp = mp->next) {
+ Uint msg_sz, token_sz;
+#ifdef USE_VM_PROBES
+ Uint utag_sz;
+#endif
+ Eterm *hp;
+ ErlHeapFragment *hfrag;
+
+ if (mp->data.attached)
+ continue;
+
+ if (is_immed(ERL_MESSAGE_TERM(mp))
+#ifdef USE_VM_PROBES
+ && is_immed(ERL_MESSAGE_DT_UTAG(mp))
+#endif
+ && is_not_immed(ERL_MESSAGE_TOKEN(mp)))
+ continue;
+
+ /*
+ * The message refers into the heap. Copy the message
+ * from the heap into a heap fragment and attach
+ * it to the message...
+ */
+ msg_sz = size_object(ERL_MESSAGE_TERM(mp));
+#ifdef USE_VM_PROBES
+ utag_sz = size_object(ERL_MESSAGE_DT_UTAG(mp));
+#endif
+ token_sz = size_object(ERL_MESSAGE_TOKEN(mp));
+
+ hfrag = new_message_buffer(msg_sz
+#ifdef USE_VM_PROBES
+ + utag_sz
+#endif
+ + token_sz);
+ hp = hfrag->mem;
+ if (is_not_immed(ERL_MESSAGE_TERM(mp)))
+ ERL_MESSAGE_TERM(mp) = copy_struct(ERL_MESSAGE_TERM(mp),
+ msg_sz, &hp,
+ &hfrag->off_heap);
+ if (is_not_immed(ERL_MESSAGE_TOKEN(mp)))
+ ERL_MESSAGE_TOKEN(mp) = copy_struct(ERL_MESSAGE_TOKEN(mp),
+ token_sz, &hp,
+ &hfrag->off_heap);
+#ifdef USE_VM_PROBES
+ if (is_not_immed(ERL_MESSAGE_DT_UTAG(mp)))
+ ERL_MESSAGE_DT_UTAG(mp) = copy_struct(ERL_MESSAGE_DT_UTAG(mp),
+ utag_sz, &hp,
+ &hfrag->off_heap);
+#endif
+ mp->data.heap_frag = hfrag;
+ reds += 1;
+ }
+
+ return reds;
+}
+
+Sint
+erts_complete_off_heap_message_queue_change(Process *c_p)
+{
+ int reds = 1;
+
+ ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
+ ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG);
+ ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ);
+
+ /*
+ * This job was first initiated when the process changed
+ * "off heap message queue" state from false to true. Since
+ * then ERTS_PSFLG_OFF_HEAP_MSGQ has been set. However, the
+ * state change might have been changed again (multiple times)
+ * since then. Check users last requested state (the flag
+ * F_OFF_HEAP_MSGQ), and make the state consistent with that.
+ */
+
+ if (!(c_p->flags & F_OFF_HEAP_MSGQ))
+ erts_smp_atomic32_read_band_nob(&c_p->state,
+ ~ERTS_PSFLG_OFF_HEAP_MSGQ);
+ else {
+ reds += 2;
+ erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+ ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
+ erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+ reds += erts_move_messages_off_heap(c_p);
+ }
+ c_p->flags &= ~F_OFF_HEAP_MSGQ_CHNG;
+ return reds;
+}
+
+typedef struct {
+ Eterm pid;
+ ErtsThrPrgrLaterOp lop;
+} ErtsChangeOffHeapMessageQueue;
+
+static void
+change_off_heap_msgq(void *vcohmq)
+{
+ ErtsChangeOffHeapMessageQueue *cohmq;
+ /*
+ * Now we've waited thread progress which ensures that all
+ * messages to the process are enqueued off heap. Schedule
+ * completion of this change as a system task on the process
+ * itself. This in order to avoid lock contention on its
+ * main lock. We will be called in
+ * erts_complete_off_heap_message_queue_change() (above) when
+ * the system task has been selected for execution.
+ */
+ cohmq = (ErtsChangeOffHeapMessageQueue *) vcohmq;
+ erts_schedule_complete_off_heap_message_queue_change(cohmq->pid);
+ erts_free(ERTS_ALC_T_MSGQ_CHNG, vcohmq);
+}
+
+Eterm
+erts_change_off_heap_message_queue_state(Process *c_p, int enable)
+{
+
+#ifdef DEBUG
+ if (c_p->flags & F_OFF_HEAP_MSGQ) {
+ ASSERT(erts_smp_atomic32_read_nob(&c_p->state)
+ & ERTS_PSFLG_OFF_HEAP_MSGQ);
+ }
+ else {
+ if (c_p->flags & F_OFF_HEAP_MSGQ_CHNG) {
+ ASSERT(erts_smp_atomic32_read_nob(&c_p->state)
+ & ERTS_PSFLG_OFF_HEAP_MSGQ);
+ }
+ else {
+ ASSERT(!(erts_smp_atomic32_read_nob(&c_p->state)
+ & ERTS_PSFLG_OFF_HEAP_MSGQ));
+ }
+ }
+#endif
+
+ if (c_p->flags & F_OFF_HEAP_MSGQ) {
+ /* Off heap message queue is enabled */
+
+ if (!enable) {
+ c_p->flags &= ~F_OFF_HEAP_MSGQ;
+ /*
+ * We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ
+ * if a change is ongoing. It will be adjusted when the
+ * change completes...
+ */
+ if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) {
+ /* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... */
+ erts_smp_atomic32_read_band_nob(&c_p->state,
+ ~ERTS_PSFLG_OFF_HEAP_MSGQ);
+ }
+ }
+
+ return am_true; /* Old state */
+ }
+
+ /* Off heap message queue is disabled */
+
+ if (enable) {
+ c_p->flags |= F_OFF_HEAP_MSGQ;
+ /*
+ * We do not have to schedule a change if
+ * we have an ongoing change...
+ */
+ if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) {
+ ErtsChangeOffHeapMessageQueue *cohmq;
+ /*
+ * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait
+ * thread progress before completing the change in
+ * order to ensure that all senders observe that
+ * messages should be passed off heap. When the
+ * change has completed, GC does not need to inspect
+ * the message queue at all.
+ */
+ erts_smp_atomic32_read_bor_nob(&c_p->state,
+ ERTS_PSFLG_OFF_HEAP_MSGQ);
+ c_p->flags |= F_OFF_HEAP_MSGQ_CHNG;
+ cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG,
+ sizeof(ErtsChangeOffHeapMessageQueue));
+ cohmq->pid = c_p->common.id;
+ erts_schedule_thr_prgr_later_op(change_off_heap_msgq,
+ (void *) cohmq,
+ &cohmq->lop);
+ }
+ }
+
+ return am_false; /* Old state */
+}
+
+int
+erts_decode_dist_message(Process *proc, ErtsProcLocks proc_locks,
+ ErtsMessage *msgp, int force_off_heap)
+{
+ ErtsHeapFactory factory;
+ Eterm msg;
+ ErlHeapFragment *bp;
+ Sint need;
+ int decode_in_heap_frag;
+
+ decode_in_heap_frag = (force_off_heap
+ || !(proc_locks & ERTS_PROC_LOCK_MAIN)
+ || (proc->flags & F_OFF_HEAP_MSGQ));
+
+ if (msgp->data.dist_ext->heap_size >= 0)
+ need = msgp->data.dist_ext->heap_size;
+ else {
+ need = erts_decode_dist_ext_size(msgp->data.dist_ext);
+ if (need < 0) {
+ /* bad msg; remove it... */
+ if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
+ bp = erts_dist_ext_trailer(msgp->data.dist_ext);
+ erts_cleanup_offheap(&bp->off_heap);
+ }
+ erts_free_dist_ext_copy(msgp->data.dist_ext);
+ msgp->data.dist_ext = NULL;
+ return 0;
+ }
+
+ msgp->data.dist_ext->heap_size = need;
+ }
+
+ if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
+ bp = erts_dist_ext_trailer(msgp->data.dist_ext);
+ need += bp->used_size;
+ }
+
+ if (decode_in_heap_frag)
+ erts_factory_heap_frag_init(&factory, new_message_buffer(need));
+ else
+ erts_factory_proc_prealloc_init(&factory, proc, need);
+
+ ASSERT(msgp->data.dist_ext->heap_size >= 0);
+ if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
+ ErlHeapFragment *heap_frag;
+ heap_frag = erts_dist_ext_trailer(msgp->data.dist_ext);
+ ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp),
+ heap_frag->used_size,
+ &factory.hp,
+ factory.off_heap);
+ erts_cleanup_offheap(&heap_frag->off_heap);
+ }
+
+ msg = erts_decode_dist_ext(&factory, msgp->data.dist_ext);
+ ERL_MESSAGE_TERM(msgp) = msg;
+ erts_free_dist_ext_copy(msgp->data.dist_ext);
+ msgp->data.attached = NULL;
+
+ if (is_non_value(msg)) {
+ erts_factory_undo(&factory);
+ return 0;
+ }
+
+ erts_factory_trim_and_close(&factory, msgp->m,
+ ERL_MESSAGE_REF_ARRAY_SZ);
+
+ ASSERT(!msgp->data.heap_frag);
+
+ if (decode_in_heap_frag)
+ msgp->data.heap_frag = factory.heap_frags;
+
+ return 1;
+}
+
+/*
+ * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0 will move off heap messages
+ * into the heap of the inspected process if off_heap_message_queue
+ * is false when process_info(_, messages) is called. That is, the
+ * following GC will have more data in the rootset compared to the
+ * scenario when process_info(_, messages) had not been called.
+ *
+ * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS != 0 will keep off heap messages
+ * off heap when process_info(_, messages) is called regardless of
+ * the off_heap_message_queue setting of the process. That is, it
+ * will change the following execution of the process as little as
+ * possible.
+ */
+#define ERTS_INSPECT_MSGQ_KEEP_OH_MSGS 1
+
+Uint
+erts_prep_msgq_for_inspection(Process *c_p, Process *rp,
+ ErtsProcLocks rp_locks, ErtsMessageInfo *mip)
+{
+ Uint tot_heap_size;
+ ErtsMessage* mp;
+ Sint i;
+ int self_on_heap;
+
+ /*
+ * Prepare the message queue for inspection
+ * by process_info().
+ *
+ *
+ * - Decode all messages on external format
+ * - Remove all corrupt dist messages from queue
+ * - Save pointer to, and heap size need of each
+ * message in the mip array.
+ * - Return total heap size need for all messages
+ * that needs to be copied.
+ *
+ * If ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0:
+ * - In case off heap messages is disabled and
+ * we are inspecting our own queue, move all
+ * off heap data into the heap.
+ */
+
+ self_on_heap = c_p == rp && !(c_p->flags & F_OFF_HEAP_MSGQ);
+
+ tot_heap_size = 0;
+ i = 0;
+ mp = rp->msg.first;
+ while (mp) {
+ Eterm msg = ERL_MESSAGE_TERM(mp);
+
+ mip[i].size = 0;
+
+ if (is_non_value(msg)) {
+ /* Dist message on external format; decode it... */
+ if (mp->data.attached)
+ erts_decode_dist_message(rp, rp_locks, mp,
+ ERTS_INSPECT_MSGQ_KEEP_OH_MSGS);
+
+ msg = ERL_MESSAGE_TERM(mp);
+
+ if (is_non_value(msg)) {
+ ErtsMessage **mpp;
+ ErtsMessage *bad_mp = mp;
+ /*
+ * Bad distribution message; remove
+ * it from the queue...
+ */
+ ASSERT(!mp->data.attached);
+
+ mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next;
+
+ if (rp->msg.save == &bad_mp->next)
+ rp->msg.save = mpp;
+ if (rp->msg.last == &bad_mp->next)
+ rp->msg.last = mpp;
+ mp = mp->next;
+ *mpp = mp;
+ rp->msg.len--;
+ bad_mp->next = NULL;
+ erts_cleanup_messages(bad_mp);
+ continue;
+ }
+ }
+
+ ASSERT(is_value(msg));
+
+#if ERTS_INSPECT_MSGQ_KEEP_OH_MSGS
+ if (is_not_immed(msg) && (!self_on_heap || mp->data.attached)) {
+ Uint sz = size_object(msg);
+ mip[i].size = sz;
+ tot_heap_size += sz;
+ }
+#else
+ if (self_on_heap) {
+ if (mp->data.attached) {
+ ErtsMessage *tmp = NULL;
+ if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) {
+ erts_link_mbuf_to_proc(rp, mp->data.heap_frag);
+ mp->data.attached = NULL;
+ }
+ else {
+ /*
+ * Need to replace the message reference since
+ * we will get references to the message data
+ * from the heap...
+ */
+ ErtsMessage **mpp;
+ tmp = erts_alloc_message(0, NULL);
+ sys_memcpy((void *) tmp->m, (void *) mp->m,
+ sizeof(Eterm)*ERL_MESSAGE_REF_ARRAY_SZ);
+ mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next;
+ tmp->next = mp->next;
+ if (rp->msg.save == &mp->next)
+ rp->msg.save = &tmp->next;
+ if (rp->msg.last == &mp->next)
+ rp->msg.last = &tmp->next;
+ *mpp = tmp;
+ erts_save_message_in_proc(rp, mp);
+ mp = tmp;
+ }
+ }
+ }
+ else if (is_not_immed(msg)) {
+ Uint sz = size_object(msg);
+ mip[i].size = sz;
+ tot_heap_size += sz;
+ }
+
+#endif
+
+ mip[i].msgp = mp;
+ i++;
+ mp = mp->next;
}
+
+ return tot_heap_size;
}
void erts_factory_proc_init(ErtsHeapFactory* factory,
@@ -1127,47 +1363,138 @@ void erts_factory_proc_prealloc_init(ErtsHeapFactory* factory,
Process* p,
Sint size)
{
+ ErlHeapFragment *bp = p->mbuf;
factory->mode = FACTORY_HALLOC;
factory->p = p;
factory->hp_start = HAlloc(p, size);
factory->hp = factory->hp_start;
factory->hp_end = factory->hp_start + size;
factory->off_heap = &p->off_heap;
+ factory->message = NULL;
factory->off_heap_saved.first = p->off_heap.first;
factory->off_heap_saved.overhead = p->off_heap.overhead;
- factory->heap_frags_saved = p->mbuf;
+ factory->heap_frags_saved = bp;
+ factory->heap_frags_saved_used = bp ? bp->used_size : 0;
factory->heap_frags = NULL; /* not used */
factory->alloc_type = 0; /* not used */
}
-void erts_factory_message_init(ErtsHeapFactory* factory,
- Process* rp,
- Eterm* hp,
- ErlHeapFragment* bp)
+void erts_factory_heap_frag_init(ErtsHeapFactory* factory,
+ ErlHeapFragment* bp)
+{
+ factory->mode = FACTORY_HEAP_FRAGS;
+ factory->p = NULL;
+ factory->hp_start = bp->mem;
+ factory->hp = bp->mem;
+ factory->hp_end = bp->mem + bp->alloc_size;
+ factory->off_heap = &bp->off_heap;
+ factory->message = NULL;
+ factory->heap_frags = bp;
+ factory->heap_frags_saved = NULL;
+ factory->heap_frags_saved_used = 0;
+ factory->alloc_type = ERTS_ALC_T_HEAP_FRAG;
+ ASSERT(!bp->next);
+ factory->off_heap_saved.first = factory->off_heap->first;
+ factory->off_heap_saved.overhead = factory->off_heap->overhead;
+
+ ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end);
+}
+
+
+ErtsMessage *
+erts_factory_message_create(ErtsHeapFactory* factory,
+ Process *proc,
+ ErtsProcLocks *proc_locksp,
+ Uint sz)
{
- if (bp) {
- factory->mode = FACTORY_HEAP_FRAGS;
- factory->p = NULL;
- factory->hp_start = bp->mem;
- factory->hp = hp ? hp : bp->mem;
- factory->hp_end = bp->mem + bp->alloc_size;
- factory->off_heap = &bp->off_heap;
- factory->heap_frags = bp;
- factory->heap_frags_saved = bp;
- factory->alloc_type = ERTS_ALC_T_HEAP_FRAG;
- ASSERT(!bp->next);
+ Eterm *hp;
+ ErlOffHeap *ohp;
+ ErtsMessage *msgp;
+ int on_heap;
+ erts_aint32_t state;
+
+ state = erts_smp_atomic32_read_nob(&proc->state);
+
+ if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) {
+ msgp = erts_alloc_message(sz, &hp);
+ ohp = sz == 0 ? NULL : &msgp->hfrag.off_heap;
+ on_heap = 0;
+ }
+ else {
+ msgp = erts_try_alloc_message_on_heap(proc, &state,
+ proc_locksp,
+ sz, &hp, &ohp,
+ &on_heap);
+ }
+
+ if (on_heap) {
+ ASSERT(*proc_locksp & ERTS_PROC_LOCK_MAIN);
+ ASSERT(ohp == &proc->off_heap);
+ factory->mode = FACTORY_HALLOC;
+ factory->p = proc;
+ factory->heap_frags_saved = proc->mbuf;
+ factory->heap_frags_saved_used = proc->mbuf ? proc->mbuf->used_size : 0;
}
else {
- factory->mode = FACTORY_HALLOC;
- factory->p = rp;
- factory->hp_start = hp;
- factory->hp = hp;
- factory->hp_end = HEAP_TOP(rp);
- factory->off_heap = &rp->off_heap;
- factory->heap_frags_saved = rp->mbuf;
- factory->heap_frags = NULL; /* not used */
- factory->alloc_type = 0; /* not used */
+ factory->mode = FACTORY_MESSAGE;
+ factory->p = NULL;
+ factory->heap_frags_saved = NULL;
+ factory->heap_frags_saved_used = 0;
+
+ if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) {
+ ASSERT(!msgp->hfrag.next);
+ factory->heap_frags = NULL;
+ }
+ else {
+ ASSERT(!msgp->data.heap_frag
+ || !msgp->data.heap_frag->next);
+ factory->heap_frags = msgp->data.heap_frag;
+ }
}
+ factory->hp_start = hp;
+ factory->hp = hp;
+ factory->hp_end = hp + sz;
+ factory->message = msgp;
+ factory->off_heap = ohp;
+ factory->alloc_type = ERTS_ALC_T_HEAP_FRAG;
+ if (ohp) {
+ factory->off_heap_saved.first = ohp->first;
+ factory->off_heap_saved.overhead = ohp->overhead;
+ }
+ else {
+ factory->off_heap_saved.first = NULL;
+ factory->off_heap_saved.overhead = 0;
+ }
+
+ ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end);
+
+ return msgp;
+}
+
+void erts_factory_selfcontained_message_init(ErtsHeapFactory* factory,
+ ErtsMessage *msgp,
+ Eterm *hp)
+{
+ ErlHeapFragment* bp;
+ if (msgp->data.attached == ERTS_MSG_COMBINED_HFRAG) {
+ bp = &msgp->hfrag;
+ factory->heap_frags = NULL;
+ }
+ else {
+ bp = msgp->data.heap_frag;
+ factory->heap_frags = bp;
+ }
+ factory->mode = FACTORY_MESSAGE;
+ factory->p = NULL;
+ factory->hp_start = bp->mem;
+ factory->hp = hp;
+ factory->hp_end = bp->mem + bp->alloc_size;
+ factory->message = msgp;
+ factory->off_heap = &bp->off_heap;
+ factory->heap_frags_saved = NULL;
+ factory->heap_frags_saved_used = 0;
+ factory->alloc_type = ERTS_ALC_T_HEAP_FRAG;
+ ASSERT(!bp->next);
factory->off_heap_saved.first = factory->off_heap->first;
factory->off_heap_saved.overhead = factory->off_heap->overhead;
@@ -1230,8 +1557,16 @@ static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra)
factory->hp_end = factory->hp + need;
return;
- case FACTORY_HEAP_FRAGS:
- bp = factory->heap_frags;
+ case FACTORY_MESSAGE:
+ if (!factory->heap_frags) {
+ ASSERT(factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG);
+ bp = &factory->message->hfrag;
+ }
+ else {
+ /* Fall through */
+ case FACTORY_HEAP_FRAGS:
+ bp = factory->heap_frags;
+ }
if (bp) {
ASSERT(factory->hp > bp->mem);
@@ -1269,8 +1604,23 @@ void erts_factory_close(ErtsHeapFactory* factory)
HRelease(factory->p, factory->hp_end, factory->hp);
break;
- case FACTORY_HEAP_FRAGS:
- bp = factory->heap_frags;
+ case FACTORY_MESSAGE:
+ if (!factory->heap_frags) {
+ if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ bp = &factory->message->hfrag;
+ else
+ bp = NULL;
+ }
+ else {
+ if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ factory->message->hfrag.next = factory->heap_frags;
+ else
+ factory->message->data.heap_frag = factory->heap_frags;
+
+ /* Fall through */
+ case FACTORY_HEAP_FRAGS:
+ bp = factory->heap_frags;
+ }
if (bp) {
ASSERT(factory->hp >= bp->mem);
@@ -1291,17 +1641,47 @@ void erts_factory_close(ErtsHeapFactory* factory)
void erts_factory_trim_and_close(ErtsHeapFactory* factory,
Eterm *brefs, Uint brefs_size)
{
- if (factory->mode == FACTORY_HEAP_FRAGS) {
- ErlHeapFragment* bp = factory->heap_frags;
+ ErlHeapFragment *bp;
+
+ switch (factory->mode) {
+ case FACTORY_MESSAGE: {
+ ErtsMessage *mp = factory->message;
+ if (mp->data.attached == ERTS_MSG_COMBINED_HFRAG) {
+ if (!mp->hfrag.next) {
+ Uint sz = factory->hp - factory->hp_start;
+ mp = erts_shrink_message(mp, sz, brefs, brefs_size);
+ factory->message = mp;
+ factory->mode = FACTORY_CLOSED;
+ return;
+ }
+ /*else we don't trim multi fragmented messages for now (off_heap...) */
+ break;
+ }
+ /* Fall through... */
+ }
+ case FACTORY_HEAP_FRAGS:
+ bp = factory->heap_frags;
+ if (!bp)
+ break;
if (bp->next == NULL) {
Uint used_sz = factory->hp - bp->mem;
ASSERT(used_sz <= bp->alloc_size);
- factory->heap_frags = erts_resize_message_buffer(bp, used_sz,
- brefs, brefs_size);
+ if (used_sz > 0)
+ bp = erts_resize_message_buffer(bp, used_sz,
+ brefs, brefs_size);
+ else {
+ free_message_buffer(bp);
+ bp = NULL;
+ }
+ factory->heap_frags = bp;
+ if (factory->mode == FACTORY_MESSAGE)
+ factory->message->data.heap_frag = bp;
factory->mode = FACTORY_CLOSED;
return;
}
- /*else we don't trim multi fragmented messages for now */
+ /*else we don't trim multi fragmented messages for now (off_heap...) */
+ default:
+ break;
}
erts_factory_close(factory);
}
@@ -1349,38 +1729,35 @@ void erts_factory_undo(ErtsHeapFactory* factory)
/* Rollback heap top
*/
- if (factory->heap_frags_saved == NULL) { /* No heap frags when we started */
- ASSERT(factory->hp_start >= HEAP_START(factory->p));
- ASSERT(factory->hp_start <= HEAP_LIMIT(factory->p));
- HEAP_TOP(factory->p) = factory->hp_start;
- }
- else {
+ if (HEAP_START(factory->p) <= factory->hp_start
+ && factory->hp_start <= HEAP_LIMIT(factory->p)) {
+ HEAP_TOP(factory->p) = factory->hp_start;
+ }
+
+ /* Fix last heap frag */
+ if (factory->heap_frags_saved) {
ASSERT(factory->heap_frags_saved == factory->p->mbuf);
- if (factory->hp_start == factory->heap_frags_saved->mem) {
+ if (factory->hp_start != factory->heap_frags_saved->mem)
+ factory->heap_frags_saved->used_size = factory->heap_frags_saved_used;
+ else {
factory->p->mbuf = factory->p->mbuf->next;
ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, factory->heap_frags_saved,
ERTS_HEAP_FRAG_SIZE(factory->heap_frags_saved->alloc_size));
}
- else if (factory->hp_start != factory->hp_end) {
- unsigned remains = factory->hp_start - factory->heap_frags_saved->mem;
- ASSERT(remains > 0 && remains < factory->heap_frags_saved->used_size);
- factory->heap_frags_saved->used_size = remains;
- }
}
}
break;
+ case FACTORY_MESSAGE:
+ if (factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG)
+ factory->message->hfrag.next = factory->heap_frags;
+ else
+ factory->message->data.heap_frag = factory->heap_frags;
+ erts_cleanup_messages(factory->message);
+ break;
case FACTORY_HEAP_FRAGS:
- bp = factory->heap_frags;
- do {
- ErlHeapFragment* next_bp = bp->next;
-
- erts_cleanup_offheap(&bp->off_heap);
- ERTS_HEAP_FREE(factory->alloc_type, (void *) bp,
- ERTS_HEAP_FRAG_SIZE(bp->size));
- bp = next_bp;
- }while (bp != NULL);
+ free_message_buffer(factory->heap_frags);
break;
case FACTORY_CLOSED: break;
@@ -1394,4 +1771,3 @@ void erts_factory_undo(ErtsHeapFactory* factory)
factory->heap_frags = NULL;
#endif
}
-