From d5aecb115070de76cb42b44edee6bbcb5f4a3724 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 19 Apr 2018 11:45:00 +0200 Subject: [socket-nif] Corrected timout handling The timeout handling for accept and send was unnecessarily complex. --- erts/preloaded/src/socket.erl | 151 +++++++++++++++++++++--------------------- 1 file changed, 75 insertions(+), 76 deletions(-) (limited to 'erts/preloaded/src') diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 6784477123..0dadcecaa0 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -446,18 +446,16 @@ accept(Socket) -> accept(Socket, ?SOCKET_ACCEPT_TIMEOUT_DEFAULT). %% Do we really need this optimization? -accept(_, Timeout) when is_integer(Timeout) andalso (Timeout < 0) -> +accept(_, Timeout) when is_integer(Timeout) andalso (Timeout =< 0) -> {error, timeout}; accept({socket, SI, LSockRef}, Timeout) when is_integer(Timeout) orelse (Timeout =:= infinity) -> - Ref = make_ref(), - do_accept(LSockRef, SI, Ref, Timeout). + do_accept(LSockRef, SI, Timeout). -do_accept(_, _, _Ref, Timeout) when is_integer(Timeout) andalso (Timeout < 0) -> - {error, timeout}; -do_accept(LSockRef, SI, Ref, Timeout) -> - TS = timestamp(Timeout), - case nif_accept(LSockRef, Ref) of +do_accept(LSockRef, SI, Timeout) -> + TS = timestamp(Timeout), + AccRef = make_ref(), + case nif_accept(LSockRef, AccRef) of {ok, SockRef} -> SocketInfo = #{domain => maps:get(domain, SI), type => maps:get(type, SI), @@ -467,23 +465,15 @@ do_accept(LSockRef, SI, Ref, Timeout) -> {error, eagain} -> NewTimeout = next_timeout(TS, Timeout), receive - {select, LSockRef, Ref, ready_input} -> - do_accept(LSockRef, SI, make_ref(), next_timeout(TS, Timeout)) + {select, LSockRef, AccRef, ready_input} -> + do_accept(LSockRef, SI, next_timeout(TS, Timeout)) after NewTimeout -> - nif_cancel(LSockRef, accept, Ref), - flush_select_msgs(LSockRef, Ref), + nif_cancel(LSockRef, accept, AccRef), + flush_select_msgs(LSockRef, AccRef), {error, timeout} end end. -flush_select_msgs(LSRef, Ref) -> - receive - {select, LSRef, Ref, _} -> - flush_select_msgs(LSRef, Ref) - after 0 -> - ok - end. - %% =========================================================================== @@ -511,19 +501,11 @@ send(Socket, Data, Flags, Timeout) when is_list(Data) -> send(Socket, Bin, Flags, Timeout); send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags) -> EFlags = enc_send_flags(Flags), - do_send(Socket, make_ref(), Data, EFlags, Timeout). - -do_send(SockRef, SendRef, Data, _EFlags, Timeout) - when (Timeout =< 0) -> - %% - %% THIS IS THE WRONG SEND REF - %% WE SHOULD NOT HAVE THIS REF AS AN ARGUMENT - SEE RECV - %% - nif_cancel(SockRef, send, SendRef), - flush_select_msgs(SockRef, SendRef), - {error, {timeout, size(Data)}}; -do_send(SockRef, SendRef, Data, EFlags, Timeout) -> - TS = timestamp(Timeout), + do_send(Socket, Data, EFlags, Timeout). + +do_send(SockRef, Data, EFlags, Timeout) -> + TS = timestamp(Timeout), + SendRef = make_ref(), case nif_send(SockRef, SendRef, Data, EFlags) of ok -> ok; @@ -533,25 +515,25 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) -> receive {select, SockRef, SendRef, ready_output} when (Written > 0) -> <<_:Written/binary, Rest/binary>> = Data, - do_send(SockRef, make_ref(), Rest, EFlags, + do_send(SockRef, Rest, EFlags, next_timeout(TS, Timeout)); {select, SockRef, SendRef, ready_output} -> - do_send(SockRef, make_ref(), Data, EFlags, + do_send(SockRef, Data, EFlags, next_timeout(TS, Timeout)) after NewTimeout -> nif_cancel(SockRef, send, SendRef), flush_select_msgs(SockRef, SendRef), - {error, timeout} + {error, {timeout, size(Data)}} end; {error, eagain} -> receive {select, SockRef, SendRef, ready_output} -> - do_send(SockRef, SendRef, Data, EFlags, + do_send(SockRef, Data, EFlags, next_timeout(TS, Timeout)) after Timeout -> nif_cancel(SockRef, send, SendRef), flush_select_msgs(SockRef, SendRef), - {error, timeout} + {error, {timeout, size(Data)}} end; {error, _} = ERROR -> @@ -563,8 +545,6 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) -> %% --------------------------------------------------------------------------- %% -%% Do we need a timeout argument here also? -%% sendto(Socket, Data, Flags, DestAddr, DestPort) -> sendto(Socket, Data, Flags, DestAddr, DestPort, ?SOCKET_SENDTO_TIMEOUT_DEFAULT). @@ -589,46 +569,36 @@ sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) is_integer(DestPort) andalso (is_integer(Timeout) orelse (Timeout =:= infinity)) -> EFlags = enc_send_flags(Flags), - do_sendto(Socket, make_ref(), Data, EFlags, DestAddr, DestPort, Timeout). - -do_sendto(SockRef, SendRef, Data, _EFlags, _DestAddr, _DestPort, Timeout) - when (Timeout =< 0) -> - %% - %% THIS IS THE WRONG SEND REF - %% WE SHOULD NOT HAVE THIS REF AS AN ARGUMENT - SEE RECV - %% - nif_cancel(SockRef, sendto, SendRef), - flush_select_msgs(SockRef, SendRef), - {error, {timeout, size(Data)}}; -do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) -> - TS = timestamp(Timeout), + do_sendto(Socket, Data, EFlags, DestAddr, DestPort, Timeout). + +do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, Timeout) -> + TS = timestamp(Timeout), + SendRef = make_ref(), case nif_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort) of ok -> - {ok, next_timeout(TS, Timeout)}; + %% We are done + ok; + {ok, Written} -> %% We are partially done, wait for continuation receive {select, SockRef, SendRef, ready_output} when (Written > 0) -> <<_:Written/binary, Rest/binary>> = Data, - do_sendto(SockRef, make_ref(), Rest, EFlags, - DestAddr, DestPort, + do_sendto(SockRef, Rest, EFlags, DestAddr, DestPort, next_timeout(TS, Timeout)); {select, SockRef, SendRef, ready_output} -> - do_sendto(SockRef, make_ref(), Data, EFlags, - DestAddr, DestPort, + do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, next_timeout(TS, Timeout)) after Timeout -> nif_cancel(SockRef, sendto, SendRef), flush_select_msgs(SockRef, SendRef), {error, timeout} end; + {error, eagain} -> - %% Is this what we can expect? - %% If we have to wait because there is another ongoing write?? receive {select, SockRef, SendRef, ready_output} -> - do_sendto(SockRef, SendRef, Data, EFlags, - DestAddr, DestPort, + do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, next_timeout(TS, Timeout)) after Timeout -> nif_cancel(SockRef, sendto, SendRef), @@ -761,9 +731,13 @@ recv(Socket, Length, Flags, Timeout) is_list(Flags) andalso (is_integer(Timeout) orelse (Timeout =:= infinity)) -> EFlags = enc_recv_flags(Flags), - do_recv(Socket, Length, EFlags, <<>>, EFlags). + do_recv(Socket, undefined, Length, EFlags, <<>>, EFlags). -do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout) +%% We need to pass the "old recv ref" around because of the special case +%% with Length = 0. This case makes it neccessary to have a timeout function +%% clause since we may never wait for anything (no receive select), and so the +%% the only timeout check will be the function clause. +do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) when (Timeout =:= infinity) orelse (is_integer(Timeout) andalso (Timeout > 0)) -> TS = timestamp(Timeout), @@ -780,7 +754,8 @@ do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout) %% > 0 - We got a part of the message and we will be notified %% when there is more to read (a select message) {ok, false = _Complete, Bin} when (Length =:= 0) -> - do_recv(Socket, Length, EFlags, + do_recv(Socket, RecvRef, + Length, EFlags, <>, next_timeout(TS, Timeout)); @@ -791,7 +766,8 @@ do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout) NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> - do_recv(Socket, Length-size(Bin), EFlags, + do_recv(Socket, RecvRef, + Length-size(Bin), EFlags, Bin, next_timeout(TS, Timeout)) after NewTimeout -> @@ -805,7 +781,8 @@ do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout) NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> - do_recv(Socket, Length-size(Bin), EFlags, + do_recv(Socket, RecvRef, + Length-size(Bin), EFlags, <>, next_timeout(TS, Timeout)) after NewTimeout -> @@ -824,7 +801,8 @@ do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout) NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> - do_recv(Socket, Length, EFlags, + do_recv(Socket, RecvRef, + Length, EFlags, Acc, next_timeout(TS, Timeout)) after NewTimeout -> @@ -841,16 +819,26 @@ do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout) end; -do_recv({socket, _, SockRef} = _Socket, 0 = _Length, _Eflags, Acc, _Timeout) -> +do_recv({socket, _, SockRef} = _Socket, RecvRef, + 0 = _Length, _Eflags, Acc, _Timeout) -> %% The current recv operation is to be cancelled, so no need for a ref... %% The cancel will end our 'read everything you have' and "activate" - %% any waiting readers. - nif_cancel(SockRef, recv, undefined), + %% any waiting reader. + nif_cancel(SockRef, recv, RecvRef), {ok, Acc}; -do_recv(_Socket, _Length, _EFlags, Acc, _Timeout) -> - {error, {timeout, Acc}}. +do_recv(_Socket, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) -> + {error, {timeout, Acc}}; +do_recv(_Socket, _RecvRef, _Length, _EFlags, _Acc, _Timeout) -> + {error, timeout}. + +%% --------------------------------------------------------------------------- +%% + +recvfrom(Socket) -> + recvfrom(Socket, ?SOCKET_RECV_FLAGS_DEFAULT). + -spec recvfrom(Socket, Flags) -> {ok, Data, SrcAddr, SrcPort} | {error, Reason} when Socket :: socket(), Flags :: recv_flags(), @@ -859,14 +847,14 @@ do_recv(_Socket, _Length, _EFlags, Acc, _Timeout) -> SrcPort :: port_number(), Reason :: term(). -recvfrom(Socket) -> - recvfrom(Socket, ?SOCKET_RECV_FLAGS_DEFAULT). - recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) -> EFlags = enc_recv_flags(Flags), nif_recvfrom(SockRef, EFlags). +%% --------------------------------------------------------------------------- +%% + %% -spec recvmsg(Socket, [out] MsgHdr, Flags) -> {ok, Data} | {error, Reason} when %% Socket :: socket(), %% MsgHdr :: msg_header(), @@ -883,6 +871,8 @@ recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) -> +%% =========================================================================== +%% %% close - close a file descriptor -spec close(Socket) -> ok | {error, Reason} when @@ -1067,6 +1057,15 @@ dec_getopt_value(debug, B, _, _, _) when is_boolean(B) -> %% %% =========================================================================== +flush_select_msgs(LSRef, Ref) -> + receive + {select, LSRef, Ref, _} -> + flush_select_msgs(LSRef, Ref) + after 0 -> + ok + end. + + formated_timestamp() -> format_timestamp(os:timestamp()). -- cgit v1.2.3