aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_message.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r--erts/emulator/beam/erl_message.c160
1 files changed, 82 insertions, 78 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 7596747b91..1cb02ec274 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -324,7 +324,7 @@ erts_queue_dist_message(Process *rcvr,
tok_label, tok_lastcnt, tok_serial);
}
#endif
- erts_queue_message(rcvr, rcvr_locks, mp, msg, token);
+ erts_queue_message(rcvr, rcvr_locks, mp, msg);
}
}
else {
@@ -349,7 +349,7 @@ erts_queue_dist_message(Process *rcvr,
}
#endif
- LINK_MESSAGE(rcvr, mp);
+ LINK_MESSAGE(rcvr, mp, &mp->next, 1);
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
@@ -364,31 +364,34 @@ erts_queue_dist_message(Process *rcvr,
}
}
-/* Add a message last in message queue */
+/* Add messages last in message queue */
static Sint
-queue_message(Process *c_p,
- Process* receiver,
- erts_aint32_t *receiver_state,
- ErtsProcLocks *receiver_locks,
- ErtsMessage* mp,
- Eterm message,
- Eterm seq_trace_token
-#ifdef USE_VM_PROBES
- , Eterm dt_utag
-#endif
- )
+queue_messages(Process *c_p,
+ Process* receiver,
+ erts_aint32_t *receiver_state,
+ ErtsProcLocks *receiver_locks,
+ ErtsMessage* first,
+ ErtsMessage** last,
+ Uint len)
{
Sint res;
int locked_msgq = 0;
erts_aint32_t state;
- ERTS_SMP_LC_ASSERT(*receiver_locks == erts_proc_lc_my_proc_locks(receiver));
+ ASSERT(is_value(ERL_MESSAGE_TERM(first)));
+ ASSERT(ERL_MESSAGE_TOKEN(first) == am_undefined ||
+ ERL_MESSAGE_TOKEN(first) == NIL ||
+ is_tuple(ERL_MESSAGE_TOKEN(first)));
#ifdef ERTS_SMP
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(receiver) < ERTS_PROC_LOCK_MSGQ ||
+ *receiver_locks == erts_proc_lc_my_proc_locks(receiver));
+#endif
if (!(*receiver_locks & ERTS_PROC_LOCK_MSGQ)) {
if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
- ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ;
+ ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ;
if (receiver_state)
state = *receiver_state;
@@ -417,16 +420,10 @@ queue_message(Process *c_p,
/* Drop message if receiver is exiting or has a pending exit... */
if (locked_msgq)
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
- erts_cleanup_messages(mp);
+ erts_cleanup_messages(first);
return 0;
}
- ERL_MESSAGE_TERM(mp) = message;
- ERL_MESSAGE_TOKEN(mp) = seq_trace_token;
-#ifdef USE_VM_PROBES
- ERL_MESSAGE_DT_UTAG(mp) = dt_utag;
-#endif
-
res = receiver->msg.len;
#ifdef ERTS_SMP
if (*receiver_locks & ERTS_PROC_LOCK_MAIN) {
@@ -440,75 +437,83 @@ queue_message(Process *c_p,
*/
res += receiver->msg_inq.len;
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
- LINK_MESSAGE_PRIVQ(receiver, mp);
+ LINK_MESSAGE_PRIVQ(receiver, first, last, len);
}
else
#endif
{
- LINK_MESSAGE(receiver, mp);
+ LINK_MESSAGE(receiver, first, last, len);
}
+ if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
+ ErtsMessage *msg = first;
+
#ifdef USE_VM_PROBES
- if (DTRACE_ENABLED(message_queued)) {
- DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
- Sint tok_label = 0;
- Sint tok_lastcnt = 0;
- Sint tok_serial = 0;
-
- dtrace_proc_str(receiver, receiver_name);
- if (seq_trace_token != NIL && is_tuple(seq_trace_token)) {
- tok_label = signed_val(SEQ_TRACE_T_LABEL(seq_trace_token));
- tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(seq_trace_token));
- tok_serial = signed_val(SEQ_TRACE_T_SERIAL(seq_trace_token));
+ if (DTRACE_ENABLED(message_queued)) {
+ DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
+ Sint tok_label = 0;
+ Sint tok_lastcnt = 0;
+ Sint tok_serial = 0;
+ Eterm seq_trace_token = ERL_MESSAGE_TOKEN(msg);
+
+ dtrace_proc_str(receiver, receiver_name);
+ if (seq_trace_token != NIL && is_tuple(seq_trace_token)) {
+ tok_label = signed_val(SEQ_TRACE_T_LABEL(seq_trace_token));
+ tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(seq_trace_token));
+ tok_serial = signed_val(SEQ_TRACE_T_SERIAL(seq_trace_token));
+ }
+ DTRACE6(message_queued,
+ receiver_name, size_object(ERL_MESSAGE_TERM(msg)),
+ receiver->msg.len,
+ tok_label, tok_lastcnt, tok_serial);
}
- DTRACE6(message_queued,
- receiver_name, size_object(message), receiver->msg.len,
- tok_label, tok_lastcnt, tok_serial);
- }
#endif
- if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE))
- trace_receive(receiver, message);
+ while (msg) {
+ trace_receive(receiver, ERL_MESSAGE_TERM(msg));
+ msg = msg->next;
+ }
+
+ }
if (locked_msgq)
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
- erts_proc_notify_new_message(receiver,
#ifdef ERTS_SMP
- *receiver_locks
+ erts_proc_notify_new_message(receiver, *receiver_locks);
#else
- 0
-#endif
- );
-
-#ifndef ERTS_SMP
+ erts_proc_notify_new_message(receiver, 0);
ERTS_HOLE_CHECK(receiver);
#endif
return res;
}
-void
-#ifdef USE_VM_PROBES
-erts_queue_message_probe(Process* receiver, ErtsProcLocks *receiver_locks,
- ErtsMessage* mp,
- Eterm message, Eterm seq_trace_token, Eterm dt_utag)
-#else
+static Sint
+queue_message(Process *c_p,
+ Process* receiver,
+ erts_aint32_t *receiver_state,
+ ErtsProcLocks *receiver_locks,
+ ErtsMessage* mp, Eterm msg)
+{
+ ERL_MESSAGE_TERM(mp) = msg;
+ return queue_messages(c_p, receiver, receiver_state, receiver_locks,
+ mp, &mp->next, 1 );
+}
+
+Sint
erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks,
- ErtsMessage* mp,
- Eterm message, Eterm seq_trace_token)
-#endif
+ ErtsMessage* mp, Eterm msg)
{
- queue_message(NULL,
- receiver,
- NULL,
- receiver_locks,
- mp,
- message,
- seq_trace_token
-#ifdef USE_VM_PROBES
- , dt_utag
-#endif
- );
+ return queue_message(NULL, receiver, NULL, receiver_locks, mp, msg);
+}
+
+
+Sint
+erts_queue_messages(Process* receiver, ErtsProcLocks *receiver_locks,
+ ErtsMessage* first, ErtsMessage** last, Uint len)
+{
+ return queue_messages(NULL, receiver, NULL, receiver_locks,
+ first, last, len);
}
void
@@ -821,17 +826,15 @@ erts_send_message(Process* sender,
#endif
}
+ ERL_MESSAGE_TOKEN(mp) = token;
+#ifdef USE_VM_PROBES
+ ERL_MESSAGE_DT_UTAG(mp) = utag;
+#endif
res = queue_message(sender,
receiver,
&receiver_state,
receiver_locks,
- mp,
- message,
- token
-#ifdef USE_VM_PROBES
- , utag
-#endif
- );
+ mp, message);
BM_SWAP_TIMER(send,system);
@@ -887,7 +890,8 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
/* the trace token must in this case be updated by the caller */
seq_trace_output(token, save, SEQ_TRACE_SEND, to->common.id, NULL);
temptoken = copy_struct(token, sz_token, &hp, ohp);
- erts_queue_message(to, to_locksp, mp, save, temptoken);
+ ERL_MESSAGE_TOKEN(mp) = temptoken;
+ erts_queue_message(to, to_locksp, mp, save);
} else {
sz_from = IS_CONST(from) ? 0 : size_object(from);
#ifdef SHCOPY_SEND
@@ -909,7 +913,7 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
? from
: copy_struct(from, sz_from, &hp, ohp));
save = TUPLE3(hp, am_EXIT, from_copy, mess);
- erts_queue_message(to, to_locksp, mp, save, NIL);
+ erts_queue_message(to, to_locksp, mp, save);
}
}