From 9f8a402cc3e49313089bb9e22bc625f07beea4ca Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Tue, 27 Mar 2018 11:26:39 +0200 Subject: New process_info() implementation using signals --- erts/emulator/beam/erl_proc_sig_queue.c | 655 ++++++++++++++++++++++++-------- 1 file changed, 500 insertions(+), 155 deletions(-) (limited to 'erts/emulator/beam/erl_proc_sig_queue.c') diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 99d20e9242..4bb5f31538 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -49,7 +49,7 @@ * Note that not all signal are handled using this functionality! */ -#define ERTS_SIG_Q_OP_MAX 10 +#define ERTS_SIG_Q_OP_MAX 11 #define ERTS_SIG_Q_OP_EXIT 0 #define ERTS_SIG_Q_OP_EXIT_LINKED 1 @@ -61,7 +61,8 @@ #define ERTS_SIG_Q_OP_GROUP_LEADER 7 #define ERTS_SIG_Q_OP_TRACE_CHANGE_STATE 8 #define ERTS_SIG_Q_OP_PERSISTENT_MON_MSG 9 -#define ERTS_SIG_Q_OP_IS_ALIVE ERTS_SIG_Q_OP_MAX +#define ERTS_SIG_Q_OP_IS_ALIVE 10 +#define ERTS_SIG_Q_OP_PROCESS_INFO ERTS_SIG_Q_OP_MAX #define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 5) @@ -76,46 +77,6 @@ #define ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO \ ERTS_SIG_Q_TYPE_MAX - -#define ERTS_SIG_Q_OP_BITS 8 -#define ERTS_SIG_Q_OP_SHIFT 0 -#define ERTS_SIG_Q_OP_MASK ((1 << ERTS_SIG_Q_OP_BITS) - 1) - -#define ERTS_SIG_Q_TYPE_BITS 8 -#define ERTS_SIG_Q_TYPE_SHIFT ERTS_SIG_Q_OP_BITS -#define ERTS_SIG_Q_TYPE_MASK ((1 << ERTS_SIG_Q_TYPE_BITS) - 1) - -#define ERTS_SIG_Q_NON_X_BITS__ (_HEADER_ARITY_OFFS \ - + ERTS_SIG_Q_OP_BITS \ - + ERTS_SIG_Q_TYPE_BITS) - -#define ERTS_SIG_Q_XTRA_BITS (32 - ERTS_SIG_Q_NON_X_BITS__) -#define ERTS_SIG_Q_XTRA_SHIFT (ERTS_SIG_Q_OP_BITS \ - + ERTS_SIG_Q_TYPE_BITS) -#define ERTS_SIG_Q_XTRA_MASK ((1 << ERTS_SIG_Q_XTRA_BITS) - 1) - -#define ERTS_PROC_SIG_OP(Tag) \ - ((int) (_unchecked_thing_arityval((Tag)) \ - >> ERTS_SIG_Q_OP_SHIFT) & ERTS_SIG_Q_OP_MASK) - -#define ERTS_PROC_SIG_TYPE(Tag) \ - ((Uint16) (_unchecked_thing_arityval((Tag)) \ - >> ERTS_SIG_Q_TYPE_SHIFT) & ERTS_SIG_Q_TYPE_MASK) - -#define ERTS_PROC_SIG_XTRA(Tag) \ - ((Uint32) (_unchecked_thing_arityval((Tag)) \ - >> ERTS_SIG_Q_XTRA_SHIFT) & ERTS_SIG_Q_XTRA_MASK) - -#define ERTS_PROC_SIG_MAKE_TAG(Op, Type, Xtra) \ - (ASSERT(0 <= (Xtra) && (Xtra) <= ERTS_SIG_Q_XTRA_MASK), \ - _make_header((((Type) & ERTS_SIG_Q_TYPE_MASK) \ - << ERTS_SIG_Q_TYPE_SHIFT) \ - | (((Op) & ERTS_SIG_Q_OP_MASK) \ - << ERTS_SIG_Q_OP_SHIFT) \ - | (((Xtra) & ERTS_SIG_Q_XTRA_MASK) \ - << ERTS_SIG_Q_XTRA_SHIFT), \ - _TAG_HEADER_EXTERNAL_PID)) - Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler); Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_high); Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_max); @@ -123,7 +84,8 @@ Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_max); void erts_proc_sig_queue_init(void) { - ERTS_CT_ASSERT(ERTS_SIG_Q_OP_MASK >= ERTS_SIG_Q_OP_MAX); + ERTS_CT_ASSERT(ERTS_SIG_Q_OP_MASK > ERTS_SIG_Q_OP_MAX); + ERTS_CT_ASSERT(ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK > ERTS_SIG_Q_OP_MAX); ERTS_CT_ASSERT(ERTS_SIG_Q_TYPE_MASK >= ERTS_SIG_Q_TYPE_MAX); } @@ -191,6 +153,29 @@ typedef struct { Eterm requester; } ErtsIsAliveRequest; +typedef struct { + ErtsSignalCommon common; + Sint refc; + Sint delayed_len; + Sint len_offset; +} ErtsProcSigMsgQLenOffsetMarker; + +typedef struct { + ErtsSignalCommon common; + ErtsProcSigMsgQLenOffsetMarker marker; + Sint msgq_len_offset; + Eterm requester; + Eterm ref; + ErtsORefThing oref_thing; + Uint reserve_size; + Uint len; + int flags; + int item_ix[1]; /* of len size in reality... */ +} ErtsProcessInfoSig; + +#define ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE ((Sint) -1) +#define ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC ((Sint) -2) + static int handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing, ErtsMessage ***next_nm_sig); @@ -523,6 +508,9 @@ ensure_dirty_proc_handled(Eterm pid, } } +static void +check_push_msgq_len_offs_marker(Process *rp, ErtsSignal *sig); + static int proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) { @@ -559,6 +547,8 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) res = 0; else { state = enqueue_signals(rp, first, last, last_next, state); + if (ERTS_UNLIKELY(op == ERTS_SIG_Q_OP_PROCESS_INFO)) + check_push_msgq_len_offs_marker(rp, sig); res = !0; } @@ -620,31 +610,14 @@ maybe_elevate_sig_handling_prio(Process *c_p, Eterm other) } void -erts_proc_sig_fetch(Process *proc) +erts_proc_sig_fetch__(Process *proc) { #ifdef ERTS_PROC_SIG_HARD_DEBUG ErtsSignalPrivQueues sig_qs = proc->sig_qs; ErtsSignalInQueue sig_inq = proc->sig_inq; #endif - ERTS_LC_ASSERT(erts_thr_progress_is_blocking() - || ERTS_PROC_IS_EXITING(proc) - || ((erts_proc_lc_my_proc_locks(proc) - & (ERTS_PROC_LOCK_MAIN - | ERTS_PROC_LOCK_MSGQ)) - == (ERTS_PROC_LOCK_MAIN - | ERTS_PROC_LOCK_MSGQ))); - - if (!proc->sig_inq.first) { - ASSERT(proc->sig_inq.last == &proc->sig_inq.first); - ASSERT(proc->sig_inq.len == 0); - ASSERT(!proc->sig_inq.nmsigs.next); - ASSERT(!proc->sig_inq.nmsigs.last); - return; - } - - ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(proc); - ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc); + ASSERT(proc->sig_inq.first); if (!proc->sig_inq.nmsigs.next) { ASSERT(!(ERTS_PSFLG_SIG_IN_Q @@ -720,7 +693,102 @@ erts_proc_sig_fetch(Process *proc) proc->sig_inq.last = &proc->sig_inq.first; proc->sig_inq.len = 0; - ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(proc); +} + +Sint +erts_proc_sig_fetch_msgq_len_offs__(Process *proc) +{ + ErtsProcSigMsgQLenOffsetMarker *marker + = (ErtsProcSigMsgQLenOffsetMarker *) proc->sig_inq.first; + + ASSERT(marker->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK); + + if (marker->common.next) { + Sint len; + + proc->flags |= F_DELAYED_PSIGQS_LEN; + + /* + * Prevent update of sig_qs.len in fetch. These + * updates are done via process-info signal(s) + * instead... + */ + len = proc->sig_inq.len; + marker->delayed_len += len; + marker->len_offset -= len; + proc->sig_inq.len = 0; + + /* + * Temorarily remove marker during fetch... + */ + + proc->sig_inq.first = marker->common.next; + if (proc->sig_inq.last == &marker->common.next) + proc->sig_inq.last = &proc->sig_inq.first; + if (proc->sig_inq.nmsigs.next == &marker->common.next) + proc->sig_inq.nmsigs.next = &proc->sig_inq.first; + if (proc->sig_inq.nmsigs.last == &marker->common.next) + proc->sig_inq.nmsigs.last = &proc->sig_inq.first; + + erts_proc_sig_fetch__(proc); + + marker->common.next = NULL; + proc->sig_inq.first = (ErtsMessage *) marker; + proc->sig_inq.last = &marker->common.next; + + } + + return marker->delayed_len; +} + +static ERTS_INLINE Sint +proc_sig_privqs_len(Process *c_p, int have_qlock) +{ + Sint res = c_p->sig_qs.len; + + ERTS_LC_ASSERT(!have_qlock + ? (ERTS_PROC_LOCK_MAIN + == erts_proc_lc_my_proc_locks(c_p)) + : ((ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_MAIN) + == ((ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_MAIN) + & erts_proc_lc_my_proc_locks(c_p)))); + + if (c_p->flags & F_DELAYED_PSIGQS_LEN) { + ErtsProcSigMsgQLenOffsetMarker *marker; + + if (!have_qlock) + erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + + marker = (ErtsProcSigMsgQLenOffsetMarker *) c_p->sig_inq.first; + ASSERT(marker); + ASSERT(marker->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK); + + res += marker->delayed_len; + + if (!have_qlock) + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + } + +#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN + { + Sint len = 0; + ERTS_FOREACH_SIG_PRIVQS( + c_p, mp, + { + if (ERTS_SIG_IS_MSG(mp)) + len++; + }); + ERTS_ASSERT(res == len); + } +#endif + + return res; +} + +Sint +erts_proc_sig_privqs_len(Process *c_p) +{ + return proc_sig_privqs_len(c_p, 0); } void do_seq_trace_output(Eterm to, Eterm token, Eterm msg); @@ -1352,6 +1420,59 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref) } } +int +erts_proc_sig_send_process_info_request(Process *c_p, + Eterm to, + int *item_ix, + int len, + int need_msgq_len, + int flags, + Uint reserve_size, + Eterm ref) +{ + Uint size = sizeof(ErtsProcessInfoSig) + (len - 1) * sizeof(int); + ErtsProcessInfoSig *pis = erts_alloc(ERTS_ALC_T_SIG_DATA, size); + int res; + + ASSERT(c_p); + ASSERT(item_ix); + ASSERT(len > 0); + ASSERT(is_internal_ordinary_ref(ref)); + + pis->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_PROCESS_INFO, + 0, 0); + + if (!need_msgq_len) + pis->msgq_len_offset = ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE; + else { + pis->msgq_len_offset = ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC; + pis->marker.common.next = NULL; + pis->marker.common.specific.next = NULL; + pis->marker.common.tag = ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK; + pis->marker.refc = 0; + pis->marker.delayed_len = 0; + pis->marker.len_offset = 0; + } + pis->requester = c_p->common.id; + sys_memcpy((void *) &pis->oref_thing, + (void *) internal_ref_val(ref), + sizeof(ErtsORefThing)); + pis->ref = make_internal_ref((char *) &pis->oref_thing); + pis->reserve_size = reserve_size; + pis->len = len; + pis->flags = flags; + sys_memcpy((void *) &pis->item_ix[0], + (void *) item_ix, + sizeof(int)*len); + res = proc_queue_signal(c_p, to, (ErtsSignal *) pis, + ERTS_SIG_Q_OP_PROCESS_INFO); + if (res) + (void) maybe_elevate_sig_handling_prio(c_p, to); + else + erts_free(ERTS_ALC_T_SIG_DATA, pis); + return res; +} + static void is_alive_response(Process *c_p, ErtsMessage *mp, int is_alive) { @@ -1552,7 +1673,6 @@ remove_mq_m_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig, ErtsMess { /* Removing message... */ ASSERT(!ERTS_SIG_IS_NON_MSG(sig)); - ASSERT(c_p->sig_qs.len > 0); c_p->sig_qs.len--; remove_mq_sig(c_p, sig, next_sig, next_nm_sig); } @@ -1562,7 +1682,6 @@ remove_iq_m_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig) { /* Removing message... */ ASSERT(!ERTS_SIG_IS_NON_MSG(sig)); - ASSERT(c_p->sig_qs.len > 0); c_p->sig_qs.len--; remove_iq_sig(c_p, sig, next_sig); } @@ -2138,6 +2257,258 @@ handle_group_leader(Process *c_p, ErtsSigGroupLeader *sgl) destroy_sig_group_leader(sgl); } +static void +check_push_msgq_len_offs_marker(Process *rp, ErtsSignal *sig) +{ + ErtsProcessInfoSig *pisig = (ErtsProcessInfoSig *) sig; + + ASSERT(ERTS_PROC_SIG_OP(sig->common.tag) == ERTS_SIG_Q_OP_PROCESS_INFO); + + if (pisig->msgq_len_offset == ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC) { + ErtsProcSigMsgQLenOffsetMarker *mrkr; + Sint len, msgq_len_offset; + ErtsMessage *first = rp->sig_inq.first; + ASSERT(first); + if (((ErtsSignal *) first)->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK) + mrkr = (ErtsProcSigMsgQLenOffsetMarker *) first; + else { + mrkr = &pisig->marker; + + ASSERT(mrkr->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK); + + mrkr->common.next = first; + ASSERT(rp->sig_inq.last != &rp->sig_inq.first); + if (rp->sig_inq.nmsigs.next == &rp->sig_inq.first) + rp->sig_inq.nmsigs.next = &mrkr->common.next; + if (rp->sig_inq.nmsigs.last == &rp->sig_inq.first) + rp->sig_inq.nmsigs.last = &mrkr->common.next; + rp->sig_inq.first = (ErtsMessage *) mrkr; + } + + len = rp->sig_inq.len; + msgq_len_offset = len - mrkr->len_offset; + + mrkr->len_offset = len; + mrkr->refc++; + + pisig->msgq_len_offset = msgq_len_offset; + +#ifdef DEBUG + /* save pointer to used marker... */ + pisig->marker.common.specific.attachment = (void *) mrkr; +#endif + + } +} + +static void +destroy_process_info_request(Process *c_p, ErtsProcessInfoSig *pisig) +{ + int dealloc_pisig = !0; + + if (pisig->msgq_len_offset != ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE) { + Sint refc; + int dealloc_marker = 0; + ErtsProcSigMsgQLenOffsetMarker *marker; +#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN + Sint delayed_len; +#endif + + ASSERT(pisig->msgq_len_offset >= 0); + + erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + marker = (ErtsProcSigMsgQLenOffsetMarker *) c_p->sig_inq.first; + ASSERT(marker); + ASSERT(marker->refc > 0); + ASSERT(pisig->marker.common.specific.attachment == (void *) marker); + + marker->delayed_len -= pisig->msgq_len_offset; +#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN + delayed_len = marker->delayed_len; +#endif + + refc = --marker->refc; + if (refc) { + if (marker == &pisig->marker) { + /* Another signal using our marker... */ + dealloc_pisig = 0; + } + } + else { + /* Marker unused; remove it... */ + ASSERT(marker->delayed_len + marker->len_offset == 0); +#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN + delayed_len += marker->len_offset; +#endif + if (marker != &pisig->marker) + dealloc_marker = !0; /* used another signals marker... */ + c_p->sig_inq.first = marker->common.next; + if (c_p->sig_inq.last == &marker->common.next) + c_p->sig_inq.last = &c_p->sig_inq.first; + if (c_p->sig_inq.nmsigs.next == &marker->common.next) + c_p->sig_inq.nmsigs.next = &c_p->sig_inq.first; + if (c_p->sig_inq.nmsigs.last == &marker->common.next) + c_p->sig_inq.nmsigs.last = &c_p->sig_inq.first; + } + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + + if (!refc) { + c_p->flags &= ~F_DELAYED_PSIGQS_LEN; + /* Adjust msg len of inner+middle queue */ + ASSERT(marker->len_offset <= 0); + c_p->sig_qs.len -= marker->len_offset; + + ASSERT(c_p->sig_qs.len >= 0); + } + +#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN + { + Sint len = 0; + ERTS_FOREACH_SIG_PRIVQS( + c_p, mp, + { + if (ERTS_SIG_IS_MSG(mp)) + len++; + }); + ERTS_ASSERT(c_p->sig_qs.len + delayed_len == len); + } +#endif + + + if (dealloc_marker) { + ErtsProcessInfoSig *pisig2 + = (ErtsProcessInfoSig *) (((char *) marker) + - offsetof(ErtsProcessInfoSig, + marker)); + erts_free(ERTS_ALC_T_SIG_DATA, pisig2); + } + } + + if (dealloc_pisig) + erts_free(ERTS_ALC_T_SIG_DATA, pisig); +} + +static int +handle_process_info(Process *c_p, ErtsMessage *sig, + ErtsMessage ***next_nm_sig, int is_alive) +{ + ErtsProcessInfoSig *pisig = (ErtsProcessInfoSig *) sig; + Uint reds = 0; + Process *rp; + + if (pisig->msgq_len_offset != ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE) { + /* + * Request requires message queue data to be updated + * before inspection... + */ + + ASSERT(pisig->msgq_len_offset >= 0); + /* + * Update sig_qs.len to reflect the length + * of the message queue... + */ + c_p->sig_qs.len += pisig->msgq_len_offset; + + if (is_alive) { + /* + * Move messages part of message queue into inner + * signal queue... + */ + if (*next_nm_sig != &c_p->sig_qs.cont) { + *c_p->sig_qs.last = c_p->sig_qs.cont; + c_p->sig_qs.last = *next_nm_sig; + + c_p->sig_qs.cont = **next_nm_sig; + if (c_p->sig_qs.nmsigs.last == *next_nm_sig) + c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont; + *next_nm_sig = &c_p->sig_qs.cont; + *c_p->sig_qs.last = NULL; + } + + if (!pisig->common.specific.next) { + /* + * No more signals in middle queue... + * + * Process-info 'status' needs sig-q + * process flag to be updated in order + * to show accurate result... + */ + erts_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_SIG_Q); + } + +#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN + { + Sint len; + ErtsMessage *mp; + for (mp = c_p->sig_qs.first, len = 0; mp; mp = mp->next) { + ERTS_ASSERT(ERTS_SIG_IS_MSG(mp)); + len++; + } + ERTS_ASSERT(c_p->sig_qs.len == len); + } +#endif + } + } + if (is_alive) + remove_nm_sig(c_p, sig, next_nm_sig); + + rp = erts_proc_lookup(pisig->requester); + ASSERT(c_p != rp); + if (rp) { + Eterm msg, res, ref, *hp; + ErtsProcLocks locks = 0; + ErtsHeapFactory hfact; + ErtsMessage *mp; + Uint reserve_size = 3 + sizeof(pisig->oref_thing)/sizeof(Eterm); + + if (!is_alive) { + ErlOffHeap *ohp; + mp = erts_alloc_message_heap(rp, &locks, reserve_size, &hp, &ohp); + res = am_undefined; + } + else { + ErlHeapFragment *hfrag; + + reserve_size += pisig->reserve_size; + + mp = erts_alloc_message(0, NULL); + hfrag = new_message_buffer(reserve_size); + mp->data.heap_frag = hfrag; + erts_factory_selfcontained_message_init(&hfact, mp, &hfrag->mem[0]); + + res = erts_process_info(c_p, &hfact, c_p, ERTS_PROC_LOCK_MAIN, + pisig->item_ix, pisig->len, + pisig->flags, reserve_size, &reds); + + hp = erts_produce_heap(&hfact, + 3 + sizeof(pisig->oref_thing)/sizeof(Eterm), + 0); + } + + sys_memcpy((void *) hp, (void *) &pisig->oref_thing, + sizeof(pisig->oref_thing)); + ref = make_internal_ref(hp); + hp += sizeof(pisig->oref_thing)/sizeof(Eterm); + + msg = TUPLE2(hp, ref, res); + + if (is_alive) + erts_factory_trim_and_close(&hfact, &msg, 1); + + erts_queue_message(rp, locks, mp, msg, c_p->common.id); + + if (!is_alive && locks) + erts_proc_unlock(rp, locks); + } + + destroy_process_info_request(c_p, pisig); + + if (reds > INT_MAX/8) + reds = INT_MAX/8; + + return ((int) reds)*4 + 8; +} /* * Called in order to handle incoming signals. @@ -2153,7 +2524,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, ErtsMessage *sig, ***next_nm_sig; ErtsSigRecvTracing tracing; - ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p); + ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); if (local_only) @@ -2502,6 +2873,12 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); break; + case ERTS_SIG_Q_OP_PROCESS_INFO: + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + handle_process_info(c_p, sig, next_nm_sig, !0); + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + break; + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: { Uint16 type = ERTS_PROC_SIG_TYPE(tag); @@ -2655,7 +3032,7 @@ stop: { c_p->sig_qs.save = c_p->sig_qs.saved_last; } - ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p); + ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); *redsp = cnt/4 + 1; @@ -2707,8 +3084,8 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp) int cnt, limit; ErtsMessage *sig, ***next_nm_sig; - ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p); - ERTS_LC_ASSERT(!erts_proc_lc_my_proc_locks(c_p)); + ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); + ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN); ASSERT(!(ERTS_PSFLG_SIG_IN_Q & erts_atomic32_read_nob(&c_p->state))); @@ -2815,9 +3192,11 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp) } case ERTS_SIG_Q_OP_IS_ALIVE: - ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); is_alive_response(c_p, sig, 0); - ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + break; + + case ERTS_SIG_Q_OP_PROCESS_INFO: + handle_process_info(c_p, sig, next_nm_sig, 0); break; case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: @@ -3003,6 +3382,13 @@ erts_proc_sig_signal_size(ErtsSignal *sig) size = sizeof(ErtsSigTraceInfo); break; + case ERTS_SIG_Q_OP_PROCESS_INFO: { + ErtsProcessInfoSig *pisig = (ErtsProcessInfoSig *) sig; + size = sizeof(ErtsProcessInfoSig); + size += (pisig->len - 1) * sizeof(int); + break; + } + default: ERTS_INTERNAL_ERROR("Unknown signal"); break; @@ -3058,8 +3444,7 @@ erts_proc_sig_receive_helper(Process *c_p, consumed_reds += 4; left_reds -= 4; erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); - if (c_p->sig_inq.first) - erts_proc_sig_fetch(c_p); + erts_proc_sig_fetch(c_p); /* * Messages may have been moved directly to * inner queue... @@ -3220,6 +3605,7 @@ handle_message_enqueued_tracing(Process *c_p, Sint tok_label = 0; Sint tok_lastcnt = 0; Sint tok_serial = 0; + Sint len = erts_proc_sig_privqs_len(c_p); Eterm seq_trace_token = ERL_MESSAGE_TOKEN(msg); if (seq_trace_token != NIL && is_tuple(seq_trace_token)) { @@ -3231,7 +3617,7 @@ handle_message_enqueued_tracing(Process *c_p, DTRACE6(message_queued, tracing->messages.receiver_name, size_object(ERL_MESSAGE_TERM(msg)), - c_p->sig_qs.len, + len, /* This is NOT message queue len, but its something... */ tok_label, tok_lastcnt, tok_serial); } #endif @@ -3303,24 +3689,11 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing, return 0; } -/* - * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0 will move off heap messages - * into the heap of the inspected process if off_heap_message_queue - * is false when process_info(_, messages) is called. That is, the - * following GC will have more data in the rootset compared to the - * scenario when process_info(_, messages) had not been called. - * - * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS != 0 will keep off heap messages - * off heap when process_info(_, messages) is called regardless of - * the off_heap_message_queue setting of the process. That is, it - * will change the following execution of the process as little as - * possible. - */ -#define ERTS_INSPECT_MSGQ_KEEP_OH_MSGS 1 - Uint -erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp, +erts_proc_sig_prep_msgq_for_inspection(Process *c_p, + Process *rp, ErtsProcLocks rp_locks, + int info_on_self, ErtsMessageInfo *mip) { Uint tot_heap_size; @@ -3329,9 +3702,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp, int self_on_heap; /* - * Prepare the message queue for inspection - * by process_info(). - * + * Prepare the message queue (inner signal queue) + * for inspection by process_info(). * * - Decode all messages on external format * - Remove all corrupt dist messages from queue @@ -3340,20 +3712,11 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp, * - Return total heap size need for all messages * that needs to be copied. * - * If ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0: - * - In case off heap messages is disabled and - * we are inspecting our own queue, move all - * off heap data into the heap. */ - /* - * All non-message signals *need* to have been - * handled before calling this functions... - */ - ASSERT(!rp->sig_qs.cont); - ASSERT(!rp->sig_qs.nmsigs.next && !rp->sig_qs.nmsigs.last); + ASSERT(!info_on_self || c_p == rp); - self_on_heap = c_p == rp && !(c_p->flags & F_OFF_HEAP_MSGQ); + self_on_heap = info_on_self && !(c_p->flags & F_OFF_HEAP_MSGQ); tot_heap_size = 0; i = 0; @@ -3367,8 +3730,7 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp, if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) { /* decode it... */ if (mp->data.attached) - erts_decode_dist_message(rp, rp_locks, mp, - ERTS_INSPECT_MSGQ_KEEP_OH_MSGS); + erts_decode_dist_message(rp, rp_locks, mp, !0); msg = ERL_MESSAGE_TERM(mp); @@ -3394,44 +3756,11 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp, ASSERT(is_value(msg)); -#if ERTS_INSPECT_MSGQ_KEEP_OH_MSGS if (is_not_immed(msg) && (!self_on_heap || mp->data.attached)) { Uint sz = size_object(msg); mip[i].size = sz; tot_heap_size += sz; } -#else - if (self_on_heap) { - if (mp->data.attached) { - ErtsMessage *tmp = NULL; - if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { - erts_link_mbuf_to_proc(rp, mp->data.heap_frag); - mp->data.attached = NULL; - } - else { - /* - * Need to replace the message reference since - * we will get references to the message data - * from the heap... - */ - ErtsMessage **mpp; - tmp = erts_alloc_message(0, NULL); - sys_memcpy((void *) tmp->m, (void *) mp->m, - sizeof(Eterm)*ERL_MESSAGE_REF_ARRAY_SZ); - mpp = i == 0 ? &rp->sig_qs.first : &mip[i-1].msgp->next; - erts_msgq_replace_msg_ref(&rp->msg, tmp, mpp); - erts_save_message_in_proc(rp, mp); - mp = tmp; - } - } - } - else if (is_not_immed(msg)) { - Uint sz = size_object(msg); - mip[i].size = sz; - tot_heap_size += sz; - } - -#endif mip[i].msgp = mp; i++; @@ -3439,6 +3768,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, Process *rp, mp = mp->next; } + ASSERT(c_p->sig_qs.len == i); + return tot_heap_size; } @@ -3475,11 +3806,11 @@ move_msg_to_heap(Process *c_p, ErtsMessage *mp) void erts_proc_sig_move_msgs_to_heap(Process *c_p) { - ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p); + ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); ERTS_FOREACH_SIG_PRIVQS(c_p, sig, move_msg_to_heap(c_p, sig)); - ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p); + ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); } @@ -3670,6 +4001,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, case ERTS_SIG_Q_OP_IS_ALIVE: case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: + case ERTS_SIG_Q_OP_PROCESS_INFO: break; default: @@ -3714,10 +4046,8 @@ chk_eterm(Process *c_p, int privq, ErtsMessage *mp, Eterm term) for (bp = erts_message_to_heap_frag(mp); bp; bp = bp->next) { if (bp->mem <= ptr && ptr < bp->mem + bp->used_size) return; - bp = bp->next; } - ERTS_ASSERT(privq); ASSERT(erts_dbg_within_proc(ptr, c_p, NULL)); } @@ -3740,6 +4070,12 @@ proc_sig_hdbg_check_queue(Process *proc, ErtsMessage **save = proc->sig_qs.save; ErtsMessage **saved_last = proc->sig_qs.saved_last; + if (!privq) { + ErtsSignal *sig = (ErtsSignal *) *sig_next; + if (sig->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK) { + + } + } nm_next = sig_nm_next; nm_last = sig_nm_last; @@ -3804,21 +4140,29 @@ proc_sig_hdbg_check_queue(Process *proc, if (!sig) break; - nm_sigs++; + nm_sig = (ErtsSignal *) sig; - ERTS_ASSERT(!last_nm_sig_found); - ERTS_ASSERT(ERTS_SIG_IS_NON_MSG(sig)); + if (nm_sig->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK) { + ERTS_ASSERT(!privq); + ERTS_ASSERT(sig == *sig_next); + } + else { + nm_sigs++; - nm_sig = (ErtsSignal *) sig; + ERTS_ASSERT(!last_nm_sig_found); + ERTS_ASSERT(ERTS_SIG_IS_NON_MSG(sig)); + + ERTS_ASSERT(nm_next == next); + + if (nm_last == next) { + ASSERT(!nm_sig->common.specific.next); + last_nm_sig_found = 1; + } - ERTS_ASSERT(nm_next == next); + nm_next = nm_sig->common.specific.next; - if (nm_last == next) { - ASSERT(!nm_sig->common.specific.next); - last_nm_sig_found = 1; } - nm_next = nm_sig->common.specific.next; next = &nm_sig->common.next; sig = nm_sig->common.next; @@ -3873,10 +4217,10 @@ proc_sig_hdbg_check_queue(Process *proc, } void -erts_proc_sig_hdbg_check_priv_queue(Process *p, char *what, char *file, int line) +erts_proc_sig_hdbg_check_priv_queue(Process *p, int qlock, char *what, char *file, int line) { int found_saved_last = 0; - Sint len1, len2; + Sint len, len1, len2; ERTS_LC_ASSERT(erts_thr_progress_is_blocking() || ERTS_PROC_IS_EXITING(p) || (ERTS_PROC_LOCK_MAIN @@ -3901,7 +4245,8 @@ erts_proc_sig_hdbg_check_priv_queue(Process *p, char *what, char *file, int line ERTS_PSFLG_SIG_Q); if (p->sig_qs.saved_last) ERTS_ASSERT(found_saved_last); - ERTS_ASSERT(p->sig_qs.len == len1 + len2); + len = proc_sig_privqs_len(p, qlock); + ERTS_ASSERT(len == len1 + len2); } void -- cgit v1.2.3