diff options
Diffstat (limited to 'erts/preloaded/src/socket.erl')
-rw-r--r-- | erts/preloaded/src/socket.erl | 172 |
1 files changed, 115 insertions, 57 deletions
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 0dadcecaa0..bae561cd51 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -39,7 +39,7 @@ %% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv) recv/2, recv/3, recv/4, - recvfrom/1, recvfrom/2, + recvfrom/1, recvfrom/2, recvfrom/3, recvfrom/4, %% recvmsg/4, %% readv/3, @@ -106,10 +106,10 @@ -type port_number() :: 0..65535. -type socket_info() :: map(). -%% -record(socket, {info :: socket_info, -%% ref :: reference()}). --opaque socket() :: {socket, socket_info(), reference()}. -%% -opaque socket() :: #socket{}. +-record(socket, {info :: socket_info(), + ref :: reference()}). +%% -opaque socket() :: {socket, socket_info(), reference()}. +-opaque socket() :: #socket{}. -type accept_flags() :: [accept_flag()]. -type accept_flag() :: nonblock | cloexec. @@ -117,20 +117,26 @@ -type send_flags() :: [send_flag()]. -type send_flag() :: confirm | dontroute | - dontwait | eor | more | nosignal | oob. +%% Extend with OWN flags for other usage: +%% - adapt-buffer-sz: +%% This will have the effect that the nif recvfrom will use +%% MSG_PEEK to ensure no part of the message is lost, but if +%% necessary adapt (increase) the buffer size until all of +%% it fits. +%% +%% Note that not all of these flags is useful for every recv function! +%% -type recv_flags() :: [recv_flag()]. -type recv_flag() :: cmsg_cloexec | - dontwait | errqueue | oob | peek | - trunc | - waitall. + trunc. -type setopt_key() :: foo. -type getopt_key() :: foo. @@ -177,23 +183,20 @@ -define(SOCKET_SEND_FLAG_CONFIRM, 0). -define(SOCKET_SEND_FLAG_DONTROUTE, 1). --define(SOCKET_SEND_FLAG_DONTWAIT, 2). --define(SOCKET_SEND_FLAG_EOR, 3). --define(SOCKET_SEND_FLAG_MORE, 4). --define(SOCKET_SEND_FLAG_NOSIGNAL, 5). --define(SOCKET_SEND_FLAG_OOB, 6). +-define(SOCKET_SEND_FLAG_EOR, 2). +-define(SOCKET_SEND_FLAG_MORE, 3). +-define(SOCKET_SEND_FLAG_NOSIGNAL, 4). +-define(SOCKET_SEND_FLAG_OOB, 5). -define(SOCKET_SEND_FLAGS_DEFAULT, []). -define(SOCKET_SEND_TIMEOUT_DEFAULT, infinity). -define(SOCKET_SENDTO_TIMEOUT_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT). -define(SOCKET_RECV_FLAG_CMSG_CLOEXEC, 0). --define(SOCKET_RECV_FLAG_DONTWAIT, 1). --define(SOCKET_RECV_FLAG_ERRQUEUE, 2). --define(SOCKET_RECV_FLAG_OOB, 3). --define(SOCKET_RECV_FLAG_PEEK, 4). --define(SOCKET_RECV_FLAG_TRUNC, 5). --define(SOCKET_RECV_FLAG_WAITALL, 6). +-define(SOCKET_RECV_FLAG_ERRQUEUE, 1). +-define(SOCKET_RECV_FLAG_OOB, 2). +-define(SOCKET_RECV_FLAG_PEEK, 3). +-define(SOCKET_RECV_FLAG_TRUNC, 4). -define(SOCKET_RECV_FLAGS_DEFAULT, []). -define(SOCKET_RECV_TIMEOUT_DEFAULT, infinity). @@ -287,7 +290,8 @@ open(Domain, Type, Protocol0, Extra) when is_map(Extra) -> SocketInfo = #{domain => Domain, type => Type, protocol => Protocol}, - Socket = {socket, SocketInfo, SockRef}, + Socket = #socket{info = SocketInfo, + ref = SockRef}, {ok, Socket}; {error, _} = ERROR -> ERROR @@ -352,7 +356,7 @@ bind(Socket, Addr) when is_tuple(Addr) orelse Reason :: term(). %% Shall we keep info about domain so that we can verify address? -bind({socket, _, SockRef}, Addr, Port) +bind(#socket{ref = SockRef}, Addr, Port) when (is_tuple(Addr) andalso ((size(Addr) =:= 4) orelse (size(Addr) =:= 8))) orelse ((Addr =:= any) orelse (Addr =:= loopback)) andalso @@ -385,7 +389,7 @@ connect(Socket, Addr, Port) -> connect(_Socket, _Addr, _Port, Timeout) when (is_integer(Timeout) andalso (Timeout =< 0)) -> {error, timeout}; -connect({socket, _, SockRef}, Addr, Port, Timeout) +connect(#socket{ref = SockRef}, Addr, Port, Timeout) when (is_tuple(Addr) andalso ((size(Addr) =:= 4) orelse (size(Addr) =:= 8))) andalso (is_integer(Port) andalso (Port >= 0)) andalso @@ -424,7 +428,7 @@ connect({socket, _, SockRef}, Addr, Port, Timeout) listen(Socket) -> listen(Socket, ?SOCKET_LISTEN_BACKLOG_DEFAULT). -listen({socket, _, SockRef}, Backlog) +listen(#socket{ref = SockRef}, Backlog) when (is_integer(Backlog) andalso (Backlog >= 0)) -> nif_listen(SockRef, Backlog). @@ -448,7 +452,7 @@ accept(Socket) -> %% Do we really need this optimization? accept(_, Timeout) when is_integer(Timeout) andalso (Timeout =< 0) -> {error, timeout}; -accept({socket, SI, LSockRef}, Timeout) +accept(#socket{info = SI, ref = LSockRef}, Timeout) when is_integer(Timeout) orelse (Timeout =:= infinity) -> do_accept(LSockRef, SI, Timeout). @@ -460,7 +464,8 @@ do_accept(LSockRef, SI, Timeout) -> SocketInfo = #{domain => maps:get(domain, SI), type => maps:get(type, SI), protocol => maps:get(protocol, SI)}, - Socket = {socket, SocketInfo, SockRef}, + Socket = #socket{info = SocketInfo, + ref = SockRef}, {ok, Socket}; {error, eagain} -> NewTimeout = next_timeout(TS, Timeout), @@ -499,9 +504,10 @@ send(Socket, Data, Timeout) -> send(Socket, Data, Flags, Timeout) when is_list(Data) -> Bin = erlang:list_to_binary(Data), send(Socket, Bin, Flags, Timeout); -send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags) -> +send(#socket{ref = SockRef}, Data, Flags, Timeout) + when is_binary(Data) andalso is_list(Flags) -> EFlags = enc_send_flags(Flags), - do_send(Socket, Data, EFlags, Timeout). + do_send(SockRef, Data, EFlags, Timeout). do_send(SockRef, Data, EFlags, Timeout) -> TS = timestamp(Timeout), @@ -562,14 +568,14 @@ sendto(Socket, Data, Flags, DestAddr, DestPort) -> sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) when is_list(Data) -> Bin = erlang:list_to_binary(Data), sendto(Socket, Bin, Flags, DestAddr, DestPort, Timeout); -sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) +sendto(#socket{ref = SockRef}, Data, Flags, DestAddr, DestPort, Timeout) when is_binary(Data) andalso is_list(Flags) andalso (is_tuple(DestAddr) orelse (DestAddr =:= null)) andalso is_integer(DestPort) andalso (is_integer(Timeout) orelse (Timeout =:= infinity)) -> EFlags = enc_send_flags(Flags), - do_sendto(Socket, Data, EFlags, DestAddr, DestPort, Timeout). + do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, Timeout). do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, Timeout) -> TS = timestamp(Timeout), @@ -726,18 +732,18 @@ recv(Socket, Length, Timeout) -> Data :: binary(), Reason :: term(). -recv(Socket, Length, Flags, Timeout) +recv(#socket{ref = SockRef}, Length, Flags, Timeout) when (is_integer(Length) andalso (Length >= 0)) andalso is_list(Flags) andalso (is_integer(Timeout) orelse (Timeout =:= infinity)) -> EFlags = enc_recv_flags(Flags), - do_recv(Socket, undefined, Length, EFlags, <<>>, EFlags). + do_recv(SockRef, undefined, Length, EFlags, <<>>, Timeout). %% We need to pass the "old recv ref" around because of the special case %% with Length = 0. This case makes it neccessary to have a timeout function %% clause since we may never wait for anything (no receive select), and so the %% the only timeout check will be the function clause. -do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) +do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) when (Timeout =:= infinity) orelse (is_integer(Timeout) andalso (Timeout > 0)) -> TS = timestamp(Timeout), @@ -754,7 +760,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) %% > 0 - We got a part of the message and we will be notified %% when there is more to read (a select message) {ok, false = _Complete, Bin} when (Length =:= 0) -> - do_recv(Socket, RecvRef, + do_recv(SockRef, RecvRef, Length, EFlags, <<Acc/binary, Bin/binary>>, next_timeout(TS, Timeout)); @@ -766,7 +772,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> - do_recv(Socket, RecvRef, + do_recv(SockRef, RecvRef, Length-size(Bin), EFlags, Bin, next_timeout(TS, Timeout)) @@ -781,7 +787,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> - do_recv(Socket, RecvRef, + do_recv(SockRef, RecvRef, Length-size(Bin), EFlags, <<Acc/binary, Bin/binary>>, next_timeout(TS, Timeout)) @@ -801,7 +807,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> - do_recv(Socket, RecvRef, + do_recv(SockRef, RecvRef, Length, EFlags, Acc, next_timeout(TS, Timeout)) @@ -819,37 +825,92 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) end; -do_recv({socket, _, SockRef} = _Socket, RecvRef, - 0 = _Length, _Eflags, Acc, _Timeout) -> +do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) -> %% The current recv operation is to be cancelled, so no need for a ref... %% The cancel will end our 'read everything you have' and "activate" %% any waiting reader. nif_cancel(SockRef, recv, RecvRef), {ok, Acc}; -do_recv(_Socket, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) -> +do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) -> {error, {timeout, Acc}}; -do_recv(_Socket, _RecvRef, _Length, _EFlags, _Acc, _Timeout) -> +do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) -> {error, timeout}. %% --------------------------------------------------------------------------- %% +%% With recvfrom we get messages, which means that regardless of how +%% much we want to read, we return when we get a message. +%% The MaxSize argument basically defines the size of our receive +%% buffer. By setting the size to zero (0), we use the configured +%% size (see setopt). +%% It may be impossible to know what (buffer) size is appropriate +%% "in advance", and in those cases it may be convenient to use the +%% (recv) 'peek' flag. When this flag is provided the message is *not* +%% "consumed" from the underlying buffers, so another recvfrom call +%% is needed, possibly with a then adjusted buffer size. +%% recvfrom(Socket) -> - recvfrom(Socket, ?SOCKET_RECV_FLAGS_DEFAULT). + recvfrom(Socket, 0). + +recvfrom(Socket, BufSz) -> + recvfrom(Socket, BufSz, + ?SOCKET_RECV_FLAGS_DEFAULT, + ?SOCKET_RECV_TIMEOUT_DEFAULT). + + +recvfrom(Socket, Flags, Timeout) when is_list(Flags) -> + recvfrom(Socket, 0, Flags, Timeout); +recvfrom(Socket, BufSz, Flags) when is_list(Flags) -> + recvfrom(Socket, BufSz, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT); +recvfrom(Socket, BufSz, Timeout) -> + recvfrom(Socket, BufSz, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout). + +-spec recvfrom(Socket, BufSz, Flags, Timeout) -> {ok, {SrcDomain, Source, Data}} | {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + Flags :: recv_flags(), + Timeout :: timeout(), + SrcDomain :: domain() | undefined, + Source :: {ip_address(), port_number()} | string() | undefined, + Data :: binary(), + Reason :: term(). + +recvfrom(#socket{ref = SockRef}, BufSz, Flags, Timeout) + when (is_integer(BufSz) andalso (BufSz >= 0)) andalso + is_list(Flags) andalso + (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + EFlags = enc_recv_flags(Flags), + do_recvfrom(SockRef, BufSz, EFlags, Timeout). --spec recvfrom(Socket, Flags) -> {ok, Data, SrcAddr, SrcPort} | {error, Reason} when - Socket :: socket(), - Flags :: recv_flags(), - Data :: binary(), - SrcAddr :: ip_address(), - SrcPort :: port_number(), - Reason :: term(). +do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> + TS = timestamp(Timeout), + RecvRef = make_ref(), + case nif_recvfrom(SockRef, RecvRef, BufSz, EFlags) of + {ok, {_Domain, _Source, _NewData}} = OK -> + OK; + + {error, eagain} -> + %% There is nothing just now, but we will be notified when there + %% is something to read (a select message). + NewTimeout = next_timeout(TS, Timeout), + receive + {select, SockRef, RecvRef, ready_input} -> + do_recvfrom(SockRef, BufSz, EFlags, + next_timeout(TS, Timeout)) + after NewTimeout -> + nif_cancel(SockRef, recvfrom, RecvRef), + flush_select_msgs(SockRef, RecvRef), + {error, timeout} + end; + + {error, _} = ERROR -> + ERROR + + end. -recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) -> - EFlags = enc_recv_flags(Flags), - nif_recvfrom(SockRef, EFlags). %% --------------------------------------------------------------------------- @@ -998,7 +1059,6 @@ enc_protocol(Type, Proto) -> throw({error, {invalid_protocol, {Type, Proto}} enc_send_flags(Flags) -> EFlags = [{confirm, ?SOCKET_SEND_FLAG_CONFIRM}, {dontroute, ?SOCKET_SEND_FLAG_DONTROUTE}, - {dontwait, ?SOCKET_SEND_FLAG_DONTWAIT}, {eor, ?SOCKET_SEND_FLAG_EOR}, {more, ?SOCKET_SEND_FLAG_MORE}, {nosignal, ?SOCKET_SEND_FLAG_NOSIGNAL}, @@ -1010,12 +1070,10 @@ enc_send_flags(Flags) -> enc_recv_flags(Flags) -> EFlags = [{cmsg_cloexec, ?SOCKET_RECV_FLAG_CMSG_CLOEXEC}, - {dontwait, ?SOCKET_RECV_FLAG_DONTWAIT}, {errqueue, ?SOCKET_RECV_FLAG_ERRQUEUE}, {oob, ?SOCKET_RECV_FLAG_OOB}, {peek, ?SOCKET_RECV_FLAG_PEEK}, - {trunc, ?SOCKET_RECV_FLAG_TRUNC}, - {waitall, ?SOCKET_RECV_FLAG_WAITALL}], + {trunc, ?SOCKET_RECV_FLAG_TRUNC}], enc_flags(Flags, EFlags). @@ -1159,7 +1217,7 @@ nif_sendto(_SRef, _SendRef, _Data, _Flags, _Dest, _Port) -> nif_recv(_SRef, _RecvRef, _Length, _Flags) -> erlang:error(badarg). -nif_recvfrom(_SRef, _Flags) -> +nif_recvfrom(_SRef, _RecvRef, _Length, _Flags) -> erlang:error(badarg). nif_cancel(_SRef, _Op, _Ref) -> |