aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ranch.app.src2
-rw-r--r--src/ranch.erl10
-rw-r--r--src/ranch_acceptor.erl11
-rw-r--r--src/ranch_conns_sup.erl89
-rw-r--r--src/ranch_listener_sup.erl10
-rw-r--r--src/ranch_ssl.erl33
-rw-r--r--src/ranch_tcp.erl7
-rw-r--r--src/ranch_transport.erl7
8 files changed, 119 insertions, 50 deletions
diff --git a/src/ranch.app.src b/src/ranch.app.src
index 03b8dae..bb6db94 100644
--- a/src/ranch.app.src
+++ b/src/ranch.app.src
@@ -14,7 +14,7 @@
{application, ranch, [
{description, "Socket acceptor pool for TCP protocols."},
- {vsn, "0.8.5"},
+ {vsn, "0.9.0"},
{modules, []},
{registered, [ranch_sup, ranch_server]},
{applications, [
diff --git a/src/ranch.erl b/src/ranch.erl
index 74497f0..641fc4d 100644
--- a/src/ranch.erl
+++ b/src/ranch.erl
@@ -120,7 +120,7 @@ child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)
andalso is_atom(Protocol) ->
{{ranch_listener_sup, Ref}, {ranch_listener_sup, start_link, [
Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts
- ]}, permanent, 5000, supervisor, [ranch_listener_sup]}.
+ ]}, permanent, infinity, supervisor, [ranch_listener_sup]}.
%% @doc Acknowledge the accepted connection.
%%
@@ -128,7 +128,9 @@ child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)
%% the protocol process before starting to use it.
-spec accept_ack(ref()) -> ok.
accept_ack(Ref) ->
- receive {shoot, Ref} -> ok end.
+ receive {shoot, Ref, Transport, Socket, AckTimeout} ->
+ Transport:accept_ack(Socket, AckTimeout)
+ end.
%% @doc Remove the calling process' connection from the pool.
%%
@@ -175,7 +177,7 @@ set_protocol_options(Ref, Opts) ->
%% It takes a list of options, a list of allowed keys and an accumulator.
%% This accumulator can be used to set default options that should never
%% be overriden.
--spec filter_options([{atom(), any()} | {atom(), any(), any(), any()}],
+-spec filter_options([{atom(), any()} | {raw, any(), any(), any()}],
[atom()], Acc) -> Acc when Acc :: [any()].
filter_options([], _, Acc) ->
Acc;
@@ -200,7 +202,7 @@ set_option_default(Opts, Key, Value) ->
end.
%% @doc Start the given applications if they were not already started.
--spec require(list(module())) -> ok.
+-spec require([atom()]) -> ok.
require([]) ->
ok;
require([App|Tail]) ->
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl
index f838b7d..da1aac5 100644
--- a/src/ranch_acceptor.erl
+++ b/src/ranch_acceptor.erl
@@ -48,4 +48,15 @@ loop(LSocket, Transport, ConnsSup) ->
{error, Reason} when Reason =/= closed ->
ok
end,
+ flush(),
?MODULE:loop(LSocket, Transport, ConnsSup).
+
+flush() ->
+ receive Msg ->
+ error_logger:error_msg(
+ "Ranch acceptor received unexpected message: ~p~n",
+ [Msg]),
+ flush()
+ after 0 ->
+ ok
+ end.
diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl
index 245a5e0..308a1ab 100644
--- a/src/ranch_conns_sup.erl
+++ b/src/ranch_conns_sup.erl
@@ -20,34 +20,38 @@
-module(ranch_conns_sup).
%% API.
--export([start_link/4]).
+-export([start_link/6]).
-export([start_protocol/2]).
-export([active_connections/1]).
%% Supervisor internals.
--export([init/5]).
+-export([init/7]).
-export([system_continue/3]).
-export([system_terminate/4]).
-export([system_code_change/4]).
-type conn_type() :: worker | supervisor.
+-type shutdown() :: brutal_kill | timeout().
-record(state, {
parent = undefined :: pid(),
ref :: ranch:ref(),
conn_type :: conn_type(),
+ shutdown :: shutdown(),
transport = undefined :: module(),
protocol = undefined :: module(),
opts :: any(),
- max_conns = undefined :: non_neg_integer() | infinity
+ ack_timeout :: timeout(),
+ max_conns = undefined :: ranch:max_conns()
}).
%% API.
--spec start_link(ranch:ref(), conn_type(), module(), module()) -> {ok, pid()}.
-start_link(Ref, ConnType, Transport, Protocol) ->
+-spec start_link(ranch:ref(), conn_type(), shutdown(), module(),
+ timeout(), module()) -> {ok, pid()}.
+start_link(Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol) ->
proc_lib:start_link(?MODULE, init,
- [self(), Ref, ConnType, Transport, Protocol]).
+ [self(), Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol]).
%% We can safely assume we are on the same node as the supervisor.
%%
@@ -92,26 +96,28 @@ active_connections(SupPid) ->
%% Supervisor internals.
--spec init(pid(), ranch:ref(), conn_type(), module(), module()) -> no_return().
-init(Parent, Ref, ConnType, Transport, Protocol) ->
+-spec init(pid(), ranch:ref(), conn_type(), shutdown(),
+ module(), timeout(), module()) -> no_return().
+init(Parent, Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol) ->
process_flag(trap_exit, true),
ok = ranch_server:set_connections_sup(Ref, self()),
MaxConns = ranch_server:get_max_connections(Ref),
Opts = ranch_server:get_protocol_options(Ref),
ok = proc_lib:init_ack(Parent, {ok, self()}),
loop(#state{parent=Parent, ref=Ref, conn_type=ConnType,
- transport=Transport, protocol=Protocol, opts=Opts,
- max_conns=MaxConns}, 0, 0, []).
+ shutdown=Shutdown, transport=Transport, protocol=Protocol,
+ opts=Opts, ack_timeout=AckTimeout, max_conns=MaxConns}, 0, 0, []).
loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
transport=Transport, protocol=Protocol, opts=Opts,
- max_conns=MaxConns}, CurConns, NbChildren, Sleepers) ->
+ ack_timeout=AckTimeout, max_conns=MaxConns},
+ CurConns, NbChildren, Sleepers) ->
receive
{?MODULE, start_protocol, To, Socket} ->
case Protocol:start_link(Ref, Socket, Transport, Opts) of
{ok, Pid} ->
Transport:controlling_process(Socket, Pid),
- Pid ! {shoot, Ref},
+ Pid ! {shoot, Ref, Transport, Socket, AckTimeout},
put(Pid, true),
CurConns2 = CurConns + 1,
if CurConns2 < MaxConns ->
@@ -122,8 +128,12 @@ loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
loop(State, CurConns2, NbChildren + 1,
[To|Sleepers])
end;
- _ ->
+ Ret ->
To ! self(),
+ error_logger:error_msg(
+ "Ranch listener ~p connection process start failure; "
+ "~p:start_link/4 returned: ~999999p~n",
+ [Ref, Protocol, Ret]),
Transport:close(Socket),
loop(State, CurConns, NbChildren, Sleepers)
end;
@@ -147,7 +157,7 @@ loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
loop(State#state{opts=Opts2},
CurConns, NbChildren, Sleepers);
{'EXIT', Parent, Reason} ->
- exit(Reason);
+ terminate(State, Reason, NbChildren);
{'EXIT', Pid, Reason} when Sleepers =:= [] ->
report_error(Ref, Protocol, Pid, Reason),
erase(Pid),
@@ -186,12 +196,59 @@ loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType,
[Ref, Msg])
end.
+-spec terminate(#state{}, any(), non_neg_integer()) -> no_return().
+%% Kill all children and then exit. We unlink first to avoid
+%% getting a message for each child getting killed.
+terminate(#state{shutdown=brutal_kill}, Reason, _) ->
+ Pids = get_keys(true),
+ _ = [begin
+ unlink(P),
+ exit(P, kill)
+ end || P <- Pids],
+ exit(Reason);
+%% Attempt to gracefully shutdown all children.
+terminate(#state{shutdown=Shutdown}, Reason, NbChildren) ->
+ shutdown_children(),
+ _ = if
+ Shutdown =:= infinity ->
+ ok;
+ true ->
+ erlang:send_after(Shutdown, self(), kill)
+ end,
+ wait_children(NbChildren),
+ exit(Reason).
+
+%% Monitor processes so we can know which ones have shutdown
+%% before the timeout. Unlink so we avoid receiving an extra
+%% message. Then send a shutdown exit signal.
+shutdown_children() ->
+ Pids = get_keys(true),
+ _ = [begin
+ monitor(process, P),
+ unlink(P),
+ exit(P, shutdown)
+ end || P <- Pids],
+ ok.
+
+wait_children(0) ->
+ ok;
+wait_children(NbChildren) ->
+ receive
+ {'DOWN', _, process, Pid, _} ->
+ _ = erase(Pid),
+ wait_children(NbChildren - 1);
+ kill ->
+ Pids = get_keys(true),
+ _ = [exit(P, kill) || P <- Pids],
+ ok
+ end.
+
system_continue(_, _, {State, CurConns, NbChildren, Sleepers}) ->
loop(State, CurConns, NbChildren, Sleepers).
-spec system_terminate(any(), _, _, _) -> no_return().
-system_terminate(Reason, _, _, _) ->
- exit(Reason).
+system_terminate(Reason, _, _, {State, _, NbChildren, _}) ->
+ terminate(State, Reason, NbChildren).
system_code_change(Misc, _, _, _) ->
{ok, Misc}.
diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl
index 0392105..30017d0 100644
--- a/src/ranch_listener_sup.erl
+++ b/src/ranch_listener_sup.erl
@@ -36,15 +36,15 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
%% supervisor.
init({Ref, NbAcceptors, Transport, TransOpts, Protocol}) ->
+ AckTimeout = proplists:get_value(ack_timeout, TransOpts, 5000),
ConnType = proplists:get_value(connection_type, TransOpts, worker),
+ Shutdown = proplists:get_value(shutdown, TransOpts, 5000),
ChildSpecs = [
- %% conns_sup
{ranch_conns_sup, {ranch_conns_sup, start_link,
- [Ref, ConnType, Transport, Protocol]},
+ [Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol]},
permanent, infinity, supervisor, [ranch_conns_sup]},
- %% acceptors_sup
{ranch_acceptors_sup, {ranch_acceptors_sup, start_link,
- [Ref, NbAcceptors, Transport, TransOpts]
- }, permanent, infinity, supervisor, [ranch_acceptors_sup]}
+ [Ref, NbAcceptors, Transport, TransOpts]},
+ permanent, infinity, supervisor, [ranch_acceptors_sup]}
],
{ok, {{rest_for_one, 10, 10}, ChildSpecs}}.
diff --git a/src/ranch_ssl.erl b/src/ranch_ssl.erl
index 097b31c..dc29a18 100644
--- a/src/ranch_ssl.erl
+++ b/src/ranch_ssl.erl
@@ -30,6 +30,7 @@
-export([messages/0]).
-export([listen/1]).
-export([accept/2]).
+-export([accept_ack/2]).
-export([connect/3]).
-export([connect/4]).
-export([recv/3]).
@@ -165,13 +166,18 @@ listen(Opts) ->
%% @see ssl:transport_accept/2
%% @see ssl:ssl_accept/2
-spec accept(ssl:sslsocket(), timeout())
- -> {ok, ssl:sslsocket()} | {error, closed | timeout | atom() | tuple()}.
+ -> {ok, ssl:sslsocket()} | {error, closed | timeout | atom()}.
accept(LSocket, Timeout) ->
- case ssl:transport_accept(LSocket, Timeout) of
- {ok, CSocket} ->
- ssl_accept(CSocket, Timeout);
+ ssl:transport_accept(LSocket, Timeout).
+
+-spec accept_ack(ssl:sslsocket(), timeout()) -> ok.
+accept_ack(CSocket, Timeout) ->
+ case ssl:ssl_accept(CSocket, Timeout) of
+ ok ->
+ ok;
{error, Reason} ->
- {error, Reason}
+ ok = close(CSocket),
+ error(Reason)
end.
%% @private Experimental. Open a connection to the given host and port number.
@@ -209,7 +215,7 @@ send(Socket, Packet) ->
ssl:send(Socket, Packet).
%% @equiv sendfile(Socket, Filename, 0, 0, [])
--spec sendfile(ssl:sslsocket(), file:name_all())
+-spec sendfile(ssl:sslsocket(), file:name_all() | file:fd())
-> {ok, non_neg_integer()} | {error, atom()}.
sendfile(Socket, Filename) ->
sendfile(Socket, Filename, 0, 0, []).
@@ -275,21 +281,6 @@ close(Socket) ->
%% Internal.
-%% This call always times out, either because a numeric timeout value
-%% was given, or because we've decided to use 5000ms instead of infinity.
-%% This value should be reasonable enough for the moment.
--spec ssl_accept(ssl:sslsocket(), timeout())
- -> {ok, ssl:sslsocket()} | {error, {ssl_accept, atom()}}.
-ssl_accept(Socket, infinity) ->
- ssl_accept(Socket, 5000);
-ssl_accept(Socket, Timeout) ->
- case ssl:ssl_accept(Socket, Timeout) of
- ok ->
- {ok, Socket};
- {error, Reason} ->
- {error, {ssl_accept, Reason}}
- end.
-
%% Unfortunately the implementation of elliptic-curve ciphers that has
%% been introduced in R16B01 is incomplete. Depending on the particular
%% client, this can cause the TLS handshake to break during key
diff --git a/src/ranch_tcp.erl b/src/ranch_tcp.erl
index abf7612..d5d5003 100644
--- a/src/ranch_tcp.erl
+++ b/src/ranch_tcp.erl
@@ -24,6 +24,7 @@
-export([messages/0]).
-export([listen/1]).
-export([accept/2]).
+-export([accept_ack/2]).
-export([connect/3]).
-export([connect/4]).
-export([recv/3]).
@@ -90,6 +91,10 @@ listen(Opts) ->
accept(LSocket, Timeout) ->
gen_tcp:accept(LSocket, Timeout).
+-spec accept_ack(inet:socket(), timeout()) -> ok.
+accept_ack(_, _) ->
+ ok.
+
%% @private Experimental. Open a connection to the given host and port number.
%% @see gen_tcp:connect/3
%% @todo Probably filter Opts?
@@ -126,7 +131,7 @@ send(Socket, Packet) ->
gen_tcp:send(Socket, Packet).
%% @equiv sendfile(Socket, File, Offset, Bytes, [])
--spec sendfile(inet:socket(), file:name_all())
+-spec sendfile(inet:socket(), file:name_all() | file:fd())
-> {ok, non_neg_integer()} | {error, atom()}.
sendfile(Socket, Filename) ->
sendfile(Socket, Filename, 0, 0, []).
diff --git a/src/ranch_transport.erl b/src/ranch_transport.erl
index fe06420..5cf10d1 100644
--- a/src/ranch_transport.erl
+++ b/src/ranch_transport.erl
@@ -46,7 +46,10 @@
%% Accept connections with the given listening socket.
-callback accept(socket(), timeout())
- -> {ok, socket()} | {error, closed | timeout | atom() | tuple()}.
+ -> {ok, socket()} | {error, closed | timeout | atom()}.
+
+%% Perform post-accept operations on the socket.
+-callback accept_ack(socket(), timeout()) -> ok.
%% Experimental. Open a connection to the given host and port number.
-callback connect(string(), inet:port_number(), opts())
@@ -65,7 +68,7 @@
-callback send(socket(), iodata()) -> ok | {error, atom()}.
%% Send a file on a socket.
--callback sendfile(socket(), file:name())
+-callback sendfile(socket(), file:name() | file:fd())
-> {ok, non_neg_integer()} | {error, atom()}.
%% Send part of a file on a socket.