diff options
author | Sverker Eriksson <[email protected]> | 2017-08-25 18:46:58 +0200 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2017-11-15 20:13:03 +0100 |
commit | e4d28f70133af9b3b8c320fe5e70e2086830874d (patch) | |
tree | 72db3931c672d51e06281986b3f5a8685bd3ff75 /erts/emulator/beam/external.c | |
parent | 42dadd12ac6b6977513a6edd73ff6137488c4a4a (diff) | |
download | otp-e4d28f70133af9b3b8c320fe5e70e2086830874d.tar.gz otp-e4d28f70133af9b3b8c320fe5e70e2086830874d.tar.bz2 otp-e4d28f70133af9b3b8c320fe5e70e2086830874d.zip |
erts: Transcode tuple fallbacks
When finalizing outgoing distribution messages
we transcode them into using tuple fallbacks if the
receiver does not support bitstrings and export-funs.
This can only happen if the message was first encoded toward
a pending connection when the receiver was unknown.
It's an optimistic approach optmimized for modern beam nodes,
that expect real bitstrings and funs (since <R13).
Only erl_interface/jinterface lack this support.
Diffstat (limited to 'erts/emulator/beam/external.c')
-rw-r--r-- | erts/emulator/beam/external.c | 227 |
1 files changed, 202 insertions, 25 deletions
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 8408d7dd12..f2e6399ad7 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -122,6 +122,9 @@ static int encode_size_struct_int(struct TTBSizeContext_*, ErtsAtomCacheMap *acm static Export binary_to_term_trap_export; static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1); +static Sint transcode_dist_obuf(ErtsDistOutputBuf*, DistEntry*, Uint32 dflags, Sint reds); + + void erts_init_external(void) { erts_init_trap_export(&term_to_binary_trap_export, @@ -349,11 +352,13 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) } } -#define PASS_THROUGH 'p' /* This code should go */ -void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, - ErtsAtomCache *cache, - Uint32 dflags) +#define PASS_THROUGH 'p' + +Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, + DistEntry* dep, + Uint32 dflags, + Sint reds) { byte *ip; byte instr_buf[(2+4)*ERTS_ATOM_CACHE_SIZE]; @@ -364,7 +369,7 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, ASSERT(dflags & DFLAG_UTF8_ATOMS); /* - * The buffer can have three different layouts at this point depending on + * The buffer can have different layouts at this point depending on * what was known when encoded: * * Pending connection: CtrlTerm [, MsgTerm] @@ -372,34 +377,28 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, * No atom cache : VERSION_MAGIC, CtrlTerm [, VERSION_MAGIC, MsgTerm] */ - if (ep[0] != VERSION_MAGIC) { + if (ep[0] != VERSION_MAGIC || dep->transcode_ctx) { /* * Was encoded without atom cache toward pending connection. */ ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT); + + if (~dflags & (DFLAG_BIT_BINARIES | DFLAG_EXPORT_PTR_TAG + | DFLAG_DIST_HDR_ATOM_CACHE)) { + reds = transcode_dist_obuf(ob, dep, dflags, reds); + if (reds < 0) + return reds; + ep = ob->extp; + } if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) { /* - * Receiver wants dist header. Let's prepend an empty one. + * Encoding was done without atom caching but receiver expects + * a dist header, so we prepend an empty one. */ *--ep = 0; /* NumberOfAtomCacheRefs */ *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; } - else { - /* - * Primitive receiver without atom cache (erl_interface/jinterface). - * Must prepend VERSION_MAGIC to both ctrl and message - * And add PASSTHROUGH. - */ - if (ob->msg_start) { - ASSERT(ep < ob->msg_start && ob->msg_start < ob->ext_endp); - sys_memmove(ep-1, ep, (ob->msg_start - ep)); - ob->msg_start[-1] = VERSION_MAGIC; - --ep; - } - *--ep = VERSION_MAGIC; - *--ep = PASS_THROUGH; - } goto done; } else if (ep[1] != DIST_HEADER) { @@ -436,6 +435,7 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, / sizeof(Uint32))+1]; register Uint32 flgs; int iix, flgs_bytes, flgs_buf_ix, used_half_bytes; + ErtsAtomCache* cache = dep->cache; #ifdef DEBUG int tot_used_half_bytes; #endif @@ -527,14 +527,16 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, break; } } + reds -= 3; /*was ERTS_PORT_REDS_DIST_CMD_FINALIZE*/ } --ep; put_int8(ci, ep); *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; done: - ASSERT(ep >= ob->data); ob->extp = ep; + ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + return reds < 0 ? 0 : reds; } int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, @@ -545,7 +547,7 @@ int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, return -1; } else { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_NO_MAGIC))) #endif sz++ /* VERSION_MAGIC */; @@ -3285,6 +3287,7 @@ dec_term_atom_common: n--; if (ctx) { if (reds < n) { + ASSERT(reds > 0); ctx->state = B2TDecodeList; ctx->u.dc.remaining_n = n - reds; n = reds; @@ -4388,7 +4391,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags, B2TContext* ctx) } } else - reds = 0; /* not used but compiler warns anyway */ + ERTS_UNDEF(reds, 0); heap_size = 0; terms = 1; @@ -4698,3 +4701,177 @@ error: #undef SKIP2 #undef CHKSIZE } + + +struct transcode_context { + enum { + TRANSCODE_DEC_MSG_SIZE, + TRANSCODE_DEC_MSG, + TRANSCODE_ENC_CTL, + TRANSCODE_ENC_MSG + }state; + Eterm ctl_term; + Eterm* ctl_heap; + ErtsHeapFactory ctl_factory; + Eterm* msg_heap; + B2TContext b2t; + TTBEncodeContext ttb; +#ifdef DEBUG + ErtsDistOutputBuf* dbg_ob; +#endif +}; + +void transcode_free_ctx(DistEntry* dep) +{ + struct transcode_context* ctx = dep->transcode_ctx; + + erts_factory_close(&ctx->ctl_factory); + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx->ctl_heap); + + if (ctx->msg_heap) { + erts_factory_close(&ctx->b2t.u.dc.factory); + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx->msg_heap); + } + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx); + dep->transcode_ctx = NULL; +} + +Sint transcode_dist_obuf(ErtsDistOutputBuf* ob, + DistEntry* dep, + Uint32 dflags, + Sint reds) +{ + Sint hsz; + byte* decp; + const int have_msg = !!ob->msg_start; + int i; + struct transcode_context* ctx = dep->transcode_ctx; + + if (!ctx) { /* first call for 'ob' */ + + if (~dflags & (DFLAG_BIT_BINARIES | DFLAG_EXPORT_PTR_TAG)) { + /* + * Receiver does not support bitstrings and/or export funs. + * We need to transcode control and message terms to use tuple fallbacks. + */ + ctx = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, sizeof(struct transcode_context)); + dep->transcode_ctx = ctx; + #ifdef DEBUG + ctx->dbg_ob = ob; + #endif + + hsz = decoded_size(ob->extp, ob->ext_endp, 0, NULL); + ctx->ctl_heap = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, hsz*sizeof(Eterm)); + erts_factory_tmp_init(&ctx->ctl_factory, ctx->ctl_heap, hsz, ERTS_ALC_T_DIST_TRANSCODE); + ctx->msg_heap = NULL; + + decp = dec_term(NULL, &ctx->ctl_factory, ob->extp, &ctx->ctl_term, NULL); + if (have_msg) { + ASSERT(decp == ob->msg_start); (void)decp; + ctx->b2t.u.sc.ep = NULL; + ctx->b2t.state = B2TSize; + ctx->b2t.aligned_alloc = NULL; + ctx->b2t.b2ts.exttmp = 0; + ctx->state = TRANSCODE_DEC_MSG_SIZE; + } + else { + ASSERT(decp == ob->ext_endp); + ctx->state = TRANSCODE_ENC_CTL; + } + } + else { + /* + * No need for full transcoding, but primitive receiver (erl_/jinterface) + * expects VERSION_MAGIC before both control and message terms. + */ + if (ob->msg_start) { + Sint ctl_bytes = ob->msg_start - ob->extp; + ASSERT(ob->extp < ob->msg_start && ob->msg_start < ob->ext_endp); + /* Move control term back 1 byte to make room */ + sys_memmove(ob->extp-1, ob->extp, ctl_bytes); + *--(ob->msg_start) = VERSION_MAGIC; + --(ob->extp); + reds -= ctl_bytes / (B2T_BYTES_PER_REDUCTION * B2T_MEMCPY_FACTOR); + } + *--(ob->extp) = VERSION_MAGIC; + goto done; + } + } + else { + ASSERT(ctx->dbg_ob == ob); + } + ctx->b2t.reds = reds * B2T_BYTES_PER_REDUCTION; + + switch (ctx->state) { + case TRANSCODE_DEC_MSG_SIZE: + hsz = decoded_size(ob->msg_start, ob->ext_endp, 0, &ctx->b2t); + if (ctx->b2t.state == B2TSize) { + return -1; + } + ASSERT(ctx->b2t.state == B2TDecodeInit); + ctx->msg_heap = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, hsz*sizeof(Eterm)); + ctx->b2t.u.dc.ep = ob->msg_start; + ctx->b2t.u.dc.res = (Eterm) NULL; + ctx->b2t.u.dc.next = &ctx->b2t.u.dc.res; + erts_factory_tmp_init(&ctx->b2t.u.dc.factory, + ctx->msg_heap, hsz, ERTS_ALC_T_DIST_TRANSCODE); + ctx->b2t.u.dc.flat_maps.wstart = NULL; + ctx->b2t.u.dc.hamt_array.pstart = NULL; + ctx->b2t.state = B2TDecode; + + ctx->state = TRANSCODE_DEC_MSG; + case TRANSCODE_DEC_MSG: + if (ctx->b2t.reds <= 0) + ctx->b2t.reds = 1; + decp = dec_term(NULL, NULL, NULL, NULL, &ctx->b2t); + if (ctx->b2t.state < B2TDone) { + return -1; + } + ASSERT(ctx->b2t.state == B2TDone); + ASSERT(decp && decp <= ob->ext_endp); + reds = ctx->b2t.reds / B2T_BYTES_PER_REDUCTION; + b2t_destroy_context(&ctx->b2t); + + ctx->state = TRANSCODE_ENC_CTL; + case TRANSCODE_ENC_CTL: + if (!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)) { + ASSERT(!(dflags & DFLAG_NO_MAGIC)); + ob->extp -= 2; /* VERSION_MAGIC x 2 */ + } + ob->ext_endp = ob->extp; + i = erts_encode_dist_ext(ctx->ctl_term, &ob->ext_endp, dflags, + NULL, NULL, NULL); + ASSERT(i == 0); (void)i; + ASSERT(ob->ext_endp <= ob->alloc_endp); + + if (!have_msg) { + break; + } + ob->msg_start = ob->ext_endp; + ctx->ttb.wstack.wstart = NULL; + ctx->ttb.flags = dflags; + ctx->ttb.level = 0; + + ctx->state = TRANSCODE_ENC_MSG; + case TRANSCODE_ENC_MSG: + reds *= TERM_TO_BINARY_LOOP_FACTOR; + if (erts_encode_dist_ext(ctx->b2t.u.dc.res, &ob->ext_endp, dflags, NULL, + &ctx->ttb, &reds)) { + return -1; + } + reds /= TERM_TO_BINARY_LOOP_FACTOR; + + ASSERT(ob->ext_endp <= ob->alloc_endp); + + } + transcode_free_ctx(dep); + +done: + if (!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--(ob->extp) = PASS_THROUGH; + + if (reds < 0) + reds = 0; + + return reds; +} |