diff options
-rw-r--r-- | erts/emulator/beam/beam_bif_load.c | 6 | ||||
-rw-r--r-- | erts/emulator/beam/beam_debug.c | 2 | ||||
-rw-r--r-- | erts/emulator/beam/dist.c | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db.c | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.c | 144 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.h | 27 | ||||
-rw-r--r-- | erts/emulator/beam/erl_nif.c | 7 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 197 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.h | 12 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 23 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 7 |
11 files changed, 318 insertions, 111 deletions
diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index 5c76aafae7..d9312f4df8 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -1757,11 +1757,11 @@ BIF_RETTYPE erts_internal_purge_module_2(BIF_ALIST_2) release_literal_areas.last = ref; } erts_mtx_unlock(&release_literal_areas.mtx); - erts_queue_message(erts_literal_area_collector, + erts_queue_proc_message(BIF_P, + erts_literal_area_collector, 0, erts_alloc_message(0, NULL), - am_copy_literals, - BIF_P->common.id); + am_copy_literals); } return ret; diff --git a/erts/emulator/beam/beam_debug.c b/erts/emulator/beam/beam_debug.c index 5eb68b817e..b8a8d06315 100644 --- a/erts/emulator/beam/beam_debug.c +++ b/erts/emulator/beam/beam_debug.c @@ -1191,7 +1191,7 @@ dirty_send_message(Process *c_p, Eterm to, Eterm tag) mp = erts_alloc_message_heap(rp, &rp_locks, 3, &hp, &ohp); msg = TUPLE2(hp, tag, c_p->common.id); - erts_queue_message(rp, rp_locks, mp, msg, c_p->common.id); + erts_queue_proc_message(c_p, rp, rp_locks, mp, msg); if (rp == real_c_p) rp_locks &= ~c_p_locks; diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 24e1cac7bf..026f0a62d4 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3587,7 +3587,7 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) dhandle = erts_build_dhandle(&hp, ohp, dep); msg = TUPLE4(hp, am_auto_connect, dep->sysname, make_small(conn_id), dhandle); - erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); + erts_queue_proc_message(proc, net_kernel, nk_locks, mp, msg); erts_proc_unlock(net_kernel, nk_locks); } diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c index f7ee408991..ca2ebb7c27 100644 --- a/erts/emulator/beam/erl_db.c +++ b/erts/emulator/beam/erl_db.c @@ -3547,7 +3547,7 @@ send_ets_transfer_message(Process *c_p, Process *proc, hd_copy = copy_struct(heir_data, hd_sz, &hp, ohp); sender = c_p->common.id; msg = TUPLE4(hp, am_ETS_TRANSFER, tid, sender, hd_copy); - erts_queue_message(proc, *locks, mp, msg, sender); + erts_queue_proc_message(c_p, proc, *locks, mp, msg); } diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 8eae85ccdd..bea7a0fe86 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -303,8 +303,7 @@ erts_queue_dist_message(Process *rcvr, erts_cleanup_messages(mp); } else { - - LINK_MESSAGE(rcvr, mp, &mp->next, 1); + LINK_MESSAGE(rcvr, mp); if (rcvr_locks & ERTS_PROC_LOCK_MAIN) erts_proc_sig_fetch(rcvr); @@ -317,9 +316,8 @@ erts_queue_dist_message(Process *rcvr, } /* Add messages last in message queue */ -static Sint +static void queue_messages(Process* receiver, - erts_aint32_t *receiver_state, ErtsProcLocks receiver_locks, ErtsMessage* first, ErtsMessage** last, @@ -328,51 +326,51 @@ queue_messages(Process* receiver, int locked_msgq = 0; erts_aint32_t state; - ASSERT(is_value(ERL_MESSAGE_TERM(first))); - ASSERT(is_value(ERL_MESSAGE_FROM(first))); - ASSERT(ERL_MESSAGE_TOKEN(first) == am_undefined || - ERL_MESSAGE_TOKEN(first) == NIL || - is_tuple(ERL_MESSAGE_TOKEN(first))); - -#ifdef ERTS_ENABLE_LOCK_CHECK - ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(receiver) < ERTS_PROC_LOCK_MSGQ || - receiver_locks == erts_proc_lc_my_proc_locks(receiver)); +#ifdef DEBUG + { + ErtsMessage* fmsg = ERTS_SIG_IS_MSG(first) ? first : first->next; + ASSERT(fmsg); + ASSERT(is_value(ERL_MESSAGE_TERM(fmsg))); + ASSERT(is_value(ERL_MESSAGE_FROM(fmsg))); + ASSERT(ERL_MESSAGE_TOKEN(fmsg) == am_undefined || + ERL_MESSAGE_TOKEN(fmsg) == NIL || + is_tuple(ERL_MESSAGE_TOKEN(fmsg))); + } #endif - if (!(receiver_locks & ERTS_PROC_LOCK_MSGQ)) { - if (erts_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) { - ErtsProcLocks need_locks; + ERTS_LC_ASSERT((erts_proc_lc_my_proc_locks(receiver) & ERTS_PROC_LOCK_MSGQ) + == (receiver_locks & ERTS_PROC_LOCK_MSGQ)); - if (receiver_state) - state = *receiver_state; - else - state = erts_atomic32_read_nob(&receiver->state); - if (state & ERTS_PSFLG_EXITING) - goto exiting; - - need_locks = receiver_locks & ERTS_PROC_LOCKS_HIGHER_THAN(ERTS_PROC_LOCK_MSGQ); - if (need_locks) { - erts_proc_unlock(receiver, need_locks); - } - need_locks |= ERTS_PROC_LOCK_MSGQ; - erts_proc_lock(receiver, need_locks); - } + if (!(receiver_locks & ERTS_PROC_LOCK_MSGQ)) { + erts_proc_lock(receiver, ERTS_PROC_LOCK_MSGQ); locked_msgq = 1; } - state = erts_atomic32_read_nob(&receiver->state); if (state & ERTS_PSFLG_EXITING) { - exiting: /* Drop message if receiver is exiting or has a pending exit... */ if (locked_msgq) - erts_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); + erts_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); + if (ERTS_SIG_IS_NON_MSG(first)) { + ErtsSchedulerData* esdp = erts_get_scheduler_data(); + ASSERT(esdp); + ASSERT(!esdp->pending_signal.sig); + esdp->pending_signal.sig = (ErtsSignal*) first; + esdp->pending_signal.to = receiver->common.id; + first = first->next; + } erts_cleanup_messages(first); - return 0; + return; } - LINK_MESSAGE(receiver, first, last, len); + if (last == &first->next) { + ASSERT(len == 1); + LINK_MESSAGE(receiver, first); + } + else { + erts_enqueue_signals(receiver, first, last, NULL, len, state); + } if (receiver_locks & ERTS_PROC_LOCK_MAIN) erts_proc_sig_fetch(receiver); @@ -382,35 +380,67 @@ queue_messages(Process* receiver, } erts_proc_notify_new_message(receiver, receiver_locks); - return 0; } -static Sint -queue_message(Process* receiver, - erts_aint32_t *receiver_state, - ErtsProcLocks receiver_locks, - ErtsMessage* mp, Eterm msg, Eterm from) +static ERTS_INLINE +ErtsMessage* prepend_pending_sig_maybe(Process* sender, Process* receiver, + ErtsMessage* mp) { - ERL_MESSAGE_TERM(mp) = msg; - ERL_MESSAGE_FROM(mp) = from; - return queue_messages(receiver, receiver_state, receiver_locks, - mp, &mp->next, 1); + ErtsSchedulerData* esdp = sender->scheduler_data; + ErtsSignal* pend_sig; + + if (!esdp || esdp->pending_signal.to != receiver->common.id) + return mp; + + pend_sig = esdp->pending_signal.sig; + + ASSERT(esdp->pending_signal.dbg_from == sender); + esdp->pending_signal.sig = NULL; + esdp->pending_signal.to = THE_NON_VALUE; + pend_sig->common.next = mp; + pend_sig->common.specific.next = NULL; + return (ErtsMessage*) pend_sig; } -Sint +/** + * + * @brief Send one message from *NOT* a local process. + * + */ +void erts_queue_message(Process* receiver, ErtsProcLocks receiver_locks, ErtsMessage* mp, Eterm msg, Eterm from) { - return queue_message(receiver, NULL, receiver_locks, mp, msg, from); + ASSERT(is_not_internal_pid(from)); + ERL_MESSAGE_TERM(mp) = msg; + ERL_MESSAGE_FROM(mp) = from; + queue_messages(receiver, receiver_locks, mp, &mp->next, 1); } +/** + * @brief Send one message from a local process. + */ +void +erts_queue_proc_message(Process* sender, + Process* receiver, ErtsProcLocks receiver_locks, + ErtsMessage* mp, Eterm msg) +{ + ERL_MESSAGE_TERM(mp) = msg; + ERL_MESSAGE_FROM(mp) = sender->common.id; + queue_messages(receiver, receiver_locks, + prepend_pending_sig_maybe(sender, receiver, mp), + &mp->next, 1); +} -Sint -erts_queue_messages(Process* receiver, ErtsProcLocks receiver_locks, - ErtsMessage* first, ErtsMessage** last, Uint len) + +void +erts_queue_proc_messages(Process* sender, + Process* receiver, ErtsProcLocks receiver_locks, + ErtsMessage* first, ErtsMessage** last, Uint len) { - return queue_messages(receiver, NULL, receiver_locks, - first, last, len); + queue_messages(receiver, receiver_locks, + prepend_pending_sig_maybe(sender, receiver, first), + last, len); } void @@ -547,7 +577,7 @@ erts_try_alloc_message_on_heap(Process *pp, * Send a local message when sender & receiver processes are known. */ -Sint +void erts_send_message(Process* sender, Process* receiver, ErtsProcLocks *receiver_locks, @@ -558,7 +588,6 @@ erts_send_message(Process* sender, ErtsMessage* mp; ErlOffHeap *ohp; Eterm token = NIL; - Sint res = 0; #ifdef USE_VM_PROBES DTRACE_CHARBUF(sender_name, 64); DTRACE_CHARBUF(receiver_name, 64); @@ -701,13 +730,8 @@ erts_send_message(Process* sender, #ifdef USE_VM_PROBES ERL_MESSAGE_DT_UTAG(mp) = utag; #endif - res = queue_message(receiver, - &receiver_state, - *receiver_locks, - mp, message, - sender->common.id); - return res; + erts_queue_proc_message(sender, receiver, *receiver_locks, mp, message); } diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index ee87297ba4..d120111634 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -229,7 +229,7 @@ typedef union { typedef struct { /* pointers to next pointers pointing to... */ ErtsMessage **next; /* ... next (non-message) signal */ - ErtsMessage **last; /* ... next (non-message) signal */ + ErtsMessage **last; /* ... last (non-message) signal */ } ErtsMsgQNMSigs; /* Size of default message buffer (erl_message.c) */ @@ -296,8 +296,11 @@ typedef struct { typedef struct { ErtsMessage* first; ErtsMessage** last; /* point to the last next pointer */ - Sint len; /* queue length */ + Sint len; /* number of messages in queue */ ErtsMsgQNMSigs nmsigs; +#ifdef ERTS_PROC_SIG_HARD_DEBUG + int may_contain_heap_terms; +#endif } ErtsSignalInQueue; typedef struct erl_trace_message_queue__ { @@ -364,13 +367,14 @@ typedef struct erl_trace_message_queue__ { #endif -/* Add message last_msg in message queue */ -#define LINK_MESSAGE(p, first_msg, last_msg, num_msgs) \ +/* Add one message last in message queue */ +#define LINK_MESSAGE(p, msg) \ do { \ + ASSERT(ERTS_SIG_IS_MSG(msg)); \ ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE__((p), "before"); \ - *(p)->sig_inq.last = (first_msg); \ - (p)->sig_inq.last = (last_msg); \ - (p)->sig_inq.len += (num_msgs); \ + *(p)->sig_inq.last = (msg); \ + (p)->sig_inq.last = &(msg)->next; \ + (p)->sig_inq.len++; \ ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE__((p), "before"); \ } while(0) @@ -437,11 +441,12 @@ ErlHeapFragment* erts_resize_message_buffer(ErlHeapFragment *, Uint, Eterm *, Uint); void free_message_buffer(ErlHeapFragment *); void erts_queue_dist_message(Process*, ErtsProcLocks, ErtsDistExternal *, Eterm, Eterm); -Sint erts_queue_message(Process*, ErtsProcLocks,ErtsMessage*, Eterm, Eterm); -Sint erts_queue_messages(Process*, ErtsProcLocks, - ErtsMessage*, ErtsMessage**, Uint); +void erts_queue_message(Process*, ErtsProcLocks,ErtsMessage*, Eterm, Eterm); +void erts_queue_proc_message(Process* from,Process* to, ErtsProcLocks,ErtsMessage*, Eterm); +void erts_queue_proc_messages(Process* from, Process* to, ErtsProcLocks, + ErtsMessage*, ErtsMessage**, Uint); void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm); -Sint erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned); +void erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned); void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp); Uint erts_msg_attached_data_size_aux(ErtsMessage *msg); diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c index 260656e2d3..e208792868 100644 --- a/erts/emulator/beam/erl_nif.c +++ b/erts/emulator/beam/erl_nif.c @@ -665,7 +665,7 @@ int erts_flush_trace_messages(Process *c_p, ErtsProcLocks c_p_locks) rp_locks = 0; if (rp->common.id == c_p->common.id) rp_locks = c_p_locks; - erts_queue_messages(rp, rp_locks, first, last, len); + erts_queue_proc_messages(c_p, rp, rp_locks, first, last, len); if (rp->common.id == c_p->common.id) rp_locks &= ~c_p_locks; if (rp_locks) @@ -856,7 +856,10 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid, } } - erts_queue_message(rp, rp_locks, mp, msg, from); + if (c_p) + erts_queue_proc_message(c_p, rp, rp_locks, mp, msg); + else + erts_queue_message(rp, rp_locks, mp, msg, from); done: if (c_p == rp) diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 6e940eb3f7..a2e6f1d39d 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -308,9 +308,8 @@ destroy_sig_group_leader(ErtsSigGroupLeader *sgl) } static ERTS_INLINE void -sig_enqueue_trace(Process *c_p, ErtsMessage *sig, int op, - Process *rp, ErtsMessage **first, - ErtsMessage **last, ErtsMessage ***last_next) +sig_enqueue_trace(Process *c_p, ErtsMessage **sigp, int op, + Process *rp, ErtsMessage ***last_next) { switch (op) { case ERTS_SIG_Q_OP_LINK: @@ -326,12 +325,11 @@ sig_enqueue_trace(Process *c_p, ErtsMessage *sig, int op, * Prepend a trace-change-state signal before the * link signal... */ - tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_TRACE_CHANGE_STATE, ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO, 0); ti = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSigTraceInfo)); - ti->common.next = *last; + ti->common.next = *sigp; ti->common.specific.next = &ti->common.next; ti->common.tag = tag; ti->flags_on = ERTS_TRACE_FLAGS(c_p) & TRACEE_FLAGS; @@ -344,8 +342,9 @@ sig_enqueue_trace(Process *c_p, ErtsMessage *sig, int op, erts_proc_unlock(c_p, ERTS_PROC_LOCKS_ALL_MINOR); } erts_tracer_update(&ti->tracer, ERTS_TRACER(c_p)); - *first = (ErtsMessage *) ti; - *last_next = &ti->common.next; + *sigp = (ErtsMessage *) ti; + if (!*last_next || *last_next == sigp) + *last_next = &ti->common.next; } break; @@ -354,6 +353,7 @@ sig_enqueue_trace(Process *c_p, ErtsMessage *sig, int op, case ERTS_SIG_Q_OP_EXIT_LINKED: if (DTRACE_ENABLED(process_exit_signal)) { + ErtsMessage* sig = *sigp; Uint16 type = ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag); Eterm reason, from; @@ -430,9 +430,24 @@ sig_enqueue_trace_cleanup(ErtsMessage *first, ErtsSignal *sig, ErtsMessage *last } } +#ifdef DEBUG +static int dbg_count_nmsigs(ErtsMessage *first) +{ + ErtsMessage *sig; + int cnt = 0; + + for (sig = first; sig; sig = sig->next) { + if (ERTS_SIG_IS_NON_MSG(sig)) + ++cnt; + } + return cnt; +} +#endif + static ERTS_INLINE erts_aint32_t enqueue_signals(Process *rp, ErtsMessage *first, - ErtsMessage *last, ErtsMessage **last_next, + ErtsMessage **last, ErtsMessage **last_next, + Uint num_msgs, erts_aint32_t in_state) { erts_aint32_t state = in_state; @@ -442,13 +457,23 @@ enqueue_signals(Process *rp, ErtsMessage *first, ASSERT(!*this); *this = first; - rp->sig_inq.last = &last->next; + rp->sig_inq.last = last; if (!rp->sig_inq.nmsigs.next) { ASSERT(!rp->sig_inq.nmsigs.last); - rp->sig_inq.nmsigs.next = this; + if (ERTS_SIG_IS_NON_MSG(first)) { + rp->sig_inq.nmsigs.next = this; + } + else if (last_next) { + ASSERT(first->next && ERTS_SIG_IS_NON_MSG(first->next)); + rp->sig_inq.nmsigs.next = &first->next; + } + else + goto no_nmsig; + state = erts_atomic32_read_bor_nob(&rp->state, ERTS_PSFLG_SIG_IN_Q); + no_nmsig: ASSERT(!(state & ERTS_PSFLG_SIG_IN_Q)); } else { @@ -459,23 +484,41 @@ enqueue_signals(Process *rp, ErtsMessage *first, ASSERT(sig && !sig->common.specific.next); ASSERT(state & ERTS_PSFLG_SIG_IN_Q); - sig->common.specific.next = this; + if (ERTS_SIG_IS_NON_MSG(first)) { + sig->common.specific.next = this; + } + else if (last_next) { + ASSERT(first->next && ERTS_SIG_IS_NON_MSG(first->next)); + sig->common.specific.next = &first->next; + } } if (last_next) { - ASSERT(first != last); + ASSERT(dbg_count_nmsigs(first) >= 2); rp->sig_inq.nmsigs.last = last_next; } - else { - ASSERT(first == last); + else if (ERTS_SIG_IS_NON_MSG(first)) { + ASSERT(dbg_count_nmsigs(first) == 1); rp->sig_inq.nmsigs.last = this; } + else + ASSERT(dbg_count_nmsigs(first) == 0); + + rp->sig_inq.len += num_msgs; ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp); return state; } +erts_aint32_t erts_enqueue_signals(Process *rp, ErtsMessage *first, + ErtsMessage **last, ErtsMessage **last_next, + Uint num_msgs, + erts_aint32_t in_state) +{ + return enqueue_signals(rp, first, last, last_next, num_msgs, in_state); +} + static ERTS_INLINE void ensure_dirty_proc_handled(Eterm pid, erts_aint32_t state, @@ -516,26 +559,92 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) { int res; Process *rp; - ErtsMessage *first, *last, **last_next; + ErtsMessage *first, *last, **last_next, **sigp; ErtsSchedulerData *esdp = erts_get_scheduler_data(); int is_normal_sched = !!esdp && esdp->type == ERTS_SCHED_NORMAL; erts_aint32_t state; + ErtsSignal *pend_sig; - if (is_normal_sched) - rp = erts_proc_lookup_raw(pid); - else - rp = erts_proc_lookup_raw_inc_refc(pid); + if (is_normal_sched) { + pend_sig = esdp->pending_signal.sig; + if (op == ERTS_SIG_Q_OP_MONITOR + && ((ErtsMonitor*)sig)->type == ERTS_MON_TYPE_PROC) { - if (!rp) - return 0; + if (!pend_sig) { + esdp->pending_signal.sig = sig; + esdp->pending_signal.to = pid; +#ifdef DEBUG + esdp->pending_signal.dbg_from = esdp->current_process; +#endif + return 1; + } + ASSERT(esdp->pending_signal.dbg_from == esdp->current_process); + if (pend_sig != sig) { + /* Switch them and send previously pending signal instead */ + Eterm pend_to = esdp->pending_signal.to; + esdp->pending_signal.sig = sig; + esdp->pending_signal.to = pid; + sig = pend_sig; + pid = pend_to; + } + else { + /* Caller wants to flush pending signal */ + ASSERT(pid == esdp->pending_signal.to); + esdp->pending_signal.sig = NULL; + esdp->pending_signal.to = THE_NON_VALUE; +#ifdef DEBUG + esdp->pending_signal.dbg_from = NULL; +#endif + pend_sig = NULL; + } + rp = erts_proc_lookup_raw(pid); + if (!rp) { + erts_proc_sig_send_monitor_down((ErtsMonitor*)sig, am_noproc); + return 1; + } + } + else if (pend_sig && pid == esdp->pending_signal.to) { + /* Flush pending signal to maintain signal order */ + esdp->pending_signal.sig = NULL; + esdp->pending_signal.to = THE_NON_VALUE; + + rp = erts_proc_lookup_raw(pid); + if (!rp) { + erts_proc_sig_send_monitor_down((ErtsMonitor*)pend_sig, am_noproc); + return 0; + } + + /* Prepend pending signal */ + pend_sig->common.next = (ErtsMessage*) sig; + pend_sig->common.specific.next = &pend_sig->common.next; + first = (ErtsMessage*) pend_sig; + last = (ErtsMessage*) sig; + sigp = last_next = &pend_sig->common.next; + goto first_last_done; + } + else { + pend_sig = NULL; + rp = erts_proc_lookup_raw(pid); + if (!rp) + return 0; + } + } + else { + rp = erts_proc_lookup_raw_inc_refc(pid); + if (!rp) + return 0; + pend_sig = NULL; + } - sig->common.specific.next = NULL; first = last = (ErtsMessage *) sig; last_next = NULL; + sigp = &first; + +first_last_done: + sig->common.specific.next = NULL; /* may add signals before and/or after sig */ - sig_enqueue_trace(c_p, first, op, rp, - &first, &last, &last_next); + sig_enqueue_trace(c_p, sigp, op, rp, &last_next); last->next = NULL; @@ -546,7 +655,7 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) if (ERTS_PSFLG_FREE & state) res = 0; else { - state = enqueue_signals(rp, first, last, last_next, state); + state = enqueue_signals(rp, first, &last->next, last_next, 0, state); if (ERTS_UNLIKELY(op == ERTS_SIG_Q_OP_PROCESS_INFO)) check_push_msgq_len_offs_marker(rp, sig); res = !0; @@ -554,8 +663,21 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) erts_proc_unlock(rp, ERTS_PROC_LOCK_MSGQ); - if (res == 0) + if (res == 0) { + if (pend_sig) { + if (sig == pend_sig) { + /* We did a switch, callers signal is now pending (still ok) */ + ASSERT(esdp->pending_signal.sig); + res = 1; + } + else { + ASSERT(first == (ErtsMessage*)pend_sig); + first = first->next; + } + erts_proc_sig_send_monitor_down((ErtsMonitor*)pend_sig, am_noproc); + } sig_enqueue_trace_cleanup(first, sig, last); + } if (!(state & (ERTS_PSFLG_EXITING | ERTS_PSFLG_ACTIVE_SYS @@ -572,6 +694,24 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) return res; } +void erts_proc_sig_send_pending(ErtsSchedulerData* esdp) +{ + ErtsSignal* sig = esdp->pending_signal.sig; + int op; + + ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL); + ASSERT(sig); + ASSERT(is_internal_pid(esdp->pending_signal.to)); + + op = ERTS_SIG_Q_OP_MONITOR; + ASSERT(op == ERTS_PROC_SIG_OP(sig->common.tag)); + + if (!proc_queue_signal(NULL, esdp->pending_signal.to, sig, op)) { + ErtsMonitor* mon = (ErtsMonitor*)sig; + erts_proc_sig_send_monitor_down(mon, am_noproc); + } +} + static int maybe_elevate_sig_handling_prio(Process *c_p, Eterm other) { @@ -1404,8 +1544,7 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref) /* It wasn't alive; reply to ourselves... */ mp->next = NULL; mp->data.attached = ERTS_MSG_COMBINED_HFRAG; - erts_queue_message(c_p, ERTS_PROC_LOCK_MAIN, - mp, msg, am_system); + erts_queue_message(c_p, ERTS_PROC_LOCK_MAIN, mp, msg, am_system); } } @@ -2490,7 +2629,7 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing, if (is_alive) erts_factory_trim_and_close(&hfact, &msg, 1); - erts_queue_message(rp, locks, mp, msg, c_p->common.id); + erts_queue_proc_message(c_p, rp, locks, mp, msg); if (!is_alive && locks) erts_proc_unlock(rp, locks); diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index d250ad820f..8b7cd35f61 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -733,6 +733,18 @@ Sint erts_proc_sig_privqs_len(Process *c_p); +/* SVERK: Doc me up! */ +erts_aint32_t +erts_enqueue_signals(Process *rp, ErtsMessage *first, + ErtsMessage **last, ErtsMessage **last_next, + Uint msg_cnt, + erts_aint32_t in_state); + +/* SVERK: Doc me up! */ +void +erts_proc_sig_send_pending(ErtsSchedulerData* esdp); + + typedef struct { Uint size; ErtsMessage *msgp; diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 650ec0958c..ad7ac27ac3 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -5715,6 +5715,12 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num, esdp->io.out = (Uint64) 0; esdp->io.in = (Uint64) 0; + esdp->pending_signal.sig = NULL; + esdp->pending_signal.to = THE_NON_VALUE; +#ifdef DEBUG + esdp->pending_signal.dbg_from = NULL; +#endif + if (daww_ptr) { init_aux_work_data(&esdp->aux_work_data, esdp, *daww_ptr); *daww_ptr += daww_sz; @@ -9674,8 +9680,13 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) } else { is_normal_sched = !esdp; if (is_normal_sched) { - esdp = p->scheduler_data; + esdp = p->scheduler_data; ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp)); + + if (esdp->pending_signal.sig) { + ASSERT(esdp->pending_signal.dbg_from == p); + erts_proc_sig_send_pending(esdp); + } } else { ASSERT(ERTS_SCHEDULER_IS_DIRTY(esdp)); @@ -10375,7 +10386,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st, ASSERT(hp_start + hsz == hp); #endif - erts_queue_message(rp, rp_locks, mp, msg, c_p->common.id); + erts_queue_proc_message(c_p, rp, rp_locks, mp, msg); if (c_p == rp) rp_locks &= ~ERTS_PROC_LOCK_MAIN; @@ -10937,7 +10948,7 @@ dispatch_system_task(Process *c_p, erts_aint_t fail_state, msg = copy_struct(operation, osz, &hp, ohp); msg = TUPLE4(hp, st->requester, target, prio, msg); - erts_queue_message(rp, rp_locks, mp, msg, st->requester); + erts_queue_message(rp, rp_locks, mp, msg, am_system); if (rp_locks) erts_proc_unlock(rp, rp_locks); @@ -11912,6 +11923,9 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->sig_inq.len = 0; p->sig_inq.nmsigs.next = NULL; p->sig_inq.nmsigs.last = NULL; +#ifdef ERTS_PROC_SIG_HARD_DEBUG + p->sig_inq.may_contain_heap_terms = 0; +#endif p->bif_timers = NULL; p->mbuf = NULL; p->msg_frag = NULL; @@ -12119,6 +12133,9 @@ void erts_init_empty_process(Process *p) p->sig_inq.len = 0; p->sig_inq.nmsigs.next = NULL; p->sig_inq.nmsigs.last = NULL; +#ifdef ERTS_PROC_SIG_HARD_DEBUG + p->sig_inq.may_contain_heap_terms = 0; +#endif p->bif_timers = NULL; p->dictionary = NULL; p->seq_trace_clock = 0; diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index e2aa1d9f84..b66272194c 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -660,6 +660,13 @@ struct ErtsSchedulerData_ { Uint64 out; Uint64 in; } io; + struct { + ErtsSignal* sig; + Eterm to; +#ifdef DEBUG + Process* dbg_from; +#endif + } pending_signal; Uint64 reductions; ErtsSchedWallTime sched_wall_time; |