aboutsummaryrefslogtreecommitdiffstats
path: root/erts/preloaded/src
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2019-04-29 11:30:43 +0200
committerMicael Karlberg <[email protected]>2019-04-29 11:30:43 +0200
commit87e748eda909272ab4c2178fdfe83bb74eef898b (patch)
treeff2d39774b4b5e04c1d8027217ccd3b22e125b20 /erts/preloaded/src
parent3fc3d282ee11ef974f426cfca867022084317370 (diff)
parent15f79a7522f61ee404d247f04f79188592b565df (diff)
downloadotp-87e748eda909272ab4c2178fdfe83bb74eef898b.tar.gz
otp-87e748eda909272ab4c2178fdfe83bb74eef898b.tar.bz2
otp-87e748eda909272ab4c2178fdfe83bb74eef898b.zip
Merge branch 'bmk/erts/esock/20190417/new_select_api/OTP-15496'
Diffstat (limited to 'erts/preloaded/src')
-rw-r--r--erts/preloaded/src/socket.erl209
1 files changed, 70 insertions, 139 deletions
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 5c1647290d..126db66cdd 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -520,7 +520,7 @@
%% necessary adapt (increase) the buffer size until all of
%% it fits.
%%
-%% Note that not all of these flags is useful for every recv function!
+%% Note that not all of these flags are useful for every recv function!
%%
-type recv_flags() :: [recv_flag()].
-type recv_flag() :: cmsg_cloexec |
@@ -531,7 +531,6 @@
-type shutdown_how() :: read | write | read_write.
-%% These are just place-holder(s) - used by the sendmsg/recvmsg functions...
-type msghdr_flag() :: ctrunc | eor | errqueue | oob | trunc.
-type msghdr_flags() :: [msghdr_flag()].
-type msghdr() :: #{
@@ -586,6 +585,12 @@
#{level := integer(), type := integer(), data := binary()}.
+%% This is used in messages sent from the nif-code to erlang processes:
+%%
+%% {?SOCKET_TAG, Socket :: socket(), Tag :: atom(), Info :: term()}
+%%
+-define(SOCKET_TAG, '$socket').
+
-define(SOCKET_DOMAIN_LOCAL, 1).
-define(SOCKET_DOMAIN_UNIX, ?SOCKET_DOMAIN_LOCAL).
-define(SOCKET_DOMAIN_INET, 2).
@@ -949,77 +954,50 @@ supports(_Key1, _Key2, _Key3) ->
%% ===========================================================================
%%
-%% open - create an endpoint for communication
-%%
-%% Extra: netns
-%%
%% <KOLLA>
%%
%% How do we handle the case when an fd has been created (somehow)
%% and we shall create a socket "from it".
%% Can we figure out Domain, Type and Protocol from fd?
-%% Yes we can: SO_DOMAIN, SO_PROTOCOL, SO_TYPE
-%% But does that work on all platforms? Or shall we require that the
-%% caller provide this explicitly?
-%%
+%% No we can't: For instance, its not possible to 'get' domain on FreeBSD.
+%%
+%% Instead, require: open(Domain, Stream, Proto, #{fd => FD}).
+%% The last argument, Extra, is used to provide the fd.
+%%
%% </KOLLA>
%%
%%
%% <KOLLA>
%%
-%% Start a controller process here, *before* the nif_open call.
-%% If that call is successful, update with owner process (controlling
-%% process) and SockRef. If the open fails, kill the process.
-%% "Register" the process on success:
-%%
-%% nif_register(SockRef, self()).
-%%
-%% <ALSO>
-%%
-%% Maybe register the process under a name?
-%% Something like:
-%%
-%% list_to_atom(lists:flatten(io_lib:format("socket-~p", [SockRef]))).
-%%
-%% </ALSO>
+%% Possibly add a "registry" in the nif, allowing the user processes to
+%% "register" themselves.
+%% The point of this would be to ensure that these processes are
+%% informed if the socket "terminates". Could possibly be used for
+%% other things? If gen_tcp implements the active feature using
+%% a reader process, the nif may need to know about this process,
+%% since its probably "hidden" from the socket "owner" (someone
+%% needs to handle it if it dies).
+%% Register under a name?
%%
%% The nif sets up a monitor to this process, and if it dies the socket
%% is closed. It is also used if someone wants to monitor the socket.
%%
-%% We therefor need monitor function(s):
+%% We may therefor need monitor function(s):
%%
%% socket:monitor(Socket)
%% socket:demonitor(Socket)
%%
-%% These are basically used to monitor the controller process.
-%% Should the socket record therefor contain the pid of the controller process?
-%%
%% </KOLLA>
%%
-%% -spec open(FD) -> {ok, Socket} | {error, Reason} when
-%% Socket :: socket(),
-%% Reason :: term().
-
-%% open(FD) ->
-%% try
-%% begin
-%% case nif_open(FD) of
-%% {ok, {SockRef, Domain, Type, Protocol}} ->
-%% SocketInfo = #{domain => Domain,
-%% type => Type,
-%% protocol => Protocol},
-%% Socket = #socket{info = SocketInfo,
-%% ref = SockRef},
-%% {ok, Socket};
-%% {error, _} = ERROR ->
-%% ERROR
-%% end
-%% end
-%% catch
-%% _:_ -> % This must be improved!!
-%% {error, einval}
-%% end.
+
+
+%% ===========================================================================
+%%
+%% open - create an endpoint for communication
+%%
+%% Extra: Currently only used for netns
+%%
-spec open(Domain, Type) -> {ok, Socket} | {error, Reason} when
Domain :: domain(),
@@ -1245,21 +1223,7 @@ connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout)
%% Connecting...
NewTimeout = next_timeout(TS, Timeout),
receive
- {select, SockRef, Ref, ready_output} ->
- %% <KOLLA>
- %%
- %% See open above!!
- %%
- %% * Here we should start and *register* the reader process
- %% (This will cause the nif code to create a monitor to
- %% the process)
- %% * The reader is basically used to implement the active-X
- %% feature!
- %% * If the reader dies for whatever reason, then the socket
- %% (resource) closes and the owner (controlling) process
- %% is informed (closed message).
- %%
- %% </KOLLA>
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, Ref} ->
nif_finalize_connection(SockRef)
after NewTimeout ->
cancel(SockRef, connect, Ref),
@@ -1325,16 +1289,6 @@ do_accept(LSockRef, Timeout) ->
AccRef = make_ref(),
case nif_accept(LSockRef, AccRef) of
{ok, SockRef} ->
- %% <KOLLA>
- %%
- %% * Here we should start and *register* the reader process
- %% (This will cause the nif code to create a monitor to the process)
- %% * The reader is basically used to implement the active-X feature!
- %% * If the reader dies for whatever reason, then the socket (resource)
- %% closes and the owner (controlling) process is informed (closed
- %% message).
- %%
- %% </KOLLA>
Socket = #socket{ref = SockRef},
{ok, Socket};
@@ -1344,10 +1298,10 @@ do_accept(LSockRef, Timeout) ->
%% the receive.
NewTimeout = next_timeout(TS, Timeout),
receive
- {select, LSockRef, AccRef, ready_input} ->
+ {?SOCKET_TAG, #socket{ref = LSockRef}, select, AccRef} ->
do_accept(LSockRef, next_timeout(TS, Timeout));
- {'$socket', _, abort, {AccRef, Reason}} ->
+ {?SOCKET_TAG, _Socket, abort, {AccRef, Reason}} ->
{error, Reason}
after NewTimeout ->
@@ -1416,15 +1370,17 @@ do_send(SockRef, Data, EFlags, Timeout) ->
NewTimeout = next_timeout(TS, Timeout),
%% We are partially done, wait for continuation
receive
- {select, SockRef, SendRef, ready_output} when (Written > 0) ->
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef}
+ when (Written > 0) ->
<<_:Written/binary, Rest/binary>> = Data,
do_send(SockRef, Rest, EFlags,
next_timeout(TS, Timeout));
- {select, SockRef, SendRef, ready_output} ->
+
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} ->
do_send(SockRef, Data, EFlags,
next_timeout(TS, Timeout));
- {'$socket', _, abort, {SendRef, Reason}} ->
+ {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} ->
{error, Reason}
after NewTimeout ->
@@ -1433,11 +1389,11 @@ do_send(SockRef, Data, EFlags, Timeout) ->
end;
{error, eagain} ->
receive
- {select, SockRef, SendRef, ready_output} ->
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} ->
do_send(SockRef, Data, EFlags,
next_timeout(TS, Timeout));
- {'$socket', _, abort, {SendRef, Reason}} ->
+ {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} ->
{error, Reason}
after Timeout ->
@@ -1521,15 +1477,17 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) ->
{ok, Written} ->
%% We are partially done, wait for continuation
receive
- {select, SockRef, SendRef, ready_output} when (Written > 0) ->
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef}
+ when (Written > 0) ->
<<_:Written/binary, Rest/binary>> = Data,
do_sendto(SockRef, Rest, Dest, EFlags,
next_timeout(TS, Timeout));
- {select, SockRef, SendRef, ready_output} ->
+
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} ->
do_sendto(SockRef, Data, Dest, EFlags,
next_timeout(TS, Timeout));
- {'$socket', _, abort, {SendRef, Reason}} ->
+ {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} ->
{error, Reason}
after Timeout ->
@@ -1539,11 +1497,11 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) ->
{error, eagain} ->
receive
- {select, SockRef, SendRef, ready_output} ->
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} ->
do_sendto(SockRef, Data, Dest, EFlags,
next_timeout(TS, Timeout));
- {'$socket', _, abort, {SendRef, Reason}} ->
+ {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} ->
{error, Reason}
after Timeout ->
@@ -1593,7 +1551,8 @@ sendmsg(Socket, MsgHdr, Timeout)
sendmsg(Socket, MsgHdr, ?SOCKET_SENDMSG_FLAGS_DEFAULT, Timeout).
--spec sendmsg(Socket, MsgHdr, Flags, Timeout) -> ok | {ok, Remaining} | {error, Reason} when
+-spec sendmsg(Socket, MsgHdr, Flags, Timeout) ->
+ ok | {ok, Remaining} | {error, Reason} when
Socket :: socket(),
MsgHdr :: msghdr(),
Flags :: send_flags(),
@@ -1625,7 +1584,6 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) ->
ok;
{ok, Written} when is_integer(Written) andalso (Written > 0) ->
-
%% We should not retry here since the protocol may not
%% be able to handle a message being split. Leave it to
%% the caller to figure out (call again with the rest).
@@ -1638,9 +1596,10 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) ->
{error, eagain} ->
receive
- {select, SockRef, SendRef, ready_output} ->
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} ->
do_sendmsg(SockRef, MsgHdr, EFlags,
next_timeout(TS, Timeout))
+
after Timeout ->
cancel(SockRef, sendmsg, SendRef),
{error, timeout}
@@ -1668,13 +1627,6 @@ ensure_msghdr(_) ->
%% ===========================================================================
%%
-%% writev - write data into multiple buffers
-%%
-
-
-
-%% ===========================================================================
-%%
%% recv, recvfrom, recvmsg - receive a message from a socket
%%
%% Description:
@@ -1757,14 +1709,10 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
(is_integer(Timeout) andalso (Timeout > 0)) ->
TS = timestamp(Timeout),
RecvRef = make_ref(),
- %% p("do_recv -> try read with"
- %% "~n Length: ~p", [Length]),
case nif_recv(SockRef, RecvRef, Length, EFlags) of
{ok, true = _Complete, Bin} when (size(Acc) =:= 0) ->
- %% p("do_recv -> complete success: ~w", [size(Bin)]),
{ok, Bin};
{ok, true = _Complete, Bin} ->
- %% p("do_recv -> completed success: ~w (~w)", [size(Bin), size(Acc)]),
{ok, <<Acc/binary, Bin/binary>>};
%% It depends on the amount of bytes we tried to read:
@@ -1773,7 +1721,6 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
%% > 0 - We got a part of the message and we will be notified
%% when there is more to read (a select message)
{ok, false = _Complete, Bin} when (Length =:= 0) ->
- %% p("do_recv -> partial success: ~w", [size(Bin)]),
do_recv(SockRef, RecvRef,
Length, EFlags,
<<Acc/binary, Bin/binary>>,
@@ -1783,17 +1730,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
%% We got the first chunk of it.
%% We will be notified (select message) when there
%% is more to read.
- %% p("do_recv -> partial success(~w): ~w"
- %% "~n ~p", [Length, size(Bin), Bin]),
NewTimeout = next_timeout(TS, Timeout),
receive
- {select, SockRef, RecvRef, ready_input} ->
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} ->
do_recv(SockRef, RecvRef,
Length-size(Bin), EFlags,
Bin,
next_timeout(TS, Timeout));
- {'$socket', _, abort, {RecvRef, Reason}} ->
+ {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} ->
{error, Reason}
after NewTimeout ->
@@ -1803,17 +1748,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
{ok, false = _Completed, Bin} ->
%% We got a chunk of it!
- %% p("do_recv -> partial success(~w): ~w (~w)",
- %% [Length, size(Bin), size(Acc)]),
NewTimeout = next_timeout(TS, Timeout),
receive
- {select, SockRef, RecvRef, ready_input} ->
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} ->
do_recv(SockRef, RecvRef,
Length-size(Bin), EFlags,
<<Acc/binary, Bin/binary>>,
next_timeout(TS, Timeout));
- {'$socket', _, abort, {RecvRef, Reason}} ->
+ {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} ->
{error, Reason}
after NewTimeout ->
@@ -1829,16 +1772,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
{error, eagain} ->
%% There is nothing just now, but we will be notified when there
%% is something to read (a select message).
- %% p("do_recv -> eagain(~w): ~w", [Length, size(Acc)]),
NewTimeout = next_timeout(TS, Timeout),
receive
- {select, SockRef, RecvRef, ready_input} ->
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} ->
do_recv(SockRef, RecvRef,
Length, EFlags,
Acc,
next_timeout(TS, Timeout));
- {'$socket', _, abort, {RecvRef, Reason}} ->
+ {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} ->
{error, Reason}
after NewTimeout ->
@@ -1887,7 +1829,7 @@ do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) ->
%% It may be impossible to know what (buffer) size is appropriate
%% "in advance", and in those cases it may be convenient to use the
%% (recv) 'peek' flag. When this flag is provided the message is *not*
-%% "consumed" from the underlying buffers, so another recvfrom call
+%% "consumed" from the underlying (OS) buffers, so another recvfrom call
%% is needed, possibly with a then adjusted buffer size.
%%
@@ -1973,11 +1915,11 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
%% is something to read (a select message).
NewTimeout = next_timeout(TS, Timeout),
receive
- {select, SockRef, RecvRef, ready_input} ->
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} ->
do_recvfrom(SockRef, BufSz, EFlags,
next_timeout(TS, Timeout));
- {'$socket', _, abort, {RecvRef, Reason}} ->
+ {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} ->
{error, Reason}
after NewTimeout ->
@@ -1990,13 +1932,6 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
end.
-%% pi(Item) ->
-%% pi(self(), Item).
-
-%% pi(Pid, Item) ->
-%% {Item, Info} = process_info(Pid, Item),
-%% Info.
-
%% ---------------------------------------------------------------------------
%%
@@ -2077,11 +2012,11 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) ->
%% is something to read (a select message).
NewTimeout = next_timeout(TS, Timeout),
receive
- {select, SockRef, RecvRef, ready_input} ->
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} ->
do_recvmsg(SockRef, BufSz, CtrlSz, EFlags,
next_timeout(TS, Timeout));
- {'$socket', _, abort, {RecvRef, Reason}} ->
+ {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} ->
{error, Reason}
after NewTimeout ->
@@ -2100,12 +2035,6 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) ->
-%% ===========================================================================
-%%
-%% readv - read data into multiple buffers
-%%
-
-
%% ===========================================================================
%%
@@ -2118,10 +2047,9 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) ->
%% 1) nif_close + the socket_stop (nif) callback function
%% This is for everything that can be done safely NON-BLOCKING.
%% 2) nif_finalize_close which is executed by a *dirty* scheduler
-%% Before we call the socket close function, we se the socket
+%% Before we call the socket close function, we set the socket
%% BLOCKING. Thereby linger is handled properly.
-
-spec close(Socket) -> ok | {error, Reason} when
Socket :: socket(),
Reason :: term().
@@ -2137,7 +2065,7 @@ do_close(SockRef) ->
%% We must wait for the socket_stop callback function to
%% complete its work
receive
- {'$socket', SockRef, close, CloseRef} ->
+ {?SOCKET_TAG, #socket{ref = SockRef}, close, CloseRef} ->
nif_finalize_close(SockRef)
end;
{error, _} = ERROR ->
@@ -2381,6 +2309,8 @@ which_protocol(SockRef) ->
end.
+
+
%% ===========================================================================
%%
%% sockname - return the current address of the socket.
@@ -3499,7 +3429,7 @@ cancel(SockRef, Op, OpRef) ->
flush_select_msgs(SockRef, Ref) ->
receive
- {select, SockRef, Ref, _} ->
+ {?SOCKET_TAG, #socket{ref = SockRef}, select, Ref} ->
flush_select_msgs(SockRef, Ref)
after 0 ->
ok
@@ -3568,8 +3498,9 @@ tdiff(T1, T2) ->
%% p(undefined, F, A) ->
%% p("***", F, A);
%% p(SName, F, A) ->
-%% io:format(user,"[~s,~p] " ++ F ++ "~n", [SName, self()|A]),
-%% io:format("[~s,~p] " ++ F ++ "~n", [SName, self()|A]).
+%% TS = formated_timestamp(),
+%% io:format(user,"[~s][~s,~p] " ++ F ++ "~n", [TS, SName, self()|A]),
+%% io:format("[~s][~s,~p] " ++ F ++ "~n", [TS, SName, self()|A]).