aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r--erts/emulator/beam/beam_bif_load.c6
-rw-r--r--erts/emulator/beam/beam_debug.c2
-rw-r--r--erts/emulator/beam/dist.c2
-rw-r--r--erts/emulator/beam/erl_db.c2
-rw-r--r--erts/emulator/beam/erl_message.c144
-rw-r--r--erts/emulator/beam/erl_message.h27
-rw-r--r--erts/emulator/beam/erl_nif.c7
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c197
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.h12
-rw-r--r--erts/emulator/beam/erl_process.c23
-rw-r--r--erts/emulator/beam/erl_process.h7
11 files changed, 318 insertions, 111 deletions
diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c
index 5c76aafae7..d9312f4df8 100644
--- a/erts/emulator/beam/beam_bif_load.c
+++ b/erts/emulator/beam/beam_bif_load.c
@@ -1757,11 +1757,11 @@ BIF_RETTYPE erts_internal_purge_module_2(BIF_ALIST_2)
release_literal_areas.last = ref;
}
erts_mtx_unlock(&release_literal_areas.mtx);
- erts_queue_message(erts_literal_area_collector,
+ erts_queue_proc_message(BIF_P,
+ erts_literal_area_collector,
0,
erts_alloc_message(0, NULL),
- am_copy_literals,
- BIF_P->common.id);
+ am_copy_literals);
}
return ret;
diff --git a/erts/emulator/beam/beam_debug.c b/erts/emulator/beam/beam_debug.c
index 5eb68b817e..b8a8d06315 100644
--- a/erts/emulator/beam/beam_debug.c
+++ b/erts/emulator/beam/beam_debug.c
@@ -1191,7 +1191,7 @@ dirty_send_message(Process *c_p, Eterm to, Eterm tag)
mp = erts_alloc_message_heap(rp, &rp_locks, 3, &hp, &ohp);
msg = TUPLE2(hp, tag, c_p->common.id);
- erts_queue_message(rp, rp_locks, mp, msg, c_p->common.id);
+ erts_queue_proc_message(c_p, rp, rp_locks, mp, msg);
if (rp == real_c_p)
rp_locks &= ~c_p_locks;
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 24e1cac7bf..026f0a62d4 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -3587,7 +3587,7 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks)
dhandle = erts_build_dhandle(&hp, ohp, dep);
msg = TUPLE4(hp, am_auto_connect, dep->sysname, make_small(conn_id),
dhandle);
- erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id);
+ erts_queue_proc_message(proc, net_kernel, nk_locks, mp, msg);
erts_proc_unlock(net_kernel, nk_locks);
}
diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c
index f7ee408991..ca2ebb7c27 100644
--- a/erts/emulator/beam/erl_db.c
+++ b/erts/emulator/beam/erl_db.c
@@ -3547,7 +3547,7 @@ send_ets_transfer_message(Process *c_p, Process *proc,
hd_copy = copy_struct(heir_data, hd_sz, &hp, ohp);
sender = c_p->common.id;
msg = TUPLE4(hp, am_ETS_TRANSFER, tid, sender, hd_copy);
- erts_queue_message(proc, *locks, mp, msg, sender);
+ erts_queue_proc_message(c_p, proc, *locks, mp, msg);
}
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 8eae85ccdd..bea7a0fe86 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -303,8 +303,7 @@ erts_queue_dist_message(Process *rcvr,
erts_cleanup_messages(mp);
}
else {
-
- LINK_MESSAGE(rcvr, mp, &mp->next, 1);
+ LINK_MESSAGE(rcvr, mp);
if (rcvr_locks & ERTS_PROC_LOCK_MAIN)
erts_proc_sig_fetch(rcvr);
@@ -317,9 +316,8 @@ erts_queue_dist_message(Process *rcvr,
}
/* Add messages last in message queue */
-static Sint
+static void
queue_messages(Process* receiver,
- erts_aint32_t *receiver_state,
ErtsProcLocks receiver_locks,
ErtsMessage* first,
ErtsMessage** last,
@@ -328,51 +326,51 @@ queue_messages(Process* receiver,
int locked_msgq = 0;
erts_aint32_t state;
- ASSERT(is_value(ERL_MESSAGE_TERM(first)));
- ASSERT(is_value(ERL_MESSAGE_FROM(first)));
- ASSERT(ERL_MESSAGE_TOKEN(first) == am_undefined ||
- ERL_MESSAGE_TOKEN(first) == NIL ||
- is_tuple(ERL_MESSAGE_TOKEN(first)));
-
-#ifdef ERTS_ENABLE_LOCK_CHECK
- ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(receiver) < ERTS_PROC_LOCK_MSGQ ||
- receiver_locks == erts_proc_lc_my_proc_locks(receiver));
+#ifdef DEBUG
+ {
+ ErtsMessage* fmsg = ERTS_SIG_IS_MSG(first) ? first : first->next;
+ ASSERT(fmsg);
+ ASSERT(is_value(ERL_MESSAGE_TERM(fmsg)));
+ ASSERT(is_value(ERL_MESSAGE_FROM(fmsg)));
+ ASSERT(ERL_MESSAGE_TOKEN(fmsg) == am_undefined ||
+ ERL_MESSAGE_TOKEN(fmsg) == NIL ||
+ is_tuple(ERL_MESSAGE_TOKEN(fmsg)));
+ }
#endif
- if (!(receiver_locks & ERTS_PROC_LOCK_MSGQ)) {
- if (erts_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
- ErtsProcLocks need_locks;
+ ERTS_LC_ASSERT((erts_proc_lc_my_proc_locks(receiver) & ERTS_PROC_LOCK_MSGQ)
+ == (receiver_locks & ERTS_PROC_LOCK_MSGQ));
- if (receiver_state)
- state = *receiver_state;
- else
- state = erts_atomic32_read_nob(&receiver->state);
- if (state & ERTS_PSFLG_EXITING)
- goto exiting;
-
- need_locks = receiver_locks & ERTS_PROC_LOCKS_HIGHER_THAN(ERTS_PROC_LOCK_MSGQ);
- if (need_locks) {
- erts_proc_unlock(receiver, need_locks);
- }
- need_locks |= ERTS_PROC_LOCK_MSGQ;
- erts_proc_lock(receiver, need_locks);
- }
+ if (!(receiver_locks & ERTS_PROC_LOCK_MSGQ)) {
+ erts_proc_lock(receiver, ERTS_PROC_LOCK_MSGQ);
locked_msgq = 1;
}
-
state = erts_atomic32_read_nob(&receiver->state);
if (state & ERTS_PSFLG_EXITING) {
- exiting:
/* Drop message if receiver is exiting or has a pending exit... */
if (locked_msgq)
- erts_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
+ if (ERTS_SIG_IS_NON_MSG(first)) {
+ ErtsSchedulerData* esdp = erts_get_scheduler_data();
+ ASSERT(esdp);
+ ASSERT(!esdp->pending_signal.sig);
+ esdp->pending_signal.sig = (ErtsSignal*) first;
+ esdp->pending_signal.to = receiver->common.id;
+ first = first->next;
+ }
erts_cleanup_messages(first);
- return 0;
+ return;
}
- LINK_MESSAGE(receiver, first, last, len);
+ if (last == &first->next) {
+ ASSERT(len == 1);
+ LINK_MESSAGE(receiver, first);
+ }
+ else {
+ erts_enqueue_signals(receiver, first, last, NULL, len, state);
+ }
if (receiver_locks & ERTS_PROC_LOCK_MAIN)
erts_proc_sig_fetch(receiver);
@@ -382,35 +380,67 @@ queue_messages(Process* receiver,
}
erts_proc_notify_new_message(receiver, receiver_locks);
- return 0;
}
-static Sint
-queue_message(Process* receiver,
- erts_aint32_t *receiver_state,
- ErtsProcLocks receiver_locks,
- ErtsMessage* mp, Eterm msg, Eterm from)
+static ERTS_INLINE
+ErtsMessage* prepend_pending_sig_maybe(Process* sender, Process* receiver,
+ ErtsMessage* mp)
{
- ERL_MESSAGE_TERM(mp) = msg;
- ERL_MESSAGE_FROM(mp) = from;
- return queue_messages(receiver, receiver_state, receiver_locks,
- mp, &mp->next, 1);
+ ErtsSchedulerData* esdp = sender->scheduler_data;
+ ErtsSignal* pend_sig;
+
+ if (!esdp || esdp->pending_signal.to != receiver->common.id)
+ return mp;
+
+ pend_sig = esdp->pending_signal.sig;
+
+ ASSERT(esdp->pending_signal.dbg_from == sender);
+ esdp->pending_signal.sig = NULL;
+ esdp->pending_signal.to = THE_NON_VALUE;
+ pend_sig->common.next = mp;
+ pend_sig->common.specific.next = NULL;
+ return (ErtsMessage*) pend_sig;
}
-Sint
+/**
+ *
+ * @brief Send one message from *NOT* a local process.
+ *
+ */
+void
erts_queue_message(Process* receiver, ErtsProcLocks receiver_locks,
ErtsMessage* mp, Eterm msg, Eterm from)
{
- return queue_message(receiver, NULL, receiver_locks, mp, msg, from);
+ ASSERT(is_not_internal_pid(from));
+ ERL_MESSAGE_TERM(mp) = msg;
+ ERL_MESSAGE_FROM(mp) = from;
+ queue_messages(receiver, receiver_locks, mp, &mp->next, 1);
}
+/**
+ * @brief Send one message from a local process.
+ */
+void
+erts_queue_proc_message(Process* sender,
+ Process* receiver, ErtsProcLocks receiver_locks,
+ ErtsMessage* mp, Eterm msg)
+{
+ ERL_MESSAGE_TERM(mp) = msg;
+ ERL_MESSAGE_FROM(mp) = sender->common.id;
+ queue_messages(receiver, receiver_locks,
+ prepend_pending_sig_maybe(sender, receiver, mp),
+ &mp->next, 1);
+}
-Sint
-erts_queue_messages(Process* receiver, ErtsProcLocks receiver_locks,
- ErtsMessage* first, ErtsMessage** last, Uint len)
+
+void
+erts_queue_proc_messages(Process* sender,
+ Process* receiver, ErtsProcLocks receiver_locks,
+ ErtsMessage* first, ErtsMessage** last, Uint len)
{
- return queue_messages(receiver, NULL, receiver_locks,
- first, last, len);
+ queue_messages(receiver, receiver_locks,
+ prepend_pending_sig_maybe(sender, receiver, first),
+ last, len);
}
void
@@ -547,7 +577,7 @@ erts_try_alloc_message_on_heap(Process *pp,
* Send a local message when sender & receiver processes are known.
*/
-Sint
+void
erts_send_message(Process* sender,
Process* receiver,
ErtsProcLocks *receiver_locks,
@@ -558,7 +588,6 @@ erts_send_message(Process* sender,
ErtsMessage* mp;
ErlOffHeap *ohp;
Eterm token = NIL;
- Sint res = 0;
#ifdef USE_VM_PROBES
DTRACE_CHARBUF(sender_name, 64);
DTRACE_CHARBUF(receiver_name, 64);
@@ -701,13 +730,8 @@ erts_send_message(Process* sender,
#ifdef USE_VM_PROBES
ERL_MESSAGE_DT_UTAG(mp) = utag;
#endif
- res = queue_message(receiver,
- &receiver_state,
- *receiver_locks,
- mp, message,
- sender->common.id);
- return res;
+ erts_queue_proc_message(sender, receiver, *receiver_locks, mp, message);
}
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h
index ee87297ba4..d120111634 100644
--- a/erts/emulator/beam/erl_message.h
+++ b/erts/emulator/beam/erl_message.h
@@ -229,7 +229,7 @@ typedef union {
typedef struct {
/* pointers to next pointers pointing to... */
ErtsMessage **next; /* ... next (non-message) signal */
- ErtsMessage **last; /* ... next (non-message) signal */
+ ErtsMessage **last; /* ... last (non-message) signal */
} ErtsMsgQNMSigs;
/* Size of default message buffer (erl_message.c) */
@@ -296,8 +296,11 @@ typedef struct {
typedef struct {
ErtsMessage* first;
ErtsMessage** last; /* point to the last next pointer */
- Sint len; /* queue length */
+ Sint len; /* number of messages in queue */
ErtsMsgQNMSigs nmsigs;
+#ifdef ERTS_PROC_SIG_HARD_DEBUG
+ int may_contain_heap_terms;
+#endif
} ErtsSignalInQueue;
typedef struct erl_trace_message_queue__ {
@@ -364,13 +367,14 @@ typedef struct erl_trace_message_queue__ {
#endif
-/* Add message last_msg in message queue */
-#define LINK_MESSAGE(p, first_msg, last_msg, num_msgs) \
+/* Add one message last in message queue */
+#define LINK_MESSAGE(p, msg) \
do { \
+ ASSERT(ERTS_SIG_IS_MSG(msg)); \
ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE__((p), "before"); \
- *(p)->sig_inq.last = (first_msg); \
- (p)->sig_inq.last = (last_msg); \
- (p)->sig_inq.len += (num_msgs); \
+ *(p)->sig_inq.last = (msg); \
+ (p)->sig_inq.last = &(msg)->next; \
+ (p)->sig_inq.len++; \
ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE__((p), "before"); \
} while(0)
@@ -437,11 +441,12 @@ ErlHeapFragment* erts_resize_message_buffer(ErlHeapFragment *, Uint,
Eterm *, Uint);
void free_message_buffer(ErlHeapFragment *);
void erts_queue_dist_message(Process*, ErtsProcLocks, ErtsDistExternal *, Eterm, Eterm);
-Sint erts_queue_message(Process*, ErtsProcLocks,ErtsMessage*, Eterm, Eterm);
-Sint erts_queue_messages(Process*, ErtsProcLocks,
- ErtsMessage*, ErtsMessage**, Uint);
+void erts_queue_message(Process*, ErtsProcLocks,ErtsMessage*, Eterm, Eterm);
+void erts_queue_proc_message(Process* from,Process* to, ErtsProcLocks,ErtsMessage*, Eterm);
+void erts_queue_proc_messages(Process* from, Process* to, ErtsProcLocks,
+ ErtsMessage*, ErtsMessage**, Uint);
void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm);
-Sint erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned);
+void erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned);
void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp);
Uint erts_msg_attached_data_size_aux(ErtsMessage *msg);
diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c
index 260656e2d3..e208792868 100644
--- a/erts/emulator/beam/erl_nif.c
+++ b/erts/emulator/beam/erl_nif.c
@@ -665,7 +665,7 @@ int erts_flush_trace_messages(Process *c_p, ErtsProcLocks c_p_locks)
rp_locks = 0;
if (rp->common.id == c_p->common.id)
rp_locks = c_p_locks;
- erts_queue_messages(rp, rp_locks, first, last, len);
+ erts_queue_proc_messages(c_p, rp, rp_locks, first, last, len);
if (rp->common.id == c_p->common.id)
rp_locks &= ~c_p_locks;
if (rp_locks)
@@ -856,7 +856,10 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
}
}
- erts_queue_message(rp, rp_locks, mp, msg, from);
+ if (c_p)
+ erts_queue_proc_message(c_p, rp, rp_locks, mp, msg);
+ else
+ erts_queue_message(rp, rp_locks, mp, msg, from);
done:
if (c_p == rp)
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 6e940eb3f7..a2e6f1d39d 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -308,9 +308,8 @@ destroy_sig_group_leader(ErtsSigGroupLeader *sgl)
}
static ERTS_INLINE void
-sig_enqueue_trace(Process *c_p, ErtsMessage *sig, int op,
- Process *rp, ErtsMessage **first,
- ErtsMessage **last, ErtsMessage ***last_next)
+sig_enqueue_trace(Process *c_p, ErtsMessage **sigp, int op,
+ Process *rp, ErtsMessage ***last_next)
{
switch (op) {
case ERTS_SIG_Q_OP_LINK:
@@ -326,12 +325,11 @@ sig_enqueue_trace(Process *c_p, ErtsMessage *sig, int op,
* Prepend a trace-change-state signal before the
* link signal...
*/
-
tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_TRACE_CHANGE_STATE,
ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO,
0);
ti = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSigTraceInfo));
- ti->common.next = *last;
+ ti->common.next = *sigp;
ti->common.specific.next = &ti->common.next;
ti->common.tag = tag;
ti->flags_on = ERTS_TRACE_FLAGS(c_p) & TRACEE_FLAGS;
@@ -344,8 +342,9 @@ sig_enqueue_trace(Process *c_p, ErtsMessage *sig, int op,
erts_proc_unlock(c_p, ERTS_PROC_LOCKS_ALL_MINOR);
}
erts_tracer_update(&ti->tracer, ERTS_TRACER(c_p));
- *first = (ErtsMessage *) ti;
- *last_next = &ti->common.next;
+ *sigp = (ErtsMessage *) ti;
+ if (!*last_next || *last_next == sigp)
+ *last_next = &ti->common.next;
}
break;
@@ -354,6 +353,7 @@ sig_enqueue_trace(Process *c_p, ErtsMessage *sig, int op,
case ERTS_SIG_Q_OP_EXIT_LINKED:
if (DTRACE_ENABLED(process_exit_signal)) {
+ ErtsMessage* sig = *sigp;
Uint16 type = ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag);
Eterm reason, from;
@@ -430,9 +430,24 @@ sig_enqueue_trace_cleanup(ErtsMessage *first, ErtsSignal *sig, ErtsMessage *last
}
}
+#ifdef DEBUG
+static int dbg_count_nmsigs(ErtsMessage *first)
+{
+ ErtsMessage *sig;
+ int cnt = 0;
+
+ for (sig = first; sig; sig = sig->next) {
+ if (ERTS_SIG_IS_NON_MSG(sig))
+ ++cnt;
+ }
+ return cnt;
+}
+#endif
+
static ERTS_INLINE erts_aint32_t
enqueue_signals(Process *rp, ErtsMessage *first,
- ErtsMessage *last, ErtsMessage **last_next,
+ ErtsMessage **last, ErtsMessage **last_next,
+ Uint num_msgs,
erts_aint32_t in_state)
{
erts_aint32_t state = in_state;
@@ -442,13 +457,23 @@ enqueue_signals(Process *rp, ErtsMessage *first,
ASSERT(!*this);
*this = first;
- rp->sig_inq.last = &last->next;
+ rp->sig_inq.last = last;
if (!rp->sig_inq.nmsigs.next) {
ASSERT(!rp->sig_inq.nmsigs.last);
- rp->sig_inq.nmsigs.next = this;
+ if (ERTS_SIG_IS_NON_MSG(first)) {
+ rp->sig_inq.nmsigs.next = this;
+ }
+ else if (last_next) {
+ ASSERT(first->next && ERTS_SIG_IS_NON_MSG(first->next));
+ rp->sig_inq.nmsigs.next = &first->next;
+ }
+ else
+ goto no_nmsig;
+
state = erts_atomic32_read_bor_nob(&rp->state,
ERTS_PSFLG_SIG_IN_Q);
+ no_nmsig:
ASSERT(!(state & ERTS_PSFLG_SIG_IN_Q));
}
else {
@@ -459,23 +484,41 @@ enqueue_signals(Process *rp, ErtsMessage *first,
ASSERT(sig && !sig->common.specific.next);
ASSERT(state & ERTS_PSFLG_SIG_IN_Q);
- sig->common.specific.next = this;
+ if (ERTS_SIG_IS_NON_MSG(first)) {
+ sig->common.specific.next = this;
+ }
+ else if (last_next) {
+ ASSERT(first->next && ERTS_SIG_IS_NON_MSG(first->next));
+ sig->common.specific.next = &first->next;
+ }
}
if (last_next) {
- ASSERT(first != last);
+ ASSERT(dbg_count_nmsigs(first) >= 2);
rp->sig_inq.nmsigs.last = last_next;
}
- else {
- ASSERT(first == last);
+ else if (ERTS_SIG_IS_NON_MSG(first)) {
+ ASSERT(dbg_count_nmsigs(first) == 1);
rp->sig_inq.nmsigs.last = this;
}
+ else
+ ASSERT(dbg_count_nmsigs(first) == 0);
+
+ rp->sig_inq.len += num_msgs;
ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp);
return state;
}
+erts_aint32_t erts_enqueue_signals(Process *rp, ErtsMessage *first,
+ ErtsMessage **last, ErtsMessage **last_next,
+ Uint num_msgs,
+ erts_aint32_t in_state)
+{
+ return enqueue_signals(rp, first, last, last_next, num_msgs, in_state);
+}
+
static ERTS_INLINE void
ensure_dirty_proc_handled(Eterm pid,
erts_aint32_t state,
@@ -516,26 +559,92 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op)
{
int res;
Process *rp;
- ErtsMessage *first, *last, **last_next;
+ ErtsMessage *first, *last, **last_next, **sigp;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
int is_normal_sched = !!esdp && esdp->type == ERTS_SCHED_NORMAL;
erts_aint32_t state;
+ ErtsSignal *pend_sig;
- if (is_normal_sched)
- rp = erts_proc_lookup_raw(pid);
- else
- rp = erts_proc_lookup_raw_inc_refc(pid);
+ if (is_normal_sched) {
+ pend_sig = esdp->pending_signal.sig;
+ if (op == ERTS_SIG_Q_OP_MONITOR
+ && ((ErtsMonitor*)sig)->type == ERTS_MON_TYPE_PROC) {
- if (!rp)
- return 0;
+ if (!pend_sig) {
+ esdp->pending_signal.sig = sig;
+ esdp->pending_signal.to = pid;
+#ifdef DEBUG
+ esdp->pending_signal.dbg_from = esdp->current_process;
+#endif
+ return 1;
+ }
+ ASSERT(esdp->pending_signal.dbg_from == esdp->current_process);
+ if (pend_sig != sig) {
+ /* Switch them and send previously pending signal instead */
+ Eterm pend_to = esdp->pending_signal.to;
+ esdp->pending_signal.sig = sig;
+ esdp->pending_signal.to = pid;
+ sig = pend_sig;
+ pid = pend_to;
+ }
+ else {
+ /* Caller wants to flush pending signal */
+ ASSERT(pid == esdp->pending_signal.to);
+ esdp->pending_signal.sig = NULL;
+ esdp->pending_signal.to = THE_NON_VALUE;
+#ifdef DEBUG
+ esdp->pending_signal.dbg_from = NULL;
+#endif
+ pend_sig = NULL;
+ }
+ rp = erts_proc_lookup_raw(pid);
+ if (!rp) {
+ erts_proc_sig_send_monitor_down((ErtsMonitor*)sig, am_noproc);
+ return 1;
+ }
+ }
+ else if (pend_sig && pid == esdp->pending_signal.to) {
+ /* Flush pending signal to maintain signal order */
+ esdp->pending_signal.sig = NULL;
+ esdp->pending_signal.to = THE_NON_VALUE;
+
+ rp = erts_proc_lookup_raw(pid);
+ if (!rp) {
+ erts_proc_sig_send_monitor_down((ErtsMonitor*)pend_sig, am_noproc);
+ return 0;
+ }
+
+ /* Prepend pending signal */
+ pend_sig->common.next = (ErtsMessage*) sig;
+ pend_sig->common.specific.next = &pend_sig->common.next;
+ first = (ErtsMessage*) pend_sig;
+ last = (ErtsMessage*) sig;
+ sigp = last_next = &pend_sig->common.next;
+ goto first_last_done;
+ }
+ else {
+ pend_sig = NULL;
+ rp = erts_proc_lookup_raw(pid);
+ if (!rp)
+ return 0;
+ }
+ }
+ else {
+ rp = erts_proc_lookup_raw_inc_refc(pid);
+ if (!rp)
+ return 0;
+ pend_sig = NULL;
+ }
- sig->common.specific.next = NULL;
first = last = (ErtsMessage *) sig;
last_next = NULL;
+ sigp = &first;
+
+first_last_done:
+ sig->common.specific.next = NULL;
/* may add signals before and/or after sig */
- sig_enqueue_trace(c_p, first, op, rp,
- &first, &last, &last_next);
+ sig_enqueue_trace(c_p, sigp, op, rp, &last_next);
last->next = NULL;
@@ -546,7 +655,7 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op)
if (ERTS_PSFLG_FREE & state)
res = 0;
else {
- state = enqueue_signals(rp, first, last, last_next, state);
+ state = enqueue_signals(rp, first, &last->next, last_next, 0, state);
if (ERTS_UNLIKELY(op == ERTS_SIG_Q_OP_PROCESS_INFO))
check_push_msgq_len_offs_marker(rp, sig);
res = !0;
@@ -554,8 +663,21 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op)
erts_proc_unlock(rp, ERTS_PROC_LOCK_MSGQ);
- if (res == 0)
+ if (res == 0) {
+ if (pend_sig) {
+ if (sig == pend_sig) {
+ /* We did a switch, callers signal is now pending (still ok) */
+ ASSERT(esdp->pending_signal.sig);
+ res = 1;
+ }
+ else {
+ ASSERT(first == (ErtsMessage*)pend_sig);
+ first = first->next;
+ }
+ erts_proc_sig_send_monitor_down((ErtsMonitor*)pend_sig, am_noproc);
+ }
sig_enqueue_trace_cleanup(first, sig, last);
+ }
if (!(state & (ERTS_PSFLG_EXITING
| ERTS_PSFLG_ACTIVE_SYS
@@ -572,6 +694,24 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op)
return res;
}
+void erts_proc_sig_send_pending(ErtsSchedulerData* esdp)
+{
+ ErtsSignal* sig = esdp->pending_signal.sig;
+ int op;
+
+ ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL);
+ ASSERT(sig);
+ ASSERT(is_internal_pid(esdp->pending_signal.to));
+
+ op = ERTS_SIG_Q_OP_MONITOR;
+ ASSERT(op == ERTS_PROC_SIG_OP(sig->common.tag));
+
+ if (!proc_queue_signal(NULL, esdp->pending_signal.to, sig, op)) {
+ ErtsMonitor* mon = (ErtsMonitor*)sig;
+ erts_proc_sig_send_monitor_down(mon, am_noproc);
+ }
+}
+
static int
maybe_elevate_sig_handling_prio(Process *c_p, Eterm other)
{
@@ -1404,8 +1544,7 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref)
/* It wasn't alive; reply to ourselves... */
mp->next = NULL;
mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
- erts_queue_message(c_p, ERTS_PROC_LOCK_MAIN,
- mp, msg, am_system);
+ erts_queue_message(c_p, ERTS_PROC_LOCK_MAIN, mp, msg, am_system);
}
}
@@ -2490,7 +2629,7 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing,
if (is_alive)
erts_factory_trim_and_close(&hfact, &msg, 1);
- erts_queue_message(rp, locks, mp, msg, c_p->common.id);
+ erts_queue_proc_message(c_p, rp, locks, mp, msg);
if (!is_alive && locks)
erts_proc_unlock(rp, locks);
diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h
index d250ad820f..8b7cd35f61 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.h
+++ b/erts/emulator/beam/erl_proc_sig_queue.h
@@ -733,6 +733,18 @@ Sint
erts_proc_sig_privqs_len(Process *c_p);
+/* SVERK: Doc me up! */
+erts_aint32_t
+erts_enqueue_signals(Process *rp, ErtsMessage *first,
+ ErtsMessage **last, ErtsMessage **last_next,
+ Uint msg_cnt,
+ erts_aint32_t in_state);
+
+/* SVERK: Doc me up! */
+void
+erts_proc_sig_send_pending(ErtsSchedulerData* esdp);
+
+
typedef struct {
Uint size;
ErtsMessage *msgp;
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 650ec0958c..ad7ac27ac3 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -5715,6 +5715,12 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num,
esdp->io.out = (Uint64) 0;
esdp->io.in = (Uint64) 0;
+ esdp->pending_signal.sig = NULL;
+ esdp->pending_signal.to = THE_NON_VALUE;
+#ifdef DEBUG
+ esdp->pending_signal.dbg_from = NULL;
+#endif
+
if (daww_ptr) {
init_aux_work_data(&esdp->aux_work_data, esdp, *daww_ptr);
*daww_ptr += daww_sz;
@@ -9674,8 +9680,13 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
} else {
is_normal_sched = !esdp;
if (is_normal_sched) {
- esdp = p->scheduler_data;
+ esdp = p->scheduler_data;
ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
+
+ if (esdp->pending_signal.sig) {
+ ASSERT(esdp->pending_signal.dbg_from == p);
+ erts_proc_sig_send_pending(esdp);
+ }
}
else {
ASSERT(ERTS_SCHEDULER_IS_DIRTY(esdp));
@@ -10375,7 +10386,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st,
ASSERT(hp_start + hsz == hp);
#endif
- erts_queue_message(rp, rp_locks, mp, msg, c_p->common.id);
+ erts_queue_proc_message(c_p, rp, rp_locks, mp, msg);
if (c_p == rp)
rp_locks &= ~ERTS_PROC_LOCK_MAIN;
@@ -10937,7 +10948,7 @@ dispatch_system_task(Process *c_p, erts_aint_t fail_state,
msg = copy_struct(operation, osz, &hp, ohp);
msg = TUPLE4(hp, st->requester, target, prio, msg);
- erts_queue_message(rp, rp_locks, mp, msg, st->requester);
+ erts_queue_message(rp, rp_locks, mp, msg, am_system);
if (rp_locks)
erts_proc_unlock(rp, rp_locks);
@@ -11912,6 +11923,9 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->sig_inq.len = 0;
p->sig_inq.nmsigs.next = NULL;
p->sig_inq.nmsigs.last = NULL;
+#ifdef ERTS_PROC_SIG_HARD_DEBUG
+ p->sig_inq.may_contain_heap_terms = 0;
+#endif
p->bif_timers = NULL;
p->mbuf = NULL;
p->msg_frag = NULL;
@@ -12119,6 +12133,9 @@ void erts_init_empty_process(Process *p)
p->sig_inq.len = 0;
p->sig_inq.nmsigs.next = NULL;
p->sig_inq.nmsigs.last = NULL;
+#ifdef ERTS_PROC_SIG_HARD_DEBUG
+ p->sig_inq.may_contain_heap_terms = 0;
+#endif
p->bif_timers = NULL;
p->dictionary = NULL;
p->seq_trace_clock = 0;
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index e2aa1d9f84..b66272194c 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -660,6 +660,13 @@ struct ErtsSchedulerData_ {
Uint64 out;
Uint64 in;
} io;
+ struct {
+ ErtsSignal* sig;
+ Eterm to;
+#ifdef DEBUG
+ Process* dbg_from;
+#endif
+ } pending_signal;
Uint64 reductions;
ErtsSchedWallTime sched_wall_time;