aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ranch.erl63
-rw-r--r--src/ranch_acceptor.erl94
-rw-r--r--src/ranch_acceptors_sup.erl20
-rw-r--r--src/ranch_conns_sup.erl172
-rw-r--r--src/ranch_listener.erl179
-rw-r--r--src/ranch_listener_sup.erl21
-rw-r--r--src/ranch_protocol.erl2
-rw-r--r--src/ranch_server.erl195
-rw-r--r--src/ranch_sup.erl2
9 files changed, 316 insertions, 432 deletions
diff --git a/src/ranch.erl b/src/ranch.erl
index 6f1b6fe..ea5f59b 100644
--- a/src/ranch.erl
+++ b/src/ranch.erl
@@ -19,6 +19,7 @@
-export([stop_listener/1]).
-export([child_spec/6]).
-export([accept_ack/1]).
+-export([remove_connection/1]).
-export([get_port/1]).
-export([get_max_connections/1]).
-export([set_max_connections/2]).
@@ -67,17 +68,23 @@ start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)
true ->
Res = supervisor:start_child(ranch_sup, child_spec(Ref, NbAcceptors,
Transport, TransOpts, Protocol, ProtoOpts)),
- case proplists:get_value(socket, TransOpts) of
- undefined ->
- ok;
- Socket ->
- %% change the controlling process so the caller dying doesn't
- %% close the port
- ListenerPid = ranch_server:lookup_listener(Ref),
+ Socket = proplists:get_value(socket, TransOpts),
+ case Res of
+ {ok, Pid} when Socket =/= undefined ->
+ %% Give ownership of the socket to ranch_acceptors_sup
+ %% to make sure the socket stays open as long as the
+ %% listener is alive. If the socket closes however there
+ %% will be no way to recover because we don't know how
+ %% to open it again.
+ Children = supervisor:which_children(Pid),
+ {_, AcceptorsSup, _, _}
+ = lists:keyfind(ranch_acceptors_sup, 1, Children),
%%% Note: the catch is here because SSL crashes when you change
%%% the controlling process of a listen socket because of a bug.
%%% The bug will be fixed in R16.
- catch(Transport:controlling_process(Socket, ListenerPid))
+ catch Transport:controlling_process(Socket, AcceptorsSup);
+ _ ->
+ ok
end,
Res
end.
@@ -90,7 +97,8 @@ start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)
stop_listener(Ref) ->
case supervisor:terminate_child(ranch_sup, {ranch_listener_sup, Ref}) of
ok ->
- supervisor:delete_child(ranch_sup, {ranch_listener_sup, Ref});
+ _ = supervisor:delete_child(ranch_sup, {ranch_listener_sup, Ref}),
+ ranch_server:cleanup_listener_opts(Ref);
{error, Reason} ->
{error, Reason}
end.
@@ -115,36 +123,40 @@ child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)
%%
%% Effectively used to make sure the socket control has been given to
%% the protocol process before starting to use it.
--spec accept_ack(pid()) -> ok.
-accept_ack(ListenerPid) ->
- receive {shoot, ListenerPid} -> ok end.
+-spec accept_ack(any()) -> ok.
+accept_ack(Ref) ->
+ receive {shoot, Ref} -> ok end.
+
+%% @doc Remove the calling process' connection from the pool.
+%%
+%% Useful if you have long-lived connections that aren't taking up
+%% resources and shouldn't be counted in the limited number of running
+%% connections.
+-spec remove_connection(any()) -> ok.
+remove_connection(Ref) ->
+ ConnsSup = ranch_server:get_connections_sup(Ref),
+ ConnsSup ! {remove_connection, Ref},
+ ok.
%% @doc Return the listener's port.
-spec get_port(any()) -> inet:port_number().
get_port(Ref) ->
- ListenerPid = ranch_server:lookup_listener(Ref),
- {ok, Port} = ranch_listener:get_port(ListenerPid),
- Port.
+ ranch_server:get_port(Ref).
%% @doc Return the max number of connections allowed concurrently.
-spec get_max_connections(any()) -> max_conns().
get_max_connections(Ref) ->
- ListenerPid = ranch_server:lookup_listener(Ref),
- {ok, MaxConnections} = ranch_listener:get_max_connections(ListenerPid),
- MaxConnections.
+ ranch_server:get_max_connections(Ref).
%% @doc Set the max number of connections allowed concurrently.
-spec set_max_connections(any(), max_conns()) -> ok.
set_max_connections(Ref, MaxConnections) ->
- ListenerPid = ranch_server:lookup_listener(Ref),
- ok = ranch_listener:set_max_connections(ListenerPid, MaxConnections).
+ ranch_server:set_max_connections(Ref, MaxConnections).
%% @doc Return the current protocol options for the given listener.
-spec get_protocol_options(any()) -> any().
get_protocol_options(Ref) ->
- ListenerPid = ranch_server:lookup_listener(Ref),
- {ok, ProtoOpts} = ranch_listener:get_protocol_options(ListenerPid),
- ProtoOpts.
+ ranch_server:get_protocol_options(Ref).
%% @doc Upgrade the protocol options for the given listener.
%%
@@ -152,9 +164,8 @@ get_protocol_options(Ref) ->
%% newly accepted connections receive the new protocol options. This has
%% no effect on the currently opened connections.
-spec set_protocol_options(any(), any()) -> ok.
-set_protocol_options(Ref, ProtoOpts) ->
- ListenerPid = ranch_server:lookup_listener(Ref),
- ok = ranch_listener:set_protocol_options(ListenerPid, ProtoOpts).
+set_protocol_options(Ref, Opts) ->
+ ranch_server:set_protocol_options(Ref, Opts).
%% @doc Filter a list of options and remove all unwanted values.
%%
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl
index 63d24c8..44cf52d 100644
--- a/src/ranch_acceptor.erl
+++ b/src/ranch_acceptor.erl
@@ -16,89 +16,31 @@
-module(ranch_acceptor).
%% API.
--export([start_link/6]).
+-export([start_link/3]).
%% Internal.
--export([init/7]).
--export([loop/7]).
+-export([loop/3]).
%% API.
--spec start_link(any(), inet:socket(), module(), module(), pid(), pid())
+-spec start_link(inet:socket(), module(), pid())
-> {ok, pid()}.
-start_link(Ref, LSocket, Transport, Protocol, ListenerPid, ConnsSup) ->
- {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid),
- {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid),
- Pid = spawn_link(?MODULE, init,
- [LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup]),
- ok = ranch_server:add_acceptor(Ref, Pid),
+start_link(LSocket, Transport, ConnsSup) ->
+ Pid = spawn_link(?MODULE, loop, [LSocket, Transport, ConnsSup]),
{ok, Pid}.
%% Internal.
--spec init(inet:socket(), module(), module(),
- non_neg_integer(), any(), pid(), pid()) -> no_return().
-init(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
- async_accept(LSocket, Transport),
- loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup).
-
--spec loop(inet:socket(), module(), module(),
- ranch:max_conns(), any(), pid(), pid()) -> no_return().
-loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
- receive
- %% We couldn't accept the socket but it's safe to continue.
- {accept, continue} ->
- ?MODULE:init(LSocket, Transport, Protocol,
- MaxConns, Opts, ListenerPid, ConnsSup);
- %% Found my sockets!
- {accept, CSocket} ->
- {ok, ConnPid} = supervisor:start_child(ConnsSup,
- [ListenerPid, CSocket, Transport, Protocol, Opts]),
- Transport:controlling_process(CSocket, ConnPid),
- ConnPid ! {shoot, ListenerPid},
- {ok, MaxConns2} = case MaxConns of
- infinity ->
- {ok, infinity};
- _ ->
- NbConns = ranch_listener:add_connection(ListenerPid, ConnPid),
- maybe_wait(ListenerPid, MaxConns, NbConns)
- end,
- ?MODULE:init(LSocket, Transport, Protocol,
- MaxConns2, Opts, ListenerPid, ConnsSup);
- %% Upgrade the max number of connections allowed concurrently.
- {set_max_conns, MaxConns2} ->
- ?MODULE:loop(LSocket, Transport, Protocol,
- MaxConns2, Opts, ListenerPid, ConnsSup);
- %% Upgrade the protocol options.
- {set_opts, Opts2} ->
- ?MODULE:loop(LSocket, Transport, Protocol,
- MaxConns, Opts2, ListenerPid, ConnsSup)
- end.
-
--spec maybe_wait(pid(), MaxConns, non_neg_integer())
- -> {ok, MaxConns} when MaxConns::ranch:max_conns().
-maybe_wait(_, MaxConns, NbConns) when MaxConns > NbConns ->
- {ok, MaxConns};
-maybe_wait(ListenerPid, MaxConns, NbConns) ->
- receive
- {set_max_conns, MaxConns2} ->
- maybe_wait(ListenerPid, MaxConns2, NbConns)
- after 0 ->
- NbConns2 = ranch_server:count_connections(ListenerPid),
- maybe_wait(ListenerPid, MaxConns, NbConns2)
- end.
-
--spec async_accept(inet:socket(), module()) -> ok.
-async_accept(LSocket, Transport) ->
- AcceptorPid = self(),
- _ = spawn_link(fun() ->
- case Transport:accept(LSocket, infinity) of
- {ok, CSocket} ->
- Transport:controlling_process(CSocket, AcceptorPid),
- AcceptorPid ! {accept, CSocket};
- %% We want to crash if the listening socket got closed.
- {error, Reason} when Reason =/= closed ->
- AcceptorPid ! {accept, continue}
- end
- end),
- ok.
+-spec loop(inet:socket(), module(), pid()) -> no_return().
+loop(LSocket, Transport, ConnsSup) ->
+ _ = case Transport:accept(LSocket, infinity) of
+ {ok, CSocket} ->
+ Transport:controlling_process(CSocket, ConnsSup),
+ %% This call will not return until process has been started
+ %% AND we are below the maximum number of connections.
+ ranch_conns_sup:start_protocol(ConnsSup, CSocket);
+ %% We want to crash if the listening socket got closed.
+ {error, Reason} when Reason =/= closed ->
+ ok
+ end,
+ ?MODULE:loop(LSocket, Transport, ConnsSup).
diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl
index f1908a6..18574fa 100644
--- a/src/ranch_acceptors_sup.erl
+++ b/src/ranch_acceptors_sup.erl
@@ -17,24 +17,22 @@
-behaviour(supervisor).
%% API.
--export([start_link/5]).
+-export([start_link/4]).
%% supervisor.
-export([init/1]).
%% API.
--spec start_link(any(), non_neg_integer(), module(), any(),
- module()) -> {ok, pid()}.
-start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol) ->
- supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts,
- Protocol]).
+-spec start_link(any(), non_neg_integer(), module(), any())
+ -> {ok, pid()}.
+start_link(Ref, NbAcceptors, Transport, TransOpts) ->
+ supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts]).
%% supervisor.
-init([Ref, NbAcceptors, Transport, TransOpts, Protocol]) ->
- ListenerPid = ranch_server:lookup_listener(Ref),
- ConnsPid = ranch_server:lookup_connections_sup(Ref),
+init([Ref, NbAcceptors, Transport, TransOpts]) ->
+ ConnsSup = ranch_server:get_connections_sup(Ref),
LSocket = case proplists:get_value(socket, TransOpts) of
undefined ->
{ok, Socket} = Transport:listen(TransOpts),
@@ -43,10 +41,10 @@ init([Ref, NbAcceptors, Transport, TransOpts, Protocol]) ->
Socket
end,
{ok, {_, Port}} = Transport:sockname(LSocket),
- ranch_listener:set_port(ListenerPid, Port),
+ ranch_server:set_port(Ref, Port),
Procs = [
{{acceptor, self(), N}, {ranch_acceptor, start_link, [
- Ref, LSocket, Transport, Protocol, ListenerPid, ConnsPid
+ LSocket, Transport, ConnsSup
]}, permanent, brutal_kill, worker, []}
|| N <- lists:seq(1, NbAcceptors)],
{ok, {{one_for_one, 10, 10}, Procs}}.
diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl
index 3cc09be..eaa4d00 100644
--- a/src/ranch_conns_sup.erl
+++ b/src/ranch_conns_sup.erl
@@ -13,30 +13,170 @@
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
%% @private
+%%
+%% Make sure to never reload this module outside a release upgrade,
+%% as calling l(ranch_conns_sup) twice will kill the process and all
+%% the currently open connections.
-module(ranch_conns_sup).
--behaviour(supervisor).
%% API.
--export([start_link/1]).
--export([start_protocol/5]).
+-export([start_link/3]).
+-export([start_protocol/2]).
+-export([active_connections/1]).
+
+%% Supervisor internals.
+-export([init/4]).
+-export([system_continue/3]).
+-export([system_terminate/4]).
+-export([system_code_change/4]).
-%% supervisor.
--export([init/1]).
+-record(state, {
+ parent = undefined :: pid(),
+ ref :: any(),
+ transport = undefined :: module(),
+ protocol = undefined :: module(),
+ opts :: any(),
+ max_conns = undefined :: non_neg_integer() | infinity
+}).
%% API.
--spec start_link(any()) -> {ok, pid()}.
-start_link(Ref) ->
- supervisor:start_link(?MODULE, Ref).
+-spec start_link(any(), module(), module()) -> {ok, pid()}.
+start_link(Ref, Transport, Protocol) ->
+ proc_lib:start_link(?MODULE, init, [self(), Ref, Transport, Protocol]).
+
+%% We can safely assume we are on the same node as the supervisor.
+%%
+%% We can also safely avoid having a monitor and a timeout here
+%% because only three things can happen:
+%% * The supervisor died; rest_for_one strategy killed all acceptors
+%% so this very calling process is going to di--
+%% * There's too many connections, the supervisor will resume the
+%% acceptor only when we get below the limit again.
+%% * The supervisor is overloaded, there's either too many acceptors
+%% or the max_connections limit is too large. It's better if we
+%% don't keep accepting connections because this leaves
+%% more room for the situation to be resolved.
+%%
+%% We do not need the reply, we only need the ok from the supervisor
+%% to continue. The supervisor sends its own pid when the acceptor can
+%% continue.
+-spec start_protocol(pid(), inet:socket()) -> ok.
+start_protocol(SupPid, Socket) ->
+ SupPid ! {?MODULE, start_protocol, self(), Socket},
+ receive SupPid -> ok end.
--spec start_protocol(pid(), inet:socket(), module(), module(), any())
- -> {ok, pid()}.
-start_protocol(ListenerPid, Socket, Transport, Protocol, Opts) ->
- Protocol:start_link(ListenerPid, Socket, Transport, Opts).
+%% We can't make the above assumptions here. This function might be
+%% called from anywhere.
+-spec active_connections(pid()) -> non_neg_integer().
+active_connections(SupPid) ->
+ Tag = erlang:monitor(process, SupPid),
+ erlang:send(SupPid, {?MODULE, active_connections, self(), Tag},
+ [noconnect]),
+ receive
+ {Tag, Ret} ->
+ erlang:demonitor(Tag, [flush]),
+ Ret;
+ {'DOWN', Tag, _, _, noconnection} ->
+ exit({nodedown, node(SupPid)});
+ {'DOWN', Tag, _, _, Reason} ->
+ exit(Reason)
+ after 5000 ->
+ erlang:demonitor(Tag, [flush]),
+ exit(timeout)
+ end.
-%% supervisor.
+%% Supervisor internals.
-init(Ref) ->
+-spec init(pid(), any(), module(), module()) -> no_return().
+init(Parent, Ref, Transport, Protocol) ->
+ process_flag(trap_exit, true),
ok = ranch_server:set_connections_sup(Ref, self()),
- {ok, {{simple_one_for_one, 0, 1}, [{?MODULE, {?MODULE, start_protocol, []},
- temporary, brutal_kill, worker, [?MODULE]}]}}.
+ MaxConns = ranch_server:get_max_connections(Ref),
+ Opts = ranch_server:get_protocol_options(Ref),
+ ok = proc_lib:init_ack(Parent, {ok, self()}),
+ loop(#state{parent=Parent, ref=Ref, transport=Transport,
+ protocol=Protocol, opts=Opts, max_conns=MaxConns}, 0, 0, []).
+
+loop(State=#state{parent=Parent, ref=Ref,
+ transport=Transport, protocol=Protocol, opts=Opts,
+ max_conns=MaxConns}, CurConns, NbChildren, Sleepers) ->
+ receive
+ {?MODULE, start_protocol, To, Socket} ->
+ case Protocol:start_link(Ref, Socket, Transport, Opts) of
+ {ok, Pid} ->
+ Transport:controlling_process(Socket, Pid),
+ Pid ! {shoot, Ref},
+ put(Pid, true),
+ CurConns2 = CurConns + 1,
+ if CurConns2 < MaxConns ->
+ To ! self(),
+ loop(State, CurConns2, NbChildren + 1,
+ Sleepers);
+ true ->
+ loop(State, CurConns2, NbChildren + 1,
+ [To|Sleepers])
+ end;
+ _ ->
+ To ! self(),
+ loop(State, CurConns, NbChildren, Sleepers)
+ end;
+ {?MODULE, active_connections, To, Tag} ->
+ To ! {Tag, CurConns},
+ loop(State, CurConns, NbChildren, Sleepers);
+ %% Remove a connection from the count of connections.
+ {remove_connection, Ref} ->
+ loop(State, CurConns - 1, NbChildren, Sleepers);
+ %% Upgrade the max number of connections allowed concurrently.
+ %% We resume all sleeping acceptors if this number increases.
+ {set_max_conns, MaxConns2} when MaxConns2 > MaxConns ->
+ _ = [To ! self() || To <- Sleepers],
+ loop(State#state{max_conns=MaxConns2},
+ CurConns, NbChildren, []);
+ {set_max_conns, MaxConns2} ->
+ loop(State#state{max_conns=MaxConns2},
+ CurConns, NbChildren, Sleepers);
+ %% Upgrade the protocol options.
+ {set_opts, Opts2} ->
+ loop(State#state{opts=Opts2},
+ CurConns, NbChildren, Sleepers);
+ {'EXIT', Parent, Reason} ->
+ exit(Reason);
+ {'EXIT', Pid, _} when Sleepers =:= [] ->
+ erase(Pid),
+ loop(State, CurConns - 1, NbChildren - 1, Sleepers);
+ %% Resume a sleeping acceptor if needed.
+ {'EXIT', Pid, _} ->
+ erase(Pid),
+ [To|Sleepers2] = Sleepers,
+ To ! self(),
+ loop(State, CurConns - 1, NbChildren - 1, Sleepers2);
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
+ {State, CurConns, NbChildren, Sleepers});
+ %% Calls from the supervisor module.
+ {'$gen_call', {To, Tag}, which_children} ->
+ Pids = get_keys(true),
+ Children = [{Protocol, Pid, worker, [Protocol]}
+ || Pid <- Pids],
+ To ! {Tag, Children},
+ loop(State, CurConns, NbChildren, Sleepers);
+ {'$gen_call', {To, Tag}, count_children} ->
+ Counts = [{specs, 1}, {active, NbChildren},
+ {supervisors, 0}, {workers, NbChildren}],
+ To ! {Tag, Counts},
+ loop(State, CurConns, NbChildren, Sleepers);
+ {'$gen_call', {To, Tag}, _} ->
+ To ! {Tag, {error, ?MODULE}},
+ loop(State, CurConns, NbChildren, Sleepers)
+ end.
+
+system_continue(_, _, {State, CurConns, NbChildren, Sleepers}) ->
+ loop(State, CurConns, NbChildren, Sleepers).
+
+-spec system_terminate(any(), _, _, _) -> no_return().
+system_terminate(Reason, _, _, _) ->
+ exit(Reason).
+
+system_code_change(Misc, _, _, _) ->
+ {ok, Misc}.
diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl
deleted file mode 100644
index 408fbcd..0000000
--- a/src/ranch_listener.erl
+++ /dev/null
@@ -1,179 +0,0 @@
-%% Copyright (c) 2011-2012, Loïc Hoguin <[email protected]>
-%%
-%% Permission to use, copy, modify, and/or distribute this software for any
-%% purpose with or without fee is hereby granted, provided that the above
-%% copyright notice and this permission notice appear in all copies.
-%%
-%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
-%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
-%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
-%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
-%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
-%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-%% @doc Public API for managing listeners.
--module(ranch_listener).
--behaviour(gen_server).
-
-%% API.
--export([start_link/3]).
--export([stop/1]).
--export([add_connection/2]).
--export([remove_connection/1]).
--export([get_port/1]).
--export([set_port/2]).
--export([get_max_connections/1]).
--export([set_max_connections/2]).
--export([get_protocol_options/1]).
--export([set_protocol_options/2]).
-
-%% gen_server.
--export([init/1]).
--export([handle_call/3]).
--export([handle_cast/2]).
--export([handle_info/2]).
--export([terminate/2]).
--export([code_change/3]).
-
--record(state, {
- ref :: any(),
- max_conns = undefined :: ranch:max_conns(),
- port = undefined :: undefined | inet:port_number(),
- proto_opts = undefined :: any(),
- rm_diff = 0 :: non_neg_integer()
-}).
-
-%% API.
-
-%% @private
--spec start_link(any(), non_neg_integer(), any()) -> {ok, pid()}.
-start_link(Ref, MaxConns, ProtoOpts) ->
- gen_server:start_link(?MODULE, [Ref, MaxConns, ProtoOpts], []).
-
-%% @private
--spec stop(pid()) -> stopped.
-stop(ServerPid) ->
- gen_server:call(ServerPid, stop).
-
-%% @doc Add a connection to the listener's pool.
--spec add_connection(pid(), pid()) -> non_neg_integer().
-add_connection(ServerPid, ConnPid) ->
- ok = gen_server:cast(ServerPid, {add_connection, ConnPid}),
- ranch_server:add_connection(ServerPid).
-
-%% @doc Remove this process' connection from the pool.
-%%
-%% Useful if you have long-lived connections that aren't taking up
-%% resources and shouldn't be counted in the limited number of running
-%% connections.
--spec remove_connection(pid()) -> non_neg_integer().
-remove_connection(ServerPid) ->
- try
- Count = ranch_server:remove_connection(ServerPid),
- ok = gen_server:cast(ServerPid, remove_connection),
- Count
- catch
- error:badarg -> % Max conns = infinity
- 0
- end.
-
-%% @doc Return the listener's port.
--spec get_port(pid()) -> {ok, inet:port_number()}.
-get_port(ServerPid) ->
- gen_server:call(ServerPid, get_port).
-
-%% @private
--spec set_port(pid(), inet:port_number()) -> ok.
-set_port(ServerPid, Port) ->
- gen_server:cast(ServerPid, {set_port, Port}).
-
-%% @doc Return the max number of connections allowed concurrently.
--spec get_max_connections(pid()) -> {ok, ranch:max_conns()}.
-get_max_connections(ServerPid) ->
- gen_server:call(ServerPid, get_max_connections).
-
-%% @doc Set the max number of connections allowed concurrently.
--spec set_max_connections(pid(), ranch:max_conns()) -> ok.
-set_max_connections(ServerPid, MaxConnections) ->
- gen_server:call(ServerPid, {set_max_connections, MaxConnections}).
-
-%% @doc Return the current protocol options.
--spec get_protocol_options(pid()) -> {ok, any()}.
-get_protocol_options(ServerPid) ->
- gen_server:call(ServerPid, get_protocol_options).
-
-%% @doc Upgrade the protocol options.
--spec set_protocol_options(pid(), any()) -> ok.
-set_protocol_options(ServerPid, ProtoOpts) ->
- gen_server:call(ServerPid, {set_protocol_options, ProtoOpts}).
-
-%% gen_server.
-
-%% @private
-init([Ref, infinity, ProtoOpts]) ->
- ok = ranch_server:insert_listener(Ref, self()),
- {ok, #state{ref=Ref, max_conns=infinity, proto_opts=ProtoOpts}};
-init([Ref, MaxConns, ProtoOpts]) ->
- ok = ranch_server:insert_listener(Ref, self()),
- ranch_server:add_connections_counter(self()),
- {ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}.
-
-%% @private
-handle_call(get_port, _From, State=#state{port=Port}) ->
- {reply, {ok, Port}, State};
-handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) ->
- {reply, {ok, MaxConns}, State};
-handle_call({set_max_connections, MaxConnections}, _From,
- State=#state{ref=Ref, max_conns=CurrMax, rm_diff=CurrDiff}) ->
- RmDiff = case {MaxConnections, CurrMax} of
- {infinity, _} -> % moving to infinity, delete connection key
- ranch_server:remove_connections_counter(self()),
- 0;
- {_, infinity} -> % moving away from infinity, create connection key
- ranch_server:add_connections_counter(self()),
- CurrDiff;
- {_, _} -> % stay current
- CurrDiff
- end,
- ranch_server:send_to_acceptors(Ref, {set_max_conns, MaxConnections}),
- {reply, ok, State#state{max_conns=MaxConnections, rm_diff=RmDiff}};
-handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) ->
- {reply, {ok, ProtoOpts}, State};
-handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) ->
- ranch_server:send_to_acceptors(Ref, {set_opts, ProtoOpts}),
- {reply, ok, State#state{proto_opts=ProtoOpts}};
-handle_call(stop, _From, State) ->
- {stop, normal, stopped, State};
-handle_call(_, _From, State) ->
- {reply, ignored, State}.
-
-%% @private
-handle_cast({add_connection, ConnPid}, State) ->
- _ = erlang:monitor(process, ConnPid),
- {noreply, State};
-handle_cast(remove_connection, State=#state{max_conns=infinity}) ->
- {noreply, State};
-handle_cast(remove_connection, State=#state{rm_diff=RmDiff}) ->
- {noreply, State#state{rm_diff=RmDiff + 1}};
-handle_cast({set_port, Port}, State) ->
- {noreply, State#state{port=Port}};
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-%% @private
-handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=0}) ->
- _ = ranch_server:remove_connection(self()),
- {noreply, State};
-handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=RmDiff}) ->
- {noreply, State#state{rm_diff=RmDiff - 1}};
-handle_info(_Info, State) ->
- {noreply, State}.
-
-%% @private
-terminate(_Reason, _State) ->
- ok.
-
-%% @private
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl
index 0147cf2..9f19123 100644
--- a/src/ranch_listener_sup.erl
+++ b/src/ranch_listener_sup.erl
@@ -28,25 +28,22 @@
-> {ok, pid()}.
start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
+ ranch_server:set_new_listener_opts(Ref, MaxConns, ProtoOpts),
supervisor:start_link(?MODULE, {
- Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts
- }).
+ Ref, NbAcceptors, Transport, TransOpts, Protocol
+ }).
%% supervisor.
-init({Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts}) ->
+init({Ref, NbAcceptors, Transport, TransOpts, Protocol}) ->
ChildSpecs = [
- %% listener
- {ranch_listener, {ranch_listener, start_link,
- [Ref, MaxConns, ProtoOpts]},
- permanent, 5000, worker, [ranch_listener]},
%% conns_sup
- {ranch_conns_sup, {ranch_conns_sup, start_link, [Ref]},
- permanent, infinity, supervisor, [ranch_conns_sup]},
+ {ranch_conns_sup, {ranch_conns_sup, start_link,
+ [Ref, Transport, Protocol]},
+ permanent, infinity, supervisor, [ranch_conns_sup]},
%% acceptors_sup
{ranch_acceptors_sup, {ranch_acceptors_sup, start_link,
- [Ref, NbAcceptors, Transport, TransOpts, Protocol]
- }, permanent, infinity, supervisor, [ranch_acceptors_sup]}
+ [Ref, NbAcceptors, Transport, TransOpts]
+ }, permanent, infinity, supervisor, [ranch_acceptors_sup]}
],
{ok, {{rest_for_one, 10, 10}, ChildSpecs}}.
-
diff --git a/src/ranch_protocol.erl b/src/ranch_protocol.erl
index 38a56c8..c788547 100644
--- a/src/ranch_protocol.erl
+++ b/src/ranch_protocol.erl
@@ -17,7 +17,7 @@
%% Start a new connection process for the given socket.
-callback start_link(
- ListenerPid::pid(),
+ Ref::any(),
Socket::any(),
Transport::module(),
ProtocolOptions::any())
diff --git a/src/ranch_server.erl b/src/ranch_server.erl
index c6d7c19..d827ae2 100644
--- a/src/ranch_server.erl
+++ b/src/ranch_server.erl
@@ -18,17 +18,17 @@
%% API.
-export([start_link/0]).
--export([insert_listener/2]).
--export([lookup_listener/1]).
+-export([set_new_listener_opts/3]).
+-export([cleanup_listener_opts/1]).
-export([set_connections_sup/2]).
--export([lookup_connections_sup/1]).
--export([add_acceptor/2]).
--export([send_to_acceptors/2]).
--export([add_connection/1]).
+-export([get_connections_sup/1]).
+-export([set_port/2]).
+-export([get_port/1]).
+-export([set_max_connections/2]).
+-export([get_max_connections/1]).
+-export([set_protocol_options/2]).
+-export([get_protocol_options/1]).
-export([count_connections/1]).
--export([remove_connection/1]).
--export([add_connections_counter/1]).
--export([remove_connections_counter/1]).
%% gen_server.
-export([init/1]).
@@ -40,8 +40,7 @@
-define(TAB, ?MODULE).
--type key() :: {listener | acceptors, any()}.
--type monitors() :: [{{reference(), pid()}, key()}].
+-type monitors() :: [{{reference(), pid()}, any()}].
-record(state, {
monitors = [] :: monitors()
}).
@@ -53,78 +52,63 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-%% @doc Insert a listener into the database.
--spec insert_listener(any(), pid()) -> ok.
-insert_listener(Ref, Pid) ->
- true = ets:insert_new(?TAB, {{listener, Ref}, Pid, undefined}),
- gen_server:cast(?MODULE, {insert_listener, Ref, Pid}).
-
-%% @doc Lookup a listener in the database.
--spec lookup_listener(any()) -> pid().
-lookup_listener(Ref) ->
- ets:lookup_element(?TAB, {listener, Ref}, 2).
+%% @private
+-spec set_new_listener_opts(any(), ranch:max_conns(), any()) -> ok.
+set_new_listener_opts(Ref, MaxConns, Opts) ->
+ gen_server:call(?MODULE, {set_new_listener_opts, Ref, MaxConns, Opts}).
+
+%% @doc Cleanup listener options after it has been stopped.
+-spec cleanup_listener_opts(any()) -> ok.
+cleanup_listener_opts(Ref) ->
+ _ = ets:delete(?TAB, {port, Ref}),
+ _ = ets:delete(?TAB, {max_conns, Ref}),
+ _ = ets:delete(?TAB, {opts, Ref}),
+ ok.
%% @doc Set a connection supervisor associated with specific listener.
-spec set_connections_sup(any(), pid()) -> ok.
set_connections_sup(Ref, Pid) ->
- true = ets:update_element(?TAB, {listener, Ref}, {3, Pid}),
- ok.
+ gen_server:call(?MODULE, {set_connections_sup, Ref, Pid}).
-%% @doc Lookup a connection supervisor used by specific listener.
--spec lookup_connections_sup(any()) -> pid() | undefined.
-lookup_connections_sup(Ref) ->
- ets:lookup_element(?TAB, {listener, Ref}, 3).
-
-%% @doc Add an acceptor for the given listener.
--spec add_acceptor(any(), pid()) -> ok.
-add_acceptor(Ref, Pid) ->
- gen_server:cast(?MODULE, {add_acceptor, Ref, Pid}).
-
-%% @doc Send a message to all acceptors of the given listener.
--spec send_to_acceptors(any(), any()) -> ok.
-send_to_acceptors(Ref, Msg) ->
- Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2),
- _ = [Pid ! Msg || Pid <- Acceptors],
- ok.
+%% @doc Return the connection supervisor used by specific listener.
+-spec get_connections_sup(any()) -> pid().
+get_connections_sup(Ref) ->
+ ets:lookup_element(?TAB, {conns_sup, Ref}, 2).
-%% @doc Add a connection to the connection pool.
-%%
-%% Also return the number of connections in the pool after this operation.
--spec add_connection(pid()) -> non_neg_integer().
-add_connection(ListenerPid) ->
- ets:update_counter(?TAB, {connections, ListenerPid}, 1).
+%% @private
+-spec set_port(any(), inet:port_number()) -> ok.
+set_port(Ref, Port) ->
+ gen_server:call(?MODULE, {set_port, Ref, Port}).
+
+%% @doc Return the listener's port.
+-spec get_port(any()) -> inet:port_number().
+get_port(Ref) ->
+ ets:lookup_element(?TAB, {port, Ref}, 2).
+
+%% @doc Set the max number of connections allowed concurrently.
+-spec set_max_connections(any(), ranch:max_conns()) -> ok.
+set_max_connections(Ref, MaxConnections) ->
+ gen_server:call(?MODULE, {set_max_conns, Ref, MaxConnections}).
+
+%% @doc Return the max number of connections allowed concurrently.
+-spec get_max_connections(any()) -> ranch:max_conns().
+get_max_connections(Ref) ->
+ ets:lookup_element(?TAB, {max_conns, Ref}, 2).
+
+%% @doc Upgrade the protocol options.
+-spec set_protocol_options(any(), any()) -> ok.
+set_protocol_options(Ref, ProtoOpts) ->
+ gen_server:call(?MODULE, {set_opts, Ref, ProtoOpts}).
+
+%% @doc Return the current protocol options.
+-spec get_protocol_options(any()) -> any().
+get_protocol_options(Ref) ->
+ ets:lookup_element(?TAB, {opts, Ref}, 2).
%% @doc Count the number of connections in the connection pool.
--spec count_connections(pid()) -> non_neg_integer().
-count_connections(ListenerPid) ->
- try
- ets:update_counter(?TAB, {connections, ListenerPid}, 0)
- catch
- error:badarg -> % Max conns = infinity
- 0
- end.
-
-%% @doc Remove a connection from the connection pool.
-%%
-%% Also return the number of connections in the pool after this operation.
--spec remove_connection(pid()) -> non_neg_integer().
-remove_connection(ListenerPid) ->
- ets:update_counter(?TAB, {connections, ListenerPid}, -1).
-
-
-%% @doc Add a connections counter to the connection pool
-%%
-%% Should only be used by ranch listeners when settings regarding the max
-%% number of connections change.
-add_connections_counter(Pid) ->
- true = ets:insert_new(?TAB, {{connections, Pid}, 0}).
-
-%% @doc remove a connections counter from the connection pool
-%%
-%% Should only be used by ranch listeners when settings regarding the max
-%% number of connections change.
-remove_connections_counter(Pid) ->
- true = ets:delete(?TAB, {connections, Pid}).
+-spec count_connections(any()) -> non_neg_integer().
+count_connections(Ref) ->
+ ranch_conns_sup:active_connections(get_connections_sup(Ref)).
%% gen_server.
@@ -133,32 +117,42 @@ init([]) ->
{ok, #state{}}.
%% @private
+handle_call({set_new_listener_opts, Ref, MaxConns, Opts}, _, State) ->
+ ets:insert(?TAB, {{max_conns, Ref}, MaxConns}),
+ ets:insert(?TAB, {{opts, Ref}, Opts}),
+ {reply, ok, State};
+handle_call({set_connections_sup, Ref, Pid}, _,
+ State=#state{monitors=Monitors}) ->
+ true = ets:insert_new(?TAB, {{conns_sup, Ref}, Pid}),
+ MonitorRef = erlang:monitor(process, Pid),
+ {reply, ok, State#state{
+ monitors=[{{MonitorRef, Pid}, Ref}|Monitors]}};
+handle_call({set_port, Ref, Port}, _, State) ->
+ true = ets:insert(?TAB, {{port, Ref}, Port}),
+ {reply, ok, State};
+handle_call({set_max_conns, Ref, MaxConns}, _, State) ->
+ ets:insert(?TAB, {{max_conns, Ref}, MaxConns}),
+ ConnsSup = get_connections_sup(Ref),
+ ConnsSup ! {set_max_conns, MaxConns},
+ {reply, ok, State};
+handle_call({set_opts, Ref, Opts}, _, State) ->
+ ets:insert(?TAB, {{opts, Ref}, Opts}),
+ ConnsSup = get_connections_sup(Ref),
+ ConnsSup ! {set_opts, Opts},
+ {reply, ok, State};
handle_call(_Request, _From, State) ->
{reply, ignore, State}.
%% @private
-handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) ->
- true = ets:insert_new(?TAB, {{acceptors, Ref}, []}),
- MonitorRef = erlang:monitor(process, Pid),
- {noreply, State#state{
- monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}};
-handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) ->
- MonitorRef = erlang:monitor(process, Pid),
- Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2),
- true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}),
- {noreply, State#state{
- monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}};
-handle_cast({add_connection, Pid}, State) ->
- _ = erlang:monitor(process, Pid),
- {noreply, State};
handle_cast(_Request, State) ->
{noreply, State}.
%% @private
handle_info({'DOWN', MonitorRef, process, Pid, _},
State=#state{monitors=Monitors}) ->
- {_, Key} = lists:keyfind({MonitorRef, Pid}, 1, Monitors),
- Monitors2 = remove_process(Key, MonitorRef, Pid, Monitors),
+ {_, Ref} = lists:keyfind({MonitorRef, Pid}, 1, Monitors),
+ true = ets:delete(?TAB, {conns_sup, Ref}),
+ Monitors2 = lists:keydelete({MonitorRef, Pid}, 1, Monitors),
{noreply, State#state{monitors=Monitors2}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -170,22 +164,3 @@ terminate(_Reason, _State) ->
%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
-%% Internal.
-
--spec remove_process(key(), reference(), pid(), Monitors)
- -> Monitors when Monitors::monitors() .
-remove_process(Key = {listener, Ref}, MonitorRef, Pid, Monitors) ->
- true = ets:delete(?TAB, Key),
- true = ets:delete(?TAB, {acceptors, Ref}),
- remove_connections_counter(Pid),
- lists:keydelete({MonitorRef, Pid}, 1, Monitors);
-remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) ->
- try
- Acceptors = ets:lookup_element(?TAB, Key, 2),
- true = ets:update_element(?TAB, Key, {2, lists:delete(Pid, Acceptors)})
- catch
- error:_ ->
- ok
- end,
- lists:keydelete({MonitorRef, Pid}, 1, Monitors).
diff --git a/src/ranch_sup.erl b/src/ranch_sup.erl
index ad1c558..ddf2f69 100644
--- a/src/ranch_sup.erl
+++ b/src/ranch_sup.erl
@@ -34,7 +34,7 @@ start_link() ->
init([]) ->
ranch_server = ets:new(ranch_server, [
- ordered_set, public, named_table, {write_concurrency, true}]),
+ ordered_set, public, named_table]),
Procs = [
{ranch_server, {ranch_server, start_link, []},
permanent, 5000, worker, [ranch_server]}