diff options
Diffstat (limited to 'erts')
27 files changed, 4506 insertions, 1922 deletions
diff --git a/erts/doc/src/erl_nif.xml b/erts/doc/src/erl_nif.xml index 5a69bed34c..419e41693e 100644 --- a/erts/doc/src/erl_nif.xml +++ b/erts/doc/src/erl_nif.xml @@ -206,7 +206,7 @@ ok <seealso marker="#enif_make_resource"> <c>enif_make_resource</c></seealso>. The term returned by <c>enif_make_resource</c> is opaque in nature. - It can be stored and passed between processes on the same node, but + It can be stored and passed between processes, but the only real end usage is to pass it back as an argument to a NIF. The NIF can then call <seealso marker="#enif_get_resource"> <c>enif_get_resource</c></seealso> and get back a pointer to the @@ -344,6 +344,81 @@ return term;</code> <c>enif_convert_time_unit()</c></seealso></item> </list> </item> + <tag><marker id="enif_ioq"/>I/O Queues</tag> + <item> + <p>The Erlang nif library contains function for easily working + with I/O vectors as used by the unix system call <c>writev</c>. + The I/O Queue is not thread safe, so some other synchronization + mechanism has to be used.</p> + <list type="bulleted"> + <item><seealso marker="#SysIOVec"> + <c>SysIOVec</c></seealso></item> + <item><seealso marker="#ErlNifIOVec"> + <c>ErlNifIOVec</c></seealso></item> + <item><seealso marker="#enif_ioq_create"> + <c>enif_ioq_create()</c></seealso></item> + <item><seealso marker="#enif_ioq_destroy"> + <c>enif_ioq_destroy()</c></seealso></item> + <item><seealso marker="#enif_ioq_enq_binary"> + <c>enif_ioq_enq_binary()</c></seealso></item> + <item><seealso marker="#enif_ioq_enqv"> + <c>enif_ioq_enqv()</c></seealso></item> + <item><seealso marker="#enif_ioq_deq"> + <c>enif_ioq_deq()</c></seealso></item> + <item><seealso marker="#enif_ioq_peek"> + <c>enif_ioq_peek()</c></seealso></item> + <item><seealso marker="#enif_inspect_iovec"> + <c>enif_inspect_iovec()</c></seealso></item> + <item><seealso marker="#enif_free_iovec"> + <c>enif_free_iovec()</c></seealso></item> + </list> + <p>Typical usage when writing to a file descriptor looks like this:</p> + <code type="none"><![CDATA[ +int writeiovec(ErlNifEnv *env, ERL_NIF_TERM term, ERL_NIF_TERM *tail, + ErlNifIOQueue *q, int fd) { + + ErlNifIOVec vec, *iovec = &vec; + SysIOVec *sysiovec; + int saved_errno; + int iovcnt, n; + + if (!enif_inspect_iovec(env, 64, term, tail, &iovec)) + return -2; + + if (enif_ioq_size(q) > 0) { + /* If the I/O queue contains data we enqueue the iovec and + then peek the data to write out of the queue. */ + if (!enif_ioq_enqv(q, iovec, 0)) + return -3; + + sysiovec = enif_ioq_peek(q, &iovcnt); + } else { + /* If the I/O queue is empty we skip the trip through it. */ + iovcnt = iovec->iovcnt; + sysiovec = iovec->iov; + } + + /* Attempt to write the data */ + n = writev(fd, sysiovec, iovcnt); + saved_errno = errno; + + if (enif_ioq_size(q) == 0) { + /* If the I/O queue was initially empty we enqueue any + remaining data into the queue for writing later. */ + if (n >= 0 && !enif_ioq_enqv(q, iovec, n)) + return -3; + } else { + /* Dequeue any data that was written from the queue. */ + if (n > 0 && !enif_ioq_deq(q, n, NULL)) + return -4; + } + + /* return n, which is either number of bytes written or -1 if + some error happened */ + errno = saved_errno; + return n; +}]]></code> + </item> <tag><marker id="lengthy_work"/>Long-running NIFs</tag> <item> <p>As mentioned in the <seealso marker="#WARNING">warning</seealso> text @@ -837,6 +912,36 @@ typedef enum { </item> </taglist> </item> + <tag><marker id="SysIOVec"/><c>SysIOVec</c></tag> + <item> + <p>A system I/O vector, as used by <c>writev</c> on + Unix and <c>WSASend</c> on Win32. It is used in + <c>ErlNifIOVec</c> and by + <seealso marker="#enif_ioq_peek"><c>enif_ioq_peek</c></seealso>.</p> + </item> + <tag><marker id="ErlNifIOVec"/><c>ErlNifIOVec</c></tag> + <item> + <code type="none"> +typedef struct { + int iovcnt; + size_t size; + SysIOVec* iov; +} ErlNifIOVec;</code> + <p>An I/O vector containing <c>iovcnt</c> <c>SysIOVec</c>s + pointing to the data. It is used by + <seealso marker="#enif_inspect_iovec"> + <c>enif_inspect_iovec</c></seealso> and + <seealso marker="#enif_ioq_enqv"> + <c>enif_ioq_enqv</c></seealso>.</p> + </item> + <tag><marker id="ErlNifIOQueueOpts"/><c>ErlNifIOQueueOpts</c></tag> + <item> + Options to configure a <c>ErlNifIOQueue</c>. + <taglist> + <tag>ERL_NIF_IOQ_NORMAL</tag> + <item><p>Create a normal I/O Queue</p></item> + </taglist> + </item> </taglist> </section> @@ -1143,6 +1248,31 @@ typedef enum { </func> <func> + <name><ret>void</ret> + <nametext>enif_free_iovec(ErlNifIOvec* iov)</nametext></name> + <fsummary>Free an ErlIOVec</fsummary> + <desc> + <p>Frees an io vector returned from + <seealso marker="#enif_inspect_iovec"> + <c>enif_inspect_iovec</c></seealso>. + This is needed only if a <c>NULL</c> environment is passed to + <seealso marker="#enif_inspect_iovec"> + <c>enif_inspect_iovec</c></seealso>.</p> + <code type="none"><![CDATA[ +ErlNifIOVec *iovec = NULL; +size_t max_elements = 128; +ERL_NIF_TERM tail; +if (!enif_inspect_iovec(NULL, max_elements, term, &tail, iovec)) + return 0; + +// Do things with the iovec + +/* Free the iovector, possibly in another thread or nif function call */ +enif_free_iovec(iovec);]]></code> + </desc> + </func> + + <func> <name><ret>int</ret><nametext>enif_get_atom(ErlNifEnv* env, ERL_NIF_TERM term, char* buf, unsigned size, ErlNifCharEncoding encode)</nametext> </name> @@ -1449,6 +1579,127 @@ typedef enum { </func> <func> + <name><ret>int</ret><nametext>enif_inspect_iovec(ErlNifEnv* + env, size_t max_elements, ERL_NIF_TERM iovec_term, ERL_NIF_TERM* tail, + ErlNifIOVec** iovec)</nametext></name> + <fsummary>Inspect a list of binaries as an ErlNifIOVec.</fsummary> + <desc> + <p>Fills <c>iovec</c> with the list of binaries provided in + <c>iovec_term</c>. The number of elements handled in the call is + limited to <c>max_elements</c>, and <c>tail</c> is set to the + remainder of the list. Note that the output may be longer than + <c>max_elements</c> on some platforms. + </p> + <p>To create a list of binaries from an arbitrary iolist, use + <seealso marker="erts:erlang#iolist_to_iovec/1"> + <c>erlang:iolist_to_iovec/1</c></seealso>.</p> + <p>When calling this function, <c>iovec</c> should contain a pointer to + <c>NULL</c> or a ErlNifIOVec structure that should be used if + possible. e.g. + </p> + <code type="none"> +/* Don't use a pre-allocated structure */ +ErlNifIOVec *iovec = NULL; +enif_inspect_iovec(env, max_elements, term, &tail, &iovec); + +/* Use a stack-allocated vector as an optimization for vectors with few elements */ +ErlNifIOVec vec, *iovec = &vec; +enif_inspect_iovec(env, max_elements, term, &tail, &iovec); +</code> + <p>The contents of the <c>iovec</c> is valid until the called nif + function returns. If the <c>iovec</c> should be valid after the nif + call returns, it is possible to call this function with a + <c>NULL</c> environment. If no environment is given the <c>iovec</c> + owns the data in the vector and it has to be explicitly freed using + <seealso marker="#enif_free_iovec"><c>enif_free_iovec</c> + </seealso>.</p> + <p>Returns <c>true</c> on success, or <c>false</c> if <c>iovec_term</c> + not an iovec.</p> + </desc> + </func> + + <func> + <name><ret>ErlNifIOQueue *</ret> + <nametext>enif_ioq_create(ErlNifIOQueueOpts opts)</nametext></name> + <fsummary>Create a new IO Queue</fsummary> + <desc> + <p>Create a new I/O Queue that can be used to store data. + <c>opts</c> has to be set to <c>ERL_NIF_IOQ_NORMAL</c>. + </p> + </desc> + </func> + + <func> + <name><ret>void</ret> + <nametext>enif_ioq_destroy(ErlNifIOQueue *q)</nametext></name> + <fsummary>Destroy an IO Queue and free it's content</fsummary> + <desc> + <p>Destroy the I/O queue and free all of it's contents</p> + </desc> + </func> + + <func> + <name><ret>int</ret> + <nametext>enif_ioq_deq(ErlNifIOQueue *q, size_t count, size_t *size)</nametext></name> + <fsummary>Dequeue count bytes from the IO Queue</fsummary> + <desc> + <p>Dequeue <c>count</c> bytes from the I/O queue. + If <c>size</c> is not <c>NULL</c>, the new size of the queue + is placed there.</p> + <p>Returns <c>true</c> on success, or <c>false</c> if the I/O does + not contain <c>count</c> bytes. On failure the queue is left un-altered.</p> + </desc> + </func> + + <func> + <name><ret>int</ret> + <nametext>enif_ioq_enq_binary(ErlNifIOQueue *q, ErlNifBinary *bin, size_t skip)</nametext></name> + <fsummary>Enqueue the binary into the IO Queue</fsummary> + <desc> + <p>Enqueue the <c>bin</c> into <c>q</c> skipping the first <c>skip</c> bytes.</p> + <p>Returns <c>true</c> on success, or <c>false</c> if <c>skip</c> is greater + than the size of <c>bin</c>. Any ownership of the binary data is transferred + to the queue and <c>bin</c> is to be considered read-only for the rest of the NIF + call and then as released.</p> + </desc> + </func> + + <func> + <name><ret>int</ret> + <nametext>enif_ioq_enqv(ErlNifIOQueue *q, ErlNifIOVec *iovec, size_t skip)</nametext></name> + <fsummary>Enqueue the iovec into the IO Queue</fsummary> + <desc> + <p>Enqueue the <c>iovec</c> into <c>q</c> skipping the first <c>skip</c> bytes.</p> + <p>Returns <c>true</c> on success, or <c>false</c> if <c>skip</c> is greater + than the size of <c>iovec</c>.</p> + </desc> + </func> + + <func> + <name><ret>SysIOVec *</ret> + <nametext>enif_ioq_peek(ErlNifIOQueue *q, int *iovlen)</nametext></name> + <fsummary>Peek inside the IO Queue</fsummary> + <desc> + <p>Get the I/O queue as a pointer to an array of <c>SysIOVec</c>s. + It also returns the number of elements in <c>iovlen</c>. + This is the only way to get data out of the queue.</p> + <p>Nothing is removed from the queue by this function, that must be done + with <seealso marker="#enif_ioq_deq"><c>enif_ioq_deq</c></seealso>.</p> + <p>The returned array is suitable to use with the Unix system + call <c>writev</c>.</p> + </desc> + </func> + + <func> + <name><ret>size_t</ret> + <nametext>enif_ioq_size(ErlNifIOQueue *q)</nametext></name> + <fsummary>Get the current size of the IO Queue</fsummary> + <desc> + <p>Get the size of <c>q</c>.</p> + </desc> + </func> + + <func> <name><ret>int</ret> <nametext>enif_is_atom(ErlNifEnv* env, ERL_NIF_TERM term)</nametext> </name> @@ -1952,10 +2203,33 @@ typedef enum { details, see the <seealso marker="#enif_resource_example">example of creating and returning a resource object</seealso> in the User's Guide.</p> - <p>Notice that the only defined behavior of using a resource term in - an Erlang program is to store it and send it between processes on the - same node. Other operations, such as matching or - <c>term_to_binary</c>, have unpredictable (but harmless) results.</p> + <note> + <p>Since ERTS 9.0 (OTP-20.0), resource terms have a defined behavior + when compared and serialized through <c>term_to_binary</c> or passed + between nodes.</p> + <list type="bulleted"> + <item> + <p>Two resource terms will compare equal iff they + would yield the same resource object pointer when passed to + <seealso marker="#enif_get_resource"><c>enif_get_resource</c></seealso>.</p> + </item> + <item> + <p>A resoure term can be serialized with <c>term_to_binary</c> and later + be fully recreated if the resource object is still alive when + <c>binary_to_term</c> is called. A <em>stale</em> resource term will be + returned from <c>binary_to_term</c> if the resource object has + been deallocated. <seealso marker="#enif_get_resource"><c>enif_get_resource</c></seealso> + will return false for stale resource terms.</p> + <p>The same principles of serialization apply when passing + resource terms in messages to remote nodes and back again. A + resource term will act stale on all nodes except the node where + its resource object is still alive in memory.</p> + </item> + </list> + <p>Before ERTS 9.0 (OTP-20.0), all resource terms did + compare equal to each other and to empty binaries (<c><<>></c>). + If serialized, they would be recreated as plain empty binaries.</p> + </note> </desc> </func> diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index 11f2d36a01..5afac46d21 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -60,6 +60,14 @@ </desc> </datatype> <datatype> + <name>iovec()</name> + <desc> + <p>A list of binaries. This datatype is useful to use + together with <seealso marker="erl_nif#enif_inspect_iovec"> + <c>enif_inspect_iovec</c></seealso>.</p> + </desc> + </datatype> + <datatype> <name name="message_queue_data"></name> <desc> <p>See <seealso marker="#process_flag_message_queue_data"> @@ -2128,6 +2136,15 @@ os_prompt%</pre> </func> <func> + <name name="iolist_to_iovec" arity="1"/> + <fsummary>Converts an iolist to a iovec.</fsummary> + <desc> + <p>Returns an iovec that is made from the integers and binaries in + <c><anno>IoListOrBinary</anno></c>.</p> + </desc> + </func> + + <func> <name name="is_alive" arity="0"/> <fsummary>Check whether the local node is alive.</fsummary> <desc> diff --git a/erts/doc/src/zlib.xml b/erts/doc/src/zlib.xml index 1d272c4c18..f5cc1b1e64 100644 --- a/erts/doc/src/zlib.xml +++ b/erts/doc/src/zlib.xml @@ -65,13 +65,17 @@ list_to_binary([Compressed|Last])</pre> <tag><c>badarg</c></tag> <item>Bad argument. </item> + <tag><c>not_initialized</c></tag> + <item>The stream hasn't been initialized, eg. if + <seealso marker="#inflateInit/1"><c>inflateInit/1</c></seealso> wasn't + called prior to a call to + <seealso marker="#inflate/2"><c>inflate/2</c></seealso>. + </item> <tag><c>data_error</c></tag> <item>The data contains errors. </item> <tag><c>stream_error</c></tag> <item>Inconsistent stream state.</item> - <tag><c>einval</c></tag> - <item>Bad value or wrong function called.</item> <tag><c>{need_dictionary,Adler32}</c></tag> <item>See <seealso marker="#inflate/2"><c>inflate/2</c></seealso>. </item> @@ -90,6 +94,9 @@ list_to_binary([Compressed|Last])</pre> <name name="zlevel"/> </datatype> <datatype> + <name name="zflush"/> + </datatype> + <datatype> <name name="zmemlevel"/> </datatype> <datatype> @@ -112,6 +119,11 @@ list_to_binary([Compressed|Last])</pre> <fsummary>Calculate the Adler checksum.</fsummary> <desc> <p>Calculates the Adler-32 checksum for <c><anno>Data</anno></c>.</p> + <warning> + <p>This function is deprecated and will be removed in a future + release. Use <seealso marker="erts:erlang#adler32/1"> + <c>erlang:adler32/1</c></seealso> instead.</p> + </warning> </desc> </func> @@ -127,6 +139,11 @@ list_to_binary([Compressed|Last])</pre> Crc = lists:foldl(fun(Data,Crc0) -> zlib:adler32(Z, Crc0, Data), end, zlib:adler32(Z,<< >>), Datas)</pre> + <warning> + <p>This function is deprecated and will be removed in a future + release. Use <seealso marker="erts:erlang#adler32/2"> + <c>erlang:adler32/2</c></seealso> instead.</p> + </warning> </desc> </func> @@ -141,6 +158,11 @@ Crc = lists:foldl(fun(Data,Crc0) -> <p>This function returns the <c><anno>Adler</anno></c> checksum of <c>[Data1,Data2]</c>, requiring only <c><anno>Adler1</anno></c>, <c><anno>Adler2</anno></c>, and <c><anno>Size2</anno></c>.</p> + <warning> + <p>This function is deprecated and will be removed in a future + release. Use <seealso marker="erts:erlang#adler32_combine/3"> + <c>erlang:adler32_combine/3</c></seealso> instead.</p> + </warning> </desc> </func> @@ -165,6 +187,12 @@ Crc = lists:foldl(fun(Data,Crc0) -> <fsummary>Get current CRC.</fsummary> <desc> <p>Gets the current calculated CRC checksum.</p> + <warning> + <p>This function is deprecated and will be removed in a future + release. Use <seealso marker="erts:erlang#crc32/1"> + <c>erlang:crc32/1</c></seealso> on the uncompressed data + instead.</p> + </warning> </desc> </func> @@ -173,6 +201,11 @@ Crc = lists:foldl(fun(Data,Crc0) -> <fsummary>Calculate CRC.</fsummary> <desc> <p>Calculates the CRC checksum for <c><anno>Data</anno></c>.</p> + <warning> + <p>This function is deprecated and will be removed in a future + release. Use <seealso marker="erts:erlang#crc32/1"> + <c>erlang:crc32/1</c></seealso> instead.</p> + </warning> </desc> </func> @@ -188,6 +221,11 @@ Crc = lists:foldl(fun(Data,Crc0) -> Crc = lists:foldl(fun(Data,Crc0) -> zlib:crc32(Z, Crc0, Data), end, zlib:crc32(Z,<< >>), Datas)</pre> + <warning> + <p>This function is deprecated and will be removed in a future + release. Use <seealso marker="erts:erlang#crc32/2"> + <c>erlang:crc32/2</c></seealso> instead.</p> + </warning> </desc> </func> @@ -202,6 +240,11 @@ Crc = lists:foldl(fun(Data,Crc0) -> <p>This function returns the <c><anno>CRC</anno></c> checksum of <c>[Data1,Data2]</c>, requiring only <c><anno>CRC1</anno></c>, <c><anno>CRC2</anno></c>, and <c><anno>Size2</anno></c>.</p> + <warning> + <p>This function is deprecated and will be removed in a future + release. Use <seealso marker="erts:erlang#crc32_combine/3"> + <c>erlang:crc32_combine/3</c></seealso> instead.</p> + </warning> </desc> </func> @@ -407,8 +450,8 @@ list_to_binary([B1,B2])</pre> <seealso marker="#deflateInit/1"><c>deflateInit/1,2,6</c></seealso> or <seealso marker="#deflateReset/1"><c>deflateReset/1</c></seealso>, before any call of - <seealso marker="#deflate/3"><c>deflate/3</c></seealso>. - The compressor and decompressor must use the same dictionary (see + <seealso marker="#deflate/3"><c>deflate/3</c></seealso>.</p> + <p>The compressor and decompressor must use the same dictionary (see <seealso marker="#inflateSetDictionary/2"> <c>inflateSetDictionary/2</c></seealso>).</p> <p>The Adler checksum of the dictionary is returned.</p> @@ -420,6 +463,10 @@ list_to_binary([B1,B2])</pre> <fsummary>Get buffer size.</fsummary> <desc> <p>Gets the size of the intermediate buffer.</p> + <warning> + <p>This function is deprecated and will be removed in a future + release.</p> + </warning> </desc> </func> @@ -443,14 +490,31 @@ list_to_binary([B1,B2])</pre> <name name="inflate" arity="2"/> <fsummary>Decompress data.</fsummary> <desc> - <p>Decompresses as much data as possible. - It can introduce some output latency (reading - input without producing any output).</p> - <p>If a preset dictionary is needed at this point (see - <seealso marker="#inflateSetDictionary/2"> - <c>inflateSetDictionary/2</c></seealso>), <c>inflate/2</c> throws a - <c>{need_dictionary,Adler}</c> exception, where <c>Adler</c> is - the Adler-32 checksum of the dictionary chosen by the compressor.</p> + <p>Equivalent to + <seealso marker="#inflate/3"><c>inflate(Z, Data, [])</c></seealso> + </p> + </desc> + </func> + + <func> + <name name="inflate" arity="3"/> + <fsummary>Decompress data.</fsummary> + <desc> + <p>Decompresses as much data as possible. It can introduce some output + latency (reading input without producing any output).</p> + <p>Currently the only available option is + <c>{exception_on_need_dict,boolean()}</c> which controls whether the + function should throw an exception when a preset dictionary is + required for decompression. When set to false, a + <c>need_dictionary</c> tuple will be returned instead. See + <seealso marker="#inflateSetDictionary/2"> + <c>inflateSetDictionary/2</c></seealso> for details.</p> + <warning> + <p>This option defaults to <c>true</c> for backwards compatibility + but we intend to remove the exception behavior in a future + release. New code that needs to handle dictionaries manually + should always specify <c>{exception_on_need_dict,false}</c>.</p> + </warning> </desc> </func> @@ -458,6 +522,11 @@ list_to_binary([B1,B2])</pre> <name name="inflateChunk" arity="1"/> <fsummary>Read next uncompressed chunk.</fsummary> <desc> + <warning> + <p>This function is deprecated and will be removed in a future + release. Use <seealso marker="#safeInflate/2"><c>safeInflate/2</c> + </seealso> instead.</p> + </warning> <p>Reads the next chunk of uncompressed data, initialized by <seealso marker="#inflateChunk/2"><c>inflateChunk/2</c></seealso>.</p> <p>This function is to be repeatedly called, while it returns @@ -469,23 +538,27 @@ list_to_binary([B1,B2])</pre> <name name="inflateChunk" arity="2"/> <fsummary>Decompress data with limited output size.</fsummary> <desc> + <warning> + <p>This function is deprecated and will be removed in a future + release. Use <seealso marker="#safeInflate/2"><c>safeInflate/2</c> + </seealso> instead.</p> + </warning> <p>Like <seealso marker="#inflate/2"><c>inflate/2</c></seealso>, - but decompresses no more data than will fit in the buffer configured - through <seealso marker="#setBufSize/2"><c>setBufSize/2</c></seealso>. - Is is useful when decompressing a stream with a high compression - ratio, such that a small amount of compressed input can expand up to - 1000 times.</p> + but decompresses no more data than will fit in the buffer configured + through <seealso marker="#setBufSize/2"><c>setBufSize/2</c> + </seealso>. Is is useful when decompressing a stream with a high + compression ratio, such that a small amount of compressed input can + expand up to 1000 times.</p> <p>This function returns <c>{more, Decompressed}</c>, when there is - more output available, and - <seealso marker="#inflateChunk/1"><c>inflateChunk/1</c></seealso> - is to be used to read it.</p> - <p>This function can introduce some output latency (reading - input without producing any output).</p> - <p>If a preset dictionary is needed at this point (see - <seealso marker="#inflateSetDictionary/2"> - <c>inflateSetDictionary/2</c></seealso>), this function throws a - <c>{need_dictionary,Adler}</c> exception, where <c>Adler</c> is - the Adler-32 checksum of the dictionary chosen by the compressor.</p> + more output available, and + <seealso marker="#inflateChunk/1"><c>inflateChunk/1</c></seealso> + is to be used to read it.</p> + <p>This function can introduce some output latency (reading input + without producing any output).</p> + <p>An exception will be thrown if a preset dictionary is required for + further decompression. See + <seealso marker="#inflateSetDictionary/2"> + <c>inflateSetDictionary/2</c></seealso> for details.</p> <p>Example:</p> <pre> walk(Compressed, Handler) -> @@ -517,6 +590,18 @@ loop(Z, Handler, Uncompressed) -> </func> <func> + <name name="inflateGetDictionary" arity="1"/> + <fsummary>Return the decompression dictionary.</fsummary> + <desc> + <p>Returns the decompression dictionary currently in use + by the stream. This function must be called between + <seealso marker="#inflateInit/1"><c>inflateInit/1,2</c></seealso> + and <seealso marker="#inflateEnd/1"><c>inflateEnd</c></seealso>.</p> + <p>Only supported if ERTS was compiled with zlib >= 1.2.8.</p> + </desc> + </func> + + <func> <name name="inflateInit" arity="1"/> <fsummary>Initialize a session for decompression.</fsummary> <desc> @@ -562,45 +647,83 @@ loop(Z, Handler, Uncompressed) -> <fsummary>Initialize the decompression dictionary.</fsummary> <desc> <p>Initializes the decompression dictionary from the specified - uncompressed byte sequence. This function must be called - immediately after a call of - <seealso marker="#inflate/2"><c>inflate/2</c></seealso> - if this call threw a <c>{need_dictionary,Adler}</c> exception. - The dictionary chosen by the compressor can be determined from the - Adler value thrown by the call to <c>inflate/2</c>. - The compressor and decompressor must use the same dictionary (see - <seealso marker="#deflateSetDictionary/2"> - <c>deflateSetDictionary/2</c></seealso>).</p> + uncompressed byte sequence. This function must be called as a + response to an inflate operation (eg. + <seealso marker="#safeInflate/2"><c>safeInflate/2</c></seealso>) + returning <c>{need_dictionary,Adler,Output}</c> or in the case of + deprecated functions, throwing an + <c>{'EXIT',{{need_dictionary,Adler},_StackTrace}}</c> exception.</p> + <p>The dictionary chosen by the compressor can be determined from the + Adler value returned or thrown by the call to the inflate function. + The compressor and decompressor must use the same dictionary (See + <seealso marker="#deflateSetDictionary/2"> + <c>deflateSetDictionary/2</c></seealso>).</p> + <p>After setting the dictionary the inflate operation should be + retried without new input.</p> <p>Example:</p> <pre> -unpack(Z, Compressed, Dict) -> +deprecated_unpack(Z, Compressed, Dict) -> case catch zlib:inflate(Z, Compressed) of - {'EXIT',{{need_dictionary,DictID},_}} -> - zlib:inflateSetDictionary(Z, Dict), + {'EXIT',{{need_dictionary,_DictID},_}} -> + ok = zlib:inflateSetDictionary(Z, Dict), Uncompressed = zlib:inflate(Z, []); Uncompressed -> Uncompressed - end.</pre> + end. + +new_unpack(Z, Compressed, Dict) -> + case zlib:inflate(Z, Compressed, [{exception_on_need_dict, false}]) of + {need_dictionary, _DictId, Output} -> + ok = zlib:inflateSetDictionary(Z, Dict), + [Output | zlib:inflate(Z, [])]; + Uncompressed -> + Uncompressed + end.</pre> </desc> </func> <func> - <name name="inflateGetDictionary" arity="1"/> - <fsummary>Return the decompression dictionary.</fsummary> + <name name="open" arity="0"/> + <fsummary>Open a stream and return a stream reference.</fsummary> <desc> - <p>Returns the decompression dictionary currently in use - by the stream. This function must be called between - <seealso marker="#inflateInit/1"><c>inflateInit/1,2</c></seealso> - and <seealso marker="#inflateEnd/1"><c>inflateEnd</c></seealso>.</p> - <p>Only supported if ERTS was compiled with zlib >= 1.2.8.</p> + <p>Opens a zlib stream.</p> </desc> </func> <func> - <name name="open" arity="0"/> - <fsummary>Open a stream and return a stream reference.</fsummary> + <name name="safeInflate" arity="2"/> + <fsummary>Decompress data with limited output size.</fsummary> <desc> - <p>Opens a zlib stream.</p> + <p>Like <seealso marker="#inflate/2"><c>inflate/2</c></seealso>, + but returns once it has expanded beyond a small + implementation-defined threshold. It's useful when decompressing + untrusted input which could have been maliciously crafted to expand + until the system runs out of memory.</p> + <p>This function returns <c>{continue | finished, Output}</c>, where + <anno>Output</anno> is the data that was decompressed in this call. + New input can be queued up on each call if desired, and the function + will return <c>{finished, Output}</c> once all queued data has been + decompressed.</p> + <p>This function can introduce some output latency (reading + input without producing any output).</p> + <p>If a preset dictionary is required for further decompression, this + function returns a <c>need_dictionary</c> tuple. See + <seealso marker="#inflateSetDictionary/2"> + <c>inflateSetDictionary/2</c></seealso>) for details.</p> + <p>Example:</p> + <pre> +walk(Compressed, Handler) -> + Z = zlib:open(), + zlib:inflateInit(Z), + loop(Z, Handler, zlib:safeInflate(Z, Compressed)), + zlib:inflateEnd(Z), + zlib:close(Z). + +loop(Z, Handler, {continue, Output}) -> + Handler(Output), + loop(Z, Handler, zlib:safeInflate(Z, [])); +loop(Z, Handler, {finished, Output}) -> + Handler(Output).</pre> </desc> </func> @@ -609,6 +732,10 @@ unpack(Z, Compressed, Dict) -> <fsummary>Set buffer size.</fsummary> <desc> <p>Sets the intermediate buffer size.</p> + <warning> + <p>This function is deprecated and will be removed in a future + release.</p> + </warning> </desc> </func> diff --git a/erts/emulator/Makefile.in b/erts/emulator/Makefile.in index 1ae905276d..1916b97a89 100644 --- a/erts/emulator/Makefile.in +++ b/erts/emulator/Makefile.in @@ -815,17 +815,19 @@ RUN_OBJS = \ $(OBJDIR)/erl_bif_binary.o $(OBJDIR)/erl_ao_firstfit_alloc.o \ $(OBJDIR)/erl_thr_queue.o $(OBJDIR)/erl_sched_spec_pre_alloc.o \ $(OBJDIR)/erl_ptab.o $(OBJDIR)/erl_map.o \ - $(OBJDIR)/erl_msacc.o $(OBJDIR)/erl_lock_flags.o + $(OBJDIR)/erl_msacc.o $(OBJDIR)/erl_lock_flags.o \ + $(OBJDIR)/erl_io_queue.o LTTNG_OBJS = $(OBJDIR)/erlang_lttng.o -NIF_OBJS = $(OBJDIR)/erl_tracer_nif.o +NIF_OBJS = \ + $(OBJDIR)/erl_tracer_nif.o \ + $(OBJDIR)/zlib_nif.o ifeq ($(TARGET),win32) DRV_OBJS = \ $(OBJDIR)/registry_drv.o \ $(OBJDIR)/efile_drv.o \ $(OBJDIR)/inet_drv.o \ - $(OBJDIR)/zlib_drv.o \ $(OBJDIR)/ram_file_drv.o \ $(OBJDIR)/ttsl_drv.o OS_OBJS = \ @@ -855,7 +857,6 @@ OS_OBJS = \ DRV_OBJS = \ $(OBJDIR)/efile_drv.o \ $(OBJDIR)/inet_drv.o \ - $(OBJDIR)/zlib_drv.o \ $(OBJDIR)/ram_file_drv.o \ $(OBJDIR)/ttsl_drv.o endif diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 962b00ae7b..10ca0b5066 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -679,3 +679,9 @@ bif math:ceil/1 bif math:fmod/2 bif os:set_signal/2 bif erts_internal:maps_to_list/2 + +# +# New in 20.1 +# + +bif erlang:iolist_to_iovec/1 diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h index 0e8ebf0c98..5ad616fec3 100644 --- a/erts/emulator/beam/erl_driver.h +++ b/erts/emulator/beam/erl_driver.h @@ -40,7 +40,6 @@ #include "erl_drv_nif.h" #include <stdlib.h> -#include <sys/types.h> /* ssize_t */ #if defined(__WIN32__) || defined(_WIN32) || defined(_WIN32_) #ifndef STATIC_ERLANG_DRIVER @@ -48,24 +47,6 @@ #define ERL_DRIVER_TYPES_ONLY #define WIN32_DYNAMIC_ERL_DRIVER #endif -/* - * This structure can be cast to a WSABUF structure. - */ -typedef struct _SysIOVec { - unsigned long iov_len; - char* iov_base; -} SysIOVec; -#else /* Unix */ -# ifdef HAVE_SYS_UIO_H -# include <sys/types.h> -# include <sys/uio.h> -typedef struct iovec SysIOVec; -# else -typedef struct { - char* iov_base; - size_t iov_len; -} SysIOVec; -# endif #endif #ifndef EXTERN diff --git a/erts/emulator/beam/erl_drv_nif.h b/erts/emulator/beam/erl_drv_nif.h index f88138063e..31b4817fb1 100644 --- a/erts/emulator/beam/erl_drv_nif.h +++ b/erts/emulator/beam/erl_drv_nif.h @@ -144,8 +144,25 @@ typedef signed int ErlNapiSInt; #define ERTS_NAPI_USEC__ 2 #define ERTS_NAPI_NSEC__ 3 -#endif /* __ERL_DRV_NIF_H__ */ - - - +#if (defined(__WIN32__) || defined(_WIN32) || defined(_WIN32_)) +/* + * This structure can be cast to a WSABUF structure. + */ +typedef struct _SysIOVec { + unsigned long iov_len; + char* iov_base; +} SysIOVec; +#else /* Unix */ +# include <sys/types.h> +# ifdef HAVE_SYS_UIO_H +# include <sys/uio.h> +typedef struct iovec SysIOVec; +# else +typedef struct { + char* iov_base; + size_t iov_len; +} SysIOVec; +# endif +#endif +#endif /* __ERL_DRV_NIF_H__ */ diff --git a/erts/emulator/beam/erl_io_queue.c b/erts/emulator/beam/erl_io_queue.c new file mode 100644 index 0000000000..301b55b315 --- /dev/null +++ b/erts/emulator/beam/erl_io_queue.c @@ -0,0 +1,1238 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2017. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * %CopyrightEnd% + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "sys.h" +#include "global.h" + +#define ERL_WANT_HIPE_BIF_WRAPPER__ +#include "bif.h" +#undef ERL_WANT_HIPE_BIF_WRAPPER__ + +#include "erl_bits.h" +#include "erl_io_queue.h" + +#ifndef MAX +#define MAX(X, Y) ((X) > (Y) ? (X) : (Y)) +#endif + +#ifndef MIN +#define MIN(X, Y) ((X) < (Y) ? (X) : (Y)) +#endif + +#define IOL2V_SMALL_BIN_LIMIT (ERL_ONHEAP_BIN_LIMIT * 4) + +static void free_binary(ErtsIOQBinary *b, int driver); +static ErtsIOQBinary *alloc_binary(Uint size, char *source, void **iov_base, int driver); + +void erts_ioq_init(ErtsIOQueue *q, ErtsAlcType_t alct, int driver) +{ + + ERTS_CT_ASSERT(offsetof(ErlNifIOVec,flags) == sizeof(ErtsIOVecCommon)); + ERTS_CT_ASSERT(sizeof(ErlIOVec) == sizeof(ErtsIOVecCommon)); + ERTS_CT_ASSERT(sizeof(size_t) == sizeof(ErlDrvSizeT)); + ERTS_CT_ASSERT(sizeof(size_t) == sizeof(Uint)); + + q->alct = alct; + q->driver = driver; + q->size = 0; + q->v_head = q->v_tail = q->v_start = q->v_small; + q->v_end = q->v_small + ERTS_SMALL_IO_QUEUE; + q->b_head = q->b_tail = q->b_start = q->b_small; + q->b_end = q->b_small + ERTS_SMALL_IO_QUEUE; +} + +void erts_ioq_clear(ErtsIOQueue *q) +{ + ErtsIOQBinary** binp = q->b_head; + int driver = q->driver; + + if (q->v_start != q->v_small) + erts_free(q->alct, (void *) q->v_start); + + while(binp < q->b_tail) { + if (*binp != NULL) + free_binary(*binp, driver); + binp++; + } + if (q->b_start != q->b_small) + erts_free(q->alct, (void *) q->b_start); + q->v_start = q->v_end = q->v_head = q->v_tail = NULL; + q->b_start = q->b_end = q->b_head = q->b_tail = NULL; + q->size = 0; +} + +static void free_binary(ErtsIOQBinary *b, int driver) +{ + if (driver) + driver_free_binary(&b->driver); + else if (erts_refc_dectest(&b->nif.intern.refc, 0) == 0) + erts_bin_free(&b->nif); +} + +static ErtsIOQBinary *alloc_binary(Uint size, char *source, void **iov_base, int driver) +{ + if (driver) { + ErlDrvBinary *bin = driver_alloc_binary(size); + if (!bin) return NULL; + sys_memcpy(bin->orig_bytes, source, size); + *iov_base = bin->orig_bytes; + return (ErtsIOQBinary *)bin; + } else { + /* This clause can be triggered in enif_ioq_enq_binary is used */ + Binary *bin = erts_bin_nrml_alloc(size); + if (!bin) return NULL; + erts_refc_init(&bin->intern.refc, 1); + sys_memcpy(bin->orig_bytes, source, size); + *iov_base = bin->orig_bytes; + return (ErtsIOQBinary *)bin; + } +} + +Uint erts_ioq_size(ErtsIOQueue *q) +{ + return q->size; +} + +/* expand queue to hold n elements in tail or head */ +static int expandq(ErtsIOQueue* q, int n, int tail) +/* tail: 0 if make room in head, make room in tail otherwise */ +{ + int h_sz; /* room before header */ + int t_sz; /* room after tail */ + int q_sz; /* occupied */ + int nvsz; + SysIOVec* niov; + ErtsIOQBinary** nbinv; + + h_sz = q->v_head - q->v_start; + t_sz = q->v_end - q->v_tail; + q_sz = q->v_tail - q->v_head; + + if (tail && (n <= t_sz)) /* do we need to expand tail? */ + return 0; + else if (!tail && (n <= h_sz)) /* do we need to expand head? */ + return 0; + else if (n > (h_sz + t_sz)) { /* need to allocate */ + /* we may get little extra but it ok */ + nvsz = (q->v_end - q->v_start) + n; + + niov = erts_alloc_fnf(q->alct, nvsz * sizeof(SysIOVec)); + if (!niov) + return -1; + nbinv = erts_alloc_fnf(q->alct, nvsz * sizeof(ErtsIOQBinary**)); + if (!nbinv) { + erts_free(q->alct, (void *) niov); + return -1; + } + if (tail) { + sys_memcpy(niov, q->v_head, q_sz*sizeof(SysIOVec)); + if (q->v_start != q->v_small) + erts_free(q->alct, (void *) q->v_start); + q->v_start = niov; + q->v_end = niov + nvsz; + q->v_head = q->v_start; + q->v_tail = q->v_head + q_sz; + + sys_memcpy(nbinv, q->b_head, q_sz*sizeof(ErtsIOQBinary*)); + if (q->b_start != q->b_small) + erts_free(q->alct, (void *) q->b_start); + q->b_start = nbinv; + q->b_end = nbinv + nvsz; + q->b_head = q->b_start; + q->b_tail = q->b_head + q_sz; + } + else { + sys_memcpy(niov+nvsz-q_sz, q->v_head, q_sz*sizeof(SysIOVec)); + if (q->v_start != q->v_small) + erts_free(q->alct, (void *) q->v_start); + q->v_start = niov; + q->v_end = niov + nvsz; + q->v_tail = q->v_end; + q->v_head = q->v_tail - q_sz; + + sys_memcpy(nbinv+nvsz-q_sz, q->b_head, q_sz*sizeof(ErtsIOQBinary*)); + if (q->b_start != q->b_small) + erts_free(q->alct, (void *) q->b_start); + q->b_start = nbinv; + q->b_end = nbinv + nvsz; + q->b_tail = q->b_end; + q->b_head = q->b_tail - q_sz; + } + } + else if (tail) { /* move to beginning to make room in tail */ + sys_memmove(q->v_start, q->v_head, q_sz*sizeof(SysIOVec)); + q->v_head = q->v_start; + q->v_tail = q->v_head + q_sz; + sys_memmove(q->b_start, q->b_head, q_sz*sizeof(ErtsIOQBinary*)); + q->b_head = q->b_start; + q->b_tail = q->b_head + q_sz; + } + else { /* move to end to make room */ + sys_memmove(q->v_end-q_sz, q->v_head, q_sz*sizeof(SysIOVec)); + q->v_tail = q->v_end; + q->v_head = q->v_tail-q_sz; + sys_memmove(q->b_end-q_sz, q->b_head, q_sz*sizeof(ErtsIOQBinary*)); + q->b_tail = q->b_end; + q->b_head = q->b_tail-q_sz; + } + + return 0; +} + +static +int skip(ErtsIOVec* vec, Uint skipbytes, + SysIOVec **iovp, ErtsIOQBinary ***binvp, + Uint *lenp) +{ + int n; + Uint len; + SysIOVec* iov; + ErtsIOQBinary** binv; + + if (vec->common.size <= skipbytes) + return -1; + + iov = vec->common.iov; + binv = vec->common.binv; + n = vec->common.vsize; + /* we use do here to strip iov_len=0 from beginning */ + do { + len = iov->iov_len; + if (len <= skipbytes) { + skipbytes -= len; + iov++; + binv++; + n--; + } + else { + iov->iov_base = ((char *)(iov->iov_base)) + skipbytes; + iov->iov_len -= skipbytes; + skipbytes = 0; + } + } while(skipbytes > 0); + + *binvp = binv; + *iovp = iov; + *lenp = len; + + return n; +} + +/* Put elements from vec at q tail */ +int erts_ioq_enqv(ErtsIOQueue *q, ErtsIOVec *eiov, Uint skipbytes) +{ + int n; + Uint len; + Uint size = eiov->common.size - skipbytes; + SysIOVec *iov; + ErtsIOQBinary** binv; + ErtsIOQBinary* b; + + if (q == NULL) + return -1; + + ASSERT(eiov->common.size >= skipbytes); + if (eiov->common.size <= skipbytes) + return 0; + + n = skip(eiov, skipbytes, &iov, &binv, &len); + + if (n < 0) + return n; + + if (q->v_tail + n >= q->v_end) + if (expandq(q, n, 1)) + return -1; + + /* Queue and reference all binaries (remove zero length items) */ + while(n--) { + if ((len = iov->iov_len) > 0) { + if ((b = *binv) == NULL) { /* special case create binary ! */ + b = alloc_binary(len, iov->iov_base, (void**)&q->v_tail->iov_base, + q->driver); + if (!b) return -1; + *q->b_tail++ = b; + q->v_tail->iov_len = len; + q->v_tail++; + } + else { + if (q->driver) + driver_binary_inc_refc(&b->driver); + else + erts_refc_inc(&b->nif.intern.refc, 1); + *q->b_tail++ = b; + *q->v_tail++ = *iov; + } + } + iov++; + binv++; + } + q->size += size; /* update total size in queue */ + return 0; +} + +/* Put elements from vec at q head */ +int erts_ioq_pushqv(ErtsIOQueue *q, ErtsIOVec* vec, Uint skipbytes) +{ + int n; + Uint len; + Uint size = vec->common.size - skipbytes; + SysIOVec* iov; + ErtsIOQBinary** binv; + ErtsIOQBinary* b; + + if (q == NULL) + return -1; + + ASSERT(vec->common.size >= skipbytes); + if (vec->common.size <= skipbytes) + return 0; + + n = skip(vec, skipbytes, &iov, &binv, &len); + + if (n < 0) + return n; + + if (q->v_head - n < q->v_start) + if (expandq(q, n, 0)) + return -1; + + /* Queue and reference all binaries (remove zero length items) */ + iov += (n-1); /* move to end */ + binv += (n-1); /* move to end */ + while(n--) { + if ((len = iov->iov_len) > 0) { + if ((b = *binv) == NULL) { /* special case create binary ! */ + if (q->driver) { + ErlDrvBinary *bin = driver_alloc_binary(len); + if (!bin) return -1; + sys_memcpy(bin->orig_bytes, iov->iov_base, len); + b = (ErtsIOQBinary *)bin; + q->v_head->iov_base = bin->orig_bytes; + } + *--q->b_head = b; + q->v_head--; + q->v_head->iov_len = len; + } + else { + if (q->driver) + driver_binary_inc_refc(&b->driver); + else + erts_refc_inc(&b->nif.intern.refc, 1); + *--q->b_head = b; + *--q->v_head = *iov; + } + } + iov--; + binv--; + } + q->size += size; /* update total size in queue */ + return 0; +} + + +/* +** Remove size bytes from queue head +** Return number of bytes that remain in queue +*/ +int erts_ioq_deq(ErtsIOQueue *q, Uint size) +{ + Uint len; + + if ((q == NULL) || (q->size < size)) + return -1; + q->size -= size; + while (size > 0) { + ASSERT(q->v_head != q->v_tail); + + len = q->v_head->iov_len; + if (len <= size) { + size -= len; + free_binary(*q->b_head, q->driver); + *q->b_head++ = NULL; + q->v_head++; + } + else { + q->v_head->iov_base = ((char *)(q->v_head->iov_base)) + size; + q->v_head->iov_len -= size; + size = 0; + } + } + + /* restart pointers (optimised for enq) */ + if (q->v_head == q->v_tail) { + q->v_head = q->v_tail = q->v_start; + q->b_head = q->b_tail = q->b_start; + } + return 0; +} + + +Uint erts_ioq_peekqv(ErtsIOQueue *q, ErtsIOVec *ev) { + ASSERT(ev); + + if (! q) { + return (Uint) -1; + } else { + if ((ev->common.vsize = q->v_tail - q->v_head) == 0) { + ev->common.size = 0; + ev->common.iov = NULL; + ev->common.binv = NULL; + } else { + ev->common.size = q->size; + ev->common.iov = q->v_head; + ev->common.binv = q->b_head; + } + return q->size; + } +} + +SysIOVec* erts_ioq_peekq(ErtsIOQueue *q, int* vlenp) /* length of io-vector */ +{ + + if (q == NULL) { + *vlenp = -1; + return NULL; + } + if ((*vlenp = (q->v_tail - q->v_head)) == 0) + return NULL; + return q->v_head; +} + +/* Fills a possibly deep list of chars and binaries into vec +** Small characters are first stored in the buffer buf of length ln +** binaries found are copied and linked into msoh +** Return vector length on succsess, +** -1 on overflow +** -2 on type error +*/ + +static ERTS_INLINE void +io_list_to_vec_set_vec(SysIOVec **iov, ErtsIOQBinary ***binv, + ErtsIOQBinary *bin, byte *ptr, Uint len, + int *vlen) +{ + while (len > MAX_SYSIOVEC_IOVLEN) { + (*iov)->iov_base = ptr; + (*iov)->iov_len = MAX_SYSIOVEC_IOVLEN; + ptr += MAX_SYSIOVEC_IOVLEN; + len -= MAX_SYSIOVEC_IOVLEN; + (*iov)++; + (*vlen)++; + *(*binv)++ = bin; + } + (*iov)->iov_base = ptr; + (*iov)->iov_len = len; + *(*binv)++ = bin; + (*iov)++; + (*vlen)++; +} + +int +erts_ioq_iolist_to_vec(Eterm obj, /* io-list */ + SysIOVec* iov, /* io vector */ + ErtsIOQBinary** binv, /* binary reference vector */ + ErtsIOQBinary* cbin, /* binary to store characters */ + Uint bin_limit, /* small binaries limit */ + int driver) +{ + DECLARE_ESTACK(s); + Eterm* objp; + byte *buf = NULL; + Uint len = 0; + Uint csize = 0; + int vlen = 0; + byte* cptr; + + if (cbin) { + if (driver) { + buf = (byte*)cbin->driver.orig_bytes; + len = cbin->driver.orig_size; + } else { + buf = (byte*)cbin->nif.orig_bytes; + len = cbin->nif.orig_size; + } + } + cptr = buf; + + goto L_jump_start; /* avoid push */ + + while (!ESTACK_ISEMPTY(s)) { + obj = ESTACK_POP(s); + L_jump_start: + if (is_list(obj)) { + L_iter_list: + objp = list_val(obj); + obj = CAR(objp); + if (is_byte(obj)) { + if (len == 0) + goto L_overflow; + *buf++ = unsigned_val(obj); + csize++; + len--; + } else if (is_binary(obj)) { + ESTACK_PUSH(s, CDR(objp)); + goto handle_binary; + } else if (is_list(obj)) { + ESTACK_PUSH(s, CDR(objp)); + goto L_iter_list; /* on head */ + } else if (!is_nil(obj)) { + goto L_type_error; + } + obj = CDR(objp); + if (is_list(obj)) + goto L_iter_list; /* on tail */ + else if (is_binary(obj)) { + goto handle_binary; + } else if (!is_nil(obj)) { + goto L_type_error; + } + } else if (is_binary(obj)) { + Eterm real_bin; + Uint offset; + Eterm* bptr; + Uint size; + int bitoffs; + int bitsize; + + handle_binary: + size = binary_size(obj); + ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize); + ASSERT(bitsize == 0); + bptr = binary_val(real_bin); + if (*bptr == HEADER_PROC_BIN) { + ProcBin* pb = (ProcBin *) bptr; + if (bitoffs != 0) { + if (len < size) { + goto L_overflow; + } + erts_copy_bits(pb->bytes+offset, bitoffs, 1, + (byte *) buf, 0, 1, size*8); + csize += size; + buf += size; + len -= size; + } else if (bin_limit && size < bin_limit) { + if (len < size) { + goto L_overflow; + } + sys_memcpy(buf, pb->bytes+offset, size); + csize += size; + buf += size; + len -= size; + } else { + ErtsIOQBinary *qbin; + if (csize != 0) { + io_list_to_vec_set_vec(&iov, &binv, cbin, + cptr, csize, &vlen); + cptr = buf; + csize = 0; + } + if (pb->flags) { + erts_emasculate_writable_binary(pb); + } + if (driver) + qbin = (ErtsIOQBinary*)Binary2ErlDrvBinary(pb->val); + else + qbin = (ErtsIOQBinary*)pb->val; + + io_list_to_vec_set_vec( + &iov, &binv, qbin, + pb->bytes+offset, size, &vlen); + } + } else { + ErlHeapBin* hb = (ErlHeapBin *) bptr; + if (len < size) { + goto L_overflow; + } + copy_binary_to_buffer(buf, 0, + ((byte *) hb->data)+offset, bitoffs, + 8*size); + csize += size; + buf += size; + len -= size; + } + } else if (!is_nil(obj)) { + goto L_type_error; + } + } + + if (csize != 0) { + io_list_to_vec_set_vec(&iov, &binv, cbin, cptr, csize, &vlen); + } + + DESTROY_ESTACK(s); + return vlen; + + L_type_error: + DESTROY_ESTACK(s); + return -2; + + L_overflow: + DESTROY_ESTACK(s); + return -1; +} + +static ERTS_INLINE int +io_list_vec_count(Eterm obj, Uint *v_size, + Uint *c_size, Uint *b_size, Uint *in_clist, + Uint *p_v_size, Uint *p_c_size, Uint *p_in_clist, + Uint blimit) +{ + Uint size = binary_size(obj); + Eterm real; + ERTS_DECLARE_DUMMY(Uint offset); + int bitoffs; + int bitsize; + ERTS_GET_REAL_BIN(obj, real, offset, bitoffs, bitsize); + if (bitsize != 0) return 1; + if (thing_subtag(*binary_val(real)) == REFC_BINARY_SUBTAG && + bitoffs == 0) { + *b_size += size; + if (*b_size < size) return 2; + *in_clist = 0; + ++*v_size; + /* If iov_len is smaller then Uint we split the binary into*/ + /* multiple smaller (2GB) elements in the iolist.*/ + *v_size += size / MAX_SYSIOVEC_IOVLEN; + if (size >= blimit) { + *p_in_clist = 0; + ++*p_v_size; + } else { + *p_c_size += size; + if (!*p_in_clist) { + *p_in_clist = 1; + ++*p_v_size; + } + } + } else { + *c_size += size; + if (*c_size < size) return 2; + if (!*in_clist) { + *in_clist = 1; + ++*v_size; + } + *p_c_size += size; + if (!*p_in_clist) { + *p_in_clist = 1; + ++*p_v_size; + } + } + return 0; +} + +#define IO_LIST_VEC_COUNT(obj) \ + do { \ + switch (io_list_vec_count(obj, &v_size, &c_size, \ + &b_size, &in_clist, \ + &p_v_size, &p_c_size, &p_in_clist, \ + blimit)) { \ + case 1: goto L_type_error; \ + case 2: goto L_overflow_error; \ + default: break; \ + } \ + } while(0) + +/* + * Returns 0 if successful and a non-zero value otherwise. + * + * Return values through pointers: + * *vsize - SysIOVec size needed for a writev + * *csize - Number of bytes not in binary (in the common binary) + * *pvsize - SysIOVec size needed if packing small binaries + * *pcsize - Number of bytes in the common binary if packing + * *total_size - Total size of iolist in bytes + */ +int +erts_ioq_iolist_vec_len(Eterm obj, int* vsize, Uint* csize, + Uint* pvsize, Uint* pcsize, + Uint* total_size, Uint blimit) +{ + DECLARE_ESTACK(s); + Eterm* objp; + Uint v_size = 0; + Uint c_size = 0; + Uint b_size = 0; + Uint in_clist = 0; + Uint p_v_size = 0; + Uint p_c_size = 0; + Uint p_in_clist = 0; + Uint total; + + goto L_jump_start; /* avoid a push */ + + while (!ESTACK_ISEMPTY(s)) { + obj = ESTACK_POP(s); + L_jump_start: + if (is_list(obj)) { + L_iter_list: + objp = list_val(obj); + obj = CAR(objp); + + if (is_byte(obj)) { + c_size++; + if (c_size == 0) { + goto L_overflow_error; + } + if (!in_clist) { + in_clist = 1; + v_size++; + } + p_c_size++; + if (!p_in_clist) { + p_in_clist = 1; + p_v_size++; + } + } + else if (is_binary(obj)) { + IO_LIST_VEC_COUNT(obj); + } + else if (is_list(obj)) { + ESTACK_PUSH(s, CDR(objp)); + goto L_iter_list; /* on head */ + } + else if (!is_nil(obj)) { + goto L_type_error; + } + + obj = CDR(objp); + if (is_list(obj)) + goto L_iter_list; /* on tail */ + else if (is_binary(obj)) { /* binary tail is OK */ + IO_LIST_VEC_COUNT(obj); + } + else if (!is_nil(obj)) { + goto L_type_error; + } + } + else if (is_binary(obj)) { + IO_LIST_VEC_COUNT(obj); + } + else if (!is_nil(obj)) { + goto L_type_error; + } + } + + total = c_size + b_size; + if (total < c_size) { + goto L_overflow_error; + } + *total_size = total; + + DESTROY_ESTACK(s); + *vsize = v_size; + *csize = c_size; + *pvsize = p_v_size; + *pcsize = p_c_size; + return 0; + + L_type_error: + L_overflow_error: + DESTROY_ESTACK(s); + return 1; +} + +typedef struct { + Eterm result_head; + Eterm result_tail; + Eterm input_list; + + UWord acc_size; + Binary *acc; + + /* We yield after copying this many bytes into the accumulator (Minus + * eating a few on consing etc). Large binaries will only count to the + * extent their split (if any) resulted in a copy op. */ + UWord bytereds_available; + UWord bytereds_spent; + + Process *process; + ErtsEStack estack; + + Eterm magic_reference; +} iol2v_state_t; + +static int iol2v_state_destructor(Binary *data) { + iol2v_state_t *state = ERTS_MAGIC_BIN_UNALIGNED_DATA(data); + + DESTROY_SAVED_ESTACK(&state->estack); + + if (state->acc != NULL) { + erts_bin_free(state->acc); + } + + return 1; +} + +static void iol2v_init(iol2v_state_t *state, Process *process, Eterm input) { + state->process = process; + + state->result_head = NIL; + state->result_tail = NIL; + state->input_list = input; + + state->magic_reference = NIL; + state->acc_size = 0; + state->acc = NULL; + + CLEAR_SAVED_ESTACK(&state->estack); +} + +static Eterm iol2v_make_sub_bin(iol2v_state_t *state, Eterm bin_term, + UWord offset, UWord size) { + Uint byte_offset, bit_offset, bit_size; + ErlSubBin *sb; + Eterm orig_pb_term; + + sb = (ErlSubBin*)HAlloc(state->process, ERL_SUB_BIN_SIZE); + + ERTS_GET_REAL_BIN(bin_term, orig_pb_term, + byte_offset, bit_offset, bit_size); + + (void)bit_offset; + (void)bit_size; + + sb->thing_word = HEADER_SUB_BIN; + sb->bitsize = 0; + sb->bitoffs = 0; + sb->orig = orig_pb_term; + sb->is_writable = 0; + + sb->offs = byte_offset + offset; + sb->size = size; + + return make_binary(sb); +} + +static Eterm iol2v_promote_acc(iol2v_state_t *state) { + ProcBin *pb; + + state->acc = erts_bin_realloc(state->acc, state->acc_size); + + pb = (ProcBin*)HAlloc(state->process, PROC_BIN_SIZE); + pb->thing_word = HEADER_PROC_BIN; + pb->size = state->acc_size; + pb->val = state->acc; + pb->bytes = (byte*)(state->acc)->orig_bytes; + pb->flags = 0; + pb->next = MSO(state->process).first; + OH_OVERHEAD(&(MSO(state->process)), pb->size / sizeof(Eterm)); + MSO(state->process).first = (struct erl_off_heap_header*)pb; + + state->acc_size = 0; + state->acc = NULL; + + return make_binary(pb); +} + +/* Destructively enqueues a term to the result list, saving us the hassle of + * having to reverse it later. This is safe since GC is disabled and we never + * leak the unfinished term to the outside. */ +static void iol2v_enqueue_result(iol2v_state_t *state, Eterm term) { + Eterm prev_tail; + Eterm *hp; + + prev_tail = state->result_tail; + + hp = HAlloc(state->process, 2); + state->result_tail = CONS(hp, term, NIL); + + if(prev_tail != NIL) { + Eterm *prev_cell = list_val(prev_tail); + CDR(prev_cell) = state->result_tail; + } else { + state->result_head = state->result_tail; + } + + state->bytereds_spent += 1; +} + +#ifndef DEBUG + #define ACC_REALLOCATION_LIMIT (IOL2V_SMALL_BIN_LIMIT * 32) +#else + #define ACC_REALLOCATION_LIMIT (IOL2V_SMALL_BIN_LIMIT * 4) +#endif + +static void iol2v_expand_acc(iol2v_state_t *state, UWord extra) { + UWord required_bytes, acc_alloc_size; + + ERTS_CT_ASSERT(ERTS_UWORD_MAX > ACC_REALLOCATION_LIMIT / 2); + ASSERT(extra >= 1); + + acc_alloc_size = state->acc != NULL ? (state->acc)->orig_size : 0; + required_bytes = state->acc_size + extra; + + if (state->acc == NULL) { + UWord new_size = MAX(required_bytes, IOL2V_SMALL_BIN_LIMIT); + + state->acc = erts_bin_nrml_alloc(new_size); + } else if (required_bytes > acc_alloc_size) { + Binary *prev_acc; + UWord new_size; + + if (acc_alloc_size >= ACC_REALLOCATION_LIMIT) { + /* We skip reallocating once we hit a certain point; it often + * results in extra copying and we're very likely to overallocate + * on anything other than absurdly long byte/heapbin sequences. */ + iol2v_enqueue_result(state, iol2v_promote_acc(state)); + iol2v_expand_acc(state, extra); + return; + } + + new_size = MAX(required_bytes, acc_alloc_size * 2); + prev_acc = state->acc; + + state->acc = erts_bin_realloc(prev_acc, new_size); + + if (prev_acc != state->acc) { + state->bytereds_spent += state->acc_size; + } + } + + state->bytereds_spent += extra; +} + +static int iol2v_append_byte_seq(iol2v_state_t *state, Eterm seq_start, Eterm *seq_end) { + Eterm lookahead, iterator; + Uint observed_bits; + SWord seq_length; + char *acc_data; + + lookahead = seq_start; + seq_length = 0; + + ASSERT(state->bytereds_available > state->bytereds_spent); + + while (is_list(lookahead)) { + Eterm *cell = list_val(lookahead); + + if (!is_small(CAR(cell))) { + break; + } + + if (seq_length * 2 >= (state->bytereds_available - state->bytereds_spent)) { + break; + } + + lookahead = CDR(cell); + seq_length += 1; + } + + ASSERT(seq_length >= 1); + + iol2v_expand_acc(state, seq_length); + + /* Bump a few extra reductions to account for list traversal. */ + state->bytereds_spent += seq_length; + + acc_data = &(state->acc)->orig_bytes[state->acc_size]; + state->acc_size += seq_length; + + iterator = seq_start; + observed_bits = 0; + + while (iterator != lookahead) { + Eterm *cell; + Uint byte; + + cell = list_val(iterator); + iterator = CDR(cell); + + byte = unsigned_val(CAR(cell)); + observed_bits |= byte; + + ASSERT(acc_data < &(state->acc)->orig_bytes[state->acc_size]); + *(acc_data++) = byte; + } + + if (observed_bits > UCHAR_MAX) { + return 0; + } + + ASSERT(acc_data == &(state->acc)->orig_bytes[state->acc_size]); + *seq_end = iterator; + + return 1; +} + +static int iol2v_append_binary(iol2v_state_t *state, Eterm bin_term) { + int is_acc_small, is_bin_small; + UWord combined_size; + UWord binary_size; + + Uint byte_offset, bit_offset, bit_size; + Eterm *parent_header; + Eterm parent_binary; + byte *binary_data; + + ASSERT(state->bytereds_available > state->bytereds_spent); + + ERTS_GET_REAL_BIN(bin_term, parent_binary, byte_offset, bit_offset, bit_size); + parent_header = binary_val(parent_binary); + binary_size = binary_size(bin_term); + + if (bit_offset != 0 || bit_size != 0) { + return 0; + } else if (binary_size == 0) { + state->bytereds_spent += 1; + return 1; + } + + is_acc_small = state->acc_size < IOL2V_SMALL_BIN_LIMIT; + is_bin_small = binary_size < IOL2V_SMALL_BIN_LIMIT; + combined_size = binary_size + state->acc_size; + + if (thing_subtag(*parent_header) == REFC_BINARY_SUBTAG) { + ProcBin *pb = (ProcBin*)parent_header; + + if (pb->flags) { + erts_emasculate_writable_binary(pb); + } + + binary_data = pb->bytes; + } else { + ErlHeapBin *hb = (ErlHeapBin*)parent_header; + + ASSERT(thing_subtag(*parent_header) == HEAP_BINARY_SUBTAG); + ASSERT(is_bin_small); + + binary_data = &((unsigned char*)&hb->data)[byte_offset]; + } + + if (!is_bin_small && (state->acc_size == 0 || !is_acc_small)) { + /* Avoid combining if we encounter an acceptably large binary while the + * accumulator is either empty or large enough to be returned on its + * own. */ + if (state->acc_size != 0) { + iol2v_enqueue_result(state, iol2v_promote_acc(state)); + } + + iol2v_enqueue_result(state, bin_term); + } else if (is_bin_small || combined_size < (IOL2V_SMALL_BIN_LIMIT * 2)) { + /* If the candidate is small or we can't split the combination in two, + * then just copy it into the accumulator. */ + iol2v_expand_acc(state, binary_size); + + sys_memcpy(&(state->acc)->orig_bytes[state->acc_size], + binary_data, binary_size); + + state->acc_size += binary_size; + } else { + /* Otherwise, append enough data for the accumulator to be valid, and + * then return the rest as a sub-binary. */ + UWord spill = IOL2V_SMALL_BIN_LIMIT - state->acc_size; + Eterm binary_tail; + + iol2v_expand_acc(state, spill); + + sys_memcpy(&(state->acc)->orig_bytes[state->acc_size], + binary_data, spill); + + state->acc_size += spill; + + binary_tail = iol2v_make_sub_bin(state, bin_term, spill, + binary_size - spill); + + iol2v_enqueue_result(state, iol2v_promote_acc(state)); + iol2v_enqueue_result(state, binary_tail); + } + + return 1; +} + +static BIF_RETTYPE iol2v_yield(iol2v_state_t *state) { + if (is_nil(state->magic_reference)) { + iol2v_state_t *boxed_state; + Binary *magic_binary; + Eterm *hp; + + magic_binary = erts_create_magic_binary_x(sizeof(*state), + &iol2v_state_destructor, ERTS_ALC_T_BINARY, 1); + + boxed_state = ERTS_MAGIC_BIN_UNALIGNED_DATA(magic_binary); + sys_memcpy(boxed_state, state, sizeof(*state)); + + hp = HAlloc(boxed_state->process, ERTS_MAGIC_REF_THING_SIZE); + boxed_state->magic_reference = + erts_mk_magic_ref(&hp, &MSO(boxed_state->process), magic_binary); + + state = boxed_state; + } + + ERTS_BIF_YIELD1(bif_export[BIF_iolist_to_iovec_1], + state->process, state->magic_reference); +} + +static BIF_RETTYPE iol2v_continue(iol2v_state_t *state) { + Eterm iterator; + + DECLARE_ESTACK(s); + ESTACK_CHANGE_ALLOCATOR(s, ERTS_ALC_T_SAVED_ESTACK); + + state->bytereds_available = + ERTS_BIF_REDS_LEFT(state->process) * IOL2V_SMALL_BIN_LIMIT; + state->bytereds_spent = 0; + + if (state->estack.start) { + ESTACK_RESTORE(s, &state->estack); + } + + iterator = state->input_list; + + for(;;) { + if (state->bytereds_spent >= state->bytereds_available) { + ESTACK_SAVE(s, &state->estack); + state->input_list = iterator; + + return iol2v_yield(state); + } + + while (is_list(iterator)) { + Eterm *cell; + Eterm head; + + cell = list_val(iterator); + head = CAR(cell); + + if (is_binary(head)) { + if (!iol2v_append_binary(state, head)) { + goto l_badarg; + } + + iterator = CDR(cell); + } else if (is_small(head)) { + Eterm seq_end; + + if (!iol2v_append_byte_seq(state, iterator, &seq_end)) { + goto l_badarg; + } + + iterator = seq_end; + } else if (is_list(head) || is_nil(head)) { + Eterm tail = CDR(cell); + + if (!is_nil(tail)) { + ESTACK_PUSH(s, tail); + } + + state->bytereds_spent += 1; + iterator = head; + } else { + goto l_badarg; + } + + if (state->bytereds_spent >= state->bytereds_available) { + ESTACK_SAVE(s, &state->estack); + state->input_list = iterator; + + return iol2v_yield(state); + } + } + + if (is_binary(iterator)) { + if (!iol2v_append_binary(state, iterator)) { + goto l_badarg; + } + } else if (!is_nil(iterator)) { + goto l_badarg; + } + + if(ESTACK_ISEMPTY(s)) { + break; + } + + iterator = ESTACK_POP(s); + } + + if (state->acc_size != 0) { + iol2v_enqueue_result(state, iol2v_promote_acc(state)); + } + + BUMP_REDS(state->process, state->bytereds_spent / IOL2V_SMALL_BIN_LIMIT); + + CLEAR_SAVED_ESTACK(&state->estack); + DESTROY_ESTACK(s); + + BIF_RET(state->result_head); + +l_badarg: + CLEAR_SAVED_ESTACK(&state->estack); + DESTROY_ESTACK(s); + + if (state->acc != NULL) { + erts_bin_free(state->acc); + state->acc = NULL; + } + + BIF_ERROR(state->process, BADARG); +} + +HIPE_WRAPPER_BIF_DISABLE_GC(iolist_to_iovec, 1) + +BIF_RETTYPE iolist_to_iovec_1(BIF_ALIST_1) { + BIF_RETTYPE result; + + if (is_nil(BIF_ARG_1)) { + BIF_RET(NIL); + } else if (is_binary(BIF_ARG_1)) { + if (binary_size(BIF_ARG_1) != 0) { + Eterm *hp = HAlloc(BIF_P, 2); + + BIF_RET(CONS(hp, BIF_ARG_1, NIL)); + } else { + BIF_RET(NIL); + } + } else if (is_internal_magic_ref(BIF_ARG_1)) { + iol2v_state_t *state; + Binary *magic; + + magic = erts_magic_ref2bin(BIF_ARG_1); + + if (ERTS_MAGIC_BIN_DESTRUCTOR(magic) != &iol2v_state_destructor) { + ASSERT(!(BIF_P->flags & F_DISABLE_GC)); + BIF_ERROR(BIF_P, BADARG); + } + + ASSERT(BIF_P->flags & F_DISABLE_GC); + + state = ERTS_MAGIC_BIN_UNALIGNED_DATA(magic); + result = iol2v_continue(state); + } else if (!is_list(BIF_ARG_1)) { + ASSERT(!(BIF_P->flags & F_DISABLE_GC)); + BIF_ERROR(BIF_P, BADARG); + } else { + iol2v_state_t state; + + iol2v_init(&state, BIF_P, BIF_ARG_1); + + erts_set_gc_state(BIF_P, 0); + + result = iol2v_continue(&state); + } + + if (result != THE_NON_VALUE || BIF_P->freason != TRAP) { + erts_set_gc_state(BIF_P, 1); + } + + BIF_RET(result); +} diff --git a/erts/emulator/beam/erl_io_queue.h b/erts/emulator/beam/erl_io_queue.h new file mode 100644 index 0000000000..51abe99510 --- /dev/null +++ b/erts/emulator/beam/erl_io_queue.h @@ -0,0 +1,201 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2017. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * %CopyrightEnd% + */ + +/* + * Description: A queue used for storing binary data that should be + * passed to writev or similar functions. Used by both + * the nif and driver api. + * + * Author: Lukas Larsson + */ + +#ifndef ERL_IO_QUEUE_H__TYPES__ +#define ERL_IO_QUEUE_H__TYPES__ + +#define ERTS_BINARY_TYPES_ONLY__ +#include "erl_binary.h" +#undef ERTS_BINARY_TYPES_ONLY__ +#include "erl_nif.h" + +#ifdef DEBUG +#define MAX_SYSIOVEC_IOVLEN (1ull << (32 - 1)) +#else +#define MAX_SYSIOVEC_IOVLEN (1ull << (sizeof(((SysIOVec*)0)->iov_len) * 8 - 1)) +#endif + +#define ERTS_SMALL_IO_QUEUE 5 + +typedef union { + ErlDrvBinary driver; + Binary nif; +} ErtsIOQBinary; + +typedef struct { + int vsize; /* length of vectors */ + Uint size; /* total size in bytes */ + SysIOVec* iov; + ErtsIOQBinary** binv; +} ErtsIOVecCommon; + +typedef union { + ErtsIOVecCommon common; + ErlIOVec driver; + ErlNifIOVec nif; +} ErtsIOVec; + +/* head/tail represent the data in the queue + * start/end represent the edges of the allocated queue + * small is used when the number of iovec elements is < SMALL_IO_QUEUE + */ +typedef struct erts_io_queue { + ErtsAlcType_t alct; + int driver; + Uint size; /* total size in bytes */ + + SysIOVec* v_start; + SysIOVec* v_end; + SysIOVec* v_head; + SysIOVec* v_tail; + SysIOVec v_small[ERTS_SMALL_IO_QUEUE]; + + ErtsIOQBinary **b_start; + ErtsIOQBinary **b_end; + ErtsIOQBinary **b_head; + ErtsIOQBinary **b_tail; + ErtsIOQBinary *b_small[ERTS_SMALL_IO_QUEUE]; + +} ErtsIOQueue; + +#endif /* ERL_IO_QUEUE_H__TYPES__ */ + +#if !defined(ERL_IO_QUEUE_H) && !defined(ERTS_IO_QUEUE_TYPES_ONLY__) +#define ERL_IO_QUEUE_H + +#include "erl_binary.h" +#include "erl_bits.h" + +void erts_ioq_init(ErtsIOQueue *q, ErtsAlcType_t alct, int driver); +void erts_ioq_clear(ErtsIOQueue *q); +Uint erts_ioq_size(ErtsIOQueue *q); +int erts_ioq_enqv(ErtsIOQueue *q, ErtsIOVec *vec, Uint skip); +int erts_ioq_pushqv(ErtsIOQueue *q, ErtsIOVec *vec, Uint skip); +int erts_ioq_deq(ErtsIOQueue *q, Uint Uint); +Uint erts_ioq_peekqv(ErtsIOQueue *q, ErtsIOVec *ev); +SysIOVec *erts_ioq_peekq(ErtsIOQueue *q, int *vlenp); +Uint erts_ioq_sizeq(ErtsIOQueue *q); + +int erts_ioq_iolist_vec_len(Eterm obj, int* vsize, Uint* csize, + Uint* pvsize, Uint* pcsize, + Uint* total_size, Uint blimit); +int erts_ioq_iolist_to_vec(Eterm obj, SysIOVec* iov, + ErtsIOQBinary** binv, ErtsIOQBinary* cbin, + Uint bin_limit, int driver_binary); + +ERTS_GLB_INLINE +int erts_ioq_iodata_vec_len(Eterm obj, int* vsize, Uint* csize, + Uint* pvsize, Uint* pcsize, + Uint* total_size, Uint blimit); +ERTS_GLB_INLINE +int erts_ioq_iodata_to_vec(Eterm obj, SysIOVec* iov, + ErtsIOQBinary** binv, ErtsIOQBinary* cbin, + Uint bin_limit, int driver_binary); + + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE +int erts_ioq_iodata_vec_len(Eterm obj, int* vsize, Uint* csize, + Uint* pvsize, Uint* pcsize, + Uint* total_size, Uint blimit) { + if (is_binary(obj)) { + /* We optimize for when we get a procbin without a bit-offset + * that fits in one iov slot + */ + Eterm real_bin; + byte bitoffs; + byte bitsize; + ERTS_DECLARE_DUMMY(Uint offset); + Uint size = binary_size(obj); + ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize); + if (size < MAX_SYSIOVEC_IOVLEN && bitoffs == 0 && bitsize == 0) { + *vsize = 1; + *pvsize = 1; + if (thing_subtag(*binary_val(real_bin)) == REFC_BINARY_SUBTAG) { + *csize = 0; + *pcsize = 0; + } else { + *csize = size; + *pcsize = size; + } + *total_size = size; + return 0; + } + } + + return erts_ioq_iolist_vec_len(obj, vsize, csize, + pvsize, pcsize, total_size, blimit); +} + +ERTS_GLB_INLINE +int erts_ioq_iodata_to_vec(Eterm obj, + SysIOVec *iov, + ErtsIOQBinary **binv, + ErtsIOQBinary *cbin, + Uint bin_limit, + int driver) +{ + if (is_binary(obj)) { + Eterm real_bin; + byte bitoffs; + byte bitsize; + Uint offset; + Uint size = binary_size(obj); + ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize); + if (size < MAX_SYSIOVEC_IOVLEN && bitoffs == 0 && bitsize == 0) { + Eterm *bptr = binary_val(real_bin); + if (thing_subtag(*bptr) == REFC_BINARY_SUBTAG) { + ProcBin *pb = (ProcBin *)bptr; + if (pb->flags) + erts_emasculate_writable_binary(pb); + iov[0].iov_base = pb->bytes+offset; + iov[0].iov_len = size; + if (driver) + binv[0] = (ErtsIOQBinary*)Binary2ErlDrvBinary(pb->val); + else + binv[0] = (ErtsIOQBinary*)pb->val; + return 1; + } else { + ErlHeapBin* hb = (ErlHeapBin *)bptr; + byte *buf = driver ? (byte*)cbin->driver.orig_bytes : + (byte*)cbin->nif.orig_bytes; + copy_binary_to_buffer(buf, 0, ((byte *) hb->data)+offset, 0, 8*size); + iov[0].iov_base = buf; + iov[0].iov_len = size; + binv[0] = cbin; + return 1; + } + } + } + return erts_ioq_iolist_to_vec(obj, iov, binv, cbin, bin_limit, driver); +} + +#endif + +#endif /* ERL_IO_QUEUE_H */ diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c index 9caeed3273..a3016ca912 100644 --- a/erts/emulator/beam/erl_nif.c +++ b/erts/emulator/beam/erl_nif.c @@ -56,6 +56,7 @@ #include "erl_process.h" #include "erl_bif_unique.h" #include "erl_utils.h" +#include "erl_io_queue.h" #undef ERTS_WANT_NFUNC_SCHED_INTERNALS__ #define ERTS_WANT_NFUNC_SCHED_INTERNALS__ #include "erl_nfunc_sched.h" @@ -66,6 +67,13 @@ #include <limits.h> #include <stddef.h> /* offsetof */ +#ifndef MAX +#define MAX(X, Y) ((X) > (Y) ? (X) : (Y)) +#endif + +#ifndef MIN +#define MIN(X, Y) ((X) < (Y) ? (X) : (Y)) +#endif /* Information about a loaded nif library. * Each successful call to erlang:load_nif will allocate an instance of @@ -3348,6 +3356,363 @@ int enif_compare_monitors(const ErlNifMonitor *monitor1, ERTS_REF_THING_SIZE*sizeof(Eterm)); } +ErlNifIOQueue *enif_ioq_create(ErlNifIOQueueOpts opts) +{ + ErlNifIOQueue *q; + + if (opts != ERL_NIF_IOQ_NORMAL) + return NULL; + + q = enif_alloc(sizeof(ErlNifIOQueue)); + if (!q) return NULL; + erts_ioq_init(q, ERTS_ALC_T_NIF, 0); + + return q; +} + +void enif_ioq_destroy(ErlNifIOQueue *q) +{ + erts_ioq_clear(q); + enif_free(q); +} + +/* If the iovec was preallocated (Stack or otherwise) it needs to be marked as + * such to perform a proper free. */ +#define ERL_NIF_IOVEC_FLAGS_PREALLOC (1 << 0) + +void enif_free_iovec(ErlNifIOVec *iov) +{ + int i; + /* Decrement the refc of all the binaries */ + for (i = 0; i < iov->iovcnt; i++) { + Binary *bptr = ((Binary**)iov->ref_bins)[i]; + /* bptr can be null if enq_binary was used */ + if (bptr && erts_refc_dectest(&bptr->intern.refc, 0) == 0) { + erts_bin_free(bptr); + } + } + + if (!(iov->flags & ERL_NIF_IOVEC_FLAGS_PREALLOC)) { + enif_free(iov); + } +} + +typedef struct { + UWord sublist_length; + Eterm sublist_start; + Eterm sublist_end; + + UWord offheap_size; + UWord onheap_size; + + UWord iovec_len; +} iovec_slice_t; + +static int examine_iovec_term(Eterm list, UWord max_length, iovec_slice_t *result) { + Eterm lookahead; + + result->sublist_start = list; + result->sublist_length = 0; + result->offheap_size = 0; + result->onheap_size = 0; + result->iovec_len = 0; + + lookahead = result->sublist_start; + + while (is_list(lookahead)) { + Eterm *binary_header, binary; + Eterm *cell; + UWord size; + + cell = list_val(lookahead); + binary = CAR(cell); + + if (!is_binary(binary)) { + return 0; + } + + size = binary_size(binary); + binary_header = binary_val(binary); + + /* If we're a sub-binary we'll need to check our underlying binary to + * determine whether we're on-heap or not. */ + if(thing_subtag(*binary_header) == SUB_BINARY_SUBTAG) { + ErlSubBin *sb = (ErlSubBin*)binary_header; + + /* Reject bitstrings */ + if((sb->bitoffs + sb->bitsize) > 0) { + return 0; + } + + ASSERT(size <= binary_size(sb->orig)); + binary_header = binary_val(sb->orig); + } + + if(thing_subtag(*binary_header) == HEAP_BINARY_SUBTAG) { + ASSERT(size <= ERL_ONHEAP_BIN_LIMIT); + + result->iovec_len += 1; + result->onheap_size += size; + } else { + ASSERT(thing_subtag(*binary_header) == REFC_BINARY_SUBTAG); + + result->iovec_len += 1 + size / MAX_SYSIOVEC_IOVLEN; + result->offheap_size += size; + } + + result->sublist_length += 1; + lookahead = CDR(cell); + + if(result->sublist_length >= max_length) { + break; + } + } + + if (!is_nil(lookahead) && !is_list(lookahead)) { + return 0; + } + + result->sublist_end = lookahead; + + return 1; +} + +static void inspect_raw_binary_data(Eterm binary, ErlNifBinary *result) { + Eterm *parent_header; + Eterm parent_binary; + + int bit_offset, bit_size; + Uint byte_offset; + + ASSERT(is_binary(binary)); + + ERTS_GET_REAL_BIN(binary, parent_binary, byte_offset, bit_offset, bit_size); + + parent_header = binary_val(parent_binary); + + result->size = binary_size(binary); + result->bin_term = binary; + + if (thing_subtag(*parent_header) == REFC_BINARY_SUBTAG) { + ProcBin *pb = (ProcBin*)parent_header; + + ASSERT(pb->val != NULL); + ASSERT(byte_offset < pb->size); + ASSERT(&pb->bytes[byte_offset] >= (byte*)(pb->val)->orig_bytes); + + result->data = (unsigned char*)&pb->bytes[byte_offset]; + result->ref_bin = (void*)pb->val; + } else { + ErlHeapBin *hb = (ErlHeapBin*)parent_header; + + ASSERT(thing_subtag(*parent_header) == HEAP_BINARY_SUBTAG); + + result->data = &((unsigned char*)&hb->data)[byte_offset]; + result->ref_bin = NULL; + } +} + +static int fill_iovec_with_slice(ErlNifEnv *env, + iovec_slice_t *slice, + ErlNifIOVec *iovec) { + UWord onheap_offset, iovec_idx; + ErlNifBinary onheap_data; + Eterm sublist_iterator; + + /* Set up a common refc binary for all on-heap binaries. */ + if (slice->onheap_size > 0) { + if (!enif_alloc_binary(slice->onheap_size, &onheap_data)) { + return 0; + } + } + + sublist_iterator = slice->sublist_start; + onheap_offset = 0; + iovec_idx = 0; + + while (sublist_iterator != slice->sublist_end) { + ErlNifBinary raw_data; + Eterm *cell; + + cell = list_val(sublist_iterator); + inspect_raw_binary_data(CAR(cell), &raw_data); + + /* If this isn't a refc binary, copy its contents to the onheap buffer + * and reference that instead. */ + if (raw_data.ref_bin == NULL) { + ASSERT(onheap_offset < onheap_data.size); + ASSERT(slice->onheap_size > 0); + + sys_memcpy(&onheap_data.data[onheap_offset], + raw_data.data, raw_data.size); + + raw_data.data = &onheap_data.data[onheap_offset]; + raw_data.ref_bin = onheap_data.ref_bin; + } + + ASSERT(raw_data.ref_bin != NULL); + + while (raw_data.size > 0) { + UWord chunk_len = MIN(raw_data.size, MAX_SYSIOVEC_IOVLEN); + + ASSERT(iovec_idx < iovec->iovcnt); + + iovec->iov[iovec_idx].iov_base = raw_data.data; + iovec->iov[iovec_idx].iov_len = chunk_len; + + iovec->ref_bins[iovec_idx] = raw_data.ref_bin; + + raw_data.data += chunk_len; + raw_data.size -= chunk_len; + + iovec_idx += 1; + } + + sublist_iterator = CDR(cell); + } + + ASSERT(iovec_idx == iovec->iovcnt); + + if (env == NULL) { + int i; + for (i = 0; i < iovec->iovcnt; i++) { + Binary *refc_binary = (Binary*)(iovec->ref_bins[i]); + erts_refc_inc(&refc_binary->intern.refc, 1); + } + + if (slice->onheap_size > 0) { + /* Transfer ownership to the iovec; we've taken references to it in + * the above loop. */ + enif_release_binary(&onheap_data); + } + } else { + if (slice->onheap_size > 0) { + /* Attach the binary to our environment and let the GC take care of + * it after returning. */ + enif_make_binary(env, &onheap_data); + } + } + + return 1; +} + +static int create_iovec_from_slice(ErlNifEnv *env, + iovec_slice_t *slice, + ErlNifIOVec **result) { + ErlNifIOVec *iovec = *result; + + if (iovec && slice->iovec_len < ERL_NIF_IOVEC_SIZE) { + iovec->iov = iovec->small_iov; + iovec->ref_bins = iovec->small_ref_bin; + iovec->flags = ERL_NIF_IOVEC_FLAGS_PREALLOC; + } else { + UWord iov_offset, binv_offset, alloc_size; + char *alloc_base; + + iov_offset = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErlNifIOVec)); + binv_offset = iov_offset; + binv_offset += ERTS_ALC_DATA_ALIGN_SIZE(slice->iovec_len * sizeof(SysIOVec)); + alloc_size = binv_offset; + alloc_size += slice->iovec_len * sizeof(Binary*); + + /* If we have an environment we'll attach the allocated data to it. The + * GC will take care of releasing it later on. */ + if (env != NULL) { + ErlNifBinary gc_bin; + + if (!enif_alloc_binary(alloc_size, &gc_bin)) { + return 0; + } + + alloc_base = (char*)gc_bin.data; + enif_make_binary(env, &gc_bin); + } else { + alloc_base = enif_alloc(alloc_size); + } + + iovec = (ErlNifIOVec*)alloc_base; + iovec->iov = (SysIOVec*)(alloc_base + iov_offset); + iovec->ref_bins = (void**)(alloc_base + binv_offset); + iovec->flags = 0; + } + + iovec->size = slice->offheap_size + slice->onheap_size; + iovec->iovcnt = slice->iovec_len; + + if(!fill_iovec_with_slice(env, slice, iovec)) { + if (env == NULL && !(iovec->flags & ERL_NIF_IOVEC_FLAGS_PREALLOC)) { + enif_free(iovec); + } + + return 0; + } + + *result = iovec; + + return 1; +} + +int enif_inspect_iovec(ErlNifEnv *env, size_t max_elements, + ERL_NIF_TERM list, ERL_NIF_TERM *tail, + ErlNifIOVec **iov) { + iovec_slice_t slice; + + if(!examine_iovec_term(list, max_elements, &slice)) { + return 0; + } else if(!create_iovec_from_slice(env, &slice, iov)) { + return 0; + } + + (*tail) = slice.sublist_end; + + return 1; +} + +/* */ +int enif_ioq_enqv(ErlNifIOQueue *q, ErlNifIOVec *iov, size_t skip) +{ + if(skip <= iov->size) { + return !erts_ioq_enqv(q, (ErtsIOVec*)iov, skip); + } + + return 0; +} + +int enif_ioq_enq_binary(ErlNifIOQueue *q, ErlNifBinary *bin, size_t skip) +{ + ErlNifIOVec vec = {1, bin->size, NULL, NULL, ERL_NIF_IOVEC_FLAGS_PREALLOC }; + Binary *ref_bin = (Binary*)bin->ref_bin; + int res; + vec.iov = vec.small_iov; + vec.ref_bins = vec.small_ref_bin; + vec.iov[0].iov_base = bin->data; + vec.iov[0].iov_len = bin->size; + ((Binary**)(vec.ref_bins))[0] = ref_bin; + + res = enif_ioq_enqv(q, &vec, skip); + enif_release_binary(bin); + return res; +} + +size_t enif_ioq_size(ErlNifIOQueue *q) +{ + return erts_ioq_size(q); +} + +int enif_ioq_deq(ErlNifIOQueue *q, size_t elems, size_t *size) +{ + if (erts_ioq_deq(q, elems) == -1) + return 0; + if (size) + *size = erts_ioq_size(q); + return 1; +} + +SysIOVec *enif_ioq_peek(ErlNifIOQueue *q, int *iovlen) +{ + return erts_ioq_peekq(q, iovlen); +} + /*************************************************************************** ** load_nif/2 ** ***************************************************************************/ diff --git a/erts/emulator/beam/erl_nif.h b/erts/emulator/beam/erl_nif.h index b0d5c39798..d195721054 100644 --- a/erts/emulator/beam/erl_nif.h +++ b/erts/emulator/beam/erl_nif.h @@ -50,6 +50,7 @@ ** 2.9: 18.2 enif_getenv ** 2.10: Time API ** 2.11: 19.0 enif_snprintf +** 2.12: 20.0 add enif_queue */ #define ERL_NIF_MAJOR_VERSION 2 #define ERL_NIF_MINOR_VERSION 12 @@ -241,6 +242,28 @@ typedef enum { ERL_NIF_PHASH2 = 2 } ErlNifHash; +#define ERL_NIF_IOVEC_SIZE 16 + +typedef struct erl_nif_io_vec { + int iovcnt; /* length of vectors */ + size_t size; /* total size in bytes */ + SysIOVec *iov; + + /* internals (avert your eyes) */ + void **ref_bins; /* Binary[] */ + int flags; + + /* Used when stack allocating the io vec */ + SysIOVec small_iov[ERL_NIF_IOVEC_SIZE]; + void *small_ref_bin[ERL_NIF_IOVEC_SIZE]; +} ErlNifIOVec; + +typedef struct erts_io_queue ErlNifIOQueue; + +typedef enum { + ERL_NIF_IOQ_NORMAL = 1 +} ErlNifIOQueueOpts; + /* * Return values from enif_thread_type(). Negative values * reserved for specific types of non-scheduler threads. diff --git a/erts/emulator/beam/erl_nif_api_funcs.h b/erts/emulator/beam/erl_nif_api_funcs.h index 94c04cd126..9e573307d8 100644 --- a/erts/emulator/beam/erl_nif_api_funcs.h +++ b/erts/emulator/beam/erl_nif_api_funcs.h @@ -184,6 +184,21 @@ ERL_NIF_API_FUNC_DECL(ErlNifUInt64,enif_hash,(ErlNifHash type, ERL_NIF_TERM term ERL_NIF_API_FUNC_DECL(int, enif_whereis_pid, (ErlNifEnv *env, ERL_NIF_TERM name, ErlNifPid *pid)); ERL_NIF_API_FUNC_DECL(int, enif_whereis_port, (ErlNifEnv *env, ERL_NIF_TERM name, ErlNifPort *port)); +ERL_NIF_API_FUNC_DECL(ErlNifIOQueue *,enif_ioq_create,(ErlNifIOQueueOpts opts)); +ERL_NIF_API_FUNC_DECL(void,enif_ioq_destroy,(ErlNifIOQueue *q)); + +ERL_NIF_API_FUNC_DECL(int,enif_ioq_enq_binary,(ErlNifIOQueue *q, ErlNifBinary *bin, size_t skip)); +ERL_NIF_API_FUNC_DECL(int,enif_ioq_enqv,(ErlNifIOQueue *q, ErlNifIOVec *iov, size_t skip)); + +ERL_NIF_API_FUNC_DECL(size_t,enif_ioq_size,(ErlNifIOQueue *q)); +ERL_NIF_API_FUNC_DECL(int,enif_ioq_deq,(ErlNifIOQueue *q, size_t count, size_t *size)); + +ERL_NIF_API_FUNC_DECL(SysIOVec*,enif_ioq_peek,(ErlNifIOQueue *q, int *iovlen)); + +ERL_NIF_API_FUNC_DECL(int,enif_inspect_iovec,(ErlNifEnv *env, size_t max_length, ERL_NIF_TERM iovec_term, ERL_NIF_TERM *tail, ErlNifIOVec **iovec)); +ERL_NIF_API_FUNC_DECL(void,enif_free_iovec,(ErlNifIOVec *iov)); + + /* ** ADD NEW ENTRIES HERE (before this comment) !!! */ @@ -348,6 +363,16 @@ ERL_NIF_API_FUNC_DECL(int, enif_whereis_port, (ErlNifEnv *env, ERL_NIF_TERM name # define enif_hash ERL_NIF_API_FUNC_MACRO(enif_hash) # define enif_whereis_pid ERL_NIF_API_FUNC_MACRO(enif_whereis_pid) # define enif_whereis_port ERL_NIF_API_FUNC_MACRO(enif_whereis_port) +# define enif_ioq_create ERL_NIF_API_FUNC_MACRO(enif_ioq_create) +# define enif_ioq_destroy ERL_NIF_API_FUNC_MACRO(enif_ioq_destroy) +# define enif_ioq_enq ERL_NIF_API_FUNC_MACRO(enif_ioq_enq) +# define enif_ioq_enq_binary ERL_NIF_API_FUNC_MACRO(enif_ioq_enq_binary) +# define enif_ioq_enqv ERL_NIF_API_FUNC_MACRO(enif_ioq_enqv) +# define enif_ioq_size ERL_NIF_API_FUNC_MACRO(enif_ioq_size) +# define enif_ioq_deq ERL_NIF_API_FUNC_MACRO(enif_ioq_deq) +# define enif_ioq_peek ERL_NIF_API_FUNC_MACRO(enif_ioq_peek) +# define enif_inspect_iovec ERL_NIF_API_FUNC_MACRO(enif_inspect_iovec) +# define enif_free_iovec ERL_NIF_API_FUNC_MACRO(enif_free_iovec) /* ** ADD NEW ENTRIES HERE (before this comment) diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index 6a3213ec52..b64de624dd 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -31,6 +31,9 @@ typedef struct ErtsProc2PortSigData_ ErtsProc2PortSigData; #include "erl_ptab.h" #include "erl_thr_progress.h" #include "erl_trace.h" +#define ERTS_IO_QUEUE_TYPES_ONLY__ +#include "erl_io_queue.h" +#undef ERTS_IO_QUEUE_TYPES_ONLY__ #ifndef __WIN32__ #define ERTS_DEFAULT_MAX_PORTS (1 << 16) @@ -75,23 +78,8 @@ typedef struct erts_driver_t_ erts_driver_t; #define ERTS_Port2ErlDrvPort(PH) ((ErlDrvPort) (PH)) #endif -#define SMALL_IO_QUEUE 5 /* Number of fixed elements */ +typedef ErtsIOQueue ErlPortIOQueue; -typedef struct { - ErlDrvSizeT size; /* total size in bytes */ - - SysIOVec* v_start; - SysIOVec* v_end; - SysIOVec* v_head; - SysIOVec* v_tail; - SysIOVec v_small[SMALL_IO_QUEUE]; - - ErlDrvBinary** b_start; - ErlDrvBinary** b_end; - ErlDrvBinary** b_head; - ErlDrvBinary** b_tail; - ErlDrvBinary* b_small[SMALL_IO_QUEUE]; -} ErlIOQueue; typedef struct line_buf { /* Buffer used in line oriented I/O */ ErlDrvSizeT bufsiz; /* Size of character buffer */ @@ -172,7 +160,7 @@ struct _erl_drv_port { Uint bytes_in; /* Number of bytes read */ Uint bytes_out; /* Number of bytes written */ - ErlIOQueue ioq; /* driver accessible i/o queue */ + ErlPortIOQueue ioq; /* driver accessible i/o queue */ DistEntry *dist_entry; /* Dist entry used in DISTRIBUTION */ char *name; /* String used in the open */ erts_driver_t* drv_ptr; diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index b609f6de39..3a5ddde5f4 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -52,6 +52,7 @@ #include "erl_bif_unique.h" #include "erl_hl_timer.h" #include "erl_time.h" +#include "erl_io_queue.h" extern ErlDrvEntry fd_driver_entry; extern ErlDrvEntry vanilla_driver_entry; @@ -108,7 +109,7 @@ static void driver_monitor_unlock_pdl(Port *p); #define ERL_SMALL_IO_BIN_LIMIT (4*ERL_ONHEAP_BIN_LIMIT) #define SMALL_WRITE_VEC 16 -static ERTS_INLINE ErlIOQueue* +static ERTS_INLINE ErlPortIOQueue* drvport2ioq(ErlDrvPort drvport) { Port *prt = erts_thr_drvport2port(drvport, 0); @@ -123,11 +124,11 @@ is_port_ioq_empty(Port *pp) int res; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); if (!pp->port_data_lock) - res = (pp->ioq.size == 0); + res = (erts_ioq_size(&pp->ioq) == 0); else { ErlDrvPDL pdl = pp->port_data_lock; erts_mtx_lock(&pdl->mtx); - res = (pp->ioq.size == 0); + res = (erts_ioq_size(&pp->ioq) == 0); erts_mtx_unlock(&pdl->mtx); } return res; @@ -142,14 +143,14 @@ erts_is_port_ioq_empty(Port *pp) Uint erts_port_ioq_size(Port *pp) { - int res; + ErlDrvSizeT res; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); if (!pp->port_data_lock) - res = pp->ioq.size; + res = erts_ioq_size(&pp->ioq); else { ErlDrvPDL pdl = pp->port_data_lock; erts_mtx_lock(&pdl->mtx); - res = pp->ioq.size; + res = erts_ioq_size(&pp->ioq); erts_mtx_unlock(&pdl->mtx); } return (Uint) res; @@ -508,41 +509,17 @@ erts_port_free(Port *prt) */ static void initq(Port* prt) { - ErlIOQueue* q = &prt->ioq; - ERTS_LC_ASSERT(!prt->port_data_lock); - - q->size = 0; - q->v_head = q->v_tail = q->v_start = q->v_small; - q->v_end = q->v_small + SMALL_IO_QUEUE; - q->b_head = q->b_tail = q->b_start = q->b_small; - q->b_end = q->b_small + SMALL_IO_QUEUE; + erts_ioq_init(&prt->ioq, ERTS_ALC_T_IOQ, 1); } static void stopq(Port* prt) { - ErlIOQueue* q; - ErlDrvBinary** binp; if (prt->port_data_lock) driver_pdl_lock(prt->port_data_lock); - q = &prt->ioq; - binp = q->b_head; - - if (q->v_start != q->v_small) - erts_free(ERTS_ALC_T_IOQ, (void *) q->v_start); - - while(binp < q->b_tail) { - if (*binp != NULL) - driver_free_binary(*binp); - binp++; - } - if (q->b_start != q->b_small) - erts_free(ERTS_ALC_T_IOQ, (void *) q->b_start); - q->v_start = q->v_end = q->v_head = q->v_tail = NULL; - q->b_start = q->b_end = q->b_head = q->b_tail = NULL; - q->size = 0; + erts_ioq_clear(&prt->ioq); if (prt->port_data_lock) { driver_pdl_unlock(prt->port_data_lock); @@ -923,311 +900,6 @@ int erts_port_handle_xports(Port *prt) } #endif -/* Fills a possibly deep list of chars and binaries into vec -** Small characters are first stored in the buffer buf of length ln -** binaries found are copied and linked into msoh -** Return vector length on succsess, -** -1 on overflow -** -2 on type error -*/ - -#ifdef DEBUG -#define MAX_SYSIOVEC_IOVLEN (1ull << (32 - 1)) -#else -#define MAX_SYSIOVEC_IOVLEN (1ull << (sizeof(((SysIOVec*)0)->iov_len) * 8 - 1)) -#endif - -static ERTS_INLINE void -io_list_to_vec_set_vec(SysIOVec **iov, ErlDrvBinary ***binv, - ErlDrvBinary *bin, byte *ptr, Uint len, - int *vlen) -{ - while (len > MAX_SYSIOVEC_IOVLEN) { - (*iov)->iov_base = ptr; - (*iov)->iov_len = MAX_SYSIOVEC_IOVLEN; - ptr += MAX_SYSIOVEC_IOVLEN; - len -= MAX_SYSIOVEC_IOVLEN; - (*iov)++; - (*vlen)++; - *(*binv)++ = bin; - } - (*iov)->iov_base = ptr; - (*iov)->iov_len = len; - *(*binv)++ = bin; - (*iov)++; - (*vlen)++; -} - -static int -io_list_to_vec(Eterm obj, /* io-list */ - SysIOVec* iov, /* io vector */ - ErlDrvBinary** binv, /* binary reference vector */ - ErlDrvBinary* cbin, /* binary to store characters */ - ErlDrvSizeT bin_limit) /* small binaries limit */ -{ - DECLARE_ESTACK(s); - Eterm* objp; - byte *buf = (byte*)cbin->orig_bytes; - Uint len = cbin->orig_size; - Uint csize = 0; - int vlen = 0; - byte* cptr = buf; - - goto L_jump_start; /* avoid push */ - - while (!ESTACK_ISEMPTY(s)) { - obj = ESTACK_POP(s); - L_jump_start: - if (is_list(obj)) { - L_iter_list: - objp = list_val(obj); - obj = CAR(objp); - if (is_byte(obj)) { - if (len == 0) - goto L_overflow; - *buf++ = unsigned_val(obj); - csize++; - len--; - } else if (is_binary(obj)) { - ESTACK_PUSH(s, CDR(objp)); - goto handle_binary; - } else if (is_list(obj)) { - ESTACK_PUSH(s, CDR(objp)); - goto L_iter_list; /* on head */ - } else if (!is_nil(obj)) { - goto L_type_error; - } - obj = CDR(objp); - if (is_list(obj)) - goto L_iter_list; /* on tail */ - else if (is_binary(obj)) { - goto handle_binary; - } else if (!is_nil(obj)) { - goto L_type_error; - } - } else if (is_binary(obj)) { - Eterm real_bin; - Uint offset; - Eterm* bptr; - ErlDrvSizeT size; - int bitoffs; - int bitsize; - - handle_binary: - size = binary_size(obj); - ERTS_GET_REAL_BIN(obj, real_bin, offset, bitoffs, bitsize); - ASSERT(bitsize == 0); - bptr = binary_val(real_bin); - if (*bptr == HEADER_PROC_BIN) { - ProcBin* pb = (ProcBin *) bptr; - if (bitoffs != 0) { - if (len < size) { - goto L_overflow; - } - erts_copy_bits(pb->bytes+offset, bitoffs, 1, - (byte *) buf, 0, 1, size*8); - csize += size; - buf += size; - len -= size; - } else if (bin_limit && size < bin_limit) { - if (len < size) { - goto L_overflow; - } - sys_memcpy(buf, pb->bytes+offset, size); - csize += size; - buf += size; - len -= size; - } else { - if (csize != 0) { - io_list_to_vec_set_vec(&iov, &binv, cbin, - cptr, csize, &vlen); - cptr = buf; - csize = 0; - } - if (pb->flags) { - erts_emasculate_writable_binary(pb); - } - io_list_to_vec_set_vec( - &iov, &binv, Binary2ErlDrvBinary(pb->val), - pb->bytes+offset, size, &vlen); - } - } else { - ErlHeapBin* hb = (ErlHeapBin *) bptr; - if (len < size) { - goto L_overflow; - } - copy_binary_to_buffer(buf, 0, - ((byte *) hb->data)+offset, bitoffs, - 8*size); - csize += size; - buf += size; - len -= size; - } - } else if (!is_nil(obj)) { - goto L_type_error; - } - } - - if (csize != 0) { - io_list_to_vec_set_vec(&iov, &binv, cbin, cptr, csize, &vlen); - } - - DESTROY_ESTACK(s); - return vlen; - - L_type_error: - DESTROY_ESTACK(s); - return -2; - - L_overflow: - DESTROY_ESTACK(s); - return -1; -} - -#define IO_LIST_VEC_COUNT(obj) \ -do { \ - Uint _size = binary_size(obj); \ - Eterm _real; \ - ERTS_DECLARE_DUMMY(Uint _offset); \ - int _bitoffs; \ - int _bitsize; \ - ERTS_GET_REAL_BIN(obj, _real, _offset, _bitoffs, _bitsize); \ - if (_bitsize != 0) goto L_type_error; \ - if (thing_subtag(*binary_val(_real)) == REFC_BINARY_SUBTAG && \ - _bitoffs == 0) { \ - b_size += _size; \ - if (b_size < _size) goto L_overflow_error; \ - in_clist = 0; \ - v_size++; \ - /* If iov_len is smaller then Uint we split the binary into*/ \ - /* multiple smaller (2GB) elements in the iolist.*/ \ - v_size += _size / MAX_SYSIOVEC_IOVLEN; \ - if (_size >= ERL_SMALL_IO_BIN_LIMIT) { \ - p_in_clist = 0; \ - p_v_size++; \ - } else { \ - p_c_size += _size; \ - if (!p_in_clist) { \ - p_in_clist = 1; \ - p_v_size++; \ - } \ - } \ - } else { \ - c_size += _size; \ - if (c_size < _size) goto L_overflow_error; \ - if (!in_clist) { \ - in_clist = 1; \ - v_size++; \ - } \ - p_c_size += _size; \ - if (!p_in_clist) { \ - p_in_clist = 1; \ - p_v_size++; \ - } \ - } \ -} while (0) - - -/* - * Returns 0 if successful and a non-zero value otherwise. - * - * Return values through pointers: - * *vsize - SysIOVec size needed for a writev - * *csize - Number of bytes not in binary (in the common binary) - * *pvsize - SysIOVec size needed if packing small binaries - * *pcsize - Number of bytes in the common binary if packing - * *total_size - Total size of iolist in bytes - */ - -static int -io_list_vec_len(Eterm obj, int* vsize, Uint* csize, - Uint* pvsize, Uint* pcsize, - ErlDrvSizeT* total_size) -{ - DECLARE_ESTACK(s); - Eterm* objp; - Uint v_size = 0; - Uint c_size = 0; - Uint b_size = 0; - Uint in_clist = 0; - Uint p_v_size = 0; - Uint p_c_size = 0; - Uint p_in_clist = 0; - Uint total; - - goto L_jump_start; /* avoid a push */ - - while (!ESTACK_ISEMPTY(s)) { - obj = ESTACK_POP(s); - L_jump_start: - if (is_list(obj)) { - L_iter_list: - objp = list_val(obj); - obj = CAR(objp); - - if (is_byte(obj)) { - c_size++; - if (c_size == 0) { - goto L_overflow_error; - } - if (!in_clist) { - in_clist = 1; - v_size++; - } - p_c_size++; - if (!p_in_clist) { - p_in_clist = 1; - p_v_size++; - } - } - else if (is_binary(obj)) { - IO_LIST_VEC_COUNT(obj); - } - else if (is_list(obj)) { - ESTACK_PUSH(s, CDR(objp)); - goto L_iter_list; /* on head */ - } - else if (!is_nil(obj)) { - goto L_type_error; - } - - obj = CDR(objp); - if (is_list(obj)) - goto L_iter_list; /* on tail */ - else if (is_binary(obj)) { /* binary tail is OK */ - IO_LIST_VEC_COUNT(obj); - } - else if (!is_nil(obj)) { - goto L_type_error; - } - } - else if (is_binary(obj)) { - IO_LIST_VEC_COUNT(obj); - } - else if (!is_nil(obj)) { - goto L_type_error; - } - } - - total = c_size + b_size; - if (total < c_size) { - goto L_overflow_error; - } - *total_size = (ErlDrvSizeT) total; - - DESTROY_ESTACK(s); - *vsize = v_size; - *csize = c_size; - *pvsize = p_v_size; - *pcsize = p_c_size; - return 0; - - L_type_error: - L_overflow_error: - DESTROY_ESTACK(s); - return 1; -} - typedef enum { ERTS_TRY_IMM_DRV_CALL_OK, ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK, @@ -1797,8 +1469,7 @@ cleanup_scheduled_outputv(ErlIOVec *ev, ErlDrvBinary *cbinp) int i; /* Need to free all binaries */ for (i = 1; i < ev->vsize; i++) - if (ev->binv[i]) - driver_free_binary(ev->binv[i]); + driver_free_binary(ev->binv[i]); if (cbinp) driver_free_binary(cbinp); } @@ -1966,15 +1637,14 @@ erts_port_output_async(Port *prt, Eterm from, Eterm list) size_t size; int task_flags; ErtsProc2PortSigCallback port_sig_callback; - ErlDrvBinary *cbin = NULL; - ErlIOVec *evp = NULL; + ErtsIOQBinary *cbin = NULL; + ErtsIOVec *evp = NULL; char *buf = NULL; ErtsPortTaskHandle *ns_pthp; if (drv->outputv) { - ErlIOVec ev; SysIOVec* ivp; - ErlDrvBinary** bvp; + ErtsIOQBinary** bvp; int vsize; Uint csize; Uint pvsize; @@ -1984,91 +1654,63 @@ erts_port_output_async(Port *prt, Eterm from, Eterm list) char *ptr; int i; - Eterm* bptr = NULL; - Uint offset; - - if (is_binary(list)) { - /* We optimize for when we get a procbin without offset */ - Eterm real_bin; - int bitoffs; - int bitsize; - ERTS_GET_REAL_BIN(list, real_bin, offset, bitoffs, bitsize); - bptr = binary_val(real_bin); - if (*bptr == HEADER_PROC_BIN && bitoffs == 0) { - size = binary_size(list); - vsize = 1; - } else - bptr = NULL; - } - - if (!bptr) { - if (io_list_vec_len(list, &vsize, &csize, &pvsize, &pcsize, &size)) - goto bad_value; + if (erts_ioq_iodata_vec_len(list, &vsize, &csize, &pvsize, &pcsize, + &size, ERL_SMALL_IO_BIN_LIMIT)) + goto bad_value; - /* To pack or not to pack (small binaries) ...? */ - if (vsize >= SMALL_WRITE_VEC) { - /* Do pack */ - vsize = pvsize + 1; - csize = pcsize; - blimit = ERL_SMALL_IO_BIN_LIMIT; - } - cbin = driver_alloc_binary(csize); + /* To pack or not to pack (small binaries) ...? */ + if (vsize >= SMALL_WRITE_VEC) { + /* Do pack */ + vsize = pvsize + 1; + csize = pcsize; + blimit = ERL_SMALL_IO_BIN_LIMIT; + } + if (csize) { + cbin = (ErtsIOQBinary *)driver_alloc_binary(csize); if (!cbin) erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize)); } - iov_offset = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErlIOVec)); binv_offset = iov_offset; binv_offset += ERTS_ALC_DATA_ALIGN_SIZE((vsize+1)*sizeof(SysIOVec)); alloc_size = binv_offset; - alloc_size += (vsize+1)*sizeof(ErlDrvBinary *); + alloc_size += (vsize+1)*sizeof(ErtsIOQBinary *); sigdp = erts_port_task_alloc_p2p_sig_data_extra(alloc_size, (void**)&ptr); - evp = (ErlIOVec *) ptr; - ivp = evp->iov = (SysIOVec *) (ptr + iov_offset); - bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset); + evp = (ErtsIOVec *) ptr; + ivp = evp->driver.iov = (SysIOVec *) (ptr + iov_offset); + bvp = evp->common.binv = (ErtsIOQBinary **) (ptr + binv_offset); ivp[0].iov_base = NULL; ivp[0].iov_len = 0; bvp[0] = NULL; - if (bptr) { - ProcBin* pb = (ProcBin *) bptr; - - ivp[1].iov_base = pb->bytes+offset; - ivp[1].iov_len = size; - bvp[1] = Binary2ErlDrvBinary(pb->val); - - evp->vsize = 1; - } else { - - evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit); - if (evp->vsize < 0) { - if (evp != &ev) - erts_free(ERTS_ALC_T_DRV_CMD_DATA, evp); - driver_free_binary(cbin); - goto bad_value; - } + evp->driver.vsize = erts_ioq_iodata_to_vec(list, ivp+1, bvp+1, cbin, + blimit, 1); + if (evp->driver.vsize < 0) { + erts_free(ERTS_ALC_T_DRV_CMD_DATA, evp); + driver_free_binary(&cbin->driver); + goto bad_value; } #if 0 /* This assertion may say something useful, but it can be falsified during the emulator test suites. */ ASSERT(evp->vsize == vsize); #endif - evp->vsize++; - evp->size = size; /* total size */ + evp->driver.vsize++; + evp->driver.size = size; /* total size */ /* Need to increase refc on all binaries */ - for (i = 1; i < evp->vsize; i++) + for (i = 1; i < evp->driver.vsize; i++) if (bvp[i]) - driver_binary_inc_refc(bvp[i]); + driver_binary_inc_refc(&bvp[i]->driver); sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV; sigdp->u.outputv.from = from; - sigdp->u.outputv.evp = evp; - sigdp->u.outputv.cbinp = cbin; + sigdp->u.outputv.evp = &evp->driver; + sigdp->u.outputv.cbinp = &cbin->driver; port_sig_callback = port_sig_outputv; } else { ErlDrvSizeT ERTS_DECLARE_DUMMY(r); @@ -2139,8 +1781,8 @@ erts_port_output(Process *c_p, erts_aint32_t sched_flags, busy_flgs, invalid_flags; int task_flags; ErtsProc2PortSigCallback port_sig_callback; - ErlDrvBinary *cbin = NULL; - ErlIOVec *evp = NULL; + ErtsIOQBinary *cbin = NULL; + ErtsIOVec *evp = NULL; char *buf = NULL; int force_immediate_call = (flags & ERTS_PORT_SIG_FLG_FORCE_IMM_CALL); int async_nosuspend; @@ -2186,11 +1828,11 @@ erts_port_output(Process *c_p, } #endif if (drv->outputv) { - ErlIOVec ev; + ErtsIOVec ev; SysIOVec iv[SMALL_WRITE_VEC]; - ErlDrvBinary* bv[SMALL_WRITE_VEC]; + ErtsIOQBinary* bv[SMALL_WRITE_VEC]; SysIOVec* ivp; - ErlDrvBinary** bvp; + ErtsIOQBinary** bvp; int vsize; Uint csize; Uint pvsize; @@ -2198,18 +1840,19 @@ erts_port_output(Process *c_p, Uint blimit; size_t iov_offset, binv_offset, alloc_size; - if (io_list_vec_len(list, &vsize, &csize, &pvsize, &pcsize, &size)) + if (erts_ioq_iodata_vec_len(list, &vsize, &csize, &pvsize, &pcsize, + &size, ERL_SMALL_IO_BIN_LIMIT)) goto bad_value; iov_offset = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErlIOVec)); binv_offset = iov_offset; binv_offset += ERTS_ALC_DATA_ALIGN_SIZE((vsize+1)*sizeof(SysIOVec)); alloc_size = binv_offset; - alloc_size += (vsize+1)*sizeof(ErlDrvBinary *); + alloc_size += (vsize+1)*sizeof(ErtsIOQBinary *); if (try_call && vsize < SMALL_WRITE_VEC) { - ivp = ev.iov = iv; - bvp = ev.binv = bv; + ivp = ev.common.iov = iv; + bvp = ev.common.binv = bv; evp = &ev; } else { @@ -2220,9 +1863,9 @@ erts_port_output(Process *c_p, sigdp = erts_port_task_alloc_p2p_sig_data_extra( alloc_size, (void**)&ptr); } - evp = (ErlIOVec *) ptr; - ivp = evp->iov = (SysIOVec *) (ptr + iov_offset); - bvp = evp->binv = (ErlDrvBinary **) (ptr + binv_offset); + evp = (ErtsIOVec *) ptr; + ivp = evp->driver.iov = (SysIOVec *) (ptr + iov_offset); + bvp = evp->common.binv = (ErtsIOQBinary **) (ptr + binv_offset); } /* To pack or not to pack (small binaries) ...? */ @@ -2238,23 +1881,26 @@ erts_port_output(Process *c_p, } /* Use vsize and csize from now on */ - cbin = driver_alloc_binary(csize); - if (!cbin) - erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize)); + if (csize) { + cbin = (ErtsIOQBinary *)driver_alloc_binary(csize); + if (!cbin) + erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize)); + } /* Element 0 is for driver usage to add header block */ ivp[0].iov_base = NULL; ivp[0].iov_len = 0; bvp[0] = NULL; - evp->vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit); - if (evp->vsize < 0) { + evp->driver.vsize = erts_ioq_iodata_to_vec(list, ivp+1, bvp+1, + cbin, blimit, 1); + if (evp->driver.vsize < 0) { if (evp != &ev) { if (try_call) erts_free(ERTS_ALC_T_TMP, evp); else erts_port_task_free_p2p_sig_data(sigdp); } - driver_free_binary(cbin); + driver_free_binary(&cbin->driver); goto bad_value; } #if 0 @@ -2262,19 +1908,19 @@ erts_port_output(Process *c_p, be falsified during the emulator test suites. */ ASSERT(evp->vsize == vsize); #endif - evp->vsize++; - evp->size = size; /* total size */ + evp->driver.vsize++; + evp->driver.size = size; /* total size */ if (!try_call) { int i; /* Need to increase refc on all binaries */ - for (i = 1; i < evp->vsize; i++) - if (bvp[i]) - driver_binary_inc_refc(bvp[i]); + for (i = 1; i < evp->driver.vsize; i++) + if (bvp[i]) + driver_binary_inc_refc(&bvp[i]->driver); } else { int i; - ErlIOVec *new_evp; + ErtsIOVec *new_evp; ErtsTryImmDrvCallResult try_call_res; ErtsTryImmDrvCallState try_call_state = ERTS_INIT_TRY_IMM_DRV_CALL_STATE( @@ -2297,14 +1943,14 @@ erts_port_output(Process *c_p, from, prt, drv, - evp); + &evp->driver); if (force_immediate_call) finalize_force_imm_drv_call(&try_call_state); else finalize_imm_drv_call(&try_call_state); /* Fall through... */ case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT: - driver_free_binary(cbin); + driver_free_binary(&cbin->driver); if (evp != &ev) { ASSERT(!sigdp); erts_free(ERTS_ALC_T_TMP, evp); @@ -2318,7 +1964,7 @@ erts_port_output(Process *c_p, sched_flags = try_call_state.sched_flags; if (async_nosuspend && (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))) { - driver_free_binary(cbin); + driver_free_binary(&cbin->driver); if (evp != &ev) { ASSERT(!sigdp); erts_free(ERTS_ALC_T_TMP, evp); @@ -2333,9 +1979,9 @@ erts_port_output(Process *c_p, } /* Need to increase refc on all binaries */ - for (i = 1; i < evp->vsize; i++) + for (i = 1; i < evp->driver.vsize; i++) if (bvp[i]) - driver_binary_inc_refc(bvp[i]); + driver_binary_inc_refc(&bvp[i]->driver); /* The port task and iovec is allocated in the same structure as an optimization. This @@ -2348,18 +1994,18 @@ erts_port_output(Process *c_p, if (evp != &ev) { /* Copy from TMP alloc to port task */ sys_memcpy((void *) new_evp, (void *) evp, alloc_size); - new_evp->iov = (SysIOVec *) (((char *) new_evp) - + iov_offset); - bvp = new_evp->binv = (ErlDrvBinary **) (((char *) new_evp) - + binv_offset); + new_evp->driver.iov = (SysIOVec *) (((char *) new_evp) + + iov_offset); + bvp = new_evp->common.binv = (ErtsIOQBinary **) (((char *) new_evp) + + binv_offset); #ifdef DEBUG - ASSERT(new_evp->vsize == evp->vsize); - ASSERT(new_evp->size == evp->size); - for (i = 0; i < evp->vsize; i++) { - ASSERT(new_evp->iov[i].iov_len == evp->iov[i].iov_len); - ASSERT(new_evp->iov[i].iov_base == evp->iov[i].iov_base); - ASSERT(new_evp->binv[i] == evp->binv[i]); + ASSERT(new_evp->driver.vsize == evp->driver.vsize); + ASSERT(new_evp->driver.size == evp->driver.size); + for (i = 0; i < evp->driver.vsize; i++) { + ASSERT(new_evp->driver.iov[i].iov_len == evp->driver.iov[i].iov_len); + ASSERT(new_evp->driver.iov[i].iov_base == evp->driver.iov[i].iov_base); + ASSERT(new_evp->driver.binv[i] == evp->driver.binv[i]); } #endif @@ -2368,24 +2014,24 @@ erts_port_output(Process *c_p, else { /* from stack allocated structure; offsets may differ */ sys_memcpy((void *) new_evp, (void *) evp, sizeof(ErlIOVec)); - new_evp->iov = (SysIOVec *) (((char *) new_evp) - + iov_offset); - sys_memcpy((void *) new_evp->iov, - (void *) evp->iov, - evp->vsize * sizeof(SysIOVec)); - new_evp->binv = (ErlDrvBinary **) (((char *) new_evp) - + binv_offset); - sys_memcpy((void *) new_evp->binv, - (void *) evp->binv, - evp->vsize * sizeof(ErlDrvBinary *)); + new_evp->driver.iov = (SysIOVec *) (((char *) new_evp) + + iov_offset); + sys_memcpy((void *) new_evp->driver.iov, + (void *) evp->driver.iov, + evp->driver.vsize * sizeof(SysIOVec)); + new_evp->common.binv = (ErtsIOQBinary **) (((char *) new_evp) + + binv_offset); + sys_memcpy((void *) new_evp->common.binv, + (void *) evp->common.binv, + evp->driver.vsize * sizeof(ErtsIOQBinary *)); #ifdef DEBUG - ASSERT(new_evp->vsize == evp->vsize); - ASSERT(new_evp->size == evp->size); - for (i = 0; i < evp->vsize; i++) { - ASSERT(new_evp->iov[i].iov_len == evp->iov[i].iov_len); - ASSERT(new_evp->iov[i].iov_base == evp->iov[i].iov_base); - ASSERT(new_evp->binv[i] == evp->binv[i]); + ASSERT(new_evp->driver.vsize == evp->driver.vsize); + ASSERT(new_evp->driver.size == evp->driver.size); + for (i = 0; i < evp->driver.vsize; i++) { + ASSERT(new_evp->driver.iov[i].iov_len == evp->driver.iov[i].iov_len); + ASSERT(new_evp->driver.iov[i].iov_base == evp->driver.iov[i].iov_base); + ASSERT(new_evp->driver.binv[i] == evp->driver.binv[i]); } #endif @@ -2396,8 +2042,8 @@ erts_port_output(Process *c_p, sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV; sigdp->u.outputv.from = from; - sigdp->u.outputv.evp = evp; - sigdp->u.outputv.cbinp = cbin; + sigdp->u.outputv.evp = &evp->driver; + sigdp->u.outputv.cbinp = &cbin->driver; port_sig_callback = port_sig_outputv; } else { @@ -7154,307 +6800,51 @@ driver_pdl_dec_refc(ErlDrvPDL pdl) return refc; } -/* expand queue to hold n elements in tail or head */ -static int expandq(ErlIOQueue* q, int n, int tail) -/* tail: 0 if make room in head, make room in tail otherwise */ -{ - int h_sz; /* room before header */ - int t_sz; /* room after tail */ - int q_sz; /* occupied */ - int nvsz; - SysIOVec* niov; - ErlDrvBinary** nbinv; - - h_sz = q->v_head - q->v_start; - t_sz = q->v_end - q->v_tail; - q_sz = q->v_tail - q->v_head; - - if (tail && (n <= t_sz)) /* do we need to expand tail? */ - return 0; - else if (!tail && (n <= h_sz)) /* do we need to expand head? */ - return 0; - else if (n > (h_sz + t_sz)) { /* need to allocate */ - /* we may get little extra but it ok */ - nvsz = (q->v_end - q->v_start) + n; - - niov = erts_alloc_fnf(ERTS_ALC_T_IOQ, nvsz * sizeof(SysIOVec)); - if (!niov) - return -1; - nbinv = erts_alloc_fnf(ERTS_ALC_T_IOQ, nvsz * sizeof(ErlDrvBinary**)); - if (!nbinv) { - erts_free(ERTS_ALC_T_IOQ, (void *) niov); - return -1; - } - if (tail) { - sys_memcpy(niov, q->v_head, q_sz*sizeof(SysIOVec)); - if (q->v_start != q->v_small) - erts_free(ERTS_ALC_T_IOQ, (void *) q->v_start); - q->v_start = niov; - q->v_end = niov + nvsz; - q->v_head = q->v_start; - q->v_tail = q->v_head + q_sz; - - sys_memcpy(nbinv, q->b_head, q_sz*sizeof(ErlDrvBinary*)); - if (q->b_start != q->b_small) - erts_free(ERTS_ALC_T_IOQ, (void *) q->b_start); - q->b_start = nbinv; - q->b_end = nbinv + nvsz; - q->b_head = q->b_start; - q->b_tail = q->b_head + q_sz; - } - else { - sys_memcpy(niov+nvsz-q_sz, q->v_head, q_sz*sizeof(SysIOVec)); - if (q->v_start != q->v_small) - erts_free(ERTS_ALC_T_IOQ, (void *) q->v_start); - q->v_start = niov; - q->v_end = niov + nvsz; - q->v_tail = q->v_end; - q->v_head = q->v_tail - q_sz; - - sys_memcpy(nbinv+nvsz-q_sz, q->b_head, q_sz*sizeof(ErlDrvBinary*)); - if (q->b_start != q->b_small) - erts_free(ERTS_ALC_T_IOQ, (void *) q->b_start); - q->b_start = nbinv; - q->b_end = nbinv + nvsz; - q->b_tail = q->b_end; - q->b_head = q->b_tail - q_sz; - } - } - else if (tail) { /* move to beginning to make room in tail */ - sys_memmove(q->v_start, q->v_head, q_sz*sizeof(SysIOVec)); - q->v_head = q->v_start; - q->v_tail = q->v_head + q_sz; - sys_memmove(q->b_start, q->b_head, q_sz*sizeof(ErlDrvBinary*)); - q->b_head = q->b_start; - q->b_tail = q->b_head + q_sz; - } - else { /* move to end to make room */ - sys_memmove(q->v_end-q_sz, q->v_head, q_sz*sizeof(SysIOVec)); - q->v_tail = q->v_end; - q->v_head = q->v_tail-q_sz; - sys_memmove(q->b_end-q_sz, q->b_head, q_sz*sizeof(ErlDrvBinary*)); - q->b_tail = q->b_end; - q->b_head = q->b_tail-q_sz; - } - - return 0; -} - - - /* Put elements from vec at q tail */ int driver_enqv(ErlDrvPort ix, ErlIOVec* vec, ErlDrvSizeT skip) { - int n; - size_t len; - ErlDrvSizeT size; - SysIOVec* iov; - ErlDrvBinary** binv; - ErlDrvBinary* b; - ErlIOQueue* q = drvport2ioq(ix); - - if (q == NULL) - return -1; - - ASSERT(vec->size >= skip); /* debug only */ - if (vec->size <= skip) - return 0; - size = vec->size - skip; - - iov = vec->iov; - binv = vec->binv; - n = vec->vsize; - - /* we use do here to strip iov_len=0 from beginning */ - do { - len = iov->iov_len; - if (len <= skip) { - skip -= len; - iov++; - binv++; - n--; - } - else { - iov->iov_base = ((char *)(iov->iov_base)) + skip; - iov->iov_len -= skip; - skip = 0; - } - } while(skip > 0); - - if (q->v_tail + n >= q->v_end) - expandq(q, n, 1); - - /* Queue and reference all binaries (remove zero length items) */ - while(n--) { - if ((len = iov->iov_len) > 0) { - if ((b = *binv) == NULL) { /* speical case create binary ! */ - b = driver_alloc_binary(len); - sys_memcpy(b->orig_bytes, iov->iov_base, len); - *q->b_tail++ = b; - q->v_tail->iov_len = len; - q->v_tail->iov_base = b->orig_bytes; - q->v_tail++; - } - else { - driver_binary_inc_refc(b); - *q->b_tail++ = b; - *q->v_tail++ = *iov; - } - } - iov++; - binv++; - } - q->size += size; /* update total size in queue */ - return 0; + ASSERT(vec->size >= skip); + return erts_ioq_enqv(drvport2ioq(ix), (ErtsIOVec*)vec, skip); } /* Put elements from vec at q head */ int driver_pushqv(ErlDrvPort ix, ErlIOVec* vec, ErlDrvSizeT skip) { - int n; - size_t len; - ErlDrvSizeT size; - SysIOVec* iov; - ErlDrvBinary** binv; - ErlDrvBinary* b; - ErlIOQueue* q = drvport2ioq(ix); - - if (q == NULL) - return -1; - - if (vec->size <= skip) - return 0; - size = vec->size - skip; - - iov = vec->iov; - binv = vec->binv; - n = vec->vsize; - - /* we use do here to strip iov_len=0 from beginning */ - do { - len = iov->iov_len; - if (len <= skip) { - skip -= len; - iov++; - binv++; - n--; - } - else { - iov->iov_base = ((char *)(iov->iov_base)) + skip; - iov->iov_len -= skip; - skip = 0; - } - } while(skip > 0); - - if (q->v_head - n < q->v_start) - expandq(q, n, 0); - - /* Queue and reference all binaries (remove zero length items) */ - iov += (n-1); /* move to end */ - binv += (n-1); /* move to end */ - while(n--) { - if ((len = iov->iov_len) > 0) { - if ((b = *binv) == NULL) { /* speical case create binary ! */ - b = driver_alloc_binary(len); - sys_memcpy(b->orig_bytes, iov->iov_base, len); - *--q->b_head = b; - q->v_head--; - q->v_head->iov_len = len; - q->v_head->iov_base = b->orig_bytes; - } - else { - driver_binary_inc_refc(b); - *--q->b_head = b; - *--q->v_head = *iov; - } - } - iov--; - binv--; - } - q->size += size; /* update total size in queue */ - return 0; + ASSERT(vec->size >= skip); + return erts_ioq_pushqv(drvport2ioq(ix), (ErtsIOVec*)vec, skip); } - /* ** Remove size bytes from queue head ** Return number of bytes that remain in queue */ ErlDrvSizeT driver_deq(ErlDrvPort ix, ErlDrvSizeT size) { - ErlIOQueue* q = drvport2ioq(ix); - ErlDrvSizeT len; - - if ((q == NULL) || (q->size < size)) - return -1; - q->size -= size; - while (size > 0) { - ASSERT(q->v_head != q->v_tail); - - len = q->v_head->iov_len; - if (len <= size) { - size -= len; - driver_free_binary(*q->b_head); - *q->b_head++ = NULL; - q->v_head++; - } - else { - q->v_head->iov_base = ((char *)(q->v_head->iov_base)) + size; - q->v_head->iov_len -= size; - size = 0; - } - } - - /* restart pointers (optimised for enq) */ - if (q->v_head == q->v_tail) { - q->v_head = q->v_tail = q->v_start; - q->b_head = q->b_tail = q->b_start; - } - return q->size; + ErlPortIOQueue *q = drvport2ioq(ix); + if (erts_ioq_deq(q, size) == -1) + return -1; + return erts_ioq_size(q); } -ErlDrvSizeT driver_peekqv(ErlDrvPort ix, ErlIOVec *ev) { - ErlIOQueue *q = drvport2ioq(ix); - ASSERT(ev); - - if (! q) { - return (ErlDrvSizeT) -1; - } else { - if ((ev->vsize = q->v_tail - q->v_head) == 0) { - ev->size = 0; - ev->iov = NULL; - ev->binv = NULL; - } else { - ev->size = q->size; - ev->iov = q->v_head; - ev->binv = q->b_head; - } - return q->size; - } +ErlDrvSizeT driver_peekqv(ErlDrvPort ix, ErlIOVec *ev) +{ + return erts_ioq_peekqv(drvport2ioq(ix), (ErtsIOVec*)ev); } SysIOVec* driver_peekq(ErlDrvPort ix, int* vlenp) /* length of io-vector */ { - ErlIOQueue* q = drvport2ioq(ix); - - if (q == NULL) { - *vlenp = -1; - return NULL; - } - if ((*vlenp = (q->v_tail - q->v_head)) == 0) - return NULL; - return q->v_head; + return erts_ioq_peekq(drvport2ioq(ix), vlenp); } ErlDrvSizeT driver_sizeq(ErlDrvPort ix) { - ErlIOQueue* q = drvport2ioq(ix); + ErlPortIOQueue *q = drvport2ioq(ix); if (q == NULL) - return (size_t) -1; - return q->size; + return (ErlDrvSizeT) -1; + return erts_ioq_size(q); } diff --git a/erts/emulator/beam/utils.c b/erts/emulator/beam/utils.c index bab7352479..d7116bd2c3 100644 --- a/erts/emulator/beam/utils.c +++ b/erts/emulator/beam/utils.c @@ -52,6 +52,7 @@ #include "erl_ptab.h" #include "erl_check_io.h" #include "erl_bif_unique.h" +#include "erl_io_queue.h" #define ERTS_WANT_TIMER_WHEEL_API #include "erl_time.h" #ifdef HIPE diff --git a/erts/emulator/drivers/common/zlib_drv.c b/erts/emulator/drivers/common/zlib_drv.c deleted file mode 100644 index e342e414b5..0000000000 --- a/erts/emulator/drivers/common/zlib_drv.c +++ /dev/null @@ -1,792 +0,0 @@ -/* - * %CopyrightBegin% - * - * Copyright Ericsson AB 2003-2017. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * %CopyrightEnd% - */ - -/* - * ZLib interface for erlang - * - */ -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif -#include <stdio.h> -#include <zlib.h> -#include <errno.h> -#include <string.h> - -#include "erl_driver.h" - - -#define DEFLATE_INIT 1 -#define DEFLATE_INIT2 2 -#define DEFLATE_SETDICT 3 -#define DEFLATE_RESET 4 -#define DEFLATE_END 5 -#define DEFLATE_PARAMS 6 -#define DEFLATE 7 - -#define INFLATE_INIT 8 -#define INFLATE_INIT2 9 -#define INFLATE_SETDICT 10 -#define INFLATE_GETDICT 11 -#define INFLATE_SYNC 12 -#define INFLATE_RESET 13 -#define INFLATE_END 14 -#define INFLATE 15 - -#define CRC32_0 16 -#define CRC32_1 17 -#define CRC32_2 18 - -#define SET_BUFSZ 19 -#define GET_BUFSZ 20 -#define GET_QSIZE 21 - -#define ADLER32_1 22 -#define ADLER32_2 23 - -#define CRC32_COMBINE 24 -#define ADLER32_COMBINE 25 - -#define INFLATE_CHUNK 26 - - -#define DEFAULT_BUFSZ 4000 - -/* According to zlib documentation, it can never exceed this */ -#define INFL_DICT_SZ 32768 - -/* This flag is used in the same places, where zlib return codes - * (Z_OK, Z_STREAM_END, Z_NEED_DICT) are. So, we need to set it to - * relatively large value to avoid possible value clashes in future. - * */ -#define INFLATE_HAS_MORE 100 - -static int zlib_init(void); -static ErlDrvData zlib_start(ErlDrvPort port, char* buf); -static void zlib_stop(ErlDrvData e); -static void zlib_flush(ErlDrvData e); -static ErlDrvSSizeT zlib_ctl(ErlDrvData drv_data, unsigned int command, char *buf, - ErlDrvSizeT len, char **rbuf, ErlDrvSizeT rlen); -static void zlib_outputv(ErlDrvData drv_data, ErlIOVec *ev); - -ErlDrvEntry zlib_driver_entry = { - zlib_init, - zlib_start, - zlib_stop, - NULL, /* output */ - NULL, /* ready_input */ - NULL, /* ready_output */ - "zlib_drv", - NULL, /* finish */ - NULL, /* handle */ - zlib_ctl, - NULL, /* timeout */ - zlib_outputv, - NULL, /* read_async */ - zlib_flush, - NULL, /* call */ - NULL, /* event */ - ERL_DRV_EXTENDED_MARKER, - ERL_DRV_EXTENDED_MAJOR_VERSION, - ERL_DRV_EXTENDED_MINOR_VERSION, - ERL_DRV_FLAG_USE_PORT_LOCKING, - NULL, /* handle2 */ - NULL, /* process_exit */ -}; - -typedef enum { - ST_NONE = 0, - ST_DEFLATE = 1, - ST_INFLATE = 2 -} ZLibState; - - -typedef struct { - z_stream s; - ZLibState state; - ErlDrvBinary* bin; - int binsz; - int binsz_need; - uLong crc; - int inflate_eos_seen; - int want_crc; /* 1 if crc is calculated on clear text */ - ErlDrvPort port; /* the associcated port */ -} ZLibData; - -static int zlib_inflate(ZLibData* d, int flush); -static int zlib_deflate(ZLibData* d, int flush); - -#if defined(__WIN32__) -static int i32(char* buf) -#else -static __inline__ int i32(char* buf) -#endif -{ - return (int) ( - (((int)((unsigned char*)buf)[0]) << 24) | - (((int)((unsigned char*)buf)[1]) << 16) | - (((int)((unsigned char*)buf)[2]) << 8) | - (((int)((unsigned char*)buf)[3]) << 0)); -} - -static char* zlib_reason(int code, int* err) -{ - switch(code) { - case Z_OK: - *err = 0; - return "ok"; - case Z_STREAM_END: - *err = 0; - return "stream_end"; - case Z_ERRNO: - *err = 1; - return erl_errno_id(errno); - case Z_STREAM_ERROR: - *err = 1; - return "stream_error"; - case Z_DATA_ERROR: - *err = 1; - return "data_error"; - case Z_MEM_ERROR: - *err = 1; - return "mem_error"; - case Z_BUF_ERROR: - *err = 1; - return "buf_error"; - case Z_VERSION_ERROR: - *err = 1; - return "version_error"; - default: - *err = 1; - return "unknown_error"; - } -} - - -static ErlDrvSSizeT zlib_return(int code, char** rbuf, ErlDrvSizeT rlen) -{ - int msg_code = 0; /* 0=ok, 1=error */ - char* dst = *rbuf; - char* src; - ErlDrvSizeT len = 0; - - src = zlib_reason(code, &msg_code); - *dst++ = msg_code; - rlen--; - len = 1; - - while((rlen > 0) && *src) { - *dst++ = *src++; - rlen--; - len++; - } - return len; -} - -static ErlDrvSSizeT zlib_value2(int msg_code, int value, - char** rbuf, ErlDrvSizeT rlen) -{ - char* dst = *rbuf; - - if (rlen < 5) { - return -1; - } - *dst++ = msg_code; - *dst++ = (value >> 24) & 0xff; - *dst++ = (value >> 16) & 0xff; - *dst++ = (value >> 8) & 0xff; - *dst++ = value & 0xff; - return 5; -} - -static ErlDrvSSizeT zlib_value(int value, char** rbuf, ErlDrvSizeT rlen) -{ - return zlib_value2(2, value, rbuf, rlen); -} - -static int zlib_output_init(ZLibData* d) -{ - if (d->bin != NULL) - driver_free_binary(d->bin); - if ((d->bin = driver_alloc_binary(d->binsz_need)) == NULL) - return -1; - d->binsz = d->binsz_need; - d->s.next_out = (unsigned char*)d->bin->orig_bytes; - d->s.avail_out = d->binsz; - return 0; -} - -/* - * Send compressed or uncompressed data - * and restart output procesing - */ -static int zlib_output(ZLibData* d) -{ - if (d->bin != NULL) { - int len = d->binsz - d->s.avail_out; - if (len > 0) { - if (driver_output_binary(d->port, NULL, 0, d->bin, 0, len) < 0) - return -1; - } - driver_free_binary(d->bin); - d->bin = NULL; - d->binsz = 0; - } - return zlib_output_init(d); -} - -static int zlib_inflate_get_dictionary(ZLibData* d) -{ -#ifdef HAVE_ZLIB_INFLATEGETDICTIONARY - ErlDrvBinary* dbin = driver_alloc_binary(INFL_DICT_SZ); - uInt dlen = 0; - int res = inflateGetDictionary(&d->s, (unsigned char*)dbin->orig_bytes, &dlen); - if ((res == Z_OK) && (driver_output_binary(d->port, NULL, 0, dbin, 0, dlen) < 0)) { - res = Z_ERRNO; - } - driver_free_binary(dbin); - return res; -#else - abort(); /* never called, just to silence 'unresolved symbol' - for non-optimizing compiler */ -#endif -} - -static int zlib_inflate(ZLibData* d, int flush) -{ - int res = Z_OK; - - if ((d->bin == NULL) && (zlib_output_init(d) < 0)) { - errno = ENOMEM; - return Z_ERRNO; - } - - while ((driver_sizeq(d->port) > 0) && (res != Z_STREAM_END)) { - int vlen; - SysIOVec* iov = driver_peekq(d->port, &vlen); - int len; - int possibly_more_output = 0; - - d->s.next_in = iov[0].iov_base; - d->s.avail_in = iov[0].iov_len; - while((possibly_more_output || (d->s.avail_in > 0)) && (res != Z_STREAM_END)) { - res = inflate(&d->s, Z_NO_FLUSH); - if (res == Z_NEED_DICT) { - /* Essential to eat the header bytes that zlib has looked at */ - len = iov[0].iov_len - d->s.avail_in; - driver_deq(d->port, len); - return res; - } - if (res == Z_BUF_ERROR) { - /* Was possible more output, but actually not */ - res = Z_OK; - } - else if (res < 0) { - return res; - } - if (d->s.avail_out != 0) { - possibly_more_output = 0; - } else { - if (d->want_crc) - d->crc = crc32(d->crc, (unsigned char*)d->bin->orig_bytes, - d->binsz - d->s.avail_out); - zlib_output(d); - possibly_more_output = 1; - } - } - len = iov[0].iov_len - d->s.avail_in; - driver_deq(d->port, len); - } - - if (d->want_crc) { - d->crc = crc32(d->crc, (unsigned char*) d->bin->orig_bytes, - d->binsz - d->s.avail_out); - } - zlib_output(d); - if (res == Z_STREAM_END) { - d->inflate_eos_seen = 1; - } - return res; -} - -static int zlib_inflate_chunk(ZLibData* d) -{ - int res = Z_OK; - - if ((d->bin == NULL) && (zlib_output_init(d) < 0)) { - errno = ENOMEM; - return Z_ERRNO; - } - - while ((driver_sizeq(d->port) > 0) && (d->s.avail_out > 0) && - (res != Z_STREAM_END)) { - int vlen; - SysIOVec* iov = driver_peekq(d->port, &vlen); - int len; - - d->s.next_in = iov[0].iov_base; - d->s.avail_in = iov[0].iov_len; - while((d->s.avail_in > 0) && (d->s.avail_out > 0) && (res != Z_STREAM_END)) { - res = inflate(&d->s, Z_NO_FLUSH); - if (res == Z_NEED_DICT) { - /* Essential to eat the header bytes that zlib has looked at */ - len = iov[0].iov_len - d->s.avail_in; - driver_deq(d->port, len); - return res; - } - if (res == Z_BUF_ERROR) { - /* Was possible more output, but actually not */ - res = Z_OK; - } - else if (res < 0) { - return res; - } - } - len = iov[0].iov_len - d->s.avail_in; - driver_deq(d->port, len); - } - - /* We are here because all input was consumed or EOS reached or output - * buffer is full */ - if (d->want_crc) { - d->crc = crc32(d->crc, (unsigned char*) d->bin->orig_bytes, - d->binsz - d->s.avail_out); - } - zlib_output(d); - if ((res == Z_OK) && (d->s.avail_in > 0)) - res = INFLATE_HAS_MORE; - else if (res == Z_STREAM_END) { - d->inflate_eos_seen = 1; - } - return res; -} - -static int zlib_deflate(ZLibData* d, int flush) -{ - int res = Z_OK; - - if ((d->bin == NULL) && (zlib_output_init(d) < 0)) { - errno = ENOMEM; - return Z_ERRNO; - } - - while ((driver_sizeq(d->port) > 0) && (res != Z_STREAM_END)) { - int vlen; - SysIOVec* iov = driver_peekq(d->port, &vlen); - int len; - - d->s.next_in = iov[0].iov_base; - d->s.avail_in = iov[0].iov_len; - - while((d->s.avail_in > 0) && (res != Z_STREAM_END)) { - if ((res = deflate(&d->s, Z_NO_FLUSH)) < 0) { - return res; - } - if (d->s.avail_out == 0) { - zlib_output(d); - } - } - len = iov[0].iov_len - d->s.avail_in; - if (d->want_crc) { - d->crc = crc32(d->crc, iov[0].iov_base, len); - } - driver_deq(d->port, len); - } - - if (flush != Z_NO_FLUSH) { - if ((res = deflate(&d->s, flush)) < 0) { - return res; - } - if (flush == Z_FINISH) { - while (d->s.avail_out < d->binsz) { - zlib_output(d); - if (res == Z_STREAM_END) { - break; - } - if ((res = deflate(&d->s, flush)) < 0) { - return res; - } - } - } else { - while (d->s.avail_out == 0) { - zlib_output(d); - if ((res = deflate(&d->s, flush)) < 0) { - return res; - } - } - if (d->s.avail_out < d->binsz) { - zlib_output(d); - } - } - } - return res; -} - - - -static void* zlib_alloc(void* data, unsigned int items, unsigned int size) -{ - return (void*) driver_alloc(items*size); -} - -static void zlib_free(void* data, void* addr) -{ - driver_free(addr); -} - -#if defined(__APPLE__) && defined(__MACH__) && defined(HAVE_ZLIB_INFLATEGETDICTIONARY) - -/* Work around broken build system with runtime version test */ -static int have_inflateGetDictionary; - -static int zlib_init() -{ - unsigned int v[4] = {0, 0, 0, 0}; - unsigned hexver; - - sscanf(zlibVersion(), "%u.%u.%u.%u", &v[0], &v[1], &v[2], &v[3]); - - hexver = (v[0] << (8*3)) | (v[1] << (8*2)) | (v[2] << (8)) | v[3]; - - have_inflateGetDictionary = (hexver >= 0x1020701); /* 1.2.7.1 */ - - return 0; -} -#else /* trust configure got it right */ -# ifdef HAVE_ZLIB_INFLATEGETDICTIONARY -# define have_inflateGetDictionary 1 -# else -# define have_inflateGetDictionary 0 -# endif -static int zlib_init() -{ - return 0; -} -#endif - -static ErlDrvData zlib_start(ErlDrvPort port, char* buf) -{ - ZLibData* d; - - if ((d = (ZLibData*) driver_alloc(sizeof(ZLibData))) == NULL) - return ERL_DRV_ERROR_GENERAL; - - memset(&d->s, 0, sizeof(z_stream)); - - d->s.zalloc = zlib_alloc; - d->s.zfree = zlib_free; - d->s.opaque = d; - d->s.data_type = Z_BINARY; - - d->port = port; - d->state = ST_NONE; - d->bin = NULL; - d->binsz = 0; - d->binsz_need = DEFAULT_BUFSZ; - d->crc = crc32(0L, Z_NULL, 0); - d->inflate_eos_seen = 0; - d->want_crc = 0; - return (ErlDrvData)d; -} - - -static void zlib_stop(ErlDrvData e) -{ - ZLibData* d = (ZLibData*)e; - - if (d->state == ST_DEFLATE) - deflateEnd(&d->s); - else if (d->state == ST_INFLATE) - inflateEnd(&d->s); - - if (d->bin != NULL) - driver_free_binary(d->bin); - - driver_free(d); -} - -static void zlib_flush(ErlDrvData drv_data) -{ - ZLibData* d = (ZLibData*) drv_data; - - driver_deq(d->port, driver_sizeq(d->port)); -} - -static ErlDrvSSizeT zlib_ctl(ErlDrvData drv_data, unsigned int command, char *buf, - ErlDrvSizeT len, char **rbuf, ErlDrvSizeT rlen) -{ - ZLibData* d = (ZLibData*)drv_data; - int res; - - switch(command) { - case DEFLATE_INIT: - if (len != 4) goto badarg; - if (d->state != ST_NONE) goto badarg; - res = deflateInit(&d->s, i32(buf)); - if (res == Z_OK) { - d->state = ST_DEFLATE; - d->want_crc = 0; - d->crc = crc32(0L, Z_NULL, 0); - } - return zlib_return(res, rbuf, rlen); - - case DEFLATE_INIT2: { - int wbits; - - if (len != 20) goto badarg; - if (d->state != ST_NONE) goto badarg; - wbits = i32(buf+8); - res = deflateInit2(&d->s, i32(buf), i32(buf+4), wbits, - i32(buf+12), i32(buf+16)); - if (res == Z_OK) { - d->state = ST_DEFLATE; - d->want_crc = (wbits < 0); - d->crc = crc32(0L, Z_NULL, 0); - } - return zlib_return(res, rbuf, rlen); - } - - case DEFLATE_SETDICT: - if (d->state != ST_DEFLATE) goto badarg; - res = deflateSetDictionary(&d->s, (unsigned char*)buf, len); - if (res == Z_OK) { - return zlib_value(d->s.adler, rbuf, rlen); - } else { - return zlib_return(res, rbuf, rlen); - } - - case DEFLATE_RESET: - if (len != 0) goto badarg; - if (d->state != ST_DEFLATE) goto badarg; - driver_deq(d->port, driver_sizeq(d->port)); - res = deflateReset(&d->s); - return zlib_return(res, rbuf, rlen); - - case DEFLATE_END: - if (len != 0) goto badarg; - if (d->state != ST_DEFLATE) goto badarg; - driver_deq(d->port, driver_sizeq(d->port)); - res = deflateEnd(&d->s); - d->state = ST_NONE; - return zlib_return(res, rbuf, rlen); - - case DEFLATE_PARAMS: - if (len != 8) goto badarg; - if (d->state != ST_DEFLATE) goto badarg; - res = deflateParams(&d->s, i32(buf), i32(buf+4)); - return zlib_return(res, rbuf, rlen); - - case DEFLATE: - if (d->state != ST_DEFLATE) goto badarg; - if (len != 4) goto badarg; - res = zlib_deflate(d, i32(buf)); - return zlib_return(res, rbuf, rlen); - - case INFLATE_INIT: - if (len != 0) goto badarg; - if (d->state != ST_NONE) goto badarg; - res = inflateInit(&d->s); - if (res == Z_OK) { - d->state = ST_INFLATE; - d->inflate_eos_seen = 0; - d->want_crc = 0; - d->crc = crc32(0L, Z_NULL, 0); - } - return zlib_return(res, rbuf, rlen); - - case INFLATE_INIT2: { - int wbits; - - if (len != 4) goto badarg; - if (d->state != ST_NONE) goto badarg; - wbits = i32(buf); - res = inflateInit2(&d->s, wbits); - if (res == Z_OK) { - d->state = ST_INFLATE; - d->inflate_eos_seen = 0; - d->want_crc = (wbits < 0); - d->crc = crc32(0L, Z_NULL, 0); - } - return zlib_return(res, rbuf, rlen); - } - - case INFLATE_SETDICT: - if (d->state != ST_INFLATE) goto badarg; - res = inflateSetDictionary(&d->s, (unsigned char*)buf, len); - return zlib_return(res, rbuf, rlen); - - case INFLATE_GETDICT: - if (have_inflateGetDictionary) { - if (d->state != ST_INFLATE) goto badarg; - res = zlib_inflate_get_dictionary(d); - } else { - errno = ENOTSUP; - res = Z_ERRNO; - } - return zlib_return(res, rbuf, rlen); - - case INFLATE_SYNC: - if (d->state != ST_INFLATE) goto badarg; - if (len != 0) goto badarg; - if (driver_sizeq(d->port) == 0) { - res = Z_BUF_ERROR; - } else { - int vlen; - SysIOVec* iov = driver_peekq(d->port, &vlen); - - d->s.next_in = iov[0].iov_base; - d->s.avail_in = iov[0].iov_len; - res = inflateSync(&d->s); - } - return zlib_return(res, rbuf, rlen); - - case INFLATE_RESET: - if (d->state != ST_INFLATE) goto badarg; - if (len != 0) goto badarg; - driver_deq(d->port, driver_sizeq(d->port)); - res = inflateReset(&d->s); - d->inflate_eos_seen = 0; - return zlib_return(res, rbuf, rlen); - - case INFLATE_END: - if (d->state != ST_INFLATE) goto badarg; - if (len != 0) goto badarg; - driver_deq(d->port, driver_sizeq(d->port)); - res = inflateEnd(&d->s); - if (res == Z_OK && d->inflate_eos_seen == 0) { - res = Z_DATA_ERROR; - } - d->state = ST_NONE; - return zlib_return(res, rbuf, rlen); - - case INFLATE: - if (d->state != ST_INFLATE) goto badarg; - if (len != 4) goto badarg; - res = zlib_inflate(d, i32(buf)); - if (res == Z_NEED_DICT) { - return zlib_value2(3, d->s.adler, rbuf, rlen); - } else { - return zlib_return(res, rbuf, rlen); - } - - case INFLATE_CHUNK: - if (d->state != ST_INFLATE) goto badarg; - if (len != 0) goto badarg; - res = zlib_inflate_chunk(d); - if (res == INFLATE_HAS_MORE) { - return zlib_value2(4, 0, rbuf, rlen); - } else if (res == Z_NEED_DICT) { - return zlib_value2(3, d->s.adler, rbuf, rlen); - } else { - return zlib_return(res, rbuf, rlen); - } - - case GET_QSIZE: - return zlib_value(driver_sizeq(d->port), rbuf, rlen); - - case GET_BUFSZ: - return zlib_value(d->binsz_need, rbuf, rlen); - - case SET_BUFSZ: { - int need; - if (len != 4) goto badarg; - need = i32(buf); - if ((need < 16) || (need > 0x00ffffff)) - goto badarg; - if (d->binsz_need != need) { - d->binsz_need = need; - if (d->bin != NULL) { - if (d->s.avail_out == d->binsz) { - driver_free_binary(d->bin); - d->bin = NULL; - d->binsz = 0; - } - else - zlib_output(d); - } - } - return zlib_return(Z_OK, rbuf, rlen); - } - - case CRC32_0: - return zlib_value(d->crc, rbuf, rlen); - - case CRC32_1: { - uLong crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, (unsigned char*) buf, len); - return zlib_value(crc, rbuf, rlen); - } - - case CRC32_2: { - uLong crc; - if (len < 4) goto badarg; - crc = (unsigned int) i32(buf); - crc = crc32(crc, (unsigned char*) buf+4, len-4); - return zlib_value(crc, rbuf, rlen); - } - - case ADLER32_1: { - uLong adler = adler32(0L, Z_NULL, 0); - adler = adler32(adler, (unsigned char*) buf, len); - return zlib_value(adler, rbuf, rlen); - } - - case ADLER32_2: { - uLong adler; - if (len < 4) goto badarg; - adler = (unsigned int) i32(buf); - adler = adler32(adler, (unsigned char*) buf+4, len-4); - return zlib_value(adler, rbuf, rlen); - } - - case CRC32_COMBINE: { - uLong crc, crc1, crc2, len2; - if (len != 12) goto badarg; - crc1 = (unsigned int) i32(buf); - crc2 = (unsigned int) i32(buf+4); - len2 = (unsigned int) i32(buf+8); - crc = crc32_combine(crc1, crc2, len2); - return zlib_value(crc, rbuf, rlen); - } - - case ADLER32_COMBINE: { - uLong adler, adler1, adler2, len2; - if (len != 12) goto badarg; - adler1 = (unsigned int) i32(buf); - adler2 = (unsigned int) i32(buf+4); - len2 = (unsigned int) i32(buf+8); - adler = adler32_combine(adler1, adler2, len2); - return zlib_value(adler, rbuf, rlen); - } - } - - badarg: - errno = EINVAL; - return zlib_return(Z_ERRNO, rbuf, rlen); -} - - - -static void zlib_outputv(ErlDrvData drv_data, ErlIOVec *ev) -{ - ZLibData* d = (ZLibData*) drv_data; - - driver_enqv(d->port, ev, 0); -} diff --git a/erts/emulator/nifs/common/zlib_nif.c b/erts/emulator/nifs/common/zlib_nif.c new file mode 100644 index 0000000000..b7f3adaffe --- /dev/null +++ b/erts/emulator/nifs/common/zlib_nif.c @@ -0,0 +1,1019 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson 2017. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * %CopyrightEnd% + */ + +#define STATIC_ERLANG_NIF 1 + +#include <stdio.h> +#include <zlib.h> + +#include "erl_nif.h" +#include "config.h" +#include "sys.h" + +#ifdef VALGRIND +# include <valgrind/memcheck.h> +#endif + +#define INFL_DICT_SZ (32768) + +#ifndef MAX +#define MAX(X, Y) ((X) > (Y) ? (X) : (Y)) +#endif + +#ifndef MIN +#define MIN(X, Y) ((X) < (Y) ? (X) : (Y)) +#endif + +/* NIF interface declarations */ +static int load(ErlNifEnv *env, void** priv_data, ERL_NIF_TERM load_info); +static int upgrade(ErlNifEnv *env, void** priv_data, void** old_priv_data, ERL_NIF_TERM load_info); +static void unload(ErlNifEnv *env, void* priv_data); + +static ErlNifResourceType *rtype_zlib; + +static ERL_NIF_TERM am_not_on_controlling_process; + +static ERL_NIF_TERM am_not_initialized; +static ERL_NIF_TERM am_already_initialized; + +static ERL_NIF_TERM am_ok; +static ERL_NIF_TERM am_error; + +static ERL_NIF_TERM am_continue; +static ERL_NIF_TERM am_finished; + +static ERL_NIF_TERM am_not_supported; +static ERL_NIF_TERM am_need_dictionary; + +static ERL_NIF_TERM am_empty; + +static ERL_NIF_TERM am_stream_end; +static ERL_NIF_TERM am_stream_error; +static ERL_NIF_TERM am_data_error; +static ERL_NIF_TERM am_mem_error; +static ERL_NIF_TERM am_buf_error; +static ERL_NIF_TERM am_version_error; +static ERL_NIF_TERM am_unknown_error; + +typedef enum { + ST_NONE = 0, + ST_DEFLATE = 1, + ST_INFLATE = 2, + ST_CLOSED = 3 +} zlib_state; + +typedef struct { + z_stream s; + zlib_state state; + + /* These refer to the plaintext CRC, and are only needed for zlib:crc32/1 + * which is deprecated. */ + uLong input_crc; + uLong output_crc; + int want_input_crc; + int want_output_crc; + + int is_raw_stream; + + int eos_seen; + + /* DEPRECATED */ + int inflateChunk_buffer_size; + + ErlNifPid controlling_process; + + ErlNifIOQueue *input_queue; + + ErlNifEnv *stash_env; + ERL_NIF_TERM stash_term; +} zlib_data_t; + +/* The NIFs: */ + +static ERL_NIF_TERM zlib_open(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_close(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_deflateInit(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_deflateInit2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_deflateSetDictionary(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_deflateReset(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_deflateEnd(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_deflateParams(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_deflate(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); + +static ERL_NIF_TERM zlib_inflateInit(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_inflateInit2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_inflateSetDictionary(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_inflateGetDictionary(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_inflateReset(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_inflateEnd(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_inflate(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); + +static ERL_NIF_TERM zlib_crc32(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); + +static ERL_NIF_TERM zlib_clearStash(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_setStash(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_getStash(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); + +static ERL_NIF_TERM zlib_getBufSize(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM zlib_setBufSize(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); + +static ERL_NIF_TERM zlib_enqueue_input(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); + +static ErlNifFunc nif_funcs[] = { + /* deflate */ + {"deflateInit_nif", 2, zlib_deflateInit}, + {"deflateInit_nif", 6, zlib_deflateInit2}, + {"deflateSetDictionary_nif", 2, zlib_deflateSetDictionary}, + {"deflateReset_nif", 1, zlib_deflateReset}, + {"deflateEnd_nif", 1, zlib_deflateEnd}, + {"deflateParams_nif", 3, zlib_deflateParams}, + {"deflate_nif", 4, zlib_deflate}, + + /* inflate */ + {"inflateInit_nif", 1, zlib_inflateInit}, + {"inflateInit_nif", 2, zlib_inflateInit2}, + {"inflateSetDictionary_nif", 2, zlib_inflateSetDictionary}, + {"inflateGetDictionary_nif", 1, zlib_inflateGetDictionary}, + {"inflateReset_nif", 1, zlib_inflateReset}, + {"inflateEnd_nif", 1, zlib_inflateEnd}, + {"inflate_nif", 4, zlib_inflate}, + + /* running checksum */ + {"crc32_nif", 1, zlib_crc32}, + + /* open & close */ + {"close_nif", 1, zlib_close}, + {"open_nif", 0, zlib_open}, + + /* The stash keeps a single term alive across calls, and is used in + * exception_on_need_dict/1 to retain the old error behavior, and for + * saving data flushed through deflateParams/3. */ + {"getStash_nif", 1, zlib_getStash}, + {"clearStash_nif", 1, zlib_clearStash}, + {"setStash_nif", 2, zlib_setStash}, + + /* DEPRECATED: buffer size for inflateChunk */ + {"getBufSize_nif", 1, zlib_getBufSize}, + {"setBufSize_nif", 2, zlib_setBufSize}, + + {"enqueue_nif", 2, zlib_enqueue_input}, +}; + +ERL_NIF_INIT(zlib, nif_funcs, load, NULL, upgrade, unload) + +static void gc_zlib(ErlNifEnv *env, void* data); + +static int load(ErlNifEnv *env, void** priv_data, ERL_NIF_TERM load_info) +{ + am_not_on_controlling_process = + enif_make_atom(env, "not_on_controlling_process"); + + am_not_initialized = enif_make_atom(env, "not_initialized"); + am_already_initialized = enif_make_atom(env, "already_initialized"); + + am_ok = enif_make_atom(env, "ok"); + am_error = enif_make_atom(env, "error"); + + am_continue = enif_make_atom(env, "continue"); + am_finished = enif_make_atom(env, "finished"); + + am_not_supported = enif_make_atom(env, "not_supported"); + am_need_dictionary = enif_make_atom(env, "need_dictionary"); + + am_empty = enif_make_atom(env, "empty"); + + am_stream_end = enif_make_atom(env, "stream_end"); + am_stream_error = enif_make_atom(env, "stream_error"); + am_data_error = enif_make_atom(env, "data_error"); + am_mem_error = enif_make_atom(env, "mem_error"); + am_buf_error = enif_make_atom(env, "buf_error"); + am_version_error = enif_make_atom(env, "version_error"); + am_unknown_error = enif_make_atom(env, "unknown_error"); + + rtype_zlib = enif_open_resource_type(env, NULL, + "gc_zlib", gc_zlib, ERL_NIF_RT_CREATE, NULL); + *priv_data = NULL; + + return 0; +} + +static void unload(ErlNifEnv *env, void* priv_data) +{ + +} + +static int upgrade(ErlNifEnv *env, void** priv_data, void** old_priv_data, ERL_NIF_TERM load_info) +{ + if(*old_priv_data != NULL) { + return -1; /* Don't know how to do that */ + } + if(*priv_data != NULL) { + return -1; /* Don't know how to do that */ + } + if(load(env, priv_data, load_info)) { + return -1; + } + return 0; +} + +static void* zlib_alloc(void* data, unsigned int items, unsigned int size) +{ + return (void*) enif_alloc(items * size); +} + +static void zlib_free(void* data, void* addr) +{ + enif_free(addr); +} + +static ERL_NIF_TERM zlib_return(ErlNifEnv *env, int code) { + ERL_NIF_TERM reason; + switch(code) { + case Z_OK: + reason = am_ok; + break; + case Z_STREAM_END: + reason = am_stream_end; + break; + case Z_ERRNO: + reason = enif_make_int(env, errno); + break; + case Z_STREAM_ERROR: + reason = enif_raise_exception(env, am_stream_error); + break; + case Z_DATA_ERROR: + reason = enif_raise_exception(env, am_data_error); + break; + case Z_MEM_ERROR: + reason = am_mem_error; + break; + case Z_BUF_ERROR: + reason = am_buf_error; + break; + case Z_VERSION_ERROR: + reason = am_version_error; + break; + default: + reason = am_unknown_error; + break; + } + return reason; +} + +static void gc_zlib(ErlNifEnv *env, void* data) { + zlib_data_t *d = (zlib_data_t*)data; + + if(d->state == ST_DEFLATE) { + deflateEnd(&d->s); + } else if(d->state == ST_INFLATE) { + inflateEnd(&d->s); + } + + if(d->state != ST_CLOSED) { + enif_ioq_destroy(d->input_queue); + + if(d->stash_env != NULL) { + enif_free_env(d->stash_env); + } + + d->state = ST_CLOSED; + } +} + +static int get_zlib_data(ErlNifEnv *env, ERL_NIF_TERM opaque, zlib_data_t **d) { + return enif_get_resource(env, opaque, rtype_zlib, (void **)d); +} + +static int zlib_process_check(ErlNifEnv *env, zlib_data_t *d) { + ErlNifPid current_process; + + enif_self(env, ¤t_process); + + return enif_is_identical(enif_make_pid(env, ¤t_process), + enif_make_pid(env, &d->controlling_process)); +} + +static void zlib_reset_input(zlib_data_t *d) { + enif_ioq_destroy(d->input_queue); + d->input_queue = enif_ioq_create(ERL_NIF_IOQ_NORMAL); + + if(d->stash_env != NULL) { + enif_free_env(d->stash_env); + d->stash_env = NULL; + d->stash_term = NIL; + } +} + +static int zlib_flush_queue(int (*codec)(z_stream*, int), ErlNifEnv *env, + zlib_data_t *d, size_t input_limit, ErlNifBinary *output_buffer, int flush, + size_t *bytes_produced, size_t *bytes_consumed, size_t *bytes_remaining) { + + int vec_len, vec_idx; + SysIOVec *input_vec; + int res; + + input_vec = enif_ioq_peek(d->input_queue, &vec_len); + vec_idx = 0; + res = Z_OK; + + *bytes_produced = 0; + *bytes_consumed = 0; + + d->s.avail_out = output_buffer->size; + d->s.next_out = output_buffer->data; + + while(res == Z_OK && vec_idx < vec_len && *bytes_consumed < input_limit) { + size_t timeslice_percent, block_consumed, block_size; + + block_size = MIN(input_vec[vec_idx].iov_len, input_limit); + + d->s.next_in = input_vec[vec_idx].iov_base; + d->s.avail_in = block_size; + + res = codec(&d->s, Z_NO_FLUSH); + + ASSERT(d->s.avail_in == 0 || d->s.avail_out == 0 || res != Z_OK); + + block_consumed = block_size - d->s.avail_in; + *bytes_consumed += block_consumed; + + if(d->want_input_crc) { + d->input_crc = + crc32(d->input_crc, input_vec[vec_idx].iov_base, block_consumed); + } + + timeslice_percent = (100 * block_consumed) / input_limit; + if(enif_consume_timeslice(env, MAX(1, timeslice_percent))) { + break; + } + + vec_idx++; + } + + if(!enif_ioq_deq(d->input_queue, *bytes_consumed, bytes_remaining)) { + *bytes_remaining = 0; + res = Z_BUF_ERROR; + } + + if(res == Z_OK && flush != Z_NO_FLUSH && (*bytes_remaining == 0)) { + d->s.next_in = NULL; + d->s.avail_in = 0; + + res = codec(&d->s, flush); + } + + *bytes_produced = output_buffer->size - d->s.avail_out; + + return res; +} + +static ERL_NIF_TERM zlib_codec(int (*codec)(z_stream*, int), + ErlNifEnv *env, zlib_data_t *d, + int input_chunk_size, + int output_chunk_size, + int flush) { + + size_t bytes_produced, bytes_consumed, bytes_remaining; + ErlNifBinary output_buffer; + int res; + + if(!enif_alloc_binary(output_chunk_size, &output_buffer)) { + return zlib_return(env, Z_MEM_ERROR); + } + + res = zlib_flush_queue(codec, env, d, input_chunk_size, &output_buffer, + flush, &bytes_produced, &bytes_consumed, &bytes_remaining); + + if(res < 0 && res != Z_BUF_ERROR) { + enif_release_binary(&output_buffer); + return zlib_return(env, res); + } + + if(res == Z_STREAM_END) { + d->eos_seen = 1; + } + + if(d->want_output_crc) { + d->output_crc = + crc32(d->output_crc, output_buffer.data, bytes_produced); + } + + if(bytes_consumed == 0 && bytes_produced == 0 && bytes_remaining != 0) { + /* Die if we've made zero progress; this should not happen on + * well-formed input. */ + + enif_release_binary(&output_buffer); + return zlib_return(env, Z_DATA_ERROR); + } else { + ERL_NIF_TERM flushed_output; + + if(bytes_produced > 0) { + if(bytes_produced < output_buffer.size) { + enif_realloc_binary(&output_buffer, bytes_produced); + } + + flushed_output = + enif_make_list1(env, enif_make_binary(env, &output_buffer)); + } else { + enif_release_binary(&output_buffer); + flushed_output = enif_make_list(env, 0); + } + + if(bytes_remaining == 0 && bytes_produced < output_chunk_size) { + return enif_make_tuple2(env, am_finished, flushed_output); + } else if(res != Z_NEED_DICT) { + return enif_make_tuple2(env, am_continue, flushed_output); + } + + return enif_make_tuple3(env, am_need_dictionary, + enif_make_int(env, d->s.adler), flushed_output); + } +} + +/* zlib nifs */ + +static ERL_NIF_TERM zlib_getStash(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + + if(argc != 1 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } + + if(d->stash_env == NULL) { + return am_empty; + } + + return enif_make_tuple2(env, am_ok, enif_make_copy(env, d->stash_term)); +} + +static ERL_NIF_TERM zlib_clearStash(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + + if(argc != 1 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->stash_env == NULL) { + return enif_raise_exception(env, am_error); + } + + enif_free_env(d->stash_env); + d->stash_env = NULL; + d->stash_term = NIL; + + return am_ok; +} + +static ERL_NIF_TERM zlib_setStash(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + + if(argc != 2 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->stash_env != NULL) { + return enif_raise_exception(env, am_error); + } + + d->stash_env = enif_alloc_env(); + d->stash_term = enif_make_copy(d->stash_env, argv[1]); + + return am_ok; +} + +static ERL_NIF_TERM zlib_open(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + ERL_NIF_TERM result; + + d = (zlib_data_t *) enif_alloc_resource(rtype_zlib, sizeof(zlib_data_t)); + + memset(&d->s, 0, sizeof(z_stream)); + + enif_self(env, &d->controlling_process); + + d->input_queue = enif_ioq_create(ERL_NIF_IOQ_NORMAL); + + d->s.zalloc = zlib_alloc; + d->s.zfree = zlib_free; + d->s.opaque = d; + d->s.data_type = Z_BINARY; + + d->state = ST_NONE; + d->eos_seen = 0; + + d->want_output_crc = 0; + d->want_input_crc = 0; + d->is_raw_stream = 0; + + d->output_crc = crc32(0L, Z_NULL, 0); + d->input_crc = crc32(0L, Z_NULL, 0); + + d->stash_env = NULL; + d->stash_term = NIL; + + d->inflateChunk_buffer_size = 4000; + + result = enif_make_resource(env, d); + enif_release_resource(d); + + return result; +} + +static ERL_NIF_TERM zlib_close(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + + /* strictly speaking not needed since the gc will handle this */ + if(argc != 1 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state == ST_CLOSED) { + return enif_raise_exception(env, am_not_initialized); + } + + gc_zlib(env, d); + + return am_ok; +} + +/* deflate */ + +static ERL_NIF_TERM zlib_deflateInit(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + int level, res; + + if(argc != 2 || !get_zlib_data(env, argv[0], &d) || + !enif_get_int(env, argv[1], &level)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_NONE) { + return enif_raise_exception(env, am_already_initialized); + } + + res = deflateInit(&d->s, level); + + if(res == Z_OK) { + d->state = ST_DEFLATE; + d->eos_seen = 0; + + /* FIXME: crc32/1 is documented as returning "the current calculated + * checksum," but failed to mention that the old implementation only + * calculated it when WindowBits < 0 (See zlib_deflateInit2). + * + * We could fix this behavior by setting d->want_input_crc to 1 here, + * but we've decided to retain this quirk since the performance hit is + * quite significant. */ + d->want_output_crc = 0; + d->want_input_crc = 0; + + d->output_crc = crc32(0L, Z_NULL, 0); + d->input_crc = crc32(0L, Z_NULL, 0); + } + + return zlib_return(env, res); +} + +static ERL_NIF_TERM zlib_deflateInit2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + int level, method, windowBits, memLevel, strategy, res; + + if(argc != 6 || !get_zlib_data(env, argv[0], &d) + || !enif_get_int(env, argv[1], &level) + || !enif_get_int(env, argv[2], &method) + || !enif_get_int(env, argv[3], &windowBits) + || !enif_get_int(env, argv[4], &memLevel) + || !enif_get_int(env, argv[5], &strategy)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_NONE) { + return enif_raise_exception(env, am_already_initialized); + } + + res = deflateInit2(&d->s, level, method, windowBits, memLevel, strategy); + + if(res == Z_OK) { + d->state = ST_DEFLATE; + d->eos_seen = 0; + + d->is_raw_stream = (windowBits < 0); + + d->want_output_crc = 0; + d->want_input_crc = d->is_raw_stream; + + d->output_crc = crc32(0L, Z_NULL, 0); + d->input_crc = crc32(0L, Z_NULL, 0); + } + + return zlib_return(env, res); +} + +static ERL_NIF_TERM zlib_deflateSetDictionary(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + ErlNifBinary bin; + int res; + + if(argc != 2 || !get_zlib_data(env, argv[0], &d) + || !enif_inspect_iolist_as_binary(env, argv[1], &bin)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_DEFLATE) { + return enif_raise_exception(env, am_not_initialized); + } + + if((res = deflateSetDictionary(&d->s, bin.data, bin.size)) == Z_OK) { + uLong checksum = d->s.adler; + + /* d->s.adler is not updated in raw deflate mode, so we'll calculate it + * ourselves in case the user wants to rely on that behavior. */ + if(d->is_raw_stream) { + checksum = adler32(0, bin.data, bin.size); + } + + return enif_make_int(env, checksum); + } + + return zlib_return(env, res); +} + +static ERL_NIF_TERM zlib_deflateReset(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + int res; + + if(argc != 1 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_DEFLATE) { + return enif_raise_exception(env, am_not_initialized); + } + + res = deflateReset(&d->s); + + d->input_crc = crc32(0L, Z_NULL, 0); + d->eos_seen = 0; + + zlib_reset_input(d); + + return zlib_return(env, res); +} + +static ERL_NIF_TERM zlib_deflateEnd(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + int res; + + if(argc != 1 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_DEFLATE) { + return enif_raise_exception(env, am_not_initialized); + } + + res = deflateEnd(&d->s); + + if(res == Z_OK && enif_ioq_size(d->input_queue) > 0) { + res = Z_DATA_ERROR; + } + + zlib_reset_input(d); + d->state = ST_NONE; + + return zlib_return(env, res); +} + +static ERL_NIF_TERM zlib_deflateParams(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + int res, level, strategy; + + if(argc != 3 || !get_zlib_data(env, argv[0], &d) + || !enif_get_int(env, argv[1], &level) + || !enif_get_int(env, argv[2], &strategy)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_DEFLATE) { + return enif_raise_exception(env, am_not_initialized); + } + + /* deflateParams will flush everything currently in the stream, corrupting + * the heap unless it's empty. We therefore pretend to have a full output + * buffer, forcing a Z_BUF_ERROR if there's anything left to be flushed. */ + d->s.avail_out = 0; + res = deflateParams(&d->s, level, strategy); + + return zlib_return(env, res); +} + +static ERL_NIF_TERM zlib_deflate(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + + int input_chunk_size, output_chunk_size, flush; + + if(argc != 4 || !get_zlib_data(env, argv[0], &d) + || !enif_get_int(env, argv[1], &input_chunk_size) + || !enif_get_int(env, argv[2], &output_chunk_size) + || !enif_get_int(env, argv[3], &flush)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_DEFLATE) { + return enif_raise_exception(env, am_not_initialized); + } + + return zlib_codec(&deflate, env, d, input_chunk_size, output_chunk_size, flush); +} + +/* inflate */ + +static ERL_NIF_TERM zlib_inflateInit(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + int res; + + if(argc != 1 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_NONE) { + return enif_raise_exception(env, am_already_initialized); + } + + res = inflateInit(&d->s); + + if(res == Z_OK) { + d->state = ST_INFLATE; + d->eos_seen = 0; + + d->want_output_crc = 0; + d->want_input_crc = 0; + d->is_raw_stream = 0; + + d->output_crc = crc32(0L, Z_NULL, 0); + d->input_crc = crc32(0L, Z_NULL, 0); + } + + return zlib_return(env, res); +} + +static ERL_NIF_TERM zlib_inflateInit2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + int windowBits, res; + + if(argc != 2 || !get_zlib_data(env, argv[0], &d) + || !enif_get_int(env, argv[1], &windowBits)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_NONE) { + return enif_raise_exception(env, am_already_initialized); + } + + res = inflateInit2(&d->s, windowBits); + + if(res == Z_OK) { + d->state = ST_INFLATE; + d->eos_seen = 0; + + d->is_raw_stream = (windowBits < 0); + + d->want_output_crc = d->is_raw_stream; + d->want_input_crc = 0; + + d->output_crc = crc32(0L, Z_NULL, 0); + d->input_crc = crc32(0L, Z_NULL, 0); + } + + return zlib_return(env, res); +} + +static ERL_NIF_TERM zlib_inflateSetDictionary(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + ErlNifBinary bin; + int res; + + if(argc != 2 || !get_zlib_data(env, argv[0], &d) + || !enif_inspect_iolist_as_binary(env, argv[1], &bin)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_INFLATE) { + return enif_raise_exception(env, am_not_initialized); + } + + res = inflateSetDictionary(&d->s, bin.data, bin.size); + + return zlib_return(env, res); +} + +#ifdef HAVE_ZLIB_INFLATEGETDICTIONARY +/* Work around broken build system with runtime version test */ +static int zlib_supports_inflateGetDictionary(void) { + static int supportsGetDictionary = -1; + +#if defined(__APPLE__) && defined(__MACH__) + if(supportsGetDictionary < 0) { + unsigned int v[4] = {0, 0, 0, 0}; + unsigned hexver; + + sscanf(zlibVersion(), "%u.%u.%u.%u", &v[0], &v[1], &v[2], &v[3]); + + hexver = (v[0] << (8*3)) | (v[1] << (8*2)) | (v[2] << (8)) | v[3]; + supportsGetDictionary = (hexver >= 0x1020701); /* 1.2.7.1 */ + } +#endif + + return supportsGetDictionary; +} +#endif + +static ERL_NIF_TERM zlib_inflateGetDictionary(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + + if(argc != 1 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_INFLATE) { + return enif_raise_exception(env, am_not_initialized); + } + +#ifdef HAVE_ZLIB_INFLATEGETDICTIONARY + if(zlib_supports_inflateGetDictionary()) { + ErlNifBinary obin; + uInt len; + int res; + + enif_alloc_binary(INFL_DICT_SZ, &obin); + len = 0; + + if((res = inflateGetDictionary(&d->s, obin.data, &len)) < 0) { + enif_release_binary(&obin); + return zlib_return(env, res); + } + + enif_realloc_binary(&obin, (size_t)len); + return enif_make_binary(env, &obin); + } +#endif + + return enif_raise_exception(env, am_not_supported); +} + +static ERL_NIF_TERM zlib_inflateReset(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + int res; + + if(argc != 1 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_INFLATE) { + return enif_raise_exception(env, am_not_initialized); + } + + res = inflateReset(&d->s); + + d->output_crc = crc32(0L, Z_NULL, 0); + d->eos_seen = 0; + + zlib_reset_input(d); + + return zlib_return(env, res); +} + +static ERL_NIF_TERM zlib_inflateEnd(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + int res; + + if(argc != 1 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_INFLATE) { + return enif_raise_exception(env, am_not_initialized); + } + + res = inflateEnd(&d->s); + + if(res == Z_OK && (!d->eos_seen || enif_ioq_size(d->input_queue) > 0)) { + res = Z_DATA_ERROR; + } + + zlib_reset_input(d); + d->state = ST_NONE; + + return zlib_return(env, res); +} + +static ERL_NIF_TERM zlib_inflate(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + + int input_chunk_size, output_chunk_size, flush; + + if(argc != 4 || !get_zlib_data(env, argv[0], &d) + || !enif_get_int(env, argv[1], &input_chunk_size) + || !enif_get_int(env, argv[2], &output_chunk_size) + || !enif_get_int(env, argv[3], &flush)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_INFLATE) { + return enif_raise_exception(env, am_not_initialized); + } + + return zlib_codec(&inflate, env, d, input_chunk_size, output_chunk_size, flush); +} + +static ERL_NIF_TERM zlib_crc32(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + + if(argc != 1 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } + + if(d->state == ST_DEFLATE) { + return enif_make_ulong(env, d->input_crc); + } else if(d->state == ST_INFLATE) { + return enif_make_ulong(env, d->output_crc); + } + + return enif_raise_exception(env, am_not_initialized); +} + +static ERL_NIF_TERM zlib_getBufSize(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + + if(argc != 1 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } + + return enif_make_int(env, d->inflateChunk_buffer_size); +} + +static ERL_NIF_TERM zlib_setBufSize(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + + if(argc != 2 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } + + if(!enif_get_int(env, argv[1], &d->inflateChunk_buffer_size)) { + return enif_make_badarg(env); + } + + return am_ok; +} + +static ERL_NIF_TERM zlib_enqueue_input(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { + zlib_data_t *d; + + ErlNifIOVec prealloc, *iovec = &prealloc; + ERL_NIF_TERM tail; + + if(argc != 2 || !get_zlib_data(env, argv[0], &d)) { + return enif_make_badarg(env); + } else if(!zlib_process_check(env, d)) { + return enif_raise_exception(env, am_not_on_controlling_process); + } else if(d->state != ST_DEFLATE && d->state != ST_INFLATE) { + return enif_raise_exception(env, am_not_initialized); + } + + if(!enif_inspect_iovec(env, 256, argv[1], &tail, &iovec)) { + return enif_make_badarg(env); + } else if(!enif_ioq_enqv(d->input_queue, iovec, 0)) { + return enif_make_badarg(env); + } + + if(!enif_is_empty_list(env, tail)) { + return enif_make_tuple2(env, am_continue, tail); + } + + return am_ok; +} diff --git a/erts/emulator/test/Makefile b/erts/emulator/test/Makefile index 370fcb0f3a..b17170c8b8 100644 --- a/erts/emulator/test/Makefile +++ b/erts/emulator/test/Makefile @@ -71,6 +71,7 @@ MODULES= \ hash_SUITE \ hibernate_SUITE \ hipe_SUITE \ + iovec_SUITE \ list_bif_SUITE \ lttng_SUITE \ lcnt_SUITE \ diff --git a/erts/emulator/test/iovec_SUITE.erl b/erts/emulator/test/iovec_SUITE.erl new file mode 100644 index 0000000000..a5f605bfff --- /dev/null +++ b/erts/emulator/test/iovec_SUITE.erl @@ -0,0 +1,168 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2017. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(iovec_SUITE). + +-export([all/0, suite/0]). + +-export([integer_lists/1, binary_lists/1, empty_lists/1, empty_binary_lists/1, + mixed_lists/1, improper_lists/1, illegal_lists/1, cons_bomb/1, + iolist_to_iovec_idempotence/1, iolist_to_iovec_correctness/1]). + +-include_lib("common_test/include/ct.hrl"). + +suite() -> + [{ct_hooks,[ts_install_cth]}, + {timetrap, {minutes, 2}}]. + +all() -> + [integer_lists, binary_lists, empty_lists, empty_binary_lists, mixed_lists, + illegal_lists, improper_lists, cons_bomb, iolist_to_iovec_idempotence, + iolist_to_iovec_correctness]. + +integer_lists(Config) when is_list(Config) -> + Variations = gen_variations([I || I <- lists:seq(1, 255)]), + + equivalence_test(fun erlang:iolist_to_iovec/1, Variations), + + ok. + +binary_lists(Config) when is_list(Config) -> + Variations = gen_variations([<<I:8>> || I <- lists:seq(1, 255)]), + equivalence_test(fun erlang:iolist_to_iovec/1, Variations), + ok. + +empty_lists(Config) when is_list(Config) -> + Variations = gen_variations([[] || _ <- lists:seq(1, 256)]), + equivalence_test(fun erlang:iolist_to_iovec/1, Variations), + [] = erlang:iolist_to_iovec([]), + ok. + +empty_binary_lists(Config) when is_list(Config) -> + Variations = gen_variations([<<>> || _ <- lists:seq(1, 8192)]), + equivalence_test(fun erlang:iolist_to_iovec/1, Variations), + [] = erlang:iolist_to_iovec(Variations), + ok. + +mixed_lists(Config) when is_list(Config) -> + Variations = gen_variations([<<>>, lists:seq(1, 40), <<12, 45, 78>>]), + equivalence_test(fun erlang:iolist_to_iovec/1, Variations), + ok. + +illegal_lists(Config) when is_list(Config) -> + BitStrs = gen_variations(["gurka", <<1:1>>, "gaffel"]), + BadInts = gen_variations(["gurka", 890, "gaffel"]), + Atoms = gen_variations([gurka, "gaffel"]), + BadTails = [["test" | 0], ["gurka", gaffel]], + + Variations = + BitStrs ++ BadInts ++ Atoms ++ BadTails, + + illegality_test(fun erlang:iolist_to_iovec/1, Variations), + + ok. + +improper_lists(Config) when is_list(Config) -> + Variations = [ + [[[[1 | <<2>>] | <<3>>] | <<4>>] | <<5>>], + [[<<"test">>, 3] | <<"improper tail">>], + [1, 2, 3 | <<"improper tail">>] + ], + equivalence_test(fun erlang:iolist_to_iovec/1, Variations), + ok. + +cons_bomb(Config) when is_list(Config) -> + IntBase = gen_variations([I || I <- lists:seq(1, 255)]), + BinBase = gen_variations([<<I:8>> || I <- lists:seq(1, 255)]), + MixBase = gen_variations([<<12, 45, 78>>, lists:seq(1, 255)]), + + Rounds = + case system_mem_size() of + Mem when Mem >= (16 bsl 30) -> 32; + Mem when Mem >= (3 bsl 30) -> 28; + _ -> 20 + end, + + Variations = gen_variations([IntBase, BinBase, MixBase], Rounds), + equivalence_test(fun erlang:iolist_to_iovec/1, Variations), + ok. + +iolist_to_iovec_idempotence(Config) when is_list(Config) -> + IntVariations = gen_variations([I || I <- lists:seq(1, 255)]), + BinVariations = gen_variations([<<I:8>> || I <- lists:seq(1, 255)]), + MixVariations = gen_variations([<<12, 45, 78>>, lists:seq(1, 255)]), + + Variations = [IntVariations, BinVariations, MixVariations], + Optimized = erlang:iolist_to_iovec(Variations), + + true = Optimized =:= erlang:iolist_to_iovec(Optimized), + ok. + +iolist_to_iovec_correctness(Config) when is_list(Config) -> + IntVariations = gen_variations([I || I <- lists:seq(1, 255)]), + BinVariations = gen_variations([<<I:8>> || I <- lists:seq(1, 255)]), + MixVariations = gen_variations([<<12, 45, 78>>, lists:seq(1, 255)]), + + Variations = [IntVariations, BinVariations, MixVariations], + Optimized = erlang:iolist_to_iovec(Variations), + + true = is_iolist_equal(Optimized, Variations), + ok. + +illegality_test(Fun, Variations) -> + [{'EXIT',{badarg, _}} = (catch Fun(Variation)) || Variation <- Variations]. + +equivalence_test(Fun, [Head | _] = Variations) -> + Comparand = Fun(Head), + [is_iolist_equal(Comparand, Fun(Variation)) || Variation <- Variations], + ok. + +is_iolist_equal(A, B) -> + iolist_to_binary(A) =:= iolist_to_binary(B). + +%% Generates a bunch of lists whose contents will be equal to Base repeated a +%% few times. The lists only differ by their structure, so their reduction to +%% a simpler format should yield the same result. +gen_variations(Base) -> + gen_variations(Base, 16). +gen_variations(Base, N) -> + [gen_flat_list(Base, N), + gen_nested_list(Base, N), + gen_nasty_list(Base, N)]. + +gen_flat_list(Base, N) -> + lists:flatten(gen_nested_list(Base, N)). + +gen_nested_list(Base, N) -> + [Base || _ <- lists:seq(1, N)]. + +gen_nasty_list(Base, N) -> + gen_nasty_list_1(gen_nested_list(Base, N), []). +gen_nasty_list_1([], Result) -> + Result; +gen_nasty_list_1([Head | Base], Result) when is_list(Head) -> + gen_nasty_list_1(Base, [[Result], [gen_nasty_list_1(Head, [])]]); +gen_nasty_list_1([Head | Base], Result) -> + gen_nasty_list_1(Base, [[Result], [Head]]). + +system_mem_size() -> + application:ensure_all_started(os_mon), + {Tot,_Used,_} = memsup:get_memory_data(), + Tot. diff --git a/erts/emulator/test/nif_SUITE.erl b/erts/emulator/test/nif_SUITE.erl index 0337274178..4811244b98 100644 --- a/erts/emulator/test/nif_SUITE.erl +++ b/erts/emulator/test/nif_SUITE.erl @@ -61,7 +61,8 @@ nif_internal_hash_salted/1, nif_phash2/1, nif_whereis/1, nif_whereis_parallel/1, - nif_whereis_threaded/1, nif_whereis_proxy/1 + nif_whereis_threaded/1, nif_whereis_proxy/1, + nif_ioq/1 ]). -export([many_args_100/100]). @@ -99,7 +100,8 @@ all() -> nif_internal_hash, nif_internal_hash_salted, nif_phash2, - nif_whereis, nif_whereis_parallel, nif_whereis_threaded]. + nif_whereis, nif_whereis_parallel, nif_whereis_threaded, + nif_ioq]. groups() -> [{G, [], api_repeaters()} || G <- api_groups()] @@ -2957,6 +2959,180 @@ nif_whereis_proxy(Ref) -> {Ref, quit} -> ok end. +nif_ioq(Config) -> + ensure_lib_loaded(Config), + + Script = + [{create, a}, + + %% Test enq of erlang term binary + {enqb, a}, + {enqb, a, 3}, + + %% Test enq of non-erlang term binary + {enqbraw,a}, + {enqbraw,a, 5}, + {peek, a}, + {deq, a, 42}, + + %% Test enqv + {enqv, a, 2, 100}, + {deq, a, all}, + + %% This skips all elements but one in the iolist + {enqv, a, 5, iolist_size(nif_ioq_payload(5)) - 1}, + {peek, a}, + + %% Test to enqueue a bunch of refc binaries + {enqv, a, [nif_ioq_payload(refcbin) || _ <- lists:seq(1,20)], 0}, + + %% Enq stuff to destroy with data in queue + {enqv, a, 2, 100}, + {destroy,a}, + + %% Test destroy of new queue + {create, a}, + {destroy,a} + ], + + nif_ioq_run(Script), + + %% Test that only enif_inspect_as_vec works + Payload = nif_ioq_payload(5), + PayloadBin = iolist_to_binary(Payload), + + [begin + PayloadBin = iolist_to_binary(ioq_nif(inspect,Payload,Stack,Env)), + <<>> = iolist_to_binary(ioq_nif(inspect,[],Stack,Env)) + end || Stack <- [no_stack, use_stack], Env <- [use_env, no_env]], + + %% Test error cases + + Q = ioq_nif(create), + + {'EXIT', {badarg, _}} = (catch ioq_nif(deq, Q, 1)), + {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, 1, 1234)), + + {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [atom_in_list], 0)), + {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [make_ref()], 0)), + {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [256], 0)), + {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [-1], 0)), + {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [#{}], 0)), + {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [1 bsl 64], 0)), + {'EXIT', {badarg, _}} = (catch ioq_nif(enqv, Q, [{tuple}], 0)), + + {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [atom_in_list], use_stack)), + {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [make_ref()], no_stack)), + {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [256], use_stack)), + {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [-1], no_stack)), + {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [#{}], use_stack)), + {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [1 bsl 64], no_stack)), + {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, [{tuple}], use_stack)), + {'EXIT', {badarg, _}} = (catch ioq_nif(inspect, <<"binary">>, use_stack)), + + ioq_nif(destroy, Q), + + %% Test that the example in the docs works + ExampleQ = ioq_nif(create), + true = ioq_nif(example, ExampleQ, nif_ioq_payload(5)), + ioq_nif(destroy, ExampleQ), + + ok. + + +nif_ioq_run(Script) -> + nif_ioq_run(Script, #{}). + +nif_ioq_run([{Action, Name}|T], State) + when Action =:= enqb; Action =:= enqbraw -> + nif_ioq_run([{Action, Name, heapbin}|T], State); +nif_ioq_run([{Action, Name, Skip}|T], State) + when Action =:= enqb, is_integer(Skip); + Action =:= enqbraw, is_integer(Skip) -> + nif_ioq_run([{Action, Name, heapbin, Skip}|T], State); +nif_ioq_run([{Action, Name, N}|T], State) + when Action =:= enqv; Action =:= enqb; Action =:= enqbraw -> + nif_ioq_run([{Action, Name, N, 0}|T], State); +nif_ioq_run([{Action, Name, N, Skip}|T], State) + when Action =:= enqv; Action =:= enqb; Action =:= enqbraw -> + + #{ q := IOQ, b := B } = Q = maps:get(Name, State), + true = ioq_nif(size, IOQ) == iolist_size(B), + + %% Sanitize the log output a bit so that it doesn't become too large. + H = {Action, Name, try iolist_size(N) of Sz -> Sz catch _:_ -> N end, Skip}, + ct:log("~p", [H]), + + Data = nif_ioq_payload(N), + ioq_nif(Action, IOQ, Data, Skip), + + <<_:Skip/binary, SkippedData/binary>> = iolist_to_binary(Data), + + true = ioq_nif(size, IOQ) == (iolist_size([B|SkippedData])), + + nif_ioq_run(T, State#{ Name := Q#{ b := [B|SkippedData]}}); +nif_ioq_run([{peek, Name} = H|T], State) -> + #{ q := IOQ, b := B } = maps:get(Name, State), + true = ioq_nif(size, IOQ) == iolist_size(B), + + ct:log("~p", [H]), + + Data = ioq_nif(peek, IOQ, ioq_nif(size, IOQ)), + + true = iolist_to_binary(B) == iolist_to_binary(Data), + nif_ioq_run(T, State); +nif_ioq_run([{deq, Name, all}|T], State) -> + #{ q := IOQ, b := B } = maps:get(Name, State), + Size = ioq_nif(size, IOQ), + true = Size == iolist_size(B), + nif_ioq_run([{deq, Name, Size}|T], State); +nif_ioq_run([{deq, Name, N} = H|T], State) -> + #{ q := IOQ, b := B } = Q = maps:get(Name, State), + true = ioq_nif(size, IOQ) == iolist_size(B), + + ct:log("~p", [H]), + + <<_:N/binary,Remain/binary>> = iolist_to_binary(B), + NewQ = Q#{ b := Remain }, + + Sz = ioq_nif(deq, IOQ, N), + + true = Sz == iolist_size(Remain), + true = ioq_nif(size, IOQ) == iolist_size(Remain), + + nif_ioq_run(T, State#{ Name := NewQ }); +nif_ioq_run([{create, Name} = H|T], State) -> + ct:log("~p", [H]), + nif_ioq_run(T, State#{ Name => #{ q => ioq_nif(create), b => [] } }); +nif_ioq_run([{destroy, Name} = H|T], State) -> + #{ q := IOQ, b := B } = maps:get(Name, State), + true = ioq_nif(size, IOQ) == iolist_size(B), + + ct:log("~p", [H]), + + ioq_nif(destroy, IOQ), + + nif_ioq_run(T, maps:remove(Name, State)); +nif_ioq_run([], State) -> + State. + +nif_ioq_payload(N) when is_integer(N) -> + Tail = if N > 3 -> nif_ioq_payload(N-3); true -> [] end, + Head = element(1, lists:split(N,[nif_ioq_payload(subbin), + nif_ioq_payload(heapbin), + nif_ioq_payload(refcbin) | Tail])), + erlang:iolist_to_iovec(Head); +nif_ioq_payload(subbin) -> + Bin = nif_ioq_payload(refcbin), + Sz = size(Bin) - 1, + <<_:8,SubBin:Sz/binary,_/bits>> = Bin, + SubBin; +nif_ioq_payload(heapbin) -> + <<"a literal heap binary">>; +nif_ioq_payload(refcbin) -> + iolist_to_binary([lists:seq(1,255) || _ <- lists:seq(1,255)]); +nif_ioq_payload(Else) -> + Else. %% The NIFs: lib_version() -> undefined. @@ -3032,6 +3208,10 @@ monitor_process_nif(_,_,_,_) -> ?nif_stub. demonitor_process_nif(_,_) -> ?nif_stub. compare_monitors_nif(_,_) -> ?nif_stub. monitor_frenzy_nif(_,_,_,_) -> ?nif_stub. +ioq_nif(_) -> ?nif_stub. +ioq_nif(_,_) -> ?nif_stub. +ioq_nif(_,_,_) -> ?nif_stub. +ioq_nif(_,_,_,_) -> ?nif_stub. %% whereis whereis_send(_Type,_Name,_Msg) -> ?nif_stub. diff --git a/erts/emulator/test/nif_SUITE_data/nif_SUITE.c b/erts/emulator/test/nif_SUITE_data/nif_SUITE.c index 307d1c390f..b47d013bd2 100644 --- a/erts/emulator/test/nif_SUITE_data/nif_SUITE.c +++ b/erts/emulator/test/nif_SUITE_data/nif_SUITE.c @@ -186,6 +186,12 @@ static ErlNifResourceTypeInit frenzy_rt_init = { static ErlNifResourceType* whereis_resource_type; static void whereis_thread_resource_dtor(ErlNifEnv* env, void* obj); +static ErlNifResourceType* ioq_resource_type; + +static void ioq_resource_dtor(ErlNifEnv* env, void* obj); +struct ioq_resource { + ErlNifIOQueue *q; +}; static int get_pointer(ErlNifEnv* env, ERL_NIF_TERM term, void** pp) { @@ -243,6 +249,10 @@ static int load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) whereis_resource_type = enif_open_resource_type(env, NULL, "nif_SUITE.whereis", whereis_thread_resource_dtor, ERL_NIF_RT_CREATE, NULL); + ioq_resource_type = enif_open_resource_type(env,NULL,"ioq", + ioq_resource_dtor, + ERL_NIF_RT_CREATE, NULL); + atom_false = enif_make_atom(env,"false"); atom_true = enif_make_atom(env,"true"); atom_self = enif_make_atom(env,"self"); @@ -2430,7 +2440,6 @@ static ERL_NIF_TERM format_term(ErlNifEnv* env, int argc, const ERL_NIF_TERM arg return enif_make_binary(env,&obin); } - static int get_fd(ErlNifEnv* env, ERL_NIF_TERM term, struct fd_resource** rsrc) { if (!enif_get_resource(env, term, fd_resource_type, (void**)rsrc)) { @@ -3158,7 +3167,231 @@ static void frenzy_resource_down(ErlNifEnv* env, void* obj, ErlNifPid* pid, abort(); } +/*********** testing ioq ************/ + +static void ioq_resource_dtor(ErlNifEnv* env, void* obj) { + +} + +#ifndef __WIN32__ +static int writeiovec(ErlNifEnv *env, ERL_NIF_TERM term, ERL_NIF_TERM *tail, ErlNifIOQueue *q, int fd) { + ErlNifIOVec vec, *iovec = &vec; + SysIOVec *sysiovec; + int saved_errno; + int iovcnt, n; + + if (!enif_inspect_iovec(env, 64, term, tail, &iovec)) + return -2; + + if (enif_ioq_size(q) > 0) { + /* If the I/O queue contains data we enqueue the iovec and then + peek the data to write out of the queue. */ + if (!enif_ioq_enqv(q, iovec, 0)) + return -3; + + sysiovec = enif_ioq_peek(q, &iovcnt); + } else { + /* If the I/O queue is empty we skip the trip through it. */ + iovcnt = iovec->iovcnt; + sysiovec = iovec->iov; + } + + /* Attempt to write the data */ + n = writev(fd, sysiovec, iovcnt); + saved_errno = errno; + + if (enif_ioq_size(q) == 0) { + /* If the I/O queue was initially empty we enqueue any + remaining data into the queue for writing later. */ + if (n >= 0 && !enif_ioq_enqv(q, iovec, n)) + return -3; + } else { + /* Dequeue any data that was written from the queue. */ + if (n > 0 && !enif_ioq_deq(q, n, NULL)) + return -4; + } + + /* return n, which is either number of bytes written or -1 if + some error happened */ + errno = saved_errno; + return n; +} +#endif + +static ERL_NIF_TERM ioq(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + struct ioq_resource *ioq; + ERL_NIF_TERM ret; + if (enif_is_identical(argv[0], enif_make_atom(env, "create"))) { + ErlNifIOQueue *q = enif_ioq_create(ERL_NIF_IOQ_NORMAL); + ioq = (struct ioq_resource *)enif_alloc_resource(ioq_resource_type, + sizeof(*ioq)); + ioq->q = q; + ret = enif_make_resource(env, ioq); + enif_release_resource(ioq); + return ret; + } else if (enif_is_identical(argv[0], enif_make_atom(env, "inspect"))) { + ErlNifIOVec vec, *iovec = NULL; + int i, iovcnt; + ERL_NIF_TERM *elems, tail, list; + ErlNifEnv *myenv = NULL; + + if (enif_is_identical(argv[2], enif_make_atom(env, "use_stack"))) + iovec = &vec; + if (enif_is_identical(argv[3], enif_make_atom(env, "use_env"))) + myenv = env; + if (!enif_inspect_iovec(myenv, ~(size_t)0, argv[1], &tail, &iovec)) + return enif_make_badarg(env); + + iovcnt = iovec->iovcnt; + elems = enif_alloc(sizeof(ERL_NIF_TERM) * iovcnt); + + for (i = 0; i < iovcnt; i++) { + ErlNifBinary bin; + if (!enif_alloc_binary(iovec->iov[i].iov_len, &bin)) { + enif_free_iovec(iovec); + enif_free(elems); + return enif_make_badarg(env); + } + memcpy(bin.data, iovec->iov[i].iov_base, iovec->iov[i].iov_len); + elems[i] = enif_make_binary(env, &bin); + } + + if (!myenv) + enif_free_iovec(iovec); + + list = enif_make_list_from_array(env, elems, iovcnt); + enif_free(elems); + return list; + } else { + unsigned skip; + if (!enif_get_resource(env, argv[1], ioq_resource_type, (void**)&ioq) + || !ioq->q) + return enif_make_badarg(env); + + if (enif_is_identical(argv[0], enif_make_atom(env, "example"))) { +#ifndef __WIN32__ + int fd[2], res = 0, cnt = 0, queue_cnt; + ERL_NIF_TERM tail; + char buff[255]; + pipe(fd); + fcntl(fd[0], F_SETFL, fcntl(fd[0], F_GETFL) | O_NONBLOCK); + fcntl(fd[1], F_SETFL, fcntl(fd[1], F_GETFL) | O_NONBLOCK); + + /* Write until the pipe buffer is full, which should result in data + * being queued up. */ + for (res = 0; res >= 0; ) { + cnt += res; + res = writeiovec(env, argv[2], &tail, ioq->q, fd[1]); + } + + /* Flush the queue while reading from the other end of the pipe. */ + tail = enif_make_list(env, 0); + while (enif_ioq_size(ioq->q) > 0) { + res = writeiovec(env, tail, &tail, ioq->q, fd[1]); + if (res < 0 && errno != EAGAIN) { + break; + } else if (res > 0) { + cnt += res; + } + + for (res = 0; res >= 0; ) { + cnt -= res; + res = read(fd[0], buff, sizeof(buff)); + } + } + + close(fd[0]); + close(fd[1]); + + /* Check that we read as much as we wrote */ + if (cnt == 0 && enif_ioq_size(ioq->q) == 0) + return enif_make_atom(env, "true"); + + return enif_make_int(env, cnt); +#else + return enif_make_atom(env, "true"); +#endif + } + if (enif_is_identical(argv[0], enif_make_atom(env, "destroy"))) { + enif_ioq_destroy(ioq->q); + ioq->q = NULL; + return enif_make_atom(env, "false"); + } else if (enif_is_identical(argv[0], enif_make_atom(env, "enqv"))) { + ErlNifIOVec vec, *iovec = &vec; + ERL_NIF_TERM tail; + + if (!enif_get_uint(env, argv[3], &skip)) + return enif_make_badarg(env); + if (!enif_inspect_iovec(env, ~0ul, argv[2], &tail, &iovec)) + return enif_make_badarg(env); + if (!enif_ioq_enqv(ioq->q, iovec, skip)) + return enif_make_badarg(env); + + return enif_make_atom(env, "true"); + } else if (enif_is_identical(argv[0], enif_make_atom(env, "enqb"))) { + ErlNifBinary bin; + if (!enif_get_uint(env, argv[3], &skip) || + !enif_inspect_binary(env, argv[2], &bin)) + return enif_make_badarg(env); + + if (!enif_ioq_enq_binary(ioq->q, &bin, skip)) + return enif_make_badarg(env); + + return enif_make_atom(env, "true"); + } else if (enif_is_identical(argv[0], enif_make_atom(env, "enqbraw"))) { + ErlNifBinary bin; + ErlNifBinary localbin; + int i; + if (!enif_get_uint(env, argv[3], &skip) || + !enif_inspect_binary(env, argv[2], &bin) || + !enif_alloc_binary(bin.size, &localbin)) + return enif_make_badarg(env); + + memcpy(localbin.data, bin.data, bin.size); + i = enif_ioq_enq_binary(ioq->q, &localbin, skip); + if (!i) + return enif_make_badarg(env); + else + return enif_make_atom(env, "true"); + } else if (enif_is_identical(argv[0], enif_make_atom(env, "peek"))) { + int iovlen, num, i, off = 0; + SysIOVec *iov = enif_ioq_peek(ioq->q, &iovlen); + ErlNifBinary bin; + + if (!enif_get_int(env, argv[2], &num) || !enif_alloc_binary(num, &bin)) + return enif_make_badarg(env); + + for (i = 0; i < iovlen && num > 0; i++) { + int to_copy = num < iov[i].iov_len ? num : iov[i].iov_len; + memcpy(bin.data + off, iov[i].iov_base, to_copy); + num -= to_copy; + off += to_copy; + } + + return enif_make_binary(env, &bin); + } else if (enif_is_identical(argv[0], enif_make_atom(env, "deq"))) { + int num; + size_t sz; + ErlNifUInt64 sz64; + if (!enif_get_int(env, argv[2], &num)) + return enif_make_badarg(env); + + if (!enif_ioq_deq(ioq->q, num, &sz)) + return enif_make_badarg(env); + + sz64 = sz; + + return enif_make_uint64(env, sz64); + } else if (enif_is_identical(argv[0], enif_make_atom(env, "size"))) { + ErlNifUInt64 size = enif_ioq_size(ioq->q); + return enif_make_uint64(env, size); + } + } + + return enif_make_badarg(env); +} static ErlNifFunc nif_funcs[] = { @@ -3255,7 +3488,11 @@ static ErlNifFunc nif_funcs[] = {"whereis_send", 3, whereis_send}, {"whereis_term", 2, whereis_term}, {"whereis_thd_lookup", 2, whereis_thd_lookup}, - {"whereis_thd_result", 1, whereis_thd_result} + {"whereis_thd_result", 1, whereis_thd_result}, + {"ioq_nif", 1, ioq}, + {"ioq_nif", 2, ioq}, + {"ioq_nif", 3, ioq}, + {"ioq_nif", 4, ioq} }; ERL_NIF_INIT(nif_SUITE,nif_funcs,load,NULL,upgrade,unload) diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam Binary files differindex 58c17dc416..6fa48e8582 100644 --- a/erts/preloaded/ebin/erlang.beam +++ b/erts/preloaded/ebin/erlang.beam diff --git a/erts/preloaded/ebin/init.beam b/erts/preloaded/ebin/init.beam Binary files differindex 2acb1f1211..1c8d0e626a 100644 --- a/erts/preloaded/ebin/init.beam +++ b/erts/preloaded/ebin/init.beam diff --git a/erts/preloaded/ebin/zlib.beam b/erts/preloaded/ebin/zlib.beam Binary files differindex a959ebaaf2..267b5cb0a8 100644 --- a/erts/preloaded/ebin/zlib.beam +++ b/erts/preloaded/ebin/zlib.beam diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 72dd804412..f796ea64d3 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -87,6 +87,10 @@ -export_type([prepared_code/0]). +-type iovec() :: [binary()]. + +-export_type([iovec/0]). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Native code BIF stubs and their types %% (BIF's actually implemented in this module goes last in the file) @@ -124,7 +128,7 @@ has_prepared_code_on_load/1, hibernate/3]). -export([insert_element/3]). -export([integer_to_binary/1, integer_to_list/1]). --export([iolist_size/1, iolist_to_binary/1]). +-export([iolist_size/1, iolist_to_binary/1, iolist_to_iovec/1]). -export([is_alive/0, is_builtin/3, is_process_alive/1, length/1, link/1]). -export([list_to_atom/1, list_to_binary/1]). -export([list_to_bitstring/1, list_to_existing_atom/1, list_to_float/1]). @@ -1079,6 +1083,12 @@ iolist_size(_Item) -> iolist_to_binary(_IoListOrBinary) -> erlang:nif_error(undefined). +%% iolist_to_iovec/1 +-spec erlang:iolist_to_iovec(IoListOrBinary) -> iovec() when + IoListOrBinary :: iolist() | binary(). +iolist_to_iovec(_IoListOrBinary) -> + erlang:nif_error(undefined). + %% is_alive/0 -spec is_alive() -> boolean(). is_alive() -> diff --git a/erts/preloaded/src/init.erl b/erts/preloaded/src/init.erl index 1ccf8d599f..34a9f6b8b9 100644 --- a/erts/preloaded/src/init.erl +++ b/erts/preloaded/src/init.erl @@ -200,6 +200,8 @@ boot(BootArgs) -> register(init, self()), process_flag(trap_exit, true), + %% Load the zlib nif + zlib:on_load(), %% Load the tracer nif erl_tracer:on_load(), diff --git a/erts/preloaded/src/zlib.erl b/erts/preloaded/src/zlib.erl index 8cd3e39fd7..dca5a42779 100644 --- a/erts/preloaded/src/zlib.erl +++ b/erts/preloaded/src/zlib.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2003-2016. All Rights Reserved. +%% Copyright Ericsson AB 2003-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -21,19 +21,29 @@ -module(zlib). -export([open/0,close/1,deflateInit/1,deflateInit/2,deflateInit/6, - deflateSetDictionary/2,deflateReset/1,deflateParams/3, - deflate/2,deflate/3,deflateEnd/1, - inflateInit/1,inflateInit/2, - inflateSetDictionary/2,inflateGetDictionary/1, - inflateSync/1,inflateReset/1,inflate/2,inflateEnd/1, - inflateChunk/1, inflateChunk/2, - setBufSize/2,getBufSize/1, - crc32/1,crc32/2,crc32/3,adler32/2,adler32/3,getQSize/1, - crc32_combine/4,adler32_combine/4, - compress/1,uncompress/1,zip/1,unzip/1, - gzip/1,gunzip/1]). - --export_type([zstream/0, zlevel/0, zwindowbits/0, zmemlevel/0, zstrategy/0]). + deflateSetDictionary/2,deflateReset/1,deflateParams/3, + deflate/2,deflate/3,deflateEnd/1, + inflateInit/1,inflateInit/2, + inflateSetDictionary/2,inflateGetDictionary/1, inflateReset/1, + inflate/2,inflate/3,inflateEnd/1, + inflateChunk/2,inflateChunk/1, + safeInflate/2, + setBufSize/2,getBufSize/1, + crc32/1,crc32/2,crc32/3,adler32/2,adler32/3, + crc32_combine/4,adler32_combine/4, + compress/1,uncompress/1,zip/1,unzip/1, + gzip/1,gunzip/1]). + +-export([on_load/0]). + +%% These are soft-deprecated until OTP 21. +% -deprecated([inflateChunk/1, inflateChunk/2, +% getBufSize/1, setBufSize/2, +% crc32/1,crc32/2,crc32/3,adler32/2,adler32/3, +% crc32_combine/4,adler32_combine/4]). + +-export_type([zstream/0, zflush/0, zlevel/0, zwindowbits/0, zmemlevel/0, + zstrategy/0]). %% flush argument encoding -define(Z_NO_FLUSH, 0). @@ -56,116 +66,76 @@ %% deflate compression method -define(Z_DEFLATED, 8). --define(Z_NULL, 0). - -define(MAX_WBITS, 15). -%% gzip defs (rfc 1952) - --define(ID1, 16#1f). --define(ID2, 16#8b). - --define(FTEXT, 16#01). --define(FHCRC, 16#02). --define(FEXTRA, 16#04). --define(FNAME, 16#08). --define(FCOMMENT, 16#10). --define(RESERVED, 16#E0). - --define(OS_MDDOS, 0). --define(OS_AMIGA, 1). --define(OS_OPENVMS, 2). --define(OS_UNIX, 3). --define(OS_VMCMS, 4). --define(OS_ATARI, 5). --define(OS_OS2, 6). --define(OS_MAC, 7). --define(OS_ZSYS, 8). --define(OS_CPM, 9). --define(OS_TOP20, 10). --define(OS_NTFS, 11). --define(OS_QDOS, 12). --define(OS_ACORN, 13). --define(OS_UNKNOWN,255). - --define(DEFLATE_INIT, 1). --define(DEFLATE_INIT2, 2). --define(DEFLATE_SETDICT, 3). --define(DEFLATE_RESET, 4). --define(DEFLATE_END, 5). --define(DEFLATE_PARAMS, 6). --define(DEFLATE, 7). - --define(INFLATE_INIT, 8). --define(INFLATE_INIT2, 9). --define(INFLATE_SETDICT, 10). --define(INFLATE_GETDICT, 11). --define(INFLATE_SYNC, 12). --define(INFLATE_RESET, 13). --define(INFLATE_END, 14). --define(INFLATE, 15). --define(INFLATE_CHUNK, 26). - --define(CRC32_0, 16). --define(CRC32_1, 17). --define(CRC32_2, 18). - --define(SET_BUFSZ, 19). --define(GET_BUFSZ, 20). --define(GET_QSIZE, 21). - --define(ADLER32_1, 22). --define(ADLER32_2, 23). - --define(CRC32_COMBINE, 24). --define(ADLER32_COMBINE, 25). +%% Chunk sizes are hardcoded on account of them screwing with the +%% predictability of the system. zlib is incapable of trapping so we need to +%% ensure that it never operates on any significant amount of data. +-define(DEFLATE_IN_CHUNKSIZE, 8 bsl 10). +-define(DEFLATE_OUT_CHUNKSIZE, 8 bsl 10). +-define(INFLATE_IN_CHUNKSIZE, 8 bsl 10). +-define(INFLATE_OUT_CHUNKSIZE, 16 bsl 10). %%------------------------------------------------------------------------ -%% Main data types of the file --type zstream() :: port(). +%% Public data types. +-type zstream() :: term(). +-type zflush() :: 'none' | 'sync' | 'full' | 'finish'. -%% Auxiliary data types of the file --type zlevel() :: 'none' | 'default' | 'best_compression' | 'best_speed' - | 0..9. --type zmethod() :: 'deflated'. +-type zlevel() :: + 'none' | 'default' | 'best_compression' | 'best_speed' | 0..9. +-type zstrategy() :: 'default' | 'filtered' | 'huffman_only' | 'rle'. + +-type zmemlevel() :: 1..9. -type zwindowbits() :: -15..-8 | 8..47. --type zmemlevel() :: 1..9. --type zstrategy() :: 'default' | 'filtered' | 'huffman_only' | 'rle'. + +%% Private data types. + +-type zmethod() :: 'deflated'. + +-record(zlib_opts, { + stream :: zstream(), + method :: term(), + input_chunk_size :: integer(), + output_chunk_size :: integer(), + flush :: integer() + }). %%------------------------------------------------------------------------ -%% open a z_stream +on_load() -> + case erlang:load_nif(atom_to_list(?MODULE), 0) of + ok -> ok + end. + -spec open() -> zstream(). open() -> - open_port({spawn, "zlib_drv"}, [binary]). + open_nif(). +open_nif() -> + erlang:nif_error(undef). -%% close and release z_stream -spec close(Z) -> 'ok' when Z :: zstream(). close(Z) -> - try - true = port_close(Z), - receive %In case the caller is the owner and traps exits - {'EXIT',Z,_} -> ok - after 0 -> ok - end - catch _:_ -> erlang:error(badarg) - end. + close_nif(Z). +close_nif(_Z) -> + erlang:nif_error(undef). -spec deflateInit(Z) -> 'ok' when Z :: zstream(). deflateInit(Z) -> - call(Z, ?DEFLATE_INIT, <<?Z_DEFAULT_COMPRESSION:32>>). + deflateInit(Z, default). -spec deflateInit(Z, Level) -> 'ok' when Z :: zstream(), Level :: zlevel(). deflateInit(Z, Level) -> - call(Z, ?DEFLATE_INIT, <<(arg_level(Level)):32>>). + deflateInit_nif(Z, arg_level(Level)). + +deflateInit_nif(_Z, _Level) -> + erlang:nif_error(undef). --spec deflateInit(Z, Level, Method, - WindowBits, MemLevel, Strategy) -> 'ok' when +-spec deflateInit(Z, Level, Method, WindowBits, MemLevel, Strategy) -> 'ok' when Z :: zstream(), Level :: zlevel(), Method :: zmethod(), @@ -173,31 +143,49 @@ deflateInit(Z, Level) -> MemLevel :: zmemlevel(), Strategy :: zstrategy(). deflateInit(Z, Level, Method, WindowBits, MemLevel, Strategy) -> - call(Z, ?DEFLATE_INIT2, <<(arg_level(Level)):32, - (arg_method(Method)):32, - (arg_bitsz(WindowBits)):32, - (arg_mem(MemLevel)):32, - (arg_strategy(Strategy)):32>>). + deflateInit_nif(Z, + arg_level(Level), + arg_method(Method), + arg_bitsz(WindowBits), + arg_mem(MemLevel), + arg_strategy(Strategy)). +deflateInit_nif(_Z, _Level, _Method, _WindowBits, _MemLevel, _Strategy) -> + erlang:nif_error(undef). -spec deflateSetDictionary(Z, Dictionary) -> Adler32 when Z :: zstream(), Dictionary :: iodata(), Adler32 :: integer(). deflateSetDictionary(Z, Dictionary) -> - call(Z, ?DEFLATE_SETDICT, Dictionary). + deflateSetDictionary_nif(Z, Dictionary). +deflateSetDictionary_nif(_Z, _Dictionary) -> + erlang:nif_error(undef). -spec deflateReset(Z) -> 'ok' when Z :: zstream(). deflateReset(Z) -> - call(Z, ?DEFLATE_RESET, []). + deflateReset_nif(Z). +deflateReset_nif(_Z) -> + erlang:nif_error(undef). -spec deflateParams(Z, Level, Strategy) -> ok when Z :: zstream(), Level :: zlevel(), Strategy :: zstrategy(). -deflateParams(Z, Level, Strategy) -> - call(Z, ?DEFLATE_PARAMS, <<(arg_level(Level)):32, - (arg_strategy(Strategy)):32>>). +deflateParams(Z, Level0, Strategy0) -> + Level = arg_level(Level0), + Strategy = arg_strategy(Strategy0), + case deflateParams_nif(Z, Level, Strategy) of + buf_error -> + %% We had data left in the pipe; flush everything and stash it away + %% for the next deflate call before trying again. + Output = deflate(Z, <<>>, full), + save_progress(Z, deflate, Output), + deflateParams_nif(Z, Level, Strategy); + Any -> Any + end. +deflateParams_nif(_Z, _Level, _Strategy) -> + erlang:nif_error(undef). -spec deflate(Z, Data) -> Compressed when Z :: zstream(), @@ -209,170 +197,234 @@ deflate(Z, Data) -> -spec deflate(Z, Data, Flush) -> Compressed when Z :: zstream(), Data :: iodata(), - Flush :: none | sync | full | finish, + Flush :: zflush(), Compressed :: iolist(). deflate(Z, Data, Flush) -> - try port_command(Z, Data) of - true -> - _ = call(Z, ?DEFLATE, <<(arg_flush(Flush)):32>>), - collect(Z) - catch - error:_Err -> - flush(Z), - erlang:error(badarg) - end. + Progress = restore_progress(Z, deflate), + enqueue_input(Z, Data), + append_iolist(Progress, dequeue_all_chunks(Z, deflate_opts(Flush))). + +deflate_opts(Flush) -> + #zlib_opts{ + method = fun deflate_nif/4, + input_chunk_size = ?DEFLATE_IN_CHUNKSIZE, + output_chunk_size = ?DEFLATE_OUT_CHUNKSIZE, + flush = arg_flush(Flush) + }. + +deflate_nif(_Z, _InputChSize, _OutputChSize, _Flush) -> + erlang:nif_error(undef). -spec deflateEnd(Z) -> 'ok' when Z :: zstream(). deflateEnd(Z) -> - call(Z, ?DEFLATE_END, []). + deflateEnd_nif(Z). +deflateEnd_nif(_Z) -> + erlang:nif_error(undef). -spec inflateInit(Z) -> 'ok' when Z :: zstream(). inflateInit(Z) -> - call(Z, ?INFLATE_INIT, []). + inflateInit_nif(Z). +inflateInit_nif(_Z) -> + erlang:nif_error(undef). -spec inflateInit(Z, WindowBits) -> 'ok' when Z :: zstream(), WindowBits :: zwindowbits(). -inflateInit(Z, WindowBits) -> - call(Z, ?INFLATE_INIT2, <<(arg_bitsz(WindowBits)):32>>). +inflateInit(Z, WindowBits) -> + inflateInit_nif(Z, WindowBits). +inflateInit_nif(_Z, _WindowBits) -> + erlang:nif_error(undef). -spec inflateSetDictionary(Z, Dictionary) -> 'ok' when Z :: zstream(), Dictionary :: iodata(). -inflateSetDictionary(Z, Dictionary) -> - call(Z, ?INFLATE_SETDICT, Dictionary). +inflateSetDictionary(Z, Dictionary) -> + inflateSetDictionary_nif(Z, Dictionary). +inflateSetDictionary_nif(_Z, _Dictionary) -> + erlang:nif_error(undef). -spec inflateGetDictionary(Z) -> Dictionary when Z :: zstream(), - Dictionary :: iolist(). + Dictionary :: binary(). inflateGetDictionary(Z) -> - _ = call(Z, ?INFLATE_GETDICT, []), - collect(Z). - --spec inflateSync(zstream()) -> 'ok'. -inflateSync(Z) -> - call(Z, ?INFLATE_SYNC, []). + case inflateGetDictionary_nif(Z) of + Dictionary when is_binary(Dictionary) -> + Dictionary; + not_supported -> + erlang:error(enotsup) + end. +inflateGetDictionary_nif(_Z) -> + erlang:nif_error(undef). -spec inflateReset(Z) -> 'ok' when Z :: zstream(). -inflateReset(Z) -> - call(Z, ?INFLATE_RESET, []). +inflateReset(Z) -> + inflateReset_nif(Z). +inflateReset_nif(_Z) -> + erlang:nif_error(undef). -spec inflate(Z, Data) -> Decompressed when Z :: zstream(), Data :: iodata(), Decompressed :: iolist(). inflate(Z, Data) -> - try port_command(Z, Data) of - true -> - _ = call(Z, ?INFLATE, <<?Z_NO_FLUSH:32>>), - collect(Z) - catch - error:_Err -> - flush(Z), - erlang:error(badarg) + inflate(Z, Data, []). + +-spec inflate(Z, Data, Options) -> Decompressed when + Z :: zstream(), + Data :: iodata(), + Options :: list({exception_on_need_dict, boolean()}), + Decompressed :: iolist() | + {need_dictionary, + Adler32 :: integer(), + Output :: iolist()}. +inflate(Z, Data, Options) -> + enqueue_input(Z, Data), + Result = dequeue_all_chunks(Z, inflate_opts()), + case proplist_get_value(Options, exception_on_need_dict, true) of + true -> exception_on_need_dict(Z, Result); + false -> Result end. +inflate_nif(_Z, _InputChSize, _OutputChSize, _Flush) -> + erlang:nif_error(undef). + +inflate_opts() -> + #zlib_opts{ + method = fun inflate_nif/4, + input_chunk_size = ?INFLATE_IN_CHUNKSIZE, + output_chunk_size = ?INFLATE_OUT_CHUNKSIZE, + flush = arg_flush(none) + }. + -spec inflateChunk(Z, Data) -> Decompressed | {more, Decompressed} when Z :: zstream(), Data :: iodata(), Decompressed :: iolist(). inflateChunk(Z, Data) -> - try port_command(Z, Data) of - true -> - inflateChunk(Z) - catch - error:_Err -> - flush(Z), - erlang:error(badarg) - end. + enqueue_input(Z, Data), + inflateChunk(Z). -spec inflateChunk(Z) -> Decompressed | {more, Decompressed} when Z :: zstream(), Decompressed :: iolist(). inflateChunk(Z) -> - Status = call(Z, ?INFLATE_CHUNK, []), - Data = receive - {Z, {data, Bin}} -> - Bin - after 0 -> - [] - end, - - case Status of - Good when (Good == ok) orelse (Good == stream_end) -> - Data; - inflate_has_more -> - {more, Data} - end. + Opts0 = inflate_opts(), + Opts = Opts0#zlib_opts { output_chunk_size = getBufSize(Z) }, + + Result0 = dequeue_next_chunk(Z, Opts), + Result1 = exception_on_need_dict(Z, Result0), + yield_inflateChunk(Z, Result1). + +yield_inflateChunk(_Z, {continue, Output}) -> + {more, lists:flatten(Output)}; +yield_inflateChunk(_Z, {finished, Output}) -> + lists:flatten(Output). + +exception_on_need_dict(Z, {need_dictionary, Adler, Output}) -> + Progress = restore_progress(Z, inflate), + save_progress(Z, inflate, append_iolist(Progress, Output)), + erlang:error({need_dictionary, Adler}); +exception_on_need_dict(Z, {Mark, Output}) -> + Progress = restore_progress(Z, inflate), + {Mark, append_iolist(Progress, Output)}; +exception_on_need_dict(Z, Output) when is_list(Output); is_binary(Output) -> + Progress = restore_progress(Z, inflate), + append_iolist(Progress, Output). + +-spec safeInflate(Z, Data) -> Result when + Z :: zstream(), + Data :: iodata(), + Result :: {continue, Output :: iolist()} | + {finished, Output :: iolist()} | + {need_dictionary, + Adler32 :: integer(), + Output :: iolist()}. +safeInflate(Z, Data) -> + enqueue_input(Z, Data), + dequeue_next_chunk(Z, inflate_opts()). -spec inflateEnd(Z) -> 'ok' when Z :: zstream(). inflateEnd(Z) -> - call(Z, ?INFLATE_END, []). + inflateEnd_nif(Z). +inflateEnd_nif(_Z) -> + erlang:nif_error(undef). -spec setBufSize(Z, Size) -> 'ok' when Z :: zstream(), Size :: non_neg_integer(). -setBufSize(Z, Size) -> - call(Z, ?SET_BUFSZ, <<Size:32>>). +setBufSize(Z, Size) when is_integer(Size), Size > 16, Size < (1 bsl 24) -> + setBufSize_nif(Z, Size); +setBufSize(_Z, _Size) -> + erlang:error(badarg). +setBufSize_nif(_Z, _Size) -> + erlang:nif_error(undef). --spec getBufSize(Z) -> Size when - Z :: zstream(), - Size :: non_neg_integer(). +-spec getBufSize(Z) -> non_neg_integer() when + Z :: zstream(). getBufSize(Z) -> - call(Z, ?GET_BUFSZ, []). + getBufSize_nif(Z). +getBufSize_nif(_Z) -> + erlang:nif_error(undef). -spec crc32(Z) -> CRC when Z :: zstream(), CRC :: integer(). crc32(Z) -> - call(Z, ?CRC32_0, []). + crc32_nif(Z). +crc32_nif(_Z) -> + erlang:nif_error(undef). -spec crc32(Z, Data) -> CRC when Z :: zstream(), Data :: iodata(), CRC :: integer(). -crc32(Z, Data) -> - call(Z, ?CRC32_1, Data). +crc32(Z, Data) when is_reference(Z) -> + erlang:crc32(Data); +crc32(_Z, _Data) -> + erlang:error(badarg). -spec crc32(Z, PrevCRC, Data) -> CRC when Z :: zstream(), PrevCRC :: integer(), Data :: iodata(), CRC :: integer(). -crc32(Z, CRC, Data) -> - call(Z, ?CRC32_2, [<<CRC:32>>, Data]). +crc32(Z, CRC, Data) when is_reference(Z) -> + erlang:crc32(CRC, Data); +crc32(_Z, _CRC, _Data) -> + erlang:error(badarg). + +-spec crc32_combine(Z, CRC1, CRC2, Size2) -> CRC when + Z :: zstream(), + CRC :: integer(), + CRC1 :: integer(), + CRC2 :: integer(), + Size2 :: integer(). +crc32_combine(Z, CRC1, CRC2, Size2) when is_reference(Z) -> + erlang:crc32_combine(CRC1, CRC2, Size2); +crc32_combine(_Z, _CRC1, _CRC2, _Size2) -> + erlang:error(badarg). -spec adler32(Z, Data) -> CheckSum when Z :: zstream(), Data :: iodata(), CheckSum :: integer(). -adler32(Z, Data) -> - call(Z, ?ADLER32_1, Data). +adler32(Z, Data) when is_reference(Z) -> + erlang:adler32(Data); +adler32(_Z, _Data) -> + erlang:error(badarg). -spec adler32(Z, PrevAdler, Data) -> CheckSum when Z :: zstream(), PrevAdler :: integer(), Data :: iodata(), CheckSum :: integer(). -adler32(Z, Adler, Data) when is_integer(Adler) -> - call(Z, ?ADLER32_2, [<<Adler:32>>, Data]); -adler32(_Z, _Adler, _Data) -> - erlang:error(badarg). - --spec crc32_combine(Z, CRC1, CRC2, Size2) -> CRC when - Z :: zstream(), - CRC :: integer(), - CRC1 :: integer(), - CRC2 :: integer(), - Size2 :: integer(). -crc32_combine(Z, CRC1, CRC2, Len2) - when is_integer(CRC1), is_integer(CRC2), is_integer(Len2) -> - call(Z, ?CRC32_COMBINE, <<CRC1:32, CRC2:32, Len2:32>>); -crc32_combine(_Z, _CRC1, _CRC2, _Len2) -> +adler32(Z, Adler, Data) when is_reference(Z) -> + erlang:adler32(Adler, Data); +adler32(_Z, _Adler, _Data) -> erlang:error(badarg). -spec adler32_combine(Z, Adler1, Adler2, Size2) -> Adler when @@ -381,16 +433,11 @@ crc32_combine(_Z, _CRC1, _CRC2, _Len2) -> Adler1 :: integer(), Adler2 :: integer(), Size2 :: integer(). -adler32_combine(Z, Adler1, Adler2, Len2) - when is_integer(Adler1), is_integer(Adler2), is_integer(Len2) -> - call(Z, ?ADLER32_COMBINE, <<Adler1:32, Adler2:32, Len2:32>>); -adler32_combine(_Z, _Adler1, _Adler2, _Len2) -> +adler32_combine(Z, Adler1, Adler2, Size2) when is_reference(Z) -> + erlang:adler32_combine(Adler1, Adler2, Size2); +adler32_combine(_Z, _Adler1, _Adler2, _Size2) -> erlang:error(badarg). --spec getQSize(zstream()) -> non_neg_integer(). -getQSize(Z) -> - call(Z, ?GET_QSIZE, []). - %% compress/uncompress zlib with header -spec compress(Data) -> Compressed when Data :: iodata(), @@ -398,13 +445,13 @@ getQSize(Z) -> compress(Data) -> Z = open(), Bs = try - deflateInit(Z, default), - B = deflate(Z, Data, finish), - deflateEnd(Z), - B - after - close(Z) - end, + deflateInit(Z, default), + B = deflate(Z, Data, finish), + deflateEnd(Z), + B + after + close(Z) + end, iolist_to_binary(Bs). -spec uncompress(Data) -> Decompressed when @@ -416,14 +463,14 @@ uncompress(Data) -> if Size >= 8 -> Z = open(), - Bs = try - inflateInit(Z), - B = inflate(Z, Data), - inflateEnd(Z), - B - after - close(Z) - end, + Bs = try + inflateInit(Z), + B = inflate(Z, Data), + inflateEnd(Z), + B + after + close(Z) + end, iolist_to_binary(Bs); true -> erlang:error(data_error) @@ -440,13 +487,13 @@ uncompress(Data) -> zip(Data) -> Z = open(), Bs = try - deflateInit(Z, default, deflated, -?MAX_WBITS, 8, default), - B = deflate(Z, Data, finish), - deflateEnd(Z), - B - after - close(Z) - end, + deflateInit(Z, default, deflated, -?MAX_WBITS, 8, default), + B = deflate(Z, Data, finish), + deflateEnd(Z), + B + after + close(Z) + end, iolist_to_binary(Bs). -spec unzip(Data) -> Decompressed when @@ -455,28 +502,28 @@ zip(Data) -> unzip(Data) -> Z = open(), Bs = try - inflateInit(Z, -?MAX_WBITS), - B = inflate(Z, Data), - inflateEnd(Z), - B - after - close(Z) - end, + inflateInit(Z, -?MAX_WBITS), + B = inflate(Z, Data), + inflateEnd(Z), + B + after + close(Z) + end, iolist_to_binary(Bs). - + -spec gzip(Data) -> Compressed when Data :: iodata(), Compressed :: binary(). gzip(Data) -> Z = open(), Bs = try - deflateInit(Z, default, deflated, 16+?MAX_WBITS, 8, default), - B = deflate(Z, Data, finish), - deflateEnd(Z), - B - after - close(Z) - end, + deflateInit(Z, default, deflated, 16+?MAX_WBITS, 8, default), + B = deflate(Z, Data, finish), + deflateEnd(Z), + B + after + close(Z) + end, iolist_to_binary(Bs). -spec gunzip(Data) -> Decompressed when @@ -485,92 +532,150 @@ gzip(Data) -> gunzip(Data) -> Z = open(), Bs = try - inflateInit(Z, 16+?MAX_WBITS), - B = inflate(Z, Data), - inflateEnd(Z), - B - after - close(Z) - end, + inflateInit(Z, 16+?MAX_WBITS), + B = inflate(Z, Data), + inflateEnd(Z), + B + after + close(Z) + end, iolist_to_binary(Bs). --spec collect(zstream()) -> iolist(). -collect(Z) -> - collect(Z, []). - --spec collect(zstream(), iolist()) -> iolist(). -collect(Z, Acc) -> - receive - {Z, {data, Bin}} -> - collect(Z, [Bin|Acc]) - after 0 -> - reverse(Acc) +-spec dequeue_all_chunks(Z, Opts) -> Result when + Z :: zstream(), + Opts :: #zlib_opts{}, + Result :: {need_dictionary, integer(), iolist()} | + iolist(). +dequeue_all_chunks(Z, Opts) -> + dequeue_all_chunks_1(Z, Opts, []). +dequeue_all_chunks_1(Z, Opts, Output) -> + case dequeue_next_chunk(Z, Opts) of + {need_dictionary, _, _} = NeedDict -> + NeedDict; + {continue, Chunk} -> + dequeue_all_chunks_1(Z, Opts, append_iolist(Output, Chunk)); + {finished, Chunk} -> + append_iolist(Output, Chunk) end. --spec flush(zstream()) -> 'ok'. -flush(Z) -> - receive - {Z, {data,_}} -> - flush(Z) - after 0 -> - ok +-spec dequeue_next_chunk(Z, Opts) -> Result when + Z :: zstream(), + Opts :: #zlib_opts{}, + Result :: {need_dictionary, integer(), iolist()} | + {continue, iolist()} | + {finished, iolist()}. +dequeue_next_chunk(Z, Opts) -> + Method = Opts#zlib_opts.method, + IChSz = Opts#zlib_opts.input_chunk_size, + OChSz = Opts#zlib_opts.output_chunk_size, + Flush = Opts#zlib_opts.flush, + Method(Z, IChSz, OChSz, Flush). + +-spec append_iolist(IO, D) -> iolist() when + IO :: iodata(), + D :: iodata(). +append_iolist([], D) when is_list(D) -> D; +append_iolist([], D) -> [D]; +append_iolist(IO, []) -> IO; +append_iolist(IO, [D]) -> [IO, D]; +append_iolist(IO, D) -> [IO, D]. + +%% inflate/2 and friends are documented as throwing an error on Z_NEED_DICT +%% rather than simply returning something to that effect, and deflateParams/3 +%% may flush behind the scenes. This requires us to stow away our current +%% progress in the handle and resume from that point on our next call. +%% +%% Generally speaking this is either a refc binary or nothing at all, so it's +%% pretty cheap. + +-spec save_progress(Z, Kind, Output) -> ok when + Z :: zstream(), + Kind :: inflate | deflate, + Output :: iolist(). +save_progress(Z, Kind, Output) -> + ok = setStash_nif(Z, {Kind, Output}). + +-spec restore_progress(Z, Kind) -> iolist() when + Z :: zstream(), + Kind :: inflate | deflate. +restore_progress(Z, Kind) -> + case getStash_nif(Z) of + {ok, {Kind, Output}} -> + ok = clearStash_nif(Z), + Output; + empty -> + [] end. - -arg_flush(none) -> ?Z_NO_FLUSH; + +-spec clearStash_nif(Z) -> ok when + Z :: zstream(). +clearStash_nif(_Z) -> + erlang:nif_error(undef). + +-spec setStash_nif(Z, Term) -> ok when + Z :: zstream(), + Term :: term(). +setStash_nif(_Z, _Term) -> + erlang:nif_error(undef). + +-spec getStash_nif(Z) -> {ok, term()} | empty when + Z :: zstream(). +getStash_nif(_Z) -> + erlang:nif_error(undef). + +%% The 'proplists' module isn't preloaded so we can't rely on its existence. +proplist_get_value([], _Name, DefVal) -> DefVal; +proplist_get_value([{Name, Value} | _Opts], Name, _DefVal) -> Value; +proplist_get_value([_Head | Opts], Name, DefVal) -> + proplist_get_value(Opts, Name, DefVal). + +arg_flush(none) -> ?Z_NO_FLUSH; %% ?Z_PARTIAL_FLUSH is deprecated in zlib -- deliberately not included. -arg_flush(sync) -> ?Z_SYNC_FLUSH; -arg_flush(full) -> ?Z_FULL_FLUSH; -arg_flush(finish) -> ?Z_FINISH; -arg_flush(_) -> erlang:error(badarg). +arg_flush(sync) -> ?Z_SYNC_FLUSH; +arg_flush(full) -> ?Z_FULL_FLUSH; +arg_flush(finish) -> ?Z_FINISH; +arg_flush(_) -> erlang:error(bad_flush_mode). arg_level(none) -> ?Z_NO_COMPRESSION; arg_level(best_speed) -> ?Z_BEST_SPEED; arg_level(best_compression) -> ?Z_BEST_COMPRESSION; arg_level(default) -> ?Z_DEFAULT_COMPRESSION; arg_level(Level) when is_integer(Level), Level >= 0, Level =< 9 -> Level; -arg_level(_) -> erlang:error(badarg). - +arg_level(_) -> erlang:error(bad_compression_level). + arg_strategy(filtered) -> ?Z_FILTERED; arg_strategy(huffman_only) -> ?Z_HUFFMAN_ONLY; arg_strategy(rle) -> ?Z_RLE; arg_strategy(default) -> ?Z_DEFAULT_STRATEGY; -arg_strategy(_) -> erlang:error(badarg). +arg_strategy(_) -> erlang:error(bad_compression_strategy). arg_method(deflated) -> ?Z_DEFLATED; -arg_method(_) -> erlang:error(badarg). +arg_method(_) -> erlang:error(bad_compression_method). -spec arg_bitsz(zwindowbits()) -> zwindowbits(). arg_bitsz(Bits) when is_integer(Bits) andalso - ((8 =< Bits andalso Bits < 48) orelse - (-15 =< Bits andalso Bits =< -8)) -> + ((8 =< Bits andalso Bits < 48) orelse + (-15 =< Bits andalso Bits =< -8)) -> Bits; -arg_bitsz(_) -> erlang:error(badarg). +arg_bitsz(_) -> erlang:error(bad_windowbits). -spec arg_mem(zmemlevel()) -> zmemlevel(). arg_mem(Level) when is_integer(Level), 1 =< Level, Level =< 9 -> Level; -arg_mem(_) -> erlang:error(badarg). - -call(Z, Cmd, Arg) -> - try port_control(Z, Cmd, Arg) of - [0|Res] -> list_to_atom(Res); - [1|Res] -> - flush(Z), - erlang:error(list_to_atom(Res)); - [2,A,B,C,D] -> - (A bsl 24)+(B bsl 16)+(C bsl 8)+D; - [3,A,B,C,D] -> - erlang:error({need_dictionary,(A bsl 24)+(B bsl 16)+(C bsl 8)+D}); - [4, _, _, _, _] -> - inflate_has_more - catch - error:badarg -> %% Rethrow loses port_control from stacktrace. - erlang:error(badarg) - end. +arg_mem(_) -> erlang:error(bad_memlevel). -reverse(X) -> - reverse(X, []). +-spec enqueue_input(Z, IOData) -> ok when + Z :: zstream(), + IOData :: iodata(). +enqueue_input(Z, IOData) -> + enqueue_input_1(Z, erlang:iolist_to_iovec(IOData)). + +enqueue_input_1(_Z, []) -> + ok; +enqueue_input_1(Z, IOVec) -> + case enqueue_nif(Z, IOVec) of + {continue, Remainder} -> enqueue_input_1(Z, Remainder); + ok -> ok + end. -reverse([H|T], Y) -> - reverse(T, [H|Y]); -reverse([], X) -> - X. +enqueue_nif(_Z, _IOVec) -> + erlang:nif_error(undef).
\ No newline at end of file |