aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_message.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r--erts/emulator/beam/erl_message.c254
1 files changed, 233 insertions, 21 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index ab1ab7b1ea..4cdf2d7d09 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -31,6 +31,7 @@
#include "erl_process.h"
#include "erl_nmgc.h"
#include "erl_binary.h"
+#include "dtrace-wrapper.h"
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message,
ErlMessage,
@@ -335,6 +336,11 @@ erts_queue_dist_message(Process *rcvr,
Eterm token)
{
ErlMessage* mp;
+#ifdef USE_VM_PROBES
+ Sint tok_label = 0;
+ Sint tok_lastcnt = 0;
+ Sint tok_serial = 0;
+#endif
#ifdef ERTS_SMP
ErtsProcLocks need_locks;
#endif
@@ -376,15 +382,61 @@ erts_queue_dist_message(Process *rcvr,
message_free(mp);
msg = erts_msg_distext2heap(rcvr, rcvr_locks, &mbuf, &token, dist_ext);
if (is_value(msg))
- erts_queue_message(rcvr, rcvr_locks, mbuf, msg, token);
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(message_queued)) {
+ DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
+
+ dtrace_proc_str(rcvr, receiver_name);
+ if (token != NIL && token != am_have_dt_utag) {
+ tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
+ tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
+ tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
+ }
+ DTRACE6(message_queued,
+ receiver_name, size_object(msg), rcvr->msg.len,
+ tok_label, tok_lastcnt, tok_serial);
+ }
+#endif
+ erts_queue_message(rcvr, rcvr_locks, mbuf, msg, token
+#ifdef USE_VM_PROBES
+ , NIL
+#endif
+ );
}
else {
/* Enqueue message on external format */
ERL_MESSAGE_TERM(mp) = THE_NON_VALUE;
- ERL_MESSAGE_TOKEN(mp) = token;
+#ifdef USE_VM_PROBES
+ ERL_MESSAGE_DT_UTAG(mp) = NIL;
+ if (token == am_have_dt_utag) {
+ ERL_MESSAGE_TOKEN(mp) = NIL;
+ } else {
+#endif
+ ERL_MESSAGE_TOKEN(mp) = token;
+#ifdef USE_VM_PROBES
+ }
+#endif
mp->next = NULL;
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(message_queued)) {
+ DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
+
+ dtrace_proc_str(rcvr, receiver_name);
+ if (token != NIL && token != am_have_dt_utag) {
+ tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
+ tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
+ tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
+ }
+ /*
+ * TODO: We don't know the real size of the external message here.
+ * -1 will appear to a D script as 4294967295.
+ */
+ DTRACE6(message_queued, receiver_name, -1, rcvr->msg.len + 1,
+ tok_label, tok_lastcnt, tok_serial);
+ }
+#endif
mp->data.dist_ext = dist_ext;
LINK_MESSAGE(rcvr, mp);
@@ -398,7 +450,11 @@ erts_queue_message(Process* receiver,
ErtsProcLocks *receiver_locks,
ErlHeapFragment* bp,
Eterm message,
- Eterm seq_trace_token)
+ Eterm seq_trace_token
+#ifdef USE_VM_PROBES
+ , Eterm dt_utag
+#endif
+)
{
ErlMessage* mp;
#ifdef ERTS_SMP
@@ -439,6 +495,9 @@ erts_queue_message(Process* receiver,
ERL_MESSAGE_TERM(mp) = message;
ERL_MESSAGE_TOKEN(mp) = seq_trace_token;
+#ifdef USE_VM_PROBES
+ ERL_MESSAGE_DT_UTAG(mp) = dt_utag;
+#endif
mp->next = NULL;
mp->data.heap_frag = bp;
@@ -462,12 +521,30 @@ erts_queue_message(Process* receiver,
LINK_MESSAGE(receiver, mp);
#endif
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(message_queued)) {
+ DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
+ Sint tok_label = 0;
+ Sint tok_lastcnt = 0;
+ Sint tok_serial = 0;
+
+ dtrace_proc_str(receiver, receiver_name);
+ if (seq_trace_token != NIL && is_tuple(seq_trace_token)) {
+ tok_label = signed_val(SEQ_TRACE_T_LABEL(seq_trace_token));
+ tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(seq_trace_token));
+ tok_serial = signed_val(SEQ_TRACE_T_SERIAL(seq_trace_token));
+ }
+ DTRACE6(message_queued,
+ receiver_name, size_object(message), receiver->msg.len,
+ tok_label, tok_lastcnt, tok_serial);
+ }
+#endif
notify_new_message(receiver);
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
trace_receive(receiver, message);
}
-
+
#ifndef ERTS_SMP
ERTS_HOLE_CHECK(receiver);
#endif
@@ -497,6 +574,9 @@ erts_move_msg_mbuf_to_heap(Eterm** hpp, ErlOffHeap* off_heap, ErlMessage *msg)
Sint offs;
Uint sz;
ErlHeapFragment *bp;
+#ifdef USE_VM_PROBES
+ Eterm utag;
+#endif
#ifdef HARD_DEBUG
ProcBin *dbg_mso_start = off_heap->mso;
@@ -506,32 +586,56 @@ erts_move_msg_mbuf_to_heap(Eterm** hpp, ErlOffHeap* off_heap, ErlMessage *msg)
ErlHeapFragment *dbg_bp;
Uint *dbg_hp, *dbg_thp_start;
Uint dbg_term_sz, dbg_token_sz;
+#ifdef USE_VM_PROBES
+ Eterm dbg_utag;
+ Uint dbg_utag_sz;
+#endif
#endif
bp = msg->data.heap_frag;
term = ERL_MESSAGE_TERM(msg);
token = ERL_MESSAGE_TOKEN(msg);
+#ifdef USE_VM_PROBES
+ utag = ERL_MESSAGE_DT_UTAG(msg);
+#endif
if (!bp) {
+#ifdef USE_VM_PROBES
+ ASSERT(is_immed(term) && is_immed(token) && is_immed(utag));
+#else
ASSERT(is_immed(term) && is_immed(token));
+#endif
return;
}
#ifdef HARD_DEBUG
dbg_term_sz = size_object(term);
dbg_token_sz = size_object(token);
+ dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz);
+#ifdef USE_VM_PROBES
+ dbg_utag_sz = size_object(utag);
+ dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz + dbg_utag_sz );
+#endif
/*ASSERT(dbg_term_sz + dbg_token_sz == erts_msg_used_frag_sz(msg));
Copied size may be smaller due to removed SubBins's or garbage.
Copied size may be larger due to duplicated shared terms.
*/
- dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz);
dbg_hp = dbg_bp->mem;
dbg_term = copy_struct(term, dbg_term_sz, &dbg_hp, &dbg_bp->off_heap);
dbg_token = copy_struct(token, dbg_token_sz, &dbg_hp, &dbg_bp->off_heap);
- dbg_thp_start = *hpp;
+#ifdef USE_VM_PROBES
+ dbg_utag = copy_struct(utag, dbg_utag_sz, &dbg_hp, &dbg_bp->off_heap);
+#endif
+ dbg_thp_start = *hpp;
#endif
if (bp->next != NULL) {
- move_multi_frags(hpp, off_heap, bp, msg->m, 2);
+ move_multi_frags(hpp, off_heap, bp, msg->m,
+#ifdef USE_VM_PROBES
+ 3
+#else
+ 2
+#endif
+ );
goto copy_done;
}
@@ -633,6 +737,16 @@ erts_move_msg_mbuf_to_heap(Eterm** hpp, ErlOffHeap* off_heap, ErlMessage *msg)
ASSERT(hp > ptr_val(ERL_MESSAGE_TERM(msg)));
#endif
}
+#ifdef USE_VM_PROBES
+ if (is_not_immed(utag)) {
+ ASSERT(in_heapfrag(ptr_val(utag), bp));
+ ERL_MESSAGE_DT_UTAG(msg) = offset_ptr(utag, offs);
+#ifdef HARD_DEBUG
+ ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_DT_UTAG(msg)));
+ ASSERT(hp > ptr_val(ERL_MESSAGE_DT_UTAG(msg)));
+#endif
+ }
+#endif
copy_done:
@@ -699,6 +813,9 @@ copy_done:
#ifdef HARD_DEBUG
ASSERT(eq(ERL_MESSAGE_TERM(msg), dbg_term));
ASSERT(eq(ERL_MESSAGE_TOKEN(msg), dbg_token));
+#ifdef USE_VM_PROBES
+ ASSERT(eq(ERL_MESSAGE_DT_UTAG(msg), dbg_utag));
+#endif
free_message_buffer(dbg_bp);
#endif
@@ -774,39 +891,101 @@ erts_send_message(Process* sender,
Uint msize;
ErlHeapFragment* bp = NULL;
Eterm token = NIL;
-
+#ifdef USE_VM_PROBES
+ DTRACE_CHARBUF(sender_name, 64);
+ DTRACE_CHARBUF(receiver_name, 64);
+ Sint tok_label = 0;
+ Sint tok_lastcnt = 0;
+ Sint tok_serial = 0;
+#endif
BM_STOP_TIMER(system);
BM_MESSAGE(message,sender,receiver);
BM_START_TIMER(send);
+ #ifdef USE_VM_PROBES
+ *sender_name = *receiver_name = '\0';
+ if (DTRACE_ENABLED(message_send)) {
+ erts_snprintf(sender_name, sizeof(sender_name), "%T", sender->id);
+ erts_snprintf(receiver_name, sizeof(receiver_name), "%T", receiver->id);
+ }
+#endif
if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) {
Eterm* hp;
+ Eterm stoken = SEQ_TRACE_TOKEN(sender);
+ Uint seq_trace_size = 0;
+#ifdef USE_VM_PROBES
+ Uint dt_utag_size = 0;
+ Eterm utag = NIL;
+#endif
- BM_SWAP_TIMER(send,size);
+ BM_SWAP_TIMER(send,size);
msize = size_object(message);
- BM_SWAP_TIMER(size,send);
+ BM_SWAP_TIMER(size,send);
+
+#ifdef USE_VM_PROBES
+ if (stoken != am_have_dt_utag) {
+#endif
+
+ seq_trace_update_send(sender);
+ seq_trace_output(stoken, message, SEQ_TRACE_SEND,
+ receiver->id, sender);
+ seq_trace_size = 6; /* TUPLE5 */
+#ifdef USE_VM_PROBES
+ }
+ if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
+ dt_utag_size = size_object(DT_UTAG(sender));
+ } else if (stoken == am_have_dt_utag ) {
+ stoken = NIL;
+ }
+#endif
- seq_trace_update_send(sender);
- seq_trace_output(SEQ_TRACE_TOKEN(sender), message, SEQ_TRACE_SEND,
- receiver->id, sender);
- bp = new_message_buffer(msize + 6 /* TUPLE5 */);
+ bp = new_message_buffer(msize + seq_trace_size
+#ifdef USE_VM_PROBES
+ + dt_utag_size
+#endif
+ );
hp = bp->mem;
BM_SWAP_TIMER(send,copy);
- token = copy_struct(SEQ_TRACE_TOKEN(sender),
- 6 /* TUPLE5 */,
+ token = copy_struct(stoken,
+ seq_trace_size,
&hp,
&bp->off_heap);
message = copy_struct(message, msize, &hp, &bp->off_heap);
+#ifdef USE_VM_PROBES
+ if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
+ utag = copy_struct(DT_UTAG(sender), dt_utag_size, &hp, &bp->off_heap);
+#ifdef DTRACE_TAG_HARDDEBUG
+ erts_fprintf(stderr,
+ "Dtrace -> (%T) Spreading tag (%T) with "
+ "message %T!\r\n",sender->id, utag, message);
+#endif
+ }
+#endif
BM_MESSAGE_COPIED(msize);
BM_SWAP_TIMER(copy,send);
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(message_send)) {
+ if (stoken != NIL && stoken != am_have_dt_utag) {
+ tok_label = signed_val(SEQ_TRACE_T_LABEL(stoken));
+ tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(stoken));
+ tok_serial = signed_val(SEQ_TRACE_T_SERIAL(stoken));
+ }
+ DTRACE6(message_send, sender_name, receiver_name,
+ msize, tok_label, tok_lastcnt, tok_serial);
+ }
+#endif
erts_queue_message(receiver,
receiver_locks,
bp,
message,
- token);
+ token
+#ifdef USE_VM_PROBES
+ , utag
+#endif
+ );
BM_SWAP_TIMER(send,system);
#ifdef HYBRID
} else {
@@ -835,8 +1014,13 @@ erts_send_message(Process* sender,
#endif
LAZY_COPY(sender,message);
BM_SWAP_TIMER(copy,send);
+ DTRACE6(message_send, sender_name, receiver_name,
+ size_object(message)msize, tok_label, tok_lastcnt, tok_serial);
ERL_MESSAGE_TERM(mp) = message;
ERL_MESSAGE_TOKEN(mp) = NIL;
+#ifdef USE_VM_PROBES
+ ERL_MESSAGE_DT_UTAG(mp) = NIL;
+#endif
mp->next = NULL;
LINK_MESSAGE(receiver, mp);
ACTIVATE(receiver);
@@ -874,9 +1058,14 @@ erts_send_message(Process* sender,
{
ErlMessage* mp = message_alloc();
+ DTRACE6(message_send, sender_name, receiver_name,
+ size_object(message), tok_label, tok_lastcnt, tok_serial);
mp->data.attached = NULL;
ERL_MESSAGE_TERM(mp) = message;
ERL_MESSAGE_TOKEN(mp) = NIL;
+#ifdef USE_VM_PROBES
+ ERL_MESSAGE_DT_UTAG(mp) = NIL;
+#endif
mp->next = NULL;
/*
* We move 'in queue' to 'private queue' and place
@@ -908,7 +1097,13 @@ erts_send_message(Process* sender,
message = copy_struct(message, msize, &hp, ohp);
BM_MESSAGE_COPIED(msz);
BM_SWAP_TIMER(copy,send);
- erts_queue_message(receiver, receiver_locks, bp, message, token);
+ DTRACE6(message_send, sender_name, receiver_name,
+ msize, tok_label, tok_lastcnt, tok_serial);
+ erts_queue_message(receiver, receiver_locks, bp, message, token
+#ifdef USE_VM_PROBES
+ , NIL
+#endif
+ );
BM_SWAP_TIMER(send,system);
#else
ErlMessage* mp = message_alloc();
@@ -928,8 +1123,13 @@ erts_send_message(Process* sender,
message = copy_struct(message, msize, &hp, &receiver->off_heap);
BM_MESSAGE_COPIED(msize);
BM_SWAP_TIMER(copy,send);
+ DTRACE6(message_send, sender_name, receiver_name,
+ (uint32_t)msize, tok_label, tok_lastcnt, tok_serial);
ERL_MESSAGE_TERM(mp) = message;
ERL_MESSAGE_TOKEN(mp) = NIL;
+#ifdef USE_VM_PROBES
+ ERL_MESSAGE_DT_UTAG(mp) = NIL;
+#endif
mp->next = NULL;
mp->data.attached = NULL;
LINK_MESSAGE(receiver, mp);
@@ -968,7 +1168,11 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
Eterm temptoken;
ErlHeapFragment* bp = NULL;
- if (token != NIL) {
+ if (token != NIL
+#ifdef USE_VM_PROBES
+ && token != am_have_dt_utag
+#endif
+ ) {
ASSERT(is_tuple(token));
sz_reason = size_object(reason);
@@ -983,7 +1187,11 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
/* the trace token must in this case be updated by the caller */
seq_trace_output(token, save, SEQ_TRACE_SEND, to->id, NULL);
temptoken = copy_struct(token, sz_token, &hp, &bp->off_heap);
- erts_queue_message(to, to_locksp, bp, save, temptoken);
+ erts_queue_message(to, to_locksp, bp, save, temptoken
+#ifdef USE_VM_PROBES
+ , NIL
+#endif
+ );
} else {
ErlOffHeap *ohp;
sz_reason = size_object(reason);
@@ -1000,7 +1208,11 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
? from
: copy_struct(from, sz_from, &hp, ohp));
save = TUPLE3(hp, am_EXIT, from_copy, mess);
- erts_queue_message(to, to_locksp, bp, save, NIL);
+ erts_queue_message(to, to_locksp, bp, save, NIL
+#ifdef USE_VM_PROBES
+ , NIL
+#endif
+ );
}
}