diff options
Diffstat (limited to 'erts/preloaded/src/socket.erl')
-rw-r--r-- | erts/preloaded/src/socket.erl | 705 |
1 files changed, 539 insertions, 166 deletions
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 126db66cdd..be94e3a867 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -26,9 +26,13 @@ %% Administrative and "global" utility functions -export([ on_load/0, on_load/1, - info/0, - supports/0, supports/1, supports/2, supports/3, - ensure_sockaddr/1 + + ensure_sockaddr/1, + + debug/1, + %% command/1, + info/0, info/1, + supports/0, supports/1, supports/2, supports/3 ]). -export([ @@ -53,10 +57,22 @@ getopt/3, sockname/1, - peername/1 + peername/1, + + cancel/2 ]). -export_type([ + select_tag/0, + select_ref/0, + select_info/0, + + socket_counters/0, + socket_counter/0, + socket_info/0, + + %% command/0, + domain/0, type/0, protocol/0, @@ -129,6 +145,27 @@ ]). +%% The command type has the general form: +%% #{ +%% command := atom(), +%% data := term() +%% } +%% But only certain values are actually valid, so the type gets the form: +-type debug_command() :: #{ + command := debug, + data := boolean() + }. +%% -type command() :: debug_command(). + +-type socket_counters() :: [{socket_counter(), non_neg_integer()}]. +-type socket_counter() :: read_byte | read_fails | read_pkg | read_tries | + read_waits | write_byte | write_fails | write_pkg | + write_tries | write_waits. +-type socket_info() :: #{counters := socket_counters(), + num_readers := non_neg_integer(), + num_writers := non_neg_integer(), + num_acceptors := non_neg_integer()}. + -type uint8() :: 0..16#FF. -type uint16() :: 0..16#FFFF. -type uint20() :: 0..16#FFFFF. @@ -585,6 +622,17 @@ #{level := integer(), type := integer(), data := binary()}. +-opaque select_tag() :: atom(). +-opaque select_ref() :: reference(). + +-record(select_info, {tag :: select_tag(), ref :: select_ref()}). + +-type select_info() :: #select_info{}. + +-define(SELECT_INFO(T, R), #select_info{tag = T, ref = R}). +-define(SELECT(T, R), {select, ?SELECT_INFO(T, R)}). + + %% This is used in messages sent from the nif-code to erlang processes: %% %% {?SOCKET_TAG, Socket :: socket(), Tag :: atom(), Info :: term()} @@ -602,12 +650,13 @@ %% -define(SOCKET_TYPE_RDM, 4). -define(SOCKET_TYPE_SEQPACKET, 5). --define(SOCKET_PROTOCOL_IP, 1). --define(SOCKET_PROTOCOL_TCP, 2). --define(SOCKET_PROTOCOL_UDP, 3). --define(SOCKET_PROTOCOL_SCTP, 4). --define(SOCKET_PROTOCOL_ICMP, 5). --define(SOCKET_PROTOCOL_IGMP, 6). +-define(SOCKET_PROTOCOL_DEFAULT, 0). +-define(SOCKET_PROTOCOL_IP, 1). +-define(SOCKET_PROTOCOL_TCP, 2). +-define(SOCKET_PROTOCOL_UDP, 3). +-define(SOCKET_PROTOCOL_SCTP, 4). +-define(SOCKET_PROTOCOL_ICMP, 5). +-define(SOCKET_PROTOCOL_IGMP, 6). -define(SOCKET_LISTEN_BACKLOG_DEFAULT, 5). @@ -820,6 +869,7 @@ -define(SOCKET_SUPPORTS_OPTIONS, 16#0001). -define(SOCKET_SUPPORTS_SCTP, 16#0002). -define(SOCKET_SUPPORTS_IPV6, 16#0003). +-define(SOCKET_SUPPORTS_LOCAL, 16#0004). %% =========================================================================== @@ -842,12 +892,46 @@ on_load(Extra) -> --spec info() -> list(). +-spec info() -> map(). info() -> nif_info(). +-spec debug(D) -> ok when + D :: boolean(). + +debug(D) when is_boolean(D) -> + command(#{command => debug, + data => D}). + + +-spec command(Command) -> ok when + Command :: debug_command(). + +command(#{command := debug, + data := Dbg} = Command) when is_boolean(Dbg) -> + nif_command(Command). + + + +%% =========================================================================== +%% +%% info - Get miscellaneous information about a socket. +%% +%% Generates a list of various info about the socket, such as counter values. +%% +%% Do *not* call this function often. +%% +%% =========================================================================== + +-spec info(Socket) -> socket_info() when + Socket :: socket(). + +info(#socket{ref = SockRef}) -> + nif_info(SockRef). + + %% =========================================================================== %% @@ -875,18 +959,21 @@ info() -> -spec supports() -> [{options, supports_options()} | {sctp, boolean()} | - {ipv6, boolean()}]. + {ipv6, boolean()} | + {local, boolean()}]. supports() -> [{options, supports(options)}, {sctp, supports(sctp)}, - {ipv6, supports(ipv6)}]. + {ipv6, supports(ipv6)}, + {local, supports(local)}]. -dialyzer({nowarn_function, supports/1}). -spec supports(options) -> supports_options(); (sctp) -> boolean(); (ipv6) -> boolean(); + (local) -> boolean(); (Key1) -> false when Key1 :: term(). @@ -896,6 +983,8 @@ supports(sctp) -> nif_supports(?SOCKET_SUPPORTS_SCTP); supports(ipv6) -> nif_supports(?SOCKET_SUPPORTS_IPV6); +supports(local) -> + nif_supports(?SOCKET_SUPPORTS_LOCAL); supports(_Key1) -> false. @@ -1006,12 +1095,12 @@ supports(_Key1, _Key2, _Key3) -> Reason :: term(). open(Domain, Type) -> - open(Domain, Type, null). + open(Domain, Type, default). -spec open(Domain, Type, Protocol) -> {ok, Socket} | {error, Reason} when Domain :: domain(), Type :: type(), - Protocol :: null | protocol(), + Protocol :: default | protocol(), Socket :: socket(), Reason :: term(). @@ -1021,15 +1110,14 @@ open(Domain, Type, Protocol) -> -spec open(Domain, Type, Protocol, Extra) -> {ok, Socket} | {error, Reason} when Domain :: domain(), Type :: type(), - Protocol :: null | protocol(), + Protocol :: default | protocol(), Extra :: map(), Socket :: socket(), Reason :: term(). -open(Domain, Type, Protocol0, Extra) when is_map(Extra) -> +open(Domain, Type, Protocol, Extra) when is_map(Extra) -> try begin - Protocol = default_protocol(Protocol0, Type), EDomain = enc_domain(Domain), EType = enc_type(Domain, Type), EProtocol = enc_protocol(Type, Protocol), @@ -1052,15 +1140,6 @@ open(Domain, Type, Protocol0, Extra) when is_map(Extra) -> {error, Reason} end. -%% Note that this is just a convenience function for when the protocol was -%% not specified. If its actually specified, then that will be selected. -%% Also, this only works for the some of the type's (stream, dgram and -%% seqpacket). -default_protocol(null, stream) -> tcp; -default_protocol(null, dgram) -> udp; -default_protocol(null, seqpacket) -> sctp; -default_protocol(null, Type) -> throw({error, {no_default_protocol, Type}}); -default_protocol(Protocol, _) -> Protocol. %% =========================================================================== @@ -1192,18 +1271,24 @@ validate_inet6_addrs(Addrs) -> %% -spec connect(Socket, SockAddr) -> ok | {error, Reason} when - Socket :: socket(), - SockAddr :: sockaddr(), - Reason :: term(). + Socket :: socket(), + SockAddr :: sockaddr(), + Reason :: term(). connect(Socket, SockAddr) -> connect(Socket, SockAddr, infinity). --spec connect(Socket, SockAddr, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - SockAddr :: sockaddr(), - Timeout :: timeout(), - Reason :: term(). +-spec connect(Socket, SockAddr, nowait) -> + ok | {select, SelectInfo} | {error, Reason} when + Socket :: socket(), + SockAddr :: sockaddr(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, SockAddr, Timeout) -> ok | {error, Reason} when + Socket :: socket(), + SockAddr :: sockaddr(), + Timeout :: timeout(), + Reason :: term(). %% <KOLLA> %% Is it possible to connect with family = local for the (dest) sockaddr? @@ -1213,12 +1298,18 @@ connect(_Socket, _SockAddr, Timeout) {error, timeout}; connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout) when ((Fam =:= inet) orelse (Fam =:= inet6) orelse (Fam =:= local)) andalso - ((Timeout =:= infinity) orelse is_integer(Timeout)) -> + ((Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse is_integer(Timeout)) -> TS = timestamp(Timeout), case nif_connect(SockRef, SockAddr) of ok -> %% Connected! ok; + + {ok, Ref} when (Timeout =:= nowait) -> + %% Connecting, but the caller does not want to wait... + ?SELECT(connect, Ref); + {ok, Ref} -> %% Connecting... NewTimeout = next_timeout(TS, Timeout), @@ -1271,17 +1362,27 @@ listen(#socket{ref = SockRef}, Backlog) accept(Socket) -> accept(Socket, ?SOCKET_ACCEPT_TIMEOUT_DEFAULT). --spec accept(LSocket, Timeout) -> {ok, Socket} | {error, Reason} when - LSocket :: socket(), - Timeout :: timeout(), - Socket :: socket(), - Reason :: term(). +-spec accept(LSocket, nowait) -> + {ok, Socket} | + {select, SelectInfo} | + {error, Reason} when + LSocket :: socket(), + Socket :: socket(), + SelectInfo :: select_info(), + Reason :: term() + ; (LSocket, Timeout) -> {ok, Socket} | {error, Reason} when + LSocket :: socket(), + Timeout :: timeout(), + Socket :: socket(), + Reason :: term(). %% Do we really need this optimization? accept(_, Timeout) when is_integer(Timeout) andalso (Timeout =< 0) -> {error, timeout}; accept(#socket{ref = LSockRef}, Timeout) - when is_integer(Timeout) orelse (Timeout =:= infinity) -> + when is_integer(Timeout) orelse + (Timeout =:= infinity) orelse + (Timeout =:= nowait) -> do_accept(LSockRef, Timeout). do_accept(LSockRef, Timeout) -> @@ -1292,6 +1393,11 @@ do_accept(LSockRef, Timeout) -> Socket = #socket{ref = SockRef}, {ok, Socket}; + + {error, eagain} when (Timeout =:= nowait) -> + ?SELECT(accept, AccRef); + + {error, eagain} -> %% Each call is non-blocking, but even then it takes %% *some* time, so just to be sure, recalculate before @@ -1330,33 +1436,56 @@ send(Socket, Data) -> send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT). -spec send(Socket, Data, Flags) -> ok | {error, Reason} when - Socket :: socket(), - Data :: iodata(), - Flags :: send_flags(), - Reason :: term() + Socket :: socket(), + Data :: iodata(), + Flags :: send_flags(), + Reason :: term() + ; (Socket, Data, Timeout :: nowait) -> ok | + {select, SelectInfo} | + {ok, {RestData, SelectInfo}} | + {error, Reason} when + Socket :: socket(), + Data :: iodata(), + RestData :: binary(), + SelectInfo :: select_info(), + Reason :: term() ; (Socket, Data, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - Data :: iodata(), - Timeout :: timeout(), - Reason :: term(). + Socket :: socket(), + Data :: iodata(), + Timeout :: timeout(), + Reason :: term(). send(Socket, Data, Flags) when is_list(Flags) -> send(Socket, Data, Flags, ?SOCKET_SEND_TIMEOUT_DEFAULT); send(Socket, Data, Timeout) -> send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, Timeout). --spec send(Socket, Data, Flags, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - Data :: iodata(), - Flags :: send_flags(), - Timeout :: timeout(), - Reason :: term(). +-spec send(Socket, Data, Flags, nowait) -> ok | + {select, SelectInfo} | + {ok, {RestData, SelectInfo}} | + {error, Reason} when + Socket :: socket(), + Data :: iodata(), + Flags :: send_flags(), + RestData :: binary(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, Data, Flags, Timeout) -> ok | {error, Reason} when + Socket :: socket(), + Data :: iodata(), + Flags :: send_flags(), + Timeout :: timeout(), + Reason :: term(). send(Socket, Data, Flags, Timeout) when is_list(Data) -> Bin = erlang:list_to_binary(Data), send(Socket, Bin, Flags, Timeout); send(#socket{ref = SockRef}, Data, Flags, Timeout) - when is_binary(Data) andalso is_list(Flags) -> + when is_binary(Data) andalso + is_list(Flags) andalso + ((Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse + (is_integer(Timeout) andalso (Timeout > 0))) -> EFlags = enc_send_flags(Flags), do_send(SockRef, Data, EFlags, Timeout). @@ -1366,6 +1495,15 @@ do_send(SockRef, Data, EFlags, Timeout) -> case nif_send(SockRef, SendRef, Data, EFlags) of ok -> ok; + + + {ok, Written} when (Timeout =:= nowait) -> + <<_:Written/binary, Rest/binary>> = Data, + %% We are partially done, but the user don't want to wait (here) + %% for completion + {ok, {Rest, ?SELECT_INFO(send, SendRef)}}; + + {ok, Written} -> NewTimeout = next_timeout(TS, Timeout), %% We are partially done, wait for continuation @@ -1387,6 +1525,12 @@ do_send(SockRef, Data, EFlags, Timeout) -> cancel(SockRef, send, SendRef), {error, {timeout, size(Data)}} end; + + + {error, eagain} when (Timeout =:= nowait) -> + ?SELECT(send, SendRef); + + {error, eagain} -> receive {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> @@ -1422,17 +1566,25 @@ sendto(Socket, Data, Dest) -> sendto(Socket, Data, Dest, ?SOCKET_SENDTO_FLAGS_DEFAULT). -spec sendto(Socket, Data, Dest, Flags) -> ok | {error, Reason} when - Socket :: socket(), - Data :: binary(), - Dest :: null | sockaddr(), - Flags :: send_flags(), - Reason :: term() + Socket :: socket(), + Data :: binary(), + Dest :: null | sockaddr(), + Flags :: send_flags(), + Reason :: term() + ; (Socket, Data, Dest, Timeout :: nowait) -> ok | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + Data :: iodata(), + Dest :: null | sockaddr(), + SelectInfo :: select_info(), + Reason :: term() ; (Socket, Data, Dest, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - Data :: iodata(), - Dest :: null | sockaddr(), - Timeout :: timeout(), - Reason :: term(). + Socket :: socket(), + Data :: iodata(), + Dest :: null | sockaddr(), + Timeout :: timeout(), + Reason :: term(). sendto(Socket, Data, Dest, Flags) when is_list(Flags) -> sendto(Socket, Data, Dest, Flags, ?SOCKET_SENDTO_TIMEOUT_DEFAULT); @@ -1440,13 +1592,22 @@ sendto(Socket, Data, Dest, Timeout) -> sendto(Socket, Data, Dest, ?SOCKET_SENDTO_FLAGS_DEFAULT, Timeout). --spec sendto(Socket, Data, Dest, Flags, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - Data :: binary(), - Dest :: null | sockaddr(), - Flags :: send_flags(), - Timeout :: timeout(), - Reason :: term(). +-spec sendto(Socket, Data, Dest, Flags, nowait) -> ok | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + Data :: binary(), + Dest :: null | sockaddr(), + Flags :: send_flags(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, Data, Dest, Flags, Timeout) -> ok | {error, Reason} when + Socket :: socket(), + Data :: binary(), + Dest :: null | sockaddr(), + Flags :: send_flags(), + Timeout :: timeout(), + Reason :: term(). sendto(Socket, Data, Dest, Flags, Timeout) when is_list(Data) -> Bin = erlang:list_to_binary(Data), @@ -1455,14 +1616,18 @@ sendto(#socket{ref = SockRef}, Data, Dest, Flags, Timeout) when is_binary(Data) andalso (Dest =:= null) andalso is_list(Flags) andalso - (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + ((Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse + (is_integer(Timeout) andalso (Timeout > 0))) -> EFlags = enc_send_flags(Flags), do_sendto(SockRef, Data, Dest, EFlags, Timeout); sendto(#socket{ref = SockRef}, Data, #{family := Fam} = Dest, Flags, Timeout) when is_binary(Data) andalso ((Fam =:= inet) orelse (Fam =:= inet6) orelse (Fam =:= local)) andalso is_list(Flags) andalso - (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + ((Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse + (is_integer(Timeout) andalso (Timeout > 0))) -> EFlags = enc_send_flags(Flags), do_sendto(SockRef, Data, Dest, EFlags, Timeout). @@ -1474,6 +1639,11 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> %% We are done ok; + {ok, Written} when (Timeout =:= nowait) -> + <<_:Written/binary, Rest/binary>> = Data, + {ok, {Rest, ?SELECT_INFO(sendto, SendRef)}}; + + {ok, Written} -> %% We are partially done, wait for continuation receive @@ -1495,6 +1665,11 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> {error, timeout} end; + + {error, eagain} when (Timeout =:= nowait) -> + ?SELECT(sendto, SendRef); + + {error, eagain} -> receive {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> @@ -1538,11 +1713,18 @@ sendmsg(Socket, MsgHdr) -> MsgHdr :: msghdr(), Flags :: send_flags(), Reason :: term() + ; (Socket, MsgHdr, Timeout :: nowait) -> ok | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + MsgHdr :: msghdr(), + SelectInfo :: select_info(), + Reason :: term() ; (Socket, MsgHdr, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - MsgHdr :: msghdr(), - Timeout :: timeout(), - Reason :: term(). + Socket :: socket(), + MsgHdr :: msghdr(), + Timeout :: timeout(), + Reason :: term(). sendmsg(Socket, MsgHdr, Flags) when is_list(Flags) -> sendmsg(Socket, MsgHdr, Flags, ?SOCKET_SENDMSG_TIMEOUT_DEFAULT); @@ -1551,19 +1733,34 @@ sendmsg(Socket, MsgHdr, Timeout) sendmsg(Socket, MsgHdr, ?SOCKET_SENDMSG_FLAGS_DEFAULT, Timeout). --spec sendmsg(Socket, MsgHdr, Flags, Timeout) -> - ok | {ok, Remaining} | {error, Reason} when - Socket :: socket(), - MsgHdr :: msghdr(), - Flags :: send_flags(), - Timeout :: timeout(), - Remaining :: erlang:iovec(), - Reason :: term(). +-spec sendmsg(Socket, MsgHdr, Flags, nowait) -> + ok | + {ok, Remaining} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + MsgHdr :: msghdr(), + Flags :: send_flags(), + Remaining :: erlang:iovec(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, MsgHdr, Flags, Timeout) -> + ok | + {ok, Remaining} | + {error, Reason} when + Socket :: socket(), + MsgHdr :: msghdr(), + Flags :: send_flags(), + Timeout :: timeout(), + Remaining :: erlang:iovec(), + Reason :: term(). sendmsg(#socket{ref = SockRef}, #{iov := IOV} = MsgHdr, Flags, Timeout) when is_list(IOV) andalso is_list(Flags) andalso - (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + ((Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse + (is_integer(Timeout) andalso (Timeout > 0))) -> try ensure_msghdr(MsgHdr) of M -> EFlags = enc_send_flags(Flags), @@ -1583,6 +1780,7 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> %% We are done ok; + {ok, Written} when is_integer(Written) andalso (Written > 0) -> %% We should not retry here since the protocol may not %% be able to handle a message being split. Leave it to @@ -1594,6 +1792,11 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> cancel(SockRef, sendmsg, SendRef), {ok, do_sendmsg_rest(maps:get(iov, MsgHdr), Written)}; + + {error, eagain} when (Timeout =:= nowait) -> + ?SELECT(sendmsg, SendRef); + + {error, eagain} -> receive {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> @@ -1667,13 +1870,24 @@ recv(Socket, Length) -> ?SOCKET_RECV_FLAGS_DEFAULT, ?SOCKET_RECV_TIMEOUT_DEFAULT). --spec recv(Socket, Length, Flags) -> {ok, Data} | {error, Reason} when +-spec recv(Socket, Length, Flags) -> {ok, Data} | + {error, Reason} when Socket :: socket(), Length :: non_neg_integer(), Flags :: recv_flags(), Data :: binary(), Reason :: term() - ; (Socket, Length, Timeout) -> {ok, Data} | {error, Reason} when + ; (Socket, Length, Timeout :: nowait) -> {ok, Data} | + {select, SelectInfo} | + {ok, {Data, SelectInfo}} | + {error, Reason} when + Socket :: socket(), + Length :: non_neg_integer(), + Data :: binary(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, Length, Timeout) -> {ok, Data} | + {error, Reason} when Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout(), @@ -1685,18 +1899,31 @@ recv(Socket, Length, Flags) when is_list(Flags) -> recv(Socket, Length, Timeout) -> recv(Socket, Length, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout). --spec recv(Socket, Length, Flags, Timeout) -> {ok, Data} | {error, Reason} when - Socket :: socket(), - Length :: non_neg_integer(), - Flags :: recv_flags(), - Timeout :: timeout(), - Data :: binary(), - Reason :: term(). +-spec recv(Socket, Length, Flags, nowait) -> {ok, Data} | + {select, SelectInfo} | + {ok, {Data, SelectInfo}} | + {error, Reason} when + Socket :: socket(), + Length :: non_neg_integer(), + Flags :: recv_flags(), + Data :: binary(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, Length, Flags, Timeout) -> {ok, Data} | + {error, Reason} when + Socket :: socket(), + Length :: non_neg_integer(), + Flags :: recv_flags(), + Timeout :: timeout(), + Data :: binary(), + Reason :: term(). recv(#socket{ref = SockRef}, Length, Flags, Timeout) when (is_integer(Length) andalso (Length >= 0)) andalso is_list(Flags) andalso - (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + (is_integer(Timeout) orelse + (Timeout =:= infinity) orelse + (Timeout =:= nowait)) -> EFlags = enc_recv_flags(Flags), do_recv(SockRef, undefined, Length, EFlags, <<>>, Timeout). @@ -1704,8 +1931,12 @@ recv(#socket{ref = SockRef}, Length, Flags, Timeout) %% with Length = 0. This case makes it neccessary to have a timeout function %% clause since we may never wait for anything (no receive select), and so the %% the only timeout check will be the function clause. +%% Note that the Timeout value of 'nowait' has a special meaning. It means +%% that we will either return with data or with the with {error, NNNN}. In +%% wich case the caller will receive a select message at some later time. do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) - when (Timeout =:= infinity) orelse + when (Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse (is_integer(Timeout) andalso (Timeout > 0)) -> TS = timestamp(Timeout), RecvRef = make_ref(), @@ -1726,6 +1957,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) <<Acc/binary, Bin/binary>>, next_timeout(TS, Timeout)); + + %% Did not get all the user asked for, but the user also + %% specified 'nowait', so deliver what we got and the + %% select info. + {ok, false = _Completed, Bin} when (Timeout =:= nowait) andalso + (size(Acc) =:= 0) -> + {ok, {Bin, ?SELECT_INFO(recv, RecvRef)}}; + + {ok, false = _Completed, Bin} when (size(Acc) =:= 0) -> %% We got the first chunk of it. %% We will be notified (select message) when there @@ -1764,6 +2004,17 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {error, {timeout, Acc}} end; + + %% The user does not want to wait! + %% The user will be informed that there is something to read + %% via the select socket message (see below). + + {error, eagain} when (Timeout =:= nowait) andalso (size(Acc) =:= 0) -> + ?SELECT(recv, RecvRef); + {error, eagain} when (Timeout =:= nowait) -> + {ok, {Acc, ?SELECT_INFO(recv, RecvRef)}}; + + %% We return with the accumulated binary (if its non-empty) {error, eagain} when (Length =:= 0) andalso (size(Acc) > 0) -> %% CAN WE REALLY DO THIS? THE NIF HAS SELECTED!! OR? @@ -1854,30 +2105,51 @@ recvfrom(Socket, BufSz) -> ?SOCKET_RECV_FLAGS_DEFAULT, ?SOCKET_RECV_TIMEOUT_DEFAULT). --spec recvfrom(Socket, Flags, Timeout) -> - {ok, {Source, Data}} | {error, Reason} when - Socket :: socket(), - Flags :: recv_flags(), - Timeout :: timeout(), - Source :: sockaddr() | undefined, - Data :: binary(), - Reason :: term() +-spec recvfrom(Socket, Flags, nowait) -> + {ok, {Source, Data}} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + Flags :: recv_flags(), + Source :: sockaddr() | undefined, + Data :: binary(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, Flags, Timeout) -> + {ok, {Source, Data}} | + {error, Reason} when + Socket :: socket(), + Flags :: recv_flags(), + Timeout :: timeout(), + Source :: sockaddr() | undefined, + Data :: binary(), + Reason :: term() ; (Socket, BufSz, Flags) -> {ok, {Source, Data}} | {error, Reason} when - Socket :: socket(), - BufSz :: non_neg_integer(), - Flags :: recv_flags(), - Source :: sockaddr() | undefined, - Data :: binary(), - Reason :: term() + Socket :: socket(), + BufSz :: non_neg_integer(), + Flags :: recv_flags(), + Source :: sockaddr() | undefined, + Data :: binary(), + Reason :: term() + ; (Socket, BufSz, nowait) -> + {ok, {Source, Data}} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + Source :: sockaddr() | undefined, + Data :: binary(), + SelectInfo :: select_info(), + Reason :: term() ; (Socket, BufSz, Timeout) -> {ok, {Source, Data}} | {error, Reason} when - Socket :: socket(), - BufSz :: non_neg_integer(), - Timeout :: timeout(), - Source :: sockaddr() | undefined, - Data :: binary(), - Reason :: term(). + Socket :: socket(), + BufSz :: non_neg_integer(), + Timeout :: timeout(), + Source :: sockaddr() | undefined, + Data :: binary(), + Reason :: term(). recvfrom(Socket, Flags, Timeout) when is_list(Flags) -> recvfrom(Socket, 0, Flags, Timeout); @@ -1886,20 +2158,34 @@ recvfrom(Socket, BufSz, Flags) when is_list(Flags) -> recvfrom(Socket, BufSz, Timeout) -> recvfrom(Socket, BufSz, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout). --spec recvfrom(Socket, BufSz, Flags, Timeout) -> - {ok, {Source, Data}} | {error, Reason} when - Socket :: socket(), - BufSz :: non_neg_integer(), - Flags :: recv_flags(), - Timeout :: timeout(), - Source :: sockaddr() | undefined, - Data :: binary(), - Reason :: term(). +-spec recvfrom(Socket, BufSz, Flags, nowait) -> + {ok, {Source, Data}} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + Flags :: recv_flags(), + Source :: sockaddr() | undefined, + Data :: binary(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, BufSz, Flags, Timeout) -> + {ok, {Source, Data}} | + {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + Flags :: recv_flags(), + Timeout :: timeout(), + Source :: sockaddr() | undefined, + Data :: binary(), + Reason :: term(). recvfrom(#socket{ref = SockRef}, BufSz, Flags, Timeout) when (is_integer(BufSz) andalso (BufSz >= 0)) andalso is_list(Flags) andalso - (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + (is_integer(Timeout) orelse + (Timeout =:= infinity) orelse + (Timeout =:= nowait)) -> EFlags = enc_recv_flags(Flags), do_recvfrom(SockRef, BufSz, EFlags, Timeout). @@ -1910,6 +2196,11 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> {ok, {_Source, _NewData}} = OK -> OK; + + {error, eagain} when (Timeout =:= nowait) -> + ?SELECT(recvfrom, RecvRef); + + {error, eagain} -> %% There is nothing just now, but we will be notified when there %% is something to read (a select message). @@ -1950,29 +2241,44 @@ recvmsg(Socket) -> Flags :: recv_flags(), MsgHdr :: msghdr(), Reason :: term() + ; (Socket, Timeout :: nowait) -> {ok, MsgHdr} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + MsgHdr :: msghdr(), + SelectInfo :: select_info(), + Reason :: term() ; (Socket, Timeout) -> {ok, MsgHdr} | {error, Reason} when - Socket :: socket(), - Timeout :: timeout(), - MsgHdr :: msghdr(), - Reason :: term(). + Socket :: socket(), + Timeout :: timeout(), + MsgHdr :: msghdr(), + Reason :: term(). recvmsg(Socket, Flags) when is_list(Flags) -> recvmsg(Socket, 0, 0, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT); recvmsg(Socket, Timeout) -> recvmsg(Socket, 0, 0, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout). --spec recvmsg(Socket, Flags, Timeout) -> {ok, MsgHdr} | {error, Reason} when - Socket :: socket(), - Flags :: recv_flags(), - Timeout :: timeout(), - MsgHdr :: msghdr(), - Reason :: term() +-spec recvmsg(Socket, Flags, nowait) -> {ok, MsgHdr} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + Flags :: recv_flags(), + MsgHdr :: msghdr(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, Flags, Timeout) -> {ok, MsgHdr} | {error, Reason} when + Socket :: socket(), + Flags :: recv_flags(), + Timeout :: timeout(), + MsgHdr :: msghdr(), + Reason :: term() ; (Socket, BufSz, CtrlSz) -> {ok, MsgHdr} | {error, Reason} when - Socket :: socket(), - BufSz :: non_neg_integer(), - CtrlSz :: non_neg_integer(), - MsgHdr :: msghdr(), - Reason :: term(). + Socket :: socket(), + BufSz :: non_neg_integer(), + CtrlSz :: non_neg_integer(), + MsgHdr :: msghdr(), + Reason :: term(). recvmsg(Socket, Flags, Timeout) when is_list(Flags) -> recvmsg(Socket, 0, 0, Flags, Timeout); @@ -1983,20 +2289,34 @@ recvmsg(Socket, BufSz, CtrlSz) when is_integer(BufSz) andalso is_integer(CtrlSz) -spec recvmsg(Socket, BufSz, CtrlSz, - Flags, Timeout) -> {ok, MsgHdr} | {error, Reason} when - Socket :: socket(), - BufSz :: non_neg_integer(), - CtrlSz :: non_neg_integer(), - Flags :: recv_flags(), - Timeout :: timeout(), - MsgHdr :: msghdr(), - Reason :: term(). + Flags, nowait) -> {ok, MsgHdr} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + CtrlSz :: non_neg_integer(), + Flags :: recv_flags(), + MsgHdr :: msghdr(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, + BufSz, CtrlSz, + Flags, Timeout) -> {ok, MsgHdr} | {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + CtrlSz :: non_neg_integer(), + Flags :: recv_flags(), + Timeout :: timeout(), + MsgHdr :: msghdr(), + Reason :: term(). recvmsg(#socket{ref = SockRef}, BufSz, CtrlSz, Flags, Timeout) when (is_integer(BufSz) andalso (BufSz >= 0)) andalso (is_integer(CtrlSz) andalso (CtrlSz >= 0)) andalso is_list(Flags) andalso - (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + (is_integer(Timeout) orelse + (Timeout =:= infinity) orelse + (Timeout =:= nowait)) -> EFlags = enc_recv_flags(Flags), do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout). @@ -2007,6 +2327,11 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> {ok, _MsgHdr} = OK -> OK; + + {error, eagain} when (Timeout =:= nowait) -> + ?SELECT(recvmsg, RecvRef); + + {error, eagain} -> %% There is nothing just now, but we will be notified when there %% is something to read (a select message). @@ -2342,6 +2667,25 @@ peername(#socket{ref = SockRef}) -> nif_peername(SockRef). +%% =========================================================================== +%% +%% cancel - cancel an operation resulting in a select +%% +%% A call to accept, recv/recvfrom/recvmsg and send/sendto/sendmsg +%% can result in a select if they are called with the Timeout argument +%% set to nowait. This is indicated by the return of the select-info. +%% Such a operation can be cancelled by calling this function. +%% + +-spec cancel(Socket, SelectInfo) -> ok | {error, Reason} when + Socket :: socket(), + SelectInfo :: select_info(), + Reason :: term(). + +cancel(#socket{ref = SockRef}, #select_info{tag = Tag, ref = Ref}) -> + cancel(SockRef, Tag, Ref). + + %% =========================================================================== %% @@ -2355,7 +2699,7 @@ peername(#socket{ref = SockRef}) -> enc_domain(local) -> ?SOCKET_DOMAIN_LOCAL; enc_domain(inet) -> ?SOCKET_DOMAIN_INET; enc_domain(inet6) -> ?SOCKET_DOMAIN_INET6; -enc_domain(Domain) -> throw({error, {invalid_domain, Domain}}). +enc_domain(Domain) -> invalid_domain(Domain). -spec enc_type(Domain, Type) -> non_neg_integer() when Domain :: domain(), @@ -2366,22 +2710,23 @@ enc_type(_, stream) -> ?SOCKET_TYPE_STREAM; enc_type(_, dgram) -> ?SOCKET_TYPE_DGRAM; enc_type(_, raw) -> ?SOCKET_TYPE_RAW; enc_type(_, seqpacket) -> ?SOCKET_TYPE_SEQPACKET; -enc_type(_, Type) -> throw({error, {invalid_type, Type}}). +enc_type(_, Type) -> invalid_type(Type). -spec enc_protocol(Type, Protocol) -> non_neg_integer() | {raw, non_neg_integer()} when Type :: type(), Protocol :: protocol(). -enc_protocol(dgram, ip) -> ?SOCKET_PROTOCOL_IP; -enc_protocol(stream, tcp) -> ?SOCKET_PROTOCOL_TCP; -enc_protocol(dgram, udp) -> ?SOCKET_PROTOCOL_UDP; -enc_protocol(seqpacket, sctp) -> ?SOCKET_PROTOCOL_SCTP; -enc_protocol(raw, icmp) -> ?SOCKET_PROTOCOL_ICMP; -enc_protocol(raw, igmp) -> ?SOCKET_PROTOCOL_IGMP; +enc_protocol(_, default) -> ?SOCKET_PROTOCOL_DEFAULT; +enc_protocol(dgram, ip) -> ?SOCKET_PROTOCOL_IP; +enc_protocol(stream, tcp) -> ?SOCKET_PROTOCOL_TCP; +enc_protocol(dgram, udp) -> ?SOCKET_PROTOCOL_UDP; +enc_protocol(seqpacket, sctp) -> ?SOCKET_PROTOCOL_SCTP; +enc_protocol(raw, icmp) -> ?SOCKET_PROTOCOL_ICMP; +enc_protocol(raw, igmp) -> ?SOCKET_PROTOCOL_IGMP; enc_protocol(raw, {raw, P} = RAW) when is_integer(P) -> RAW; enc_protocol(Type, Proto) -> - throw({error, {invalid_protocol, {Type, Proto}}}). + invalid_protocol(Type, Proto). -spec enc_send_flags(Flags) -> non_neg_integer() when @@ -2532,7 +2877,7 @@ enc_setopt_value(otp, rcvbuf, V, _, _, _) when is_integer(V) andalso (V > 0) -> V; %% N: Number of reads (when specifying length = 0) %% V: Size of the "read" buffer -enc_setopt_value(otp, rcvbuf, {N, BufSz} = V, _, stream = _T, tcp = _P) +enc_setopt_value(otp, rcvbuf, {N, BufSz} = V, _, stream = _T, _P) when (is_integer(N) andalso (N > 0)) andalso (is_integer(BufSz) andalso (BufSz > 0)) -> V; @@ -3464,15 +3809,18 @@ flush_select_msgs(SockRef, Ref) -> %% A timestamp in ms +timestamp(nowait = T) -> + T; timestamp(infinity) -> undefined; timestamp(_) -> timestamp(). timestamp() -> - {A,B,C} = os:timestamp(), - A*1000000000+B*1000+(C div 1000). + erlang:monotonic_time(milli_seconds). +next_timeout(_, nowait = Timeout) -> + Timeout; next_timeout(_, infinity = Timeout) -> Timeout; next_timeout(TS, Timeout) -> @@ -3510,6 +3858,25 @@ tdiff(T1, T2) -> %% %% =========================================================================== +-spec invalid_domain(Domain) -> no_return() when + Domain :: term(). + +invalid_domain(Domain) -> + error({invalid_domain, Domain}). + +-spec invalid_type(Type) -> no_return() when + Type :: term(). + +invalid_type(Type) -> + error({invalid_type, Type}). + +-spec invalid_protocol(Type, Proto) -> no_return() when + Type :: term(), + Proto :: term(). + +invalid_protocol(Type, Proto) -> + error({invalid_protocol, {Type, Proto}}). + -spec not_supported(What) -> no_return() when What :: term(). @@ -3543,6 +3910,12 @@ error(Reason) -> nif_info() -> erlang:nif_error(undef). +nif_info(_SRef) -> + erlang:nif_error(undef). + +nif_command(_Command) -> + erlang:nif_error(undef). + nif_supports(_Key) -> erlang:nif_error(undef). |