From 6907100826bf1956a8dbeb16ba2e736199913d0a Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Mon, 25 Mar 2019 13:36:01 +0100 Subject: erts: Include dist header in return from dist_ctrl_get_data --- erts/emulator/beam/dist.c | 68 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 49 insertions(+), 19 deletions(-) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index faed10ff91..8bbe6450eb 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3361,14 +3361,13 @@ dist_ctrl_get_data_1(BIF_ALIST_1) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P); - Sint reds = initial_reds; + Sint reds = initial_reds, obufsize = 0; ErtsDistOutputBuf *obuf; - Eterm *hp; + Eterm *hp, res; ProcBin *pb; erts_aint_t qsize; Uint32 conn_id, get_size; - Eterm res; - Uint hsz, bin_sz; + Uint hsz = 0, bin_sz; if (!dep) BIF_ERROR(BIF_P, EXC_NOTSUP); @@ -3420,7 +3419,9 @@ dist_ctrl_get_data_1(BIF_ALIST_1) } obuf = dep->tmp_out_queue.first; + obufsize += size_obuf(obuf); reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds); + obufsize -= size_obuf(obuf); if (reds < 0) { erts_de_runlock(dep); ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1], @@ -3436,8 +3437,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_de_runlock(dep); - bin_sz = obuf->ext_endp - obuf->extp; - hsz = PROC_BIN_SIZE; + bin_sz = obuf->ext_endp - obuf->extp + obuf->hdr_endp - obuf->hdrp; get_size = dep->opts & ERTS_DIST_CTRL_OPT_GET_SIZE; if (get_size) { @@ -3446,18 +3446,50 @@ dist_ctrl_get_data_1(BIF_ALIST_1) hsz += BIG_UINT_HEAP_SIZE; } - hp = HAlloc(BIF_P, hsz); - pb = (ProcBin *) (char *) hp; - pb->thing_word = HEADER_PROC_BIN; - pb->size = bin_sz; - pb->next = MSO(BIF_P).first; - MSO(BIF_P).first = (struct erl_off_heap_header*) pb; - pb->val = ErtsDistOutputBuf2Binary(obuf); - pb->bytes = (byte*) obuf->extp; - pb->flags = 0; - hp += PROC_BIN_SIZE; + if (!obuf->hdrp) { + hp = HAlloc(BIF_P, PROC_BIN_SIZE + hsz); + pb = (ProcBin *) (char *) hp; + pb->thing_word = HEADER_PROC_BIN; + pb->size = obuf->ext_endp - obuf->extp; + pb->next = MSO(BIF_P).first; + MSO(BIF_P).first = (struct erl_off_heap_header*) pb; + pb->val = ErtsDistOutputBuf2Binary(obuf); + pb->bytes = (byte*) obuf->extp; + pb->flags = 0; + res = make_binary(pb); + } else { + hp = HAlloc(BIF_P, PROC_BIN_SIZE * 2 + 4 + hsz); + pb = (ProcBin *) (char *) hp; + pb->thing_word = HEADER_PROC_BIN; + pb->size = obuf->ext_endp - obuf->extp; + pb->next = MSO(BIF_P).first; + MSO(BIF_P).first = (struct erl_off_heap_header*) pb; + pb->val = ErtsDistOutputBuf2Binary(obuf); + pb->bytes = (byte*) obuf->extp; + pb->flags = 0; + hp += PROC_BIN_SIZE; + + res = CONS(hp, make_binary(pb), NIL); + hp += 2; + + pb = (ProcBin *) (char *) hp; + pb->thing_word = HEADER_PROC_BIN; + pb->size = obuf->hdr_endp - obuf->hdrp; + pb->next = MSO(BIF_P).first; + MSO(BIF_P).first = (struct erl_off_heap_header*) pb; + pb->val = ErtsDistOutputBuf2Binary(obuf); + erts_refc_inc(&pb->val->intern.refc, 1); + pb->bytes = (byte*) obuf->hdrp; + pb->flags = 0; + hp += PROC_BIN_SIZE; + res = CONS(hp, make_binary(pb), res); + hp += 2; + } + + obufsize += size_obuf(obuf); + + qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) -obufsize); - qsize = erts_atomic_add_read_nob(&dep->qsize, -size_obuf(obuf)); ASSERT(qsize >= 0); if (qsize < erts_dist_buf_busy_limit/2 @@ -3472,8 +3504,6 @@ dist_ctrl_get_data_1(BIF_ALIST_1) } } - res = make_binary(pb); - if (get_size) { Eterm sz_term; if (IS_USMALL(0, bin_sz)) -- cgit v1.2.3