aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_message.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r--erts/emulator/beam/erl_message.c200
1 files changed, 112 insertions, 88 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 4a797fbeae..b10964da52 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -297,38 +297,6 @@ erts_msg_distext2heap(Process *pp,
return THE_NON_VALUE;
}
-static ERTS_INLINE void
-notify_new_message(Process *receiver)
-{
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS
- & erts_proc_lc_my_proc_locks(receiver));
-
- ACTIVATE(receiver);
-
- switch (receiver->status) {
- case P_GARBING:
- switch (receiver->gcstatus) {
- case P_SUSPENDED:
- goto suspended;
- case P_WAITING:
- goto waiting;
- default:
- break;
- }
- break;
- case P_SUSPENDED:
- suspended:
- receiver->rstatus = P_RUNABLE;
- break;
- case P_WAITING:
- waiting:
- erts_add_to_runq(receiver);
- break;
- default:
- break;
- }
-}
-
void
erts_queue_dist_message(Process *rcvr,
ErtsProcLocks *rcvr_locks,
@@ -342,7 +310,7 @@ erts_queue_dist_message(Process *rcvr,
Sint tok_serial = 0;
#endif
#ifdef ERTS_SMP
- ErtsProcLocks need_locks;
+ erts_aint_t state;
#endif
ERTS_SMP_LC_ASSERT(*rcvr_locks == erts_proc_lc_my_proc_locks(rcvr));
@@ -350,20 +318,21 @@ erts_queue_dist_message(Process *rcvr,
mp = message_alloc();
#ifdef ERTS_SMP
- need_locks = ~(*rcvr_locks) & (ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS);
- if (need_locks) {
- *rcvr_locks |= need_locks;
- if (erts_smp_proc_trylock(rcvr, need_locks) == EBUSY) {
- if (need_locks == ERTS_PROC_LOCK_MSGQ) {
+ if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) {
+ if (erts_smp_proc_trylock(rcvr, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
+ ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ;
+ if (*rcvr_locks & ERTS_PROC_LOCK_STATUS) {
erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_STATUS);
- need_locks = (ERTS_PROC_LOCK_MSGQ
- | ERTS_PROC_LOCK_STATUS);
+ need_locks |= ERTS_PROC_LOCK_STATUS;
}
erts_smp_proc_lock(rcvr, need_locks);
}
}
- if (ERTS_PROC_IS_EXITING(rcvr) || ERTS_PROC_PENDING_EXIT(rcvr)) {
+ state = erts_smp_atomic32_read_acqb(&rcvr->state);
+ if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) {
+ if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
+ erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
/* Drop message if receiver is exiting or has a pending exit ... */
if (is_not_nil(token)) {
ErlHeapFragment *heap_frag;
@@ -379,6 +348,8 @@ erts_queue_dist_message(Process *rcvr,
/* Ahh... need to decode it in order to trace it... */
ErlHeapFragment *mbuf;
Eterm msg;
+ if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
+ erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
message_free(mp);
msg = erts_msg_distext2heap(rcvr, rcvr_locks, &mbuf, &token, dist_ext);
if (is_value(msg))
@@ -440,26 +411,32 @@ erts_queue_dist_message(Process *rcvr,
mp->data.dist_ext = dist_ext;
LINK_MESSAGE(rcvr, mp);
- notify_new_message(rcvr);
+ if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
+ erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
+
+ erts_proc_notify_new_message(rcvr);
}
}
/* Add a message last in message queue */
-void
-erts_queue_message(Process* receiver,
- ErtsProcLocks *receiver_locks,
- ErlHeapFragment* bp,
- Eterm message,
- Eterm seq_trace_token
+static int
+queue_message(Process *c_p,
+ Process* receiver,
+ ErtsProcLocks *receiver_locks,
+ erts_aint32_t *receiver_state,
+ ErlHeapFragment* bp,
+ Eterm message,
+ Eterm seq_trace_token
#ifdef USE_VM_PROBES
, Eterm dt_utag
#endif
-)
+ )
{
ErlMessage* mp;
-#ifdef ERTS_SMP
- ErtsProcLocks need_locks;
-#else
+ int locked_msgq = 0;
+ erts_aint_t state;
+
+#ifndef ERTS_SMP
ASSERT(bp != NULL || receiver->mbuf == NULL);
#endif
@@ -467,31 +444,45 @@ erts_queue_message(Process* receiver,
mp = message_alloc();
+ if (receiver_state)
+ state = *receiver_state;
+ else
+ state = erts_smp_atomic32_read_acqb(&receiver->state);
+
#ifdef ERTS_SMP
- need_locks = ~(*receiver_locks) & (ERTS_PROC_LOCK_MSGQ
- | ERTS_PROC_LOCK_STATUS);
- if (need_locks) {
- *receiver_locks |= need_locks;
- if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) {
- if (need_locks == ERTS_PROC_LOCK_MSGQ) {
+
+ if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))
+ goto exiting;
+
+ if (!(*receiver_locks & ERTS_PROC_LOCK_MSGQ)) {
+ if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
+ ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ;
+ if (*receiver_locks & ERTS_PROC_LOCK_STATUS) {
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
- need_locks = (ERTS_PROC_LOCK_MSGQ
- | ERTS_PROC_LOCK_STATUS);
+ need_locks |= ERTS_PROC_LOCK_STATUS;
}
erts_smp_proc_lock(receiver, need_locks);
}
+ locked_msgq = 1;
+ state = erts_smp_atomic32_read_nob(&receiver->state);
+ if (receiver_state)
+ *receiver_state = state;
}
- if (ERTS_PROC_IS_EXITING(receiver) || ERTS_PROC_PENDING_EXIT(receiver)) {
- /* Drop message if receiver is exiting or has a pending
- * exit ...
- */
+#endif
+
+ if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) {
+#ifdef ERTS_SMP
+ exiting:
+#endif
+ /* Drop message if receiver is exiting or has a pending exit... */
+ if (locked_msgq)
+ erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
if (bp)
free_message_buffer(bp);
message_free(mp);
- return;
+ return 0;
}
-#endif
ERL_MESSAGE_TERM(mp) = message;
ERL_MESSAGE_TOKEN(mp) = seq_trace_token;
@@ -514,12 +505,11 @@ erts_queue_message(Process* receiver,
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
LINK_MESSAGE_PRIVQ(receiver, mp);
}
- else {
+ else
+#endif
+ {
LINK_MESSAGE(receiver, mp);
}
-#else
- LINK_MESSAGE(receiver, mp);
-#endif
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(message_queued)) {
@@ -539,15 +529,43 @@ erts_queue_message(Process* receiver,
tok_label, tok_lastcnt, tok_serial);
}
#endif
- notify_new_message(receiver);
- if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
+ if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE))
trace_receive(receiver, message);
- }
+
+ if (locked_msgq)
+ erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
+
+ erts_proc_notify_new_message(receiver);
#ifndef ERTS_SMP
ERTS_HOLE_CHECK(receiver);
#endif
+ return 0;
+}
+
+void
+erts_queue_message(Process* receiver,
+ ErtsProcLocks *receiver_locks,
+ ErlHeapFragment* bp,
+ Eterm message,
+ Eterm seq_trace_token
+#ifdef USE_VM_PROBES
+ , Eterm dt_utag
+#endif
+ )
+{
+ queue_message(NULL,
+ receiver,
+ receiver_locks,
+ NULL,
+ bp,
+ message,
+ seq_trace_token
+#ifdef USE_VM_PROBES
+ , dt_utag
+#endif
+ );
}
void
@@ -1025,11 +1043,8 @@ erts_send_message(Process* sender,
LINK_MESSAGE(receiver, mp);
ACTIVATE(receiver);
- if (receiver->status == P_WAITING) {
- erts_add_to_runq(receiver);
- } else if (receiver->status == P_SUSPENDED) {
- receiver->rstatus = P_RUNABLE;
- }
+ erts_proc_notify_new_message(receiver);
+
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
trace_receive(receiver, message);
}
@@ -1089,21 +1104,34 @@ erts_send_message(Process* sender,
#ifdef ERTS_SMP
ErlOffHeap *ohp;
Eterm *hp;
+ erts_aint32_t state;
+
BM_SWAP_TIMER(send,size);
msize = size_object(message);
BM_SWAP_TIMER(size,send);
- hp = erts_alloc_message_heap(msize,&bp,&ohp,receiver,receiver_locks);
+ hp = erts_alloc_message_heap_state(msize,
+ &bp,
+ &ohp,
+ receiver,
+ receiver_locks,
+ &state);
BM_SWAP_TIMER(send,copy);
message = copy_struct(message, msize, &hp, ohp);
BM_MESSAGE_COPIED(msz);
BM_SWAP_TIMER(copy,send);
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
- erts_queue_message(receiver, receiver_locks, bp, message, token
+ queue_message(sender,
+ receiver,
+ receiver_locks,
+ &state,
+ bp,
+ message,
+ token
#ifdef USE_VM_PROBES
- , NIL
+ , NIL
#endif
- );
+ );
BM_SWAP_TIMER(send,system);
#else
ErlMessage* mp = message_alloc();
@@ -1134,17 +1162,13 @@ erts_send_message(Process* sender,
mp->data.attached = NULL;
LINK_MESSAGE(receiver, mp);
- if (receiver->status == P_WAITING) {
- erts_add_to_runq(receiver);
- } else if (receiver->status == P_SUSPENDED) {
- receiver->rstatus = P_RUNABLE;
- }
+ erts_proc_notify_new_message(receiver);
+
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
trace_receive(receiver, message);
}
BM_SWAP_TIMER(send,system);
#endif /* #ifndef ERTS_SMP */
- return;
#endif /* HYBRID */
}
}