aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/preloaded/src/socket.erl151
1 files changed, 75 insertions, 76 deletions
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 6784477123..0dadcecaa0 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -446,18 +446,16 @@ accept(Socket) ->
accept(Socket, ?SOCKET_ACCEPT_TIMEOUT_DEFAULT).
%% Do we really need this optimization?
-accept(_, Timeout) when is_integer(Timeout) andalso (Timeout < 0) ->
+accept(_, Timeout) when is_integer(Timeout) andalso (Timeout =< 0) ->
{error, timeout};
accept({socket, SI, LSockRef}, Timeout)
when is_integer(Timeout) orelse (Timeout =:= infinity) ->
- Ref = make_ref(),
- do_accept(LSockRef, SI, Ref, Timeout).
+ do_accept(LSockRef, SI, Timeout).
-do_accept(_, _, _Ref, Timeout) when is_integer(Timeout) andalso (Timeout < 0) ->
- {error, timeout};
-do_accept(LSockRef, SI, Ref, Timeout) ->
- TS = timestamp(Timeout),
- case nif_accept(LSockRef, Ref) of
+do_accept(LSockRef, SI, Timeout) ->
+ TS = timestamp(Timeout),
+ AccRef = make_ref(),
+ case nif_accept(LSockRef, AccRef) of
{ok, SockRef} ->
SocketInfo = #{domain => maps:get(domain, SI),
type => maps:get(type, SI),
@@ -467,23 +465,15 @@ do_accept(LSockRef, SI, Ref, Timeout) ->
{error, eagain} ->
NewTimeout = next_timeout(TS, Timeout),
receive
- {select, LSockRef, Ref, ready_input} ->
- do_accept(LSockRef, SI, make_ref(), next_timeout(TS, Timeout))
+ {select, LSockRef, AccRef, ready_input} ->
+ do_accept(LSockRef, SI, next_timeout(TS, Timeout))
after NewTimeout ->
- nif_cancel(LSockRef, accept, Ref),
- flush_select_msgs(LSockRef, Ref),
+ nif_cancel(LSockRef, accept, AccRef),
+ flush_select_msgs(LSockRef, AccRef),
{error, timeout}
end
end.
-flush_select_msgs(LSRef, Ref) ->
- receive
- {select, LSRef, Ref, _} ->
- flush_select_msgs(LSRef, Ref)
- after 0 ->
- ok
- end.
-
%% ===========================================================================
@@ -511,19 +501,11 @@ send(Socket, Data, Flags, Timeout) when is_list(Data) ->
send(Socket, Bin, Flags, Timeout);
send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags) ->
EFlags = enc_send_flags(Flags),
- do_send(Socket, make_ref(), Data, EFlags, Timeout).
-
-do_send(SockRef, SendRef, Data, _EFlags, Timeout)
- when (Timeout =< 0) ->
- %% <KOLLA>
- %% THIS IS THE WRONG SEND REF
- %% WE SHOULD NOT HAVE THIS REF AS AN ARGUMENT - SEE RECV
- %% </KOLLA>
- nif_cancel(SockRef, send, SendRef),
- flush_select_msgs(SockRef, SendRef),
- {error, {timeout, size(Data)}};
-do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
- TS = timestamp(Timeout),
+ do_send(Socket, Data, EFlags, Timeout).
+
+do_send(SockRef, Data, EFlags, Timeout) ->
+ TS = timestamp(Timeout),
+ SendRef = make_ref(),
case nif_send(SockRef, SendRef, Data, EFlags) of
ok ->
ok;
@@ -533,25 +515,25 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
receive
{select, SockRef, SendRef, ready_output} when (Written > 0) ->
<<_:Written/binary, Rest/binary>> = Data,
- do_send(SockRef, make_ref(), Rest, EFlags,
+ do_send(SockRef, Rest, EFlags,
next_timeout(TS, Timeout));
{select, SockRef, SendRef, ready_output} ->
- do_send(SockRef, make_ref(), Data, EFlags,
+ do_send(SockRef, Data, EFlags,
next_timeout(TS, Timeout))
after NewTimeout ->
nif_cancel(SockRef, send, SendRef),
flush_select_msgs(SockRef, SendRef),
- {error, timeout}
+ {error, {timeout, size(Data)}}
end;
{error, eagain} ->
receive
{select, SockRef, SendRef, ready_output} ->
- do_send(SockRef, SendRef, Data, EFlags,
+ do_send(SockRef, Data, EFlags,
next_timeout(TS, Timeout))
after Timeout ->
nif_cancel(SockRef, send, SendRef),
flush_select_msgs(SockRef, SendRef),
- {error, timeout}
+ {error, {timeout, size(Data)}}
end;
{error, _} = ERROR ->
@@ -563,8 +545,6 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
%% ---------------------------------------------------------------------------
%%
-%% Do we need a timeout argument here also?
-%%
sendto(Socket, Data, Flags, DestAddr, DestPort) ->
sendto(Socket, Data, Flags, DestAddr, DestPort, ?SOCKET_SENDTO_TIMEOUT_DEFAULT).
@@ -589,46 +569,36 @@ sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout)
is_integer(DestPort) andalso
(is_integer(Timeout) orelse (Timeout =:= infinity)) ->
EFlags = enc_send_flags(Flags),
- do_sendto(Socket, make_ref(), Data, EFlags, DestAddr, DestPort, Timeout).
-
-do_sendto(SockRef, SendRef, Data, _EFlags, _DestAddr, _DestPort, Timeout)
- when (Timeout =< 0) ->
- %% <KOLLA>
- %% THIS IS THE WRONG SEND REF
- %% WE SHOULD NOT HAVE THIS REF AS AN ARGUMENT - SEE RECV
- %% </KOLLA>
- nif_cancel(SockRef, sendto, SendRef),
- flush_select_msgs(SockRef, SendRef),
- {error, {timeout, size(Data)}};
-do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
- TS = timestamp(Timeout),
+ do_sendto(Socket, Data, EFlags, DestAddr, DestPort, Timeout).
+
+do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
+ TS = timestamp(Timeout),
+ SendRef = make_ref(),
case nif_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort) of
ok ->
- {ok, next_timeout(TS, Timeout)};
+ %% We are done
+ ok;
+
{ok, Written} ->
%% We are partially done, wait for continuation
receive
{select, SockRef, SendRef, ready_output} when (Written > 0) ->
<<_:Written/binary, Rest/binary>> = Data,
- do_sendto(SockRef, make_ref(), Rest, EFlags,
- DestAddr, DestPort,
+ do_sendto(SockRef, Rest, EFlags, DestAddr, DestPort,
next_timeout(TS, Timeout));
{select, SockRef, SendRef, ready_output} ->
- do_sendto(SockRef, make_ref(), Data, EFlags,
- DestAddr, DestPort,
+ do_sendto(SockRef, Data, EFlags, DestAddr, DestPort,
next_timeout(TS, Timeout))
after Timeout ->
nif_cancel(SockRef, sendto, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, timeout}
end;
+
{error, eagain} ->
- %% Is this what we can expect?
- %% If we have to wait because there is another ongoing write??
receive
{select, SockRef, SendRef, ready_output} ->
- do_sendto(SockRef, SendRef, Data, EFlags,
- DestAddr, DestPort,
+ do_sendto(SockRef, Data, EFlags, DestAddr, DestPort,
next_timeout(TS, Timeout))
after Timeout ->
nif_cancel(SockRef, sendto, SendRef),
@@ -761,9 +731,13 @@ recv(Socket, Length, Flags, Timeout)
is_list(Flags) andalso
(is_integer(Timeout) orelse (Timeout =:= infinity)) ->
EFlags = enc_recv_flags(Flags),
- do_recv(Socket, Length, EFlags, <<>>, EFlags).
+ do_recv(Socket, undefined, Length, EFlags, <<>>, EFlags).
-do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout)
+%% We need to pass the "old recv ref" around because of the special case
+%% with Length = 0. This case makes it neccessary to have a timeout function
+%% clause since we may never wait for anything (no receive select), and so the
+%% the only timeout check will be the function clause.
+do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
when (Timeout =:= infinity) orelse
(is_integer(Timeout) andalso (Timeout > 0)) ->
TS = timestamp(Timeout),
@@ -780,7 +754,8 @@ do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout)
%% > 0 - We got a part of the message and we will be notified
%% when there is more to read (a select message)
{ok, false = _Complete, Bin} when (Length =:= 0) ->
- do_recv(Socket, Length, EFlags,
+ do_recv(Socket, RecvRef,
+ Length, EFlags,
<<Acc/binary, Bin/binary>>,
next_timeout(TS, Timeout));
@@ -791,7 +766,8 @@ do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout)
NewTimeout = next_timeout(TS, Timeout),
receive
{select, SockRef, RecvRef, ready_input} ->
- do_recv(Socket, Length-size(Bin), EFlags,
+ do_recv(Socket, RecvRef,
+ Length-size(Bin), EFlags,
Bin,
next_timeout(TS, Timeout))
after NewTimeout ->
@@ -805,7 +781,8 @@ do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout)
NewTimeout = next_timeout(TS, Timeout),
receive
{select, SockRef, RecvRef, ready_input} ->
- do_recv(Socket, Length-size(Bin), EFlags,
+ do_recv(Socket, RecvRef,
+ Length-size(Bin), EFlags,
<<Acc/binary, Bin/binary>>,
next_timeout(TS, Timeout))
after NewTimeout ->
@@ -824,7 +801,8 @@ do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout)
NewTimeout = next_timeout(TS, Timeout),
receive
{select, SockRef, RecvRef, ready_input} ->
- do_recv(Socket, Length, EFlags,
+ do_recv(Socket, RecvRef,
+ Length, EFlags,
Acc,
next_timeout(TS, Timeout))
after NewTimeout ->
@@ -841,16 +819,26 @@ do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout)
end;
-do_recv({socket, _, SockRef} = _Socket, 0 = _Length, _Eflags, Acc, _Timeout) ->
+do_recv({socket, _, SockRef} = _Socket, RecvRef,
+ 0 = _Length, _Eflags, Acc, _Timeout) ->
%% The current recv operation is to be cancelled, so no need for a ref...
%% The cancel will end our 'read everything you have' and "activate"
- %% any waiting readers.
- nif_cancel(SockRef, recv, undefined),
+ %% any waiting reader.
+ nif_cancel(SockRef, recv, RecvRef),
{ok, Acc};
-do_recv(_Socket, _Length, _EFlags, Acc, _Timeout) ->
- {error, {timeout, Acc}}.
+do_recv(_Socket, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) ->
+ {error, {timeout, Acc}};
+do_recv(_Socket, _RecvRef, _Length, _EFlags, _Acc, _Timeout) ->
+ {error, timeout}.
+
+%% ---------------------------------------------------------------------------
+%%
+
+recvfrom(Socket) ->
+ recvfrom(Socket, ?SOCKET_RECV_FLAGS_DEFAULT).
+
-spec recvfrom(Socket, Flags) -> {ok, Data, SrcAddr, SrcPort} | {error, Reason} when
Socket :: socket(),
Flags :: recv_flags(),
@@ -859,14 +847,14 @@ do_recv(_Socket, _Length, _EFlags, Acc, _Timeout) ->
SrcPort :: port_number(),
Reason :: term().
-recvfrom(Socket) ->
- recvfrom(Socket, ?SOCKET_RECV_FLAGS_DEFAULT).
-
recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) ->
EFlags = enc_recv_flags(Flags),
nif_recvfrom(SockRef, EFlags).
+%% ---------------------------------------------------------------------------
+%%
+
%% -spec recvmsg(Socket, [out] MsgHdr, Flags) -> {ok, Data} | {error, Reason} when
%% Socket :: socket(),
%% MsgHdr :: msg_header(),
@@ -883,6 +871,8 @@ recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) ->
+%% ===========================================================================
+%%
%% close - close a file descriptor
-spec close(Socket) -> ok | {error, Reason} when
@@ -1067,6 +1057,15 @@ dec_getopt_value(debug, B, _, _, _) when is_boolean(B) ->
%%
%% ===========================================================================
+flush_select_msgs(LSRef, Ref) ->
+ receive
+ {select, LSRef, Ref, _} ->
+ flush_select_msgs(LSRef, Ref)
+ after 0 ->
+ ok
+ end.
+
+
formated_timestamp() ->
format_timestamp(os:timestamp()).