aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-08-02 17:36:06 +0200
committerRickard Green <[email protected]>2012-08-02 17:36:06 +0200
commitff1a1e3c6f9a233b880e9d359a1c50e55d7c812f (patch)
tree2a3fb137788674f6a3e27dae62aa367f85f6160e
parent10c218e71f30754be31d8091a9e98f3946ec991d (diff)
downloadotp-ff1a1e3c6f9a233b880e9d359a1c50e55d7c812f.tar.gz
otp-ff1a1e3c6f9a233b880e9d359a1c50e55d7c812f.tar.bz2
otp-ff1a1e3c6f9a233b880e9d359a1c50e55d7c812f.zip
Read message queue lengths while having lock
-rw-r--r--erts/emulator/beam/bif.c15
-rw-r--r--erts/emulator/beam/erl_message.c55
-rw-r--r--erts/emulator/beam/erl_message.h6
3 files changed, 40 insertions, 36 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 160b1ef0fc..20ac0637e5 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -2051,18 +2051,11 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) {
rp_locks |= ERTS_PROC_LOCK_MAIN;
#endif
/* send to local process */
- erts_send_message(p, rp, &rp_locks, msg, 0);
- if (!erts_use_sender_punish)
+ res = erts_send_message(p, rp, &rp_locks, msg, 0);
+ if (erts_use_sender_punish)
+ res *= 4;
+ else
res = 0;
- else {
-#ifdef ERTS_SMP
- res = rp->msg_inq.len*4;
- if (ERTS_PROC_LOCK_MAIN & rp_locks)
- res += rp->msg.len*4;
-#else
- res = rp->msg.len*4;
-#endif
- }
erts_smp_proc_unlock(rp,
p == rp
? (rp_locks & ~ERTS_PROC_LOCK_MAIN)
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index b10964da52..64065bafb8 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -419,7 +419,7 @@ erts_queue_dist_message(Process *rcvr,
}
/* Add a message last in message queue */
-static int
+static Sint
queue_message(Process *c_p,
Process* receiver,
ErtsProcLocks *receiver_locks,
@@ -432,6 +432,7 @@ queue_message(Process *c_p,
#endif
)
{
+ Sint res;
ErlMessage* mp;
int locked_msgq = 0;
erts_aint_t state;
@@ -492,7 +493,10 @@ queue_message(Process *c_p,
mp->next = NULL;
mp->data.heap_frag = bp;
-#ifdef ERTS_SMP
+#ifndef ERTS_SMP
+ res = receiver->msg.len;
+#else
+ res = receiver->msg_inq.len;
if (*receiver_locks & ERTS_PROC_LOCK_MAIN) {
/*
* We move 'in queue' to 'private queue' and place
@@ -502,6 +506,7 @@ queue_message(Process *c_p,
* we don't need to include the 'in queue' in
* the root set when garbage collecting.
*/
+ res += receiver->msg.len;
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
LINK_MESSAGE_PRIVQ(receiver, mp);
}
@@ -541,7 +546,7 @@ queue_message(Process *c_p,
#ifndef ERTS_SMP
ERTS_HOLE_CHECK(receiver);
#endif
- return 0;
+ return res;
}
void
@@ -899,7 +904,7 @@ erts_move_msg_attached_data_to_heap(Eterm **hpp, ErlOffHeap *ohp, ErlMessage *ms
* Send a local message when sender & receiver processes are known.
*/
-void
+Sint
erts_send_message(Process* sender,
Process* receiver,
ErtsProcLocks *receiver_locks,
@@ -909,6 +914,7 @@ erts_send_message(Process* sender,
Uint msize;
ErlHeapFragment* bp = NULL;
Eterm token = NIL;
+ Sint res = 0;
#ifdef USE_VM_PROBES
DTRACE_CHARBUF(sender_name, 64);
DTRACE_CHARBUF(receiver_name, 64);
@@ -995,15 +1001,17 @@ erts_send_message(Process* sender,
msize, tok_label, tok_lastcnt, tok_serial);
}
#endif
- erts_queue_message(receiver,
- receiver_locks,
- bp,
- message,
- token
+ res = queue_message(NULL,
+ receiver,
+ receiver_locks,
+ NULL,
+ bp,
+ message,
+ token
#ifdef USE_VM_PROBES
- , utag
+ , utag
#endif
- );
+ );
BM_SWAP_TIMER(send,system);
#ifdef HYBRID
} else {
@@ -1043,6 +1051,8 @@ erts_send_message(Process* sender,
LINK_MESSAGE(receiver, mp);
ACTIVATE(receiver);
+ res = receiver->msg.len;
+
erts_proc_notify_new_message(receiver);
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
@@ -1050,7 +1060,6 @@ erts_send_message(Process* sender,
}
BM_SWAP_TIMER(send,system);
- return;
#else
} else if (sender == receiver) {
/* Drop message if receiver has a pending exit ... */
@@ -1094,12 +1103,13 @@ erts_send_message(Process* sender,
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
LINK_MESSAGE_PRIVQ(receiver, mp);
+ res = receiver->msg.len;
+
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
trace_receive(receiver, message);
}
}
BM_SWAP_TIMER(send,system);
- return;
} else {
#ifdef ERTS_SMP
ErlOffHeap *ohp;
@@ -1121,15 +1131,15 @@ erts_send_message(Process* sender,
BM_SWAP_TIMER(copy,send);
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
- queue_message(sender,
- receiver,
- receiver_locks,
- &state,
- bp,
- message,
- token
+ res = queue_message(sender,
+ receiver,
+ receiver_locks,
+ &state,
+ bp,
+ message,
+ token
#ifdef USE_VM_PROBES
- , NIL
+ , NIL
#endif
);
BM_SWAP_TIMER(send,system);
@@ -1161,7 +1171,7 @@ erts_send_message(Process* sender,
mp->next = NULL;
mp->data.attached = NULL;
LINK_MESSAGE(receiver, mp);
-
+ res = receiver->msg.len;
erts_proc_notify_new_message(receiver);
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
@@ -1171,6 +1181,7 @@ erts_send_message(Process* sender,
#endif /* #ifndef ERTS_SMP */
#endif /* HYBRID */
}
+ return res;
}
/*
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h
index 3e9a24ee81..bb4dbf0ef3 100644
--- a/erts/emulator/beam/erl_message.h
+++ b/erts/emulator/beam/erl_message.h
@@ -90,7 +90,7 @@ typedef struct {
ErlMessage* first;
ErlMessage** last; /* point to the last next pointer */
ErlMessage** save;
- int len; /* queue length */
+ Sint len; /* queue length */
/*
* The following two fields are used by the recv_mark/1 and
@@ -105,7 +105,7 @@ typedef struct {
typedef struct {
ErlMessage* first;
ErlMessage** last; /* point to the last next pointer */
- int len; /* queue length */
+ Sint len; /* queue length */
} ErlMessageInQueue;
#endif
@@ -234,7 +234,7 @@ void erts_queue_message(Process*, ErtsProcLocks*, ErlHeapFragment*, Eterm, Eterm
#endif
);
void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm);
-void erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned);
+Sint erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned);
void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp);
void erts_move_msg_mbuf_to_heap(Eterm**, ErlOffHeap*, ErlMessage *);