aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ranch_acceptor.erl94
-rw-r--r--src/ranch_acceptors_sup.erl17
-rw-r--r--src/ranch_conns_sup.erl173
-rw-r--r--src/ranch_listener.erl59
-rw-r--r--src/ranch_listener_sup.erl16
-rw-r--r--src/ranch_server.erl102
-rw-r--r--src/ranch_sup.erl2
7 files changed, 217 insertions, 246 deletions
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl
index 63d24c8..44cf52d 100644
--- a/src/ranch_acceptor.erl
+++ b/src/ranch_acceptor.erl
@@ -16,89 +16,31 @@
-module(ranch_acceptor).
%% API.
--export([start_link/6]).
+-export([start_link/3]).
%% Internal.
--export([init/7]).
--export([loop/7]).
+-export([loop/3]).
%% API.
--spec start_link(any(), inet:socket(), module(), module(), pid(), pid())
+-spec start_link(inet:socket(), module(), pid())
-> {ok, pid()}.
-start_link(Ref, LSocket, Transport, Protocol, ListenerPid, ConnsSup) ->
- {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid),
- {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid),
- Pid = spawn_link(?MODULE, init,
- [LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup]),
- ok = ranch_server:add_acceptor(Ref, Pid),
+start_link(LSocket, Transport, ConnsSup) ->
+ Pid = spawn_link(?MODULE, loop, [LSocket, Transport, ConnsSup]),
{ok, Pid}.
%% Internal.
--spec init(inet:socket(), module(), module(),
- non_neg_integer(), any(), pid(), pid()) -> no_return().
-init(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
- async_accept(LSocket, Transport),
- loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup).
-
--spec loop(inet:socket(), module(), module(),
- ranch:max_conns(), any(), pid(), pid()) -> no_return().
-loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
- receive
- %% We couldn't accept the socket but it's safe to continue.
- {accept, continue} ->
- ?MODULE:init(LSocket, Transport, Protocol,
- MaxConns, Opts, ListenerPid, ConnsSup);
- %% Found my sockets!
- {accept, CSocket} ->
- {ok, ConnPid} = supervisor:start_child(ConnsSup,
- [ListenerPid, CSocket, Transport, Protocol, Opts]),
- Transport:controlling_process(CSocket, ConnPid),
- ConnPid ! {shoot, ListenerPid},
- {ok, MaxConns2} = case MaxConns of
- infinity ->
- {ok, infinity};
- _ ->
- NbConns = ranch_listener:add_connection(ListenerPid, ConnPid),
- maybe_wait(ListenerPid, MaxConns, NbConns)
- end,
- ?MODULE:init(LSocket, Transport, Protocol,
- MaxConns2, Opts, ListenerPid, ConnsSup);
- %% Upgrade the max number of connections allowed concurrently.
- {set_max_conns, MaxConns2} ->
- ?MODULE:loop(LSocket, Transport, Protocol,
- MaxConns2, Opts, ListenerPid, ConnsSup);
- %% Upgrade the protocol options.
- {set_opts, Opts2} ->
- ?MODULE:loop(LSocket, Transport, Protocol,
- MaxConns, Opts2, ListenerPid, ConnsSup)
- end.
-
--spec maybe_wait(pid(), MaxConns, non_neg_integer())
- -> {ok, MaxConns} when MaxConns::ranch:max_conns().
-maybe_wait(_, MaxConns, NbConns) when MaxConns > NbConns ->
- {ok, MaxConns};
-maybe_wait(ListenerPid, MaxConns, NbConns) ->
- receive
- {set_max_conns, MaxConns2} ->
- maybe_wait(ListenerPid, MaxConns2, NbConns)
- after 0 ->
- NbConns2 = ranch_server:count_connections(ListenerPid),
- maybe_wait(ListenerPid, MaxConns, NbConns2)
- end.
-
--spec async_accept(inet:socket(), module()) -> ok.
-async_accept(LSocket, Transport) ->
- AcceptorPid = self(),
- _ = spawn_link(fun() ->
- case Transport:accept(LSocket, infinity) of
- {ok, CSocket} ->
- Transport:controlling_process(CSocket, AcceptorPid),
- AcceptorPid ! {accept, CSocket};
- %% We want to crash if the listening socket got closed.
- {error, Reason} when Reason =/= closed ->
- AcceptorPid ! {accept, continue}
- end
- end),
- ok.
+-spec loop(inet:socket(), module(), pid()) -> no_return().
+loop(LSocket, Transport, ConnsSup) ->
+ _ = case Transport:accept(LSocket, infinity) of
+ {ok, CSocket} ->
+ Transport:controlling_process(CSocket, ConnsSup),
+ %% This call will not return until process has been started
+ %% AND we are below the maximum number of connections.
+ ranch_conns_sup:start_protocol(ConnsSup, CSocket);
+ %% We want to crash if the listening socket got closed.
+ {error, Reason} when Reason =/= closed ->
+ ok
+ end,
+ ?MODULE:loop(LSocket, Transport, ConnsSup).
diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl
index f1908a6..97df74f 100644
--- a/src/ranch_acceptors_sup.erl
+++ b/src/ranch_acceptors_sup.erl
@@ -17,24 +17,23 @@
-behaviour(supervisor).
%% API.
--export([start_link/5]).
+-export([start_link/4]).
%% supervisor.
-export([init/1]).
%% API.
--spec start_link(any(), non_neg_integer(), module(), any(),
- module()) -> {ok, pid()}.
-start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol) ->
- supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts,
- Protocol]).
+-spec start_link(any(), non_neg_integer(), module(), any())
+ -> {ok, pid()}.
+start_link(Ref, NbAcceptors, Transport, TransOpts) ->
+ supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts]).
%% supervisor.
-init([Ref, NbAcceptors, Transport, TransOpts, Protocol]) ->
+init([Ref, NbAcceptors, Transport, TransOpts]) ->
ListenerPid = ranch_server:lookup_listener(Ref),
- ConnsPid = ranch_server:lookup_connections_sup(Ref),
+ ConnsSup = ranch_server:lookup_connections_sup(Ref),
LSocket = case proplists:get_value(socket, TransOpts) of
undefined ->
{ok, Socket} = Transport:listen(TransOpts),
@@ -46,7 +45,7 @@ init([Ref, NbAcceptors, Transport, TransOpts, Protocol]) ->
ranch_listener:set_port(ListenerPid, Port),
Procs = [
{{acceptor, self(), N}, {ranch_acceptor, start_link, [
- Ref, LSocket, Transport, Protocol, ListenerPid, ConnsPid
+ LSocket, Transport, ConnsSup
]}, permanent, brutal_kill, worker, []}
|| N <- lists:seq(1, NbAcceptors)],
{ok, {{one_for_one, 10, 10}, Procs}}.
diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl
index 3cc09be..29b550f 100644
--- a/src/ranch_conns_sup.erl
+++ b/src/ranch_conns_sup.erl
@@ -13,30 +13,171 @@
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
%% @private
+%%
+%% Make sure to never reload this module outside a release upgrade,
+%% as calling l(ranch_conns_sup) twice will kill the process and all
+%% the currently open connections.
-module(ranch_conns_sup).
--behaviour(supervisor).
%% API.
--export([start_link/1]).
--export([start_protocol/5]).
+-export([start_link/3]).
+-export([start_protocol/2]).
+-export([active_connections/1]).
+
+%% Supervisor internals.
+-export([init/4]).
+-export([system_continue/3]).
+-export([system_terminate/4]).
+-export([system_code_change/4]).
-%% supervisor.
--export([init/1]).
+-record(state, {
+ parent = undefined :: pid(),
+ listener_pid = undefined :: pid(),
+ transport = undefined :: module(),
+ protocol = undefined :: module(),
+ opts :: any(),
+ max_conns = undefined :: non_neg_integer() | infinity
+}).
%% API.
--spec start_link(any()) -> {ok, pid()}.
-start_link(Ref) ->
- supervisor:start_link(?MODULE, Ref).
+-spec start_link(any(), module(), module()) -> {ok, pid()}.
+start_link(Ref, Transport, Protocol) ->
+ proc_lib:start_link(?MODULE, init, [self(), Ref, Transport, Protocol]).
+
+%% We can safely assume we are on the same node as the supervisor.
+%%
+%% We can also safely avoid having a monitor and a timeout here
+%% because only three things can happen:
+%% * The supervisor died; rest_for_one strategy killed all acceptors
+%% so this very calling process is going to di--
+%% * There's too many connections, the supervisor will resume the
+%% acceptor only when we get below the limit again.
+%% * The supervisor is overloaded, there's either too many acceptors
+%% or the max_connections limit is too large. It's better if we
+%% don't keep accepting connections because this leaves
+%% more room for the situation to be resolved.
+%%
+%% We do not need the reply, we only need the ok from the supervisor
+%% to continue. The supervisor sends its own pid when the acceptor can
+%% continue.
+-spec start_protocol(pid(), inet:socket()) -> ok.
+start_protocol(SupPid, Socket) ->
+ SupPid ! {?MODULE, start_protocol, self(), Socket},
+ receive SupPid -> ok end.
--spec start_protocol(pid(), inet:socket(), module(), module(), any())
- -> {ok, pid()}.
-start_protocol(ListenerPid, Socket, Transport, Protocol, Opts) ->
- Protocol:start_link(ListenerPid, Socket, Transport, Opts).
+%% We can't make the above assumptions here. This function might be
+%% called from anywhere.
+-spec active_connections(pid()) -> non_neg_integer().
+active_connections(SupPid) ->
+ Tag = erlang:monitor(process, SupPid),
+ erlang:send(SupPid, {?MODULE, active_connections, self(), Tag},
+ [noconnect]),
+ receive
+ {Tag, Ret} ->
+ erlang:demonitor(Tag, [flush]),
+ Ret;
+ {'DOWN', Tag, _, _, noconnection} ->
+ exit({nodedown, node(SupPid)});
+ {'DOWN', Tag, _, _, Reason} ->
+ exit(Reason)
+ after 5000 ->
+ erlang:demonitor(Tag, [flush]),
+ exit(timeout)
+ end.
-%% supervisor.
+%% Supervisor internals.
-init(Ref) ->
+-spec init(pid(), any(), module(), module()) -> no_return().
+init(Parent, Ref, Transport, Protocol) ->
+ process_flag(trap_exit, true),
ok = ranch_server:set_connections_sup(Ref, self()),
- {ok, {{simple_one_for_one, 0, 1}, [{?MODULE, {?MODULE, start_protocol, []},
- temporary, brutal_kill, worker, [?MODULE]}]}}.
+ ListenerPid = ranch_server:lookup_listener(Ref),
+ {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid),
+ {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid),
+ ok = proc_lib:init_ack(Parent, {ok, self()}),
+ loop(#state{parent=Parent, listener_pid=ListenerPid, transport=Transport,
+ protocol=Protocol, opts=Opts, max_conns=MaxConns}, 0, 0, []).
+
+loop(State=#state{parent=Parent, listener_pid=ListenerPid,
+ transport=Transport, protocol=Protocol, opts=Opts,
+ max_conns=MaxConns}, CurConns, NbChildren, Sleepers) ->
+ receive
+ {?MODULE, start_protocol, To, Socket} ->
+ case Protocol:start_link(ListenerPid, Socket, Transport, Opts) of
+ {ok, Pid} ->
+ Transport:controlling_process(Socket, Pid),
+ Pid ! {shoot, ListenerPid},
+ put(Pid, true),
+ CurConns2 = CurConns + 1,
+ if CurConns2 < MaxConns ->
+ To ! self(),
+ loop(State, CurConns2, NbChildren + 1,
+ Sleepers);
+ true ->
+ loop(State, CurConns2, NbChildren + 1,
+ [To|Sleepers])
+ end;
+ _ ->
+ To ! self(),
+ loop(State, CurConns, NbChildren, Sleepers)
+ end;
+ {?MODULE, active_connections, To, Tag} ->
+ To ! {Tag, CurConns},
+ loop(State, CurConns, NbChildren, Sleepers);
+ %% Remove a connection from the count of connections.
+ {remove_connection, ListenerPid} ->
+ loop(State, CurConns - 1, NbChildren, Sleepers);
+ %% Upgrade the max number of connections allowed concurrently.
+ %% We resume all sleeping acceptors if this number increases.
+ {set_max_conns, MaxConns2} when MaxConns2 > MaxConns ->
+ _ = [To ! self() || To <- Sleepers],
+ loop(State#state{max_conns=MaxConns2},
+ CurConns, NbChildren, []);
+ {set_max_conns, MaxConns2} ->
+ loop(State#state{max_conns=MaxConns2},
+ CurConns, NbChildren, Sleepers);
+ %% Upgrade the protocol options.
+ {set_opts, Opts2} ->
+ loop(State#state{opts=Opts2},
+ CurConns, NbChildren, Sleepers);
+ {'EXIT', Parent, Reason} ->
+ exit(Reason);
+ {'EXIT', Pid, _} when Sleepers =:= [] ->
+ erase(Pid),
+ loop(State, CurConns - 1, NbChildren - 1, Sleepers);
+ %% Resume a sleeping acceptor if needed.
+ {'EXIT', Pid, _} ->
+ erase(Pid),
+ [To|Sleepers2] = Sleepers,
+ To ! self(),
+ loop(State, CurConns - 1, NbChildren - 1, Sleepers2);
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
+ {State, CurConns, NbChildren, Sleepers});
+ %% Calls from the supervisor module.
+ {'$gen_call', {To, Tag}, which_children} ->
+ Pids = get_keys(true),
+ Children = [{Protocol, Pid, worker, [Protocol]}
+ || Pid <- Pids],
+ To ! {Tag, Children},
+ loop(State, CurConns, NbChildren, Sleepers);
+ {'$gen_call', {To, Tag}, count_children} ->
+ Counts = [{specs, 1}, {active, NbChildren},
+ {supervisors, 0}, {workers, NbChildren}],
+ To ! {Tag, Counts},
+ loop(State, CurConns, NbChildren, Sleepers);
+ {'$gen_call', {To, Tag}, _} ->
+ To ! {Tag, {error, ?MODULE}},
+ loop(State, CurConns, NbChildren, Sleepers)
+ end.
+
+system_continue(_, _, {State, CurConns, NbChildren, Sleepers}) ->
+ loop(State, CurConns, NbChildren, Sleepers).
+
+-spec system_terminate(any(), _, _, _) -> no_return().
+system_terminate(Reason, _, _, _) ->
+ exit(Reason).
+
+system_code_change(Misc, _, _, _) ->
+ {ok, Misc}.
diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl
index 408fbcd..a53cc75 100644
--- a/src/ranch_listener.erl
+++ b/src/ranch_listener.erl
@@ -19,7 +19,6 @@
%% API.
-export([start_link/3]).
-export([stop/1]).
--export([add_connection/2]).
-export([remove_connection/1]).
-export([get_port/1]).
-export([set_port/2]).
@@ -40,8 +39,7 @@
ref :: any(),
max_conns = undefined :: ranch:max_conns(),
port = undefined :: undefined | inet:port_number(),
- proto_opts = undefined :: any(),
- rm_diff = 0 :: non_neg_integer()
+ proto_opts = undefined :: any()
}).
%% API.
@@ -56,27 +54,16 @@ start_link(Ref, MaxConns, ProtoOpts) ->
stop(ServerPid) ->
gen_server:call(ServerPid, stop).
-%% @doc Add a connection to the listener's pool.
--spec add_connection(pid(), pid()) -> non_neg_integer().
-add_connection(ServerPid, ConnPid) ->
- ok = gen_server:cast(ServerPid, {add_connection, ConnPid}),
- ranch_server:add_connection(ServerPid).
-
%% @doc Remove this process' connection from the pool.
%%
%% Useful if you have long-lived connections that aren't taking up
%% resources and shouldn't be counted in the limited number of running
%% connections.
--spec remove_connection(pid()) -> non_neg_integer().
+-spec remove_connection(pid()) -> ok.
remove_connection(ServerPid) ->
- try
- Count = ranch_server:remove_connection(ServerPid),
- ok = gen_server:cast(ServerPid, remove_connection),
- Count
- catch
- error:badarg -> % Max conns = infinity
- 0
- end.
+ ConnsSup = ranch_server:find_connections_sup(ServerPid),
+ ConnsSup ! {remove_connection, ServerPid},
+ ok.
%% @doc Return the listener's port.
-spec get_port(pid()) -> {ok, inet:port_number()}.
@@ -111,12 +98,8 @@ set_protocol_options(ServerPid, ProtoOpts) ->
%% gen_server.
%% @private
-init([Ref, infinity, ProtoOpts]) ->
- ok = ranch_server:insert_listener(Ref, self()),
- {ok, #state{ref=Ref, max_conns=infinity, proto_opts=ProtoOpts}};
init([Ref, MaxConns, ProtoOpts]) ->
ok = ranch_server:insert_listener(Ref, self()),
- ranch_server:add_connections_counter(self()),
{ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}.
%% @private
@@ -125,23 +108,15 @@ handle_call(get_port, _From, State=#state{port=Port}) ->
handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) ->
{reply, {ok, MaxConns}, State};
handle_call({set_max_connections, MaxConnections}, _From,
- State=#state{ref=Ref, max_conns=CurrMax, rm_diff=CurrDiff}) ->
- RmDiff = case {MaxConnections, CurrMax} of
- {infinity, _} -> % moving to infinity, delete connection key
- ranch_server:remove_connections_counter(self()),
- 0;
- {_, infinity} -> % moving away from infinity, create connection key
- ranch_server:add_connections_counter(self()),
- CurrDiff;
- {_, _} -> % stay current
- CurrDiff
- end,
- ranch_server:send_to_acceptors(Ref, {set_max_conns, MaxConnections}),
- {reply, ok, State#state{max_conns=MaxConnections, rm_diff=RmDiff}};
+ State=#state{ref=Ref}) ->
+ ConnsSup = ranch_server:lookup_connections_sup(Ref),
+ ConnsSup ! {set_max_conns, MaxConnections},
+ {reply, ok, State#state{max_conns=MaxConnections}};
handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) ->
{reply, {ok, ProtoOpts}, State};
handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) ->
- ranch_server:send_to_acceptors(Ref, {set_opts, ProtoOpts}),
+ ConnsSup = ranch_server:lookup_connections_sup(Ref),
+ ConnsSup ! {set_opts, ProtoOpts},
{reply, ok, State#state{proto_opts=ProtoOpts}};
handle_call(stop, _From, State) ->
{stop, normal, stopped, State};
@@ -149,24 +124,12 @@ handle_call(_, _From, State) ->
{reply, ignored, State}.
%% @private
-handle_cast({add_connection, ConnPid}, State) ->
- _ = erlang:monitor(process, ConnPid),
- {noreply, State};
-handle_cast(remove_connection, State=#state{max_conns=infinity}) ->
- {noreply, State};
-handle_cast(remove_connection, State=#state{rm_diff=RmDiff}) ->
- {noreply, State#state{rm_diff=RmDiff + 1}};
handle_cast({set_port, Port}, State) ->
{noreply, State#state{port=Port}};
handle_cast(_Msg, State) ->
{noreply, State}.
%% @private
-handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=0}) ->
- _ = ranch_server:remove_connection(self()),
- {noreply, State};
-handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=RmDiff}) ->
- {noreply, State#state{rm_diff=RmDiff - 1}};
handle_info(_Info, State) ->
{noreply, State}.
diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl
index 0147cf2..1d346aa 100644
--- a/src/ranch_listener_sup.erl
+++ b/src/ranch_listener_sup.erl
@@ -30,7 +30,7 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
supervisor:start_link(?MODULE, {
Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts
- }).
+ }).
%% supervisor.
@@ -38,15 +38,15 @@ init({Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts}) ->
ChildSpecs = [
%% listener
{ranch_listener, {ranch_listener, start_link,
- [Ref, MaxConns, ProtoOpts]},
- permanent, 5000, worker, [ranch_listener]},
+ [Ref, MaxConns, ProtoOpts]},
+ permanent, 5000, worker, [ranch_listener]},
%% conns_sup
- {ranch_conns_sup, {ranch_conns_sup, start_link, [Ref]},
- permanent, infinity, supervisor, [ranch_conns_sup]},
+ {ranch_conns_sup, {ranch_conns_sup, start_link,
+ [Ref, Transport, Protocol]},
+ permanent, infinity, supervisor, [ranch_conns_sup]},
%% acceptors_sup
{ranch_acceptors_sup, {ranch_acceptors_sup, start_link,
- [Ref, NbAcceptors, Transport, TransOpts, Protocol]
- }, permanent, infinity, supervisor, [ranch_acceptors_sup]}
+ [Ref, NbAcceptors, Transport, TransOpts]
+ }, permanent, infinity, supervisor, [ranch_acceptors_sup]}
],
{ok, {{rest_for_one, 10, 10}, ChildSpecs}}.
-
diff --git a/src/ranch_server.erl b/src/ranch_server.erl
index c6d7c19..77f11c5 100644
--- a/src/ranch_server.erl
+++ b/src/ranch_server.erl
@@ -22,13 +22,8 @@
-export([lookup_listener/1]).
-export([set_connections_sup/2]).
-export([lookup_connections_sup/1]).
--export([add_acceptor/2]).
--export([send_to_acceptors/2]).
--export([add_connection/1]).
+-export([find_connections_sup/1]).
-export([count_connections/1]).
--export([remove_connection/1]).
--export([add_connections_counter/1]).
--export([remove_connections_counter/1]).
%% gen_server.
-export([init/1]).
@@ -40,8 +35,7 @@
-define(TAB, ?MODULE).
--type key() :: {listener | acceptors, any()}.
--type monitors() :: [{{reference(), pid()}, key()}].
+-type monitors() :: [{{reference(), pid()}, any()}].
-record(state, {
monitors = [] :: monitors()
}).
@@ -68,6 +62,7 @@ lookup_listener(Ref) ->
-spec set_connections_sup(any(), pid()) -> ok.
set_connections_sup(Ref, Pid) ->
true = ets:update_element(?TAB, {listener, Ref}, {3, Pid}),
+ true = ets:insert_new(?TAB, {{conns_sup, lookup_listener(Ref)}, Pid}),
ok.
%% @doc Lookup a connection supervisor used by specific listener.
@@ -75,56 +70,15 @@ set_connections_sup(Ref, Pid) ->
lookup_connections_sup(Ref) ->
ets:lookup_element(?TAB, {listener, Ref}, 3).
-%% @doc Add an acceptor for the given listener.
--spec add_acceptor(any(), pid()) -> ok.
-add_acceptor(Ref, Pid) ->
- gen_server:cast(?MODULE, {add_acceptor, Ref, Pid}).
-
-%% @doc Send a message to all acceptors of the given listener.
--spec send_to_acceptors(any(), any()) -> ok.
-send_to_acceptors(Ref, Msg) ->
- Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2),
- _ = [Pid ! Msg || Pid <- Acceptors],
- ok.
-
-%% @doc Add a connection to the connection pool.
-%%
-%% Also return the number of connections in the pool after this operation.
--spec add_connection(pid()) -> non_neg_integer().
-add_connection(ListenerPid) ->
- ets:update_counter(?TAB, {connections, ListenerPid}, 1).
+%% @doc Find a connection supervisor using the listener pid.
+-spec find_connections_sup(pid()) -> pid().
+find_connections_sup(Pid) ->
+ ets:lookup_element(?TAB, {conns_sup, Pid}, 2).
%% @doc Count the number of connections in the connection pool.
--spec count_connections(pid()) -> non_neg_integer().
-count_connections(ListenerPid) ->
- try
- ets:update_counter(?TAB, {connections, ListenerPid}, 0)
- catch
- error:badarg -> % Max conns = infinity
- 0
- end.
-
-%% @doc Remove a connection from the connection pool.
-%%
-%% Also return the number of connections in the pool after this operation.
--spec remove_connection(pid()) -> non_neg_integer().
-remove_connection(ListenerPid) ->
- ets:update_counter(?TAB, {connections, ListenerPid}, -1).
-
-
-%% @doc Add a connections counter to the connection pool
-%%
-%% Should only be used by ranch listeners when settings regarding the max
-%% number of connections change.
-add_connections_counter(Pid) ->
- true = ets:insert_new(?TAB, {{connections, Pid}, 0}).
-
-%% @doc remove a connections counter from the connection pool
-%%
-%% Should only be used by ranch listeners when settings regarding the max
-%% number of connections change.
-remove_connections_counter(Pid) ->
- true = ets:delete(?TAB, {connections, Pid}).
+-spec count_connections(any()) -> non_neg_integer().
+count_connections(Ref) ->
+ ranch_conns_sup:active_connections(lookup_connections_sup(Ref)).
%% gen_server.
@@ -138,27 +92,18 @@ handle_call(_Request, _From, State) ->
%% @private
handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) ->
- true = ets:insert_new(?TAB, {{acceptors, Ref}, []}),
MonitorRef = erlang:monitor(process, Pid),
{noreply, State#state{
- monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}};
-handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) ->
- MonitorRef = erlang:monitor(process, Pid),
- Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2),
- true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}),
- {noreply, State#state{
- monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}};
-handle_cast({add_connection, Pid}, State) ->
- _ = erlang:monitor(process, Pid),
- {noreply, State};
+ monitors=[{{MonitorRef, Pid}, Ref}|Monitors]}};
handle_cast(_Request, State) ->
{noreply, State}.
%% @private
handle_info({'DOWN', MonitorRef, process, Pid, _},
State=#state{monitors=Monitors}) ->
- {_, Key} = lists:keyfind({MonitorRef, Pid}, 1, Monitors),
- Monitors2 = remove_process(Key, MonitorRef, Pid, Monitors),
+ {_, Ref} = lists:keyfind({MonitorRef, Pid}, 1, Monitors),
+ true = ets:delete(?TAB, {listener, Ref}),
+ Monitors2 = lists:keydelete({MonitorRef, Pid}, 1, Monitors),
{noreply, State#state{monitors=Monitors2}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -170,22 +115,3 @@ terminate(_Reason, _State) ->
%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
-%% Internal.
-
--spec remove_process(key(), reference(), pid(), Monitors)
- -> Monitors when Monitors::monitors() .
-remove_process(Key = {listener, Ref}, MonitorRef, Pid, Monitors) ->
- true = ets:delete(?TAB, Key),
- true = ets:delete(?TAB, {acceptors, Ref}),
- remove_connections_counter(Pid),
- lists:keydelete({MonitorRef, Pid}, 1, Monitors);
-remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) ->
- try
- Acceptors = ets:lookup_element(?TAB, Key, 2),
- true = ets:update_element(?TAB, Key, {2, lists:delete(Pid, Acceptors)})
- catch
- error:_ ->
- ok
- end,
- lists:keydelete({MonitorRef, Pid}, 1, Monitors).
diff --git a/src/ranch_sup.erl b/src/ranch_sup.erl
index ad1c558..ddf2f69 100644
--- a/src/ranch_sup.erl
+++ b/src/ranch_sup.erl
@@ -34,7 +34,7 @@ start_link() ->
init([]) ->
ranch_server = ets:new(ranch_server, [
- ordered_set, public, named_table, {write_concurrency, true}]),
+ ordered_set, public, named_table]),
Procs = [
{ranch_server, {ranch_server, start_link, []},
permanent, 5000, worker, [ranch_server]}