diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ranch.erl | 63 | ||||
-rw-r--r-- | src/ranch_acceptor.erl | 94 | ||||
-rw-r--r-- | src/ranch_acceptors_sup.erl | 20 | ||||
-rw-r--r-- | src/ranch_conns_sup.erl | 172 | ||||
-rw-r--r-- | src/ranch_listener.erl | 179 | ||||
-rw-r--r-- | src/ranch_listener_sup.erl | 21 | ||||
-rw-r--r-- | src/ranch_protocol.erl | 2 | ||||
-rw-r--r-- | src/ranch_server.erl | 195 | ||||
-rw-r--r-- | src/ranch_sup.erl | 2 |
9 files changed, 316 insertions, 432 deletions
diff --git a/src/ranch.erl b/src/ranch.erl index 6f1b6fe..ea5f59b 100644 --- a/src/ranch.erl +++ b/src/ranch.erl @@ -19,6 +19,7 @@ -export([stop_listener/1]). -export([child_spec/6]). -export([accept_ack/1]). +-export([remove_connection/1]). -export([get_port/1]). -export([get_max_connections/1]). -export([set_max_connections/2]). @@ -67,17 +68,23 @@ start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) true -> Res = supervisor:start_child(ranch_sup, child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)), - case proplists:get_value(socket, TransOpts) of - undefined -> - ok; - Socket -> - %% change the controlling process so the caller dying doesn't - %% close the port - ListenerPid = ranch_server:lookup_listener(Ref), + Socket = proplists:get_value(socket, TransOpts), + case Res of + {ok, Pid} when Socket =/= undefined -> + %% Give ownership of the socket to ranch_acceptors_sup + %% to make sure the socket stays open as long as the + %% listener is alive. If the socket closes however there + %% will be no way to recover because we don't know how + %% to open it again. + Children = supervisor:which_children(Pid), + {_, AcceptorsSup, _, _} + = lists:keyfind(ranch_acceptors_sup, 1, Children), %%% Note: the catch is here because SSL crashes when you change %%% the controlling process of a listen socket because of a bug. %%% The bug will be fixed in R16. - catch(Transport:controlling_process(Socket, ListenerPid)) + catch Transport:controlling_process(Socket, AcceptorsSup); + _ -> + ok end, Res end. @@ -90,7 +97,8 @@ start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) stop_listener(Ref) -> case supervisor:terminate_child(ranch_sup, {ranch_listener_sup, Ref}) of ok -> - supervisor:delete_child(ranch_sup, {ranch_listener_sup, Ref}); + _ = supervisor:delete_child(ranch_sup, {ranch_listener_sup, Ref}), + ranch_server:cleanup_listener_opts(Ref); {error, Reason} -> {error, Reason} end. @@ -115,36 +123,40 @@ child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) %% %% Effectively used to make sure the socket control has been given to %% the protocol process before starting to use it. --spec accept_ack(pid()) -> ok. -accept_ack(ListenerPid) -> - receive {shoot, ListenerPid} -> ok end. +-spec accept_ack(any()) -> ok. +accept_ack(Ref) -> + receive {shoot, Ref} -> ok end. + +%% @doc Remove the calling process' connection from the pool. +%% +%% Useful if you have long-lived connections that aren't taking up +%% resources and shouldn't be counted in the limited number of running +%% connections. +-spec remove_connection(any()) -> ok. +remove_connection(Ref) -> + ConnsSup = ranch_server:get_connections_sup(Ref), + ConnsSup ! {remove_connection, Ref}, + ok. %% @doc Return the listener's port. -spec get_port(any()) -> inet:port_number(). get_port(Ref) -> - ListenerPid = ranch_server:lookup_listener(Ref), - {ok, Port} = ranch_listener:get_port(ListenerPid), - Port. + ranch_server:get_port(Ref). %% @doc Return the max number of connections allowed concurrently. -spec get_max_connections(any()) -> max_conns(). get_max_connections(Ref) -> - ListenerPid = ranch_server:lookup_listener(Ref), - {ok, MaxConnections} = ranch_listener:get_max_connections(ListenerPid), - MaxConnections. + ranch_server:get_max_connections(Ref). %% @doc Set the max number of connections allowed concurrently. -spec set_max_connections(any(), max_conns()) -> ok. set_max_connections(Ref, MaxConnections) -> - ListenerPid = ranch_server:lookup_listener(Ref), - ok = ranch_listener:set_max_connections(ListenerPid, MaxConnections). + ranch_server:set_max_connections(Ref, MaxConnections). %% @doc Return the current protocol options for the given listener. -spec get_protocol_options(any()) -> any(). get_protocol_options(Ref) -> - ListenerPid = ranch_server:lookup_listener(Ref), - {ok, ProtoOpts} = ranch_listener:get_protocol_options(ListenerPid), - ProtoOpts. + ranch_server:get_protocol_options(Ref). %% @doc Upgrade the protocol options for the given listener. %% @@ -152,9 +164,8 @@ get_protocol_options(Ref) -> %% newly accepted connections receive the new protocol options. This has %% no effect on the currently opened connections. -spec set_protocol_options(any(), any()) -> ok. -set_protocol_options(Ref, ProtoOpts) -> - ListenerPid = ranch_server:lookup_listener(Ref), - ok = ranch_listener:set_protocol_options(ListenerPid, ProtoOpts). +set_protocol_options(Ref, Opts) -> + ranch_server:set_protocol_options(Ref, Opts). %% @doc Filter a list of options and remove all unwanted values. %% diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl index 63d24c8..44cf52d 100644 --- a/src/ranch_acceptor.erl +++ b/src/ranch_acceptor.erl @@ -16,89 +16,31 @@ -module(ranch_acceptor). %% API. --export([start_link/6]). +-export([start_link/3]). %% Internal. --export([init/7]). --export([loop/7]). +-export([loop/3]). %% API. --spec start_link(any(), inet:socket(), module(), module(), pid(), pid()) +-spec start_link(inet:socket(), module(), pid()) -> {ok, pid()}. -start_link(Ref, LSocket, Transport, Protocol, ListenerPid, ConnsSup) -> - {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid), - {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid), - Pid = spawn_link(?MODULE, init, - [LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup]), - ok = ranch_server:add_acceptor(Ref, Pid), +start_link(LSocket, Transport, ConnsSup) -> + Pid = spawn_link(?MODULE, loop, [LSocket, Transport, ConnsSup]), {ok, Pid}. %% Internal. --spec init(inet:socket(), module(), module(), - non_neg_integer(), any(), pid(), pid()) -> no_return(). -init(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) -> - async_accept(LSocket, Transport), - loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup). - --spec loop(inet:socket(), module(), module(), - ranch:max_conns(), any(), pid(), pid()) -> no_return(). -loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) -> - receive - %% We couldn't accept the socket but it's safe to continue. - {accept, continue} -> - ?MODULE:init(LSocket, Transport, Protocol, - MaxConns, Opts, ListenerPid, ConnsSup); - %% Found my sockets! - {accept, CSocket} -> - {ok, ConnPid} = supervisor:start_child(ConnsSup, - [ListenerPid, CSocket, Transport, Protocol, Opts]), - Transport:controlling_process(CSocket, ConnPid), - ConnPid ! {shoot, ListenerPid}, - {ok, MaxConns2} = case MaxConns of - infinity -> - {ok, infinity}; - _ -> - NbConns = ranch_listener:add_connection(ListenerPid, ConnPid), - maybe_wait(ListenerPid, MaxConns, NbConns) - end, - ?MODULE:init(LSocket, Transport, Protocol, - MaxConns2, Opts, ListenerPid, ConnsSup); - %% Upgrade the max number of connections allowed concurrently. - {set_max_conns, MaxConns2} -> - ?MODULE:loop(LSocket, Transport, Protocol, - MaxConns2, Opts, ListenerPid, ConnsSup); - %% Upgrade the protocol options. - {set_opts, Opts2} -> - ?MODULE:loop(LSocket, Transport, Protocol, - MaxConns, Opts2, ListenerPid, ConnsSup) - end. - --spec maybe_wait(pid(), MaxConns, non_neg_integer()) - -> {ok, MaxConns} when MaxConns::ranch:max_conns(). -maybe_wait(_, MaxConns, NbConns) when MaxConns > NbConns -> - {ok, MaxConns}; -maybe_wait(ListenerPid, MaxConns, NbConns) -> - receive - {set_max_conns, MaxConns2} -> - maybe_wait(ListenerPid, MaxConns2, NbConns) - after 0 -> - NbConns2 = ranch_server:count_connections(ListenerPid), - maybe_wait(ListenerPid, MaxConns, NbConns2) - end. - --spec async_accept(inet:socket(), module()) -> ok. -async_accept(LSocket, Transport) -> - AcceptorPid = self(), - _ = spawn_link(fun() -> - case Transport:accept(LSocket, infinity) of - {ok, CSocket} -> - Transport:controlling_process(CSocket, AcceptorPid), - AcceptorPid ! {accept, CSocket}; - %% We want to crash if the listening socket got closed. - {error, Reason} when Reason =/= closed -> - AcceptorPid ! {accept, continue} - end - end), - ok. +-spec loop(inet:socket(), module(), pid()) -> no_return(). +loop(LSocket, Transport, ConnsSup) -> + _ = case Transport:accept(LSocket, infinity) of + {ok, CSocket} -> + Transport:controlling_process(CSocket, ConnsSup), + %% This call will not return until process has been started + %% AND we are below the maximum number of connections. + ranch_conns_sup:start_protocol(ConnsSup, CSocket); + %% We want to crash if the listening socket got closed. + {error, Reason} when Reason =/= closed -> + ok + end, + ?MODULE:loop(LSocket, Transport, ConnsSup). diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl index f1908a6..18574fa 100644 --- a/src/ranch_acceptors_sup.erl +++ b/src/ranch_acceptors_sup.erl @@ -17,24 +17,22 @@ -behaviour(supervisor). %% API. --export([start_link/5]). +-export([start_link/4]). %% supervisor. -export([init/1]). %% API. --spec start_link(any(), non_neg_integer(), module(), any(), - module()) -> {ok, pid()}. -start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol) -> - supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts, - Protocol]). +-spec start_link(any(), non_neg_integer(), module(), any()) + -> {ok, pid()}. +start_link(Ref, NbAcceptors, Transport, TransOpts) -> + supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts]). %% supervisor. -init([Ref, NbAcceptors, Transport, TransOpts, Protocol]) -> - ListenerPid = ranch_server:lookup_listener(Ref), - ConnsPid = ranch_server:lookup_connections_sup(Ref), +init([Ref, NbAcceptors, Transport, TransOpts]) -> + ConnsSup = ranch_server:get_connections_sup(Ref), LSocket = case proplists:get_value(socket, TransOpts) of undefined -> {ok, Socket} = Transport:listen(TransOpts), @@ -43,10 +41,10 @@ init([Ref, NbAcceptors, Transport, TransOpts, Protocol]) -> Socket end, {ok, {_, Port}} = Transport:sockname(LSocket), - ranch_listener:set_port(ListenerPid, Port), + ranch_server:set_port(Ref, Port), Procs = [ {{acceptor, self(), N}, {ranch_acceptor, start_link, [ - Ref, LSocket, Transport, Protocol, ListenerPid, ConnsPid + LSocket, Transport, ConnsSup ]}, permanent, brutal_kill, worker, []} || N <- lists:seq(1, NbAcceptors)], {ok, {{one_for_one, 10, 10}, Procs}}. diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl index 3cc09be..eaa4d00 100644 --- a/src/ranch_conns_sup.erl +++ b/src/ranch_conns_sup.erl @@ -13,30 +13,170 @@ %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. %% @private +%% +%% Make sure to never reload this module outside a release upgrade, +%% as calling l(ranch_conns_sup) twice will kill the process and all +%% the currently open connections. -module(ranch_conns_sup). --behaviour(supervisor). %% API. --export([start_link/1]). --export([start_protocol/5]). +-export([start_link/3]). +-export([start_protocol/2]). +-export([active_connections/1]). + +%% Supervisor internals. +-export([init/4]). +-export([system_continue/3]). +-export([system_terminate/4]). +-export([system_code_change/4]). -%% supervisor. --export([init/1]). +-record(state, { + parent = undefined :: pid(), + ref :: any(), + transport = undefined :: module(), + protocol = undefined :: module(), + opts :: any(), + max_conns = undefined :: non_neg_integer() | infinity +}). %% API. --spec start_link(any()) -> {ok, pid()}. -start_link(Ref) -> - supervisor:start_link(?MODULE, Ref). +-spec start_link(any(), module(), module()) -> {ok, pid()}. +start_link(Ref, Transport, Protocol) -> + proc_lib:start_link(?MODULE, init, [self(), Ref, Transport, Protocol]). + +%% We can safely assume we are on the same node as the supervisor. +%% +%% We can also safely avoid having a monitor and a timeout here +%% because only three things can happen: +%% * The supervisor died; rest_for_one strategy killed all acceptors +%% so this very calling process is going to di-- +%% * There's too many connections, the supervisor will resume the +%% acceptor only when we get below the limit again. +%% * The supervisor is overloaded, there's either too many acceptors +%% or the max_connections limit is too large. It's better if we +%% don't keep accepting connections because this leaves +%% more room for the situation to be resolved. +%% +%% We do not need the reply, we only need the ok from the supervisor +%% to continue. The supervisor sends its own pid when the acceptor can +%% continue. +-spec start_protocol(pid(), inet:socket()) -> ok. +start_protocol(SupPid, Socket) -> + SupPid ! {?MODULE, start_protocol, self(), Socket}, + receive SupPid -> ok end. --spec start_protocol(pid(), inet:socket(), module(), module(), any()) - -> {ok, pid()}. -start_protocol(ListenerPid, Socket, Transport, Protocol, Opts) -> - Protocol:start_link(ListenerPid, Socket, Transport, Opts). +%% We can't make the above assumptions here. This function might be +%% called from anywhere. +-spec active_connections(pid()) -> non_neg_integer(). +active_connections(SupPid) -> + Tag = erlang:monitor(process, SupPid), + erlang:send(SupPid, {?MODULE, active_connections, self(), Tag}, + [noconnect]), + receive + {Tag, Ret} -> + erlang:demonitor(Tag, [flush]), + Ret; + {'DOWN', Tag, _, _, noconnection} -> + exit({nodedown, node(SupPid)}); + {'DOWN', Tag, _, _, Reason} -> + exit(Reason) + after 5000 -> + erlang:demonitor(Tag, [flush]), + exit(timeout) + end. -%% supervisor. +%% Supervisor internals. -init(Ref) -> +-spec init(pid(), any(), module(), module()) -> no_return(). +init(Parent, Ref, Transport, Protocol) -> + process_flag(trap_exit, true), ok = ranch_server:set_connections_sup(Ref, self()), - {ok, {{simple_one_for_one, 0, 1}, [{?MODULE, {?MODULE, start_protocol, []}, - temporary, brutal_kill, worker, [?MODULE]}]}}. + MaxConns = ranch_server:get_max_connections(Ref), + Opts = ranch_server:get_protocol_options(Ref), + ok = proc_lib:init_ack(Parent, {ok, self()}), + loop(#state{parent=Parent, ref=Ref, transport=Transport, + protocol=Protocol, opts=Opts, max_conns=MaxConns}, 0, 0, []). + +loop(State=#state{parent=Parent, ref=Ref, + transport=Transport, protocol=Protocol, opts=Opts, + max_conns=MaxConns}, CurConns, NbChildren, Sleepers) -> + receive + {?MODULE, start_protocol, To, Socket} -> + case Protocol:start_link(Ref, Socket, Transport, Opts) of + {ok, Pid} -> + Transport:controlling_process(Socket, Pid), + Pid ! {shoot, Ref}, + put(Pid, true), + CurConns2 = CurConns + 1, + if CurConns2 < MaxConns -> + To ! self(), + loop(State, CurConns2, NbChildren + 1, + Sleepers); + true -> + loop(State, CurConns2, NbChildren + 1, + [To|Sleepers]) + end; + _ -> + To ! self(), + loop(State, CurConns, NbChildren, Sleepers) + end; + {?MODULE, active_connections, To, Tag} -> + To ! {Tag, CurConns}, + loop(State, CurConns, NbChildren, Sleepers); + %% Remove a connection from the count of connections. + {remove_connection, Ref} -> + loop(State, CurConns - 1, NbChildren, Sleepers); + %% Upgrade the max number of connections allowed concurrently. + %% We resume all sleeping acceptors if this number increases. + {set_max_conns, MaxConns2} when MaxConns2 > MaxConns -> + _ = [To ! self() || To <- Sleepers], + loop(State#state{max_conns=MaxConns2}, + CurConns, NbChildren, []); + {set_max_conns, MaxConns2} -> + loop(State#state{max_conns=MaxConns2}, + CurConns, NbChildren, Sleepers); + %% Upgrade the protocol options. + {set_opts, Opts2} -> + loop(State#state{opts=Opts2}, + CurConns, NbChildren, Sleepers); + {'EXIT', Parent, Reason} -> + exit(Reason); + {'EXIT', Pid, _} when Sleepers =:= [] -> + erase(Pid), + loop(State, CurConns - 1, NbChildren - 1, Sleepers); + %% Resume a sleeping acceptor if needed. + {'EXIT', Pid, _} -> + erase(Pid), + [To|Sleepers2] = Sleepers, + To ! self(), + loop(State, CurConns - 1, NbChildren - 1, Sleepers2); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {State, CurConns, NbChildren, Sleepers}); + %% Calls from the supervisor module. + {'$gen_call', {To, Tag}, which_children} -> + Pids = get_keys(true), + Children = [{Protocol, Pid, worker, [Protocol]} + || Pid <- Pids], + To ! {Tag, Children}, + loop(State, CurConns, NbChildren, Sleepers); + {'$gen_call', {To, Tag}, count_children} -> + Counts = [{specs, 1}, {active, NbChildren}, + {supervisors, 0}, {workers, NbChildren}], + To ! {Tag, Counts}, + loop(State, CurConns, NbChildren, Sleepers); + {'$gen_call', {To, Tag}, _} -> + To ! {Tag, {error, ?MODULE}}, + loop(State, CurConns, NbChildren, Sleepers) + end. + +system_continue(_, _, {State, CurConns, NbChildren, Sleepers}) -> + loop(State, CurConns, NbChildren, Sleepers). + +-spec system_terminate(any(), _, _, _) -> no_return(). +system_terminate(Reason, _, _, _) -> + exit(Reason). + +system_code_change(Misc, _, _, _) -> + {ok, Misc}. diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl deleted file mode 100644 index 408fbcd..0000000 --- a/src/ranch_listener.erl +++ /dev/null @@ -1,179 +0,0 @@ -%% Copyright (c) 2011-2012, Loïc Hoguin <[email protected]> -%% -%% Permission to use, copy, modify, and/or distribute this software for any -%% purpose with or without fee is hereby granted, provided that the above -%% copyright notice and this permission notice appear in all copies. -%% -%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR -%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES -%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN -%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF -%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -%% @doc Public API for managing listeners. --module(ranch_listener). --behaviour(gen_server). - -%% API. --export([start_link/3]). --export([stop/1]). --export([add_connection/2]). --export([remove_connection/1]). --export([get_port/1]). --export([set_port/2]). --export([get_max_connections/1]). --export([set_max_connections/2]). --export([get_protocol_options/1]). --export([set_protocol_options/2]). - -%% gen_server. --export([init/1]). --export([handle_call/3]). --export([handle_cast/2]). --export([handle_info/2]). --export([terminate/2]). --export([code_change/3]). - --record(state, { - ref :: any(), - max_conns = undefined :: ranch:max_conns(), - port = undefined :: undefined | inet:port_number(), - proto_opts = undefined :: any(), - rm_diff = 0 :: non_neg_integer() -}). - -%% API. - -%% @private --spec start_link(any(), non_neg_integer(), any()) -> {ok, pid()}. -start_link(Ref, MaxConns, ProtoOpts) -> - gen_server:start_link(?MODULE, [Ref, MaxConns, ProtoOpts], []). - -%% @private --spec stop(pid()) -> stopped. -stop(ServerPid) -> - gen_server:call(ServerPid, stop). - -%% @doc Add a connection to the listener's pool. --spec add_connection(pid(), pid()) -> non_neg_integer(). -add_connection(ServerPid, ConnPid) -> - ok = gen_server:cast(ServerPid, {add_connection, ConnPid}), - ranch_server:add_connection(ServerPid). - -%% @doc Remove this process' connection from the pool. -%% -%% Useful if you have long-lived connections that aren't taking up -%% resources and shouldn't be counted in the limited number of running -%% connections. --spec remove_connection(pid()) -> non_neg_integer(). -remove_connection(ServerPid) -> - try - Count = ranch_server:remove_connection(ServerPid), - ok = gen_server:cast(ServerPid, remove_connection), - Count - catch - error:badarg -> % Max conns = infinity - 0 - end. - -%% @doc Return the listener's port. --spec get_port(pid()) -> {ok, inet:port_number()}. -get_port(ServerPid) -> - gen_server:call(ServerPid, get_port). - -%% @private --spec set_port(pid(), inet:port_number()) -> ok. -set_port(ServerPid, Port) -> - gen_server:cast(ServerPid, {set_port, Port}). - -%% @doc Return the max number of connections allowed concurrently. --spec get_max_connections(pid()) -> {ok, ranch:max_conns()}. -get_max_connections(ServerPid) -> - gen_server:call(ServerPid, get_max_connections). - -%% @doc Set the max number of connections allowed concurrently. --spec set_max_connections(pid(), ranch:max_conns()) -> ok. -set_max_connections(ServerPid, MaxConnections) -> - gen_server:call(ServerPid, {set_max_connections, MaxConnections}). - -%% @doc Return the current protocol options. --spec get_protocol_options(pid()) -> {ok, any()}. -get_protocol_options(ServerPid) -> - gen_server:call(ServerPid, get_protocol_options). - -%% @doc Upgrade the protocol options. --spec set_protocol_options(pid(), any()) -> ok. -set_protocol_options(ServerPid, ProtoOpts) -> - gen_server:call(ServerPid, {set_protocol_options, ProtoOpts}). - -%% gen_server. - -%% @private -init([Ref, infinity, ProtoOpts]) -> - ok = ranch_server:insert_listener(Ref, self()), - {ok, #state{ref=Ref, max_conns=infinity, proto_opts=ProtoOpts}}; -init([Ref, MaxConns, ProtoOpts]) -> - ok = ranch_server:insert_listener(Ref, self()), - ranch_server:add_connections_counter(self()), - {ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}. - -%% @private -handle_call(get_port, _From, State=#state{port=Port}) -> - {reply, {ok, Port}, State}; -handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) -> - {reply, {ok, MaxConns}, State}; -handle_call({set_max_connections, MaxConnections}, _From, - State=#state{ref=Ref, max_conns=CurrMax, rm_diff=CurrDiff}) -> - RmDiff = case {MaxConnections, CurrMax} of - {infinity, _} -> % moving to infinity, delete connection key - ranch_server:remove_connections_counter(self()), - 0; - {_, infinity} -> % moving away from infinity, create connection key - ranch_server:add_connections_counter(self()), - CurrDiff; - {_, _} -> % stay current - CurrDiff - end, - ranch_server:send_to_acceptors(Ref, {set_max_conns, MaxConnections}), - {reply, ok, State#state{max_conns=MaxConnections, rm_diff=RmDiff}}; -handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) -> - {reply, {ok, ProtoOpts}, State}; -handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) -> - ranch_server:send_to_acceptors(Ref, {set_opts, ProtoOpts}), - {reply, ok, State#state{proto_opts=ProtoOpts}}; -handle_call(stop, _From, State) -> - {stop, normal, stopped, State}; -handle_call(_, _From, State) -> - {reply, ignored, State}. - -%% @private -handle_cast({add_connection, ConnPid}, State) -> - _ = erlang:monitor(process, ConnPid), - {noreply, State}; -handle_cast(remove_connection, State=#state{max_conns=infinity}) -> - {noreply, State}; -handle_cast(remove_connection, State=#state{rm_diff=RmDiff}) -> - {noreply, State#state{rm_diff=RmDiff + 1}}; -handle_cast({set_port, Port}, State) -> - {noreply, State#state{port=Port}}; -handle_cast(_Msg, State) -> - {noreply, State}. - -%% @private -handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=0}) -> - _ = ranch_server:remove_connection(self()), - {noreply, State}; -handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=RmDiff}) -> - {noreply, State#state{rm_diff=RmDiff - 1}}; -handle_info(_Info, State) -> - {noreply, State}. - -%% @private -terminate(_Reason, _State) -> - ok. - -%% @private -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl index 0147cf2..9f19123 100644 --- a/src/ranch_listener_sup.erl +++ b/src/ranch_listener_sup.erl @@ -28,25 +28,22 @@ -> {ok, pid()}. start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> MaxConns = proplists:get_value(max_connections, TransOpts, 1024), + ranch_server:set_new_listener_opts(Ref, MaxConns, ProtoOpts), supervisor:start_link(?MODULE, { - Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts - }). + Ref, NbAcceptors, Transport, TransOpts, Protocol + }). %% supervisor. -init({Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts}) -> +init({Ref, NbAcceptors, Transport, TransOpts, Protocol}) -> ChildSpecs = [ - %% listener - {ranch_listener, {ranch_listener, start_link, - [Ref, MaxConns, ProtoOpts]}, - permanent, 5000, worker, [ranch_listener]}, %% conns_sup - {ranch_conns_sup, {ranch_conns_sup, start_link, [Ref]}, - permanent, infinity, supervisor, [ranch_conns_sup]}, + {ranch_conns_sup, {ranch_conns_sup, start_link, + [Ref, Transport, Protocol]}, + permanent, infinity, supervisor, [ranch_conns_sup]}, %% acceptors_sup {ranch_acceptors_sup, {ranch_acceptors_sup, start_link, - [Ref, NbAcceptors, Transport, TransOpts, Protocol] - }, permanent, infinity, supervisor, [ranch_acceptors_sup]} + [Ref, NbAcceptors, Transport, TransOpts] + }, permanent, infinity, supervisor, [ranch_acceptors_sup]} ], {ok, {{rest_for_one, 10, 10}, ChildSpecs}}. - diff --git a/src/ranch_protocol.erl b/src/ranch_protocol.erl index 38a56c8..c788547 100644 --- a/src/ranch_protocol.erl +++ b/src/ranch_protocol.erl @@ -17,7 +17,7 @@ %% Start a new connection process for the given socket. -callback start_link( - ListenerPid::pid(), + Ref::any(), Socket::any(), Transport::module(), ProtocolOptions::any()) diff --git a/src/ranch_server.erl b/src/ranch_server.erl index c6d7c19..d827ae2 100644 --- a/src/ranch_server.erl +++ b/src/ranch_server.erl @@ -18,17 +18,17 @@ %% API. -export([start_link/0]). --export([insert_listener/2]). --export([lookup_listener/1]). +-export([set_new_listener_opts/3]). +-export([cleanup_listener_opts/1]). -export([set_connections_sup/2]). --export([lookup_connections_sup/1]). --export([add_acceptor/2]). --export([send_to_acceptors/2]). --export([add_connection/1]). +-export([get_connections_sup/1]). +-export([set_port/2]). +-export([get_port/1]). +-export([set_max_connections/2]). +-export([get_max_connections/1]). +-export([set_protocol_options/2]). +-export([get_protocol_options/1]). -export([count_connections/1]). --export([remove_connection/1]). --export([add_connections_counter/1]). --export([remove_connections_counter/1]). %% gen_server. -export([init/1]). @@ -40,8 +40,7 @@ -define(TAB, ?MODULE). --type key() :: {listener | acceptors, any()}. --type monitors() :: [{{reference(), pid()}, key()}]. +-type monitors() :: [{{reference(), pid()}, any()}]. -record(state, { monitors = [] :: monitors() }). @@ -53,78 +52,63 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -%% @doc Insert a listener into the database. --spec insert_listener(any(), pid()) -> ok. -insert_listener(Ref, Pid) -> - true = ets:insert_new(?TAB, {{listener, Ref}, Pid, undefined}), - gen_server:cast(?MODULE, {insert_listener, Ref, Pid}). - -%% @doc Lookup a listener in the database. --spec lookup_listener(any()) -> pid(). -lookup_listener(Ref) -> - ets:lookup_element(?TAB, {listener, Ref}, 2). +%% @private +-spec set_new_listener_opts(any(), ranch:max_conns(), any()) -> ok. +set_new_listener_opts(Ref, MaxConns, Opts) -> + gen_server:call(?MODULE, {set_new_listener_opts, Ref, MaxConns, Opts}). + +%% @doc Cleanup listener options after it has been stopped. +-spec cleanup_listener_opts(any()) -> ok. +cleanup_listener_opts(Ref) -> + _ = ets:delete(?TAB, {port, Ref}), + _ = ets:delete(?TAB, {max_conns, Ref}), + _ = ets:delete(?TAB, {opts, Ref}), + ok. %% @doc Set a connection supervisor associated with specific listener. -spec set_connections_sup(any(), pid()) -> ok. set_connections_sup(Ref, Pid) -> - true = ets:update_element(?TAB, {listener, Ref}, {3, Pid}), - ok. + gen_server:call(?MODULE, {set_connections_sup, Ref, Pid}). -%% @doc Lookup a connection supervisor used by specific listener. --spec lookup_connections_sup(any()) -> pid() | undefined. -lookup_connections_sup(Ref) -> - ets:lookup_element(?TAB, {listener, Ref}, 3). - -%% @doc Add an acceptor for the given listener. --spec add_acceptor(any(), pid()) -> ok. -add_acceptor(Ref, Pid) -> - gen_server:cast(?MODULE, {add_acceptor, Ref, Pid}). - -%% @doc Send a message to all acceptors of the given listener. --spec send_to_acceptors(any(), any()) -> ok. -send_to_acceptors(Ref, Msg) -> - Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2), - _ = [Pid ! Msg || Pid <- Acceptors], - ok. +%% @doc Return the connection supervisor used by specific listener. +-spec get_connections_sup(any()) -> pid(). +get_connections_sup(Ref) -> + ets:lookup_element(?TAB, {conns_sup, Ref}, 2). -%% @doc Add a connection to the connection pool. -%% -%% Also return the number of connections in the pool after this operation. --spec add_connection(pid()) -> non_neg_integer(). -add_connection(ListenerPid) -> - ets:update_counter(?TAB, {connections, ListenerPid}, 1). +%% @private +-spec set_port(any(), inet:port_number()) -> ok. +set_port(Ref, Port) -> + gen_server:call(?MODULE, {set_port, Ref, Port}). + +%% @doc Return the listener's port. +-spec get_port(any()) -> inet:port_number(). +get_port(Ref) -> + ets:lookup_element(?TAB, {port, Ref}, 2). + +%% @doc Set the max number of connections allowed concurrently. +-spec set_max_connections(any(), ranch:max_conns()) -> ok. +set_max_connections(Ref, MaxConnections) -> + gen_server:call(?MODULE, {set_max_conns, Ref, MaxConnections}). + +%% @doc Return the max number of connections allowed concurrently. +-spec get_max_connections(any()) -> ranch:max_conns(). +get_max_connections(Ref) -> + ets:lookup_element(?TAB, {max_conns, Ref}, 2). + +%% @doc Upgrade the protocol options. +-spec set_protocol_options(any(), any()) -> ok. +set_protocol_options(Ref, ProtoOpts) -> + gen_server:call(?MODULE, {set_opts, Ref, ProtoOpts}). + +%% @doc Return the current protocol options. +-spec get_protocol_options(any()) -> any(). +get_protocol_options(Ref) -> + ets:lookup_element(?TAB, {opts, Ref}, 2). %% @doc Count the number of connections in the connection pool. --spec count_connections(pid()) -> non_neg_integer(). -count_connections(ListenerPid) -> - try - ets:update_counter(?TAB, {connections, ListenerPid}, 0) - catch - error:badarg -> % Max conns = infinity - 0 - end. - -%% @doc Remove a connection from the connection pool. -%% -%% Also return the number of connections in the pool after this operation. --spec remove_connection(pid()) -> non_neg_integer(). -remove_connection(ListenerPid) -> - ets:update_counter(?TAB, {connections, ListenerPid}, -1). - - -%% @doc Add a connections counter to the connection pool -%% -%% Should only be used by ranch listeners when settings regarding the max -%% number of connections change. -add_connections_counter(Pid) -> - true = ets:insert_new(?TAB, {{connections, Pid}, 0}). - -%% @doc remove a connections counter from the connection pool -%% -%% Should only be used by ranch listeners when settings regarding the max -%% number of connections change. -remove_connections_counter(Pid) -> - true = ets:delete(?TAB, {connections, Pid}). +-spec count_connections(any()) -> non_neg_integer(). +count_connections(Ref) -> + ranch_conns_sup:active_connections(get_connections_sup(Ref)). %% gen_server. @@ -133,32 +117,42 @@ init([]) -> {ok, #state{}}. %% @private +handle_call({set_new_listener_opts, Ref, MaxConns, Opts}, _, State) -> + ets:insert(?TAB, {{max_conns, Ref}, MaxConns}), + ets:insert(?TAB, {{opts, Ref}, Opts}), + {reply, ok, State}; +handle_call({set_connections_sup, Ref, Pid}, _, + State=#state{monitors=Monitors}) -> + true = ets:insert_new(?TAB, {{conns_sup, Ref}, Pid}), + MonitorRef = erlang:monitor(process, Pid), + {reply, ok, State#state{ + monitors=[{{MonitorRef, Pid}, Ref}|Monitors]}}; +handle_call({set_port, Ref, Port}, _, State) -> + true = ets:insert(?TAB, {{port, Ref}, Port}), + {reply, ok, State}; +handle_call({set_max_conns, Ref, MaxConns}, _, State) -> + ets:insert(?TAB, {{max_conns, Ref}, MaxConns}), + ConnsSup = get_connections_sup(Ref), + ConnsSup ! {set_max_conns, MaxConns}, + {reply, ok, State}; +handle_call({set_opts, Ref, Opts}, _, State) -> + ets:insert(?TAB, {{opts, Ref}, Opts}), + ConnsSup = get_connections_sup(Ref), + ConnsSup ! {set_opts, Opts}, + {reply, ok, State}; handle_call(_Request, _From, State) -> {reply, ignore, State}. %% @private -handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) -> - true = ets:insert_new(?TAB, {{acceptors, Ref}, []}), - MonitorRef = erlang:monitor(process, Pid), - {noreply, State#state{ - monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}}; -handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) -> - MonitorRef = erlang:monitor(process, Pid), - Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2), - true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}), - {noreply, State#state{ - monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}}; -handle_cast({add_connection, Pid}, State) -> - _ = erlang:monitor(process, Pid), - {noreply, State}; handle_cast(_Request, State) -> {noreply, State}. %% @private handle_info({'DOWN', MonitorRef, process, Pid, _}, State=#state{monitors=Monitors}) -> - {_, Key} = lists:keyfind({MonitorRef, Pid}, 1, Monitors), - Monitors2 = remove_process(Key, MonitorRef, Pid, Monitors), + {_, Ref} = lists:keyfind({MonitorRef, Pid}, 1, Monitors), + true = ets:delete(?TAB, {conns_sup, Ref}), + Monitors2 = lists:keydelete({MonitorRef, Pid}, 1, Monitors), {noreply, State#state{monitors=Monitors2}}; handle_info(_Info, State) -> {noreply, State}. @@ -170,22 +164,3 @@ terminate(_Reason, _State) -> %% @private code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%% Internal. - --spec remove_process(key(), reference(), pid(), Monitors) - -> Monitors when Monitors::monitors() . -remove_process(Key = {listener, Ref}, MonitorRef, Pid, Monitors) -> - true = ets:delete(?TAB, Key), - true = ets:delete(?TAB, {acceptors, Ref}), - remove_connections_counter(Pid), - lists:keydelete({MonitorRef, Pid}, 1, Monitors); -remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) -> - try - Acceptors = ets:lookup_element(?TAB, Key, 2), - true = ets:update_element(?TAB, Key, {2, lists:delete(Pid, Acceptors)}) - catch - error:_ -> - ok - end, - lists:keydelete({MonitorRef, Pid}, 1, Monitors). diff --git a/src/ranch_sup.erl b/src/ranch_sup.erl index ad1c558..ddf2f69 100644 --- a/src/ranch_sup.erl +++ b/src/ranch_sup.erl @@ -34,7 +34,7 @@ start_link() -> init([]) -> ranch_server = ets:new(ranch_server, [ - ordered_set, public, named_table, {write_concurrency, true}]), + ordered_set, public, named_table]), Procs = [ {ranch_server, {ranch_server, start_link, []}, permanent, 5000, worker, [ranch_server]} |