diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ranch.erl | 63 | ||||
-rw-r--r-- | src/ranch_acceptors_sup.erl | 5 | ||||
-rw-r--r-- | src/ranch_conns_sup.erl | 17 | ||||
-rw-r--r-- | src/ranch_listener.erl | 142 | ||||
-rw-r--r-- | src/ranch_listener_sup.erl | 9 | ||||
-rw-r--r-- | src/ranch_protocol.erl | 2 | ||||
-rw-r--r-- | src/ranch_server.erl | 111 |
7 files changed, 131 insertions, 218 deletions
diff --git a/src/ranch.erl b/src/ranch.erl index 6f1b6fe..ea5f59b 100644 --- a/src/ranch.erl +++ b/src/ranch.erl @@ -19,6 +19,7 @@ -export([stop_listener/1]). -export([child_spec/6]). -export([accept_ack/1]). +-export([remove_connection/1]). -export([get_port/1]). -export([get_max_connections/1]). -export([set_max_connections/2]). @@ -67,17 +68,23 @@ start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) true -> Res = supervisor:start_child(ranch_sup, child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)), - case proplists:get_value(socket, TransOpts) of - undefined -> - ok; - Socket -> - %% change the controlling process so the caller dying doesn't - %% close the port - ListenerPid = ranch_server:lookup_listener(Ref), + Socket = proplists:get_value(socket, TransOpts), + case Res of + {ok, Pid} when Socket =/= undefined -> + %% Give ownership of the socket to ranch_acceptors_sup + %% to make sure the socket stays open as long as the + %% listener is alive. If the socket closes however there + %% will be no way to recover because we don't know how + %% to open it again. + Children = supervisor:which_children(Pid), + {_, AcceptorsSup, _, _} + = lists:keyfind(ranch_acceptors_sup, 1, Children), %%% Note: the catch is here because SSL crashes when you change %%% the controlling process of a listen socket because of a bug. %%% The bug will be fixed in R16. - catch(Transport:controlling_process(Socket, ListenerPid)) + catch Transport:controlling_process(Socket, AcceptorsSup); + _ -> + ok end, Res end. @@ -90,7 +97,8 @@ start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) stop_listener(Ref) -> case supervisor:terminate_child(ranch_sup, {ranch_listener_sup, Ref}) of ok -> - supervisor:delete_child(ranch_sup, {ranch_listener_sup, Ref}); + _ = supervisor:delete_child(ranch_sup, {ranch_listener_sup, Ref}), + ranch_server:cleanup_listener_opts(Ref); {error, Reason} -> {error, Reason} end. @@ -115,36 +123,40 @@ child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) %% %% Effectively used to make sure the socket control has been given to %% the protocol process before starting to use it. --spec accept_ack(pid()) -> ok. -accept_ack(ListenerPid) -> - receive {shoot, ListenerPid} -> ok end. +-spec accept_ack(any()) -> ok. +accept_ack(Ref) -> + receive {shoot, Ref} -> ok end. + +%% @doc Remove the calling process' connection from the pool. +%% +%% Useful if you have long-lived connections that aren't taking up +%% resources and shouldn't be counted in the limited number of running +%% connections. +-spec remove_connection(any()) -> ok. +remove_connection(Ref) -> + ConnsSup = ranch_server:get_connections_sup(Ref), + ConnsSup ! {remove_connection, Ref}, + ok. %% @doc Return the listener's port. -spec get_port(any()) -> inet:port_number(). get_port(Ref) -> - ListenerPid = ranch_server:lookup_listener(Ref), - {ok, Port} = ranch_listener:get_port(ListenerPid), - Port. + ranch_server:get_port(Ref). %% @doc Return the max number of connections allowed concurrently. -spec get_max_connections(any()) -> max_conns(). get_max_connections(Ref) -> - ListenerPid = ranch_server:lookup_listener(Ref), - {ok, MaxConnections} = ranch_listener:get_max_connections(ListenerPid), - MaxConnections. + ranch_server:get_max_connections(Ref). %% @doc Set the max number of connections allowed concurrently. -spec set_max_connections(any(), max_conns()) -> ok. set_max_connections(Ref, MaxConnections) -> - ListenerPid = ranch_server:lookup_listener(Ref), - ok = ranch_listener:set_max_connections(ListenerPid, MaxConnections). + ranch_server:set_max_connections(Ref, MaxConnections). %% @doc Return the current protocol options for the given listener. -spec get_protocol_options(any()) -> any(). get_protocol_options(Ref) -> - ListenerPid = ranch_server:lookup_listener(Ref), - {ok, ProtoOpts} = ranch_listener:get_protocol_options(ListenerPid), - ProtoOpts. + ranch_server:get_protocol_options(Ref). %% @doc Upgrade the protocol options for the given listener. %% @@ -152,9 +164,8 @@ get_protocol_options(Ref) -> %% newly accepted connections receive the new protocol options. This has %% no effect on the currently opened connections. -spec set_protocol_options(any(), any()) -> ok. -set_protocol_options(Ref, ProtoOpts) -> - ListenerPid = ranch_server:lookup_listener(Ref), - ok = ranch_listener:set_protocol_options(ListenerPid, ProtoOpts). +set_protocol_options(Ref, Opts) -> + ranch_server:set_protocol_options(Ref, Opts). %% @doc Filter a list of options and remove all unwanted values. %% diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl index 97df74f..18574fa 100644 --- a/src/ranch_acceptors_sup.erl +++ b/src/ranch_acceptors_sup.erl @@ -32,8 +32,7 @@ start_link(Ref, NbAcceptors, Transport, TransOpts) -> %% supervisor. init([Ref, NbAcceptors, Transport, TransOpts]) -> - ListenerPid = ranch_server:lookup_listener(Ref), - ConnsSup = ranch_server:lookup_connections_sup(Ref), + ConnsSup = ranch_server:get_connections_sup(Ref), LSocket = case proplists:get_value(socket, TransOpts) of undefined -> {ok, Socket} = Transport:listen(TransOpts), @@ -42,7 +41,7 @@ init([Ref, NbAcceptors, Transport, TransOpts]) -> Socket end, {ok, {_, Port}} = Transport:sockname(LSocket), - ranch_listener:set_port(ListenerPid, Port), + ranch_server:set_port(Ref, Port), Procs = [ {{acceptor, self(), N}, {ranch_acceptor, start_link, [ LSocket, Transport, ConnsSup diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl index 29b550f..eaa4d00 100644 --- a/src/ranch_conns_sup.erl +++ b/src/ranch_conns_sup.erl @@ -32,7 +32,7 @@ -record(state, { parent = undefined :: pid(), - listener_pid = undefined :: pid(), + ref :: any(), transport = undefined :: module(), protocol = undefined :: module(), opts :: any(), @@ -92,22 +92,21 @@ active_connections(SupPid) -> init(Parent, Ref, Transport, Protocol) -> process_flag(trap_exit, true), ok = ranch_server:set_connections_sup(Ref, self()), - ListenerPid = ranch_server:lookup_listener(Ref), - {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid), - {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid), + MaxConns = ranch_server:get_max_connections(Ref), + Opts = ranch_server:get_protocol_options(Ref), ok = proc_lib:init_ack(Parent, {ok, self()}), - loop(#state{parent=Parent, listener_pid=ListenerPid, transport=Transport, + loop(#state{parent=Parent, ref=Ref, transport=Transport, protocol=Protocol, opts=Opts, max_conns=MaxConns}, 0, 0, []). -loop(State=#state{parent=Parent, listener_pid=ListenerPid, +loop(State=#state{parent=Parent, ref=Ref, transport=Transport, protocol=Protocol, opts=Opts, max_conns=MaxConns}, CurConns, NbChildren, Sleepers) -> receive {?MODULE, start_protocol, To, Socket} -> - case Protocol:start_link(ListenerPid, Socket, Transport, Opts) of + case Protocol:start_link(Ref, Socket, Transport, Opts) of {ok, Pid} -> Transport:controlling_process(Socket, Pid), - Pid ! {shoot, ListenerPid}, + Pid ! {shoot, Ref}, put(Pid, true), CurConns2 = CurConns + 1, if CurConns2 < MaxConns -> @@ -126,7 +125,7 @@ loop(State=#state{parent=Parent, listener_pid=ListenerPid, To ! {Tag, CurConns}, loop(State, CurConns, NbChildren, Sleepers); %% Remove a connection from the count of connections. - {remove_connection, ListenerPid} -> + {remove_connection, Ref} -> loop(State, CurConns - 1, NbChildren, Sleepers); %% Upgrade the max number of connections allowed concurrently. %% We resume all sleeping acceptors if this number increases. diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl deleted file mode 100644 index a53cc75..0000000 --- a/src/ranch_listener.erl +++ /dev/null @@ -1,142 +0,0 @@ -%% Copyright (c) 2011-2012, Loïc Hoguin <[email protected]> -%% -%% Permission to use, copy, modify, and/or distribute this software for any -%% purpose with or without fee is hereby granted, provided that the above -%% copyright notice and this permission notice appear in all copies. -%% -%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR -%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES -%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN -%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF -%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -%% @doc Public API for managing listeners. --module(ranch_listener). --behaviour(gen_server). - -%% API. --export([start_link/3]). --export([stop/1]). --export([remove_connection/1]). --export([get_port/1]). --export([set_port/2]). --export([get_max_connections/1]). --export([set_max_connections/2]). --export([get_protocol_options/1]). --export([set_protocol_options/2]). - -%% gen_server. --export([init/1]). --export([handle_call/3]). --export([handle_cast/2]). --export([handle_info/2]). --export([terminate/2]). --export([code_change/3]). - --record(state, { - ref :: any(), - max_conns = undefined :: ranch:max_conns(), - port = undefined :: undefined | inet:port_number(), - proto_opts = undefined :: any() -}). - -%% API. - -%% @private --spec start_link(any(), non_neg_integer(), any()) -> {ok, pid()}. -start_link(Ref, MaxConns, ProtoOpts) -> - gen_server:start_link(?MODULE, [Ref, MaxConns, ProtoOpts], []). - -%% @private --spec stop(pid()) -> stopped. -stop(ServerPid) -> - gen_server:call(ServerPid, stop). - -%% @doc Remove this process' connection from the pool. -%% -%% Useful if you have long-lived connections that aren't taking up -%% resources and shouldn't be counted in the limited number of running -%% connections. --spec remove_connection(pid()) -> ok. -remove_connection(ServerPid) -> - ConnsSup = ranch_server:find_connections_sup(ServerPid), - ConnsSup ! {remove_connection, ServerPid}, - ok. - -%% @doc Return the listener's port. --spec get_port(pid()) -> {ok, inet:port_number()}. -get_port(ServerPid) -> - gen_server:call(ServerPid, get_port). - -%% @private --spec set_port(pid(), inet:port_number()) -> ok. -set_port(ServerPid, Port) -> - gen_server:cast(ServerPid, {set_port, Port}). - -%% @doc Return the max number of connections allowed concurrently. --spec get_max_connections(pid()) -> {ok, ranch:max_conns()}. -get_max_connections(ServerPid) -> - gen_server:call(ServerPid, get_max_connections). - -%% @doc Set the max number of connections allowed concurrently. --spec set_max_connections(pid(), ranch:max_conns()) -> ok. -set_max_connections(ServerPid, MaxConnections) -> - gen_server:call(ServerPid, {set_max_connections, MaxConnections}). - -%% @doc Return the current protocol options. --spec get_protocol_options(pid()) -> {ok, any()}. -get_protocol_options(ServerPid) -> - gen_server:call(ServerPid, get_protocol_options). - -%% @doc Upgrade the protocol options. --spec set_protocol_options(pid(), any()) -> ok. -set_protocol_options(ServerPid, ProtoOpts) -> - gen_server:call(ServerPid, {set_protocol_options, ProtoOpts}). - -%% gen_server. - -%% @private -init([Ref, MaxConns, ProtoOpts]) -> - ok = ranch_server:insert_listener(Ref, self()), - {ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}. - -%% @private -handle_call(get_port, _From, State=#state{port=Port}) -> - {reply, {ok, Port}, State}; -handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) -> - {reply, {ok, MaxConns}, State}; -handle_call({set_max_connections, MaxConnections}, _From, - State=#state{ref=Ref}) -> - ConnsSup = ranch_server:lookup_connections_sup(Ref), - ConnsSup ! {set_max_conns, MaxConnections}, - {reply, ok, State#state{max_conns=MaxConnections}}; -handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) -> - {reply, {ok, ProtoOpts}, State}; -handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) -> - ConnsSup = ranch_server:lookup_connections_sup(Ref), - ConnsSup ! {set_opts, ProtoOpts}, - {reply, ok, State#state{proto_opts=ProtoOpts}}; -handle_call(stop, _From, State) -> - {stop, normal, stopped, State}; -handle_call(_, _From, State) -> - {reply, ignored, State}. - -%% @private -handle_cast({set_port, Port}, State) -> - {noreply, State#state{port=Port}}; -handle_cast(_Msg, State) -> - {noreply, State}. - -%% @private -handle_info(_Info, State) -> - {noreply, State}. - -%% @private -terminate(_Reason, _State) -> - ok. - -%% @private -code_change(_OldVsn, State, _Extra) -> - {ok, State}. diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl index 1d346aa..9f19123 100644 --- a/src/ranch_listener_sup.erl +++ b/src/ranch_listener_sup.erl @@ -28,18 +28,15 @@ -> {ok, pid()}. start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> MaxConns = proplists:get_value(max_connections, TransOpts, 1024), + ranch_server:set_new_listener_opts(Ref, MaxConns, ProtoOpts), supervisor:start_link(?MODULE, { - Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts + Ref, NbAcceptors, Transport, TransOpts, Protocol }). %% supervisor. -init({Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts}) -> +init({Ref, NbAcceptors, Transport, TransOpts, Protocol}) -> ChildSpecs = [ - %% listener - {ranch_listener, {ranch_listener, start_link, - [Ref, MaxConns, ProtoOpts]}, - permanent, 5000, worker, [ranch_listener]}, %% conns_sup {ranch_conns_sup, {ranch_conns_sup, start_link, [Ref, Transport, Protocol]}, diff --git a/src/ranch_protocol.erl b/src/ranch_protocol.erl index 38a56c8..c788547 100644 --- a/src/ranch_protocol.erl +++ b/src/ranch_protocol.erl @@ -17,7 +17,7 @@ %% Start a new connection process for the given socket. -callback start_link( - ListenerPid::pid(), + Ref::any(), Socket::any(), Transport::module(), ProtocolOptions::any()) diff --git a/src/ranch_server.erl b/src/ranch_server.erl index 77f11c5..d827ae2 100644 --- a/src/ranch_server.erl +++ b/src/ranch_server.erl @@ -18,11 +18,16 @@ %% API. -export([start_link/0]). --export([insert_listener/2]). --export([lookup_listener/1]). +-export([set_new_listener_opts/3]). +-export([cleanup_listener_opts/1]). -export([set_connections_sup/2]). --export([lookup_connections_sup/1]). --export([find_connections_sup/1]). +-export([get_connections_sup/1]). +-export([set_port/2]). +-export([get_port/1]). +-export([set_max_connections/2]). +-export([get_max_connections/1]). +-export([set_protocol_options/2]). +-export([get_protocol_options/1]). -export([count_connections/1]). %% gen_server. @@ -47,38 +52,63 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -%% @doc Insert a listener into the database. --spec insert_listener(any(), pid()) -> ok. -insert_listener(Ref, Pid) -> - true = ets:insert_new(?TAB, {{listener, Ref}, Pid, undefined}), - gen_server:cast(?MODULE, {insert_listener, Ref, Pid}). - -%% @doc Lookup a listener in the database. --spec lookup_listener(any()) -> pid(). -lookup_listener(Ref) -> - ets:lookup_element(?TAB, {listener, Ref}, 2). +%% @private +-spec set_new_listener_opts(any(), ranch:max_conns(), any()) -> ok. +set_new_listener_opts(Ref, MaxConns, Opts) -> + gen_server:call(?MODULE, {set_new_listener_opts, Ref, MaxConns, Opts}). + +%% @doc Cleanup listener options after it has been stopped. +-spec cleanup_listener_opts(any()) -> ok. +cleanup_listener_opts(Ref) -> + _ = ets:delete(?TAB, {port, Ref}), + _ = ets:delete(?TAB, {max_conns, Ref}), + _ = ets:delete(?TAB, {opts, Ref}), + ok. %% @doc Set a connection supervisor associated with specific listener. -spec set_connections_sup(any(), pid()) -> ok. set_connections_sup(Ref, Pid) -> - true = ets:update_element(?TAB, {listener, Ref}, {3, Pid}), - true = ets:insert_new(?TAB, {{conns_sup, lookup_listener(Ref)}, Pid}), - ok. + gen_server:call(?MODULE, {set_connections_sup, Ref, Pid}). -%% @doc Lookup a connection supervisor used by specific listener. --spec lookup_connections_sup(any()) -> pid() | undefined. -lookup_connections_sup(Ref) -> - ets:lookup_element(?TAB, {listener, Ref}, 3). +%% @doc Return the connection supervisor used by specific listener. +-spec get_connections_sup(any()) -> pid(). +get_connections_sup(Ref) -> + ets:lookup_element(?TAB, {conns_sup, Ref}, 2). -%% @doc Find a connection supervisor using the listener pid. --spec find_connections_sup(pid()) -> pid(). -find_connections_sup(Pid) -> - ets:lookup_element(?TAB, {conns_sup, Pid}, 2). +%% @private +-spec set_port(any(), inet:port_number()) -> ok. +set_port(Ref, Port) -> + gen_server:call(?MODULE, {set_port, Ref, Port}). + +%% @doc Return the listener's port. +-spec get_port(any()) -> inet:port_number(). +get_port(Ref) -> + ets:lookup_element(?TAB, {port, Ref}, 2). + +%% @doc Set the max number of connections allowed concurrently. +-spec set_max_connections(any(), ranch:max_conns()) -> ok. +set_max_connections(Ref, MaxConnections) -> + gen_server:call(?MODULE, {set_max_conns, Ref, MaxConnections}). + +%% @doc Return the max number of connections allowed concurrently. +-spec get_max_connections(any()) -> ranch:max_conns(). +get_max_connections(Ref) -> + ets:lookup_element(?TAB, {max_conns, Ref}, 2). + +%% @doc Upgrade the protocol options. +-spec set_protocol_options(any(), any()) -> ok. +set_protocol_options(Ref, ProtoOpts) -> + gen_server:call(?MODULE, {set_opts, Ref, ProtoOpts}). + +%% @doc Return the current protocol options. +-spec get_protocol_options(any()) -> any(). +get_protocol_options(Ref) -> + ets:lookup_element(?TAB, {opts, Ref}, 2). %% @doc Count the number of connections in the connection pool. -spec count_connections(any()) -> non_neg_integer(). count_connections(Ref) -> - ranch_conns_sup:active_connections(lookup_connections_sup(Ref)). + ranch_conns_sup:active_connections(get_connections_sup(Ref)). %% gen_server. @@ -87,14 +117,33 @@ init([]) -> {ok, #state{}}. %% @private +handle_call({set_new_listener_opts, Ref, MaxConns, Opts}, _, State) -> + ets:insert(?TAB, {{max_conns, Ref}, MaxConns}), + ets:insert(?TAB, {{opts, Ref}, Opts}), + {reply, ok, State}; +handle_call({set_connections_sup, Ref, Pid}, _, + State=#state{monitors=Monitors}) -> + true = ets:insert_new(?TAB, {{conns_sup, Ref}, Pid}), + MonitorRef = erlang:monitor(process, Pid), + {reply, ok, State#state{ + monitors=[{{MonitorRef, Pid}, Ref}|Monitors]}}; +handle_call({set_port, Ref, Port}, _, State) -> + true = ets:insert(?TAB, {{port, Ref}, Port}), + {reply, ok, State}; +handle_call({set_max_conns, Ref, MaxConns}, _, State) -> + ets:insert(?TAB, {{max_conns, Ref}, MaxConns}), + ConnsSup = get_connections_sup(Ref), + ConnsSup ! {set_max_conns, MaxConns}, + {reply, ok, State}; +handle_call({set_opts, Ref, Opts}, _, State) -> + ets:insert(?TAB, {{opts, Ref}, Opts}), + ConnsSup = get_connections_sup(Ref), + ConnsSup ! {set_opts, Opts}, + {reply, ok, State}; handle_call(_Request, _From, State) -> {reply, ignore, State}. %% @private -handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) -> - MonitorRef = erlang:monitor(process, Pid), - {noreply, State#state{ - monitors=[{{MonitorRef, Pid}, Ref}|Monitors]}}; handle_cast(_Request, State) -> {noreply, State}. @@ -102,7 +151,7 @@ handle_cast(_Request, State) -> handle_info({'DOWN', MonitorRef, process, Pid, _}, State=#state{monitors=Monitors}) -> {_, Ref} = lists:keyfind({MonitorRef, Pid}, 1, Monitors), - true = ets:delete(?TAB, {listener, Ref}), + true = ets:delete(?TAB, {conns_sup, Ref}), Monitors2 = lists:keydelete({MonitorRef, Pid}, 1, Monitors), {noreply, State#state{monitors=Monitors2}}; handle_info(_Info, State) -> |