From 2d57ebfc6fb723a476fdcffbb366558a6fa18844 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 12 Apr 2018 18:24:00 +0200 Subject: [socket-nif] Completed send We still need to handle simultaneous ops. That is, handle if two different procs tries to send at the same time. Or a recv and send at the same time. Ops queue? --- erts/preloaded/src/socket.erl | 178 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 159 insertions(+), 19 deletions(-) (limited to 'erts/preloaded') diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index f3a3d493ac..985b45a956 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -33,13 +33,21 @@ listen/1, listen/2, accept/1, accept/2, - send/2, send/3, sendto/5, - recv/1, recv/2, recvfrom/1, recvfrom/2, + send/2, send/3, send/4, + sendto/5, + %% sendmsg/4, + %% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv) + + recv/1, recv/2, + recvfrom/1, recvfrom/2, + %% recvmsg/4, + %% readv/3, close/1, setopt/3, getopt/2, + %% ????? formated_timestamp/0 ]). @@ -457,45 +465,103 @@ flush_select_msgs(LSRef, Ref) -> end. + %% =========================================================================== %% %% send, sendto, sendmsg - send a message on a socket %% --spec send(Socket, Data, Flags) -> ok | {error, Reason} when - Socket :: socket(), - Data :: binary(), - Flags :: send_flags(), - Reason :: term(). - send(Socket, Data) -> - send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT). + send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, infinity). -send({socket, _, SockRef}, Data, Flags) - when is_binary(Data) andalso is_list(Flags) -> +send(Socket, Data, Flags) when is_list(Flags) -> + send(Socket, Data, Flags, infinity); +send(Socket, Data, Timeout) -> + send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, Timeout). + + +-spec send(Socket, Data, Flags, Timeout) -> ok | {error, Reason} when + Socket :: socket(), + Data :: iodata(), + Flags :: send_flags(), + Timeout :: timeout(), + Reason :: term(). + +send(Socket, Data, Flags, Timeout) when is_list(Data) -> + Bin = erlang:list_to_binary(Data), + send(Socket, Bin, Flags, Timeout); +send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags) -> EFlags = enc_send_flags(Flags), - nif_send(SockRef, Data, EFlags). + do_send(Socket, make_ref(), Data, EFlags, Timeout). + +do_send(SockRef, SendRef, Data, _EFlags, Timeout) + when (Timeout =< 0) -> + nif_cancel(SockRef, SendRef), + flush_select_msgs(SockRef, SendRef), + {error, {timeout, size(Data)}}; +do_send(SockRef, SendRef, Data, EFlags, Timeout) -> + TS = timestamp(Timeout), + case nif_send(SockRef, SendRef, Data, EFlags) of + ok -> + {ok, next_timeout(TS, Timeout)}; + {ok, Written} -> + %% We are partially done, wait for continuation + receive + {select, SockRef, SendRef, ready_output} when (Written > 0) -> + <<_:Written/binary, Rest/binary>> = Data, + do_send(SockRef, make_ref(), Rest, EFlags, + next_timeout(TS, Timeout)); + {select, SockRef, SendRef, ready_output} -> + do_send(SockRef, make_ref(), Data, EFlags, + next_timeout(TS, Timeout)) + after Timeout -> + nif_cancel(SockRef, SendRef), + flush_select_msgs(SockRef, SendRef), + {error, timeout} + end; + {error, eagain} -> + receive + {select, SockRef, SendRef, ready_output} -> + do_send(SockRef, SendRef, Data, EFlags, + next_timeout(TS, Timeout)) + after Timeout -> + nif_cancel(SockRef, SendRef), + flush_select_msgs(SockRef, SendRef), + {error, timeout} + end; + + {error, _} = ERROR -> + ERROR + end. + + %% --------------------------------------------------------------------------- +%% +%% Do we need a timeout argument here also? +%% -spec sendto(Socket, Data, Flags, DestAddr, Port) -> ok | {error, Reason} when Socket :: socket(), Data :: binary(), Flags :: send_flags(), - DestAddr :: ip_address(), + DestAddr :: null | ip_address(), Port :: port_number(), Reason :: term(). sendto({socket, _, SockRef}, Data, Flags, DestAddr, DestPort) when is_binary(Data) andalso is_list(Flags) andalso - (is_tuple(DestAddr) andalso - ((size(DestAddr) =:= 4) orelse - (size(DestAddr) =:= 8))) andalso + ((is_tuple(DestAddr) andalso + ((size(DestAddr) =:= 4) orelse + (size(DestAddr) =:= 8))) orelse + (DestAddr =:= null)) andalso (is_integer(DestPort) andalso (DestPort >= 0)) -> + %% We may need something like send/4 above? EFlags = enc_send_flags(Flags), - nif_sendto(SockRef, Data, EFlags, DestAddr, DestPort). + nif_sendto(SockRef, make_ref(), Data, EFlags, DestAddr, DestPort). + %% --------------------------------------------------------------------------- @@ -508,7 +574,73 @@ sendto({socket, _, SockRef}, Data, Flags, DestAddr, DestPort) +%% =========================================================================== +%% +%% writev - write data into multiple buffers +%% + +%% send(Socket, Data, Flags, Timeout) +%% when (is_list(Data) orelse is_binary(Data)) andalso is_list(Flags) -> +%% IOVec = erlang:iolist_to_iovec(Data), +%% EFlags = enc_send_flags(Flags), +%% send_iovec(Socket, IOVec, EFlags, Timeout). + + +%% %% Iterate over the IO-vector (list of binaries). + +%% send_iovec(_Socket, [] = _IOVec, _EFlags, _Timeout) -> +%% ok; +%% send_iovec({socket, _, SockRef} = Socket, [Bin|IOVec], EFlags, Timeout) -> +%% case do_send(SockRef, make_ref(), Bin, EFlags, Timeout) of +%% {ok, NewTimeout} -> +%% send_iovec(Socket, IOVec, EFlags, NewTimeout); +%% {error, _} = ERROR -> +%% ERROR +%% end. + + +%% do_send(SockRef, SendRef, Data, _EFlags, Timeout) +%% when (Timeout < 0) -> +%% nif_cancel(SockRef, SendRef), +%% flush_select_msgs(SockRef, SendRef), +%% {error, {timeout, size(Data)}}; +%% do_send(SockRef, SendRef, Data, EFlags, Timeout) -> +%% TS = timestamp(Timeout), +%% case nif_send(SockRef, SendRef, Data, EFlags) of +%% ok -> +%% {ok, next_timeout(TS, Timeout)}; +%% {ok, Written} -> +%% %% We are partially done, wait for continuation +%% receive +%% {select, SockRef, SendRef, ready_output} -> +%% <<_:Written/binary, Rest/binary>> = Data, +%% do_send(SockRef, make_ref(), Rest, EFlags, +%% next_timeout(TS, Timeout)) +%% after Timeout -> +%% nif_cancel(SockRef, SendRef), +%% flush_select_msgs(SockRef, SendRef), +%% {error, timeout} +%% end; +%% {error, eagain} -> +%% receive +%% {select, SockRef, SendRef, ready_output} -> +%% do_send(SockRef, SendRef, Data, EFlags, +%% next_timeout(TS, Timeout)) +%% after Timeout -> +%% nif_cancel(SockRef, SendRef), +%% flush_select_msgs(SockRef, SendRef), +%% {error, timeout} +%% end; + +%% {error, _} = ERROR -> +%% ERROR +%% end. + + +%% =========================================================================== +%% %% recv, recvfrom, recvmsg - receive a message from a socket +%% -spec recv(Socket, Flags) -> {ok, Data} | {error, Reason} when Socket :: socket(), @@ -540,6 +672,7 @@ recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) -> EFlags = enc_recv_flags(Flags), nif_recvfrom(SockRef, EFlags). + %% -spec recvmsg(Socket, [out] MsgHdr, Flags) -> {ok, Data} | {error, Reason} when %% Socket :: socket(), %% MsgHdr :: msg_header(), @@ -549,6 +682,13 @@ recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) -> +%% =========================================================================== +%% +%% readv - read data into multiple buffers +%% + + + %% close - close a file descriptor -spec close(Socket) -> ok | {error, Reason} when @@ -817,10 +957,10 @@ nif_listen(_SRef, _Backlog) -> nif_accept(_SRef, _Ref) -> erlang:error(badarg). -nif_send(_SRef, _Data, _Flags) -> +nif_send(_SockRef, _SendRef, _Data, _Flags) -> erlang:error(badarg). -nif_sendto(_SRef, _Data, _Flags, _Dest, _Port) -> +nif_sendto(_SRef, _SendRef, _Data, _Flags, _Dest, _Port) -> erlang:error(badarg). nif_recv(_SRef, _Flags) -> -- cgit v1.2.3