aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_proc_sig_queue.c
diff options
context:
space:
mode:
authorLukas Larsson <[email protected]>2018-09-26 11:55:01 +0200
committerLukas Larsson <[email protected]>2019-02-22 11:12:53 +0100
commitf2c4f6f83deecba0c2527e520f0f18fba7d84815 (patch)
tree5009cc4300cdca28dc0f76daaa541051f8d44a01 /erts/emulator/beam/erl_proc_sig_queue.c
parent6686877360432144bacbf4e95c23b1232eab1b08 (diff)
downloadotp-f2c4f6f83deecba0c2527e520f0f18fba7d84815.tar.gz
otp-f2c4f6f83deecba0c2527e520f0f18fba7d84815.tar.bz2
otp-f2c4f6f83deecba0c2527e520f0f18fba7d84815.zip
erts: Implement fragmentation of distrubution messages
Diffstat (limited to 'erts/emulator/beam/erl_proc_sig_queue.c')
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c195
1 files changed, 100 insertions, 95 deletions
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 1bd219b6d7..8113abebde 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -36,6 +36,7 @@
#include "erl_port_task.h"
#include "erl_trace.h"
#include "beam_bp.h"
+#include "erl_binary.h"
#include "big.h"
#include "erl_gc.h"
#include "bif.h"
@@ -80,7 +81,10 @@
#define ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO \
ERTS_SIG_Q_TYPE_MAX
-#define ERTS_SIG_IS_EXTERNAL(sig) is_non_value(get_exit_signal_data(sig)->reason)
+#define ERTS_SIG_IS_GEN_EXIT(sig) \
+ (ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT)
+#define ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig) \
+ (ASSERT(ERTS_SIG_IS_GEN_EXIT(sig)),is_non_value(get_exit_signal_data(sig)->reason))
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler);
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_high);
@@ -114,8 +118,6 @@ typedef struct {
Eterm message;
Eterm from;
Eterm reason;
- struct erl_dist_external *dist_ext;
- ErlHeapFragment *heap_frag;
union {
Eterm ref;
int normal_kills;
@@ -945,13 +947,15 @@ erts_proc_sig_get_external(ErtsMessage *msgp)
{
if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) {
return erts_get_dist_ext(msgp->data.heap_frag);
- } else if (ERTS_SIG_IS_NON_MSG(msgp) && ERTS_SIG_IS_EXTERNAL(msgp)) {
+ } else if (ERTS_SIG_IS_NON_MSG(msgp) &&
+ ERTS_SIG_IS_GEN_EXIT(msgp) &&
+ ERTS_SIG_IS_GEN_EXIT_EXTERNAL(msgp)) {
ErtsDistExternal *edep;
ErtsExitSignalData *xsigd = get_exit_signal_data(msgp);
ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT);
ASSERT(is_non_value(xsigd->reason));
if (msgp->hfrag.next == NULL)
- edep = (ErtsDistExternal*)((void*)xsigd) + sizeof(ErtsExitSignalData);
+ edep = (ErtsDistExternal*)(xsigd + 1);
else
edep = erts_get_dist_ext(msgp->hfrag.next);
return edep;
@@ -965,6 +969,7 @@ static void
send_gen_exit_signal(Process *c_p, Eterm from_tag,
Eterm from, Eterm to,
Sint16 op, Eterm reason, ErtsDistExternal *dist_ext,
+ ErlHeapFragment *dist_ext_hfrag,
Eterm ref, Eterm token, int normal_kills)
{
ErtsExitSignalData *xsigd;
@@ -972,7 +977,7 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
ErtsMessage *mp;
ErlHeapFragment *hfrag;
ErlOffHeap *ohp;
- Uint hsz, from_sz, reason_sz, ref_sz, token_sz;
+ Uint hsz, from_sz, reason_sz, ref_sz, token_sz, dist_ext_sz;
int seq_trace;
#ifdef USE_VM_PROBES
Eterm s_utag, utag;
@@ -984,7 +989,7 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
ASSERT(is_immed(from_tag));
- hsz = sizeof(ErtsExitSignalData)/sizeof(Uint);
+ hsz = sizeof(ErtsExitSignalData)/sizeof(Eterm);
seq_trace = c_p && have_seqtrace(token);
if (seq_trace)
@@ -1012,6 +1017,8 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
ref_sz = size_object(ref);
hsz += ref_sz;
+ /* The reason was part of the control message,
+ just use copy it into the xsigd */
if (is_value(reason)) {
reason_sz = size_object(reason);
hsz += reason_sz;
@@ -1032,6 +1039,11 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
ERTS_INTERNAL_ERROR("Invalid exit signal op");
break;
}
+ } else if (dist_ext != NULL && dist_ext_hfrag == NULL) {
+ /* The message was not fragmented so we need to create space
+ for a single dist_ext element */
+ dist_ext_sz = erts_dist_ext_size(dist_ext) / sizeof(Eterm);
+ hsz += dist_ext_sz;
}
/*
@@ -1087,13 +1099,12 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
hfrag->used_size = hp - start_hp;
- xsigd = (ErtsExitSignalData *) (char *) hp;
+ xsigd = (ErtsExitSignalData *) hp;
xsigd->message = s_message;
xsigd->from = s_from;
xsigd->reason = s_reason;
- xsigd->dist_ext = dist_ext;
- xsigd->heap_frag = NULL;
+ hfrag->next = dist_ext_hfrag;
if (is_nil(s_ref))
xsigd->u.normal_kills = normal_kills;
@@ -1102,6 +1113,15 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
xsigd->u.ref = s_ref;
}
+ hp += sizeof(ErtsExitSignalData)/sizeof(Eterm);
+
+ if (dist_ext != NULL && dist_ext_hfrag == NULL && is_non_value(reason)) {
+ erts_make_dist_ext_copy(dist_ext, (ErtsDistExternal *) hp);
+ hp += dist_ext_sz;
+ }
+
+ ASSERT(hp == mp->hfrag.mem + mp->hfrag.alloc_size);
+
if (seq_trace)
do_seq_trace_output(to, s_token, s_message);
@@ -1234,17 +1254,18 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to,
from_tag = dep->sysname;
}
send_gen_exit_signal(c_p, from_tag, from, to, ERTS_SIG_Q_OP_EXIT,
- reason, NULL, NIL, token, normal_kills);
+ reason, NULL, NULL, NIL, token, normal_kills);
}
void
erts_proc_sig_send_dist_exit(DistEntry *dep,
Eterm from, Eterm to,
ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
Eterm reason, Eterm token)
{
send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT,
- reason, dist_ext, NIL, token, 0);
+ reason, dist_ext, hfrag, NIL, token, 0);
}
@@ -1259,7 +1280,7 @@ erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk,
if (is_not_immed(reason) || is_not_nil(token)) {
ASSERT(is_internal_pid(from) || is_internal_port(from));
send_gen_exit_signal(c_p, from, from, to, ERTS_SIG_Q_OP_EXIT_LINKED,
- reason, NULL, NIL, token, 0);
+ reason, NULL, NULL, NIL, token, 0);
}
else {
/* Pass signal using old link structure... */
@@ -1315,10 +1336,11 @@ void
erts_proc_sig_send_dist_link_exit(DistEntry *dep,
Eterm from, Eterm to,
ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
Eterm reason, Eterm token)
{
send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT_LINKED,
- reason, dist_ext, NIL, token, 0);
+ reason, dist_ext, hfrag, NIL, token, 0);
}
@@ -1342,6 +1364,7 @@ void
erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
Eterm from, Eterm to,
ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
Eterm reason)
{
Eterm monitored, heap[3];
@@ -1351,7 +1374,7 @@ erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
monitored = from;
send_gen_exit_signal(NULL, dep->sysname, monitored,
to, ERTS_SIG_Q_OP_MONITOR_DOWN,
- reason, dist_ext, ref, NIL, 0);
+ reason, dist_ext, hfrag, ref, NIL, 0);
}
void
@@ -1422,7 +1445,7 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason)
}
send_gen_exit_signal(NULL, from_tag, monitored,
to, ERTS_SIG_Q_OP_MONITOR_DOWN,
- reason, NULL, mdp->ref, NIL, 0);
+ reason, NULL, NULL, mdp->ref, NIL, 0);
}
erts_monitor_release(mon);
}
@@ -2105,9 +2128,9 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
}
/* This GEN_EXIT was received from another node, decode the exit reason */
- if (ERTS_SIG_IS_EXTERNAL(sig))
+ if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig))
erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1);
-
+
reason = xsigd->reason;
if (is_non_value(reason)) {
@@ -2115,17 +2138,14 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
ignore = !0;
destroy = !0;
}
-
+
if (!ignore) {
if ((op != ERTS_SIG_Q_OP_EXIT || reason != am_kill)
&& (c_p->flags & F_TRAP_EXIT)) {
convert_prepared_sig_to_msg(c_p, sig,
xsigd->message, next_nm_sig);
- ASSERT(sig->hfrag.next == NULL);
- sig->hfrag.next = xsigd->heap_frag;
conv_msg = sig;
-
}
else if (reason == am_normal && !xsigd->u.normal_kills) {
/* Ignore it... */
@@ -2991,50 +3011,29 @@ erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
ErtsMessage *msgp, int force_off_heap)
{
ErtsHeapFactory factory;
+ ErlHeapFragment *hfrag;
Eterm msg;
- ErlHeapFragment *bp;
Sint need;
- int decode_in_heap_frag;
- ErtsDistExternal *dist_ext;
+ ErtsDistExternal *edep;
ErtsExitSignalData *xsigd = NULL;
- decode_in_heap_frag = (force_off_heap
- || !(proc_locks & ERTS_PROC_LOCK_MAIN)
- || (proc->flags & F_OFF_HEAP_MSGQ));
-
- if (ERTS_SIG_IS_EXTERNAL_MSG(msgp))
- dist_ext = msgp->data.dist_ext;
- else {
+ edep = erts_proc_sig_get_external(msgp);
+ if (!ERTS_SIG_IS_EXTERNAL_MSG(msgp))
xsigd = get_exit_signal_data(msgp);
- ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT);
- ASSERT(is_non_value(xsigd->reason));
- dist_ext = xsigd->dist_ext;
- }
- if (dist_ext->heap_size >= 0)
- need = dist_ext->heap_size;
+ if (edep->heap_size >= 0)
+ need = edep->heap_size;
else {
- need = erts_decode_dist_ext_size(dist_ext);
+ need = erts_decode_dist_ext_size(edep, 1);
if (need < 0) {
- /* bad msg; remove it... */
- if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
- bp = erts_dist_ext_trailer(dist_ext);
- erts_cleanup_offheap(&bp->off_heap);
- }
- erts_free_dist_ext_copy(dist_ext);
- dist_ext = NULL;
+ /* bad signal; remove it... */
return 0;
}
- dist_ext->heap_size = need;
+ edep->heap_size = need;
}
- if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
- bp = erts_dist_ext_trailer(dist_ext);
- need += bp->used_size;
- }
-
- if (xsigd) {
+ if (ERTS_SIG_IS_NON_MSG(msgp)) {
switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) {
case ERTS_SIG_Q_OP_EXIT:
case ERTS_SIG_Q_OP_EXIT_LINKED:
@@ -3051,26 +3050,22 @@ erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
}
}
- if (decode_in_heap_frag)
- erts_factory_heap_frag_init(&factory, new_message_buffer(need));
- else
- erts_factory_proc_prealloc_init(&factory, proc, need);
-
- ASSERT(dist_ext->heap_size >= 0);
- if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
- ErlHeapFragment *heap_frag;
- heap_frag = erts_dist_ext_trailer(dist_ext);
- ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp),
- heap_frag->used_size,
- &factory.hp,
- factory.off_heap);
- erts_cleanup_offheap(&heap_frag->off_heap);
+ hfrag = new_message_buffer(need);
+ erts_factory_heap_frag_init(&factory, hfrag);
+
+ ASSERT(edep->heap_size >= 0);
+
+ msg = erts_decode_dist_ext(&factory, edep, 1);
+
+ if (is_non_value(msg)) {
+ erts_factory_undo(&factory);
+ return 0;
}
- msg = erts_decode_dist_ext(&factory, dist_ext);
- if (!xsigd) {
+ if (ERTS_SIG_IS_MSG(msgp)) {
ERL_MESSAGE_TERM(msgp) = msg;
- msgp->data.attached = NULL;
+ if (msgp->data.heap_frag == &msgp->hfrag)
+ msgp->data.heap_frag = ERTS_MSG_COMBINED_HFRAG;
} else {
switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) {
case ERTS_SIG_Q_OP_EXIT:
@@ -3089,22 +3084,21 @@ erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
}
xsigd->reason = msg;
}
- erts_free_dist_ext_copy(dist_ext);
- if (is_non_value(msg)) {
- erts_factory_undo(&factory);
- return 0;
- }
+ erts_free_dist_ext_copy(edep);
erts_factory_close(&factory);
- ASSERT(!msgp->data.heap_frag || xsigd);
+ hfrag = factory.heap_frags;
+ while (hfrag->next)
+ hfrag = hfrag->next;
- if (decode_in_heap_frag) {
- if (!xsigd)
- msgp->data.heap_frag = factory.heap_frags;
- else
- xsigd->heap_frag = factory.heap_frags;
+ if (ERTS_SIG_IS_MSG(msgp) && msgp->data.heap_frag != ERTS_MSG_COMBINED_HFRAG) {
+ hfrag->next = msgp->data.heap_frag;
+ msgp->data.heap_frag = factory.heap_frags;
+ } else {
+ hfrag->next = msgp->hfrag.next;
+ msgp->hfrag.next = factory.heap_frags;
}
return 1;
@@ -3274,8 +3268,9 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
xsigd = get_exit_signal_data(sig);
/* This GEN_EXIT was received from another node, decode the exit reason */
- if (ERTS_SIG_IS_EXTERNAL(sig))
- erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1);
+ if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig))
+ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1))
+ break; /* Decode failed, just remove signal */
omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p),
xsigd->u.ref);
@@ -4375,11 +4370,13 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing,
return -1; /* Yield... */
}
if (ERTS_SIG_IS_EXTERNAL_MSG(sig)) {
- cnt++;
+ cnt += 50; /* Decode is expensive... */
if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN,
sig, 0)) {
/* Bad dist message; remove it... */
remove_mq_m_sig(c_p, sig, next_sig, next_nm_sig);
+ sig->next = NULL;
+ erts_cleanup_messages(sig);
sig = *next_sig;
continue;
}
@@ -4451,18 +4448,12 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) {
/* decode it... */
- if (mp->data.attached)
- erts_proc_sig_decode_dist(rp, rp_locks, mp, !0);
-
- msg = ERL_MESSAGE_TERM(mp);
-
- if (is_non_value(msg)) {
+ if (!erts_proc_sig_decode_dist(rp, rp_locks, mp, !0)) {
ErtsMessage *bad_mp = mp;
/*
* Bad distribution message; remove
* it from the queue...
*/
- ASSERT(!mp->data.attached);
ASSERT(*mpp == bad_mp);
@@ -4474,6 +4465,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
erts_cleanup_messages(bad_mp);
continue;
}
+
+ msg = ERL_MESSAGE_TERM(mp);
}
ASSERT(is_value(msg));
@@ -4632,12 +4625,21 @@ debug_foreach_sig_fake_oh(Eterm term,
}
+static void
+debug_foreach_sig_external(ErtsMessage *msgp,
+ void (*ext_func)(ErtsDistExternal *, void *),
+ void *arg)
+{
+ ext_func(erts_proc_sig_get_external(msgp), arg);
+}
+
void
erts_proc_sig_debug_foreach_sig(Process *c_p,
void (*msg_func)(ErtsMessage *, void *),
void (*oh_func)(ErlOffHeap *, void *),
ErtsMonitorFunc mon_func,
ErtsLinkFunc lnk_func,
+ void (*ext_func)(ErtsDistExternal *, void *),
void *arg)
{
ErtsMessage *queue[] = {c_p->sig_qs.first, c_p->sig_qs.cont, c_p->sig_inq.first};
@@ -4646,10 +4648,10 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
for (qix = 0; qix < sizeof(queue)/sizeof(queue[0]); qix++) {
ErtsMessage *sig;
for (sig = queue[qix]; sig; sig = sig->next) {
-
- if (ERTS_SIG_IS_MSG(sig))
+
+ if (ERTS_SIG_IS_MSG(sig)) {
msg_func(sig, arg);
- else {
+ } else {
Eterm tag;
Uint16 type;
int op;
@@ -4668,7 +4670,10 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
case ERTS_SIG_Q_OP_MONITOR_DOWN:
switch (type) {
case ERTS_SIG_Q_TYPE_GEN_EXIT:
- debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg);
+ if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig))
+ debug_foreach_sig_external(sig, ext_func, arg);
+ else
+ debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg);
break;
case ERTS_LNK_TYPE_PORT:
case ERTS_LNK_TYPE_PROC: