aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
authorLukas Larsson <[email protected]>2017-02-24 19:38:37 +0100
committerLukas Larsson <[email protected]>2017-09-05 14:35:26 +0200
commit314702f86c9199957e40edfb73bcdbddb422f9d7 (patch)
tree08421d1740b23964f62eaefa74321528897bb06a /erts
parent520310feef0d76c0f99447187f3574408b219ae5 (diff)
downloadotp-314702f86c9199957e40edfb73bcdbddb422f9d7.tar.gz
otp-314702f86c9199957e40edfb73bcdbddb422f9d7.tar.bz2
otp-314702f86c9199957e40edfb73bcdbddb422f9d7.zip
erts: Add nif ioq
Diffstat (limited to 'erts')
-rw-r--r--erts/doc/src/erl_nif.xml251
-rw-r--r--erts/emulator/Makefile.in3
-rw-r--r--erts/emulator/beam/erl_driver.h19
-rw-r--r--erts/emulator/beam/erl_drv_nif.h25
-rw-r--r--erts/emulator/beam/erl_io_queue.c738
-rw-r--r--erts/emulator/beam/erl_io_queue.h201
-rw-r--r--erts/emulator/beam/erl_nif.c365
-rw-r--r--erts/emulator/beam/erl_nif.h23
-rw-r--r--erts/emulator/beam/erl_nif_api_funcs.h25
-rw-r--r--erts/emulator/beam/erl_port.h22
-rw-r--r--erts/emulator/beam/io.c844
-rw-r--r--erts/emulator/beam/utils.c1
-rw-r--r--erts/emulator/test/nif_SUITE.erl184
-rw-r--r--erts/emulator/test/nif_SUITE_data/nif_SUITE.c241
14 files changed, 2170 insertions, 772 deletions
diff --git a/erts/doc/src/erl_nif.xml b/erts/doc/src/erl_nif.xml
index 5a69bed34c..e47bb6a806 100644
--- a/erts/doc/src/erl_nif.xml
+++ b/erts/doc/src/erl_nif.xml
@@ -344,6 +344,81 @@ return term;</code>
<c>enif_convert_time_unit()</c></seealso></item>
</list>
</item>
+ <tag><marker id="enif_ioq"/>I/O Queues</tag>
+ <item>
+ <p>The Erlang nif library contains function for easily working
+ with I/O vectors as used by the unix system call <c>writev</c>.
+ The I/O Queue is not thread safe, so some other synchronization
+ mechanism has to be used.</p>
+ <list type="bulleted">
+ <item><seealso marker="#SysIOVec">
+ <c>SysIOVec</c></seealso></item>
+ <item><seealso marker="#ErlNifIOVec">
+ <c>ErlNifIOVec</c></seealso></item>
+ <item><seealso marker="#enif_ioq_create">
+ <c>enif_ioq_create()</c></seealso></item>
+ <item><seealso marker="#enif_ioq_destroy">
+ <c>enif_ioq_destroy()</c></seealso></item>
+ <item><seealso marker="#enif_ioq_enq_binary">
+ <c>enif_ioq_enq_binary()</c></seealso></item>
+ <item><seealso marker="#enif_ioq_enqv">
+ <c>enif_ioq_enqv()</c></seealso></item>
+ <item><seealso marker="#enif_ioq_deq">
+ <c>enif_ioq_deq()</c></seealso></item>
+ <item><seealso marker="#enif_ioq_peek">
+ <c>enif_ioq_peek()</c></seealso></item>
+ <item><seealso marker="#enif_inspect_iovec">
+ <c>enif_inspect_iovec()</c></seealso></item>
+ <item><seealso marker="#enif_free_iovec">
+ <c>enif_free_iovec()</c></seealso></item>
+ </list>
+ <p>Typical usage when writing to a file descriptor looks like this:</p>
+ <code type="none"><![CDATA[
+int writeiovec(ErlNifEnv *env, ERL_NIF_TERM term, ERL_NIF_TERM *tail,
+ ErlNifIOQueue *q, int fd) {
+
+ ErlNifIOVec vec, *iovec = &vec;
+ SysIOVec *sysiovec;
+ int saved_errno;
+ int iovcnt, n;
+
+ if (!enif_inspect_iovec(env, 64, term, tail, &iovec))
+ return -2;
+
+ if (enif_ioq_size(q) > 0) {
+ /* If the I/O queue contains data we enqueue the iovec and
+ then peek the data to write out of the queue. */
+ if (!enif_ioq_enqv(q, iovec, 0))
+ return -3;
+
+ sysiovec = enif_ioq_peek(q, &iovcnt);
+ } else {
+ /* If the I/O queue is empty we skip the trip through it. */
+ iovcnt = iovec->iovcnt;
+ sysiovec = iovec->iov;
+ }
+
+ /* Attempt to write the data */
+ n = writev(fd, sysiovec, iovcnt);
+ saved_errno = errno;
+
+ if (enif_ioq_size(q) == 0) {
+ /* If the I/O queue was initially empty we enqueue any
+ remaining data into the queue for writing later. */
+ if (n >= 0 && !enif_ioq_enqv(q, iovec, n))
+ return -3;
+ } else {
+ /* Dequeue any data that was written from the queue. */
+ if (n > 0 && !enif_ioq_deq(q, n, NULL))
+ return -4;
+ }
+
+ /* return n, which is either number of bytes written or -1 if
+ some error happened */
+ errno = saved_errno;
+ return n;
+}]]></code>
+ </item>
<tag><marker id="lengthy_work"/>Long-running NIFs</tag>
<item>
<p>As mentioned in the <seealso marker="#WARNING">warning</seealso> text
@@ -837,6 +912,36 @@ typedef enum {
</item>
</taglist>
</item>
+ <tag><marker id="SysIOVec"/><c>SysIOVec</c></tag>
+ <item>
+ <p>A system I/O vector, as used by <c>writev</c> on
+ Unix and <c>WSASend</c> on Win32. It is used in
+ <c>ErlNifIOVec</c> and by
+ <seealso marker="#enif_ioq_peek"><c>enif_ioq_peek</c></seealso>.</p>
+ </item>
+ <tag><marker id="ErlNifIOVec"/><c>ErlNifIOVec</c></tag>
+ <item>
+ <code type="none">
+typedef struct {
+ int iovcnt;
+ size_t size;
+ SysIOVec* iov;
+} ErlNifIOVec;</code>
+ <p>An I/O vector containing <c>iovcnt</c> <c>SysIOVec</c>s
+ pointing to the data. It is used by
+ <seealso marker="#enif_inspect_iovec">
+ <c>enif_inspect_iovec</c></seealso> and
+ <seealso marker="#enif_ioq_enqv">
+ <c>enif_ioq_enqv</c></seealso>.</p>
+ </item>
+ <tag><marker id="ErlNifIOQueueOpts"/><c>ErlNifIOQueueOpts</c></tag>
+ <item>
+ Options to configure a <c>ErlNifIOQueue</c>.
+ <taglist>
+ <tag>ERL_NIF_IOQ_NORMAL</tag>
+ <item><p>Create a normal I/O Queue</p></item>
+ </taglist>
+ </item>
</taglist>
</section>
@@ -1143,6 +1248,31 @@ typedef enum {
</func>
<func>
+ <name><ret>void</ret>
+ <nametext>enif_free_iovec(ErlNifIOvec* iov)</nametext></name>
+ <fsummary>Free an ErlIOVec</fsummary>
+ <desc>
+ <p>Frees an io vector returned from
+ <seealso marker="#enif_inspect_iovec">
+ <c>enif_inspect_iovec</c></seealso>.
+ This is needed only if a <c>NULL</c> environment is passed to
+ <seealso marker="#enif_inspect_iovec">
+ <c>enif_inspect_iovec</c></seealso>.</p>
+ <code type="none"><![CDATA[
+ErlNifIOVec *iovec = NULL;
+size_t max_elements = 128;
+ERL_NIF_TERM tail;
+if (!enif_inspect_iovec(NULL, max_elements, term, &tail, iovec))
+ return 0;
+
+// Do things with the iovec
+
+/* Free the iovector, possibly in another thread or nif function call */
+enif_free_iovec(iovec);]]></code>
+ </desc>
+ </func>
+
+ <func>
<name><ret>int</ret><nametext>enif_get_atom(ErlNifEnv* env, ERL_NIF_TERM
term, char* buf, unsigned size, ErlNifCharEncoding encode)</nametext>
</name>
@@ -1449,6 +1579,127 @@ typedef enum {
</func>
<func>
+ <name><ret>int</ret><nametext>enif_inspect_iovec(ErlNifEnv*
+ env, size_t max_elements, ERL_NIF_TERM iovec_term, ERL_NIF_TERM* tail,
+ ErlNifIOVec** iovec)</nametext></name>
+ <fsummary>Inspect a list of binaries as an ErlNifIOVec.</fsummary>
+ <desc>
+ <p>Fills <c>iovec</c> with the list of binaries provided in
+ <c>iovec_term</c>. The number of elements handled in the call is
+ limited to <c>max_elements</c>, and <c>tail</c> is set to the
+ remainder of the list. Note that the output may be longer than
+ <c>max_elements</c> on some platforms.
+ </p>
+ <p>To create a list of binaries from an arbitrary iolist, use
+ <seealso marker="erts:erlang#iolist_to_iovec/1">
+ <c>erlang:iolist_to_iovec/1</c></seealso>.</p>
+ <p>When calling this function, <c>iovec</c> should contain a pointer to
+ <c>NULL</c> or a ErlNifIOVec structure that should be used if
+ possible. e.g.
+ </p>
+ <code type="none">
+/* Don't use a pre-allocated structure */
+ErlNifIOVec *iovec = NULL;
+enif_inspect_iovec(env, max_elements, term, &amp;tail, &amp;iovec);
+
+/* Use a stack-allocated vector as an optimization for vectors with few elements */
+ErlNifIOVec vec, *iovec = &amp;vec;
+enif_inspect_iovec(env, max_elements, term, &amp;tail, &amp;iovec);
+</code>
+ <p>The contents of the <c>iovec</c> is valid until the called nif
+ function returns. If the <c>iovec</c> should be valid after the nif
+ call returns, it is possible to call this function with a
+ <c>NULL</c> environment. If no environment is given the <c>iovec</c>
+ owns the data in the vector and it has to be explicitly freed using
+ <seealso marker="#enif_free_iovec"><c>enif_free_iovec</c>
+ </seealso>.</p>
+ <p>Returns <c>true</c> on success, or <c>false</c> if <c>iovec_term</c>
+ not an iovec.</p>
+ </desc>
+ </func>
+
+ <func>
+ <name><ret>ErlNifIOQueue *</ret>
+ <nametext>enif_ioq_create(ErlNifIOQueueOpts opts)</nametext></name>
+ <fsummary>Create a new IO Queue</fsummary>
+ <desc>
+ <p>Create a new I/O Queue that can be used to store data.
+ <c>opts</c> has to be set to <c>ERL_NIF_IOQ_NORMAL</c>.
+ </p>
+ </desc>
+ </func>
+
+ <func>
+ <name><ret>void</ret>
+ <nametext>enif_ioq_destroy(ErlNifIOQueue *q)</nametext></name>
+ <fsummary>Destroy an IO Queue and free it's content</fsummary>
+ <desc>
+ <p>Destroy the I/O queue and free all of it's contents</p>
+ </desc>
+ </func>
+
+ <func>
+ <name><ret>int</ret>
+ <nametext>enif_ioq_deq(ErlNifIOQueue *q, size_t count, size_t *size)</nametext></name>
+ <fsummary>Dequeue count bytes from the IO Queue</fsummary>
+ <desc>
+ <p>Dequeue <c>count</c> bytes from the I/O queue.
+ If <c>size</c> is not <c>NULL</c>, the new size of the queue
+ is placed there.</p>
+ <p>Returns <c>true</c> on success, or <c>false</c> if the I/O does
+ not contain <c>count</c> bytes. On failure the queue is left un-altered.</p>
+ </desc>
+ </func>
+
+ <func>
+ <name><ret>int</ret>
+ <nametext>enif_ioq_enq_binary(ErlNifIOQueue *q, ErlNifBinary *bin, size_t skip)</nametext></name>
+ <fsummary>Enqueue the binary into the IO Queue</fsummary>
+ <desc>
+ <p>Enqueue the <c>bin</c> into <c>q</c> skipping the first <c>skip</c> bytes.</p>
+ <p>Returns <c>true</c> on success, or <c>false</c> if <c>skip</c> is greater
+ than the size of <c>bin</c>. Any ownership of the binary data is transferred
+ to the queue and <c>bin</c> is to be considered read-only for the rest of the NIF
+ call and then as released.</p>
+ </desc>
+ </func>
+
+ <func>
+ <name><ret>int</ret>
+ <nametext>enif_ioq_enqv(ErlNifIOQueue *q, ErlNifIOVec *iovec, size_t skip)</nametext></name>
+ <fsummary>Enqueue the iovec into the IO Queue</fsummary>
+ <desc>
+ <p>Enqueue the <c>iovec</c> into <c>q</c> skipping the first <c>skip</c> bytes.</p>
+ <p>Returns <c>true</c> on success, or <c>false</c> if <c>skip</c> is greater
+ than the size of <c>iovec</c>.</p>
+ </desc>
+ </func>
+
+ <func>
+ <name><ret>SysIOVec *</ret>
+ <nametext>enif_ioq_peek(ErlNifIOQueue *q, int *iovlen)</nametext></name>
+ <fsummary>Peek inside the IO Queue</fsummary>
+ <desc>
+ <p>Get the I/O queue as a pointer to an array of <c>SysIOVec</c>s.
+ It also returns the number of elements in <c>iovlen</c>.
+ This is the only way to get data out of the queue.</p>
+ <p>Nothing is removed from the queue by this function, that must be done
+ with <seealso marker="#enif_ioq_deq"><c>enif_ioq_deq</c></seealso>.</p>
+ <p>The returned array is suitable to use with the Unix system
+ call <c>writev</c>.</p>
+ </desc>
+ </func>
+
+ <func>
+ <name><ret>size_t</ret>
+ <nametext>enif_ioq_size(ErlNifIOQueue *q)</nametext></name>
+ <fsummary>Get the current size of the IO Queue</fsummary>
+ <desc>
+ <p>Get the size of <c>q</c>.</p>
+ </desc>
+ </func>
+
+ <func>
<name><ret>int</ret>
<nametext>enif_is_atom(ErlNifEnv* env, ERL_NIF_TERM term)</nametext>
</name>
diff --git a/erts/emulator/Makefile.in b/erts/emulator/Makefile.in
index 1ae905276d..cc1fa571df 100644
--- a/erts/emulator/Makefile.in
+++ b/erts/emulator/Makefile.in
@@ -815,7 +815,8 @@ RUN_OBJS = \
$(OBJDIR)/erl_bif_binary.o $(OBJDIR)/erl_ao_firstfit_alloc.o \
$(OBJDIR)/erl_thr_queue.o $(OBJDIR)/erl_sched_spec_pre_alloc.o \
$(OBJDIR)/erl_ptab.o $(OBJDIR)/erl_map.o \
- $(OBJDIR)/erl_msacc.o $(OBJDIR)/erl_lock_flags.o
+ $(OBJDIR)/erl_msacc.o $(OBJDIR)/erl_lock_flags.o \
+ $(OBJDIR)/erl_io_queue.o
LTTNG_OBJS = $(OBJDIR)/erlang_lttng.o
NIF_OBJS = $(OBJDIR)/erl_tracer_nif.o
diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h
index 0e8ebf0c98..5ad616fec3 100644
--- a/erts/emulator/beam/erl_driver.h
+++ b/erts/emulator/beam/erl_driver.h
@@ -40,7 +40,6 @@
#include "erl_drv_nif.h"
#include <stdlib.h>
-#include <sys/types.h> /* ssize_t */
#if defined(__WIN32__) || defined(_WIN32) || defined(_WIN32_)
#ifndef STATIC_ERLANG_DRIVER
@@ -48,24 +47,6 @@
#define ERL_DRIVER_TYPES_ONLY
#define WIN32_DYNAMIC_ERL_DRIVER
#endif
-/*
- * This structure can be cast to a WSABUF structure.
- */
-typedef struct _SysIOVec {
- unsigned long iov_len;
- char* iov_base;
-} SysIOVec;
-#else /* Unix */
-# ifdef HAVE_SYS_UIO_H
-# include <sys/types.h>
-# include <sys/uio.h>
-typedef struct iovec SysIOVec;
-# else
-typedef struct {
- char* iov_base;
- size_t iov_len;
-} SysIOVec;
-# endif
#endif
#ifndef EXTERN
diff --git a/erts/emulator/beam/erl_drv_nif.h b/erts/emulator/beam/erl_drv_nif.h
index f88138063e..31b4817fb1 100644
--- a/erts/emulator/beam/erl_drv_nif.h
+++ b/erts/emulator/beam/erl_drv_nif.h
@@ -144,8 +144,25 @@ typedef signed int ErlNapiSInt;
#define ERTS_NAPI_USEC__ 2
#define ERTS_NAPI_NSEC__ 3
-#endif /* __ERL_DRV_NIF_H__ */
-
-
-
+#if (defined(__WIN32__) || defined(_WIN32) || defined(_WIN32_))
+/*
+ * This structure can be cast to a WSABUF structure.
+ */
+typedef struct _SysIOVec {
+ unsigned long iov_len;
+ char* iov_base;
+} SysIOVec;
+#else /* Unix */
+# include <sys/types.h>
+# ifdef HAVE_SYS_UIO_H
+# include <sys/uio.h>
+typedef struct iovec SysIOVec;
+# else
+typedef struct {
+ char* iov_base;
+ size_t iov_len;
+} SysIOVec;
+# endif
+#endif
+#endif /* __ERL_DRV_NIF_H__ */
diff --git a/erts/emulator/beam/erl_io_queue.c b/erts/emulator/beam/erl_io_queue.c
new file mode 100644
index 0000000000..d072376d1c
--- /dev/null
+++ b/erts/emulator/beam/erl_io_queue.c
@@ -0,0 +1,738 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2017. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "sys.h"
+#include "global.h"
+#include "erl_bits.h"
+#include "erl_io_queue.h"
+
+static void free_binary(ErtsIOQBinary *b, int driver);
+static ErtsIOQBinary *alloc_binary(Uint size, char *source, void **iov_base, int driver);
+
+void erts_ioq_init(ErtsIOQueue *q, ErtsAlcType_t alct, int driver)
+{
+
+ ERTS_CT_ASSERT(offsetof(ErlNifIOVec,flags) == sizeof(ErtsIOVecCommon));
+ ERTS_CT_ASSERT(sizeof(ErlIOVec) == sizeof(ErtsIOVecCommon));
+ ERTS_CT_ASSERT(sizeof(size_t) == sizeof(ErlDrvSizeT));
+ ERTS_CT_ASSERT(sizeof(size_t) == sizeof(Uint));
+
+ q->alct = alct;
+ q->driver = driver;
+ q->size = 0;
+ q->v_head = q->v_tail = q->v_start = q->v_small;
+ q->v_end = q->v_small + ERTS_SMALL_IO_QUEUE;
+ q->b_head = q->b_tail = q->b_start = q->b_small;
+ q->b_end = q->b_small + ERTS_SMALL_IO_QUEUE;
+}
+
+void erts_ioq_clear(ErtsIOQueue *q)
+{
+ ErtsIOQBinary** binp = q->b_head;
+ int driver = q->driver;
+
+ if (q->v_start != q->v_small)
+ erts_free(q->alct, (void *) q->v_start);
+
+ while(binp < q->b_tail) {
+ if (*binp != NULL)
+ free_binary(*binp, driver);
+ binp++;
+ }
+ if (q->b_start != q->b_small)
+ erts_free(q->alct, (void *) q->b_start);
+ q->v_start = q->v_end = q->v_head = q->v_tail = NULL;
+ q->b_start = q->b_end = q->b_head = q->b_tail = NULL;
+ q->size = 0;
+}
+
+static void free_binary(ErtsIOQBinary *b, int driver)
+{
+ if (driver)
+ driver_free_binary(&b->driver);
+ else if (erts_refc_dectest(&b->nif.intern.refc, 0) == 0)
+ erts_bin_free(&b->nif);
+}
+
+static ErtsIOQBinary *alloc_binary(Uint size, char *source, void **iov_base, int driver)
+{
+ if (driver) {
+ ErlDrvBinary *bin = driver_alloc_binary(size);
+ if (!bin) return NULL;
+ sys_memcpy(bin->orig_bytes, source, size);
+ *iov_base = bin->orig_bytes;
+ return (ErtsIOQBinary *)bin;
+ } else {
+ /* This clause can be triggered in enif_ioq_enq_binary is used */
+ Binary *bin = erts_bin_nrml_alloc(size);
+ if (!bin) return NULL;
+ erts_refc_init(&bin->intern.refc, 1);
+ sys_memcpy(bin->orig_bytes, source, size);
+ *iov_base = bin->orig_bytes;
+ return (ErtsIOQBinary *)bin;
+ }
+}
+
+Uint erts_ioq_size(ErtsIOQueue *q)
+{
+ return q->size;
+}
+
+/* expand queue to hold n elements in tail or head */
+static int expandq(ErtsIOQueue* q, int n, int tail)
+/* tail: 0 if make room in head, make room in tail otherwise */
+{
+ int h_sz; /* room before header */
+ int t_sz; /* room after tail */
+ int q_sz; /* occupied */
+ int nvsz;
+ SysIOVec* niov;
+ ErtsIOQBinary** nbinv;
+
+ h_sz = q->v_head - q->v_start;
+ t_sz = q->v_end - q->v_tail;
+ q_sz = q->v_tail - q->v_head;
+
+ if (tail && (n <= t_sz)) /* do we need to expand tail? */
+ return 0;
+ else if (!tail && (n <= h_sz)) /* do we need to expand head? */
+ return 0;
+ else if (n > (h_sz + t_sz)) { /* need to allocate */
+ /* we may get little extra but it ok */
+ nvsz = (q->v_end - q->v_start) + n;
+
+ niov = erts_alloc_fnf(q->alct, nvsz * sizeof(SysIOVec));
+ if (!niov)
+ return -1;
+ nbinv = erts_alloc_fnf(q->alct, nvsz * sizeof(ErtsIOQBinary**));
+ if (!nbinv) {
+ erts_free(q->alct, (void *) niov);
+ return -1;
+ }
+ if (tail) {
+ sys_memcpy(niov, q->v_head, q_sz*sizeof(SysIOVec));
+ if (q->v_start != q->v_small)
+ erts_free(q->alct, (void *) q->v_start);
+ q->v_start = niov;
+ q->v_end = niov + nvsz;
+ q->v_head = q->v_start;
+ q->v_tail = q->v_head + q_sz;
+
+ sys_memcpy(nbinv, q->b_head, q_sz*sizeof(ErtsIOQBinary*));
+ if (q->b_start != q->b_small)
+ erts_free(q->alct, (void *) q->b_start);
+ q->b_start = nbinv;
+ q->b_end = nbinv + nvsz;
+ q->b_head = q->b_start;
+ q->b_tail = q->b_head + q_sz;
+ }
+ else {
+ sys_memcpy(niov+nvsz-q_sz, q->v_head, q_sz*sizeof(SysIOVec));
+ if (q->v_start != q->v_small)
+ erts_free(q->alct, (void *) q->v_start);
+ q->v_start = niov;
+ q->v_end = niov + nvsz;
+ q->v_tail = q->v_end;
+ q->v_head = q->v_tail - q_sz;
+
+ sys_memcpy(nbinv+nvsz-q_sz, q->b_head, q_sz*sizeof(ErtsIOQBinary*));
+ if (q->b_start != q->b_small)
+ erts_free(q->alct, (void *) q->b_start);
+ q->b_start = nbinv;
+ q->b_end = nbinv + nvsz;
+ q->b_tail = q->b_end;
+ q->b_head = q->b_tail - q_sz;
+ }
+ }
+ else if (tail) { /* move to beginning to make room in tail */
+ sys_memmove(q->v_start, q->v_head, q_sz*sizeof(SysIOVec));
+ q->v_head = q->v_start;
+ q->v_tail = q->v_head + q_sz;
+ sys_memmove(q->b_start, q->b_head, q_sz*sizeof(ErtsIOQBinary*));
+ q->b_head = q->b_start;
+ q->b_tail = q->b_head + q_sz;
+ }
+ else { /* move to end to make room */
+ sys_memmove(q->v_end-q_sz, q->v_head, q_sz*sizeof(SysIOVec));
+ q->v_tail = q->v_end;
+ q->v_head = q->v_tail-q_sz;
+ sys_memmove(q->b_end-q_sz, q->b_head, q_sz*sizeof(ErtsIOQBinary*));
+ q->b_tail = q->b_end;
+ q->b_head = q->b_tail-q_sz;
+ }
+
+ return 0;
+}
+
+static
+int skip(ErtsIOVec* vec, Uint skipbytes,
+ SysIOVec **iovp, ErtsIOQBinary ***binvp,
+ Uint *lenp)
+{
+ int n;
+ Uint len;
+ SysIOVec* iov;
+ ErtsIOQBinary** binv;
+
+ if (vec->common.size <= skipbytes)
+ return -1;
+
+ iov = vec->common.iov;
+ binv = vec->common.binv;
+ n = vec->common.vsize;
+ /* we use do here to strip iov_len=0 from beginning */
+ do {
+ len = iov->iov_len;
+ if (len <= skipbytes) {
+ skipbytes -= len;
+ iov++;
+ binv++;
+ n--;
+ }
+ else {
+ iov->iov_base = ((char *)(iov->iov_base)) + skipbytes;
+ iov->iov_len -= skipbytes;
+ skipbytes = 0;
+ }
+ } while(skipbytes > 0);
+
+ *binvp = binv;
+ *iovp = iov;
+ *lenp = len;
+
+ return n;
+}
+
+/* Put elements from vec at q tail */
+int erts_ioq_enqv(ErtsIOQueue *q, ErtsIOVec *eiov, Uint skipbytes)
+{
+ int n;
+ Uint len;
+ Uint size = eiov->common.size - skipbytes;
+ SysIOVec *iov;
+ ErtsIOQBinary** binv;
+ ErtsIOQBinary* b;
+
+ if (q == NULL)
+ return -1;
+
+ ASSERT(eiov->common.size >= skipbytes);
+ if (eiov->common.size <= skipbytes)
+ return 0;
+
+ n = skip(eiov, skipbytes, &iov, &binv, &len);
+
+ if (n < 0)
+ return n;
+
+ if (q->v_tail + n >= q->v_end)
+ if (expandq(q, n, 1))
+ return -1;
+
+ /* Queue and reference all binaries (remove zero length items) */
+ while(n--) {
+ if ((len = iov->iov_len) > 0) {
+ if ((b = *binv) == NULL) { /* special case create binary ! */
+ b = alloc_binary(len, iov->iov_base, (void**)&q->v_tail->iov_base,
+ q->driver);
+ if (!b) return -1;
+ *q->b_tail++ = b;
+ q->v_tail->iov_len = len;
+ q->v_tail++;
+ }
+ else {
+ if (q->driver)
+ driver_binary_inc_refc(&b->driver);
+ else
+ erts_refc_inc(&b->nif.intern.refc, 1);
+ *q->b_tail++ = b;
+ *q->v_tail++ = *iov;
+ }
+ }
+ iov++;
+ binv++;
+ }
+ q->size += size; /* update total size in queue */
+ return 0;
+}
+
+/* Put elements from vec at q head */
+int erts_ioq_pushqv(ErtsIOQueue *q, ErtsIOVec* vec, Uint skipbytes)
+{
+ int n;
+ Uint len;
+ Uint size = vec->common.size - skipbytes;
+ SysIOVec* iov;
+ ErtsIOQBinary** binv;
+ ErtsIOQBinary* b;
+
+ if (q == NULL)
+ return -1;
+
+ ASSERT(vec->common.size >= skipbytes);
+ if (vec->common.size <= skipbytes)
+ return 0;
+
+ n = skip(vec, skipbytes, &iov, &binv, &len);
+
+ if (n < 0)
+ return n;
+
+ if (q->v_head - n < q->v_start)
+ if (expandq(q, n, 0))
+ return -1;
+
+ /* Queue and reference all binaries (remove zero length items) */
+ iov += (n-1); /* move to end */
+ binv += (n-1); /* move to end */
+ while(n--) {
+ if ((len = iov->iov_len) > 0) {
+ if ((b = *binv) == NULL) { /* special case create binary ! */
+ if (q->driver) {
+ ErlDrvBinary *bin = driver_alloc_binary(len);
+ if (!bin) return -1;
+ sys_memcpy(bin->orig_bytes, iov->iov_base, len);
+ b = (ErtsIOQBinary *)bin;
+ q->v_head->iov_base = bin->orig_bytes;
+ }
+ *--q->b_head = b;
+ q->v_head--;
+ q->v_head->iov_len = len;
+ }
+ else {
+ if (q->driver)
+ driver_binary_inc_refc(&b->driver);
+ else
+ erts_refc_inc(&b->nif.intern.refc, 1);
+ *--q->b_head = b;
+ *--q->v_head = *iov;
+ }
+ }
+ iov--;
+ binv--;
+ }
+ q->size += size; /* update total size in queue */
+ return 0;
+}
+
+
+/*
+** Remove size bytes from queue head
+** Return number of bytes that remain in queue
+*/
+int erts_ioq_deq(ErtsIOQueue *q, Uint size)
+{
+ Uint len;
+
+ if ((q == NULL) || (q->size < size))
+ return -1;
+ q->size -= size;
+ while (size > 0) {
+ ASSERT(q->v_head != q->v_tail);
+
+ len = q->v_head->iov_len;
+ if (len <= size) {
+ size -= len;
+ free_binary(*q->b_head, q->driver);
+ *q->b_head++ = NULL;
+ q->v_head++;
+ }
+ else {
+ q->v_head->iov_base = ((char *)(q->v_head->iov_base)) + size;
+ q->v_head->iov_len -= size;
+ size = 0;
+ }
+ }
+
+ /* restart pointers (optimised for enq) */
+ if (q->v_head == q->v_tail) {
+ q->v_head = q->v_tail = q->v_start;
+ q->b_head = q->b_tail = q->b_start;
+ }
+ return 0;
+}
+
+
+Uint erts_ioq_peekqv(ErtsIOQueue *q, ErtsIOVec *ev) {
+ ASSERT(ev);
+
+ if (! q) {
+ return (Uint) -1;
+ } else {
+ if ((ev->common.vsize = q->v_tail - q->v_head) == 0) {
+ ev->common.size = 0;
+ ev->common.iov = NULL;
+ ev->common.binv = NULL;
+ } else {
+ ev->common.size = q->size;
+ ev->common.iov = q->v_head;
+ ev->common.binv = q->b_head;
+ }
+ return q->size;
+ }
+}
+
+SysIOVec* erts_ioq_peekq(ErtsIOQueue *q, int* vlenp) /* length of io-vector */
+{
+
+ if (q == NULL) {
+ *vlenp = -1;
+ return NULL;
+ }
+ if ((*vlenp = (q->v_tail - q->v_head)) == 0)
+ return NULL;
+ return q->v_head;
+}
+
+/* Fills a possibly deep list of chars and binaries into vec
+** Small characters are first stored in the buffer buf of length ln
+** binaries found are copied and linked into msoh
+** Return vector length on succsess,
+** -1 on overflow
+** -2 on type error
+*/
+
+static ERTS_INLINE void
+io_list_to_vec_set_vec(SysIOVec **iov, ErtsIOQBinary ***binv,
+ ErtsIOQBinary *bin, byte *ptr, Uint len,
+ int *vlen)
+{
+ while (len > MAX_SYSIOVEC_IOVLEN) {
+ (*iov)->iov_base = ptr;
+ (*iov)->iov_len = MAX_SYSIOVEC_IOVLEN;
+ ptr += MAX_SYSIOVEC_IOVLEN;
+ len -= MAX_SYSIOVEC_IOVLEN;
+ (*iov)++;
+ (*vlen)++;
+ *(*binv)++ = bin;
+ }
+ (*iov)->iov_base = ptr;
+ (*iov)->iov_len = len;
+ *(*binv)++ = bin;
+ (*iov)++;
+ (*vlen)++;
+}
+
+int
+erts_ioq_iolist_to_vec(Eterm obj, /* io-list */
+ SysIOVec* iov, /* io vector */
+ ErtsIOQBinary** binv, /* binary reference vector */
+ ErtsIOQBinary* cbin, /* binary to store characters */
+ Uint bin_limit, /* small binaries limit */
+ int driver)
+{
+ DECLARE_ESTACK(s);
+ Eterm* objp;
+ byte *buf = NULL;
+ Uint len = 0;
+ Uint csize = 0;
+ int vlen = 0;
+ byte* cptr;
+
+ if (cbin) {
+ if (driver) {
+ buf = (byte*)cbin->driver.orig_bytes;
+ len = cbin->driver.orig_size;
+ } else {
+ buf = (byte*)cbin->nif.orig_bytes;
+ len = cbin->nif.orig_size;
+ }
+ }
+ cptr = buf;
+
+ goto L_jump_start; /* avoid push */
+
+ while (!ESTACK_ISEMPTY(s)) {
+ obj = ESTACK_POP(s);
+ L_jump_start:
+ if (is_list(obj)) {
+ L_iter_list:
+ objp = list_val(obj);
+ obj = CAR(objp);
+ if (is_byte(obj)) {
+ if (len == 0)
+ goto L_overflow;
+ *buf++ = unsigned_val(obj);
+ csize++;
+ len--;
+ } else if (is_binary(obj)) {
+ ESTACK_PUSH(s, CDR(objp));
+ goto handle_binary;
+ } else if (is_list(obj)) {
+ ESTACK_PUSH(s, CDR(objp));
+ goto L_iter_list; /* on head */
+ } else if (!is_nil(obj)) {
+ goto L_type_error;
+ }
+ obj = CDR(objp);
+ if (is_list(obj))
+ goto L_iter_list; /* on tail */
+ else if (is_binary(obj)) {
+ goto handle_binary;
+ } else if (!is_nil(obj)) {
+ goto L_type_error;
+ }
+ } else if (is_binary(obj)) {
+ Eterm real_bin;
+ Uint offset;
+ Eterm* bptr;
+ Uint size;
+ int bitoffs;
+ int bitsize;
+
+ handle_binary:
+ size = binary_size(obj);
+ ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize);
+ ASSERT(bitsize == 0);
+ bptr = binary_val(real_bin);
+ if (*bptr == HEADER_PROC_BIN) {
+ ProcBin* pb = (ProcBin *) bptr;
+ if (bitoffs != 0) {
+ if (len < size) {
+ goto L_overflow;
+ }
+ erts_copy_bits(pb->bytes+offset, bitoffs, 1,
+ (byte *) buf, 0, 1, size*8);
+ csize += size;
+ buf += size;
+ len -= size;
+ } else if (bin_limit && size < bin_limit) {
+ if (len < size) {
+ goto L_overflow;
+ }
+ sys_memcpy(buf, pb->bytes+offset, size);
+ csize += size;
+ buf += size;
+ len -= size;
+ } else {
+ ErtsIOQBinary *qbin;
+ if (csize != 0) {
+ io_list_to_vec_set_vec(&iov, &binv, cbin,
+ cptr, csize, &vlen);
+ cptr = buf;
+ csize = 0;
+ }
+ if (pb->flags) {
+ erts_emasculate_writable_binary(pb);
+ }
+ if (driver)
+ qbin = (ErtsIOQBinary*)Binary2ErlDrvBinary(pb->val);
+ else
+ qbin = (ErtsIOQBinary*)pb->val;
+
+ io_list_to_vec_set_vec(
+ &iov, &binv, qbin,
+ pb->bytes+offset, size, &vlen);
+ }
+ } else {
+ ErlHeapBin* hb = (ErlHeapBin *) bptr;
+ if (len < size) {
+ goto L_overflow;
+ }
+ copy_binary_to_buffer(buf, 0,
+ ((byte *) hb->data)+offset, bitoffs,
+ 8*size);
+ csize += size;
+ buf += size;
+ len -= size;
+ }
+ } else if (!is_nil(obj)) {
+ goto L_type_error;
+ }
+ }
+
+ if (csize != 0) {
+ io_list_to_vec_set_vec(&iov, &binv, cbin, cptr, csize, &vlen);
+ }
+
+ DESTROY_ESTACK(s);
+ return vlen;
+
+ L_type_error:
+ DESTROY_ESTACK(s);
+ return -2;
+
+ L_overflow:
+ DESTROY_ESTACK(s);
+ return -1;
+}
+
+static ERTS_INLINE int
+io_list_vec_count(Eterm obj, Uint *v_size,
+ Uint *c_size, Uint *b_size, Uint *in_clist,
+ Uint *p_v_size, Uint *p_c_size, Uint *p_in_clist,
+ Uint blimit)
+{
+ Uint size = binary_size(obj);
+ Eterm real;
+ ERTS_DECLARE_DUMMY(Uint offset);
+ int bitoffs;
+ int bitsize;
+ ERTS_GET_REAL_BIN(obj, real, offset, bitoffs, bitsize);
+ if (bitsize != 0) return 1;
+ if (thing_subtag(*binary_val(real)) == REFC_BINARY_SUBTAG &&
+ bitoffs == 0) {
+ *b_size += size;
+ if (*b_size < size) return 2;
+ *in_clist = 0;
+ ++*v_size;
+ /* If iov_len is smaller then Uint we split the binary into*/
+ /* multiple smaller (2GB) elements in the iolist.*/
+ *v_size += size / MAX_SYSIOVEC_IOVLEN;
+ if (size >= blimit) {
+ *p_in_clist = 0;
+ ++*p_v_size;
+ } else {
+ *p_c_size += size;
+ if (!*p_in_clist) {
+ *p_in_clist = 1;
+ ++*p_v_size;
+ }
+ }
+ } else {
+ *c_size += size;
+ if (*c_size < size) return 2;
+ if (!*in_clist) {
+ *in_clist = 1;
+ ++*v_size;
+ }
+ *p_c_size += size;
+ if (!*p_in_clist) {
+ *p_in_clist = 1;
+ ++*p_v_size;
+ }
+ }
+ return 0;
+}
+
+#define IO_LIST_VEC_COUNT(obj) \
+ do { \
+ switch (io_list_vec_count(obj, &v_size, &c_size, \
+ &b_size, &in_clist, \
+ &p_v_size, &p_c_size, &p_in_clist, \
+ blimit)) { \
+ case 1: goto L_type_error; \
+ case 2: goto L_overflow_error; \
+ default: break; \
+ } \
+ } while(0)
+
+/*
+ * Returns 0 if successful and a non-zero value otherwise.
+ *
+ * Return values through pointers:
+ * *vsize - SysIOVec size needed for a writev
+ * *csize - Number of bytes not in binary (in the common binary)
+ * *pvsize - SysIOVec size needed if packing small binaries
+ * *pcsize - Number of bytes in the common binary if packing
+ * *total_size - Total size of iolist in bytes
+ */
+int
+erts_ioq_iolist_vec_len(Eterm obj, int* vsize, Uint* csize,
+ Uint* pvsize, Uint* pcsize,
+ Uint* total_size, Uint blimit)
+{
+ DECLARE_ESTACK(s);
+ Eterm* objp;
+ Uint v_size = 0;
+ Uint c_size = 0;
+ Uint b_size = 0;
+ Uint in_clist = 0;
+ Uint p_v_size = 0;
+ Uint p_c_size = 0;
+ Uint p_in_clist = 0;
+ Uint total;
+
+ goto L_jump_start; /* avoid a push */
+
+ while (!ESTACK_ISEMPTY(s)) {
+ obj = ESTACK_POP(s);
+ L_jump_start:
+ if (is_list(obj)) {
+ L_iter_list:
+ objp = list_val(obj);
+ obj = CAR(objp);
+
+ if (is_byte(obj)) {
+ c_size++;
+ if (c_size == 0) {
+ goto L_overflow_error;
+ }
+ if (!in_clist) {
+ in_clist = 1;
+ v_size++;
+ }
+ p_c_size++;
+ if (!p_in_clist) {
+ p_in_clist = 1;
+ p_v_size++;
+ }
+ }
+ else if (is_binary(obj)) {
+ IO_LIST_VEC_COUNT(obj);
+ }
+ else if (is_list(obj)) {
+ ESTACK_PUSH(s, CDR(objp));
+ goto L_iter_list; /* on head */
+ }
+ else if (!is_nil(obj)) {
+ goto L_type_error;
+ }
+
+ obj = CDR(objp);
+ if (is_list(obj))
+ goto L_iter_list; /* on tail */
+ else if (is_binary(obj)) { /* binary tail is OK */
+ IO_LIST_VEC_COUNT(obj);
+ }
+ else if (!is_nil(obj)) {
+ goto L_type_error;
+ }
+ }
+ else if (is_binary(obj)) {
+ IO_LIST_VEC_COUNT(obj);
+ }
+ else if (!is_nil(obj)) {
+ goto L_type_error;
+ }
+ }
+
+ total = c_size + b_size;
+ if (total < c_size) {
+ goto L_overflow_error;
+ }
+ *total_size = total;
+
+ DESTROY_ESTACK(s);
+ *vsize = v_size;
+ *csize = c_size;
+ *pvsize = p_v_size;
+ *pcsize = p_c_size;
+ return 0;
+
+ L_type_error:
+ L_overflow_error:
+ DESTROY_ESTACK(s);
+ return 1;
+}
diff --git a/erts/emulator/beam/erl_io_queue.h b/erts/emulator/beam/erl_io_queue.h
new file mode 100644
index 0000000000..51abe99510
--- /dev/null
+++ b/erts/emulator/beam/erl_io_queue.h
@@ -0,0 +1,201 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2017. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+/*
+ * Description: A queue used for storing binary data that should be
+ * passed to writev or similar functions. Used by both
+ * the nif and driver api.
+ *
+ * Author: Lukas Larsson
+ */
+
+#ifndef ERL_IO_QUEUE_H__TYPES__
+#define ERL_IO_QUEUE_H__TYPES__
+
+#define ERTS_BINARY_TYPES_ONLY__
+#include "erl_binary.h"
+#undef ERTS_BINARY_TYPES_ONLY__
+#include "erl_nif.h"
+
+#ifdef DEBUG
+#define MAX_SYSIOVEC_IOVLEN (1ull << (32 - 1))
+#else
+#define MAX_SYSIOVEC_IOVLEN (1ull << (sizeof(((SysIOVec*)0)->iov_len) * 8 - 1))
+#endif
+
+#define ERTS_SMALL_IO_QUEUE 5
+
+typedef union {
+ ErlDrvBinary driver;
+ Binary nif;
+} ErtsIOQBinary;
+
+typedef struct {
+ int vsize; /* length of vectors */
+ Uint size; /* total size in bytes */
+ SysIOVec* iov;
+ ErtsIOQBinary** binv;
+} ErtsIOVecCommon;
+
+typedef union {
+ ErtsIOVecCommon common;
+ ErlIOVec driver;
+ ErlNifIOVec nif;
+} ErtsIOVec;
+
+/* head/tail represent the data in the queue
+ * start/end represent the edges of the allocated queue
+ * small is used when the number of iovec elements is < SMALL_IO_QUEUE
+ */
+typedef struct erts_io_queue {
+ ErtsAlcType_t alct;
+ int driver;
+ Uint size; /* total size in bytes */
+
+ SysIOVec* v_start;
+ SysIOVec* v_end;
+ SysIOVec* v_head;
+ SysIOVec* v_tail;
+ SysIOVec v_small[ERTS_SMALL_IO_QUEUE];
+
+ ErtsIOQBinary **b_start;
+ ErtsIOQBinary **b_end;
+ ErtsIOQBinary **b_head;
+ ErtsIOQBinary **b_tail;
+ ErtsIOQBinary *b_small[ERTS_SMALL_IO_QUEUE];
+
+} ErtsIOQueue;
+
+#endif /* ERL_IO_QUEUE_H__TYPES__ */
+
+#if !defined(ERL_IO_QUEUE_H) && !defined(ERTS_IO_QUEUE_TYPES_ONLY__)
+#define ERL_IO_QUEUE_H
+
+#include "erl_binary.h"
+#include "erl_bits.h"
+
+void erts_ioq_init(ErtsIOQueue *q, ErtsAlcType_t alct, int driver);
+void erts_ioq_clear(ErtsIOQueue *q);
+Uint erts_ioq_size(ErtsIOQueue *q);
+int erts_ioq_enqv(ErtsIOQueue *q, ErtsIOVec *vec, Uint skip);
+int erts_ioq_pushqv(ErtsIOQueue *q, ErtsIOVec *vec, Uint skip);
+int erts_ioq_deq(ErtsIOQueue *q, Uint Uint);
+Uint erts_ioq_peekqv(ErtsIOQueue *q, ErtsIOVec *ev);
+SysIOVec *erts_ioq_peekq(ErtsIOQueue *q, int *vlenp);
+Uint erts_ioq_sizeq(ErtsIOQueue *q);
+
+int erts_ioq_iolist_vec_len(Eterm obj, int* vsize, Uint* csize,
+ Uint* pvsize, Uint* pcsize,
+ Uint* total_size, Uint blimit);
+int erts_ioq_iolist_to_vec(Eterm obj, SysIOVec* iov,
+ ErtsIOQBinary** binv, ErtsIOQBinary* cbin,
+ Uint bin_limit, int driver_binary);
+
+ERTS_GLB_INLINE
+int erts_ioq_iodata_vec_len(Eterm obj, int* vsize, Uint* csize,
+ Uint* pvsize, Uint* pcsize,
+ Uint* total_size, Uint blimit);
+ERTS_GLB_INLINE
+int erts_ioq_iodata_to_vec(Eterm obj, SysIOVec* iov,
+ ErtsIOQBinary** binv, ErtsIOQBinary* cbin,
+ Uint bin_limit, int driver_binary);
+
+
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE
+int erts_ioq_iodata_vec_len(Eterm obj, int* vsize, Uint* csize,
+ Uint* pvsize, Uint* pcsize,
+ Uint* total_size, Uint blimit) {
+ if (is_binary(obj)) {
+ /* We optimize for when we get a procbin without a bit-offset
+ * that fits in one iov slot
+ */
+ Eterm real_bin;
+ byte bitoffs;
+ byte bitsize;
+ ERTS_DECLARE_DUMMY(Uint offset);
+ Uint size = binary_size(obj);
+ ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize);
+ if (size < MAX_SYSIOVEC_IOVLEN && bitoffs == 0 && bitsize == 0) {
+ *vsize = 1;
+ *pvsize = 1;
+ if (thing_subtag(*binary_val(real_bin)) == REFC_BINARY_SUBTAG) {
+ *csize = 0;
+ *pcsize = 0;
+ } else {
+ *csize = size;
+ *pcsize = size;
+ }
+ *total_size = size;
+ return 0;
+ }
+ }
+
+ return erts_ioq_iolist_vec_len(obj, vsize, csize,
+ pvsize, pcsize, total_size, blimit);
+}
+
+ERTS_GLB_INLINE
+int erts_ioq_iodata_to_vec(Eterm obj,
+ SysIOVec *iov,
+ ErtsIOQBinary **binv,
+ ErtsIOQBinary *cbin,
+ Uint bin_limit,
+ int driver)
+{
+ if (is_binary(obj)) {
+ Eterm real_bin;
+ byte bitoffs;
+ byte bitsize;
+ Uint offset;
+ Uint size = binary_size(obj);
+ ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize);
+ if (size < MAX_SYSIOVEC_IOVLEN && bitoffs == 0 && bitsize == 0) {
+ Eterm *bptr = binary_val(real_bin);
+ if (thing_subtag(*bptr) == REFC_BINARY_SUBTAG) {
+ ProcBin *pb = (ProcBin *)bptr;
+ if (pb->flags)
+ erts_emasculate_writable_binary(pb);
+ iov[0].iov_base = pb->bytes+offset;
+ iov[0].iov_len = size;
+ if (driver)
+ binv[0] = (ErtsIOQBinary*)Binary2ErlDrvBinary(pb->val);
+ else
+ binv[0] = (ErtsIOQBinary*)pb->val;
+ return 1;
+ } else {
+ ErlHeapBin* hb = (ErlHeapBin *)bptr;
+ byte *buf = driver ? (byte*)cbin->driver.orig_bytes :
+ (byte*)cbin->nif.orig_bytes;
+ copy_binary_to_buffer(buf, 0, ((byte *) hb->data)+offset, 0, 8*size);
+ iov[0].iov_base = buf;
+ iov[0].iov_len = size;
+ binv[0] = cbin;
+ return 1;
+ }
+ }
+ }
+ return erts_ioq_iolist_to_vec(obj, iov, binv, cbin, bin_limit, driver);
+}
+
+#endif
+
+#endif /* ERL_IO_QUEUE_H */
diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c
index d3c5af3a83..0ebb2b6db0 100644
--- a/erts/emulator/beam/erl_nif.c
+++ b/erts/emulator/beam/erl_nif.c
@@ -56,6 +56,7 @@
#include "erl_process.h"
#include "erl_bif_unique.h"
#include "erl_utils.h"
+#include "erl_io_queue.h"
#undef ERTS_WANT_NFUNC_SCHED_INTERNALS__
#define ERTS_WANT_NFUNC_SCHED_INTERNALS__
#include "erl_nfunc_sched.h"
@@ -66,6 +67,13 @@
#include <limits.h>
#include <stddef.h> /* offsetof */
+#ifndef MAX
+#define MAX(X, Y) ((X) > (Y) ? (X) : (Y))
+#endif
+
+#ifndef MIN
+#define MIN(X, Y) ((X) < (Y) ? (X) : (Y))
+#endif
/* Information about a loaded nif library.
* Each successful call to erlang:load_nif will allocate an instance of
@@ -3344,6 +3352,363 @@ int enif_compare_monitors(const ErlNifMonitor *monitor1,
ERTS_REF_THING_SIZE*sizeof(Eterm));
}
+ErlNifIOQueue *enif_ioq_create(ErlNifIOQueueOpts opts)
+{
+ ErlNifIOQueue *q;
+
+ if (opts != ERL_NIF_IOQ_NORMAL)
+ return NULL;
+
+ q = enif_alloc(sizeof(ErlNifIOQueue));
+ if (!q) return NULL;
+ erts_ioq_init(q, ERTS_ALC_T_NIF, 0);
+
+ return q;
+}
+
+void enif_ioq_destroy(ErlNifIOQueue *q)
+{
+ erts_ioq_clear(q);
+ enif_free(q);
+}
+
+/* If the iovec was preallocated (Stack or otherwise) it needs to be marked as
+ * such to perform a proper free. */
+#define ERL_NIF_IOVEC_FLAGS_PREALLOC (1 << 0)
+
+void enif_free_iovec(ErlNifIOVec *iov)
+{
+ int i;
+ /* Decrement the refc of all the binaries */
+ for (i = 0; i < iov->iovcnt; i++) {
+ Binary *bptr = ((Binary**)iov->ref_bins)[i];
+ /* bptr can be null if enq_binary was used */
+ if (bptr && erts_refc_dectest(&bptr->intern.refc, 0) == 0) {
+ erts_bin_free(bptr);
+ }
+ }
+
+ if (!(iov->flags & ERL_NIF_IOVEC_FLAGS_PREALLOC)) {
+ enif_free(iov);
+ }
+}
+
+typedef struct {
+ UWord sublist_length;
+ Eterm sublist_start;
+ Eterm sublist_end;
+
+ UWord offheap_size;
+ UWord onheap_size;
+
+ UWord iovec_len;
+} iovec_slice_t;
+
+static int examine_iovec_term(Eterm list, UWord max_length, iovec_slice_t *result) {
+ Eterm lookahead;
+
+ result->sublist_start = list;
+ result->sublist_length = 0;
+ result->offheap_size = 0;
+ result->onheap_size = 0;
+ result->iovec_len = 0;
+
+ lookahead = result->sublist_start;
+
+ while (is_list(lookahead)) {
+ Eterm *binary_header, binary;
+ Eterm *cell;
+ UWord size;
+
+ cell = list_val(lookahead);
+ binary = CAR(cell);
+
+ if (!is_binary(binary)) {
+ return 0;
+ }
+
+ size = binary_size(binary);
+ binary_header = binary_val(binary);
+
+ /* If we're a sub-binary we'll need to check our underlying binary to
+ * determine whether we're on-heap or not. */
+ if(thing_subtag(*binary_header) == SUB_BINARY_SUBTAG) {
+ ErlSubBin *sb = (ErlSubBin*)binary_header;
+
+ /* Reject bitstrings */
+ if((sb->bitoffs + sb->bitsize) > 0) {
+ return 0;
+ }
+
+ ASSERT(size <= binary_size(sb->orig));
+ binary_header = binary_val(sb->orig);
+ }
+
+ if(thing_subtag(*binary_header) == HEAP_BINARY_SUBTAG) {
+ ASSERT(size <= ERL_ONHEAP_BIN_LIMIT);
+
+ result->iovec_len += 1;
+ result->onheap_size += size;
+ } else {
+ ASSERT(thing_subtag(*binary_header) == REFC_BINARY_SUBTAG);
+
+ result->iovec_len += 1 + size / MAX_SYSIOVEC_IOVLEN;
+ result->offheap_size += size;
+ }
+
+ result->sublist_length += 1;
+ lookahead = CDR(cell);
+
+ if(result->sublist_length >= max_length) {
+ break;
+ }
+ }
+
+ if (!is_nil(lookahead) && !is_list(lookahead)) {
+ return 0;
+ }
+
+ result->sublist_end = lookahead;
+
+ return 1;
+}
+
+static void inspect_raw_binary_data(Eterm binary, ErlNifBinary *result) {
+ Eterm *parent_header;
+ Eterm parent_binary;
+
+ int bit_offset, bit_size;
+ Uint byte_offset;
+
+ ASSERT(is_binary(binary));
+
+ ERTS_GET_REAL_BIN(binary, parent_binary, byte_offset, bit_offset, bit_size);
+
+ parent_header = binary_val(parent_binary);
+
+ result->size = binary_size(binary);
+ result->bin_term = binary;
+
+ if (thing_subtag(*parent_header) == REFC_BINARY_SUBTAG) {
+ ProcBin *pb = (ProcBin*)parent_header;
+
+ ASSERT(pb->val != NULL);
+ ASSERT(byte_offset < pb->size);
+ ASSERT(&pb->bytes[byte_offset] >= (byte*)(pb->val)->orig_bytes);
+
+ result->data = (unsigned char*)&pb->bytes[byte_offset];
+ result->ref_bin = (void*)pb->val;
+ } else {
+ ErlHeapBin *hb = (ErlHeapBin*)parent_header;
+
+ ASSERT(thing_subtag(*parent_header) == HEAP_BINARY_SUBTAG);
+
+ result->data = &((unsigned char*)&hb->data)[byte_offset];
+ result->ref_bin = NULL;
+ }
+}
+
+static int fill_iovec_with_slice(ErlNifEnv *env,
+ iovec_slice_t *slice,
+ ErlNifIOVec *iovec) {
+ UWord onheap_offset, iovec_idx;
+ ErlNifBinary onheap_data;
+ Eterm sublist_iterator;
+
+ /* Set up a common refc binary for all on-heap binaries. */
+ if (slice->onheap_size > 0) {
+ if (!enif_alloc_binary(slice->onheap_size, &onheap_data)) {
+ return 0;
+ }
+ }
+
+ sublist_iterator = slice->sublist_start;
+ onheap_offset = 0;
+ iovec_idx = 0;
+
+ while (sublist_iterator != slice->sublist_end) {
+ ErlNifBinary raw_data;
+ Eterm *cell;
+
+ cell = list_val(sublist_iterator);
+ inspect_raw_binary_data(CAR(cell), &raw_data);
+
+ /* If this isn't a refc binary, copy its contents to the onheap buffer
+ * and reference that instead. */
+ if (raw_data.ref_bin == NULL) {
+ ASSERT(onheap_offset < onheap_data.size);
+ ASSERT(slice->onheap_size > 0);
+
+ sys_memcpy(&onheap_data.data[onheap_offset],
+ raw_data.data, raw_data.size);
+
+ raw_data.data = &onheap_data.data[onheap_offset];
+ raw_data.ref_bin = onheap_data.ref_bin;
+ }
+
+ ASSERT(raw_data.ref_bin != NULL);
+
+ while (raw_data.size > 0) {
+ UWord chunk_len = MIN(raw_data.size, MAX_SYSIOVEC_IOVLEN);
+
+ ASSERT(iovec_idx < iovec->iovcnt);
+
+ iovec->iov[iovec_idx].iov_base = raw_data.data;
+ iovec->iov[iovec_idx].iov_len = chunk_len;
+
+ iovec->ref_bins[iovec_idx] = raw_data.ref_bin;
+
+ raw_data.data += chunk_len;
+ raw_data.size -= chunk_len;
+
+ iovec_idx += 1;
+ }
+
+ sublist_iterator = CDR(cell);
+ }
+
+ ASSERT(iovec_idx == iovec->iovcnt);
+
+ if (env == NULL) {
+ int i;
+ for (i = 0; i < iovec->iovcnt; i++) {
+ Binary *refc_binary = (Binary*)(iovec->ref_bins[i]);
+ erts_refc_inc(&refc_binary->intern.refc, 1);
+ }
+
+ if (slice->onheap_size > 0) {
+ /* Transfer ownership to the iovec; we've taken references to it in
+ * the above loop. */
+ enif_release_binary(&onheap_data);
+ }
+ } else {
+ if (slice->onheap_size > 0) {
+ /* Attach the binary to our environment and let the GC take care of
+ * it after returning. */
+ enif_make_binary(env, &onheap_data);
+ }
+ }
+
+ return 1;
+}
+
+static int create_iovec_from_slice(ErlNifEnv *env,
+ iovec_slice_t *slice,
+ ErlNifIOVec **result) {
+ ErlNifIOVec *iovec = *result;
+
+ if (iovec && slice->iovec_len < ERL_NIF_IOVEC_SIZE) {
+ iovec->iov = iovec->small_iov;
+ iovec->ref_bins = iovec->small_ref_bin;
+ iovec->flags = ERL_NIF_IOVEC_FLAGS_PREALLOC;
+ } else {
+ UWord iov_offset, binv_offset, alloc_size;
+ char *alloc_base;
+
+ iov_offset = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErlNifIOVec));
+ binv_offset = iov_offset;
+ binv_offset += ERTS_ALC_DATA_ALIGN_SIZE(slice->iovec_len * sizeof(SysIOVec));
+ alloc_size = binv_offset;
+ alloc_size += slice->iovec_len * sizeof(Binary*);
+
+ /* If we have an environment we'll attach the allocated data to it. The
+ * GC will take care of releasing it later on. */
+ if (env != NULL) {
+ ErlNifBinary gc_bin;
+
+ if (!enif_alloc_binary(alloc_size, &gc_bin)) {
+ return 0;
+ }
+
+ alloc_base = (char*)gc_bin.data;
+ enif_make_binary(env, &gc_bin);
+ } else {
+ alloc_base = enif_alloc(alloc_size);
+ }
+
+ iovec = (ErlNifIOVec*)alloc_base;
+ iovec->iov = (SysIOVec*)(alloc_base + iov_offset);
+ iovec->ref_bins = (void**)(alloc_base + binv_offset);
+ iovec->flags = 0;
+ }
+
+ iovec->size = slice->offheap_size + slice->onheap_size;
+ iovec->iovcnt = slice->iovec_len;
+
+ if(!fill_iovec_with_slice(env, slice, iovec)) {
+ if (env == NULL && !(iovec->flags & ERL_NIF_IOVEC_FLAGS_PREALLOC)) {
+ enif_free(iovec);
+ }
+
+ return 0;
+ }
+
+ *result = iovec;
+
+ return 1;
+}
+
+int enif_inspect_iovec(ErlNifEnv *env, size_t max_elements,
+ ERL_NIF_TERM list, ERL_NIF_TERM *tail,
+ ErlNifIOVec **iov) {
+ iovec_slice_t slice;
+
+ if(!examine_iovec_term(list, max_elements, &slice)) {
+ return 0;
+ } else if(!create_iovec_from_slice(env, &slice, iov)) {
+ return 0;
+ }
+
+ (*tail) = slice.sublist_end;
+
+ return 1;
+}
+
+/* */
+int enif_ioq_enqv(ErlNifIOQueue *q, ErlNifIOVec *iov, size_t skip)
+{
+ if(skip <= iov->size) {
+ return !erts_ioq_enqv(q, (ErtsIOVec*)iov, skip);
+ }
+
+ return 0;
+}
+
+int enif_ioq_enq_binary(ErlNifIOQueue *q, ErlNifBinary *bin, size_t skip)
+{
+ ErlNifIOVec vec = {1, bin->size, NULL, NULL, ERL_NIF_IOVEC_FLAGS_PREALLOC };
+ Binary *ref_bin = (Binary*)bin->ref_bin;
+ int res;
+ vec.iov = vec.small_iov;
+ vec.ref_bins = vec.small_ref_bin;
+ vec.iov[0].iov_base = bin->data;
+ vec.iov[0].iov_len = bin->size;
+ ((Binary**)(vec.ref_bins))[0] = ref_bin;
+
+ res = enif_ioq_enqv(q, &vec, skip);
+ enif_release_binary(bin);
+ return res;
+}
+
+size_t enif_ioq_size(ErlNifIOQueue *q)
+{
+ return erts_ioq_size(q);
+}
+
+int enif_ioq_deq(ErlNifIOQueue *q, size_t elems, size_t *size)
+{
+ if (erts_ioq_deq(q, elems) == -1)
+ return 0;
+ if (size)
+ *size = erts_ioq_size(q);
+ return 1;
+}
+
+SysIOVec *enif_ioq_peek(ErlNifIOQueue *q, int *iovlen)
+{
+ return erts_ioq_peekq(q, iovlen);
+}
+
/***************************************************************************
** load_nif/2 **
***************************************************************************/
diff --git a/erts/emulator/beam/erl_nif.h b/erts/emulator/beam/erl_nif.h
index b0d5c39798..d195721054 100644
--- a/erts/emulator/beam/erl_nif.h
+++ b/erts/emulator/beam/erl_nif.h
@@ -50,6 +50,7 @@
** 2.9: 18.2 enif_getenv
** 2.10: Time API
** 2.11: 19.0 enif_snprintf
+** 2.12: 20.0 add enif_queue
*/
#define ERL_NIF_MAJOR_VERSION 2
#define ERL_NIF_MINOR_VERSION 12
@@ -241,6 +242,28 @@ typedef enum {
ERL_NIF_PHASH2 = 2
} ErlNifHash;
+#define ERL_NIF_IOVEC_SIZE 16
+
+typedef struct erl_nif_io_vec {
+ int iovcnt; /* length of vectors */
+ size_t size; /* total size in bytes */
+ SysIOVec *iov;
+
+ /* internals (avert your eyes) */
+ void **ref_bins; /* Binary[] */
+ int flags;
+
+ /* Used when stack allocating the io vec */
+ SysIOVec small_iov[ERL_NIF_IOVEC_SIZE];
+ void *small_ref_bin[ERL_NIF_IOVEC_SIZE];
+} ErlNifIOVec;
+
+typedef struct erts_io_queue ErlNifIOQueue;
+
+typedef enum {
+ ERL_NIF_IOQ_NORMAL = 1
+} ErlNifIOQueueOpts;
+
/*
* Return values from enif_thread_type(). Negative values
* reserved for specific types of non-scheduler threads.
diff --git a/erts/emulator/beam/erl_nif_api_funcs.h b/erts/emulator/beam/erl_nif_api_funcs.h
index 94c04cd126..9e573307d8 100644
--- a/erts/emulator/beam/erl_nif_api_funcs.h
+++ b/erts/emulator/beam/erl_nif_api_funcs.h
@@ -184,6 +184,21 @@ ERL_NIF_API_FUNC_DECL(ErlNifUInt64,enif_hash,(ErlNifHash type, ERL_NIF_TERM term
ERL_NIF_API_FUNC_DECL(int, enif_whereis_pid, (ErlNifEnv *env, ERL_NIF_TERM name, ErlNifPid *pid));
ERL_NIF_API_FUNC_DECL(int, enif_whereis_port, (ErlNifEnv *env, ERL_NIF_TERM name, ErlNifPort *port));
+ERL_NIF_API_FUNC_DECL(ErlNifIOQueue *,enif_ioq_create,(ErlNifIOQueueOpts opts));
+ERL_NIF_API_FUNC_DECL(void,enif_ioq_destroy,(ErlNifIOQueue *q));
+
+ERL_NIF_API_FUNC_DECL(int,enif_ioq_enq_binary,(ErlNifIOQueue *q, ErlNifBinary *bin, size_t skip));
+ERL_NIF_API_FUNC_DECL(int,enif_ioq_enqv,(ErlNifIOQueue *q, ErlNifIOVec *iov, size_t skip));
+
+ERL_NIF_API_FUNC_DECL(size_t,enif_ioq_size,(ErlNifIOQueue *q));
+ERL_NIF_API_FUNC_DECL(int,enif_ioq_deq,(ErlNifIOQueue *q, size_t count, size_t *size));
+
+ERL_NIF_API_FUNC_DECL(SysIOVec*,enif_ioq_peek,(ErlNifIOQueue *q, int *iovlen));
+
+ERL_NIF_API_FUNC_DECL(int,enif_inspect_iovec,(ErlNifEnv *env, size_t max_length, ERL_NIF_TERM iovec_term, ERL_NIF_TERM *tail, ErlNifIOVec **iovec));
+ERL_NIF_API_FUNC_DECL(void,enif_free_iovec,(ErlNifIOVec *iov));
+
+
/*
** ADD NEW ENTRIES HERE (before this comment) !!!
*/
@@ -348,6 +363,16 @@ ERL_NIF_API_FUNC_DECL(int, enif_whereis_port, (ErlNifEnv *env, ERL_NIF_TERM name
# define enif_hash ERL_NIF_API_FUNC_MACRO(enif_hash)
# define enif_whereis_pid ERL_NIF_API_FUNC_MACRO(enif_whereis_pid)
# define enif_whereis_port ERL_NIF_API_FUNC_MACRO(enif_whereis_port)
+# define enif_ioq_create ERL_NIF_API_FUNC_MACRO(enif_ioq_create)
+# define enif_ioq_destroy ERL_NIF_API_FUNC_MACRO(enif_ioq_destroy)
+# define enif_ioq_enq ERL_NIF_API_FUNC_MACRO(enif_ioq_enq)
+# define enif_ioq_enq_binary ERL_NIF_API_FUNC_MACRO(enif_ioq_enq_binary)
+# define enif_ioq_enqv ERL_NIF_API_FUNC_MACRO(enif_ioq_enqv)
+# define enif_ioq_size ERL_NIF_API_FUNC_MACRO(enif_ioq_size)
+# define enif_ioq_deq ERL_NIF_API_FUNC_MACRO(enif_ioq_deq)
+# define enif_ioq_peek ERL_NIF_API_FUNC_MACRO(enif_ioq_peek)
+# define enif_inspect_iovec ERL_NIF_API_FUNC_MACRO(enif_inspect_iovec)
+# define enif_free_iovec ERL_NIF_API_FUNC_MACRO(enif_free_iovec)
/*
** ADD NEW ENTRIES HERE (before this comment)
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index 6a3213ec52..b64de624dd 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -31,6 +31,9 @@ typedef struct ErtsProc2PortSigData_ ErtsProc2PortSigData;
#include "erl_ptab.h"
#include "erl_thr_progress.h"
#include "erl_trace.h"
+#define ERTS_IO_QUEUE_TYPES_ONLY__
+#include "erl_io_queue.h"
+#undef ERTS_IO_QUEUE_TYPES_ONLY__
#ifndef __WIN32__
#define ERTS_DEFAULT_MAX_PORTS (1 << 16)
@@ -75,23 +78,8 @@ typedef struct erts_driver_t_ erts_driver_t;
#define ERTS_Port2ErlDrvPort(PH) ((ErlDrvPort) (PH))
#endif
-#define SMALL_IO_QUEUE 5 /* Number of fixed elements */
+typedef ErtsIOQueue ErlPortIOQueue;
-typedef struct {
- ErlDrvSizeT size; /* total size in bytes */
-
- SysIOVec* v_start;
- SysIOVec* v_end;
- SysIOVec* v_head;
- SysIOVec* v_tail;
- SysIOVec v_small[SMALL_IO_QUEUE];
-
- ErlDrvBinary** b_start;
- ErlDrvBinary** b_end;
- ErlDrvBinary** b_head;
- ErlDrvBinary** b_tail;
- ErlDrvBinary* b_small[SMALL_IO_QUEUE];
-} ErlIOQueue;
typedef struct line_buf { /* Buffer used in line oriented I/O */
ErlDrvSizeT bufsiz; /* Size of character buffer */
@@ -172,7 +160,7 @@ struct _erl_drv_port {
Uint bytes_in; /* Number of bytes read */
Uint bytes_out; /* Number of bytes written */
- ErlIOQueue ioq; /* driver accessible i/o queue */
+ ErlPortIOQueue ioq; /* driver accessible i/o queue */
DistEntry *dist_entry; /* Dist entry used in DISTRIBUTION */
char *name; /* String used in the open */
erts_driver_t* drv_ptr;
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index b609f6de39..3a5ddde5f4 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -52,6 +52,7 @@
#include "erl_bif_unique.h"
#include "erl_hl_timer.h"
#include "erl_time.h"
+#include "erl_io_queue.h"
extern ErlDrvEntry fd_driver_entry;
extern ErlDrvEntry vanilla_driver_entry;
@@ -108,7 +109,7 @@ static void driver_monitor_unlock_pdl(Port *p);
#define ERL_SMALL_IO_BIN_LIMIT (4*ERL_ONHEAP_BIN_LIMIT)
#define SMALL_WRITE_VEC 16
-static ERTS_INLINE ErlIOQueue*
+static ERTS_INLINE ErlPortIOQueue*
drvport2ioq(ErlDrvPort drvport)
{
Port *prt = erts_thr_drvport2port(drvport, 0);
@@ -123,11 +124,11 @@ is_port_ioq_empty(Port *pp)
int res;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
if (!pp->port_data_lock)
- res = (pp->ioq.size == 0);
+ res = (erts_ioq_size(&pp->ioq) == 0);
else {
ErlDrvPDL pdl = pp->port_data_lock;
erts_mtx_lock(&pdl->mtx);
- res = (pp->ioq.size == 0);
+ res = (erts_ioq_size(&pp->ioq) == 0);
erts_mtx_unlock(&pdl->mtx);
}
return res;
@@ -142,14 +143,14 @@ erts_is_port_ioq_empty(Port *pp)
Uint
erts_port_ioq_size(Port *pp)
{
- int res;
+ ErlDrvSizeT res;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
if (!pp->port_data_lock)
- res = pp->ioq.size;
+ res = erts_ioq_size(&pp->ioq);
else {
ErlDrvPDL pdl = pp->port_data_lock;
erts_mtx_lock(&pdl->mtx);
- res = pp->ioq.size;
+ res = erts_ioq_size(&pp->ioq);
erts_mtx_unlock(&pdl->mtx);
}
return (Uint) res;
@@ -508,41 +509,17 @@ erts_port_free(Port *prt)
*/
static void initq(Port* prt)
{
- ErlIOQueue* q = &prt->ioq;
-
ERTS_LC_ASSERT(!prt->port_data_lock);
-
- q->size = 0;
- q->v_head = q->v_tail = q->v_start = q->v_small;
- q->v_end = q->v_small + SMALL_IO_QUEUE;
- q->b_head = q->b_tail = q->b_start = q->b_small;
- q->b_end = q->b_small + SMALL_IO_QUEUE;
+ erts_ioq_init(&prt->ioq, ERTS_ALC_T_IOQ, 1);
}
static void stopq(Port* prt)
{
- ErlIOQueue* q;
- ErlDrvBinary** binp;
if (prt->port_data_lock)
driver_pdl_lock(prt->port_data_lock);
- q = &prt->ioq;
- binp = q->b_head;
-
- if (q->v_start != q->v_small)
- erts_free(ERTS_ALC_T_IOQ, (void *) q->v_start);
-
- while(binp < q->b_tail) {
- if (*binp != NULL)
- driver_free_binary(*binp);
- binp++;
- }
- if (q->b_start != q->b_small)
- erts_free(ERTS_ALC_T_IOQ, (void *) q->b_start);
- q->v_start = q->v_end = q->v_head = q->v_tail = NULL;
- q->b_start = q->b_end = q->b_head = q->b_tail = NULL;
- q->size = 0;
+ erts_ioq_clear(&prt->ioq);
if (prt->port_data_lock) {
driver_pdl_unlock(prt->port_data_lock);
@@ -923,311 +900,6 @@ int erts_port_handle_xports(Port *prt)
}
#endif
-/* Fills a possibly deep list of chars and binaries into vec
-** Small characters are first stored in the buffer buf of length ln
-** binaries found are copied and linked into msoh
-** Return vector length on succsess,
-** -1 on overflow
-** -2 on type error
-*/
-
-#ifdef DEBUG
-#define MAX_SYSIOVEC_IOVLEN (1ull << (32 - 1))
-#else
-#define MAX_SYSIOVEC_IOVLEN (1ull << (sizeof(((SysIOVec*)0)->iov_len) * 8 - 1))
-#endif
-
-static ERTS_INLINE void
-io_list_to_vec_set_vec(SysIOVec **iov, ErlDrvBinary ***binv,
- ErlDrvBinary *bin, byte *ptr, Uint len,
- int *vlen)
-{
- while (len > MAX_SYSIOVEC_IOVLEN) {
- (*iov)->iov_base = ptr;
- (*iov)->iov_len = MAX_SYSIOVEC_IOVLEN;
- ptr += MAX_SYSIOVEC_IOVLEN;
- len -= MAX_SYSIOVEC_IOVLEN;
- (*iov)++;
- (*vlen)++;
- *(*binv)++ = bin;
- }
- (*iov)->iov_base = ptr;
- (*iov)->iov_len = len;
- *(*binv)++ = bin;
- (*iov)++;
- (*vlen)++;
-}
-
-static int
-io_list_to_vec(Eterm obj, /* io-list */
- SysIOVec* iov, /* io vector */
- ErlDrvBinary** binv, /* binary reference vector */
- ErlDrvBinary* cbin, /* binary to store characters */
- ErlDrvSizeT bin_limit) /* small binaries limit */
-{
- DECLARE_ESTACK(s);
- Eterm* objp;
- byte *buf = (byte*)cbin->orig_bytes;
- Uint len = cbin->orig_size;
- Uint csize = 0;
- int vlen = 0;
- byte* cptr = buf;
-
- goto L_jump_start; /* avoid push */
-
- while (!ESTACK_ISEMPTY(s)) {
- obj = ESTACK_POP(s);
- L_jump_start:
- if (is_list(obj)) {
- L_iter_list:
- objp = list_val(obj);
- obj = CAR(objp);
- if (is_byte(obj)) {
- if (len == 0)
- goto L_overflow;
- *buf++ = unsigned_val(obj);
- csize++;
- len--;
- } else if (is_binary(obj)) {
- ESTACK_PUSH(s, CDR(objp));
- goto handle_binary;
- } else if (is_list(obj)) {
- ESTACK_PUSH(s, CDR(objp));
- goto L_iter_list; /* on head */
- } else if (!is_nil(obj)) {
- goto L_type_error;
- }
- obj = CDR(objp);
- if (is_list(obj))
- goto L_iter_list; /* on tail */
- else if (is_binary(obj)) {
- goto handle_binary;
- } else if (!is_nil(obj)) {
- goto L_type_error;
- }
- } else if (is_binary(obj)) {
- Eterm real_bin;
- Uint offset;
- Eterm* bptr;
- ErlDrvSizeT size;
- int bitoffs;
- int bitsize;
-
- handle_binary:
- size = binary_size(obj);
- ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize);
- ASSERT(bitsize == 0);
- bptr = binary_val(real_bin);
- if (*bptr == HEADER_PROC_BIN) {
- ProcBin* pb = (ProcBin *) bptr;
- if (bitoffs != 0) {
- if (len < size) {
- goto L_overflow;
- }
- erts_copy_bits(pb->bytes+offset, bitoffs, 1,
- (byte *) buf, 0, 1, size*8);
- csize += size;
- buf += size;
- len -= size;
- } else if (bin_limit && size < bin_limit) {
- if (len < size) {
- goto L_overflow;
- }
- sys_memcpy(buf, pb->bytes+offset, size);
- csize += size;
- buf += size;
- len -= size;
- } else {
- if (csize != 0) {
- io_list_to_vec_set_vec(&iov, &binv, cbin,
- cptr, csize, &vlen);
- cptr = buf;
- csize = 0;
- }
- if (pb->flags) {
- erts_emasculate_writable_binary(pb);
- }
- io_list_to_vec_set_vec(
- &iov, &binv, Binary2ErlDrvBinary(pb->val),
- pb->bytes+offset, size, &vlen);
- }
- } else {
- ErlHeapBin* hb = (ErlHeapBin *) bptr;
- if (len < size) {
- goto L_overflow;
- }
- copy_binary_to_buffer(buf, 0,
- ((byte *) hb->data)+offset, bitoffs,
- 8*size);
- csize += size;
- buf += size;
- len -= size;
- }
- } else if (!is_nil(obj)) {
- goto L_type_error;
- }
- }
-
- if (csize != 0) {
- io_list_to_vec_set_vec(&iov, &binv, cbin, cptr, csize, &vlen);
- }
-
- DESTROY_ESTACK(s);
- return vlen;
-
- L_type_error:
- DESTROY_ESTACK(s);
- return -2;
-
- L_overflow:
- DESTROY_ESTACK(s);
- return -1;
-}
-
-#define IO_LIST_VEC_COUNT(obj) \
-do { \
- Uint _size = binary_size(obj); \
- Eterm _real; \
- ERTS_DECLARE_DUMMY(Uint _offset); \
- int _bitoffs; \
- int _bitsize; \
- ERTS_GET_REAL_BIN(obj, _real, _offset, _bitoffs, _bitsize); \
- if (_bitsize != 0) goto L_type_error; \
- if (thing_subtag(*binary_val(_real)) == REFC_BINARY_SUBTAG && \
- _bitoffs == 0) { \
- b_size += _size; \
- if (b_size < _size) goto L_overflow_error; \
- in_clist = 0; \
- v_size++; \
- /* If iov_len is smaller then Uint we split the binary into*/ \
- /* multiple smaller (2GB) elements in the iolist.*/ \
- v_size += _size / MAX_SYSIOVEC_IOVLEN; \
- if (_size >= ERL_SMALL_IO_BIN_LIMIT) { \
- p_in_clist = 0; \
- p_v_size++; \
- } else { \
- p_c_size += _size; \
- if (!p_in_clist) { \
- p_in_clist = 1; \
- p_v_size++; \
- } \
- } \
- } else { \
- c_size += _size; \
- if (c_size < _size) goto L_overflow_error; \
- if (!in_clist) { \
- in_clist = 1; \
- v_size++; \
- } \
- p_c_size += _size; \
- if (!p_in_clist) { \
- p_in_clist = 1; \
- p_v_size++; \
- } \
- } \
-} while (0)
-
-
-/*
- * Returns 0 if successful and a non-zero value otherwise.
- *
- * Return values through pointers:
- * *vsize - SysIOVec size needed for a writev
- * *csize - Number of bytes not in binary (in the common binary)
- * *pvsize - SysIOVec size needed if packing small binaries
- * *pcsize - Number of bytes in the common binary if packing
- * *total_size - Total size of iolist in bytes
- */
-
-static int
-io_list_vec_len(Eterm obj, int* vsize, Uint* csize,
- Uint* pvsize, Uint* pcsize,
- ErlDrvSizeT* total_size)
-{
- DECLARE_ESTACK(s);
- Eterm* objp;
- Uint v_size = 0;
- Uint c_size = 0;
- Uint b_size = 0;
- Uint in_clist = 0;
- Uint p_v_size = 0;
- Uint p_c_size = 0;
- Uint p_in_clist = 0;
- Uint total;
-
- goto L_jump_start; /* avoid a push */
-
- while (!ESTACK_ISEMPTY(s)) {
- obj = ESTACK_POP(s);
- L_jump_start:
- if (is_list(obj)) {
- L_iter_list:
- objp = list_val(obj);
- obj = CAR(objp);
-
- if (is_byte(obj)) {
- c_size++;
- if (c_size == 0) {
- goto L_overflow_error;
- }
- if (!in_clist) {
- in_clist = 1;
- v_size++;
- }
- p_c_size++;
- if (!p_in_clist) {
- p_in_clist = 1;
- p_v_size++;
- }
- }
- else if (is_binary(obj)) {
- IO_LIST_VEC_COUNT(obj);
- }
- else if (is_list(obj)) {
- ESTACK_PUSH(s, CDR(objp));
- goto L_iter_list; /* on head */
- }
- else if (!is_nil(obj)) {
- goto L_type_error;
- }
-
- obj = CDR(objp);
- if (is_list(obj))
- goto L_iter_list; /* on tail */
- else if (is_binary(obj)) { /* binary tail is OK */
- IO_LIST_VEC_COUNT(obj);
- }
- else if (!is_nil(obj)) {
- goto L_type_error;
- }
- }
- else if (is_binary(obj)) {
- IO_LIST_VEC_COUNT(obj);
- }
- else if (!is_nil(obj)) {
- goto L_type_error;
- }
- }
-
- total = c_size + b_size;
- if (total < c_size) {
- goto L_overflow_error;
- }
- *total_size = (ErlDrvSizeT) total;
-
- DESTROY_ESTACK(s);
- *vsize = v_size;
- *csize = c_size;
- *pvsize = p_v_size;
- *pcsize = p_c_size;
- return 0;
-
- L_type_error:
- L_overflow_error:
- DESTROY_ESTACK(s);
- return 1;
-}
-
typedef enum {
ERTS_TRY_IMM_DRV_CALL_OK,
ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK,
@@ -1797,8 +1469,7 @@ cleanup_scheduled_outputv(ErlIOVec *ev, ErlDrvBinary *cbinp)
int i;
/* Need to free all binaries */
for (i = 1; i < ev->vsize; i++)
- if (ev->binv[i])
- driver_free_binary(ev->binv[i]);
+ driver_free_binary(ev->binv[i]);
if (cbinp)
driver_free_binary(cbinp);
}
@@ -1966,15 +1637,14 @@ erts_port_output_async(Port *prt, Eterm from, Eterm list)
size_t size;
int task_flags;
ErtsProc2PortSigCallback port_sig_callback;
- ErlDrvBinary *cbin = NULL;
- ErlIOVec *evp = NULL;
+ ErtsIOQBinary *cbin = NULL;
+ ErtsIOVec *evp = NULL;
char *buf = NULL;
ErtsPortTaskHandle *ns_pthp;
if (drv->outputv) {
- ErlIOVec ev;
SysIOVec* ivp;
- ErlDrvBinary** bvp;
+ ErtsIOQBinary** bvp;
int vsize;
Uint csize;
Uint pvsize;
@@ -1984,91 +1654,63 @@ erts_port_output_async(Port *prt, Eterm from, Eterm list)
char *ptr;
int i;
- Eterm* bptr = NULL;
- Uint offset;
-
- if (is_binary(list)) {
- /* We optimize for when we get a procbin without offset */
- Eterm real_bin;
- int bitoffs;
- int bitsize;
- ERTS_GET_REAL_BIN(list, real_bin, offset, bitoffs, bitsize);
- bptr = binary_val(real_bin);
- if (*bptr == HEADER_PROC_BIN && bitoffs == 0) {
- size = binary_size(list);
- vsize = 1;
- } else
- bptr = NULL;
- }
-
- if (!bptr) {
- if (io_list_vec_len(list, &vsize, &csize, &pvsize, &pcsize, &size))
- goto bad_value;
+ if (erts_ioq_iodata_vec_len(list, &vsize, &csize, &pvsize, &pcsize,
+ &size, ERL_SMALL_IO_BIN_LIMIT))
+ goto bad_value;
- /* To pack or not to pack (small binaries) ...? */
- if (vsize >= SMALL_WRITE_VEC) {
- /* Do pack */
- vsize = pvsize + 1;
- csize = pcsize;
- blimit = ERL_SMALL_IO_BIN_LIMIT;
- }
- cbin = driver_alloc_binary(csize);
+ /* To pack or not to pack (small binaries) ...? */
+ if (vsize >= SMALL_WRITE_VEC) {
+ /* Do pack */
+ vsize = pvsize + 1;
+ csize = pcsize;
+ blimit = ERL_SMALL_IO_BIN_LIMIT;
+ }
+ if (csize) {
+ cbin = (ErtsIOQBinary *)driver_alloc_binary(csize);
if (!cbin)
erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));
}
-
iov_offset = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErlIOVec));
binv_offset = iov_offset;
binv_offset += ERTS_ALC_DATA_ALIGN_SIZE((vsize+1)*sizeof(SysIOVec));
alloc_size = binv_offset;
- alloc_size += (vsize+1)*sizeof(ErlDrvBinary *);
+ alloc_size += (vsize+1)*sizeof(ErtsIOQBinary *);
sigdp = erts_port_task_alloc_p2p_sig_data_extra(alloc_size, (void**)&ptr);
- evp = (ErlIOVec *) ptr;
- ivp = evp->iov = (SysIOVec *) (ptr + iov_offset);
- bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset);
+ evp = (ErtsIOVec *) ptr;
+ ivp = evp->driver.iov = (SysIOVec *) (ptr + iov_offset);
+ bvp = evp->common.binv = (ErtsIOQBinary **) (ptr + binv_offset);
ivp[0].iov_base = NULL;
ivp[0].iov_len = 0;
bvp[0] = NULL;
- if (bptr) {
- ProcBin* pb = (ProcBin *) bptr;
-
- ivp[1].iov_base = pb->bytes+offset;
- ivp[1].iov_len = size;
- bvp[1] = Binary2ErlDrvBinary(pb->val);
-
- evp->vsize = 1;
- } else {
-
- evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
- if (evp->vsize < 0) {
- if (evp != &ev)
- erts_free(ERTS_ALC_T_DRV_CMD_DATA, evp);
- driver_free_binary(cbin);
- goto bad_value;
- }
+ evp->driver.vsize = erts_ioq_iodata_to_vec(list, ivp+1, bvp+1, cbin,
+ blimit, 1);
+ if (evp->driver.vsize < 0) {
+ erts_free(ERTS_ALC_T_DRV_CMD_DATA, evp);
+ driver_free_binary(&cbin->driver);
+ goto bad_value;
}
#if 0
/* This assertion may say something useful, but it can
be falsified during the emulator test suites. */
ASSERT(evp->vsize == vsize);
#endif
- evp->vsize++;
- evp->size = size; /* total size */
+ evp->driver.vsize++;
+ evp->driver.size = size; /* total size */
/* Need to increase refc on all binaries */
- for (i = 1; i < evp->vsize; i++)
+ for (i = 1; i < evp->driver.vsize; i++)
if (bvp[i])
- driver_binary_inc_refc(bvp[i]);
+ driver_binary_inc_refc(&bvp[i]->driver);
sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
sigdp->u.outputv.from = from;
- sigdp->u.outputv.evp = evp;
- sigdp->u.outputv.cbinp = cbin;
+ sigdp->u.outputv.evp = &evp->driver;
+ sigdp->u.outputv.cbinp = &cbin->driver;
port_sig_callback = port_sig_outputv;
} else {
ErlDrvSizeT ERTS_DECLARE_DUMMY(r);
@@ -2139,8 +1781,8 @@ erts_port_output(Process *c_p,
erts_aint32_t sched_flags, busy_flgs, invalid_flags;
int task_flags;
ErtsProc2PortSigCallback port_sig_callback;
- ErlDrvBinary *cbin = NULL;
- ErlIOVec *evp = NULL;
+ ErtsIOQBinary *cbin = NULL;
+ ErtsIOVec *evp = NULL;
char *buf = NULL;
int force_immediate_call = (flags & ERTS_PORT_SIG_FLG_FORCE_IMM_CALL);
int async_nosuspend;
@@ -2186,11 +1828,11 @@ erts_port_output(Process *c_p,
}
#endif
if (drv->outputv) {
- ErlIOVec ev;
+ ErtsIOVec ev;
SysIOVec iv[SMALL_WRITE_VEC];
- ErlDrvBinary* bv[SMALL_WRITE_VEC];
+ ErtsIOQBinary* bv[SMALL_WRITE_VEC];
SysIOVec* ivp;
- ErlDrvBinary** bvp;
+ ErtsIOQBinary** bvp;
int vsize;
Uint csize;
Uint pvsize;
@@ -2198,18 +1840,19 @@ erts_port_output(Process *c_p,
Uint blimit;
size_t iov_offset, binv_offset, alloc_size;
- if (io_list_vec_len(list, &vsize, &csize, &pvsize, &pcsize, &size))
+ if (erts_ioq_iodata_vec_len(list, &vsize, &csize, &pvsize, &pcsize,
+ &size, ERL_SMALL_IO_BIN_LIMIT))
goto bad_value;
iov_offset = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErlIOVec));
binv_offset = iov_offset;
binv_offset += ERTS_ALC_DATA_ALIGN_SIZE((vsize+1)*sizeof(SysIOVec));
alloc_size = binv_offset;
- alloc_size += (vsize+1)*sizeof(ErlDrvBinary *);
+ alloc_size += (vsize+1)*sizeof(ErtsIOQBinary *);
if (try_call && vsize < SMALL_WRITE_VEC) {
- ivp = ev.iov = iv;
- bvp = ev.binv = bv;
+ ivp = ev.common.iov = iv;
+ bvp = ev.common.binv = bv;
evp = &ev;
}
else {
@@ -2220,9 +1863,9 @@ erts_port_output(Process *c_p,
sigdp = erts_port_task_alloc_p2p_sig_data_extra(
alloc_size, (void**)&ptr);
}
- evp = (ErlIOVec *) ptr;
- ivp = evp->iov = (SysIOVec *) (ptr + iov_offset);
- bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset);
+ evp = (ErtsIOVec *) ptr;
+ ivp = evp->driver.iov = (SysIOVec *) (ptr + iov_offset);
+ bvp = evp->common.binv = (ErtsIOQBinary **) (ptr + binv_offset);
}
/* To pack or not to pack (small binaries) ...? */
@@ -2238,23 +1881,26 @@ erts_port_output(Process *c_p,
}
/* Use vsize and csize from now on */
- cbin = driver_alloc_binary(csize);
- if (!cbin)
- erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));
+ if (csize) {
+ cbin = (ErtsIOQBinary *)driver_alloc_binary(csize);
+ if (!cbin)
+ erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));
+ }
/* Element 0 is for driver usage to add header block */
ivp[0].iov_base = NULL;
ivp[0].iov_len = 0;
bvp[0] = NULL;
- evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
- if (evp->vsize < 0) {
+ evp->driver.vsize = erts_ioq_iodata_to_vec(list, ivp+1, bvp+1,
+ cbin, blimit, 1);
+ if (evp->driver.vsize < 0) {
if (evp != &ev) {
if (try_call)
erts_free(ERTS_ALC_T_TMP, evp);
else
erts_port_task_free_p2p_sig_data(sigdp);
}
- driver_free_binary(cbin);
+ driver_free_binary(&cbin->driver);
goto bad_value;
}
#if 0
@@ -2262,19 +1908,19 @@ erts_port_output(Process *c_p,
be falsified during the emulator test suites. */
ASSERT(evp->vsize == vsize);
#endif
- evp->vsize++;
- evp->size = size; /* total size */
+ evp->driver.vsize++;
+ evp->driver.size = size; /* total size */
if (!try_call) {
int i;
/* Need to increase refc on all binaries */
- for (i = 1; i < evp->vsize; i++)
- if (bvp[i])
- driver_binary_inc_refc(bvp[i]);
+ for (i = 1; i < evp->driver.vsize; i++)
+ if (bvp[i])
+ driver_binary_inc_refc(&bvp[i]->driver);
}
else {
int i;
- ErlIOVec *new_evp;
+ ErtsIOVec *new_evp;
ErtsTryImmDrvCallResult try_call_res;
ErtsTryImmDrvCallState try_call_state
= ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
@@ -2297,14 +1943,14 @@ erts_port_output(Process *c_p,
from,
prt,
drv,
- evp);
+ &evp->driver);
if (force_immediate_call)
finalize_force_imm_drv_call(&try_call_state);
else
finalize_imm_drv_call(&try_call_state);
/* Fall through... */
case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
- driver_free_binary(cbin);
+ driver_free_binary(&cbin->driver);
if (evp != &ev) {
ASSERT(!sigdp);
erts_free(ERTS_ALC_T_TMP, evp);
@@ -2318,7 +1964,7 @@ erts_port_output(Process *c_p,
sched_flags = try_call_state.sched_flags;
if (async_nosuspend
&& (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))) {
- driver_free_binary(cbin);
+ driver_free_binary(&cbin->driver);
if (evp != &ev) {
ASSERT(!sigdp);
erts_free(ERTS_ALC_T_TMP, evp);
@@ -2333,9 +1979,9 @@ erts_port_output(Process *c_p,
}
/* Need to increase refc on all binaries */
- for (i = 1; i < evp->vsize; i++)
+ for (i = 1; i < evp->driver.vsize; i++)
if (bvp[i])
- driver_binary_inc_refc(bvp[i]);
+ driver_binary_inc_refc(&bvp[i]->driver);
/* The port task and iovec is allocated in the
same structure as an optimization. This
@@ -2348,18 +1994,18 @@ erts_port_output(Process *c_p,
if (evp != &ev) {
/* Copy from TMP alloc to port task */
sys_memcpy((void *) new_evp, (void *) evp, alloc_size);
- new_evp->iov = (SysIOVec *) (((char *) new_evp)
- + iov_offset);
- bvp = new_evp->binv = (ErlDrvBinary **) (((char *) new_evp)
- + binv_offset);
+ new_evp->driver.iov = (SysIOVec *) (((char *) new_evp)
+ + iov_offset);
+ bvp = new_evp->common.binv = (ErtsIOQBinary **) (((char *) new_evp)
+ + binv_offset);
#ifdef DEBUG
- ASSERT(new_evp->vsize == evp->vsize);
- ASSERT(new_evp->size == evp->size);
- for (i = 0; i < evp->vsize; i++) {
- ASSERT(new_evp->iov[i].iov_len == evp->iov[i].iov_len);
- ASSERT(new_evp->iov[i].iov_base == evp->iov[i].iov_base);
- ASSERT(new_evp->binv[i] == evp->binv[i]);
+ ASSERT(new_evp->driver.vsize == evp->driver.vsize);
+ ASSERT(new_evp->driver.size == evp->driver.size);
+ for (i = 0; i < evp->driver.vsize; i++) {
+ ASSERT(new_evp->driver.iov[i].iov_len == evp->driver.iov[i].iov_len);
+ ASSERT(new_evp->driver.iov[i].iov_base == evp->driver.iov[i].iov_base);
+ ASSERT(new_evp->driver.binv[i] == evp->driver.binv[i]);
}
#endif
@@ -2368,24 +2014,24 @@ erts_port_output(Process *c_p,
else { /* from stack allocated structure; offsets may differ */
sys_memcpy((void *) new_evp, (void *) evp, sizeof(ErlIOVec));
- new_evp->iov = (SysIOVec *) (((char *) new_evp)
- + iov_offset);
- sys_memcpy((void *) new_evp->iov,
- (void *) evp->iov,
- evp->vsize * sizeof(SysIOVec));
- new_evp->binv = (ErlDrvBinary **) (((char *) new_evp)
- + binv_offset);
- sys_memcpy((void *) new_evp->binv,
- (void *) evp->binv,
- evp->vsize * sizeof(ErlDrvBinary *));
+ new_evp->driver.iov = (SysIOVec *) (((char *) new_evp)
+ + iov_offset);
+ sys_memcpy((void *) new_evp->driver.iov,
+ (void *) evp->driver.iov,
+ evp->driver.vsize * sizeof(SysIOVec));
+ new_evp->common.binv = (ErtsIOQBinary **) (((char *) new_evp)
+ + binv_offset);
+ sys_memcpy((void *) new_evp->common.binv,
+ (void *) evp->common.binv,
+ evp->driver.vsize * sizeof(ErtsIOQBinary *));
#ifdef DEBUG
- ASSERT(new_evp->vsize == evp->vsize);
- ASSERT(new_evp->size == evp->size);
- for (i = 0; i < evp->vsize; i++) {
- ASSERT(new_evp->iov[i].iov_len == evp->iov[i].iov_len);
- ASSERT(new_evp->iov[i].iov_base == evp->iov[i].iov_base);
- ASSERT(new_evp->binv[i] == evp->binv[i]);
+ ASSERT(new_evp->driver.vsize == evp->driver.vsize);
+ ASSERT(new_evp->driver.size == evp->driver.size);
+ for (i = 0; i < evp->driver.vsize; i++) {
+ ASSERT(new_evp->driver.iov[i].iov_len == evp->driver.iov[i].iov_len);
+ ASSERT(new_evp->driver.iov[i].iov_base == evp->driver.iov[i].iov_base);
+ ASSERT(new_evp->driver.binv[i] == evp->driver.binv[i]);
}
#endif
@@ -2396,8 +2042,8 @@ erts_port_output(Process *c_p,
sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
sigdp->u.outputv.from = from;
- sigdp->u.outputv.evp = evp;
- sigdp->u.outputv.cbinp = cbin;
+ sigdp->u.outputv.evp = &evp->driver;
+ sigdp->u.outputv.cbinp = &cbin->driver;
port_sig_callback = port_sig_outputv;
}
else {
@@ -7154,307 +6800,51 @@ driver_pdl_dec_refc(ErlDrvPDL pdl)
return refc;
}
-/* expand queue to hold n elements in tail or head */
-static int expandq(ErlIOQueue* q, int n, int tail)
-/* tail: 0 if make room in head, make room in tail otherwise */
-{
- int h_sz; /* room before header */
- int t_sz; /* room after tail */
- int q_sz; /* occupied */
- int nvsz;
- SysIOVec* niov;
- ErlDrvBinary** nbinv;
-
- h_sz = q->v_head - q->v_start;
- t_sz = q->v_end - q->v_tail;
- q_sz = q->v_tail - q->v_head;
-
- if (tail && (n <= t_sz)) /* do we need to expand tail? */
- return 0;
- else if (!tail && (n <= h_sz)) /* do we need to expand head? */
- return 0;
- else if (n > (h_sz + t_sz)) { /* need to allocate */
- /* we may get little extra but it ok */
- nvsz = (q->v_end - q->v_start) + n;
-
- niov = erts_alloc_fnf(ERTS_ALC_T_IOQ, nvsz * sizeof(SysIOVec));
- if (!niov)
- return -1;
- nbinv = erts_alloc_fnf(ERTS_ALC_T_IOQ, nvsz * sizeof(ErlDrvBinary**));
- if (!nbinv) {
- erts_free(ERTS_ALC_T_IOQ, (void *) niov);
- return -1;
- }
- if (tail) {
- sys_memcpy(niov, q->v_head, q_sz*sizeof(SysIOVec));
- if (q->v_start != q->v_small)
- erts_free(ERTS_ALC_T_IOQ, (void *) q->v_start);
- q->v_start = niov;
- q->v_end = niov + nvsz;
- q->v_head = q->v_start;
- q->v_tail = q->v_head + q_sz;
-
- sys_memcpy(nbinv, q->b_head, q_sz*sizeof(ErlDrvBinary*));
- if (q->b_start != q->b_small)
- erts_free(ERTS_ALC_T_IOQ, (void *) q->b_start);
- q->b_start = nbinv;
- q->b_end = nbinv + nvsz;
- q->b_head = q->b_start;
- q->b_tail = q->b_head + q_sz;
- }
- else {
- sys_memcpy(niov+nvsz-q_sz, q->v_head, q_sz*sizeof(SysIOVec));
- if (q->v_start != q->v_small)
- erts_free(ERTS_ALC_T_IOQ, (void *) q->v_start);
- q->v_start = niov;
- q->v_end = niov + nvsz;
- q->v_tail = q->v_end;
- q->v_head = q->v_tail - q_sz;
-
- sys_memcpy(nbinv+nvsz-q_sz, q->b_head, q_sz*sizeof(ErlDrvBinary*));
- if (q->b_start != q->b_small)
- erts_free(ERTS_ALC_T_IOQ, (void *) q->b_start);
- q->b_start = nbinv;
- q->b_end = nbinv + nvsz;
- q->b_tail = q->b_end;
- q->b_head = q->b_tail - q_sz;
- }
- }
- else if (tail) { /* move to beginning to make room in tail */
- sys_memmove(q->v_start, q->v_head, q_sz*sizeof(SysIOVec));
- q->v_head = q->v_start;
- q->v_tail = q->v_head + q_sz;
- sys_memmove(q->b_start, q->b_head, q_sz*sizeof(ErlDrvBinary*));
- q->b_head = q->b_start;
- q->b_tail = q->b_head + q_sz;
- }
- else { /* move to end to make room */
- sys_memmove(q->v_end-q_sz, q->v_head, q_sz*sizeof(SysIOVec));
- q->v_tail = q->v_end;
- q->v_head = q->v_tail-q_sz;
- sys_memmove(q->b_end-q_sz, q->b_head, q_sz*sizeof(ErlDrvBinary*));
- q->b_tail = q->b_end;
- q->b_head = q->b_tail-q_sz;
- }
-
- return 0;
-}
-
-
-
/* Put elements from vec at q tail */
int driver_enqv(ErlDrvPort ix, ErlIOVec* vec, ErlDrvSizeT skip)
{
- int n;
- size_t len;
- ErlDrvSizeT size;
- SysIOVec* iov;
- ErlDrvBinary** binv;
- ErlDrvBinary* b;
- ErlIOQueue* q = drvport2ioq(ix);
-
- if (q == NULL)
- return -1;
-
- ASSERT(vec->size >= skip); /* debug only */
- if (vec->size <= skip)
- return 0;
- size = vec->size - skip;
-
- iov = vec->iov;
- binv = vec->binv;
- n = vec->vsize;
-
- /* we use do here to strip iov_len=0 from beginning */
- do {
- len = iov->iov_len;
- if (len <= skip) {
- skip -= len;
- iov++;
- binv++;
- n--;
- }
- else {
- iov->iov_base = ((char *)(iov->iov_base)) + skip;
- iov->iov_len -= skip;
- skip = 0;
- }
- } while(skip > 0);
-
- if (q->v_tail + n >= q->v_end)
- expandq(q, n, 1);
-
- /* Queue and reference all binaries (remove zero length items) */
- while(n--) {
- if ((len = iov->iov_len) > 0) {
- if ((b = *binv) == NULL) { /* speical case create binary ! */
- b = driver_alloc_binary(len);
- sys_memcpy(b->orig_bytes, iov->iov_base, len);
- *q->b_tail++ = b;
- q->v_tail->iov_len = len;
- q->v_tail->iov_base = b->orig_bytes;
- q->v_tail++;
- }
- else {
- driver_binary_inc_refc(b);
- *q->b_tail++ = b;
- *q->v_tail++ = *iov;
- }
- }
- iov++;
- binv++;
- }
- q->size += size; /* update total size in queue */
- return 0;
+ ASSERT(vec->size >= skip);
+ return erts_ioq_enqv(drvport2ioq(ix), (ErtsIOVec*)vec, skip);
}
/* Put elements from vec at q head */
int driver_pushqv(ErlDrvPort ix, ErlIOVec* vec, ErlDrvSizeT skip)
{
- int n;
- size_t len;
- ErlDrvSizeT size;
- SysIOVec* iov;
- ErlDrvBinary** binv;
- ErlDrvBinary* b;
- ErlIOQueue* q = drvport2ioq(ix);
-
- if (q == NULL)
- return -1;
-
- if (vec->size <= skip)
- return 0;
- size = vec->size - skip;
-
- iov = vec->iov;
- binv = vec->binv;
- n = vec->vsize;
-
- /* we use do here to strip iov_len=0 from beginning */
- do {
- len = iov->iov_len;
- if (len <= skip) {
- skip -= len;
- iov++;
- binv++;
- n--;
- }
- else {
- iov->iov_base = ((char *)(iov->iov_base)) + skip;
- iov->iov_len -= skip;
- skip = 0;
- }
- } while(skip > 0);
-
- if (q->v_head - n < q->v_start)
- expandq(q, n, 0);
-
- /* Queue and reference all binaries (remove zero length items) */
- iov += (n-1); /* move to end */
- binv += (n-1); /* move to end */
- while(n--) {
- if ((len = iov->iov_len) > 0) {
- if ((b = *binv) == NULL) { /* speical case create binary ! */
- b = driver_alloc_binary(len);
- sys_memcpy(b->orig_bytes, iov->iov_base, len);
- *--q->b_head = b;
- q->v_head--;
- q->v_head->iov_len = len;
- q->v_head->iov_base = b->orig_bytes;
- }
- else {
- driver_binary_inc_refc(b);
- *--q->b_head = b;
- *--q->v_head = *iov;
- }
- }
- iov--;
- binv--;
- }
- q->size += size; /* update total size in queue */
- return 0;
+ ASSERT(vec->size >= skip);
+ return erts_ioq_pushqv(drvport2ioq(ix), (ErtsIOVec*)vec, skip);
}
-
/*
** Remove size bytes from queue head
** Return number of bytes that remain in queue
*/
ErlDrvSizeT driver_deq(ErlDrvPort ix, ErlDrvSizeT size)
{
- ErlIOQueue* q = drvport2ioq(ix);
- ErlDrvSizeT len;
-
- if ((q == NULL) || (q->size < size))
- return -1;
- q->size -= size;
- while (size > 0) {
- ASSERT(q->v_head != q->v_tail);
-
- len = q->v_head->iov_len;
- if (len <= size) {
- size -= len;
- driver_free_binary(*q->b_head);
- *q->b_head++ = NULL;
- q->v_head++;
- }
- else {
- q->v_head->iov_base = ((char *)(q->v_head->iov_base)) + size;
- q->v_head->iov_len -= size;
- size = 0;
- }
- }
-
- /* restart pointers (optimised for enq) */
- if (q->v_head == q->v_tail) {
- q->v_head = q->v_tail = q->v_start;
- q->b_head = q->b_tail = q->b_start;
- }
- return q->size;
+ ErlPortIOQueue *q = drvport2ioq(ix);
+ if (erts_ioq_deq(q, size) == -1)
+ return -1;
+ return erts_ioq_size(q);
}
-ErlDrvSizeT driver_peekqv(ErlDrvPort ix, ErlIOVec *ev) {
- ErlIOQueue *q = drvport2ioq(ix);
- ASSERT(ev);
-
- if (! q) {
- return (ErlDrvSizeT) -1;
- } else {
- if ((ev->vsize = q->v_tail - q->v_head) == 0) {
- ev->size = 0;
- ev->iov = NULL;
- ev->binv = NULL;
- } else {
- ev->size = q->size;
- ev->iov = q->v_head;
- ev->binv = q->b_head;
- }
- return q->size;
- }
+ErlDrvSizeT driver_peekqv(ErlDrvPort ix, ErlIOVec *ev)
+{
+ return erts_ioq_peekqv(drvport2ioq(ix), (ErtsIOVec*)ev);
}
SysIOVec* driver_peekq(ErlDrvPort ix, int* vlenp) /* length of io-vector */
{
- ErlIOQueue* q = drvport2ioq(ix);
-
- if (q == NULL) {
- *vlenp = -1;
- return NULL;
- }
- if ((*vlenp = (q->v_tail - q->v_head)) == 0)
- return NULL;
- return q->v_head;
+ return erts_ioq_peekq(drvport2ioq(ix), vlenp);
}
ErlDrvSizeT driver_sizeq(ErlDrvPort ix)
{
- ErlIOQueue* q = drvport2ioq(ix);
+ ErlPortIOQueue *q = drvport2ioq(ix);
if (q == NULL)
- return (size_t) -1;
- return q->size;
+ return (ErlDrvSizeT) -1;
+ return erts_ioq_size(q);
}
diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c
index 0fb25c2082..a3305da47d 100644
--- a/erts/emulator/beam/utils.c
+++ b/erts/emulator/beam/utils.c
@@ -52,6 +52,7 @@
#include "erl_ptab.h"
#include "erl_check_io.h"
#include "erl_bif_unique.h"
+#include "erl_io_queue.h"
#define ERTS_WANT_TIMER_WHEEL_API
#include "erl_time.h"
#ifdef HIPE
diff --git a/erts/emulator/test/nif_SUITE.erl b/erts/emulator/test/nif_SUITE.erl
index 0337274178..4811244b98 100644
--- a/erts/emulator/test/nif_SUITE.erl
+++ b/erts/emulator/test/nif_SUITE.erl
@@ -61,7 +61,8 @@
nif_internal_hash_salted/1,
nif_phash2/1,
nif_whereis/1, nif_whereis_parallel/1,
- nif_whereis_threaded/1, nif_whereis_proxy/1
+ nif_whereis_threaded/1, nif_whereis_proxy/1,
+ nif_ioq/1
]).
-export([many_args_100/100]).
@@ -99,7 +100,8 @@ all() ->
nif_internal_hash,
nif_internal_hash_salted,
nif_phash2,
- nif_whereis, nif_whereis_parallel, nif_whereis_threaded].
+ nif_whereis, nif_whereis_parallel, nif_whereis_threaded,
+ nif_ioq].
groups() ->
[{G, [], api_repeaters()} || G <- api_groups()]
@@ -2957,6 +2959,180 @@ nif_whereis_proxy(Ref) ->
{Ref, quit} ->
ok
end.
+nif_ioq(Config) ->
+ ensure_lib_loaded(Config),
+
+ Script =
+ [{create, a},
+
+ %% Test enq of erlang term binary
+ {enqb, a},
+ {enqb, a, 3},
+
+ %% Test enq of non-erlang term binary
+ {enqbraw,a},
+ {enqbraw,a, 5},
+ {peek, a},
+ {deq, a, 42},
+
+ %% Test enqv
+ {enqv, a, 2, 100},
+ {deq, a, all},
+
+ %% This skips all elements but one in the iolist
+ {enqv, a, 5, iolist_size(nif_ioq_payload(5)) - 1},
+ {peek, a},
+
+ %% Test to enqueue a bunch of refc binaries
+ {enqv, a, [nif_ioq_payload(refcbin) || _ <- lists:seq(1,20)], 0},
+
+ %% Enq stuff to destroy with data in queue
+ {enqv, a, 2, 100},
+ {destroy,a},
+
+ %% Test destroy of new queue
+ {create, a},
+ {destroy,a}
+ ],
+
+ nif_ioq_run(Script),
+
+ %% Test that only enif_inspect_as_vec works
+ Payload = nif_ioq_payload(5),
+ PayloadBin = iolist_to_binary(Payload),
+
+ [begin
+ PayloadBin = iolist_to_binary(ioq_nif(inspect,Payload,Stack,Env)),
+ <<>> = iolist_to_binary(ioq_nif(inspect,[],Stack,Env))
+ end || Stack <- [no_stack, use_stack], Env <- [use_env, no_env]],
+
+ %% Test error cases
+
+ Q = ioq_nif(create),
+
+ {'EXIT', {badarg, _}} = (catch ioq_nif(deq, Q, 1)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, 1, 1234)),
+
+ {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [atom_in_list], 0)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [make_ref()], 0)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [256], 0)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [-1], 0)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [#{}], 0)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [1 bsl 64], 0)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [{tuple}], 0)),
+
+ {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [atom_in_list], use_stack)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [make_ref()], no_stack)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [256], use_stack)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [-1], no_stack)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [#{}], use_stack)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [1 bsl 64], no_stack)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [{tuple}], use_stack)),
+ {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, <<"binary">>, use_stack)),
+
+ ioq_nif(destroy, Q),
+
+ %% Test that the example in the docs works
+ ExampleQ = ioq_nif(create),
+ true = ioq_nif(example, ExampleQ, nif_ioq_payload(5)),
+ ioq_nif(destroy, ExampleQ),
+
+ ok.
+
+
+nif_ioq_run(Script) ->
+ nif_ioq_run(Script, #{}).
+
+nif_ioq_run([{Action, Name}|T], State)
+ when Action =:= enqb; Action =:= enqbraw ->
+ nif_ioq_run([{Action, Name, heapbin}|T], State);
+nif_ioq_run([{Action, Name, Skip}|T], State)
+ when Action =:= enqb, is_integer(Skip);
+ Action =:= enqbraw, is_integer(Skip) ->
+ nif_ioq_run([{Action, Name, heapbin, Skip}|T], State);
+nif_ioq_run([{Action, Name, N}|T], State)
+ when Action =:= enqv; Action =:= enqb; Action =:= enqbraw ->
+ nif_ioq_run([{Action, Name, N, 0}|T], State);
+nif_ioq_run([{Action, Name, N, Skip}|T], State)
+ when Action =:= enqv; Action =:= enqb; Action =:= enqbraw ->
+
+ #{ q := IOQ, b := B } = Q = maps:get(Name, State),
+ true = ioq_nif(size, IOQ) == iolist_size(B),
+
+ %% Sanitize the log output a bit so that it doesn't become too large.
+ H = {Action, Name, try iolist_size(N) of Sz -> Sz catch _:_ -> N end, Skip},
+ ct:log("~p", [H]),
+
+ Data = nif_ioq_payload(N),
+ ioq_nif(Action, IOQ, Data, Skip),
+
+ <<_:Skip/binary, SkippedData/binary>> = iolist_to_binary(Data),
+
+ true = ioq_nif(size, IOQ) == (iolist_size([B|SkippedData])),
+
+ nif_ioq_run(T, State#{ Name := Q#{ b := [B|SkippedData]}});
+nif_ioq_run([{peek, Name} = H|T], State) ->
+ #{ q := IOQ, b := B } = maps:get(Name, State),
+ true = ioq_nif(size, IOQ) == iolist_size(B),
+
+ ct:log("~p", [H]),
+
+ Data = ioq_nif(peek, IOQ, ioq_nif(size, IOQ)),
+
+ true = iolist_to_binary(B) == iolist_to_binary(Data),
+ nif_ioq_run(T, State);
+nif_ioq_run([{deq, Name, all}|T], State) ->
+ #{ q := IOQ, b := B } = maps:get(Name, State),
+ Size = ioq_nif(size, IOQ),
+ true = Size == iolist_size(B),
+ nif_ioq_run([{deq, Name, Size}|T], State);
+nif_ioq_run([{deq, Name, N} = H|T], State) ->
+ #{ q := IOQ, b := B } = Q = maps:get(Name, State),
+ true = ioq_nif(size, IOQ) == iolist_size(B),
+
+ ct:log("~p", [H]),
+
+ <<_:N/binary,Remain/binary>> = iolist_to_binary(B),
+ NewQ = Q#{ b := Remain },
+
+ Sz = ioq_nif(deq, IOQ, N),
+
+ true = Sz == iolist_size(Remain),
+ true = ioq_nif(size, IOQ) == iolist_size(Remain),
+
+ nif_ioq_run(T, State#{ Name := NewQ });
+nif_ioq_run([{create, Name} = H|T], State) ->
+ ct:log("~p", [H]),
+ nif_ioq_run(T, State#{ Name => #{ q => ioq_nif(create), b => [] } });
+nif_ioq_run([{destroy, Name} = H|T], State) ->
+ #{ q := IOQ, b := B } = maps:get(Name, State),
+ true = ioq_nif(size, IOQ) == iolist_size(B),
+
+ ct:log("~p", [H]),
+
+ ioq_nif(destroy, IOQ),
+
+ nif_ioq_run(T, maps:remove(Name, State));
+nif_ioq_run([], State) ->
+ State.
+
+nif_ioq_payload(N) when is_integer(N) ->
+ Tail = if N > 3 -> nif_ioq_payload(N-3); true -> [] end,
+ Head = element(1, lists:split(N,[nif_ioq_payload(subbin),
+ nif_ioq_payload(heapbin),
+ nif_ioq_payload(refcbin) | Tail])),
+ erlang:iolist_to_iovec(Head);
+nif_ioq_payload(subbin) ->
+ Bin = nif_ioq_payload(refcbin),
+ Sz = size(Bin) - 1,
+ <<_:8,SubBin:Sz/binary,_/bits>> = Bin,
+ SubBin;
+nif_ioq_payload(heapbin) ->
+ <<"a literal heap binary">>;
+nif_ioq_payload(refcbin) ->
+ iolist_to_binary([lists:seq(1,255) || _ <- lists:seq(1,255)]);
+nif_ioq_payload(Else) ->
+ Else.
%% The NIFs:
lib_version() -> undefined.
@@ -3032,6 +3208,10 @@ monitor_process_nif(_,_,_,_) -> ?nif_stub.
demonitor_process_nif(_,_) -> ?nif_stub.
compare_monitors_nif(_,_) -> ?nif_stub.
monitor_frenzy_nif(_,_,_,_) -> ?nif_stub.
+ioq_nif(_) -> ?nif_stub.
+ioq_nif(_,_) -> ?nif_stub.
+ioq_nif(_,_,_) -> ?nif_stub.
+ioq_nif(_,_,_,_) -> ?nif_stub.
%% whereis
whereis_send(_Type,_Name,_Msg) -> ?nif_stub.
diff --git a/erts/emulator/test/nif_SUITE_data/nif_SUITE.c b/erts/emulator/test/nif_SUITE_data/nif_SUITE.c
index 307d1c390f..b47d013bd2 100644
--- a/erts/emulator/test/nif_SUITE_data/nif_SUITE.c
+++ b/erts/emulator/test/nif_SUITE_data/nif_SUITE.c
@@ -186,6 +186,12 @@ static ErlNifResourceTypeInit frenzy_rt_init = {
static ErlNifResourceType* whereis_resource_type;
static void whereis_thread_resource_dtor(ErlNifEnv* env, void* obj);
+static ErlNifResourceType* ioq_resource_type;
+
+static void ioq_resource_dtor(ErlNifEnv* env, void* obj);
+struct ioq_resource {
+ ErlNifIOQueue *q;
+};
static int get_pointer(ErlNifEnv* env, ERL_NIF_TERM term, void** pp)
{
@@ -243,6 +249,10 @@ static int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
whereis_resource_type = enif_open_resource_type(env, NULL, "nif_SUITE.whereis",
whereis_thread_resource_dtor, ERL_NIF_RT_CREATE, NULL);
+ ioq_resource_type = enif_open_resource_type(env,NULL,"ioq",
+ ioq_resource_dtor,
+ ERL_NIF_RT_CREATE, NULL);
+
atom_false = enif_make_atom(env,"false");
atom_true = enif_make_atom(env,"true");
atom_self = enif_make_atom(env,"self");
@@ -2430,7 +2440,6 @@ static ERL_NIF_TERM format_term(ErlNifEnv* env, int argc, const ERL_NIF_TERM arg
return enif_make_binary(env,&obin);
}
-
static int get_fd(ErlNifEnv* env, ERL_NIF_TERM term, struct fd_resource** rsrc)
{
if (!enif_get_resource(env, term, fd_resource_type, (void**)rsrc)) {
@@ -3158,7 +3167,231 @@ static void frenzy_resource_down(ErlNifEnv* env, void* obj, ErlNifPid* pid,
abort();
}
+/*********** testing ioq ************/
+
+static void ioq_resource_dtor(ErlNifEnv* env, void* obj) {
+
+}
+
+#ifndef __WIN32__
+static int writeiovec(ErlNifEnv *env, ERL_NIF_TERM term, ERL_NIF_TERM *tail, ErlNifIOQueue *q, int fd) {
+ ErlNifIOVec vec, *iovec = &vec;
+ SysIOVec *sysiovec;
+ int saved_errno;
+ int iovcnt, n;
+
+ if (!enif_inspect_iovec(env, 64, term, tail, &iovec))
+ return -2;
+
+ if (enif_ioq_size(q) > 0) {
+ /* If the I/O queue contains data we enqueue the iovec and then
+ peek the data to write out of the queue. */
+ if (!enif_ioq_enqv(q, iovec, 0))
+ return -3;
+
+ sysiovec = enif_ioq_peek(q, &iovcnt);
+ } else {
+ /* If the I/O queue is empty we skip the trip through it. */
+ iovcnt = iovec->iovcnt;
+ sysiovec = iovec->iov;
+ }
+
+ /* Attempt to write the data */
+ n = writev(fd, sysiovec, iovcnt);
+ saved_errno = errno;
+
+ if (enif_ioq_size(q) == 0) {
+ /* If the I/O queue was initially empty we enqueue any
+ remaining data into the queue for writing later. */
+ if (n >= 0 && !enif_ioq_enqv(q, iovec, n))
+ return -3;
+ } else {
+ /* Dequeue any data that was written from the queue. */
+ if (n > 0 && !enif_ioq_deq(q, n, NULL))
+ return -4;
+ }
+
+ /* return n, which is either number of bytes written or -1 if
+ some error happened */
+ errno = saved_errno;
+ return n;
+}
+#endif
+
+static ERL_NIF_TERM ioq(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+ struct ioq_resource *ioq;
+ ERL_NIF_TERM ret;
+ if (enif_is_identical(argv[0], enif_make_atom(env, "create"))) {
+ ErlNifIOQueue *q = enif_ioq_create(ERL_NIF_IOQ_NORMAL);
+ ioq = (struct ioq_resource *)enif_alloc_resource(ioq_resource_type,
+ sizeof(*ioq));
+ ioq->q = q;
+ ret = enif_make_resource(env, ioq);
+ enif_release_resource(ioq);
+ return ret;
+ } else if (enif_is_identical(argv[0], enif_make_atom(env, "inspect"))) {
+ ErlNifIOVec vec, *iovec = NULL;
+ int i, iovcnt;
+ ERL_NIF_TERM *elems, tail, list;
+ ErlNifEnv *myenv = NULL;
+
+ if (enif_is_identical(argv[2], enif_make_atom(env, "use_stack")))
+ iovec = &vec;
+ if (enif_is_identical(argv[3], enif_make_atom(env, "use_env")))
+ myenv = env;
+ if (!enif_inspect_iovec(myenv, ~(size_t)0, argv[1], &tail, &iovec))
+ return enif_make_badarg(env);
+
+ iovcnt = iovec->iovcnt;
+ elems = enif_alloc(sizeof(ERL_NIF_TERM) * iovcnt);
+
+ for (i = 0; i < iovcnt; i++) {
+ ErlNifBinary bin;
+ if (!enif_alloc_binary(iovec->iov[i].iov_len, &bin)) {
+ enif_free_iovec(iovec);
+ enif_free(elems);
+ return enif_make_badarg(env);
+ }
+ memcpy(bin.data, iovec->iov[i].iov_base, iovec->iov[i].iov_len);
+ elems[i] = enif_make_binary(env, &bin);
+ }
+
+ if (!myenv)
+ enif_free_iovec(iovec);
+
+ list = enif_make_list_from_array(env, elems, iovcnt);
+ enif_free(elems);
+ return list;
+ } else {
+ unsigned skip;
+ if (!enif_get_resource(env, argv[1], ioq_resource_type, (void**)&ioq)
+ || !ioq->q)
+ return enif_make_badarg(env);
+
+ if (enif_is_identical(argv[0], enif_make_atom(env, "example"))) {
+#ifndef __WIN32__
+ int fd[2], res = 0, cnt = 0, queue_cnt;
+ ERL_NIF_TERM tail;
+ char buff[255];
+ pipe(fd);
+ fcntl(fd[0], F_SETFL, fcntl(fd[0], F_GETFL) | O_NONBLOCK);
+ fcntl(fd[1], F_SETFL, fcntl(fd[1], F_GETFL) | O_NONBLOCK);
+
+ /* Write until the pipe buffer is full, which should result in data
+ * being queued up. */
+ for (res = 0; res >= 0; ) {
+ cnt += res;
+ res = writeiovec(env, argv[2], &tail, ioq->q, fd[1]);
+ }
+
+ /* Flush the queue while reading from the other end of the pipe. */
+ tail = enif_make_list(env, 0);
+ while (enif_ioq_size(ioq->q) > 0) {
+ res = writeiovec(env, tail, &tail, ioq->q, fd[1]);
+ if (res < 0 && errno != EAGAIN) {
+ break;
+ } else if (res > 0) {
+ cnt += res;
+ }
+
+ for (res = 0; res >= 0; ) {
+ cnt -= res;
+ res = read(fd[0], buff, sizeof(buff));
+ }
+ }
+
+ close(fd[0]);
+ close(fd[1]);
+
+ /* Check that we read as much as we wrote */
+ if (cnt == 0 && enif_ioq_size(ioq->q) == 0)
+ return enif_make_atom(env, "true");
+
+ return enif_make_int(env, cnt);
+#else
+ return enif_make_atom(env, "true");
+#endif
+ }
+ if (enif_is_identical(argv[0], enif_make_atom(env, "destroy"))) {
+ enif_ioq_destroy(ioq->q);
+ ioq->q = NULL;
+ return enif_make_atom(env, "false");
+ } else if (enif_is_identical(argv[0], enif_make_atom(env, "enqv"))) {
+ ErlNifIOVec vec, *iovec = &vec;
+ ERL_NIF_TERM tail;
+
+ if (!enif_get_uint(env, argv[3], &skip))
+ return enif_make_badarg(env);
+ if (!enif_inspect_iovec(env, ~0ul, argv[2], &tail, &iovec))
+ return enif_make_badarg(env);
+ if (!enif_ioq_enqv(ioq->q, iovec, skip))
+ return enif_make_badarg(env);
+
+ return enif_make_atom(env, "true");
+ } else if (enif_is_identical(argv[0], enif_make_atom(env, "enqb"))) {
+ ErlNifBinary bin;
+ if (!enif_get_uint(env, argv[3], &skip) ||
+ !enif_inspect_binary(env, argv[2], &bin))
+ return enif_make_badarg(env);
+
+ if (!enif_ioq_enq_binary(ioq->q, &bin, skip))
+ return enif_make_badarg(env);
+
+ return enif_make_atom(env, "true");
+ } else if (enif_is_identical(argv[0], enif_make_atom(env, "enqbraw"))) {
+ ErlNifBinary bin;
+ ErlNifBinary localbin;
+ int i;
+ if (!enif_get_uint(env, argv[3], &skip) ||
+ !enif_inspect_binary(env, argv[2], &bin) ||
+ !enif_alloc_binary(bin.size, &localbin))
+ return enif_make_badarg(env);
+
+ memcpy(localbin.data, bin.data, bin.size);
+ i = enif_ioq_enq_binary(ioq->q, &localbin, skip);
+ if (!i)
+ return enif_make_badarg(env);
+ else
+ return enif_make_atom(env, "true");
+ } else if (enif_is_identical(argv[0], enif_make_atom(env, "peek"))) {
+ int iovlen, num, i, off = 0;
+ SysIOVec *iov = enif_ioq_peek(ioq->q, &iovlen);
+ ErlNifBinary bin;
+
+ if (!enif_get_int(env, argv[2], &num) || !enif_alloc_binary(num, &bin))
+ return enif_make_badarg(env);
+
+ for (i = 0; i < iovlen && num > 0; i++) {
+ int to_copy = num < iov[i].iov_len ? num : iov[i].iov_len;
+ memcpy(bin.data + off, iov[i].iov_base, to_copy);
+ num -= to_copy;
+ off += to_copy;
+ }
+
+ return enif_make_binary(env, &bin);
+ } else if (enif_is_identical(argv[0], enif_make_atom(env, "deq"))) {
+ int num;
+ size_t sz;
+ ErlNifUInt64 sz64;
+ if (!enif_get_int(env, argv[2], &num))
+ return enif_make_badarg(env);
+
+ if (!enif_ioq_deq(ioq->q, num, &sz))
+ return enif_make_badarg(env);
+
+ sz64 = sz;
+
+ return enif_make_uint64(env, sz64);
+ } else if (enif_is_identical(argv[0], enif_make_atom(env, "size"))) {
+ ErlNifUInt64 size = enif_ioq_size(ioq->q);
+ return enif_make_uint64(env, size);
+ }
+ }
+
+ return enif_make_badarg(env);
+}
static ErlNifFunc nif_funcs[] =
{
@@ -3255,7 +3488,11 @@ static ErlNifFunc nif_funcs[] =
{"whereis_send", 3, whereis_send},
{"whereis_term", 2, whereis_term},
{"whereis_thd_lookup", 2, whereis_thd_lookup},
- {"whereis_thd_result", 1, whereis_thd_result}
+ {"whereis_thd_result", 1, whereis_thd_result},
+ {"ioq_nif", 1, ioq},
+ {"ioq_nif", 2, ioq},
+ {"ioq_nif", 3, ioq},
+ {"ioq_nif", 4, ioq}
};
ERL_NIF_INIT(nif_SUITE,nif_funcs,load,NULL,upgrade,unload)