From ff1a1e3c6f9a233b880e9d359a1c50e55d7c812f Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Thu, 2 Aug 2012 17:36:06 +0200 Subject: Read message queue lengths while having lock --- erts/emulator/beam/erl_message.c | 55 ++++++++++++++++++++++++---------------- 1 file changed, 33 insertions(+), 22 deletions(-) (limited to 'erts/emulator/beam/erl_message.c') diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index b10964da52..64065bafb8 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -419,7 +419,7 @@ erts_queue_dist_message(Process *rcvr, } /* Add a message last in message queue */ -static int +static Sint queue_message(Process *c_p, Process* receiver, ErtsProcLocks *receiver_locks, @@ -432,6 +432,7 @@ queue_message(Process *c_p, #endif ) { + Sint res; ErlMessage* mp; int locked_msgq = 0; erts_aint_t state; @@ -492,7 +493,10 @@ queue_message(Process *c_p, mp->next = NULL; mp->data.heap_frag = bp; -#ifdef ERTS_SMP +#ifndef ERTS_SMP + res = receiver->msg.len; +#else + res = receiver->msg_inq.len; if (*receiver_locks & ERTS_PROC_LOCK_MAIN) { /* * We move 'in queue' to 'private queue' and place @@ -502,6 +506,7 @@ queue_message(Process *c_p, * we don't need to include the 'in queue' in * the root set when garbage collecting. */ + res += receiver->msg.len; ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); LINK_MESSAGE_PRIVQ(receiver, mp); } @@ -541,7 +546,7 @@ queue_message(Process *c_p, #ifndef ERTS_SMP ERTS_HOLE_CHECK(receiver); #endif - return 0; + return res; } void @@ -899,7 +904,7 @@ erts_move_msg_attached_data_to_heap(Eterm **hpp, ErlOffHeap *ohp, ErlMessage *ms * Send a local message when sender & receiver processes are known. */ -void +Sint erts_send_message(Process* sender, Process* receiver, ErtsProcLocks *receiver_locks, @@ -909,6 +914,7 @@ erts_send_message(Process* sender, Uint msize; ErlHeapFragment* bp = NULL; Eterm token = NIL; + Sint res = 0; #ifdef USE_VM_PROBES DTRACE_CHARBUF(sender_name, 64); DTRACE_CHARBUF(receiver_name, 64); @@ -995,15 +1001,17 @@ erts_send_message(Process* sender, msize, tok_label, tok_lastcnt, tok_serial); } #endif - erts_queue_message(receiver, - receiver_locks, - bp, - message, - token + res = queue_message(NULL, + receiver, + receiver_locks, + NULL, + bp, + message, + token #ifdef USE_VM_PROBES - , utag + , utag #endif - ); + ); BM_SWAP_TIMER(send,system); #ifdef HYBRID } else { @@ -1043,6 +1051,8 @@ erts_send_message(Process* sender, LINK_MESSAGE(receiver, mp); ACTIVATE(receiver); + res = receiver->msg.len; + erts_proc_notify_new_message(receiver); if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { @@ -1050,7 +1060,6 @@ erts_send_message(Process* sender, } BM_SWAP_TIMER(send,system); - return; #else } else if (sender == receiver) { /* Drop message if receiver has a pending exit ... */ @@ -1094,12 +1103,13 @@ erts_send_message(Process* sender, ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); LINK_MESSAGE_PRIVQ(receiver, mp); + res = receiver->msg.len; + if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { trace_receive(receiver, message); } } BM_SWAP_TIMER(send,system); - return; } else { #ifdef ERTS_SMP ErlOffHeap *ohp; @@ -1121,15 +1131,15 @@ erts_send_message(Process* sender, BM_SWAP_TIMER(copy,send); DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); - queue_message(sender, - receiver, - receiver_locks, - &state, - bp, - message, - token + res = queue_message(sender, + receiver, + receiver_locks, + &state, + bp, + message, + token #ifdef USE_VM_PROBES - , NIL + , NIL #endif ); BM_SWAP_TIMER(send,system); @@ -1161,7 +1171,7 @@ erts_send_message(Process* sender, mp->next = NULL; mp->data.attached = NULL; LINK_MESSAGE(receiver, mp); - + res = receiver->msg.len; erts_proc_notify_new_message(receiver); if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { @@ -1171,6 +1181,7 @@ erts_send_message(Process* sender, #endif /* #ifndef ERTS_SMP */ #endif /* HYBRID */ } + return res; } /* -- cgit v1.2.3