From ff1a1e3c6f9a233b880e9d359a1c50e55d7c812f Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Thu, 2 Aug 2012 17:36:06 +0200 Subject: Read message queue lengths while having lock --- erts/emulator/beam/bif.c | 15 +++-------- erts/emulator/beam/erl_message.c | 55 ++++++++++++++++++++++++---------------- erts/emulator/beam/erl_message.h | 6 ++--- 3 files changed, 40 insertions(+), 36 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 160b1ef0fc..20ac0637e5 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -2051,18 +2051,11 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { rp_locks |= ERTS_PROC_LOCK_MAIN; #endif /* send to local process */ - erts_send_message(p, rp, &rp_locks, msg, 0); - if (!erts_use_sender_punish) + res = erts_send_message(p, rp, &rp_locks, msg, 0); + if (erts_use_sender_punish) + res *= 4; + else res = 0; - else { -#ifdef ERTS_SMP - res = rp->msg_inq.len*4; - if (ERTS_PROC_LOCK_MAIN & rp_locks) - res += rp->msg.len*4; -#else - res = rp->msg.len*4; -#endif - } erts_smp_proc_unlock(rp, p == rp ? (rp_locks & ~ERTS_PROC_LOCK_MAIN) diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index b10964da52..64065bafb8 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -419,7 +419,7 @@ erts_queue_dist_message(Process *rcvr, } /* Add a message last in message queue */ -static int +static Sint queue_message(Process *c_p, Process* receiver, ErtsProcLocks *receiver_locks, @@ -432,6 +432,7 @@ queue_message(Process *c_p, #endif ) { + Sint res; ErlMessage* mp; int locked_msgq = 0; erts_aint_t state; @@ -492,7 +493,10 @@ queue_message(Process *c_p, mp->next = NULL; mp->data.heap_frag = bp; -#ifdef ERTS_SMP +#ifndef ERTS_SMP + res = receiver->msg.len; +#else + res = receiver->msg_inq.len; if (*receiver_locks & ERTS_PROC_LOCK_MAIN) { /* * We move 'in queue' to 'private queue' and place @@ -502,6 +506,7 @@ queue_message(Process *c_p, * we don't need to include the 'in queue' in * the root set when garbage collecting. */ + res += receiver->msg.len; ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); LINK_MESSAGE_PRIVQ(receiver, mp); } @@ -541,7 +546,7 @@ queue_message(Process *c_p, #ifndef ERTS_SMP ERTS_HOLE_CHECK(receiver); #endif - return 0; + return res; } void @@ -899,7 +904,7 @@ erts_move_msg_attached_data_to_heap(Eterm **hpp, ErlOffHeap *ohp, ErlMessage *ms * Send a local message when sender & receiver processes are known. */ -void +Sint erts_send_message(Process* sender, Process* receiver, ErtsProcLocks *receiver_locks, @@ -909,6 +914,7 @@ erts_send_message(Process* sender, Uint msize; ErlHeapFragment* bp = NULL; Eterm token = NIL; + Sint res = 0; #ifdef USE_VM_PROBES DTRACE_CHARBUF(sender_name, 64); DTRACE_CHARBUF(receiver_name, 64); @@ -995,15 +1001,17 @@ erts_send_message(Process* sender, msize, tok_label, tok_lastcnt, tok_serial); } #endif - erts_queue_message(receiver, - receiver_locks, - bp, - message, - token + res = queue_message(NULL, + receiver, + receiver_locks, + NULL, + bp, + message, + token #ifdef USE_VM_PROBES - , utag + , utag #endif - ); + ); BM_SWAP_TIMER(send,system); #ifdef HYBRID } else { @@ -1043,6 +1051,8 @@ erts_send_message(Process* sender, LINK_MESSAGE(receiver, mp); ACTIVATE(receiver); + res = receiver->msg.len; + erts_proc_notify_new_message(receiver); if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { @@ -1050,7 +1060,6 @@ erts_send_message(Process* sender, } BM_SWAP_TIMER(send,system); - return; #else } else if (sender == receiver) { /* Drop message if receiver has a pending exit ... */ @@ -1094,12 +1103,13 @@ erts_send_message(Process* sender, ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); LINK_MESSAGE_PRIVQ(receiver, mp); + res = receiver->msg.len; + if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { trace_receive(receiver, message); } } BM_SWAP_TIMER(send,system); - return; } else { #ifdef ERTS_SMP ErlOffHeap *ohp; @@ -1121,15 +1131,15 @@ erts_send_message(Process* sender, BM_SWAP_TIMER(copy,send); DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); - queue_message(sender, - receiver, - receiver_locks, - &state, - bp, - message, - token + res = queue_message(sender, + receiver, + receiver_locks, + &state, + bp, + message, + token #ifdef USE_VM_PROBES - , NIL + , NIL #endif ); BM_SWAP_TIMER(send,system); @@ -1161,7 +1171,7 @@ erts_send_message(Process* sender, mp->next = NULL; mp->data.attached = NULL; LINK_MESSAGE(receiver, mp); - + res = receiver->msg.len; erts_proc_notify_new_message(receiver); if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { @@ -1171,6 +1181,7 @@ erts_send_message(Process* sender, #endif /* #ifndef ERTS_SMP */ #endif /* HYBRID */ } + return res; } /* diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 3e9a24ee81..bb4dbf0ef3 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -90,7 +90,7 @@ typedef struct { ErlMessage* first; ErlMessage** last; /* point to the last next pointer */ ErlMessage** save; - int len; /* queue length */ + Sint len; /* queue length */ /* * The following two fields are used by the recv_mark/1 and @@ -105,7 +105,7 @@ typedef struct { typedef struct { ErlMessage* first; ErlMessage** last; /* point to the last next pointer */ - int len; /* queue length */ + Sint len; /* queue length */ } ErlMessageInQueue; #endif @@ -234,7 +234,7 @@ void erts_queue_message(Process*, ErtsProcLocks*, ErlHeapFragment*, Eterm, Eterm #endif ); void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm); -void erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned); +Sint erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned); void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp); void erts_move_msg_mbuf_to_heap(Eterm**, ErlOffHeap*, ErlMessage *); -- cgit v1.2.3