aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_message.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r--erts/emulator/beam/erl_message.c189
1 files changed, 117 insertions, 72 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 79739501a8..1811651a58 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -68,6 +68,7 @@ new_message_buffer(Uint size)
bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG,
ERTS_HEAP_FRAG_SIZE(size));
ERTS_INIT_HEAP_FRAG(bp, size, size);
+ VERBOSE(DEBUG_SHCOPY, ("[pid=%T] new message buffer %p\n", erts_get_current_pid(), bp->mem));
return bp;
}
@@ -249,36 +250,6 @@ erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_s
}
void
-erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp)
-{
- if (first_bp) {
- ErlHeapFragment *bp = first_bp;
-
- while (1) {
- /* Move any off_heap's into the process */
- if (bp->off_heap.first != NULL) {
- struct erl_off_heap_header** next_p = &bp->off_heap.first;
- while (*next_p != NULL) {
- next_p = &((*next_p)->next);
- }
- *next_p = MSO(proc).first;
- MSO(proc).first = bp->off_heap.first;
- bp->off_heap.first = NULL;
- OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead);
- }
- MBUF_SIZE(proc) += bp->used_size;
- if (!bp->next)
- break;
- bp = bp->next;
- }
-
- /* Link the message buffer */
- bp->next = MBUF(proc);
- MBUF(proc) = first_bp;
- }
-}
-
-void
erts_queue_dist_message(Process *rcvr,
ErtsProcLocks *rcvr_locks,
ErtsDistExternal *dist_ext,
@@ -343,7 +314,7 @@ erts_queue_dist_message(Process *rcvr,
DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
dtrace_proc_str(rcvr, receiver_name);
- if (token != NIL && token != am_have_dt_utag) {
+ if (have_seqtrace(token)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
@@ -364,7 +335,7 @@ erts_queue_dist_message(Process *rcvr,
DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
dtrace_proc_str(rcvr, receiver_name);
- if (token != NIL && token != am_have_dt_utag) {
+ if (have_seqtrace(token)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
@@ -540,6 +511,36 @@ erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks,
);
}
+void
+erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp)
+{
+ if (first_bp) {
+ ErlHeapFragment *bp = first_bp;
+
+ while (1) {
+ /* Move any off_heap's into the process */
+ if (bp->off_heap.first != NULL) {
+ struct erl_off_heap_header** next_p = &bp->off_heap.first;
+ while (*next_p != NULL) {
+ next_p = &((*next_p)->next);
+ }
+ *next_p = MSO(proc).first;
+ MSO(proc).first = bp->off_heap.first;
+ bp->off_heap.first = NULL;
+ OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead);
+ }
+ MBUF_SIZE(proc) += bp->used_size;
+ if (!bp->next)
+ break;
+ bp = bp->next;
+ }
+
+ /* Link the message buffer */
+ bp->next = MBUF(proc);
+ MBUF(proc) = first_bp;
+ }
+}
+
Uint
erts_msg_attached_data_size_aux(ErtsMessage *msg)
{
@@ -663,11 +664,14 @@ erts_send_message(Process* sender,
Eterm utag = NIL;
#endif
erts_aint32_t receiver_state;
+#ifdef SHCOPY_SEND
+ erts_shcopy_t info;
+#endif
BM_STOP_TIMER(system);
BM_MESSAGE(message,sender,receiver);
BM_START_TIMER(send);
- #ifdef USE_VM_PROBES
+#ifdef USE_VM_PROBES
*sender_name = *receiver_name = '\0';
if (DTRACE_ENABLED(message_send)) {
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
@@ -686,48 +690,61 @@ erts_send_message(Process* sender,
#ifdef USE_VM_PROBES
Uint dt_utag_size = 0;
#endif
-
- BM_SWAP_TIMER(send,size);
- msize = size_object(message);
- BM_SWAP_TIMER(size,send);
-
-#ifdef USE_VM_PROBES
- if (stoken != am_have_dt_utag) {
-#endif
-
+ BM_SWAP_TIMER(send,size);
+
+ /* SHCOPY corrupts the heap between
+ * copy_shared_calculate, and
+ * copy_shared_perform. (it inserts move_markers like the gc).
+ * Make sure we don't use the heap between those instances.
+ */
+ if (have_seqtrace(stoken)) {
seq_trace_update_send(sender);
seq_trace_output(stoken, message, SEQ_TRACE_SEND,
receiver->common.id, sender);
seq_trace_size = 6; /* TUPLE5 */
-#ifdef USE_VM_PROBES
- }
- if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
- dt_utag_size = size_object(DT_UTAG(sender));
- } else if (stoken == am_have_dt_utag ) {
- stoken = NIL;
}
+#ifdef USE_VM_PROBES
+ if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
+ dt_utag_size = size_object(DT_UTAG(sender));
+ } else if (stoken == am_have_dt_utag ) {
+ stoken = NIL;
+ }
#endif
- mp = erts_alloc_message_heap_state(receiver,
- &receiver_state,
- receiver_locks,
- (msize
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ msize = copy_shared_calculate(message, &info);
+#else
+ msize = size_object(message);
+#endif
+ BM_SWAP_TIMER(size,send);
+
+ mp = erts_alloc_message_heap_state(receiver,
+ &receiver_state,
+ receiver_locks,
+ (msize
#ifdef USE_VM_PROBES
- + dt_utag_size
+ + dt_utag_size
#endif
- + seq_trace_size),
- &hp,
- &ohp);
+ + seq_trace_size),
+ &hp,
+ &ohp);
BM_SWAP_TIMER(send,copy);
+
+#ifdef SHCOPY_SEND
+ if (is_not_immed(message))
+ message = copy_shared_perform(message, msize, &info, &hp, ohp);
+ DESTROY_SHCOPY(info);
+#else
+ if (is_not_immed(message))
+ message = copy_struct(message, msize, &hp, ohp);
+#endif
if (is_immed(stoken))
token = stoken;
else
token = copy_struct(stoken, seq_trace_size, &hp, ohp);
- if (is_not_immed(message))
- message = copy_struct(message, msize, &hp, ohp);
-
#ifdef USE_VM_PROBES
if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
if (is_immed(DT_UTAG(sender)))
@@ -746,7 +763,7 @@ erts_send_message(Process* sender,
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(message_send)) {
- if (stoken != NIL && stoken != am_have_dt_utag) {
+ if (have_seqtrace(stoken)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(stoken));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(stoken));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(stoken));
@@ -764,7 +781,12 @@ erts_send_message(Process* sender,
}
else {
BM_SWAP_TIMER(send,size);
- msize = size_object(message);
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ msize = copy_shared_calculate(message, &info);
+#else
+ msize = size_object(message);
+#endif
BM_SWAP_TIMER(size,send);
mp = erts_alloc_message_heap_state(receiver,
@@ -774,8 +796,14 @@ erts_send_message(Process* sender,
&hp,
&ohp);
BM_SWAP_TIMER(send,copy);
- if (is_not_immed(message))
- message = copy_struct(message, msize, &hp, ohp);
+#ifdef SHCOPY_SEND
+ if (is_not_immed(message))
+ message = copy_shared_perform(message, msize, &info, &hp, ohp);
+ DESTROY_SHCOPY(info);
+#else
+ if (is_not_immed(message))
+ message = copy_struct(message, msize, &hp, ohp);
+#endif
BM_MESSAGE_COPIED(msz);
BM_SWAP_TIMER(copy,send);
}
@@ -800,6 +828,7 @@ erts_send_message(Process* sender,
return res;
}
+
/*
* This function delivers an EXIT message to a process
* which is trapping EXITs.
@@ -819,21 +848,29 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
Eterm temptoken;
ErtsMessage* mp;
ErlOffHeap *ohp;
-
- if (token != NIL
-#ifdef USE_VM_PROBES
- && token != am_have_dt_utag
+#ifdef SHCOPY_SEND
+ erts_shcopy_t info;
#endif
- ) {
+ if (have_seqtrace(token)) {
ASSERT(is_tuple(token));
- sz_reason = size_object(reason);
sz_token = size_object(token);
sz_from = size_object(from);
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ sz_reason = copy_shared_calculate(reason, &info);
+#else
+ sz_reason = size_object(reason);
+#endif
mp = erts_alloc_message_heap(to, to_locksp,
sz_reason + sz_from + sz_token + 4,
&hp, &ohp);
+#ifdef SHCOPY_SEND
+ mess = copy_shared_perform(reason, sz_reason, &info, &hp, ohp);
+ DESTROY_SHCOPY(info);
+#else
mess = copy_struct(reason, sz_reason, &hp, ohp);
+#endif
from_copy = copy_struct(from, sz_from, &hp, ohp);
save = TUPLE3(hp, am_EXIT, from_copy, mess);
hp += 4;
@@ -842,13 +879,22 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
temptoken = copy_struct(token, sz_token, &hp, ohp);
erts_queue_message(to, to_locksp, mp, save, temptoken);
} else {
- sz_reason = size_object(reason);
sz_from = IS_CONST(from) ? 0 : size_object(from);
-
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ sz_reason = copy_shared_calculate(reason, &info);
+#else
+ sz_reason = size_object(reason);
+#endif
mp = erts_alloc_message_heap(to, to_locksp,
sz_reason+sz_from+4, &hp, &ohp);
+#ifdef SHCOPY_SEND
+ mess = copy_shared_perform(reason, sz_reason, &info, &hp, ohp);
+ DESTROY_SHCOPY(info);
+#else
mess = copy_struct(reason, sz_reason, &hp, ohp);
+#endif
from_copy = (IS_CONST(from)
? from
: copy_struct(from, sz_from, &hp, ohp));
@@ -1725,4 +1771,3 @@ void erts_factory_undo(ErtsHeapFactory* factory)
factory->heap_frags = NULL;
#endif
}
-