aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/beam/dist.c63
-rw-r--r--erts/emulator/beam/dist.h2
-rw-r--r--erts/emulator/beam/erl_node_tables.h1
-rw-r--r--erts/emulator/beam/external.c44
-rw-r--r--erts/emulator/beam/external.h2
5 files changed, 66 insertions, 46 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index cd4125e404..6d2e907f18 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -1879,33 +1879,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) {
ctx->acmp = erts_get_atom_cache_map(ctx->c_p);
- ctx->pass_through_size = 0;
+ ctx->max_finalize_prepend = 0;
}
else {
ctx->acmp = NULL;
- ctx->pass_through_size = 3; /* SVERK rename */
+ ctx->max_finalize_prepend = 3;
}
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl);
- if (is_value(ctx->msg))
- erts_fprintf(stderr, " MSG: %T\n", ctx->msg);
+ erts_fprintf(stderr, ">> CTL: %T\n", ctx->ctl);
+ if (is_value(ctx->msg))
+ erts_fprintf(stderr, " MSG: %T\n", ctx->msg);
#endif
- ctx->data_size = ctx->pass_through_size;
+ ctx->data_size = ctx->max_finalize_prepend;
erts_reset_atom_cache_map(ctx->acmp);
erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size);
- if (is_value(ctx->msg)) {
- ctx->u.sc.wstack.wstart = NULL;
- ctx->u.sc.flags = ctx->flags;
- ctx->u.sc.level = 0;
- ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE;
- } else {
- ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
- }
- break;
+ if (is_non_value(ctx->msg)) {
+ ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
+ break;
+ }
+ ctx->u.sc.wstack.wstart = NULL;
+ ctx->u.sc.flags = ctx->flags;
+ ctx->u.sc.level = 0;
+ ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE;
case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
retval = ERTS_DSIG_SEND_CONTINUE;
@@ -1920,23 +1919,24 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
ctx->data_size += ctx->dhdr_ext_size;
ctx->obuf = alloc_dist_obuf(ctx->data_size);
- ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->pass_through_size + ctx->dhdr_ext_size;
+ ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size;
/* Encode internal version of dist header */
ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp);
/* Encode control message */
erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL);
- if (is_value(ctx->msg)) {
- ctx->u.ec.flags = ctx->flags;
- ctx->u.ec.level = 0;
- ctx->u.ec.wstack.wstart = NULL;
- ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE;
- } else {
- ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
- }
- break;
+ if (is_non_value(ctx->msg)) {
+ ctx->obuf->msg_start = NULL;
+ ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
+ break;
+ }
+ ctx->u.ec.flags = ctx->flags;
+ ctx->u.ec.level = 0;
+ ctx->u.ec.wstack.wstart = NULL;
+ ctx->obuf->msg_start = ctx->obuf->ext_endp;
- case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
+ ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE;
+ case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) {
retval = ERTS_DSIG_SEND_CONTINUE;
goto done;
@@ -1949,7 +1949,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
int resume = 0;
ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp);
- ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->pass_through_size);
+ ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend);
ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size);
ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp;
@@ -2308,9 +2308,7 @@ erts_dist_command(Port *prt, int reds_limit)
ob = oq.first;
ASSERT(ob);
do {
- ob->extp = erts_encode_ext_dist_header_finalize(ob->extp,
- dep->cache,
- flags);
+ erts_encode_ext_dist_header_finalize(ob, dep->cache, flags);
ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp);
reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
preempt = reds > reds_limit;
@@ -2350,10 +2348,7 @@ erts_dist_command(Port *prt, int reds_limit)
while (oq.first && !preempt) {
ErtsDistOutputBuf *fob;
Uint size;
- oq.first->extp
- = erts_encode_ext_dist_header_finalize(oq.first->extp,
- dep->cache,
- flags);
+ erts_encode_ext_dist_header_finalize(oq.first, dep->cache, flags);
reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
ASSERT(&oq.first->data[0] <= oq.first->extp
&& oq.first->extp < oq.first->ext_endp);
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index 49f3eac340..505d510473 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -405,7 +405,7 @@ struct erts_dsig_send_context {
Eterm ctl;
Eterm msg;
int force_busy;
- Uint32 pass_through_size;
+ Uint32 max_finalize_prepend;
Uint data_size, dhdr_ext_size;
ErtsAtomCacheMap *acmp;
ErtsDistOutputBuf *obuf;
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index d012f4b2cf..af42814b8b 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -87,6 +87,7 @@ struct ErtsDistOutputBuf_ {
ErtsDistOutputBuf *next;
byte *extp;
byte *ext_endp;
+ byte *msg_start;
byte data[1];
};
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c
index c49e2f617a..d8f52ceb57 100644
--- a/erts/emulator/beam/external.c
+++ b/erts/emulator/beam/external.c
@@ -351,41 +351,63 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp)
#define PASS_THROUGH 'p' /* This code should go */
-byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint32 dflags)
+void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
+ ErtsAtomCache *cache,
+ Uint32 dflags)
{
byte *ip;
byte instr_buf[(2+4)*ERTS_ATOM_CACHE_SIZE];
int ci, sz;
byte dist_hdr_flags;
int long_atoms;
- register byte *ep = ext;
+ register byte *ep = ob->extp;
ASSERT(dflags & DFLAG_UTF8_ATOMS);
+ /*
+ * The buffer can have three different layouts at this point depending on
+ * what was known when encoded:
+ *
+ * Pending connection: CtrlTerm [, MsgTerm]
+ * With atom cache : VERSION_MAGIC, DIST_HEADER, ..., CtrlTerm [, MsgTerm]
+ * No atom cache : VERSION_MAGIC, CtrlTerm [, VERSION_MAGIC, MsgTerm]
+ */
+
if (ep[0] != VERSION_MAGIC) {
+ /*
+ * Was encoded without atom cache toward pending connection.
+ */
ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT);
if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) {
/*
- * Encoded without atom cache (toward pending connection)
- * but receiver wants dist header. Let's prepend an empty one.
+ * Receiver wants dist header. Let's prepend an empty one.
*/
*--ep = 0; /* NumberOfAtomCacheRefs */
*--ep = DIST_HEADER;
*--ep = VERSION_MAGIC;
}
else {
- /* Node without atom cache, 'pass through' needed */
-
- ASSERT(!"SVERK: Must insert VERSION_MAGIC's");
+ /*
+ * Primitive receiver without atom cache (erl_interface/jinterface).
+ * Must prepend VERSION_MAGIC to both ctrl and message
+ * And add PASSTHROUGH.
+ */
+ if (ob->msg_start) {
+ ASSERT(ep < ob->msg_start && ob->msg_start < ob->ext_endp);
+ sys_memmove(ep-1, ep, (ob->msg_start - ep));
+ ob->msg_start[-1] = VERSION_MAGIC;
+ --ep;
+ }
+ *--ep = VERSION_MAGIC;
*--ep = PASS_THROUGH;
}
- return ep;
+ goto done;
}
else if (ep[1] != DIST_HEADER) {
ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT);
ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE));
/* Node without atom cache, 'pass through' needed */
*--ep = PASS_THROUGH;
- return ep;
+ goto done;
}
dist_hdr_flags = ep[2];
@@ -510,7 +532,9 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint
put_int8(ci, ep);
*--ep = DIST_HEADER;
*--ep = VERSION_MAGIC;
- return ep;
+done:
+ ASSERT(ep >= ob->data);
+ ob->extp = ep;
}
int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp,
diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h
index 49950c4aad..11044b17ef 100644
--- a/erts/emulator/beam/external.h
+++ b/erts/emulator/beam/external.h
@@ -154,7 +154,7 @@ void erts_finalize_atom_cache_map(ErtsAtomCacheMap *, Uint32);
Uint erts_encode_ext_dist_header_size(ErtsAtomCacheMap *);
byte *erts_encode_ext_dist_header_setup(byte *, ErtsAtomCacheMap *);
-byte *erts_encode_ext_dist_header_finalize(byte *, ErtsAtomCache *, Uint32);
+void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, ErtsAtomCache *, Uint32);
struct erts_dsig_send_context;
int erts_encode_dist_ext_size(Eterm, Uint32, ErtsAtomCacheMap*, Uint* szp);
int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx, Uint* szp);