aboutsummaryrefslogtreecommitdiffstats
path: root/erts/preloaded/src
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-04-19 10:57:54 +0200
committerMicael Karlberg <[email protected]>2018-09-18 13:01:37 +0200
commit28611d6e6daab8ae24e5e593c001bcd6442506eb (patch)
tree630ea2ff977cac8644a5929673f288b61041f295 /erts/preloaded/src
parent5920705deb70a44311e1b7552cfa73553f284164 (diff)
downloadotp-28611d6e6daab8ae24e5e593c001bcd6442506eb.tar.gz
otp-28611d6e6daab8ae24e5e593c001bcd6442506eb.tar.bz2
otp-28611d6e6daab8ae24e5e593c001bcd6442506eb.zip
[socket-nif] Completed recv
Need to fix the use of the request ref (ID) handling in previous functions.
Diffstat (limited to 'erts/preloaded/src')
-rw-r--r--erts/preloaded/src/socket.erl204
1 files changed, 164 insertions, 40 deletions
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 0a78feab4e..6784477123 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -38,7 +38,7 @@
%% sendmsg/4,
%% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv)
- recv/1, recv/2,
+ recv/2, recv/3, recv/4,
recvfrom/1, recvfrom/2,
%% recvmsg/4,
%% readv/3,
@@ -173,11 +173,7 @@
-define(SOCKET_LISTEN_BACKLOG_DEFAULT, 5).
-%% Bit numbers (from right).
--define(SOCKET_ACCEPT_FLAG_NONBLOCK, 0).
--define(SOCKET_ACCEPT_FLAG_CLOEXEC, 1).
-
--define(SOCKET_ACCEPT_FLAGS_DEFAULT, []).
+-define(SOCKET_ACCEPT_TIMEOUT_DEFAULT, infinity).
-define(SOCKET_SEND_FLAG_CONFIRM, 0).
-define(SOCKET_SEND_FLAG_DONTROUTE, 1).
@@ -187,7 +183,9 @@
-define(SOCKET_SEND_FLAG_NOSIGNAL, 5).
-define(SOCKET_SEND_FLAG_OOB, 6).
--define(SOCKET_SEND_FLAGS_DEFAULT, []).
+-define(SOCKET_SEND_FLAGS_DEFAULT, []).
+-define(SOCKET_SEND_TIMEOUT_DEFAULT, infinity).
+-define(SOCKET_SENDTO_TIMEOUT_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT).
-define(SOCKET_RECV_FLAG_CMSG_CLOEXEC, 0).
-define(SOCKET_RECV_FLAG_DONTWAIT, 1).
@@ -197,7 +195,8 @@
-define(SOCKET_RECV_FLAG_TRUNC, 5).
-define(SOCKET_RECV_FLAG_WAITALL, 6).
--define(SOCKET_RECV_FLAGS_DEFAULT, []).
+-define(SOCKET_RECV_FLAGS_DEFAULT, []).
+-define(SOCKET_RECV_TIMEOUT_DEFAULT, infinity).
-define(SOCKET_SETOPT_KEY_DEBUG, 0).
@@ -403,7 +402,7 @@ connect({socket, _, SockRef}, Addr, Port, Timeout)
{select, SockRef, Ref, ready_output} ->
nif_finalize_connection(SockRef)
after NewTimeout ->
- nif_cancel(SockRef, Ref),
+ nif_cancel(SockRef, connect, Ref),
{error, timeout}
end;
{error, _} = ERROR ->
@@ -444,7 +443,7 @@ listen({socket, _, SockRef}, Backlog)
Reason :: term().
accept(Socket) ->
- accept(Socket, infinity).
+ accept(Socket, ?SOCKET_ACCEPT_TIMEOUT_DEFAULT).
%% Do we really need this optimization?
accept(_, Timeout) when is_integer(Timeout) andalso (Timeout < 0) ->
@@ -466,11 +465,12 @@ do_accept(LSockRef, SI, Ref, Timeout) ->
Socket = {socket, SocketInfo, SockRef},
{ok, Socket};
{error, eagain} ->
+ NewTimeout = next_timeout(TS, Timeout),
receive
{select, LSockRef, Ref, ready_input} ->
do_accept(LSockRef, SI, make_ref(), next_timeout(TS, Timeout))
- after Timeout ->
- nif_cancel(LSockRef, Ref),
+ after NewTimeout ->
+ nif_cancel(LSockRef, accept, Ref),
flush_select_msgs(LSockRef, Ref),
{error, timeout}
end
@@ -492,14 +492,13 @@ flush_select_msgs(LSRef, Ref) ->
%%
send(Socket, Data) ->
- send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, infinity).
+ send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT).
send(Socket, Data, Flags) when is_list(Flags) ->
- send(Socket, Data, Flags, infinity);
+ send(Socket, Data, Flags, ?SOCKET_SEND_TIMEOUT_DEFAULT);
send(Socket, Data, Timeout) ->
send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, Timeout).
-
-spec send(Socket, Data, Flags, Timeout) -> ok | {error, Reason} when
Socket :: socket(),
Data :: iodata(),
@@ -516,15 +515,20 @@ send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags)
do_send(SockRef, SendRef, Data, _EFlags, Timeout)
when (Timeout =< 0) ->
- nif_cancel(SockRef, SendRef),
+ %% <KOLLA>
+ %% THIS IS THE WRONG SEND REF
+ %% WE SHOULD NOT HAVE THIS REF AS AN ARGUMENT - SEE RECV
+ %% </KOLLA>
+ nif_cancel(SockRef, send, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, {timeout, size(Data)}};
do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
TS = timestamp(Timeout),
case nif_send(SockRef, SendRef, Data, EFlags) of
ok ->
- {ok, next_timeout(TS, Timeout)};
+ ok;
{ok, Written} ->
+ NewTimeout = next_timeout(TS, Timeout),
%% We are partially done, wait for continuation
receive
{select, SockRef, SendRef, ready_output} when (Written > 0) ->
@@ -534,8 +538,8 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
{select, SockRef, SendRef, ready_output} ->
do_send(SockRef, make_ref(), Data, EFlags,
next_timeout(TS, Timeout))
- after Timeout ->
- nif_cancel(SockRef, SendRef),
+ after NewTimeout ->
+ nif_cancel(SockRef, send, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, timeout}
end;
@@ -545,7 +549,7 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
do_send(SockRef, SendRef, Data, EFlags,
next_timeout(TS, Timeout))
after Timeout ->
- nif_cancel(SockRef, SendRef),
+ nif_cancel(SockRef, send, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, timeout}
end;
@@ -563,7 +567,7 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
%%
sendto(Socket, Data, Flags, DestAddr, DestPort) ->
- sendto(Socket, Data, Flags, DestAddr, DestPort, infinity).
+ sendto(Socket, Data, Flags, DestAddr, DestPort, ?SOCKET_SENDTO_TIMEOUT_DEFAULT).
-spec sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) ->
ok | {error, Reason} when
@@ -589,12 +593,16 @@ sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout)
do_sendto(SockRef, SendRef, Data, _EFlags, _DestAddr, _DestPort, Timeout)
when (Timeout =< 0) ->
- nif_cancel(SockRef, SendRef),
+ %% <KOLLA>
+ %% THIS IS THE WRONG SEND REF
+ %% WE SHOULD NOT HAVE THIS REF AS AN ARGUMENT - SEE RECV
+ %% </KOLLA>
+ nif_cancel(SockRef, sendto, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, {timeout, size(Data)}};
do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
TS = timestamp(Timeout),
- case nif_sendto(SockRef, SendRef, Data, DestAddr, DestPort, EFlags) of
+ case nif_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort) of
ok ->
{ok, next_timeout(TS, Timeout)};
{ok, Written} ->
@@ -610,7 +618,7 @@ do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
DestAddr, DestPort,
next_timeout(TS, Timeout))
after Timeout ->
- nif_cancel(SockRef, SendRef),
+ nif_cancel(SockRef, sendto, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, timeout}
end;
@@ -623,7 +631,7 @@ do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
DestAddr, DestPort,
next_timeout(TS, Timeout))
after Timeout ->
- nif_cancel(SockRef, SendRef),
+ nif_cancel(SockRef, sendto, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, timeout}
end;
@@ -711,20 +719,136 @@ do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
%%
%% recv, recvfrom, recvmsg - receive a message from a socket
%%
+%% Description:
+%% There is a special case for the argument Length. If its set to zero (0),
+%% it means "give me everything you have".
+%%
+%% Returns: {ok, Binary} | {error, Reason}
+%% Binary - The received data as a binary
+%% Reason - The error reason:
+%% timeout | {timeout, AccData} |
+%% posix() | {posix(), AccData} |
+%% atom() | {atom(), AccData}
+%% AccData - The data (as a binary) that we did manage to receive
+%% before the timeout.
+%%
+%% Arguments:
+%% Socket - The socket to read from.
+%% Length - The number of bytes to read.
+%% Flags - A list of "options" for the read.
+%% Timeout - Time-out in milliseconds.
+
+recv(Socket, Length) ->
+ recv(Socket, Length,
+ ?SOCKET_RECV_FLAGS_DEFAULT,
+ ?SOCKET_RECV_TIMEOUT_DEFAULT).
+
+recv(Socket, Length, Flags) when is_list(Flags) ->
+ recv(Socket, Length, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT);
+recv(Socket, Length, Timeout) ->
+ recv(Socket, Length, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout).
+
+-spec recv(Socket, Length, Flags, Timeout) -> {ok, Data} | {error, Reason} when
+ Socket :: socket(),
+ Length :: non_neg_integer(),
+ Flags :: recv_flags(),
+ Timeout :: timeout(),
+ Data :: binary(),
+ Reason :: term().
--spec recv(Socket, Flags) -> {ok, Data} | {error, Reason} when
- Socket :: socket(),
- Flags :: recv_flags(),
- Data :: binary(),
- Reason :: term().
+recv(Socket, Length, Flags, Timeout)
+ when (is_integer(Length) andalso (Length >= 0)) andalso
+ is_list(Flags) andalso
+ (is_integer(Timeout) orelse (Timeout =:= infinity)) ->
+ EFlags = enc_recv_flags(Flags),
+ do_recv(Socket, Length, EFlags, <<>>, EFlags).
+
+do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout)
+ when (Timeout =:= infinity) orelse
+ (is_integer(Timeout) andalso (Timeout > 0)) ->
+ TS = timestamp(Timeout),
+ RecvRef = make_ref(),
+ case nif_recv(SockRef, RecvRef, Length, EFlags) of
+ {ok, true = _Complete, Bin} when (size(Acc) =:= 0) ->
+ {ok, Bin};
+ {ok, true = _Complete, Bin} ->
+ {ok, <<Acc/binary, Bin/binary>>};
+
+ %% It depends on the amount of bytes we tried to read:
+ %% 0 - Read everything available
+ %% We got something, but there may be more - keep reading.
+ %% > 0 - We got a part of the message and we will be notified
+ %% when there is more to read (a select message)
+ {ok, false = _Complete, Bin} when (Length =:= 0) ->
+ do_recv(Socket, Length, EFlags,
+ <<Acc/binary, Bin/binary>>,
+ next_timeout(TS, Timeout));
+
+ {ok, false = _Completed, Bin} when (size(Acc) =:= 0) ->
+ %% We got the first chunk of it.
+ %% We will be notified (select message) when there
+ %% is more to read.
+ NewTimeout = next_timeout(TS, Timeout),
+ receive
+ {select, SockRef, RecvRef, ready_input} ->
+ do_recv(Socket, Length-size(Bin), EFlags,
+ Bin,
+ next_timeout(TS, Timeout))
+ after NewTimeout ->
+ nif_cancel(SockRef, recv, RecvRef),
+ flush_select_msgs(SockRef, RecvRef),
+ {error, {timeout, Acc}}
+ end;
+
+ {ok, false = _Completed, Bin} ->
+ %% We got a chunk of it!
+ NewTimeout = next_timeout(TS, Timeout),
+ receive
+ {select, SockRef, RecvRef, ready_input} ->
+ do_recv(Socket, Length-size(Bin), EFlags,
+ <<Acc/binary, Bin/binary>>,
+ next_timeout(TS, Timeout))
+ after NewTimeout ->
+ nif_cancel(SockRef, recv, RecvRef),
+ flush_select_msgs(SockRef, RecvRef),
+ {error, {timeout, Acc}}
+ end;
-recv(Socket) ->
- recv(Socket, ?SOCKET_RECV_FLAGS_DEFAULT).
+ %% We return with the accumulated binary regardless if its empty...
+ {error, eagain} when (Length =:= 0) ->
+ {ok, Acc};
-%% WE "may" need a timeout option here...
-recv({socket, _, SockRef}, Flags) when is_list(Flags) ->
- EFlags = enc_recv_flags(Flags),
- nif_recv(SockRef, EFlags).
+ {error, eagain} ->
+ %% There is nothing just now, but we will be notified when there
+ %% is something to read (a select message).
+ NewTimeout = next_timeout(TS, Timeout),
+ receive
+ {select, SockRef, RecvRef, ready_input} ->
+ do_recv(Socket, Length, EFlags,
+ Acc,
+ next_timeout(TS, Timeout))
+ after NewTimeout ->
+ nif_cancel(SockRef, recv, RecvRef),
+ flush_select_msgs(SockRef, RecvRef),
+ {error, timeout}
+ end;
+
+ {error, _} = ERROR when (size(Acc) =:= 0) ->
+ ERROR;
+
+ {error, Reason} ->
+ {error, {Reason, Acc}}
+
+ end;
+
+do_recv({socket, _, SockRef} = _Socket, 0 = _Length, _Eflags, Acc, _Timeout) ->
+ %% The current recv operation is to be cancelled, so no need for a ref...
+ %% The cancel will end our 'read everything you have' and "activate"
+ %% any waiting readers.
+ nif_cancel(SockRef, recv, undefined),
+ {ok, Acc};
+do_recv(_Socket, _Length, _EFlags, Acc, _Timeout) ->
+ {error, {timeout, Acc}}.
-spec recvfrom(Socket, Flags) -> {ok, Data, SrcAddr, SrcPort} | {error, Reason} when
@@ -980,9 +1104,9 @@ timestamp() ->
{A,B,C} = os:timestamp(),
A*1000000000+B*1000+(C div 1000).
-next_timeout(infinity = Timeout, _) ->
+next_timeout(_, infinity = Timeout) ->
Timeout;
-next_timeout(Timeout, TS) ->
+next_timeout(TS, Timeout) ->
NewTimeout = Timeout - tdiff(TS, timestamp()),
if
(NewTimeout > 0) ->
@@ -1033,13 +1157,13 @@ nif_send(_SockRef, _SendRef, _Data, _Flags) ->
nif_sendto(_SRef, _SendRef, _Data, _Flags, _Dest, _Port) ->
erlang:error(badarg).
-nif_recv(_SRef, _Flags) ->
+nif_recv(_SRef, _RecvRef, _Length, _Flags) ->
erlang:error(badarg).
nif_recvfrom(_SRef, _Flags) ->
erlang:error(badarg).
-nif_cancel(_SRef, _Ref) ->
+nif_cancel(_SRef, _Op, _Ref) ->
erlang:error(badarg).
nif_close(_SRef) ->