aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/beam/bif.c15
-rw-r--r--erts/emulator/beam/erl_message.c52
-rw-r--r--erts/emulator/beam/erl_message.h6
-rw-r--r--erts/emulator/beam/erl_process.c19
4 files changed, 42 insertions, 50 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 6943c8852c..66f4259d20 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -2051,18 +2051,11 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) {
rp_locks |= ERTS_PROC_LOCK_MAIN;
#endif
/* send to local process */
- erts_send_message(p, rp, &rp_locks, msg, 0);
- if (!erts_use_sender_punish)
+ res = erts_send_message(p, rp, &rp_locks, msg, 0);
+ if (erts_use_sender_punish)
+ res *= 4;
+ else
res = 0;
- else {
-#ifdef ERTS_SMP
- res = rp->msg_inq.len*4;
- if (ERTS_PROC_LOCK_MAIN & rp_locks)
- res += rp->msg.len*4;
-#else
- res = rp->msg.len*4;
-#endif
- }
erts_smp_proc_unlock(rp,
p == rp
? (rp_locks & ~ERTS_PROC_LOCK_MAIN)
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index c98ca09f3f..531144bc4f 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -418,7 +418,7 @@ erts_queue_dist_message(Process *rcvr,
}
/* Add a message last in message queue */
-static int
+static Sint
queue_message(Process *c_p,
Process* receiver,
ErtsProcLocks *receiver_locks,
@@ -431,6 +431,7 @@ queue_message(Process *c_p,
#endif
)
{
+ Sint res;
ErlMessage* mp;
int locked_msgq = 0;
erts_aint_t state;
@@ -491,7 +492,10 @@ queue_message(Process *c_p,
mp->next = NULL;
mp->data.heap_frag = bp;
-#ifdef ERTS_SMP
+#ifndef ERTS_SMP
+ res = receiver->msg.len;
+#else
+ res = receiver->msg_inq.len;
if (*receiver_locks & ERTS_PROC_LOCK_MAIN) {
/*
* We move 'in queue' to 'private queue' and place
@@ -501,6 +505,7 @@ queue_message(Process *c_p,
* we don't need to include the 'in queue' in
* the root set when garbage collecting.
*/
+ res += receiver->msg.len;
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
LINK_MESSAGE_PRIVQ(receiver, mp);
}
@@ -540,7 +545,7 @@ queue_message(Process *c_p,
#ifndef ERTS_SMP
ERTS_HOLE_CHECK(receiver);
#endif
- return 0;
+ return res;
}
void
@@ -864,7 +869,7 @@ erts_move_msg_attached_data_to_heap(Eterm **hpp, ErlOffHeap *ohp, ErlMessage *ms
* Send a local message when sender & receiver processes are known.
*/
-void
+Sint
erts_send_message(Process* sender,
Process* receiver,
ErtsProcLocks *receiver_locks,
@@ -874,6 +879,7 @@ erts_send_message(Process* sender,
Uint msize;
ErlHeapFragment* bp = NULL;
Eterm token = NIL;
+ Sint res = 0;
#ifdef USE_VM_PROBES
DTRACE_CHARBUF(sender_name, 64);
DTRACE_CHARBUF(receiver_name, 64);
@@ -960,15 +966,17 @@ erts_send_message(Process* sender,
msize, tok_label, tok_lastcnt, tok_serial);
}
#endif
- erts_queue_message(receiver,
- receiver_locks,
- bp,
- message,
- token
+ res = queue_message(NULL,
+ receiver,
+ receiver_locks,
+ NULL,
+ bp,
+ message,
+ token
#ifdef USE_VM_PROBES
- , utag
+ , utag
#endif
- );
+ );
BM_SWAP_TIMER(send,system);
} else if (sender == receiver) {
/* Drop message if receiver has a pending exit ... */
@@ -1012,12 +1020,13 @@ erts_send_message(Process* sender,
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
LINK_MESSAGE_PRIVQ(receiver, mp);
+ res = receiver->msg.len;
+
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
trace_receive(receiver, message);
}
}
BM_SWAP_TIMER(send,system);
- return;
} else {
#ifdef ERTS_SMP
ErlOffHeap *ohp;
@@ -1039,15 +1048,15 @@ erts_send_message(Process* sender,
BM_SWAP_TIMER(copy,send);
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
- queue_message(sender,
- receiver,
- receiver_locks,
- &state,
- bp,
- message,
- token
+ res = queue_message(sender,
+ receiver,
+ receiver_locks,
+ &state,
+ bp,
+ message,
+ token
#ifdef USE_VM_PROBES
- , NIL
+ , NIL
#endif
);
BM_SWAP_TIMER(send,system);
@@ -1079,7 +1088,7 @@ erts_send_message(Process* sender,
mp->next = NULL;
mp->data.attached = NULL;
LINK_MESSAGE(receiver, mp);
-
+ res = receiver->msg.len;
erts_proc_notify_new_message(receiver);
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
@@ -1089,6 +1098,7 @@ erts_send_message(Process* sender,
#endif /* #ifndef ERTS_SMP */
return;
}
+ return res;
}
/*
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h
index 3e9a24ee81..bb4dbf0ef3 100644
--- a/erts/emulator/beam/erl_message.h
+++ b/erts/emulator/beam/erl_message.h
@@ -90,7 +90,7 @@ typedef struct {
ErlMessage* first;
ErlMessage** last; /* point to the last next pointer */
ErlMessage** save;
- int len; /* queue length */
+ Sint len; /* queue length */
/*
* The following two fields are used by the recv_mark/1 and
@@ -105,7 +105,7 @@ typedef struct {
typedef struct {
ErlMessage* first;
ErlMessage** last; /* point to the last next pointer */
- int len; /* queue length */
+ Sint len; /* queue length */
} ErlMessageInQueue;
#endif
@@ -234,7 +234,7 @@ void erts_queue_message(Process*, ErtsProcLocks*, ErlHeapFragment*, Eterm, Eterm
#endif
);
void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm);
-void erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned);
+Sint erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned);
void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp);
void erts_move_msg_mbuf_to_heap(Eterm**, ErlOffHeap*, ErlMessage *);
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index afd2ddc69d..8153cdce1c 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -5001,7 +5001,9 @@ schedule_process(Process *p, erts_aint32_t state, int active_enq)
while (1) {
erts_aint32_t e;
n = e = a;
- ASSERT(!(a & ERTS_PSFLG_FREE));
+
+ if (a & ERTS_PSFLG_FREE)
+ return; /* We don't want to schedule free processes... */
n |= ERTS_PSFLG_ACTIVE;
if (!(a & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_RUNNING)))
n |= ERTS_PSFLG_IN_RUNQ;
@@ -5012,23 +5014,13 @@ schedule_process(Process *p, erts_aint32_t state, int active_enq)
return; /* Someone else activated process ... */
}
-#ifdef RRR_DEBUG
- if (active_enq)
- erts_fprintf(stderr, "! state=0x%x\n", n);
-#endif
-
if (erts_system_profile_flags.runnable_procs
&& !(a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) {
profile_runnable_proc(p, am_active);
}
- if ((n & ERTS_PSFLG_IN_RUNQ) && !(a & ERTS_PSFLG_IN_RUNQ)) {
-#ifdef RRR_DEBUG
- if (active_enq)
- erts_fprintf(stderr, "-->\n");
-#endif
+ if ((n & ERTS_PSFLG_IN_RUNQ) && !(a & ERTS_PSFLG_IN_RUNQ))
add2runq(p, n);
- }
}
void
@@ -5098,9 +5090,6 @@ resume_process(Process *p)
state = erts_smp_atomic32_read_band_mb(&p->state, ~ERTS_PSFLG_SUSPENDED);
state &= ~ERTS_PSFLG_SUSPENDED;
-#ifdef RRR_DEBUG
- erts_fprintf(stderr, "%T - state=0x%x\n", p->id, state);
-#endif
if ((state & (ERTS_PSFLG_EXITING
| ERTS_PSFLG_ACTIVE
| ERTS_PSFLG_IN_RUNQ