diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ranch.erl | 51 | ||||
-rw-r--r-- | src/ranch_acceptor.erl | 14 | ||||
-rw-r--r-- | src/ranch_acceptors_sup.erl | 14 | ||||
-rw-r--r-- | src/ranch_conns_sup.erl | 32 | ||||
-rw-r--r-- | src/ranch_conns_sup_sup.erl | 34 | ||||
-rw-r--r-- | src/ranch_listener_sup.erl | 13 | ||||
-rw-r--r-- | src/ranch_server.erl | 60 |
7 files changed, 147 insertions, 71 deletions
diff --git a/src/ranch.erl b/src/ranch.erl index 33a8256..2290dee 100644 --- a/src/ranch.erl +++ b/src/ranch.erl @@ -255,8 +255,11 @@ recv_proxy_header(Ref, Timeout) -> -spec remove_connection(ref()) -> ok. remove_connection(Ref) -> - ConnsSup = ranch_server:get_connections_sup(Ref), - ConnsSup ! {remove_connection, Ref, self()}, + ListenerSup = ranch_server:get_listener_sup(Ref), + {_, ConnsSupSup, _, _} = lists:keyfind(ranch_conns_sup_sup, 1, + supervisor:which_children(ListenerSup)), + _ = [ConnsSup ! {remove_connection, Ref, self()} || + {_, ConnsSup, _, _} <- supervisor:which_children(ConnsSupSup)], ok. -spec get_status(ref()) -> running | suspended. @@ -279,6 +282,16 @@ get_port(Ref) -> {_, Port} = get_addr(Ref), Port. +-spec get_connections(ref(), active|all) -> non_neg_integer(). +get_connections(Ref, active) -> + SupCounts = [ranch_conns_sup:active_connections(ConnsSup) || + {_, ConnsSup} <- ranch_server:get_connections_sups(Ref)], + lists:sum(SupCounts); +get_connections(Ref, all) -> + SupCounts = [proplists:get_value(active, supervisor:count_children(ConnsSup)) || + {_, ConnsSup} <- ranch_server:get_connections_sups(Ref)], + lists:sum(SupCounts). + -spec get_max_connections(ref()) -> max_conns(). get_max_connections(Ref) -> ranch_server:get_max_connections(Ref). @@ -321,7 +334,6 @@ info(Ref) -> listener_info(Ref, Pid) -> [_, Transport, _, Protocol, _] = ranch_server:get_listener_start_args(Ref), - ConnsSup = ranch_server:get_connections_sup(Ref), Status = get_status(Ref), {IP, Port} = get_addr(Ref), MaxConns = get_max_connections(Ref), @@ -333,8 +345,8 @@ listener_info(Ref, Pid) -> {ip, IP}, {port, Port}, {max_connections, MaxConns}, - {active_connections, ranch_conns_sup:active_connections(ConnsSup)}, - {all_connections, proplists:get_value(active, supervisor:count_children(ConnsSup))}, + {active_connections, get_connections(Ref, active)}, + {all_connections, get_connections(Ref, all)}, {transport, Transport}, {transport_options, TransOpts}, {protocol, Protocol}, @@ -342,20 +354,28 @@ listener_info(Ref, Pid) -> ]. -spec procs(ref(), acceptors | connections) -> [pid()]. -procs(Ref, acceptors) -> - procs1(Ref, ranch_acceptors_sup); -procs(Ref, connections) -> - procs1(Ref, ranch_conns_sup). - -procs1(Ref, Sup) -> +procs(Ref, Type) -> ListenerSup = ranch_server:get_listener_sup(Ref), - {_, SupPid, _, _} = lists:keyfind(Sup, 1, + procs1(ListenerSup, Type). + +procs1(ListenerSup, acceptors) -> + {_, SupPid, _, _} = lists:keyfind(ranch_acceptors_sup, 1, supervisor:which_children(ListenerSup)), try [Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)] - catch exit:{noproc, _} when Sup =:= ranch_acceptors_sup -> + catch exit:{noproc, _} -> [] - end. + end; +procs1(ListenerSup, connections) -> + {_, SupSupPid, _, _} = lists:keyfind(ranch_conns_sup_sup, 1, + supervisor:which_children(ListenerSup)), + Conns= + lists:map(fun ({_, SupPid, _, _}) -> + [Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)] + end, + supervisor:which_children(SupSupPid) + ), + lists:flatten(Conns). -spec wait_for_connections (ref(), '>' | '>=' | '==' | '=<', non_neg_integer()) -> ok; @@ -387,8 +407,7 @@ validate_interval(_) -> error(badarg). wait_for_connections_loop(Ref, Op, NumConns, Interval) -> CurConns = try - ConnsSup = ranch_server:get_connections_sup(Ref), - proplists:get_value(active, supervisor:count_children(ConnsSup)) + get_connections(Ref, all) catch _:_ -> 0 end, diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl index 3e426bd..935ec5c 100644 --- a/src/ranch_acceptor.erl +++ b/src/ranch_acceptor.erl @@ -15,23 +15,25 @@ -module(ranch_acceptor). -export([start_link/4]). --export([loop/4]). +-export([loop/5]). -spec start_link(inet:socket(), module(), module(), pid()) -> {ok, pid()}. start_link(LSocket, Transport, Logger, ConnsSup) -> - Pid = spawn_link(?MODULE, loop, [LSocket, Transport, Logger, ConnsSup]), + MonitorRef = monitor(process, ConnsSup), + Pid = spawn_link(?MODULE, loop, [LSocket, Transport, Logger, ConnsSup, MonitorRef]), {ok, Pid}. --spec loop(inet:socket(), module(), module(), pid()) -> no_return(). -loop(LSocket, Transport, Logger, ConnsSup) -> +-spec loop(inet:socket(), module(), module(), pid(), reference()) -> no_return(). +loop(LSocket, Transport, Logger, ConnsSup, MonitorRef) -> _ = case Transport:accept(LSocket, infinity) of {ok, CSocket} -> case Transport:controlling_process(CSocket, ConnsSup) of ok -> %% This call will not return until process has been started %% AND we are below the maximum number of connections. - ranch_conns_sup:start_protocol(ConnsSup, CSocket); + ranch_conns_sup:start_protocol(ConnsSup, MonitorRef, + CSocket); {error, _} -> Transport:close(CSocket) end; @@ -51,7 +53,7 @@ loop(LSocket, Transport, Logger, ConnsSup) -> ok end, flush(Logger), - ?MODULE:loop(LSocket, Transport, Logger, ConnsSup). + ?MODULE:loop(LSocket, Transport, Logger, ConnsSup, MonitorRef). flush(Logger) -> receive Msg -> diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl index cdb633b..e320633 100644 --- a/src/ranch_acceptors_sup.erl +++ b/src/ranch_acceptors_sup.erl @@ -15,18 +15,16 @@ -module(ranch_acceptors_sup). -behaviour(supervisor). --export([start_link/2]). +-export([start_link/3]). -export([init/1]). --spec start_link(ranch:ref(), module()) +-spec start_link(ranch:ref(), pos_integer(), module()) -> {ok, pid()}. -start_link(Ref, Transport) -> - supervisor:start_link(?MODULE, [Ref, Transport]). +start_link(Ref, NumAcceptors, Transport) -> + supervisor:start_link(?MODULE, [Ref, NumAcceptors, Transport]). -init([Ref, Transport]) -> - ConnsSup = ranch_server:get_connections_sup(Ref), +init([Ref, NumAcceptors, Transport]) -> TransOpts = ranch_server:get_transport_options(Ref), - NumAcceptors = maps:get(num_acceptors, TransOpts, 10), Logger = maps:get(logger, TransOpts, error_logger), SocketOpts = maps:get(socket_opts, TransOpts, []), %% We temporarily put the logger in the process dictionary @@ -45,7 +43,7 @@ init([Ref, Transport]) -> ranch_server:set_addr(Ref, Addr), Procs = [ {{acceptor, self(), N}, {ranch_acceptor, start_link, [ - LSocket, Transport, Logger, ConnsSup + LSocket, Transport, Logger, ranch_server:get_connections_sup(Ref, N) ]}, permanent, brutal_kill, worker, []} || N <- lists:seq(1, NumAcceptors)], {ok, {{one_for_one, 1, 5}, Procs}}. diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl index d19405b..af3ba23 100644 --- a/src/ranch_conns_sup.erl +++ b/src/ranch_conns_sup.erl @@ -18,12 +18,12 @@ -module(ranch_conns_sup). %% API. --export([start_link/3]). --export([start_protocol/2]). +-export([start_link/4]). +-export([start_protocol/3]). -export([active_connections/1]). %% Supervisor internals. --export([init/4]). +-export([init/5]). -export([system_continue/3]). -export([system_terminate/4]). -export([system_code_change/4]). @@ -34,6 +34,7 @@ -record(state, { parent = undefined :: pid(), ref :: ranch:ref(), + acceptor_id :: non_neg_integer(), conn_type :: conn_type(), shutdown :: shutdown(), transport = undefined :: module(), @@ -46,10 +47,10 @@ %% API. --spec start_link(ranch:ref(), module(), module()) -> {ok, pid()}. -start_link(Ref, Transport, Protocol) -> +-spec start_link(ranch:ref(), non_neg_integer(), module(), module()) -> {ok, pid()}. +start_link(Ref, AcceptorId, Transport, Protocol) -> proc_lib:start_link(?MODULE, init, - [self(), Ref, Transport, Protocol]). + [self(), Ref, AcceptorId, Transport, Protocol]). %% We can safely assume we are on the same node as the supervisor. %% @@ -67,10 +68,15 @@ start_link(Ref, Transport, Protocol) -> %% We do not need the reply, we only need the ok from the supervisor %% to continue. The supervisor sends its own pid when the acceptor can %% continue. --spec start_protocol(pid(), inet:socket()) -> ok. -start_protocol(SupPid, Socket) -> +-spec start_protocol(pid(), reference(), inet:socket()) -> ok. +start_protocol(SupPid, MonitorRef, Socket) -> SupPid ! {?MODULE, start_protocol, self(), Socket}, - receive SupPid -> ok end. + receive + SupPid -> + ok; + {'DOWN', MonitorRef, process, SupPid, Reason} -> + error(Reason) + end. %% We can't make the above assumptions here. This function might be %% called from anywhere. @@ -94,10 +100,10 @@ active_connections(SupPid) -> %% Supervisor internals. --spec init(pid(), ranch:ref(), module(), module()) -> no_return(). -init(Parent, Ref, Transport, Protocol) -> +-spec init(pid(), ranch:ref(), non_neg_integer(), module(), module()) -> no_return(). +init(Parent, Ref, AcceptorId, Transport, Protocol) -> process_flag(trap_exit, true), - ok = ranch_server:set_connections_sup(Ref, self()), + ok = ranch_server:set_connections_sup(Ref, AcceptorId, self()), MaxConns = ranch_server:get_max_connections(Ref), TransOpts = ranch_server:get_transport_options(Ref), ConnType = maps:get(connection_type, TransOpts, worker), @@ -106,7 +112,7 @@ init(Parent, Ref, Transport, Protocol) -> Logger = maps:get(logger, TransOpts, error_logger), ProtoOpts = ranch_server:get_protocol_options(Ref), ok = proc_lib:init_ack(Parent, {ok, self()}), - loop(#state{parent=Parent, ref=Ref, conn_type=ConnType, + loop(#state{parent=Parent, ref=Ref, acceptor_id=AcceptorId, conn_type=ConnType, shutdown=Shutdown, transport=Transport, protocol=Protocol, opts=ProtoOpts, handshake_timeout=HandshakeTimeout, max_conns=MaxConns, logger=Logger}, 0, 0, []). diff --git a/src/ranch_conns_sup_sup.erl b/src/ranch_conns_sup_sup.erl new file mode 100644 index 0000000..423c5db --- /dev/null +++ b/src/ranch_conns_sup_sup.erl @@ -0,0 +1,34 @@ +%% Copyright (c) 2019, Jan Uhlig <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(ranch_conns_sup_sup). + +-behaviour(supervisor). + +-export([start_link/4]). +-export([init/1]). + +start_link(Ref, NumAcceptors, Transport, Protocol) -> + ok = ranch_server:cleanup_connections_sups(Ref), + supervisor:start_link(?MODULE, { + Ref, NumAcceptors, Transport, Protocol + }). + +init({Ref, NumAcceptors, Transport, Protocol}) -> + ChildSpecs = [ + {{ranch_conns_sup, N}, {ranch_conns_sup, start_link, + [Ref, N, Transport, Protocol]}, + permanent, infinity, supervisor, [ranch_conns_sup]} + || N <- lists:seq(1, NumAcceptors)], + {ok, {{one_for_one, 1, 5}, ChildSpecs}}. diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl index 3853425..a4cc995 100644 --- a/src/ranch_listener_sup.erl +++ b/src/ranch_listener_sup.erl @@ -21,21 +21,22 @@ -spec start_link(ranch:ref(), module(), any(), module(), any()) -> {ok, pid()}. start_link(Ref, Transport, TransOpts, Protocol, ProtoOpts) -> + NumAcceptors = maps:get(num_acceptors, TransOpts, 10), MaxConns = maps:get(max_connections, TransOpts, 1024), ranch_server:set_new_listener_opts(Ref, MaxConns, TransOpts, ProtoOpts, [Ref, Transport, TransOpts, Protocol, ProtoOpts]), supervisor:start_link(?MODULE, { - Ref, Transport, Protocol + Ref, NumAcceptors, Transport, Protocol }). -init({Ref, Transport, Protocol}) -> +init({Ref, NumAcceptors, Transport, Protocol}) -> ok = ranch_server:set_listener_sup(Ref, self()), ChildSpecs = [ - {ranch_conns_sup, {ranch_conns_sup, start_link, - [Ref, Transport, Protocol]}, - permanent, infinity, supervisor, [ranch_conns_sup]}, + {ranch_conns_sup_sup, {ranch_conns_sup_sup, start_link, + [Ref, NumAcceptors, Transport, Protocol]}, + permanent, infinity, supervisor, [ranch_conns_sup_sup]}, {ranch_acceptors_sup, {ranch_acceptors_sup, start_link, - [Ref, Transport]}, + [Ref, NumAcceptors, Transport]}, permanent, infinity, supervisor, [ranch_acceptors_sup]} ], {ok, {{rest_for_one, 1, 5}, ChildSpecs}}. diff --git a/src/ranch_server.erl b/src/ranch_server.erl index a767cd8..9116217 100644 --- a/src/ranch_server.erl +++ b/src/ranch_server.erl @@ -19,8 +19,10 @@ -export([start_link/0]). -export([set_new_listener_opts/5]). -export([cleanup_listener_opts/1]). --export([set_connections_sup/2]). --export([get_connections_sup/1]). +-export([cleanup_connections_sups/1]). +-export([set_connections_sup/3]). +-export([get_connections_sup/2]). +-export([get_connections_sups/1]). -export([get_connections_sups/0]). -export([set_listener_sup/2]). -export([get_listener_sup/1]). @@ -68,29 +70,40 @@ cleanup_listener_opts(Ref) -> _ = ets:delete(?TAB, {trans_opts, Ref}), _ = ets:delete(?TAB, {proto_opts, Ref}), _ = ets:delete(?TAB, {listener_start_args, Ref}), - %% We also remove the pid of the connections supervisor. - %% Depending on the timing, it might already have been deleted + %% We also remove the pid of the connection supervisors. + %% Depending on the timing, they might already have been deleted %% when we handled the monitor DOWN message. However, in some %% cases when calling stop_listener followed by get_connections_sup, %% we could end up with the pid still being returned, when we %% expected a crash (because the listener was stopped). %% Deleting it explictly here removes any possible confusion. - _ = ets:delete(?TAB, {conns_sup, Ref}), + _ = ets:match_delete(?TAB, {{conns_sup, Ref, '_'}, '_'}), %% Ditto for the listener supervisor. _ = ets:delete(?TAB, {listener_sup, Ref}), ok. --spec set_connections_sup(ranch:ref(), pid()) -> ok. -set_connections_sup(Ref, Pid) -> - gen_server:call(?MODULE, {set_connections_sup, Ref, Pid}). +-spec cleanup_connections_sups(ranch:ref()) -> ok. +cleanup_connections_sups(Ref) -> + _ = ets:match_delete(?TAB, {{conns_sup, Ref, '_'}, '_'}), + ok. + +-spec set_connections_sup(ranch:ref(), non_neg_integer(), pid()) -> ok. +set_connections_sup(Ref, AcceptorId, Pid) -> + gen_server:call(?MODULE, {set_connections_sup, Ref, AcceptorId, Pid}). + +-spec get_connections_sup(ranch:ref(), non_neg_integer()) -> pid(). +get_connections_sup(Ref, AcceptorId) -> + ets:lookup_element(?TAB, {conns_sup, Ref, AcceptorId}, 2). --spec get_connections_sup(ranch:ref()) -> pid(). -get_connections_sup(Ref) -> - ets:lookup_element(?TAB, {conns_sup, Ref}, 2). +-spec get_connections_sups(ranch:ref()) -> [{non_neg_integer(), pid()}]. +get_connections_sups(Ref) -> + [{AcceptorId, Pid} || + [AcceptorId, Pid] <- ets:match(?TAB, {{conns_sup, Ref, '$1'}, '$2'})]. --spec get_connections_sups() -> [{ranch:ref(), pid()}]. +-spec get_connections_sups() -> [{ranch:ref(), non_neg_integer(), pid()}]. get_connections_sups() -> - [{Ref, Pid} || [Ref, Pid] <- ets:match(?TAB, {{conns_sup, '$1'}, '$2'})]. + [{Ref, AcceptorId, Pid} || + [Ref, AcceptorId, Pid] <- ets:match(?TAB, {{conns_sup, '$1', '$2'}, '$3'})]. -spec set_listener_sup(ranch:ref(), pid()) -> ok. set_listener_sup(Ref, Pid) -> @@ -142,13 +155,18 @@ get_listener_start_args(Ref) -> -spec count_connections(ranch:ref()) -> non_neg_integer(). count_connections(Ref) -> - ranch_conns_sup:active_connections(get_connections_sup(Ref)). + lists:foldl( + fun ({_, ConnsSup}, Acc) -> + Acc+ranch_conns_sup:active_connections(ConnsSup) + end, + 0, + get_connections_sups(Ref)). %% gen_server. init([]) -> - ConnMonitors = [{{erlang:monitor(process, Pid), Pid}, {conns_sup, Ref}} || - [Ref, Pid] <- ets:match(?TAB, {{conns_sup, '$1'}, '$2'})], + ConnMonitors = [{{erlang:monitor(process, Pid), Pid}, {conns_sup, Ref, AcceptorId}} || + [Ref, AcceptorId, Pid] <- ets:match(?TAB, {{conns_sup, '$1', '$2'}, '$3'})], ListenerMonitors = [{{erlang:monitor(process, Pid), Pid}, {listener_sup, Ref}} || [Ref, Pid] <- ets:match(?TAB, {{listener_sup, '$1'}, '$2'})], {ok, #state{monitors=ConnMonitors++ListenerMonitors}}. @@ -159,8 +177,8 @@ handle_call({set_new_listener_opts, Ref, MaxConns, TransOpts, ProtoOpts, StartAr ets:insert_new(?TAB, {{proto_opts, Ref}, ProtoOpts}), ets:insert_new(?TAB, {{listener_start_args, Ref}, StartArgs}), {reply, ok, State}; -handle_call({set_connections_sup, Ref, Pid}, _, State0) -> - State = set_monitored_process({conns_sup, Ref}, Pid, State0), +handle_call({set_connections_sup, Ref, AcceptorId, Pid}, _, State0) -> + State = set_monitored_process({conns_sup, Ref, AcceptorId}, Pid, State0), {reply, ok, State}; handle_call({set_listener_sup, Ref, Pid}, _, State0) -> State = set_monitored_process({listener_sup, Ref}, Pid, State0), @@ -170,16 +188,14 @@ handle_call({set_addr, Ref, Addr}, _, State) -> {reply, ok, State}; handle_call({set_max_conns, Ref, MaxConns}, _, State) -> ets:insert(?TAB, {{max_conns, Ref}, MaxConns}), - ConnsSup = get_connections_sup(Ref), - ConnsSup ! {set_max_conns, MaxConns}, + _ = [ConnsSup ! {set_max_conns, MaxConns} || {_, ConnsSup} <- get_connections_sups(Ref)], {reply, ok, State}; handle_call({set_trans_opts, Ref, Opts}, _, State) -> ets:insert(?TAB, {{trans_opts, Ref}, Opts}), {reply, ok, State}; handle_call({set_proto_opts, Ref, Opts}, _, State) -> ets:insert(?TAB, {{proto_opts, Ref}, Opts}), - ConnsSup = get_connections_sup(Ref), - ConnsSup ! {set_opts, Opts}, + _ = [ConnsSup ! {set_opts, Opts} || {_, ConnsSup} <- get_connections_sups(Ref)], {reply, ok, State}; handle_call(_Request, _From, State) -> {reply, ignore, State}. |