aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
authorLukas Larsson <[email protected]>2018-09-19 16:37:55 +0200
committerLukas Larsson <[email protected]>2019-02-21 16:38:04 +0100
commit9a7df41a9e5361049a17c8aa971b52599fba2632 (patch)
tree9084706715fd064a1011753e2e72fe49e05875ba /erts/emulator/beam/dist.c
parent0184c2e0438ac42c44d14e92b13e2807e0a670b3 (diff)
downloadotp-9a7df41a9e5361049a17c8aa971b52599fba2632.tar.gz
otp-9a7df41a9e5361049a17c8aa971b52599fba2632.tar.bz2
otp-9a7df41a9e5361049a17c8aa971b52599fba2632.zip
erts: Move reason in dist messages to payload
The dist messages EXIT, EXIT2 and MONITOR_DOWN have been updated with new versions that send the reason term as part of the payload of the message instead of as part of the control message. This allows the decode of the reason to be done by the receiving process instead of the dist entry which in turn makes it possible for multiple decodes to be done in parallel. This change is done in order to make it easier to fragment the potentially large payload of EXIT, EXIT2 and MONITOR_DOWN into multiple distribution messages. OTP-15611
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c284
1 files changed, 194 insertions, 90 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 3b6c0e5db9..91a9481d84 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -50,7 +50,8 @@
#define DIST_CTL_DEFAULT_SIZE 64
/* Turn this on to get printouts of all distribution messages
- * which go on the line
+ * which go on the line. Enabling this may make some testcases
+ * fail. Especially the broken dist testcases in distribution_SUITE.
*/
#if 0
#define ERTS_DIST_MSG_DBG
@@ -71,8 +72,6 @@ static void
dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
{
ErtsHeapFactory factory;
- DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
- Eterm* ctl = ctl_default;
byte *extp = edep->extp;
Eterm msg;
Sint ctl_len;
@@ -116,6 +115,7 @@ static Export *dist_ctrl_put_data_trap;
/* forward declarations */
+static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, int force_busy);
static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
@@ -848,7 +848,7 @@ int
erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
Eterm ref, Eterm reason)
{
- Eterm ctl;
+ Eterm ctl, msg;
DeclareTmpHeapNoproc(ctl_heap,6);
int res;
@@ -861,10 +861,17 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
UseTmpHeapNoproc(6);
- ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT),
- watched, watcher, ref, reason);
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE4(&ctl_heap[0], make_small(DOP_PAYLOAD_MONITOR_P_EXIT),
+ watched, watcher, ref);
+ msg = reason;
+ } else {
+ ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT),
+ watched, watcher, ref, reason);
+ msg = THE_NON_VALUE;
+ }
- res = dsig_send_ctl(dsdp, ctl, 1);
+ res = dsig_send_exit(dsdp, ctl, msg, 1);
UnUseTmpHeapNoproc(6);
return res;
}
@@ -1018,6 +1025,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
ctx->dss.ctl = ctl;
ctx->dss.msg = message;
ctx->dss.force_busy = 0;
+ ctx->dss.force_encode = 0;
res = erts_dsig_send(&ctx->dsd, &ctx->dss);
return res;
}
@@ -1077,6 +1085,7 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message,
ctx->dss.ctl = ctl;
ctx->dss.msg = message;
ctx->dss.force_busy = 0;
+ ctx->dss.force_encode = 0;
res = erts_dsig_send(&ctx->dsd, &ctx->dss);
return res;
}
@@ -1086,7 +1095,7 @@ int
erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
Eterm reason, Eterm token)
{
- Eterm ctl;
+ Eterm ctl, msg = THE_NON_VALUE;
DeclareTmpHeapNoproc(ctl_heap,6);
int res;
#ifdef USE_VM_PROBES
@@ -1101,13 +1110,24 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
#endif
UseTmpHeapNoproc(6);
+
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD)
+ msg = reason;
+
if (have_seqtrace(token)) {
seq_trace_update_send(dsdp->proc);
seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local);
- ctl = TUPLE5(&ctl_heap[0],
- make_small(DOP_EXIT_TT), local, remote, token, reason);
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE4(&ctl_heap[0],
+ make_small(DOP_PAYLOAD_EXIT_TT), local, remote, token);
+ } else
+ ctl = TUPLE5(&ctl_heap[0],
+ make_small(DOP_EXIT_TT), local, remote, token, reason);
} else {
- ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD)
+ ctl = TUPLE3(&ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote);
+ else
+ ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
}
#ifdef USE_VM_PROBES
*node_name = *sender_name = *remote_name = '\0';
@@ -1129,8 +1149,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
#endif
DTRACE7(process_exit_signal_remote, sender_name, node_name,
remote_name, reason_str, tok_label, tok_lastcnt, tok_serial);
- /* forced, i.e ignore busy */
- res = dsig_send_ctl(dsdp, ctl, 1);
+ res = dsig_send_exit(dsdp, ctl, msg, 1);
UnUseTmpHeapNoproc(6);
return res;
}
@@ -1140,13 +1159,18 @@ erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
{
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
- Eterm ctl;
+ Eterm ctl, msg = dsdp->dep->flags & DFLAG_EXIT_PAYLOAD ? reason : THE_NON_VALUE;
UseTmpHeapNoproc(5);
- ctl = TUPLE4(&ctl_heap[0],
- make_small(DOP_EXIT), local, remote, reason);
- /* forced, i.e ignore busy */
- res = dsig_send_ctl(dsdp, ctl, 1);
+
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE3(&ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote);
+ msg = reason;
+ } else {
+ ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
+ msg = THE_NON_VALUE;
+ }
+ res = dsig_send_exit(dsdp, ctl, msg, 1);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -1156,13 +1180,20 @@ erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason
{
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
- Eterm ctl;
+ Eterm ctl, msg;
UseTmpHeapNoproc(5);
- ctl = TUPLE4(&ctl_heap[0],
- make_small(DOP_EXIT2), local, remote, reason);
+ if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) {
+ ctl = TUPLE3(&ctl_heap[0],
+ make_small(DOP_PAYLOAD_EXIT2), local, remote);
+ msg = reason;
+ } else {
+ ctl = TUPLE4(&ctl_heap[0],
+ make_small(DOP_EXIT2), local, remote, reason);
+ msg = THE_NON_VALUE;
+ }
- res = dsig_send_ctl(dsdp, ctl, 0);
+ res = dsig_send_exit(dsdp, ctl, msg, 0);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -1227,10 +1258,8 @@ int erts_net_message(Port *prt,
DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
Eterm* ctl = ctl_default;
ErtsHeapFactory factory;
- Eterm* hp;
Sint type;
Eterm token;
- Eterm token_size;
Uint tuple_arity;
int res;
#ifdef ERTS_DIST_MSG_DBG
@@ -1290,7 +1319,6 @@ int erts_net_message(Port *prt,
if (ctl_len > DIST_CTL_DEFAULT_SIZE) {
ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm));
}
- hp = ctl;
erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF);
arg = erts_decode_dist_ext(&factory, &ede);
@@ -1313,7 +1341,6 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- token_size = 0;
token = NIL;
switch (type = unsigned_val(tuple[1])) {
@@ -1508,7 +1535,6 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- token_size = size_object(tuple[5]);
/* Fall through ... */
case DOP_REG_SEND:
/* {DOP_REG_SEND, From, Cookie, ToName} -- Message */
@@ -1533,35 +1559,25 @@ int erts_net_message(Port *prt,
}
rp = erts_whereis_process(NULL, 0, to, 0, 0);
if (rp) {
- Uint xsize = (type == DOP_REG_SEND
- ? 0
- : ERTS_HEAP_FRAG_SIZE(token_size));
ErtsProcLocks locks = 0;
ErtsDistExternal *ede_copy;
- ede_copy = erts_make_dist_ext_copy(&ede, xsize);
if (type == DOP_REG_SEND) {
token = NIL;
} else {
- ErlHeapFragment *heap_frag;
- ErlOffHeap *ohp;
- ASSERT(xsize);
- heap_frag = erts_dist_ext_trailer(ede_copy);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
- hp = heap_frag->mem;
- ohp = &heap_frag->off_heap;
token = tuple[5];
- token = copy_struct(token, token_size, &hp, ohp);
}
- erts_queue_dist_message(rp, locks, ede_copy, token, from);
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
+
+ erts_queue_dist_message(rp, locks, ede_copy, token, from);
+
if (locks)
erts_proc_unlock(rp, locks);
}
break;
case DOP_SEND_SENDER_TT: {
- Uint xsize;
case DOP_SEND_TT:
if (tuple_arity != 4) {
@@ -1569,15 +1585,12 @@ int erts_net_message(Port *prt,
}
token = tuple[4];
- token_size = size_object(token);
- xsize = ERTS_HEAP_FRAG_SIZE(token_size);
goto send_common;
case DOP_SEND_SENDER:
case DOP_SEND:
token = NIL;
- xsize = 0;
if (tuple_arity != 3)
goto invalid_message;
@@ -1604,17 +1617,7 @@ int erts_net_message(Port *prt,
ErtsProcLocks locks = 0;
ErtsDistExternal *ede_copy;
- ede_copy = erts_make_dist_ext_copy(&ede, xsize);
- if (is_not_nil(token)) {
- ErlHeapFragment *heap_frag;
- ErlOffHeap *ohp;
- ASSERT(xsize);
- heap_frag = erts_dist_ext_trailer(ede_copy);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
- hp = heap_frag->mem;
- ohp = &heap_frag->off_heap;
- token = copy_struct(token, token_size, &hp, ohp);
- }
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty);
if (locks)
@@ -1623,18 +1626,28 @@ int erts_net_message(Port *prt,
break;
}
+ case DOP_PAYLOAD_MONITOR_P_EXIT:
case DOP_MONITOR_P_EXIT: {
+ ErtsDistExternal *ede_copy = NULL;
+
/* We are monitoring a process on the remote node which dies, we get
{DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */
-
- if (tuple_arity != 5) {
- goto invalid_message;
- }
- watched = tuple[2]; /* remote proc or name which died */
- watcher = tuple[3];
+ watched = tuple[2]; /* remote proc or name which died */
+ watcher = tuple[3];
ref = tuple[4];
- reason = tuple[5];
+
+ if (type == DOP_PAYLOAD_MONITOR_P_EXIT) {
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
+ reason = THE_NON_VALUE;
+ } else {
+ if (tuple_arity != 5) {
+ goto invalid_message;
+ }
+ reason = tuple[5];
+ }
if (is_not_ref(ref))
goto invalid_message;
@@ -1650,70 +1663,134 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
+ if (reason == THE_NON_VALUE) {
+
+#ifdef ERTS_DIST_MSG_DBG
+ dist_msg_dbg(&ede, "MSG", buf, orig_len);
+#endif
+
+ if (!erts_proc_lookup(watcher)) break; /* Process not alive */
+
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
+ }
+
erts_proc_sig_send_dist_monitor_down(dep, ref, watched,
- watcher, reason);
+ watcher, ede_copy, reason);
break;
}
+ case DOP_PAYLOAD_EXIT:
+ case DOP_PAYLOAD_EXIT_TT:
case DOP_EXIT_TT:
case DOP_EXIT: {
+ ErtsDistExternal *ede_copy = NULL;
+
/* 'from', which 'to' is linked to, died */
+ from = tuple[2];
+ to = tuple[3];
+
if (type == DOP_EXIT) {
if (tuple_arity != 4) {
goto invalid_message;
}
-
- from = tuple[2];
- to = tuple[3];
- reason = tuple[4];
token = NIL;
- } else {
+ reason = tuple[4];
+ } else if (type == DOP_EXIT_TT){
if (tuple_arity != 5) {
goto invalid_message;
}
- from = tuple[2];
- to = tuple[3];
token = tuple[4];
reason = tuple[5];
- }
+ } else if (type == DOP_PAYLOAD_EXIT) {
+ if (tuple_arity != 3) {
+ goto invalid_message;
+ }
+ token = NIL;
+ reason = THE_NON_VALUE;
+ } else {
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
+ token = tuple[4];
+ reason = THE_NON_VALUE;
+ }
if (is_not_external_pid(from)
|| dep != external_pid_dist_entry(from)
|| is_not_internal_pid(to)) {
goto invalid_message;
}
+ if (reason == THE_NON_VALUE) {
+
+#ifdef ERTS_DIST_MSG_DBG
+ dist_msg_dbg(&ede, "MSG", buf, orig_len);
+#endif
+
+ if (!erts_proc_lookup(to)) break; /* Process not alive */
+
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
+ }
+
erts_proc_sig_send_dist_link_exit(dep,
- from, to,
+ from, to, ede_copy,
reason, token);
break;
}
+ case DOP_PAYLOAD_EXIT2_TT:
+ case DOP_PAYLOAD_EXIT2:
case DOP_EXIT2_TT:
- case DOP_EXIT2:
+ case DOP_EXIT2: {
+ ErtsDistExternal *ede_copy = NULL;
+
/* 'from' is send an exit signal to 'to' */
+ from = tuple[2];
+ to = tuple[3];
+
if (type == DOP_EXIT2) {
if (tuple_arity != 4) {
goto invalid_message;
}
- from = tuple[2];
- to = tuple[3];
reason = tuple[4];
token = NIL;
- } else {
+ } else if (type == DOP_EXIT2_TT) {
if (tuple_arity != 5) {
goto invalid_message;
}
- from = tuple[2];
- to = tuple[3];
token = tuple[4];
reason = tuple[5];
- }
- if (is_not_pid(from) || is_not_internal_pid(to)) {
+ } else if (type == DOP_PAYLOAD_EXIT2) {
+ if (tuple_arity != 3) {
+ goto invalid_message;
+ }
+ reason = THE_NON_VALUE;
+ token = NIL;
+ } else {
+ if (tuple_arity != 4) {
+ goto invalid_message;
+ }
+ reason = THE_NON_VALUE;
+ token = tuple[4];
+ }
+ if (is_not_pid(from)
+ || dep != external_pid_dist_entry(from)
+ || is_not_internal_pid(to)) {
goto invalid_message;
}
- erts_proc_sig_send_exit(NULL, from, to, reason, token, 0);
- break;
+ if (reason == THE_NON_VALUE) {
+
+#ifdef ERTS_DIST_MSG_DBG
+ dist_msg_dbg(&ede, "MSG", buf, orig_len);
+#endif
+
+ if (!erts_proc_lookup(to)) break; /* Process not alive */
+ ede_copy = erts_make_dist_ext_copy(&ede, &token);
+ }
+
+ erts_proc_sig_send_dist_exit(dep, from, to, ede_copy, reason, token);
+ break;
+ }
case DOP_GROUP_LEADER:
if (tuple_arity != 3) {
goto invalid_message;
@@ -1757,6 +1834,23 @@ data_error:
return -1;
}
+static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, int force_busy)
+{
+ struct erts_dsig_send_context ctx;
+ int ret;
+ ctx.ctl = ctl;
+ ctx.msg = msg;
+ ctx.force_busy = force_busy;
+ ctx.force_encode = 1;
+ ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
+#ifdef DEBUG
+ ctx.reds = 1; /* provoke assert below (no reduction count with force_encode) */
+#endif
+ ret = erts_dsig_send(dsdp, &ctx);
+ ASSERT(ret != ERTS_DSIG_SEND_CONTINUE);
+ return ret;
+}
+
static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
{
struct erts_dsig_send_context ctx;
@@ -1764,6 +1858,7 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
ctx.ctl = ctl;
ctx.msg = THE_NON_VALUE;
ctx.force_busy = force_busy;
+ ctx.force_encode = 1;
ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
#ifdef DEBUG
ctx.reds = 1; /* provoke assert below (no reduction count without msg) */
@@ -1850,10 +1945,14 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE;
case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
- if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
- retval = ERTS_DSIG_SEND_CONTINUE;
- goto done;
- }
+ if (!ctx->force_encode) {
+ if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ goto done;
+ }
+ } else {
+ erts_encode_dist_ext_size(ctx->msg, ctx->flags, ctx->acmp, &ctx->data_size);
+ }
ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
case ERTS_DSIG_SEND_PHASE_ALLOC:
@@ -1882,10 +1981,15 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE;
case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
- if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) {
- retval = ERTS_DSIG_SEND_CONTINUE;
- goto done;
- }
+ if (!ctx->force_encode) {
+ if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags,
+ ctx->acmp, &ctx->u.ec, &ctx->reds)) {
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ goto done;
+ }
+ } else {
+ erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL);
+ }
ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
case ERTS_DSIG_SEND_PHASE_FIN: {