aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ranch.erl51
-rw-r--r--src/ranch_acceptor.erl14
-rw-r--r--src/ranch_acceptors_sup.erl14
-rw-r--r--src/ranch_conns_sup.erl32
-rw-r--r--src/ranch_conns_sup_sup.erl34
-rw-r--r--src/ranch_listener_sup.erl13
-rw-r--r--src/ranch_server.erl60
7 files changed, 147 insertions, 71 deletions
diff --git a/src/ranch.erl b/src/ranch.erl
index 33a8256..2290dee 100644
--- a/src/ranch.erl
+++ b/src/ranch.erl
@@ -255,8 +255,11 @@ recv_proxy_header(Ref, Timeout) ->
-spec remove_connection(ref()) -> ok.
remove_connection(Ref) ->
- ConnsSup = ranch_server:get_connections_sup(Ref),
- ConnsSup ! {remove_connection, Ref, self()},
+ ListenerSup = ranch_server:get_listener_sup(Ref),
+ {_, ConnsSupSup, _, _} = lists:keyfind(ranch_conns_sup_sup, 1,
+ supervisor:which_children(ListenerSup)),
+ _ = [ConnsSup ! {remove_connection, Ref, self()} ||
+ {_, ConnsSup, _, _} <- supervisor:which_children(ConnsSupSup)],
ok.
-spec get_status(ref()) -> running | suspended.
@@ -279,6 +282,16 @@ get_port(Ref) ->
{_, Port} = get_addr(Ref),
Port.
+-spec get_connections(ref(), active|all) -> non_neg_integer().
+get_connections(Ref, active) ->
+ SupCounts = [ranch_conns_sup:active_connections(ConnsSup) ||
+ {_, ConnsSup} <- ranch_server:get_connections_sups(Ref)],
+ lists:sum(SupCounts);
+get_connections(Ref, all) ->
+ SupCounts = [proplists:get_value(active, supervisor:count_children(ConnsSup)) ||
+ {_, ConnsSup} <- ranch_server:get_connections_sups(Ref)],
+ lists:sum(SupCounts).
+
-spec get_max_connections(ref()) -> max_conns().
get_max_connections(Ref) ->
ranch_server:get_max_connections(Ref).
@@ -321,7 +334,6 @@ info(Ref) ->
listener_info(Ref, Pid) ->
[_, Transport, _, Protocol, _] = ranch_server:get_listener_start_args(Ref),
- ConnsSup = ranch_server:get_connections_sup(Ref),
Status = get_status(Ref),
{IP, Port} = get_addr(Ref),
MaxConns = get_max_connections(Ref),
@@ -333,8 +345,8 @@ listener_info(Ref, Pid) ->
{ip, IP},
{port, Port},
{max_connections, MaxConns},
- {active_connections, ranch_conns_sup:active_connections(ConnsSup)},
- {all_connections, proplists:get_value(active, supervisor:count_children(ConnsSup))},
+ {active_connections, get_connections(Ref, active)},
+ {all_connections, get_connections(Ref, all)},
{transport, Transport},
{transport_options, TransOpts},
{protocol, Protocol},
@@ -342,20 +354,28 @@ listener_info(Ref, Pid) ->
].
-spec procs(ref(), acceptors | connections) -> [pid()].
-procs(Ref, acceptors) ->
- procs1(Ref, ranch_acceptors_sup);
-procs(Ref, connections) ->
- procs1(Ref, ranch_conns_sup).
-
-procs1(Ref, Sup) ->
+procs(Ref, Type) ->
ListenerSup = ranch_server:get_listener_sup(Ref),
- {_, SupPid, _, _} = lists:keyfind(Sup, 1,
+ procs1(ListenerSup, Type).
+
+procs1(ListenerSup, acceptors) ->
+ {_, SupPid, _, _} = lists:keyfind(ranch_acceptors_sup, 1,
supervisor:which_children(ListenerSup)),
try
[Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)]
- catch exit:{noproc, _} when Sup =:= ranch_acceptors_sup ->
+ catch exit:{noproc, _} ->
[]
- end.
+ end;
+procs1(ListenerSup, connections) ->
+ {_, SupSupPid, _, _} = lists:keyfind(ranch_conns_sup_sup, 1,
+ supervisor:which_children(ListenerSup)),
+ Conns=
+ lists:map(fun ({_, SupPid, _, _}) ->
+ [Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)]
+ end,
+ supervisor:which_children(SupSupPid)
+ ),
+ lists:flatten(Conns).
-spec wait_for_connections
(ref(), '>' | '>=' | '==' | '=<', non_neg_integer()) -> ok;
@@ -387,8 +407,7 @@ validate_interval(_) -> error(badarg).
wait_for_connections_loop(Ref, Op, NumConns, Interval) ->
CurConns = try
- ConnsSup = ranch_server:get_connections_sup(Ref),
- proplists:get_value(active, supervisor:count_children(ConnsSup))
+ get_connections(Ref, all)
catch _:_ ->
0
end,
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl
index 3e426bd..935ec5c 100644
--- a/src/ranch_acceptor.erl
+++ b/src/ranch_acceptor.erl
@@ -15,23 +15,25 @@
-module(ranch_acceptor).
-export([start_link/4]).
--export([loop/4]).
+-export([loop/5]).
-spec start_link(inet:socket(), module(), module(), pid())
-> {ok, pid()}.
start_link(LSocket, Transport, Logger, ConnsSup) ->
- Pid = spawn_link(?MODULE, loop, [LSocket, Transport, Logger, ConnsSup]),
+ MonitorRef = monitor(process, ConnsSup),
+ Pid = spawn_link(?MODULE, loop, [LSocket, Transport, Logger, ConnsSup, MonitorRef]),
{ok, Pid}.
--spec loop(inet:socket(), module(), module(), pid()) -> no_return().
-loop(LSocket, Transport, Logger, ConnsSup) ->
+-spec loop(inet:socket(), module(), module(), pid(), reference()) -> no_return().
+loop(LSocket, Transport, Logger, ConnsSup, MonitorRef) ->
_ = case Transport:accept(LSocket, infinity) of
{ok, CSocket} ->
case Transport:controlling_process(CSocket, ConnsSup) of
ok ->
%% This call will not return until process has been started
%% AND we are below the maximum number of connections.
- ranch_conns_sup:start_protocol(ConnsSup, CSocket);
+ ranch_conns_sup:start_protocol(ConnsSup, MonitorRef,
+ CSocket);
{error, _} ->
Transport:close(CSocket)
end;
@@ -51,7 +53,7 @@ loop(LSocket, Transport, Logger, ConnsSup) ->
ok
end,
flush(Logger),
- ?MODULE:loop(LSocket, Transport, Logger, ConnsSup).
+ ?MODULE:loop(LSocket, Transport, Logger, ConnsSup, MonitorRef).
flush(Logger) ->
receive Msg ->
diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl
index cdb633b..e320633 100644
--- a/src/ranch_acceptors_sup.erl
+++ b/src/ranch_acceptors_sup.erl
@@ -15,18 +15,16 @@
-module(ranch_acceptors_sup).
-behaviour(supervisor).
--export([start_link/2]).
+-export([start_link/3]).
-export([init/1]).
--spec start_link(ranch:ref(), module())
+-spec start_link(ranch:ref(), pos_integer(), module())
-> {ok, pid()}.
-start_link(Ref, Transport) ->
- supervisor:start_link(?MODULE, [Ref, Transport]).
+start_link(Ref, NumAcceptors, Transport) ->
+ supervisor:start_link(?MODULE, [Ref, NumAcceptors, Transport]).
-init([Ref, Transport]) ->
- ConnsSup = ranch_server:get_connections_sup(Ref),
+init([Ref, NumAcceptors, Transport]) ->
TransOpts = ranch_server:get_transport_options(Ref),
- NumAcceptors = maps:get(num_acceptors, TransOpts, 10),
Logger = maps:get(logger, TransOpts, error_logger),
SocketOpts = maps:get(socket_opts, TransOpts, []),
%% We temporarily put the logger in the process dictionary
@@ -45,7 +43,7 @@ init([Ref, Transport]) ->
ranch_server:set_addr(Ref, Addr),
Procs = [
{{acceptor, self(), N}, {ranch_acceptor, start_link, [
- LSocket, Transport, Logger, ConnsSup
+ LSocket, Transport, Logger, ranch_server:get_connections_sup(Ref, N)
]}, permanent, brutal_kill, worker, []}
|| N <- lists:seq(1, NumAcceptors)],
{ok, {{one_for_one, 1, 5}, Procs}}.
diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl
index d19405b..af3ba23 100644
--- a/src/ranch_conns_sup.erl
+++ b/src/ranch_conns_sup.erl
@@ -18,12 +18,12 @@
-module(ranch_conns_sup).
%% API.
--export([start_link/3]).
--export([start_protocol/2]).
+-export([start_link/4]).
+-export([start_protocol/3]).
-export([active_connections/1]).
%% Supervisor internals.
--export([init/4]).
+-export([init/5]).
-export([system_continue/3]).
-export([system_terminate/4]).
-export([system_code_change/4]).
@@ -34,6 +34,7 @@
-record(state, {
parent = undefined :: pid(),
ref :: ranch:ref(),
+ acceptor_id :: non_neg_integer(),
conn_type :: conn_type(),
shutdown :: shutdown(),
transport = undefined :: module(),
@@ -46,10 +47,10 @@
%% API.
--spec start_link(ranch:ref(), module(), module()) -> {ok, pid()}.
-start_link(Ref, Transport, Protocol) ->
+-spec start_link(ranch:ref(), non_neg_integer(), module(), module()) -> {ok, pid()}.
+start_link(Ref, AcceptorId, Transport, Protocol) ->
proc_lib:start_link(?MODULE, init,
- [self(), Ref, Transport, Protocol]).
+ [self(), Ref, AcceptorId, Transport, Protocol]).
%% We can safely assume we are on the same node as the supervisor.
%%
@@ -67,10 +68,15 @@ start_link(Ref, Transport, Protocol) ->
%% We do not need the reply, we only need the ok from the supervisor
%% to continue. The supervisor sends its own pid when the acceptor can
%% continue.
--spec start_protocol(pid(), inet:socket()) -> ok.
-start_protocol(SupPid, Socket) ->
+-spec start_protocol(pid(), reference(), inet:socket()) -> ok.
+start_protocol(SupPid, MonitorRef, Socket) ->
SupPid ! {?MODULE, start_protocol, self(), Socket},
- receive SupPid -> ok end.
+ receive
+ SupPid ->
+ ok;
+ {'DOWN', MonitorRef, process, SupPid, Reason} ->
+ error(Reason)
+ end.
%% We can't make the above assumptions here. This function might be
%% called from anywhere.
@@ -94,10 +100,10 @@ active_connections(SupPid) ->
%% Supervisor internals.
--spec init(pid(), ranch:ref(), module(), module()) -> no_return().
-init(Parent, Ref, Transport, Protocol) ->
+-spec init(pid(), ranch:ref(), non_neg_integer(), module(), module()) -> no_return().
+init(Parent, Ref, AcceptorId, Transport, Protocol) ->
process_flag(trap_exit, true),
- ok = ranch_server:set_connections_sup(Ref, self()),
+ ok = ranch_server:set_connections_sup(Ref, AcceptorId, self()),
MaxConns = ranch_server:get_max_connections(Ref),
TransOpts = ranch_server:get_transport_options(Ref),
ConnType = maps:get(connection_type, TransOpts, worker),
@@ -106,7 +112,7 @@ init(Parent, Ref, Transport, Protocol) ->
Logger = maps:get(logger, TransOpts, error_logger),
ProtoOpts = ranch_server:get_protocol_options(Ref),
ok = proc_lib:init_ack(Parent, {ok, self()}),
- loop(#state{parent=Parent, ref=Ref, conn_type=ConnType,
+ loop(#state{parent=Parent, ref=Ref, acceptor_id=AcceptorId, conn_type=ConnType,
shutdown=Shutdown, transport=Transport, protocol=Protocol,
opts=ProtoOpts, handshake_timeout=HandshakeTimeout,
max_conns=MaxConns, logger=Logger}, 0, 0, []).
diff --git a/src/ranch_conns_sup_sup.erl b/src/ranch_conns_sup_sup.erl
new file mode 100644
index 0000000..423c5db
--- /dev/null
+++ b/src/ranch_conns_sup_sup.erl
@@ -0,0 +1,34 @@
+%% Copyright (c) 2019, Jan Uhlig <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(ranch_conns_sup_sup).
+
+-behaviour(supervisor).
+
+-export([start_link/4]).
+-export([init/1]).
+
+start_link(Ref, NumAcceptors, Transport, Protocol) ->
+ ok = ranch_server:cleanup_connections_sups(Ref),
+ supervisor:start_link(?MODULE, {
+ Ref, NumAcceptors, Transport, Protocol
+ }).
+
+init({Ref, NumAcceptors, Transport, Protocol}) ->
+ ChildSpecs = [
+ {{ranch_conns_sup, N}, {ranch_conns_sup, start_link,
+ [Ref, N, Transport, Protocol]},
+ permanent, infinity, supervisor, [ranch_conns_sup]}
+ || N <- lists:seq(1, NumAcceptors)],
+ {ok, {{one_for_one, 1, 5}, ChildSpecs}}.
diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl
index 3853425..a4cc995 100644
--- a/src/ranch_listener_sup.erl
+++ b/src/ranch_listener_sup.erl
@@ -21,21 +21,22 @@
-spec start_link(ranch:ref(), module(), any(), module(), any())
-> {ok, pid()}.
start_link(Ref, Transport, TransOpts, Protocol, ProtoOpts) ->
+ NumAcceptors = maps:get(num_acceptors, TransOpts, 10),
MaxConns = maps:get(max_connections, TransOpts, 1024),
ranch_server:set_new_listener_opts(Ref, MaxConns, TransOpts, ProtoOpts,
[Ref, Transport, TransOpts, Protocol, ProtoOpts]),
supervisor:start_link(?MODULE, {
- Ref, Transport, Protocol
+ Ref, NumAcceptors, Transport, Protocol
}).
-init({Ref, Transport, Protocol}) ->
+init({Ref, NumAcceptors, Transport, Protocol}) ->
ok = ranch_server:set_listener_sup(Ref, self()),
ChildSpecs = [
- {ranch_conns_sup, {ranch_conns_sup, start_link,
- [Ref, Transport, Protocol]},
- permanent, infinity, supervisor, [ranch_conns_sup]},
+ {ranch_conns_sup_sup, {ranch_conns_sup_sup, start_link,
+ [Ref, NumAcceptors, Transport, Protocol]},
+ permanent, infinity, supervisor, [ranch_conns_sup_sup]},
{ranch_acceptors_sup, {ranch_acceptors_sup, start_link,
- [Ref, Transport]},
+ [Ref, NumAcceptors, Transport]},
permanent, infinity, supervisor, [ranch_acceptors_sup]}
],
{ok, {{rest_for_one, 1, 5}, ChildSpecs}}.
diff --git a/src/ranch_server.erl b/src/ranch_server.erl
index a767cd8..9116217 100644
--- a/src/ranch_server.erl
+++ b/src/ranch_server.erl
@@ -19,8 +19,10 @@
-export([start_link/0]).
-export([set_new_listener_opts/5]).
-export([cleanup_listener_opts/1]).
--export([set_connections_sup/2]).
--export([get_connections_sup/1]).
+-export([cleanup_connections_sups/1]).
+-export([set_connections_sup/3]).
+-export([get_connections_sup/2]).
+-export([get_connections_sups/1]).
-export([get_connections_sups/0]).
-export([set_listener_sup/2]).
-export([get_listener_sup/1]).
@@ -68,29 +70,40 @@ cleanup_listener_opts(Ref) ->
_ = ets:delete(?TAB, {trans_opts, Ref}),
_ = ets:delete(?TAB, {proto_opts, Ref}),
_ = ets:delete(?TAB, {listener_start_args, Ref}),
- %% We also remove the pid of the connections supervisor.
- %% Depending on the timing, it might already have been deleted
+ %% We also remove the pid of the connection supervisors.
+ %% Depending on the timing, they might already have been deleted
%% when we handled the monitor DOWN message. However, in some
%% cases when calling stop_listener followed by get_connections_sup,
%% we could end up with the pid still being returned, when we
%% expected a crash (because the listener was stopped).
%% Deleting it explictly here removes any possible confusion.
- _ = ets:delete(?TAB, {conns_sup, Ref}),
+ _ = ets:match_delete(?TAB, {{conns_sup, Ref, '_'}, '_'}),
%% Ditto for the listener supervisor.
_ = ets:delete(?TAB, {listener_sup, Ref}),
ok.
--spec set_connections_sup(ranch:ref(), pid()) -> ok.
-set_connections_sup(Ref, Pid) ->
- gen_server:call(?MODULE, {set_connections_sup, Ref, Pid}).
+-spec cleanup_connections_sups(ranch:ref()) -> ok.
+cleanup_connections_sups(Ref) ->
+ _ = ets:match_delete(?TAB, {{conns_sup, Ref, '_'}, '_'}),
+ ok.
+
+-spec set_connections_sup(ranch:ref(), non_neg_integer(), pid()) -> ok.
+set_connections_sup(Ref, AcceptorId, Pid) ->
+ gen_server:call(?MODULE, {set_connections_sup, Ref, AcceptorId, Pid}).
+
+-spec get_connections_sup(ranch:ref(), non_neg_integer()) -> pid().
+get_connections_sup(Ref, AcceptorId) ->
+ ets:lookup_element(?TAB, {conns_sup, Ref, AcceptorId}, 2).
--spec get_connections_sup(ranch:ref()) -> pid().
-get_connections_sup(Ref) ->
- ets:lookup_element(?TAB, {conns_sup, Ref}, 2).
+-spec get_connections_sups(ranch:ref()) -> [{non_neg_integer(), pid()}].
+get_connections_sups(Ref) ->
+ [{AcceptorId, Pid} ||
+ [AcceptorId, Pid] <- ets:match(?TAB, {{conns_sup, Ref, '$1'}, '$2'})].
--spec get_connections_sups() -> [{ranch:ref(), pid()}].
+-spec get_connections_sups() -> [{ranch:ref(), non_neg_integer(), pid()}].
get_connections_sups() ->
- [{Ref, Pid} || [Ref, Pid] <- ets:match(?TAB, {{conns_sup, '$1'}, '$2'})].
+ [{Ref, AcceptorId, Pid} ||
+ [Ref, AcceptorId, Pid] <- ets:match(?TAB, {{conns_sup, '$1', '$2'}, '$3'})].
-spec set_listener_sup(ranch:ref(), pid()) -> ok.
set_listener_sup(Ref, Pid) ->
@@ -142,13 +155,18 @@ get_listener_start_args(Ref) ->
-spec count_connections(ranch:ref()) -> non_neg_integer().
count_connections(Ref) ->
- ranch_conns_sup:active_connections(get_connections_sup(Ref)).
+ lists:foldl(
+ fun ({_, ConnsSup}, Acc) ->
+ Acc+ranch_conns_sup:active_connections(ConnsSup)
+ end,
+ 0,
+ get_connections_sups(Ref)).
%% gen_server.
init([]) ->
- ConnMonitors = [{{erlang:monitor(process, Pid), Pid}, {conns_sup, Ref}} ||
- [Ref, Pid] <- ets:match(?TAB, {{conns_sup, '$1'}, '$2'})],
+ ConnMonitors = [{{erlang:monitor(process, Pid), Pid}, {conns_sup, Ref, AcceptorId}} ||
+ [Ref, AcceptorId, Pid] <- ets:match(?TAB, {{conns_sup, '$1', '$2'}, '$3'})],
ListenerMonitors = [{{erlang:monitor(process, Pid), Pid}, {listener_sup, Ref}} ||
[Ref, Pid] <- ets:match(?TAB, {{listener_sup, '$1'}, '$2'})],
{ok, #state{monitors=ConnMonitors++ListenerMonitors}}.
@@ -159,8 +177,8 @@ handle_call({set_new_listener_opts, Ref, MaxConns, TransOpts, ProtoOpts, StartAr
ets:insert_new(?TAB, {{proto_opts, Ref}, ProtoOpts}),
ets:insert_new(?TAB, {{listener_start_args, Ref}, StartArgs}),
{reply, ok, State};
-handle_call({set_connections_sup, Ref, Pid}, _, State0) ->
- State = set_monitored_process({conns_sup, Ref}, Pid, State0),
+handle_call({set_connections_sup, Ref, AcceptorId, Pid}, _, State0) ->
+ State = set_monitored_process({conns_sup, Ref, AcceptorId}, Pid, State0),
{reply, ok, State};
handle_call({set_listener_sup, Ref, Pid}, _, State0) ->
State = set_monitored_process({listener_sup, Ref}, Pid, State0),
@@ -170,16 +188,14 @@ handle_call({set_addr, Ref, Addr}, _, State) ->
{reply, ok, State};
handle_call({set_max_conns, Ref, MaxConns}, _, State) ->
ets:insert(?TAB, {{max_conns, Ref}, MaxConns}),
- ConnsSup = get_connections_sup(Ref),
- ConnsSup ! {set_max_conns, MaxConns},
+ _ = [ConnsSup ! {set_max_conns, MaxConns} || {_, ConnsSup} <- get_connections_sups(Ref)],
{reply, ok, State};
handle_call({set_trans_opts, Ref, Opts}, _, State) ->
ets:insert(?TAB, {{trans_opts, Ref}, Opts}),
{reply, ok, State};
handle_call({set_proto_opts, Ref, Opts}, _, State) ->
ets:insert(?TAB, {{proto_opts, Ref}, Opts}),
- ConnsSup = get_connections_sup(Ref),
- ConnsSup ! {set_opts, Opts},
+ _ = [ConnsSup ! {set_opts, Opts} || {_, ConnsSup} <- get_connections_sups(Ref)],
{reply, ok, State};
handle_call(_Request, _From, State) ->
{reply, ignore, State}.