aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_proc_sig_queue.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2018-04-13 13:36:29 +0200
committerRickard Green <[email protected]>2018-04-13 13:36:29 +0200
commitfa26cb5a3f7550d5779dce7de748bf91a4bc405e (patch)
tree75d1aa05f03495479348d5f957e4931118b69b2a /erts/emulator/beam/erl_proc_sig_queue.c
parent30fd5b9e98268305762b9647d1bf40df665ae54d (diff)
parent9f8a402cc3e49313089bb9e22bc625f07beea4ca (diff)
downloadotp-fa26cb5a3f7550d5779dce7de748bf91a4bc405e.tar.gz
otp-fa26cb5a3f7550d5779dce7de748bf91a4bc405e.tar.bz2
otp-fa26cb5a3f7550d5779dce7de748bf91a4bc405e.zip
Merge branch 'rickard/process_info/OTP-14966'
* rickard/process_info/OTP-14966: New process_info() implementation using signals
Diffstat (limited to 'erts/emulator/beam/erl_proc_sig_queue.c')
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c655
1 files changed, 500 insertions, 155 deletions
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index b32ba1b2e6..bcc4fc6d9b 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -49,7 +49,7 @@
* Note that not all signal are handled using this functionality!
*/
-#define ERTS_SIG_Q_OP_MAX 10
+#define ERTS_SIG_Q_OP_MAX 11
#define ERTS_SIG_Q_OP_EXIT 0
#define ERTS_SIG_Q_OP_EXIT_LINKED 1
@@ -61,7 +61,8 @@
#define ERTS_SIG_Q_OP_GROUP_LEADER 7
#define ERTS_SIG_Q_OP_TRACE_CHANGE_STATE 8
#define ERTS_SIG_Q_OP_PERSISTENT_MON_MSG 9
-#define ERTS_SIG_Q_OP_IS_ALIVE ERTS_SIG_Q_OP_MAX
+#define ERTS_SIG_Q_OP_IS_ALIVE 10
+#define ERTS_SIG_Q_OP_PROCESS_INFO ERTS_SIG_Q_OP_MAX
#define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 5)
@@ -76,46 +77,6 @@
#define ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO \
ERTS_SIG_Q_TYPE_MAX
-
-#define ERTS_SIG_Q_OP_BITS 8
-#define ERTS_SIG_Q_OP_SHIFT 0
-#define ERTS_SIG_Q_OP_MASK ((1 << ERTS_SIG_Q_OP_BITS) - 1)
-
-#define ERTS_SIG_Q_TYPE_BITS 8
-#define ERTS_SIG_Q_TYPE_SHIFT ERTS_SIG_Q_OP_BITS
-#define ERTS_SIG_Q_TYPE_MASK ((1 << ERTS_SIG_Q_TYPE_BITS) - 1)
-
-#define ERTS_SIG_Q_NON_X_BITS__ (_HEADER_ARITY_OFFS \
- + ERTS_SIG_Q_OP_BITS \
- + ERTS_SIG_Q_TYPE_BITS)
-
-#define ERTS_SIG_Q_XTRA_BITS (32 - ERTS_SIG_Q_NON_X_BITS__)
-#define ERTS_SIG_Q_XTRA_SHIFT (ERTS_SIG_Q_OP_BITS \
- + ERTS_SIG_Q_TYPE_BITS)
-#define ERTS_SIG_Q_XTRA_MASK ((1 << ERTS_SIG_Q_XTRA_BITS) - 1)
-
-#define ERTS_PROC_SIG_OP(Tag) \
- ((int) (_unchecked_thing_arityval((Tag)) \
- >> ERTS_SIG_Q_OP_SHIFT) & ERTS_SIG_Q_OP_MASK)
-
-#define ERTS_PROC_SIG_TYPE(Tag) \
- ((Uint16) (_unchecked_thing_arityval((Tag)) \
- >> ERTS_SIG_Q_TYPE_SHIFT) & ERTS_SIG_Q_TYPE_MASK)
-
-#define ERTS_PROC_SIG_XTRA(Tag) \
- ((Uint32) (_unchecked_thing_arityval((Tag)) \
- >> ERTS_SIG_Q_XTRA_SHIFT) & ERTS_SIG_Q_XTRA_MASK)
-
-#define ERTS_PROC_SIG_MAKE_TAG(Op, Type, Xtra) \
- (ASSERT(0 <= (Xtra) && (Xtra) <= ERTS_SIG_Q_XTRA_MASK), \
- _make_header((((Type) & ERTS_SIG_Q_TYPE_MASK) \
- << ERTS_SIG_Q_TYPE_SHIFT) \
- | (((Op) & ERTS_SIG_Q_OP_MASK) \
- << ERTS_SIG_Q_OP_SHIFT) \
- | (((Xtra) & ERTS_SIG_Q_XTRA_MASK) \
- << ERTS_SIG_Q_XTRA_SHIFT), \
- _TAG_HEADER_EXTERNAL_PID))
-
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler);
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_high);
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_max);
@@ -123,7 +84,8 @@ Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_max);
void
erts_proc_sig_queue_init(void)
{
- ERTS_CT_ASSERT(ERTS_SIG_Q_OP_MASK >= ERTS_SIG_Q_OP_MAX);
+ ERTS_CT_ASSERT(ERTS_SIG_Q_OP_MASK > ERTS_SIG_Q_OP_MAX);
+ ERTS_CT_ASSERT(ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK > ERTS_SIG_Q_OP_MAX);
ERTS_CT_ASSERT(ERTS_SIG_Q_TYPE_MASK >= ERTS_SIG_Q_TYPE_MAX);
}
@@ -191,6 +153,29 @@ typedef struct {
Eterm requester;
} ErtsIsAliveRequest;
+typedef struct {
+ ErtsSignalCommon common;
+ Sint refc;
+ Sint delayed_len;
+ Sint len_offset;
+} ErtsProcSigMsgQLenOffsetMarker;
+
+typedef struct {
+ ErtsSignalCommon common;
+ ErtsProcSigMsgQLenOffsetMarker marker;
+ Sint msgq_len_offset;
+ Eterm requester;
+ Eterm ref;
+ ErtsORefThing oref_thing;
+ Uint reserve_size;
+ Uint len;
+ int flags;
+ int item_ix[1]; /* of len size in reality... */
+} ErtsProcessInfoSig;
+
+#define ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE ((Sint) -1)
+#define ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC ((Sint) -2)
+
static int handle_msg_tracing(Process *c_p,
ErtsSigRecvTracing *tracing,
ErtsMessage ***next_nm_sig);
@@ -523,6 +508,9 @@ ensure_dirty_proc_handled(Eterm pid,
}
}
+static void
+check_push_msgq_len_offs_marker(Process *rp, ErtsSignal *sig);
+
static int
proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op)
{
@@ -559,6 +547,8 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op)
res = 0;
else {
state = enqueue_signals(rp, first, last, last_next, state);
+ if (ERTS_UNLIKELY(op == ERTS_SIG_Q_OP_PROCESS_INFO))
+ check_push_msgq_len_offs_marker(rp, sig);
res = !0;
}
@@ -620,31 +610,14 @@ maybe_elevate_sig_handling_prio(Process *c_p, Eterm other)
}
void
-erts_proc_sig_fetch(Process *proc)
+erts_proc_sig_fetch__(Process *proc)
{
#ifdef ERTS_PROC_SIG_HARD_DEBUG
ErtsSignalPrivQueues sig_qs = proc->sig_qs;
ErtsSignalInQueue sig_inq = proc->sig_inq;
#endif
- ERTS_LC_ASSERT(erts_thr_progress_is_blocking()
- || ERTS_PROC_IS_EXITING(proc)
- || ((erts_proc_lc_my_proc_locks(proc)
- & (ERTS_PROC_LOCK_MAIN
- | ERTS_PROC_LOCK_MSGQ))
- == (ERTS_PROC_LOCK_MAIN
- | ERTS_PROC_LOCK_MSGQ)));
-
- if (!proc->sig_inq.first) {
- ASSERT(proc->sig_inq.last == &proc->sig_inq.first);
- ASSERT(proc->sig_inq.len == 0);
- ASSERT(!proc->sig_inq.nmsigs.next);
- ASSERT(!proc->sig_inq.nmsigs.last);
- return;
- }
-
- ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(proc);
- ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc);
+ ASSERT(proc->sig_inq.first);
if (!proc->sig_inq.nmsigs.next) {
ASSERT(!(ERTS_PSFLG_SIG_IN_Q
@@ -720,7 +693,102 @@ erts_proc_sig_fetch(Process *proc)
proc->sig_inq.last = &proc->sig_inq.first;
proc->sig_inq.len = 0;
- ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc);
+}
+
+Sint
+erts_proc_sig_fetch_msgq_len_offs__(Process *proc)
+{
+ ErtsProcSigMsgQLenOffsetMarker *marker
+ = (ErtsProcSigMsgQLenOffsetMarker *) proc->sig_inq.first;
+
+ ASSERT(marker->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK);
+
+ if (marker->common.next) {
+ Sint len;
+
+ proc->flags |= F_DELAYED_PSIGQS_LEN;
+
+ /*
+ * Prevent update of sig_qs.len in fetch. These
+ * updates are done via process-info signal(s)
+ * instead...
+ */
+ len = proc->sig_inq.len;
+ marker->delayed_len += len;
+ marker->len_offset -= len;
+ proc->sig_inq.len = 0;
+
+ /*
+ * Temorarily remove marker during fetch...
+ */
+
+ proc->sig_inq.first = marker->common.next;
+ if (proc->sig_inq.last == &marker->common.next)
+ proc->sig_inq.last = &proc->sig_inq.first;
+ if (proc->sig_inq.nmsigs.next == &marker->common.next)
+ proc->sig_inq.nmsigs.next = &proc->sig_inq.first;
+ if (proc->sig_inq.nmsigs.last == &marker->common.next)
+ proc->sig_inq.nmsigs.last = &proc->sig_inq.first;
+
+ erts_proc_sig_fetch__(proc);
+
+ marker->common.next = NULL;
+ proc->sig_inq.first = (ErtsMessage *) marker;
+ proc->sig_inq.last = &marker->common.next;
+
+ }
+
+ return marker->delayed_len;
+}
+
+static ERTS_INLINE Sint
+proc_sig_privqs_len(Process *c_p, int have_qlock)
+{
+ Sint res = c_p->sig_qs.len;
+
+ ERTS_LC_ASSERT(!have_qlock
+ ? (ERTS_PROC_LOCK_MAIN
+ == erts_proc_lc_my_proc_locks(c_p))
+ : ((ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_MAIN)
+ == ((ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_MAIN)
+ & erts_proc_lc_my_proc_locks(c_p))));
+
+ if (c_p->flags & F_DELAYED_PSIGQS_LEN) {
+ ErtsProcSigMsgQLenOffsetMarker *marker;
+
+ if (!have_qlock)
+ erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+
+ marker = (ErtsProcSigMsgQLenOffsetMarker *) c_p->sig_inq.first;
+ ASSERT(marker);
+ ASSERT(marker->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK);
+
+ res += marker->delayed_len;
+
+ if (!have_qlock)
+ erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+ }
+
+#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
+ {
+ Sint len = 0;
+ ERTS_FOREACH_SIG_PRIVQS(
+ c_p, mp,
+ {
+ if (ERTS_SIG_IS_MSG(mp))
+ len++;
+ });
+ ERTS_ASSERT(res == len);
+ }
+#endif
+
+ return res;
+}
+
+Sint
+erts_proc_sig_privqs_len(Process *c_p)
+{
+ return proc_sig_privqs_len(c_p, 0);
}
static void do_seq_trace_output(Eterm to, Eterm token, Eterm msg);
@@ -1354,6 +1422,59 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref)
}
}
+int
+erts_proc_sig_send_process_info_request(Process *c_p,
+ Eterm to,
+ int *item_ix,
+ int len,
+ int need_msgq_len,
+ int flags,
+ Uint reserve_size,
+ Eterm ref)
+{
+ Uint size = sizeof(ErtsProcessInfoSig) + (len - 1) * sizeof(int);
+ ErtsProcessInfoSig *pis = erts_alloc(ERTS_ALC_T_SIG_DATA, size);
+ int res;
+
+ ASSERT(c_p);
+ ASSERT(item_ix);
+ ASSERT(len > 0);
+ ASSERT(is_internal_ordinary_ref(ref));
+
+ pis->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_PROCESS_INFO,
+ 0, 0);
+
+ if (!need_msgq_len)
+ pis->msgq_len_offset = ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE;
+ else {
+ pis->msgq_len_offset = ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC;
+ pis->marker.common.next = NULL;
+ pis->marker.common.specific.next = NULL;
+ pis->marker.common.tag = ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK;
+ pis->marker.refc = 0;
+ pis->marker.delayed_len = 0;
+ pis->marker.len_offset = 0;
+ }
+ pis->requester = c_p->common.id;
+ sys_memcpy((void *) &pis->oref_thing,
+ (void *) internal_ref_val(ref),
+ sizeof(ErtsORefThing));
+ pis->ref = make_internal_ref((char *) &pis->oref_thing);
+ pis->reserve_size = reserve_size;
+ pis->len = len;
+ pis->flags = flags;
+ sys_memcpy((void *) &pis->item_ix[0],
+ (void *) item_ix,
+ sizeof(int)*len);
+ res = proc_queue_signal(c_p, to, (ErtsSignal *) pis,
+ ERTS_SIG_Q_OP_PROCESS_INFO);
+ if (res)
+ (void) maybe_elevate_sig_handling_prio(c_p, to);
+ else
+ erts_free(ERTS_ALC_T_SIG_DATA, pis);
+ return res;
+}
+
static void
is_alive_response(Process *c_p, ErtsMessage *mp, int is_alive)
{
@@ -1554,7 +1675,6 @@ remove_mq_m_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig, ErtsMess
{
/* Removing message... */
ASSERT(!ERTS_SIG_IS_NON_MSG(sig));
- ASSERT(c_p->sig_qs.len > 0);
c_p->sig_qs.len--;
remove_mq_sig(c_p, sig, next_sig, next_nm_sig);
}
@@ -1564,7 +1684,6 @@ remove_iq_m_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig)
{
/* Removing message... */
ASSERT(!ERTS_SIG_IS_NON_MSG(sig));
- ASSERT(c_p->sig_qs.len > 0);
c_p->sig_qs.len--;
remove_iq_sig(c_p, sig, next_sig);
}
@@ -2140,6 +2259,258 @@ handle_group_leader(Process *c_p, ErtsSigGroupLeader *sgl)
destroy_sig_group_leader(sgl);
}
+static void
+check_push_msgq_len_offs_marker(Process *rp, ErtsSignal *sig)
+{
+ ErtsProcessInfoSig *pisig = (ErtsProcessInfoSig *) sig;
+
+ ASSERT(ERTS_PROC_SIG_OP(sig->common.tag) == ERTS_SIG_Q_OP_PROCESS_INFO);
+
+ if (pisig->msgq_len_offset == ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC) {
+ ErtsProcSigMsgQLenOffsetMarker *mrkr;
+ Sint len, msgq_len_offset;
+ ErtsMessage *first = rp->sig_inq.first;
+ ASSERT(first);
+ if (((ErtsSignal *) first)->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK)
+ mrkr = (ErtsProcSigMsgQLenOffsetMarker *) first;
+ else {
+ mrkr = &pisig->marker;
+
+ ASSERT(mrkr->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK);
+
+ mrkr->common.next = first;
+ ASSERT(rp->sig_inq.last != &rp->sig_inq.first);
+ if (rp->sig_inq.nmsigs.next == &rp->sig_inq.first)
+ rp->sig_inq.nmsigs.next = &mrkr->common.next;
+ if (rp->sig_inq.nmsigs.last == &rp->sig_inq.first)
+ rp->sig_inq.nmsigs.last = &mrkr->common.next;
+ rp->sig_inq.first = (ErtsMessage *) mrkr;
+ }
+
+ len = rp->sig_inq.len;
+ msgq_len_offset = len - mrkr->len_offset;
+
+ mrkr->len_offset = len;
+ mrkr->refc++;
+
+ pisig->msgq_len_offset = msgq_len_offset;
+
+#ifdef DEBUG
+ /* save pointer to used marker... */
+ pisig->marker.common.specific.attachment = (void *) mrkr;
+#endif
+
+ }
+}
+
+static void
+destroy_process_info_request(Process *c_p, ErtsProcessInfoSig *pisig)
+{
+ int dealloc_pisig = !0;
+
+ if (pisig->msgq_len_offset != ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE) {
+ Sint refc;
+ int dealloc_marker = 0;
+ ErtsProcSigMsgQLenOffsetMarker *marker;
+#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
+ Sint delayed_len;
+#endif
+
+ ASSERT(pisig->msgq_len_offset >= 0);
+
+ erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+ marker = (ErtsProcSigMsgQLenOffsetMarker *) c_p->sig_inq.first;
+ ASSERT(marker);
+ ASSERT(marker->refc > 0);
+ ASSERT(pisig->marker.common.specific.attachment == (void *) marker);
+
+ marker->delayed_len -= pisig->msgq_len_offset;
+#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
+ delayed_len = marker->delayed_len;
+#endif
+
+ refc = --marker->refc;
+ if (refc) {
+ if (marker == &pisig->marker) {
+ /* Another signal using our marker... */
+ dealloc_pisig = 0;
+ }
+ }
+ else {
+ /* Marker unused; remove it... */
+ ASSERT(marker->delayed_len + marker->len_offset == 0);
+#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
+ delayed_len += marker->len_offset;
+#endif
+ if (marker != &pisig->marker)
+ dealloc_marker = !0; /* used another signals marker... */
+ c_p->sig_inq.first = marker->common.next;
+ if (c_p->sig_inq.last == &marker->common.next)
+ c_p->sig_inq.last = &c_p->sig_inq.first;
+ if (c_p->sig_inq.nmsigs.next == &marker->common.next)
+ c_p->sig_inq.nmsigs.next = &c_p->sig_inq.first;
+ if (c_p->sig_inq.nmsigs.last == &marker->common.next)
+ c_p->sig_inq.nmsigs.last = &c_p->sig_inq.first;
+ }
+ erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+
+ if (!refc) {
+ c_p->flags &= ~F_DELAYED_PSIGQS_LEN;
+ /* Adjust msg len of inner+middle queue */
+ ASSERT(marker->len_offset <= 0);
+ c_p->sig_qs.len -= marker->len_offset;
+
+ ASSERT(c_p->sig_qs.len >= 0);
+ }
+
+#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
+ {
+ Sint len = 0;
+ ERTS_FOREACH_SIG_PRIVQS(
+ c_p, mp,
+ {
+ if (ERTS_SIG_IS_MSG(mp))
+ len++;
+ });
+ ERTS_ASSERT(c_p->sig_qs.len + delayed_len == len);
+ }
+#endif
+
+
+ if (dealloc_marker) {
+ ErtsProcessInfoSig *pisig2
+ = (ErtsProcessInfoSig *) (((char *) marker)
+ - offsetof(ErtsProcessInfoSig,
+ marker));
+ erts_free(ERTS_ALC_T_SIG_DATA, pisig2);
+ }
+ }
+
+ if (dealloc_pisig)
+ erts_free(ERTS_ALC_T_SIG_DATA, pisig);
+}
+
+static int
+handle_process_info(Process *c_p, ErtsMessage *sig,
+ ErtsMessage ***next_nm_sig, int is_alive)
+{
+ ErtsProcessInfoSig *pisig = (ErtsProcessInfoSig *) sig;
+ Uint reds = 0;
+ Process *rp;
+
+ if (pisig->msgq_len_offset != ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE) {
+ /*
+ * Request requires message queue data to be updated
+ * before inspection...
+ */
+
+ ASSERT(pisig->msgq_len_offset >= 0);
+ /*
+ * Update sig_qs.len to reflect the length
+ * of the message queue...
+ */
+ c_p->sig_qs.len += pisig->msgq_len_offset;
+
+ if (is_alive) {
+ /*
+ * Move messages part of message queue into inner
+ * signal queue...
+ */
+ if (*next_nm_sig != &c_p->sig_qs.cont) {
+ *c_p->sig_qs.last = c_p->sig_qs.cont;
+ c_p->sig_qs.last = *next_nm_sig;
+
+ c_p->sig_qs.cont = **next_nm_sig;
+ if (c_p->sig_qs.nmsigs.last == *next_nm_sig)
+ c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont;
+ *next_nm_sig = &c_p->sig_qs.cont;
+ *c_p->sig_qs.last = NULL;
+ }
+
+ if (!pisig->common.specific.next) {
+ /*
+ * No more signals in middle queue...
+ *
+ * Process-info 'status' needs sig-q
+ * process flag to be updated in order
+ * to show accurate result...
+ */
+ erts_atomic32_read_band_nob(&c_p->state,
+ ~ERTS_PSFLG_SIG_Q);
+ }
+
+#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
+ {
+ Sint len;
+ ErtsMessage *mp;
+ for (mp = c_p->sig_qs.first, len = 0; mp; mp = mp->next) {
+ ERTS_ASSERT(ERTS_SIG_IS_MSG(mp));
+ len++;
+ }
+ ERTS_ASSERT(c_p->sig_qs.len == len);
+ }
+#endif
+ }
+ }
+ if (is_alive)
+ remove_nm_sig(c_p, sig, next_nm_sig);
+
+ rp = erts_proc_lookup(pisig->requester);
+ ASSERT(c_p != rp);
+ if (rp) {
+ Eterm msg, res, ref, *hp;
+ ErtsProcLocks locks = 0;
+ ErtsHeapFactory hfact;
+ ErtsMessage *mp;
+ Uint reserve_size = 3 + sizeof(pisig->oref_thing)/sizeof(Eterm);
+
+ if (!is_alive) {
+ ErlOffHeap *ohp;
+ mp = erts_alloc_message_heap(rp, &locks, reserve_size, &hp, &ohp);
+ res = am_undefined;
+ }
+ else {
+ ErlHeapFragment *hfrag;
+
+ reserve_size += pisig->reserve_size;
+
+ mp = erts_alloc_message(0, NULL);
+ hfrag = new_message_buffer(reserve_size);
+ mp->data.heap_frag = hfrag;
+ erts_factory_selfcontained_message_init(&hfact, mp, &hfrag->mem[0]);
+
+ res = erts_process_info(c_p, &hfact, c_p, ERTS_PROC_LOCK_MAIN,
+ pisig->item_ix, pisig->len,
+ pisig->flags, reserve_size, &reds);
+
+ hp = erts_produce_heap(&hfact,
+ 3 + sizeof(pisig->oref_thing)/sizeof(Eterm),
+ 0);
+ }
+
+ sys_memcpy((void *) hp, (void *) &pisig->oref_thing,
+ sizeof(pisig->oref_thing));
+ ref = make_internal_ref(hp);
+ hp += sizeof(pisig->oref_thing)/sizeof(Eterm);
+
+ msg = TUPLE2(hp, ref, res);
+
+ if (is_alive)
+ erts_factory_trim_and_close(&hfact, &msg, 1);
+
+ erts_queue_message(rp, locks, mp, msg, c_p->common.id);
+
+ if (!is_alive && locks)
+ erts_proc_unlock(rp, locks);
+ }
+
+ destroy_process_info_request(c_p, pisig);
+
+ if (reds > INT_MAX/8)
+ reds = INT_MAX/8;
+
+ return ((int) reds)*4 + 8;
+}
/*
* Called in order to handle incoming signals.
@@ -2155,7 +2526,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ErtsMessage *sig, ***next_nm_sig;
ErtsSigRecvTracing tracing;
- ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p);
+ ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
if (local_only)
@@ -2504,6 +2875,12 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
break;
+ case ERTS_SIG_Q_OP_PROCESS_INFO:
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ handle_process_info(c_p, sig, next_nm_sig, !0);
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ break;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: {
Uint16 type = ERTS_PROC_SIG_TYPE(tag);
@@ -2657,7 +3034,7 @@ stop: {
c_p->sig_qs.save = c_p->sig_qs.saved_last;
}
- ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p);
+ ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
*redsp = cnt/4 + 1;
@@ -2709,8 +3086,8 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp)
int cnt, limit;
ErtsMessage *sig, ***next_nm_sig;
- ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p);
- ERTS_LC_ASSERT(!erts_proc_lc_my_proc_locks(c_p));
+ ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
+ ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
ASSERT(!(ERTS_PSFLG_SIG_IN_Q & erts_atomic32_read_nob(&c_p->state)));
@@ -2817,9 +3194,11 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp)
}
case ERTS_SIG_Q_OP_IS_ALIVE:
- ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
is_alive_response(c_p, sig, 0);
- ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ break;
+
+ case ERTS_SIG_Q_OP_PROCESS_INFO:
+ handle_process_info(c_p, sig, next_nm_sig, 0);
break;
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
@@ -3005,6 +3384,13 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
size = sizeof(ErtsSigTraceInfo);
break;
+ case ERTS_SIG_Q_OP_PROCESS_INFO: {
+ ErtsProcessInfoSig *pisig = (ErtsProcessInfoSig *) sig;
+ size = sizeof(ErtsProcessInfoSig);
+ size += (pisig->len - 1) * sizeof(int);
+ break;
+ }
+
default:
ERTS_INTERNAL_ERROR("Unknown signal");
break;
@@ -3060,8 +3446,7 @@ erts_proc_sig_receive_helper(Process *c_p,
consumed_reds += 4;
left_reds -= 4;
erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
- if (c_p->sig_inq.first)
- erts_proc_sig_fetch(c_p);
+ erts_proc_sig_fetch(c_p);
/*
* Messages may have been moved directly to
* inner queue...
@@ -3222,6 +3607,7 @@ handle_message_enqueued_tracing(Process *c_p,
Sint tok_label = 0;
Sint tok_lastcnt = 0;
Sint tok_serial = 0;
+ Sint len = erts_proc_sig_privqs_len(c_p);
Eterm seq_trace_token = ERL_MESSAGE_TOKEN(msg);
if (seq_trace_token != NIL && is_tuple(seq_trace_token)) {
@@ -3233,7 +3619,7 @@ handle_message_enqueued_tracing(Process *c_p,
DTRACE6(message_queued,
tracing->messages.receiver_name,
size_object(ERL_MESSAGE_TERM(msg)),
- c_p->sig_qs.len,
+ len, /* This is NOT message queue len, but its something... */
tok_label, tok_lastcnt, tok_serial);
}
#endif
@@ -3305,24 +3691,11 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing,
return 0;
}
-/*
- * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0 will move off heap messages
- * into the heap of the inspected process if off_heap_message_queue
- * is false when process_info(_, messages) is called. That is, the
- * following GC will have more data in the rootset compared to the
- * scenario when process_info(_, messages) had not been called.
- *
- * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS != 0 will keep off heap messages
- * off heap when process_info(_, messages) is called regardless of
- * the off_heap_message_queue setting of the process. That is, it
- * will change the following execution of the process as little as
- * possible.
- */
-#define ERTS_INSPECT_MSGQ_KEEP_OH_MSGS 1
-
Uint
-erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp,
+erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
+ Process *rp,
ErtsProcLocks rp_locks,
+ int info_on_self,
ErtsMessageInfo *mip)
{
Uint tot_heap_size;
@@ -3331,9 +3704,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp,
int self_on_heap;
/*
- * Prepare the message queue for inspection
- * by process_info().
- *
+ * Prepare the message queue (inner signal queue)
+ * for inspection by process_info().
*
* - Decode all messages on external format
* - Remove all corrupt dist messages from queue
@@ -3342,20 +3714,11 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp,
* - Return total heap size need for all messages
* that needs to be copied.
*
- * If ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0:
- * - In case off heap messages is disabled and
- * we are inspecting our own queue, move all
- * off heap data into the heap.
*/
- /*
- * All non-message signals *need* to have been
- * handled before calling this functions...
- */
- ASSERT(!rp->sig_qs.cont);
- ASSERT(!rp->sig_qs.nmsigs.next && !rp->sig_qs.nmsigs.last);
+ ASSERT(!info_on_self || c_p == rp);
- self_on_heap = c_p == rp && !(c_p->flags & F_OFF_HEAP_MSGQ);
+ self_on_heap = info_on_self && !(c_p->flags & F_OFF_HEAP_MSGQ);
tot_heap_size = 0;
i = 0;
@@ -3369,8 +3732,7 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp,
if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) {
/* decode it... */
if (mp->data.attached)
- erts_decode_dist_message(rp, rp_locks, mp,
- ERTS_INSPECT_MSGQ_KEEP_OH_MSGS);
+ erts_decode_dist_message(rp, rp_locks, mp, !0);
msg = ERL_MESSAGE_TERM(mp);
@@ -3396,44 +3758,11 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp,
ASSERT(is_value(msg));
-#if ERTS_INSPECT_MSGQ_KEEP_OH_MSGS
if (is_not_immed(msg) && (!self_on_heap || mp->data.attached)) {
Uint sz = size_object(msg);
mip[i].size = sz;
tot_heap_size += sz;
}
-#else
- if (self_on_heap) {
- if (mp->data.attached) {
- ErtsMessage *tmp = NULL;
- if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) {
- erts_link_mbuf_to_proc(rp, mp->data.heap_frag);
- mp->data.attached = NULL;
- }
- else {
- /*
- * Need to replace the message reference since
- * we will get references to the message data
- * from the heap...
- */
- ErtsMessage **mpp;
- tmp = erts_alloc_message(0, NULL);
- sys_memcpy((void *) tmp->m, (void *) mp->m,
- sizeof(Eterm)*ERL_MESSAGE_REF_ARRAY_SZ);
- mpp = i == 0 ? &rp->sig_qs.first : &mip[i-1].msgp->next;
- erts_msgq_replace_msg_ref(&rp->msg, tmp, mpp);
- erts_save_message_in_proc(rp, mp);
- mp = tmp;
- }
- }
- }
- else if (is_not_immed(msg)) {
- Uint sz = size_object(msg);
- mip[i].size = sz;
- tot_heap_size += sz;
- }
-
-#endif
mip[i].msgp = mp;
i++;
@@ -3441,6 +3770,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp,
mp = mp->next;
}
+ ASSERT(c_p->sig_qs.len == i);
+
return tot_heap_size;
}
@@ -3477,11 +3808,11 @@ move_msg_to_heap(Process *c_p, ErtsMessage *mp)
void
erts_proc_sig_move_msgs_to_heap(Process *c_p)
{
- ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p);
+ ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
ERTS_FOREACH_SIG_PRIVQS(c_p, sig, move_msg_to_heap(c_p, sig));
- ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p);
+ ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
}
@@ -3672,6 +4003,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
case ERTS_SIG_Q_OP_IS_ALIVE:
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
+ case ERTS_SIG_Q_OP_PROCESS_INFO:
break;
default:
@@ -3716,10 +4048,8 @@ chk_eterm(Process *c_p, int privq, ErtsMessage *mp, Eterm term)
for (bp = erts_message_to_heap_frag(mp); bp; bp = bp->next) {
if (bp->mem <= ptr && ptr < bp->mem + bp->used_size)
return;
- bp = bp->next;
}
- ERTS_ASSERT(privq);
ASSERT(erts_dbg_within_proc(ptr, c_p, NULL));
}
@@ -3742,6 +4072,12 @@ proc_sig_hdbg_check_queue(Process *proc,
ErtsMessage **save = proc->sig_qs.save;
ErtsMessage **saved_last = proc->sig_qs.saved_last;
+ if (!privq) {
+ ErtsSignal *sig = (ErtsSignal *) *sig_next;
+ if (sig->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK) {
+
+ }
+ }
nm_next = sig_nm_next;
nm_last = sig_nm_last;
@@ -3806,21 +4142,29 @@ proc_sig_hdbg_check_queue(Process *proc,
if (!sig)
break;
- nm_sigs++;
+ nm_sig = (ErtsSignal *) sig;
- ERTS_ASSERT(!last_nm_sig_found);
- ERTS_ASSERT(ERTS_SIG_IS_NON_MSG(sig));
+ if (nm_sig->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK) {
+ ERTS_ASSERT(!privq);
+ ERTS_ASSERT(sig == *sig_next);
+ }
+ else {
+ nm_sigs++;
- nm_sig = (ErtsSignal *) sig;
+ ERTS_ASSERT(!last_nm_sig_found);
+ ERTS_ASSERT(ERTS_SIG_IS_NON_MSG(sig));
+
+ ERTS_ASSERT(nm_next == next);
+
+ if (nm_last == next) {
+ ASSERT(!nm_sig->common.specific.next);
+ last_nm_sig_found = 1;
+ }
- ERTS_ASSERT(nm_next == next);
+ nm_next = nm_sig->common.specific.next;
- if (nm_last == next) {
- ASSERT(!nm_sig->common.specific.next);
- last_nm_sig_found = 1;
}
- nm_next = nm_sig->common.specific.next;
next = &nm_sig->common.next;
sig = nm_sig->common.next;
@@ -3875,10 +4219,10 @@ proc_sig_hdbg_check_queue(Process *proc,
}
void
-erts_proc_sig_hdbg_check_priv_queue(Process *p, char *what, char *file, int line)
+erts_proc_sig_hdbg_check_priv_queue(Process *p, int qlock, char *what, char *file, int line)
{
int found_saved_last = 0;
- Sint len1, len2;
+ Sint len, len1, len2;
ERTS_LC_ASSERT(erts_thr_progress_is_blocking()
|| ERTS_PROC_IS_EXITING(p)
|| (ERTS_PROC_LOCK_MAIN
@@ -3903,7 +4247,8 @@ erts_proc_sig_hdbg_check_priv_queue(Process *p, char *what, char *file, int line
ERTS_PSFLG_SIG_Q);
if (p->sig_qs.saved_last)
ERTS_ASSERT(found_saved_last);
- ERTS_ASSERT(p->sig_qs.len == len1 + len2);
+ len = proc_sig_privqs_len(p, qlock);
+ ERTS_ASSERT(len == len1 + len2);
}
void