aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_proc_sig_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_proc_sig_queue.c')
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c400
1 files changed, 306 insertions, 94 deletions
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index f343e984f7..d5e0e3b218 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -36,6 +36,7 @@
#include "erl_port_task.h"
#include "erl_trace.h"
#include "beam_bp.h"
+#include "erl_binary.h"
#include "big.h"
#include "erl_gc.h"
#include "bif.h"
@@ -80,6 +81,11 @@
#define ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO \
ERTS_SIG_Q_TYPE_MAX
+#define ERTS_SIG_IS_GEN_EXIT(sig) \
+ (ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT)
+#define ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig) \
+ (ASSERT(ERTS_SIG_IS_GEN_EXIT(sig)),is_non_value(get_exit_signal_data(sig)->reason))
+
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler);
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_high);
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_max);
@@ -259,7 +265,7 @@ destroy_dist_proc_demonitor(ErtsSigDistProcDemonitor *dmon)
Eterm ref = dmon->ref;
if (is_external(ref)) {
ExternalThing *etp = external_thing_ptr(ref);
- erts_deref_node_entry(etp->node);
+ erts_deref_node_entry(etp->node, ref);
}
erts_free(ERTS_ALC_T_DIST_DEMONITOR, dmon);
}
@@ -294,7 +300,8 @@ destroy_sig_dist_link_op(ErtsSigDistLinkOp *sdlnk)
{
ASSERT(is_external_pid(sdlnk->remote));
ASSERT(boxed_val(sdlnk->remote) == &sdlnk->heap[0]);
- erts_deref_node_entry(((ExternalThing *) &sdlnk->heap[0])->node);
+ erts_deref_node_entry(((ExternalThing *) &sdlnk->heap[0])->node,
+ make_boxed(&sdlnk->heap[0]));
erts_free(ERTS_ALC_T_SIG_DATA, sdlnk);
}
@@ -604,7 +611,8 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op)
#endif
return 1;
}
- ASSERT(esdp->pending_signal.dbg_from == esdp->current_process);
+ ASSERT(esdp->pending_signal.dbg_from == esdp->current_process ||
+ esdp->pending_signal.dbg_from == esdp->free_process);
if (pend_sig != sig) {
/* Switch them and send previously pending signal instead */
Eterm pend_to = esdp->pending_signal.to;
@@ -936,29 +944,54 @@ erts_proc_sig_privqs_len(Process *c_p)
return proc_sig_privqs_len(c_p, 0);
}
+ErtsDistExternal *
+erts_proc_sig_get_external(ErtsMessage *msgp)
+{
+ if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) {
+ return erts_get_dist_ext(msgp->data.heap_frag);
+ } else if (ERTS_SIG_IS_NON_MSG(msgp) &&
+ ERTS_SIG_IS_GEN_EXIT(msgp) &&
+ ERTS_SIG_IS_GEN_EXIT_EXTERNAL(msgp)) {
+ ErtsDistExternal *edep;
+ ErtsExitSignalData *xsigd = get_exit_signal_data(msgp);
+ ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT);
+ ASSERT(is_non_value(xsigd->reason));
+ if (msgp->hfrag.next == NULL)
+ edep = (ErtsDistExternal*)(xsigd + 1);
+ else
+ edep = erts_get_dist_ext(msgp->hfrag.next);
+ return edep;
+ }
+ return NULL;
+}
+
static void do_seq_trace_output(Eterm to, Eterm token, Eterm msg);
static void
send_gen_exit_signal(Process *c_p, Eterm from_tag,
Eterm from, Eterm to,
- Sint16 op, Eterm reason, Eterm ref,
- Eterm token, int normal_kills)
+ Sint16 op, Eterm reason, ErtsDistExternal *dist_ext,
+ ErlHeapFragment *dist_ext_hfrag,
+ Eterm ref, Eterm token, int normal_kills)
{
ErtsExitSignalData *xsigd;
Eterm *hp, *start_hp, s_reason, s_ref, s_message, s_token, s_from;
ErtsMessage *mp;
ErlHeapFragment *hfrag;
ErlOffHeap *ohp;
- Uint hsz, from_sz, reason_sz, ref_sz, token_sz;
+ Uint hsz, from_sz, reason_sz, ref_sz, token_sz, dist_ext_sz;
int seq_trace;
#ifdef USE_VM_PROBES
Eterm s_utag, utag;
Uint utag_sz;
#endif
+ ASSERT((is_value(reason) && dist_ext == NULL) ||
+ (is_non_value(reason) && dist_ext != NULL));
+
ASSERT(is_immed(from_tag));
- hsz = sizeof(ErtsExitSignalData)/sizeof(Uint);
+ hsz = sizeof(ErtsExitSignalData)/sizeof(Eterm);
seq_trace = c_p && have_seqtrace(token);
if (seq_trace)
@@ -977,33 +1010,44 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
hsz += utag_sz;
#endif
- token_sz = is_immed(token) ? 0 : size_object(token);
+ token_sz = size_object(token);
hsz += token_sz;
- from_sz = is_immed(from) ? 0 : size_object(from);
+ from_sz = size_object(from);
hsz += from_sz;
- reason_sz = is_immed(reason) ? 0 : size_object(reason);
- hsz += reason_sz;
+ ref_sz = size_object(ref);
+ hsz += ref_sz;
- switch (op) {
- case ERTS_SIG_Q_OP_EXIT:
- case ERTS_SIG_Q_OP_EXIT_LINKED: {
- /* {'EXIT', From, Reason} */
- hsz += 4; /* 3-tuple */
- ref_sz = 0;
- break;
- }
- case ERTS_SIG_Q_OP_MONITOR_DOWN: {
- /* {'DOWN', Ref, process, From, Reason} */
- hsz += 6; /* 5-tuple */
- ref_sz = NC_HEAP_SIZE(ref);
- hsz += ref_sz;
- break;
- }
- default:
- ERTS_INTERNAL_ERROR("Invalid exit signal op");
- break;
+ reason_sz = 0; /* Set to silence gcc warning */
+
+ /* The reason was part of the control message,
+ just use copy it into the xsigd */
+ if (is_value(reason)) {
+ reason_sz = size_object(reason);
+ hsz += reason_sz;
+
+ switch (op) {
+ case ERTS_SIG_Q_OP_EXIT:
+ case ERTS_SIG_Q_OP_EXIT_LINKED: {
+ /* {'EXIT', From, Reason} */
+ hsz += 4; /* 3-tuple */
+ break;
+ }
+ case ERTS_SIG_Q_OP_MONITOR_DOWN: {
+ /* {'DOWN', Ref, process, From, Reason} */
+ hsz += 6; /* 5-tuple */
+ break;
+ }
+ default:
+ ERTS_INTERNAL_ERROR("Invalid exit signal op");
+ break;
+ }
+ } else if (dist_ext != NULL && dist_ext_hfrag == NULL) {
+ /* The message was not fragmented so we need to create space
+ for a single dist_ext element */
+ dist_ext_sz = erts_dist_ext_size(dist_ext) / sizeof(Eterm);
+ hsz += dist_ext_sz;
}
/*
@@ -1015,35 +1059,33 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
ohp = &hfrag->off_heap;
start_hp = hp;
- s_token = (is_immed(token)
- ? token
- : copy_struct(token, token_sz, &hp, ohp));
-
- s_reason = (is_immed(reason)
- ? reason
- : copy_struct(reason, reason_sz, &hp, ohp));
+ s_token = copy_struct(token, token_sz, &hp, ohp);
+ s_from = copy_struct(from, from_sz, &hp, ohp);
+ s_ref = copy_struct(ref, ref_sz, &hp, ohp);
- s_from = (is_immed(from)
- ? from
- : copy_struct(from, from_sz, &hp, ohp));
-
- if (!ref_sz)
- s_ref = NIL;
- else
- s_ref = STORE_NC(&hp, ohp, ref);
+ if (is_value(reason)) {
+ s_reason = copy_struct(reason, reason_sz, &hp, ohp);
- switch (op) {
- case ERTS_SIG_Q_OP_EXIT:
- case ERTS_SIG_Q_OP_EXIT_LINKED:
- /* {'EXIT', From, Reason} */
- s_message = TUPLE3(hp, am_EXIT, s_from, s_reason);
- hp += 4;
- break;
- case ERTS_SIG_Q_OP_MONITOR_DOWN:
- /* {'DOWN', Ref, process, From, Reason} */
- s_message = TUPLE5(hp, am_DOWN, s_ref, am_process, s_from, s_reason);
- hp += 6;
- break;
+ switch (op) {
+ case ERTS_SIG_Q_OP_EXIT:
+ case ERTS_SIG_Q_OP_EXIT_LINKED:
+ /* {'EXIT', From, Reason} */
+ s_message = TUPLE3(hp, am_EXIT, s_from, s_reason);
+ hp += 4;
+ break;
+ case ERTS_SIG_Q_OP_MONITOR_DOWN:
+ /* {'DOWN', Ref, process, From, Reason} */
+ s_message = TUPLE5(hp, am_DOWN, s_ref, am_process, s_from, s_reason);
+ hp += 6;
+ break;
+ default:
+ /* This cannot happen, used to silence gcc warning */
+ s_message = THE_NON_VALUE;
+ break;
+ }
+ } else {
+ s_message = THE_NON_VALUE;
+ s_reason = THE_NON_VALUE;
}
#ifdef USE_VM_PROBES
@@ -1061,11 +1103,13 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
hfrag->used_size = hp - start_hp;
- xsigd = (ErtsExitSignalData *) (char *) hp;
+ xsigd = (ErtsExitSignalData *) hp;
xsigd->message = s_message;
xsigd->from = s_from;
xsigd->reason = s_reason;
+ hfrag->next = dist_ext_hfrag;
+
if (is_nil(s_ref))
xsigd->u.normal_kills = normal_kills;
else {
@@ -1073,6 +1117,15 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
xsigd->u.ref = s_ref;
}
+ hp += sizeof(ErtsExitSignalData)/sizeof(Eterm);
+
+ if (dist_ext != NULL && dist_ext_hfrag == NULL && is_non_value(reason)) {
+ erts_make_dist_ext_copy(dist_ext, (ErtsDistExternal *) hp);
+ hp += dist_ext_sz;
+ }
+
+ ASSERT(hp == mp->hfrag.mem + mp->hfrag.alloc_size);
+
if (seq_trace)
do_seq_trace_output(to, s_token, s_message);
@@ -1205,7 +1258,19 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to,
from_tag = dep->sysname;
}
send_gen_exit_signal(c_p, from_tag, from, to, ERTS_SIG_Q_OP_EXIT,
- reason, NIL, token, normal_kills);
+ reason, NULL, NULL, NIL, token, normal_kills);
+}
+
+void
+erts_proc_sig_send_dist_exit(DistEntry *dep,
+ Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
+ Eterm reason, Eterm token)
+{
+ send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT,
+ reason, dist_ext, hfrag, NIL, token, 0);
+
}
void
@@ -1219,7 +1284,7 @@ erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk,
if (is_not_immed(reason) || is_not_nil(token)) {
ASSERT(is_internal_pid(from) || is_internal_port(from));
send_gen_exit_signal(c_p, from, from, to, ERTS_SIG_Q_OP_EXIT_LINKED,
- reason, NIL, token, 0);
+ reason, NULL, NULL, NIL, token, 0);
}
else {
/* Pass signal using old link structure... */
@@ -1274,10 +1339,13 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk)
void
erts_proc_sig_send_dist_link_exit(DistEntry *dep,
Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
Eterm reason, Eterm token)
{
send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT_LINKED,
- reason, NIL, token, 0);
+ reason, dist_ext, hfrag, NIL, token, 0);
+
}
void
@@ -1299,16 +1367,18 @@ erts_proc_sig_send_dist_unlink(DistEntry *dep, Eterm from, Eterm to)
void
erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
Eterm reason)
{
Eterm monitored, heap[3];
- if (is_atom(from))
+ if (is_atom(from))
monitored = TUPLE2(&heap[0], from, dep->sysname);
else
monitored = from;
send_gen_exit_signal(NULL, dep->sysname, monitored,
to, ERTS_SIG_Q_OP_MONITOR_DOWN,
- reason, ref, NIL, 0);
+ reason, dist_ext, hfrag, ref, NIL, 0);
}
void
@@ -1376,10 +1446,10 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason)
|| is_internal_pid(from_tag)
|| is_atom(from_tag));
monitored = TUPLE2(&heap[0], name, node);
- }
+ }
send_gen_exit_signal(NULL, from_tag, monitored,
to, ERTS_SIG_Q_OP_MONITOR_DOWN,
- reason, mdp->ref, NIL, 0);
+ reason, NULL, NULL, mdp->ref, NIL, 0);
}
erts_monitor_release(mon);
}
@@ -2037,7 +2107,6 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
if (type == ERTS_SIG_Q_TYPE_GEN_EXIT) {
xsigd = get_exit_signal_data(sig);
from = xsigd->from;
- reason = xsigd->reason;
if (op != ERTS_SIG_Q_OP_EXIT_LINKED)
ignore = 0;
else {
@@ -2062,6 +2131,18 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
}
}
+ /* This GEN_EXIT was received from another node, decode the exit reason */
+ if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig))
+ erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1);
+
+ reason = xsigd->reason;
+
+ if (is_non_value(reason)) {
+ /* Bad distribution message; remove it from queue... */
+ ignore = !0;
+ destroy = !0;
+ }
+
if (!ignore) {
if ((op != ERTS_SIG_Q_OP_EXIT || reason != am_kill)
@@ -2929,6 +3010,104 @@ handle_sync_suspend(Process *c_p, ErtsMessage *mp)
}
}
+int
+erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
+ ErtsMessage *msgp, int force_off_heap)
+{
+ ErtsHeapFactory factory;
+ ErlHeapFragment *hfrag;
+ Eterm msg;
+ Sint need;
+ ErtsDistExternal *edep;
+ ErtsExitSignalData *xsigd = NULL;
+
+ edep = erts_proc_sig_get_external(msgp);
+ if (!ERTS_SIG_IS_EXTERNAL_MSG(msgp))
+ xsigd = get_exit_signal_data(msgp);
+
+ if (edep->heap_size >= 0)
+ need = edep->heap_size;
+ else {
+ need = erts_decode_dist_ext_size(edep, 1, 1);
+ if (need < 0) {
+ /* bad signal; remove it... */
+ return 0;
+ }
+
+ edep->heap_size = need;
+ }
+
+ if (ERTS_SIG_IS_NON_MSG(msgp)) {
+ switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) {
+ case ERTS_SIG_Q_OP_EXIT:
+ case ERTS_SIG_Q_OP_EXIT_LINKED:
+ /* {'EXIT', From, Reason} */
+ need += 4;
+ break;
+ case ERTS_SIG_Q_OP_MONITOR_DOWN:
+ /* {'DOWN', Ref, process, From, Reason} */
+ need += 6; /* 5-tuple */
+ break;
+ default:
+ ERTS_INTERNAL_ERROR("Invalid exit signal op");
+ break;
+ }
+ }
+
+ hfrag = new_message_buffer(need);
+ erts_factory_heap_frag_init(&factory, hfrag);
+
+ ASSERT(edep->heap_size >= 0);
+
+ msg = erts_decode_dist_ext(&factory, edep, 1);
+
+ if (is_non_value(msg)) {
+ erts_factory_undo(&factory);
+ return 0;
+ }
+
+ if (ERTS_SIG_IS_MSG(msgp)) {
+ ERL_MESSAGE_TERM(msgp) = msg;
+ if (msgp->data.heap_frag == &msgp->hfrag)
+ msgp->data.heap_frag = ERTS_MSG_COMBINED_HFRAG;
+ } else {
+ switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) {
+ case ERTS_SIG_Q_OP_EXIT:
+ case ERTS_SIG_Q_OP_EXIT_LINKED:
+ /* {'EXIT', From, Reason} */
+ erts_reserve_heap(&factory, 4);
+ xsigd->message = TUPLE3(factory.hp, am_EXIT, xsigd->from, msg);
+ factory.hp += 4;
+ break;
+ case ERTS_SIG_Q_OP_MONITOR_DOWN:
+ /* {'DOWN', Ref, process, From, Reason} */
+ erts_reserve_heap(&factory, 6);
+ xsigd->message = TUPLE5(factory.hp, am_DOWN, xsigd->u.ref, am_process, xsigd->from, msg);
+ factory.hp += 6;
+ break;
+ }
+ xsigd->reason = msg;
+ }
+
+ erts_free_dist_ext_copy(edep);
+
+ erts_factory_close(&factory);
+
+ hfrag = factory.heap_frags;
+ while (hfrag->next)
+ hfrag = hfrag->next;
+
+ if (ERTS_SIG_IS_MSG(msgp) && msgp->data.heap_frag != ERTS_MSG_COMBINED_HFRAG) {
+ hfrag->next = msgp->data.heap_frag;
+ msgp->data.heap_frag = factory.heap_frags;
+ } else {
+ hfrag->next = msgp->hfrag.next;
+ msgp->hfrag.next = factory.heap_frags;
+ }
+
+ return 1;
+}
+
void
erts_proc_sig_handle_pending_suspend(Process *c_p)
{
@@ -3045,7 +3224,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ASSERT(ERTS_SIG_IS_NON_MSG(sig));
tag = ((ErtsSignal *) sig)->common.tag;
-
+
switch (ERTS_PROC_SIG_OP(tag)) {
case ERTS_SIG_Q_OP_EXIT:
@@ -3091,6 +3270,12 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
break;
case ERTS_SIG_Q_TYPE_GEN_EXIT:
xsigd = get_exit_signal_data(sig);
+
+ /* This GEN_EXIT was received from another node, decode the exit reason */
+ if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig))
+ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1))
+ break; /* Decode failed, just remove signal */
+
omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p),
xsigd->u.ref);
if (omon) {
@@ -3247,9 +3432,15 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
erts_nif_demonitored((ErtsResource *) tmon->other.ptr);
cnt++;
break;
- case ERTS_MON_TYPE_SUSPEND:
- erts_resume(c_p, ERTS_PROC_LOCK_MAIN);
+ case ERTS_MON_TYPE_SUSPEND: {
+ ErtsMonitorSuspend *msp;
+ erts_aint_t mstate;
+ msp = (ErtsMonitorSuspend *) erts_monitor_to_data(tmon);
+ mstate = erts_atomic_read_acqb(&msp->state);
+ if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)
+ erts_resume(c_p, ERTS_PROC_LOCK_MAIN);
break;
+ }
default:
break;
}
@@ -3590,9 +3781,10 @@ stretch_limit(Process *c_p, ErtsSigRecvTracing *tp,
int
-erts_proc_sig_handle_exit(Process *c_p, int *redsp)
+erts_proc_sig_handle_exit(Process *c_p, Sint *redsp)
{
- int cnt, limit;
+ int cnt;
+ Sint limit;
ErtsMessage *sig, ***next_nm_sig;
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
@@ -3671,9 +3863,9 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp)
break;
case ERTS_SIG_Q_OP_MONITOR: {
- ErtsProcExitContext pectxt = {c_p, am_noproc};
+ ErtsProcExitContext pectxt = {c_p, am_noproc, NULL, NULL, NIL};
erts_proc_exit_handle_monitor((ErtsMonitor *) sig,
- (void *) &pectxt);
+ (void *) &pectxt, -1);
cnt += 4;
break;
}
@@ -3687,7 +3879,7 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp)
case ERTS_SIG_Q_OP_LINK: {
ErtsProcExitContext pectxt = {c_p, am_noproc};
- erts_proc_exit_handle_link((ErtsLink *) sig, (void *) &pectxt);
+ erts_proc_exit_handle_link((ErtsLink *) sig, (void *) &pectxt, -1);
break;
}
@@ -3784,6 +3976,9 @@ clear_seq_trace_token(ErtsMessage *sig)
case ERTS_MON_TYPE_PROC:
case ERTS_MON_TYPE_DIST_PROC:
case ERTS_MON_TYPE_NODE:
+ case ERTS_MON_TYPE_NODES:
+ case ERTS_MON_TYPE_SUSPEND:
+ case ERTS_MON_TYPE_TIME_OFFSET:
break;
default:
ERTS_INTERNAL_ERROR("Unexpected sig type");
@@ -3800,6 +3995,11 @@ clear_seq_trace_token(ErtsMessage *sig)
case ERTS_SIG_Q_OP_LINK:
case ERTS_SIG_Q_OP_UNLINK:
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
+ case ERTS_SIG_Q_OP_GROUP_LEADER:
+ case ERTS_SIG_Q_OP_IS_ALIVE:
+ case ERTS_SIG_Q_OP_PROCESS_INFO:
+ case ERTS_SIG_Q_OP_SYNC_SUSPEND:
+ case ERTS_SIG_Q_OP_RPC:
break;
default:
@@ -3812,7 +4012,6 @@ clear_seq_trace_token(ErtsMessage *sig)
void
erts_proc_sig_clear_seq_trace_tokens(Process *c_p)
{
- ASSERT(erts_thr_progress_is_blocking());
erts_proc_sig_fetch(c_p);
ERTS_FOREACH_SIG_PRIVQS(c_p, sig, clear_seq_trace_token(sig));
}
@@ -3852,6 +4051,7 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
case ERTS_MON_TYPE_DIST_PROC:
case ERTS_MON_TYPE_NODE:
size = erts_monitor_size((ErtsMonitor *) sig);
+ break;
default:
ERTS_INTERNAL_ERROR("Unexpected sig type");
break;
@@ -4189,11 +4389,13 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing,
return -1; /* Yield... */
}
if (ERTS_SIG_IS_EXTERNAL_MSG(sig)) {
- cnt++;
- if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN,
- sig, 0)) {
+ cnt += 50; /* Decode is expensive... */
+ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN,
+ sig, 0)) {
/* Bad dist message; remove it... */
remove_mq_m_sig(c_p, sig, next_sig, next_nm_sig);
+ sig->next = NULL;
+ erts_cleanup_messages(sig);
sig = *next_sig;
continue;
}
@@ -4265,18 +4467,12 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) {
/* decode it... */
- if (mp->data.attached)
- erts_decode_dist_message(rp, rp_locks, mp, !0);
-
- msg = ERL_MESSAGE_TERM(mp);
-
- if (is_non_value(msg)) {
+ if (!erts_proc_sig_decode_dist(rp, rp_locks, mp, !0)) {
ErtsMessage *bad_mp = mp;
/*
* Bad distribution message; remove
* it from the queue...
*/
- ASSERT(!mp->data.attached);
ASSERT(*mpp == bad_mp);
@@ -4288,6 +4484,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
erts_cleanup_messages(bad_mp);
continue;
}
+
+ msg = ERL_MESSAGE_TERM(mp);
}
ASSERT(is_value(msg));
@@ -4446,12 +4644,21 @@ debug_foreach_sig_fake_oh(Eterm term,
}
+static void
+debug_foreach_sig_external(ErtsMessage *msgp,
+ void (*ext_func)(ErtsDistExternal *, void *),
+ void *arg)
+{
+ ext_func(erts_proc_sig_get_external(msgp), arg);
+}
+
void
erts_proc_sig_debug_foreach_sig(Process *c_p,
void (*msg_func)(ErtsMessage *, void *),
void (*oh_func)(ErlOffHeap *, void *),
- void (*mon_func)(ErtsMonitor *, void *),
- void (*lnk_func)(ErtsLink *, void *),
+ ErtsMonitorFunc mon_func,
+ ErtsLinkFunc lnk_func,
+ void (*ext_func)(ErtsDistExternal *, void *),
void *arg)
{
ErtsMessage *queue[] = {c_p->sig_qs.first, c_p->sig_qs.cont, c_p->sig_inq.first};
@@ -4460,10 +4667,10 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
for (qix = 0; qix < sizeof(queue)/sizeof(queue[0]); qix++) {
ErtsMessage *sig;
for (sig = queue[qix]; sig; sig = sig->next) {
-
- if (ERTS_SIG_IS_MSG(sig))
+
+ if (ERTS_SIG_IS_MSG(sig)) {
msg_func(sig, arg);
- else {
+ } else {
Eterm tag;
Uint16 type;
int op;
@@ -4482,18 +4689,23 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
case ERTS_SIG_Q_OP_MONITOR_DOWN:
switch (type) {
case ERTS_SIG_Q_TYPE_GEN_EXIT:
- debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg);
+ if (!ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig))
+ debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg);
+ else {
+ oh_func(&sig->hfrag.off_heap, arg);
+ debug_foreach_sig_external(sig, ext_func, arg);
+ }
break;
case ERTS_LNK_TYPE_PORT:
case ERTS_LNK_TYPE_PROC:
case ERTS_LNK_TYPE_DIST_PROC:
- lnk_func((ErtsLink *) sig, arg);
+ lnk_func((ErtsLink *) sig, arg, -1);
break;
case ERTS_MON_TYPE_PORT:
case ERTS_MON_TYPE_PROC:
case ERTS_MON_TYPE_DIST_PROC:
case ERTS_MON_TYPE_NODE:
- mon_func((ErtsMonitor *) sig, arg);
+ mon_func((ErtsMonitor *) sig, arg, -1);
break;
default:
ERTS_INTERNAL_ERROR("Unexpected sig type");
@@ -4514,7 +4726,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
/* Fall through... */
case ERTS_SIG_Q_OP_MONITOR:
- mon_func((ErtsMonitor *) sig, arg);
+ mon_func((ErtsMonitor *) sig, arg, -1);
break;
case ERTS_SIG_Q_OP_UNLINK:
@@ -4526,7 +4738,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
/* Fall through... */
case ERTS_SIG_Q_OP_LINK:
- lnk_func((ErtsLink *) sig, arg);
+ lnk_func((ErtsLink *) sig, arg, -1);
break;
case ERTS_SIG_Q_OP_GROUP_LEADER: {