diff options
author | Micael Karlberg <[email protected]> | 2019-04-29 11:30:43 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2019-04-29 11:30:43 +0200 |
commit | 87e748eda909272ab4c2178fdfe83bb74eef898b (patch) | |
tree | ff2d39774b4b5e04c1d8027217ccd3b22e125b20 /erts/preloaded/src | |
parent | 3fc3d282ee11ef974f426cfca867022084317370 (diff) | |
parent | 15f79a7522f61ee404d247f04f79188592b565df (diff) | |
download | otp-87e748eda909272ab4c2178fdfe83bb74eef898b.tar.gz otp-87e748eda909272ab4c2178fdfe83bb74eef898b.tar.bz2 otp-87e748eda909272ab4c2178fdfe83bb74eef898b.zip |
Merge branch 'bmk/erts/esock/20190417/new_select_api/OTP-15496'
Diffstat (limited to 'erts/preloaded/src')
-rw-r--r-- | erts/preloaded/src/socket.erl | 209 |
1 files changed, 70 insertions, 139 deletions
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 5c1647290d..126db66cdd 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -520,7 +520,7 @@ %% necessary adapt (increase) the buffer size until all of %% it fits. %% -%% Note that not all of these flags is useful for every recv function! +%% Note that not all of these flags are useful for every recv function! %% -type recv_flags() :: [recv_flag()]. -type recv_flag() :: cmsg_cloexec | @@ -531,7 +531,6 @@ -type shutdown_how() :: read | write | read_write. -%% These are just place-holder(s) - used by the sendmsg/recvmsg functions... -type msghdr_flag() :: ctrunc | eor | errqueue | oob | trunc. -type msghdr_flags() :: [msghdr_flag()]. -type msghdr() :: #{ @@ -586,6 +585,12 @@ #{level := integer(), type := integer(), data := binary()}. +%% This is used in messages sent from the nif-code to erlang processes: +%% +%% {?SOCKET_TAG, Socket :: socket(), Tag :: atom(), Info :: term()} +%% +-define(SOCKET_TAG, '$socket'). + -define(SOCKET_DOMAIN_LOCAL, 1). -define(SOCKET_DOMAIN_UNIX, ?SOCKET_DOMAIN_LOCAL). -define(SOCKET_DOMAIN_INET, 2). @@ -949,77 +954,50 @@ supports(_Key1, _Key2, _Key3) -> %% =========================================================================== %% -%% open - create an endpoint for communication -%% -%% Extra: netns -%% %% <KOLLA> %% %% How do we handle the case when an fd has been created (somehow) %% and we shall create a socket "from it". %% Can we figure out Domain, Type and Protocol from fd? -%% Yes we can: SO_DOMAIN, SO_PROTOCOL, SO_TYPE -%% But does that work on all platforms? Or shall we require that the -%% caller provide this explicitly? -%% +%% No we can't: For instance, its not possible to 'get' domain on FreeBSD. +%% +%% Instead, require: open(Domain, Stream, Proto, #{fd => FD}). +%% The last argument, Extra, is used to provide the fd. +%% %% </KOLLA> %% %% %% <KOLLA> %% -%% Start a controller process here, *before* the nif_open call. -%% If that call is successful, update with owner process (controlling -%% process) and SockRef. If the open fails, kill the process. -%% "Register" the process on success: -%% -%% nif_register(SockRef, self()). -%% -%% <ALSO> -%% -%% Maybe register the process under a name? -%% Something like: -%% -%% list_to_atom(lists:flatten(io_lib:format("socket-~p", [SockRef]))). -%% -%% </ALSO> +%% Possibly add a "registry" in the nif, allowing the user processes to +%% "register" themselves. +%% The point of this would be to ensure that these processes are +%% informed if the socket "terminates". Could possibly be used for +%% other things? If gen_tcp implements the active feature using +%% a reader process, the nif may need to know about this process, +%% since its probably "hidden" from the socket "owner" (someone +%% needs to handle it if it dies). +%% Register under a name? %% %% The nif sets up a monitor to this process, and if it dies the socket %% is closed. It is also used if someone wants to monitor the socket. %% -%% We therefor need monitor function(s): +%% We may therefor need monitor function(s): %% %% socket:monitor(Socket) %% socket:demonitor(Socket) %% -%% These are basically used to monitor the controller process. -%% Should the socket record therefor contain the pid of the controller process? -%% %% </KOLLA> %% -%% -spec open(FD) -> {ok, Socket} | {error, Reason} when -%% Socket :: socket(), -%% Reason :: term(). - -%% open(FD) -> -%% try -%% begin -%% case nif_open(FD) of -%% {ok, {SockRef, Domain, Type, Protocol}} -> -%% SocketInfo = #{domain => Domain, -%% type => Type, -%% protocol => Protocol}, -%% Socket = #socket{info = SocketInfo, -%% ref = SockRef}, -%% {ok, Socket}; -%% {error, _} = ERROR -> -%% ERROR -%% end -%% end -%% catch -%% _:_ -> % This must be improved!! -%% {error, einval} -%% end. + + +%% =========================================================================== +%% +%% open - create an endpoint for communication +%% +%% Extra: Currently only used for netns +%% -spec open(Domain, Type) -> {ok, Socket} | {error, Reason} when Domain :: domain(), @@ -1245,21 +1223,7 @@ connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout) %% Connecting... NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, Ref, ready_output} -> - %% <KOLLA> - %% - %% See open above!! - %% - %% * Here we should start and *register* the reader process - %% (This will cause the nif code to create a monitor to - %% the process) - %% * The reader is basically used to implement the active-X - %% feature! - %% * If the reader dies for whatever reason, then the socket - %% (resource) closes and the owner (controlling) process - %% is informed (closed message). - %% - %% </KOLLA> + {?SOCKET_TAG, #socket{ref = SockRef}, select, Ref} -> nif_finalize_connection(SockRef) after NewTimeout -> cancel(SockRef, connect, Ref), @@ -1325,16 +1289,6 @@ do_accept(LSockRef, Timeout) -> AccRef = make_ref(), case nif_accept(LSockRef, AccRef) of {ok, SockRef} -> - %% <KOLLA> - %% - %% * Here we should start and *register* the reader process - %% (This will cause the nif code to create a monitor to the process) - %% * The reader is basically used to implement the active-X feature! - %% * If the reader dies for whatever reason, then the socket (resource) - %% closes and the owner (controlling) process is informed (closed - %% message). - %% - %% </KOLLA> Socket = #socket{ref = SockRef}, {ok, Socket}; @@ -1344,10 +1298,10 @@ do_accept(LSockRef, Timeout) -> %% the receive. NewTimeout = next_timeout(TS, Timeout), receive - {select, LSockRef, AccRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = LSockRef}, select, AccRef} -> do_accept(LSockRef, next_timeout(TS, Timeout)); - {'$socket', _, abort, {AccRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {AccRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1416,15 +1370,17 @@ do_send(SockRef, Data, EFlags, Timeout) -> NewTimeout = next_timeout(TS, Timeout), %% We are partially done, wait for continuation receive - {select, SockRef, SendRef, ready_output} when (Written > 0) -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} + when (Written > 0) -> <<_:Written/binary, Rest/binary>> = Data, do_send(SockRef, Rest, EFlags, next_timeout(TS, Timeout)); - {select, SockRef, SendRef, ready_output} -> + + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_send(SockRef, Data, EFlags, next_timeout(TS, Timeout)); - {'$socket', _, abort, {SendRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1433,11 +1389,11 @@ do_send(SockRef, Data, EFlags, Timeout) -> end; {error, eagain} -> receive - {select, SockRef, SendRef, ready_output} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_send(SockRef, Data, EFlags, next_timeout(TS, Timeout)); - {'$socket', _, abort, {SendRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1521,15 +1477,17 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> {ok, Written} -> %% We are partially done, wait for continuation receive - {select, SockRef, SendRef, ready_output} when (Written > 0) -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} + when (Written > 0) -> <<_:Written/binary, Rest/binary>> = Data, do_sendto(SockRef, Rest, Dest, EFlags, next_timeout(TS, Timeout)); - {select, SockRef, SendRef, ready_output} -> + + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_sendto(SockRef, Data, Dest, EFlags, next_timeout(TS, Timeout)); - {'$socket', _, abort, {SendRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1539,11 +1497,11 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> {error, eagain} -> receive - {select, SockRef, SendRef, ready_output} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_sendto(SockRef, Data, Dest, EFlags, next_timeout(TS, Timeout)); - {'$socket', _, abort, {SendRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1593,7 +1551,8 @@ sendmsg(Socket, MsgHdr, Timeout) sendmsg(Socket, MsgHdr, ?SOCKET_SENDMSG_FLAGS_DEFAULT, Timeout). --spec sendmsg(Socket, MsgHdr, Flags, Timeout) -> ok | {ok, Remaining} | {error, Reason} when +-spec sendmsg(Socket, MsgHdr, Flags, Timeout) -> + ok | {ok, Remaining} | {error, Reason} when Socket :: socket(), MsgHdr :: msghdr(), Flags :: send_flags(), @@ -1625,7 +1584,6 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> ok; {ok, Written} when is_integer(Written) andalso (Written > 0) -> - %% We should not retry here since the protocol may not %% be able to handle a message being split. Leave it to %% the caller to figure out (call again with the rest). @@ -1638,9 +1596,10 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> {error, eagain} -> receive - {select, SockRef, SendRef, ready_output} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_sendmsg(SockRef, MsgHdr, EFlags, next_timeout(TS, Timeout)) + after Timeout -> cancel(SockRef, sendmsg, SendRef), {error, timeout} @@ -1668,13 +1627,6 @@ ensure_msghdr(_) -> %% =========================================================================== %% -%% writev - write data into multiple buffers -%% - - - -%% =========================================================================== -%% %% recv, recvfrom, recvmsg - receive a message from a socket %% %% Description: @@ -1757,14 +1709,10 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) (is_integer(Timeout) andalso (Timeout > 0)) -> TS = timestamp(Timeout), RecvRef = make_ref(), - %% p("do_recv -> try read with" - %% "~n Length: ~p", [Length]), case nif_recv(SockRef, RecvRef, Length, EFlags) of {ok, true = _Complete, Bin} when (size(Acc) =:= 0) -> - %% p("do_recv -> complete success: ~w", [size(Bin)]), {ok, Bin}; {ok, true = _Complete, Bin} -> - %% p("do_recv -> completed success: ~w (~w)", [size(Bin), size(Acc)]), {ok, <<Acc/binary, Bin/binary>>}; %% It depends on the amount of bytes we tried to read: @@ -1773,7 +1721,6 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) %% > 0 - We got a part of the message and we will be notified %% when there is more to read (a select message) {ok, false = _Complete, Bin} when (Length =:= 0) -> - %% p("do_recv -> partial success: ~w", [size(Bin)]), do_recv(SockRef, RecvRef, Length, EFlags, <<Acc/binary, Bin/binary>>, @@ -1783,17 +1730,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) %% We got the first chunk of it. %% We will be notified (select message) when there %% is more to read. - %% p("do_recv -> partial success(~w): ~w" - %% "~n ~p", [Length, size(Bin), Bin]), NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recv(SockRef, RecvRef, Length-size(Bin), EFlags, Bin, next_timeout(TS, Timeout)); - {'$socket', _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1803,17 +1748,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {ok, false = _Completed, Bin} -> %% We got a chunk of it! - %% p("do_recv -> partial success(~w): ~w (~w)", - %% [Length, size(Bin), size(Acc)]), NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recv(SockRef, RecvRef, Length-size(Bin), EFlags, <<Acc/binary, Bin/binary>>, next_timeout(TS, Timeout)); - {'$socket', _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1829,16 +1772,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {error, eagain} -> %% There is nothing just now, but we will be notified when there %% is something to read (a select message). - %% p("do_recv -> eagain(~w): ~w", [Length, size(Acc)]), NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recv(SockRef, RecvRef, Length, EFlags, Acc, next_timeout(TS, Timeout)); - {'$socket', _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1887,7 +1829,7 @@ do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) -> %% It may be impossible to know what (buffer) size is appropriate %% "in advance", and in those cases it may be convenient to use the %% (recv) 'peek' flag. When this flag is provided the message is *not* -%% "consumed" from the underlying buffers, so another recvfrom call +%% "consumed" from the underlying (OS) buffers, so another recvfrom call %% is needed, possibly with a then adjusted buffer size. %% @@ -1973,11 +1915,11 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> %% is something to read (a select message). NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recvfrom(SockRef, BufSz, EFlags, next_timeout(TS, Timeout)); - {'$socket', _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1990,13 +1932,6 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> end. -%% pi(Item) -> -%% pi(self(), Item). - -%% pi(Pid, Item) -> -%% {Item, Info} = process_info(Pid, Item), -%% Info. - %% --------------------------------------------------------------------------- %% @@ -2077,11 +2012,11 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> %% is something to read (a select message). NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, next_timeout(TS, Timeout)); - {'$socket', _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -2100,12 +2035,6 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> -%% =========================================================================== -%% -%% readv - read data into multiple buffers -%% - - %% =========================================================================== %% @@ -2118,10 +2047,9 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> %% 1) nif_close + the socket_stop (nif) callback function %% This is for everything that can be done safely NON-BLOCKING. %% 2) nif_finalize_close which is executed by a *dirty* scheduler -%% Before we call the socket close function, we se the socket +%% Before we call the socket close function, we set the socket %% BLOCKING. Thereby linger is handled properly. - -spec close(Socket) -> ok | {error, Reason} when Socket :: socket(), Reason :: term(). @@ -2137,7 +2065,7 @@ do_close(SockRef) -> %% We must wait for the socket_stop callback function to %% complete its work receive - {'$socket', SockRef, close, CloseRef} -> + {?SOCKET_TAG, #socket{ref = SockRef}, close, CloseRef} -> nif_finalize_close(SockRef) end; {error, _} = ERROR -> @@ -2381,6 +2309,8 @@ which_protocol(SockRef) -> end. + + %% =========================================================================== %% %% sockname - return the current address of the socket. @@ -3499,7 +3429,7 @@ cancel(SockRef, Op, OpRef) -> flush_select_msgs(SockRef, Ref) -> receive - {select, SockRef, Ref, _} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, Ref} -> flush_select_msgs(SockRef, Ref) after 0 -> ok @@ -3568,8 +3498,9 @@ tdiff(T1, T2) -> %% p(undefined, F, A) -> %% p("***", F, A); %% p(SName, F, A) -> -%% io:format(user,"[~s,~p] " ++ F ++ "~n", [SName, self()|A]), -%% io:format("[~s,~p] " ++ F ++ "~n", [SName, self()|A]). +%% TS = formated_timestamp(), +%% io:format(user,"[~s][~s,~p] " ++ F ++ "~n", [TS, SName, self()|A]), +%% io:format("[~s][~s,~p] " ++ F ++ "~n", [TS, SName, self()|A]). |