diff options
-rw-r--r-- | ebin/ranch.app | 2 | ||||
-rw-r--r-- | src/ranch.erl | 51 | ||||
-rw-r--r-- | src/ranch_acceptor.erl | 14 | ||||
-rw-r--r-- | src/ranch_acceptors_sup.erl | 14 | ||||
-rw-r--r-- | src/ranch_conns_sup.erl | 32 | ||||
-rw-r--r-- | src/ranch_conns_sup_sup.erl | 34 | ||||
-rw-r--r-- | src/ranch_listener_sup.erl | 13 | ||||
-rw-r--r-- | src/ranch_server.erl | 60 | ||||
-rw-r--r-- | test/acceptor_SUITE.erl | 79 | ||||
-rw-r--r-- | test/shutdown_SUITE.erl | 27 |
10 files changed, 215 insertions, 111 deletions
diff --git a/ebin/ranch.app b/ebin/ranch.app index ca9b50a..682943b 100644 --- a/ebin/ranch.app +++ b/ebin/ranch.app @@ -1,7 +1,7 @@ {application, 'ranch', [ {description, "Socket acceptor pool for TCP protocols."}, {vsn, "1.7.1"}, - {modules, ['ranch','ranch_acceptor','ranch_acceptors_sup','ranch_app','ranch_conns_sup','ranch_crc32c','ranch_listener_sup','ranch_protocol','ranch_proxy_header','ranch_server','ranch_ssl','ranch_sup','ranch_tcp','ranch_transport']}, + {modules, ['ranch','ranch_acceptor','ranch_acceptors_sup','ranch_app','ranch_conns_sup','ranch_conns_sup_sup','ranch_crc32c','ranch_listener_sup','ranch_protocol','ranch_proxy_header','ranch_server','ranch_ssl','ranch_sup','ranch_tcp','ranch_transport']}, {registered, [ranch_sup,ranch_server]}, {applications, [kernel,stdlib,ssl]}, {mod, {ranch_app, []}}, diff --git a/src/ranch.erl b/src/ranch.erl index 33a8256..2290dee 100644 --- a/src/ranch.erl +++ b/src/ranch.erl @@ -255,8 +255,11 @@ recv_proxy_header(Ref, Timeout) -> -spec remove_connection(ref()) -> ok. remove_connection(Ref) -> - ConnsSup = ranch_server:get_connections_sup(Ref), - ConnsSup ! {remove_connection, Ref, self()}, + ListenerSup = ranch_server:get_listener_sup(Ref), + {_, ConnsSupSup, _, _} = lists:keyfind(ranch_conns_sup_sup, 1, + supervisor:which_children(ListenerSup)), + _ = [ConnsSup ! {remove_connection, Ref, self()} || + {_, ConnsSup, _, _} <- supervisor:which_children(ConnsSupSup)], ok. -spec get_status(ref()) -> running | suspended. @@ -279,6 +282,16 @@ get_port(Ref) -> {_, Port} = get_addr(Ref), Port. +-spec get_connections(ref(), active|all) -> non_neg_integer(). +get_connections(Ref, active) -> + SupCounts = [ranch_conns_sup:active_connections(ConnsSup) || + {_, ConnsSup} <- ranch_server:get_connections_sups(Ref)], + lists:sum(SupCounts); +get_connections(Ref, all) -> + SupCounts = [proplists:get_value(active, supervisor:count_children(ConnsSup)) || + {_, ConnsSup} <- ranch_server:get_connections_sups(Ref)], + lists:sum(SupCounts). + -spec get_max_connections(ref()) -> max_conns(). get_max_connections(Ref) -> ranch_server:get_max_connections(Ref). @@ -321,7 +334,6 @@ info(Ref) -> listener_info(Ref, Pid) -> [_, Transport, _, Protocol, _] = ranch_server:get_listener_start_args(Ref), - ConnsSup = ranch_server:get_connections_sup(Ref), Status = get_status(Ref), {IP, Port} = get_addr(Ref), MaxConns = get_max_connections(Ref), @@ -333,8 +345,8 @@ listener_info(Ref, Pid) -> {ip, IP}, {port, Port}, {max_connections, MaxConns}, - {active_connections, ranch_conns_sup:active_connections(ConnsSup)}, - {all_connections, proplists:get_value(active, supervisor:count_children(ConnsSup))}, + {active_connections, get_connections(Ref, active)}, + {all_connections, get_connections(Ref, all)}, {transport, Transport}, {transport_options, TransOpts}, {protocol, Protocol}, @@ -342,20 +354,28 @@ listener_info(Ref, Pid) -> ]. -spec procs(ref(), acceptors | connections) -> [pid()]. -procs(Ref, acceptors) -> - procs1(Ref, ranch_acceptors_sup); -procs(Ref, connections) -> - procs1(Ref, ranch_conns_sup). - -procs1(Ref, Sup) -> +procs(Ref, Type) -> ListenerSup = ranch_server:get_listener_sup(Ref), - {_, SupPid, _, _} = lists:keyfind(Sup, 1, + procs1(ListenerSup, Type). + +procs1(ListenerSup, acceptors) -> + {_, SupPid, _, _} = lists:keyfind(ranch_acceptors_sup, 1, supervisor:which_children(ListenerSup)), try [Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)] - catch exit:{noproc, _} when Sup =:= ranch_acceptors_sup -> + catch exit:{noproc, _} -> [] - end. + end; +procs1(ListenerSup, connections) -> + {_, SupSupPid, _, _} = lists:keyfind(ranch_conns_sup_sup, 1, + supervisor:which_children(ListenerSup)), + Conns= + lists:map(fun ({_, SupPid, _, _}) -> + [Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)] + end, + supervisor:which_children(SupSupPid) + ), + lists:flatten(Conns). -spec wait_for_connections (ref(), '>' | '>=' | '==' | '=<', non_neg_integer()) -> ok; @@ -387,8 +407,7 @@ validate_interval(_) -> error(badarg). wait_for_connections_loop(Ref, Op, NumConns, Interval) -> CurConns = try - ConnsSup = ranch_server:get_connections_sup(Ref), - proplists:get_value(active, supervisor:count_children(ConnsSup)) + get_connections(Ref, all) catch _:_ -> 0 end, diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl index 3e426bd..935ec5c 100644 --- a/src/ranch_acceptor.erl +++ b/src/ranch_acceptor.erl @@ -15,23 +15,25 @@ -module(ranch_acceptor). -export([start_link/4]). --export([loop/4]). +-export([loop/5]). -spec start_link(inet:socket(), module(), module(), pid()) -> {ok, pid()}. start_link(LSocket, Transport, Logger, ConnsSup) -> - Pid = spawn_link(?MODULE, loop, [LSocket, Transport, Logger, ConnsSup]), + MonitorRef = monitor(process, ConnsSup), + Pid = spawn_link(?MODULE, loop, [LSocket, Transport, Logger, ConnsSup, MonitorRef]), {ok, Pid}. --spec loop(inet:socket(), module(), module(), pid()) -> no_return(). -loop(LSocket, Transport, Logger, ConnsSup) -> +-spec loop(inet:socket(), module(), module(), pid(), reference()) -> no_return(). +loop(LSocket, Transport, Logger, ConnsSup, MonitorRef) -> _ = case Transport:accept(LSocket, infinity) of {ok, CSocket} -> case Transport:controlling_process(CSocket, ConnsSup) of ok -> %% This call will not return until process has been started %% AND we are below the maximum number of connections. - ranch_conns_sup:start_protocol(ConnsSup, CSocket); + ranch_conns_sup:start_protocol(ConnsSup, MonitorRef, + CSocket); {error, _} -> Transport:close(CSocket) end; @@ -51,7 +53,7 @@ loop(LSocket, Transport, Logger, ConnsSup) -> ok end, flush(Logger), - ?MODULE:loop(LSocket, Transport, Logger, ConnsSup). + ?MODULE:loop(LSocket, Transport, Logger, ConnsSup, MonitorRef). flush(Logger) -> receive Msg -> diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl index cdb633b..e320633 100644 --- a/src/ranch_acceptors_sup.erl +++ b/src/ranch_acceptors_sup.erl @@ -15,18 +15,16 @@ -module(ranch_acceptors_sup). -behaviour(supervisor). --export([start_link/2]). +-export([start_link/3]). -export([init/1]). --spec start_link(ranch:ref(), module()) +-spec start_link(ranch:ref(), pos_integer(), module()) -> {ok, pid()}. -start_link(Ref, Transport) -> - supervisor:start_link(?MODULE, [Ref, Transport]). +start_link(Ref, NumAcceptors, Transport) -> + supervisor:start_link(?MODULE, [Ref, NumAcceptors, Transport]). -init([Ref, Transport]) -> - ConnsSup = ranch_server:get_connections_sup(Ref), +init([Ref, NumAcceptors, Transport]) -> TransOpts = ranch_server:get_transport_options(Ref), - NumAcceptors = maps:get(num_acceptors, TransOpts, 10), Logger = maps:get(logger, TransOpts, error_logger), SocketOpts = maps:get(socket_opts, TransOpts, []), %% We temporarily put the logger in the process dictionary @@ -45,7 +43,7 @@ init([Ref, Transport]) -> ranch_server:set_addr(Ref, Addr), Procs = [ {{acceptor, self(), N}, {ranch_acceptor, start_link, [ - LSocket, Transport, Logger, ConnsSup + LSocket, Transport, Logger, ranch_server:get_connections_sup(Ref, N) ]}, permanent, brutal_kill, worker, []} || N <- lists:seq(1, NumAcceptors)], {ok, {{one_for_one, 1, 5}, Procs}}. diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl index d19405b..af3ba23 100644 --- a/src/ranch_conns_sup.erl +++ b/src/ranch_conns_sup.erl @@ -18,12 +18,12 @@ -module(ranch_conns_sup). %% API. --export([start_link/3]). --export([start_protocol/2]). +-export([start_link/4]). +-export([start_protocol/3]). -export([active_connections/1]). %% Supervisor internals. --export([init/4]). +-export([init/5]). -export([system_continue/3]). -export([system_terminate/4]). -export([system_code_change/4]). @@ -34,6 +34,7 @@ -record(state, { parent = undefined :: pid(), ref :: ranch:ref(), + acceptor_id :: non_neg_integer(), conn_type :: conn_type(), shutdown :: shutdown(), transport = undefined :: module(), @@ -46,10 +47,10 @@ %% API. --spec start_link(ranch:ref(), module(), module()) -> {ok, pid()}. -start_link(Ref, Transport, Protocol) -> +-spec start_link(ranch:ref(), non_neg_integer(), module(), module()) -> {ok, pid()}. +start_link(Ref, AcceptorId, Transport, Protocol) -> proc_lib:start_link(?MODULE, init, - [self(), Ref, Transport, Protocol]). + [self(), Ref, AcceptorId, Transport, Protocol]). %% We can safely assume we are on the same node as the supervisor. %% @@ -67,10 +68,15 @@ start_link(Ref, Transport, Protocol) -> %% We do not need the reply, we only need the ok from the supervisor %% to continue. The supervisor sends its own pid when the acceptor can %% continue. --spec start_protocol(pid(), inet:socket()) -> ok. -start_protocol(SupPid, Socket) -> +-spec start_protocol(pid(), reference(), inet:socket()) -> ok. +start_protocol(SupPid, MonitorRef, Socket) -> SupPid ! {?MODULE, start_protocol, self(), Socket}, - receive SupPid -> ok end. + receive + SupPid -> + ok; + {'DOWN', MonitorRef, process, SupPid, Reason} -> + error(Reason) + end. %% We can't make the above assumptions here. This function might be %% called from anywhere. @@ -94,10 +100,10 @@ active_connections(SupPid) -> %% Supervisor internals. --spec init(pid(), ranch:ref(), module(), module()) -> no_return(). -init(Parent, Ref, Transport, Protocol) -> +-spec init(pid(), ranch:ref(), non_neg_integer(), module(), module()) -> no_return(). +init(Parent, Ref, AcceptorId, Transport, Protocol) -> process_flag(trap_exit, true), - ok = ranch_server:set_connections_sup(Ref, self()), + ok = ranch_server:set_connections_sup(Ref, AcceptorId, self()), MaxConns = ranch_server:get_max_connections(Ref), TransOpts = ranch_server:get_transport_options(Ref), ConnType = maps:get(connection_type, TransOpts, worker), @@ -106,7 +112,7 @@ init(Parent, Ref, Transport, Protocol) -> Logger = maps:get(logger, TransOpts, error_logger), ProtoOpts = ranch_server:get_protocol_options(Ref), ok = proc_lib:init_ack(Parent, {ok, self()}), - loop(#state{parent=Parent, ref=Ref, conn_type=ConnType, + loop(#state{parent=Parent, ref=Ref, acceptor_id=AcceptorId, conn_type=ConnType, shutdown=Shutdown, transport=Transport, protocol=Protocol, opts=ProtoOpts, handshake_timeout=HandshakeTimeout, max_conns=MaxConns, logger=Logger}, 0, 0, []). diff --git a/src/ranch_conns_sup_sup.erl b/src/ranch_conns_sup_sup.erl new file mode 100644 index 0000000..423c5db --- /dev/null +++ b/src/ranch_conns_sup_sup.erl @@ -0,0 +1,34 @@ +%% Copyright (c) 2019, Jan Uhlig <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(ranch_conns_sup_sup). + +-behaviour(supervisor). + +-export([start_link/4]). +-export([init/1]). + +start_link(Ref, NumAcceptors, Transport, Protocol) -> + ok = ranch_server:cleanup_connections_sups(Ref), + supervisor:start_link(?MODULE, { + Ref, NumAcceptors, Transport, Protocol + }). + +init({Ref, NumAcceptors, Transport, Protocol}) -> + ChildSpecs = [ + {{ranch_conns_sup, N}, {ranch_conns_sup, start_link, + [Ref, N, Transport, Protocol]}, + permanent, infinity, supervisor, [ranch_conns_sup]} + || N <- lists:seq(1, NumAcceptors)], + {ok, {{one_for_one, 1, 5}, ChildSpecs}}. diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl index 3853425..a4cc995 100644 --- a/src/ranch_listener_sup.erl +++ b/src/ranch_listener_sup.erl @@ -21,21 +21,22 @@ -spec start_link(ranch:ref(), module(), any(), module(), any()) -> {ok, pid()}. start_link(Ref, Transport, TransOpts, Protocol, ProtoOpts) -> + NumAcceptors = maps:get(num_acceptors, TransOpts, 10), MaxConns = maps:get(max_connections, TransOpts, 1024), ranch_server:set_new_listener_opts(Ref, MaxConns, TransOpts, ProtoOpts, [Ref, Transport, TransOpts, Protocol, ProtoOpts]), supervisor:start_link(?MODULE, { - Ref, Transport, Protocol + Ref, NumAcceptors, Transport, Protocol }). -init({Ref, Transport, Protocol}) -> +init({Ref, NumAcceptors, Transport, Protocol}) -> ok = ranch_server:set_listener_sup(Ref, self()), ChildSpecs = [ - {ranch_conns_sup, {ranch_conns_sup, start_link, - [Ref, Transport, Protocol]}, - permanent, infinity, supervisor, [ranch_conns_sup]}, + {ranch_conns_sup_sup, {ranch_conns_sup_sup, start_link, + [Ref, NumAcceptors, Transport, Protocol]}, + permanent, infinity, supervisor, [ranch_conns_sup_sup]}, {ranch_acceptors_sup, {ranch_acceptors_sup, start_link, - [Ref, Transport]}, + [Ref, NumAcceptors, Transport]}, permanent, infinity, supervisor, [ranch_acceptors_sup]} ], {ok, {{rest_for_one, 1, 5}, ChildSpecs}}. diff --git a/src/ranch_server.erl b/src/ranch_server.erl index a767cd8..9116217 100644 --- a/src/ranch_server.erl +++ b/src/ranch_server.erl @@ -19,8 +19,10 @@ -export([start_link/0]). -export([set_new_listener_opts/5]). -export([cleanup_listener_opts/1]). --export([set_connections_sup/2]). --export([get_connections_sup/1]). +-export([cleanup_connections_sups/1]). +-export([set_connections_sup/3]). +-export([get_connections_sup/2]). +-export([get_connections_sups/1]). -export([get_connections_sups/0]). -export([set_listener_sup/2]). -export([get_listener_sup/1]). @@ -68,29 +70,40 @@ cleanup_listener_opts(Ref) -> _ = ets:delete(?TAB, {trans_opts, Ref}), _ = ets:delete(?TAB, {proto_opts, Ref}), _ = ets:delete(?TAB, {listener_start_args, Ref}), - %% We also remove the pid of the connections supervisor. - %% Depending on the timing, it might already have been deleted + %% We also remove the pid of the connection supervisors. + %% Depending on the timing, they might already have been deleted %% when we handled the monitor DOWN message. However, in some %% cases when calling stop_listener followed by get_connections_sup, %% we could end up with the pid still being returned, when we %% expected a crash (because the listener was stopped). %% Deleting it explictly here removes any possible confusion. - _ = ets:delete(?TAB, {conns_sup, Ref}), + _ = ets:match_delete(?TAB, {{conns_sup, Ref, '_'}, '_'}), %% Ditto for the listener supervisor. _ = ets:delete(?TAB, {listener_sup, Ref}), ok. --spec set_connections_sup(ranch:ref(), pid()) -> ok. -set_connections_sup(Ref, Pid) -> - gen_server:call(?MODULE, {set_connections_sup, Ref, Pid}). +-spec cleanup_connections_sups(ranch:ref()) -> ok. +cleanup_connections_sups(Ref) -> + _ = ets:match_delete(?TAB, {{conns_sup, Ref, '_'}, '_'}), + ok. + +-spec set_connections_sup(ranch:ref(), non_neg_integer(), pid()) -> ok. +set_connections_sup(Ref, AcceptorId, Pid) -> + gen_server:call(?MODULE, {set_connections_sup, Ref, AcceptorId, Pid}). + +-spec get_connections_sup(ranch:ref(), non_neg_integer()) -> pid(). +get_connections_sup(Ref, AcceptorId) -> + ets:lookup_element(?TAB, {conns_sup, Ref, AcceptorId}, 2). --spec get_connections_sup(ranch:ref()) -> pid(). -get_connections_sup(Ref) -> - ets:lookup_element(?TAB, {conns_sup, Ref}, 2). +-spec get_connections_sups(ranch:ref()) -> [{non_neg_integer(), pid()}]. +get_connections_sups(Ref) -> + [{AcceptorId, Pid} || + [AcceptorId, Pid] <- ets:match(?TAB, {{conns_sup, Ref, '$1'}, '$2'})]. --spec get_connections_sups() -> [{ranch:ref(), pid()}]. +-spec get_connections_sups() -> [{ranch:ref(), non_neg_integer(), pid()}]. get_connections_sups() -> - [{Ref, Pid} || [Ref, Pid] <- ets:match(?TAB, {{conns_sup, '$1'}, '$2'})]. + [{Ref, AcceptorId, Pid} || + [Ref, AcceptorId, Pid] <- ets:match(?TAB, {{conns_sup, '$1', '$2'}, '$3'})]. -spec set_listener_sup(ranch:ref(), pid()) -> ok. set_listener_sup(Ref, Pid) -> @@ -142,13 +155,18 @@ get_listener_start_args(Ref) -> -spec count_connections(ranch:ref()) -> non_neg_integer(). count_connections(Ref) -> - ranch_conns_sup:active_connections(get_connections_sup(Ref)). + lists:foldl( + fun ({_, ConnsSup}, Acc) -> + Acc+ranch_conns_sup:active_connections(ConnsSup) + end, + 0, + get_connections_sups(Ref)). %% gen_server. init([]) -> - ConnMonitors = [{{erlang:monitor(process, Pid), Pid}, {conns_sup, Ref}} || - [Ref, Pid] <- ets:match(?TAB, {{conns_sup, '$1'}, '$2'})], + ConnMonitors = [{{erlang:monitor(process, Pid), Pid}, {conns_sup, Ref, AcceptorId}} || + [Ref, AcceptorId, Pid] <- ets:match(?TAB, {{conns_sup, '$1', '$2'}, '$3'})], ListenerMonitors = [{{erlang:monitor(process, Pid), Pid}, {listener_sup, Ref}} || [Ref, Pid] <- ets:match(?TAB, {{listener_sup, '$1'}, '$2'})], {ok, #state{monitors=ConnMonitors++ListenerMonitors}}. @@ -159,8 +177,8 @@ handle_call({set_new_listener_opts, Ref, MaxConns, TransOpts, ProtoOpts, StartAr ets:insert_new(?TAB, {{proto_opts, Ref}, ProtoOpts}), ets:insert_new(?TAB, {{listener_start_args, Ref}, StartArgs}), {reply, ok, State}; -handle_call({set_connections_sup, Ref, Pid}, _, State0) -> - State = set_monitored_process({conns_sup, Ref}, Pid, State0), +handle_call({set_connections_sup, Ref, AcceptorId, Pid}, _, State0) -> + State = set_monitored_process({conns_sup, Ref, AcceptorId}, Pid, State0), {reply, ok, State}; handle_call({set_listener_sup, Ref, Pid}, _, State0) -> State = set_monitored_process({listener_sup, Ref}, Pid, State0), @@ -170,16 +188,14 @@ handle_call({set_addr, Ref, Addr}, _, State) -> {reply, ok, State}; handle_call({set_max_conns, Ref, MaxConns}, _, State) -> ets:insert(?TAB, {{max_conns, Ref}, MaxConns}), - ConnsSup = get_connections_sup(Ref), - ConnsSup ! {set_max_conns, MaxConns}, + _ = [ConnsSup ! {set_max_conns, MaxConns} || {_, ConnsSup} <- get_connections_sups(Ref)], {reply, ok, State}; handle_call({set_trans_opts, Ref, Opts}, _, State) -> ets:insert(?TAB, {{trans_opts, Ref}, Opts}), {reply, ok, State}; handle_call({set_proto_opts, Ref, Opts}, _, State) -> ets:insert(?TAB, {{proto_opts, Ref}, Opts}), - ConnsSup = get_connections_sup(Ref), - ConnsSup ! {set_opts, Opts}, + _ = [ConnsSup ! {set_opts, Opts} || {_, ConnsSup} <- get_connections_sups(Ref)], {reply, ok, State}; handle_call(_Request, _From, State) -> {reply, ignore, State}. diff --git a/test/acceptor_SUITE.erl b/test/acceptor_SUITE.erl index 3a58fd9..dce006a 100644 --- a/test/acceptor_SUITE.erl +++ b/test/acceptor_SUITE.erl @@ -297,11 +297,11 @@ misc_repeated_remove(_) -> ranch_tcp, #{}, remove_conn_and_wait_protocol, [{remove, 5, 0}]), Port = ranch:get_port(Name), - ConnsSup = ranch_server:get_connections_sup(Name), + ConnsSups = lists:sort(ranch_server:get_connections_sups(Name)), {ok, _} = gen_tcp:connect("localhost", Port, [binary, {active, false}, {packet, raw}]), timer:sleep(1000), - ConnsSup = ranch_server:get_connections_sup(Name), - true = erlang:is_process_alive(ConnsSup), + ConnsSups = lists:sort(ranch_server:get_connections_sups(Name)), + true = lists:all(fun ({_, ConnsSup}) -> erlang:is_process_alive(ConnsSup) end, ConnsSups), ok = ranch:stop_listener(Name). misc_wait_for_connections(_) -> @@ -759,8 +759,8 @@ tcp_max_connections_and_beyond(_) -> ok = connect_loop(Port, 10, 0), receive after 250 -> ok end, 0 = ranch_server:count_connections(Name), - 10 = length(supervisor:which_children(ranch_server:get_connections_sup(Name))), - Counts = supervisor:count_children(ranch_server:get_connections_sup(Name)), + 10 = length(do_conns_which_children(Name)), + Counts = do_conns_count_children(Name), {_, 1} = lists:keyfind(specs, 1, Counts), {_, 0} = lists:keyfind(supervisors, 1, Counts), {_, 10} = lists:keyfind(active, 1, Counts), @@ -770,8 +770,8 @@ tcp_max_connections_and_beyond(_) -> ok = connect_loop(Port, 10, 0), receive after 250 -> ok end, 10 = ranch_server:count_connections(Name), - 20 = length(supervisor:which_children(ranch_server:get_connections_sup(Name))), - Counts2 = supervisor:count_children(ranch_server:get_connections_sup(Name)), + 20 = length(do_conns_which_children(Name)), + Counts2 = do_conns_count_children(Name), {_, 20} = lists:keyfind(active, 1, Counts2), {_, 20} = lists:keyfind(workers, 1, Counts2), ok = ranch:stop_listener(Name). @@ -943,8 +943,7 @@ connection_type_supervisor(_) -> {ok, Socket} = gen_tcp:connect("localhost", Port, [binary, {active, false}, {packet, raw}]), ok = gen_tcp:send(Socket, <<"TCP Ranch is working!">>), {ok, <<"TCP Ranch is working!">>} = gen_tcp:recv(Socket, 21, 1000), - ConnsSup = ranch_server:get_connections_sup(Name), - [{echo_protocol, _, supervisor, [echo_protocol]}] = supervisor:which_children(ConnsSup), + [{echo_protocol, _, supervisor, [echo_protocol]}] = do_conns_which_children(Name), ok = ranch:stop_listener(Name), {error, closed} = gen_tcp:recv(Socket, 0, 1000), %% Make sure the listener stopped. @@ -961,8 +960,7 @@ connection_type_supervisor_separate_from_connection(_) -> {ok, Socket} = gen_tcp:connect("localhost", Port, [binary, {active, false}, {packet, raw}]), ok = gen_tcp:send(Socket, <<"TCP Ranch is working!">>), {ok, <<"TCP Ranch is working!">>} = gen_tcp:recv(Socket, 21, 1000), - ConnsSup = ranch_server:get_connections_sup(Name), - [{supervisor_separate, _, supervisor, [supervisor_separate]}] = supervisor:which_children(ConnsSup), + [{supervisor_separate, _, supervisor, [supervisor_separate]}] = do_conns_which_children(Name), ok = ranch:stop_listener(Name), {error, closed} = gen_tcp:recv(Socket, 0, 1000), %% Make sure the listener stopped. @@ -1010,6 +1008,7 @@ do_supervisor_clean_child_restart(_) -> "when the listening socket is closed."), Name = name(), %% Trace socket allocations. + {module, ranch_tcp} = code:ensure_loaded(ranch_tcp), _ = erlang:trace(new, true, [call]), 1 = erlang:trace_pattern({ranch_tcp, listen, 1}, [{'_', [], [{return_trace}]}], [global]), @@ -1018,7 +1017,7 @@ do_supervisor_clean_child_restart(_) -> echo_protocol, []), %% Trace supervisor spawns. 1 = erlang:trace(Pid, true, [procs, set_on_spawn]), - ConnsSup = ranch_server:get_connections_sup(Name), + ConnsSups = ranch_server:get_connections_sups(Name), %% Manually shut the listening socket down. LSocket = receive {trace, _, return_from, {ranch_tcp, listen, 1}, {ok, Socket}} -> @@ -1030,7 +1029,7 @@ do_supervisor_clean_child_restart(_) -> receive after 1000 -> ok end, %% Verify that supervisor and its first two children are alive. true = is_process_alive(Pid), - true = is_process_alive(ConnsSup), + true = lists:all(fun erlang:is_process_alive/1, [ConnsSup || {_, ConnsSup} <- ConnsSups]), %% Check that acceptors_sup is restarted properly. AccSupPid = receive {trace, Pid, spawn, Pid1, _} -> Pid1 end, receive {trace, AccSupPid, spawn, _, _} -> ok end, @@ -1039,7 +1038,7 @@ do_supervisor_clean_child_restart(_) -> {trace, _, spawn, _, _} -> error(invalid_restart) after 1000 -> ok end, %% Verify that children still registered right. - ConnsSup = ranch_server:get_connections_sup(Name), + ConnsSups = ranch_server:get_connections_sups(Name), _ = erlang:trace_pattern({ranch_tcp, listen, 1}, false, []), _ = erlang:trace(all, false, [all]), ok = clean_traces(), @@ -1061,15 +1060,17 @@ do_supervisor_clean_restart(_) -> echo_protocol, []), %% Trace supervisor spawns. 1 = erlang:trace(Pid, true, [procs, set_on_spawn]), - ConnsSup0 = ranch_server:get_connections_sup(Name), - erlang:exit(ConnsSup0, kill), + {_, ConnsSupSup0, _, _} = lists:keyfind(ranch_conns_sup_sup, 1, supervisor:which_children(Pid)), + exit(ConnsSupSup0, kill), receive after 1000 -> ok end, %% Verify that supervisor is alive true = is_process_alive(Pid), %% ...but children are dead. - false = is_process_alive(ConnsSup0), + false = is_process_alive(ConnsSupSup0), %% Receive traces from newly started children - ConnsSup = receive {trace, Pid, spawn, Pid2, _} -> Pid2 end, + ConnsSupSup = receive {trace, Pid, spawn, Pid2, _} -> Pid2 end, + [receive {trace, ConnsSupSup, spawn, _Pid, _} -> ok end || + _ <- lists:seq(1, NumAcc)], AccSupPid = receive {trace, Pid, spawn, Pid3, _} -> Pid3 end, %% ...and its acceptors. [receive {trace, AccSupPid, spawn, _Pid, _} -> ok end || @@ -1080,7 +1081,6 @@ do_supervisor_clean_restart(_) -> error(invalid_restart) after 1000 -> ok end, %% Verify that new children registered themselves properly. - ConnsSup = ranch_server:get_connections_sup(Name), _ = erlang:trace(all, false, [all]), ok = clean_traces(), ok = ranch:stop_listener(Name). @@ -1094,6 +1094,7 @@ supervisor_conns_alive(Config) -> do_supervisor_conns_alive(_) -> doc("Ensure that active connections stay open when the listening socket gets closed."), Name = name(), + {module, ranch_tcp} = code:ensure_loaded(ranch_tcp), _ = erlang:trace(new, true, [call]), 1 = erlang:trace_pattern({ranch_tcp, listen, 1}, [{'_', [], [{return_trace}]}], [global]), @@ -1127,11 +1128,11 @@ supervisor_protocol_start_link_crash(_) -> {ok, _} = ranch:start_listener(Name, ranch_tcp, #{}, crash_protocol, []), - ConnsSup = ranch_server:get_connections_sup(Name), + ConnsSups = ranch_server:get_connections_sups(Name), Port = ranch:get_port(Name), {ok, _} = gen_tcp:connect("localhost", Port, [binary, {active, true}, {packet, raw}]), receive after 500 -> ok end, - ConnsSup = ranch_server:get_connections_sup(Name), + ConnsSups = ranch_server:get_connections_sups(Name), ok = ranch:stop_listener(Name). supervisor_server_recover_state(Config) -> @@ -1150,7 +1151,7 @@ do_supervisor_server_recover_state(_) -> {ok, _} = ranch:start_listener(Name, ranch_tcp, #{}, echo_protocol, []), - ConnsSup = ranch_server:get_connections_sup(Name), + ConnsSups = ranch_server:get_connections_sups(Name), ServerPid = erlang:whereis(ranch_server), {monitors, Monitors} = erlang:process_info(ServerPid, monitors), erlang:exit(ServerPid, kill), @@ -1163,10 +1164,10 @@ do_supervisor_server_recover_state(_) -> 1000 -> error(timeout) end, - ConnsSup = ranch_server:get_connections_sup(Name), + ConnsSups = ranch_server:get_connections_sups(Name), ok = ranch:stop_listener(Name), %% Check ranch_server has removed the ranch_conns_sup. - {'EXIT', {badarg, _}} = (catch ranch_server:get_connections_sup(Name)), + [] = (catch ranch_server:get_connections_sups(Name)), _ = erlang:trace(all, false, [all]), ok = clean_traces(). @@ -1174,17 +1175,15 @@ supervisor_unexpected_message(_) -> doc("Ensure the connections supervisor stays alive when it receives " "an unexpected message."), Name = name(), - {ok, ListenerPid} = ranch:start_listener(Name, + {ok, _} = ranch:start_listener(Name, ranch_tcp, #{}, echo_protocol, []), Port = ranch:get_port(Name), {ok, Socket} = gen_tcp:connect("localhost", Port, [binary, {active, false}, {packet, raw}]), ok = gen_tcp:send(Socket, <<"TCP Ranch is working!">>), {ok, <<"TCP Ranch is working!">>} = gen_tcp:recv(Socket, 21, 1000), - %% Send the unexpected message to ranch_conns_sup. - Procs = supervisor:which_children(ListenerPid), - {_, ConnsSup, _, _} = lists:keyfind(ranch_conns_sup, 1, Procs), - ConnsSup ! hello, + %% Send the unexpected message to all ranch_conns_sups. + _ = [ConnSup ! hello || {_, ConnSup} <- ranch_server:get_connections_sups(Name)], %% Connection is still up. ok = gen_tcp:send(Socket, <<"TCP Ranch is working!">>), {ok, <<"TCP Ranch is working!">>} = gen_tcp:recv(Socket, 21, 1000), @@ -1229,3 +1228,25 @@ do_get_listener_socket(ListenerSupPid) -> {links, Links} = erlang:process_info(AcceptorsSupPid, links), [LSocket] = [P || P <- Links, is_port(P)], LSocket. + +do_conns_which_children(Name) -> + Conns = [supervisor:which_children(ConnsSup) || + {_, ConnsSup} <- ranch_server:get_connections_sups(Name)], + lists:flatten(Conns). + +do_conns_count_children(Name) -> + lists:foldl( + fun + (Stats, undefined) -> + Stats; + (Stats, Acc) -> + lists:zipwith( + fun ({K, V1}, {K, V2}) -> {K, V1+V2} end, + Acc, + Stats + ) + end, + undefined, + [supervisor:count_children(ConnsSup) || + {_, ConnsSup} <- ranch_server:get_connections_sups(Name)] + ). diff --git a/test/shutdown_SUITE.erl b/test/shutdown_SUITE.erl index 249458e..3cef65a 100644 --- a/test/shutdown_SUITE.erl +++ b/test/shutdown_SUITE.erl @@ -36,8 +36,8 @@ brutal_kill(_) -> {ok, _} = gen_tcp:connect("localhost", Port, []), receive after 100 -> ok end, ListenerSupChildren = supervisor:which_children(ListenerSup), - {_, ConnsSup, _, _} = lists:keyfind(ranch_conns_sup, 1, ListenerSupChildren), - [{_, Pid, _, _}] = supervisor:which_children(ConnsSup), + {_, ConnsSupSup, _, _} = lists:keyfind(ranch_conns_sup_sup, 1, ListenerSupChildren), + [Pid] = do_get_conn_pids(ConnsSupSup), true = is_process_alive(Pid), ok = ranch:stop_listener(Name), receive after 100 -> ok end, @@ -56,8 +56,8 @@ infinity(_) -> {ok, _} = gen_tcp:connect("localhost", Port, []), receive after 100 -> ok end, ListenerSupChildren = supervisor:which_children(ListenerSup), - {_, ConnsSup, _, _} = lists:keyfind(ranch_conns_sup, 1, ListenerSupChildren), - [{_, Pid, _, _}] = supervisor:which_children(ConnsSup), + {_, ConnsSupSup, _, _} = lists:keyfind(ranch_conns_sup_sup, 1, ListenerSupChildren), + [Pid] = do_get_conn_pids(ConnsSupSup), true = is_process_alive(Pid), ok = ranch:stop_listener(Name), receive after 100 -> ok end, @@ -78,8 +78,8 @@ infinity_trap_exit(_) -> {ok, _} = gen_tcp:connect("localhost", Port, []), receive after 100 -> ok end, ListenerSupChildren = supervisor:which_children(ListenerSup), - {_, ConnsSup, _, _} = lists:keyfind(ranch_conns_sup, 1, ListenerSupChildren), - [{_, Pid, _, _}] = supervisor:which_children(ConnsSup), + {_, ConnsSupSup, _, _} = lists:keyfind(ranch_conns_sup_sup, 1, ListenerSupChildren), + [Pid] = do_get_conn_pids(ConnsSupSup), true = is_process_alive(Pid), %% This call will block infinitely. SpawnPid = spawn(fun() -> ok = ranch:stop_listener(Name) end), @@ -107,8 +107,8 @@ timeout(_) -> {ok, _} = gen_tcp:connect("localhost", Port, []), receive after 100 -> ok end, ListenerSupChildren = supervisor:which_children(ListenerSup), - {_, ConnsSup, _, _} = lists:keyfind(ranch_conns_sup, 1, ListenerSupChildren), - [{_, Pid, _, _}] = supervisor:which_children(ConnsSup), + {_, ConnsSupSup, _, _} = lists:keyfind(ranch_conns_sup_sup, 1, ListenerSupChildren), + [Pid] = do_get_conn_pids(ConnsSupSup), true = is_process_alive(Pid), ok = ranch:stop_listener(Name), receive after 100 -> ok end, @@ -129,8 +129,8 @@ timeout_trap_exit(_) -> {ok, _} = gen_tcp:connect("localhost", Port, []), receive after 100 -> ok end, ListenerSupChildren = supervisor:which_children(ListenerSup), - {_, ConnsSup, _, _} = lists:keyfind(ranch_conns_sup, 1, ListenerSupChildren), - [{_, Pid, _, _}] = supervisor:which_children(ConnsSup), + {_, ConnsSupSup, _, _} = lists:keyfind(ranch_conns_sup_sup, 1, ListenerSupChildren), + [Pid] = do_get_conn_pids(ConnsSupSup), true = is_process_alive(Pid), %% This call will block for the duration of the shutdown. SpawnPid = spawn(fun() -> ok = ranch:stop_listener(Name) end), @@ -147,3 +147,10 @@ timeout_trap_exit(_) -> false = is_process_alive(ListenerSup), false = is_process_alive(SpawnPid), ok. + +do_get_conn_pids(ConnsSupSup) -> + ConnsSups = [ConnsSup || + {_, ConnsSup, _, _} <- supervisor:which_children(ConnsSupSup)], + ConnChildren = lists:flatten( + [supervisor:which_children(ConnsSup) || ConnsSup <- ConnsSups]), + [ConnPid || {_, ConnPid, _, _} <- ConnChildren]. |