aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/external.c
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2017-08-25 18:46:58 +0200
committerSverker Eriksson <[email protected]>2017-11-15 20:13:03 +0100
commite4d28f70133af9b3b8c320fe5e70e2086830874d (patch)
tree72db3931c672d51e06281986b3f5a8685bd3ff75 /erts/emulator/beam/external.c
parent42dadd12ac6b6977513a6edd73ff6137488c4a4a (diff)
downloadotp-e4d28f70133af9b3b8c320fe5e70e2086830874d.tar.gz
otp-e4d28f70133af9b3b8c320fe5e70e2086830874d.tar.bz2
otp-e4d28f70133af9b3b8c320fe5e70e2086830874d.zip
erts: Transcode tuple fallbacks
When finalizing outgoing distribution messages we transcode them into using tuple fallbacks if the receiver does not support bitstrings and export-funs. This can only happen if the message was first encoded toward a pending connection when the receiver was unknown. It's an optimistic approach optmimized for modern beam nodes, that expect real bitstrings and funs (since <R13). Only erl_interface/jinterface lack this support.
Diffstat (limited to 'erts/emulator/beam/external.c')
-rw-r--r--erts/emulator/beam/external.c227
1 files changed, 202 insertions, 25 deletions
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c
index 8408d7dd12..f2e6399ad7 100644
--- a/erts/emulator/beam/external.c
+++ b/erts/emulator/beam/external.c
@@ -122,6 +122,9 @@ static int encode_size_struct_int(struct TTBSizeContext_*, ErtsAtomCacheMap *acm
static Export binary_to_term_trap_export;
static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1);
+static Sint transcode_dist_obuf(ErtsDistOutputBuf*, DistEntry*, Uint32 dflags, Sint reds);
+
+
void erts_init_external(void) {
erts_init_trap_export(&term_to_binary_trap_export,
@@ -349,11 +352,13 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp)
}
}
-#define PASS_THROUGH 'p' /* This code should go */
-void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
- ErtsAtomCache *cache,
- Uint32 dflags)
+#define PASS_THROUGH 'p'
+
+Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
+ DistEntry* dep,
+ Uint32 dflags,
+ Sint reds)
{
byte *ip;
byte instr_buf[(2+4)*ERTS_ATOM_CACHE_SIZE];
@@ -364,7 +369,7 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
ASSERT(dflags & DFLAG_UTF8_ATOMS);
/*
- * The buffer can have three different layouts at this point depending on
+ * The buffer can have different layouts at this point depending on
* what was known when encoded:
*
* Pending connection: CtrlTerm [, MsgTerm]
@@ -372,34 +377,28 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
* No atom cache : VERSION_MAGIC, CtrlTerm [, VERSION_MAGIC, MsgTerm]
*/
- if (ep[0] != VERSION_MAGIC) {
+ if (ep[0] != VERSION_MAGIC || dep->transcode_ctx) {
/*
* Was encoded without atom cache toward pending connection.
*/
ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT);
+
+ if (~dflags & (DFLAG_BIT_BINARIES | DFLAG_EXPORT_PTR_TAG
+ | DFLAG_DIST_HDR_ATOM_CACHE)) {
+ reds = transcode_dist_obuf(ob, dep, dflags, reds);
+ if (reds < 0)
+ return reds;
+ ep = ob->extp;
+ }
if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) {
/*
- * Receiver wants dist header. Let's prepend an empty one.
+ * Encoding was done without atom caching but receiver expects
+ * a dist header, so we prepend an empty one.
*/
*--ep = 0; /* NumberOfAtomCacheRefs */
*--ep = DIST_HEADER;
*--ep = VERSION_MAGIC;
}
- else {
- /*
- * Primitive receiver without atom cache (erl_interface/jinterface).
- * Must prepend VERSION_MAGIC to both ctrl and message
- * And add PASSTHROUGH.
- */
- if (ob->msg_start) {
- ASSERT(ep < ob->msg_start && ob->msg_start < ob->ext_endp);
- sys_memmove(ep-1, ep, (ob->msg_start - ep));
- ob->msg_start[-1] = VERSION_MAGIC;
- --ep;
- }
- *--ep = VERSION_MAGIC;
- *--ep = PASS_THROUGH;
- }
goto done;
}
else if (ep[1] != DIST_HEADER) {
@@ -436,6 +435,7 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
/ sizeof(Uint32))+1];
register Uint32 flgs;
int iix, flgs_bytes, flgs_buf_ix, used_half_bytes;
+ ErtsAtomCache* cache = dep->cache;
#ifdef DEBUG
int tot_used_half_bytes;
#endif
@@ -527,14 +527,16 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
break;
}
}
+ reds -= 3; /*was ERTS_PORT_REDS_DIST_CMD_FINALIZE*/
}
--ep;
put_int8(ci, ep);
*--ep = DIST_HEADER;
*--ep = VERSION_MAGIC;
done:
- ASSERT(ep >= ob->data);
ob->extp = ep;
+ ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp);
+ return reds < 0 ? 0 : reds;
}
int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp,
@@ -545,7 +547,7 @@ int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp,
return -1;
} else {
#ifndef ERTS_DEBUG_USE_DIST_SEP
- if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
+ if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_NO_MAGIC)))
#endif
sz++ /* VERSION_MAGIC */;
@@ -3285,6 +3287,7 @@ dec_term_atom_common:
n--;
if (ctx) {
if (reds < n) {
+ ASSERT(reds > 0);
ctx->state = B2TDecodeList;
ctx->u.dc.remaining_n = n - reds;
n = reds;
@@ -4388,7 +4391,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags, B2TContext* ctx)
}
}
else
- reds = 0; /* not used but compiler warns anyway */
+ ERTS_UNDEF(reds, 0);
heap_size = 0;
terms = 1;
@@ -4698,3 +4701,177 @@ error:
#undef SKIP2
#undef CHKSIZE
}
+
+
+struct transcode_context {
+ enum {
+ TRANSCODE_DEC_MSG_SIZE,
+ TRANSCODE_DEC_MSG,
+ TRANSCODE_ENC_CTL,
+ TRANSCODE_ENC_MSG
+ }state;
+ Eterm ctl_term;
+ Eterm* ctl_heap;
+ ErtsHeapFactory ctl_factory;
+ Eterm* msg_heap;
+ B2TContext b2t;
+ TTBEncodeContext ttb;
+#ifdef DEBUG
+ ErtsDistOutputBuf* dbg_ob;
+#endif
+};
+
+void transcode_free_ctx(DistEntry* dep)
+{
+ struct transcode_context* ctx = dep->transcode_ctx;
+
+ erts_factory_close(&ctx->ctl_factory);
+ erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx->ctl_heap);
+
+ if (ctx->msg_heap) {
+ erts_factory_close(&ctx->b2t.u.dc.factory);
+ erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx->msg_heap);
+ }
+ erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx);
+ dep->transcode_ctx = NULL;
+}
+
+Sint transcode_dist_obuf(ErtsDistOutputBuf* ob,
+ DistEntry* dep,
+ Uint32 dflags,
+ Sint reds)
+{
+ Sint hsz;
+ byte* decp;
+ const int have_msg = !!ob->msg_start;
+ int i;
+ struct transcode_context* ctx = dep->transcode_ctx;
+
+ if (!ctx) { /* first call for 'ob' */
+
+ if (~dflags & (DFLAG_BIT_BINARIES | DFLAG_EXPORT_PTR_TAG)) {
+ /*
+ * Receiver does not support bitstrings and/or export funs.
+ * We need to transcode control and message terms to use tuple fallbacks.
+ */
+ ctx = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, sizeof(struct transcode_context));
+ dep->transcode_ctx = ctx;
+ #ifdef DEBUG
+ ctx->dbg_ob = ob;
+ #endif
+
+ hsz = decoded_size(ob->extp, ob->ext_endp, 0, NULL);
+ ctx->ctl_heap = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, hsz*sizeof(Eterm));
+ erts_factory_tmp_init(&ctx->ctl_factory, ctx->ctl_heap, hsz, ERTS_ALC_T_DIST_TRANSCODE);
+ ctx->msg_heap = NULL;
+
+ decp = dec_term(NULL, &ctx->ctl_factory, ob->extp, &ctx->ctl_term, NULL);
+ if (have_msg) {
+ ASSERT(decp == ob->msg_start); (void)decp;
+ ctx->b2t.u.sc.ep = NULL;
+ ctx->b2t.state = B2TSize;
+ ctx->b2t.aligned_alloc = NULL;
+ ctx->b2t.b2ts.exttmp = 0;
+ ctx->state = TRANSCODE_DEC_MSG_SIZE;
+ }
+ else {
+ ASSERT(decp == ob->ext_endp);
+ ctx->state = TRANSCODE_ENC_CTL;
+ }
+ }
+ else {
+ /*
+ * No need for full transcoding, but primitive receiver (erl_/jinterface)
+ * expects VERSION_MAGIC before both control and message terms.
+ */
+ if (ob->msg_start) {
+ Sint ctl_bytes = ob->msg_start - ob->extp;
+ ASSERT(ob->extp < ob->msg_start && ob->msg_start < ob->ext_endp);
+ /* Move control term back 1 byte to make room */
+ sys_memmove(ob->extp-1, ob->extp, ctl_bytes);
+ *--(ob->msg_start) = VERSION_MAGIC;
+ --(ob->extp);
+ reds -= ctl_bytes / (B2T_BYTES_PER_REDUCTION * B2T_MEMCPY_FACTOR);
+ }
+ *--(ob->extp) = VERSION_MAGIC;
+ goto done;
+ }
+ }
+ else {
+ ASSERT(ctx->dbg_ob == ob);
+ }
+ ctx->b2t.reds = reds * B2T_BYTES_PER_REDUCTION;
+
+ switch (ctx->state) {
+ case TRANSCODE_DEC_MSG_SIZE:
+ hsz = decoded_size(ob->msg_start, ob->ext_endp, 0, &ctx->b2t);
+ if (ctx->b2t.state == B2TSize) {
+ return -1;
+ }
+ ASSERT(ctx->b2t.state == B2TDecodeInit);
+ ctx->msg_heap = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, hsz*sizeof(Eterm));
+ ctx->b2t.u.dc.ep = ob->msg_start;
+ ctx->b2t.u.dc.res = (Eterm) NULL;
+ ctx->b2t.u.dc.next = &ctx->b2t.u.dc.res;
+ erts_factory_tmp_init(&ctx->b2t.u.dc.factory,
+ ctx->msg_heap, hsz, ERTS_ALC_T_DIST_TRANSCODE);
+ ctx->b2t.u.dc.flat_maps.wstart = NULL;
+ ctx->b2t.u.dc.hamt_array.pstart = NULL;
+ ctx->b2t.state = B2TDecode;
+
+ ctx->state = TRANSCODE_DEC_MSG;
+ case TRANSCODE_DEC_MSG:
+ if (ctx->b2t.reds <= 0)
+ ctx->b2t.reds = 1;
+ decp = dec_term(NULL, NULL, NULL, NULL, &ctx->b2t);
+ if (ctx->b2t.state < B2TDone) {
+ return -1;
+ }
+ ASSERT(ctx->b2t.state == B2TDone);
+ ASSERT(decp && decp <= ob->ext_endp);
+ reds = ctx->b2t.reds / B2T_BYTES_PER_REDUCTION;
+ b2t_destroy_context(&ctx->b2t);
+
+ ctx->state = TRANSCODE_ENC_CTL;
+ case TRANSCODE_ENC_CTL:
+ if (!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)) {
+ ASSERT(!(dflags & DFLAG_NO_MAGIC));
+ ob->extp -= 2; /* VERSION_MAGIC x 2 */
+ }
+ ob->ext_endp = ob->extp;
+ i = erts_encode_dist_ext(ctx->ctl_term, &ob->ext_endp, dflags,
+ NULL, NULL, NULL);
+ ASSERT(i == 0); (void)i;
+ ASSERT(ob->ext_endp <= ob->alloc_endp);
+
+ if (!have_msg) {
+ break;
+ }
+ ob->msg_start = ob->ext_endp;
+ ctx->ttb.wstack.wstart = NULL;
+ ctx->ttb.flags = dflags;
+ ctx->ttb.level = 0;
+
+ ctx->state = TRANSCODE_ENC_MSG;
+ case TRANSCODE_ENC_MSG:
+ reds *= TERM_TO_BINARY_LOOP_FACTOR;
+ if (erts_encode_dist_ext(ctx->b2t.u.dc.res, &ob->ext_endp, dflags, NULL,
+ &ctx->ttb, &reds)) {
+ return -1;
+ }
+ reds /= TERM_TO_BINARY_LOOP_FACTOR;
+
+ ASSERT(ob->ext_endp <= ob->alloc_endp);
+
+ }
+ transcode_free_ctx(dep);
+
+done:
+ if (!(dflags & DFLAG_DIST_HDR_ATOM_CACHE))
+ *--(ob->extp) = PASS_THROUGH;
+
+ if (reds < 0)
+ reds = 0;
+
+ return reds;
+}