diff options
author | Lukas Larsson <[email protected]> | 2015-12-10 11:10:46 +0100 |
---|---|---|
committer | Lukas Larsson <[email protected]> | 2016-04-15 15:06:27 +0200 |
commit | 37092dab15448ef6a078800e3ff0cc41880ea765 (patch) | |
tree | cb8a30fe427cec5f82d96b3e05258d4fd263e58f /erts/emulator/beam/erl_message.c | |
parent | 13bd4ca2493d8a76f9d835c27163b56ba86c2aef (diff) | |
download | otp-37092dab15448ef6a078800e3ff0cc41880ea765.tar.gz otp-37092dab15448ef6a078800e3ff0cc41880ea765.tar.bz2 otp-37092dab15448ef6a078800e3ff0cc41880ea765.zip |
erts: Implement tracer modules
Add the possibility to use modules as trace data receivers. The functions
in the module have to be nifs as otherwise complex trace probes will be
very hard to handle (complex means trace probes for ports for example).
This commit changes the way that the ptab->tracer field works from always
being an immediate, to now be NIL if no tracer is present or else be
the tuple {TracerModule, TracerState} where TracerModule is an atom that
is later used to lookup the appropriate tracer callbacks to call and
TracerState is just passed to the tracer callback. The default process and
port tracers have been rewritten to use the new API.
This commit also changes the order which trace messages are delivered to the
potential tracer process. Any enif_send done in a tracer module may be delayed
indefinitely because of lock order issues. If a message is delayed any other
trace message send from that process is also delayed so that order is preserved
for each traced entity. This means that for some trace events (i.e. send/receive)
the events may come in an unintuitive order (receive before send) to the
trace receiver. Timestamps are taken when the trace message is generated so
trace messages from differented processes may arrive with the timestamp
out of order.
Both the erlang:trace and seq_trace:set_system_tracer accept the new tracer
module tracers and also the backwards compatible arguments.
OTP-10267
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r-- | erts/emulator/beam/erl_message.c | 160 |
1 files changed, 82 insertions, 78 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 7596747b91..1cb02ec274 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -324,7 +324,7 @@ erts_queue_dist_message(Process *rcvr, tok_label, tok_lastcnt, tok_serial); } #endif - erts_queue_message(rcvr, rcvr_locks, mp, msg, token); + erts_queue_message(rcvr, rcvr_locks, mp, msg); } } else { @@ -349,7 +349,7 @@ erts_queue_dist_message(Process *rcvr, } #endif - LINK_MESSAGE(rcvr, mp); + LINK_MESSAGE(rcvr, mp, &mp->next, 1); if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); @@ -364,31 +364,34 @@ erts_queue_dist_message(Process *rcvr, } } -/* Add a message last in message queue */ +/* Add messages last in message queue */ static Sint -queue_message(Process *c_p, - Process* receiver, - erts_aint32_t *receiver_state, - ErtsProcLocks *receiver_locks, - ErtsMessage* mp, - Eterm message, - Eterm seq_trace_token -#ifdef USE_VM_PROBES - , Eterm dt_utag -#endif - ) +queue_messages(Process *c_p, + Process* receiver, + erts_aint32_t *receiver_state, + ErtsProcLocks *receiver_locks, + ErtsMessage* first, + ErtsMessage** last, + Uint len) { Sint res; int locked_msgq = 0; erts_aint32_t state; - ERTS_SMP_LC_ASSERT(*receiver_locks == erts_proc_lc_my_proc_locks(receiver)); + ASSERT(is_value(ERL_MESSAGE_TERM(first))); + ASSERT(ERL_MESSAGE_TOKEN(first) == am_undefined || + ERL_MESSAGE_TOKEN(first) == NIL || + is_tuple(ERL_MESSAGE_TOKEN(first))); #ifdef ERTS_SMP +#ifdef ERTS_ENABLE_LOCK_CHECK + ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(receiver) < ERTS_PROC_LOCK_MSGQ || + *receiver_locks == erts_proc_lc_my_proc_locks(receiver)); +#endif if (!(*receiver_locks & ERTS_PROC_LOCK_MSGQ)) { if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) { - ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ; + ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ; if (receiver_state) state = *receiver_state; @@ -417,16 +420,10 @@ queue_message(Process *c_p, /* Drop message if receiver is exiting or has a pending exit... */ if (locked_msgq) erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); - erts_cleanup_messages(mp); + erts_cleanup_messages(first); return 0; } - ERL_MESSAGE_TERM(mp) = message; - ERL_MESSAGE_TOKEN(mp) = seq_trace_token; -#ifdef USE_VM_PROBES - ERL_MESSAGE_DT_UTAG(mp) = dt_utag; -#endif - res = receiver->msg.len; #ifdef ERTS_SMP if (*receiver_locks & ERTS_PROC_LOCK_MAIN) { @@ -440,75 +437,83 @@ queue_message(Process *c_p, */ res += receiver->msg_inq.len; ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); - LINK_MESSAGE_PRIVQ(receiver, mp); + LINK_MESSAGE_PRIVQ(receiver, first, last, len); } else #endif { - LINK_MESSAGE(receiver, mp); + LINK_MESSAGE(receiver, first, last, len); } + if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { + ErtsMessage *msg = first; + #ifdef USE_VM_PROBES - if (DTRACE_ENABLED(message_queued)) { - DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); - Sint tok_label = 0; - Sint tok_lastcnt = 0; - Sint tok_serial = 0; - - dtrace_proc_str(receiver, receiver_name); - if (seq_trace_token != NIL && is_tuple(seq_trace_token)) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(seq_trace_token)); - tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(seq_trace_token)); - tok_serial = signed_val(SEQ_TRACE_T_SERIAL(seq_trace_token)); + if (DTRACE_ENABLED(message_queued)) { + DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); + Sint tok_label = 0; + Sint tok_lastcnt = 0; + Sint tok_serial = 0; + Eterm seq_trace_token = ERL_MESSAGE_TOKEN(msg); + + dtrace_proc_str(receiver, receiver_name); + if (seq_trace_token != NIL && is_tuple(seq_trace_token)) { + tok_label = signed_val(SEQ_TRACE_T_LABEL(seq_trace_token)); + tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(seq_trace_token)); + tok_serial = signed_val(SEQ_TRACE_T_SERIAL(seq_trace_token)); + } + DTRACE6(message_queued, + receiver_name, size_object(ERL_MESSAGE_TERM(msg)), + receiver->msg.len, + tok_label, tok_lastcnt, tok_serial); } - DTRACE6(message_queued, - receiver_name, size_object(message), receiver->msg.len, - tok_label, tok_lastcnt, tok_serial); - } #endif - if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) - trace_receive(receiver, message); + while (msg) { + trace_receive(receiver, ERL_MESSAGE_TERM(msg)); + msg = msg->next; + } + + } if (locked_msgq) erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); - erts_proc_notify_new_message(receiver, #ifdef ERTS_SMP - *receiver_locks + erts_proc_notify_new_message(receiver, *receiver_locks); #else - 0 -#endif - ); - -#ifndef ERTS_SMP + erts_proc_notify_new_message(receiver, 0); ERTS_HOLE_CHECK(receiver); #endif return res; } -void -#ifdef USE_VM_PROBES -erts_queue_message_probe(Process* receiver, ErtsProcLocks *receiver_locks, - ErtsMessage* mp, - Eterm message, Eterm seq_trace_token, Eterm dt_utag) -#else +static Sint +queue_message(Process *c_p, + Process* receiver, + erts_aint32_t *receiver_state, + ErtsProcLocks *receiver_locks, + ErtsMessage* mp, Eterm msg) +{ + ERL_MESSAGE_TERM(mp) = msg; + return queue_messages(c_p, receiver, receiver_state, receiver_locks, + mp, &mp->next, 1 ); +} + +Sint erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks, - ErtsMessage* mp, - Eterm message, Eterm seq_trace_token) -#endif + ErtsMessage* mp, Eterm msg) { - queue_message(NULL, - receiver, - NULL, - receiver_locks, - mp, - message, - seq_trace_token -#ifdef USE_VM_PROBES - , dt_utag -#endif - ); + return queue_message(NULL, receiver, NULL, receiver_locks, mp, msg); +} + + +Sint +erts_queue_messages(Process* receiver, ErtsProcLocks *receiver_locks, + ErtsMessage* first, ErtsMessage** last, Uint len) +{ + return queue_messages(NULL, receiver, NULL, receiver_locks, + first, last, len); } void @@ -821,17 +826,15 @@ erts_send_message(Process* sender, #endif } + ERL_MESSAGE_TOKEN(mp) = token; +#ifdef USE_VM_PROBES + ERL_MESSAGE_DT_UTAG(mp) = utag; +#endif res = queue_message(sender, receiver, &receiver_state, receiver_locks, - mp, - message, - token -#ifdef USE_VM_PROBES - , utag -#endif - ); + mp, message); BM_SWAP_TIMER(send,system); @@ -887,7 +890,8 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, /* the trace token must in this case be updated by the caller */ seq_trace_output(token, save, SEQ_TRACE_SEND, to->common.id, NULL); temptoken = copy_struct(token, sz_token, &hp, ohp); - erts_queue_message(to, to_locksp, mp, save, temptoken); + ERL_MESSAGE_TOKEN(mp) = temptoken; + erts_queue_message(to, to_locksp, mp, save); } else { sz_from = IS_CONST(from) ? 0 : size_object(from); #ifdef SHCOPY_SEND @@ -909,7 +913,7 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, ? from : copy_struct(from, sz_from, &hp, ohp)); save = TUPLE3(hp, am_EXIT, from_copy, mess); - erts_queue_message(to, to_locksp, mp, save, NIL); + erts_queue_message(to, to_locksp, mp, save); } } |