aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/doc/src/socket.xml250
-rw-r--r--erts/emulator/nifs/common/socket_nif.c2
-rw-r--r--erts/preloaded/src/socket.erl136
3 files changed, 341 insertions, 47 deletions
diff --git a/erts/doc/src/socket.xml b/erts/doc/src/socket.xml
index d0a316c4de..3efa412b8a 100644
--- a/erts/doc/src/socket.xml
+++ b/erts/doc/src/socket.xml
@@ -149,7 +149,17 @@
<name name="close" arity="1"/>
<fsummary>Close a socket.</fsummary>
<desc>
- <p>Closes the socket.</p>
+ <p>Closes the socket.</p>
+
+ <note>
+ <p>Note that for e.g. <c>protocol</c> = <c>tcp</c>, most implementations
+ doing a close does not guarantee that any data sent is delivered to
+ the recipient before the close is detected at the remote side. </p>
+ <p>One way to handle this is to use the
+ <seealso marker="#shutdown/2"><c>shutdown</c></seealso> function
+ (<c>socket:shutdown(Socket, write)</c>) to signal that no more data is
+ to be sent and then wait for the read side of the socket to be closed.</p>
+ </note>
</desc>
</func>
@@ -159,7 +169,7 @@
<fsummary>Initiate a connection on a socket.</fsummary>
<desc>
<p>This function connects the socket to the address
- specied by the <c>Addr</c> argument.</p>
+ specied by the <c>SockAddr</c> argument.</p>
</desc>
</func>
@@ -171,12 +181,27 @@
<name name="getopt" arity="3" clause_i="5"/>
<name name="getopt" arity="3" clause_i="6"/>
<name name="getopt" arity="3" clause_i="7"/>
+ <fsummary>Get an option on a socket.</fsummary>
+ <desc>
+ <p>Get an option on a socket.</p>
+ <p>What properties are valid depend both on <c>Level</c> and
+ on what kind of socket it is (<c>domain</c>, <c>type</c> and
+ <c>protocol</c>).</p>
+
+ <note><p>Not all options are valid on all platforms. That is,
+ even if "we" support an option, that does not mean that the
+ underlying OS does.</p></note>
+ </desc>
+ </func>
+
+ <func>
<name name="getopt" arity="3" clause_i="8"/>
<fsummary>Get an option on a socket.</fsummary>
<desc>
<p>Get an option on a socket.</p>
- <p>What properties are valid depend on what kind of socket
- it is (<c>domain</c>, <c>type</c> and <c>protocol</c>).</p>
+ <p>What properties are valid depend both on <c>Level</c> and
+ on what kind of socket it is (<c>domain</c>, <c>type</c> and
+ <c>protocol</c>).</p>
<p>When specifying <c>Level</c> as an integer, and therefor
using "native mode", it is *currently* up to the caller to
know how to interpret the result.</p>
@@ -241,12 +266,12 @@
<p>Receive a message from a socket.</p>
<p>This function reads "messages", which means that regardless of
how much we want to read, it returns when we get a message.</p>
- <p>The <c>MaxSize</c> argument basically defines the size of the
+ <p>The <c>BufSz</c> argument basically defines the size of the
receive buffer. By setting the value to zero (0), the configured
- size (setopt) is used.</p>
+ size (setopt with <c>Level</c> = <c>otp</c>) is used.</p>
<p>It may be impossible to know what (buffer) size is appropriate
"in advance", and in those cases it may be convenient to use the
- (recv) 'peek' flag. When this flag is provided the message is *not*
+ (recv) 'peek' flag. When this flag is provided, the message is *not*
"consumed" from the underlying buffers, so another recvfrom call
is needed, possibly with a then adjusted buffer size.</p>
</desc>
@@ -305,5 +330,216 @@
</func>
</funcs>
+ <section>
+ <title>Examples</title>
+ <marker id="examples"></marker>
+ <p>TBD: Need to implement a receiver process in order to be able to
+ implement active!</p>
+ <p>x_tcp.erl:</p>
+ <code type="none">
+listen(Addr, Port) ->
+ try
+ begin
+ Socket = case socket:open(inet, stream, tcp) of
+ {ok, Socket} ->
+ Socket;
+ {error, _} = OERROR ->
+ throw(OERROR)
+ end,
+ SockAddr = #in4_sockaddr{port = Port,
+ addr = Addr},
+ ok = case socket:bind(Socket, SockAddr) of
+ ok ->
+ ok;
+ {error, _} = BERROR ->
+ throw(BERROR)
+ end,
+ case socket:listen(Socket, 10) of
+ ok ->
+ {ok, Socket};
+ {error, _} = LERROR ->
+ throw(LERROR)
+ end
+ end
+ catch
+ throw:ERROR ->
+ ERROR
+end.
+ </code>
+
+ <code type="none">
+connect(Addr, Port) ->
+ try
+ begin
+ Socket = case socket:open(inet, stream, tcp) of
+ {ok, Socket} ->
+ Socket;
+ {error, _} = OERROR ->
+ throw(OERROR)
+ end,
+ BSockAddr = #in4_sockaddr{port = 0,
+ addr = any},
+ ok = case socket:bind(Socket, BSockAddr) of
+ ok ->
+ ok;
+ {error, _} = BERROR ->
+ throw(BERROR)
+ end,
+ CSockAddr = #in4_sockaddr{port = Port,
+ addr = Addr},
+ Socket = case socket:connect(Socket, CSockAddr) of
+ {ok, Sock} ->
+ Sock;
+ {error, _} = CERROR ->
+ throw(CERROR)
+ end,
+ case start_receiver(Socket) of
+ {ok, Pid} ->
+ ok = socket:ts_add(Socket, receiver, Pid),
+ {ok, Socket};
+ {error, _} = RERROR ->
+ socket:close(Socket),
+ throw(RERROR)
+ end
+ end
+ catch
+ throw:ERROR ->
+ ERROR
+end.
+ </code>
+
+ <code type="none">
+accept(LSocket) ->
+ case socket:accept(LSocket) of
+ {ok, Socket} ->
+ case start_receiver(Socket) of
+ {ok, Pid} ->
+ ok = socket:ts_add(Socket, receiver, Pid),
+ {ok, Socket};
+ {error, _} = RERROR ->
+ socket:close(Socket),
+ throw(RERROR)
+ end,
+ {error, _} = AERROR ->
+ AERROR
+ end.
+ </code>
+
+ <code type="none">
+send(Socket, Data) ->
+ socket:send(Socket, Data).
+ </code>
+
+ <code type="none">
+setopt(Socket, controlling_process = Item, Pid) when is_pid(Pid) ->
+ case socket:setopt(Socket, otp, Item, Pid) of
+ ok ->
+ {ok, Receiver} = socket:ts_get(Socket, receiver),
+ update_receiver(Receiver, {controlling_process, Pid),
+ ok;
+ {error, _} = ERROR ->
+ ERROR
+ end;
+setopt(Socket, active, Active) ->
+ {ok, Receiver} = socket:ts_get(Socket, receiver),
+ receiver_active(Receiver, Active),
+ ok;
+%% This is just to indicate that there will be more options
+%% that needs to be handled
+setopt(Socket, Item, Pid) when is_pid(Pid) ->
+ socket:setopt(Socket, which_level(Item), Item, Pid).
+ </code>
+
+ <p>The receiver process: </p>
+ <code type="none">
+start_receiver(Socket) ->
+ CtrlProc = self(),
+ Ref = make_ref(),
+ Receiver = proc_lib:spawn_link(fun() -> receiver_init(CtrlProc, Ref) end),
+ receive
+ {?MODULE, started, Ref} ->
+ Receiver ! {?MODULE, receiver, Ref, Sock, true},
+ unlink(Receiver),
+ {ok, Receiver};
+ {'EXIT', Receiver, Reason} ->
+ {error, Reason}
+ end.
+
+receiver_active(Receiver, Active)
+ when (Active =:= false) orelse
+ (Active =:= true) orelse
+ (Active =:= once) orelse
+ is_integer(Active) ->
+ Receiver ! {?MODULE, active, What}.
+
+receiver_update(Receiver, What) ->
+ Ref = make_ref(),
+ Receiver ! {?MODULE, receiver_update, Ref, What},
+ receive
+ {?MODULE, receiver_upadate, Ref, Result} ->
+ Result
+ end.
+
+-record(receiver, {sock :: socket:socket(),
+ ctrl :: pid(),
+ num_packages :: infinity | non_neg_ineteger()}).
+
+receiver_init(Pid, Ref) ->
+ receive
+ {?MODULE, receiver, Ref, stop} ->
+ i("received stop"),
+ exit(normal);
+
+ {?MODULE, receiver, Ref, Sock, InitialMode} ->
+ i("received socket: ~p", [Sock]),
+ NumPackages = mode2pkgcount(InitialMode),
+ receiver(#receiver{sock = Sock, ctrl = Pid})
+ end.
+
+mode2pkgcount(true) ->
+ infinity;
+mode2pkgcount(false) ->
+ 0;
+mode2pkgcount(N) when is_integer(N) andalso (N >= 0) ->
+ N.
+
+receiver(#receiver{num_packages = 0} = State) ->
+ receive
+ {?MODULE, active, false} ->
+ receiver(State);
+ {?MODULE, active, true} ->
+ receiver(State#receiver{num_packages = infinity});
+ {?MODULE, active, once} ->
+ receiver(State#receiver{num_packages = 1});
+ {?MODULE, active, N} when (N > 0) ->
+ receiver(State#receiver{num_packages = N})
+ end;
+receiver(#receiver{num_packages = N, sock = Sock, ctrl = Pid} = State) ->
+ case socket:recv(Sock, 0) of
+ {ok, Package} when size(Package) > 0 ->
+ Pid ! {tcp, Sock, Packege},
+ case next_active(N) of
+ 0 ->
+ Pid ! {tcp_passive, Sock},
+ receiver(State#{num_packages = 0});
+ NextN ->
+ receiver(State#{num_packages = NextN})
+ end;
+ {ok, Package} when size(Package) =:= 0 ->
+ receiver(State);
+
+ {error, closed} ->
+ i("closed"),
+ Pid ! {tcp_closed, Sock},
+ exit(normal);
+
+ {error, Reason} ->
+ i("error: ~p", [Reason]),
+ Pid ! {tcp_error, Sock, Reason},
+ exit(normal)
+ end.
+ </code>
+
+ </section>
</erlref>
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index ed961d691d..7bafe38273 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -7315,6 +7315,8 @@ ErlNifFunc socket_funcs[] =
// {"nif_debug", 1, nif_debug_, 0},
// The proper "socket" interface
+ // This is used when we already have a file descriptor
+ // {"nif_open", 1, nif_open, 0},
{"nif_open", 4, nif_open, 0},
{"nif_bind", 2, nif_bind, 0},
{"nif_connect", 2, nif_connect, 0},
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 932048ba75..772c733bc7 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -159,6 +159,7 @@
-type otp_socket_option() :: debug |
iow |
+ controlling_process |
rcvbuf |
sndbuf.
%% Shall we have special treatment of linger??
@@ -517,6 +518,63 @@ info() ->
%%
%% open - create an endpoint for communication
%%
+%% Extra: netns
+%%
+%% <KOLLA>
+%%
+%% How do we handle the case when an fd has beem created (somehow)
+%% and we shall create a socket "from it".
+%% Can we figure out Domain, Type and Protocol from fd?
+%% Yes we can: SO_DOMAIN, SO_PROTOCOL, SO_TYPE
+%%
+%% </KOLLA>
+%%
+%%
+%% <KOLLA>
+%%
+%% Start a controller process here, *before* the nif_open call.
+%% If that call is successful, update with owner process (controlling
+%% process) and SockRef. If the open fails, kill the process.
+%% "Register" the process on success:
+%%
+%% nif_register(SockRef, self()).
+%%
+%% The nif sets up a monitor to this process, and if it dies the socket
+%% is closed. It is also used if someone wants to monitor the socket.
+%%
+%% We therefor need monitor function(s):
+%%
+%% socket:monitor(Socket)
+%% socket:demonitor(Socket)
+%%
+%% These are basically used to monitor the controller process.
+%%
+%% </KOLLA>
+%%
+
+%% -spec open(FD) -> {ok, Socket} | {error, Reason} when
+%% Socket :: socket(),
+%% Reason :: term().
+
+%% open(FD) ->
+%% try
+%% begin
+%% case nif_open(FD) of
+%% {ok, {SockRef, Domain, Type, Protocol}} ->
+%% SocketInfo = #{domain => Domain,
+%% type => Type,
+%% protocol => Protocol},
+%% Socket = #socket{info = SocketInfo,
+%% ref = SockRef},
+%% {ok, Socket};
+%% {error, _} = ERROR ->
+%% ERROR
+%% end
+%% end
+%% catch
+%% _:_ -> % This must be improved!!
+%% {error, einval}
+%% end.
-spec open(Domain, Type) -> {ok, Socket} | {error, Reason} when
Domain :: domain(),
@@ -653,6 +711,20 @@ connect(#socket{ref = SockRef}, SockAddr, Timeout)
NewTimeout = next_timeout(TS, Timeout),
receive
{select, SockRef, Ref, ready_output} ->
+ %% <KOLLA>
+ %%
+ %% See open above!!
+ %%
+ %% * Here we should start and *register* the reader process
+ %% (This will cause the nif code to create a monitor to
+ %% the process)
+ %% * The reader is basically used to implement the active-X
+ %% feature!
+ %% * If the reader dies for whatever reason, then the socket
+ %% (resource) closes and the owner (controlling) process
+ %% is informed (closed message).
+ %%
+ %% </KOLLA>
nif_finalize_connection(SockRef)
after NewTimeout ->
nif_cancel(SockRef, connect, Ref),
@@ -719,6 +791,16 @@ do_accept(LSockRef, SI, Timeout) ->
AccRef = make_ref(),
case nif_accept(LSockRef, AccRef) of
{ok, SockRef} ->
+ %% <KOLLA>
+ %%
+ %% * Here we should start and *register* the reader process
+ %% (This will cause the nif code to create a monitor to the process)
+ %% * The reader is basically used to implement the active-X feature!
+ %% * If the reader dies for whatever reason, then the socket (resource)
+ %% closes and the owner (controlling) process is informed (closed
+ %% message).
+ %%
+ %% </KOLLA>
SocketInfo = #{domain => maps:get(domain, SI),
type => maps:get(type, SI),
protocol => maps:get(protocol, SI)},
@@ -1297,46 +1379,32 @@ shutdown(#socket{ref = SockRef}, How) ->
%%
%% </KOLLA>
--spec setopt(Socket, Level, Opt, Value) -> ok | {error, Reason} when
+-spec setopt(Socket, otp, otp_socket_option(), Value) -> ok | {error, Reason} when
Socket :: socket(),
- Level :: otp,
- Opt :: otp_socket_option(),
Value :: term(),
Reason :: term()
- ; (Socket, Level, Opt, Value) -> ok | {error, Reason} when
+ ; (Socket, socket, socket_option(), Value) -> ok | {error, Reason} when
Socket :: socket(),
- Level :: socket,
- Opt :: socket_option(),
Value :: term(),
Reason :: term()
- ; (Socket, Level, Opt, Value) -> ok | {error, Reason} when
+ ; (Socket, ip, ip_socket_option(), Value) -> ok | {error, Reason} when
Socket :: socket(),
- Level :: ip,
- Opt :: ip_socket_option(),
Value :: term(),
Reason :: term()
- ; (Socket, Level, Opt, Value) -> ok | {error, Reason} when
+ ; (Socket, ipv6, ipv6_socket_option(), Value) -> ok | {error, Reason} when
Socket :: socket(),
- Level :: ipv6,
- Opt :: ipv6_socket_option(),
Value :: term(),
Reason :: term()
- ; (Socket, Level, Opt, Value) -> ok | {error, Reason} when
+ ; (Socket, tcp, tcp_socket_option(), Value) -> ok | {error, Reason} when
Socket :: socket(),
- Level :: tcp,
- Opt :: tcp_socket_option(),
Value :: term(),
Reason :: term()
- ; (Socket, Level, Opt, Value) -> ok | {error, Reason} when
+ ; (Socket, udp, udp_socket_option(), Value) -> ok | {error, Reason} when
Socket :: socket(),
- Level :: udp,
- Opt :: udp_socket_option(),
Value :: term(),
Reason :: term()
- ; (Socket, Level, Opt, Value) -> ok | {error, Reason} when
+ ; (Socket, sctp, sctp_socket_option(), Value) -> ok | {error, Reason} when
Socket :: socket(),
- Level :: sctp,
- Opt :: sctp_socket_option(),
Value :: term(),
Reason :: term().
@@ -1374,46 +1442,34 @@ setopt(#socket{info = Info, ref = SockRef}, Level, Key, Value) ->
%% value size. Example: int | bool | {string, pos_integer()} | non_neg_integer()
%%
--spec getopt(Socket, Level, Key) -> {ok, Value} | {error, Reason} when
+-spec getopt(Socket, otp, otp_socket_option()) -> {ok, Value} | {error, Reason} when
Socket :: socket(),
- Level :: otp,
- Key :: otp_socket_option(),
Value :: term(),
Reason :: term()
- ; (Socket, Level, Key) -> {ok, Value} | {error, Reason} when
+ ; (Socket, socket, socket_option()) -> {ok, Value} | {error, Reason} when
Socket :: socket(),
- Level :: socket,
- Key :: socket_option(),
Value :: term(),
Reason :: term()
- ; (Socket, Level, Key) -> {ok, Value} | {error, Reason} when
+ ; (Socket, ip, ip_socket_option()) -> {ok, Value} | {error, Reason} when
Socket :: socket(),
Level :: ip,
Key :: ip_socket_option(),
Value :: term(),
Reason :: term()
- ; (Socket, Level, Key) -> {ok, Value} | {error, Reason} when
+ ; (Socket, ipv6, ipv6_socket_option()) -> {ok, Value} | {error, Reason} when
Socket :: socket(),
- Level :: ipv6,
- Key :: ipv6_socket_option(),
Value :: term(),
Reason :: term()
- ; (Socket, Level, Key) -> {ok, Value} | {error, Reason} when
+ ; (Socket, tcp, tcp_socket_option()) -> {ok, Value} | {error, Reason} when
Socket :: socket(),
- Level :: tcp,
- Key :: tcp_socket_option(),
Value :: term(),
Reason :: term()
- ; (Socket, Level, Key) -> {ok, Value} | {error, Reason} when
+ ; (Socket, udp, udp_socket_option()) -> {ok, Value} | {error, Reason} when
Socket :: socket(),
- Level :: udp,
- Key :: udp_socket_option(),
Value :: term(),
Reason :: term()
- ; (Socket, Level, Key) -> {ok, Value} | {error, Reason} when
+ ; (Socket, sctp, sctp_socket_option()) -> {ok, Value} | {error, Reason} when
Socket :: socket(),
- Level :: sctp,
- Key :: sctp_socket_option(),
Value :: term(),
Reason :: term()
; (Socket, Level, Key) -> ok | {ok, Value} | {error, Reason} when