From 28611d6e6daab8ae24e5e593c001bcd6442506eb Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 19 Apr 2018 10:57:54 +0200 Subject: [socket-nif] Completed recv Need to fix the use of the request ref (ID) handling in previous functions. --- erts/preloaded/src/socket.erl | 204 +++++++++++++++++++++++++++++++++--------- 1 file changed, 164 insertions(+), 40 deletions(-) (limited to 'erts/preloaded/src') diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 0a78feab4e..6784477123 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -38,7 +38,7 @@ %% sendmsg/4, %% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv) - recv/1, recv/2, + recv/2, recv/3, recv/4, recvfrom/1, recvfrom/2, %% recvmsg/4, %% readv/3, @@ -173,11 +173,7 @@ -define(SOCKET_LISTEN_BACKLOG_DEFAULT, 5). -%% Bit numbers (from right). --define(SOCKET_ACCEPT_FLAG_NONBLOCK, 0). --define(SOCKET_ACCEPT_FLAG_CLOEXEC, 1). - --define(SOCKET_ACCEPT_FLAGS_DEFAULT, []). +-define(SOCKET_ACCEPT_TIMEOUT_DEFAULT, infinity). -define(SOCKET_SEND_FLAG_CONFIRM, 0). -define(SOCKET_SEND_FLAG_DONTROUTE, 1). @@ -187,7 +183,9 @@ -define(SOCKET_SEND_FLAG_NOSIGNAL, 5). -define(SOCKET_SEND_FLAG_OOB, 6). --define(SOCKET_SEND_FLAGS_DEFAULT, []). +-define(SOCKET_SEND_FLAGS_DEFAULT, []). +-define(SOCKET_SEND_TIMEOUT_DEFAULT, infinity). +-define(SOCKET_SENDTO_TIMEOUT_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT). -define(SOCKET_RECV_FLAG_CMSG_CLOEXEC, 0). -define(SOCKET_RECV_FLAG_DONTWAIT, 1). @@ -197,7 +195,8 @@ -define(SOCKET_RECV_FLAG_TRUNC, 5). -define(SOCKET_RECV_FLAG_WAITALL, 6). --define(SOCKET_RECV_FLAGS_DEFAULT, []). +-define(SOCKET_RECV_FLAGS_DEFAULT, []). +-define(SOCKET_RECV_TIMEOUT_DEFAULT, infinity). -define(SOCKET_SETOPT_KEY_DEBUG, 0). @@ -403,7 +402,7 @@ connect({socket, _, SockRef}, Addr, Port, Timeout) {select, SockRef, Ref, ready_output} -> nif_finalize_connection(SockRef) after NewTimeout -> - nif_cancel(SockRef, Ref), + nif_cancel(SockRef, connect, Ref), {error, timeout} end; {error, _} = ERROR -> @@ -444,7 +443,7 @@ listen({socket, _, SockRef}, Backlog) Reason :: term(). accept(Socket) -> - accept(Socket, infinity). + accept(Socket, ?SOCKET_ACCEPT_TIMEOUT_DEFAULT). %% Do we really need this optimization? accept(_, Timeout) when is_integer(Timeout) andalso (Timeout < 0) -> @@ -466,11 +465,12 @@ do_accept(LSockRef, SI, Ref, Timeout) -> Socket = {socket, SocketInfo, SockRef}, {ok, Socket}; {error, eagain} -> + NewTimeout = next_timeout(TS, Timeout), receive {select, LSockRef, Ref, ready_input} -> do_accept(LSockRef, SI, make_ref(), next_timeout(TS, Timeout)) - after Timeout -> - nif_cancel(LSockRef, Ref), + after NewTimeout -> + nif_cancel(LSockRef, accept, Ref), flush_select_msgs(LSockRef, Ref), {error, timeout} end @@ -492,14 +492,13 @@ flush_select_msgs(LSRef, Ref) -> %% send(Socket, Data) -> - send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, infinity). + send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT). send(Socket, Data, Flags) when is_list(Flags) -> - send(Socket, Data, Flags, infinity); + send(Socket, Data, Flags, ?SOCKET_SEND_TIMEOUT_DEFAULT); send(Socket, Data, Timeout) -> send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, Timeout). - -spec send(Socket, Data, Flags, Timeout) -> ok | {error, Reason} when Socket :: socket(), Data :: iodata(), @@ -516,15 +515,20 @@ send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags) do_send(SockRef, SendRef, Data, _EFlags, Timeout) when (Timeout =< 0) -> - nif_cancel(SockRef, SendRef), + %% + %% THIS IS THE WRONG SEND REF + %% WE SHOULD NOT HAVE THIS REF AS AN ARGUMENT - SEE RECV + %% + nif_cancel(SockRef, send, SendRef), flush_select_msgs(SockRef, SendRef), {error, {timeout, size(Data)}}; do_send(SockRef, SendRef, Data, EFlags, Timeout) -> TS = timestamp(Timeout), case nif_send(SockRef, SendRef, Data, EFlags) of ok -> - {ok, next_timeout(TS, Timeout)}; + ok; {ok, Written} -> + NewTimeout = next_timeout(TS, Timeout), %% We are partially done, wait for continuation receive {select, SockRef, SendRef, ready_output} when (Written > 0) -> @@ -534,8 +538,8 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) -> {select, SockRef, SendRef, ready_output} -> do_send(SockRef, make_ref(), Data, EFlags, next_timeout(TS, Timeout)) - after Timeout -> - nif_cancel(SockRef, SendRef), + after NewTimeout -> + nif_cancel(SockRef, send, SendRef), flush_select_msgs(SockRef, SendRef), {error, timeout} end; @@ -545,7 +549,7 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) -> do_send(SockRef, SendRef, Data, EFlags, next_timeout(TS, Timeout)) after Timeout -> - nif_cancel(SockRef, SendRef), + nif_cancel(SockRef, send, SendRef), flush_select_msgs(SockRef, SendRef), {error, timeout} end; @@ -563,7 +567,7 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) -> %% sendto(Socket, Data, Flags, DestAddr, DestPort) -> - sendto(Socket, Data, Flags, DestAddr, DestPort, infinity). + sendto(Socket, Data, Flags, DestAddr, DestPort, ?SOCKET_SENDTO_TIMEOUT_DEFAULT). -spec sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) -> ok | {error, Reason} when @@ -589,12 +593,16 @@ sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) do_sendto(SockRef, SendRef, Data, _EFlags, _DestAddr, _DestPort, Timeout) when (Timeout =< 0) -> - nif_cancel(SockRef, SendRef), + %% + %% THIS IS THE WRONG SEND REF + %% WE SHOULD NOT HAVE THIS REF AS AN ARGUMENT - SEE RECV + %% + nif_cancel(SockRef, sendto, SendRef), flush_select_msgs(SockRef, SendRef), {error, {timeout, size(Data)}}; do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) -> TS = timestamp(Timeout), - case nif_sendto(SockRef, SendRef, Data, DestAddr, DestPort, EFlags) of + case nif_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort) of ok -> {ok, next_timeout(TS, Timeout)}; {ok, Written} -> @@ -610,7 +618,7 @@ do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) -> DestAddr, DestPort, next_timeout(TS, Timeout)) after Timeout -> - nif_cancel(SockRef, SendRef), + nif_cancel(SockRef, sendto, SendRef), flush_select_msgs(SockRef, SendRef), {error, timeout} end; @@ -623,7 +631,7 @@ do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) -> DestAddr, DestPort, next_timeout(TS, Timeout)) after Timeout -> - nif_cancel(SockRef, SendRef), + nif_cancel(SockRef, sendto, SendRef), flush_select_msgs(SockRef, SendRef), {error, timeout} end; @@ -711,20 +719,136 @@ do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) -> %% %% recv, recvfrom, recvmsg - receive a message from a socket %% +%% Description: +%% There is a special case for the argument Length. If its set to zero (0), +%% it means "give me everything you have". +%% +%% Returns: {ok, Binary} | {error, Reason} +%% Binary - The received data as a binary +%% Reason - The error reason: +%% timeout | {timeout, AccData} | +%% posix() | {posix(), AccData} | +%% atom() | {atom(), AccData} +%% AccData - The data (as a binary) that we did manage to receive +%% before the timeout. +%% +%% Arguments: +%% Socket - The socket to read from. +%% Length - The number of bytes to read. +%% Flags - A list of "options" for the read. +%% Timeout - Time-out in milliseconds. + +recv(Socket, Length) -> + recv(Socket, Length, + ?SOCKET_RECV_FLAGS_DEFAULT, + ?SOCKET_RECV_TIMEOUT_DEFAULT). + +recv(Socket, Length, Flags) when is_list(Flags) -> + recv(Socket, Length, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT); +recv(Socket, Length, Timeout) -> + recv(Socket, Length, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout). + +-spec recv(Socket, Length, Flags, Timeout) -> {ok, Data} | {error, Reason} when + Socket :: socket(), + Length :: non_neg_integer(), + Flags :: recv_flags(), + Timeout :: timeout(), + Data :: binary(), + Reason :: term(). --spec recv(Socket, Flags) -> {ok, Data} | {error, Reason} when - Socket :: socket(), - Flags :: recv_flags(), - Data :: binary(), - Reason :: term(). +recv(Socket, Length, Flags, Timeout) + when (is_integer(Length) andalso (Length >= 0)) andalso + is_list(Flags) andalso + (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + EFlags = enc_recv_flags(Flags), + do_recv(Socket, Length, EFlags, <<>>, EFlags). + +do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout) + when (Timeout =:= infinity) orelse + (is_integer(Timeout) andalso (Timeout > 0)) -> + TS = timestamp(Timeout), + RecvRef = make_ref(), + case nif_recv(SockRef, RecvRef, Length, EFlags) of + {ok, true = _Complete, Bin} when (size(Acc) =:= 0) -> + {ok, Bin}; + {ok, true = _Complete, Bin} -> + {ok, <>}; + + %% It depends on the amount of bytes we tried to read: + %% 0 - Read everything available + %% We got something, but there may be more - keep reading. + %% > 0 - We got a part of the message and we will be notified + %% when there is more to read (a select message) + {ok, false = _Complete, Bin} when (Length =:= 0) -> + do_recv(Socket, Length, EFlags, + <>, + next_timeout(TS, Timeout)); + + {ok, false = _Completed, Bin} when (size(Acc) =:= 0) -> + %% We got the first chunk of it. + %% We will be notified (select message) when there + %% is more to read. + NewTimeout = next_timeout(TS, Timeout), + receive + {select, SockRef, RecvRef, ready_input} -> + do_recv(Socket, Length-size(Bin), EFlags, + Bin, + next_timeout(TS, Timeout)) + after NewTimeout -> + nif_cancel(SockRef, recv, RecvRef), + flush_select_msgs(SockRef, RecvRef), + {error, {timeout, Acc}} + end; + + {ok, false = _Completed, Bin} -> + %% We got a chunk of it! + NewTimeout = next_timeout(TS, Timeout), + receive + {select, SockRef, RecvRef, ready_input} -> + do_recv(Socket, Length-size(Bin), EFlags, + <>, + next_timeout(TS, Timeout)) + after NewTimeout -> + nif_cancel(SockRef, recv, RecvRef), + flush_select_msgs(SockRef, RecvRef), + {error, {timeout, Acc}} + end; -recv(Socket) -> - recv(Socket, ?SOCKET_RECV_FLAGS_DEFAULT). + %% We return with the accumulated binary regardless if its empty... + {error, eagain} when (Length =:= 0) -> + {ok, Acc}; -%% WE "may" need a timeout option here... -recv({socket, _, SockRef}, Flags) when is_list(Flags) -> - EFlags = enc_recv_flags(Flags), - nif_recv(SockRef, EFlags). + {error, eagain} -> + %% There is nothing just now, but we will be notified when there + %% is something to read (a select message). + NewTimeout = next_timeout(TS, Timeout), + receive + {select, SockRef, RecvRef, ready_input} -> + do_recv(Socket, Length, EFlags, + Acc, + next_timeout(TS, Timeout)) + after NewTimeout -> + nif_cancel(SockRef, recv, RecvRef), + flush_select_msgs(SockRef, RecvRef), + {error, timeout} + end; + + {error, _} = ERROR when (size(Acc) =:= 0) -> + ERROR; + + {error, Reason} -> + {error, {Reason, Acc}} + + end; + +do_recv({socket, _, SockRef} = _Socket, 0 = _Length, _Eflags, Acc, _Timeout) -> + %% The current recv operation is to be cancelled, so no need for a ref... + %% The cancel will end our 'read everything you have' and "activate" + %% any waiting readers. + nif_cancel(SockRef, recv, undefined), + {ok, Acc}; +do_recv(_Socket, _Length, _EFlags, Acc, _Timeout) -> + {error, {timeout, Acc}}. -spec recvfrom(Socket, Flags) -> {ok, Data, SrcAddr, SrcPort} | {error, Reason} when @@ -980,9 +1104,9 @@ timestamp() -> {A,B,C} = os:timestamp(), A*1000000000+B*1000+(C div 1000). -next_timeout(infinity = Timeout, _) -> +next_timeout(_, infinity = Timeout) -> Timeout; -next_timeout(Timeout, TS) -> +next_timeout(TS, Timeout) -> NewTimeout = Timeout - tdiff(TS, timestamp()), if (NewTimeout > 0) -> @@ -1033,13 +1157,13 @@ nif_send(_SockRef, _SendRef, _Data, _Flags) -> nif_sendto(_SRef, _SendRef, _Data, _Flags, _Dest, _Port) -> erlang:error(badarg). -nif_recv(_SRef, _Flags) -> +nif_recv(_SRef, _RecvRef, _Length, _Flags) -> erlang:error(badarg). nif_recvfrom(_SRef, _Flags) -> erlang:error(badarg). -nif_cancel(_SRef, _Ref) -> +nif_cancel(_SRef, _Op, _Ref) -> erlang:error(badarg). nif_close(_SRef) -> -- cgit v1.2.3