aboutsummaryrefslogtreecommitdiffstats
path: root/erts/preloaded
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-04-23 10:40:28 +0200
committerMicael Karlberg <[email protected]>2018-09-18 13:01:37 +0200
commit04335ca6aedfc5ad9f0d6a8d193dfd76a222291c (patch)
treecef0cfbfb688200f73f5690e545c23fbe1c319d1 /erts/preloaded
parentd5aecb115070de76cb42b44edee6bbcb5f4a3724 (diff)
downloadotp-04335ca6aedfc5ad9f0d6a8d193dfd76a222291c.tar.gz
otp-04335ca6aedfc5ad9f0d6a8d193dfd76a222291c.tar.bz2
otp-04335ca6aedfc5ad9f0d6a8d193dfd76a222291c.zip
[socket-nif] Completed the recv and recvfrom functions
Also updated the socket type (now a record for easy use).
Diffstat (limited to 'erts/preloaded')
-rw-r--r--erts/preloaded/src/socket.erl172
1 files changed, 115 insertions, 57 deletions
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 0dadcecaa0..bae561cd51 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -39,7 +39,7 @@
%% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv)
recv/2, recv/3, recv/4,
- recvfrom/1, recvfrom/2,
+ recvfrom/1, recvfrom/2, recvfrom/3, recvfrom/4,
%% recvmsg/4,
%% readv/3,
@@ -106,10 +106,10 @@
-type port_number() :: 0..65535.
-type socket_info() :: map().
-%% -record(socket, {info :: socket_info,
-%% ref :: reference()}).
--opaque socket() :: {socket, socket_info(), reference()}.
-%% -opaque socket() :: #socket{}.
+-record(socket, {info :: socket_info(),
+ ref :: reference()}).
+%% -opaque socket() :: {socket, socket_info(), reference()}.
+-opaque socket() :: #socket{}.
-type accept_flags() :: [accept_flag()].
-type accept_flag() :: nonblock | cloexec.
@@ -117,20 +117,26 @@
-type send_flags() :: [send_flag()].
-type send_flag() :: confirm |
dontroute |
- dontwait |
eor |
more |
nosignal |
oob.
+%% Extend with OWN flags for other usage:
+%% - adapt-buffer-sz:
+%% This will have the effect that the nif recvfrom will use
+%% MSG_PEEK to ensure no part of the message is lost, but if
+%% necessary adapt (increase) the buffer size until all of
+%% it fits.
+%%
+%% Note that not all of these flags is useful for every recv function!
+%%
-type recv_flags() :: [recv_flag()].
-type recv_flag() :: cmsg_cloexec |
- dontwait |
errqueue |
oob |
peek |
- trunc |
- waitall.
+ trunc.
-type setopt_key() :: foo.
-type getopt_key() :: foo.
@@ -177,23 +183,20 @@
-define(SOCKET_SEND_FLAG_CONFIRM, 0).
-define(SOCKET_SEND_FLAG_DONTROUTE, 1).
--define(SOCKET_SEND_FLAG_DONTWAIT, 2).
--define(SOCKET_SEND_FLAG_EOR, 3).
--define(SOCKET_SEND_FLAG_MORE, 4).
--define(SOCKET_SEND_FLAG_NOSIGNAL, 5).
--define(SOCKET_SEND_FLAG_OOB, 6).
+-define(SOCKET_SEND_FLAG_EOR, 2).
+-define(SOCKET_SEND_FLAG_MORE, 3).
+-define(SOCKET_SEND_FLAG_NOSIGNAL, 4).
+-define(SOCKET_SEND_FLAG_OOB, 5).
-define(SOCKET_SEND_FLAGS_DEFAULT, []).
-define(SOCKET_SEND_TIMEOUT_DEFAULT, infinity).
-define(SOCKET_SENDTO_TIMEOUT_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT).
-define(SOCKET_RECV_FLAG_CMSG_CLOEXEC, 0).
--define(SOCKET_RECV_FLAG_DONTWAIT, 1).
--define(SOCKET_RECV_FLAG_ERRQUEUE, 2).
--define(SOCKET_RECV_FLAG_OOB, 3).
--define(SOCKET_RECV_FLAG_PEEK, 4).
--define(SOCKET_RECV_FLAG_TRUNC, 5).
--define(SOCKET_RECV_FLAG_WAITALL, 6).
+-define(SOCKET_RECV_FLAG_ERRQUEUE, 1).
+-define(SOCKET_RECV_FLAG_OOB, 2).
+-define(SOCKET_RECV_FLAG_PEEK, 3).
+-define(SOCKET_RECV_FLAG_TRUNC, 4).
-define(SOCKET_RECV_FLAGS_DEFAULT, []).
-define(SOCKET_RECV_TIMEOUT_DEFAULT, infinity).
@@ -287,7 +290,8 @@ open(Domain, Type, Protocol0, Extra) when is_map(Extra) ->
SocketInfo = #{domain => Domain,
type => Type,
protocol => Protocol},
- Socket = {socket, SocketInfo, SockRef},
+ Socket = #socket{info = SocketInfo,
+ ref = SockRef},
{ok, Socket};
{error, _} = ERROR ->
ERROR
@@ -352,7 +356,7 @@ bind(Socket, Addr) when is_tuple(Addr) orelse
Reason :: term().
%% Shall we keep info about domain so that we can verify address?
-bind({socket, _, SockRef}, Addr, Port)
+bind(#socket{ref = SockRef}, Addr, Port)
when (is_tuple(Addr) andalso
((size(Addr) =:= 4) orelse (size(Addr) =:= 8))) orelse
((Addr =:= any) orelse (Addr =:= loopback)) andalso
@@ -385,7 +389,7 @@ connect(Socket, Addr, Port) ->
connect(_Socket, _Addr, _Port, Timeout)
when (is_integer(Timeout) andalso (Timeout =< 0)) ->
{error, timeout};
-connect({socket, _, SockRef}, Addr, Port, Timeout)
+connect(#socket{ref = SockRef}, Addr, Port, Timeout)
when (is_tuple(Addr) andalso
((size(Addr) =:= 4) orelse (size(Addr) =:= 8))) andalso
(is_integer(Port) andalso (Port >= 0)) andalso
@@ -424,7 +428,7 @@ connect({socket, _, SockRef}, Addr, Port, Timeout)
listen(Socket) ->
listen(Socket, ?SOCKET_LISTEN_BACKLOG_DEFAULT).
-listen({socket, _, SockRef}, Backlog)
+listen(#socket{ref = SockRef}, Backlog)
when (is_integer(Backlog) andalso (Backlog >= 0)) ->
nif_listen(SockRef, Backlog).
@@ -448,7 +452,7 @@ accept(Socket) ->
%% Do we really need this optimization?
accept(_, Timeout) when is_integer(Timeout) andalso (Timeout =< 0) ->
{error, timeout};
-accept({socket, SI, LSockRef}, Timeout)
+accept(#socket{info = SI, ref = LSockRef}, Timeout)
when is_integer(Timeout) orelse (Timeout =:= infinity) ->
do_accept(LSockRef, SI, Timeout).
@@ -460,7 +464,8 @@ do_accept(LSockRef, SI, Timeout) ->
SocketInfo = #{domain => maps:get(domain, SI),
type => maps:get(type, SI),
protocol => maps:get(protocol, SI)},
- Socket = {socket, SocketInfo, SockRef},
+ Socket = #socket{info = SocketInfo,
+ ref = SockRef},
{ok, Socket};
{error, eagain} ->
NewTimeout = next_timeout(TS, Timeout),
@@ -499,9 +504,10 @@ send(Socket, Data, Timeout) ->
send(Socket, Data, Flags, Timeout) when is_list(Data) ->
Bin = erlang:list_to_binary(Data),
send(Socket, Bin, Flags, Timeout);
-send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags) ->
+send(#socket{ref = SockRef}, Data, Flags, Timeout)
+ when is_binary(Data) andalso is_list(Flags) ->
EFlags = enc_send_flags(Flags),
- do_send(Socket, Data, EFlags, Timeout).
+ do_send(SockRef, Data, EFlags, Timeout).
do_send(SockRef, Data, EFlags, Timeout) ->
TS = timestamp(Timeout),
@@ -562,14 +568,14 @@ sendto(Socket, Data, Flags, DestAddr, DestPort) ->
sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) when is_list(Data) ->
Bin = erlang:list_to_binary(Data),
sendto(Socket, Bin, Flags, DestAddr, DestPort, Timeout);
-sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout)
+sendto(#socket{ref = SockRef}, Data, Flags, DestAddr, DestPort, Timeout)
when is_binary(Data) andalso
is_list(Flags) andalso
(is_tuple(DestAddr) orelse (DestAddr =:= null)) andalso
is_integer(DestPort) andalso
(is_integer(Timeout) orelse (Timeout =:= infinity)) ->
EFlags = enc_send_flags(Flags),
- do_sendto(Socket, Data, EFlags, DestAddr, DestPort, Timeout).
+ do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, Timeout).
do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
TS = timestamp(Timeout),
@@ -726,18 +732,18 @@ recv(Socket, Length, Timeout) ->
Data :: binary(),
Reason :: term().
-recv(Socket, Length, Flags, Timeout)
+recv(#socket{ref = SockRef}, Length, Flags, Timeout)
when (is_integer(Length) andalso (Length >= 0)) andalso
is_list(Flags) andalso
(is_integer(Timeout) orelse (Timeout =:= infinity)) ->
EFlags = enc_recv_flags(Flags),
- do_recv(Socket, undefined, Length, EFlags, <<>>, EFlags).
+ do_recv(SockRef, undefined, Length, EFlags, <<>>, Timeout).
%% We need to pass the "old recv ref" around because of the special case
%% with Length = 0. This case makes it neccessary to have a timeout function
%% clause since we may never wait for anything (no receive select), and so the
%% the only timeout check will be the function clause.
-do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
+do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
when (Timeout =:= infinity) orelse
(is_integer(Timeout) andalso (Timeout > 0)) ->
TS = timestamp(Timeout),
@@ -754,7 +760,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
%% > 0 - We got a part of the message and we will be notified
%% when there is more to read (a select message)
{ok, false = _Complete, Bin} when (Length =:= 0) ->
- do_recv(Socket, RecvRef,
+ do_recv(SockRef, RecvRef,
Length, EFlags,
<<Acc/binary, Bin/binary>>,
next_timeout(TS, Timeout));
@@ -766,7 +772,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
NewTimeout = next_timeout(TS, Timeout),
receive
{select, SockRef, RecvRef, ready_input} ->
- do_recv(Socket, RecvRef,
+ do_recv(SockRef, RecvRef,
Length-size(Bin), EFlags,
Bin,
next_timeout(TS, Timeout))
@@ -781,7 +787,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
NewTimeout = next_timeout(TS, Timeout),
receive
{select, SockRef, RecvRef, ready_input} ->
- do_recv(Socket, RecvRef,
+ do_recv(SockRef, RecvRef,
Length-size(Bin), EFlags,
<<Acc/binary, Bin/binary>>,
next_timeout(TS, Timeout))
@@ -801,7 +807,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
NewTimeout = next_timeout(TS, Timeout),
receive
{select, SockRef, RecvRef, ready_input} ->
- do_recv(Socket, RecvRef,
+ do_recv(SockRef, RecvRef,
Length, EFlags,
Acc,
next_timeout(TS, Timeout))
@@ -819,37 +825,92 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
end;
-do_recv({socket, _, SockRef} = _Socket, RecvRef,
- 0 = _Length, _Eflags, Acc, _Timeout) ->
+do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) ->
%% The current recv operation is to be cancelled, so no need for a ref...
%% The cancel will end our 'read everything you have' and "activate"
%% any waiting reader.
nif_cancel(SockRef, recv, RecvRef),
{ok, Acc};
-do_recv(_Socket, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) ->
+do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) ->
{error, {timeout, Acc}};
-do_recv(_Socket, _RecvRef, _Length, _EFlags, _Acc, _Timeout) ->
+do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) ->
{error, timeout}.
%% ---------------------------------------------------------------------------
%%
+%% With recvfrom we get messages, which means that regardless of how
+%% much we want to read, we return when we get a message.
+%% The MaxSize argument basically defines the size of our receive
+%% buffer. By setting the size to zero (0), we use the configured
+%% size (see setopt).
+%% It may be impossible to know what (buffer) size is appropriate
+%% "in advance", and in those cases it may be convenient to use the
+%% (recv) 'peek' flag. When this flag is provided the message is *not*
+%% "consumed" from the underlying buffers, so another recvfrom call
+%% is needed, possibly with a then adjusted buffer size.
+%%
recvfrom(Socket) ->
- recvfrom(Socket, ?SOCKET_RECV_FLAGS_DEFAULT).
+ recvfrom(Socket, 0).
+
+recvfrom(Socket, BufSz) ->
+ recvfrom(Socket, BufSz,
+ ?SOCKET_RECV_FLAGS_DEFAULT,
+ ?SOCKET_RECV_TIMEOUT_DEFAULT).
+
+
+recvfrom(Socket, Flags, Timeout) when is_list(Flags) ->
+ recvfrom(Socket, 0, Flags, Timeout);
+recvfrom(Socket, BufSz, Flags) when is_list(Flags) ->
+ recvfrom(Socket, BufSz, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT);
+recvfrom(Socket, BufSz, Timeout) ->
+ recvfrom(Socket, BufSz, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout).
+
+-spec recvfrom(Socket, BufSz, Flags, Timeout) -> {ok, {SrcDomain, Source, Data}} | {error, Reason} when
+ Socket :: socket(),
+ BufSz :: non_neg_integer(),
+ Flags :: recv_flags(),
+ Timeout :: timeout(),
+ SrcDomain :: domain() | undefined,
+ Source :: {ip_address(), port_number()} | string() | undefined,
+ Data :: binary(),
+ Reason :: term().
+
+recvfrom(#socket{ref = SockRef}, BufSz, Flags, Timeout)
+ when (is_integer(BufSz) andalso (BufSz >= 0)) andalso
+ is_list(Flags) andalso
+ (is_integer(Timeout) orelse (Timeout =:= infinity)) ->
+ EFlags = enc_recv_flags(Flags),
+ do_recvfrom(SockRef, BufSz, EFlags, Timeout).
--spec recvfrom(Socket, Flags) -> {ok, Data, SrcAddr, SrcPort} | {error, Reason} when
- Socket :: socket(),
- Flags :: recv_flags(),
- Data :: binary(),
- SrcAddr :: ip_address(),
- SrcPort :: port_number(),
- Reason :: term().
+do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
+ TS = timestamp(Timeout),
+ RecvRef = make_ref(),
+ case nif_recvfrom(SockRef, RecvRef, BufSz, EFlags) of
+ {ok, {_Domain, _Source, _NewData}} = OK ->
+ OK;
+
+ {error, eagain} ->
+ %% There is nothing just now, but we will be notified when there
+ %% is something to read (a select message).
+ NewTimeout = next_timeout(TS, Timeout),
+ receive
+ {select, SockRef, RecvRef, ready_input} ->
+ do_recvfrom(SockRef, BufSz, EFlags,
+ next_timeout(TS, Timeout))
+ after NewTimeout ->
+ nif_cancel(SockRef, recvfrom, RecvRef),
+ flush_select_msgs(SockRef, RecvRef),
+ {error, timeout}
+ end;
+
+ {error, _} = ERROR ->
+ ERROR
+
+ end.
-recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) ->
- EFlags = enc_recv_flags(Flags),
- nif_recvfrom(SockRef, EFlags).
%% ---------------------------------------------------------------------------
@@ -998,7 +1059,6 @@ enc_protocol(Type, Proto) -> throw({error, {invalid_protocol, {Type, Proto}}
enc_send_flags(Flags) ->
EFlags = [{confirm, ?SOCKET_SEND_FLAG_CONFIRM},
{dontroute, ?SOCKET_SEND_FLAG_DONTROUTE},
- {dontwait, ?SOCKET_SEND_FLAG_DONTWAIT},
{eor, ?SOCKET_SEND_FLAG_EOR},
{more, ?SOCKET_SEND_FLAG_MORE},
{nosignal, ?SOCKET_SEND_FLAG_NOSIGNAL},
@@ -1010,12 +1070,10 @@ enc_send_flags(Flags) ->
enc_recv_flags(Flags) ->
EFlags = [{cmsg_cloexec, ?SOCKET_RECV_FLAG_CMSG_CLOEXEC},
- {dontwait, ?SOCKET_RECV_FLAG_DONTWAIT},
{errqueue, ?SOCKET_RECV_FLAG_ERRQUEUE},
{oob, ?SOCKET_RECV_FLAG_OOB},
{peek, ?SOCKET_RECV_FLAG_PEEK},
- {trunc, ?SOCKET_RECV_FLAG_TRUNC},
- {waitall, ?SOCKET_RECV_FLAG_WAITALL}],
+ {trunc, ?SOCKET_RECV_FLAG_TRUNC}],
enc_flags(Flags, EFlags).
@@ -1159,7 +1217,7 @@ nif_sendto(_SRef, _SendRef, _Data, _Flags, _Dest, _Port) ->
nif_recv(_SRef, _RecvRef, _Length, _Flags) ->
erlang:error(badarg).
-nif_recvfrom(_SRef, _Flags) ->
+nif_recvfrom(_SRef, _RecvRef, _Length, _Flags) ->
erlang:error(badarg).
nif_cancel(_SRef, _Op, _Ref) ->