aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_message.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r--erts/emulator/beam/erl_message.c389
1 files changed, 273 insertions, 116 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 79739501a8..b3e74e3e6a 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -68,6 +68,7 @@ new_message_buffer(Uint size)
bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG,
ERTS_HEAP_FRAG_SIZE(size));
ERTS_INIT_HEAP_FRAG(bp, size, size);
+ VERBOSE(DEBUG_SHCOPY, ("[pid=%T] new message buffer %p\n", erts_get_current_pid(), bp->mem));
return bp;
}
@@ -249,36 +250,6 @@ erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_s
}
void
-erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp)
-{
- if (first_bp) {
- ErlHeapFragment *bp = first_bp;
-
- while (1) {
- /* Move any off_heap's into the process */
- if (bp->off_heap.first != NULL) {
- struct erl_off_heap_header** next_p = &bp->off_heap.first;
- while (*next_p != NULL) {
- next_p = &((*next_p)->next);
- }
- *next_p = MSO(proc).first;
- MSO(proc).first = bp->off_heap.first;
- bp->off_heap.first = NULL;
- OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead);
- }
- MBUF_SIZE(proc) += bp->used_size;
- if (!bp->next)
- break;
- bp = bp->next;
- }
-
- /* Link the message buffer */
- bp->next = MBUF(proc);
- MBUF(proc) = first_bp;
- }
-}
-
-void
erts_queue_dist_message(Process *rcvr,
ErtsProcLocks *rcvr_locks,
ErtsDistExternal *dist_ext,
@@ -343,7 +314,7 @@ erts_queue_dist_message(Process *rcvr,
DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
dtrace_proc_str(rcvr, receiver_name);
- if (token != NIL && token != am_have_dt_utag) {
+ if (have_seqtrace(token)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
@@ -364,7 +335,7 @@ erts_queue_dist_message(Process *rcvr,
DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
dtrace_proc_str(rcvr, receiver_name);
- if (token != NIL && token != am_have_dt_utag) {
+ if (have_seqtrace(token)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
@@ -540,6 +511,36 @@ erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks,
);
}
+void
+erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp)
+{
+ if (first_bp) {
+ ErlHeapFragment *bp = first_bp;
+
+ while (1) {
+ /* Move any off_heap's into the process */
+ if (bp->off_heap.first != NULL) {
+ struct erl_off_heap_header** next_p = &bp->off_heap.first;
+ while (*next_p != NULL) {
+ next_p = &((*next_p)->next);
+ }
+ *next_p = MSO(proc).first;
+ MSO(proc).first = bp->off_heap.first;
+ bp->off_heap.first = NULL;
+ OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead);
+ }
+ MBUF_SIZE(proc) += bp->used_size;
+ if (!bp->next)
+ break;
+ bp = bp->next;
+ }
+
+ /* Link the message buffer */
+ bp->next = MBUF(proc);
+ MBUF(proc) = first_bp;
+ }
+}
+
Uint
erts_msg_attached_data_size_aux(ErtsMessage *msg)
{
@@ -629,9 +630,24 @@ erts_try_alloc_message_on_heap(Process *pp,
#endif
else {
in_message_fragment:
-
- mp = erts_alloc_message(sz, hpp);
- *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap;
+ if (!((*psp) & ERTS_PSFLG_ON_HEAP_MSGQ)) {
+ mp = erts_alloc_message(sz, hpp);
+ *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap;
+ }
+ else {
+ mp = erts_alloc_message(0, NULL);
+ if (!sz) {
+ *hpp = NULL;
+ *ohpp = NULL;
+ }
+ else {
+ ErlHeapFragment *bp;
+ bp = new_message_buffer(sz);
+ *hpp = &bp->mem[0];
+ mp->data.heap_frag = bp;
+ *ohpp = &bp->off_heap;
+ }
+ }
*on_heap_p = 0;
}
@@ -663,11 +679,14 @@ erts_send_message(Process* sender,
Eterm utag = NIL;
#endif
erts_aint32_t receiver_state;
+#ifdef SHCOPY_SEND
+ erts_shcopy_t info;
+#endif
BM_STOP_TIMER(system);
BM_MESSAGE(message,sender,receiver);
BM_START_TIMER(send);
- #ifdef USE_VM_PROBES
+#ifdef USE_VM_PROBES
*sender_name = *receiver_name = '\0';
if (DTRACE_ENABLED(message_send)) {
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
@@ -686,54 +705,67 @@ erts_send_message(Process* sender,
#ifdef USE_VM_PROBES
Uint dt_utag_size = 0;
#endif
-
- BM_SWAP_TIMER(send,size);
- msize = size_object(message);
- BM_SWAP_TIMER(size,send);
-
-#ifdef USE_VM_PROBES
- if (stoken != am_have_dt_utag) {
-#endif
-
+ BM_SWAP_TIMER(send,size);
+
+ /* SHCOPY corrupts the heap between
+ * copy_shared_calculate, and
+ * copy_shared_perform. (it inserts move_markers like the gc).
+ * Make sure we don't use the heap between those instances.
+ */
+ if (have_seqtrace(stoken)) {
seq_trace_update_send(sender);
seq_trace_output(stoken, message, SEQ_TRACE_SEND,
receiver->common.id, sender);
seq_trace_size = 6; /* TUPLE5 */
-#ifdef USE_VM_PROBES
- }
- if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
- dt_utag_size = size_object(DT_UTAG(sender));
- } else if (stoken == am_have_dt_utag ) {
- stoken = NIL;
}
+#ifdef USE_VM_PROBES
+ if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
+ dt_utag_size = size_object(DT_UTAG(sender));
+ } else if (stoken == am_have_dt_utag ) {
+ stoken = NIL;
+ }
+#endif
+
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ msize = copy_shared_calculate(message, &info);
+#else
+ msize = size_object(message);
#endif
+ BM_SWAP_TIMER(size,send);
- mp = erts_alloc_message_heap_state(receiver,
- &receiver_state,
- receiver_locks,
- (msize
+ mp = erts_alloc_message_heap_state(receiver,
+ &receiver_state,
+ receiver_locks,
+ (msize
#ifdef USE_VM_PROBES
- + dt_utag_size
+ + dt_utag_size
#endif
- + seq_trace_size),
- &hp,
- &ohp);
+ + seq_trace_size),
+ &hp,
+ &ohp);
BM_SWAP_TIMER(send,copy);
+
+#ifdef SHCOPY_SEND
+ if (is_not_immed(message))
+ message = copy_shared_perform(message, msize, &info, &hp, ohp);
+ DESTROY_SHCOPY(info);
+#else
+ if (is_not_immed(message))
+ message = copy_struct(message, msize, &hp, ohp);
+#endif
if (is_immed(stoken))
token = stoken;
else
token = copy_struct(stoken, seq_trace_size, &hp, ohp);
- if (is_not_immed(message))
- message = copy_struct(message, msize, &hp, ohp);
-
#ifdef USE_VM_PROBES
if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
if (is_immed(DT_UTAG(sender)))
utag = DT_UTAG(sender);
else
- utag = copy_struct(DT_UTAG(sender), dt_utag_size, ohp);
+ utag = copy_struct(DT_UTAG(sender), dt_utag_size, &hp, ohp);
#ifdef DTRACE_TAG_HARDDEBUG
erts_fprintf(stderr,
"Dtrace -> (%T) Spreading tag (%T) with "
@@ -746,7 +778,7 @@ erts_send_message(Process* sender,
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(message_send)) {
- if (stoken != NIL && stoken != am_have_dt_utag) {
+ if (have_seqtrace(stoken)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(stoken));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(stoken));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(stoken));
@@ -764,7 +796,12 @@ erts_send_message(Process* sender,
}
else {
BM_SWAP_TIMER(send,size);
- msize = size_object(message);
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ msize = copy_shared_calculate(message, &info);
+#else
+ msize = size_object(message);
+#endif
BM_SWAP_TIMER(size,send);
mp = erts_alloc_message_heap_state(receiver,
@@ -774,8 +811,14 @@ erts_send_message(Process* sender,
&hp,
&ohp);
BM_SWAP_TIMER(send,copy);
- if (is_not_immed(message))
- message = copy_struct(message, msize, &hp, ohp);
+#ifdef SHCOPY_SEND
+ if (is_not_immed(message))
+ message = copy_shared_perform(message, msize, &info, &hp, ohp);
+ DESTROY_SHCOPY(info);
+#else
+ if (is_not_immed(message))
+ message = copy_struct(message, msize, &hp, ohp);
+#endif
BM_MESSAGE_COPIED(msz);
BM_SWAP_TIMER(copy,send);
}
@@ -800,6 +843,7 @@ erts_send_message(Process* sender,
return res;
}
+
/*
* This function delivers an EXIT message to a process
* which is trapping EXITs.
@@ -819,21 +863,29 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
Eterm temptoken;
ErtsMessage* mp;
ErlOffHeap *ohp;
-
- if (token != NIL
-#ifdef USE_VM_PROBES
- && token != am_have_dt_utag
+#ifdef SHCOPY_SEND
+ erts_shcopy_t info;
#endif
- ) {
+ if (have_seqtrace(token)) {
ASSERT(is_tuple(token));
- sz_reason = size_object(reason);
sz_token = size_object(token);
sz_from = size_object(from);
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ sz_reason = copy_shared_calculate(reason, &info);
+#else
+ sz_reason = size_object(reason);
+#endif
mp = erts_alloc_message_heap(to, to_locksp,
sz_reason + sz_from + sz_token + 4,
&hp, &ohp);
+#ifdef SHCOPY_SEND
+ mess = copy_shared_perform(reason, sz_reason, &info, &hp, ohp);
+ DESTROY_SHCOPY(info);
+#else
mess = copy_struct(reason, sz_reason, &hp, ohp);
+#endif
from_copy = copy_struct(from, sz_from, &hp, ohp);
save = TUPLE3(hp, am_EXIT, from_copy, mess);
hp += 4;
@@ -842,13 +894,22 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
temptoken = copy_struct(token, sz_token, &hp, ohp);
erts_queue_message(to, to_locksp, mp, save, temptoken);
} else {
- sz_reason = size_object(reason);
sz_from = IS_CONST(from) ? 0 : size_object(from);
-
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ sz_reason = copy_shared_calculate(reason, &info);
+#else
+ sz_reason = size_object(reason);
+#endif
mp = erts_alloc_message_heap(to, to_locksp,
sz_reason+sz_from+4, &hp, &ohp);
+#ifdef SHCOPY_SEND
+ mess = copy_shared_perform(reason, sz_reason, &info, &hp, ohp);
+ DESTROY_SHCOPY(info);
+#else
mess = copy_struct(reason, sz_reason, &hp, ohp);
+#endif
from_copy = (IS_CONST(from)
? from
: copy_struct(from, sz_from, &hp, ohp));
@@ -976,12 +1037,12 @@ erts_complete_off_heap_message_queue_change(Process *c_p)
ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ);
/*
- * This job was first initiated when the process changed
- * "off heap message queue" state from false to true. Since
- * then ERTS_PSFLG_OFF_HEAP_MSGQ has been set. However, the
- * state change might have been changed again (multiple times)
- * since then. Check users last requested state (the flag
- * F_OFF_HEAP_MSGQ), and make the state consistent with that.
+ * This job was first initiated when the process changed to off heap
+ * message queue management. Since then ERTS_PSFLG_OFF_HEAP_MSGQ
+ * has been set. However, the management state might have been changed
+ * again (multiple times) since then. Check users last requested state
+ * (the flags F_OFF_HEAP_MSGQ, and F_ON_HEAP_MSGQ), and make the state
+ * consistent with that.
*/
if (!(c_p->flags & F_OFF_HEAP_MSGQ))
@@ -1022,8 +1083,9 @@ change_off_heap_msgq(void *vcohmq)
}
Eterm
-erts_change_off_heap_message_queue_state(Process *c_p, int enable)
+erts_change_message_queue_management(Process *c_p, Eterm new_state)
{
+ Eterm res;
#ifdef DEBUG
if (c_p->flags & F_OFF_HEAP_MSGQ) {
@@ -1042,57 +1104,117 @@ erts_change_off_heap_message_queue_state(Process *c_p, int enable)
}
#endif
- if (c_p->flags & F_OFF_HEAP_MSGQ) {
- /* Off heap message queue is enabled */
+ switch (c_p->flags & (F_OFF_HEAP_MSGQ|F_ON_HEAP_MSGQ)) {
- if (!enable) {
+ case F_OFF_HEAP_MSGQ:
+ res = am_off_heap;
+
+ switch (new_state) {
+ case am_off_heap:
+ break;
+ case am_on_heap:
+ c_p->flags |= F_ON_HEAP_MSGQ;
+ erts_smp_atomic32_read_bor_nob(&c_p->state,
+ ERTS_PSFLG_ON_HEAP_MSGQ);
+ /* fall through */
+ case am_mixed:
c_p->flags &= ~F_OFF_HEAP_MSGQ;
/*
* We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ
- * if a change is ongoing. It will be adjusted when the
- * change completes...
+ * if a off heap change is ongoing. It will be adjusted
+ * when the change completes...
*/
if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) {
/* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... */
erts_smp_atomic32_read_band_nob(&c_p->state,
~ERTS_PSFLG_OFF_HEAP_MSGQ);
}
+ break;
+ default:
+ res = THE_NON_VALUE; /* badarg */
+ break;
}
+ break;
+
+ case F_ON_HEAP_MSGQ:
+ res = am_on_heap;
- return am_true; /* Old state */
+ switch (new_state) {
+ case am_on_heap:
+ break;
+ case am_mixed:
+ c_p->flags &= ~F_ON_HEAP_MSGQ;
+ erts_smp_atomic32_read_band_nob(&c_p->state,
+ ~ERTS_PSFLG_ON_HEAP_MSGQ);
+ break;
+ case am_off_heap:
+ c_p->flags &= ~F_ON_HEAP_MSGQ;
+ erts_smp_atomic32_read_band_nob(&c_p->state,
+ ~ERTS_PSFLG_ON_HEAP_MSGQ);
+ goto change_to_off_heap;
+ default:
+ res = THE_NON_VALUE; /* badarg */
+ break;
+ }
+ break;
+
+ case 0:
+ res = am_mixed;
+
+ switch (new_state) {
+ case am_mixed:
+ break;
+ case am_on_heap:
+ c_p->flags |= F_ON_HEAP_MSGQ;
+ erts_smp_atomic32_read_bor_nob(&c_p->state,
+ ERTS_PSFLG_ON_HEAP_MSGQ);
+ break;
+ case am_off_heap:
+ goto change_to_off_heap;
+ default:
+ res = THE_NON_VALUE; /* badarg */
+ break;
+ }
+ break;
+
+ default:
+ res = am_error;
+ ERTS_INTERNAL_ERROR("Inconsistent message queue management state");
+ break;
}
- /* Off heap message queue is disabled */
+ return res;
+
+change_to_off_heap:
- if (enable) {
- c_p->flags |= F_OFF_HEAP_MSGQ;
+ c_p->flags |= F_OFF_HEAP_MSGQ;
+
+ /*
+ * We do not have to schedule a change if
+ * we have an ongoing off heap change...
+ */
+ if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) {
+ ErtsChangeOffHeapMessageQueue *cohmq;
/*
- * We do not have to schedule a change if
- * we have an ongoing change...
+ * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait
+ * thread progress before completing the change in
+ * order to ensure that all senders observe that
+ * messages should be passed off heap. When the
+ * change has completed, GC does not need to inspect
+ * the message queue at all.
*/
- if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) {
- ErtsChangeOffHeapMessageQueue *cohmq;
- /*
- * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait
- * thread progress before completing the change in
- * order to ensure that all senders observe that
- * messages should be passed off heap. When the
- * change has completed, GC does not need to inspect
- * the message queue at all.
- */
- erts_smp_atomic32_read_bor_nob(&c_p->state,
- ERTS_PSFLG_OFF_HEAP_MSGQ);
- c_p->flags |= F_OFF_HEAP_MSGQ_CHNG;
- cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG,
- sizeof(ErtsChangeOffHeapMessageQueue));
- cohmq->pid = c_p->common.id;
- erts_schedule_thr_prgr_later_op(change_off_heap_msgq,
- (void *) cohmq,
- &cohmq->lop);
- }
+ erts_smp_atomic32_read_bor_nob(&c_p->state,
+ ERTS_PSFLG_OFF_HEAP_MSGQ);
+ c_p->flags |= F_OFF_HEAP_MSGQ_CHNG;
+ cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG,
+ sizeof(ErtsChangeOffHeapMessageQueue));
+ cohmq->pid = c_p->common.id;
+ erts_schedule_thr_prgr_later_op(change_off_heap_msgq,
+ (void *) cohmq,
+ &cohmq->lop);
}
- return am_false; /* Old state */
+ return res;
}
int
@@ -1455,6 +1577,9 @@ void erts_factory_selfcontained_message_init(ErtsHeapFactory* factory,
ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end);
}
+/* One static sized heap that must suffice.
+ No extra heap fragments will be allocated.
+*/
void erts_factory_static_init(ErtsHeapFactory* factory,
Eterm* hp,
Uint size,
@@ -1469,6 +1594,23 @@ void erts_factory_static_init(ErtsHeapFactory* factory,
factory->off_heap_saved.overhead = factory->off_heap->overhead;
}
+/* A temporary heap with default buffer allocated/freed by client.
+ * factory_close is same as factory_undo
+ */
+void erts_factory_tmp_init(ErtsHeapFactory* factory, Eterm* hp, Uint size,
+ Uint32 atype)
+{
+ factory->mode = FACTORY_TMP;
+ factory->hp_start = hp;
+ factory->hp = hp;
+ factory->hp_end = hp + size;
+ factory->heap_frags = NULL;
+ factory->off_heap_saved.first = NULL;
+ factory->off_heap_saved.overhead = 0;
+ factory->off_heap = &factory->off_heap_saved;
+ factory->alloc_type = atype;
+}
+
/* When we know the term is an immediate and need no heap.
*/
void erts_factory_dummy_init(ErtsHeapFactory* factory)
@@ -1519,6 +1661,7 @@ static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra)
else {
/* Fall through */
case FACTORY_HEAP_FRAGS:
+ case FACTORY_TMP:
bp = factory->heap_frags;
}
@@ -1584,6 +1727,9 @@ void erts_factory_close(ErtsHeapFactory* factory)
bp->used_size = factory->hp - bp->mem;
}
break;
+ case FACTORY_TMP:
+ erts_factory_undo(factory);
+ break;
case FACTORY_STATIC: break;
case FACTORY_CLOSED: break;
default:
@@ -1710,8 +1856,20 @@ void erts_factory_undo(ErtsHeapFactory* factory)
factory->message->data.heap_frag = factory->heap_frags;
erts_cleanup_messages(factory->message);
break;
+ case FACTORY_TMP:
case FACTORY_HEAP_FRAGS:
- free_message_buffer(factory->heap_frags);
+ erts_cleanup_offheap(factory->off_heap);
+ factory->off_heap->first = NULL;
+
+ bp = factory->heap_frags;
+ while (bp != NULL) {
+ ErlHeapFragment* next_bp = bp->next;
+
+ ASSERT(bp->off_heap.first == NULL);
+ ERTS_HEAP_FREE(factory->alloc_type, (void *) bp,
+ ERTS_HEAP_FRAG_SIZE(bp->alloc_size));
+ bp = next_bp;
+ }
break;
case FACTORY_CLOSED: break;
@@ -1725,4 +1883,3 @@ void erts_factory_undo(ErtsHeapFactory* factory)
factory->heap_frags = NULL;
#endif
}
-