/*
 * %CopyrightBegin%
 * 
 * Copyright Ericsson AB 2017. All Rights Reserved.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * 
 * %CopyrightEnd%
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "sys.h"
#include "global.h"

#define ERL_WANT_HIPE_BIF_WRAPPER__
#include "bif.h"
#undef ERL_WANT_HIPE_BIF_WRAPPER__

#include "erl_bits.h"
#include "erl_io_queue.h"

/* Byte threshold used by the iolist_to_iovec code in this file: binaries and
 * accumulators below this size are treated as "small". It is also the scale
 * factor between process reductions and the "byte reductions" budget. */
#define IOL2V_SMALL_BIN_LIMIT (ERL_ONHEAP_BIN_LIMIT * 4)

/* Helpers defined further down. The `driver` flag selects which member of
 * ErtsIOQBinary (driver binary or NIF/internal binary) is operated on. */
static void free_binary(ErtsIOQBinary *b, int driver);
static ErtsIOQBinary *alloc_binary(Uint size, char *source, void **iov_base, int driver);

/*
 * Put the queue `q` into its empty state, backed by the small vectors that
 * are embedded in the queue itself (v_small / b_small).
 *
 *   alct   - allocator type stored in the queue; used by the other queue
 *            functions when they allocate or free vector storage
 *   driver - stored in the queue; selects driver binaries (non-zero) or
 *            NIF/internal binaries (zero) in the binary helpers
 */
void erts_ioq_init(ErtsIOQueue *q, ErtsAlcType_t alct, int driver)
{
    /* Compile-time layout and size assumptions made by this file. */
    ERTS_CT_ASSERT(offsetof(ErlNifIOVec,flags) == sizeof(ErtsIOVecCommon));
    ERTS_CT_ASSERT(sizeof(ErlIOVec) == sizeof(ErtsIOVecCommon));
    ERTS_CT_ASSERT(sizeof(size_t) == sizeof(ErlDrvSizeT));
    ERTS_CT_ASSERT(sizeof(size_t) == sizeof(Uint));

    q->size = 0;
    q->driver = driver;
    q->alct = alct;

    /* I/O vector part: head == tail == start means "empty". */
    q->v_start = q->v_small;
    q->v_head = q->v_small;
    q->v_tail = q->v_small;
    q->v_end = q->v_small + ERTS_SMALL_IO_QUEUE;

    /* Parallel binary-reference part, same geometry. */
    q->b_start = q->b_small;
    q->b_head = q->b_small;
    q->b_tail = q->b_small;
    q->b_end = q->b_small + ERTS_SMALL_IO_QUEUE;
}

/*
 * Release everything held by the queue: every non-NULL binary between
 * b_head and b_tail is released via free_binary(), and both vectors are
 * freed unless they are the embedded small vectors. Afterwards all vector
 * pointers are NULL and the size is 0.
 */
void erts_ioq_clear(ErtsIOQueue *q)
{
    const int is_driver = q->driver;
    ErtsIOQBinary **cursor;

    if (q->v_start != q->v_small) {
        erts_free(q->alct, (void *) q->v_start);
    }

    for (cursor = q->b_head; cursor < q->b_tail; cursor++) {
        if (*cursor != NULL) {
            free_binary(*cursor, is_driver);
        }
    }

    if (q->b_start != q->b_small) {
        erts_free(q->alct, (void *) q->b_start);
    }

    q->v_start = NULL;
    q->v_end = NULL;
    q->v_head = NULL;
    q->v_tail = NULL;

    q->b_start = NULL;
    q->b_end = NULL;
    q->b_head = NULL;
    q->b_tail = NULL;

    q->size = 0;
}

/*
 * Drop one reference to `b`.
 *
 * For driver queues this delegates to driver_free_binary(). Otherwise the
 * internal reference counter is decremented and the binary is freed when
 * the counter reaches zero.
 */
static void free_binary(ErtsIOQBinary *b, int driver)
{
    if (!driver) {
        if (erts_refc_dectest(&b->nif.intern.refc, 0) == 0) {
            erts_bin_free(&b->nif);
        }
        return;
    }

    driver_free_binary(&b->driver);
}

/*
 * Allocate a binary of `size` bytes, copy `size` bytes from `source` into it
 * and store the address of the copied data in *iov_base.
 *
 * A driver binary is allocated when `driver` is non-zero; otherwise a normal
 * internal binary is allocated and its reference count initialised to 1.
 *
 * Returns the new binary, or NULL if the allocation returned NULL (in which
 * case *iov_base is left untouched).
 */
static ErtsIOQBinary *alloc_binary(Uint size, char *source, void **iov_base, int driver)
{
    if (!driver) {
        /* This clause can be triggered if enif_ioq_enq_binary is used */
        Binary *nif_bin = erts_bin_nrml_alloc(size);

        if (nif_bin == NULL) {
            return NULL;
        }

        erts_refc_init(&nif_bin->intern.refc, 1);
        sys_memcpy(nif_bin->orig_bytes, source, size);
        *iov_base = nif_bin->orig_bytes;

        return (ErtsIOQBinary *)nif_bin;
    } else {
        ErlDrvBinary *drv_bin = driver_alloc_binary(size);

        if (drv_bin == NULL) {
            return NULL;
        }

        sys_memcpy(drv_bin->orig_bytes, source, size);
        *iov_base = drv_bin->orig_bytes;

        return (ErtsIOQBinary *)drv_bin;
    }
}

/* Return the number of bytes currently accounted for in the queue. */
Uint erts_ioq_size(ErtsIOQueue *q)
{
    Uint queued_bytes = q->size;

    return queued_bytes;
}

/*
 * Make room for n more elements at one end of the queue.
 *
 *   n    - number of free slots required
 *   tail - non-zero: make the room after the tail;
 *          zero:     make the room before the head
 *
 * Three strategies, tried in order:
 *   1. There is already enough room at the requested end: do nothing.
 *   2. The total free room (before head + after tail) is too small:
 *      allocate new, larger vectors and copy the occupied part to the
 *      start (tail != 0) or to the end (tail == 0) of them.
 *   3. Otherwise slide the occupied part to the opposite end of the
 *      existing vectors.
 *
 * The I/O vector and the binary-reference vector are always kept in step.
 *
 * Returns 0 on success and -1 if an allocation fails (the queue is left
 * unchanged in that case).
 */
static int expandq(ErtsIOQueue* q, int n, int tail)
{
    int h_sz;  /* room before head */
    int t_sz;  /* room after tail */
    int q_sz;  /* occupied */
    int nvsz;
    SysIOVec* niov;
    ErtsIOQBinary** nbinv;

    h_sz = q->v_head - q->v_start;
    t_sz = q->v_end -  q->v_tail;
    q_sz = q->v_tail - q->v_head;

    if (tail && (n <= t_sz)) /* do we need to expand tail? */
        return 0;
    else if (!tail && (n <= h_sz))  /* do we need to expand head? */
        return 0;
    else if (n > (h_sz + t_sz)) { /* need to allocate */
        /* we may get a little extra, but that is ok */
        nvsz = (q->v_end - q->v_start) + n;

        niov = erts_alloc_fnf(q->alct, nvsz * sizeof(SysIOVec));
        if (!niov)
            return -1;
        /* Element type of the vector is ErtsIOQBinary*, matching the
         * copies below. */
        nbinv = erts_alloc_fnf(q->alct, nvsz * sizeof(ErtsIOQBinary*));
        if (!nbinv) {
            erts_free(q->alct, (void *) niov);
            return -1;
        }
        if (tail) {
            /* Occupied part goes to the start; free room ends up at the tail. */
            sys_memcpy(niov, q->v_head, q_sz*sizeof(SysIOVec));
            if (q->v_start != q->v_small)
                erts_free(q->alct, (void *) q->v_start);
            q->v_start = niov;
            q->v_end = niov + nvsz;
            q->v_head = q->v_start;
            q->v_tail = q->v_head + q_sz;

            sys_memcpy(nbinv, q->b_head, q_sz*sizeof(ErtsIOQBinary*));
            if (q->b_start != q->b_small)
                erts_free(q->alct, (void *) q->b_start);
            q->b_start = nbinv;
            q->b_end = nbinv + nvsz;
            q->b_head = q->b_start;
            q->b_tail = q->b_head + q_sz;
        }
        else {
            /* Occupied part goes to the end; free room ends up at the head. */
            sys_memcpy(niov+nvsz-q_sz, q->v_head, q_sz*sizeof(SysIOVec));
            if (q->v_start != q->v_small)
                erts_free(q->alct, (void *) q->v_start);
            q->v_start = niov;
            q->v_end = niov + nvsz;
            q->v_tail = q->v_end;
            q->v_head = q->v_tail - q_sz;

            sys_memcpy(nbinv+nvsz-q_sz, q->b_head, q_sz*sizeof(ErtsIOQBinary*));
            if (q->b_start != q->b_small)
                erts_free(q->alct, (void *) q->b_start);
            q->b_start = nbinv;
            q->b_end = nbinv + nvsz;
            q->b_tail = q->b_end;
            q->b_head = q->b_tail - q_sz;
        }
    }
    else if (tail) {  /* move to beginning to make room in tail */
        sys_memmove(q->v_start, q->v_head, q_sz*sizeof(SysIOVec));
        q->v_head = q->v_start;
        q->v_tail = q->v_head + q_sz;
        sys_memmove(q->b_start, q->b_head, q_sz*sizeof(ErtsIOQBinary*));
        q->b_head = q->b_start;
        q->b_tail = q->b_head + q_sz;
    }
    else {   /* move to end to make room in head */
        sys_memmove(q->v_end-q_sz, q->v_head, q_sz*sizeof(SysIOVec));
        q->v_tail = q->v_end;
        q->v_head = q->v_tail-q_sz;
        sys_memmove(q->b_end-q_sz, q->b_head, q_sz*sizeof(ErtsIOQBinary*));
        q->b_tail = q->b_end;
        q->b_head = q->b_tail-q_sz;
    }

    return 0;
}

/*
 * Advance past the first `skipbytes` bytes of `vec`.
 *
 * Whole elements covered by `skipbytes` are stepped over; an element that is
 * only partly covered is adjusted IN PLACE (its iov_base is advanced and its
 * iov_len reduced), so the caller's vector is modified. A zero-length first
 * element is stepped over as well.
 *
 * Outputs:
 *   *iovp  - first remaining I/O vector element
 *   *binvp - the matching binary-reference slot
 *   *lenp  - iov_len of the last element examined, as read before any
 *            in-place adjustment
 *
 * Returns the number of remaining elements, or -1 if the vector holds no
 * more than `skipbytes` bytes.
 */
static
int skip(ErtsIOVec* vec, Uint skipbytes,
         SysIOVec **iovp, ErtsIOQBinary ***binvp,
         Uint *lenp)
{
    SysIOVec *cur_iov;
    ErtsIOQBinary **cur_bin;
    Uint cur_len;
    int remaining;

    if (vec->common.size <= skipbytes) {
        return -1;
    }

    cur_iov = vec->common.iov;
    cur_bin = vec->common.binv;
    remaining = vec->common.vsize;

    /* The body runs at least once so that a leading iov_len=0 is stripped. */
    for (;;) {
        cur_len = cur_iov->iov_len;

        if (cur_len > skipbytes) {
            /* Partly covered element: trim its front and stop. */
            cur_iov->iov_base = ((char *)(cur_iov->iov_base)) + skipbytes;
            cur_iov->iov_len -= skipbytes;
            break;
        }

        /* Fully covered element: step over it. */
        skipbytes -= cur_len;
        cur_iov++;
        cur_bin++;
        remaining--;

        if (skipbytes == 0) {
            break;
        }
    }

    *binvp = cur_bin;
    *iovp = cur_iov;
    *lenp = cur_len;

    return remaining;
}

/*
 * Append the elements of `eiov`, minus its first `skipbytes` bytes, to the
 * tail of the queue.
 *
 * Zero-length elements are dropped. An element that already has a binary
 * gets that binary's reference count incremented; an element without one
 * (NULL slot) has its data copied into a freshly allocated binary.
 *
 * Returns 0 on success (also when there is nothing left after skipping) and
 * -1 if q is NULL or an allocation fails. On an allocation failure part-way
 * through, elements queued so far stay queued but q->size is not updated.
 */
int erts_ioq_enqv(ErtsIOQueue *q, ErtsIOVec *eiov, Uint skipbytes)
{
    Uint added_bytes = eiov->common.size - skipbytes;
    Uint elem_len;
    SysIOVec *src_iov;
    ErtsIOQBinary **src_bin;
    ErtsIOQBinary *bin;
    int count;

    if (q == NULL) {
        return -1;
    }

    ASSERT(eiov->common.size >= skipbytes);
    if (eiov->common.size <= skipbytes) {
        return 0;
    }

    count = skip(eiov, skipbytes, &src_iov, &src_bin, &elem_len);
    if (count < 0) {
        return count;
    }

    if (q->v_tail + count >= q->v_end && expandq(q, count, 1)) {
        return -1;
    }

    /* Queue and reference all binaries (remove zero length items) */
    for (; count > 0; count--, src_iov++, src_bin++) {
        elem_len = src_iov->iov_len;
        if (elem_len == 0) {
            continue;
        }

        bin = *src_bin;

        if (bin != NULL) {
            /* Existing binary: share it. */
            if (q->driver)
                driver_binary_inc_refc(&bin->driver);
            else
                erts_refc_inc(&bin->nif.intern.refc, 1);
            *q->b_tail++ = bin;
            *q->v_tail++ = *src_iov;
            continue;
        }

        /* special case create binary ! */
        bin = alloc_binary(elem_len, src_iov->iov_base,
                           (void**)&q->v_tail->iov_base, q->driver);
        if (!bin) {
            return -1;
        }
        *q->b_tail++ = bin;
        q->v_tail->iov_len = elem_len;
        q->v_tail++;
    }

    q->size += added_bytes;      /* update total size in queue */
    return 0;
}

/*
 * Push the elements of `vec`, minus its first `skipbytes` bytes, onto the
 * head of the queue. Elements are inserted back-to-front so that they keep
 * their relative order in the queue.
 *
 * Zero-length elements are dropped. An element that already has a binary
 * gets that binary's reference count incremented; an element without one
 * (NULL slot) has its data copied into a freshly allocated binary, for
 * driver and non-driver queues alike (same as erts_ioq_enqv()).
 *
 * Returns 0 on success (also when there is nothing left after skipping) and
 * -1 if q is NULL or an allocation fails. On an allocation failure part-way
 * through, elements pushed so far stay queued but q->size is not updated.
 */
int erts_ioq_pushqv(ErtsIOQueue *q, ErtsIOVec* vec, Uint skipbytes)
{
    int n;
    Uint len;
    Uint size = vec->common.size - skipbytes;
    SysIOVec* iov;
    ErtsIOQBinary** binv;
    ErtsIOQBinary* b;

    if (q == NULL)
        return -1;

    ASSERT(vec->common.size >= skipbytes);
    if (vec->common.size <= skipbytes)
        return 0;

    n = skip(vec, skipbytes, &iov, &binv, &len);

    if (n < 0)
        return n;

    if (q->v_head - n < q->v_start)
        if (expandq(q, n, 0))
            return -1;

    /* Queue and reference all binaries (remove zero length items) */
    iov += (n-1);  /* move to end */
    binv += (n-1); /* move to end */
    while(n--) {
        if ((len = iov->iov_len) > 0) {
            if ((b = *binv) == NULL) { /* special case create binary ! */
                void *base;

                b = alloc_binary(len, iov->iov_base, &base, q->driver);
                if (!b) return -1;

                /* Step the head back FIRST, then fill in the new slot.
                 * Writing iov_base before the decrement would clobber the
                 * element that is currently at the head (or write past the
                 * vector when the queue is empty and head == v_end). */
                *--q->b_head = b;
                q->v_head--;
                q->v_head->iov_base = base;
                q->v_head->iov_len = len;
            }
            else {
                if (q->driver)
                    driver_binary_inc_refc(&b->driver);
                else
                    erts_refc_inc(&b->nif.intern.refc, 1);
                *--q->b_head = b;
                *--q->v_head = *iov;
            }
        }
        iov--;
        binv--;
    }
    q->size += size;      /* update total size in queue */
    return 0;
}


/*
** Remove size bytes from queue head.
**
** Elements that are completely consumed have their binary released and
** their slot cleared; a partly consumed element is trimmed in place.
**
** Return 0 on success, or -1 if q is NULL or holds fewer than size bytes
** (the queue is left untouched in that case).
*/
int erts_ioq_deq(ErtsIOQueue *q, Uint size)
{
    Uint remaining = size;

    if (q == NULL) {
        return -1;
    }
    if (q->size < size) {
        return -1;
    }

    q->size -= size;

    while (remaining != 0) {
        Uint head_len;

        ASSERT(q->v_head != q->v_tail);

        head_len = q->v_head->iov_len;

        if (head_len > remaining) {
            /* Only the front of this element is consumed. */
            q->v_head->iov_base = ((char *)(q->v_head->iov_base)) + remaining;
            q->v_head->iov_len -= remaining;
            break;
        }

        /* The whole element is consumed. */
        remaining -= head_len;
        free_binary(*q->b_head, q->driver);
        *q->b_head++ = NULL;
        q->v_head++;
    }

    /* restart pointers (optimised for enq) */
    if (q->v_head == q->v_tail) {
        q->v_head = q->v_start;
        q->v_tail = q->v_start;
        q->b_head = q->b_start;
        q->b_tail = q->b_start;
    }

    return 0;
}


/*
 * Describe the current queue contents in *ev without dequeuing anything.
 *
 * For a non-empty queue, ev points directly at the queue's own vectors (no
 * copy is made). For an empty queue ev gets size 0 and NULL vectors.
 *
 * Returns the queue size in bytes, or (Uint) -1 if q is NULL (ev is not
 * written in that case).
 */
Uint erts_ioq_peekqv(ErtsIOQueue *q, ErtsIOVec *ev) {
    ASSERT(ev);

    if (q == NULL) {
        return (Uint) -1;
    }

    ev->common.vsize = q->v_tail - q->v_head;

    if (ev->common.vsize != 0) {
        ev->common.size = q->size;
        ev->common.iov = q->v_head;
        ev->common.binv = q->b_head;
    } else {
        ev->common.size = 0;
        ev->common.iov = NULL;
        ev->common.binv = NULL;
    }

    return q->size;
}

/*
 * Return a pointer to the first queued I/O vector element and store the
 * number of queued elements in *vlenp.
 *
 * Returns NULL with *vlenp == 0 for an empty queue, and NULL with
 * *vlenp == -1 if q is NULL.
 */
SysIOVec* erts_ioq_peekq(ErtsIOQueue *q, int* vlenp)  /* length of io-vector */
{
    int count;

    if (q == NULL) {
        *vlenp = -1;
        return NULL;
    }

    count = (q->v_tail - q->v_head);
    *vlenp = count;

    if (count == 0) {
        return NULL;
    }
    return q->v_head;
}

/* erts_ioq_iolist_to_vec() (below) fills a possibly deep list of chars and
** binaries into the io vector iov.
** Bytes and copied binaries are first stored in the buffer of cbin;
** binaries that are not copied are referenced through binv.
** Return  vector length on success,
**        -1 on overflow
**        -2 on type error
*/

/*
 * Emit the byte range [ptr, ptr+len) as one or more I/O vector elements,
 * each referencing `bin`.
 *
 * A range longer than MAX_SYSIOVEC_IOVLEN is split into several elements of
 * at most that length. At least one element is always written, even when
 * len is 0. *iov and *binv are advanced past what was written and *vlen is
 * incremented once per element.
 */
static ERTS_INLINE void
io_list_to_vec_set_vec(SysIOVec **iov, ErtsIOQBinary ***binv,
                       ErtsIOQBinary *bin, byte *ptr, Uint len,
                       int *vlen)
{
    for (;;) {
        int is_last = !(len > MAX_SYSIOVEC_IOVLEN);

        (*iov)->iov_base = ptr;
        if (is_last) {
            (*iov)->iov_len = len;
        } else {
            (*iov)->iov_len = MAX_SYSIOVEC_IOVLEN;
        }
        (*iov)++;
        *(*binv)++ = bin;
        (*vlen)++;

        if (is_last) {
            break;
        }

        ptr += MAX_SYSIOVEC_IOVLEN;
        len -= MAX_SYSIOVEC_IOVLEN;
    }
}

/*
 * Convert the iolist `obj` into I/O vector elements.
 *
 * Bytes, heap binaries, refc binaries with a non-zero bit offset, and (when
 * bin_limit is non-zero) refc binaries smaller than bin_limit are copied
 * into the data area of `cbin`; consecutive copied data is emitted as one
 * run referencing `cbin`. All other refc binaries are referenced directly
 * (no copy), after any pending run of copied data has been flushed.
 *
 * `driver` selects whether `cbin` and the referenced binaries are treated
 * as driver binaries or as internal binaries.
 *
 * Returns the number of vector elements written,
 *         -1 if the data area of `cbin` is too small (or cbin is NULL and
 *            something needs to be copied),
 *         -2 if `obj` is not a valid iolist.
 */
int
erts_ioq_iolist_to_vec(Eterm obj,	  /* io-list */
                       SysIOVec* iov,	  /* io vector */
                       ErtsIOQBinary** binv,       /* binary reference vector */
                       ErtsIOQBinary* cbin,        /* binary to store characters */
                       Uint bin_limit,  /* small binaries limit */
                       int driver)
{
    DECLARE_ESTACK(s);
    Eterm* objp;
    byte *buf  = NULL;   /* next free byte in cbin's data area */
    Uint len = 0;        /* bytes left in cbin's data area */
    Uint csize  = 0;     /* length of the pending (not yet emitted) copied run */
    int vlen   = 0;      /* number of vector elements written so far */
    byte* cptr;          /* start of the pending copied run */

    if (cbin) {
        if (driver) {
            buf = (byte*)cbin->driver.orig_bytes;
            len = cbin->driver.orig_size;
        } else {
            buf = (byte*)cbin->nif.orig_bytes;
            len = cbin->nif.orig_size;
        }
    }
    cptr = buf;

    goto L_jump_start;  /* avoid push */

    while (!ESTACK_ISEMPTY(s)) {
	obj = ESTACK_POP(s);
    L_jump_start:
	if (is_list(obj)) {
	L_iter_list:
	    objp = list_val(obj);
	    obj = CAR(objp);
	    if (is_byte(obj)) {
		if (len == 0)
		    goto L_overflow;
		*buf++ = unsigned_val(obj);
		csize++;
		len--;
	    } else if (is_binary(obj)) {
		/* Remember the tail; it is popped once the binary is handled. */
		ESTACK_PUSH(s, CDR(objp));
		goto handle_binary;
	    } else if (is_list(obj)) {
		ESTACK_PUSH(s, CDR(objp));
		goto L_iter_list;    /* on head */
	    } else if (!is_nil(obj)) {
		goto L_type_error;
	    }
	    obj = CDR(objp);
	    if (is_list(obj))
		goto L_iter_list; /* on tail */
	    else if (is_binary(obj)) {
		goto handle_binary;
	    } else if (!is_nil(obj)) {
		goto L_type_error;
	    }
	} else if (is_binary(obj)) {
	    Eterm real_bin;
	    Uint offset;
	    Eterm* bptr;
	    Uint size;
	    int bitoffs;
	    int bitsize;

	handle_binary:
	    size = binary_size(obj);
	    ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize);
	    ASSERT(bitsize == 0);
	    bptr = binary_val(real_bin);
	    if (*bptr == HEADER_PROC_BIN) {
		ProcBin* pb = (ProcBin *) bptr;
		if (bitoffs != 0) {
		    /* Not byte aligned: must be copied bit-wise into cbin. */
		    if (len < size) {
			goto L_overflow;
		    }
		    erts_copy_bits(pb->bytes+offset, bitoffs, 1,
				   (byte *) buf, 0, 1, size*8);
		    csize += size;
		    buf += size;
		    len -= size;
		} else if (bin_limit && size < bin_limit) {
		    /* Packing enabled and binary is small: copy into cbin. */
		    if (len < size) {
			goto L_overflow;
		    }
		    sys_memcpy(buf, pb->bytes+offset, size);
		    csize += size;
		    buf += size;
		    len -= size;
		} else {
                    ErtsIOQBinary *qbin;
		    /* Reference the binary directly; flush the pending
		     * copied run first so that element order is preserved. */
		    if (csize != 0) {
                        io_list_to_vec_set_vec(&iov, &binv, cbin,
                                               cptr, csize, &vlen);
			cptr = buf;
			csize = 0;
		    }
		    if (pb->flags) {
			erts_emasculate_writable_binary(pb);
		    }
                    if (driver)
                        qbin = (ErtsIOQBinary*)Binary2ErlDrvBinary(pb->val);
                    else
                        qbin = (ErtsIOQBinary*)pb->val;

                    io_list_to_vec_set_vec(
                        &iov, &binv, qbin,
                        pb->bytes+offset, size, &vlen);
		}
	    } else {
		/* Not a ProcBin: treated as a heap binary and always copied. */
		ErlHeapBin* hb = (ErlHeapBin *) bptr;
		if (len < size) {
		    goto L_overflow;
		}
		copy_binary_to_buffer(buf, 0,
				      ((byte *) hb->data)+offset, bitoffs,
				      8*size);
		csize += size;
		buf += size;
		len -= size;
	    }
	} else if (!is_nil(obj)) {
	    goto L_type_error;
	}
    }

    /* Emit whatever copied data is still pending. */
    if (csize != 0) {
        io_list_to_vec_set_vec(&iov, &binv, cbin, cptr, csize, &vlen);
    }

    DESTROY_ESTACK(s);
    return vlen;

 L_type_error:
    DESTROY_ESTACK(s);
    return -2;

 L_overflow:
    DESTROY_ESTACK(s);
    return -1;
}

/*
 * Account for one binary `obj` while measuring an iolist.
 *
 * Two sets of counters are maintained in parallel:
 *   - unpacked: *v_size (vector elements), *c_size (bytes that must be
 *     copied), *b_size (bytes in directly referenced binaries), *in_clist
 *     (non-zero while the previous item was copied data);
 *   - packed:   *p_v_size, *p_c_size, *p_in_clist, where referenced
 *     binaries smaller than `blimit` are counted as copied data as well.
 *
 * A refc binary with bit offset 0 is counted as referenced; every other
 * binary is counted as copied. A referenced binary larger than
 * MAX_SYSIOVEC_IOVLEN is split into several vector elements when the vector
 * is built, so the extra elements are added to BOTH element counts.
 *
 * Returns 0 on success,
 *         1 if the binary has a non-zero bit size (type error),
 *         2 if a byte counter overflowed.
 */
static ERTS_INLINE int
io_list_vec_count(Eterm obj, Uint *v_size,
                  Uint *c_size, Uint *b_size, Uint *in_clist,
                  Uint *p_v_size, Uint *p_c_size, Uint *p_in_clist,
                  Uint blimit)
{
    Uint size = binary_size(obj);
    Eterm real;
    ERTS_DECLARE_DUMMY(Uint offset);
    int bitoffs;
    int bitsize;
    ERTS_GET_REAL_BIN(obj, real, offset, bitoffs, bitsize);
    if (bitsize != 0) return 1;
    if (thing_subtag(*binary_val(real)) == REFC_BINARY_SUBTAG &&
        bitoffs == 0) {
        *b_size += size;
        if (*b_size < size) return 2;
        *in_clist = 0;
        ++*v_size;
        /* If iov_len is smaller than Uint we split the binary into */
        /* multiple smaller (2GB) elements in the iolist. */
        *v_size += size / MAX_SYSIOVEC_IOVLEN;
        if (size >= blimit) {
            *p_in_clist = 0;
            ++*p_v_size;
            /* The binary is referenced (and therefore split) in the packed
             * layout too, so reserve the same extra elements there. */
            *p_v_size += size / MAX_SYSIOVEC_IOVLEN;
        } else {
            *p_c_size += size;
            if (!*p_in_clist) {
                *p_in_clist = 1;
                ++*p_v_size;
            }
        }
    } else {
        *c_size += size;
        if (*c_size < size) return 2;
        if (!*in_clist) {
            *in_clist = 1;
            ++*v_size;
        }
        *p_c_size += size;
        if (!*p_in_clist) {
            *p_in_clist = 1;
            ++*p_v_size;
        }
    }
    return 0;
}

/*
 * Count the binary `obj` using io_list_vec_count() and jump to the error
 * labels of the enclosing function on failure.
 *
 * The expansion site must have the locals v_size, c_size, b_size, in_clist,
 * p_v_size, p_c_size, p_in_clist and blimit in scope, as well as the labels
 * L_type_error and L_overflow_error.
 */
#define IO_LIST_VEC_COUNT(obj)                                          \
    do {                                                                \
        switch (io_list_vec_count(obj, &v_size, &c_size,                \
                                  &b_size, &in_clist,                   \
                                  &p_v_size, &p_c_size, &p_in_clist,    \
                                  blimit)) {                            \
        case 1: goto L_type_error;                                      \
        case 2: goto L_overflow_error;                                  \
        default: break;                                                 \
        }                                                               \
    } while(0)

/*
 * Measure the iolist `obj` without copying anything.
 *
 * Returns 0 if successful and a non-zero value otherwise (1 for both a
 * type error and a size overflow). The output parameters are only written
 * on success.
 *
 * Return values through pointers:
 *    *vsize      - SysIOVec size needed for a writev
 *    *csize      - Number of bytes not in binary (in the common binary)
 *    *pvsize     - SysIOVec size needed if packing small binaries
 *    *pcsize     - Number of bytes in the common binary if packing
 *    *total_size - Total size of iolist in bytes
 *
 * `blimit` is the size below which a referenced binary counts as "small"
 * for the packed figures.
 */
int
erts_ioq_iolist_vec_len(Eterm obj, int* vsize, Uint* csize,
                        Uint* pvsize, Uint* pcsize,
                        size_t* total_size, Uint blimit)
{
    DECLARE_ESTACK(s);
    Eterm* objp;
    Uint v_size = 0;      /* vector elements, unpacked */
    Uint c_size = 0;      /* copied bytes, unpacked */
    Uint b_size = 0;      /* bytes in referenced binaries */
    Uint in_clist = 0;    /* non-zero while in a run of copied data */
    Uint p_v_size = 0;    /* vector elements, packed */
    Uint p_c_size = 0;    /* copied bytes, packed */
    Uint p_in_clist = 0;  /* as in_clist, for the packed figures */
    size_t total;

    goto L_jump_start;  /* avoid a push */

    while (!ESTACK_ISEMPTY(s)) {
	obj = ESTACK_POP(s);
    L_jump_start:
	if (is_list(obj)) {
	L_iter_list:
	    objp = list_val(obj);
	    obj = CAR(objp);

	    if (is_byte(obj)) {
		c_size++;
		if (c_size == 0) {
		    /* wrapped around */
		    goto L_overflow_error;
		}
		/* A byte starts a new vector element only if the previous
		 * item was not copied data. */
		if (!in_clist) {
		    in_clist = 1;
		    v_size++;
		}
		p_c_size++;
		if (!p_in_clist) {
		    p_in_clist = 1;
		    p_v_size++;
		}
	    }
	    else if (is_binary(obj)) {
                IO_LIST_VEC_COUNT(obj);
	    }
	    else if (is_list(obj)) {
		ESTACK_PUSH(s, CDR(objp));
		goto L_iter_list;   /* on head */
	    }
	    else if (!is_nil(obj)) {
		goto L_type_error;
	    }

	    obj = CDR(objp);
	    if (is_list(obj))
		goto L_iter_list;   /* on tail */
	    else if (is_binary(obj)) {  /* binary tail is OK */
		IO_LIST_VEC_COUNT(obj);
	    }
	    else if (!is_nil(obj)) {
		goto L_type_error;
	    }
	}
	else if (is_binary(obj)) {
	    IO_LIST_VEC_COUNT(obj);
	}
	else if (!is_nil(obj)) {
	    goto L_type_error;
	}
    }

    total = c_size + b_size;
    if (total < c_size) {
	/* the sum wrapped around */
	goto L_overflow_error;
    }
    *total_size = total;

    DESTROY_ESTACK(s);
    *vsize = v_size;
    *csize = c_size;
    *pvsize = p_v_size;
    *pcsize = p_c_size;
    return 0;

 L_type_error:
 L_overflow_error:
    DESTROY_ESTACK(s);
    return 1;
}

/* State of one iolist_to_iovec/1 conversion. It starts out on the C stack
 * and is copied into a magic binary the first time the BIF yields. */
typedef struct {
    Eterm result_head;   /* first cons cell of the result list, or NIL */
    Eterm result_tail;   /* last cons cell of the result list, or NIL */
    Eterm input_list;    /* where to resume the traversal */

    UWord acc_size;      /* number of bytes currently stored in acc */
    Binary *acc;         /* accumulator for bytes and small binaries, or NULL */

    /* We yield after copying this many bytes into the accumulator (Minus
     * eating a few on consing etc). Large binaries will only count to the
     * extent their split (if any) resulted in a copy op. */
    UWord bytereds_available;
    UWord bytereds_spent;

    Process *process;    /* process the conversion runs in */
    ErtsEStack estack;   /* traversal stack saved across yields */

    Eterm magic_reference; /* NIL until the state has been boxed for a yield */
} iol2v_state_t;

/*
 * Destructor for the magic binary that holds a yielded conversion state:
 * releases the saved traversal stack and, if present, the accumulator.
 * Always returns 1.
 */
static int iol2v_state_destructor(Binary *data) {
    iol2v_state_t *boxed = ERTS_MAGIC_BIN_UNALIGNED_DATA(data);

    DESTROY_SAVED_ESTACK(&boxed->estack);

    if (boxed->acc == NULL) {
        return 1;
    }

    erts_bin_free(boxed->acc);
    return 1;
}

/*
 * Prepare `state` for converting `input` on behalf of `process`: empty
 * result list, no accumulator, no magic reference and a cleared saved
 * stack. The byte-reduction counters are not touched here.
 */
static void iol2v_init(iol2v_state_t *state, Process *process, Eterm input) {
    CLEAR_SAVED_ESTACK(&state->estack);

    state->acc = NULL;
    state->acc_size = 0;
    state->magic_reference = NIL;

    state->input_list = input;
    state->result_tail = NIL;
    state->result_head = NIL;

    state->process = process;
}

/*
 * Build a sub-binary on the process heap that covers `size` bytes of
 * `bin_term`, starting `offset` bytes into it. The sub-binary refers to the
 * real binary underneath `bin_term` and keeps its bit offset.
 */
static Eterm iol2v_make_sub_bin(iol2v_state_t *state, Eterm bin_term,
        UWord offset, UWord size) {
    Uint base_offset, bit_offset, bit_size;
    Eterm real_bin;
    ErlSubBin *sub;

    sub = (ErlSubBin*)HAlloc(state->process, ERL_SUB_BIN_SIZE);

    ERTS_GET_REAL_BIN(bin_term, real_bin,
        base_offset, bit_offset, bit_size);

    ASSERT(bit_size == 0);

    sub->thing_word = HEADER_SUB_BIN;
    sub->orig = real_bin;
    sub->is_writable = 0;

    sub->offs = base_offset + offset;
    sub->size = size;

    sub->bitoffs = bit_offset;
    sub->bitsize = 0;

    return make_binary(sub);
}

/*
 * Turn the accumulator into a binary term: the accumulator is reallocated
 * to exactly acc_size bytes and wrapped in a ProcBin built on the process
 * heap. The state is left without an accumulator (acc == NULL, size 0).
 */
static Eterm iol2v_promote_acc(iol2v_state_t *state) {
    Process *owner = state->process;
    Eterm *hp = HAlloc(owner, PROC_BIN_SIZE);
    Binary *trimmed = erts_bin_realloc(state->acc, state->acc_size);
    Eterm bin = erts_build_proc_bin(&MSO(owner), hp, trimmed);

    state->acc = NULL;
    state->acc_size = 0;

    return bin;
}

/* Appends `term` to the end of the result list by destructively updating the
 * CDR of the current last cell, which saves us from reversing the list at
 * the end. This is safe since GC is disabled and the unfinished term is
 * never leaked to the outside. Costs one byte-reduction. */
static void iol2v_enqueue_result(iol2v_state_t *state, Eterm term) {
    Eterm *hp = HAlloc(state->process, 2);
    Eterm new_cell = CONS(hp, term, NIL);
    Eterm old_tail = state->result_tail;

    if (old_tail == NIL) {
        /* First element: it is both head and tail. */
        state->result_head = new_cell;
    } else {
        CDR(list_val(old_tail)) = new_cell;
    }

    state->result_tail = new_cell;
    state->bytereds_spent += 1;
}

/* Allocated accumulator size at which iol2v_expand_acc() stops growing the
 * accumulator and instead emits it and starts a new one. The limit is lower
 * in DEBUG builds. */
#ifndef DEBUG
    #define ACC_REALLOCATION_LIMIT (IOL2V_SMALL_BIN_LIMIT * 32)
#else
    #define ACC_REALLOCATION_LIMIT (IOL2V_SMALL_BIN_LIMIT * 4)
#endif

/*
 * Make sure the accumulator has room for `extra` (>= 1) more bytes after
 * the acc_size bytes already in it, and charge `extra` byte-reductions.
 *
 *  - No accumulator yet: allocate one of at least IOL2V_SMALL_BIN_LIMIT
 *    bytes.
 *  - Too small but below ACC_REALLOCATION_LIMIT: grow it to at least twice
 *    its allocated size; if the reallocation moved it, the bytes already
 *    stored are charged as well.
 *  - Too small and at/above the limit: emit the accumulator as a result
 *    element and start over with a fresh one.
 */
static void iol2v_expand_acc(iol2v_state_t *state, UWord extra) {
    UWord needed, capacity;

    ERTS_CT_ASSERT(ERTS_UWORD_MAX > ACC_REALLOCATION_LIMIT / 2);
    ASSERT(extra >= 1);

    needed = state->acc_size + extra;

    if (state->acc == NULL) {
        UWord initial_size = MAX(needed, IOL2V_SMALL_BIN_LIMIT);

        state->acc = erts_bin_nrml_alloc(initial_size);
        state->bytereds_spent += extra;
        return;
    }

    capacity = (state->acc)->orig_size;

    if (needed > capacity) {
        Binary *old_acc;
        UWord grown_size;

        if (capacity >= ACC_REALLOCATION_LIMIT) {
            /* We skip reallocating once we hit a certain point; it often
             * results in extra copying and we're very likely to overallocate
             * on anything other than absurdly long byte/heapbin sequences. */
            iol2v_enqueue_result(state, iol2v_promote_acc(state));
            iol2v_expand_acc(state, extra);
            return;
        }

        grown_size = MAX(needed, capacity * 2);
        old_acc = state->acc;

        state->acc = erts_bin_realloc(old_acc, grown_size);

        if (old_acc != state->acc) {
            state->bytereds_spent += state->acc_size;
        }
    }

    state->bytereds_spent += extra;
}

/*
 * Copy a run of small integers, starting at list cell `seq_start`, into the
 * accumulator.
 *
 * The run ends at the first cell whose head is not a small integer, at the
 * end of the list, or when twice its length reaches the remaining
 * byte-reduction budget. Whether the integers really are bytes is only
 * checked after copying, by OR-ing all values together.
 *
 * Returns 1 on success and stores the term following the run in *seq_end;
 * returns 0 (leaving *seq_end untouched) if any value was above UCHAR_MAX.
 */
static int iol2v_append_byte_seq(iol2v_state_t *state, Eterm seq_start, Eterm *seq_end) {
    Eterm scan = seq_start;
    Eterm cursor;
    Uint seen_bits = 0;
    SWord run_length = 0;
    char *dst;

    ASSERT(state->bytereds_available > state->bytereds_spent);

    /* Pass 1: find out how long the run is. */
    for (; is_list(scan); run_length++) {
        Eterm *cell = list_val(scan);

        if (!is_small(CAR(cell))) {
            break;
        }

        if (run_length * 2 >= (state->bytereds_available - state->bytereds_spent)) {
            break;
        }

        scan = CDR(cell);
    }

    ASSERT(run_length >= 1);

    iol2v_expand_acc(state, run_length);

    /* Bump a few extra reductions to account for list traversal. */
    state->bytereds_spent += run_length;

    dst = &(state->acc)->orig_bytes[state->acc_size];
    state->acc_size += run_length;

    /* Pass 2: copy the values. */
    for (cursor = seq_start; cursor != scan; ) {
        Eterm *cell = list_val(cursor);
        Uint value = unsigned_val(CAR(cell));

        cursor = CDR(cell);
        seen_bits |= value;

        ASSERT(dst < &(state->acc)->orig_bytes[state->acc_size]);
        *(dst++) = value;
    }

    if (seen_bits > UCHAR_MAX) {
        return 0;
    }

    ASSERT(dst == &(state->acc)->orig_bytes[state->acc_size]);
    *seq_end = cursor;

    return 1;
}

/*
 * Add the binary `bin_term` to the result.
 *
 * "Small" below means smaller than IOL2V_SMALL_BIN_LIMIT bytes. Depending on
 * the size of the binary and of the accumulator, the binary is either
 *   - enqueued as-is (after flushing a non-empty accumulator),
 *   - copied in full into the accumulator, or
 *   - split: just enough of its front is copied to bring the accumulator up
 *     to IOL2V_SMALL_BIN_LIMIT bytes, the accumulator is enqueued, and the
 *     rest is enqueued as a sub-binary.
 *
 * Returns 1 on success and 0 if the binary has a non-zero bit size.
 */
static int iol2v_append_binary(iol2v_state_t *state, Eterm bin_term) {
    int is_acc_small, is_bin_small;
    UWord combined_size;
    UWord binary_size;

    Uint byte_offset, bit_offset, bit_size;
    byte *binary_data;

    Eterm *parent_header;
    Eterm parent_binary;

    ASSERT(state->bytereds_available > state->bytereds_spent);

    ERTS_GET_REAL_BIN(bin_term, parent_binary, byte_offset, bit_offset, bit_size);
    parent_header = binary_val(parent_binary);
    binary_size = binary_size(bin_term);

    if (bit_size != 0) {
        /* Bitstrings are not valid iolist elements. */
        return 0;
    } else if (binary_size == 0) {
        /* Empty binaries contribute nothing to the result. */
        state->bytereds_spent += 1;
        return 1;
    }

    is_acc_small = state->acc_size < IOL2V_SMALL_BIN_LIMIT;
    is_bin_small = binary_size < IOL2V_SMALL_BIN_LIMIT;
    combined_size = binary_size + state->acc_size;

    /* Locate the first byte of the data, for either kind of parent binary. */
    if (thing_subtag(*parent_header) == REFC_BINARY_SUBTAG) {
        ProcBin *pb = (ProcBin*)parent_header;

        if (pb->flags) {
            erts_emasculate_writable_binary(pb);
        }

        binary_data = &((byte*)pb->bytes)[byte_offset];
    } else {
        ErlHeapBin *hb = (ErlHeapBin*)parent_header;

        ASSERT(thing_subtag(*parent_header) == HEAP_BINARY_SUBTAG);
        ASSERT(is_bin_small);

        binary_data = &((byte*)&hb->data)[byte_offset];
    }

    if (!is_bin_small && (state->acc_size == 0 || !is_acc_small)) {
        /* Avoid combining if we encounter an acceptably large binary while the
         * accumulator is either empty or large enough to be returned on its
         * own. */
        if (state->acc_size != 0) {
            iol2v_enqueue_result(state, iol2v_promote_acc(state));
        }

        iol2v_enqueue_result(state, bin_term);
    } else if (is_bin_small || combined_size < (IOL2V_SMALL_BIN_LIMIT * 2)) {
        /* If the candidate is small or we can't split the combination in two,
         * then just copy it into the accumulator. */
        iol2v_expand_acc(state, binary_size);

        if (ERTS_LIKELY(bit_offset == 0)) {
            sys_memcpy(&(state->acc)->orig_bytes[state->acc_size],
                binary_data, binary_size);
        } else {
            ASSERT(binary_size <= ERTS_UWORD_MAX / 8);

            /* Source is not byte aligned; copy bit-wise. */
            erts_copy_bits(binary_data, bit_offset, 1,
                (byte*)&(state->acc)->orig_bytes[state->acc_size], 0, 1,
                binary_size * 8);
        }

        state->acc_size += binary_size;
    } else {
        /* Otherwise, append enough data for the accumulator to be valid, and
         * then return the rest as a sub-binary. */
        UWord spill = IOL2V_SMALL_BIN_LIMIT - state->acc_size;
        Eterm binary_tail;

        iol2v_expand_acc(state, spill);

        if (ERTS_LIKELY(bit_offset == 0)) {
            sys_memcpy(&(state->acc)->orig_bytes[state->acc_size],
                binary_data, spill);
        } else {
            ASSERT(binary_size <= ERTS_UWORD_MAX / 8);

            erts_copy_bits(binary_data, bit_offset, 1,
                (byte*)&(state->acc)->orig_bytes[state->acc_size], 0, 1,
                spill * 8);
        }

        state->acc_size += spill;

        /* The sub-binary starts right after the bytes that were copied. */
        binary_tail = iol2v_make_sub_bin(state, bin_term, spill,
            binary_size - spill);

        iol2v_enqueue_result(state, iol2v_promote_acc(state));
        iol2v_enqueue_result(state, binary_tail);
    }

    return 1;
}

/*
 * Yield from the BIF so that it is called again with the conversion state
 * as its argument.
 *
 * On the first yield (magic_reference is still NIL) the state is copied
 * into a newly created magic binary, and a magic reference to that binary
 * is built on the process heap and stored in the copied state. Later yields
 * reuse the existing reference.
 */
static BIF_RETTYPE iol2v_yield(iol2v_state_t *state) {
    if (is_nil(state->magic_reference)) {
        Binary *holder;
        iol2v_state_t *heap_state;
        Eterm *hp;

        holder = erts_create_magic_binary_x(sizeof(*state),
            &iol2v_state_destructor, ERTS_ALC_T_BINARY, 1);
        heap_state = ERTS_MAGIC_BIN_UNALIGNED_DATA(holder);

        /* From here on the copy inside the magic binary is the state. */
        sys_memcpy(heap_state, state, sizeof(*state));

        hp = HAlloc(heap_state->process, ERTS_MAGIC_REF_THING_SIZE);
        heap_state->magic_reference =
            erts_mk_magic_ref(&hp, &MSO(heap_state->process), holder);

        state = heap_state;
    }

    ERTS_BIF_YIELD1(bif_export[BIF_iolist_to_iovec_1],
        state->process, state->magic_reference);
}

/*
 * Run (or resume) the conversion described by `state`.
 *
 * The input is traversed depth-first with an explicit stack. Work is metered
 * in "byte reductions": the budget is the process's remaining reductions
 * times IOL2V_SMALL_BIN_LIMIT. When the budget is used up, the stack and the
 * current position are saved in the state and the BIF yields.
 *
 * Returns the finished result list, yields via iol2v_yield(), or fails with
 * BADARG (after freeing the accumulator) when the input is not a valid
 * iolist.
 */
static BIF_RETTYPE iol2v_continue(iol2v_state_t *state) {
    Eterm iterator;

    DECLARE_ESTACK(s);
    ESTACK_CHANGE_ALLOCATOR(s, ERTS_ALC_T_SAVED_ESTACK);

    state->bytereds_available =
        ERTS_BIF_REDS_LEFT(state->process) * IOL2V_SMALL_BIN_LIMIT;
    state->bytereds_spent = 0;

    /* Resuming after a yield: pick up the saved traversal stack. */
    if (state->estack.start) {
        ESTACK_RESTORE(s, &state->estack);
    }

    iterator = state->input_list;

    for(;;) {
        /* Out of budget before starting on the next list. */
        if (state->bytereds_spent >= state->bytereds_available) {
            ESTACK_SAVE(s, &state->estack);
            state->input_list = iterator;

            return iol2v_yield(state);
        }

        while (is_list(iterator)) {
            Eterm *cell;
            Eterm head;

            cell = list_val(iterator);
            head = CAR(cell);

            if (is_binary(head)) {
                if (!iol2v_append_binary(state, head)) {
                    goto l_badarg;
                }

                iterator = CDR(cell);
            } else if (is_small(head)) {
                Eterm seq_end;

                /* Consumes a whole run of integers in one go. */
                if (!iol2v_append_byte_seq(state, iterator, &seq_end)) {
                    goto l_badarg;
                }

                iterator = seq_end;
            } else if (is_list(head) || is_nil(head)) {
                Eterm tail = CDR(cell);

                /* Descend into the head; come back for the tail later. */
                if (!is_nil(tail)) {
                    ESTACK_PUSH(s, tail);
                }

                state->bytereds_spent += 1;
                iterator = head;
            } else {
                goto l_badarg;
            }

            if (state->bytereds_spent >= state->bytereds_available) {
                ESTACK_SAVE(s, &state->estack);
                state->input_list = iterator;

                return iol2v_yield(state);
            }
        }

        /* The list ended; its final tail must be a binary or []. */
        if (is_binary(iterator)) {
            if (!iol2v_append_binary(state, iterator)) {
                goto l_badarg;
            }
        } else if (!is_nil(iterator)) {
            goto l_badarg;
        }

        if(ESTACK_ISEMPTY(s)) {
            break;
        }

        iterator = ESTACK_POP(s);
    }

    /* Emit whatever is left in the accumulator. */
    if (state->acc_size != 0) {
        iol2v_enqueue_result(state, iol2v_promote_acc(state));
    }

    BUMP_REDS(state->process, state->bytereds_spent / IOL2V_SMALL_BIN_LIMIT);

    CLEAR_SAVED_ESTACK(&state->estack);
    DESTROY_ESTACK(s);

    BIF_RET(state->result_head);

l_badarg:
    CLEAR_SAVED_ESTACK(&state->estack);
    DESTROY_ESTACK(s);

    /* Clear the pointer after freeing so the accumulator is not freed a
     * second time by iol2v_state_destructor(). */
    if (state->acc != NULL) {
        erts_bin_free(state->acc);
        state->acc = NULL;
    }

    BIF_ERROR(state->process, BADARG);
}

HIPE_WRAPPER_BIF_DISABLE_GC(iolist_to_iovec, 1)

/*
 * iolist_to_iovec/1.
 *
 * Trivial arguments are answered directly: [] and the empty binary give [],
 * a non-empty binary gives a one-element list, and a bitstring with a
 * non-zero bit size is a BADARG. A list starts a new conversion with GC
 * disabled. A magic reference whose binary has this file's state destructor
 * resumes a yielded conversion; any other argument is a BADARG.
 *
 * GC is re-enabled once the conversion has returned a value or failed, i.e.
 * in every case except a yield (THE_NON_VALUE with freason == TRAP).
 */
BIF_RETTYPE iolist_to_iovec_1(BIF_ALIST_1) {
    BIF_RETTYPE result;

    if (is_nil(BIF_ARG_1)) {
        BIF_RET(NIL);
    }

    if (is_binary(BIF_ARG_1)) {
        Eterm *hp;

        if (binary_bitsize(BIF_ARG_1) != 0) {
            ASSERT(!(BIF_P->flags & F_DISABLE_GC));
            BIF_ERROR(BIF_P, BADARG);
        }

        if (binary_size(BIF_ARG_1) == 0) {
            BIF_RET(NIL);
        }

        hp = HAlloc(BIF_P, 2);
        BIF_RET(CONS(hp, BIF_ARG_1, NIL));
    }

    if (is_internal_magic_ref(BIF_ARG_1)) {
        /* Resuming a conversion that yielded earlier. */
        Binary *magic = erts_magic_ref2bin(BIF_ARG_1);
        iol2v_state_t *state;

        if (ERTS_MAGIC_BIN_DESTRUCTOR(magic) != &iol2v_state_destructor) {
            ASSERT(!(BIF_P->flags & F_DISABLE_GC));
            BIF_ERROR(BIF_P, BADARG);
        }

        ASSERT(BIF_P->flags & F_DISABLE_GC);

        state = ERTS_MAGIC_BIN_UNALIGNED_DATA(magic);
        result = iol2v_continue(state);
    } else if (is_list(BIF_ARG_1)) {
        /* Fresh conversion; the state lives on the stack until a yield. */
        iol2v_state_t state;

        iol2v_init(&state, BIF_P, BIF_ARG_1);

        erts_set_gc_state(BIF_P, 0);

        result = iol2v_continue(&state);
    } else {
        ASSERT(!(BIF_P->flags & F_DISABLE_GC));
        BIF_ERROR(BIF_P, BADARG);
    }

    if (!(result == THE_NON_VALUE && BIF_P->freason == TRAP)) {
        erts_set_gc_state(BIF_P, 1);
    }

    BIF_RET(result);
}