/* * %CopyrightBegin% * * Copyright Ericsson AB 2017-2018. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * %CopyrightEnd% */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "sys.h" #include "global.h" #define ERL_WANT_HIPE_BIF_WRAPPER__ #include "bif.h" #undef ERL_WANT_HIPE_BIF_WRAPPER__ #include "erl_bits.h" #include "erl_io_queue.h" #define IOL2V_SMALL_BIN_LIMIT (ERL_ONHEAP_BIN_LIMIT * 4) static void free_binary(ErtsIOQBinary *b, int driver); static ErtsIOQBinary *alloc_binary(Uint size, char *source, void **iov_base, int driver); void erts_ioq_init(ErtsIOQueue *q, ErtsAlcType_t alct, int driver) { ERTS_CT_ASSERT(offsetof(ErlNifIOVec,flags) == sizeof(ErtsIOVecCommon)); ERTS_CT_ASSERT(sizeof(ErlIOVec) == sizeof(ErtsIOVecCommon)); ERTS_CT_ASSERT(sizeof(size_t) == sizeof(ErlDrvSizeT)); ERTS_CT_ASSERT(sizeof(size_t) == sizeof(Uint)); q->alct = alct; q->driver = driver; q->size = 0; q->v_head = q->v_tail = q->v_start = q->v_small; q->v_end = q->v_small + ERTS_SMALL_IO_QUEUE; q->b_head = q->b_tail = q->b_start = q->b_small; q->b_end = q->b_small + ERTS_SMALL_IO_QUEUE; } void erts_ioq_clear(ErtsIOQueue *q) { ErtsIOQBinary** binp = q->b_head; int driver = q->driver; if (q->v_start != q->v_small) erts_free(q->alct, (void *) q->v_start); while(binp < q->b_tail) { if (*binp != NULL) free_binary(*binp, driver); binp++; } if (q->b_start != q->b_small) erts_free(q->alct, (void *) q->b_start); q->v_start = q->v_end = q->v_head = q->v_tail = NULL; q->b_start = q->b_end = q->b_head = q->b_tail = NULL; q->size = 0; } static void free_binary(ErtsIOQBinary *b, int driver) { if (driver) driver_free_binary(&b->driver); else if (erts_refc_dectest(&b->nif.intern.refc, 0) == 0) erts_bin_free(&b->nif); } static ErtsIOQBinary *alloc_binary(Uint size, char *source, void **iov_base, int driver) { if (driver) { ErlDrvBinary *bin = driver_alloc_binary(size); if (!bin) return NULL; sys_memcpy(bin->orig_bytes, source, size); *iov_base = bin->orig_bytes; return (ErtsIOQBinary *)bin; } else { /* This clause can be triggered in enif_ioq_enq_binary is used */ Binary *bin = erts_bin_nrml_alloc(size); if (!bin) return NULL; erts_refc_init(&bin->intern.refc, 1); sys_memcpy(bin->orig_bytes, source, size); *iov_base = bin->orig_bytes; return (ErtsIOQBinary *)bin; } } Uint erts_ioq_size(ErtsIOQueue *q) { return q->size; } /* expand queue to hold n elements in tail or head */ static int expandq(ErtsIOQueue* q, int n, int tail) /* tail: 0 if make room in head, make room in tail otherwise */ { int h_sz; /* room before header */ int t_sz; /* room after tail */ int q_sz; /* occupied */ int nvsz; SysIOVec* niov; ErtsIOQBinary** nbinv; h_sz = q->v_head - q->v_start; t_sz = q->v_end - q->v_tail; q_sz = q->v_tail - q->v_head; if (tail && (n <= t_sz)) /* do we need to expand tail? */ return 0; else if (!tail && (n <= h_sz)) /* do we need to expand head? */ return 0; else if (n > (h_sz + t_sz)) { /* need to allocate */ /* we may get little extra but it ok */ nvsz = (q->v_end - q->v_start) + n; niov = erts_alloc_fnf(q->alct, nvsz * sizeof(SysIOVec)); if (!niov) return -1; nbinv = erts_alloc_fnf(q->alct, nvsz * sizeof(ErtsIOQBinary**)); if (!nbinv) { erts_free(q->alct, (void *) niov); return -1; } if (tail) { sys_memcpy(niov, q->v_head, q_sz*sizeof(SysIOVec)); if (q->v_start != q->v_small) erts_free(q->alct, (void *) q->v_start); q->v_start = niov; q->v_end = niov + nvsz; q->v_head = q->v_start; q->v_tail = q->v_head + q_sz; sys_memcpy(nbinv, q->b_head, q_sz*sizeof(ErtsIOQBinary*)); if (q->b_start != q->b_small) erts_free(q->alct, (void *) q->b_start); q->b_start = nbinv; q->b_end = nbinv + nvsz; q->b_head = q->b_start; q->b_tail = q->b_head + q_sz; } else { sys_memcpy(niov+nvsz-q_sz, q->v_head, q_sz*sizeof(SysIOVec)); if (q->v_start != q->v_small) erts_free(q->alct, (void *) q->v_start); q->v_start = niov; q->v_end = niov + nvsz; q->v_tail = q->v_end; q->v_head = q->v_tail - q_sz; sys_memcpy(nbinv+nvsz-q_sz, q->b_head, q_sz*sizeof(ErtsIOQBinary*)); if (q->b_start != q->b_small) erts_free(q->alct, (void *) q->b_start); q->b_start = nbinv; q->b_end = nbinv + nvsz; q->b_tail = q->b_end; q->b_head = q->b_tail - q_sz; } } else if (tail) { /* move to beginning to make room in tail */ sys_memmove(q->v_start, q->v_head, q_sz*sizeof(SysIOVec)); q->v_head = q->v_start; q->v_tail = q->v_head + q_sz; sys_memmove(q->b_start, q->b_head, q_sz*sizeof(ErtsIOQBinary*)); q->b_head = q->b_start; q->b_tail = q->b_head + q_sz; } else { /* move to end to make room */ sys_memmove(q->v_end-q_sz, q->v_head, q_sz*sizeof(SysIOVec)); q->v_tail = q->v_end; q->v_head = q->v_tail-q_sz; sys_memmove(q->b_end-q_sz, q->b_head, q_sz*sizeof(ErtsIOQBinary*)); q->b_tail = q->b_end; q->b_head = q->b_tail-q_sz; } return 0; } static int skip(ErtsIOVec* vec, Uint skipbytes, SysIOVec **iovp, ErtsIOQBinary ***binvp, Uint *lenp) { int n; Uint len; SysIOVec* iov; ErtsIOQBinary** binv; if (vec->common.size <= skipbytes) return -1; iov = vec->common.iov; binv = vec->common.binv; n = vec->common.vsize; /* we use do here to strip iov_len=0 from beginning */ do { len = iov->iov_len; if (len <= skipbytes) { skipbytes -= len; iov++; binv++; n--; } else { iov->iov_base = ((char *)(iov->iov_base)) + skipbytes; iov->iov_len -= skipbytes; skipbytes = 0; } } while(skipbytes > 0); *binvp = binv; *iovp = iov; *lenp = len; return n; } /* Put elements from vec at q tail */ int erts_ioq_enqv(ErtsIOQueue *q, ErtsIOVec *eiov, Uint skipbytes) { int n; Uint len; Uint size = eiov->common.size - skipbytes; SysIOVec *iov; ErtsIOQBinary** binv; ErtsIOQBinary* b; if (q == NULL) return -1; ASSERT(eiov->common.size >= skipbytes); if (eiov->common.size <= skipbytes) return 0; n = skip(eiov, skipbytes, &iov, &binv, &len); if (n < 0) return n; if (q->v_tail + n >= q->v_end) if (expandq(q, n, 1)) return -1; /* Queue and reference all binaries (remove zero length items) */ while(n--) { if ((len = iov->iov_len) > 0) { if ((b = *binv) == NULL) { /* special case create binary ! */ b = alloc_binary(len, iov->iov_base, (void**)&q->v_tail->iov_base, q->driver); if (!b) return -1; *q->b_tail++ = b; q->v_tail->iov_len = len; q->v_tail++; } else { if (q->driver) driver_binary_inc_refc(&b->driver); else erts_refc_inc(&b->nif.intern.refc, 1); *q->b_tail++ = b; *q->v_tail++ = *iov; } } iov++; binv++; } q->size += size; /* update total size in queue */ return 0; } /* Put elements from vec at q head */ int erts_ioq_pushqv(ErtsIOQueue *q, ErtsIOVec* vec, Uint skipbytes) { int n; Uint len; Uint size = vec->common.size - skipbytes; SysIOVec* iov; ErtsIOQBinary** binv; ErtsIOQBinary* b; if (q == NULL) return -1; ASSERT(vec->common.size >= skipbytes); if (vec->common.size <= skipbytes) return 0; n = skip(vec, skipbytes, &iov, &binv, &len); if (n < 0) return n; if (q->v_head - n < q->v_start) if (expandq(q, n, 0)) return -1; /* Queue and reference all binaries (remove zero length items) */ iov += (n-1); /* move to end */ binv += (n-1); /* move to end */ while(n--) { if ((len = iov->iov_len) > 0) { if ((b = *binv) == NULL) { /* special case create binary ! */ if (q->driver) { ErlDrvBinary *bin = driver_alloc_binary(len); if (!bin) return -1; sys_memcpy(bin->orig_bytes, iov->iov_base, len); b = (ErtsIOQBinary *)bin; q->v_head->iov_base = bin->orig_bytes; } *--q->b_head = b; q->v_head--; q->v_head->iov_len = len; } else { if (q->driver) driver_binary_inc_refc(&b->driver); else erts_refc_inc(&b->nif.intern.refc, 1); *--q->b_head = b; *--q->v_head = *iov; } } iov--; binv--; } q->size += size; /* update total size in queue */ return 0; } /* ** Remove size bytes from queue head ** Return number of bytes that remain in queue */ int erts_ioq_deq(ErtsIOQueue *q, Uint size) { Uint len; if ((q == NULL) || (q->size < size)) return -1; q->size -= size; while (size > 0) { ASSERT(q->v_head != q->v_tail); len = q->v_head->iov_len; if (len <= size) { size -= len; free_binary(*q->b_head, q->driver); *q->b_head++ = NULL; q->v_head++; } else { q->v_head->iov_base = ((char *)(q->v_head->iov_base)) + size; q->v_head->iov_len -= size; size = 0; } } /* restart pointers (optimised for enq) */ if (q->v_head == q->v_tail) { q->v_head = q->v_tail = q->v_start; q->b_head = q->b_tail = q->b_start; } return 0; } Uint erts_ioq_peekqv(ErtsIOQueue *q, ErtsIOVec *ev) { ASSERT(ev); if (! q) { return (Uint) -1; } else { if ((ev->common.vsize = q->v_tail - q->v_head) == 0) { ev->common.size = 0; ev->common.iov = NULL; ev->common.binv = NULL; } else { ev->common.size = q->size; ev->common.iov = q->v_head; ev->common.binv = q->b_head; } return q->size; } } SysIOVec* erts_ioq_peekq(ErtsIOQueue *q, int* vlenp) /* length of io-vector */ { if (q == NULL) { *vlenp = -1; return NULL; } if ((*vlenp = (q->v_tail - q->v_head)) == 0) return NULL; return q->v_head; } /* Fills a possibly deep list of chars and binaries into vec ** Small characters are first stored in the buffer buf of length ln ** binaries found are copied and linked into msoh ** Return vector length on succsess, ** -1 on overflow ** -2 on type error */ static ERTS_INLINE void io_list_to_vec_set_vec(SysIOVec **iov, ErtsIOQBinary ***binv, ErtsIOQBinary *bin, byte *ptr, Uint len, int *vlen) { while (len > MAX_SYSIOVEC_IOVLEN) { (*iov)->iov_base = ptr; (*iov)->iov_len = MAX_SYSIOVEC_IOVLEN; ptr += MAX_SYSIOVEC_IOVLEN; len -= MAX_SYSIOVEC_IOVLEN; (*iov)++; (*vlen)++; *(*binv)++ = bin; } (*iov)->iov_base = ptr; (*iov)->iov_len = len; *(*binv)++ = bin; (*iov)++; (*vlen)++; } int erts_ioq_iolist_to_vec(Eterm obj, /* io-list */ SysIOVec* iov, /* io vector */ ErtsIOQBinary** binv, /* binary reference vector */ ErtsIOQBinary* cbin, /* binary to store characters */ Uint bin_limit, /* small binaries limit */ int driver) { DECLARE_ESTACK(s); Eterm* objp; byte *buf = NULL; Uint len = 0; Uint csize = 0; int vlen = 0; byte* cptr; if (cbin) { if (driver) { buf = (byte*)cbin->driver.orig_bytes; len = cbin->driver.orig_size; } else { buf = (byte*)cbin->nif.orig_bytes; len = cbin->nif.orig_size; } } cptr = buf; goto L_jump_start; /* avoid push */ while (!ESTACK_ISEMPTY(s)) { obj = ESTACK_POP(s); L_jump_start: if (is_list(obj)) { L_iter_list: objp = list_val(obj); obj = CAR(objp); if (is_byte(obj)) { if (len == 0) goto L_overflow; *buf++ = unsigned_val(obj); csize++; len--; } else if (is_binary(obj)) { ESTACK_PUSH(s, CDR(objp)); goto handle_binary; } else if (is_list(obj)) { ESTACK_PUSH(s, CDR(objp)); goto L_iter_list; /* on head */ } else if (!is_nil(obj)) { goto L_type_error; } obj = CDR(objp); if (is_list(obj)) goto L_iter_list; /* on tail */ else if (is_binary(obj)) { goto handle_binary; } else if (!is_nil(obj)) { goto L_type_error; } } else if (is_binary(obj)) { Eterm real_bin; Uint offset; Eterm* bptr; Uint size; int bitoffs; int bitsize; handle_binary: size = binary_size(obj); ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize); ASSERT(bitsize == 0); bptr = binary_val(real_bin); if (*bptr == HEADER_PROC_BIN) { ProcBin* pb = (ProcBin *) bptr; if (bitoffs != 0) { if (len < size) { goto L_overflow; } erts_copy_bits(pb->bytes+offset, bitoffs, 1, (byte *) buf, 0, 1, size*8); csize += size; buf += size; len -= size; } else if (bin_limit && size < bin_limit) { if (len < size) { goto L_overflow; } sys_memcpy(buf, pb->bytes+offset, size); csize += size; buf += size; len -= size; } else { ErtsIOQBinary *qbin; if (csize != 0) { io_list_to_vec_set_vec(&iov, &binv, cbin, cptr, csize, &vlen); cptr = buf; csize = 0; } if (pb->flags) { erts_emasculate_writable_binary(pb); } if (driver) qbin = (ErtsIOQBinary*)Binary2ErlDrvBinary(pb->val); else qbin = (ErtsIOQBinary*)pb->val; io_list_to_vec_set_vec( &iov, &binv, qbin, pb->bytes+offset, size, &vlen); } } else { ErlHeapBin* hb = (ErlHeapBin *) bptr; if (len < size) { goto L_overflow; } copy_binary_to_buffer(buf, 0, ((byte *) hb->data)+offset, bitoffs, 8*size); csize += size; buf += size; len -= size; } } else if (!is_nil(obj)) { goto L_type_error; } } if (csize != 0) { io_list_to_vec_set_vec(&iov, &binv, cbin, cptr, csize, &vlen); } DESTROY_ESTACK(s); return vlen; L_type_error: DESTROY_ESTACK(s); return -2; L_overflow: DESTROY_ESTACK(s); return -1; } static ERTS_INLINE int io_list_vec_count(Eterm obj, Uint *v_size, Uint *c_size, Uint *b_size, Uint *in_clist, Uint *p_v_size, Uint *p_c_size, Uint *p_in_clist, Uint blimit) { Uint size = binary_size(obj); Eterm real; ERTS_DECLARE_DUMMY(Uint offset); int bitoffs; int bitsize; ERTS_GET_REAL_BIN(obj, real, offset, bitoffs, bitsize); if (bitsize != 0) return 1; if (thing_subtag(*binary_val(real)) == REFC_BINARY_SUBTAG && bitoffs == 0) { *b_size += size; if (*b_size < size) return 2; *in_clist = 0; ++*v_size; /* If iov_len is smaller then Uint we split the binary into*/ /* multiple smaller (2GB) elements in the iolist.*/ *v_size += size / MAX_SYSIOVEC_IOVLEN; if (size >= blimit) { *p_in_clist = 0; ++*p_v_size; } else { *p_c_size += size; if (!*p_in_clist) { *p_in_clist = 1; ++*p_v_size; } } } else { *c_size += size; if (*c_size < size) return 2; if (!*in_clist) { *in_clist = 1; ++*v_size; } *p_c_size += size; if (!*p_in_clist) { *p_in_clist = 1; ++*p_v_size; } } return 0; } #define IO_LIST_VEC_COUNT(obj) \ do { \ switch (io_list_vec_count(obj, &v_size, &c_size, \ &b_size, &in_clist, \ &p_v_size, &p_c_size, &p_in_clist, \ blimit)) { \ case 1: goto L_type_error; \ case 2: goto L_overflow_error; \ default: break; \ } \ } while(0) /* * Returns 0 if successful and a non-zero value otherwise. * * Return values through pointers: * *vsize - SysIOVec size needed for a writev * *csize - Number of bytes not in binary (in the common binary) * *pvsize - SysIOVec size needed if packing small binaries * *pcsize - Number of bytes in the common binary if packing * *total_size - Total size of iolist in bytes */ int erts_ioq_iolist_vec_len(Eterm obj, int* vsize, Uint* csize, Uint* pvsize, Uint* pcsize, size_t* total_size, Uint blimit) { DECLARE_ESTACK(s); Eterm* objp; Uint v_size = 0; Uint c_size = 0; Uint b_size = 0; Uint in_clist = 0; Uint p_v_size = 0; Uint p_c_size = 0; Uint p_in_clist = 0; size_t total; goto L_jump_start; /* avoid a push */ while (!ESTACK_ISEMPTY(s)) { obj = ESTACK_POP(s); L_jump_start: if (is_list(obj)) { L_iter_list: objp = list_val(obj); obj = CAR(objp); if (is_byte(obj)) { c_size++; if (c_size == 0) { goto L_overflow_error; } if (!in_clist) { in_clist = 1; v_size++; } p_c_size++; if (!p_in_clist) { p_in_clist = 1; p_v_size++; } } else if (is_binary(obj)) { IO_LIST_VEC_COUNT(obj); } else if (is_list(obj)) { ESTACK_PUSH(s, CDR(objp)); goto L_iter_list; /* on head */ } else if (!is_nil(obj)) { goto L_type_error; } obj = CDR(objp); if (is_list(obj)) goto L_iter_list; /* on tail */ else if (is_binary(obj)) { /* binary tail is OK */ IO_LIST_VEC_COUNT(obj); } else if (!is_nil(obj)) { goto L_type_error; } } else if (is_binary(obj)) { IO_LIST_VEC_COUNT(obj); } else if (!is_nil(obj)) { goto L_type_error; } } total = c_size + b_size; if (total < c_size) { goto L_overflow_error; } *total_size = total; DESTROY_ESTACK(s); *vsize = v_size; *csize = c_size; *pvsize = p_v_size; *pcsize = p_c_size; return 0; L_type_error: L_overflow_error: DESTROY_ESTACK(s); return 1; } typedef struct { Eterm result_head; Eterm result_tail; Eterm input_list; UWord acc_size; Binary *acc; /* We yield after copying this many bytes into the accumulator (Minus * eating a few on consing etc). Large binaries will only count to the * extent their split (if any) resulted in a copy op. */ UWord bytereds_available; UWord bytereds_spent; Process *process; ErtsEStack estack; Eterm magic_reference; } iol2v_state_t; static int iol2v_state_destructor(Binary *data) { iol2v_state_t *state = ERTS_MAGIC_BIN_UNALIGNED_DATA(data); DESTROY_SAVED_ESTACK(&state->estack); if (state->acc != NULL) { erts_bin_free(state->acc); } return 1; } static void iol2v_init(iol2v_state_t *state, Process *process, Eterm input) { state->process = process; state->result_head = NIL; state->result_tail = NIL; state->input_list = input; state->magic_reference = NIL; state->acc_size = 0; state->acc = NULL; CLEAR_SAVED_ESTACK(&state->estack); } static Eterm iol2v_make_sub_bin(iol2v_state_t *state, Eterm bin_term, UWord offset, UWord size) { Uint byte_offset, bit_offset, bit_size; ErlSubBin *sb; Eterm orig_pb_term; sb = (ErlSubBin*)HAlloc(state->process, ERL_SUB_BIN_SIZE); ERTS_GET_REAL_BIN(bin_term, orig_pb_term, byte_offset, bit_offset, bit_size); ASSERT(bit_size == 0); sb->thing_word = HEADER_SUB_BIN; sb->bitoffs = bit_offset; sb->bitsize = 0; sb->orig = orig_pb_term; sb->is_writable = 0; sb->offs = byte_offset + offset; sb->size = size; return make_binary(sb); } static Eterm iol2v_promote_acc(iol2v_state_t *state) { Eterm bin; bin = erts_build_proc_bin(&MSO(state->process), HAlloc(state->process, PROC_BIN_SIZE), erts_bin_realloc(state->acc, state->acc_size)); state->acc_size = 0; state->acc = NULL; return bin; } /* Destructively enqueues a term to the result list, saving us the hassle of * having to reverse it later. This is safe since GC is disabled and we never * leak the unfinished term to the outside. */ static void iol2v_enqueue_result(iol2v_state_t *state, Eterm term) { Eterm prev_tail; Eterm *hp; prev_tail = state->result_tail; hp = HAlloc(state->process, 2); state->result_tail = CONS(hp, term, NIL); if(prev_tail != NIL) { Eterm *prev_cell = list_val(prev_tail); CDR(prev_cell) = state->result_tail; } else { state->result_head = state->result_tail; } state->bytereds_spent += 1; } #ifndef DEBUG #define ACC_REALLOCATION_LIMIT (IOL2V_SMALL_BIN_LIMIT * 32) #else #define ACC_REALLOCATION_LIMIT (IOL2V_SMALL_BIN_LIMIT * 4) #endif static void iol2v_expand_acc(iol2v_state_t *state, UWord extra) { UWord required_bytes, acc_alloc_size; ERTS_CT_ASSERT(ERTS_UWORD_MAX > ACC_REALLOCATION_LIMIT / 2); ASSERT(extra >= 1); acc_alloc_size = state->acc != NULL ? (state->acc)->orig_size : 0; required_bytes = state->acc_size + extra; if (state->acc == NULL) { UWord new_size = MAX(required_bytes, IOL2V_SMALL_BIN_LIMIT); state->acc = erts_bin_nrml_alloc(new_size); } else if (required_bytes > acc_alloc_size) { Binary *prev_acc; UWord new_size; if (acc_alloc_size >= ACC_REALLOCATION_LIMIT) { /* We skip reallocating once we hit a certain point; it often * results in extra copying and we're very likely to overallocate * on anything other than absurdly long byte/heapbin sequences. */ iol2v_enqueue_result(state, iol2v_promote_acc(state)); iol2v_expand_acc(state, extra); return; } new_size = MAX(required_bytes, acc_alloc_size * 2); prev_acc = state->acc; state->acc = erts_bin_realloc(prev_acc, new_size); if (prev_acc != state->acc) { state->bytereds_spent += state->acc_size; } } state->bytereds_spent += extra; } static int iol2v_append_byte_seq(iol2v_state_t *state, Eterm seq_start, Eterm *seq_end) { Eterm lookahead, iterator; Uint observed_bits; SWord seq_length; char *acc_data; lookahead = seq_start; seq_length = 0; ASSERT(state->bytereds_available > state->bytereds_spent); while (is_list(lookahead)) { Eterm *cell = list_val(lookahead); if (!is_small(CAR(cell))) { break; } if (seq_length * 2 >= (state->bytereds_available - state->bytereds_spent)) { break; } lookahead = CDR(cell); seq_length += 1; } ASSERT(seq_length >= 1); iol2v_expand_acc(state, seq_length); /* Bump a few extra reductions to account for list traversal. */ state->bytereds_spent += seq_length; acc_data = &(state->acc)->orig_bytes[state->acc_size]; state->acc_size += seq_length; iterator = seq_start; observed_bits = 0; while (iterator != lookahead) { Eterm *cell; Uint byte; cell = list_val(iterator); iterator = CDR(cell); byte = unsigned_val(CAR(cell)); observed_bits |= byte; ASSERT(acc_data < &(state->acc)->orig_bytes[state->acc_size]); *(acc_data++) = byte; } if (observed_bits > UCHAR_MAX) { return 0; } ASSERT(acc_data == &(state->acc)->orig_bytes[state->acc_size]); *seq_end = iterator; return 1; } static int iol2v_append_binary(iol2v_state_t *state, Eterm bin_term) { int is_acc_small, is_bin_small; UWord combined_size; UWord binary_size; Uint byte_offset, bit_offset, bit_size; byte *binary_data; Eterm *parent_header; Eterm parent_binary; ASSERT(state->bytereds_available > state->bytereds_spent); ERTS_GET_REAL_BIN(bin_term, parent_binary, byte_offset, bit_offset, bit_size); parent_header = binary_val(parent_binary); binary_size = binary_size(bin_term); if (bit_size != 0) { return 0; } else if (binary_size == 0) { state->bytereds_spent += 1; return 1; } is_acc_small = state->acc_size < IOL2V_SMALL_BIN_LIMIT; is_bin_small = binary_size < IOL2V_SMALL_BIN_LIMIT; combined_size = binary_size + state->acc_size; if (thing_subtag(*parent_header) == REFC_BINARY_SUBTAG) { ProcBin *pb = (ProcBin*)parent_header; if (pb->flags) { erts_emasculate_writable_binary(pb); } binary_data = &((byte*)pb->bytes)[byte_offset]; } else { ErlHeapBin *hb = (ErlHeapBin*)parent_header; ASSERT(thing_subtag(*parent_header) == HEAP_BINARY_SUBTAG); ASSERT(is_bin_small); binary_data = &((byte*)&hb->data)[byte_offset]; } if (!is_bin_small && (state->acc_size == 0 || !is_acc_small)) { /* Avoid combining if we encounter an acceptably large binary while the * accumulator is either empty or large enough to be returned on its * own. */ if (state->acc_size != 0) { iol2v_enqueue_result(state, iol2v_promote_acc(state)); } iol2v_enqueue_result(state, bin_term); } else if (is_bin_small || combined_size < (IOL2V_SMALL_BIN_LIMIT * 2)) { /* If the candidate is small or we can't split the combination in two, * then just copy it into the accumulator. */ iol2v_expand_acc(state, binary_size); if (ERTS_LIKELY(bit_offset == 0)) { sys_memcpy(&(state->acc)->orig_bytes[state->acc_size], binary_data, binary_size); } else { ASSERT(binary_size <= ERTS_UWORD_MAX / 8); erts_copy_bits(binary_data, bit_offset, 1, (byte*)&(state->acc)->orig_bytes[state->acc_size], 0, 1, binary_size * 8); } state->acc_size += binary_size; } else { /* Otherwise, append enough data for the accumulator to be valid, and * then return the rest as a sub-binary. */ UWord spill = IOL2V_SMALL_BIN_LIMIT - state->acc_size; Eterm binary_tail; iol2v_expand_acc(state, spill); if (ERTS_LIKELY(bit_offset == 0)) { sys_memcpy(&(state->acc)->orig_bytes[state->acc_size], binary_data, spill); } else { ASSERT(binary_size <= ERTS_UWORD_MAX / 8); erts_copy_bits(binary_data, bit_offset, 1, (byte*)&(state->acc)->orig_bytes[state->acc_size], 0, 1, spill * 8); } state->acc_size += spill; binary_tail = iol2v_make_sub_bin(state, bin_term, spill, binary_size - spill); iol2v_enqueue_result(state, iol2v_promote_acc(state)); iol2v_enqueue_result(state, binary_tail); } return 1; } static BIF_RETTYPE iol2v_yield(iol2v_state_t *state) { if (is_nil(state->magic_reference)) { iol2v_state_t *boxed_state; Binary *magic_binary; Eterm *hp; magic_binary = erts_create_magic_binary_x(sizeof(*state), &iol2v_state_destructor, ERTS_ALC_T_BINARY, 1); boxed_state = ERTS_MAGIC_BIN_UNALIGNED_DATA(magic_binary); sys_memcpy(boxed_state, state, sizeof(*state)); hp = HAlloc(boxed_state->process, ERTS_MAGIC_REF_THING_SIZE); boxed_state->magic_reference = erts_mk_magic_ref(&hp, &MSO(boxed_state->process), magic_binary); state = boxed_state; } ERTS_BIF_YIELD1(bif_export[BIF_iolist_to_iovec_1], state->process, state->magic_reference); } static BIF_RETTYPE iol2v_continue(iol2v_state_t *state) { Eterm iterator; DECLARE_ESTACK(s); ESTACK_CHANGE_ALLOCATOR(s, ERTS_ALC_T_SAVED_ESTACK); state->bytereds_available = ERTS_BIF_REDS_LEFT(state->process) * IOL2V_SMALL_BIN_LIMIT; state->bytereds_spent = 0; if (state->estack.start) { ESTACK_RESTORE(s, &state->estack); } iterator = state->input_list; for(;;) { if (state->bytereds_spent >= state->bytereds_available) { ESTACK_SAVE(s, &state->estack); state->input_list = iterator; return iol2v_yield(state); } while (is_list(iterator)) { Eterm *cell; Eterm head; cell = list_val(iterator); head = CAR(cell); if (is_binary(head)) { if (!iol2v_append_binary(state, head)) { goto l_badarg; } iterator = CDR(cell); } else if (is_small(head)) { Eterm seq_end; if (!iol2v_append_byte_seq(state, iterator, &seq_end)) { goto l_badarg; } iterator = seq_end; } else if (is_list(head) || is_nil(head)) { Eterm tail = CDR(cell); if (!is_nil(tail)) { ESTACK_PUSH(s, tail); } state->bytereds_spent += 1; iterator = head; } else { goto l_badarg; } if (state->bytereds_spent >= state->bytereds_available) { ESTACK_SAVE(s, &state->estack); state->input_list = iterator; return iol2v_yield(state); } } if (is_binary(iterator)) { if (!iol2v_append_binary(state, iterator)) { goto l_badarg; } } else if (!is_nil(iterator)) { goto l_badarg; } if(ESTACK_ISEMPTY(s)) { break; } iterator = ESTACK_POP(s); } if (state->acc_size != 0) { iol2v_enqueue_result(state, iol2v_promote_acc(state)); } BUMP_REDS(state->process, state->bytereds_spent / IOL2V_SMALL_BIN_LIMIT); CLEAR_SAVED_ESTACK(&state->estack); DESTROY_ESTACK(s); BIF_RET(state->result_head); l_badarg: CLEAR_SAVED_ESTACK(&state->estack); DESTROY_ESTACK(s); if (state->acc != NULL) { erts_bin_free(state->acc); state->acc = NULL; } BIF_ERROR(state->process, BADARG); } HIPE_WRAPPER_BIF_DISABLE_GC(iolist_to_iovec, 1) BIF_RETTYPE iolist_to_iovec_1(BIF_ALIST_1) { BIF_RETTYPE result; if (is_nil(BIF_ARG_1)) { BIF_RET(NIL); } else if (is_binary(BIF_ARG_1)) { if (binary_bitsize(BIF_ARG_1) != 0) { ASSERT(!(BIF_P->flags & F_DISABLE_GC)); BIF_ERROR(BIF_P, BADARG); } else if (binary_size(BIF_ARG_1) != 0) { Eterm *hp = HAlloc(BIF_P, 2); BIF_RET(CONS(hp, BIF_ARG_1, NIL)); } else { BIF_RET(NIL); } } else if (is_internal_magic_ref(BIF_ARG_1)) { iol2v_state_t *state; Binary *magic; magic = erts_magic_ref2bin(BIF_ARG_1); if (ERTS_MAGIC_BIN_DESTRUCTOR(magic) != &iol2v_state_destructor) { ASSERT(!(BIF_P->flags & F_DISABLE_GC)); BIF_ERROR(BIF_P, BADARG); } ASSERT(BIF_P->flags & F_DISABLE_GC); state = ERTS_MAGIC_BIN_UNALIGNED_DATA(magic); result = iol2v_continue(state); } else if (!is_list(BIF_ARG_1)) { ASSERT(!(BIF_P->flags & F_DISABLE_GC)); BIF_ERROR(BIF_P, BADARG); } else { iol2v_state_t state; iol2v_init(&state, BIF_P, BIF_ARG_1); erts_set_gc_state(BIF_P, 0); result = iol2v_continue(&state); } if (result != THE_NON_VALUE || BIF_P->freason != TRAP) { erts_set_gc_state(BIF_P, 1); } BIF_RET(result); }