diff options
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r-- | erts/emulator/beam/erl_message.c | 200 |
1 files changed, 112 insertions, 88 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 4a797fbeae..b10964da52 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -297,38 +297,6 @@ erts_msg_distext2heap(Process *pp, return THE_NON_VALUE; } -static ERTS_INLINE void -notify_new_message(Process *receiver) -{ - ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS - & erts_proc_lc_my_proc_locks(receiver)); - - ACTIVATE(receiver); - - switch (receiver->status) { - case P_GARBING: - switch (receiver->gcstatus) { - case P_SUSPENDED: - goto suspended; - case P_WAITING: - goto waiting; - default: - break; - } - break; - case P_SUSPENDED: - suspended: - receiver->rstatus = P_RUNABLE; - break; - case P_WAITING: - waiting: - erts_add_to_runq(receiver); - break; - default: - break; - } -} - void erts_queue_dist_message(Process *rcvr, ErtsProcLocks *rcvr_locks, @@ -342,7 +310,7 @@ erts_queue_dist_message(Process *rcvr, Sint tok_serial = 0; #endif #ifdef ERTS_SMP - ErtsProcLocks need_locks; + erts_aint_t state; #endif ERTS_SMP_LC_ASSERT(*rcvr_locks == erts_proc_lc_my_proc_locks(rcvr)); @@ -350,20 +318,21 @@ erts_queue_dist_message(Process *rcvr, mp = message_alloc(); #ifdef ERTS_SMP - need_locks = ~(*rcvr_locks) & (ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS); - if (need_locks) { - *rcvr_locks |= need_locks; - if (erts_smp_proc_trylock(rcvr, need_locks) == EBUSY) { - if (need_locks == ERTS_PROC_LOCK_MSGQ) { + if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) { + if (erts_smp_proc_trylock(rcvr, ERTS_PROC_LOCK_MSGQ) == EBUSY) { + ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ; + if (*rcvr_locks & ERTS_PROC_LOCK_STATUS) { erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_STATUS); - need_locks = (ERTS_PROC_LOCK_MSGQ - | ERTS_PROC_LOCK_STATUS); + need_locks |= ERTS_PROC_LOCK_STATUS; } erts_smp_proc_lock(rcvr, need_locks); } } - if (ERTS_PROC_IS_EXITING(rcvr) || ERTS_PROC_PENDING_EXIT(rcvr)) { + state = erts_smp_atomic32_read_acqb(&rcvr->state); + if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) { + if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) + erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); /* Drop message if receiver is exiting or has a pending exit ... */ if (is_not_nil(token)) { ErlHeapFragment *heap_frag; @@ -379,6 +348,8 @@ erts_queue_dist_message(Process *rcvr, /* Ahh... need to decode it in order to trace it... */ ErlHeapFragment *mbuf; Eterm msg; + if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) + erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); message_free(mp); msg = erts_msg_distext2heap(rcvr, rcvr_locks, &mbuf, &token, dist_ext); if (is_value(msg)) @@ -440,26 +411,32 @@ erts_queue_dist_message(Process *rcvr, mp->data.dist_ext = dist_ext; LINK_MESSAGE(rcvr, mp); - notify_new_message(rcvr); + if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) + erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); + + erts_proc_notify_new_message(rcvr); } } /* Add a message last in message queue */ -void -erts_queue_message(Process* receiver, - ErtsProcLocks *receiver_locks, - ErlHeapFragment* bp, - Eterm message, - Eterm seq_trace_token +static int +queue_message(Process *c_p, + Process* receiver, + ErtsProcLocks *receiver_locks, + erts_aint32_t *receiver_state, + ErlHeapFragment* bp, + Eterm message, + Eterm seq_trace_token #ifdef USE_VM_PROBES , Eterm dt_utag #endif -) + ) { ErlMessage* mp; -#ifdef ERTS_SMP - ErtsProcLocks need_locks; -#else + int locked_msgq = 0; + erts_aint_t state; + +#ifndef ERTS_SMP ASSERT(bp != NULL || receiver->mbuf == NULL); #endif @@ -467,31 +444,45 @@ erts_queue_message(Process* receiver, mp = message_alloc(); + if (receiver_state) + state = *receiver_state; + else + state = erts_smp_atomic32_read_acqb(&receiver->state); + #ifdef ERTS_SMP - need_locks = ~(*receiver_locks) & (ERTS_PROC_LOCK_MSGQ - | ERTS_PROC_LOCK_STATUS); - if (need_locks) { - *receiver_locks |= need_locks; - if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) { - if (need_locks == ERTS_PROC_LOCK_MSGQ) { + + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + goto exiting; + + if (!(*receiver_locks & ERTS_PROC_LOCK_MSGQ)) { + if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) { + ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ; + if (*receiver_locks & ERTS_PROC_LOCK_STATUS) { erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS); - need_locks = (ERTS_PROC_LOCK_MSGQ - | ERTS_PROC_LOCK_STATUS); + need_locks |= ERTS_PROC_LOCK_STATUS; } erts_smp_proc_lock(receiver, need_locks); } + locked_msgq = 1; + state = erts_smp_atomic32_read_nob(&receiver->state); + if (receiver_state) + *receiver_state = state; } - if (ERTS_PROC_IS_EXITING(receiver) || ERTS_PROC_PENDING_EXIT(receiver)) { - /* Drop message if receiver is exiting or has a pending - * exit ... - */ +#endif + + if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) { +#ifdef ERTS_SMP + exiting: +#endif + /* Drop message if receiver is exiting or has a pending exit... */ + if (locked_msgq) + erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); if (bp) free_message_buffer(bp); message_free(mp); - return; + return 0; } -#endif ERL_MESSAGE_TERM(mp) = message; ERL_MESSAGE_TOKEN(mp) = seq_trace_token; @@ -514,12 +505,11 @@ erts_queue_message(Process* receiver, ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); LINK_MESSAGE_PRIVQ(receiver, mp); } - else { + else +#endif + { LINK_MESSAGE(receiver, mp); } -#else - LINK_MESSAGE(receiver, mp); -#endif #ifdef USE_VM_PROBES if (DTRACE_ENABLED(message_queued)) { @@ -539,15 +529,43 @@ erts_queue_message(Process* receiver, tok_label, tok_lastcnt, tok_serial); } #endif - notify_new_message(receiver); - if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { + if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) trace_receive(receiver, message); - } + + if (locked_msgq) + erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); + + erts_proc_notify_new_message(receiver); #ifndef ERTS_SMP ERTS_HOLE_CHECK(receiver); #endif + return 0; +} + +void +erts_queue_message(Process* receiver, + ErtsProcLocks *receiver_locks, + ErlHeapFragment* bp, + Eterm message, + Eterm seq_trace_token +#ifdef USE_VM_PROBES + , Eterm dt_utag +#endif + ) +{ + queue_message(NULL, + receiver, + receiver_locks, + NULL, + bp, + message, + seq_trace_token +#ifdef USE_VM_PROBES + , dt_utag +#endif + ); } void @@ -1025,11 +1043,8 @@ erts_send_message(Process* sender, LINK_MESSAGE(receiver, mp); ACTIVATE(receiver); - if (receiver->status == P_WAITING) { - erts_add_to_runq(receiver); - } else if (receiver->status == P_SUSPENDED) { - receiver->rstatus = P_RUNABLE; - } + erts_proc_notify_new_message(receiver); + if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { trace_receive(receiver, message); } @@ -1089,21 +1104,34 @@ erts_send_message(Process* sender, #ifdef ERTS_SMP ErlOffHeap *ohp; Eterm *hp; + erts_aint32_t state; + BM_SWAP_TIMER(send,size); msize = size_object(message); BM_SWAP_TIMER(size,send); - hp = erts_alloc_message_heap(msize,&bp,&ohp,receiver,receiver_locks); + hp = erts_alloc_message_heap_state(msize, + &bp, + &ohp, + receiver, + receiver_locks, + &state); BM_SWAP_TIMER(send,copy); message = copy_struct(message, msize, &hp, ohp); BM_MESSAGE_COPIED(msz); BM_SWAP_TIMER(copy,send); DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); - erts_queue_message(receiver, receiver_locks, bp, message, token + queue_message(sender, + receiver, + receiver_locks, + &state, + bp, + message, + token #ifdef USE_VM_PROBES - , NIL + , NIL #endif - ); + ); BM_SWAP_TIMER(send,system); #else ErlMessage* mp = message_alloc(); @@ -1134,17 +1162,13 @@ erts_send_message(Process* sender, mp->data.attached = NULL; LINK_MESSAGE(receiver, mp); - if (receiver->status == P_WAITING) { - erts_add_to_runq(receiver); - } else if (receiver->status == P_SUSPENDED) { - receiver->rstatus = P_RUNABLE; - } + erts_proc_notify_new_message(receiver); + if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { trace_receive(receiver, message); } BM_SWAP_TIMER(send,system); #endif /* #ifndef ERTS_SMP */ - return; #endif /* HYBRID */ } } |