diff options
Diffstat (limited to 'src/ranch_listener.erl')
-rw-r--r-- | src/ranch_listener.erl | 224 |
1 files changed, 224 insertions, 0 deletions
diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl new file mode 100644 index 0000000..fbcb87a --- /dev/null +++ b/src/ranch_listener.erl @@ -0,0 +1,224 @@ +%% Copyright (c) 2011-2012, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @doc Public API for managing listeners. +-module(ranch_listener). +-behaviour(gen_server). + +-export([start_link/2, stop/1, + add_connection/4, move_connection/3, remove_connection/2, check_upgrades/2, + get_protocol_options/1, set_protocol_options/2]). %% API. +-export([init/1, handle_call/3, handle_cast/2, + handle_info/2, terminate/2, code_change/3]). %% gen_server. + +-type pools() :: [{atom(), non_neg_integer()}]. + +-record(state, { + conn_pools = [] :: pools(), + conns_table :: ets:tid(), + queue = undefined :: queue(), + max_conns = undefined :: non_neg_integer(), + proto_opts :: any(), + proto_opts_vsn = 1 :: non_neg_integer() +}). + +%% API. + +%% @private +%% +%% We set the process priority to high because ranch_listener is the central +%% gen_server in Ranch and is used to manage all the incoming connections. +%% Setting the process priority to high ensures the connection-related code +%% will always be executed when a connection needs it, allowing Ranch to +%% scale far beyond what it would with a normal priority. +-spec start_link(non_neg_integer(), any()) -> {ok, pid()}. +start_link(MaxConns, ProtoOpts) -> + gen_server:start_link(?MODULE, [MaxConns, ProtoOpts], + [{spawn_opt, [{priority, high}]}]). + +%% @private +-spec stop(pid()) -> stopped. +stop(ServerPid) -> + gen_server:call(ServerPid, stop). + +%% @doc Add a connection to the given pool in the listener. +%% +%% Pools of connections are used to restrict the maximum number of connections +%% depending on their type. By default, Ranch add all connections to the +%% pool <em>default</em>. It also checks for the maximum number of connections +%% in that pool before accepting again. This function only returns when there +%% is free space in the pool. +%% +%% When a process managing a connection dies, the process is removed from the +%% pool. If the socket has been sent to another process, it is up to the +%% protocol code to inform the listener of the new <em>ConnPid</em> by removing +%% the previous and adding the new one. +%% +%% This function also returns whether the protocol options have been modified. +%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of +%% the atom 'ok'. The acceptor can then continue with the new protocol options. +-spec add_connection(pid(), atom(), pid(), non_neg_integer()) + -> ok | {upgrade, any(), non_neg_integer()}. +add_connection(ServerPid, Pool, ConnPid, OptsVsn) -> + gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, + infinity). + +%% @doc Move a connection from one pool to another. +-spec move_connection(pid(), atom(), pid()) -> ok. +move_connection(ServerPid, DestPool, ConnPid) -> + gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}). + +%% @doc Remove the given connection from its pool. +-spec remove_connection(pid(), pid()) -> ok. +remove_connection(ServerPid, ConnPid) -> + gen_server:cast(ServerPid, {remove_connection, ConnPid}). + +%% @doc Return whether a protocol upgrade is required. +-spec check_upgrades(pid(), non_neg_integer()) + -> ok | {upgrade, any(), non_neg_integer()}. +check_upgrades(ServerPid, OptsVsn) -> + gen_server:call(ServerPid, {check_upgrades, OptsVsn}). + +%% @doc Return the current protocol options. +-spec get_protocol_options(pid()) -> {ok, any()}. +get_protocol_options(ServerPid) -> + gen_server:call(ServerPid, get_protocol_options). + +%% @doc Upgrade the protocol options. +-spec set_protocol_options(pid(), any()) -> ok. +set_protocol_options(ServerPid, ProtoOpts) -> + gen_server:call(ServerPid, {set_protocol_options, ProtoOpts}). + +%% gen_server. + +%% @private +-spec init(list()) -> {ok, #state{}}. +init([MaxConns, ProtoOpts]) -> + ConnsTable = ets:new(connections_table, [set, private]), + Queue = queue:new(), + {ok, #state{conns_table=ConnsTable, max_conns=MaxConns, + proto_opts=ProtoOpts, queue=Queue}}. + +%% @private +-spec handle_call(_, _, State) + -> {reply, ignored, State} | {stop, normal, stopped, State}. +handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{ + conn_pools=Pools, conns_table=ConnsTable, + queue=Queue, max_conns=MaxConns, + proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> + {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ConnsTable), + State2 = State#state{conn_pools=Pools2}, + if AccOptsVsn =/= LisOptsVsn -> + {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2}; + NbConns > MaxConns -> + Queue2 = queue:in(From, Queue), + {noreply, State2#state{queue=Queue2}}; + true -> + {reply, ok, State2} + end; +handle_call({check_upgrades, AccOptsVsn}, _From, State=#state{ + proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> + if AccOptsVsn =/= LisOptsVsn -> + {reply, {upgrade, ProtoOpts, LisOptsVsn}, State}; + true -> + {reply, ok, State} + end; +handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) -> + {reply, {ok, ProtoOpts}, State}; +handle_call({set_protocol_options, ProtoOpts}, _From, + State=#state{proto_opts_vsn=OptsVsn}) -> + {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}}; +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; +handle_call(_, _From, State) -> + {reply, ignored, State}. + +%% @private +-spec handle_cast(_, State) -> {noreply, State}. +handle_cast({move_connection, DestPool, ConnPid}, State=#state{ + conn_pools=Pools, conns_table=ConnsTable}) -> + Pools2 = move_pid(ConnPid, DestPool, Pools, ConnsTable), + {noreply, State#state{conn_pools=Pools2}}; +handle_cast({remove_connection, ConnPid}, State=#state{ + conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) -> + {Pools2, Queue2} = remove_pid(ConnPid, Pools, ConnsTable, Queue), + {noreply, State#state{conn_pools=Pools2, queue=Queue2}}; +handle_cast(_Msg, State) -> + {noreply, State}. + +%% @private +-spec handle_info(_, State) -> {noreply, State}. +handle_info({'DOWN', _Ref, process, Pid, _Info}, State=#state{ + conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) -> + {Pools2, Queue2} = remove_pid(Pid, Pools, ConnsTable, Queue), + {noreply, State#state{conn_pools=Pools2, queue=Queue2}}; +handle_info(_Info, State) -> + {noreply, State}. + +%% @private +-spec terminate(_, _) -> ok. +terminate(_Reason, _State) -> + ok. + +%% @private +-spec code_change(_, State, _) -> {ok, State}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% Internal. + +%% @private +-spec add_pid(pid(), atom(), pools(), ets:tid()) + -> {non_neg_integer(), pools()}. +add_pid(ConnPid, Pool, Pools, ConnsTable) -> + MonitorRef = erlang:monitor(process, ConnPid), + ConnPid ! {shoot, self()}, + {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of + false -> + {1, [{Pool, 1}|Pools]}; + {Pool, NbConns} -> + NbConns2 = NbConns + 1, + {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} + end, + ets:insert(ConnsTable, {ConnPid, {MonitorRef, Pool}}), + {NbConnsRet, Pools2}. + +%% @private +-spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools(). +move_pid(ConnPid, DestPool, Pools, ConnsTable) -> + {MonitorRef, SrcPool} = ets:lookup_element(ConnsTable, ConnPid, 2), + ets:insert(ConnsTable, {ConnPid, {MonitorRef, DestPool}}), + {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools), + DestNbConns = case lists:keyfind(DestPool, 1, Pools) of + false -> 1; + {DestPool, NbConns} -> NbConns + 1 + end, + Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)), + [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2]. + +%% @private +-spec remove_pid(pid(), pools(), ets:tid(), queue()) -> {pools(), queue()}. +remove_pid(Pid, Pools, ConnsTable, Queue) -> + {MonitorRef, Pool} = ets:lookup_element(ConnsTable, Pid, 2), + erlang:demonitor(MonitorRef, [flush]), + {Pool, NbConns} = lists:keyfind(Pool, 1, Pools), + Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)], + ets:delete(ConnsTable, Pid), + case queue:out(Queue) of + {{value, Client}, Queue2} -> + gen_server:reply(Client, ok), + {Pools2, Queue2}; + _ -> + {Pools2, Queue} + end. |