From e4124de2c71564d37b3732ede0fe1542de1d6f99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 27 Aug 2012 11:50:35 +0200 Subject: Switch to Ranch for connection handling This is the first of many API incompatible changes. You have been warned. --- src/cowboy_listener.erl | 224 ------------------------------------------------ 1 file changed, 224 deletions(-) delete mode 100644 src/cowboy_listener.erl (limited to 'src/cowboy_listener.erl') diff --git a/src/cowboy_listener.erl b/src/cowboy_listener.erl deleted file mode 100644 index d15851d..0000000 --- a/src/cowboy_listener.erl +++ /dev/null @@ -1,224 +0,0 @@ -%% Copyright (c) 2011-2012, Loïc Hoguin -%% -%% Permission to use, copy, modify, and/or distribute this software for any -%% purpose with or without fee is hereby granted, provided that the above -%% copyright notice and this permission notice appear in all copies. -%% -%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR -%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES -%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN -%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF -%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -%% @doc Public API for managing listeners. --module(cowboy_listener). --behaviour(gen_server). - --export([start_link/2, stop/1, - add_connection/4, move_connection/3, remove_connection/2, check_upgrades/2, - get_protocol_options/1, set_protocol_options/2]). %% API. --export([init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3]). %% gen_server. - --type pools() :: [{atom(), non_neg_integer()}]. - --record(state, { - req_pools = [] :: pools(), - reqs_table :: ets:tid(), - queue = undefined :: queue(), - max_conns = undefined :: non_neg_integer(), - proto_opts :: any(), - proto_opts_vsn = 1 :: non_neg_integer() -}). - -%% API. - -%% @private -%% -%% We set the process priority to high because cowboy_listener is the central -%% gen_server in Cowboy and is used to manage all the incoming connections. -%% Setting the process priority to high ensures the connection-related code -%% will always be executed when a connection needs it, allowing Cowboy to -%% scale far beyond what it would with a normal priority. --spec start_link(non_neg_integer(), any()) -> {ok, pid()}. -start_link(MaxConns, ProtoOpts) -> - gen_server:start_link(?MODULE, [MaxConns, ProtoOpts], - [{spawn_opt, [{priority, high}]}]). - -%% @private --spec stop(pid()) -> stopped. -stop(ServerPid) -> - gen_server:call(ServerPid, stop). - -%% @doc Add a connection to the given pool in the listener. -%% -%% Pools of connections are used to restrict the maximum number of connections -%% depending on their type. By default, Cowboy add all connections to the -%% pool default. It also checks for the maximum number of connections -%% in that pool before accepting again. This function only returns when there -%% is free space in the pool. -%% -%% When a process managing a connection dies, the process is removed from the -%% pool. If the socket has been sent to another process, it is up to the -%% protocol code to inform the listener of the new ConnPid by removing -%% the previous and adding the new one. -%% -%% This function also returns whether the protocol options have been modified. -%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of -%% the atom 'ok'. The acceptor can then continue with the new protocol options. --spec add_connection(pid(), atom(), pid(), non_neg_integer()) - -> ok | {upgrade, any(), non_neg_integer()}. -add_connection(ServerPid, Pool, ConnPid, OptsVsn) -> - gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, - infinity). - -%% @doc Move a connection from one pool to another. --spec move_connection(pid(), atom(), pid()) -> ok. -move_connection(ServerPid, DestPool, ConnPid) -> - gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}). - -%% @doc Remove the given connection from its pool. --spec remove_connection(pid(), pid()) -> ok. -remove_connection(ServerPid, ConnPid) -> - gen_server:cast(ServerPid, {remove_connection, ConnPid}). - -%% @doc Return whether a protocol upgrade is required. --spec check_upgrades(pid(), non_neg_integer()) - -> ok | {upgrade, any(), non_neg_integer()}. -check_upgrades(ServerPid, OptsVsn) -> - gen_server:call(ServerPid, {check_upgrades, OptsVsn}). - -%% @doc Return the current protocol options. --spec get_protocol_options(pid()) -> {ok, any()}. -get_protocol_options(ServerPid) -> - gen_server:call(ServerPid, get_protocol_options). - -%% @doc Upgrade the protocol options. --spec set_protocol_options(pid(), any()) -> ok. -set_protocol_options(ServerPid, ProtoOpts) -> - gen_server:call(ServerPid, {set_protocol_options, ProtoOpts}). - -%% gen_server. - -%% @private --spec init(list()) -> {ok, #state{}}. -init([MaxConns, ProtoOpts]) -> - ReqsTable = ets:new(requests_table, [set, private]), - Queue = queue:new(), - {ok, #state{reqs_table=ReqsTable, max_conns=MaxConns, - proto_opts=ProtoOpts, queue=Queue}}. - -%% @private --spec handle_call(_, _, State) - -> {reply, ignored, State} | {stop, normal, stopped, State}. -handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{ - req_pools=Pools, reqs_table=ReqsTable, - queue=Queue, max_conns=MaxConns, - proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> - {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable), - State2 = State#state{req_pools=Pools2}, - if AccOptsVsn =/= LisOptsVsn -> - {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2}; - NbConns > MaxConns -> - Queue2 = queue:in(From, Queue), - {noreply, State2#state{queue=Queue2}}; - true -> - {reply, ok, State2} - end; -handle_call({check_upgrades, AccOptsVsn}, _From, State=#state{ - proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> - if AccOptsVsn =/= LisOptsVsn -> - {reply, {upgrade, ProtoOpts, LisOptsVsn}, State}; - true -> - {reply, ok, State} - end; -handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) -> - {reply, {ok, ProtoOpts}, State}; -handle_call({set_protocol_options, ProtoOpts}, _From, - State=#state{proto_opts_vsn=OptsVsn}) -> - {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}}; -handle_call(stop, _From, State) -> - {stop, normal, stopped, State}; -handle_call(_Request, _From, State) -> - {reply, ignored, State}. - -%% @private --spec handle_cast(_, State) -> {noreply, State}. -handle_cast({move_connection, DestPool, ConnPid}, State=#state{ - req_pools=Pools, reqs_table=ReqsTable}) -> - Pools2 = move_pid(ConnPid, DestPool, Pools, ReqsTable), - {noreply, State#state{req_pools=Pools2}}; -handle_cast({remove_connection, ConnPid}, State=#state{ - req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) -> - {Pools2, Queue2} = remove_pid(ConnPid, Pools, ReqsTable, Queue), - {noreply, State#state{req_pools=Pools2, queue=Queue2}}; -handle_cast(_Msg, State) -> - {noreply, State}. - -%% @private --spec handle_info(_, State) -> {noreply, State}. -handle_info({'DOWN', _Ref, process, Pid, _Info}, State=#state{ - req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) -> - {Pools2, Queue2} = remove_pid(Pid, Pools, ReqsTable, Queue), - {noreply, State#state{req_pools=Pools2, queue=Queue2}}; -handle_info(_Info, State) -> - {noreply, State}. - -%% @private --spec terminate(_, _) -> ok. -terminate(_Reason, _State) -> - ok. - -%% @private --spec code_change(_, State, _) -> {ok, State}. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%% Internal. - -%% @private --spec add_pid(pid(), atom(), pools(), ets:tid()) - -> {non_neg_integer(), pools()}. -add_pid(ConnPid, Pool, Pools, ReqsTable) -> - MonitorRef = erlang:monitor(process, ConnPid), - ConnPid ! {shoot, self()}, - {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of - false -> - {1, [{Pool, 1}|Pools]}; - {Pool, NbConns} -> - NbConns2 = NbConns + 1, - {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} - end, - ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}), - {NbConnsRet, Pools2}. - -%% @private --spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools(). -move_pid(ConnPid, DestPool, Pools, ReqsTable) -> - {MonitorRef, SrcPool} = ets:lookup_element(ReqsTable, ConnPid, 2), - ets:insert(ReqsTable, {ConnPid, {MonitorRef, DestPool}}), - {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools), - DestNbConns = case lists:keyfind(DestPool, 1, Pools) of - false -> 1; - {DestPool, NbConns} -> NbConns + 1 - end, - Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)), - [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2]. - -%% @private --spec remove_pid(pid(), pools(), ets:tid(), queue()) -> {pools(), queue()}. -remove_pid(Pid, Pools, ReqsTable, Queue) -> - {MonitorRef, Pool} = ets:lookup_element(ReqsTable, Pid, 2), - erlang:demonitor(MonitorRef, [flush]), - {Pool, NbConns} = lists:keyfind(Pool, 1, Pools), - Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)], - ets:delete(ReqsTable, Pid), - case queue:out(Queue) of - {{value, Client}, Queue2} -> - gen_server:reply(Client, ok), - {Pools2, Queue2}; - _ -> - {Pools2, Queue} - end. -- cgit v1.2.3