aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r--erts/emulator/beam/dist.c284
-rw-r--r--erts/emulator/beam/dist.h52
-rw-r--r--erts/emulator/beam/erl_message.c124
-rw-r--r--erts/emulator/beam/erl_message.h2
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c312
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.h69
-rw-r--r--erts/emulator/beam/external.c22
-rw-r--r--erts/emulator/beam/external.h2
-rw-r--r--erts/emulator/beam/msg_instrs.tab4
9 files changed, 592 insertions, 279 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 3b6c0e5db9..91a9481d84 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -50,7 +50,8 @@
#define DIST_CTL_DEFAULT_SIZE 64
/* Turn this on to get printouts of all distribution messages
- * which go on the line
+ * which go on the line. Enabling this may make some testcases
+ * fail. Especially the broken dist testcases in distribution_SUITE.
*/
#if 0
#define ERTS_DIST_MSG_DBG
@@ -71,8 +72,6 @@ static void
dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
{
ErtsHeapFactory factory;
- DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
- Eterm* ctl = ctl_default;
byte *extp = edep->extp;
Eterm msg;
Sint ctl_len;
@@ -116,6 +115,7 @@ static Export *dist_ctrl_put_data_trap;
/* forward declarations */
+static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, int force_busy);
static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
@@ -848,7 +848,7 @@ int
erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
Eterm ref, Eterm reason)
{
- Eterm ctl;
+ Eterm ctl, msg;
DeclareTmpHeapNoproc(ctl_heap,6);
int res;
@@ -861,10 +861,17 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
UseTmpHeapNoproc(6);
- ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT),
- watched, watcher, ref, reason);
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE4(&ctl_heap[0], make_small(DOP_PAYLOAD_MONITOR_P_EXIT),
+ watched, watcher, ref);
+ msg = reason;
+ } else {
+ ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT),
+ watched, watcher, ref, reason);
+ msg = THE_NON_VALUE;
+ }
- res = dsig_send_ctl(dsdp, ctl, 1);
+ res = dsig_send_exit(dsdp, ctl, msg, 1);
UnUseTmpHeapNoproc(6);
return res;
}
@@ -1018,6 +1025,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
ctx->dss.ctl = ctl;
ctx->dss.msg = message;
ctx->dss.force_busy = 0;
+ ctx->dss.force_encode = 0;
res = erts_dsig_send(&ctx->dsd, &ctx->dss);
return res;
}
@@ -1077,6 +1085,7 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message,
ctx->dss.ctl = ctl;
ctx->dss.msg = message;
ctx->dss.force_busy = 0;
+ ctx->dss.force_encode = 0;
res = erts_dsig_send(&ctx->dsd, &ctx->dss);
return res;
}
@@ -1086,7 +1095,7 @@ int
erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
Eterm reason, Eterm token)
{
- Eterm ctl;
+ Eterm ctl, msg = THE_NON_VALUE;
DeclareTmpHeapNoproc(ctl_heap,6);
int res;
#ifdef USE_VM_PROBES
@@ -1101,13 +1110,24 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
#endif
UseTmpHeapNoproc(6);
+
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD)
+ msg = reason;
+
if (have_seqtrace(token)) {
seq_trace_update_send(dsdp->proc);
seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local);
- ctl = TUPLE5(&ctl_heap[0],
- make_small(DOP_EXIT_TT), local, remote, token, reason);
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE4(&ctl_heap[0],
+ make_small(DOP_PAYLOAD_EXIT_TT), local, remote, token);
+ } else
+ ctl = TUPLE5(&ctl_heap[0],
+ make_small(DOP_EXIT_TT), local, remote, token, reason);
} else {
- ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD)
+ ctl = TUPLE3(&ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote);
+ else
+ ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
}
#ifdef USE_VM_PROBES
*node_name = *sender_name = *remote_name = '\0';
@@ -1129,8 +1149,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
#endif
DTRACE7(process_exit_signal_remote, sender_name, node_name,
remote_name, reason_str, tok_label, tok_lastcnt, tok_serial);
- /* forced, i.e ignore busy */
- res = dsig_send_ctl(dsdp, ctl, 1);
+ res = dsig_send_exit(dsdp, ctl, msg, 1);
UnUseTmpHeapNoproc(6);
return res;
}
@@ -1140,13 +1159,18 @@ erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
{
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
- Eterm ctl;
+ Eterm ctl, msg = dsdp->dep->flags & DFLAG_EXIT_PAYLOAD ? reason : THE_NON_VALUE;
UseTmpHeapNoproc(5);
- ctl = TUPLE4(&ctl_heap[0],
- make_small(DOP_EXIT), local, remote, reason);
- /* forced, i.e ignore busy */
- res = dsig_send_ctl(dsdp, ctl, 1);
+
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE3(&ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote);
+ msg = reason;
+ } else {
+ ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
+ msg = THE_NON_VALUE;
+ }
+ res = dsig_send_exit(dsdp, ctl, msg, 1);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -1156,13 +1180,20 @@ erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason
{
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
- Eterm ctl;
+ Eterm ctl, msg;
UseTmpHeapNoproc(5);
- ctl = TUPLE4(&ctl_heap[0],
- make_small(DOP_EXIT2), local, remote, reason);
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE3(&ctl_heap[0],
+ make_small(DOP_PAYLOAD_EXIT2), local, remote);
+ msg = reason;
+ } else {
+ ctl = TUPLE4(&ctl_heap[0],
+ make_small(DOP_EXIT2), local, remote, reason);
+ msg = THE_NON_VALUE;
+ }
- res = dsig_send_ctl(dsdp, ctl, 0);
+ res = dsig_send_exit(dsdp, ctl, msg, 0);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -1227,10 +1258,8 @@ int erts_net_message(Port *prt,
DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
Eterm* ctl = ctl_default;
ErtsHeapFactory factory;
- Eterm* hp;
Sint type;
Eterm token;
- Eterm token_size;
Uint tuple_arity;
int res;
#ifdef ERTS_DIST_MSG_DBG
@@ -1290,7 +1319,6 @@ int erts_net_message(Port *prt,
if (ctl_len > DIST_CTL_DEFAULT_SIZE) {
ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm));
}
- hp = ctl;
erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF);
arg = erts_decode_dist_ext(&factory, &ede);
@@ -1313,7 +1341,6 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- token_size = 0;
token = NIL;
switch (type = unsigned_val(tuple[1])) {
@@ -1508,7 +1535,6 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- token_size = size_object(tuple[5]);
/* Fall through ... */
case DOP_REG_SEND:
/* {DOP_REG_SEND, From, Cookie, ToName} -- Message */
@@ -1533,35 +1559,25 @@ int erts_net_message(Port *prt,
}
rp = erts_whereis_process(NULL, 0, to, 0, 0);
if (rp) {
- Uint xsize = (type == DOP_REG_SEND
- ? 0
- : ERTS_HEAP_FRAG_SIZE(token_size));
ErtsProcLocks locks = 0;
ErtsDistExternal *ede_copy;
- ede_copy = erts_make_dist_ext_copy(&ede, xsize);
if (type == DOP_REG_SEND) {
token = NIL;
} else {
- ErlHeapFragment *heap_frag;
- ErlOffHeap *ohp;
- ASSERT(xsize);
- heap_frag = erts_dist_ext_trailer(ede_copy);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
- hp = heap_frag->mem;
- ohp = &heap_frag->off_heap;
token = tuple[5];
- token = copy_struct(token, token_size, &hp, ohp);
}
- erts_queue_dist_message(rp, locks, ede_copy, token, from);
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
+
+ erts_queue_dist_message(rp, locks, ede_copy, token, from);
+
if (locks)
erts_proc_unlock(rp, locks);
}
break;
case DOP_SEND_SENDER_TT: {
- Uint xsize;
case DOP_SEND_TT:
if (tuple_arity != 4) {
@@ -1569,15 +1585,12 @@ int erts_net_message(Port *prt,
}
token = tuple[4];
- token_size = size_object(token);
- xsize = ERTS_HEAP_FRAG_SIZE(token_size);
goto send_common;
case DOP_SEND_SENDER:
case DOP_SEND:
token = NIL;
- xsize = 0;
if (tuple_arity != 3)
goto invalid_message;
@@ -1604,17 +1617,7 @@ int erts_net_message(Port *prt,
ErtsProcLocks locks = 0;
ErtsDistExternal *ede_copy;
- ede_copy = erts_make_dist_ext_copy(&ede, xsize);
- if (is_not_nil(token)) {
- ErlHeapFragment *heap_frag;
- ErlOffHeap *ohp;
- ASSERT(xsize);
- heap_frag = erts_dist_ext_trailer(ede_copy);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
- hp = heap_frag->mem;
- ohp = &heap_frag->off_heap;
- token = copy_struct(token, token_size, &hp, ohp);
- }
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty);
if (locks)
@@ -1623,18 +1626,28 @@ int erts_net_message(Port *prt,
break;
}
+ case DOP_PAYLOAD_MONITOR_P_EXIT:
case DOP_MONITOR_P_EXIT: {
+ ErtsDistExternal *ede_copy = NULL;
+
/* We are monitoring a process on the remote node which dies, we get
{DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */
-
- if (tuple_arity != 5) {
- goto invalid_message;
- }
- watched = tuple[2]; /* remote proc or name which died */
- watcher = tuple[3];
+ watched = tuple[2]; /* remote proc or name which died */
+ watcher = tuple[3];
ref = tuple[4];
- reason = tuple[5];
+
+ if (type == DOP_PAYLOAD_MONITOR_P_EXIT) {
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
+ reason = THE_NON_VALUE;
+ } else {
+ if (tuple_arity != 5) {
+ goto invalid_message;
+ }
+ reason = tuple[5];
+ }
if (is_not_ref(ref))
goto invalid_message;
@@ -1650,70 +1663,134 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
+ if (reason == THE_NON_VALUE) {
+
+#ifdef ERTS_DIST_MSG_DBG
+ dist_msg_dbg(&ede, "MSG", buf, orig_len);
+#endif
+
+ if (!erts_proc_lookup(watcher)) break; /* Process not alive */
+
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
+ }
+
erts_proc_sig_send_dist_monitor_down(dep, ref, watched,
- watcher, reason);
+ watcher, ede_copy, reason);
break;
}
+ case DOP_PAYLOAD_EXIT:
+ case DOP_PAYLOAD_EXIT_TT:
case DOP_EXIT_TT:
case DOP_EXIT: {
+ ErtsDistExternal *ede_copy = NULL;
+
/* 'from', which 'to' is linked to, died */
+ from = tuple[2];
+ to = tuple[3];
+
if (type == DOP_EXIT) {
if (tuple_arity != 4) {
goto invalid_message;
}
-
- from = tuple[2];
- to = tuple[3];
- reason = tuple[4];
token = NIL;
- } else {
+ reason = tuple[4];
+ } else if (type == DOP_EXIT_TT){
if (tuple_arity != 5) {
goto invalid_message;
}
- from = tuple[2];
- to = tuple[3];
token = tuple[4];
reason = tuple[5];
- }
+ } else if (type == DOP_PAYLOAD_EXIT) {
+ if (tuple_arity != 3) {
+ goto invalid_message;
+ }
+ token = NIL;
+ reason = THE_NON_VALUE;
+ } else {
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
+ token = tuple[4];
+ reason = THE_NON_VALUE;
+ }
if (is_not_external_pid(from)
|| dep != external_pid_dist_entry(from)
|| is_not_internal_pid(to)) {
goto invalid_message;
}
+ if (reason == THE_NON_VALUE) {
+
+#ifdef ERTS_DIST_MSG_DBG
+ dist_msg_dbg(&ede, "MSG", buf, orig_len);
+#endif
+
+ if (!erts_proc_lookup(to)) break; /* Process not alive */
+
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
+ }
+
erts_proc_sig_send_dist_link_exit(dep,
- from, to,
+ from, to, ede_copy,
reason, token);
break;
}
+ case DOP_PAYLOAD_EXIT2_TT:
+ case DOP_PAYLOAD_EXIT2:
case DOP_EXIT2_TT:
- case DOP_EXIT2:
+ case DOP_EXIT2: {
+ ErtsDistExternal *ede_copy = NULL;
+
/* 'from' is send an exit signal to 'to' */
+ from = tuple[2];
+ to = tuple[3];
+
if (type == DOP_EXIT2) {
if (tuple_arity != 4) {
goto invalid_message;
}
- from = tuple[2];
- to = tuple[3];
reason = tuple[4];
token = NIL;
- } else {
+ } else if (type == DOP_EXIT2_TT) {
if (tuple_arity != 5) {
goto invalid_message;
}
- from = tuple[2];
- to = tuple[3];
token = tuple[4];
reason = tuple[5];
- }
- if (is_not_pid(from) || is_not_internal_pid(to)) {
+ } else if (type == DOP_PAYLOAD_EXIT2) {
+ if (tuple_arity != 3) {
+ goto invalid_message;
+ }
+ reason = THE_NON_VALUE;
+ token = NIL;
+ } else {
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
+ reason = THE_NON_VALUE;
+ token = tuple[4];
+ }
+ if (is_not_pid(from)
+ || dep != external_pid_dist_entry(from)
+ || is_not_internal_pid(to)) {
goto invalid_message;
}
- erts_proc_sig_send_exit(NULL, from, to, reason, token, 0);
- break;
+ if (reason == THE_NON_VALUE) {
+
+#ifdef ERTS_DIST_MSG_DBG
+ dist_msg_dbg(&ede, "MSG", buf, orig_len);
+#endif
+
+ if (!erts_proc_lookup(to)) break; /* Process not alive */
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
+ }
+
+ erts_proc_sig_send_dist_exit(dep, from, to, ede_copy, reason, token);
+ break;
+ }
case DOP_GROUP_LEADER:
if (tuple_arity != 3) {
goto invalid_message;
@@ -1757,6 +1834,23 @@ data_error:
return -1;
}
+static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, int force_busy)
+{
+ struct erts_dsig_send_context ctx;
+ int ret;
+ ctx.ctl = ctl;
+ ctx.msg = msg;
+ ctx.force_busy = force_busy;
+ ctx.force_encode = 1;
+ ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
+#ifdef DEBUG
+ ctx.reds = 1; /* provoke assert below (no reduction count with force_encode) */
+#endif
+ ret = erts_dsig_send(dsdp, &ctx);
+ ASSERT(ret != ERTS_DSIG_SEND_CONTINUE);
+ return ret;
+}
+
static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
{
struct erts_dsig_send_context ctx;
@@ -1764,6 +1858,7 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
ctx.ctl = ctl;
ctx.msg = THE_NON_VALUE;
ctx.force_busy = force_busy;
+ ctx.force_encode = 1;
ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
#ifdef DEBUG
ctx.reds = 1; /* provoke assert below (no reduction count without msg) */
@@ -1850,10 +1945,14 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE;
case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
- if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
- retval = ERTS_DSIG_SEND_CONTINUE;
- goto done;
- }
+ if (!ctx->force_encode) {
+ if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ goto done;
+ }
+ } else {
+ erts_encode_dist_ext_size(ctx->msg, ctx->flags, ctx->acmp, &ctx->data_size);
+ }
ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
case ERTS_DSIG_SEND_PHASE_ALLOC:
@@ -1882,10 +1981,15 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE;
case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
- if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) {
- retval = ERTS_DSIG_SEND_CONTINUE;
- goto done;
- }
+ if (!ctx->force_encode) {
+ if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags,
+ ctx->acmp, &ctx->u.ec, &ctx->reds)) {
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ goto done;
+ }
+ } else {
+ erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL);
+ }
ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
case ERTS_DSIG_SEND_PHASE_FIN: {
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index a8ccd1eaec..81a09bc69c 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -47,6 +47,7 @@
#define DFLAG_SEND_SENDER 0x80000
#define DFLAG_BIG_SEQTRACE_LABELS 0x100000
#define DFLAG_NO_MAGIC 0x200000 /* internal for pending connection */
+#define DFLAG_EXIT_PAYLOAD 0x400000
/* Mandatory flags for distribution */
#define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \
@@ -75,7 +76,8 @@
| DFLAG_MAP_TAG \
| DFLAG_BIG_CREATION \
| DFLAG_SEND_SENDER \
- | DFLAG_BIG_SEQTRACE_LABELS)
+ | DFLAG_BIG_SEQTRACE_LABELS \
+ | DFLAG_EXIT_PAYLOAD)
/* Flags addable by local distr implementations */
#define DFLAG_DIST_ADDABLE DFLAG_DIST_DEFAULT
@@ -99,26 +101,35 @@
| DFLAG_BIG_CREATION)
/* opcodes used in distribution messages */
-#define DOP_LINK 1
-#define DOP_SEND 2
-#define DOP_EXIT 3
-#define DOP_UNLINK 4
+enum {
+ DOP_LINK = 1,
+ DOP_SEND = 2,
+ DOP_EXIT = 3,
+ DOP_UNLINK = 4,
/* Ancient DOP_NODE_LINK (5) was here, can be reused */
-#define DOP_REG_SEND 6
-#define DOP_GROUP_LEADER 7
-#define DOP_EXIT2 8
-
-#define DOP_SEND_TT 12
-#define DOP_EXIT_TT 13
-#define DOP_REG_SEND_TT 16
-#define DOP_EXIT2_TT 18
-
-#define DOP_MONITOR_P 19
-#define DOP_DEMONITOR_P 20
-#define DOP_MONITOR_P_EXIT 21
-
-#define DOP_SEND_SENDER 22
-#define DOP_SEND_SENDER_TT 23
+ DOP_REG_SEND = 6,
+ DOP_GROUP_LEADER = 7,
+ DOP_EXIT2 = 8,
+
+ DOP_SEND_TT = 12,
+ DOP_EXIT_TT = 13,
+ DOP_REG_SEND_TT = 16,
+ DOP_EXIT2_TT = 18,
+
+ DOP_MONITOR_P = 19,
+ DOP_DEMONITOR_P = 20,
+ DOP_MONITOR_P_EXIT = 21,
+
+ DOP_SEND_SENDER = 22,
+ DOP_SEND_SENDER_TT = 23,
+
+ /* These are used when DFLAG_EXIT_PAYLOAD is detected */
+ DOP_PAYLOAD_EXIT = 24,
+ DOP_PAYLOAD_EXIT_TT = 25,
+ DOP_PAYLOAD_EXIT2 = 26,
+ DOP_PAYLOAD_EXIT2_TT = 27,
+ DOP_PAYLOAD_MONITOR_P_EXIT = 28
+};
/* distribution trap functions */
extern Export* dmonitor_node_trap;
@@ -346,6 +357,7 @@ struct erts_dsig_send_context {
Eterm ctl;
Eterm msg;
int force_busy;
+ int force_encode;
Uint32 max_finalize_prepend;
Uint data_size, dhdr_ext_size;
ErtsAtomCacheMap *acmp;
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 942bec84cf..cd7bce8bf4 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -201,34 +201,40 @@ free_message_buffer(ErlHeapFragment* bp)
}while (bp != NULL);
}
+static void
+erts_cleanup_message(ErtsMessage *mp)
+{
+ ErlHeapFragment *bp;
+ if (ERTS_SIG_IS_EXTERNAL_MSG(mp) || ERTS_SIG_IS_NON_MSG(mp)) {
+ ErtsDistExternal *edep = erts_proc_sig_get_external(mp);
+ if (edep) {
+ erts_free_dist_ext_copy(edep);
+ if (mp->data.heap_frag == &mp->hfrag) {
+ ASSERT(ERTS_SIG_IS_EXTERNAL_MSG(mp));
+ mp->data.heap_frag = ERTS_MSG_COMBINED_HFRAG;
+ }
+ }
+ }
+
+ if (ERTS_SIG_IS_MSG(mp) && mp->data.attached != ERTS_MSG_COMBINED_HFRAG) {
+ bp = mp->data.heap_frag;
+ } else {
+ mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
+ bp = mp->hfrag.next;
+ erts_cleanup_offheap(&mp->hfrag.off_heap);
+ }
+
+ if (bp)
+ free_message_buffer(bp);
+}
+
void
erts_cleanup_messages(ErtsMessage *msgp)
{
ErtsMessage *mp = msgp;
while (mp) {
ErtsMessage *fmp;
- ErlHeapFragment *bp;
- if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) {
- if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) {
- bp = (ErlHeapFragment *) mp->data.dist_ext->ext_endp;
- erts_cleanup_offheap(&bp->off_heap);
- }
- if (mp->data.dist_ext)
- erts_free_dist_ext_copy(mp->data.dist_ext);
- }
- else {
- if (ERTS_SIG_IS_INTERNAL_MSG(mp)
- && mp->data.attached != ERTS_MSG_COMBINED_HFRAG) {
- bp = mp->data.heap_frag;
- }
- else {
- mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
- bp = mp->hfrag.next;
- erts_cleanup_offheap(&mp->hfrag.off_heap);
- }
- if (bp)
- free_message_buffer(bp);
- }
+ erts_cleanup_message(mp);
fmp = mp;
mp = mp->next;
erts_free_message(fmp);
@@ -1099,80 +1105,6 @@ change_to_off_heap:
return res;
}
-int
-erts_decode_dist_message(Process *proc, ErtsProcLocks proc_locks,
- ErtsMessage *msgp, int force_off_heap)
-{
- ErtsHeapFactory factory;
- Eterm msg;
- ErlHeapFragment *bp;
- Sint need;
- int decode_in_heap_frag;
-
- decode_in_heap_frag = (force_off_heap
- || !(proc_locks & ERTS_PROC_LOCK_MAIN)
- || (proc->flags & F_OFF_HEAP_MSGQ));
-
- if (msgp->data.dist_ext->heap_size >= 0)
- need = msgp->data.dist_ext->heap_size;
- else {
- need = erts_decode_dist_ext_size(msgp->data.dist_ext);
- if (need < 0) {
- /* bad msg; remove it... */
- if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
- bp = erts_dist_ext_trailer(msgp->data.dist_ext);
- erts_cleanup_offheap(&bp->off_heap);
- }
- erts_free_dist_ext_copy(msgp->data.dist_ext);
- msgp->data.dist_ext = NULL;
- return 0;
- }
-
- msgp->data.dist_ext->heap_size = need;
- }
-
- if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
- bp = erts_dist_ext_trailer(msgp->data.dist_ext);
- need += bp->used_size;
- }
-
- if (decode_in_heap_frag)
- erts_factory_heap_frag_init(&factory, new_message_buffer(need));
- else
- erts_factory_proc_prealloc_init(&factory, proc, need);
-
- ASSERT(msgp->data.dist_ext->heap_size >= 0);
- if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
- ErlHeapFragment *heap_frag;
- heap_frag = erts_dist_ext_trailer(msgp->data.dist_ext);
- ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp),
- heap_frag->used_size,
- &factory.hp,
- factory.off_heap);
- erts_cleanup_offheap(&heap_frag->off_heap);
- }
-
- msg = erts_decode_dist_ext(&factory, msgp->data.dist_ext);
- ERL_MESSAGE_TERM(msgp) = msg;
- erts_free_dist_ext_copy(msgp->data.dist_ext);
- msgp->data.attached = NULL;
-
- if (is_non_value(msg)) {
- erts_factory_undo(&factory);
- return 0;
- }
-
- erts_factory_trim_and_close(&factory, msgp->m,
- ERL_MESSAGE_REF_ARRAY_SZ);
-
- ASSERT(!msgp->data.heap_frag);
-
- if (decode_in_heap_frag)
- msgp->data.heap_frag = factory.heap_frags;
-
- return 1;
-}
-
void erts_factory_proc_init(ErtsHeapFactory* factory,
Process* p)
{
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h
index b2550814fd..58294648b4 100644
--- a/erts/emulator/beam/erl_message.h
+++ b/erts/emulator/beam/erl_message.h
@@ -455,8 +455,6 @@ Sint erts_move_messages_off_heap(Process *c_p);
Sint erts_complete_off_heap_message_queue_change(Process *c_p);
Eterm erts_change_message_queue_management(Process *c_p, Eterm new_state);
-int erts_decode_dist_message(Process *, ErtsProcLocks, ErtsMessage *, int);
-
void erts_cleanup_messages(ErtsMessage *mp);
void *erts_alloc_message_ref(void);
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 668ee4ce96..1bd219b6d7 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -80,6 +80,8 @@
#define ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO \
ERTS_SIG_Q_TYPE_MAX
+#define ERTS_SIG_IS_EXTERNAL(sig) is_non_value(get_exit_signal_data(sig)->reason)
+
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler);
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_high);
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_max);
@@ -112,6 +114,8 @@ typedef struct {
Eterm message;
Eterm from;
Eterm reason;
+ struct erl_dist_external *dist_ext;
+ ErlHeapFragment *heap_frag;
union {
Eterm ref;
int normal_kills;
@@ -936,13 +940,32 @@ erts_proc_sig_privqs_len(Process *c_p)
return proc_sig_privqs_len(c_p, 0);
}
+ErtsDistExternal *
+erts_proc_sig_get_external(ErtsMessage *msgp)
+{
+ if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) {
+ return erts_get_dist_ext(msgp->data.heap_frag);
+ } else if (ERTS_SIG_IS_NON_MSG(msgp) && ERTS_SIG_IS_EXTERNAL(msgp)) {
+ ErtsDistExternal *edep;
+ ErtsExitSignalData *xsigd = get_exit_signal_data(msgp);
+ ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT);
+ ASSERT(is_non_value(xsigd->reason));
+ if (msgp->hfrag.next == NULL)
+ edep = (ErtsDistExternal*)((void*)xsigd) + sizeof(ErtsExitSignalData);
+ else
+ edep = erts_get_dist_ext(msgp->hfrag.next);
+ return edep;
+ }
+ return NULL;
+}
+
static void do_seq_trace_output(Eterm to, Eterm token, Eterm msg);
static void
send_gen_exit_signal(Process *c_p, Eterm from_tag,
Eterm from, Eterm to,
- Sint16 op, Eterm reason, Eterm ref,
- Eterm token, int normal_kills)
+ Sint16 op, Eterm reason, ErtsDistExternal *dist_ext,
+ Eterm ref, Eterm token, int normal_kills)
{
ErtsExitSignalData *xsigd;
Eterm *hp, *start_hp, s_reason, s_ref, s_message, s_token, s_from;
@@ -956,6 +979,9 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
Uint utag_sz;
#endif
+ ASSERT((is_value(reason) && dist_ext == NULL) ||
+ (is_non_value(reason) && dist_ext != NULL));
+
ASSERT(is_immed(from_tag));
hsz = sizeof(ErtsExitSignalData)/sizeof(Uint);
@@ -977,33 +1003,35 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
hsz += utag_sz;
#endif
- token_sz = is_immed(token) ? 0 : size_object(token);
+ token_sz = size_object(token);
hsz += token_sz;
- from_sz = is_immed(from) ? 0 : size_object(from);
+ from_sz = size_object(from);
hsz += from_sz;
- reason_sz = is_immed(reason) ? 0 : size_object(reason);
- hsz += reason_sz;
+ ref_sz = size_object(ref);
+ hsz += ref_sz;
- switch (op) {
- case ERTS_SIG_Q_OP_EXIT:
- case ERTS_SIG_Q_OP_EXIT_LINKED: {
- /* {'EXIT', From, Reason} */
- hsz += 4; /* 3-tuple */
- ref_sz = 0;
- break;
- }
- case ERTS_SIG_Q_OP_MONITOR_DOWN: {
- /* {'DOWN', Ref, process, From, Reason} */
- hsz += 6; /* 5-tuple */
- ref_sz = NC_HEAP_SIZE(ref);
- hsz += ref_sz;
- break;
- }
- default:
- ERTS_INTERNAL_ERROR("Invalid exit signal op");
- break;
+ if (is_value(reason)) {
+ reason_sz = size_object(reason);
+ hsz += reason_sz;
+
+ switch (op) {
+ case ERTS_SIG_Q_OP_EXIT:
+ case ERTS_SIG_Q_OP_EXIT_LINKED: {
+ /* {'EXIT', From, Reason} */
+ hsz += 4; /* 3-tuple */
+ break;
+ }
+ case ERTS_SIG_Q_OP_MONITOR_DOWN: {
+ /* {'DOWN', Ref, process, From, Reason} */
+ hsz += 6; /* 5-tuple */
+ break;
+ }
+ default:
+ ERTS_INTERNAL_ERROR("Invalid exit signal op");
+ break;
+ }
}
/*
@@ -1015,35 +1043,33 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
ohp = &hfrag->off_heap;
start_hp = hp;
- s_token = (is_immed(token)
- ? token
- : copy_struct(token, token_sz, &hp, ohp));
-
- s_reason = (is_immed(reason)
- ? reason
- : copy_struct(reason, reason_sz, &hp, ohp));
+ s_token = copy_struct(token, token_sz, &hp, ohp);
+ s_from = copy_struct(from, from_sz, &hp, ohp);
+ s_ref = copy_struct(ref, ref_sz, &hp, ohp);
- s_from = (is_immed(from)
- ? from
- : copy_struct(from, from_sz, &hp, ohp));
+ if (is_value(reason)) {
+ s_reason = copy_struct(reason, reason_sz, &hp, ohp);
- if (!ref_sz)
- s_ref = NIL;
- else
- s_ref = STORE_NC(&hp, ohp, ref);
-
- switch (op) {
- case ERTS_SIG_Q_OP_EXIT:
- case ERTS_SIG_Q_OP_EXIT_LINKED:
- /* {'EXIT', From, Reason} */
- s_message = TUPLE3(hp, am_EXIT, s_from, s_reason);
- hp += 4;
- break;
- case ERTS_SIG_Q_OP_MONITOR_DOWN:
- /* {'DOWN', Ref, process, From, Reason} */
- s_message = TUPLE5(hp, am_DOWN, s_ref, am_process, s_from, s_reason);
- hp += 6;
- break;
+ switch (op) {
+ case ERTS_SIG_Q_OP_EXIT:
+ case ERTS_SIG_Q_OP_EXIT_LINKED:
+ /* {'EXIT', From, Reason} */
+ s_message = TUPLE3(hp, am_EXIT, s_from, s_reason);
+ hp += 4;
+ break;
+ case ERTS_SIG_Q_OP_MONITOR_DOWN:
+ /* {'DOWN', Ref, process, From, Reason} */
+ s_message = TUPLE5(hp, am_DOWN, s_ref, am_process, s_from, s_reason);
+ hp += 6;
+ break;
+ default:
+ /* This cannot happen, used to silence gcc warning */
+ s_message = THE_NON_VALUE;
+ break;
+ }
+ } else {
+ s_message = THE_NON_VALUE;
+ s_reason = THE_NON_VALUE;
}
#ifdef USE_VM_PROBES
@@ -1066,6 +1092,9 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
xsigd->message = s_message;
xsigd->from = s_from;
xsigd->reason = s_reason;
+ xsigd->dist_ext = dist_ext;
+ xsigd->heap_frag = NULL;
+
if (is_nil(s_ref))
xsigd->u.normal_kills = normal_kills;
else {
@@ -1205,7 +1234,18 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to,
from_tag = dep->sysname;
}
send_gen_exit_signal(c_p, from_tag, from, to, ERTS_SIG_Q_OP_EXIT,
- reason, NIL, token, normal_kills);
+ reason, NULL, NIL, token, normal_kills);
+}
+
+void
+erts_proc_sig_send_dist_exit(DistEntry *dep,
+ Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
+ Eterm reason, Eterm token)
+{
+ send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT,
+ reason, dist_ext, NIL, token, 0);
+
}
void
@@ -1219,7 +1259,7 @@ erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk,
if (is_not_immed(reason) || is_not_nil(token)) {
ASSERT(is_internal_pid(from) || is_internal_port(from));
send_gen_exit_signal(c_p, from, from, to, ERTS_SIG_Q_OP_EXIT_LINKED,
- reason, NIL, token, 0);
+ reason, NULL, NIL, token, 0);
}
else {
/* Pass signal using old link structure... */
@@ -1274,10 +1314,12 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk)
void
erts_proc_sig_send_dist_link_exit(DistEntry *dep,
Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
Eterm reason, Eterm token)
{
send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT_LINKED,
- reason, NIL, token, 0);
+ reason, dist_ext, NIL, token, 0);
+
}
void
@@ -1299,16 +1341,17 @@ erts_proc_sig_send_dist_unlink(DistEntry *dep, Eterm from, Eterm to)
void
erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
Eterm reason)
{
Eterm monitored, heap[3];
- if (is_atom(from))
+ if (is_atom(from))
monitored = TUPLE2(&heap[0], from, dep->sysname);
else
monitored = from;
send_gen_exit_signal(NULL, dep->sysname, monitored,
to, ERTS_SIG_Q_OP_MONITOR_DOWN,
- reason, ref, NIL, 0);
+ reason, dist_ext, ref, NIL, 0);
}
void
@@ -1376,10 +1419,10 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason)
|| is_internal_pid(from_tag)
|| is_atom(from_tag));
monitored = TUPLE2(&heap[0], name, node);
- }
+ }
send_gen_exit_signal(NULL, from_tag, monitored,
to, ERTS_SIG_Q_OP_MONITOR_DOWN,
- reason, mdp->ref, NIL, 0);
+ reason, NULL, mdp->ref, NIL, 0);
}
erts_monitor_release(mon);
}
@@ -2037,7 +2080,6 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
if (type == ERTS_SIG_Q_TYPE_GEN_EXIT) {
xsigd = get_exit_signal_data(sig);
from = xsigd->from;
- reason = xsigd->reason;
if (op != ERTS_SIG_Q_OP_EXIT_LINKED)
ignore = 0;
else {
@@ -2062,13 +2104,28 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
}
}
+ /* This GEN_EXIT was received from another node, decode the exit reason */
+ if (ERTS_SIG_IS_EXTERNAL(sig))
+ erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1);
+
+ reason = xsigd->reason;
+
+ if (is_non_value(reason)) {
+ /* Bad distribution message; remove it from queue... */
+ ignore = !0;
+ destroy = !0;
+ }
+
if (!ignore) {
if ((op != ERTS_SIG_Q_OP_EXIT || reason != am_kill)
&& (c_p->flags & F_TRAP_EXIT)) {
convert_prepared_sig_to_msg(c_p, sig,
xsigd->message, next_nm_sig);
+ ASSERT(sig->hfrag.next == NULL);
+ sig->hfrag.next = xsigd->heap_frag;
conv_msg = sig;
+
}
else if (reason == am_normal && !xsigd->u.normal_kills) {
/* Ignore it... */
@@ -2929,6 +2986,130 @@ handle_sync_suspend(Process *c_p, ErtsMessage *mp)
}
}
+int
+erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
+ ErtsMessage *msgp, int force_off_heap)
+{
+ ErtsHeapFactory factory;
+ Eterm msg;
+ ErlHeapFragment *bp;
+ Sint need;
+ int decode_in_heap_frag;
+ ErtsDistExternal *dist_ext;
+ ErtsExitSignalData *xsigd = NULL;
+
+ decode_in_heap_frag = (force_off_heap
+ || !(proc_locks & ERTS_PROC_LOCK_MAIN)
+ || (proc->flags & F_OFF_HEAP_MSGQ));
+
+ if (ERTS_SIG_IS_EXTERNAL_MSG(msgp))
+ dist_ext = msgp->data.dist_ext;
+ else {
+ xsigd = get_exit_signal_data(msgp);
+ ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT);
+ ASSERT(is_non_value(xsigd->reason));
+ dist_ext = xsigd->dist_ext;
+ }
+
+ if (dist_ext->heap_size >= 0)
+ need = dist_ext->heap_size;
+ else {
+ need = erts_decode_dist_ext_size(dist_ext);
+ if (need < 0) {
+ /* bad msg; remove it... */
+ if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
+ bp = erts_dist_ext_trailer(dist_ext);
+ erts_cleanup_offheap(&bp->off_heap);
+ }
+ erts_free_dist_ext_copy(dist_ext);
+ dist_ext = NULL;
+ return 0;
+ }
+
+ dist_ext->heap_size = need;
+ }
+
+ if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
+ bp = erts_dist_ext_trailer(dist_ext);
+ need += bp->used_size;
+ }
+
+ if (xsigd) {
+ switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) {
+ case ERTS_SIG_Q_OP_EXIT:
+ case ERTS_SIG_Q_OP_EXIT_LINKED:
+ /* {'EXIT', From, Reason} */
+ need += 4;
+ break;
+ case ERTS_SIG_Q_OP_MONITOR_DOWN:
+ /* {'DOWN', Ref, process, From, Reason} */
+ need += 6; /* 5-tuple */
+ break;
+ default:
+ ERTS_INTERNAL_ERROR("Invalid exit signal op");
+ break;
+ }
+ }
+
+ if (decode_in_heap_frag)
+ erts_factory_heap_frag_init(&factory, new_message_buffer(need));
+ else
+ erts_factory_proc_prealloc_init(&factory, proc, need);
+
+ ASSERT(dist_ext->heap_size >= 0);
+ if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
+ ErlHeapFragment *heap_frag;
+ heap_frag = erts_dist_ext_trailer(dist_ext);
+ ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp),
+ heap_frag->used_size,
+ &factory.hp,
+ factory.off_heap);
+ erts_cleanup_offheap(&heap_frag->off_heap);
+ }
+
+ msg = erts_decode_dist_ext(&factory, dist_ext);
+ if (!xsigd) {
+ ERL_MESSAGE_TERM(msgp) = msg;
+ msgp->data.attached = NULL;
+ } else {
+ switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) {
+ case ERTS_SIG_Q_OP_EXIT:
+ case ERTS_SIG_Q_OP_EXIT_LINKED:
+ /* {'EXIT', From, Reason} */
+ erts_reserve_heap(&factory, 4);
+ xsigd->message = TUPLE3(factory.hp, am_EXIT, xsigd->from, msg);
+ factory.hp += 4;
+ break;
+ case ERTS_SIG_Q_OP_MONITOR_DOWN:
+ /* {'DOWN', Ref, process, From, Reason} */
+ erts_reserve_heap(&factory, 6);
+ xsigd->message = TUPLE5(factory.hp, am_DOWN, xsigd->u.ref, am_process, xsigd->from, msg);
+ factory.hp += 6;
+ break;
+ }
+ xsigd->reason = msg;
+ }
+ erts_free_dist_ext_copy(dist_ext);
+
+ if (is_non_value(msg)) {
+ erts_factory_undo(&factory);
+ return 0;
+ }
+
+ erts_factory_close(&factory);
+
+ ASSERT(!msgp->data.heap_frag || xsigd);
+
+ if (decode_in_heap_frag) {
+ if (!xsigd)
+ msgp->data.heap_frag = factory.heap_frags;
+ else
+ xsigd->heap_frag = factory.heap_frags;
+ }
+
+ return 1;
+}
+
void
erts_proc_sig_handle_pending_suspend(Process *c_p)
{
@@ -3045,7 +3226,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ASSERT(ERTS_SIG_IS_NON_MSG(sig));
tag = ((ErtsSignal *) sig)->common.tag;
-
+
switch (ERTS_PROC_SIG_OP(tag)) {
case ERTS_SIG_Q_OP_EXIT:
@@ -3091,6 +3272,11 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
break;
case ERTS_SIG_Q_TYPE_GEN_EXIT:
xsigd = get_exit_signal_data(sig);
+
+ /* This GEN_EXIT was received from another node, decode the exit reason */
+ if (ERTS_SIG_IS_EXTERNAL(sig))
+ erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1);
+
omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p),
xsigd->u.ref);
if (omon) {
@@ -4190,8 +4376,8 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing,
}
if (ERTS_SIG_IS_EXTERNAL_MSG(sig)) {
cnt++;
- if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN,
- sig, 0)) {
+ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN,
+ sig, 0)) {
/* Bad dist message; remove it... */
remove_mq_m_sig(c_p, sig, next_sig, next_nm_sig);
sig = *next_sig;
@@ -4266,7 +4452,7 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) {
/* decode it... */
if (mp->data.attached)
- erts_decode_dist_message(rp, rp_locks, mp, !0);
+ erts_proc_sig_decode_dist(rp, rp_locks, mp, !0);
msg = ERL_MESSAGE_TERM(mp);
diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h
index 5a02708bb7..39cee6f230 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.h
+++ b/erts/emulator/beam/erl_proc_sig_queue.h
@@ -89,6 +89,7 @@
#endif
struct erl_mesg;
+struct erl_dist_external;
typedef struct {
struct erl_mesg *next;
@@ -212,6 +213,34 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to,
/**
*
+ * @brief Send an exit signal to a process.
+ *
+ * This function is used instead of erts_proc_sig_send_link_exit()
+ * when the signal arrives via the distribution and
+ * therefore no link structure is available.
+ *
+ * @param[in] dep Distribution entry of channel
+ * that the signal arrived on.
+ *
+ * @param[in] from Identifier of sender.
+ *
+ * @param[in] to Identifier of receiver.
+ *
+ * @param[in] dist_ext The exit reason in external term format
+ *
+ * @param[in] reason Exit reason.
+ *
+ * @param[in] token Seq trace token.
+ *
+ */
+void
+erts_proc_sig_send_dist_exit(DistEntry *dep,
+ Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
+ Eterm reason, Eterm token);
+
+/**
+ *
* @brief Send an exit signal due to broken link to a process.
*
*
@@ -282,7 +311,7 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk);
*
* This function is used instead of erts_proc_sig_send_link_exit()
* when the signal arrives via the distribution and
- * no link structure is available.
+ * therefore no link structure is available.
*
* @param[in] dep Distribution entry of channel
* that the signal arrived on.
@@ -291,6 +320,8 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk);
*
* @param[in] to Identifier of receiver.
*
+ * @param[in] dist_ext The exit reason in external term format
+ *
* @param[in] reason Exit reason.
*
* @param[in] token Seq trace token.
@@ -299,6 +330,7 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk);
void
erts_proc_sig_send_dist_link_exit(struct dist_entry_ *dep,
Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
Eterm reason, Eterm token);
/**
@@ -307,7 +339,7 @@ erts_proc_sig_send_dist_link_exit(struct dist_entry_ *dep,
*
* This function is used instead of erts_proc_sig_send_unlink()
* when the signal arrives via the distribution and
- * no link structure is available.
+ * therefore no link structure is available.
*
* @param[in] dep Distribution entry of channel
* that the signal arrived on.
@@ -380,7 +412,7 @@ erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to);
*
* This function is used instead of erts_proc_sig_send_monitor_down()
* when the signal arrives via the distribution and
- * no link structure is available.
+ * therefore no monitor structure is available.
*
* @param[in] dep Pointer to distribution entry
* of channel that the signal
@@ -392,12 +424,15 @@ erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to);
*
* @param[in] to Identifier of receiver.
*
+ * @param[in] dist_ext The exit reason in external term format
+ *
* @param[in] reason Exit reason.
*
*/
void
erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
Eterm from, Eterm to,
+ ErtsDistExternal *dist_ext,
Eterm reason);
/**
@@ -962,6 +997,34 @@ void
erts_proc_sig_handle_pending_suspend(Process *c_p);
/**
+ *
+ * @brief Decode the reason term in an external signal
+ *
+ * Any distributed signal with a payload only has the control
+ * message decoded by the dist entry. The final decode of the
+ * payload is done by the process when it inspects the signal
+ * by calling this function.
+ *
+ * This functions handles both messages and link/monitor exits.
+ *
+ * Return true if the decode was successful, false otherwise.
+ *
+ * @param[in] c_p Pointer to executing process
+ *
+ * @param[in] proc_lock Locks held by process. Should always be MAIN.
+ *
+ * @param[in] msgp The signal to decode
+ *
+ * @param[in] force_off_heap If the term should be forced to be off-heap
+ */
+int
+erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
+ ErtsMessage *msgp, int force_off_heap);
+
+ErtsDistExternal *
+erts_proc_sig_get_external(ErtsMessage *msgp);
+
+/**
* @brief Initialize this functionality
*/
void erts_proc_sig_queue_init(void);
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c
index 4b8040b356..640ca338d9 100644
--- a/erts/emulator/beam/external.c
+++ b/erts/emulator/beam/external.c
@@ -636,11 +636,13 @@ byte* erts_encode_ext_ets(Eterm term, byte *ep, struct erl_off_heap_header** off
}
ErtsDistExternal *
-erts_make_dist_ext_copy(ErtsDistExternal *edep, Uint xsize)
+erts_make_dist_ext_copy(ErtsDistExternal *edep, Eterm *token)
{
size_t align_sz;
size_t dist_ext_sz;
size_t ext_sz = 0;
+ size_t token_sz = 0;
+ Eterm token_size;
byte *ep;
ErtsDistExternal *new_edep;
@@ -652,8 +654,12 @@ erts_make_dist_ext_copy(ErtsDistExternal *edep, Uint xsize)
align_sz = ERTS_EXTRA_DATA_ALIGN_SZ(dist_ext_sz + ext_sz);
+ token_size = size_object(*token);
+ if (token_size)
+ token_sz = ERTS_HEAP_FRAG_SIZE(token_size);
+
new_edep = erts_alloc(ERTS_ALC_T_EXT_TERM_DATA,
- dist_ext_sz + ext_sz + align_sz + xsize);
+ dist_ext_sz + ext_sz + align_sz + token_sz);
ep = (byte *) new_edep;
sys_memcpy((void *) ep, (void *) edep, dist_ext_sz);
@@ -669,6 +675,18 @@ erts_make_dist_ext_copy(ErtsDistExternal *edep, Uint xsize)
} else {
erts_refc_inc(&new_edep->binp->intern.refc, 2);
}
+
+ /* Copy the seq_trace token */
+ if (is_not_nil(*token)) {
+ ErlHeapFragment *heap_frag;
+ ErlOffHeap *ohp;
+ Eterm *hp;
+ heap_frag = erts_dist_ext_trailer(new_edep);
+ ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
+ hp = heap_frag->mem;
+ ohp = &heap_frag->off_heap;
+ *token = copy_struct(*token, token_size, &hp, ohp);
+ }
return new_edep;
}
diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h
index 1223f90fcc..91c028c9bd 100644
--- a/erts/emulator/beam/external.h
+++ b/erts/emulator/beam/external.h
@@ -176,7 +176,7 @@ byte* erts_encode_ext_ets(Eterm, byte *, struct erl_off_heap_header** ext_off_he
void erts_free_dist_ext_copy(ErtsDistExternal *);
ERTS_GLB_INLINE void *erts_dist_ext_trailer(ErtsDistExternal *);
-ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Uint);
+ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Eterm *);
void *erts_dist_ext_trailer(ErtsDistExternal *);
void erts_destroy_dist_ext_copy(ErtsDistExternal *);
diff --git a/erts/emulator/beam/msg_instrs.tab b/erts/emulator/beam/msg_instrs.tab
index 9bf3aefaca..6f8d1469ef 100644
--- a/erts/emulator/beam/msg_instrs.tab
+++ b/erts/emulator/beam/msg_instrs.tab
@@ -137,8 +137,8 @@ i_loop_rec(Dest) {
if (ERTS_UNLIKELY(ERTS_SIG_IS_EXTERNAL_MSG(msgp))) {
FCALLS -= 10; /* FIXME: bump appropriate amount... */
- SWAPOUT; /* erts_decode_dist_message() may write to heap... */
- if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) {
+ SWAPOUT; /* erts_proc_sig_decode_dist() may write to heap... */
+ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) {
/*
* A corrupt distribution message that we weren't able to decode;
* remove it...