diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ranch_acceptor.erl | 94 | ||||
-rw-r--r-- | src/ranch_acceptors_sup.erl | 17 | ||||
-rw-r--r-- | src/ranch_conns_sup.erl | 173 | ||||
-rw-r--r-- | src/ranch_listener.erl | 59 | ||||
-rw-r--r-- | src/ranch_listener_sup.erl | 16 | ||||
-rw-r--r-- | src/ranch_server.erl | 102 | ||||
-rw-r--r-- | src/ranch_sup.erl | 2 |
7 files changed, 217 insertions, 246 deletions
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl index 63d24c8..44cf52d 100644 --- a/src/ranch_acceptor.erl +++ b/src/ranch_acceptor.erl @@ -16,89 +16,31 @@ -module(ranch_acceptor). %% API. --export([start_link/6]). +-export([start_link/3]). %% Internal. --export([init/7]). --export([loop/7]). +-export([loop/3]). %% API. --spec start_link(any(), inet:socket(), module(), module(), pid(), pid()) +-spec start_link(inet:socket(), module(), pid()) -> {ok, pid()}. -start_link(Ref, LSocket, Transport, Protocol, ListenerPid, ConnsSup) -> - {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid), - {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid), - Pid = spawn_link(?MODULE, init, - [LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup]), - ok = ranch_server:add_acceptor(Ref, Pid), +start_link(LSocket, Transport, ConnsSup) -> + Pid = spawn_link(?MODULE, loop, [LSocket, Transport, ConnsSup]), {ok, Pid}. %% Internal. --spec init(inet:socket(), module(), module(), - non_neg_integer(), any(), pid(), pid()) -> no_return(). -init(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) -> - async_accept(LSocket, Transport), - loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup). - --spec loop(inet:socket(), module(), module(), - ranch:max_conns(), any(), pid(), pid()) -> no_return(). -loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) -> - receive - %% We couldn't accept the socket but it's safe to continue. - {accept, continue} -> - ?MODULE:init(LSocket, Transport, Protocol, - MaxConns, Opts, ListenerPid, ConnsSup); - %% Found my sockets! - {accept, CSocket} -> - {ok, ConnPid} = supervisor:start_child(ConnsSup, - [ListenerPid, CSocket, Transport, Protocol, Opts]), - Transport:controlling_process(CSocket, ConnPid), - ConnPid ! {shoot, ListenerPid}, - {ok, MaxConns2} = case MaxConns of - infinity -> - {ok, infinity}; - _ -> - NbConns = ranch_listener:add_connection(ListenerPid, ConnPid), - maybe_wait(ListenerPid, MaxConns, NbConns) - end, - ?MODULE:init(LSocket, Transport, Protocol, - MaxConns2, Opts, ListenerPid, ConnsSup); - %% Upgrade the max number of connections allowed concurrently. - {set_max_conns, MaxConns2} -> - ?MODULE:loop(LSocket, Transport, Protocol, - MaxConns2, Opts, ListenerPid, ConnsSup); - %% Upgrade the protocol options. - {set_opts, Opts2} -> - ?MODULE:loop(LSocket, Transport, Protocol, - MaxConns, Opts2, ListenerPid, ConnsSup) - end. - --spec maybe_wait(pid(), MaxConns, non_neg_integer()) - -> {ok, MaxConns} when MaxConns::ranch:max_conns(). -maybe_wait(_, MaxConns, NbConns) when MaxConns > NbConns -> - {ok, MaxConns}; -maybe_wait(ListenerPid, MaxConns, NbConns) -> - receive - {set_max_conns, MaxConns2} -> - maybe_wait(ListenerPid, MaxConns2, NbConns) - after 0 -> - NbConns2 = ranch_server:count_connections(ListenerPid), - maybe_wait(ListenerPid, MaxConns, NbConns2) - end. - --spec async_accept(inet:socket(), module()) -> ok. -async_accept(LSocket, Transport) -> - AcceptorPid = self(), - _ = spawn_link(fun() -> - case Transport:accept(LSocket, infinity) of - {ok, CSocket} -> - Transport:controlling_process(CSocket, AcceptorPid), - AcceptorPid ! {accept, CSocket}; - %% We want to crash if the listening socket got closed. - {error, Reason} when Reason =/= closed -> - AcceptorPid ! {accept, continue} - end - end), - ok. +-spec loop(inet:socket(), module(), pid()) -> no_return(). +loop(LSocket, Transport, ConnsSup) -> + _ = case Transport:accept(LSocket, infinity) of + {ok, CSocket} -> + Transport:controlling_process(CSocket, ConnsSup), + %% This call will not return until process has been started + %% AND we are below the maximum number of connections. + ranch_conns_sup:start_protocol(ConnsSup, CSocket); + %% We want to crash if the listening socket got closed. + {error, Reason} when Reason =/= closed -> + ok + end, + ?MODULE:loop(LSocket, Transport, ConnsSup). diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl index f1908a6..97df74f 100644 --- a/src/ranch_acceptors_sup.erl +++ b/src/ranch_acceptors_sup.erl @@ -17,24 +17,23 @@ -behaviour(supervisor). %% API. --export([start_link/5]). +-export([start_link/4]). %% supervisor. -export([init/1]). %% API. --spec start_link(any(), non_neg_integer(), module(), any(), - module()) -> {ok, pid()}. -start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol) -> - supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts, - Protocol]). +-spec start_link(any(), non_neg_integer(), module(), any()) + -> {ok, pid()}. +start_link(Ref, NbAcceptors, Transport, TransOpts) -> + supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts]). %% supervisor. -init([Ref, NbAcceptors, Transport, TransOpts, Protocol]) -> +init([Ref, NbAcceptors, Transport, TransOpts]) -> ListenerPid = ranch_server:lookup_listener(Ref), - ConnsPid = ranch_server:lookup_connections_sup(Ref), + ConnsSup = ranch_server:lookup_connections_sup(Ref), LSocket = case proplists:get_value(socket, TransOpts) of undefined -> {ok, Socket} = Transport:listen(TransOpts), @@ -46,7 +45,7 @@ init([Ref, NbAcceptors, Transport, TransOpts, Protocol]) -> ranch_listener:set_port(ListenerPid, Port), Procs = [ {{acceptor, self(), N}, {ranch_acceptor, start_link, [ - Ref, LSocket, Transport, Protocol, ListenerPid, ConnsPid + LSocket, Transport, ConnsSup ]}, permanent, brutal_kill, worker, []} || N <- lists:seq(1, NbAcceptors)], {ok, {{one_for_one, 10, 10}, Procs}}. diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl index 3cc09be..29b550f 100644 --- a/src/ranch_conns_sup.erl +++ b/src/ranch_conns_sup.erl @@ -13,30 +13,171 @@ %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. %% @private +%% +%% Make sure to never reload this module outside a release upgrade, +%% as calling l(ranch_conns_sup) twice will kill the process and all +%% the currently open connections. -module(ranch_conns_sup). --behaviour(supervisor). %% API. --export([start_link/1]). --export([start_protocol/5]). +-export([start_link/3]). +-export([start_protocol/2]). +-export([active_connections/1]). + +%% Supervisor internals. +-export([init/4]). +-export([system_continue/3]). +-export([system_terminate/4]). +-export([system_code_change/4]). -%% supervisor. --export([init/1]). +-record(state, { + parent = undefined :: pid(), + listener_pid = undefined :: pid(), + transport = undefined :: module(), + protocol = undefined :: module(), + opts :: any(), + max_conns = undefined :: non_neg_integer() | infinity +}). %% API. --spec start_link(any()) -> {ok, pid()}. -start_link(Ref) -> - supervisor:start_link(?MODULE, Ref). +-spec start_link(any(), module(), module()) -> {ok, pid()}. +start_link(Ref, Transport, Protocol) -> + proc_lib:start_link(?MODULE, init, [self(), Ref, Transport, Protocol]). + +%% We can safely assume we are on the same node as the supervisor. +%% +%% We can also safely avoid having a monitor and a timeout here +%% because only three things can happen: +%% * The supervisor died; rest_for_one strategy killed all acceptors +%% so this very calling process is going to di-- +%% * There's too many connections, the supervisor will resume the +%% acceptor only when we get below the limit again. +%% * The supervisor is overloaded, there's either too many acceptors +%% or the max_connections limit is too large. It's better if we +%% don't keep accepting connections because this leaves +%% more room for the situation to be resolved. +%% +%% We do not need the reply, we only need the ok from the supervisor +%% to continue. The supervisor sends its own pid when the acceptor can +%% continue. +-spec start_protocol(pid(), inet:socket()) -> ok. +start_protocol(SupPid, Socket) -> + SupPid ! {?MODULE, start_protocol, self(), Socket}, + receive SupPid -> ok end. --spec start_protocol(pid(), inet:socket(), module(), module(), any()) - -> {ok, pid()}. -start_protocol(ListenerPid, Socket, Transport, Protocol, Opts) -> - Protocol:start_link(ListenerPid, Socket, Transport, Opts). +%% We can't make the above assumptions here. This function might be +%% called from anywhere. +-spec active_connections(pid()) -> non_neg_integer(). +active_connections(SupPid) -> + Tag = erlang:monitor(process, SupPid), + erlang:send(SupPid, {?MODULE, active_connections, self(), Tag}, + [noconnect]), + receive + {Tag, Ret} -> + erlang:demonitor(Tag, [flush]), + Ret; + {'DOWN', Tag, _, _, noconnection} -> + exit({nodedown, node(SupPid)}); + {'DOWN', Tag, _, _, Reason} -> + exit(Reason) + after 5000 -> + erlang:demonitor(Tag, [flush]), + exit(timeout) + end. -%% supervisor. +%% Supervisor internals. -init(Ref) -> +-spec init(pid(), any(), module(), module()) -> no_return(). +init(Parent, Ref, Transport, Protocol) -> + process_flag(trap_exit, true), ok = ranch_server:set_connections_sup(Ref, self()), - {ok, {{simple_one_for_one, 0, 1}, [{?MODULE, {?MODULE, start_protocol, []}, - temporary, brutal_kill, worker, [?MODULE]}]}}. + ListenerPid = ranch_server:lookup_listener(Ref), + {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid), + {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid), + ok = proc_lib:init_ack(Parent, {ok, self()}), + loop(#state{parent=Parent, listener_pid=ListenerPid, transport=Transport, + protocol=Protocol, opts=Opts, max_conns=MaxConns}, 0, 0, []). + +loop(State=#state{parent=Parent, listener_pid=ListenerPid, + transport=Transport, protocol=Protocol, opts=Opts, + max_conns=MaxConns}, CurConns, NbChildren, Sleepers) -> + receive + {?MODULE, start_protocol, To, Socket} -> + case Protocol:start_link(ListenerPid, Socket, Transport, Opts) of + {ok, Pid} -> + Transport:controlling_process(Socket, Pid), + Pid ! {shoot, ListenerPid}, + put(Pid, true), + CurConns2 = CurConns + 1, + if CurConns2 < MaxConns -> + To ! self(), + loop(State, CurConns2, NbChildren + 1, + Sleepers); + true -> + loop(State, CurConns2, NbChildren + 1, + [To|Sleepers]) + end; + _ -> + To ! self(), + loop(State, CurConns, NbChildren, Sleepers) + end; + {?MODULE, active_connections, To, Tag} -> + To ! {Tag, CurConns}, + loop(State, CurConns, NbChildren, Sleepers); + %% Remove a connection from the count of connections. + {remove_connection, ListenerPid} -> + loop(State, CurConns - 1, NbChildren, Sleepers); + %% Upgrade the max number of connections allowed concurrently. + %% We resume all sleeping acceptors if this number increases. + {set_max_conns, MaxConns2} when MaxConns2 > MaxConns -> + _ = [To ! self() || To <- Sleepers], + loop(State#state{max_conns=MaxConns2}, + CurConns, NbChildren, []); + {set_max_conns, MaxConns2} -> + loop(State#state{max_conns=MaxConns2}, + CurConns, NbChildren, Sleepers); + %% Upgrade the protocol options. + {set_opts, Opts2} -> + loop(State#state{opts=Opts2}, + CurConns, NbChildren, Sleepers); + {'EXIT', Parent, Reason} -> + exit(Reason); + {'EXIT', Pid, _} when Sleepers =:= [] -> + erase(Pid), + loop(State, CurConns - 1, NbChildren - 1, Sleepers); + %% Resume a sleeping acceptor if needed. + {'EXIT', Pid, _} -> + erase(Pid), + [To|Sleepers2] = Sleepers, + To ! self(), + loop(State, CurConns - 1, NbChildren - 1, Sleepers2); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {State, CurConns, NbChildren, Sleepers}); + %% Calls from the supervisor module. + {'$gen_call', {To, Tag}, which_children} -> + Pids = get_keys(true), + Children = [{Protocol, Pid, worker, [Protocol]} + || Pid <- Pids], + To ! {Tag, Children}, + loop(State, CurConns, NbChildren, Sleepers); + {'$gen_call', {To, Tag}, count_children} -> + Counts = [{specs, 1}, {active, NbChildren}, + {supervisors, 0}, {workers, NbChildren}], + To ! {Tag, Counts}, + loop(State, CurConns, NbChildren, Sleepers); + {'$gen_call', {To, Tag}, _} -> + To ! {Tag, {error, ?MODULE}}, + loop(State, CurConns, NbChildren, Sleepers) + end. + +system_continue(_, _, {State, CurConns, NbChildren, Sleepers}) -> + loop(State, CurConns, NbChildren, Sleepers). + +-spec system_terminate(any(), _, _, _) -> no_return(). +system_terminate(Reason, _, _, _) -> + exit(Reason). + +system_code_change(Misc, _, _, _) -> + {ok, Misc}. diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl index 408fbcd..a53cc75 100644 --- a/src/ranch_listener.erl +++ b/src/ranch_listener.erl @@ -19,7 +19,6 @@ %% API. -export([start_link/3]). -export([stop/1]). --export([add_connection/2]). -export([remove_connection/1]). -export([get_port/1]). -export([set_port/2]). @@ -40,8 +39,7 @@ ref :: any(), max_conns = undefined :: ranch:max_conns(), port = undefined :: undefined | inet:port_number(), - proto_opts = undefined :: any(), - rm_diff = 0 :: non_neg_integer() + proto_opts = undefined :: any() }). %% API. @@ -56,27 +54,16 @@ start_link(Ref, MaxConns, ProtoOpts) -> stop(ServerPid) -> gen_server:call(ServerPid, stop). -%% @doc Add a connection to the listener's pool. --spec add_connection(pid(), pid()) -> non_neg_integer(). -add_connection(ServerPid, ConnPid) -> - ok = gen_server:cast(ServerPid, {add_connection, ConnPid}), - ranch_server:add_connection(ServerPid). - %% @doc Remove this process' connection from the pool. %% %% Useful if you have long-lived connections that aren't taking up %% resources and shouldn't be counted in the limited number of running %% connections. --spec remove_connection(pid()) -> non_neg_integer(). +-spec remove_connection(pid()) -> ok. remove_connection(ServerPid) -> - try - Count = ranch_server:remove_connection(ServerPid), - ok = gen_server:cast(ServerPid, remove_connection), - Count - catch - error:badarg -> % Max conns = infinity - 0 - end. + ConnsSup = ranch_server:find_connections_sup(ServerPid), + ConnsSup ! {remove_connection, ServerPid}, + ok. %% @doc Return the listener's port. -spec get_port(pid()) -> {ok, inet:port_number()}. @@ -111,12 +98,8 @@ set_protocol_options(ServerPid, ProtoOpts) -> %% gen_server. %% @private -init([Ref, infinity, ProtoOpts]) -> - ok = ranch_server:insert_listener(Ref, self()), - {ok, #state{ref=Ref, max_conns=infinity, proto_opts=ProtoOpts}}; init([Ref, MaxConns, ProtoOpts]) -> ok = ranch_server:insert_listener(Ref, self()), - ranch_server:add_connections_counter(self()), {ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}. %% @private @@ -125,23 +108,15 @@ handle_call(get_port, _From, State=#state{port=Port}) -> handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) -> {reply, {ok, MaxConns}, State}; handle_call({set_max_connections, MaxConnections}, _From, - State=#state{ref=Ref, max_conns=CurrMax, rm_diff=CurrDiff}) -> - RmDiff = case {MaxConnections, CurrMax} of - {infinity, _} -> % moving to infinity, delete connection key - ranch_server:remove_connections_counter(self()), - 0; - {_, infinity} -> % moving away from infinity, create connection key - ranch_server:add_connections_counter(self()), - CurrDiff; - {_, _} -> % stay current - CurrDiff - end, - ranch_server:send_to_acceptors(Ref, {set_max_conns, MaxConnections}), - {reply, ok, State#state{max_conns=MaxConnections, rm_diff=RmDiff}}; + State=#state{ref=Ref}) -> + ConnsSup = ranch_server:lookup_connections_sup(Ref), + ConnsSup ! {set_max_conns, MaxConnections}, + {reply, ok, State#state{max_conns=MaxConnections}}; handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) -> {reply, {ok, ProtoOpts}, State}; handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) -> - ranch_server:send_to_acceptors(Ref, {set_opts, ProtoOpts}), + ConnsSup = ranch_server:lookup_connections_sup(Ref), + ConnsSup ! {set_opts, ProtoOpts}, {reply, ok, State#state{proto_opts=ProtoOpts}}; handle_call(stop, _From, State) -> {stop, normal, stopped, State}; @@ -149,24 +124,12 @@ handle_call(_, _From, State) -> {reply, ignored, State}. %% @private -handle_cast({add_connection, ConnPid}, State) -> - _ = erlang:monitor(process, ConnPid), - {noreply, State}; -handle_cast(remove_connection, State=#state{max_conns=infinity}) -> - {noreply, State}; -handle_cast(remove_connection, State=#state{rm_diff=RmDiff}) -> - {noreply, State#state{rm_diff=RmDiff + 1}}; handle_cast({set_port, Port}, State) -> {noreply, State#state{port=Port}}; handle_cast(_Msg, State) -> {noreply, State}. %% @private -handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=0}) -> - _ = ranch_server:remove_connection(self()), - {noreply, State}; -handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=RmDiff}) -> - {noreply, State#state{rm_diff=RmDiff - 1}}; handle_info(_Info, State) -> {noreply, State}. diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl index 0147cf2..1d346aa 100644 --- a/src/ranch_listener_sup.erl +++ b/src/ranch_listener_sup.erl @@ -30,7 +30,7 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> MaxConns = proplists:get_value(max_connections, TransOpts, 1024), supervisor:start_link(?MODULE, { Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts - }). + }). %% supervisor. @@ -38,15 +38,15 @@ init({Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts}) -> ChildSpecs = [ %% listener {ranch_listener, {ranch_listener, start_link, - [Ref, MaxConns, ProtoOpts]}, - permanent, 5000, worker, [ranch_listener]}, + [Ref, MaxConns, ProtoOpts]}, + permanent, 5000, worker, [ranch_listener]}, %% conns_sup - {ranch_conns_sup, {ranch_conns_sup, start_link, [Ref]}, - permanent, infinity, supervisor, [ranch_conns_sup]}, + {ranch_conns_sup, {ranch_conns_sup, start_link, + [Ref, Transport, Protocol]}, + permanent, infinity, supervisor, [ranch_conns_sup]}, %% acceptors_sup {ranch_acceptors_sup, {ranch_acceptors_sup, start_link, - [Ref, NbAcceptors, Transport, TransOpts, Protocol] - }, permanent, infinity, supervisor, [ranch_acceptors_sup]} + [Ref, NbAcceptors, Transport, TransOpts] + }, permanent, infinity, supervisor, [ranch_acceptors_sup]} ], {ok, {{rest_for_one, 10, 10}, ChildSpecs}}. - diff --git a/src/ranch_server.erl b/src/ranch_server.erl index c6d7c19..77f11c5 100644 --- a/src/ranch_server.erl +++ b/src/ranch_server.erl @@ -22,13 +22,8 @@ -export([lookup_listener/1]). -export([set_connections_sup/2]). -export([lookup_connections_sup/1]). --export([add_acceptor/2]). --export([send_to_acceptors/2]). --export([add_connection/1]). +-export([find_connections_sup/1]). -export([count_connections/1]). --export([remove_connection/1]). --export([add_connections_counter/1]). --export([remove_connections_counter/1]). %% gen_server. -export([init/1]). @@ -40,8 +35,7 @@ -define(TAB, ?MODULE). --type key() :: {listener | acceptors, any()}. --type monitors() :: [{{reference(), pid()}, key()}]. +-type monitors() :: [{{reference(), pid()}, any()}]. -record(state, { monitors = [] :: monitors() }). @@ -68,6 +62,7 @@ lookup_listener(Ref) -> -spec set_connections_sup(any(), pid()) -> ok. set_connections_sup(Ref, Pid) -> true = ets:update_element(?TAB, {listener, Ref}, {3, Pid}), + true = ets:insert_new(?TAB, {{conns_sup, lookup_listener(Ref)}, Pid}), ok. %% @doc Lookup a connection supervisor used by specific listener. @@ -75,56 +70,15 @@ set_connections_sup(Ref, Pid) -> lookup_connections_sup(Ref) -> ets:lookup_element(?TAB, {listener, Ref}, 3). -%% @doc Add an acceptor for the given listener. --spec add_acceptor(any(), pid()) -> ok. -add_acceptor(Ref, Pid) -> - gen_server:cast(?MODULE, {add_acceptor, Ref, Pid}). - -%% @doc Send a message to all acceptors of the given listener. --spec send_to_acceptors(any(), any()) -> ok. -send_to_acceptors(Ref, Msg) -> - Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2), - _ = [Pid ! Msg || Pid <- Acceptors], - ok. - -%% @doc Add a connection to the connection pool. -%% -%% Also return the number of connections in the pool after this operation. --spec add_connection(pid()) -> non_neg_integer(). -add_connection(ListenerPid) -> - ets:update_counter(?TAB, {connections, ListenerPid}, 1). +%% @doc Find a connection supervisor using the listener pid. +-spec find_connections_sup(pid()) -> pid(). +find_connections_sup(Pid) -> + ets:lookup_element(?TAB, {conns_sup, Pid}, 2). %% @doc Count the number of connections in the connection pool. --spec count_connections(pid()) -> non_neg_integer(). -count_connections(ListenerPid) -> - try - ets:update_counter(?TAB, {connections, ListenerPid}, 0) - catch - error:badarg -> % Max conns = infinity - 0 - end. - -%% @doc Remove a connection from the connection pool. -%% -%% Also return the number of connections in the pool after this operation. --spec remove_connection(pid()) -> non_neg_integer(). -remove_connection(ListenerPid) -> - ets:update_counter(?TAB, {connections, ListenerPid}, -1). - - -%% @doc Add a connections counter to the connection pool -%% -%% Should only be used by ranch listeners when settings regarding the max -%% number of connections change. -add_connections_counter(Pid) -> - true = ets:insert_new(?TAB, {{connections, Pid}, 0}). - -%% @doc remove a connections counter from the connection pool -%% -%% Should only be used by ranch listeners when settings regarding the max -%% number of connections change. -remove_connections_counter(Pid) -> - true = ets:delete(?TAB, {connections, Pid}). +-spec count_connections(any()) -> non_neg_integer(). +count_connections(Ref) -> + ranch_conns_sup:active_connections(lookup_connections_sup(Ref)). %% gen_server. @@ -138,27 +92,18 @@ handle_call(_Request, _From, State) -> %% @private handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) -> - true = ets:insert_new(?TAB, {{acceptors, Ref}, []}), MonitorRef = erlang:monitor(process, Pid), {noreply, State#state{ - monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}}; -handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) -> - MonitorRef = erlang:monitor(process, Pid), - Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2), - true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}), - {noreply, State#state{ - monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}}; -handle_cast({add_connection, Pid}, State) -> - _ = erlang:monitor(process, Pid), - {noreply, State}; + monitors=[{{MonitorRef, Pid}, Ref}|Monitors]}}; handle_cast(_Request, State) -> {noreply, State}. %% @private handle_info({'DOWN', MonitorRef, process, Pid, _}, State=#state{monitors=Monitors}) -> - {_, Key} = lists:keyfind({MonitorRef, Pid}, 1, Monitors), - Monitors2 = remove_process(Key, MonitorRef, Pid, Monitors), + {_, Ref} = lists:keyfind({MonitorRef, Pid}, 1, Monitors), + true = ets:delete(?TAB, {listener, Ref}), + Monitors2 = lists:keydelete({MonitorRef, Pid}, 1, Monitors), {noreply, State#state{monitors=Monitors2}}; handle_info(_Info, State) -> {noreply, State}. @@ -170,22 +115,3 @@ terminate(_Reason, _State) -> %% @private code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%% Internal. - --spec remove_process(key(), reference(), pid(), Monitors) - -> Monitors when Monitors::monitors() . -remove_process(Key = {listener, Ref}, MonitorRef, Pid, Monitors) -> - true = ets:delete(?TAB, Key), - true = ets:delete(?TAB, {acceptors, Ref}), - remove_connections_counter(Pid), - lists:keydelete({MonitorRef, Pid}, 1, Monitors); -remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) -> - try - Acceptors = ets:lookup_element(?TAB, Key, 2), - true = ets:update_element(?TAB, Key, {2, lists:delete(Pid, Acceptors)}) - catch - error:_ -> - ok - end, - lists:keydelete({MonitorRef, Pid}, 1, Monitors). diff --git a/src/ranch_sup.erl b/src/ranch_sup.erl index ad1c558..ddf2f69 100644 --- a/src/ranch_sup.erl +++ b/src/ranch_sup.erl @@ -34,7 +34,7 @@ start_link() -> init([]) -> ranch_server = ets:new(ranch_server, [ - ordered_set, public, named_table, {write_concurrency, true}]), + ordered_set, public, named_table]), Procs = [ {ranch_server, {ranch_server, start_link, []}, permanent, 5000, worker, [ranch_server]} |