aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/external.c
diff options
context:
space:
mode:
authorLukas Larsson <[email protected]>2018-09-26 11:55:01 +0200
committerLukas Larsson <[email protected]>2019-02-22 11:12:53 +0100
commitf2c4f6f83deecba0c2527e520f0f18fba7d84815 (patch)
tree5009cc4300cdca28dc0f76daaa541051f8d44a01 /erts/emulator/beam/external.c
parent6686877360432144bacbf4e95c23b1232eab1b08 (diff)
downloadotp-f2c4f6f83deecba0c2527e520f0f18fba7d84815.tar.gz
otp-f2c4f6f83deecba0c2527e520f0f18fba7d84815.tar.bz2
otp-f2c4f6f83deecba0c2527e520f0f18fba7d84815.zip
erts: Implement fragmentation of distrubution messages
Diffstat (limited to 'erts/emulator/beam/external.c')
-rw-r--r--erts/emulator/beam/external.c298
1 files changed, 195 insertions, 103 deletions
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c
index 640ca338d9..84e6e0d6fd 100644
--- a/erts/emulator/beam/external.c
+++ b/erts/emulator/beam/external.c
@@ -262,19 +262,12 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags)
if (acmp) {
int long_atoms = 0; /* !0 if one or more atoms are longer than 255. */
int i;
- int sz;
- int fix_sz
- = 1 /* VERSION_MAGIC */
- + 1 /* DIST_HEADER */
- + 1 /* dist header flags */
- + 1 /* number of internal cache entries */
- ;
+ int sz = 0;
int min_sz;
ASSERT(dflags & DFLAG_UTF8_ATOMS);
ASSERT(acmp->hdr_sz < 0);
/* Make sure cache update instructions fit */
- min_sz = fix_sz+(2+4)*acmp->sz;
- sz = fix_sz;
+ min_sz = (2+4)*acmp->sz;
for (i = 0; i < acmp->sz; i++) {
Atom *a;
Eterm atom;
@@ -302,17 +295,28 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags)
}
Uint
-erts_encode_ext_dist_header_size(ErtsAtomCacheMap *acmp)
+erts_encode_ext_dist_header_size(ErtsAtomCacheMap *acmp, Uint fragments)
{
if (!acmp)
return 0;
else {
+ int fix_sz
+ = 1 /* VERSION_MAGIC */
+ + 1 /* DIST_HEADER */
+ + 1 /* dist header flags */
+ + 1 /* number of internal cache entries */
+ ;
ASSERT(acmp->hdr_sz >= 0);
- return acmp->hdr_sz;
+ if (fragments > 1)
+ fix_sz += 8 /* sequence id */
+ + 8 /* number of fragments */
+ ;
+ return fix_sz + acmp->hdr_sz;
}
}
-byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp)
+byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp,
+ Uint fragments, Eterm from)
{
/* Maximum number of atom must be less than the maximum of a 32 bits
unsigned integer. Check is done in erl_init.c, erl_start function. */
@@ -346,12 +350,37 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp)
put_int8(acmp->sz, ep);
--ep;
put_int8(dist_hdr_flags, ep);
- *--ep = DIST_HEADER;
- *--ep = VERSION_MAGIC;
+ if (fragments > 1) {
+ ASSERT(is_pid(from));
+ ep -= 8;
+ put_int64(fragments, ep);
+ ep -= 8;
+ put_int64(from, ep);
+ *--ep = DIST_FRAG_HEADER;
+ } else {
+ *--ep = DIST_HEADER;
+ }
+ *--ep = VERSION_MAGIC;
return ep;
}
}
+byte *erts_encode_ext_dist_header_fragment(byte **hdrpp,
+ Uint fragment,
+ Eterm from)
+{
+ byte *ep = *hdrpp, *start = ep;
+ ASSERT(is_pid(from));
+ *ep++ = VERSION_MAGIC;
+ *ep++ = DIST_FRAG_CONT;
+ put_int64(from, ep);
+ ep += 8;
+ put_int64(fragment, ep);
+ ep += 8;
+ *hdrpp = ep;
+ return start;
+}
+
#define PASS_THROUGH 'p'
@@ -365,7 +394,8 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
int ci, sz;
byte dist_hdr_flags;
int long_atoms;
- register byte *ep = ob->extp;
+ Uint64 seq_id = 0, frag_id = 0;
+ register byte *ep = ob->hdrp ? ob->hdrp : ob->extp;
ASSERT(dflags & DFLAG_UTF8_ATOMS);
/*
@@ -416,7 +446,7 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
}
goto done;
}
- else if (ep[1] != DIST_HEADER) {
+ else if (ep[1] != DIST_HEADER && ep[1] != DIST_FRAG_HEADER && ep[1] != DIST_FRAG_CONT) {
ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT);
ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE));
/* Node without atom cache, 'pass through' needed */
@@ -424,6 +454,17 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
goto done;
}
+ if (ep[1] == DIST_FRAG_CONT) {
+ ep = ob->extp;
+ goto done;
+ } else if (ep[1] == DIST_FRAG_HEADER) {
+ /* skip the seq id and frag id */
+ seq_id = get_int64(&ep[2]);
+ ep += 8;
+ frag_id = get_int64(&ep[2]);
+ ep += 8;
+ }
+
dist_hdr_flags = ep[2];
long_atoms = ERTS_DIST_HDR_LONG_ATOMS_FLG & ((int) dist_hdr_flags);
@@ -546,11 +587,19 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
}
--ep;
put_int8(ci, ep);
- *--ep = DIST_HEADER;
+ if (seq_id) {
+ ep -= 8;
+ put_int64(frag_id, ep);
+ ep -= 8;
+ put_int64(seq_id, ep);
+ *--ep = DIST_FRAG_HEADER;
+ } else {
+ *--ep = DIST_HEADER;
+ }
*--ep = VERSION_MAGIC;
done:
ob->extp = ep;
- ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp);
+ ASSERT((byte*)ob->bin->orig_bytes <= ob->extp && ob->extp < ob->ext_endp);
return reds < 0 ? 0 : reds;
}
@@ -635,72 +684,90 @@ byte* erts_encode_ext_ets(Eterm term, byte *ep, struct erl_off_heap_header** off
off_heap);
}
-ErtsDistExternal *
-erts_make_dist_ext_copy(ErtsDistExternal *edep, Eterm *token)
+
+static Uint
+dist_ext_size(ErtsDistExternal *edep)
{
- size_t align_sz;
- size_t dist_ext_sz;
- size_t ext_sz = 0;
- size_t token_sz = 0;
- Eterm token_size;
- byte *ep;
- ErtsDistExternal *new_edep;
+ Uint sz = sizeof(ErtsDistExternal);
- dist_ext_sz = ERTS_DIST_EXT_SIZE(edep);
- ASSERT(edep->ext_endp && edep->extp);
- ASSERT(edep->ext_endp >= edep->extp);
- if (edep->binp == NULL)
- ext_sz = edep->ext_endp - edep->extp;
+ ASSERT(edep->data->ext_endp && edep->data->extp);
+ ASSERT(edep->data->ext_endp >= edep->data->extp);
- align_sz = ERTS_EXTRA_DATA_ALIGN_SZ(dist_ext_sz + ext_sz);
+ if (edep->flags & ERTS_DIST_EXT_ATOM_TRANS_TAB) {
+ ASSERT(0 <= edep->attab.size \
+ && edep->attab.size <= ERTS_ATOM_CACHE_SIZE);
+ sz -= sizeof(Eterm)*(ERTS_ATOM_CACHE_SIZE - edep->attab.size);
+ } else {
+ sz -= sizeof(ErtsAtomTranslationTable);
+ }
+ return sz;
+}
- token_size = size_object(*token);
- if (token_size)
- token_sz = ERTS_HEAP_FRAG_SIZE(token_size);
+Uint
+erts_dist_ext_size(ErtsDistExternal *edep)
+{
+ Uint sz = dist_ext_size(edep);
+ sz += edep->data[0].frag_id * sizeof(ErtsDistExternalData);
+ return sz + ERTS_EXTRA_DATA_ALIGN_SZ(sz);
+}
- new_edep = erts_alloc(ERTS_ALC_T_EXT_TERM_DATA,
- dist_ext_sz + ext_sz + align_sz + token_sz);
+Uint
+erts_dist_ext_data_size(ErtsDistExternal *edep)
+{
+ Uint sz = 0, i;
+ for (i = 0; i < edep->data->frag_id; i++)
+ sz += edep->data[i].ext_endp - edep->data[i].extp;
+ return sz;
+}
- ep = (byte *) new_edep;
- sys_memcpy((void *) ep, (void *) edep, dist_ext_sz);
- if (new_edep->dep)
- erts_ref_dist_entry(new_edep->dep);
- new_edep->heap_size = -1;
-
- if (ext_sz) {
- ep += dist_ext_sz;
- new_edep->extp = ep;
- new_edep->ext_endp = ep + ext_sz;
- sys_memcpy((void *) ep, (void *) edep->extp, ext_sz);
+void
+erts_dist_ext_frag(ErtsDistExternalData *ede_datap, ErtsDistExternal *edep)
+{
+ ErtsDistExternalData *new_ede_datap = &edep->data[edep->data->frag_id - ede_datap->frag_id];
+ sys_memcpy(new_ede_datap, ede_datap, sizeof(ErtsDistExternalData));
+
+ /* If the data is not backed by a binary, we create one here to keep
+ things simple. Only custom distribution drivers should use lists. */
+ if (new_ede_datap->binp == NULL) {
+ size_t ext_sz = ede_datap->ext_endp - ede_datap->extp;
+ new_ede_datap->binp = erts_bin_nrml_alloc(ext_sz);
+ sys_memcpy(new_ede_datap->binp->orig_bytes, (void *) ede_datap->extp, ext_sz);
+ new_ede_datap->extp = (byte*)new_ede_datap->binp->orig_bytes;
+ new_ede_datap->ext_endp = (byte*)new_ede_datap->binp->orig_bytes + ext_sz;
} else {
- erts_refc_inc(&new_edep->binp->intern.refc, 2);
+ erts_refc_inc(&new_ede_datap->binp->intern.refc, 2);
}
+}
- /* Copy the seq_trace token */
- if (is_not_nil(*token)) {
- ErlHeapFragment *heap_frag;
- ErlOffHeap *ohp;
- Eterm *hp;
- heap_frag = erts_dist_ext_trailer(new_edep);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
- hp = heap_frag->mem;
- ohp = &heap_frag->off_heap;
- *token = copy_struct(*token, token_size, &hp, ohp);
- }
- return new_edep;
+void
+erts_make_dist_ext_copy(ErtsDistExternal *edep, ErtsDistExternal *new_edep)
+{
+ size_t dist_ext_sz = dist_ext_size(edep);
+ byte *ep;
+
+ ep = (byte *) new_edep;
+ sys_memcpy((void *) ep, (void *) edep, dist_ext_sz);
+ erts_ref_dist_entry(new_edep->dep);
+
+ ep += dist_ext_sz;
+
+ new_edep->data = (ErtsDistExternalData*)ep;
+ sys_memzero(new_edep->data, sizeof(ErtsDistExternalData) * edep->data->frag_id);
+ new_edep->data->frag_id = edep->data->frag_id;
+ erts_dist_ext_frag(edep->data, new_edep);
}
void
erts_free_dist_ext_copy(ErtsDistExternal *edep)
{
- if (edep->dep)
- erts_deref_dist_entry(edep->dep);
- if (edep->binp)
- erts_bin_release(edep->binp);
- erts_free(ERTS_ALC_T_EXT_TERM_DATA, edep);
+ int i;
+ erts_deref_dist_entry(edep->dep);
+ for (i = 0; i < edep->data->frag_id; i++)
+ if (edep->data[i].binp)
+ erts_bin_release(edep->data[i].binp);
}
-int
+ErtsPrepDistExtRes
erts_prepare_dist_ext(ErtsDistExternal *edep,
byte *ext,
Uint size,
@@ -711,10 +778,6 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
{
register byte *ep;
- edep->heap_size = -1;
- edep->flags = 0;
- edep->dep = dep;
-
ASSERT(dep);
erts_de_rlock(dep);
@@ -734,10 +797,6 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
size--;
}
- edep->heap_size = -1;
- edep->ext_endp = ext + size;
- edep->binp = binp;
-
ep = ext;
if (size < 2)
@@ -753,16 +812,33 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
goto fail;
}
+ edep->heap_size = -1;
+ edep->flags = 0;
+ edep->dep = dep;
+ edep->connection_id = conn_id;
+ edep->data->ext_endp = ext+size;
+ edep->data->binp = binp;
+ edep->data->seq_id = 0;
+ edep->data->frag_id = 1;
+
if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)
edep->flags |= ERTS_DIST_EXT_DFLAG_HDR;
- edep->connection_id = dep->connection_id;
-
- if (ep[1] != DIST_HEADER) {
+ if (ep[1] != DIST_HEADER && ep[1] != DIST_FRAG_HEADER && ep[1] != DIST_FRAG_CONT) {
if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR)
goto bad_hdr;
edep->attab.size = 0;
- edep->extp = ext;
+ edep->data->extp = ext;
+ }
+ else if (ep[1] == DIST_FRAG_CONT) {
+ if (!(dep->flags & DFLAG_FRAGMENTS))
+ goto bad_hdr;
+ edep->attab.size = 0;
+ edep->data->extp = ext + 1 + 1 + 8 + 8;
+ edep->data->seq_id = get_int64(&ep[2]);
+ edep->data->frag_id = get_int64(&ep[2+8]);
+ erts_de_runlock(dep);
+ return ERTS_PREP_DIST_EXT_FRAG_CONT;
}
else {
int tix;
@@ -771,9 +847,17 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
if (!(edep->flags & ERTS_DIST_EXT_DFLAG_HDR))
goto bad_hdr;
+ if (ep[1] == DIST_FRAG_HEADER) {
+ if (!(dep->flags & DFLAG_FRAGMENTS))
+ goto bad_hdr;
+ edep->data->seq_id = get_int64(&ep[2]);
+ edep->data->frag_id = get_int64(&ep[2+8]);
+ ep += 16;
+ }
+
#undef CHKSIZE
#define CHKSIZE(SZ) \
- do { if ((SZ) > edep->ext_endp - ep) goto bad_hdr; } while(0)
+ do { if ((SZ) > edep->data->ext_endp - ep) goto bad_hdr; } while(0)
CHKSIZE(1+1+1);
ep += 2;
@@ -903,7 +987,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
#endif
}
}
- edep->extp = ep;
+ edep->data->extp = ep;
#ifdef ERTS_DEBUG_USE_DIST_SEP
if (*ep != VERSION_MAGIC)
goto bad_hdr;
@@ -928,7 +1012,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
erts_this_node->sysname,
edep->dep->sysname,
dist_entry_channel_no(edep->dep));
- for (ep = ext; ep < edep->ext_endp; ep++)
+ for (ep = ext; ep < edep->data->ext_endp; ep++)
erts_dsprintf(dsbufp, ep != ext ? ",%b8u" : "<<%b8u", *ep);
erts_dsprintf(dsbufp, ">>");
erts_send_warning_to_logger_nogl(dsbufp);
@@ -953,9 +1037,9 @@ bad_dist_ext(ErtsDistExternal *edep)
erts_this_node->sysname,
dep->sysname,
dist_entry_channel_no(dep));
- for (ep = edep->extp; ep < edep->ext_endp; ep++)
+ for (ep = edep->data->extp; ep < edep->data->ext_endp; ep++)
erts_dsprintf(dsbufp,
- ep != edep->extp ? ",%b8u" : "<<...,%b8u",
+ ep != edep->data->extp ? ",%b8u" : "<<...,%b8u",
*ep);
erts_dsprintf(dsbufp, ">>\n");
erts_dsprintf(dsbufp, "ATOM_CACHE_REF translations: ");
@@ -973,30 +1057,32 @@ bad_dist_ext(ErtsDistExternal *edep)
}
Sint
-erts_decode_dist_ext_size(ErtsDistExternal *edep)
+erts_decode_dist_ext_size(ErtsDistExternal *edep, int kill_connection)
{
Sint res;
byte *ep;
- if (edep->extp >= edep->ext_endp)
+
+ if (edep->data->extp >= edep->data->ext_endp)
goto fail;
#ifndef ERTS_DEBUG_USE_DIST_SEP
if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) {
- if (*edep->extp == VERSION_MAGIC)
+ if (*edep->data->extp == VERSION_MAGIC)
goto fail;
- ep = edep->extp;
+ ep = edep->data->extp;
}
else
#endif
{
- if (*edep->extp != VERSION_MAGIC)
+ if (*edep->data->extp != VERSION_MAGIC)
goto fail;
- ep = edep->extp+1;
+ ep = edep->data->extp+1;
}
- res = decoded_size(ep, edep->ext_endp, 0, NULL);
+ res = decoded_size(ep, edep->data->ext_endp, 0, NULL);
if (res >= 0)
return res;
fail:
- bad_dist_ext(edep);
+ if (kill_connection)
+ bad_dist_ext(edep);
return -1;
}
@@ -1022,12 +1108,15 @@ Sint erts_decode_ext_size_ets(byte *ext, Uint size)
*/
Eterm
erts_decode_dist_ext(ErtsHeapFactory* factory,
- ErtsDistExternal *edep)
+ ErtsDistExternal *edep,
+ int kill_connection)
{
Eterm obj;
- byte* ep = edep->extp;
+ byte* ep;
+
+ ep = edep->data->extp;
- if (ep >= edep->ext_endp)
+ if (ep >= edep->data->ext_endp)
goto error;
#ifndef ERTS_DEBUG_USE_DIST_SEP
if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) {
@@ -1045,14 +1134,15 @@ erts_decode_dist_ext(ErtsHeapFactory* factory,
if (!ep)
goto error;
- edep->extp = ep;
+ edep->data->extp = ep;
return obj;
error:
erts_factory_undo(factory);
- bad_dist_ext(edep);
+ if (kill_connection)
+ bad_dist_ext(edep);
return THE_NON_VALUE;
}
@@ -1097,6 +1187,7 @@ BIF_RETTYPE erts_debug_dist_ext_to_term_2(BIF_ALIST_2)
Eterm res;
Sint hsz;
ErtsDistExternal ede;
+ ErtsDistExternalData ede_data;
Eterm *tp;
Eterm real_bin;
Uint offset;
@@ -1109,7 +1200,8 @@ BIF_RETTYPE erts_debug_dist_ext_to_term_2(BIF_ALIST_2)
ede.flags = ERTS_DIST_EXT_ATOM_TRANS_TAB;
ede.dep = NULL;
ede.heap_size = -1;
-
+ ede.data = &ede_data;
+
if (is_not_tuple(BIF_ARG_1))
goto badarg;
tp = tuple_val(BIF_ARG_1);
@@ -1134,15 +1226,15 @@ BIF_RETTYPE erts_debug_dist_ext_to_term_2(BIF_ALIST_2)
if (bitsize != 0)
goto badarg;
- ede.extp = binary_bytes(real_bin)+offset;
- ede.ext_endp = ede.extp + size;
+ ede.data->extp = binary_bytes(real_bin)+offset;
+ ede.data->ext_endp = ede.data->extp + size;
- hsz = erts_decode_dist_ext_size(&ede);
+ hsz = erts_decode_dist_ext_size(&ede, 1);
if (hsz < 0)
goto badarg;
erts_factory_proc_prealloc_init(&factory, BIF_P, hsz);
- res = erts_decode_dist_ext(&factory, &ede);
+ res = erts_decode_dist_ext(&factory, &ede, 1);
erts_factory_close(&factory);
if (is_value(res))