%% Copyright (c) 2011-2012, Loïc Hoguin %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above %% copyright notice and this permission notice appear in all copies. %% %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. %% @doc Public API for managing listeners. -module(cowboy_listener). -behaviour(gen_server). -export([start_link/2, stop/1, add_connection/4, move_connection/3, remove_connection/2, check_upgrades/2, get_protocol_options/1, set_protocol_options/2]). %% API. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% gen_server. -type pools() :: [{atom(), non_neg_integer()}]. -record(state, { req_pools = [] :: pools(), reqs_table :: ets:tid(), queue = undefined :: queue(), max_conns = undefined :: non_neg_integer(), proto_opts :: any(), proto_opts_vsn = 1 :: non_neg_integer() }). %% API. %% @private %% %% We set the process priority to high because cowboy_listener is the central %% gen_server in Cowboy and is used to manage all the incoming connections. %% Setting the process priority to high ensures the connection-related code %% will always be executed when a connection needs it, allowing Cowboy to %% scale far beyond what it would with a normal priority. -spec start_link(non_neg_integer(), any()) -> {ok, pid()}. start_link(MaxConns, ProtoOpts) -> gen_server:start_link(?MODULE, [MaxConns, ProtoOpts], [{spawn_opt, [{priority, high}]}]). %% @private -spec stop(pid()) -> stopped. stop(ServerPid) -> gen_server:call(ServerPid, stop). %% @doc Add a connection to the given pool in the listener. %% %% Pools of connections are used to restrict the maximum number of connections %% depending on their type. By default, Cowboy add all connections to the %% pool default. It also checks for the maximum number of connections %% in that pool before accepting again. This function only returns when there %% is free space in the pool. %% %% When a process managing a connection dies, the process is removed from the %% pool. If the socket has been sent to another process, it is up to the %% protocol code to inform the listener of the new ConnPid by removing %% the previous and adding the new one. %% %% This function also returns whether the protocol options have been modified. %% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of %% the atom 'ok'. The acceptor can then continue with the new protocol options. -spec add_connection(pid(), atom(), pid(), non_neg_integer()) -> ok | {upgrade, any(), non_neg_integer()}. add_connection(ServerPid, Pool, ConnPid, OptsVsn) -> gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, infinity). %% @doc Move a connection from one pool to another. -spec move_connection(pid(), atom(), pid()) -> ok. move_connection(ServerPid, DestPool, ConnPid) -> gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}). %% @doc Remove the given connection from its pool. -spec remove_connection(pid(), pid()) -> ok. remove_connection(ServerPid, ConnPid) -> gen_server:cast(ServerPid, {remove_connection, ConnPid}). %% @doc Return whether a protocol upgrade is required. -spec check_upgrades(pid(), non_neg_integer()) -> ok | {upgrade, any(), non_neg_integer()}. check_upgrades(ServerPid, OptsVsn) -> gen_server:call(ServerPid, {check_upgrades, OptsVsn}). %% @doc Return the current protocol options. -spec get_protocol_options(pid()) -> {ok, any()}. get_protocol_options(ServerPid) -> gen_server:call(ServerPid, get_protocol_options). %% @doc Upgrade the protocol options. -spec set_protocol_options(pid(), any()) -> ok. set_protocol_options(ServerPid, ProtoOpts) -> gen_server:call(ServerPid, {set_protocol_options, ProtoOpts}). %% gen_server. %% @private -spec init(list()) -> {ok, #state{}}. init([MaxConns, ProtoOpts]) -> ReqsTable = ets:new(requests_table, [set, private]), Queue = queue:new(), {ok, #state{reqs_table=ReqsTable, max_conns=MaxConns, proto_opts=ProtoOpts, queue=Queue}}. %% @private -spec handle_call(_, _, State) -> {reply, ignored, State} | {stop, normal, stopped, State}. handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{ req_pools=Pools, reqs_table=ReqsTable, queue=Queue, max_conns=MaxConns, proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable), State2 = State#state{req_pools=Pools2}, if AccOptsVsn =/= LisOptsVsn -> {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2}; NbConns > MaxConns -> Queue2 = queue:in(From, Queue), {noreply, State2#state{queue=Queue2}}; true -> {reply, ok, State2} end; handle_call({check_upgrades, AccOptsVsn}, _From, State=#state{ proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> if AccOptsVsn =/= LisOptsVsn -> {reply, {upgrade, ProtoOpts, LisOptsVsn}, State}; true -> {reply, ok, State} end; handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) -> {reply, {ok, ProtoOpts}, State}; handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{proto_opts_vsn=OptsVsn}) -> {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}}; handle_call(stop, _From, State) -> {stop, normal, stopped, State}; handle_call(_Request, _From, State) -> {reply, ignored, State}. %% @private -spec handle_cast(_, State) -> {noreply, State}. handle_cast({move_connection, DestPool, ConnPid}, State=#state{ req_pools=Pools, reqs_table=ReqsTable}) -> Pools2 = move_pid(ConnPid, DestPool, Pools, ReqsTable), {noreply, State#state{req_pools=Pools2}}; handle_cast({remove_connection, ConnPid}, State=#state{ req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) -> {Pools2, Queue2} = remove_pid(ConnPid, Pools, ReqsTable, Queue), {noreply, State#state{req_pools=Pools2, queue=Queue2}}; handle_cast(_Msg, State) -> {noreply, State}. %% @private -spec handle_info(_, State) -> {noreply, State}. handle_info({'DOWN', _Ref, process, Pid, _Info}, State=#state{ req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) -> {Pools2, Queue2} = remove_pid(Pid, Pools, ReqsTable, Queue), {noreply, State#state{req_pools=Pools2, queue=Queue2}}; handle_info(_Info, State) -> {noreply, State}. %% @private -spec terminate(_, _) -> ok. terminate(_Reason, _State) -> ok. %% @private -spec code_change(_, State, _) -> {ok, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}. %% Internal. %% @private -spec add_pid(pid(), atom(), pools(), ets:tid()) -> {non_neg_integer(), pools()}. add_pid(ConnPid, Pool, Pools, ReqsTable) -> MonitorRef = erlang:monitor(process, ConnPid), ConnPid ! {shoot, self()}, {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of false -> {1, [{Pool, 1}|Pools]}; {Pool, NbConns} -> NbConns2 = NbConns + 1, {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} end, ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}), {NbConnsRet, Pools2}. %% @private -spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools(). move_pid(ConnPid, DestPool, Pools, ReqsTable) -> {MonitorRef, SrcPool} = ets:lookup_element(ReqsTable, ConnPid, 2), ets:insert(ReqsTable, {ConnPid, {MonitorRef, DestPool}}), {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools), DestNbConns = case lists:keyfind(DestPool, 1, Pools) of false -> 1; {DestPool, NbConns} -> NbConns + 1 end, Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)), [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2]. %% @private -spec remove_pid(pid(), pools(), ets:tid(), queue()) -> {pools(), queue()}. remove_pid(Pid, Pools, ReqsTable, Queue) -> {MonitorRef, Pool} = ets:lookup_element(ReqsTable, Pid, 2), erlang:demonitor(MonitorRef, [flush]), {Pool, NbConns} = lists:keyfind(Pool, 1, Pools), Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)], ets:delete(ReqsTable, Pid), case queue:out(Queue) of {{value, Client}, Queue2} -> gen_server:reply(Client, ok), {Pools2, Queue2}; _ -> {Pools2, Queue} end.