diff options
-rw-r--r-- | erts/emulator/beam/dist.c | 159 | ||||
-rw-r--r-- | erts/emulator/beam/erl_alloc.types | 1 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.c | 1 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.h | 3 | ||||
-rw-r--r-- | erts/emulator/beam/external.c | 227 | ||||
-rw-r--r-- | erts/emulator/beam/external.h | 4 |
6 files changed, 289 insertions, 106 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 7877865ad4..a5df92bf9a 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -648,6 +648,7 @@ alloc_dist_obuf(Uint size) obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0]; #ifdef DEBUG obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; + obuf->alloc_endp = obuf->data + size; ASSERT(bin == ErtsDistOutputBuf2Binary(obuf)); #endif return obuf; @@ -692,6 +693,7 @@ static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; + return obuf; } @@ -755,6 +757,8 @@ static void clear_dist_entry(DistEntry *dep) delete_cache(cache); free_de_out_queues(dep, obuf); + if (dep->transcode_ctx) + transcode_free_ctx(dep); } int erts_dsend_context_dtor(Binary* ctx_bin) @@ -2208,7 +2212,6 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) #endif #define ERTS_PORT_REDS_DIST_CMD_START 5 -#define ERTS_PORT_REDS_DIST_CMD_FINALIZE 3 #define ERTS_PORT_REDS_DIST_CMD_EXIT 200 #define ERTS_PORT_REDS_DIST_CMD_RESUMED 5 #define ERTS_PORT_REDS_DIST_CMD_DATA(SZ) \ @@ -2217,9 +2220,9 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) : ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__))) int -erts_dist_command(Port *prt, int reds_limit) +erts_dist_command(Port *prt, int initial_reds) { - Sint reds = ERTS_PORT_REDS_DIST_CMD_START; + Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START; Uint32 status; Uint32 flags; Sint qsize, obufsize = 0; @@ -2241,9 +2244,12 @@ erts_dist_command(Port *prt, int reds_limit) if (status & ERTS_DE_SFLG_EXITING) { erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1); - return reds + ERTS_PORT_REDS_DIST_CMD_EXIT; + reds -= ERTS_PORT_REDS_DIST_CMD_EXIT; + return initial_reds - reds; } + ASSERT(!(status & ERTS_DE_SFLG_PENDING)); + ASSERT(send); /* @@ -2268,7 +2274,7 @@ erts_dist_command(Port *prt, int reds_limit) sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - if (reds > reds_limit) + if (reds < 0) goto preempted; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { @@ -2283,13 +2289,13 @@ erts_dist_command(Port *prt, int reds_limit) erts_fprintf(stderr, ">> "); bw(foq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = foq.first; - obufsize += size_obuf(fob); - foq.first = foq.first->next; - free_dist_obuf(fob); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = foq.first; + obufsize += size_obuf(fob); + foq.first = foq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + preempt = reds < 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) break; } while (foq.first && !preempt); @@ -2302,44 +2308,42 @@ erts_dist_command(Port *prt, int reds_limit) if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) { if (oq.first) { ErtsDistOutputBuf *ob; - int preempt; + ErtsDistOutputBuf *last_finalized = NULL; finalize_only: - preempt = 0; ob = oq.first; ASSERT(ob); do { - erts_encode_ext_dist_header_finalize(ob, dep->cache, flags); - ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - preempt = reds > reds_limit; - if (preempt) - break; - ob = ob->next; + reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds); + if (reds >= 0) { + last_finalized = ob; + ob = ob->next; + } } while (ob); - /* - * At least one buffer was finalized; if we got preempted, - * ob points to the last buffer that we finalized. - */ - if (foq.last) - foq.last->next = oq.first; - else - foq.first = oq.first; - if (!preempt) { - /* All buffers finalized */ - foq.last = oq.last; - oq.first = oq.last = NULL; - } - else { - /* Not all buffers finalized; split oq. */ - foq.last = ob; - oq.first = ob->next; - if (oq.first) - ob->next = NULL; - else - oq.last = NULL; - } - if (preempt) - goto preempted; + if (last_finalized) { + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the next buffer to continue finalize. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + foq.last = last_finalized; + if (!ob) { + /* All buffers finalized */ + ASSERT(foq.last == oq.last); + ASSERT(foq.last->next == NULL); + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. */ + ASSERT(foq.last->next == ob); + foq.last->next = NULL; + oq.first = ob; + } + } + if (reds <= 0) + goto preempted; } } else { @@ -2348,8 +2352,11 @@ erts_dist_command(Port *prt, int reds_limit) while (oq.first && !preempt) { ErtsDistOutputBuf *fob; Uint size; - erts_encode_ext_dist_header_finalize(oq.first, dep->cache, flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds); + if (reds < 0) { + preempt = 1; + break; + } ASSERT(&oq.first->data[0] <= oq.first->extp && oq.first->extp < oq.first->ext_endp); size = (*send)(prt, oq.first); @@ -2359,13 +2366,13 @@ erts_dist_command(Port *prt, int reds_limit) erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = oq.first; - obufsize += size_obuf(fob); - oq.first = oq.first->next; - free_dist_obuf(fob); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = oq.first; + obufsize += size_obuf(fob); + oq.first = oq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + preempt = reds <= 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) goto finalize_only; } @@ -2405,7 +2412,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_mtx_unlock(&dep->qlock); resumed = erts_resume_processes(suspendees); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } else erts_mtx_unlock(&dep->qlock); @@ -2428,8 +2435,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_mtx_unlock(&dep->qlock); } - ASSERT(foq.first || !foq.last); - ASSERT(!foq.first || foq.last); + ASSERT(!!foq.first == !!foq.last); ASSERT(!dep->finalized_out_queue.first); ASSERT(!dep->finalized_out_queue.last); @@ -2439,10 +2445,10 @@ erts_dist_command(Port *prt, int reds_limit) } /* Avoid wrapping reduction counter... */ - if (reds > INT_MAX/2) - reds = INT_MAX/2; + if (reds < INT_MIN/2) + reds = INT_MIN/2; - return reds; + return initial_reds - reds; preempted: /* @@ -2450,8 +2456,7 @@ erts_dist_command(Port *prt, int reds_limit) * since last call to driver. */ - ASSERT(oq.first || !oq.last); - ASSERT(!oq.first || oq.last); + ASSERT(!!oq.first == !!oq.last); if (sched_flags & ERTS_PTS_FLG_EXIT) { /* @@ -2475,14 +2480,11 @@ erts_dist_command(Port *prt, int reds_limit) foq.first = NULL; foq.last = NULL; - -/* SVERK Hmmm.... #ifdef DEBUG erts_mtx_lock(&dep->qlock); ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); erts_mtx_unlock(&dep->qlock); #endif -*/ } else { if (oq.first) { @@ -2772,7 +2774,8 @@ BIF_RETTYPE dist_ctrl_get_data_1(BIF_ALIST_1) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); - int reds = 1; + const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P); + Sint reds = initial_reds; ErtsDistOutputBuf *obuf; Eterm *hp; ProcBin *pb; @@ -2803,6 +2806,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1) { if (!dep->tmp_out_queue.first) { ASSERT(!dep->tmp_out_queue.last); + ASSERT(!dep->transcode_ctx); qsize = erts_atomic_read_acqb(&dep->qsize); if (qsize > 0) { erts_mtx_lock(&dep->qlock); @@ -2820,21 +2824,18 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_de_runlock(dep); BIF_RET(am_none); } - else { - obuf = dep->tmp_out_queue.first; - dep->tmp_out_queue.first = obuf->next; - if (!obuf->next) - dep->tmp_out_queue.last = NULL; + + obuf = dep->tmp_out_queue.first; + reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds); + if (reds < 0) { + erts_de_runlock(dep); + ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1], + BIF_P, BIF_ARG_1); } - obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp, - dep->cache, - dep->flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */ - ASSERT(&obuf->data[0] <= obuf->extp - && obuf->extp < obuf->ext_endp); + dep->tmp_out_queue.first = obuf->next; + if (!obuf->next) + dep->tmp_out_queue.last = NULL; } erts_atomic64_inc_nob(&dep->out); @@ -2862,11 +2863,11 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_mtx_unlock(&dep->qlock); if (resume_procs) { int resumed = erts_resume_processes(resume_procs); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } } - BIF_RET2(make_binary(pb), reds); + BIF_RET2(make_binary(pb), (initial_reds - reds)); } void diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 2960272eab..c6cc5c78b3 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -261,6 +261,7 @@ type MINDIRECTION FIXED_SIZE SYSTEM magic_indirection type BINARY_FIND SHORT_LIVED PROCESSES binary_find type OPEN_PORT_ENV TEMPORARY SYSTEM open_port_env type CRASH_DUMP STANDARD SYSTEM crash_dump +type DIST_TRANSCODE SHORT_LIVED SYSTEM dist_transcode_context type THR_Q_EL STANDARD SYSTEM thr_q_element type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 2a33766ac8..00e8306510 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -204,6 +204,7 @@ dist_table_alloc(void *dep_tmpl) erts_port_task_handle_init(&dep->dist_cmd); dep->send = NULL; dep->cache = NULL; + dep->transcode_ctx = NULL; /* Link in */ diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index af42814b8b..afa83f8b46 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -83,6 +83,7 @@ typedef struct ErtsDistOutputBuf_ ErtsDistOutputBuf; struct ErtsDistOutputBuf_ { #ifdef DEBUG Uint dbg_pattern; + byte *alloc_endp; #endif ErtsDistOutputBuf *next; byte *extp; @@ -156,6 +157,8 @@ typedef struct dist_entry_ { struct cache* cache; /* The atom cache */ ErtsThrPrgrLaterOp later_op; + + struct transcode_context* transcode_ctx; } DistEntry; typedef struct erl_node_ { diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 8408d7dd12..f2e6399ad7 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -122,6 +122,9 @@ static int encode_size_struct_int(struct TTBSizeContext_*, ErtsAtomCacheMap *acm static Export binary_to_term_trap_export; static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1); +static Sint transcode_dist_obuf(ErtsDistOutputBuf*, DistEntry*, Uint32 dflags, Sint reds); + + void erts_init_external(void) { erts_init_trap_export(&term_to_binary_trap_export, @@ -349,11 +352,13 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) } } -#define PASS_THROUGH 'p' /* This code should go */ -void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, - ErtsAtomCache *cache, - Uint32 dflags) +#define PASS_THROUGH 'p' + +Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, + DistEntry* dep, + Uint32 dflags, + Sint reds) { byte *ip; byte instr_buf[(2+4)*ERTS_ATOM_CACHE_SIZE]; @@ -364,7 +369,7 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, ASSERT(dflags & DFLAG_UTF8_ATOMS); /* - * The buffer can have three different layouts at this point depending on + * The buffer can have different layouts at this point depending on * what was known when encoded: * * Pending connection: CtrlTerm [, MsgTerm] @@ -372,34 +377,28 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, * No atom cache : VERSION_MAGIC, CtrlTerm [, VERSION_MAGIC, MsgTerm] */ - if (ep[0] != VERSION_MAGIC) { + if (ep[0] != VERSION_MAGIC || dep->transcode_ctx) { /* * Was encoded without atom cache toward pending connection. */ ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT); + + if (~dflags & (DFLAG_BIT_BINARIES | DFLAG_EXPORT_PTR_TAG + | DFLAG_DIST_HDR_ATOM_CACHE)) { + reds = transcode_dist_obuf(ob, dep, dflags, reds); + if (reds < 0) + return reds; + ep = ob->extp; + } if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) { /* - * Receiver wants dist header. Let's prepend an empty one. + * Encoding was done without atom caching but receiver expects + * a dist header, so we prepend an empty one. */ *--ep = 0; /* NumberOfAtomCacheRefs */ *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; } - else { - /* - * Primitive receiver without atom cache (erl_interface/jinterface). - * Must prepend VERSION_MAGIC to both ctrl and message - * And add PASSTHROUGH. - */ - if (ob->msg_start) { - ASSERT(ep < ob->msg_start && ob->msg_start < ob->ext_endp); - sys_memmove(ep-1, ep, (ob->msg_start - ep)); - ob->msg_start[-1] = VERSION_MAGIC; - --ep; - } - *--ep = VERSION_MAGIC; - *--ep = PASS_THROUGH; - } goto done; } else if (ep[1] != DIST_HEADER) { @@ -436,6 +435,7 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, / sizeof(Uint32))+1]; register Uint32 flgs; int iix, flgs_bytes, flgs_buf_ix, used_half_bytes; + ErtsAtomCache* cache = dep->cache; #ifdef DEBUG int tot_used_half_bytes; #endif @@ -527,14 +527,16 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, break; } } + reds -= 3; /*was ERTS_PORT_REDS_DIST_CMD_FINALIZE*/ } --ep; put_int8(ci, ep); *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; done: - ASSERT(ep >= ob->data); ob->extp = ep; + ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + return reds < 0 ? 0 : reds; } int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, @@ -545,7 +547,7 @@ int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, return -1; } else { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_NO_MAGIC))) #endif sz++ /* VERSION_MAGIC */; @@ -3285,6 +3287,7 @@ dec_term_atom_common: n--; if (ctx) { if (reds < n) { + ASSERT(reds > 0); ctx->state = B2TDecodeList; ctx->u.dc.remaining_n = n - reds; n = reds; @@ -4388,7 +4391,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags, B2TContext* ctx) } } else - reds = 0; /* not used but compiler warns anyway */ + ERTS_UNDEF(reds, 0); heap_size = 0; terms = 1; @@ -4698,3 +4701,177 @@ error: #undef SKIP2 #undef CHKSIZE } + + +struct transcode_context { + enum { + TRANSCODE_DEC_MSG_SIZE, + TRANSCODE_DEC_MSG, + TRANSCODE_ENC_CTL, + TRANSCODE_ENC_MSG + }state; + Eterm ctl_term; + Eterm* ctl_heap; + ErtsHeapFactory ctl_factory; + Eterm* msg_heap; + B2TContext b2t; + TTBEncodeContext ttb; +#ifdef DEBUG + ErtsDistOutputBuf* dbg_ob; +#endif +}; + +void transcode_free_ctx(DistEntry* dep) +{ + struct transcode_context* ctx = dep->transcode_ctx; + + erts_factory_close(&ctx->ctl_factory); + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx->ctl_heap); + + if (ctx->msg_heap) { + erts_factory_close(&ctx->b2t.u.dc.factory); + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx->msg_heap); + } + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx); + dep->transcode_ctx = NULL; +} + +Sint transcode_dist_obuf(ErtsDistOutputBuf* ob, + DistEntry* dep, + Uint32 dflags, + Sint reds) +{ + Sint hsz; + byte* decp; + const int have_msg = !!ob->msg_start; + int i; + struct transcode_context* ctx = dep->transcode_ctx; + + if (!ctx) { /* first call for 'ob' */ + + if (~dflags & (DFLAG_BIT_BINARIES | DFLAG_EXPORT_PTR_TAG)) { + /* + * Receiver does not support bitstrings and/or export funs. + * We need to transcode control and message terms to use tuple fallbacks. + */ + ctx = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, sizeof(struct transcode_context)); + dep->transcode_ctx = ctx; + #ifdef DEBUG + ctx->dbg_ob = ob; + #endif + + hsz = decoded_size(ob->extp, ob->ext_endp, 0, NULL); + ctx->ctl_heap = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, hsz*sizeof(Eterm)); + erts_factory_tmp_init(&ctx->ctl_factory, ctx->ctl_heap, hsz, ERTS_ALC_T_DIST_TRANSCODE); + ctx->msg_heap = NULL; + + decp = dec_term(NULL, &ctx->ctl_factory, ob->extp, &ctx->ctl_term, NULL); + if (have_msg) { + ASSERT(decp == ob->msg_start); (void)decp; + ctx->b2t.u.sc.ep = NULL; + ctx->b2t.state = B2TSize; + ctx->b2t.aligned_alloc = NULL; + ctx->b2t.b2ts.exttmp = 0; + ctx->state = TRANSCODE_DEC_MSG_SIZE; + } + else { + ASSERT(decp == ob->ext_endp); + ctx->state = TRANSCODE_ENC_CTL; + } + } + else { + /* + * No need for full transcoding, but primitive receiver (erl_/jinterface) + * expects VERSION_MAGIC before both control and message terms. + */ + if (ob->msg_start) { + Sint ctl_bytes = ob->msg_start - ob->extp; + ASSERT(ob->extp < ob->msg_start && ob->msg_start < ob->ext_endp); + /* Move control term back 1 byte to make room */ + sys_memmove(ob->extp-1, ob->extp, ctl_bytes); + *--(ob->msg_start) = VERSION_MAGIC; + --(ob->extp); + reds -= ctl_bytes / (B2T_BYTES_PER_REDUCTION * B2T_MEMCPY_FACTOR); + } + *--(ob->extp) = VERSION_MAGIC; + goto done; + } + } + else { + ASSERT(ctx->dbg_ob == ob); + } + ctx->b2t.reds = reds * B2T_BYTES_PER_REDUCTION; + + switch (ctx->state) { + case TRANSCODE_DEC_MSG_SIZE: + hsz = decoded_size(ob->msg_start, ob->ext_endp, 0, &ctx->b2t); + if (ctx->b2t.state == B2TSize) { + return -1; + } + ASSERT(ctx->b2t.state == B2TDecodeInit); + ctx->msg_heap = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, hsz*sizeof(Eterm)); + ctx->b2t.u.dc.ep = ob->msg_start; + ctx->b2t.u.dc.res = (Eterm) NULL; + ctx->b2t.u.dc.next = &ctx->b2t.u.dc.res; + erts_factory_tmp_init(&ctx->b2t.u.dc.factory, + ctx->msg_heap, hsz, ERTS_ALC_T_DIST_TRANSCODE); + ctx->b2t.u.dc.flat_maps.wstart = NULL; + ctx->b2t.u.dc.hamt_array.pstart = NULL; + ctx->b2t.state = B2TDecode; + + ctx->state = TRANSCODE_DEC_MSG; + case TRANSCODE_DEC_MSG: + if (ctx->b2t.reds <= 0) + ctx->b2t.reds = 1; + decp = dec_term(NULL, NULL, NULL, NULL, &ctx->b2t); + if (ctx->b2t.state < B2TDone) { + return -1; + } + ASSERT(ctx->b2t.state == B2TDone); + ASSERT(decp && decp <= ob->ext_endp); + reds = ctx->b2t.reds / B2T_BYTES_PER_REDUCTION; + b2t_destroy_context(&ctx->b2t); + + ctx->state = TRANSCODE_ENC_CTL; + case TRANSCODE_ENC_CTL: + if (!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)) { + ASSERT(!(dflags & DFLAG_NO_MAGIC)); + ob->extp -= 2; /* VERSION_MAGIC x 2 */ + } + ob->ext_endp = ob->extp; + i = erts_encode_dist_ext(ctx->ctl_term, &ob->ext_endp, dflags, + NULL, NULL, NULL); + ASSERT(i == 0); (void)i; + ASSERT(ob->ext_endp <= ob->alloc_endp); + + if (!have_msg) { + break; + } + ob->msg_start = ob->ext_endp; + ctx->ttb.wstack.wstart = NULL; + ctx->ttb.flags = dflags; + ctx->ttb.level = 0; + + ctx->state = TRANSCODE_ENC_MSG; + case TRANSCODE_ENC_MSG: + reds *= TERM_TO_BINARY_LOOP_FACTOR; + if (erts_encode_dist_ext(ctx->b2t.u.dc.res, &ob->ext_endp, dflags, NULL, + &ctx->ttb, &reds)) { + return -1; + } + reds /= TERM_TO_BINARY_LOOP_FACTOR; + + ASSERT(ob->ext_endp <= ob->alloc_endp); + + } + transcode_free_ctx(dep); + +done: + if (!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--(ob->extp) = PASS_THROUGH; + + if (reds < 0) + reds = 0; + + return reds; +} diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index 11044b17ef..f9f8abcc27 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -154,7 +154,7 @@ void erts_finalize_atom_cache_map(ErtsAtomCacheMap *, Uint32); Uint erts_encode_ext_dist_header_size(ErtsAtomCacheMap *); byte *erts_encode_ext_dist_header_setup(byte *, ErtsAtomCacheMap *); -void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, ErtsAtomCache *, Uint32); +Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, DistEntry *, Uint32 dflags, Sint reds); struct erts_dsig_send_context; int erts_encode_dist_ext_size(Eterm, Uint32, ErtsAtomCacheMap*, Uint* szp); int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx, Uint* szp); @@ -195,7 +195,7 @@ void erts_binary2term_abort(ErtsBinary2TermState *); Eterm erts_binary2term_create(ErtsBinary2TermState *, ErtsHeapFactory*); int erts_debug_max_atom_out_cache_index(void); int erts_debug_atom_to_out_cache_index(Eterm); - +void transcode_free_ctx(DistEntry* dep); #if ERTS_GLB_INLINE_INCL_FUNC_DEF |