diff options
Diffstat (limited to 'erts/emulator/beam/external.c')
-rw-r--r-- | erts/emulator/beam/external.c | 288 |
1 files changed, 210 insertions, 78 deletions
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 1ded5f031c..73eae614fa 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -262,19 +262,12 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) if (acmp) { int long_atoms = 0; /* !0 if one or more atoms are longer than 255. */ int i; - int sz; - int fix_sz - = 1 /* VERSION_MAGIC */ - + 1 /* DIST_HEADER */ - + 1 /* dist header flags */ - + 1 /* number of internal cache entries */ - ; + int sz = 0; int min_sz; ASSERT(dflags & DFLAG_UTF8_ATOMS); ASSERT(acmp->hdr_sz < 0); /* Make sure cache update instructions fit */ - min_sz = fix_sz+(2+4)*acmp->sz; - sz = fix_sz; + min_sz = (2+4)*acmp->sz; for (i = 0; i < acmp->sz; i++) { Atom *a; Eterm atom; @@ -302,17 +295,28 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) } Uint -erts_encode_ext_dist_header_size(ErtsAtomCacheMap *acmp) +erts_encode_ext_dist_header_size(ErtsAtomCacheMap *acmp, Uint fragments) { if (!acmp) return 0; else { + int fix_sz + = 1 /* VERSION_MAGIC */ + + 1 /* DIST_HEADER */ + + 1 /* dist header flags */ + + 1 /* number of internal cache entries */ + ; ASSERT(acmp->hdr_sz >= 0); - return acmp->hdr_sz; + if (fragments > 1) + fix_sz += 8 /* sequence id */ + + 8 /* number of fragments */ + ; + return fix_sz + acmp->hdr_sz; } } -byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) +byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp, + Uint fragments, Eterm from) { /* Maximum number of atom must be less than the maximum of a 32 bits unsigned integer. Check is done in erl_init.c, erl_start function. */ @@ -346,12 +350,37 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) put_int8(acmp->sz, ep); --ep; put_int8(dist_hdr_flags, ep); - *--ep = DIST_HEADER; - *--ep = VERSION_MAGIC; + if (fragments > 1) { + ASSERT(is_pid(from)); + ep -= 8; + put_int64(fragments, ep); + ep -= 8; + put_int64(from, ep); + *--ep = DIST_FRAG_HEADER; + } else { + *--ep = DIST_HEADER; + } + *--ep = VERSION_MAGIC; return ep; } } +byte *erts_encode_ext_dist_header_fragment(byte **hdrpp, + Uint fragment, + Eterm from) +{ + byte *ep = *hdrpp, *start = ep; + ASSERT(is_pid(from)); + *ep++ = VERSION_MAGIC; + *ep++ = DIST_FRAG_CONT; + put_int64(from, ep); + ep += 8; + put_int64(fragment, ep); + ep += 8; + *hdrpp = ep; + return start; +} + #define PASS_THROUGH 'p' @@ -365,7 +394,8 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, int ci, sz; byte dist_hdr_flags; int long_atoms; - register byte *ep = ob->extp; + Uint64 seq_id = 0, frag_id = 0; + register byte *ep = ob->hdrp ? ob->hdrp : ob->extp; ASSERT(dflags & DFLAG_UTF8_ATOMS); /* @@ -416,7 +446,7 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, } goto done; } - else if (ep[1] != DIST_HEADER) { + else if (ep[1] != DIST_HEADER && ep[1] != DIST_FRAG_HEADER && ep[1] != DIST_FRAG_CONT) { ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT); ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)); /* Node without atom cache, 'pass through' needed */ @@ -424,6 +454,17 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, goto done; } + if (ep[1] == DIST_FRAG_CONT) { + ep = ob->extp; + goto done; + } else if (ep[1] == DIST_FRAG_HEADER) { + /* skip the seq id and frag id */ + seq_id = get_int64(&ep[2]); + ep += 8; + frag_id = get_int64(&ep[2]); + ep += 8; + } + dist_hdr_flags = ep[2]; long_atoms = ERTS_DIST_HDR_LONG_ATOMS_FLG & ((int) dist_hdr_flags); @@ -546,11 +587,19 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, } --ep; put_int8(ci, ep); - *--ep = DIST_HEADER; + if (seq_id) { + ep -= 8; + put_int64(frag_id, ep); + ep -= 8; + put_int64(seq_id, ep); + *--ep = DIST_FRAG_HEADER; + } else { + *--ep = DIST_HEADER; + } *--ep = VERSION_MAGIC; done: ob->extp = ep; - ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + ASSERT((byte*)ob->bin->orig_bytes <= ob->extp && ob->extp < ob->ext_endp); return reds < 0 ? 0 : reds; } @@ -571,7 +620,7 @@ int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, } } -int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx, Uint* szp) +int erts_encode_dist_ext_size_int(Eterm term, ErtsDSigSendContext *ctx, Uint* szp) { Uint sz; if (encode_size_struct_int(&ctx->u.sc, ctx->acmp, term, ctx->flags, &ctx->reds, &sz)) { @@ -635,56 +684,106 @@ byte* erts_encode_ext_ets(Eterm term, byte *ep, struct erl_off_heap_header** off off_heap); } -ErtsDistExternal * -erts_make_dist_ext_copy(ErtsDistExternal *edep, Uint xsize) + +static Uint +dist_ext_size(ErtsDistExternal *edep) { - size_t align_sz; - size_t dist_ext_sz; - size_t ext_sz; - byte *ep; - ErtsDistExternal *new_edep; + Uint sz = sizeof(ErtsDistExternal); + + ASSERT(edep->data->ext_endp && edep->data->extp); + ASSERT(edep->data->ext_endp >= edep->data->extp); + + if (edep->flags & ERTS_DIST_EXT_ATOM_TRANS_TAB) { + ASSERT(0 <= edep->attab.size \ + && edep->attab.size <= ERTS_ATOM_CACHE_SIZE); + sz -= sizeof(Eterm)*(ERTS_ATOM_CACHE_SIZE - edep->attab.size); + } else { + sz -= sizeof(ErtsAtomTranslationTable); + } + return sz; +} - dist_ext_sz = ERTS_DIST_EXT_SIZE(edep); - ASSERT(edep->ext_endp && edep->extp); - ASSERT(edep->ext_endp >= edep->extp); - ext_sz = edep->ext_endp - edep->extp; +Uint +erts_dist_ext_size(ErtsDistExternal *edep) +{ + Uint sz = dist_ext_size(edep); + sz += edep->data[0].frag_id * sizeof(ErtsDistExternalData); + return sz + ERTS_EXTRA_DATA_ALIGN_SZ(sz); +} - align_sz = ERTS_EXTRA_DATA_ALIGN_SZ(dist_ext_sz + ext_sz); +Uint +erts_dist_ext_data_size(ErtsDistExternal *edep) +{ + Uint sz = 0, i; + for (i = 0; i < edep->data->frag_id; i++) + sz += edep->data[i].ext_endp - edep->data[i].extp; + return sz; +} - new_edep = erts_alloc(ERTS_ALC_T_EXT_TERM_DATA, - dist_ext_sz + ext_sz + align_sz + xsize); +void +erts_dist_ext_frag(ErtsDistExternalData *ede_datap, ErtsDistExternal *edep) +{ + ErtsDistExternalData *new_ede_datap = &edep->data[edep->data->frag_id - ede_datap->frag_id]; + sys_memcpy(new_ede_datap, ede_datap, sizeof(ErtsDistExternalData)); + + /* If the data is not backed by a binary, we create one here to keep + things simple. Only custom distribution drivers should use lists. */ + if (new_ede_datap->binp == NULL) { + size_t ext_sz = ede_datap->ext_endp - ede_datap->extp; + new_ede_datap->binp = erts_bin_nrml_alloc(ext_sz); + sys_memcpy(new_ede_datap->binp->orig_bytes, (void *) ede_datap->extp, ext_sz); + new_ede_datap->extp = (byte*)new_ede_datap->binp->orig_bytes; + new_ede_datap->ext_endp = (byte*)new_ede_datap->binp->orig_bytes + ext_sz; + } else { + erts_refc_inc(&new_ede_datap->binp->intern.refc, 2); + } +} + +void +erts_make_dist_ext_copy(ErtsDistExternal *edep, ErtsDistExternal *new_edep) +{ + size_t dist_ext_sz = dist_ext_size(edep); + byte *ep; ep = (byte *) new_edep; sys_memcpy((void *) ep, (void *) edep, dist_ext_sz); + erts_ref_dist_entry(new_edep->dep); + ep += dist_ext_sz; - if (new_edep->dep) - erts_ref_dist_entry(new_edep->dep); - new_edep->extp = ep; - new_edep->ext_endp = ep + ext_sz; - new_edep->heap_size = -1; - sys_memcpy((void *) ep, (void *) edep->extp, ext_sz); - return new_edep; + + new_edep->data = (ErtsDistExternalData*)ep; + sys_memzero(new_edep->data, sizeof(ErtsDistExternalData) * edep->data->frag_id); + new_edep->data->frag_id = edep->data->frag_id; + erts_dist_ext_frag(edep->data, new_edep); } -int +void +erts_free_dist_ext_copy(ErtsDistExternal *edep) +{ + int i; + erts_deref_dist_entry(edep->dep); + for (i = 0; i < edep->data->frag_id; i++) + if (edep->data[i].binp) + erts_bin_release(edep->data[i].binp); +} + +ErtsPrepDistExtRes erts_prepare_dist_ext(ErtsDistExternal *edep, byte *ext, Uint size, + Binary *binp, DistEntry *dep, Uint32 conn_id, ErtsAtomCache *cache) { register byte *ep; - edep->heap_size = -1; - edep->flags = 0; - edep->dep = dep; - ASSERT(dep); erts_de_rlock(dep); ASSERT(dep->flags & DFLAG_UTF8_ATOMS); + if ((dep->state != ERTS_DE_STATE_CONNECTED && dep->state != ERTS_DE_STATE_PENDING) || dep->connection_id != conn_id) { @@ -697,7 +796,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, ext++; size--; } - edep->ext_endp = ext + size; + ep = ext; if (size < 2) @@ -713,16 +812,33 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, goto fail; } + edep->heap_size = -1; + edep->flags = 0; + edep->dep = dep; + edep->connection_id = conn_id; + edep->data->ext_endp = ext+size; + edep->data->binp = binp; + edep->data->seq_id = 0; + edep->data->frag_id = 1; + if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; - edep->connection_id = dep->connection_id; - - if (ep[1] != DIST_HEADER) { + if (ep[1] != DIST_HEADER && ep[1] != DIST_FRAG_HEADER && ep[1] != DIST_FRAG_CONT) { if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) goto bad_hdr; edep->attab.size = 0; - edep->extp = ext; + edep->data->extp = ext; + } + else if (ep[1] == DIST_FRAG_CONT) { + if (!(dep->flags & DFLAG_FRAGMENTS)) + goto bad_hdr; + edep->attab.size = 0; + edep->data->extp = ext + 1 + 1 + 8 + 8; + edep->data->seq_id = get_int64(&ep[2]); + edep->data->frag_id = get_int64(&ep[2+8]); + erts_de_runlock(dep); + return ERTS_PREP_DIST_EXT_FRAG_CONT; } else { int tix; @@ -731,9 +847,17 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, if (!(edep->flags & ERTS_DIST_EXT_DFLAG_HDR)) goto bad_hdr; + if (ep[1] == DIST_FRAG_HEADER) { + if (!(dep->flags & DFLAG_FRAGMENTS)) + goto bad_hdr; + edep->data->seq_id = get_int64(&ep[2]); + edep->data->frag_id = get_int64(&ep[2+8]); + ep += 16; + } + #undef CHKSIZE #define CHKSIZE(SZ) \ - do { if ((SZ) > edep->ext_endp - ep) goto bad_hdr; } while(0) + do { if ((SZ) > edep->data->ext_endp - ep) goto bad_hdr; } while(0) CHKSIZE(1+1+1); ep += 2; @@ -863,7 +987,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, #endif } } - edep->extp = ep; + edep->data->extp = ep; #ifdef ERTS_DEBUG_USE_DIST_SEP if (*ep != VERSION_MAGIC) goto bad_hdr; @@ -888,7 +1012,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, erts_this_node->sysname, edep->dep->sysname, dist_entry_channel_no(edep->dep)); - for (ep = ext; ep < edep->ext_endp; ep++) + for (ep = ext; ep < edep->data->ext_endp; ep++) erts_dsprintf(dsbufp, ep != ext ? ",%b8u" : "<<%b8u", *ep); erts_dsprintf(dsbufp, ">>"); erts_send_warning_to_logger_nogl(dsbufp); @@ -913,9 +1037,9 @@ bad_dist_ext(ErtsDistExternal *edep) erts_this_node->sysname, dep->sysname, dist_entry_channel_no(dep)); - for (ep = edep->extp; ep < edep->ext_endp; ep++) + for (ep = edep->data->extp; ep < edep->data->ext_endp; ep++) erts_dsprintf(dsbufp, - ep != edep->extp ? ",%b8u" : "<<...,%b8u", + ep != edep->data->extp ? ",%b8u" : "<<...,%b8u", *ep); erts_dsprintf(dsbufp, ">>\n"); erts_dsprintf(dsbufp, "ATOM_CACHE_REF translations: "); @@ -933,30 +1057,32 @@ bad_dist_ext(ErtsDistExternal *edep) } Sint -erts_decode_dist_ext_size(ErtsDistExternal *edep) +erts_decode_dist_ext_size(ErtsDistExternal *edep, int kill_connection) { Sint res; byte *ep; - if (edep->extp >= edep->ext_endp) + + if (edep->data->extp >= edep->data->ext_endp) goto fail; #ifndef ERTS_DEBUG_USE_DIST_SEP if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) { - if (*edep->extp == VERSION_MAGIC) + if (*edep->data->extp == VERSION_MAGIC) goto fail; - ep = edep->extp; + ep = edep->data->extp; } else #endif { - if (*edep->extp != VERSION_MAGIC) + if (*edep->data->extp != VERSION_MAGIC) goto fail; - ep = edep->extp+1; + ep = edep->data->extp+1; } - res = decoded_size(ep, edep->ext_endp, 0, NULL); + res = decoded_size(ep, edep->data->ext_endp, 0, NULL); if (res >= 0) return res; fail: - bad_dist_ext(edep); + if (kill_connection) + bad_dist_ext(edep); return -1; } @@ -982,12 +1108,15 @@ Sint erts_decode_ext_size_ets(byte *ext, Uint size) */ Eterm erts_decode_dist_ext(ErtsHeapFactory* factory, - ErtsDistExternal *edep) + ErtsDistExternal *edep, + int kill_connection) { Eterm obj; - byte* ep = edep->extp; + byte* ep; + + ep = edep->data->extp; - if (ep >= edep->ext_endp) + if (ep >= edep->data->ext_endp) goto error; #ifndef ERTS_DEBUG_USE_DIST_SEP if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) { @@ -1005,14 +1134,15 @@ erts_decode_dist_ext(ErtsHeapFactory* factory, if (!ep) goto error; - edep->extp = ep; + edep->data->extp = ep; return obj; error: erts_factory_undo(factory); - bad_dist_ext(edep); + if (kill_connection) + bad_dist_ext(edep); return THE_NON_VALUE; } @@ -1057,6 +1187,7 @@ BIF_RETTYPE erts_debug_dist_ext_to_term_2(BIF_ALIST_2) Eterm res; Sint hsz; ErtsDistExternal ede; + ErtsDistExternalData ede_data; Eterm *tp; Eterm real_bin; Uint offset; @@ -1069,7 +1200,8 @@ BIF_RETTYPE erts_debug_dist_ext_to_term_2(BIF_ALIST_2) ede.flags = ERTS_DIST_EXT_ATOM_TRANS_TAB; ede.dep = NULL; ede.heap_size = -1; - + ede.data = &ede_data; + if (is_not_tuple(BIF_ARG_1)) goto badarg; tp = tuple_val(BIF_ARG_1); @@ -1094,15 +1226,15 @@ BIF_RETTYPE erts_debug_dist_ext_to_term_2(BIF_ALIST_2) if (bitsize != 0) goto badarg; - ede.extp = binary_bytes(real_bin)+offset; - ede.ext_endp = ede.extp + size; + ede.data->extp = binary_bytes(real_bin)+offset; + ede.data->ext_endp = ede.data->extp + size; - hsz = erts_decode_dist_ext_size(&ede); + hsz = erts_decode_dist_ext_size(&ede, 1); if (hsz < 0) goto badarg; erts_factory_proc_prealloc_init(&factory, BIF_P, hsz); - res = erts_decode_dist_ext(&factory, &ede); + res = erts_decode_dist_ext(&factory, &ede, 1); erts_factory_close(&factory); if (is_value(res)) @@ -2348,7 +2480,7 @@ dec_atom(ErtsDistExternal *edep, byte* ep, Eterm* objp) return ep; } -static ERTS_INLINE ErlNode* dec_get_node(Eterm sysname, Uint32 creation) +static ERTS_INLINE ErlNode* dec_get_node(Eterm sysname, Uint32 creation, Eterm book) { if (sysname == INTERNAL_LOCAL_SYSNAME) /* && DFLAG_INTERNAL_TAGS */ return erts_this_node; @@ -2357,7 +2489,7 @@ static ERTS_INLINE ErlNode* dec_get_node(Eterm sysname, Uint32 creation) && (creation == erts_this_node->creation || creation == ORIG_CREATION)) return erts_this_node; - return erts_find_or_insert_node(sysname,creation); + return erts_find_or_insert_node(sysname,creation,book); } static byte* @@ -2403,7 +2535,7 @@ dec_pid(ErtsDistExternal *edep, ErtsHeapFactory* factory, byte* ep, * We are careful to create the node entry only after all * validity tests are done. */ - node = dec_get_node(sysname, cre); + node = dec_get_node(sysname, cre, make_boxed(factory->hp)); if(node == erts_this_node) { *objp = make_internal_pid(data); @@ -3397,7 +3529,7 @@ dec_term_atom_common: cre = get_int32(ep); ep += 4; } - node = dec_get_node(sysname, cre); + node = dec_get_node(sysname, cre, make_boxed(hp)); if(node == erts_this_node) { *objp = make_internal_port(num); } @@ -3477,7 +3609,7 @@ dec_term_atom_common: if (ref_words > ERTS_MAX_REF_NUMBERS) goto error; - node = dec_get_node(sysname, cre); + node = dec_get_node(sysname, cre, make_boxed(hp)); if(node == erts_this_node) { rtp = (ErtsORefThing *) hp; |