aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator
diff options
context:
space:
mode:
authorLukas Larsson <lukas@erlang.org>2018-09-19 16:37:55 +0200
committerLukas Larsson <lukas@erlang.org>2019-02-21 16:38:04 +0100
commit9a7df41a9e5361049a17c8aa971b52599fba2632 (patch)
tree9084706715fd064a1011753e2e72fe49e05875ba /erts/emulator
parent0184c2e0438ac42c44d14e92b13e2807e0a670b3 (diff)
downloadotp-9a7df41a9e5361049a17c8aa971b52599fba2632.tar.gz
otp-9a7df41a9e5361049a17c8aa971b52599fba2632.tar.bz2
otp-9a7df41a9e5361049a17c8aa971b52599fba2632.zip
erts: Move reason in dist messages to payload
The dist messages EXIT, EXIT2 and MONITOR_DOWN have been updated with new versions that send the reason term as part of the payload of the message instead of as part of the control message. This allows the decode of the reason to be done by the receiving process instead of the dist entry which in turn makes it possible for multiple decodes to be done in parallel. This change is done in order to make it easier to fragment the potentially large payload of EXIT, EXIT2 and MONITOR_DOWN into multiple distribution messages. OTP-15611
Diffstat (limited to 'erts/emulator')
-rw-r--r--erts/emulator/beam/dist.c284
-rw-r--r--erts/emulator/beam/dist.h52
-rw-r--r--erts/emulator/beam/erl_message.c124
-rw-r--r--erts/emulator/beam/erl_message.h2
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c312
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.h69
-rw-r--r--erts/emulator/beam/external.c22
-rw-r--r--erts/emulator/beam/external.h2
-rw-r--r--erts/emulator/beam/msg_instrs.tab4
-rw-r--r--erts/emulator/hipe/hipe_native_bif.c2
10 files changed, 593 insertions, 280 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 3b6c0e5db9..91a9481d84 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -50,7 +50,8 @@
#define DIST_CTL_DEFAULT_SIZE 64
/* Turn this on to get printouts of all distribution messages
- * which go on the line
+ * which go on the line. Enabling this may make some testcases
+ * fail. Especially the broken dist testcases in distribution_SUITE.
*/
#if 0
#define ERTS_DIST_MSG_DBG
@@ -71,8 +72,6 @@ static void
dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
{
ErtsHeapFactory factory;
- DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
- Eterm* ctl = ctl_default;
byte *extp = edep->extp;
Eterm msg;
Sint ctl_len;
@@ -116,6 +115,7 @@ static Export *dist_ctrl_put_data_trap;
/* forward declarations */
+static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, int force_busy);
static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
@@ -848,7 +848,7 @@ int
erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
Eterm ref, Eterm reason)
{
- Eterm ctl;
+ Eterm ctl, msg;
DeclareTmpHeapNoproc(ctl_heap,6);
int res;
@@ -861,10 +861,17 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
UseTmpHeapNoproc(6);
- ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT),
- watched, watcher, ref, reason);
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE4(&ctl_heap[0], make_small(DOP_PAYLOAD_MONITOR_P_EXIT),
+ watched, watcher, ref);
+ msg = reason;
+ } else {
+ ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT),
+ watched, watcher, ref, reason);
+ msg = THE_NON_VALUE;
+ }
- res = dsig_send_ctl(dsdp, ctl, 1);
+ res = dsig_send_exit(dsdp, ctl, msg, 1);
UnUseTmpHeapNoproc(6);
return res;
}
@@ -1018,6 +1025,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
ctx->dss.ctl = ctl;
ctx->dss.msg = message;
ctx->dss.force_busy = 0;
+ ctx->dss.force_encode = 0;
res = erts_dsig_send(&ctx->dsd, &ctx->dss);
return res;
}
@@ -1077,6 +1085,7 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message,
ctx->dss.ctl = ctl;
ctx->dss.msg = message;
ctx->dss.force_busy = 0;
+ ctx->dss.force_encode = 0;
res = erts_dsig_send(&ctx->dsd, &ctx->dss);
return res;
}
@@ -1086,7 +1095,7 @@ int
erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
Eterm reason, Eterm token)
{
- Eterm ctl;
+ Eterm ctl, msg = THE_NON_VALUE;
DeclareTmpHeapNoproc(ctl_heap,6);
int res;
#ifdef USE_VM_PROBES
@@ -1101,13 +1110,24 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
#endif
UseTmpHeapNoproc(6);
+
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD)
+ msg = reason;
+
if (have_seqtrace(token)) {
seq_trace_update_send(dsdp->proc);
seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local);
- ctl = TUPLE5(&ctl_heap[0],
- make_small(DOP_EXIT_TT), local, remote, token, reason);
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE4(&ctl_heap[0],
+ make_small(DOP_PAYLOAD_EXIT_TT), local, remote, token);
+ } else
+ ctl = TUPLE5(&ctl_heap[0],
+ make_small(DOP_EXIT_TT), local, remote, token, reason);
} else {
- ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD)
+ ctl = TUPLE3(&ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote);
+ else
+ ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
}
#ifdef USE_VM_PROBES
*node_name = *sender_name = *remote_name = '\0';
@@ -1129,8 +1149,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
#endif
DTRACE7(process_exit_signal_remote, sender_name, node_name,
remote_name, reason_str, tok_label, tok_lastcnt, tok_serial);
- /* forced, i.e ignore busy */
- res = dsig_send_ctl(dsdp, ctl, 1);
+ res = dsig_send_exit(dsdp, ctl, msg, 1);
UnUseTmpHeapNoproc(6);
return res;
}
@@ -1140,13 +1159,18 @@ erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
{
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
- Eterm ctl;
+ Eterm ctl, msg = dsdp->dep->flags & DFLAG_EXIT_PAYLOAD ? reason : THE_NON_VALUE;
UseTmpHeapNoproc(5);
- ctl = TUPLE4(&ctl_heap[0],
- make_small(DOP_EXIT), local, remote, reason);
- /* forced, i.e ignore busy */
- res = dsig_send_ctl(dsdp, ctl, 1);
+
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE3(&ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote);
+ msg = reason;
+ } else {
+ ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
+ msg = THE_NON_VALUE;
+ }
+ res = dsig_send_exit(dsdp, ctl, msg, 1);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -1156,13 +1180,20 @@ erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason
{
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
- Eterm ctl;
+ Eterm ctl, msg;
UseTmpHeapNoproc(5);
- ctl = TUPLE4(&ctl_heap[0],
- make_small(DOP_EXIT2), local, remote, reason);
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE3(&ctl_heap[0],
+ make_small(DOP_PAYLOAD_EXIT2), local, remote);
+ msg = reason;
+ } else {
+ ctl = TUPLE4(&ctl_heap[0],
+ make_small(DOP_EXIT2), local, remote, reason);
+ msg = THE_NON_VALUE;
+ }
- res = dsig_send_ctl(dsdp, ctl, 0);
+ res = dsig_send_exit(dsdp, ctl, msg, 0);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -1227,10 +1258,8 @@ int erts_net_message(Port *prt,
DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
Eterm* ctl = ctl_default;
ErtsHeapFactory factory;
- Eterm* hp;
Sint type;
Eterm token;
- Eterm token_size;
Uint tuple_arity;
int res;
#ifdef ERTS_DIST_MSG_DBG
@@ -1290,7 +1319,6 @@ int erts_net_message(Port *prt,
if (ctl_len > DIST_CTL_DEFAULT_SIZE) {
ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm));
}
- hp = ctl;
erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF);
arg = erts_decode_dist_ext(&factory, &ede);
@@ -1313,7 +1341,6 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- token_size = 0;
token = NIL;
switch (type = unsigned_val(tuple[1])) {
@@ -1508,7 +1535,6 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- token_size = size_object(tuple[5]);
/* Fall through ... */
case DOP_REG_SEND:
/* {DOP_REG_SEND, From, Cookie, ToName} -- Message */
@@ -1533,35 +1559,25 @@ int erts_net_message(Port *prt,
}
rp = erts_whereis_process(NULL, 0, to, 0, 0);
if (rp) {
- Uint xsize = (type == DOP_REG_SEND
- ? 0
- : ERTS_HEAP_FRAG_SIZE(token_size));
ErtsProcLocks locks = 0;
ErtsDistExternal *ede_copy;
- ede_copy = erts_make_dist_ext_copy(&ede, xsize);
if (type == DOP_REG_SEND) {
token = NIL;
} else {
- ErlHeapFragment *heap_frag;
- ErlOffHeap *ohp;
- ASSERT(xsize);
- heap_frag = erts_dist_ext_trailer(ede_copy);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
- hp = heap_frag->mem;
- ohp = &heap_frag->off_heap;
token = tuple[5];
- token = copy_struct(token, token_size, &hp, ohp);
}
- erts_queue_dist_message(rp, locks, ede_copy, token, from);
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
+
+ erts_queue_dist_message(rp, locks, ede_copy, token, from);
+
if (locks)
erts_proc_unlock(rp, locks);
}
break;
case DOP_SEND_SENDER_TT: {
- Uint xsize;
case DOP_SEND_TT:
if (tuple_arity != 4) {
@@ -1569,15 +1585,12 @@ int erts_net_message(Port *prt,
}
token = tuple[4];
- token_size = size_object(token);
- xsize = ERTS_HEAP_FRAG_SIZE(token_size);
goto send_common;
case DOP_SEND_SENDER:
case DOP_SEND:
token = NIL;
- xsize = 0;
if (tuple_arity != 3)
goto invalid_message;
@@ -1604,17 +1617,7 @@ int erts_net_message(Port *prt,
ErtsProcLocks locks = 0;
ErtsDistExternal *ede_copy;
- ede_copy = erts_make_dist_ext_copy(&ede, xsize);
- if (is_not_nil(token)) {
- ErlHeapFragment *heap_frag;
- ErlOffHeap *ohp;
- ASSERT(xsize);
- heap_frag = erts_dist_ext_trailer(ede_copy);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
- hp = heap_frag->mem;
- ohp = &heap_frag->off_heap;
- token = copy_struct(token, token_size, &hp, ohp);
- }
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty);
if (locks)
@@ -1623,18 +1626,28 @@ int erts_net_message(Port *prt,
break;
}
+ case DOP_PAYLOAD_MONITOR_P_EXIT:
case DOP_MONITOR_P_EXIT: {
+ ErtsDistExternal *ede_copy = NULL;
+
/* We are monitoring a process on the remote node which dies, we get
{DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */
-
- if (tuple_arity != 5) {
- goto invalid_message;
- }
- watched = tuple[2]; /* remote proc or name which died */
- watcher = tuple[3];
+ watched = tuple[2]; /* remote proc or name which died */
+ watcher = tuple[3];
ref = tuple[4];
- reason = tuple[5];
+
+ if (type == DOP_PAYLOAD_MONITOR_P_EXIT) {
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
+ reason = THE_NON_VALUE;
+ } else {
+ if (tuple_arity != 5) {
+ goto invalid_message;
+ }
+ reason = tuple[5];
+ }
if (is_not_ref(ref))
goto invalid_message;
@@ -1650,70 +1663,134 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
+ if (reason == THE_NON_VALUE) {
+
+#ifdef ERTS_DIST_MSG_DBG
+ dist_msg_dbg(&ede, "MSG", buf, orig_len);
+#endif
+
+ if (!erts_proc_lookup(watcher)) break; /* Process not alive */
+
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
+ }
+
erts_proc_sig_send_dist_monitor_down(dep, ref, watched,
- watcher, reason);
+ watcher, ede_copy, reason);
break;
}
+ case DOP_PAYLOAD_EXIT:
+ case DOP_PAYLOAD_EXIT_TT:
case DOP_EXIT_TT:
case DOP_EXIT: {
+ ErtsDistExternal *ede_copy = NULL;
+
/* 'from', which 'to' is linked to, died */
+ from = tuple[2];
+ to = tuple[3];
+
if (type == DOP_EXIT) {
if (tuple_arity != 4) {
goto invalid_message;
}
-
- from = tuple[2];
- to = tuple[3];
- reason = tuple[4];
token = NIL;
- } else {
+ reason = tuple[4];
+ } else if (type == DOP_EXIT_TT){
if (tuple_arity != 5) {
goto invalid_message;
}
- from = tuple[2];
- to = tuple[3];
token = tuple[4];
reason = tuple[5];
- }
+ } else if (type == DOP_PAYLOAD_EXIT) {
+ if (tuple_arity != 3) {
+ goto invalid_message;
+ }
+ token = NIL;
+ reason = THE_NON_VALUE;
+ } else {
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
+ token = tuple[4];
+ reason = THE_NON_VALUE;
+ }
if (is_not_external_pid(from)
|| dep != external_pid_dist_entry(from)
|| is_not_internal_pid(to)) {
goto invalid_message;
}
+ if (reason == THE_NON_VALUE) {
+
+#ifdef ERTS_DIST_MSG_DBG
+ dist_msg_dbg(&ede, "MSG", buf, orig_len);
+#endif
+
+ if (!erts_proc_lookup(to)) break; /* Process not alive */
+
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
+ }
+
erts_proc_sig_send_dist_link_exit(dep,
- from, to,
+ from, to, ede_copy,
reason, token);
break;
}
+ case DOP_PAYLOAD_EXIT2_TT:
+ case DOP_PAYLOAD_EXIT2:
case DOP_EXIT2_TT:
- case DOP_EXIT2:
+ case DOP_EXIT2: {
+ ErtsDistExternal *ede_copy = NULL;
+
/* 'from' is send an exit signal to 'to' */
+ from = tuple[2];
+ to = tuple[3];
+
if (type == DOP_EXIT2) {
if (tuple_arity != 4) {
goto invalid_message;
}
- from = tuple[2];
- to = tuple[3];
reason = tuple[4];
token = NIL;
- } else {
+ } else if (type == DOP_EXIT2_TT) {
if (tuple_arity != 5) {
goto invalid_message;
}
- from = tuple[2];
- to = tuple[3];
token = tuple[4];
reason = tuple[5];
- }
- if (is_not_pid(from) || is_not_internal_pid(to)) {
+ } else if (type == DOP_PAYLOAD_EXIT2) {
+ if (tuple_arity != 3) {
+ goto invalid_message;
+ }
+ reason = THE_NON_VALUE;
+ token = NIL;
+ } else {
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
+ reason = THE_NON_VALUE;
+ token = tuple[4];
+ }
+ if (is_not_pid(from)
+ || dep != external_pid_dist_entry(from)
+ || is_not_internal_pid(to)) {
goto invalid_message;
}
- erts_proc_sig_send_exit(NULL, from, to, reason, token, 0);
- break;
+ if (reason == THE_NON_VALUE) {
+
+#ifdef ERTS_DIST_MSG_DBG
+ dist_msg_dbg(&ede, "MSG", buf, orig_len);
+#endif
+
+ if (!erts_proc_lookup(to)) break; /* Process not alive */
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
+ }
+
+ erts_proc_sig_send_dist_exit(dep, from, to, ede_copy, reason, token);
+ break;
+ }
case DOP_GROUP_LEADER:
if (tuple_arity != 3) {
goto invalid_message;
@@ -1757,6 +1834,23 @@ data_error:
return -1;
}
+static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, int force_busy)
+{
+ struct erts_dsig_send_context ctx;
+ int ret;
+ ctx.ctl = ctl;
+ ctx.msg = msg;
+ ctx.force_busy = force_busy;
+ ctx.force_encode = 1;
+ ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
+#ifdef DEBUG
+ ctx.reds = 1; /* provoke assert below (no reduction count with force_encode) */
+#endif
+ ret = erts_dsig_send(dsdp, &ctx);
+ ASSERT(ret != ERTS_DSIG_SEND_CONTINUE);
+ return ret;
+}
+
static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
{
struct erts_dsig_send_context ctx;
@@ -1764,6 +1858,7 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
ctx.ctl = ctl;
ctx.msg = THE_NON_VALUE;
ctx.force_busy = force_busy;
+ ctx.force_encode = 1;
ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
#ifdef DEBUG
ctx.reds = 1; /* provoke assert below (no reduction count without msg) */
@@ -1850,10 +1945,14 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE;
case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
- if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
- retval = ERTS_DSIG_SEND_CONTINUE;
- goto done;
- }
+ if (!ctx->force_encode) {
+ if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ goto done;
+ }
+ } else {
+ erts_encode_dist_ext_size(ctx->msg, ctx->flags, ctx->acmp, &ctx->data_size);
+ }
ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
case ERTS_DSIG_SEND_PHASE_ALLOC:
@@ -1882,10 +1981,15 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE;
case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
- if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) {
- retval = ERTS_DSIG_SEND_CONTINUE;
- goto done;
- }
+ if (!ctx->force_encode) {
+ if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags,
+ ctx->acmp, &ctx->u.ec, &ctx->reds)) {
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ goto done;
+ }
+ } else {
+ erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL);
+ }
ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
case ERTS_DSIG_SEND_PHASE_FIN: {
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index a8ccd1eaec..81a09bc69c 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -47,6 +47,7 @@
#define DFLAG_SEND_SENDER 0x80000
#define DFLAG_BIG_SEQTRACE_LABELS 0x100000
#define DFLAG_NO_MAGIC 0x200000 /* internal for pending connection */
+#define DFLAG_EXIT_PAYLOAD 0x400000
/* Mandatory flags for distribution */
#define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \
@@ -75,7 +76,8 @@
| DFLAG_MAP_TAG \
| DFLAG_BIG_CREATION \
| DFLAG_SEND_SENDER \
- | DFLAG_BIG_SEQTRACE_LABELS)
+ | DFLAG_BIG_SEQTRACE_LABELS \
+ | DFLAG_EXIT_PAYLOAD)
/* Flags addable by local distr implementations */
#define DFLAG_DIST_ADDABLE DFLAG_DIST_DEFAULT
@@ -99,26 +101,35 @@
| DFLAG_BIG_CREATION)
/* opcodes used in distribution messages */
-#define DOP_LINK 1
-#define DOP_SEND 2
-#define DOP_EXIT 3
-#define DOP_UNLINK 4
+enum {
+ DOP_LINK = 1,
+ DOP_SEND = 2,
+ DOP_EXIT = 3,
+ DOP_UNLINK = 4,
/* Ancient DOP_NODE_LINK (5) was here, can be reused */
-#define DOP_REG_SEND 6
-#define DOP_GROUP_LEADER 7
-#define DOP_EXIT2 8
-
-#define DOP_SEND_TT 12
-#define DOP_EXIT_TT 13
-#define DOP_REG_SEND_TT 16
-#define DOP_EXIT2_TT 18
-
-#define DOP_MONITOR_P 19
-#define DOP_DEMONITOR_P 20
-#define DOP_MONITOR_P_EXIT 21
-
-#define DOP_SEND_SENDER 22
-#define DOP_SEND_SENDER_TT 23
+ DOP_REG_SEND = 6,
+ DOP_GROUP_LEADER = 7,
+ DOP_EXIT2 = 8,
+
+ DOP_SEND_TT = 12,
+ DOP_EXIT_TT = 13,
+ DOP_REG_SEND_TT = 16,
+ DOP_EXIT2_TT = 18,
+
+ DOP_MONITOR_P = 19,
+ DOP_DEMONITOR_P = 20,
+ DOP_MONITOR_P_EXIT = 21,
+
+ DOP_SEND_SENDER = 22,
+ DOP_SEND_SENDER_TT = 23,
+
+ /* These are used when DFLAG_EXIT_PAYLOAD is detected */
+ DOP_PAYLOAD_EXIT = 24,
+ DOP_PAYLOAD_EXIT_TT = 25,
+ DOP_PAYLOAD_EXIT2 = 26,
+ DOP_PAYLOAD_EXIT2_TT = 27,
+ DOP_PAYLOAD_MONITOR_P_EXIT = 28
+};
/* distribution trap functions */
extern Export* dmonitor_node_trap;
@@ -346,6 +357,7 @@ struct erts_dsig_send_context {
Eterm ctl;
Eterm msg;
int force_busy;
+ int force_encode;
Uint32 max_finalize_prepend;
Uint data_size, dhdr_ext_size;
ErtsAtomCacheMap *acmp;
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 942bec84cf..cd7bce8bf4 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -201,34 +201,40 @@ free_message_buffer(ErlHeapFragment* bp)
}while (bp != NULL);
}
+static void
+erts_cleanup_message(ErtsMessage *mp)
+{
+ ErlHeapFragment *bp;
+ if (ERTS_SIG_IS_EXTERNAL_MSG(mp) || ERTS_SIG_IS_NON_MSG(mp)) {
+ ErtsDistExternal *edep = erts_proc_sig_get_external(mp);
+ if (edep) {
+ erts_free_dist_ext_copy(edep);
+ if (mp->data.heap_frag == &mp->hfrag) {
+ ASSERT(ERTS_SIG_IS_EXTERNAL_MSG(mp));
+ mp->data.heap_frag = ERTS_MSG_COMBINED_HFRAG;
+ }
+ }
+ }
+
+ if (ERTS_SIG_IS_MSG(mp) && mp->data.attached != ERTS_MSG_COMBINED_HFRAG) {
+ bp = mp->data.heap_frag;
+ } else {
+ mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
+ bp = mp->hfrag.next;
+ erts_cleanup_offheap(&mp->hfrag.off_heap);
+ }
+
+ if (bp)
+ free_message_buffer(bp);
+}
+
void
erts_cleanup_messages(ErtsMessage *msgp)
{
ErtsMessage *mp = msgp;
while (mp) {
ErtsMessage *fmp;
- ErlHeapFragment *bp;
- if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) {
- if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) {
- bp = (ErlHeapFragment *) mp->data.dist_ext->ext_endp;
- erts_cleanup_offheap(&bp->off_heap);
- }
- if (mp->data.dist_ext)
- erts_free_dist_ext_copy(mp->data.dist_ext);
- }
- else {
- if (ERTS_SIG_IS_INTERNAL_MSG(mp)
- && mp->data.attached != ERTS_MSG_COMBINED_HFRAG) {
- bp = mp->data.heap_frag;
- }
- else {
- mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
- bp = mp->hfrag.next;
- erts_cleanup_offheap(&mp->hfrag.off_heap);
- }
- if (bp)
- free_message_buffer(bp);
- }
+ erts_cleanup_message(mp);
fmp = mp;
mp = mp->next;
erts_free_message(fmp);
@@ -1099,80 +1105,6 @@ change_to_off_heap:
return res;
}
-int
-erts_decode_dist_message(Process *proc, ErtsProcLocks proc_locks,
- ErtsMessage *msgp, int force_off_heap)
-{
- ErtsHeapFactory factory;
- Eterm msg;
- ErlHeapFragment *bp;
- Sint need;
- int decode_in_heap_frag;
-
- decode_in_heap_frag = (force_off_heap
- || !(proc_locks & ERTS_PROC_LOCK_MAIN)
- || (proc->flags & F_OFF_HEAP_MSGQ));
-
- if (msgp->data.dist_ext->heap_size >= 0)
- need = msgp->data.dist_ext->heap_size;
- else {
- need = erts_decode_dist_ext_size(msgp->data.dist_ext);
- if (need < 0) {
- /* bad msg; remove it... */
- if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
- bp = erts_dist_ext_trailer(msgp->data.dist_ext);
- erts_cleanup_offheap(&bp->off_heap);
- }
- erts_free_dist_ext_copy(msgp->data.dist_ext);
- msgp->data.dist_ext = NULL;
- return 0;
- }
-
- msgp->data.dist_ext->heap_size = need;
- }
-
- if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
- bp = erts_dist_ext_trailer(msgp->data.dist_ext);
- need += bp->used_size;
- }
-
- if (decode_in_heap_frag)
- erts_factory_heap_frag_init(&factory, new_message_buffer(need));
- else
- erts_factory_proc_prealloc_init(&factory, proc, need);
-
- ASSERT(msgp->data.dist_ext->heap_size >= 0);
- if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
- ErlHeapFragment *heap_frag;
- heap_frag = erts_dist_ext_trailer(msgp->data.dist_ext);
- ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp),
- heap_frag->used_size,
- &factory.hp,
- factory.off_heap);
- erts_cleanup_offheap(&heap_frag->off_heap);
- }
-
- msg = erts_decode_dist_ext(&factory, msgp->data.dist_ext);
- ERL_MESSAGE_TERM(msgp) = msg;
- erts_free_dist_ext_copy(msgp->data.dist_ext);
- msgp->data.attached = NULL;
-
- if (is_non_value(msg)) {
- erts_factory_undo(&factory);
- return 0;
- }
-
- erts_factory_trim_and_close(&factory, msgp->m,
- ERL_MESSAGE_REF_ARRAY_SZ);
-
- ASSERT(!msgp->data.heap_frag);
-
- if (decode_in_heap_frag)
- msgp->data.heap_frag = factory.heap_frags;
-
- return 1;
-}
-
void erts_factory_proc_init(ErtsHeapFactory* factory,
Process* p)
{
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h
index b2550814fd..58294648b4 100644
--- a/erts/emulator/beam/erl_message.h
+++ b/erts/emulator/beam/erl_message.h
@@ -455,8 +455,6 @@ Sint erts_move_messages_off_heap(Process *c_p);
Sint erts_complete_off_heap_message_queue_change(Process *c_p);
Eterm erts_change_message_queue_management(Process *c_p, Eterm new_state);
-int erts_decode_dist_message(Process *, ErtsProcLocks, ErtsMessage *, int);
-
void erts_cleanup_messages(ErtsMessage *mp);
void *erts_alloc_message_ref(void);
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 668ee4ce96..1bd219b6d7 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -80,6 +80,8 @@
#define ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO \
ERTS_SIG_Q_TYPE_MAX
+#define ERTS_SIG_IS_EXTERNAL(sig) is_non_value(get_exit_signal_data(sig)->reason)
+
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler);
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_high);
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_max);
@@ -112,6 +114,8 @@ typedef struct {
Eterm message;
Eterm from;
Eterm reason;
+ struct erl_dist_external *dist_ext;
+ ErlHeapFragment *heap_frag;
union {
Eterm ref;
int normal_kills;
@@ -936,13 +940,32 @@ erts_proc_sig_privqs_len(Process *c_p)
return proc_sig_privqs_len(c_p, 0);
}
+ErtsDistExternal *
+erts_proc_sig_get_external(ErtsMessage *msgp)
+{
+ if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) {
+ return erts_get_dist_ext(msgp->data.heap_frag);
+ } else if (ERTS_SIG_IS_NON_MSG(msgp) && ERTS_SIG_IS_EXTERNAL(msgp)) {
+ ErtsDistExternal *edep;
+ ErtsExitSignalData *xsigd = get_exit_signal_data(msgp);
+ ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT);
+ ASSERT(is_non_value(xsigd->reason));
+ if (msgp->hfrag.next == NULL)
+ edep = (ErtsDistExternal*)((void*)xsigd) + sizeof(ErtsExitSignalData);
+ else
+ edep = erts_get_dist_ext(msgp->hfrag.next);
+ return edep;
+ }
+ return NULL;
+}
+
static void do_seq_trace_output(Eterm to, Eterm token, Eterm msg);
static void
send_gen_exit_signal(Process *c_p, Eterm from_tag,
Eterm from, Eterm to,
- Sint16 op, Eterm reason, Eterm ref,
- Eterm token, int normal_kills)
+ Sint16 op, Eterm reason, ErtsDistExternal *dist_ext,
+ Eterm ref, Eterm token, int normal_kills)
{
ErtsExitSignalData *xsigd;
Eterm *hp, *start_hp, s_reason, s_ref, s_message, s_token, s_from;
@@ -956,6 +979,9 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
Uint utag_sz;
#endif
+ ASSERT((is_value(reason) && dist_ext == NULL) ||
+ (is_non_value(reason) && dist_ext != NULL));
+
ASSERT(is_immed(from_tag));
hsz = sizeof(ErtsExitSignalData)/sizeof(Uint);
@@ -977,33 +1003,35 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
hsz += utag_sz;
#endif
- token_sz = is_immed(token) ? 0 : size_object(token);
+ token_sz = size_object(token);
hsz += token_sz;
- from_sz = is_immed(from) ? 0 : size_object(from);
+ from_sz = size_object(from);
hsz += from_sz;
- reason_sz = is_immed(reason) ? 0 : size_object(reason);
- hsz += reason_sz;
+ ref_sz = size_object(ref);
+ hsz += ref_sz;
- switch (op) {
- case ERTS_SIG_Q_OP_EXIT:
- case ERTS_SIG_Q_OP_EXIT_LINKED: {
- /* {'EXIT', From, Reason} */
- hsz += 4; /* 3-tuple */
- ref_sz = 0;
- break;
- }
- case ERTS_SIG_Q_OP_MONITOR_DOWN: {
- /* {'DOWN', Ref, process, From, Reason} */
- hsz += 6; /* 5-tuple */
- ref_sz = NC_HEAP_SIZE(ref);
- hsz += ref_sz;
- break;
- }
- default:
- ERTS_INTERNAL_ERROR("Invalid exit signal op");
- break;
+ if (is_value(reason)) {
+ reason_sz = size_object(reason);
+ hsz += reason_sz;
+
+ switch (op) {
+ case ERTS_SIG_Q_OP_EXIT:
+ case ERTS_SIG_Q_OP_EXIT_LINKED: {
+ /* {'EXIT', From, Reason} */
+ hsz += 4; /* 3-tuple */
+ break;
+ }
+ case ERTS_SIG_Q_OP_MONITOR_DOWN: {
+ /* {'DOWN', Ref, process, From, Reason} */
+ hsz += 6; /* 5-tuple */
+ break;
+ }
+ default:
+ ERTS_INTERNAL_ERROR("Invalid exit signal op");
+ break;
+ }
}
/*
@@ -1015,35 +1043,33 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
ohp = &hfrag->off_heap;
start_hp = hp;
- s_token = (is_immed(token)
- ? token
- : copy_struct(token, token_sz, &hp, ohp));
-
- s_reason = (is_immed(reason)
- ? reason
- : copy_struct(reason, reason_sz, &hp, ohp));
+ s_token = copy_struct(token, token_sz, &hp, ohp);
+ s_from = copy_struct(from, from_sz, &hp, ohp);
+ s_ref = copy_struct(ref, ref_sz, &hp, ohp);
- s_from = (is_immed(from)
- ? from
- : copy_struct(from, from_sz, &hp, ohp));
+ if (is_value(reason)) {
+ s_reason = copy_struct(reason, reason_sz, &hp, ohp);
- if (!ref_sz)
- s_ref = NIL;
- else
- s_ref = STORE_NC(&hp, ohp, ref);
-
- switch (op) {
- case ERTS_SIG_Q_OP_EXIT:
- case ERTS_SIG_Q_OP_EXIT_LINKED:
- /* {'EXIT', From, Reason} */
- s_message = TUPLE3(hp, am_EXIT, s_from, s_reason);
- hp += 4;
- break;
- case ERTS_SIG_Q_OP_MONITOR_DOWN:
- /* {'DOWN', Ref, process, From, Reason} */
- s_message = TUPLE5(hp, am_DOWN, s_ref, am_process, s_from, s_reason);
- hp += 6;
- break;
+ switch (op) {
+ case ERTS_SIG_Q_OP_EXIT:
+ case ERTS_SIG_Q_OP_EXIT_LINKED:
+ /* {'EXIT', From, Reason} */
+ s_message = TUPLE3(hp, am_EXIT, s_from, s_reason);
+ hp += 4;
+ break;
+ case ERTS_SIG_Q_OP_MONITOR_DOWN:
+ /* {'DOWN', Ref, process, From, Reason} */
+ s_message = TUPLE5(hp, am_DOWN, s_ref, am_process, s_from, s_reason);
+ hp += 6;
+ break;
+ default:
+ /* This cannot happen, used to silence gcc warning */
+ s_message = THE_NON_VALUE;
+ break;
+ }
+ } else {
+ s_message = THE_NON_VALUE;
+ s_reason = THE_NON_VALUE;
}
#ifdef USE_VM_PROBES
@@ -1066,6 +1092,9 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
xsigd->message = s_message;
xsigd->from = s_from;
xsigd->reason = s_reason;
+ xsigd->dist_ext = dist_ext;
+ xsigd->heap_frag = NULL;
+
if (is_nil(s_ref))
xsigd->u.normal_kills = normal_kills;
else {
@@ -1205,7 +1234,18 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to,
from_tag = dep->sysname;
}
send_gen_exit_signal(c_p, from_tag, from, to, ERTS_SIG_Q_OP_EXIT,
- reason, NIL, token, normal_kills);
+ reason, NULL, NIL, token, normal_kills);
+}
+
+void
+erts_proc_sig_send_dist_exit(DistEntry *dep,
+ Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
+ Eterm reason, Eterm token)
+{
+ send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT,
+ reason, dist_ext, NIL, token, 0);
+
}
void
@@ -1219,7 +1259,7 @@ erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk,
if (is_not_immed(reason) || is_not_nil(token)) {
ASSERT(is_internal_pid(from) || is_internal_port(from));
send_gen_exit_signal(c_p, from, from, to, ERTS_SIG_Q_OP_EXIT_LINKED,
- reason, NIL, token, 0);
+ reason, NULL, NIL, token, 0);
}
else {
/* Pass signal using old link structure... */
@@ -1274,10 +1314,12 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk)
void
erts_proc_sig_send_dist_link_exit(DistEntry *dep,
Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
Eterm reason, Eterm token)
{
send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT_LINKED,
- reason, NIL, token, 0);
+ reason, dist_ext, NIL, token, 0);
+
}
void
@@ -1299,16 +1341,17 @@ erts_proc_sig_send_dist_unlink(DistEntry *dep, Eterm from, Eterm to)
void
erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
Eterm reason)
{
Eterm monitored, heap[3];
- if (is_atom(from))
+ if (is_atom(from))
monitored = TUPLE2(&heap[0], from, dep->sysname);
else
monitored = from;
send_gen_exit_signal(NULL, dep->sysname, monitored,
to, ERTS_SIG_Q_OP_MONITOR_DOWN,
- reason, ref, NIL, 0);
+ reason, dist_ext, ref, NIL, 0);
}
void
@@ -1376,10 +1419,10 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason)
|| is_internal_pid(from_tag)
|| is_atom(from_tag));
monitored = TUPLE2(&heap[0], name, node);
- }
+ }
send_gen_exit_signal(NULL, from_tag, monitored,
to, ERTS_SIG_Q_OP_MONITOR_DOWN,
- reason, mdp->ref, NIL, 0);
+ reason, NULL, mdp->ref, NIL, 0);
}
erts_monitor_release(mon);
}
@@ -2037,7 +2080,6 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
if (type == ERTS_SIG_Q_TYPE_GEN_EXIT) {
xsigd = get_exit_signal_data(sig);
from = xsigd->from;
- reason = xsigd->reason;
if (op != ERTS_SIG_Q_OP_EXIT_LINKED)
ignore = 0;
else {
@@ -2062,13 +2104,28 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
}
}
+ /* This GEN_EXIT was received from another node, decode the exit reason */
+ if (ERTS_SIG_IS_EXTERNAL(sig))
+ erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1);
+
+ reason = xsigd->reason;
+
+ if (is_non_value(reason)) {
+ /* Bad distribution message; remove it from queue... */
+ ignore = !0;
+ destroy = !0;
+ }
+
if (!ignore) {
if ((op != ERTS_SIG_Q_OP_EXIT || reason != am_kill)
&& (c_p->flags & F_TRAP_EXIT)) {
convert_prepared_sig_to_msg(c_p, sig,
xsigd->message, next_nm_sig);
+ ASSERT(sig->hfrag.next == NULL);
+ sig->hfrag.next = xsigd->heap_frag;
conv_msg = sig;
+
}
else if (reason == am_normal && !xsigd->u.normal_kills) {
/* Ignore it... */
@@ -2929,6 +2986,130 @@ handle_sync_suspend(Process *c_p, ErtsMessage *mp)
}
}
+int
+erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
+ ErtsMessage *msgp, int force_off_heap)
+{
+ ErtsHeapFactory factory;
+ Eterm msg;
+ ErlHeapFragment *bp;
+ Sint need;
+ int decode_in_heap_frag;
+ ErtsDistExternal *dist_ext;
+ ErtsExitSignalData *xsigd = NULL;
+
+ decode_in_heap_frag = (force_off_heap
+ || !(proc_locks & ERTS_PROC_LOCK_MAIN)
+ || (proc->flags & F_OFF_HEAP_MSGQ));
+
+ if (ERTS_SIG_IS_EXTERNAL_MSG(msgp))
+ dist_ext = msgp->data.dist_ext;
+ else {
+ xsigd = get_exit_signal_data(msgp);
+ ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT);
+ ASSERT(is_non_value(xsigd->reason));
+ dist_ext = xsigd->dist_ext;
+ }
+
+ if (dist_ext->heap_size >= 0)
+ need = dist_ext->heap_size;
+ else {
+ need = erts_decode_dist_ext_size(dist_ext);
+ if (need < 0) {
+ /* bad msg; remove it... */
+ if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
+ bp = erts_dist_ext_trailer(dist_ext);
+ erts_cleanup_offheap(&bp->off_heap);
+ }
+ erts_free_dist_ext_copy(dist_ext);
+ dist_ext = NULL;
+ return 0;
+ }
+
+ dist_ext->heap_size = need;
+ }
+
+ if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
+ bp = erts_dist_ext_trailer(dist_ext);
+ need += bp->used_size;
+ }
+
+ if (xsigd) {
+ switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) {
+ case ERTS_SIG_Q_OP_EXIT:
+ case ERTS_SIG_Q_OP_EXIT_LINKED:
+ /* {'EXIT', From, Reason} */
+ need += 4;
+ break;
+ case ERTS_SIG_Q_OP_MONITOR_DOWN:
+ /* {'DOWN', Ref, process, From, Reason} */
+ need += 6; /* 5-tuple */
+ break;
+ default:
+ ERTS_INTERNAL_ERROR("Invalid exit signal op");
+ break;
+ }
+ }
+
+ if (decode_in_heap_frag)
+ erts_factory_heap_frag_init(&factory, new_message_buffer(need));
+ else
+ erts_factory_proc_prealloc_init(&factory, proc, need);
+
+ ASSERT(dist_ext->heap_size >= 0);
+ if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
+ ErlHeapFragment *heap_frag;
+ heap_frag = erts_dist_ext_trailer(dist_ext);
+ ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp),
+ heap_frag->used_size,
+ &factory.hp,
+ factory.off_heap);
+ erts_cleanup_offheap(&heap_frag->off_heap);
+ }
+
+ msg = erts_decode_dist_ext(&factory, dist_ext);
+ if (!xsigd) {
+ ERL_MESSAGE_TERM(msgp) = msg;
+ msgp->data.attached = NULL;
+ } else {
+ switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) {
+ case ERTS_SIG_Q_OP_EXIT:
+ case ERTS_SIG_Q_OP_EXIT_LINKED:
+ /* {'EXIT', From, Reason} */
+ erts_reserve_heap(&factory, 4);
+ xsigd->message = TUPLE3(factory.hp, am_EXIT, xsigd->from, msg);
+ factory.hp += 4;
+ break;
+ case ERTS_SIG_Q_OP_MONITOR_DOWN:
+ /* {'DOWN', Ref, process, From, Reason} */
+ erts_reserve_heap(&factory, 6);
+ xsigd->message = TUPLE5(factory.hp, am_DOWN, xsigd->u.ref, am_process, xsigd->from, msg);
+ factory.hp += 6;
+ break;
+ }
+ xsigd->reason = msg;
+ }
+ erts_free_dist_ext_copy(dist_ext);
+
+ if (is_non_value(msg)) {
+ erts_factory_undo(&factory);
+ return 0;
+ }
+
+ erts_factory_close(&factory);
+
+ ASSERT(!msgp->data.heap_frag || xsigd);
+
+ if (decode_in_heap_frag) {
+ if (!xsigd)
+ msgp->data.heap_frag = factory.heap_frags;
+ else
+ xsigd->heap_frag = factory.heap_frags;
+ }
+
+ return 1;
+}
+
void
erts_proc_sig_handle_pending_suspend(Process *c_p)
{
@@ -3045,7 +3226,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ASSERT(ERTS_SIG_IS_NON_MSG(sig));
tag = ((ErtsSignal *) sig)->common.tag;
-
+
switch (ERTS_PROC_SIG_OP(tag)) {
case ERTS_SIG_Q_OP_EXIT:
@@ -3091,6 +3272,11 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
break;
case ERTS_SIG_Q_TYPE_GEN_EXIT:
xsigd = get_exit_signal_data(sig);
+
+ /* This GEN_EXIT was received from another node, decode the exit reason */
+ if (ERTS_SIG_IS_EXTERNAL(sig))
+ erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1);
+
omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p),
xsigd->u.ref);
if (omon) {
@@ -4190,8 +4376,8 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing,
}
if (ERTS_SIG_IS_EXTERNAL_MSG(sig)) {
cnt++;
- if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN,
- sig, 0)) {
+ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN,
+ sig, 0)) {
/* Bad dist message; remove it... */
remove_mq_m_sig(c_p, sig, next_sig, next_nm_sig);
sig = *next_sig;
@@ -4266,7 +4452,7 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) {
/* decode it... */
if (mp->data.attached)
- erts_decode_dist_message(rp, rp_locks, mp, !0);
+ erts_proc_sig_decode_dist(rp, rp_locks, mp, !0);
msg = ERL_MESSAGE_TERM(mp);
diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h
index 5a02708bb7..39cee6f230 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.h
+++ b/erts/emulator/beam/erl_proc_sig_queue.h
@@ -89,6 +89,7 @@
#endif
struct erl_mesg;
+struct erl_dist_external;
typedef struct {
struct erl_mesg *next;
@@ -212,6 +213,34 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to,
/**
*
+ * @brief Send an exit signal to a process.
+ *
+ * This function is used instead of erts_proc_sig_send_link_exit()
+ * when the signal arrives via the distribution and
+ * therefore no link structure is available.
+ *
+ * @param[in] dep Distribution entry of channel
+ * that the signal arrived on.
+ *
+ * @param[in] from Identifier of sender.
+ *
+ * @param[in] to Identifier of receiver.
+ *
+ * @param[in] dist_ext The exit reason in external term format
+ *
+ * @param[in] reason Exit reason.
+ *
+ * @param[in] token Seq trace token.
+ *
+ */
+void
+erts_proc_sig_send_dist_exit(DistEntry *dep,
+ Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
+ Eterm reason, Eterm token);
+
+/**
+ *
* @brief Send an exit signal due to broken link to a process.
*
*
@@ -282,7 +311,7 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk);
*
* This function is used instead of erts_proc_sig_send_link_exit()
* when the signal arrives via the distribution and
- * no link structure is available.
+ * therefore no link structure is available.
*
* @param[in] dep Distribution entry of channel
* that the signal arrived on.
@@ -291,6 +320,8 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk);
*
* @param[in] to Identifier of receiver.
*
+ * @param[in] dist_ext The exit reason in external term format
+ *
* @param[in] reason Exit reason.
*
* @param[in] token Seq trace token.
@@ -299,6 +330,7 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk);
void
erts_proc_sig_send_dist_link_exit(struct dist_entry_ *dep,
Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
Eterm reason, Eterm token);
/**
@@ -307,7 +339,7 @@ erts_proc_sig_send_dist_link_exit(struct dist_entry_ *dep,
*
* This function is used instead of erts_proc_sig_send_unlink()
* when the signal arrives via the distribution and
- * no link structure is available.
+ * therefore no link structure is available.
*
* @param[in] dep Distribution entry of channel
* that the signal arrived on.
@@ -380,7 +412,7 @@ erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to);
*
* This function is used instead of erts_proc_sig_send_monitor_down()
* when the signal arrives via the distribution and
- * no link structure is available.
+ * therefore no monitor structure is available.
*
* @param[in] dep Pointer to distribution entry
* of channel that the signal
@@ -392,12 +424,15 @@ erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to);
*
* @param[in] to Identifier of receiver.
*
+ * @param[in] dist_ext The exit reason in external term format
+ *
* @param[in] reason Exit reason.
*
*/
void
erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
Eterm reason);
/**
@@ -962,6 +997,34 @@ void
erts_proc_sig_handle_pending_suspend(Process *c_p);
/**
+ *
+ * @brief Decode the reason term in an external signal
+ *
+ * Any distributed signal with a payload only has the control
+ * message decoded by the dist entry. The final decode of the
+ * payload is done by the process when it inspects the signal
+ * by calling this function.
+ *
+ * This functions handles both messages and link/monitor exits.
+ *
+ * Return true if the decode was successful, false otherwise.
+ *
+ * @param[in] c_p Pointer to executing process
+ *
+ * @param[in] proc_lock Locks held by process. Should always be MAIN.
+ *
+ * @param[in] msgp The signal to decode
+ *
+ * @param[in] force_off_heap If the term should be forced to be off-heap
+ */
+int
+erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
+ ErtsMessage *msgp, int force_off_heap);
+
+ErtsDistExternal *
+erts_proc_sig_get_external(ErtsMessage *msgp);
+
+/**
* @brief Initialize this functionality
*/
void erts_proc_sig_queue_init(void);
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c
index 4b8040b356..640ca338d9 100644
--- a/erts/emulator/beam/external.c
+++ b/erts/emulator/beam/external.c
@@ -636,11 +636,13 @@ byte* erts_encode_ext_ets(Eterm term, byte *ep, struct erl_off_heap_header** off
}
ErtsDistExternal *
-erts_make_dist_ext_copy(ErtsDistExternal *edep, Uint xsize)
+erts_make_dist_ext_copy(ErtsDistExternal *edep, Eterm *token)
{
size_t align_sz;
size_t dist_ext_sz;
size_t ext_sz = 0;
+ size_t token_sz = 0;
+ Eterm token_size;
byte *ep;
ErtsDistExternal *new_edep;
@@ -652,8 +654,12 @@ erts_make_dist_ext_copy(ErtsDistExternal *edep, Uint xsize)
align_sz = ERTS_EXTRA_DATA_ALIGN_SZ(dist_ext_sz + ext_sz);
+ token_size = size_object(*token);
+ if (token_size)
+ token_sz = ERTS_HEAP_FRAG_SIZE(token_size);
+
new_edep = erts_alloc(ERTS_ALC_T_EXT_TERM_DATA,
- dist_ext_sz + ext_sz + align_sz + xsize);
+ dist_ext_sz + ext_sz + align_sz + token_sz);
ep = (byte *) new_edep;
sys_memcpy((void *) ep, (void *) edep, dist_ext_sz);
@@ -669,6 +675,18 @@ erts_make_dist_ext_copy(ErtsDistExternal *edep, Uint xsize)
} else {
erts_refc_inc(&new_edep->binp->intern.refc, 2);
}
+
+ /* Copy the seq_trace token */
+ if (is_not_nil(*token)) {
+ ErlHeapFragment *heap_frag;
+ ErlOffHeap *ohp;
+ Eterm *hp;
+ heap_frag = erts_dist_ext_trailer(new_edep);
+ ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
+ hp = heap_frag->mem;
+ ohp = &heap_frag->off_heap;
+ *token = copy_struct(*token, token_size, &hp, ohp);
+ }
return new_edep;
}
diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h
index 1223f90fcc..91c028c9bd 100644
--- a/erts/emulator/beam/external.h
+++ b/erts/emulator/beam/external.h
@@ -176,7 +176,7 @@ byte* erts_encode_ext_ets(Eterm, byte *, struct erl_off_heap_header** ext_off_he
void erts_free_dist_ext_copy(ErtsDistExternal *);
ERTS_GLB_INLINE void *erts_dist_ext_trailer(ErtsDistExternal *);
-ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Uint);
+ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Eterm *);
void *erts_dist_ext_trailer(ErtsDistExternal *);
void erts_destroy_dist_ext_copy(ErtsDistExternal *);
diff --git a/erts/emulator/beam/msg_instrs.tab b/erts/emulator/beam/msg_instrs.tab
index 9bf3aefaca..6f8d1469ef 100644
--- a/erts/emulator/beam/msg_instrs.tab
+++ b/erts/emulator/beam/msg_instrs.tab
@@ -137,8 +137,8 @@ i_loop_rec(Dest) {
if (ERTS_UNLIKELY(ERTS_SIG_IS_EXTERNAL_MSG(msgp))) {
FCALLS -= 10; /* FIXME: bump appropriate amount... */
- SWAPOUT; /* erts_decode_dist_message() may write to heap... */
- if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) {
+ SWAPOUT; /* erts_proc_sig_decode_dist() may write to heap... */
+ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) {
/*
* A corrupt distribution message that we weren't able to decode;
* remove it...
diff --git a/erts/emulator/hipe/hipe_native_bif.c b/erts/emulator/hipe/hipe_native_bif.c
index 211ce0492a..80e5d81023 100644
--- a/erts/emulator/hipe/hipe_native_bif.c
+++ b/erts/emulator/hipe/hipe_native_bif.c
@@ -579,7 +579,7 @@ Eterm hipe_check_get_msg(Process *c_p)
if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) {
/* FIXME: bump appropriate amount... */
- if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) {
+ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) {
/*
* A corrupt distribution message that we weren't able to decode;
* remove it...