diff options
Diffstat (limited to 'erts/preloaded/src/socket.erl')
-rw-r--r-- | erts/preloaded/src/socket.erl | 545 |
1 files changed, 415 insertions, 130 deletions
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 0f0d8f7a02..ae1ffdb4ac 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -53,10 +53,16 @@ getopt/3, sockname/1, - peername/1 + peername/1, + + cancel/2 ]). -export_type([ + select_tag/0, + select_ref/0, + select_info/0, + domain/0, type/0, protocol/0, @@ -585,6 +591,17 @@ #{level := integer(), type := integer(), data := binary()}. +-opaque select_tag() :: atom(). +-opaque select_ref() :: reference(). + +-record(select_info, {tag :: select_tag(), ref :: select_ref()}). + +-type select_info() :: #select_info{}. + +-define(SELECT_INFO(T, R), #select_info{tag = T, ref = R}). +-define(SELECT(T, R), {select, ?SELECT_INFO(T, R)}). + + %% This is used in messages sent from the nif-code to erlang processes: %% %% {?SOCKET_TAG, Socket :: socket(), Tag :: atom(), Info :: term()} @@ -1189,18 +1206,24 @@ validate_inet6_addrs(Addrs) -> %% -spec connect(Socket, SockAddr) -> ok | {error, Reason} when - Socket :: socket(), - SockAddr :: sockaddr(), - Reason :: term(). + Socket :: socket(), + SockAddr :: sockaddr(), + Reason :: term(). connect(Socket, SockAddr) -> connect(Socket, SockAddr, infinity). --spec connect(Socket, SockAddr, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - SockAddr :: sockaddr(), - Timeout :: timeout(), - Reason :: term(). +-spec connect(Socket, SockAddr, nowait) -> + ok | {select, SelectInfo} | {error, Reason} when + Socket :: socket(), + SockAddr :: sockaddr(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, SockAddr, Timeout) -> ok | {error, Reason} when + Socket :: socket(), + SockAddr :: sockaddr(), + Timeout :: timeout(), + Reason :: term(). %% <KOLLA> %% Is it possible to connect with family = local for the (dest) sockaddr? @@ -1210,12 +1233,18 @@ connect(_Socket, _SockAddr, Timeout) {error, timeout}; connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout) when ((Fam =:= inet) orelse (Fam =:= inet6) orelse (Fam =:= local)) andalso - ((Timeout =:= infinity) orelse is_integer(Timeout)) -> + ((Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse is_integer(Timeout)) -> TS = timestamp(Timeout), case nif_connect(SockRef, SockAddr) of ok -> %% Connected! ok; + + {ok, Ref} when (Timeout =:= nowait) -> + %% Connecting, but the caller does not want to wait... + ?SELECT(connect, Ref); + {ok, Ref} -> %% Connecting... NewTimeout = next_timeout(TS, Timeout), @@ -1268,17 +1297,27 @@ listen(#socket{ref = SockRef}, Backlog) accept(Socket) -> accept(Socket, ?SOCKET_ACCEPT_TIMEOUT_DEFAULT). --spec accept(LSocket, Timeout) -> {ok, Socket} | {error, Reason} when - LSocket :: socket(), - Timeout :: timeout(), - Socket :: socket(), - Reason :: term(). +-spec accept(LSocket, nowait) -> + {ok, Socket} | + {select, SelectInfo} | + {error, Reason} when + LSocket :: socket(), + Socket :: socket(), + SelectInfo :: select_info(), + Reason :: term() + ; (LSocket, Timeout) -> {ok, Socket} | {error, Reason} when + LSocket :: socket(), + Timeout :: timeout(), + Socket :: socket(), + Reason :: term(). %% Do we really need this optimization? accept(_, Timeout) when is_integer(Timeout) andalso (Timeout =< 0) -> {error, timeout}; accept(#socket{ref = LSockRef}, Timeout) - when is_integer(Timeout) orelse (Timeout =:= infinity) -> + when is_integer(Timeout) orelse + (Timeout =:= infinity) orelse + (Timeout =:= nowait) -> do_accept(LSockRef, Timeout). do_accept(LSockRef, Timeout) -> @@ -1289,6 +1328,11 @@ do_accept(LSockRef, Timeout) -> Socket = #socket{ref = SockRef}, {ok, Socket}; + + {error, eagain} when (Timeout =:= nowait) -> + ?SELECT(accept, AccRef); + + {error, eagain} -> %% Each call is non-blocking, but even then it takes %% *some* time, so just to be sure, recalculate before @@ -1327,33 +1371,56 @@ send(Socket, Data) -> send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT). -spec send(Socket, Data, Flags) -> ok | {error, Reason} when - Socket :: socket(), - Data :: iodata(), - Flags :: send_flags(), - Reason :: term() + Socket :: socket(), + Data :: iodata(), + Flags :: send_flags(), + Reason :: term() + ; (Socket, Data, Timeout :: nowait) -> ok | + {select, SelectInfo} | + {ok, {RestData, SelectInfo}} | + {error, Reason} when + Socket :: socket(), + Data :: iodata(), + RestData :: binary(), + SelectInfo :: select_info(), + Reason :: term() ; (Socket, Data, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - Data :: iodata(), - Timeout :: timeout(), - Reason :: term(). + Socket :: socket(), + Data :: iodata(), + Timeout :: timeout(), + Reason :: term(). send(Socket, Data, Flags) when is_list(Flags) -> send(Socket, Data, Flags, ?SOCKET_SEND_TIMEOUT_DEFAULT); send(Socket, Data, Timeout) -> send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, Timeout). --spec send(Socket, Data, Flags, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - Data :: iodata(), - Flags :: send_flags(), - Timeout :: timeout(), - Reason :: term(). +-spec send(Socket, Data, Flags, nowait) -> ok | + {select, SelectInfo} | + {ok, {RestData, SelectInfo}} | + {error, Reason} when + Socket :: socket(), + Data :: iodata(), + Flags :: send_flags(), + RestData :: binary(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, Data, Flags, Timeout) -> ok | {error, Reason} when + Socket :: socket(), + Data :: iodata(), + Flags :: send_flags(), + Timeout :: timeout(), + Reason :: term(). send(Socket, Data, Flags, Timeout) when is_list(Data) -> Bin = erlang:list_to_binary(Data), send(Socket, Bin, Flags, Timeout); send(#socket{ref = SockRef}, Data, Flags, Timeout) - when is_binary(Data) andalso is_list(Flags) -> + when is_binary(Data) andalso + is_list(Flags) andalso + ((Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse + (is_integer(Timeout) andalso (Timeout > 0))) -> EFlags = enc_send_flags(Flags), do_send(SockRef, Data, EFlags, Timeout). @@ -1363,6 +1430,15 @@ do_send(SockRef, Data, EFlags, Timeout) -> case nif_send(SockRef, SendRef, Data, EFlags) of ok -> ok; + + + {ok, Written} when (Timeout =:= nowait) -> + <<_:Written/binary, Rest/binary>> = Data, + %% We are partially done, but the user don't want to wait (here) + %% for completion + {ok, {Rest, ?SELECT_INFO(send, SendRef)}}; + + {ok, Written} -> NewTimeout = next_timeout(TS, Timeout), %% We are partially done, wait for continuation @@ -1384,6 +1460,12 @@ do_send(SockRef, Data, EFlags, Timeout) -> cancel(SockRef, send, SendRef), {error, {timeout, size(Data)}} end; + + + {error, eagain} when (Timeout =:= nowait) -> + ?SELECT(send, SendRef); + + {error, eagain} -> receive {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> @@ -1419,17 +1501,25 @@ sendto(Socket, Data, Dest) -> sendto(Socket, Data, Dest, ?SOCKET_SENDTO_FLAGS_DEFAULT). -spec sendto(Socket, Data, Dest, Flags) -> ok | {error, Reason} when - Socket :: socket(), - Data :: binary(), - Dest :: null | sockaddr(), - Flags :: send_flags(), - Reason :: term() + Socket :: socket(), + Data :: binary(), + Dest :: null | sockaddr(), + Flags :: send_flags(), + Reason :: term() + ; (Socket, Data, Dest, Timeout :: nowait) -> ok | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + Data :: iodata(), + Dest :: null | sockaddr(), + SelectInfo :: select_info(), + Reason :: term() ; (Socket, Data, Dest, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - Data :: iodata(), - Dest :: null | sockaddr(), - Timeout :: timeout(), - Reason :: term(). + Socket :: socket(), + Data :: iodata(), + Dest :: null | sockaddr(), + Timeout :: timeout(), + Reason :: term(). sendto(Socket, Data, Dest, Flags) when is_list(Flags) -> sendto(Socket, Data, Dest, Flags, ?SOCKET_SENDTO_TIMEOUT_DEFAULT); @@ -1437,13 +1527,22 @@ sendto(Socket, Data, Dest, Timeout) -> sendto(Socket, Data, Dest, ?SOCKET_SENDTO_FLAGS_DEFAULT, Timeout). --spec sendto(Socket, Data, Dest, Flags, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - Data :: binary(), - Dest :: null | sockaddr(), - Flags :: send_flags(), - Timeout :: timeout(), - Reason :: term(). +-spec sendto(Socket, Data, Dest, Flags, nowait) -> ok | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + Data :: binary(), + Dest :: null | sockaddr(), + Flags :: send_flags(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, Data, Dest, Flags, Timeout) -> ok | {error, Reason} when + Socket :: socket(), + Data :: binary(), + Dest :: null | sockaddr(), + Flags :: send_flags(), + Timeout :: timeout(), + Reason :: term(). sendto(Socket, Data, Dest, Flags, Timeout) when is_list(Data) -> Bin = erlang:list_to_binary(Data), @@ -1452,14 +1551,18 @@ sendto(#socket{ref = SockRef}, Data, Dest, Flags, Timeout) when is_binary(Data) andalso (Dest =:= null) andalso is_list(Flags) andalso - (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + ((Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse + (is_integer(Timeout) andalso (Timeout > 0))) -> EFlags = enc_send_flags(Flags), do_sendto(SockRef, Data, Dest, EFlags, Timeout); sendto(#socket{ref = SockRef}, Data, #{family := Fam} = Dest, Flags, Timeout) when is_binary(Data) andalso ((Fam =:= inet) orelse (Fam =:= inet6) orelse (Fam =:= local)) andalso is_list(Flags) andalso - (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + ((Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse + (is_integer(Timeout) andalso (Timeout > 0))) -> EFlags = enc_send_flags(Flags), do_sendto(SockRef, Data, Dest, EFlags, Timeout). @@ -1471,6 +1574,11 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> %% We are done ok; + {ok, Written} when (Timeout =:= nowait) -> + <<_:Written/binary, Rest/binary>> = Data, + {ok, {Rest, ?SELECT_INFO(sendto, SendRef)}}; + + {ok, Written} -> %% We are partially done, wait for continuation receive @@ -1492,6 +1600,11 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> {error, timeout} end; + + {error, eagain} when (Timeout =:= nowait) -> + ?SELECT(sendto, SendRef); + + {error, eagain} -> receive {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> @@ -1535,11 +1648,18 @@ sendmsg(Socket, MsgHdr) -> MsgHdr :: msghdr(), Flags :: send_flags(), Reason :: term() + ; (Socket, MsgHdr, Timeout :: nowait) -> ok | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + MsgHdr :: msghdr(), + SelectInfo :: select_info(), + Reason :: term() ; (Socket, MsgHdr, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - MsgHdr :: msghdr(), - Timeout :: timeout(), - Reason :: term(). + Socket :: socket(), + MsgHdr :: msghdr(), + Timeout :: timeout(), + Reason :: term(). sendmsg(Socket, MsgHdr, Flags) when is_list(Flags) -> sendmsg(Socket, MsgHdr, Flags, ?SOCKET_SENDMSG_TIMEOUT_DEFAULT); @@ -1548,19 +1668,34 @@ sendmsg(Socket, MsgHdr, Timeout) sendmsg(Socket, MsgHdr, ?SOCKET_SENDMSG_FLAGS_DEFAULT, Timeout). --spec sendmsg(Socket, MsgHdr, Flags, Timeout) -> - ok | {ok, Remaining} | {error, Reason} when - Socket :: socket(), - MsgHdr :: msghdr(), - Flags :: send_flags(), - Timeout :: timeout(), - Remaining :: erlang:iovec(), - Reason :: term(). +-spec sendmsg(Socket, MsgHdr, Flags, nowait) -> + ok | + {ok, Remaining} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + MsgHdr :: msghdr(), + Flags :: send_flags(), + Remaining :: erlang:iovec(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, MsgHdr, Flags, Timeout) -> + ok | + {ok, Remaining} | + {error, Reason} when + Socket :: socket(), + MsgHdr :: msghdr(), + Flags :: send_flags(), + Timeout :: timeout(), + Remaining :: erlang:iovec(), + Reason :: term(). sendmsg(#socket{ref = SockRef}, #{iov := IOV} = MsgHdr, Flags, Timeout) when is_list(IOV) andalso is_list(Flags) andalso - (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + ((Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse + (is_integer(Timeout) andalso (Timeout > 0))) -> try ensure_msghdr(MsgHdr) of M -> EFlags = enc_send_flags(Flags), @@ -1580,6 +1715,7 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> %% We are done ok; + {ok, Written} when is_integer(Written) andalso (Written > 0) -> %% We should not retry here since the protocol may not %% be able to handle a message being split. Leave it to @@ -1591,6 +1727,11 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> cancel(SockRef, sendmsg, SendRef), {ok, do_sendmsg_rest(maps:get(iov, MsgHdr), Written)}; + + {error, eagain} when (Timeout =:= nowait) -> + ?SELECT(sendmsg, SendRef); + + {error, eagain} -> receive {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> @@ -1664,13 +1805,24 @@ recv(Socket, Length) -> ?SOCKET_RECV_FLAGS_DEFAULT, ?SOCKET_RECV_TIMEOUT_DEFAULT). --spec recv(Socket, Length, Flags) -> {ok, Data} | {error, Reason} when +-spec recv(Socket, Length, Flags) -> {ok, Data} | + {error, Reason} when Socket :: socket(), Length :: non_neg_integer(), Flags :: recv_flags(), Data :: binary(), Reason :: term() - ; (Socket, Length, Timeout) -> {ok, Data} | {error, Reason} when + ; (Socket, Length, Timeout :: nowait) -> {ok, Data} | + {select, SelectInfo} | + {ok, {Data, SelectInfo}} | + {error, Reason} when + Socket :: socket(), + Length :: non_neg_integer(), + Data :: binary(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, Length, Timeout) -> {ok, Data} | + {error, Reason} when Socket :: socket(), Length :: non_neg_integer(), Timeout :: timeout(), @@ -1682,18 +1834,31 @@ recv(Socket, Length, Flags) when is_list(Flags) -> recv(Socket, Length, Timeout) -> recv(Socket, Length, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout). --spec recv(Socket, Length, Flags, Timeout) -> {ok, Data} | {error, Reason} when - Socket :: socket(), - Length :: non_neg_integer(), - Flags :: recv_flags(), - Timeout :: timeout(), - Data :: binary(), - Reason :: term(). +-spec recv(Socket, Length, Flags, nowait) -> {ok, Data} | + {select, SelectInfo} | + {ok, {Data, SelectInfo}} | + {error, Reason} when + Socket :: socket(), + Length :: non_neg_integer(), + Flags :: recv_flags(), + Data :: binary(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, Length, Flags, Timeout) -> {ok, Data} | + {error, Reason} when + Socket :: socket(), + Length :: non_neg_integer(), + Flags :: recv_flags(), + Timeout :: timeout(), + Data :: binary(), + Reason :: term(). recv(#socket{ref = SockRef}, Length, Flags, Timeout) when (is_integer(Length) andalso (Length >= 0)) andalso is_list(Flags) andalso - (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + (is_integer(Timeout) orelse + (Timeout =:= infinity) orelse + (Timeout =:= nowait)) -> EFlags = enc_recv_flags(Flags), do_recv(SockRef, undefined, Length, EFlags, <<>>, Timeout). @@ -1701,8 +1866,12 @@ recv(#socket{ref = SockRef}, Length, Flags, Timeout) %% with Length = 0. This case makes it neccessary to have a timeout function %% clause since we may never wait for anything (no receive select), and so the %% the only timeout check will be the function clause. +%% Note that the Timeout value of 'nowait' has a special meaning. It means +%% that we will either return with data or with the with {error, NNNN}. In +%% wich case the caller will receive a select message at some later time. do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) - when (Timeout =:= infinity) orelse + when (Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse (is_integer(Timeout) andalso (Timeout > 0)) -> TS = timestamp(Timeout), RecvRef = make_ref(), @@ -1723,6 +1892,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) <<Acc/binary, Bin/binary>>, next_timeout(TS, Timeout)); + + %% Did not get all the user asked for, but the user also + %% specified 'nowait', so deliver what we got and the + %% select info. + {ok, false = _Completed, Bin} when (Timeout =:= nowait) andalso + (size(Acc) =:= 0) -> + {ok, {Bin, ?SELECT_INFO(recv, RecvRef)}}; + + {ok, false = _Completed, Bin} when (size(Acc) =:= 0) -> %% We got the first chunk of it. %% We will be notified (select message) when there @@ -1761,6 +1939,17 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {error, {timeout, Acc}} end; + + %% The user does not want to wait! + %% The user will be informed that there is something to read + %% via the select socket message (see below). + + {error, eagain} when (Timeout =:= nowait) andalso (size(Acc) =:= 0) -> + ?SELECT(recv, RecvRef); + {error, eagain} when (Timeout =:= nowait) -> + {ok, {Acc, ?SELECT_INFO(recv, RecvRef)}}; + + %% We return with the accumulated binary (if its non-empty) {error, eagain} when (Length =:= 0) andalso (size(Acc) > 0) -> %% CAN WE REALLY DO THIS? THE NIF HAS SELECTED!! OR? @@ -1851,30 +2040,51 @@ recvfrom(Socket, BufSz) -> ?SOCKET_RECV_FLAGS_DEFAULT, ?SOCKET_RECV_TIMEOUT_DEFAULT). --spec recvfrom(Socket, Flags, Timeout) -> - {ok, {Source, Data}} | {error, Reason} when - Socket :: socket(), - Flags :: recv_flags(), - Timeout :: timeout(), - Source :: sockaddr() | undefined, - Data :: binary(), - Reason :: term() +-spec recvfrom(Socket, Flags, nowait) -> + {ok, {Source, Data}} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + Flags :: recv_flags(), + Source :: sockaddr() | undefined, + Data :: binary(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, Flags, Timeout) -> + {ok, {Source, Data}} | + {error, Reason} when + Socket :: socket(), + Flags :: recv_flags(), + Timeout :: timeout(), + Source :: sockaddr() | undefined, + Data :: binary(), + Reason :: term() ; (Socket, BufSz, Flags) -> {ok, {Source, Data}} | {error, Reason} when - Socket :: socket(), - BufSz :: non_neg_integer(), - Flags :: recv_flags(), - Source :: sockaddr() | undefined, - Data :: binary(), - Reason :: term() + Socket :: socket(), + BufSz :: non_neg_integer(), + Flags :: recv_flags(), + Source :: sockaddr() | undefined, + Data :: binary(), + Reason :: term() + ; (Socket, BufSz, nowait) -> + {ok, {Source, Data}} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + Source :: sockaddr() | undefined, + Data :: binary(), + SelectInfo :: select_info(), + Reason :: term() ; (Socket, BufSz, Timeout) -> {ok, {Source, Data}} | {error, Reason} when - Socket :: socket(), - BufSz :: non_neg_integer(), - Timeout :: timeout(), - Source :: sockaddr() | undefined, - Data :: binary(), - Reason :: term(). + Socket :: socket(), + BufSz :: non_neg_integer(), + Timeout :: timeout(), + Source :: sockaddr() | undefined, + Data :: binary(), + Reason :: term(). recvfrom(Socket, Flags, Timeout) when is_list(Flags) -> recvfrom(Socket, 0, Flags, Timeout); @@ -1883,20 +2093,34 @@ recvfrom(Socket, BufSz, Flags) when is_list(Flags) -> recvfrom(Socket, BufSz, Timeout) -> recvfrom(Socket, BufSz, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout). --spec recvfrom(Socket, BufSz, Flags, Timeout) -> - {ok, {Source, Data}} | {error, Reason} when - Socket :: socket(), - BufSz :: non_neg_integer(), - Flags :: recv_flags(), - Timeout :: timeout(), - Source :: sockaddr() | undefined, - Data :: binary(), - Reason :: term(). +-spec recvfrom(Socket, BufSz, Flags, nowait) -> + {ok, {Source, Data}} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + Flags :: recv_flags(), + Source :: sockaddr() | undefined, + Data :: binary(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, BufSz, Flags, Timeout) -> + {ok, {Source, Data}} | + {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + Flags :: recv_flags(), + Timeout :: timeout(), + Source :: sockaddr() | undefined, + Data :: binary(), + Reason :: term(). recvfrom(#socket{ref = SockRef}, BufSz, Flags, Timeout) when (is_integer(BufSz) andalso (BufSz >= 0)) andalso is_list(Flags) andalso - (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + (is_integer(Timeout) orelse + (Timeout =:= infinity) orelse + (Timeout =:= nowait)) -> EFlags = enc_recv_flags(Flags), do_recvfrom(SockRef, BufSz, EFlags, Timeout). @@ -1907,6 +2131,11 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> {ok, {_Source, _NewData}} = OK -> OK; + + {error, eagain} when (Timeout =:= nowait) -> + ?SELECT(recvfrom, RecvRef); + + {error, eagain} -> %% There is nothing just now, but we will be notified when there %% is something to read (a select message). @@ -1947,29 +2176,44 @@ recvmsg(Socket) -> Flags :: recv_flags(), MsgHdr :: msghdr(), Reason :: term() + ; (Socket, Timeout :: nowait) -> {ok, MsgHdr} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + MsgHdr :: msghdr(), + SelectInfo :: select_info(), + Reason :: term() ; (Socket, Timeout) -> {ok, MsgHdr} | {error, Reason} when - Socket :: socket(), - Timeout :: timeout(), - MsgHdr :: msghdr(), - Reason :: term(). + Socket :: socket(), + Timeout :: timeout(), + MsgHdr :: msghdr(), + Reason :: term(). recvmsg(Socket, Flags) when is_list(Flags) -> recvmsg(Socket, 0, 0, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT); recvmsg(Socket, Timeout) -> recvmsg(Socket, 0, 0, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout). --spec recvmsg(Socket, Flags, Timeout) -> {ok, MsgHdr} | {error, Reason} when - Socket :: socket(), - Flags :: recv_flags(), - Timeout :: timeout(), - MsgHdr :: msghdr(), - Reason :: term() +-spec recvmsg(Socket, Flags, nowait) -> {ok, MsgHdr} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + Flags :: recv_flags(), + MsgHdr :: msghdr(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, Flags, Timeout) -> {ok, MsgHdr} | {error, Reason} when + Socket :: socket(), + Flags :: recv_flags(), + Timeout :: timeout(), + MsgHdr :: msghdr(), + Reason :: term() ; (Socket, BufSz, CtrlSz) -> {ok, MsgHdr} | {error, Reason} when - Socket :: socket(), - BufSz :: non_neg_integer(), - CtrlSz :: non_neg_integer(), - MsgHdr :: msghdr(), - Reason :: term(). + Socket :: socket(), + BufSz :: non_neg_integer(), + CtrlSz :: non_neg_integer(), + MsgHdr :: msghdr(), + Reason :: term(). recvmsg(Socket, Flags, Timeout) when is_list(Flags) -> recvmsg(Socket, 0, 0, Flags, Timeout); @@ -1980,20 +2224,34 @@ recvmsg(Socket, BufSz, CtrlSz) when is_integer(BufSz) andalso is_integer(CtrlSz) -spec recvmsg(Socket, BufSz, CtrlSz, - Flags, Timeout) -> {ok, MsgHdr} | {error, Reason} when - Socket :: socket(), - BufSz :: non_neg_integer(), - CtrlSz :: non_neg_integer(), - Flags :: recv_flags(), - Timeout :: timeout(), - MsgHdr :: msghdr(), - Reason :: term(). + Flags, nowait) -> {ok, MsgHdr} | + {select, SelectInfo} | + {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + CtrlSz :: non_neg_integer(), + Flags :: recv_flags(), + MsgHdr :: msghdr(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, + BufSz, CtrlSz, + Flags, Timeout) -> {ok, MsgHdr} | {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + CtrlSz :: non_neg_integer(), + Flags :: recv_flags(), + Timeout :: timeout(), + MsgHdr :: msghdr(), + Reason :: term(). recvmsg(#socket{ref = SockRef}, BufSz, CtrlSz, Flags, Timeout) when (is_integer(BufSz) andalso (BufSz >= 0)) andalso (is_integer(CtrlSz) andalso (CtrlSz >= 0)) andalso is_list(Flags) andalso - (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + (is_integer(Timeout) orelse + (Timeout =:= infinity) orelse + (Timeout =:= nowait)) -> EFlags = enc_recv_flags(Flags), do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout). @@ -2004,6 +2262,11 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> {ok, _MsgHdr} = OK -> OK; + + {error, eagain} when (Timeout =:= nowait) -> + ?SELECT(recvmsg, RecvRef); + + {error, eagain} -> %% There is nothing just now, but we will be notified when there %% is something to read (a select message). @@ -2339,6 +2602,25 @@ peername(#socket{ref = SockRef}) -> nif_peername(SockRef). +%% =========================================================================== +%% +%% cancel - cancel an operation resulting in a select +%% +%% A call to accept, recv/recvfrom/recvmsg and send/sendto/sendmsg +%% can result in a select if they are called with the Timeout argument +%% set to nowait. This is indicated by the return of the select-info. +%% Such a operation can be cancelled by calling this function. +%% + +-spec cancel(Socket, SelectInfo) -> ok | {error, Reason} when + Socket :: socket(), + SelectInfo :: select_info(), + Reason :: term(). + +cancel(#socket{ref = SockRef}, #select_info{tag = Tag, ref = Ref}) -> + cancel(SockRef, Tag, Ref). + + %% =========================================================================== %% @@ -3462,15 +3744,18 @@ flush_select_msgs(SockRef, Ref) -> %% A timestamp in ms +timestamp(nowait = T) -> + T; timestamp(infinity) -> undefined; timestamp(_) -> timestamp(). timestamp() -> - {A,B,C} = os:timestamp(), - A*1000000000+B*1000+(C div 1000). + erlang:monotonic_time(milli_seconds). +next_timeout(_, nowait = Timeout) -> + Timeout; next_timeout(_, infinity = Timeout) -> Timeout; next_timeout(TS, Timeout) -> |