/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 2018. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* %CopyrightEnd%
*/
/*
* Description: Process signal queue implementation.
*
* Author: Rickard Green
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "sys.h"
#include "global.h"
#include "dist.h"
#include "erl_process.h"
#include "erl_port_task.h"
#include "erl_trace.h"
#include "beam_bp.h"
#include "erl_binary.h"
#include "big.h"
#include "erl_gc.h"
#include "bif.h"
#include "erl_bif_unique.h"
#include "erl_proc_sig_queue.h"
#include "dtrace-wrapper.h"
/*
 * NOTE(review): the two constants below are not used in the part of the
 * file visible here. Their names suggest a reduction count scale factor
 * and a limit on trace operations -- confirm at the use sites.
 */
#define ERTS_SIG_REDS_CNT_FACTOR 4
#define ERTS_PROC_SIG_TRACE_COUNT_LIMIT 200
/*
 * Signal operation codes.
 *
 * An op code is combined with a type into a signal tag using
 * ERTS_PROC_SIG_MAKE_TAG() and is read back using ERTS_PROC_SIG_OP().
 * ERTS_SIG_Q_OP_MAX is the highest op code defined here; it is checked
 * against ERTS_SIG_Q_OP_MASK in erts_proc_sig_queue_init().
 *
 * Note that not all signals are handled using this functionality!
 */
#define ERTS_SIG_Q_OP_MAX 13
#define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */
#define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break*/
#define ERTS_SIG_Q_OP_MONITOR_DOWN 2
#define ERTS_SIG_Q_OP_MONITOR 3
#define ERTS_SIG_Q_OP_DEMONITOR 4
#define ERTS_SIG_Q_OP_LINK 5
#define ERTS_SIG_Q_OP_UNLINK 6
#define ERTS_SIG_Q_OP_GROUP_LEADER 7
#define ERTS_SIG_Q_OP_TRACE_CHANGE_STATE 8
#define ERTS_SIG_Q_OP_PERSISTENT_MON_MSG 9
#define ERTS_SIG_Q_OP_IS_ALIVE 10
#define ERTS_SIG_Q_OP_PROCESS_INFO 11
#define ERTS_SIG_Q_OP_SYNC_SUSPEND 12
#define ERTS_SIG_Q_OP_RPC ERTS_SIG_Q_OP_MAX
/*
 * Signal type codes that are private to this file. They are numbered
 * directly above ERTS_MON_LNK_TYPE_MAX, so monitor/link type values and
 * these values can share the type field of a signal tag.
 * ERTS_SIG_Q_TYPE_MAX is the highest type code and is checked against
 * ERTS_SIG_Q_TYPE_MASK in erts_proc_sig_queue_init().
 */
#define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 5)
#define ERTS_SIG_Q_TYPE_UNDEFINED \
(ERTS_MON_LNK_TYPE_MAX + 1)
#define ERTS_SIG_Q_TYPE_DIST_LINK \
(ERTS_MON_LNK_TYPE_MAX + 2)
#define ERTS_SIG_Q_TYPE_GEN_EXIT \
(ERTS_MON_LNK_TYPE_MAX + 3)
#define ERTS_SIG_Q_TYPE_DIST_PROC_DEMONITOR \
(ERTS_MON_LNK_TYPE_MAX + 4)
#define ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO \
ERTS_SIG_Q_TYPE_MAX
/*
 * ERTS_SIG_IS_GEN_EXIT(sig): true if the type field of the tag of 'sig'
 * is ERTS_SIG_Q_TYPE_GEN_EXIT.
 *
 * ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig): for a generic exit signal, true if
 * the stored reason is THE_NON_VALUE. send_gen_exit_signal() stores a
 * non-value reason when the reason was supplied as an ErtsDistExternal
 * instead of a term.
 */
#define ERTS_SIG_IS_GEN_EXIT(sig) \
(ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT)
#define ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig) \
(ASSERT(ERTS_SIG_IS_GEN_EXIT(sig)),is_non_value(get_exit_signal_data(sig)->reason))
/*
 * Processes that handle signals on behalf of processes executing dirty.
 * erts_make_dirty_proc_handled() picks one of them based on priority
 * (max, high, or everything else) and queues a message containing the
 * pid of the dirty executing process to it.
 */
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler);
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_high);
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_max);
/*
 * Initialize the signal queue implementation.
 *
 * Currently only verifies that the op and type codes defined in this
 * file fit in the corresponding fields of a signal tag.
 */
void
erts_proc_sig_queue_init(void)
{
    /* Type codes must fit within the type mask... */
    ERTS_CT_ASSERT(ERTS_SIG_Q_TYPE_MASK >= ERTS_SIG_Q_TYPE_MAX);

    /* ...and op codes must stay below the marker op and the op mask. */
    ERTS_CT_ASSERT(ERTS_SIG_Q_OP_MSGQ_LEN_OFFS_MARK > ERTS_SIG_Q_OP_MAX);
    ERTS_CT_ASSERT(ERTS_SIG_Q_OP_MASK > ERTS_SIG_Q_OP_MAX);
}
/*
 * Receive tracing state passed to handle_msg_tracing(),
 * handle_trace_change_state() and stretch_limit().
 *
 * NOTE(review): the fields are not accessed in the part of the file
 * visible here; see the functions above for their exact meaning.
 */
typedef struct {
int active;
int procs;
struct {
int active;
#if defined(USE_VM_PROBES)
/* These two fields only exist when built with USE_VM_PROBES. */
int vm_probes;
char receiver_name[DTRACE_TERM_BUF_SIZE];
#endif
int receive_trace;
int bp_ix;
ErtsMessage **next;
ErtsTracingEvent *event;
} messages;
} ErtsSigRecvTracing;
/*
 * Data of a generic exit signal (ERTS_SIG_Q_TYPE_GEN_EXIT).
 *
 * The structure is stored in the combined heap fragment of the signal,
 * directly after the used part of the heap (see get_exit_signal_data()
 * and send_gen_exit_signal()).
 */
typedef struct {
Eterm message; /* prebuilt 'EXIT'/'DOWN' tuple, or THE_NON_VALUE */
Eterm from;
Eterm reason; /* THE_NON_VALUE when reason is passed as dist external */
union {
Eterm ref; /* used when a (non-nil) reference was passed */
int normal_kills; /* used when no reference (NIL) was passed */
} u;
} ErtsExitSignalData;
/*
 * Data of a persistent monitor message signal
 * (ERTS_SIG_Q_OP_PERSISTENT_MON_MSG).
 *
 * Stored in the combined heap fragment of the signal, directly after the
 * used part of the heap (see get_persist_mon_msg()).
 */
typedef struct {
Eterm message; /* copy of the message term */
Eterm key; /* monitor key; see erts_proc_sig_send_persistent_monitor_msg() */
} ErtsPersistMonMsg;
/*
 * Signal carrying a link operation involving a remote (external) pid.
 * Created by make_sig_dist_link_op() and released by
 * destroy_sig_dist_link_op().
 */
typedef struct {
ErtsSignalCommon common;
Eterm local; /* internal pid (immediate) */
Eterm remote; /* external pid (heap for it follow) */
/* Storage for the external pid that 'remote' refers to. */
Eterm heap[EXTERNAL_THING_HEAD_SIZE + 1];
} ErtsSigDistLinkOp;
/*
 * Trace-change-state signal (ERTS_SIG_Q_OP_TRACE_CHANGE_STATE with type
 * ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO). Built in sig_enqueue_trace() and
 * released by destroy_trace_info().
 */
typedef struct {
ErtsSignalCommon common;
Uint flags_on; /* trace flags copied from the sender (TRACEE_FLAGS) */
Uint flags_off; /* set-on-(first-)link flags to clear, or 0 */
Eterm tracer; /* set using erts_tracer_update() */
} ErtsSigTraceInfo;
/*
 * Bits of the 'flags' field in ErtsSigGroupLeader.
 * NOTE(review): these are not used in the part of the file visible here.
 */
#define ERTS_SIG_GL_FLG_ACTIVE (((erts_aint_t) 1) << 0)
#define ERTS_SIG_GL_FLG_RECEIVER (((erts_aint_t) 1) << 1)
#define ERTS_SIG_GL_FLG_SENDER (((erts_aint_t) 1) << 2)
/*
 * Group leader signal. Released by destroy_sig_group_leader(), which
 * cleans up the off-heap list 'oh' before freeing the structure.
 */
typedef struct {
ErtsSignalCommon common;
erts_atomic_t flags; /* ERTS_SIG_GL_FLG_* */
Eterm group_leader;
Eterm reply_to;
Eterm ref;
ErlOffHeap oh;
Eterm heap[1]; /* presumably larger in reality -- TODO confirm at allocation site */
} ErtsSigGroupLeader;
/*
 * Data of an is-alive request.
 * NOTE(review): not used in the part of the file visible here;
 * presumably belongs to ERTS_SIG_Q_OP_IS_ALIVE -- confirm.
 */
typedef struct {
Eterm message;
Eterm requester;
} ErtsIsAliveRequest;
/*
 * Data of a synchronous suspend request.
 * NOTE(review): not used in the part of the file visible here;
 * presumably belongs to ERTS_SIG_Q_OP_SYNC_SUSPEND -- confirm.
 */
typedef struct {
Eterm message;
Eterm requester;
int async;
} ErtsSyncSuspendRequest;
/*
 * Pairs a suspend monitor with a signal.
 * NOTE(review): not used in the part of the file visible here; the name
 * suggests bookkeeping for a suspend that has not completed -- confirm.
 */
typedef struct {
ErtsMonitorSuspend *mon;
ErtsMessage *sync;
} ErtsProcSigPendingSuspend;
/*
 * Message queue length offset marker.
 *
 * When present, the marker is the first element of the in-queue; it is
 * read through sig_inq.first in erts_proc_sig_fetch_msgq_len_offs__()
 * and proc_sig_privqs_len(). A marker is embedded in each
 * ErtsProcessInfoSig.
 */
typedef struct {
ErtsSignalCommon common;
Sint refc; /* NOTE(review): not used in the visible part of the file */
Sint delayed_len; /* fetched in-queue length not yet added to sig_qs.len */
Sint len_offset; /* decreased by the same amount as delayed_len increases */
} ErtsProcSigMsgQLenOffsetMarker;
/*
 * Process-info request signal (sent with ERTS_SIG_Q_OP_PROCESS_INFO;
 * proc_queue_signal() calls check_push_msgq_len_offs_marker() for
 * signals with this op).
 */
typedef struct {
ErtsSignalCommon common;
ErtsProcSigMsgQLenOffsetMarker marker;
Sint msgq_len_offset; /* see ERTS_PROC_SIG_PI_MSGQ_LEN_* below */
Eterm requester;
Eterm ref;
ErtsORefThing oref_thing;
Uint reserve_size;
Uint len;
int flags;
int item_ix[1]; /* of len size in reality... */
} ErtsProcessInfoSig;
/*
 * Negative special values.
 * NOTE(review): presumably stored in ErtsProcessInfoSig.msgq_len_offset;
 * not used in the part of the file visible here -- confirm.
 */
#define ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE ((Sint) -1)
#define ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC ((Sint) -2)
/*
 * Signal carrying a function to call together with its argument.
 * NOTE(review): not used in the part of the file visible here;
 * presumably belongs to ERTS_SIG_Q_OP_RPC -- confirm.
 */
typedef struct {
ErtsSignalCommon common;
Eterm requester;
/* Callback; 'arg' below is presumably passed as its second argument. */
Eterm (*func)(Process *, void *, int *, ErlHeapFragment **);
void *arg;
Eterm ref;
ErtsORefThing oref_thing;
} ErtsProcSigRPC;
/*
 * Forward declarations of file local helpers that are defined further
 * down in this file.
 */
static int handle_msg_tracing(Process *c_p,
ErtsSigRecvTracing *tracing,
ErtsMessage ***next_nm_sig);
static int handle_trace_change_state(Process *c_p,
ErtsSigRecvTracing *tracing,
Uint16 type,
ErtsMessage *sig,
ErtsMessage ***next_nm_sig);
static void getting_unlinked(Process *c_p, Eterm unlinker);
static void getting_linked(Process *c_p, Eterm linker);
static void group_leader_reply(Process *c_p, Eterm to,
Eterm ref, int success);
static int stretch_limit(Process *c_p, ErtsSigRecvTracing *tp,
int abs_lim, int *limp);
/*
 * ERTS_PROC_SIG_HDBG_PRIV_CHKQ(P, T, NMN)
 *
 * With ERTS_PROC_SIG_HARD_DEBUG defined, this calls
 * proc_sig_hdbg_check_queue() on the private continuation queue
 * (sig_qs.cont up to sig_qs.cont_last) of process P, with tracing state
 * T. NMN points to the next non-message signal pointer; if there is no
 * such signal, NULL is passed for both the next and the last non-message
 * signal. Without ERTS_PROC_SIG_HARD_DEBUG the macro expands to nothing.
 *
 * NOTE(review): the debug expansion ends with ';' after 'while (0)', so
 * a use written as 'MACRO(...);' yields an extra empty statement. That
 * is harmless in a statement list, but would break an unbraced if/else.
 */
#ifdef ERTS_PROC_SIG_HARD_DEBUG
#define ERTS_PROC_SIG_HDBG_PRIV_CHKQ(P, T, NMN) \
do { \
ErtsMessage **nm_next__ = *(NMN); \
ErtsMessage **nm_last__ = (P)->sig_qs.nmsigs.last; \
if (!nm_next__ || !*nm_next__) { \
nm_next__ = NULL; \
nm_last__ = NULL; \
} \
proc_sig_hdbg_check_queue((P), \
1, \
&(P)->sig_qs.cont, \
(P)->sig_qs.cont_last, \
nm_next__, \
nm_last__, \
(T), \
NULL, \
ERTS_PSFLG_FREE); \
} while (0);
/* Queue consistency checker; defined further down in this file. */
static Sint
proc_sig_hdbg_check_queue(Process *c_p,
int privq,
ErtsMessage **sig_next,
ErtsMessage **sig_last,
ErtsMessage **sig_nm_next,
ErtsMessage **sig_nm_last,
ErtsSigRecvTracing *tracing,
int *found_saved_last_p,
erts_aint32_t sig_psflg);
#else
#define ERTS_PROC_SIG_HDBG_PRIV_CHKQ(P, T, NMN)
#endif
/*
 * Demonitor signal for a distributed process monitor. Released by
 * destroy_dist_proc_demonitor().
 */
typedef struct {
ErtsSignalCommon common;
Eterm ref; /* may be an external reference */
Eterm heap[1]; /* presumably larger in reality -- TODO confirm at allocation site */
} ErtsSigDistProcDemonitor;
/*
 * Release a distributed process demonitor signal.
 *
 * If the signal refers to an external reference, the node entry
 * referred to by that reference is dereferenced before the signal
 * itself is freed.
 */
static void
destroy_dist_proc_demonitor(ErtsSigDistProcDemonitor *dmon)
{
    if (is_external(dmon->ref)) {
        ExternalThing *ext_ref = external_thing_ptr(dmon->ref);

        erts_deref_node_entry(ext_ref->node, dmon->ref);
    }

    erts_free(ERTS_ALC_T_DIST_DEMONITOR, dmon);
}
/*
 * Allocate and initialize a distributed link operation signal.
 *
 * op     - signal op code to put in the tag
 * local  - internal pid
 * remote - external pid; copied into the heap area of the signal
 *
 * Returns the new signal. Release it with destroy_sig_dist_link_op().
 */
static ERTS_INLINE ErtsSigDistLinkOp *
make_sig_dist_link_op(int op, Eterm local, Eterm remote)
{
    ErlOffHeap scratch_oh = {0};
    ErtsSigDistLinkOp *sig;
    Eterm *heap_top;

    sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSigDistLinkOp));

    ASSERT(is_internal_pid(local));
    ASSERT(is_external_pid(remote));

    sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(op,
                                             ERTS_SIG_Q_TYPE_DIST_LINK,
                                             0);
    sig->local = local;

    /* Copy the external pid into the heap embedded in the signal... */
    heap_top = &sig->heap[0];
    sig->remote = STORE_NC(&heap_top, &scratch_oh, remote);

    /* ...it must have been placed first in, and fit in, that heap. */
    ASSERT(&sig->heap[0] < heap_top);
    ASSERT(heap_top <= &sig->heap[0] + sizeof(sig->heap)/sizeof(sig->heap[0]));
    ASSERT(boxed_val(sig->remote) == &sig->heap[0]);

    return sig;
}
/*
 * Release a signal created by make_sig_dist_link_op().
 *
 * The node entry referred to by the embedded external pid is
 * dereferenced before the signal is freed.
 */
static ERTS_INLINE void
destroy_sig_dist_link_op(ErtsSigDistLinkOp *sdlnk)
{
    Eterm *ext_heap = &sdlnk->heap[0];

    ASSERT(is_external_pid(sdlnk->remote));
    ASSERT(boxed_val(sdlnk->remote) == &sdlnk->heap[0]);

    erts_deref_node_entry(((ExternalThing *) ext_heap)->node,
                          make_boxed(ext_heap));
    erts_free(ERTS_ALC_T_SIG_DATA, sdlnk);
}
/*
 * Get the exit signal data of an exit, linked exit, or monitor down
 * signal.
 *
 * The data is located in the combined heap fragment of the signal,
 * directly after the used part of the heap.
 */
static ERTS_INLINE ErtsExitSignalData *
get_exit_signal_data(ErtsMessage *xsig)
{
    ASSERT(ERTS_SIG_IS_NON_MSG(xsig));
    ASSERT((ERTS_PROC_SIG_OP(((ErtsSignal *) xsig)->common.tag)
            == ERTS_SIG_Q_OP_EXIT)
           || (ERTS_PROC_SIG_OP(((ErtsSignal *) xsig)->common.tag)
               == ERTS_SIG_Q_OP_EXIT_LINKED)
           || (ERTS_PROC_SIG_OP(((ErtsSignal *) xsig)->common.tag)
               == ERTS_SIG_Q_OP_MONITOR_DOWN));

    /* There must be room for the data after the used part of the heap */
    ASSERT(xsig->hfrag.alloc_size > xsig->hfrag.used_size);
    ASSERT((xsig->hfrag.alloc_size - xsig->hfrag.used_size)*sizeof(UWord)
           >= sizeof(ErtsExitSignalData));

    return (ErtsExitSignalData *) (char *)
        &xsig->hfrag.mem[xsig->hfrag.used_size];
}
/*
 * Release a trace info signal.
 *
 * A tracer stored in the signal is reset to NIL using
 * erts_tracer_update() before the signal is freed.
 */
static ERTS_INLINE void
destroy_trace_info(ErtsSigTraceInfo *ti)
{
    Eterm *tracerp = &ti->tracer;

    if (is_value(*tracerp)) {
        erts_tracer_update(tracerp, NIL);
    }

    erts_free(ERTS_ALC_T_SIG_DATA, ti);
}
/*
 * Release a group leader signal: clean up its off-heap list and then
 * free the signal itself.
 */
static void
destroy_sig_group_leader(ErtsSigGroupLeader *sgl)
{
    ErlOffHeap *off_heap = &sgl->oh;

    erts_cleanup_offheap(off_heap);
    erts_free(ERTS_ALC_T_SIG_DATA, sgl);
}
/*
 * Tracing hook called by proc_queue_signal() just before a signal is
 * enqueued.
 *
 * c_p       - sending process, or NULL
 * sigp      - pointer to the pointer referring to the signal about to
 *             be sent; may be updated so that another signal precedes it
 * op        - op code of the signal
 * rp        - receiving process
 * last_next - pointer to the "last non-message signal" next pointer of
 *             the chain being built; updated if a signal is prepended
 *
 * ERTS_SIG_Q_OP_LINK: if the sender is traced with set-on-link or
 * set-on-first-link enabled, a trace-change-state signal carrying the
 * trace flags and tracer of the sender is prepended before the link
 * signal. With set-on-first-link, both set-on-link flags are also
 * cleared on the sender.
 *
 * ERTS_SIG_Q_OP_EXIT / ERTS_SIG_Q_OP_EXIT_LINKED (USE_VM_PROBES only):
 * fires the process_exit_signal probe if it is enabled and the sender
 * is a pid.
 */
static ERTS_INLINE void
sig_enqueue_trace(Process *c_p, ErtsMessage **sigp, int op,
                  Process *rp, ErtsMessage ***last_next)
{
    switch (op) {
    case ERTS_SIG_Q_OP_LINK:
        /*
         * This has to be a logical AND. A bitwise AND between the
         * normalized (0 or 1) result of IS_TRACED() and the masked
         * trace flags only yields non-zero for a flag in bit 0, and
         * can therefore never be correct for both flags.
         */
        if (c_p
            && IS_TRACED(c_p)
            && (ERTS_TRACE_FLAGS(c_p) & (F_TRACE_SOL
                                         | F_TRACE_SOL1))) {
            ErtsSigTraceInfo *ti;
            Eterm tag;
            /*
             * Set on link enabled.
             *
             * Prepend a trace-change-state signal before the
             * link signal...
             */
            tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_TRACE_CHANGE_STATE,
                                         ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO,
                                         0);
            ti = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSigTraceInfo));
            ti->common.next = *sigp;
            ti->common.specific.next = &ti->common.next;
            ti->common.tag = tag;
            ti->flags_on = ERTS_TRACE_FLAGS(c_p) & TRACEE_FLAGS;
            if (!(ti->flags_on & F_TRACE_SOL1))
                ti->flags_off = 0;
            else {
                /* Set on first link; disable set-on-link on the sender */
                ti->flags_off = F_TRACE_SOL1|F_TRACE_SOL;
                erts_proc_lock(c_p, ERTS_PROC_LOCKS_ALL_MINOR);
                ERTS_TRACE_FLAGS(c_p) &= ~(F_TRACE_SOL1|F_TRACE_SOL);
                erts_proc_unlock(c_p, ERTS_PROC_LOCKS_ALL_MINOR);
            }
            erts_tracer_update(&ti->tracer, ERTS_TRACER(c_p));
            *sigp = (ErtsMessage *) ti;
            /*
             * The original signal is now referred to by the next
             * pointer of the prepended signal...
             */
            if (!*last_next || *last_next == sigp)
                *last_next = &ti->common.next;
        }
        break;
#ifdef USE_VM_PROBES
    case ERTS_SIG_Q_OP_EXIT:
    case ERTS_SIG_Q_OP_EXIT_LINKED:
        if (DTRACE_ENABLED(process_exit_signal)) {
            ErtsMessage* sig = *sigp;
            Uint16 type = ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag);
            Eterm reason, from;
            if (type == ERTS_SIG_Q_TYPE_GEN_EXIT) {
                ErtsExitSignalData *xsigd = get_exit_signal_data(sig);
                reason = xsigd->reason;
                from = xsigd->from;
            }
            else {
                /* Signal passed in a link structure; reason in other.item */
                ErtsLink *lnk = (ErtsLink *) sig, *olnk;
                ASSERT(type == ERTS_LNK_TYPE_PROC
                       || type == ERTS_LNK_TYPE_PORT
                       || type == ERTS_LNK_TYPE_DIST_PROC);
                olnk = erts_link_to_other(lnk, NULL);
                reason = lnk->other.item;
                from = olnk->other.item;
            }
            if (is_pid(from)) {
                DTRACE_CHARBUF(sender_str, DTRACE_TERM_BUF_SIZE);
                DTRACE_CHARBUF(receiver_str, DTRACE_TERM_BUF_SIZE);
                DTRACE_CHARBUF(reason_buf, DTRACE_TERM_BUF_SIZE);
                if (reason == am_kill) {
                    reason = am_killed;
                }
                dtrace_pid_str(from, sender_str);
                dtrace_proc_str(rp, receiver_str);
                erts_snprintf(reason_buf, sizeof(DTRACE_CHARBUF_NAME(reason_buf)) - 1, "%T", reason);
                DTRACE3(process_exit_signal, sender_str, receiver_str, reason_buf);
            }
        }
        break;
#endif
    default:
        break;
    }
}
/*
 * Clean up after a failed enqueue in proc_queue_signal().
 *
 * 'first' is the first signal of the chain that was about to be
 * enqueued and 'sig' is the signal of the caller. All signals in the
 * chain except 'sig' are taken care of here: trace-change-state signals
 * are destroyed and monitor signals (a flushed pending signal) are left
 * untouched. 'sig' itself is never freed by this function.
 */
static void
sig_enqueue_trace_cleanup(ErtsMessage *first, ErtsSignal *sig)
{
    ErtsMessage *cur, *following;

    /* The usual case; no tracing signals... */
    if ((ErtsSignal *) first == sig) {
        ASSERT(sig->common.next == NULL);
        return;
    }

    /* Got trace signals to clean up... */
    for (cur = first; cur; cur = following) {
        /* Read the link before 'cur' potentially is destroyed */
        following = cur->next;

        if ((ErtsSignal *) cur == sig)
            continue;

        switch (ERTS_PROC_SIG_OP(((ErtsSignal *) cur)->common.tag)) {
        case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
            destroy_trace_info((ErtsSigTraceInfo *) cur);
            break;
        case ERTS_SIG_Q_OP_MONITOR:
            break; /* ignore flushed pending signal */
        default:
            ERTS_INTERNAL_ERROR("Unexpected signal op");
            break;
        }
    }
}
#ifdef DEBUG
/*
 * Debug helper; returns the number of non-message signals in the chain
 * starting at 'first'.
 */
static int dbg_count_nmsigs(ErtsMessage *first)
{
    int nmsigs = 0;
    ErtsMessage *cur = first;

    while (cur) {
        if (ERTS_SIG_IS_NON_MSG(cur))
            nmsigs++;
        cur = cur->next;
    }

    return nmsigs;
}
#endif
/*
 * Append a chain of signals to the in-queue (sig_inq) of 'rp'.
 *
 * rp        - receiving process; proc_queue_signal() calls this with
 *             the message queue lock of rp held
 * first     - first signal of the chain
 * last      - address of the next field of the last signal of the chain
 * last_next - NULL, or the next pointer that refers to the last
 *             non-message signal of the chain; only passed when the
 *             chain contains at least two non-message signals
 * num_msgs  - number added to the length of the in-queue
 * in_state  - process state as read by the caller
 *
 * Besides linking in the chain, the list of non-message signals
 * (sig_inq.nmsigs) is maintained. ERTS_PSFLG_SIG_IN_Q is set in the
 * process state when the first non-message signal enters an in-queue
 * that did not contain any.
 *
 * Returns in_state, or if ERTS_PSFLG_SIG_IN_Q was set by this call, the
 * state returned by the atomic operation that set it.
 */
static ERTS_INLINE erts_aint32_t
enqueue_signals(Process *rp, ErtsMessage *first,
ErtsMessage **last, ErtsMessage **last_next,
Uint num_msgs,
erts_aint32_t in_state)
{
erts_aint32_t state = in_state;
/* 'this' is the next pointer that will refer to 'first'... */
ErtsMessage **this = rp->sig_inq.last;
ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp);
ASSERT(!*this);
*this = first;
rp->sig_inq.last = last;
if (!rp->sig_inq.nmsigs.next) {
/* No non-message signals in the in-queue before this call... */
ASSERT(!rp->sig_inq.nmsigs.last);
if (ERTS_SIG_IS_NON_MSG(first)) {
rp->sig_inq.nmsigs.next = this;
}
else if (last_next) {
/* First is a message directly followed by a non-message signal */
ASSERT(first->next && ERTS_SIG_IS_NON_MSG(first->next));
rp->sig_inq.nmsigs.next = &first->next;
}
else
goto no_nmsig;
state = erts_atomic32_read_bor_nob(&rp->state,
ERTS_PSFLG_SIG_IN_Q);
no_nmsig:
/* The flag must not have been set before this call... */
ASSERT(!(state & ERTS_PSFLG_SIG_IN_Q));
}
else {
/*
 * Link the first new non-message signal (if any) after the
 * currently last non-message signal of the in-queue...
 */
ErtsSignal *sig;
ASSERT(rp->sig_inq.nmsigs.last);
sig = (ErtsSignal *) *rp->sig_inq.nmsigs.last;
ASSERT(sig && !sig->common.specific.next);
ASSERT(state & ERTS_PSFLG_SIG_IN_Q);
if (ERTS_SIG_IS_NON_MSG(first)) {
sig->common.specific.next = this;
}
else if (last_next) {
ASSERT(first->next && ERTS_SIG_IS_NON_MSG(first->next));
sig->common.specific.next = &first->next;
}
}
/* Update the pointer to the last non-message signal... */
if (last_next) {
ASSERT(dbg_count_nmsigs(first) >= 2);
rp->sig_inq.nmsigs.last = last_next;
}
else if (ERTS_SIG_IS_NON_MSG(first)) {
ASSERT(dbg_count_nmsigs(first) == 1);
rp->sig_inq.nmsigs.last = this;
}
else
ASSERT(dbg_count_nmsigs(first) == 0);
rp->sig_inq.len += num_msgs;
ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE(rp);
return state;
}
/*
 * Exported entry point for enqueue_signals(); see that function for a
 * description of the arguments and the return value.
 */
erts_aint32_t erts_enqueue_signals(Process *rp, ErtsMessage *first,
                                   ErtsMessage **last, ErtsMessage **last_next,
                                   Uint num_msgs,
                                   erts_aint32_t in_state)
{
    erts_aint32_t new_state;

    new_state = enqueue_signals(rp, first, last, last_next,
                                num_msgs, in_state);
    return new_state;
}
/*
 * Make sure that signals sent to a dirty executing process get handled.
 *
 * pid   - pid of the dirty executing process
 * state - state of that process; must have a dirty running flag set
 * prio  - priority used when selecting signal handler process; if
 *         negative, the user priority found in 'state' is used
 *
 * A message containing 'pid' is queued to the dirty process signal
 * handler process selected by priority.
 */
void
erts_make_dirty_proc_handled(Eterm pid,
                             erts_aint32_t state,
                             erts_aint32_t prio)
{
    Process *handler;
    ErtsMessage *request;
    Eterm *request_hp;

    ASSERT(state & (ERTS_PSFLG_DIRTY_RUNNING |
                    ERTS_PSFLG_DIRTY_RUNNING_SYS));

    if (prio < 0)
        prio = (int) ERTS_PSFLGS_GET_USR_PRIO(state);

    if (prio == PRIORITY_MAX)
        handler = erts_dirty_process_signal_handler_max;
    else if (prio == PRIORITY_HIGH)
        handler = erts_dirty_process_signal_handler_high;
    else
        handler = erts_dirty_process_signal_handler;

    /* Make sure signals are handled... */
    request = erts_alloc_message(0, &request_hp);
    erts_queue_message(handler, 0, request, pid, am_system);
}
/*
 * Forward declaration; defined further down in this file. Called by
 * proc_queue_signal() after a process-info signal has been enqueued.
 */
static void
check_push_msgq_len_offs_marker(Process *rp, ErtsSignal *sig);
/*
 * Enqueue the signal 'sig' with op code 'op' in the in-queue of the
 * process identified by 'pid'.
 *
 * c_p - sending process, or NULL; only passed on to sig_enqueue_trace()
 *
 * When called on a normal scheduler, a monitor signal of process
 * monitor type is not sent immediately. It is stored as the pending
 * signal of the scheduler, and whatever signal was pending before is
 * sent instead. Passing the currently pending signal itself flushes it.
 * Any other signal sent to the receiver of the pending signal flushes
 * the pending signal first, so that signal order is maintained.
 *
 * Returns non-zero if 'sig' was enqueued, stored as pending signal, or
 * had a monitor down signal sent on its behalf. Returns zero if the
 * receiver could not be found or is free; the caller is then still
 * responsible for 'sig'.
 */
static int
proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op)
{
int res;
Process *rp;
ErtsMessage *first, *last, **last_next, **sigp;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
int is_normal_sched = !!esdp && esdp->type == ERTS_SCHED_NORMAL;
erts_aint32_t state;
ErtsSignal *pend_sig;
if (is_normal_sched) {
pend_sig = esdp->pending_signal.sig;
if (op == ERTS_SIG_Q_OP_MONITOR
&& ((ErtsMonitor*)sig)->type == ERTS_MON_TYPE_PROC) {
if (!pend_sig) {
/* Nothing pending; just store the signal as pending */
esdp->pending_signal.sig = sig;
esdp->pending_signal.to = pid;
#ifdef DEBUG
esdp->pending_signal.dbg_from = esdp->current_process;
#endif
return 1;
}
ASSERT(esdp->pending_signal.dbg_from == esdp->current_process);
if (pend_sig != sig) {
/* Switch them and send previously pending signal instead */
Eterm pend_to = esdp->pending_signal.to;
esdp->pending_signal.sig = sig;
esdp->pending_signal.to = pid;
sig = pend_sig;
pid = pend_to;
}
else {
/* Caller wants to flush pending signal */
ASSERT(pid == esdp->pending_signal.to);
esdp->pending_signal.sig = NULL;
esdp->pending_signal.to = THE_NON_VALUE;
#ifdef DEBUG
esdp->pending_signal.dbg_from = NULL;
#endif
pend_sig = NULL;
}
rp = erts_proc_lookup_raw(pid);
if (!rp) {
/* Receiver is gone; reply with a monitor down signal */
erts_proc_sig_send_monitor_down((ErtsMonitor*)sig, am_noproc);
return 1;
}
}
else if (pend_sig && pid == esdp->pending_signal.to) {
/* Flush pending signal to maintain signal order */
esdp->pending_signal.sig = NULL;
esdp->pending_signal.to = THE_NON_VALUE;
rp = erts_proc_lookup_raw(pid);
if (!rp) {
erts_proc_sig_send_monitor_down((ErtsMonitor*)pend_sig, am_noproc);
return 0;
}
/* Prepend pending signal */
pend_sig->common.next = (ErtsMessage*) sig;
pend_sig->common.specific.next = &pend_sig->common.next;
first = (ErtsMessage*) pend_sig;
last = (ErtsMessage*) sig;
sigp = last_next = &pend_sig->common.next;
goto first_last_done;
}
else {
/* A pending signal (if any) has another receiver; leave it */
pend_sig = NULL;
rp = erts_proc_lookup_raw(pid);
if (!rp)
return 0;
}
}
else {
/*
 * Not on a normal scheduler; a reference to the receiver is
 * held until we are done (released at the end of the function).
 */
rp = erts_proc_lookup_raw_inc_refc(pid);
if (!rp)
return 0;
pend_sig = NULL;
}
/* Chain consisting of only 'sig'... */
first = last = (ErtsMessage *) sig;
last_next = NULL;
sigp = &first;
first_last_done:
sig->common.specific.next = NULL;
/* may add signals before sig */
sig_enqueue_trace(c_p, sigp, op, rp, &last_next);
last->next = NULL;
erts_proc_lock(rp, ERTS_PROC_LOCK_MSGQ);
state = erts_atomic32_read_nob(&rp->state);
if (ERTS_PSFLG_FREE & state)
res = 0;
else {
state = enqueue_signals(rp, first, &last->next, last_next, 0, state);
if (ERTS_UNLIKELY(op == ERTS_SIG_Q_OP_PROCESS_INFO))
check_push_msgq_len_offs_marker(rp, sig);
res = !0;
}
erts_proc_unlock(rp, ERTS_PROC_LOCK_MSGQ);
if (res == 0) {
/* Receiver is free; take care of everything but 'sig' */
sig_enqueue_trace_cleanup(first, sig);
if (pend_sig) {
erts_proc_sig_send_monitor_down((ErtsMonitor*)pend_sig, am_noproc);
if (sig == pend_sig) {
/* We did a switch, callers signal is now pending (still ok) */
ASSERT(esdp->pending_signal.sig);
res = 1;
}
}
}
else
erts_proc_notify_new_sig(rp, state, 0);
if (!is_normal_sched)
erts_proc_dec_refc(rp);
return res;
}
/*
 * Send the pending signal of the scheduler identified by 'esdp'.
 *
 * Must be called on a normal scheduler that has a pending signal. The
 * pending signal is always a monitor signal. If it cannot be delivered,
 * a monitor down signal with reason 'noproc' is sent for the monitor.
 */
void erts_proc_sig_send_pending(ErtsSchedulerData* esdp)
{
    ErtsSignal *sig;
    int op = ERTS_SIG_Q_OP_MONITOR;

    /* Verify esdp before it is dereferenced... */
    ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL);

    sig = esdp->pending_signal.sig;
    ASSERT(sig);
    ASSERT(is_internal_pid(esdp->pending_signal.to));
    ASSERT(op == ERTS_PROC_SIG_OP(sig->common.tag));

    /* Passing the pending signal itself makes proc_queue_signal flush it */
    if (!proc_queue_signal(NULL, esdp->pending_signal.to, sig, op)) {
        ErtsMonitor* mon = (ErtsMonitor*)sig;
        erts_proc_sig_send_monitor_down(mon, am_noproc);
    }
}
/*
 * Elevate the signal handling priority of the process 'other' to the
 * user priority of 'c_p', if the priority of 'other' is lower.
 *
 * returns:
 *  > 0 -> elevated prio; process alive or exiting
 *  < 0 -> no elevation needed; process alive or exiting
 *    0 -> process terminated (free)
 */
static int
maybe_elevate_sig_handling_prio(Process *c_p, Eterm other)
{
    Process *rp;
    erts_aint32_t state, my_prio, other_prio;

    rp = erts_proc_lookup_raw(other);
    if (!rp)
        return 0;

    state = erts_atomic32_read_nob(&c_p->state);
    my_prio = ERTS_PSFLGS_GET_USR_PRIO(state);

    state = erts_atomic32_read_nob(&rp->state);
    other_prio = ERTS_PSFLGS_GET_USR_PRIO(state);

    /* A larger priority value means a lower priority... */
    if (other_prio <= my_prio)
        return -1;

    /* Others prio is lower than mine; elevate it... */
    if (!erts_sig_prio(other, my_prio))
        return 0;

    /* ensure handled if dirty executing... */
    state = erts_atomic32_read_nob(&rp->state);
    if (state & (ERTS_PSFLG_DIRTY_RUNNING
                 | ERTS_PSFLG_DIRTY_RUNNING_SYS)) {
        erts_make_dirty_proc_handled(other, state, my_prio);
    }

    return 1;
}
/*
 * Move the content of the in-queue (sig_inq) of 'proc' to its private
 * queues (sig_qs). The in-queue must not be empty.
 *
 * If the in-queue contains no non-message signals, its content is
 * appended to the continuation queue (sig_qs.cont) when that queue is
 * non-empty or message receive is traced; otherwise it is appended
 * where sig_qs.last points.
 *
 * If the in-queue contains non-message signals, its content is appended
 * to the continuation queue, the list of non-message signals of the
 * in-queue is linked after the one of the private queues, and the
 * ERTS_PSFLG_SIG_IN_Q flag is replaced by ERTS_PSFLG_SIG_Q in the
 * process state.
 *
 * Finally the in-queue length is added to sig_qs.len and the in-queue
 * is reset to empty.
 *
 * NOTE(review): locking requirements are not visible in this function;
 * presumably the caller holds the message queue lock -- confirm.
 */
void
erts_proc_sig_fetch__(Process *proc)
{
ASSERT(proc->sig_inq.first);
if (!proc->sig_inq.nmsigs.next) {
/* Only messages in the in-queue... */
ASSERT(!(ERTS_PSFLG_SIG_IN_Q
& erts_atomic32_read_nob(&proc->state)));
ASSERT(!proc->sig_inq.nmsigs.last);
if (proc->sig_qs.cont || ERTS_MSG_RECV_TRACED(proc)) {
*proc->sig_qs.cont_last = proc->sig_inq.first;
proc->sig_qs.cont_last = proc->sig_inq.last;
}
else {
*proc->sig_qs.last = proc->sig_inq.first;
proc->sig_qs.last = proc->sig_inq.last;
}
}
else {
erts_aint32_t s;
ASSERT(proc->sig_inq.nmsigs.last);
if (!proc->sig_qs.nmsigs.last) {
/* No non-message signals in the private queues... */
ASSERT(!proc->sig_qs.nmsigs.next);
/*
 * A pointer to sig_inq.first has to be translated, since the
 * first in-queue element will be referred to by the current
 * cont_last pointer after the move.
 */
if (proc->sig_inq.nmsigs.next == &proc->sig_inq.first)
proc->sig_qs.nmsigs.next = proc->sig_qs.cont_last;
else
proc->sig_qs.nmsigs.next = proc->sig_inq.nmsigs.next;
s = erts_atomic32_read_bset_nob(&proc->state,
(ERTS_PSFLG_SIG_Q
| ERTS_PSFLG_SIG_IN_Q),
ERTS_PSFLG_SIG_Q);
ASSERT((s & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))
== ERTS_PSFLG_SIG_IN_Q); (void)s;
}
else {
/* Link after the last non-message signal of the private queues */
ErtsSignal *sig;
ASSERT(proc->sig_qs.nmsigs.next);
sig = ((ErtsSignal *) *proc->sig_qs.nmsigs.last);
ASSERT(ERTS_SIG_IS_NON_MSG(sig));
ASSERT(!sig->common.specific.next);
if (proc->sig_inq.nmsigs.next == &proc->sig_inq.first)
sig->common.specific.next = proc->sig_qs.cont_last;
else
sig->common.specific.next = proc->sig_inq.nmsigs.next;
s = erts_atomic32_read_band_nob(&proc->state,
~ERTS_PSFLG_SIG_IN_Q);
ASSERT((s & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q))
== (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)); (void)s;
}
/* Same translation as above for the last non-message signal... */
if (proc->sig_inq.nmsigs.last == &proc->sig_inq.first)
proc->sig_qs.nmsigs.last = proc->sig_qs.cont_last;
else
proc->sig_qs.nmsigs.last = proc->sig_inq.nmsigs.last;
proc->sig_inq.nmsigs.next = NULL;
proc->sig_inq.nmsigs.last = NULL;
*proc->sig_qs.cont_last = proc->sig_inq.first;
proc->sig_qs.cont_last = proc->sig_inq.last;
}
proc->sig_qs.len += proc->sig_inq.len;
/* The in-queue is now empty... */
proc->sig_inq.first = NULL;
proc->sig_inq.last = &proc->sig_inq.first;
proc->sig_inq.len = 0;
}
/*
 * Fetch signals from the in-queue of 'proc' when a message queue length
 * offset marker is first in the in-queue.
 *
 * If anything follows the marker, F_DELAYED_PSIGQS_LEN is set on the
 * process, the length of the in-queue is moved into the marker instead
 * of being added to sig_qs.len, and the signals following the marker
 * are fetched using erts_proc_sig_fetch__(). The marker itself is left
 * as the only element of the in-queue.
 *
 * Returns the delayed length stored in the marker.
 */
Sint
erts_proc_sig_fetch_msgq_len_offs__(Process *proc)
{
ErtsProcSigMsgQLenOffsetMarker *marker
= (ErtsProcSigMsgQLenOffsetMarker *) proc->sig_inq.first;
ASSERT(marker->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK);
if (marker->common.next) {
Sint len;
proc->flags |= F_DELAYED_PSIGQS_LEN;
/*
 * Prevent update of sig_qs.len in fetch. These
 * updates are done via process-info signal(s)
 * instead...
 */
len = proc->sig_inq.len;
marker->delayed_len += len;
marker->len_offset -= len;
proc->sig_inq.len = 0;
/*
 * Temporarily remove marker during fetch...
 *
 * Pointers that refer to the next field of the marker are
 * redirected to sig_inq.first, which takes its place.
 */
proc->sig_inq.first = marker->common.next;
if (proc->sig_inq.last == &marker->common.next)
proc->sig_inq.last = &proc->sig_inq.first;
if (proc->sig_inq.nmsigs.next == &marker->common.next)
proc->sig_inq.nmsigs.next = &proc->sig_inq.first;
if (proc->sig_inq.nmsigs.last == &marker->common.next)
proc->sig_inq.nmsigs.last = &proc->sig_inq.first;
erts_proc_sig_fetch__(proc);
/* Reinsert the marker as the only element of the in-queue... */
marker->common.next = NULL;
proc->sig_inq.first = (ErtsMessage *) marker;
proc->sig_inq.last = &marker->common.next;
}
return marker->delayed_len;
}
/*
 * Get the length of the private signal queues of 'c_p'.
 *
 * have_qlock - non-zero if the caller holds the message queue lock in
 *              addition to the main lock; zero if the caller holds the
 *              main lock only
 *
 * If the process has a delayed private queue length
 * (F_DELAYED_PSIGQS_LEN), the delayed length stored in the marker first
 * in the in-queue is included. The message queue lock is taken while
 * reading the marker unless the caller already holds it.
 */
static ERTS_INLINE Sint
proc_sig_privqs_len(Process *c_p, int have_qlock)
{
    Sint qlen = c_p->sig_qs.len;

    ERTS_LC_ASSERT(!have_qlock
                   ? (ERTS_PROC_LOCK_MAIN
                      == erts_proc_lc_my_proc_locks(c_p))
                   : ((ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_MAIN)
                      == ((ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_MAIN)
                          & erts_proc_lc_my_proc_locks(c_p))));

    if (c_p->flags & F_DELAYED_PSIGQS_LEN) {
        const int lock_here = !have_qlock;
        ErtsProcSigMsgQLenOffsetMarker *mark;

        if (lock_here)
            erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);

        mark = (ErtsProcSigMsgQLenOffsetMarker *) c_p->sig_inq.first;
        ASSERT(mark);
        ASSERT(mark->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK);
        qlen += mark->delayed_len;

        if (lock_here)
            erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
    }

#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
    {
        /* Verify the result by counting the messages... */
        Sint counted = 0;
        ERTS_FOREACH_SIG_PRIVQS(
            c_p, mp,
            {
                if (ERTS_SIG_IS_MSG(mp))
                    counted++;
            });
        ERTS_ASSERT(qlen == counted);
    }
#endif

    return qlen;
}
/*
 * Get the length of the private signal queues of 'c_p' when the caller
 * does not hold the message queue lock; see proc_sig_privqs_len().
 */
Sint
erts_proc_sig_privqs_len(Process *c_p)
{
    const int have_qlock = 0;

    return proc_sig_privqs_len(c_p, have_qlock);
}
/*
 * Get the distribution external data of a message or signal.
 *
 * Returns the ErtsDistExternal of an external message, or of a generic
 * exit signal whose reason is stored in external format. Returns NULL
 * for everything else.
 */
ErtsDistExternal *
erts_proc_sig_get_external(ErtsMessage *msgp)
{
    ErtsExitSignalData *xsigd;

    if (ERTS_SIG_IS_EXTERNAL_MSG(msgp))
        return erts_get_dist_ext(msgp->data.heap_frag);

    if (!(ERTS_SIG_IS_NON_MSG(msgp) &&
          ERTS_SIG_IS_GEN_EXIT(msgp) &&
          ERTS_SIG_IS_GEN_EXIT_EXTERNAL(msgp)))
        return NULL;

    xsigd = get_exit_signal_data(msgp);
    ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT);
    ASSERT(is_non_value(xsigd->reason));

    /*
     * Without a chained heap fragment, the external data is stored
     * directly after the exit signal data; otherwise it is found in
     * the chained heap fragment.
     */
    if (msgp->hfrag.next != NULL)
        return erts_get_dist_ext(msgp->hfrag.next);

    return (ErtsDistExternal*)(xsigd + 1);
}
/* Forward declaration; defined directly after send_gen_exit_signal(). */
static void do_seq_trace_output(Eterm to, Eterm token, Eterm msg);
/*
 * Build and send a generic exit signal (ERTS_SIG_Q_TYPE_GEN_EXIT).
 *
 * c_p            - sending process, or NULL
 * from_tag       - immediate term stored as sender of the signal
 * from           - sender; copied into the signal
 * to             - receiver
 * op             - ERTS_SIG_Q_OP_EXIT, ERTS_SIG_Q_OP_EXIT_LINKED, or
 *                  ERTS_SIG_Q_OP_MONITOR_DOWN
 * reason         - exit reason, or THE_NON_VALUE if the reason is
 *                  passed via dist_ext
 * dist_ext       - reason in external format; must be non-NULL exactly
 *                  when reason is THE_NON_VALUE
 * dist_ext_hfrag - heap fragment chained after the signal, or NULL
 * ref            - reference, or NIL
 * token          - seq-trace token
 * normal_kills   - stored in the signal when ref is NIL
 *
 * The signal is allocated as a message combined with a heap fragment.
 * The heap contains copies of token, from, ref, and, when the reason is
 * a term, the reason and a prebuilt 'EXIT' or 'DOWN' message. An
 * ErtsExitSignalData follows after the used part of the heap, followed
 * by a copy of dist_ext if it was passed without a heap fragment.
 *
 * If the signal cannot be enqueued it is cleaned up here.
 */
static void
send_gen_exit_signal(Process *c_p, Eterm from_tag,
Eterm from, Eterm to,
Sint16 op, Eterm reason, ErtsDistExternal *dist_ext,
ErlHeapFragment *dist_ext_hfrag,
Eterm ref, Eterm token, int normal_kills)
{
ErtsExitSignalData *xsigd;
Eterm *hp, *start_hp, s_reason, s_ref, s_message, s_token, s_from;
ErtsMessage *mp;
ErlHeapFragment *hfrag;
ErlOffHeap *ohp;
Uint hsz, from_sz, reason_sz, ref_sz, token_sz, dist_ext_sz;
int seq_trace;
#ifdef USE_VM_PROBES
Eterm s_utag, utag;
Uint utag_sz;
#endif
ASSERT((is_value(reason) && dist_ext == NULL) ||
(is_non_value(reason) && dist_ext != NULL));
ASSERT(is_immed(from_tag));
/* Heap size is counted in words... */
hsz = sizeof(ErtsExitSignalData)/sizeof(Eterm);
seq_trace = c_p && have_seqtrace(token);
if (seq_trace)
seq_trace_update_send(c_p);
#ifdef USE_VM_PROBES
utag_sz = 0;
utag = NIL;
if (c_p && token != NIL && (DT_UTAG_FLAGS(c_p) & DT_UTAG_SPREADING)) {
utag_sz = size_object(DT_UTAG(c_p));
utag = DT_UTAG(c_p);
}
else if (token == am_have_dt_utag) {
token = NIL;
}
hsz += utag_sz;
#endif
token_sz = size_object(token);
hsz += token_sz;
from_sz = size_object(from);
hsz += from_sz;
ref_sz = size_object(ref);
hsz += ref_sz;
/* The reason was part of the control message;
just copy it into the heap of the signal */
if (is_value(reason)) {
reason_sz = size_object(reason);
hsz += reason_sz;
switch (op) {
case ERTS_SIG_Q_OP_EXIT:
case ERTS_SIG_Q_OP_EXIT_LINKED: {
/* {'EXIT', From, Reason} */
hsz += 4; /* 3-tuple */
break;
}
case ERTS_SIG_Q_OP_MONITOR_DOWN: {
/* {'DOWN', Ref, process, From, Reason} */
hsz += 6; /* 5-tuple */
break;
}
default:
ERTS_INTERNAL_ERROR("Invalid exit signal op");
break;
}
} else if (dist_ext != NULL && dist_ext_hfrag == NULL) {
/* The message was not fragmented so we need to create space
for a single dist_ext element */
dist_ext_sz = erts_dist_ext_size(dist_ext) / sizeof(Eterm);
hsz += dist_ext_sz;
}
/*
* Allocate message combined with heap fragment...
*/
mp = erts_alloc_message(hsz, &hp);
hfrag = &mp->hfrag;
mp->next = NULL;
ohp = &hfrag->off_heap;
start_hp = hp;
/* Copy the terms in the same order as their sizes were added above */
s_token = copy_struct(token, token_sz, &hp, ohp);
s_from = copy_struct(from, from_sz, &hp, ohp);
s_ref = copy_struct(ref, ref_sz, &hp, ohp);
if (is_value(reason)) {
s_reason = copy_struct(reason, reason_sz, &hp, ohp);
switch (op) {
case ERTS_SIG_Q_OP_EXIT:
case ERTS_SIG_Q_OP_EXIT_LINKED:
/* {'EXIT', From, Reason} */
s_message = TUPLE3(hp, am_EXIT, s_from, s_reason);
hp += 4;
break;
case ERTS_SIG_Q_OP_MONITOR_DOWN:
/* {'DOWN', Ref, process, From, Reason} */
s_message = TUPLE5(hp, am_DOWN, s_ref, am_process, s_from, s_reason);
hp += 6;
break;
default:
/* This cannot happen, used to silence gcc warning */
s_message = THE_NON_VALUE;
break;
}
} else {
/* Reason in external format; no message can be prebuilt */
s_message = THE_NON_VALUE;
s_reason = THE_NON_VALUE;
}
#ifdef USE_VM_PROBES
s_utag = (is_immed(utag)
? utag
: copy_struct(utag, utag_sz, &hp, ohp));
ERL_MESSAGE_DT_UTAG(mp) = s_utag;
#endif
ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(op,
ERTS_SIG_Q_TYPE_GEN_EXIT,
0);
ERL_MESSAGE_TOKEN(mp) = s_token;
ERL_MESSAGE_FROM(mp) = from_tag; /* immediate... */
/*
 * Everything written so far is term data. The exit signal data is
 * placed directly after it; this is where get_exit_signal_data()
 * looks for it.
 */
hfrag->used_size = hp - start_hp;
xsigd = (ErtsExitSignalData *) hp;
xsigd->message = s_message;
xsigd->from = s_from;
xsigd->reason = s_reason;
hfrag->next = dist_ext_hfrag;
if (is_nil(s_ref))
xsigd->u.normal_kills = normal_kills;
else {
ASSERT(is_ref(s_ref));
xsigd->u.ref = s_ref;
}
hp += sizeof(ErtsExitSignalData)/sizeof(Eterm);
if (dist_ext != NULL && dist_ext_hfrag == NULL && is_non_value(reason)) {
/* Same condition as when dist_ext_sz was calculated above */
erts_make_dist_ext_copy(dist_ext, (ErtsDistExternal *) hp);
hp += dist_ext_sz;
}
/* All of the allocated heap must have been consumed... */
ASSERT(hp == mp->hfrag.mem + mp->hfrag.alloc_size);
if (seq_trace)
do_seq_trace_output(to, s_token, s_message);
if (!proc_queue_signal(c_p, to, (ErtsSignal *) mp, op)) {
/* Not enqueued; we still own the signal and must release it */
mp->next = NULL;
erts_cleanup_messages(mp);
}
}
/*
 * Output a seq-trace send event for a signal sent to 'to'.
 *
 * The receiver is looked up and its message queue lock is held while
 * the event is output. Nothing is output if the receiver cannot be
 * found or is exiting. When not executing on a normal scheduler, a
 * reference to the receiver is held during the operation.
 */
static void
do_seq_trace_output(Eterm to, Eterm token, Eterm msg)
{
    /*
     * We could do this when enqueuing the signal and avoid some
     * locking. However, the enqueuing code would then always
     * have the penalty of this seq-tracing code which we do not
     * want...
     */
    ErtsSchedulerData *esdp = erts_get_scheduler_data();
    const int on_normal_sched = esdp && esdp->type == ERTS_SCHED_NORMAL;
    Process *receiver;

    receiver = (on_normal_sched
                ? erts_proc_lookup_raw(to)
                : erts_proc_lookup_raw_inc_refc(to));
    if (!receiver)
        return;

    erts_proc_lock(receiver, ERTS_PROC_LOCK_MSGQ);
    if (!ERTS_PROC_IS_EXITING(receiver))
        seq_trace_output(token, msg, SEQ_TRACE_SEND, to, receiver);
    erts_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);

    if (!on_normal_sched)
        erts_proc_dec_refc(receiver);
}
/*
 * Send a persistent monitor message signal.
 *
 * type   - monitor type; ERTS_MON_TYPE_NODES or
 *          ERTS_MON_TYPE_TIME_OFFSET
 * key    - monitor key; a small integer for nodes monitors and an
 *          internal reference for time offset monitors
 * from   - sender (immediate term)
 * to     - receiver
 * msg    - message; copied into the signal
 * msg_sz - size of msg in words; must equal size_object(msg)
 *
 * The signal is allocated as a message combined with a heap fragment.
 * The heap contains a copy of the message, followed by an
 * ErtsPersistMonMsg placed directly after the used part of the heap
 * (see get_persist_mon_msg()).
 *
 * If the signal cannot be enqueued it is cleaned up here.
 */
void
erts_proc_sig_send_persistent_monitor_msg(Uint16 type, Eterm key,
                                          Eterm from, Eterm to,
                                          Eterm msg, Uint msg_sz)
{
    ErtsPersistMonMsg *prst_mon;
    ErtsMessage *mp;
    ErlHeapFragment *hfrag;
    Eterm *hp, *start_hp, message;
    ErlOffHeap *ohp;
    /*
     * Heap size is counted in words, so the size of the trailing
     * structure has to be converted from bytes to words (rounded up)
     * before it is added to the message size.
     */
    Uint hsz = ((sizeof(ErtsPersistMonMsg) + sizeof(Eterm) - 1)
                / sizeof(Eterm)) + msg_sz;

    /*
     * Allocate message combined with heap fragment...
     */
    mp = erts_alloc_message(hsz, &hp);
    hfrag = &mp->hfrag;
    mp->next = NULL;
    ohp = &hfrag->off_heap;
    start_hp = hp;

    ASSERT(msg_sz == size_object(msg));
    message = copy_struct(msg, msg_sz, &hp, ohp);
    hfrag->used_size = hp - start_hp;

    /* The persistent monitor data follows the used part of the heap */
    prst_mon = (ErtsPersistMonMsg *) (char *) hp;
    prst_mon->message = message;

    switch (type) {
    case ERTS_MON_TYPE_NODES:
        ASSERT(is_small(key));
        prst_mon->key = key;
        break;
    case ERTS_MON_TYPE_TIME_OFFSET:
        ASSERT(is_internal_ref(key));
        ASSERT(is_tuple_arity(message, 5));
        /* Use the copy of the key found in the copied message... */
        prst_mon->key = tuple_val(message)[2];
        ASSERT(eq(prst_mon->key, key));
        break;
    default:
        ERTS_INTERNAL_ERROR("Invalid persistent monitor type");
        prst_mon->key = key;
        break;
    }

    ASSERT(is_immed(from));
    ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_PERSISTENT_MON_MSG,
                                                  type, 0);
    ERL_MESSAGE_FROM(mp) = from;
    ERL_MESSAGE_TOKEN(mp) = am_undefined;

    if (!proc_queue_signal(NULL, to, (ErtsSignal *) mp,
                           ERTS_SIG_Q_OP_PERSISTENT_MON_MSG)) {
        /* Not enqueued; we still own the signal and must release it */
        mp->next = NULL;
        erts_cleanup_messages(mp);
    }
}
/*
 * Get the message and key of a persistent monitor message signal.
 *
 * The message is stored in *msg and the key is returned. Both are read
 * from the ErtsPersistMonMsg located directly after the used part of
 * the heap of the signal.
 */
static ERTS_INLINE Eterm
get_persist_mon_msg(ErtsMessage *sig, Eterm *msg)
{
    ErtsPersistMonMsg *trailer;

    trailer = (ErtsPersistMonMsg *) (char *)
        &sig->hfrag.mem[sig->hfrag.used_size];

    *msg = trailer->message;
    return trailer->key;
}
/*
 * Send an exit signal (ERTS_SIG_Q_OP_EXIT).
 *
 * c_p          - sending process, or NULL; if non-NULL its id must
 *                equal 'from'
 * from         - sender; an internal pid or port, or an external pid
 * to           - receiver
 * reason       - exit reason
 * token        - seq-trace token
 * normal_kills - passed on to the signal
 *
 * The sender tag of the signal is 'from' itself when it is an
 * immediate; for an external pid, the system name of its dist entry is
 * used.
 */
void
erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to,
                        Eterm reason, Eterm token,
                        int normal_kills)
{
    Eterm from_tag;

    ASSERT(!c_p || c_p->common.id == from);

    if (!is_immed(from)) {
        DistEntry *dep;
        ASSERT(is_external_pid(from));
        dep = external_pid_dist_entry(from);
        from_tag = dep->sysname;
    }
    else {
        ASSERT(is_internal_pid(from) || is_internal_port(from));
        from_tag = from;
    }

    send_gen_exit_signal(c_p, from_tag, from, to, ERTS_SIG_Q_OP_EXIT,
                         reason, NULL, NULL, NIL, token, normal_kills);
}
/*
 * Send an exit signal (ERTS_SIG_Q_OP_EXIT) that arrived over the
 * distribution. The sysname of 'dep' is used as sender tag; 'dist_ext'
 * and 'hfrag' are passed on unchanged to send_gen_exit_signal().
 */
void
erts_proc_sig_send_dist_exit(DistEntry *dep,
                             Eterm from, Eterm to,
                             ErtsDistExternal *dist_ext,
                             ErlHeapFragment *hfrag,
                             Eterm reason, Eterm token)
{
    Eterm node_tag = dep->sysname;

    send_gen_exit_signal(NULL, node_tag, from, to,
                         ERTS_SIG_Q_OP_EXIT, reason,
                         dist_ext, hfrag,
                         NIL, token, 0);
}
/*
 * Send a link exit signal (ERTS_SIG_Q_OP_EXIT_LINKED) from 'from' to
 * the process stored in lnk->other.item.
 *
 * When 'reason' is an immediate term and 'token' is NIL, the link
 * structure itself is reused as the signal with the reason stored in
 * lnk->other.item; if queueing succeeds the receiver becomes
 * responsible for destroying 'lnk'. In all other cases the signal is
 * sent via send_gen_exit_signal() and 'lnk' is released here.
 *
 * 'lnk' must not be NULL.
 */
void
erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk,
                             Eterm reason, Eterm token)
{
    Eterm to;

    ASSERT(!c_p || c_p->common.id == from);
    ASSERT(lnk);

    to = lnk->other.item;

    if (is_not_immed(reason) || is_not_nil(token)) {
        ASSERT(is_internal_pid(from) || is_internal_port(from));
        send_gen_exit_signal(c_p, from, from, to, ERTS_SIG_Q_OP_EXIT_LINKED,
                             reason, NULL, NULL, NIL, token, 0);
    }
    else {
        /* Pass signal using old link structure... */
        ErtsSignal *sig = (ErtsSignal *) lnk;
        lnk->other.item = reason; /* pass reason via this other.item */
        sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_EXIT_LINKED,
                                                 lnk->type, 0);
        if (proc_queue_signal(c_p, to, sig, ERTS_SIG_Q_OP_EXIT_LINKED))
            return; /* receiver will destroy lnk structure */
    }

    /*
     * 'lnk' has already been dereferenced above, so it is known to be
     * non-NULL here; no extra NULL check is needed before releasing.
     */
    erts_link_release(lnk);
}
/*
 * Send a link signal (ERTS_SIG_Q_OP_LINK) to the process 'to', reusing
 * the link structure 'lnk' as the signal.
 *
 * Returns the result of proc_queue_signal(). This function does not
 * release 'lnk' when queueing fails. 'lnk' must not be NULL.
 */
int
erts_proc_sig_send_link(Process *c_p, Eterm to, ErtsLink *lnk)
{
    ErtsSignal *sig;
    Uint16 type;

    /* Verify the pointer before anything reads through it... */
    ASSERT(lnk);
    ASSERT(!c_p || c_p->common.id == lnk->other.item);
    ASSERT(is_internal_pid(to));

    type = lnk->type;

    sig = (ErtsSignal *) lnk;
    sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_LINK,
                                             type, 0);

    return proc_queue_signal(c_p, to, sig, ERTS_SIG_Q_OP_LINK);
}
/*
 * Send an unlink signal (ERTS_SIG_Q_OP_UNLINK) to the process stored
 * in lnk->other.item, reusing 'lnk' as the signal. If the signal
 * cannot be queued, 'lnk' is released here.
 */
void
erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk)
{
    ErtsSignal *unlink_sig;
    Eterm receiver;

    ASSERT(lnk);

    unlink_sig = (ErtsSignal *) lnk;
    receiver = lnk->other.item;

    ASSERT(is_internal_pid(receiver));

    unlink_sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_UNLINK,
                                                    lnk->type, 0);

    if (proc_queue_signal(c_p, receiver, unlink_sig, ERTS_SIG_Q_OP_UNLINK))
        return;

    /* Not queued; the link structure is still ours to release... */
    erts_link_release(lnk);
}
/*
 * Send a link exit signal (ERTS_SIG_Q_OP_EXIT_LINKED) that arrived
 * over the distribution. The sysname of 'dep' is used as sender tag;
 * 'dist_ext' and 'hfrag' are passed on unchanged to
 * send_gen_exit_signal().
 */
void
erts_proc_sig_send_dist_link_exit(DistEntry *dep,
                                  Eterm from, Eterm to,
                                  ErtsDistExternal *dist_ext,
                                  ErlHeapFragment *hfrag,
                                  Eterm reason, Eterm token)
{
    Eterm node_tag = dep->sysname;

    send_gen_exit_signal(NULL, node_tag, from, to,
                         ERTS_SIG_Q_OP_EXIT_LINKED, reason,
                         dist_ext, hfrag,
                         NIL, token, 0);
}
/*
 * Send an unlink signal (ERTS_SIG_Q_OP_UNLINK) from the external pid
 * 'from' to the internal pid 'to'. The signal is created with
 * make_sig_dist_link_op() and destroyed here if it cannot be queued.
 * 'dep' is only used for a consistency check in debug builds.
 */
void
erts_proc_sig_send_dist_unlink(DistEntry *dep, Eterm from, Eterm to)
{
    ErtsSignal *unlink_sig;

    ASSERT(is_internal_pid(to));
    ASSERT(is_external_pid(from));
    ASSERT(dep == external_pid_dist_entry(from));

    unlink_sig = (ErtsSignal *) make_sig_dist_link_op(ERTS_SIG_Q_OP_UNLINK,
                                                      to, from);

    if (proc_queue_signal(NULL, to, unlink_sig, ERTS_SIG_Q_OP_UNLINK))
        return;

    destroy_sig_dist_link_op((ErtsSigDistLinkOp *) unlink_sig);
}
/*
 * Send a monitor down signal (ERTS_SIG_Q_OP_MONITOR_DOWN) that arrived
 * over the distribution.
 *
 * When 'from' is an atom the monitored entity is passed on as the
 * 2-tuple {from, dep->sysname}, built on the local stack; otherwise
 * 'from' itself is passed on.
 */
void
erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
                                     Eterm from, Eterm to,
                                     ErtsDistExternal *dist_ext,
                                     ErlHeapFragment *hfrag,
                                     Eterm reason)
{
    Eterm tuple_heap[3];
    Eterm monitored = from;

    if (is_atom(from))
        monitored = TUPLE2(&tuple_heap[0], from, dep->sysname);

    send_gen_exit_signal(NULL, dep->sysname, monitored,
                         to, ERTS_SIG_Q_OP_MONITOR_DOWN,
                         reason, dist_ext, hfrag, ref, NIL, 0);
}
/*
 * Send a monitor down signal (ERTS_SIG_Q_OP_MONITOR_DOWN) for the
 * target monitor 'mon' to the process stored in mon->other.item.
 *
 * If 'reason' is an immediate term, or the monitor is a suspend
 * monitor, the monitor structure itself is reused as the signal and
 * the reason is passed in mon->other.item. If queueing succeeds the
 * receiver destroys the monitor structure; otherwise it is released
 * here. For a non-immediate reason the signal is sent via
 * send_gen_exit_signal() and the monitor structure is always released
 * here.
 */
void
erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason)
{
Eterm to;
ASSERT(erts_monitor_is_target(mon));
ASSERT(!erts_monitor_is_in_table(mon));
to = mon->other.item;
ASSERT(is_internal_pid(to));
if (is_immed(reason)) {
/* Pass signal using old monitor structure... */
ErtsSignal *sig;
/* Also entered via goto from the suspend monitor case below. */
send_using_monitor_struct:
mon->other.item = reason; /* Pass immed reason via other.item... */
sig = (ErtsSignal *) mon;
sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_MONITOR_DOWN,
mon->type, 0);
if (proc_queue_signal(NULL, to, sig, ERTS_SIG_Q_OP_MONITOR_DOWN))
return; /* receiver will destroy mon structure */
}
else {
ErtsMonitorData *mdp = erts_monitor_to_data(mon);
Eterm from_tag, monitored, heap[3];
if (mon->type == ERTS_MON_TYPE_SUSPEND) {
/*
* Set reason to 'undefined', since exit
* reason is not used for suspend monitors,
* and send using monitor structure. This
* since we don't want to trigger
* unnecessary memory allocation etc...
*/
reason = am_undefined;
goto send_using_monitor_struct;
}
if (!(mon->flags & ERTS_ML_FLG_NAME)) {
/*
* Monitored by identifier: pass on the origin item. The sender
* tag is the item itself, or the sysname of its dist entry if
* it is an external pid.
*/
from_tag = monitored = mdp->origin.other.item;
if (is_external_pid(from_tag)) {
DistEntry *dep = external_pid_dist_entry(from_tag);
from_tag = dep->sysname;
}
}
else {
/*
* Monitored by name: pass on the 2-tuple {name, node}, built
* on the local stack.
*/
ErtsMonitorDataExtended *mdep;
Eterm name, node;
mdep = (ErtsMonitorDataExtended *) mdp;
name = mdep->u.name;
ASSERT(is_atom(name));
if (mdep->dist) {
node = mdep->dist->nodename;
from_tag = node;
}
else {
node = erts_this_dist_entry->sysname;
from_tag = mdp->origin.other.item;
}
ASSERT(is_internal_port(from_tag)
|| is_internal_pid(from_tag)
|| is_atom(from_tag));
monitored = TUPLE2(&heap[0], name, node);
}
send_gen_exit_signal(NULL, from_tag, monitored,
to, ERTS_SIG_Q_OP_MONITOR_DOWN,
reason, NULL, NULL, mdp->ref, NIL, 0);
}
/* The monitor structure was not handed over to a receiver... */
erts_monitor_release(mon);
}
/*
 * Send a demonitor signal (ERTS_SIG_Q_OP_DEMONITOR) that arrived over
 * the distribution to the internal pid 'to'.
 *
 * A signal of type ERTS_SIG_Q_TYPE_DIST_PROC_DEMONITOR holding a copy
 * of 'ref' is allocated; it is destroyed here if it cannot be queued.
 */
void
erts_proc_sig_send_dist_demonitor(Eterm to, Eterm ref)
{
    ErtsSigDistProcDemonitor *demon_sig;
    ErlOffHeap off_heap;
    Eterm *heap_ptr;
    size_t alloc_sz;

    ERTS_INIT_OFF_HEAP(&off_heap);

    ASSERT(is_internal_pid(to));
    ASSERT(is_ref(ref));

    /* Room for the structure plus the heap words of the reference... */
    alloc_sz = sizeof(ErtsSigDistProcDemonitor) - sizeof(Eterm);
    alloc_sz += NC_HEAP_SIZE(ref)*sizeof(Eterm);

    demon_sig = erts_alloc(ERTS_ALC_T_DIST_DEMONITOR, alloc_sz);

    heap_ptr = &demon_sig->heap[0];
    demon_sig->ref = STORE_NC(&heap_ptr, &off_heap, ref);

    ((ErtsSignal *) demon_sig)->common.tag
        = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_DEMONITOR,
                                 ERTS_SIG_Q_TYPE_DIST_PROC_DEMONITOR,
                                 0);

    if (proc_queue_signal(NULL, to, (ErtsSignal *) demon_sig,
                          ERTS_SIG_Q_OP_DEMONITOR))
        return;

    destroy_dist_proc_demonitor(demon_sig);
}
/*
 * Send a demonitor signal (ERTS_SIG_Q_OP_DEMONITOR) to the process
 * stored in mon->other.item, reusing the origin monitor 'mon' as the
 * signal. If the signal cannot be queued, 'mon' is released here.
 */
void
erts_proc_sig_send_demonitor(ErtsMonitor *mon)
{
    Eterm receiver = mon->other.item;
    Uint16 mon_type = mon->type;
    ErtsSignal *demon_sig = (ErtsSignal *) mon;

    ASSERT(is_internal_pid(receiver));
    ASSERT(erts_monitor_is_origin(mon));
    ASSERT(!erts_monitor_is_in_table(mon));

    demon_sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_DEMONITOR,
                                                   mon_type, 0);

    if (proc_queue_signal(NULL, receiver, demon_sig, ERTS_SIG_Q_OP_DEMONITOR))
        return;

    erts_monitor_release(mon);
}
/*
 * Send a monitor signal (ERTS_SIG_Q_OP_MONITOR) to 'to', reusing the
 * target monitor 'mon' as the signal.
 *
 * Returns the result of proc_queue_signal(). This function does not
 * release 'mon' when queueing fails.
 */
int
erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to)
{
    Uint16 mon_type = mon->type;
    ErtsSignal *mon_sig = (ErtsSignal *) mon;

    ASSERT(is_internal_pid(to) || to == am_undefined);
    ASSERT(erts_monitor_is_target(mon));

    mon_sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_MONITOR,
                                                 mon_type, 0);

    return proc_queue_signal(NULL, to, mon_sig, ERTS_SIG_Q_OP_MONITOR);
}
/*
 * Send a trace change signal (ERTS_SIG_Q_OP_TRACE_CHANGE_STATE) to
 * 'to', carrying the flag sets 'on' and 'off' and, unless it is NIL,
 * the tracer 'tracer'. The signal is destroyed here if it cannot be
 * queued.
 */
void
erts_proc_sig_send_trace_change(Eterm to, Uint on, Uint off, Eterm tracer)
{
    ErtsSigTraceInfo *trace_info;

    trace_info = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsSigTraceInfo));

    trace_info->common.tag
        = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_TRACE_CHANGE_STATE,
                                 ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO,
                                 0);
    trace_info->flags_off = off;
    trace_info->flags_on = on;

    /* Start from NIL and only install a tracer if one was given... */
    trace_info->tracer = NIL;
    if (is_not_nil(tracer))
        erts_tracer_update(&trace_info->tracer, tracer);

    if (proc_queue_signal(NULL, to, (ErtsSignal *) trace_info,
                          ERTS_SIG_Q_OP_TRACE_CHANGE_STATE))
        return;

    destroy_trace_info(trace_info);
}
/*
 * Send a group leader signal (ERTS_SIG_Q_OP_GROUP_LEADER) asking the
 * process 'to' to change its group leader to 'gl'.
 *
 * When 'c_p' is non-NULL, 'ref' must be an internal reference and
 * c_p is registered as receiver of the reply; when 'c_p' is NULL,
 * 'ref' must be NIL.
 *
 * The signal carries a set of atomic flags. ACTIVE and RECEIVER are
 * always set initially and SENDER is added when 'c_p' is non-NULL.
 * The sender side clears its flags below; whoever observes that no
 * flags remain destroys the signal.
 */
void
erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl, Eterm ref)
{
int res;
ErtsSigGroupLeader *sgl;
Eterm *hp;
Uint gl_sz, ref_sz, size;
erts_aint_t init_flags = ERTS_SIG_GL_FLG_ACTIVE|ERTS_SIG_GL_FLG_RECEIVER;
if (c_p)
init_flags |= ERTS_SIG_GL_FLG_SENDER;
ASSERT(c_p ? is_internal_ref(ref) : ref == NIL);
/* Immediate terms need no heap space; others are copied below. */
gl_sz = is_immed(gl) ? 0 : size_object(gl);
ref_sz = is_immed(ref) ? 0 : size_object(ref);
/*
 * NOTE(review): the "- 1" presumably accounts for one heap word
 * already included in sizeof(ErtsSigGroupLeader) -- confirm against
 * the structure declaration.
 */
size = sizeof(ErtsSigGroupLeader);
size += (gl_sz + ref_sz - 1) * sizeof(Eterm);
sgl = erts_alloc(ERTS_ALC_T_SIG_DATA, size);
erts_atomic_init_nob(&sgl->flags, init_flags);
ERTS_INIT_OFF_HEAP(&sgl->oh);
hp = &sgl->heap[0];
sgl->group_leader = is_immed(gl) ? gl : copy_struct(gl, gl_sz, &hp, &sgl->oh);
sgl->reply_to = c_p ? c_p->common.id : NIL;
sgl->ref = is_immed(ref) ? ref : copy_struct(ref, ref_sz, &hp, &sgl->oh);
sgl->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_GROUP_LEADER,
ERTS_SIG_Q_TYPE_UNDEFINED,
0);
res = proc_queue_signal(c_p, to, (ErtsSignal *) sgl,
ERTS_SIG_Q_OP_GROUP_LEADER);
if (!res)
destroy_sig_group_leader(sgl);
else if (c_p) {
/*
 * The signal was queued. Always drop the SENDER flag; if
 * maybe_elevate_sig_handling_prio() returns zero, also drop ACTIVE.
 */
erts_aint_t flags, rm_flags = ERTS_SIG_GL_FLG_SENDER;
int prio_res = maybe_elevate_sig_handling_prio(c_p, to);
if (!prio_res)
rm_flags |= ERTS_SIG_GL_FLG_ACTIVE;
/*
 * NOTE(review): 'flags' is used as the flag value from before the
 * band operation (ACTIVE is tested although it may just have been
 * cleared) -- confirm against erts_atomic_read_band_nob().
 */
flags = erts_atomic_read_band_nob(&sgl->flags, ~rm_flags);
if (!prio_res && (flags & ERTS_SIG_GL_FLG_ACTIVE))
res = 0; /* We deactivated signal... */
if ((flags & ~rm_flags) == 0)
destroy_sig_group_leader(sgl);
}
/* Failed or deactivated; send the failure reply to ourselves. */
if (!res && c_p)
group_leader_reply(c_p, c_p->common.id, ref, 0);
}
/*
 * Send an is-alive request signal (ERTS_SIG_Q_OP_IS_ALIVE) from c_p
 * to 'to'.
 *
 * The reply message {Ref, false} is prepared in the heap fragment of
 * the signal, and an ErtsIsAliveRequest record is stored directly
 * after it. If the signal cannot be queued, the prepared message is
 * queued to c_p itself as the reply.
 */
void
erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref)
{
    ErtsMessage *mp;
    ErlHeapFragment *hfrag;
    ErlOffHeap *ohp;
    ErtsIsAliveRequest *request;
    Eterm *hp, *heap_start;
    Eterm ref_copy, reply_msg;
    Uint words_needed;

    ASSERT(is_internal_ordinary_ref(ref));

    /* Reference + 2-tuple + trailing request record (in words)... */
    words_needed = ERTS_REF_THING_SIZE + 3 + sizeof(ErtsIsAliveRequest)/sizeof(Eterm);

    mp = erts_alloc_message(words_needed, &hp);
    hfrag = &mp->hfrag;
    mp->next = NULL;
    ohp = &hfrag->off_heap;
    heap_start = hp;

    ref_copy = STORE_NC(&hp, ohp, ref);
    reply_msg = TUPLE2(hp, ref_copy, am_false); /* default res 'false' */
    hp += 3;

    hfrag->used_size = hp - heap_start;

    request = (ErtsIsAliveRequest *) (char *) hp;
    request->message = reply_msg;
    request->requester = c_p->common.id;

    ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_IS_ALIVE,
                                                  ERTS_SIG_Q_TYPE_UNDEFINED,
                                                  0);

    if (!proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_IS_ALIVE)) {
        /* It wasn't alive; reply to ourselves... */
        mp->next = NULL;
        mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
        erts_queue_message(c_p, ERTS_PROC_LOCK_MAIN, mp, reply_msg, am_system);
        return;
    }

    (void) maybe_elevate_sig_handling_prio(c_p, to);
}
/*
 * Send a process info request signal (ERTS_SIG_Q_OP_PROCESS_INFO)
 * from c_p to 'to'.
 *
 * item_ix       - array of 'len' item indices; copied into the signal.
 * len           - number of elements in 'item_ix'; must be > 0.
 * need_msgq_len - if non-zero, the message queue length offset marker
 *                 embedded in the signal is initialized.
 * flags         - stored in the signal.
 * reserve_size  - stored in the signal.
 * ref           - internal ordinary reference; copied into the signal.
 *
 * Returns the result of proc_queue_signal(). The signal is freed here
 * if it could not be queued.
 */
int
erts_proc_sig_send_process_info_request(Process *c_p,
                                        Eterm to,
                                        int *item_ix,
                                        int len,
                                        int need_msgq_len,
                                        int flags,
                                        Uint reserve_size,
                                        Eterm ref)
{
    Uint alloc_sz = sizeof(ErtsProcessInfoSig) + (len - 1) * sizeof(int);
    ErtsProcessInfoSig *info_sig = erts_alloc(ERTS_ALC_T_SIG_DATA, alloc_sz);
    int queued;

    ASSERT(c_p);
    ASSERT(item_ix);
    ASSERT(len > 0);
    ASSERT(is_internal_ordinary_ref(ref));

    info_sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_PROCESS_INFO,
                                                  0, 0);

    if (need_msgq_len) {
        info_sig->msgq_len_offset = ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC;
        info_sig->marker.common.next = NULL;
        info_sig->marker.common.specific.next = NULL;
        info_sig->marker.common.tag = ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK;
        info_sig->marker.refc = 0;
        info_sig->marker.delayed_len = 0;
        info_sig->marker.len_offset = 0;
    }
    else {
        info_sig->msgq_len_offset = ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE;
    }

    info_sig->requester = c_p->common.id;

    /* Keep a private copy of the reference inside the signal... */
    sys_memcpy((void *) &info_sig->oref_thing,
               (void *) internal_ref_val(ref),
               sizeof(ErtsORefThing));
    info_sig->ref = make_internal_ref((char *) &info_sig->oref_thing);

    info_sig->reserve_size = reserve_size;
    info_sig->len = len;
    info_sig->flags = flags;

    sys_memcpy((void *) &info_sig->item_ix[0],
               (void *) item_ix,
               sizeof(int)*len);

    queued = proc_queue_signal(c_p, to, (ErtsSignal *) info_sig,
                               ERTS_SIG_Q_OP_PROCESS_INFO);
    if (!queued)
        erts_free(ERTS_ALC_T_SIG_DATA, info_sig);
    else
        (void) maybe_elevate_sig_handling_prio(c_p, to);

    return queued;
}
/*
 * Send a synchronous suspend signal (ERTS_SIG_Q_OP_SYNC_SUSPEND) from
 * c_p to 'to'.
 *
 * The reply message {Tag, Reply} is prepared in the heap fragment of
 * the signal, and an ErtsSyncSuspendRequest record is stored directly
 * after it. If 'reply' is THE_NON_VALUE the request is marked as
 * asynchronous and the default reply is 'suspended'.
 *
 * If the signal cannot be queued, the prepared message is queued to
 * c_p itself with the reply replaced by 'badarg' (asynchronous case)
 * or 'exited'.
 */
void
erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, Eterm tag, Eterm reply)
{
    ErtsMessage *mp;
    ErlHeapFragment *hfrag;
    ErlOffHeap *ohp;
    ErtsSyncSuspendRequest *request;
    Eterm *hp, *heap_start;
    Eterm tag_copy, reply_msg, initial_reply;
    Uint words_needed, tag_words;
    int is_async;

    tag_words = size_object(tag);
    /* 2-tuple + tag + trailing request record (in words)... */
    words_needed = 3 + tag_words + sizeof(ErtsSyncSuspendRequest)/sizeof(Eterm);

    mp = erts_alloc_message(words_needed, &hp);
    hfrag = &mp->hfrag;
    mp->next = NULL;
    ohp = &hfrag->off_heap;
    heap_start = hp;

    tag_copy = copy_struct(tag, tag_words, &hp, ohp);

    is_async = is_non_value(reply);
    if (is_async)
        initial_reply = am_suspended;
    else
        initial_reply = reply;

    reply_msg = TUPLE2(hp, tag_copy, initial_reply);
    hp += 3;

    hfrag->used_size = hp - heap_start;

    request = (ErtsSyncSuspendRequest *) (char *) hp;
    request->message = reply_msg;
    request->requester = c_p->common.id;
    request->async = is_async;

    ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_SYNC_SUSPEND,
                                                  ERTS_SIG_Q_TYPE_UNDEFINED,
                                                  0);

    if (!proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_SYNC_SUSPEND)) {
        Eterm *tuple;
        /* It wasn't alive; reply to ourselves... */
        mp->next = NULL;
        mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
        tuple = tuple_val(reply_msg);
        if (is_async)
            tuple[2] = am_badarg;
        else
            tuple[2] = am_exited;
        erts_queue_message(c_p, ERTS_PROC_LOCK_MAIN,
                           mp, reply_msg, am_system);
        return;
    }

    (void) maybe_elevate_sig_handling_prio(c_p, to);
}
/*
 * Send an RPC signal (ERTS_SIG_Q_OP_RPC) from c_p to 'to', asking the
 * receiver to call 'func' with 'arg'.
 *
 * If 'reply' is non-zero a new reference is created, a copy of it is
 * stored in the signal, c_p is registered as requester, and the
 * reference is returned. If 'reply' is zero the atom 'ok' is returned.
 *
 * If the signal cannot be queued it is freed and THE_NON_VALUE is
 * returned.
 */
Eterm
erts_proc_sig_send_rpc_request(Process *c_p,
                               Eterm to,
                               int reply,
                               Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
                               void *arg)
{
    ErtsProcSigRPC *rpc_sig;
    Eterm result;

    rpc_sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsProcSigRPC));

    rpc_sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_RPC,
                                                 ERTS_SIG_Q_TYPE_UNDEFINED,
                                                 0);
    rpc_sig->func = func;
    rpc_sig->arg = arg;

    if (reply) {
        rpc_sig->requester = c_p->common.id;

        result = erts_make_ref(c_p);

        /* Keep a private copy of the reference inside the signal... */
        sys_memcpy((void *) &rpc_sig->oref_thing,
                   (void *) internal_ref_val(result),
                   sizeof(ErtsORefThing));
        rpc_sig->ref = make_internal_ref(&rpc_sig->oref_thing);

        ERTS_RECV_MARK_SAVE(c_p);
        ERTS_RECV_MARK_SET(c_p);
    }
    else {
        rpc_sig->requester = NIL;
        result = am_ok;
        rpc_sig->ref = am_ok;
    }

    if (!proc_queue_signal(c_p, to, (ErtsSignal *) rpc_sig, ERTS_SIG_Q_OP_RPC)) {
        erts_free(ERTS_ALC_T_SIG_DATA, rpc_sig);
        if (reply)
            JOIN_MESSAGE(c_p);
        return THE_NON_VALUE;
    }

    (void) maybe_elevate_sig_handling_prio(c_p, to);
    return result;
}
/*
 * Execute the RPC signal 'rpc' in the context of c_p, and send the
 * reply {Ref, Result} to the requester if it can be looked up.
 *
 * cnt, limit - current count and count limit; converted into a
 *              reduction budget for the called function.
 * yieldp     - set to non-zero if the called function requests a
 *              forced yield.
 *
 * The signal is freed before returning. Returns the consumed
 * reductions scaled by ERTS_SIG_REDS_CNT_FACTOR.
 */
static int
handle_rpc(Process *c_p, ErtsProcSigRPC *rpc, int cnt, int limit, int *yieldp)
{
    ErlHeapFragment *extra_frag = NULL;
    Process *requester;
    Eterm result;
    int reds, consumed;

    /*
     * reds in:
     *  Reductions left.
     *
     * reds out:
     *  Absolute value of reds out equals consumed
     *  amount of reds. If a negative value, force
     *  a yield.
     */
    reds = (limit - cnt) / ERTS_SIG_REDS_CNT_FACTOR;
    if (reds <= 0)
        reds = 1;

    result = (*rpc->func)(c_p, rpc->arg, &reds, &extra_frag);

    if (reds < 0) {
        /* Force yield... */
        *yieldp = !0;
        reds = -reds;
    }

    consumed = reds*ERTS_SIG_REDS_CNT_FACTOR;

    requester = erts_proc_lookup(rpc->requester);
    if (requester) {
        /* 2-tuple + reference (in words)... */
        Uint hsz = 3 + sizeof(ErtsORefThing)/sizeof(Eterm);
        Eterm *hp, msg, ref;
        ErtsMessage *mp = erts_alloc_message(hsz, &hp);

        sys_memcpy((void *) hp, (void *) &rpc->oref_thing,
                   sizeof(rpc->oref_thing));
        ref = make_internal_ref(hp);
        hp += sizeof(rpc->oref_thing)/sizeof(Eterm);

        msg = TUPLE2(hp, ref, result);

        /* Heap fragment from the callback follows the message... */
        mp->hfrag.next = extra_frag;
        ERL_MESSAGE_TOKEN(mp) = am_undefined;
        erts_queue_proc_message(c_p, requester, 0, mp, msg);
    }
    else if (extra_frag) {
        /* Nobody to reply to; drop what the callback built... */
        free_message_buffer(extra_frag);
    }

    erts_free(ERTS_ALC_T_SIG_DATA, rpc);

    return consumed;
}
/*
 * Reply to an is-alive request signal 'mp' received by c_p.
 *
 * The sender has already prepared the reply message inside the
 * signal, with 'false' as result. The signal is turned into that
 * message, the result is patched to 'true' when 'is_alive' is
 * non-zero, and the message is queued to the requester. If the
 * requester cannot be looked up the message is destroyed instead.
 */
static void
is_alive_response(Process *c_p, ErtsMessage *mp, int is_alive)
{
    Process *requester;
    ErtsIsAliveRequest *req;

    /* The request record is stored right after the used heap part... */
    req = (ErtsIsAliveRequest *) (char *) (&mp->hfrag.mem[0]
                                           + mp->hfrag.used_size);

    ASSERT(ERTS_SIG_IS_NON_MSG(mp));
    ASSERT(ERTS_PROC_SIG_OP(((ErtsSignal *) mp)->common.tag)
           == ERTS_SIG_Q_OP_IS_ALIVE);
    ASSERT(mp->hfrag.alloc_size > mp->hfrag.used_size);
    ASSERT((mp->hfrag.alloc_size - mp->hfrag.used_size)*sizeof(UWord)
           >= sizeof(ErtsIsAliveRequest));
    ASSERT(is_internal_pid(req->requester));
    ASSERT(req->requester != c_p->common.id);
    ASSERT(is_tuple_arity(req->message, 2));
    ASSERT(is_internal_ordinary_ref(tuple_val(req->message)[1]));
    ASSERT(tuple_val(req->message)[2] == am_false);

    /* Turn the signal into an ordinary message... */
    ERL_MESSAGE_TERM(mp) = req->message;
    mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
    mp->next = NULL;

    requester = erts_proc_lookup(req->requester);
    if (!requester) {
        erts_cleanup_messages(mp);
        return;
    }

    if (is_alive) {
        /* patch result... */
        Eterm *tuple = tuple_val(req->message);
        tuple[2] = am_true;
    }

    erts_queue_message(requester, 0, mp, req->message, am_system);
}
/*
 * Recompute the receive tracing state in 'tracing' from the current
 * trace flags of c_p.
 *
 * If c_p is not traced, or has F_SENSITIVE set, all tracing in
 * 'tracing' is switched off. Otherwise 'procs' tracing and message
 * receive tracing are enabled according to F_TRACE_PROCS and
 * F_TRACE_RECEIVE.
 *
 * When 'setup' is non-zero, tracing->messages.next is initialized as
 * well: to &c_p->sig_qs.cont when message tracing is enabled and to
 * NULL otherwise.
 *
 * tracing->messages.bp_ix must have been initialized by the caller; a
 * negative value means that the active breakpoint index has not been
 * fetched yet.
 */
static ERTS_INLINE void
adjust_tracing_state(Process *c_p, ErtsSigRecvTracing *tracing, int setup)
{
if (!IS_TRACED(c_p) || (ERTS_TRACE_FLAGS(c_p) & F_SENSITIVE)) {
/* Not traced (or sensitive); everything off, regardless of 'setup'. */
tracing->messages.active = 0;
tracing->messages.receive_trace = 0;
tracing->messages.event = NULL;
tracing->messages.next = NULL;
tracing->procs = 0;
tracing->active = 0;
}
else {
Uint flgs = ERTS_TRACE_FLAGS(c_p);
int procs_trace = !!(flgs & F_TRACE_PROCS);
int recv_trace = !!(flgs & F_TRACE_RECEIVE);
/* procs tracing enabled? */
tracing->procs = procs_trace;
/* message receive tracing enabled? */
tracing->messages.receive_trace = recv_trace;
if (!recv_trace)
tracing->messages.event = NULL;
else {
/* Fetch the active breakpoint index once and then keep using it. */
if (tracing->messages.bp_ix < 0)
tracing->messages.bp_ix = erts_active_bp_ix();
tracing->messages.event = &erts_receive_tracing[tracing->messages.bp_ix];
}
if (setup) {
if (recv_trace)
tracing->messages.next = &c_p->sig_qs.cont;
else
tracing->messages.next = NULL;
}
tracing->messages.active = recv_trace;
tracing->active = recv_trace | procs_trace;
}
#if defined(USE_VM_PROBES)
/* vm probe message_queued enabled? */
tracing->messages.vm_probes = DTRACE_ENABLED(message_queued);
if (tracing->messages.vm_probes) {
/* An enabled probe activates message tracing on its own. */
dtrace_proc_str(c_p, tracing->messages.receiver_name);
tracing->messages.active = !0;
tracing->active = !0;
if (setup && !tracing->messages.next)
tracing->messages.next = &c_p->sig_qs.cont;
}
#endif
}
/*
 * Initialize 'tracing' for c_p: mark the breakpoint index as not yet
 * fetched and compute the full tracing state in setup mode.
 */
static ERTS_INLINE void
setup_tracing_state(Process *c_p, ErtsSigRecvTracing *tracing)
{
    const int initial_setup = !0;

    tracing->messages.bp_ix = -1; /* not fetched yet */
    adjust_tracing_state(c_p, tracing, initial_setup);
}
/*
 * Unlink 'sig' from the inner queue of c_p.
 *
 * 'next_sig' is the location that currently points to 'sig'. Any of
 * the queue pointers 'save', 'last' and 'saved_last' that refer to
 * &sig->next are redirected to 'next_sig' before 'sig' is bypassed.
 * The signal itself is not freed.
 */
static ERTS_INLINE void
remove_iq_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig)
{
/*
* Remove signal from inner queue.
*/
ASSERT(c_p->sig_qs.cont_last != &sig->next);
ASSERT(c_p->sig_qs.nmsigs.next != &sig->next);
ASSERT(c_p->sig_qs.nmsigs.last != &sig->next);
if (c_p->sig_qs.save == &sig->next)
c_p->sig_qs.save = next_sig;
if (c_p->sig_qs.last == &sig->next)
c_p->sig_qs.last = next_sig;
if (c_p->sig_qs.saved_last == &sig->next)
c_p->sig_qs.saved_last = next_sig;
/* Bypass 'sig' in the list... */
*next_sig = sig->next;
}
/*
 * Unlink 'sig' from the middle queue of c_p.
 *
 * 'next_sig' is the location that currently points to 'sig'.
 * 'next_nm_sig' refers to the caller's pointer to the next non-message
 * signal. Any of 'cont_last', 'saved_last', '*next_nm_sig' and
 * 'nmsigs.last' that refer to &sig->next are redirected to 'next_sig'
 * before 'sig' is bypassed. The signal itself is not freed.
 */
static ERTS_INLINE void
remove_mq_sig(Process *c_p, ErtsMessage *sig,
ErtsMessage **next_sig, ErtsMessage ***next_nm_sig)
{
/*
* Remove signal from middle queue.
*/
ASSERT(c_p->sig_qs.save != &sig->next);
ASSERT(c_p->sig_qs.last != &sig->next);
if (c_p->sig_qs.cont_last == &sig->next)
c_p->sig_qs.cont_last = next_sig;
if (c_p->sig_qs.saved_last == &sig->next)
c_p->sig_qs.saved_last = next_sig;
if (*next_nm_sig == &sig->next)
*next_nm_sig = next_sig;
if (c_p->sig_qs.nmsigs.last == &sig->next)
c_p->sig_qs.nmsigs.last = next_sig;
/* Bypass 'sig' in the list... */
*next_sig = sig->next;
}
/*
 * Remove the non-message signal 'sig', which '*next_nm_sig' currently
 * refers to, from the middle queue of c_p. '*next_nm_sig' is advanced
 * to the following non-message signal. The signal itself is not freed.
 */
static ERTS_INLINE void
remove_nm_sig(Process *c_p, ErtsMessage *sig, ErtsMessage ***next_nm_sig)
{
/* Remember where 'sig' is linked in before advancing '*next_nm_sig'. */
ErtsMessage **next_sig = *next_nm_sig;
ASSERT(ERTS_SIG_IS_NON_MSG(sig));
ASSERT(*next_sig == sig);
*next_nm_sig = ((ErtsSignal *) sig)->common.specific.next;
remove_mq_sig(c_p, sig, next_sig, next_nm_sig);
}
/*
 * Replace the non-message signal 'sig', which '*next_nm_sig' currently
 * refers to, with the message 'msg' in the middle queue of c_p.
 * '*next_nm_sig' is advanced to the following non-message signal and
 * the queue length is incremented by one. The signal itself is not
 * freed.
 */
static ERTS_INLINE void
convert_to_msg(Process *c_p, ErtsMessage *sig, ErtsMessage *msg,
ErtsMessage ***next_nm_sig)
{
/* Remember where 'sig' is linked in before advancing '*next_nm_sig'. */
ErtsMessage **next_sig = *next_nm_sig;
ASSERT(ERTS_SIG_IS_NON_MSG(sig));
*next_nm_sig = ((ErtsSignal *) sig)->common.specific.next;
c_p->sig_qs.len++;
/* Link in 'msg' where 'sig' was; its next field takes over after 'sig'. */
*next_sig = msg;
remove_mq_sig(c_p, sig, &msg->next, next_nm_sig);
}
/*
 * Replace the non-message signal 'sig', which '*next_nm_sig' currently
 * refers to, with the already linked list of messages from 'first_msg'
 * to 'last_msg' in the middle queue of c_p. '*next_nm_sig' is advanced
 * to the following non-message signal and the queue length is
 * incremented by 'no_msgs'. The signal itself is not freed.
 */
static ERTS_INLINE void
convert_to_msgs(Process *c_p, ErtsMessage *sig, Uint no_msgs,
ErtsMessage *first_msg, ErtsMessage *last_msg,
ErtsMessage ***next_nm_sig)
{
/* Remember where 'sig' is linked in before advancing '*next_nm_sig'. */
ErtsMessage **next_sig = *next_nm_sig;
ASSERT(ERTS_SIG_IS_NON_MSG(sig));
*next_nm_sig = ((ErtsSignal *) sig)->common.specific.next;
c_p->sig_qs.len += no_msgs;
/* Link in the list where 'sig' was; last_msg->next takes over after 'sig'. */
*next_sig = first_msg;
remove_mq_sig(c_p, sig, &last_msg->next, next_nm_sig);
}
/*
 * Insert the already linked list of messages from 'first' to 'last'
 * at the location 'next' in the middle queue of c_p.
 *
 * Any of 'cont_last', '*next_nm_sig' and 'nmsigs.last' that refer to
 * 'next' are redirected to &last->next, and the queue length is
 * incremented by 'no_msgs'.
 */
static ERTS_INLINE void
insert_messages(Process *c_p, ErtsMessage **next, ErtsMessage *first,
ErtsMessage *last, Uint no_msgs, ErtsMessage ***next_nm_sig)
{
/* What used to follow 'next' now follows the inserted list... */
last->next = *next;
if (c_p->sig_qs.cont_last == next)
c_p->sig_qs.cont_last = &last->next;
if (*next_nm_sig == next)
*next_nm_sig = &last->next;
if (c_p->sig_qs.nmsigs.last == next)
c_p->sig_qs.nmsigs.last = &last->next;
c_p->sig_qs.len += no_msgs;
*next = first;
}
/*
 * Remove the message 'sig' from the middle queue of c_p and decrement
 * the queue length. 'sig' must be a message, not a non-message signal.
 */
static ERTS_INLINE void
remove_mq_m_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig, ErtsMessage ***next_nm_sig)
{
    ASSERT(!ERTS_SIG_IS_NON_MSG(sig));

    /* One message less in the queue... */
    c_p->sig_qs.len -= 1;

    remove_mq_sig(c_p, sig, next_sig, next_nm_sig);
}
/*
 * Remove the message 'sig' from the inner queue of c_p and decrement
 * the queue length. 'sig' must be a message, not a non-message signal.
 */
static ERTS_INLINE void
remove_iq_m_sig(Process *c_p, ErtsMessage *sig, ErtsMessage **next_sig)
{
    ASSERT(!ERTS_SIG_IS_NON_MSG(sig));

    /* One message less in the queue... */
    c_p->sig_qs.len -= 1;

    remove_iq_sig(c_p, sig, next_sig);
}
/*
 * Convert the signal 'sig', which already contains a prepared message,
 * into the message 'msg' in place. '*next_nm_sig' is advanced to the
 * following non-message signal and the queue length is incremented by
 * one. 'sig' stays where it is in the queue.
 */
static ERTS_INLINE void
convert_prepared_sig_to_msg(Process *c_p, ErtsMessage *sig, Eterm msg,
ErtsMessage ***next_nm_sig)
{
/*
* Everything is already there except for the reference to
* the message and the combined hfrag marker that needs to be
* restored...
*/
/*
 * NOTE(review): common.specific.next is read before data.attached is
 * written; presumably the two share storage, which would make this
 * order significant -- confirm against the structure declarations.
 */
*next_nm_sig = ((ErtsSignal *) sig)->common.specific.next;
sig->data.attached = ERTS_MSG_COMBINED_HFRAG;
ERL_MESSAGE_TERM(sig) = msg;
c_p->sig_qs.len++;
}
/*
 * Handle the exit signal 'sig' found in the signal queue of c_p.
 *
 * Two representations are handled:
 *  - ERTS_SIG_Q_TYPE_GEN_EXIT: a signal with attached exit signal
 *    data (sender, reason and a prepared message term).
 *  - otherwise: a link structure reused as signal, with the immediate
 *    exit reason stored in its other.item field.
 *
 * Depending on the link state, the reason and whether c_p traps
 * exits, the signal is either ignored and removed, converted into a
 * message in the queue, or makes c_p start exiting.
 *
 * tracing     - current receive tracing state; used to decide whether
 *               getting_unlinked() is to be called.
 * next_nm_sig - caller's pointer to the next non-message signal; it is
 *               advanced past 'sig'.
 * exited      - set to non-zero if c_p was set exiting.
 *
 * Returns a count of the work performed; it starts at 1 and is
 * incremented for additional operations.
 */
static ERTS_INLINE int
handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
ErtsMessage *sig, ErtsMessage ***next_nm_sig,
int *exited)
{
ErtsMessage *conv_msg = NULL;
ErtsExitSignalData *xsigd = NULL;
ErtsLinkData *ldp = NULL; /* Avoid erroneous warning... */
ErtsLink *dlnk = NULL; /* Avoid erroneous warning... */
Eterm tag = ((ErtsSignal *) sig)->common.tag;
Uint16 type = ERTS_PROC_SIG_TYPE(tag);
int op = ERTS_PROC_SIG_OP(tag);
int destroy = 0; /* destroy the signal before returning */
int ignore = 0; /* drop the signal without any effect on c_p */
int save = 0; /* save the signal as a message in c_p when exiting */
int exit = 0; /* terminate c_p */
int cnt = 1;
Eterm reason;
Eterm from;
if (type == ERTS_SIG_Q_TYPE_GEN_EXIT) {
xsigd = get_exit_signal_data(sig);
from = xsigd->from;
if (op != ERTS_SIG_Q_OP_EXIT_LINKED)
ignore = 0;
else {
/* Exit due to a broken link; only valid if the link still exists. */
ErtsLink *llnk = erts_link_tree_lookup(ERTS_P_LINKS(c_p), from);
if (!llnk) {
/* Link no longer active; ignore... */
ignore = !0;
destroy = !0;
}
else {
ignore = 0;
erts_link_tree_delete(&ERTS_P_LINKS(c_p), llnk);
if (llnk->type != ERTS_LNK_TYPE_DIST_PROC)
erts_link_release(llnk);
else {
dlnk = erts_link_to_other(llnk, &ldp);
if (erts_link_dist_delete(dlnk))
erts_link_release_both(ldp);
else
erts_link_release(llnk);
}
}
}
/* This GEN_EXIT was received from another node, decode the exit reason */
if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig))
erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1);
reason = xsigd->reason;
if (is_non_value(reason)) {
/* Bad distribution message; remove it from queue... */
ignore = !0;
destroy = !0;
}
if (!ignore) {
/*
 * An exit signal with reason 'kill' sent by ERTS_SIG_Q_OP_EXIT
 * cannot be trapped; everything else is converted into a message
 * when c_p traps exits.
 */
if ((op != ERTS_SIG_Q_OP_EXIT || reason != am_kill)
&& (c_p->flags & F_TRAP_EXIT)) {
convert_prepared_sig_to_msg(c_p, sig,
xsigd->message, next_nm_sig);
conv_msg = sig;
}
else if (reason == am_normal && !xsigd->u.normal_kills) {
/* Ignore it... */
destroy = !0;
ignore = !0;
}
else {
/* Terminate... */
save = !0;
exit = !0;
if (op == ERTS_SIG_Q_OP_EXIT && reason == am_kill)
reason = am_killed;
}
}
}
else { /* Link exit */
ErtsLink *slnk = (ErtsLink *) sig;
ErtsLink *llnk = erts_link_to_other(slnk, &ldp);
ASSERT(type == ERTS_LNK_TYPE_PROC
|| type == ERTS_LNK_TYPE_PORT
|| type == ERTS_LNK_TYPE_DIST_PROC);
from = llnk->other.item;
reason = slnk->other.item; /* reason in other.item ... */
ASSERT(is_pid(from) || is_internal_port(from));
ASSERT(is_immed(reason));
ASSERT(op == ERTS_SIG_Q_OP_EXIT_LINKED);
dlnk = erts_link_tree_key_delete(&ERTS_P_LINKS(c_p), llnk);
if (!dlnk) {
ignore = !0; /* Link no longer active; ignore... */
ldp = NULL;
}
else {
Eterm pid;
ErtsMessage *mp;
ErtsProcLocks locks;
Uint hsz;
Eterm *hp;
ErlOffHeap *ohp;
ignore = 0;
/*
 * Decide what the destroy step at the end has to release: both
 * halves via 'ldp' when the deleted link is 'llnk' itself,
 * otherwise 'dlnk' and the signal separately.
 */
if (dlnk == llnk)
dlnk = NULL;
else
ldp = NULL;
ASSERT(is_immed(reason));
if (!(c_p->flags & F_TRAP_EXIT)) {
if (reason == am_normal)
ignore = !0; /* Ignore it... */
else
exit = !0; /* Terminate... */
}
else {
/*
* Create and EXIT message and replace
* the original signal with the message...
*/
locks = ERTS_PROC_LOCK_MAIN;
/* 3-tuple (4 words) plus room for a copy of 'from'. */
hsz = 4 + NC_HEAP_SIZE(from);
mp = erts_alloc_message_heap(c_p, &locks, hsz, &hp, &ohp);
/* Release any locks acquired in addition to the main lock. */
if (locks != ERTS_PROC_LOCK_MAIN)
erts_proc_unlock(c_p, locks & ~ERTS_PROC_LOCK_MAIN);
pid = STORE_NC(&hp, ohp, from);
ERL_MESSAGE_TERM(mp) = TUPLE3(hp, am_EXIT, pid, reason);
ERL_MESSAGE_TOKEN(mp) = am_undefined;
if (is_immed(pid))
ERL_MESSAGE_FROM(mp) = pid;
else {
DistEntry *dep;
ASSERT(is_external_pid(pid));
dep = external_pid_dist_entry(pid);
ERL_MESSAGE_FROM(mp) = dep->sysname;
}
/* Replace original signal with the exit message... */
convert_to_msg(c_p, sig, mp, next_nm_sig);
cnt += 4;
conv_msg = mp;
}
}
/* A link structure used as signal is always destroyed here. */
destroy = !0;
}
if (ignore|exit) {
/* The signal was not converted to a message; take it out of the queue. */
remove_nm_sig(c_p, sig, next_nm_sig);
if (exit) {
if (save) {
/* Turn the signal into its prepared message and save it in c_p. */
sig->data.attached = ERTS_MSG_COMBINED_HFRAG;
ERL_MESSAGE_TERM(sig) = xsigd->message;
erts_save_message_in_proc(c_p, sig);
}
/* Exit process... */
erts_set_self_exiting(c_p, reason);
cnt++;
}
}
if (!exit) {
if (conv_msg)
erts_proc_notify_new_message(c_p, ERTS_PROC_LOCK_MAIN);
if (op == ERTS_SIG_Q_OP_EXIT_LINKED && tracing->procs)
getting_unlinked(c_p, from);
}
if (destroy) {
cnt++;
if (type == ERTS_SIG_Q_TYPE_GEN_EXIT) {
sig->next = NULL;
erts_cleanup_messages(sig);
}
else {
if (ldp)
erts_link_release_both(ldp);
else {
if (dlnk)
erts_link_release(dlnk);
erts_link_release((ErtsLink *) sig);
}
}
}
*exited = exit;
return cnt;
}
/*
 * Convert the signal 'sig', which contains the prepared message 'msg',
 * into that message in place and notify c_p about the new message.
 * Always returns 1.
 */
static ERTS_INLINE int
convert_prepared_down_message(Process *c_p, ErtsMessage *sig,
                              Eterm msg, ErtsMessage ***next_nm_sig)
{
    const int work_done = 1;

    convert_prepared_sig_to_msg(c_p, sig, msg, next_nm_sig);
    erts_proc_notify_new_message(c_p, ERTS_PROC_LOCK_MAIN);

    return work_done;
}
/*
 * Build a {'DOWN', Ref, Type, From, Reason} message from the monitor
 * data 'mdp' and put it in the signal queue of c_p in place of the
 * signal 'sig'.
 *
 * Ref    - copy of mdp->ref.
 * Type   - 'port' or 'process', depending on 'mon_type'.
 * From   - copy of the origin item, or {Name, Node} if the monitor was
 *          set up by name.
 * Reason - the immediate term stored in mdp->target.other.item.
 *
 * '*next_nm_sig' is advanced past 'sig' and c_p is notified about the
 * new message. Returns a count of the work performed.
 */
static int
convert_to_down_message(Process *c_p,
ErtsMessage *sig,
ErtsMonitorData *mdp,
Uint16 mon_type,
ErtsMessage ***next_nm_sig)
{
/*
* Create a 'DOWN' message and replace the signal
* with it...
*/
int cnt = 0;
Eterm node = am_undefined;
ErtsMessage *mp;
ErtsProcLocks locks;
Uint hsz;
Eterm *hp, ref, from, type, reason;
ErlOffHeap *ohp;
ASSERT(mdp);
ASSERT((mdp->origin.flags & ERTS_ML_FLGS_SAME)
== (mdp->target.flags & ERTS_ML_FLGS_SAME));
/* Compute the heap size needed for the message... */
hsz = 6; /* 5-tuple */
if (mdp->origin.flags & ERTS_ML_FLG_NAME)
hsz += 3; /* reg name 2-tuple */
else {
ASSERT(is_pid(mdp->origin.other.item)
|| is_internal_port(mdp->origin.other.item));
hsz += NC_HEAP_SIZE(mdp->origin.other.item);
}
ASSERT(is_ref(mdp->ref));
hsz += NC_HEAP_SIZE(mdp->ref);
locks = ERTS_PROC_LOCK_MAIN;
/* reason is mdp->target.other.item */
reason = mdp->target.other.item;
ASSERT(is_immed(reason));
mp = erts_alloc_message_heap(c_p, &locks, hsz, &hp, &ohp);
/* Release any locks acquired in addition to the main lock. */
if (locks != ERTS_PROC_LOCK_MAIN)
erts_proc_unlock(c_p, locks & ~ERTS_PROC_LOCK_MAIN);
cnt += 4;
ref = STORE_NC(&hp, ohp, mdp->ref);
if (!(mdp->origin.flags & ERTS_ML_FLG_NAME)) {
from = STORE_NC(&hp, ohp, mdp->origin.other.item);
}
else {
/* Monitored by name; 'From' is {Name, Node}. */
ErtsMonitorDataExtended *mdep;
ASSERT(mdp->origin.flags & ERTS_ML_FLG_EXTENDED);
mdep = (ErtsMonitorDataExtended *) mdp;
ASSERT(is_atom(mdep->u.name));
if (mdep->dist)
node = mdep->dist->nodename;
else
node = erts_this_dist_entry->sysname;
from = TUPLE2(hp, mdep->u.name, node);
hp += 3;
}
ASSERT(mdp->origin.type == mon_type);
/* Select the 'Type' element and the sender stored in the message... */
switch (mon_type) {
case ERTS_MON_TYPE_PORT:
type = am_port;
if (mdp->origin.other.item == am_undefined) {
/* failed by name... */
ERL_MESSAGE_FROM(mp) = am_system;
}
else {
ASSERT(is_internal_port(mdp->origin.other.item));
ERL_MESSAGE_FROM(mp) = mdp->origin.other.item;
}
break;
case ERTS_MON_TYPE_PROC:
type = am_process;
if (mdp->origin.other.item == am_undefined) {
/* failed by name... */
ERL_MESSAGE_FROM(mp) = am_system;
}
else {
ASSERT(is_internal_pid(mdp->origin.other.item));
ERL_MESSAGE_FROM(mp) = mdp->origin.other.item;
}
break;
case ERTS_MON_TYPE_DIST_PROC:
type = am_process;
if (node == am_undefined) {
/* Not monitored by name, so the node has not been looked up yet. */
ErtsMonitorDataExtended *mdep;
ASSERT(mdp->origin.flags & ERTS_ML_FLG_EXTENDED);
mdep = (ErtsMonitorDataExtended *) mdp;
ASSERT(mdep->dist);
node = mdep->dist->nodename;
}
ASSERT(is_atom(node) && node != am_undefined);
ERL_MESSAGE_FROM(mp) = node;
break;
default:
ERTS_INTERNAL_ERROR("Unexpected monitor type");
type = am_undefined;
ERL_MESSAGE_FROM(mp) = am_undefined;
break;
}
ERL_MESSAGE_TERM(mp) = TUPLE5(hp, am_DOWN, ref,
type, from, reason);
hp += 6;
ERL_MESSAGE_TOKEN(mp) = am_undefined;
/* Replace original signal with the 'DOWN' message... */
convert_to_msg(c_p, sig, mp, next_nm_sig);
cnt += 4;
erts_proc_notify_new_message(c_p, ERTS_PROC_LOCK_MAIN);
return cnt;
}
/*
 * Replace the signal 'sig' in the signal queue of c_p with one
 * {nodedown, Node} message per reference count of the monitor
 * described by 'mdp'. If the reference count is zero, the signal is
 * just removed from the queue.
 *
 * Returns a count of the work performed: 1 plus one per message
 * created.
 */
static ERTS_INLINE int
convert_to_nodedown_messages(Process *c_p,
                             ErtsMessage *sig,
                             ErtsMonitorData *mdp,
                             ErtsMessage ***next_nm_sig)
{
    ErtsMonitorDataExtended *mdep = (ErtsMonitorDataExtended *) mdp;
    ErtsMessage *head = NULL;
    ErtsMessage *tail = NULL;
    ErtsProcLocks locks = ERTS_PROC_LOCK_MAIN;
    Eterm node;
    Uint refc, remaining;
    int cnt = 1;

    ASSERT((mdp->origin.flags & ERTS_ML_FLGS_SAME)
           == (mdp->target.flags & ERTS_ML_FLGS_SAME));
    ASSERT(mdp->origin.flags & ERTS_ML_FLG_EXTENDED);

    refc = mdep->u.refc;

    if (refc == 0) {
        /* Nothing to deliver; just drop the signal... */
        remove_nm_sig(c_p, sig, next_nm_sig);
        return cnt;
    }

    node = mdep->dist->nodename;
    ASSERT(is_atom(node));
    ASSERT(refc > 0);

    for (remaining = refc; remaining > 0; remaining--) {
        ErtsMessage *mp;
        ErlOffHeap *ohp;
        Eterm *hp;

        mp = erts_alloc_message_heap(c_p, &locks, 3, &hp, &ohp);

        ERL_MESSAGE_TERM(mp) = TUPLE2(hp, am_nodedown, node);
        ERL_MESSAGE_FROM(mp) = am_system;
        ERL_MESSAGE_TOKEN(mp) = am_undefined;

        /* Prepend; the first message created ends up last... */
        mp->next = head;
        head = mp;
        if (!tail)
            tail = mp;

        cnt++;
    }

    /* Release any locks acquired in addition to the main lock... */
    if (locks != ERTS_PROC_LOCK_MAIN)
        erts_proc_unlock(c_p, locks & ~ERTS_PROC_LOCK_MAIN);

    /* Replace signal with 'nodedown' messages */
    convert_to_msgs(c_p, sig, refc, head, tail, next_nm_sig);

    erts_proc_notify_new_message(c_p, ERTS_PROC_LOCK_MAIN);

    return cnt;
}
/*
 * Handle a node down signal for the node monitor described by 'mdp'.
 *
 * The origin monitor is first removed from the monitor structures of
 * c_p:
 *  - not in a subtable and without a sub-list of node monitors: it is
 *    deleted from the monitor tree;
 *  - not in a subtable but with a sub-list: the last monitor of the
 *    sub-list takes over the rest of the list and replaces it in the
 *    monitor tree;
 *  - in a subtable: it is deleted from the sub-list of the monitor
 *    found in the tree under the same key.
 *
 * The signal is then converted into 'nodedown' messages. Returns a
 * count of the work performed.
 */
static int
handle_nodedown(Process *c_p,
                ErtsMessage *sig,
                ErtsMonitorData *mdp,
                ErtsMessage ***next_nm_sig)
{
    ErtsMonitorDataExtended *mdep = (ErtsMonitorDataExtended *) mdp;
    ErtsMonitor *omon = &mdp->origin;
    int cnt = 1;

    ASSERT(erts_monitor_is_in_table(omon));

    if (!(omon->flags & ERTS_ML_FLG_IN_SUBTABLE)) {
        if (!mdep->uptr.node_monitors) {
            erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p), omon);
        }
        else {
            ErtsMonitor *sub_mon;
            ErtsMonitorDataExtended *sub_mdep;

            /* Promote the last sub-list monitor into the tree... */
            sub_mon = erts_monitor_list_last(mdep->uptr.node_monitors);
            ASSERT(sub_mon);
            erts_monitor_list_delete(&mdep->uptr.node_monitors, sub_mon);
            sub_mon->flags &= ~ERTS_ML_FLG_IN_SUBTABLE;

            sub_mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(sub_mon);
            ASSERT(!sub_mdep->uptr.node_monitors);
            sub_mdep->uptr.node_monitors = mdep->uptr.node_monitors;
            mdep->uptr.node_monitors = NULL;

            erts_monitor_tree_replace(&ERTS_P_MONITORS(c_p), omon, sub_mon);
            cnt += 2;
        }
    }
    else {
        ErtsMonitorDataExtended *top_mdep;
        ErtsMonitor *top_mon;

        ASSERT(is_atom(omon->other.item));
        ASSERT(!mdep->uptr.node_monitors);

        /* Remove from the sub-list of the monitor found in the tree... */
        top_mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p),
                                           omon->other.item);
        ASSERT(top_mon);
        top_mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(top_mon);
        ASSERT(top_mdep->uptr.node_monitors);

        erts_monitor_list_delete(&top_mdep->uptr.node_monitors, omon);
        omon->flags &= ~ERTS_ML_FLG_IN_SUBTABLE;
        cnt += 3;
    }

    cnt += convert_to_nodedown_messages(c_p, sig, mdp, next_nm_sig);
    return cnt;
}
/*
 * Handle a persistent monitor message signal 'sig' received by c_p.
 *
 * The signal, which contains the prepared message 'msg', is converted
 * into that message in place. For ERTS_MON_TYPE_NODES monitors with a
 * reference count above one, refc - 1 additional copies of the message
 * are created and inserted directly after the converted signal, so
 * that refc messages are delivered in total. Finally c_p is notified
 * about the new message(s).
 */
static void
handle_persistent_mon_msg(Process *c_p, Uint16 type,
ErtsMonitor *mon, ErtsMessage *sig,
Eterm msg, ErtsMessage ***next_nm_sig)
{
convert_prepared_sig_to_msg(c_p, sig, msg, next_nm_sig);
switch (type) {
case ERTS_MON_TYPE_TIME_OFFSET:
ASSERT(mon->type == ERTS_MON_TYPE_TIME_OFFSET);
break;
case ERTS_MON_TYPE_NODES: {
ErtsMonitorDataExtended *mdep;
Uint n;
ASSERT(mon->type == ERTS_MON_TYPE_NODES);
mdep = (ErtsMonitorDataExtended *) erts_monitor_to_data(mon);
ERTS_ML_ASSERT(mdep->u.refc > 0);
/* One message already exists (the converted signal)... */
n = mdep->u.refc;
n--;
if (n > 0) {
ErtsProcLocks locks = ERTS_PROC_LOCK_MAIN;
/* 'prev' and 'last' are assigned in the first loop iteration (n > 0). */
ErtsMessage *first = NULL, *prev, *last;
Uint hsz = size_object(msg);
Uint i;
for (i = 0; i < n; i++) {
Eterm *hp;
ErlOffHeap *ohp;
last = erts_alloc_message_heap(c_p, &locks, hsz, &hp, &ohp);
/* Append to the list of copies being built... */
if (!first)
first = last;
else
prev->next = last;
prev = last;
ERL_MESSAGE_TERM(last) = copy_struct(msg, hsz, &hp, ohp);
/* The remaining fields are immediates, taken over from the original. */
#ifdef USE_VM_PROBES
ASSERT(is_immed(ERL_MESSAGE_DT_UTAG(sig)));
ERL_MESSAGE_DT_UTAG(last) = ERL_MESSAGE_DT_UTAG(sig);
#endif
ASSERT(is_immed(ERL_MESSAGE_TOKEN(sig)));
ERL_MESSAGE_TOKEN(last) = ERL_MESSAGE_TOKEN(sig);
ASSERT(is_immed(ERL_MESSAGE_FROM(sig)));
ERL_MESSAGE_FROM(last) = ERL_MESSAGE_FROM(sig);
}
/* Release any locks acquired in addition to the main lock. */
if (locks != ERTS_PROC_LOCK_MAIN)
erts_proc_unlock(c_p, locks & ~ERTS_PROC_LOCK_MAIN);
/* Insert the copies directly after the converted signal. */
insert_messages(c_p, &sig->next, first, last, n, next_nm_sig);
}
break;
}
default:
ERTS_INTERNAL_ERROR("Invalid type");
break;
}
erts_proc_notify_new_message(c_p, ERTS_PROC_LOCK_MAIN);
}
/*
 * Send the reply {Ref, Result} for a group-leader request to process 'to',
 * where Result is 'true' when 'success' is non-zero and 'badarg'
 * otherwise. Nothing is sent if 'to' cannot be looked up.
 */
static void
group_leader_reply(Process *c_p, Eterm to, Eterm ref, int success)
{
    Process *rp = erts_proc_lookup(to);
    ErtsProcLocks locks;
    Uint ref_sz;
    Eterm *hp, reply, ref_copy, result;
    ErlOffHeap *ohp;
    ErtsMessage *mp;

    if (!rp)
        return;

    ASSERT(is_internal_ref(ref));

    /* When replying to ourselves we already hold the main lock... */
    if (c_p == rp)
        locks = ERTS_PROC_LOCK_MAIN;
    else
        locks = 0;

    ref_sz = size_object(ref);

    /* Room for the copied reference plus a 2-tuple (3 words)... */
    mp = erts_alloc_message_heap(rp, &locks, ref_sz+3, &hp, &ohp);

    ref_copy = copy_struct(ref, ref_sz, &hp, ohp);
    if (success)
        result = am_true;
    else
        result = am_badarg;
    reply = TUPLE2(hp, ref_copy, result);

    erts_queue_message(rp, locks, mp, reply, am_system);

    /* Never release the main lock of the executing process... */
    if (c_p == rp)
        locks &= ~ERTS_PROC_LOCK_MAIN;

    if (locks)
        erts_proc_unlock(rp, locks);
}
/*
 * Handle a group-leader signal received by 'c_p'.
 *
 * If the ACTIVE flag was still set when we cleared it, the new group
 * leader is installed and, when the request carries an internal pid to
 * reply to, a reply is sent. The RECEIVER flag is then cleared, and if no
 * other flag remained set the signal is destroyed.
 */
static void
handle_group_leader(Process *c_p, ErtsSigGroupLeader *sgl)
{
    erts_aint_t old_flags;

    old_flags = erts_atomic_read_band_nob(&sgl->flags,
                                          ~ERTS_SIG_GL_FLG_ACTIVE);
    if (old_flags & ERTS_SIG_GL_FLG_ACTIVE) {
        int ok = erts_set_group_leader(c_p, sgl->group_leader);

        if (is_internal_pid(sgl->reply_to))
            group_leader_reply(c_p, sgl->reply_to, sgl->ref, ok);
    }

    old_flags = erts_atomic_read_band_nob(&sgl->flags,
                                          ~ERTS_SIG_GL_FLG_RECEIVER);
    if (!(old_flags & ~ERTS_SIG_GL_FLG_RECEIVER))
        destroy_sig_group_leader(sgl);
}
/*
 * If the process-info signal 'sig' requests message queue length
 * synchronisation (ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC), make sure a message
 * queue length offset marker is first in the in-queue of 'rp' (pushing
 * the marker embedded in the signal if none is there), take a reference
 * on that marker, and store the resulting length offset in the signal.
 */
static void
check_push_msgq_len_offs_marker(Process *rp, ErtsSignal *sig)
{
    ErtsProcessInfoSig *pisig = (ErtsProcessInfoSig *) sig;
    ErtsProcSigMsgQLenOffsetMarker *marker;
    ErtsMessage *head;
    Sint inq_len, offs;

    ASSERT(ERTS_PROC_SIG_OP(sig->common.tag) == ERTS_SIG_Q_OP_PROCESS_INFO);

    if (pisig->msgq_len_offset != ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC)
        return;

    head = rp->sig_inq.first;
    ASSERT(head);

    if (((ErtsSignal *) head)->common.tag != ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK) {
        /* No marker in place; push our own first in the in-queue... */
        marker = &pisig->marker;
        ASSERT(marker->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK);

        marker->common.next = head;
        ASSERT(rp->sig_inq.last != &rp->sig_inq.first);

        /* Pointers that referred to the old head slot must follow it... */
        if (rp->sig_inq.nmsigs.next == &rp->sig_inq.first)
            rp->sig_inq.nmsigs.next = &marker->common.next;
        if (rp->sig_inq.nmsigs.last == &rp->sig_inq.first)
            rp->sig_inq.nmsigs.last = &marker->common.next;

        rp->sig_inq.first = (ErtsMessage *) marker;
    }
    else {
        /* Reuse the marker already placed first in the in-queue... */
        marker = (ErtsProcSigMsgQLenOffsetMarker *) head;
    }

    inq_len = rp->sig_inq.len;
    offs = inq_len - marker->len_offset;
    marker->len_offset = inq_len;
    marker->refc++;

    pisig->msgq_len_offset = offs;

#ifdef DEBUG
    /* Remember which marker was used, for later sanity checks... */
    pisig->marker.common.specific.attachment = (void *) marker;
#endif
}
/*
 * Release the process-info request signal 'pisig' handled by 'c_p'.
 *
 * If the request used a message queue length offset marker
 * (msgq_len_offset != ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE), the reference
 * on the marker found first in the in-queue is dropped. When the last
 * reference goes away the marker is unlinked from the in-queue and
 * sig_qs.len is adjusted by its len_offset.
 *
 * A marker is storage embedded in a process-info signal. Therefore:
 *  - if the marker is ours and still referenced, 'pisig' is NOT freed;
 *  - if the marker belonged to another signal and became unused, that
 *    other signal is freed here as well.
 */
static void
destroy_process_info_request(Process *c_p, ErtsProcessInfoSig *pisig)
{
int dealloc_pisig = !0;
if (pisig->msgq_len_offset != ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE) {
Sint refc;
int dealloc_marker = 0;
ErtsProcSigMsgQLenOffsetMarker *marker;
#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
Sint delayed_len;
#endif
ASSERT(pisig->msgq_len_offset >= 0);
/* The in-queue is inspected and modified under the MSGQ lock... */
erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
/* The marker in use is the first element of the in-queue... */
marker = (ErtsProcSigMsgQLenOffsetMarker *) c_p->sig_inq.first;
ASSERT(marker);
ASSERT(marker->refc > 0);
ASSERT(pisig->marker.common.specific.attachment == (void *) marker);
marker->delayed_len -= pisig->msgq_len_offset;
#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
delayed_len = marker->delayed_len;
#endif
refc = --marker->refc;
if (refc) {
if (marker == &pisig->marker) {
/* Another signal using our marker... */
dealloc_pisig = 0;
}
}
else {
/* Marker unused; remove it... */
ASSERT(marker->delayed_len + marker->len_offset == 0);
#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
delayed_len += marker->len_offset;
#endif
if (marker != &pisig->marker)
dealloc_marker = !0; /* used another signals marker... */
/* Unlink the marker from the head of the in-queue... */
c_p->sig_inq.first = marker->common.next;
/* ...and redirect pointers that referred to its next field */
if (c_p->sig_inq.last == &marker->common.next)
c_p->sig_inq.last = &c_p->sig_inq.first;
if (c_p->sig_inq.nmsigs.next == &marker->common.next)
c_p->sig_inq.nmsigs.next = &c_p->sig_inq.first;
if (c_p->sig_inq.nmsigs.last == &marker->common.next)
c_p->sig_inq.nmsigs.last = &c_p->sig_inq.first;
}
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
if (!refc) {
c_p->flags &= ~F_DELAYED_PSIGQS_LEN;
/* Adjust msg len of inner+middle queue */
ASSERT(marker->len_offset <= 0);
c_p->sig_qs.len -= marker->len_offset;
ASSERT(c_p->sig_qs.len >= 0);
}
#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
{
/* Hard debug: recount the messages in the private queues... */
Sint len = 0;
ERTS_FOREACH_SIG_PRIVQS(
c_p, mp,
{
if (ERTS_SIG_IS_MSG(mp))
len++;
});
ERTS_ASSERT(c_p->sig_qs.len + delayed_len == len);
}
#endif
if (dealloc_marker) {
/*
 * The marker is the 'marker' member of another process-info
 * signal; step back to the start of that signal and free it.
 */
ErtsProcessInfoSig *pisig2
= (ErtsProcessInfoSig *) (((char *) marker)
- offsetof(ErtsProcessInfoSig,
marker));
erts_free(ERTS_ALC_T_SIG_DATA, pisig2);
}
}
if (dealloc_pisig)
erts_free(ERTS_ALC_T_SIG_DATA, pisig);
}
/*
 * Handle a process-info request signal 'sig' received by 'c_p'.
 *
 *   tracing     - receive tracing state; must be non-NULL when 'is_alive'
 *                 is set and the request uses the message queue length
 *                 (asserted below)
 *   next_nm_sig - position of the next non-message signal
 *   is_alive    - non-zero when 'c_p' is not exiting (asserted below)
 *
 * When 'is_alive' is set the signal is removed from the queue here; the
 * result is computed with erts_process_info(). Otherwise the result is
 * 'undefined'. The reply {Ref, Result} is sent to the requester if it can
 * still be looked up, and the request is then destroyed.
 *
 * Returns a work count: 4 times the reductions reported by
 * erts_process_info() (capped at INT_MAX/8) plus 8.
 */
static int
handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing,
ErtsMessage *sig, ErtsMessage ***next_nm_sig,
int is_alive)
{
ErtsProcessInfoSig *pisig = (ErtsProcessInfoSig *) sig;
Uint reds = 0;
Process *rp;
ASSERT(!!is_alive == !(erts_atomic32_read_nob(&c_p->state)
& ERTS_PSFLG_EXITING));
if (pisig->msgq_len_offset != ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE) {
/*
* Request requires message queue data to be updated
* before inspection...
*/
ASSERT(pisig->msgq_len_offset >= 0);
/*
* Update sig_qs.len to reflect the length
* of the message queue...
*/
c_p->sig_qs.len += pisig->msgq_len_offset;
if (is_alive) {
/*
* Move messages part of message queue into inner
* signal queue...
*/
ASSERT(tracing);
if (*next_nm_sig != &c_p->sig_qs.cont) {
/* Keep the tracing position valid after the splice... */
if (*next_nm_sig == tracing->messages.next)
tracing->messages.next = &c_p->sig_qs.cont;
/* Append everything before this signal to the inner queue... */
*c_p->sig_qs.last = c_p->sig_qs.cont;
c_p->sig_qs.last = *next_nm_sig;
/* ...and let the middle queue begin at this signal */
c_p->sig_qs.cont = **next_nm_sig;
if (c_p->sig_qs.nmsigs.last == *next_nm_sig)
c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont;
*next_nm_sig = &c_p->sig_qs.cont;
*c_p->sig_qs.last = NULL;
}
#ifdef ERTS_PROC_SIG_HARD_DEBUG_SIGQ_MSG_LEN
{
Sint len;
ErtsMessage *mp;
for (mp = c_p->sig_qs.first, len = 0; mp; mp = mp->next) {
ERTS_ASSERT(ERTS_SIG_IS_MSG(mp));
len++;
}
ERTS_ASSERT(c_p->sig_qs.len == len);
}
#endif
}
}
if (is_alive) {
if (!pisig->common.specific.next) {
/*
* No more signals in middle queue...
*
* Process-info 'status' needs sig-q
* process flag to be updated in order
* to show accurate result...
*/
erts_atomic32_read_band_nob(&c_p->state,
~ERTS_PSFLG_SIG_Q);
}
remove_nm_sig(c_p, sig, next_nm_sig);
}
rp = erts_proc_lookup(pisig->requester);
ASSERT(c_p != rp);
if (rp) {
Eterm msg, res, ref, *hp;
ErtsProcLocks locks = 0;
ErtsHeapFactory hfact;
ErtsMessage *mp;
/* Words needed for the reply 2-tuple plus the reference... */
Uint reserve_size = 3 + sizeof(pisig->oref_thing)/sizeof(Eterm);
if (!is_alive) {
/* Exiting; the result is simply 'undefined'... */
ErlOffHeap *ohp;
mp = erts_alloc_message_heap(rp, &locks, reserve_size, &hp, &ohp);
res = am_undefined;
}
else {
/* Build the result in a self-contained message via a factory... */
ErlHeapFragment *hfrag;
reserve_size += pisig->reserve_size;
mp = erts_alloc_message(0, NULL);
hfrag = new_message_buffer(reserve_size);
mp->data.heap_frag = hfrag;
erts_factory_selfcontained_message_init(&hfact, mp, &hfrag->mem[0]);
res = erts_process_info(c_p, &hfact, c_p, ERTS_PROC_LOCK_MAIN,
pisig->item_ix, pisig->len,
pisig->flags, reserve_size, &reds);
hp = erts_produce_heap(&hfact,
3 + sizeof(pisig->oref_thing)/sizeof(Eterm),
0);
}
/* Copy the request reference onto the message heap... */
sys_memcpy((void *) hp, (void *) &pisig->oref_thing,
sizeof(pisig->oref_thing));
ref = make_internal_ref(hp);
hp += sizeof(pisig->oref_thing)/sizeof(Eterm);
msg = TUPLE2(hp, ref, res);
if (is_alive)
erts_factory_trim_and_close(&hfact, &msg, 1);
ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_proc_message(c_p, rp, locks, mp, msg);
/* Only the erts_alloc_message_heap() path may have taken locks... */
if (!is_alive && locks)
erts_proc_unlock(rp, locks);
}
destroy_process_info_request(c_p, pisig);
/* Cap so that the multiplication below cannot overflow an int... */
if (reds > INT_MAX/8)
reds = INT_MAX/8;
return ((int) reds)*4 + 8;
}
/*
 * Handle an incoming suspend monitor 'mon' on 'c_p'.
 *
 * If 'c_p' is executing dirty, the suspend is delayed: the monitor is
 * pushed onto the pending-suspend record of the process (allocated on
 * demand) and an extra reference is taken on it. Otherwise the monitor is
 * marked active, the process is suspended and '*yieldp' is set.
 */
static void
handle_suspend(Process *c_p, ErtsMonitor *mon, int *yieldp)
{
    erts_aint32_t state = erts_atomic32_read_nob(&c_p->state);
    ErtsMonitorSuspend *msp;

    ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND);

    if (state & ERTS_PSFLG_DIRTY_RUNNING) {
        /* Executing dirty; delay suspend... */
        ErtsProcSigPendingSuspend *pending;

        pending = ERTS_PROC_GET_PENDING_SUSPEND(c_p);
        if (!pending) {
            pending = erts_alloc(ERTS_ALC_T_SIG_DATA,
                                 sizeof(ErtsProcSigPendingSuspend));
            pending->mon = NULL;
            pending->sync = NULL;
            ERTS_PROC_SET_PENDING_SUSPEND(c_p, (void *) pending);
        }

        msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);

        /* Push onto the list of delayed suspend monitors... */
        msp->next = pending->mon;
        pending->mon = msp;

        erts_atomic32_inc_nob(&msp->md.refc);
    }
    else {
        erts_aint_t mstate;

        msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);
        mstate = erts_atomic_read_bor_acqb(&msp->state,
                                           ERTS_MSUSPEND_STATE_FLG_ACTIVE);
        ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate;

        erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
        *yieldp = !0;
    }
}
/*
 * Reply to a synchronous suspend request.
 *
 * The sender has already prepared the reply message inside the signal
 * 'mp'; the request data is located directly after the used part of the
 * combined heap fragment. The second tuple element is only patched when
 * 'state' shows that the process is exiting or is not suspended (the
 * sender prepared 'false' as the default result). If the requester no
 * longer exists the message is cleaned up instead of being sent.
 */
static void
sync_suspend_reply(Process *c_p, ErtsMessage *mp, erts_aint32_t state)
{
    Process *requester;
    ErtsSyncSuspendRequest *req;

    req = (ErtsSyncSuspendRequest *) (char *) (&mp->hfrag.mem[0]
                                               + mp->hfrag.used_size);

    ASSERT(ERTS_SIG_IS_NON_MSG(mp));
    ASSERT(ERTS_PROC_SIG_OP(((ErtsSignal *) mp)->common.tag)
           == ERTS_SIG_Q_OP_SYNC_SUSPEND);
    ASSERT(mp->hfrag.alloc_size > mp->hfrag.used_size);
    ASSERT((mp->hfrag.alloc_size - mp->hfrag.used_size)*sizeof(UWord)
           >= sizeof(ErtsSyncSuspendRequest));
    ASSERT(is_internal_pid(req->requester));
    ASSERT(req->requester != c_p->common.id);
    ASSERT(is_tuple_arity(req->message, 2));
    ASSERT(is_immed(tuple_val(req->message)[2]));

    /* Turn the signal into a plain message with a combined fragment... */
    ERL_MESSAGE_TERM(mp) = req->message;
    mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
    mp->next = NULL;

    requester = erts_proc_lookup(req->requester);
    if (!requester) {
        erts_cleanup_messages(mp);
        return;
    }

    if ((state & (ERTS_PSFLG_EXITING
                  | ERTS_PSFLG_SUSPENDED)) != ERTS_PSFLG_SUSPENDED) {
        /* Not suspended -> patch result... */
        Eterm *tp = tuple_val(req->message);

        if (state & ERTS_PSFLG_EXITING) {
            tp[2] = req->async ? am_exited : am_badarg;
        }
        else {
            ASSERT(!(state & ERTS_PSFLG_SUSPENDED));
            tp[2] = req->async ? am_not_suspended : am_internal_error;
        }
    }

    ERL_MESSAGE_TOKEN(mp) = am_undefined;
    erts_queue_proc_message(c_p, requester, 0, mp, req->message);
}
/*
 * Handle a synchronous suspend request signal 'mp'.
 *
 * While a pending-suspend record exists for 'c_p' the request is queued
 * on it so that it is answered later; otherwise the reply is sent
 * immediately based on the current process state.
 */
static void
handle_sync_suspend(Process *c_p, ErtsMessage *mp)
{
    ErtsProcSigPendingSuspend *pending
        = (ErtsProcSigPendingSuspend *) ERTS_PROC_GET_PENDING_SUSPEND(c_p);

    if (pending) {
        /* Push onto the list of delayed sync requests... */
        mp->next = pending->sync;
        pending->sync = mp;
        return;
    }

    sync_suspend_reply(c_p, mp, erts_atomic32_read_nob(&c_p->state));
}
/*
 * Decode the external (distribution) term attached to 'msgp' into a newly
 * allocated heap fragment.
 *
 * For a message, the decoded term becomes the message term. For a
 * non-message signal (exit or monitor-down), the decoded term is stored
 * as the exit reason and the {'EXIT', From, Reason} or
 * {'DOWN', Ref, process, From, Reason} tuple is built as well.
 *
 * Returns 1 on success and 0 if the size calculation or the decoding
 * fails (the comment in the body tells the caller to remove the signal).
 *
 * NOTE(review): 'proc', 'proc_locks' and 'force_off_heap' are not used
 * in this body.
 */
int
erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
ErtsMessage *msgp, int force_off_heap)
{
ErtsHeapFactory factory;
ErlHeapFragment *hfrag;
Eterm msg;
Sint need;
ErtsDistExternal *edep;
ErtsExitSignalData *xsigd = NULL;
edep = erts_proc_sig_get_external(msgp);
/* Anything but an external message carries exit signal data... */
if (!ERTS_SIG_IS_EXTERNAL_MSG(msgp))
xsigd = get_exit_signal_data(msgp);
/* A non-negative heap_size is a size that is already known... */
if (edep->heap_size >= 0)
need = edep->heap_size;
else {
need = erts_decode_dist_ext_size(edep, 1);
if (need < 0) {
/* bad signal; remove it... */
return 0;
}
edep->heap_size = need;
}
if (ERTS_SIG_IS_NON_MSG(msgp)) {
/* Also make room for the tuple wrapping the decoded reason... */
switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) {
case ERTS_SIG_Q_OP_EXIT:
case ERTS_SIG_Q_OP_EXIT_LINKED:
/* {'EXIT', From, Reason} */
need += 4;
break;
case ERTS_SIG_Q_OP_MONITOR_DOWN:
/* {'DOWN', Ref, process, From, Reason} */
need += 6; /* 5-tuple */
break;
default:
ERTS_INTERNAL_ERROR("Invalid exit signal op");
break;
}
}
hfrag = new_message_buffer(need);
erts_factory_heap_frag_init(&factory, hfrag);
ASSERT(edep->heap_size >= 0);
msg = erts_decode_dist_ext(&factory, edep, 1);
if (is_non_value(msg)) {
/* Decoding failed; roll back what the factory built... */
erts_factory_undo(&factory);
return 0;
}
if (ERTS_SIG_IS_MSG(msgp)) {
ERL_MESSAGE_TERM(msgp) = msg;
if (msgp->data.heap_frag == &msgp->hfrag)
msgp->data.heap_frag = ERTS_MSG_COMBINED_HFRAG;
} else {
switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) {
case ERTS_SIG_Q_OP_EXIT:
case ERTS_SIG_Q_OP_EXIT_LINKED:
/* {'EXIT', From, Reason} */
erts_reserve_heap(&factory, 4);
xsigd->message = TUPLE3(factory.hp, am_EXIT, xsigd->from, msg);
factory.hp += 4;
break;
case ERTS_SIG_Q_OP_MONITOR_DOWN:
/* {'DOWN', Ref, process, From, Reason} */
erts_reserve_heap(&factory, 6);
xsigd->message = TUPLE5(factory.hp, am_DOWN, xsigd->u.ref, am_process, xsigd->from, msg);
factory.hp += 6;
break;
}
xsigd->reason = msg;
}
/* The external copy has been decoded and is released here... */
erts_free_dist_ext_copy(edep);
erts_factory_close(&factory);
/* Find the last heap fragment produced by the factory... */
hfrag = factory.heap_frags;
while (hfrag->next)
hfrag = hfrag->next;
/* ...and chain the fragments already owned by 'msgp' after it */
if (ERTS_SIG_IS_MSG(msgp) && msgp->data.heap_frag != ERTS_MSG_COMBINED_HFRAG) {
hfrag->next = msgp->data.heap_frag;
msgp->data.heap_frag = factory.heap_frags;
} else {
hfrag->next = msgp->hfrag.next;
msgp->hfrag.next = factory.heap_frags;
}
return 1;
}
/*
 * Handle suspend monitors and synchronous suspend requests that were
 * delayed on the pending-suspend record of 'c_p'.
 *
 * Each delayed monitor that is still in a table is activated and causes a
 * suspend, unless the process is exiting; the reference taken when the
 * monitor was delayed is released either way. Thereafter every delayed
 * sync request is answered using the process state read on entry, and
 * the pending-suspend record is freed and cleared.
 */
void
erts_proc_sig_handle_pending_suspend(Process *c_p)
{
    ErtsMonitorSuspend *msp, *next_msp;
    ErtsMessage *req, *next_req;
    ErtsProcSigPendingSuspend *pending;
    erts_aint32_t state = erts_atomic32_read_nob(&c_p->state);

    pending = (ErtsProcSigPendingSuspend *) ERTS_PROC_GET_PENDING_SUSPEND(c_p);

    for (msp = pending->mon; msp; msp = next_msp) {
        next_msp = msp->next;
        msp->next = NULL;

        if (!(state & ERTS_PSFLG_EXITING)
            && erts_monitor_is_in_table(&msp->md.target)) {
            erts_aint_t mstate;

            mstate = erts_atomic_read_bor_acqb(&msp->state,
                                               ERTS_MSUSPEND_STATE_FLG_ACTIVE);
            ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate;
            erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
        }

        erts_monitor_release(&msp->md.target);
    }

    for (req = pending->sync; req; req = next_req) {
        next_req = req->next;
        req->next = NULL;
        sync_suspend_reply(c_p, req, state);
    }

    erts_free(ERTS_ALC_T_SIG_DATA, pending);
    ERTS_PROC_SET_PENDING_SUSPEND(c_p, NULL);
}
/*
 * Called in order to handle incoming signals.
 *
 * Handles non-message signals in the private signal queues of 'c_p'
 * until there are none left, the work limit is reached, or the process
 * gets terminated by an exit signal.
 *
 *   statep     - out: process state flags as read/updated by this call
 *   redsp      - in: reduction budget; out: reductions consumed (set to
 *                'max_reds' when a handled signal requested a yield)
 *   max_reds   - upper bound that the budget may be stretched to
 *   local_only - when zero, signals are first fetched from the in-queue
 *                if the state flags say that there are any
 *
 * Returns non-zero if the middle queue (sig_qs.cont) is empty when
 * returning, and zero otherwise. The caller is expected to hold exactly
 * the main lock of 'c_p' (lock-checker assertion below).
 */
int
erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
                              int *redsp, int max_reds, int local_only)
{
    Eterm tag;
    erts_aint32_t state;
    int yield, cnt, limit, abs_lim, msg_tracing;
    ErtsMessage *sig, ***next_nm_sig;
    ErtsSigRecvTracing tracing;

    ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
    ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));

    state = erts_atomic32_read_nob(&c_p->state);
    if (!local_only) {
        if (ERTS_PSFLG_SIG_IN_Q & state) {
            erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
            erts_proc_sig_fetch(c_p);
            erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
        }
    }

    limit = *redsp;
    *redsp = 0;
    yield = 0;

    if (!c_p->sig_qs.cont) {
        *statep = state;
        return !0;
    }

    if (state & ERTS_PSFLG_EXITING) {
        *statep = state;
        return 0;
    }

    next_nm_sig = &c_p->sig_qs.nmsigs.next;

    setup_tracing_state(c_p, &tracing);
    msg_tracing = tracing.messages.active;

    /* 'cnt' and 'limit' are counted in units of 1/FACTOR reductions... */
    limit *= ERTS_SIG_REDS_CNT_FACTOR;
    abs_lim = ERTS_SIG_REDS_CNT_FACTOR*max_reds;
    if (limit > abs_lim)
        limit = abs_lim;

    cnt = 0;

    do {

        if (msg_tracing) {
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            if (handle_msg_tracing(c_p, &tracing, next_nm_sig) != 0) {
                ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
                break; /* tracing limit or end... */
            }
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
        }

        if (!*next_nm_sig)
            break;

        sig = **next_nm_sig;

        ASSERT(sig);
        ASSERT(ERTS_SIG_IS_NON_MSG(sig));

        tag = ((ErtsSignal *) sig)->common.tag;

        switch (ERTS_PROC_SIG_OP(tag)) {

        case ERTS_SIG_Q_OP_EXIT:
        case ERTS_SIG_Q_OP_EXIT_LINKED: {
            int exited;

            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);

            cnt += handle_exit_signal(c_p, &tracing, sig,
                                      next_nm_sig, &exited);

            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);

            if (exited)
                goto stop; /* terminated by signal */
            /* ignored or converted to exit message... */
            break;
        }

        case ERTS_SIG_Q_OP_MONITOR_DOWN: {
            Uint16 type = ERTS_PROC_SIG_TYPE(tag);
            ErtsExitSignalData *xsigd = NULL;
            ErtsMonitorData *mdp = NULL;
            ErtsMonitor *omon = NULL, *tmon = NULL;

            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);

            switch (type) {
            case ERTS_MON_TYPE_DIST_PROC:
            case ERTS_MON_TYPE_PROC:
            case ERTS_MON_TYPE_PORT:
                tmon = (ErtsMonitor *) sig;
                ASSERT(erts_monitor_is_target(tmon));
                ASSERT(!erts_monitor_is_in_table(tmon));
                mdp = erts_monitor_to_data(tmon);
                if (erts_monitor_is_in_table(&mdp->origin)) {
                    omon = &mdp->origin;
                    erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p),
                                             omon);
                    cnt += convert_to_down_message(c_p, sig, mdp,
                                                   type, next_nm_sig);
                }
                break;
            case ERTS_SIG_Q_TYPE_GEN_EXIT:
                xsigd = get_exit_signal_data(sig);

                /* This GEN_EXIT was received from another node, decode the exit reason */
                if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig))
                    if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1))
                        break; /* Decode failed, just remove signal */

                omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p),
                                                xsigd->u.ref);
                if (omon) {
                    ASSERT(erts_monitor_is_origin(omon));
                    if (omon->type == ERTS_MON_TYPE_DIST_PROC) {
                        mdp = erts_monitor_to_data(omon);
                        if (erts_monitor_dist_delete(&mdp->target))
                            tmon = &mdp->target;
                    }
                    erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p),
                                             omon);
                    cnt += convert_prepared_down_message(c_p, sig,
                                                         xsigd->message,
                                                         next_nm_sig);
                }
                break;
            case ERTS_MON_TYPE_NODE:
                tmon = (ErtsMonitor *) sig;
                ASSERT(erts_monitor_is_target(tmon));
                ASSERT(!erts_monitor_is_in_table(tmon));
                mdp = erts_monitor_to_data(tmon);
                if (erts_monitor_is_in_table(&mdp->origin)) {
                    omon = &mdp->origin;
                    cnt += handle_nodedown(c_p, sig, mdp, next_nm_sig);
                }
                break;
            case ERTS_MON_TYPE_SUSPEND:
                tmon = (ErtsMonitor *) sig;
                ASSERT(erts_monitor_is_target(tmon));
                ASSERT(!erts_monitor_is_in_table(tmon));
                mdp = erts_monitor_to_data(tmon);
                if (erts_monitor_is_in_table(&mdp->origin)) {
                    erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p),
                                             &mdp->origin);
                    omon = &mdp->origin;
                    remove_nm_sig(c_p, sig, next_nm_sig);
                }
                break;
            default:
                ERTS_INTERNAL_ERROR("invalid monitor type");
                break;
            }

            if (omon) {
                /* Signal was consumed by one of the cases above... */
                if (tmon)
                    erts_monitor_release_both(mdp);
                else
                    erts_monitor_release(omon);
            }
            else {
                /* No matching origin monitor; just drop the signal... */
                remove_nm_sig(c_p, sig, next_nm_sig);
                if (xsigd) {
                    sig->next = NULL;
                    erts_cleanup_messages(sig);
                }
                if (tmon)
                    erts_monitor_release(tmon);
            }

            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            break;
        }

        case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG: {
            Uint16 type = ERTS_PROC_SIG_TYPE(tag);
            ErtsMonitor *mon;
            Eterm msg;
            Eterm key;

            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);

            key = get_persist_mon_msg(sig, &msg);

            cnt++;
            mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p), key);
            if (mon) {
                ASSERT(erts_monitor_is_origin(mon));
                handle_persistent_mon_msg(c_p, type, mon, sig,
                                          msg, next_nm_sig);
            }
            else {
                cnt++;
                remove_nm_sig(c_p, sig, next_nm_sig);
                sig->next = NULL;
                erts_cleanup_messages(sig);
            }

            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            break;
        }

        case ERTS_SIG_Q_OP_MONITOR: {
            ErtsMonitor *mon = (ErtsMonitor *) sig;

            ASSERT(erts_monitor_is_target(mon));
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);

            remove_nm_sig(c_p, sig, next_nm_sig);

            if (mon->type == ERTS_MON_TYPE_DIST_PROC)
                erts_monitor_tree_insert(&ERTS_P_MONITORS(c_p), mon);
            else {
                erts_monitor_list_insert(&ERTS_P_LT_MONITORS(c_p), mon);
                if (mon->type == ERTS_MON_TYPE_SUSPEND)
                    handle_suspend(c_p, mon, &yield);
            }
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            cnt += 2;
            break;
        }

        case ERTS_SIG_Q_OP_DEMONITOR: {
            Uint16 type = ERTS_PROC_SIG_TYPE(tag);

            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);

            remove_nm_sig(c_p, sig, next_nm_sig);

            if (type == ERTS_SIG_Q_TYPE_DIST_PROC_DEMONITOR) {
                ErtsMonitor *tmon;
                ErtsSigDistProcDemonitor *dmon;
                dmon = (ErtsSigDistProcDemonitor *) sig;
                tmon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p), dmon->ref);
                destroy_dist_proc_demonitor(dmon);
                cnt++;
                if (tmon) {
                    ErtsMonitorData *mdp = erts_monitor_to_data(tmon);
                    ASSERT(erts_monitor_is_target(tmon));
                    erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p), tmon);
                    if (!erts_monitor_dist_delete(&mdp->origin))
                        erts_monitor_release(tmon);
                    else
                        erts_monitor_release_both(mdp);
                    cnt += 2;
                }
            }
            else {
                ErtsMonitor *omon = (ErtsMonitor *) sig;
                ErtsMonitorData *mdp = erts_monitor_to_data(omon);
                ASSERT(omon->type == type);
                ASSERT(erts_monitor_is_origin(omon));
                ASSERT(!erts_monitor_is_in_table(omon));
                if (!erts_monitor_is_in_table(&mdp->target))
                    erts_monitor_release(omon);
                else {
                    ErtsMonitor *tmon = &mdp->target;
                    ASSERT(tmon->type == type);
                    if (type == ERTS_MON_TYPE_DIST_PROC)
                        erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p), tmon);
                    else {
                        erts_monitor_list_delete(&ERTS_P_LT_MONITORS(c_p), tmon);
                        switch (type) {
                        case ERTS_MON_TYPE_RESOURCE:
                            erts_nif_demonitored((ErtsResource *) tmon->other.ptr);
                            cnt++;
                            break;
                        case ERTS_MON_TYPE_SUSPEND: {
                            /*
                             * Only resume if this monitor actually
                             * suspended us. The suspend may have been
                             * delayed (see handle_suspend()) and the
                             * monitor has now been removed from the
                             * table, which makes the pending-suspend
                             * handling skip it; an unconditional
                             * resume here would then be unmatched.
                             */
                            ErtsMonitorSuspend *msp;
                            erts_aint_t mstate;
                            msp = (ErtsMonitorSuspend *) erts_monitor_to_data(tmon);
                            mstate = erts_atomic_read_band_acqb(
                                &msp->state,
                                ~ERTS_MSUSPEND_STATE_FLG_ACTIVE);
                            if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)
                                erts_resume(c_p, ERTS_PROC_LOCK_MAIN);
                            break;
                        }
                        default:
                            break;
                        }
                    }
                    erts_monitor_release_both(mdp);
                    cnt++;
                }
                cnt++;
            }
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            break;
        }

        case ERTS_SIG_Q_OP_LINK: {
            ErtsLink *rlnk, *lnk = (ErtsLink *) sig;

            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);

            remove_nm_sig(c_p, sig, next_nm_sig);
            rlnk = erts_link_tree_insert_addr_replace(&ERTS_P_LINKS(c_p),
                                                      lnk);
            if (!rlnk) {
                if (tracing.procs)
                    getting_linked(c_p, lnk->other.item);
            }
            else {
                /* A link was replaced; release the one returned... */
                if (rlnk->type != ERTS_LNK_TYPE_DIST_PROC)
                    erts_link_release(rlnk);
                else {
                    ErtsLinkData *ldp;
                    ErtsLink *dlnk = erts_link_to_other(rlnk, &ldp);
                    if (erts_link_dist_delete(dlnk))
                        erts_link_release_both(ldp);
                    else
                        erts_link_release(rlnk);
                }
            }

            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            break;
        }

        case ERTS_SIG_Q_OP_UNLINK: {
            Uint16 type = ERTS_PROC_SIG_TYPE(tag);
            ErtsLinkData *ldp;
            ErtsLink *llnk;

            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);

            remove_nm_sig(c_p, sig, next_nm_sig);
            if (type == ERTS_SIG_Q_TYPE_DIST_LINK) {
                ErtsSigDistLinkOp *sdlnk = (ErtsSigDistLinkOp *) sig;
                ASSERT(type == ERTS_SIG_Q_TYPE_DIST_LINK);
                ASSERT(is_external_pid(sdlnk->remote));
                llnk = erts_link_tree_lookup(ERTS_P_LINKS(c_p), sdlnk->remote);
                if (llnk) {
                    ErtsLink *dlnk = erts_link_to_other(llnk, &ldp);
                    erts_link_tree_delete(&ERTS_P_LINKS(c_p), llnk);
                    if (erts_link_dist_delete(dlnk))
                        erts_link_release_both(ldp);
                    else
                        erts_link_release(llnk);
                    cnt += 8;
                    if (tracing.procs)
                        getting_unlinked(c_p, sdlnk->remote);
                }
                destroy_sig_dist_link_op(sdlnk);
                cnt++;
            }
            else {
                ErtsLinkData *ldp;
                ErtsLink *dlnk, *slnk;
                slnk = (ErtsLink *) sig;
                llnk = erts_link_to_other(slnk, &ldp);
                dlnk = erts_link_tree_key_delete(&ERTS_P_LINKS(c_p), llnk);
                if (!dlnk)
                    erts_link_release(slnk);
                else {
                    if (tracing.procs)
                        getting_unlinked(c_p, llnk->other.item);
                    if (dlnk == llnk)
                        erts_link_release_both(ldp);
                    else {
                        erts_link_release(slnk);
                        erts_link_release(dlnk);
                    }
                }
                cnt += 2;
            }

            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            break;
        }

        case ERTS_SIG_Q_OP_GROUP_LEADER: {
            ErtsSigGroupLeader *sgl = (ErtsSigGroupLeader *) sig;
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            remove_nm_sig(c_p, sig, next_nm_sig);
            handle_group_leader(c_p, sgl);
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            break;
        }

        case ERTS_SIG_Q_OP_IS_ALIVE:
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            remove_nm_sig(c_p, sig, next_nm_sig);
            is_alive_response(c_p, sig, !0);
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            break;

        case ERTS_SIG_Q_OP_PROCESS_INFO:
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            handle_process_info(c_p, &tracing, sig, next_nm_sig, !0);
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            break;

        case ERTS_SIG_Q_OP_SYNC_SUSPEND:
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            remove_nm_sig(c_p, sig, next_nm_sig);
            handle_sync_suspend(c_p, sig);
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            break;

        case ERTS_SIG_Q_OP_RPC:
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            remove_nm_sig(c_p, sig, next_nm_sig);
            cnt += handle_rpc(c_p, (ErtsProcSigRPC *) sig, cnt,
                              limit, &yield);
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            break;

        case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: {
            Uint16 type = ERTS_PROC_SIG_TYPE(tag);

            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
            msg_tracing = handle_trace_change_state(c_p, &tracing,
                                                    type, sig,
                                                    next_nm_sig);
            ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);

            break;
        }

        default:
            ERTS_INTERNAL_ERROR("Unknown signal");
            break;
        }

        cnt++;

    } while (cnt <= limit || stretch_limit(c_p, &tracing, abs_lim, &limit));

stop: {
        int deferred_save, deferred_saved_last, res;

        deferred_saved_last = !!(c_p->flags & F_DEFERRED_SAVED_LAST);
        deferred_save = 0;

        if (!deferred_saved_last)
            deferred_save = 0;
        else {
            if (c_p->sig_qs.saved_last == &c_p->sig_qs.cont) {
                c_p->sig_qs.saved_last = c_p->sig_qs.last;
                c_p->flags &= ~F_DEFERRED_SAVED_LAST;
                deferred_saved_last = deferred_save = 0;
            }
            else {
                if (c_p->sig_qs.save == c_p->sig_qs.last)
                    deferred_save = !0;
                else
                    deferred_save = 0;
            }
        }

        ASSERT(c_p->sig_qs.saved_last != &c_p->sig_qs.cont);

        if (ERTS_UNLIKELY(msg_tracing != 0)) {
            /*
             * All messages that has been traced should
             * be moved to inner queue. Next signal in
             * middle queue should either be next message
             * to trace or next non-message signal.
             */
            ASSERT(tracing.messages.next);
            if (*next_nm_sig) {
                if (*next_nm_sig == tracing.messages.next)
                    *next_nm_sig = &c_p->sig_qs.cont;
                if (c_p->sig_qs.nmsigs.last == tracing.messages.next)
                    c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont;
                *statep = erts_atomic32_read_nob(&c_p->state);
            }
            else {
                ASSERT(!c_p->sig_qs.nmsigs.next);
                c_p->sig_qs.nmsigs.last = NULL;
                state = erts_atomic32_read_band_nob(&c_p->state,
                                                    ~ERTS_PSFLG_SIG_Q);
                state &= ~ERTS_PSFLG_SIG_Q;
                *statep = state;
            }

            if (tracing.messages.next != &c_p->sig_qs.cont) {
                *c_p->sig_qs.last = c_p->sig_qs.cont;
                c_p->sig_qs.last = tracing.messages.next;

                c_p->sig_qs.cont = *tracing.messages.next;
                if (!c_p->sig_qs.cont)
                    c_p->sig_qs.cont_last = &c_p->sig_qs.cont;
                *c_p->sig_qs.last = NULL;
            }

            res = !c_p->sig_qs.cont;
        }
        else if (*next_nm_sig) {
            /*
             * All messages prior to next non-message
             * signal should be moved to inner queue.
             * Next non-message signal to handle should
             * be first in middle queue.
             */
            ASSERT(**next_nm_sig);
            if (*next_nm_sig != &c_p->sig_qs.cont) {
                *c_p->sig_qs.last = c_p->sig_qs.cont;
                c_p->sig_qs.last = *next_nm_sig;

                c_p->sig_qs.cont = **next_nm_sig;
                if (c_p->sig_qs.nmsigs.last == *next_nm_sig)
                    c_p->sig_qs.nmsigs.last = &c_p->sig_qs.cont;
                *next_nm_sig = &c_p->sig_qs.cont;
                *c_p->sig_qs.last = NULL;
            }

            ASSERT(c_p->sig_qs.cont);

            *statep = erts_atomic32_read_nob(&c_p->state);

            res = 0;
        }
        else {
            /*
             * All non-message signals handled. All
             * messages should be moved to inner queue.
             * Middle queue should be empty.
             */
            ASSERT(!c_p->sig_qs.nmsigs.next);
            c_p->sig_qs.nmsigs.last = NULL;

            if (c_p->sig_qs.cont_last != &c_p->sig_qs.cont) {
                ASSERT(!*c_p->sig_qs.last);
                *c_p->sig_qs.last = c_p->sig_qs.cont;
                c_p->sig_qs.last = c_p->sig_qs.cont_last;
                ASSERT(!*c_p->sig_qs.last);

                c_p->sig_qs.cont_last = &c_p->sig_qs.cont;
                c_p->sig_qs.cont = NULL;
            }

            ASSERT(!c_p->sig_qs.cont);

            state = erts_atomic32_read_band_nob(&c_p->state,
                                                ~ERTS_PSFLG_SIG_Q);
            state &= ~ERTS_PSFLG_SIG_Q;
            *statep = state;
            res = !0;
        }

        if (deferred_saved_last
            && (c_p->sig_qs.saved_last == &c_p->sig_qs.cont)) {
            c_p->sig_qs.saved_last = c_p->sig_qs.last;
            c_p->flags &= ~F_DEFERRED_SAVED_LAST;
            if (deferred_save)
                c_p->sig_qs.save = c_p->sig_qs.saved_last;
        }
        else if (!res) {
            if (deferred_save) {
                c_p->sig_qs.save = c_p->sig_qs.last;
                ASSERT(!PEEK_MESSAGE(c_p));
            }
        }
        else {
            c_p->flags &= ~F_DEFERRED_SAVED_LAST;
            if (deferred_save)
                c_p->sig_qs.save = c_p->sig_qs.saved_last;
        }

        ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);

        /* Convert the work count back into reductions... */
        *redsp = cnt/4 + 1;

        if (yield) {
            /* Charge the whole budget; the unused part as virtual reds */
            int vreds = max_reds - *redsp;
            if (vreds > 0) {
                ErtsSchedulerData *esdp = erts_get_scheduler_data();
                esdp->virtual_reds += vreds;
            }
            *redsp = max_reds;
        }

        return res;
    }
}
/*
 * Try to raise the signal handling work limit '*limp'.
 *
 * The limit is stretched (by ERTS_SIG_REDS_CNT_FACTOR*100, but never
 * above 'abs_lim') if there currently are no messages available to
 * inspect by 'receive' and it might be possible to make messages
 * available by processing signals (or trace messages).
 *
 * Returns non-zero if the limit was stretched, and zero if it was left
 * as is.
 */
static int
stretch_limit(Process *c_p, ErtsSigRecvTracing *tp,
              int abs_lim, int *limp)
{
    int new_lim = *limp;

    ASSERT(abs_lim >= new_lim);
    if (new_lim == abs_lim)
        return 0; /* Already at the absolute limit... */

    if (!(c_p->flags & F_DEFERRED_SAVED_LAST)) {
        ErtsSignal *head;

        if (PEEK_MESSAGE(c_p))
            return 0;

        head = (ErtsSignal *) c_p->sig_qs.cont;
        if (!head)
            return 0; /* No signals to process available... */

        if (ERTS_SIG_IS_MSG(head) && tp->messages.next != &c_p->sig_qs.cont)
            return 0;
    }

    new_lim += ERTS_SIG_REDS_CNT_FACTOR*100;
    *limp = (new_lim > abs_lim) ? abs_lim : new_lim;
    return !0;
}
/*
 * Handle the non-message signals remaining in the private signal queues
 * of the exiting process 'c_p'. Every signal is removed from the queue
 * and then released, cleaned up, or answered in the way appropriate for
 * a process that is going away.
 *
 *   redsp - in: reduction budget; out: reductions consumed
 *           (1 + work count / ERTS_SIG_REDS_CNT_FACTOR)
 *
 * Signals are handled for as long as the work count is below the budget
 * and signals remain. Returns non-zero when all non-message signals have
 * been handled, and zero if signals remain (the caller has to call
 * again). The caller is expected to hold exactly the main lock of 'c_p'
 * (lock-checker assertion below).
 */
int
erts_proc_sig_handle_exit(Process *c_p, Sint *redsp)
{
    int cnt;
    Sint limit;
    ErtsMessage *sig, ***next_nm_sig;

    ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
    ERTS_LC_ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);

    ASSERT(!(ERTS_PSFLG_SIG_IN_Q & erts_atomic32_read_nob(&c_p->state)));

    /* 'cnt' and 'limit' are counted in units of 1/FACTOR reductions... */
    limit = *redsp;
    limit *= ERTS_SIG_REDS_CNT_FACTOR;

    *redsp = 1;

    next_nm_sig = &c_p->sig_qs.nmsigs.next;

    if (!*next_nm_sig) {
        ASSERT(!c_p->sig_qs.nmsigs.last);
        return !0; /* done... */
    }

    cnt = 0;
    ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, NULL, next_nm_sig);

    do {
        Eterm tag;
        Uint16 type;
        int op;

        sig = **next_nm_sig;

        ASSERT(sig);
        ASSERT(ERTS_SIG_IS_NON_MSG(sig));

        tag = ((ErtsSignal *) sig)->common.tag;
        type = ERTS_PROC_SIG_TYPE(tag);
        op = ERTS_PROC_SIG_OP(tag);

        remove_nm_sig(c_p, sig, next_nm_sig);

        ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, NULL, next_nm_sig);

        cnt++;

        switch (op) {

        case ERTS_SIG_Q_OP_EXIT:
        case ERTS_SIG_Q_OP_EXIT_LINKED:
        case ERTS_SIG_Q_OP_MONITOR_DOWN:
            switch (type) {
            case ERTS_SIG_Q_TYPE_GEN_EXIT:
                sig->next = NULL;
                erts_cleanup_messages(sig);
                break;
            case ERTS_LNK_TYPE_PORT:
            case ERTS_LNK_TYPE_PROC:
            case ERTS_LNK_TYPE_DIST_PROC:
                erts_link_release((ErtsLink *) sig);
                break;
            case ERTS_MON_TYPE_PORT:
            case ERTS_MON_TYPE_PROC:
            case ERTS_MON_TYPE_DIST_PROC:
            case ERTS_MON_TYPE_NODE:
            case ERTS_MON_TYPE_NODES:
            case ERTS_MON_TYPE_SUSPEND:
                erts_monitor_release((ErtsMonitor *) sig);
                break;
            default:
                ERTS_INTERNAL_ERROR("Unexpected sig type");
                break;
            }
            break;

        case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
            sig->next = NULL;
            erts_cleanup_messages(sig);
            break;

        case ERTS_SIG_Q_OP_MONITOR: {
            ErtsProcExitContext pectxt = {c_p, am_noproc, NULL, NULL, NIL};
            erts_proc_exit_handle_monitor((ErtsMonitor *) sig,
                                          (void *) &pectxt, -1);
            cnt += 4;
            break;
        }

        case ERTS_SIG_Q_OP_DEMONITOR:
            if (type == ERTS_SIG_Q_TYPE_DIST_PROC_DEMONITOR)
                destroy_dist_proc_demonitor((ErtsSigDistProcDemonitor *) sig);
            else
                erts_monitor_release((ErtsMonitor *) sig);
            break;

        case ERTS_SIG_Q_OP_LINK: {
            ErtsProcExitContext pectxt = {c_p, am_noproc};
            erts_proc_exit_handle_link((ErtsLink *) sig, (void *) &pectxt, -1);
            break;
        }

        case ERTS_SIG_Q_OP_UNLINK:
            if (type == ERTS_SIG_Q_TYPE_DIST_LINK)
                destroy_sig_dist_link_op((ErtsSigDistLinkOp *) sig);
            else
                erts_link_release((ErtsLink *) sig);
            break;

        case ERTS_SIG_Q_OP_GROUP_LEADER: {
            ErtsSigGroupLeader *sgl = (ErtsSigGroupLeader *) sig;
            handle_group_leader(c_p, sgl);
            break;
        }

        case ERTS_SIG_Q_OP_IS_ALIVE:
            is_alive_response(c_p, sig, 0);
            break;

        case ERTS_SIG_Q_OP_PROCESS_INFO:
            handle_process_info(c_p, NULL, sig, next_nm_sig, 0);
            break;

        case ERTS_SIG_Q_OP_SYNC_SUSPEND:
            handle_sync_suspend(c_p, sig);
            break;

        case ERTS_SIG_Q_OP_RPC: {
            int yield = 0;
            handle_rpc(c_p, (ErtsProcSigRPC *) sig,
                       cnt, limit, &yield);
            break;
        }

        case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
            destroy_trace_info((ErtsSigTraceInfo *) sig);
            break;

        default:
            ERTS_INTERNAL_ERROR("Unknown signal");
            break;
        }

        /*
         * Continue while there is budget left. (The condition used to
         * be 'cnt >= limit', which normally stopped after one signal
         * and, once over budget, never stopped before the queue was
         * empty.)
         */
    } while (cnt < limit && *next_nm_sig);

    *redsp += cnt / ERTS_SIG_REDS_CNT_FACTOR;

    if (*next_nm_sig)
        return 0;

    ASSERT(!c_p->sig_qs.nmsigs.next);
    c_p->sig_qs.nmsigs.last = NULL;
    (void) erts_atomic32_read_band_nob(&c_p->state,
                                       ~ERTS_PSFLG_SIG_Q);
    return !0;
}
/*
 * ERTS_CLEAR_SEQ_TOKEN(MP) resets the token slot of message MP.
 *
 * Without VM probes the token is set to NIL. With VM probes it is set to
 * am_have_dt_utag when the message has a dtrace user tag other than NIL,
 * and to NIL otherwise.
 *
 * The expansion is a single assignment expression without a trailing
 * semicolon, and MP is evaluated more than once in the VM probes variant.
 */
#ifdef USE_VM_PROBES
# define ERTS_CLEAR_SEQ_TOKEN(MP) \
ERL_MESSAGE_TOKEN((MP)) = ((ERL_MESSAGE_DT_UTAG((MP)) != NIL) \
? am_have_dt_utag : NIL)
#else
# define ERTS_CLEAR_SEQ_TOKEN(MP) \
ERL_MESSAGE_TOKEN((MP)) = NIL
#endif
/*
 * Clear the sequential trace token of one element in a signal queue.
 *
 * Ordinary messages, general exit signals (ERTS_SIG_Q_TYPE_GEN_EXIT) and
 * persistent monitor messages get their token cleared with
 * ERTS_CLEAR_SEQ_TOKEN(). All other known signals are left untouched.
 *
 * Every signal operation and monitor/link type that may be present in
 * the queue (compare with erts_proc_sig_handle_exit()) must be listed
 * below; anything not listed triggers an internal error.
 */
static ERTS_INLINE void
clear_seq_trace_token(ErtsMessage *sig)
{
    if (ERTS_SIG_IS_MSG((ErtsSignal *) sig))
        ERTS_CLEAR_SEQ_TOKEN(sig);
    else {
        Uint tag;
        Uint16 op, type;

        tag = ((ErtsSignal *) sig)->common.tag;
        type = ERTS_PROC_SIG_TYPE(tag);
        op = ERTS_PROC_SIG_OP(tag);

        switch (op) {

        case ERTS_SIG_Q_OP_EXIT:
        case ERTS_SIG_Q_OP_EXIT_LINKED:
        case ERTS_SIG_Q_OP_MONITOR_DOWN:
            switch (type) {
            case ERTS_SIG_Q_TYPE_GEN_EXIT:
                ERTS_CLEAR_SEQ_TOKEN(sig);
                break;
            case ERTS_LNK_TYPE_PORT:
            case ERTS_LNK_TYPE_PROC:
            case ERTS_LNK_TYPE_DIST_PROC:
            case ERTS_MON_TYPE_PORT:
            case ERTS_MON_TYPE_PROC:
            case ERTS_MON_TYPE_DIST_PROC:
            case ERTS_MON_TYPE_NODE:
            case ERTS_MON_TYPE_NODES:
            case ERTS_MON_TYPE_SUSPEND:
                /* Link/monitor structures; no token cleared here... */
                break;
            default:
                ERTS_INTERNAL_ERROR("Unexpected sig type");
                break;
            }
            break;

        case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
            ERTS_CLEAR_SEQ_TOKEN(sig);
            break;

        case ERTS_SIG_Q_OP_MONITOR:
        case ERTS_SIG_Q_OP_DEMONITOR:
        case ERTS_SIG_Q_OP_LINK:
        case ERTS_SIG_Q_OP_UNLINK:
        case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
        case ERTS_SIG_Q_OP_GROUP_LEADER:
        case ERTS_SIG_Q_OP_IS_ALIVE:
        case ERTS_SIG_Q_OP_PROCESS_INFO:
        case ERTS_SIG_Q_OP_SYNC_SUSPEND:
        case ERTS_SIG_Q_OP_RPC:
            /*
             * Valid signals for which no token is cleared here.
             * NOTE(review): IS_ALIVE and SYNC_SUSPEND signals are
             * message shaped; confirm that they never carry a token
             * that needs clearing.
             */
            break;

        default:
            ERTS_INTERNAL_ERROR("Unknown signal");
            break;
        }
    }
}
/*
 * Clear the sequential trace tokens of the signals of 'c_p'.
 *
 * Signals are first fetched with erts_proc_sig_fetch(), after which
 * clear_seq_trace_token() is applied to every element visited by
 * ERTS_FOREACH_SIG_PRIVQS().
 *
 * NOTE(review): erts_proc_sig_fetch() is called with the MSGQ lock held
 * elsewhere in this file; presumably the caller holds it here - confirm.
 */
void
erts_proc_sig_clear_seq_trace_tokens(Process *c_p)
{
erts_proc_sig_fetch(c_p);
ERTS_FOREACH_SIG_PRIVQS(c_p, sig, clear_seq_trace_token(sig));
}
/*
 * Calculate the memory size, in bytes, of a non-message signal.
 *
 * sig must be a non-NULL, non-message signal (asserted). The
 * operation and type encoded in the signal tag determine which
 * structure the signal really is, and thereby how its size is
 * computed:
 *
 * - message based signals: the allocated size of the embedded heap
 *   fragment (in words) plus the ErtsMessage structure itself;
 * - link/monitor based signals: erts_link_size()/erts_monitor_size();
 * - dedicated signal structures: the structure size plus any
 *   trailing data.
 *
 * Returns the size in bytes. An unknown operation or an unexpected
 * type triggers an internal error.
 */
Uint
erts_proc_sig_signal_size(ErtsSignal *sig)
{
    Eterm tag;
    Uint16 type;
    int op;
    Uint size = 0;

    ASSERT(sig);
    ASSERT(ERTS_SIG_IS_NON_MSG(sig));

    tag = sig->common.tag;
    type = ERTS_PROC_SIG_TYPE(tag);
    op = ERTS_PROC_SIG_OP(tag);

    switch (op) {

    case ERTS_SIG_Q_OP_EXIT:
    case ERTS_SIG_Q_OP_EXIT_LINKED:
    case ERTS_SIG_Q_OP_MONITOR_DOWN:
        switch (type) {
        case ERTS_SIG_Q_TYPE_GEN_EXIT:
            /* One Eterm of the heap fragment is part of ErtsMessage */
            size = ((ErtsMessage *) sig)->hfrag.alloc_size;
            size *= sizeof(Eterm);
            size += sizeof(ErtsMessage) - sizeof(Eterm);
            break;
        case ERTS_LNK_TYPE_PORT:
        case ERTS_LNK_TYPE_PROC:
        case ERTS_LNK_TYPE_DIST_PROC:
            size = erts_link_size((ErtsLink *) sig);
            break;
        case ERTS_MON_TYPE_PORT:
        case ERTS_MON_TYPE_PROC:
        case ERTS_MON_TYPE_DIST_PROC:
        case ERTS_MON_TYPE_NODE:
            size = erts_monitor_size((ErtsMonitor *) sig);
            /* Must not fall through into the error case below */
            break;
        default:
            ERTS_INTERNAL_ERROR("Unexpected sig type");
            break;
        }
        break;

    case ERTS_SIG_Q_OP_SYNC_SUSPEND:
    case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
    case ERTS_SIG_Q_OP_IS_ALIVE:
        size = ((ErtsMessage *) sig)->hfrag.alloc_size;
        size *= sizeof(Eterm);
        size += sizeof(ErtsMessage) - sizeof(Eterm);
        break;

    case ERTS_SIG_Q_OP_DEMONITOR:
        if (type == ERTS_SIG_Q_TYPE_DIST_PROC_DEMONITOR) {
            size = NC_HEAP_SIZE(((ErtsSigDistProcDemonitor *) sig)->ref);
            size--;
            size *= sizeof(Eterm);
            size += sizeof(ErtsSigDistProcDemonitor);
            break;
        }
        /* Fall through... */

    case ERTS_SIG_Q_OP_MONITOR:
        size = erts_monitor_size((ErtsMonitor *) sig);
        break;

    case ERTS_SIG_Q_OP_UNLINK:
        if (type == ERTS_SIG_Q_TYPE_DIST_LINK) {
            size = NC_HEAP_SIZE(((ErtsSigDistLinkOp *) sig)->remote);
            size--;
            size *= sizeof(Eterm);
            size += sizeof(ErtsSigDistLinkOp);
            break;
        }
        /* Fall through... */

    case ERTS_SIG_Q_OP_LINK:
        size = erts_link_size((ErtsLink *) sig);
        break;

    case ERTS_SIG_Q_OP_GROUP_LEADER: {
        ErtsSigGroupLeader *sgl = (ErtsSigGroupLeader *) sig;
        size = size_object(sgl->group_leader);
        size += size_object(sgl->ref);
        size *= sizeof(Eterm);
        size += sizeof(ErtsSigGroupLeader) - sizeof(Eterm);
        break;
    }

    case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
        size = sizeof(ErtsSigTraceInfo);
        break;

    case ERTS_SIG_Q_OP_PROCESS_INFO: {
        ErtsProcessInfoSig *pisig = (ErtsProcessInfoSig *) sig;
        size = sizeof(ErtsProcessInfoSig);
        /* One element of the trailing int array is part of the struct */
        size += (pisig->len - 1) * sizeof(int);
        break;
    }

    case ERTS_SIG_Q_OP_RPC:
        size = sizeof(ErtsProcSigRPC);
        break;

    default:
        ERTS_INTERNAL_ERROR("Unknown signal");
        break;
    }

    return size;
}
/*
 * Try to make more messages available to a receive that has reached
 * the end of the inner queue.
 *
 * c_p        - the calling process; only the main lock may be held
 *              on entry (asserted).
 * fcalls,
 * neg_o_reds - fcalls - neg_o_reds is used as the number of
 *              reductions left for the process.
 * msgpp      - in: must point to NULL (asserted); out: next message,
 *              or NULL.
 * get_outp   - out: 0, a value greater than zero, or a value less
 *              than zero; see the comment in the function body.
 *
 * Returns the number of reductions consumed by the call.
 */
int
erts_proc_sig_receive_helper(Process *c_p,
int fcalls,
int neg_o_reds,
ErtsMessage **msgpp,
int *get_outp)
{
ErtsMessage *msgp;
int reds, consumed_reds, left_reds, max_reds;
/*
* Called from the loop-rec instruction when receive
* has reached end of inner (private) queue. This function
* tries to move more messages into the inner queue
* for the receive to handle. This is done by processing
* the middle (private) queue and/or moving signals from
* the outer (public) queue into the middle queue.
*
* If this function succeeds in making more messages
* available in the inner queue, *msgpp points to the next
* message and *get_outp is set to zero. *msgpp is set to
* NULL when:
* -- process became exiting. *get_outp is set to a
* value greater than zero.
* -- process needs to yield. *get_outp is set to a
* value less than zero.
* -- no more messages exist in any of the queues.
* *get_outp is set to zero and the message queue
* lock remains locked. This so the process can
* make its way to the appropriate wait instruction
* without racing with new incoming messages.
*/
ASSERT(erts_proc_lc_my_proc_locks(c_p) == ERTS_PROC_LOCK_MAIN);
ASSERT(!ERTS_PROC_IS_EXITING(c_p));
ASSERT(!*msgpp);
left_reds = fcalls - neg_o_reds;
consumed_reds = 0;
while (!0) {
erts_aint32_t state;
if (!c_p->sig_qs.cont) {
/*
* Nothing left in the middle queue; fetch from
* the outer queue. Each fetch is charged with
* 4 reductions.
*/
consumed_reds += 4;
left_reds -= 4;
erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
erts_proc_sig_fetch(c_p);
/*
* Messages may have been moved directly to
* inner queue...
*/
msgp = PEEK_MESSAGE(c_p);
if (msgp) {
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
*get_outp = 0;
*msgpp = msgp;
return consumed_reds;
}
if (!c_p->sig_qs.cont) {
/*
* No messages! Return with message queue
* locked and let the process continue
* to wait instruction...
*/
*get_outp = 0;
*msgpp = NULL;
return consumed_reds;
}
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
if (left_reds <= 0)
break; /* Yield */
/* handle newly arrived signals... */
}
reds = ERTS_SIG_HANDLE_REDS_MAX_PREFERED;
#ifdef DEBUG
/* test that it works also with very few reds */
max_reds = left_reds;
if (reds > left_reds)
reds = left_reds;
#else
/* At least work preferred amount of reds... */
max_reds = left_reds;
if (max_reds < reds)
max_reds = reds;
#endif
/* reds is used after the call as the amount of reductions consumed */
(void) erts_proc_sig_handle_incoming(c_p, &state, &reds,
max_reds, !0);
consumed_reds += reds;
left_reds -= reds;
/* we may have exited or suspended by an incoming signal... */
if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_SUSPENDED)) {
if (state & ERTS_PSFLG_SUSPENDED)
break; /* Yield */
/*
* Process needs to schedule out in order
* to terminate. Prepare this a bit...
*/
ASSERT(state & ERTS_PSFLG_EXITING);
ASSERT(c_p->flags & F_DELAY_GC);
c_p->flags &= ~F_DELAY_GC;
c_p->arity = 0;
c_p->current = NULL;
*get_outp = 1;
*msgpp = NULL;
return consumed_reds;
}
msgp = PEEK_MESSAGE(c_p);
if (msgp) {
*get_outp = 0;
*msgpp = msgp;
return consumed_reds;
}
if (left_reds <= 0)
break; /* yield */
ASSERT(!c_p->sig_qs.cont);
/* Go fetch again... */
}
/* Yield... */
*get_outp = -1;
*msgpp = NULL;
ASSERT(consumed_reds >= (fcalls - neg_o_reds));
return consumed_reds;
}
/*
 * Handle a trace-change-state signal.
 *
 * The trace flags (and, if a tracer is supplied, the tracer) of c_p
 * are updated as requested by the signal while all minor locks are
 * held. The signal is then removed from the queue and destroyed, and
 * the receive tracing state is adjusted accordingly.
 *
 * If message tracing got switched on, tracing->messages.next is set
 * to the queue position the signal occupied; if it got switched off,
 * it is reset to NULL.
 *
 * Returns non-zero if message tracing is active afterwards.
 */
static int
handle_trace_change_state(Process *c_p,
                          ErtsSigRecvTracing *tracing,
                          Uint16 type,
                          ErtsMessage *sig,
                          ErtsMessage ***next_nm_sig)
{
    ErtsMessage **next = *next_nm_sig;
    ErtsSigTraceInfo *info = (ErtsSigTraceInfo *) sig;
    int old_msgs_active = !!tracing->messages.active;
    int msgs_active;

    ASSERT(sig == *next);

    /* Apply the requested modifications... */
    erts_proc_lock(c_p, ERTS_PROC_LOCKS_ALL_MINOR);
    ERTS_TRACE_FLAGS(c_p) |= info->flags_on;
    ERTS_TRACE_FLAGS(c_p) &= ~info->flags_off;
    if (is_value(info->tracer)) {
        erts_tracer_replace(&c_p->common, info->tracer);
    }
    erts_proc_unlock(c_p, ERTS_PROC_LOCKS_ALL_MINOR);

    /* The signal has done its job... */
    remove_nm_sig(c_p, sig, next_nm_sig);
    destroy_trace_info(info);

    /* Let the tracing state reflect the new trace flags... */
    adjust_tracing_state(c_p, tracing, 0);
    msgs_active = !!tracing->messages.active;

    if (msgs_active && !old_msgs_active) {
        /* Message tracing was switched on... */
        ASSERT(!tracing->messages.next);
        tracing->messages.next = next;
    }
    else if (!msgs_active && old_msgs_active) {
        /* Message tracing was switched off... */
        ASSERT(tracing->messages.next);
        tracing->messages.next = NULL;
    }

    ASSERT(!msgs_active || tracing->messages.next);

    return msgs_active;
}
/*
 * Emit a 'getting_unlinked' process trace event for c_p with
 * unlinker as event data. Called with the main lock of c_p.
 */
static void
getting_unlinked(Process *c_p, Eterm unlinker)
{
trace_proc(c_p, ERTS_PROC_LOCK_MAIN, c_p,
am_getting_unlinked, unlinker);
}
/*
 * Emit a 'getting_linked' process trace event for c_p with
 * linker as event data. Called with the main lock of c_p.
 */
static void
getting_linked(Process *c_p, Eterm linker)
{
trace_proc(c_p, ERTS_PROC_LOCK_MAIN, c_p,
am_getting_linked, linker);
}
/*
 * Perform the tracing that is to be done for one message.
 *
 * msg must be a message on internal format (asserted).
 *
 * - When built with VM probes, and both the vm_probes flag of the
 *   tracing state and the message_queued probe are enabled, the
 *   message_queued probe is fired.
 * - When receive tracing is active and the receive trace event is
 *   on, a receive trace for the message is generated.
 */
static ERTS_INLINE void
handle_message_enqueued_tracing(Process *c_p,
ErtsSigRecvTracing *tracing,
ErtsMessage *msg)
{
ASSERT(ERTS_SIG_IS_INTERNAL_MSG(msg));
#if defined(USE_VM_PROBES)
if (tracing->messages.vm_probes && DTRACE_ENABLED(message_queued)) {
/* Token fields default to zero when there is no tuple token */
Sint tok_label = 0;
Sint tok_lastcnt = 0;
Sint tok_serial = 0;
Sint len = erts_proc_sig_privqs_len(c_p);
Eterm seq_trace_token = ERL_MESSAGE_TOKEN(msg);
if (seq_trace_token != NIL && is_tuple(seq_trace_token)) {
tok_label = SEQ_TRACE_T_DTRACE_LABEL(seq_trace_token);
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(seq_trace_token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(seq_trace_token));
}
/* Message intentionally not passed... */
DTRACE6(message_queued,
tracing->messages.receiver_name,
size_object(ERL_MESSAGE_TERM(msg)),
len, /* This is NOT the message queue len, but it's something... */
tok_label, tok_lastcnt, tok_serial);
}
#endif
if (tracing->messages.receive_trace && tracing->messages.event->on) {
ASSERT(IS_TRACED(c_p));
trace_receive(c_p,
ERL_MESSAGE_FROM(msg),
ERL_MESSAGE_TERM(msg),
tracing->messages.event);
}
}
/*
 * Perform message tracing on the messages found from
 * tracing->messages.next up to the next non-message signal (or the
 * end of the queue).
 *
 * Messages on external format are decoded first; messages that
 * fail to decode are removed from the queue and cleaned up.
 *
 * tracing->messages.next is updated to the position reached.
 *
 * Returns:
 *   -1  the work limit was exceeded; the caller should yield
 *    1  the end of the queue was reached
 *    0  a non-message signal is next in turn to be handled
 */
static int
handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing,
                   ErtsMessage ***next_nm_sig)
{
    const int work_limit = ERTS_PROC_SIG_TRACE_COUNT_LIMIT;
    int work = 0;
    ErtsMessage **pos, *sig;

    ASSERT(tracing->messages.next);

    pos = tracing->messages.next;

    /*
     * Receive tracing active. Walk the messages until a
     * non-message signal or the end of the queue is found...
     */
    for (sig = *pos; sig && ERTS_SIG_IS_MSG(sig); sig = *pos) {

        if (work > work_limit) {
            /* Remember where to continue, and yield... */
            tracing->messages.next = pos;
            return -1;
        }

        if (ERTS_SIG_IS_EXTERNAL_MSG(sig)) {
            work += 50; /* Decode is expensive... */
            if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN,
                                           sig, 0)) {
                /*
                 * Bad dist message; unlink and dispose it. The
                 * position is unchanged and now refers to the
                 * successor of the removed message...
                 */
                remove_mq_m_sig(c_p, sig, pos, next_nm_sig);
                sig->next = NULL;
                erts_cleanup_messages(sig);
                continue;
            }
        }

        handle_message_enqueued_tracing(c_p, tracing, sig);
        work++;

        pos = &(*pos)->next;
    }

    tracing->messages.next = pos;

    if (!sig) {
        ASSERT(!*next_nm_sig);
        return 1; /* end... */
    }

    /* Next signal is a non-message signal; let the caller handle it... */
    ASSERT(*next_nm_sig);
    ASSERT(**next_nm_sig == sig);
    ASSERT(ERTS_SIG_IS_NON_MSG(sig));

    return 0;
}
/*
 * Prepare the message queue of rp for inspection.
 *
 * c_p          - the calling process.
 * rp           - the process whose message queue is prepared.
 * rp_locks     - passed on to erts_proc_sig_decode_dist() together
 *                with rp.
 * info_on_self - non-zero when c_p inspects itself; c_p == rp is
 *                then required (asserted).
 * mip          - array filled in with one entry (message pointer and
 *                size) per message that remains in the queue. The
 *                caller must supply an array that is large enough;
 *                no bounds checking is made here.
 *
 * Returns the sum of the sizes stored in the mip array.
 */
Uint
erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
Process *rp,
ErtsProcLocks rp_locks,
int info_on_self,
ErtsMessageInfo *mip)
{
Uint tot_heap_size;
ErtsMessage *mp, **mpp;
Sint i;
int self_on_heap;
/*
* Prepare the message queue (inner signal queue)
* for inspection by process_info().
*
* - Decode all messages on external format
* - Remove all corrupt dist messages from queue
* - Save pointer to, and heap size need of each
* message in the mip array.
* - Return total heap size need for all messages
* that need to be copied.
*
*/
ASSERT(!info_on_self || c_p == rp);
/* Inspecting ourselves while not using an off-heap message queue */
self_on_heap = info_on_self && !(c_p->flags & F_OFF_HEAP_MSGQ);
tot_heap_size = 0;
i = 0;
/* mpp always refers to the pointer that points to mp */
mpp = &rp->sig_qs.first;
mp = rp->sig_qs.first;
while (mp) {
Eterm msg = ERL_MESSAGE_TERM(mp);
mip[i].size = 0;
if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) {
/* decode it... */
if (!erts_proc_sig_decode_dist(rp, rp_locks, mp, !0)) {
ErtsMessage *bad_mp = mp;
/*
* Bad distribution message; remove
* it from the queue...
*/
ASSERT(*mpp == bad_mp);
remove_iq_m_sig(rp, mp, mpp);
/* Continue with the successor; i is not advanced */
mp = *mpp;
bad_mp->next = NULL;
erts_cleanup_messages(bad_mp);
continue;
}
/* Re-read the term now that the message has been decoded */
msg = ERL_MESSAGE_TERM(mp);
}
ASSERT(is_value(msg));
/*
* Immediate terms get size zero. So do messages without attached
* data when inspecting ourselves with an on-heap message queue.
*/
if (is_not_immed(msg) && (!self_on_heap || mp->data.attached)) {
Uint sz = size_object(msg);
mip[i].size = sz;
tot_heap_size += sz;
}
mip[i].msgp = mp;
i++;
mpp = &mp->next;
mp = mp->next;
}
/*
* NOTE(review): the queue walked above belongs to rp, while this
* assertion inspects c_p; the two only coincide when c_p == rp.
* Confirm that checking c_p is intended.
*/
ASSERT(c_p->sig_qs.len == i);
return tot_heap_size;
}
/*
 * Move the data of a message with an attached heap fragment onto
 * the heap of c_p, and release the heap fragment.
 *
 * The following are left as they are in the queue:
 * - not yet decoded distribution messages, since a maximum size
 *   cannot be determined until they actually are decoded;
 * - messages without attached data;
 * - combined messages.
 */
static ERTS_INLINE void
move_msg_to_heap(Process *c_p, ErtsMessage *mp)
{
    ErlHeapFragment *frag;

    if (!ERTS_SIG_IS_INTERNAL_MSG(mp))
        return; /* Not decoded yet... */
    if (!mp->data.attached)
        return; /* Nothing attached... */
    if (mp->data.attached == ERTS_MSG_COMBINED_HFRAG)
        return; /* Combined message... */

    frag = erts_message_to_heap_frag(mp);

    if (!frag->next) {
        /* Single fragment... */
        erts_copy_one_frag(&c_p->htop, &c_p->off_heap, frag,
                           mp->m, ERL_MESSAGE_REF_ARRAY_SZ);
    }
    else {
        /* Chain of fragments... */
        erts_move_multi_frags(&c_p->htop, &c_p->off_heap, frag,
                              mp->m, ERL_MESSAGE_REF_ARRAY_SZ, 0);
    }

    mp->data.heap_frag = NULL;
    free_message_buffer(frag);
}
/*
 * Apply move_msg_to_heap() on each signal visited by
 * ERTS_FOREACH_SIG_PRIVQS() for c_p. The private queues are checked
 * by the hard debug check (when enabled) before as well as after the
 * operation.
 */
void
erts_proc_sig_move_msgs_to_heap(Process *c_p)
{
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
ERTS_FOREACH_SIG_PRIVQS(c_p, sig, move_msg_to_heap(c_p, sig));
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
}
BIF_RETTYPE
erts_internal_dirty_process_handle_signals_1(BIF_ALIST_1)
{
erts_aint32_t state, dirty, noproc;
int busy;
Process *rp;
if (BIF_P != erts_dirty_process_signal_handler
&& BIF_P != erts_dirty_process_signal_handler_high
&& BIF_P != erts_dirty_process_signal_handler_max)
BIF_ERROR(BIF_P, EXC_NOTSUP);
if (is_not_internal_pid(BIF_ARG_1))
BIF_RET(am_false);
rp = erts_proc_lookup_raw(BIF_ARG_1);
if (!rp)
BIF_RET(am_noproc);
state = erts_atomic32_read_nob(&rp->state);
dirty = (state & (ERTS_PSFLG_DIRTY_RUNNING
| ERTS_PSFLG_DIRTY_RUNNING_SYS));
if (!dirty)
BIF_RET(am_normal);
busy = erts_proc_trylock(rp, ERTS_PROC_LOCK_MAIN) == EBUSY;
state = erts_atomic32_read_mb(&rp->state);
noproc = (state & ERTS_PSFLG_FREE);
dirty = (state & (ERTS_PSFLG_DIRTY_RUNNING
| ERTS_PSFLG_DIRTY_RUNNING_SYS));
if (busy) {
if (noproc)
BIF_RET(am_noproc);
if (dirty)
BIF_RET(am_more); /* try again... */
BIF_RET(am_normal); /* will handle signals itself... */
}
else {
erts_aint32_t state;
int done;
Eterm res = am_false;
int reds = 0;
if (noproc)
res = am_noproc;
else if (!dirty)
res = am_normal; /* will handle signals itself... */
else {
reds = ERTS_BIF_REDS_LEFT(BIF_P);
done = erts_proc_sig_handle_incoming(rp, &state, &reds,
reds, 0);
if (done || (state & ERTS_PSFLG_EXITING))
res = am_ok;
else
res = am_more;
}
erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
if (reds)
BUMP_REDS(BIF_P, reds);
BIF_RET(res);
}
}
/*
 * Call oh_func(), with arg as second argument, on the off-heap
 * structure of each heap fragment in the chain starting at hfrag.
 */
static void
debug_foreach_sig_heap_frags(ErlHeapFragment *hfrag,
                             void (*oh_func)(ErlOffHeap *, void *),
                             void *arg)
{
    ErlHeapFragment *frag;

    for (frag = hfrag; frag; frag = frag->next) {
        oh_func(&frag->off_heap, arg);
    }
}
/*
 * If term is an external term, build a temporary off-heap structure
 * whose list consists of the external thing of the term, and call
 * oh_func() on it with arg as second argument. Other terms are
 * ignored.
 */
static void
debug_foreach_sig_fake_oh(Eterm term,
                          void (*oh_func)(ErlOffHeap *, void *),
                          void *arg)
{
    ErlOffHeap oh;

    if (!is_external(term))
        return;

    oh.overhead = 0;
    oh.first = ((struct erl_off_heap_header *)
                (char *) external_thing_ptr(term));
    /* The external thing is expected to be the only element... */
    ASSERT(!oh.first->next);

    oh_func(&oh, arg);
}
/*
 * Call ext_func(), with arg as second argument, on the result of
 * erts_proc_sig_get_external() for msgp.
 */
static void
debug_foreach_sig_external(ErtsMessage *msgp,
void (*ext_func)(ErtsDistExternal *, void *),
void *arg)
{
ext_func(erts_proc_sig_get_external(msgp), arg);
}
/*
 * Debug functionality: traverse all signals of c_p, i.e. the signals
 * reachable from sig_qs.first, sig_qs.cont and sig_inq.first, and
 * call the callback that corresponds to the content of each signal.
 *
 * msg_func - called for each message signal
 * oh_func  - called for off-heap structures found in signals
 * mon_func - called for monitor based signals (third argument -1)
 * lnk_func - called for link based signals (third argument -1)
 * ext_func - called for generic exit signals on external format
 * arg      - passed on untouched to all callbacks
 *
 * No locks are taken by this function itself.
 */
void
erts_proc_sig_debug_foreach_sig(Process *c_p,
void (*msg_func)(ErtsMessage *, void *),
void (*oh_func)(ErlOffHeap *, void *),
ErtsMonitorFunc mon_func,
ErtsLinkFunc lnk_func,
void (*ext_func)(ErtsDistExternal *, void *),
void *arg)
{
/* Heads of the three signal lists to traverse */
ErtsMessage *queue[] = {c_p->sig_qs.first, c_p->sig_qs.cont, c_p->sig_inq.first};
int qix;
for (qix = 0; qix < sizeof(queue)/sizeof(queue[0]); qix++) {
ErtsMessage *sig;
for (sig = queue[qix]; sig; sig = sig->next) {
if (ERTS_SIG_IS_MSG(sig)) {
msg_func(sig, arg);
} else {
Eterm tag;
Uint16 type;
int op;
ASSERT(sig);
ASSERT(ERTS_SIG_IS_NON_MSG(sig));
tag = ((ErtsSignal *) sig)->common.tag;
type = ERTS_PROC_SIG_TYPE(tag);
op = ERTS_PROC_SIG_OP(tag);
switch (op) {
case ERTS_SIG_Q_OP_EXIT:
case ERTS_SIG_Q_OP_EXIT_LINKED:
case ERTS_SIG_Q_OP_MONITOR_DOWN:
switch (type) {
case ERTS_SIG_Q_TYPE_GEN_EXIT:
/* Generic exit signal; either external format or heap fragments */
if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig))
debug_foreach_sig_external(sig, ext_func, arg);
else
debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg);
break;
case ERTS_LNK_TYPE_PORT:
case ERTS_LNK_TYPE_PROC:
case ERTS_LNK_TYPE_DIST_PROC:
lnk_func((ErtsLink *) sig, arg, -1);
break;
case ERTS_MON_TYPE_PORT:
case ERTS_MON_TYPE_PROC:
case ERTS_MON_TYPE_DIST_PROC:
case ERTS_MON_TYPE_NODE:
mon_func((ErtsMonitor *) sig, arg, -1);
break;
default:
ERTS_INTERNAL_ERROR("Unexpected sig type");
break;
}
break;
case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg);
break;
case ERTS_SIG_Q_OP_DEMONITOR:
if (type == ERTS_SIG_Q_TYPE_DIST_PROC_DEMONITOR) {
/* Only the reference may refer to off-heap data */
debug_foreach_sig_fake_oh(((ErtsSigDistProcDemonitor *) sig)->ref,
oh_func, arg);
break;
}
/* Fall through... */
case ERTS_SIG_Q_OP_MONITOR:
mon_func((ErtsMonitor *) sig, arg, -1);
break;
case ERTS_SIG_Q_OP_UNLINK:
if (type == ERTS_SIG_Q_TYPE_DIST_LINK) {
/* Only the remote identifier may refer to off-heap data */
debug_foreach_sig_fake_oh(((ErtsSigDistLinkOp *) sig)->remote,
oh_func, arg);
break;
}
/* Fall through... */
case ERTS_SIG_Q_OP_LINK:
lnk_func((ErtsLink *) sig, arg, -1);
break;
case ERTS_SIG_Q_OP_GROUP_LEADER: {
ErtsSigGroupLeader *sgl = (ErtsSigGroupLeader *) sig;
oh_func(&sgl->oh, arg);
break;
}
case ERTS_SIG_Q_OP_IS_ALIVE:
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
case ERTS_SIG_Q_OP_PROCESS_INFO:
/* No callback is made for these signals */
break;
/*
* NOTE(review): ERTS_SIG_Q_OP_SYNC_SUSPEND and ERTS_SIG_Q_OP_RPC
* are not listed and will end up in the default case below;
* confirm whether that is intended.
*/
default:
ERTS_INTERNAL_ERROR("Unknown signal");
break;
}
}
}
}
}
#ifdef ERTS_PROC_SIG_HARD_DEBUG
/*
 * Hard debug check of one term referred to by message mp.
 *
 * Immediate terms are always accepted. For list and boxed terms the
 * word(s) pointed to are sanity checked, and the pointer must refer
 * to a literal, to one of the heap fragments of the message, or to
 * memory that belongs to the process c_p.
 */
static void
chk_eterm(Process *c_p, int privq, ErtsMessage *mp, Eterm term)
{
    Eterm *ptr = NULL;
    ErlHeapFragment *frag;

    switch (primary_tag(term)) {

    case TAG_PRIMARY_IMMED1:
        /* Nothing pointed to... */
        return;

    case TAG_PRIMARY_BOXED:
        ptr = boxed_val(term);
        ERTS_ASSERT(is_header(*ptr));
        break;

    case TAG_PRIMARY_LIST:
        ptr = list_val(term);
        ERTS_ASSERT(!is_header(CAR(ptr)));
        ERTS_ASSERT(!is_header(CDR(ptr)));
        break;

    case TAG_PRIMARY_HEADER:
    default:
        ERTS_INTERNAL_ERROR("Not valid term");
        break;
    }

    if (erts_is_literal(term, ptr))
        return;

    /* Is the data located in a heap fragment of the message? */
    frag = erts_message_to_heap_frag(mp);
    while (frag) {
        if (bp_contains: 0, bp_dummy: 0, frag->mem <= ptr && ptr < frag->mem + frag->used_size)
            return;
        frag = frag->next;
    }

    /* If not, it has to be located in the process itself... */
    ASSERT(erts_dbg_within_proc(ptr, c_p, NULL));
}
/*
 * Hard debug consistency check of one signal queue.
 *
 * proc               - process owning the queue
 * privq              - which queue is checked: zero for the outer
 *                      queue, greater than zero for the middle queue
 *                      and less than zero for the inner queue
 * sig_next           - pointer to the pointer to the first signal
 * sig_last           - expected pointer to the terminating (NULL)
 *                      next pointer of the queue
 * sig_nm_next        - expected pointer to the pointer to the first
 *                      non-message signal, or NULL if none
 * sig_nm_last        - expected pointer to the pointer to the last
 *                      non-message signal, or NULL if none
 * tracing            - receive tracing state, or NULL
 * found_saved_last_p - if non-NULL; in/out flag accumulating whether
 *                      the saved_last position has been found
 * sig_psflg          - process state flag expected to be set if, and
 *                      only if, non-message signals exist in the
 *                      queue; ERTS_PSFLG_FREE disables this check
 *
 * Returns the number of messages found in the queue. Note that the
 * queue may be empty, i.e., *sig_next may be NULL.
 */
static Sint
proc_sig_hdbg_check_queue(Process *proc,
                          int privq,
                          ErtsMessage **sig_next,
                          ErtsMessage **sig_last,
                          ErtsMessage **sig_nm_next,
                          ErtsMessage **sig_nm_last,
                          ErtsSigRecvTracing *tracing,
                          int *found_saved_last_p,
                          erts_aint32_t sig_psflg)
{
    ErtsMessage **next, *sig, **nm_next, **nm_last;
    int last_nm_sig_found, nm_sigs = 0, found_next_trace = 0,
        found_save = 0, last_sig_found = 0, found_saved_last = 0;
    Sint msg_len = 0;
    ErtsMessage **next_trace = tracing ? tracing->messages.next : NULL;
    ErtsMessage **save = proc->sig_qs.save;
    ErtsMessage **saved_last = proc->sig_qs.saved_last;

    /*
     * No inspection of the first signal is made up front. A previous
     * effect-free look at the tag of *sig_next dereferenced a NULL
     * pointer when the outer queue was empty. The message queue
     * length offset marker is checked in the loop below.
     */

    nm_next = sig_nm_next;
    nm_last = sig_nm_last;
    next = sig_next;
    sig = *sig_next;

    /* Either both or none of the non-message pointers are set */
    last_nm_sig_found = !nm_last;
    if (last_nm_sig_found)
        ERTS_ASSERT(!nm_next);
    else
        ERTS_ASSERT(nm_next);

    while (1) {
        ErtsSignal *nm_sig;

        if (next == sig_last) {
            ASSERT(!*next);
            last_sig_found = 1;
        }
        if (next == save)
            found_save = 1;
        if (next == saved_last)
            found_saved_last = 1;
        if (next == next_trace) {
            found_next_trace = 1;
            ERTS_ASSERT(nm_sigs == 0);
        }

        /* Walk the run of messages up to the next non-message signal */
        while (sig && ERTS_SIG_IS_MSG(sig)) {
            int i;
            /* First reference of a not yet decoded message is skipped */
            if (ERTS_SIG_IS_EXTERNAL_MSG(sig))
                i = 1;
            else
                i = 0;
            for (; i < ERL_MESSAGE_REF_ARRAY_SZ; i++)
                chk_eterm(proc, privq, sig, sig->m[i]);

            msg_len++;
            next = &sig->next;
            sig = sig->next;

            if (next == sig_last) {
                ASSERT(!*next);
                last_sig_found = 1;
            }
            if (next == save)
                found_save = 1;
            if (next == saved_last)
                found_saved_last = 1;
            if (next == next_trace) {
                found_next_trace = 1;
                ERTS_ASSERT(nm_sigs == 0);
            }
        }

        if (!sig)
            break;

        nm_sig = (ErtsSignal *) sig;

        if (nm_sig->common.tag == ERTS_PROC_SIG_MSGQ_LEN_OFFS_MARK) {
            /* Only allowed first in the outer queue */
            ERTS_ASSERT(!privq);
            ERTS_ASSERT(sig == *sig_next);
        }
        else {
            nm_sigs++;

            ERTS_ASSERT(!last_nm_sig_found);
            ERTS_ASSERT(ERTS_SIG_IS_NON_MSG(sig));

            /* The non-message chain must lead to this very signal */
            ERTS_ASSERT(nm_next == next);

            if (nm_last == next) {
                ASSERT(!nm_sig->common.specific.next);
                last_nm_sig_found = 1;
            }

            nm_next = nm_sig->common.specific.next;
        }

        next = &nm_sig->common.next;
        sig = nm_sig->common.next;
    }

    if (!privq) {
        /* outer queue */
        ERTS_ASSERT(!found_save);
        ERTS_ASSERT(!found_saved_last);
    }
    else if (privq > 0) {
        /* middle queue */
        ERTS_ASSERT(!next_trace || found_next_trace);
        ERTS_ASSERT(!found_save);
        if (!found_saved_last_p) {
            ERTS_ASSERT(!found_saved_last
                        || (proc->flags & F_DEFERRED_SAVED_LAST));
        }
        else {
            if (*found_saved_last_p) {
                ERTS_ASSERT(!found_saved_last);
                ERTS_ASSERT(!(proc->flags & F_DEFERRED_SAVED_LAST));
            }
            else if (saved_last) {
                ERTS_ASSERT(found_saved_last);
                ERTS_ASSERT(proc->flags & F_DEFERRED_SAVED_LAST);
            }
            *found_saved_last_p |= found_saved_last;
        }
    }
    else {
        /* inner queue */
        ERTS_ASSERT(!found_next_trace);
        ERTS_ASSERT(nm_sigs == 0);
        ERTS_ASSERT(found_save);
        ERTS_ASSERT(!saved_last
                    || (found_saved_last
                        || (proc->flags & F_DEFERRED_SAVED_LAST)));
        if (found_saved_last_p)
            *found_saved_last_p |= found_saved_last;
    }

    ERTS_ASSERT(last_nm_sig_found);
    ERTS_ASSERT(last_sig_found);

    if (sig_psflg != ERTS_PSFLG_FREE) {
        erts_aint32_t state = erts_atomic32_read_nob(&proc->state);
        ERTS_ASSERT(nm_sigs ? !!(state & sig_psflg) : !(state & sig_psflg));
    }

    return msg_len;
}
/*
 * Hard debug check of the private signal queues of p.
 *
 * Checks the inner queue (sig_qs.first) and the middle queue
 * (sig_qs.cont), verifies that the saved_last position, if set, was
 * found in one of them, and that the number of messages found equals
 * the length reported by proc_sig_privqs_len().
 *
 * qlock is passed on to proc_sig_privqs_len(). The what, file and
 * line arguments are not used by this implementation.
 *
 * The caller must be blocking thread progress, p must be exiting,
 * or the main lock of p must be held (lock checked).
 */
void
erts_proc_sig_hdbg_check_priv_queue(Process *p, int qlock, char *what, char *file, int line)
{
int found_saved_last = 0;
Sint len, len1, len2;
ERTS_LC_ASSERT(erts_thr_progress_is_blocking()
|| ERTS_PROC_IS_EXITING(p)
|| (ERTS_PROC_LOCK_MAIN
& erts_proc_lc_my_proc_locks(p)));
/* Inner queue; no non-message signals and no state flag to check */
len1 = proc_sig_hdbg_check_queue(p,
-1,
&p->sig_qs.first,
p->sig_qs.last,
NULL,
NULL,
NULL,
&found_saved_last,
ERTS_PSFLG_FREE);
/* Middle queue */
len2 = proc_sig_hdbg_check_queue(p,
1,
&p->sig_qs.cont,
p->sig_qs.cont_last,
p->sig_qs.nmsigs.next,
p->sig_qs.nmsigs.last,
NULL,
&found_saved_last,
ERTS_PSFLG_SIG_Q);
if (p->sig_qs.saved_last)
ERTS_ASSERT(found_saved_last);
len = proc_sig_privqs_len(p, qlock);
ERTS_ASSERT(len == len1 + len2);
}
/*
 * Hard debug check of the in-queue (outer signal queue) of p.
 *
 * Verifies the structure of the queue and, in debug builds, that the
 * number of messages found equals sig_inq.len. The what, file and
 * line arguments are not used by this implementation.
 *
 * The caller must be blocking thread progress, p must be exiting,
 * or the message queue lock of p must be held (lock checked).
 */
void
erts_proc_sig_hdbg_check_in_queue(Process *p, char *what, char *file, int line)
{
Sint len;
ERTS_LC_ASSERT(erts_thr_progress_is_blocking()
|| ERTS_PROC_IS_EXITING(p)
|| (ERTS_PROC_LOCK_MSGQ
& erts_proc_lc_my_proc_locks(p)));
len = proc_sig_hdbg_check_queue(p,
0,
&p->sig_inq.first,
p->sig_inq.last,
p->sig_inq.nmsigs.next,
p->sig_inq.nmsigs.last,
NULL,
NULL,
ERTS_PSFLG_SIG_IN_Q);
/* The (void) cast avoids an unused variable warning when ASSERT is a no-op */
ASSERT(p->sig_inq.len == len); (void)len;
}
#endif /* ERTS_PROC_SIG_HARD_DEBUG */