From 5857245406584b8bfcbdf0be450ad494a992d5c8 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 8 Sep 2017 17:11:43 +0200 Subject: erts: Fix auto-connect toward erl_interface/jinterface --- erts/emulator/beam/dist.c | 63 +++++++++++++++++------------------- erts/emulator/beam/dist.h | 2 +- erts/emulator/beam/erl_node_tables.h | 1 + erts/emulator/beam/external.c | 44 +++++++++++++++++++------ erts/emulator/beam/external.h | 2 +- 5 files changed, 66 insertions(+), 46 deletions(-) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index cd4125e404..6d2e907f18 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -1879,33 +1879,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) { ctx->acmp = erts_get_atom_cache_map(ctx->c_p); - ctx->pass_through_size = 0; + ctx->max_finalize_prepend = 0; } else { ctx->acmp = NULL; - ctx->pass_through_size = 3; /* SVERK rename */ + ctx->max_finalize_prepend = 3; } #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl); - if (is_value(ctx->msg)) - erts_fprintf(stderr, " MSG: %T\n", ctx->msg); + erts_fprintf(stderr, ">> CTL: %T\n", ctx->ctl); + if (is_value(ctx->msg)) + erts_fprintf(stderr, " MSG: %T\n", ctx->msg); #endif - ctx->data_size = ctx->pass_through_size; + ctx->data_size = ctx->max_finalize_prepend; erts_reset_atom_cache_map(ctx->acmp); erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size); - if (is_value(ctx->msg)) { - ctx->u.sc.wstack.wstart = NULL; - ctx->u.sc.flags = ctx->flags; - ctx->u.sc.level = 0; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; - } - break; + if (is_non_value(ctx->msg)) { + ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; + break; + } + ctx->u.sc.wstack.wstart = NULL; + ctx->u.sc.flags = ctx->flags; + ctx->u.sc.level = 0; + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; case ERTS_DSIG_SEND_PHASE_MSG_SIZE: if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) { retval = ERTS_DSIG_SEND_CONTINUE; @@ -1920,23 +1919,24 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->data_size += ctx->dhdr_ext_size; ctx->obuf = alloc_dist_obuf(ctx->data_size); - ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->pass_through_size + ctx->dhdr_ext_size; + ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size; /* Encode internal version of dist header */ ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp); /* Encode control message */ erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL); - if (is_value(ctx->msg)) { - ctx->u.ec.flags = ctx->flags; - ctx->u.ec.level = 0; - ctx->u.ec.wstack.wstart = NULL; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; - } - break; + if (is_non_value(ctx->msg)) { + ctx->obuf->msg_start = NULL; + ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; + break; + } + ctx->u.ec.flags = ctx->flags; + ctx->u.ec.level = 0; + ctx->u.ec.wstack.wstart = NULL; + ctx->obuf->msg_start = ctx->obuf->ext_endp; - case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; + case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) { retval = ERTS_DSIG_SEND_CONTINUE; goto done; @@ -1949,7 +1949,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) int resume = 0; ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp); - ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->pass_through_size); + ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend); ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size); ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp; @@ -2308,9 +2308,7 @@ erts_dist_command(Port *prt, int reds_limit) ob = oq.first; ASSERT(ob); do { - ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, - dep->cache, - flags); + erts_encode_ext_dist_header_finalize(ob, dep->cache, flags); ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; preempt = reds > reds_limit; @@ -2350,10 +2348,7 @@ erts_dist_command(Port *prt, int reds_limit) while (oq.first && !preempt) { ErtsDistOutputBuf *fob; Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); + erts_encode_ext_dist_header_finalize(oq.first, dep->cache, flags); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; ASSERT(&oq.first->data[0] <= oq.first->extp && oq.first->extp < oq.first->ext_endp); diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 49f3eac340..505d510473 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -405,7 +405,7 @@ struct erts_dsig_send_context { Eterm ctl; Eterm msg; int force_busy; - Uint32 pass_through_size; + Uint32 max_finalize_prepend; Uint data_size, dhdr_ext_size; ErtsAtomCacheMap *acmp; ErtsDistOutputBuf *obuf; diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index d012f4b2cf..af42814b8b 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -87,6 +87,7 @@ struct ErtsDistOutputBuf_ { ErtsDistOutputBuf *next; byte *extp; byte *ext_endp; + byte *msg_start; byte data[1]; }; diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index c49e2f617a..d8f52ceb57 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -351,41 +351,63 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) #define PASS_THROUGH 'p' /* This code should go */ -byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint32 dflags) +void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, + ErtsAtomCache *cache, + Uint32 dflags) { byte *ip; byte instr_buf[(2+4)*ERTS_ATOM_CACHE_SIZE]; int ci, sz; byte dist_hdr_flags; int long_atoms; - register byte *ep = ext; + register byte *ep = ob->extp; ASSERT(dflags & DFLAG_UTF8_ATOMS); + /* + * The buffer can have three different layouts at this point depending on + * what was known when encoded: + * + * Pending connection: CtrlTerm [, MsgTerm] + * With atom cache : VERSION_MAGIC, DIST_HEADER, ..., CtrlTerm [, MsgTerm] + * No atom cache : VERSION_MAGIC, CtrlTerm [, VERSION_MAGIC, MsgTerm] + */ + if (ep[0] != VERSION_MAGIC) { + /* + * Was encoded without atom cache toward pending connection. + */ ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT); if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) { /* - * Encoded without atom cache (toward pending connection) - * but receiver wants dist header. Let's prepend an empty one. + * Receiver wants dist header. Let's prepend an empty one. */ *--ep = 0; /* NumberOfAtomCacheRefs */ *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; } else { - /* Node without atom cache, 'pass through' needed */ - - ASSERT(!"SVERK: Must insert VERSION_MAGIC's"); + /* + * Primitive receiver without atom cache (erl_interface/jinterface). + * Must prepend VERSION_MAGIC to both ctrl and message + * And add PASSTHROUGH. + */ + if (ob->msg_start) { + ASSERT(ep < ob->msg_start && ob->msg_start < ob->ext_endp); + sys_memmove(ep-1, ep, (ob->msg_start - ep)); + ob->msg_start[-1] = VERSION_MAGIC; + --ep; + } + *--ep = VERSION_MAGIC; *--ep = PASS_THROUGH; } - return ep; + goto done; } else if (ep[1] != DIST_HEADER) { ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT); ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)); /* Node without atom cache, 'pass through' needed */ *--ep = PASS_THROUGH; - return ep; + goto done; } dist_hdr_flags = ep[2]; @@ -510,7 +532,9 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint put_int8(ci, ep); *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; - return ep; +done: + ASSERT(ep >= ob->data); + ob->extp = ep; } int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index 49950c4aad..11044b17ef 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -154,7 +154,7 @@ void erts_finalize_atom_cache_map(ErtsAtomCacheMap *, Uint32); Uint erts_encode_ext_dist_header_size(ErtsAtomCacheMap *); byte *erts_encode_ext_dist_header_setup(byte *, ErtsAtomCacheMap *); -byte *erts_encode_ext_dist_header_finalize(byte *, ErtsAtomCache *, Uint32); +void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, ErtsAtomCache *, Uint32); struct erts_dsig_send_context; int erts_encode_dist_ext_size(Eterm, Uint32, ErtsAtomCacheMap*, Uint* szp); int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx, Uint* szp); -- cgit v1.2.3