diff options
-rw-r--r-- | src/cowboy.erl | 5 | ||||
-rw-r--r-- | src/cowboy_acceptor.erl | 35 | ||||
-rw-r--r-- | src/cowboy_acceptors_sup.erl | 19 | ||||
-rw-r--r-- | src/cowboy_listener.erl | 151 | ||||
-rw-r--r-- | src/cowboy_listener_sup.erl | 10 |
5 files changed, 192 insertions, 28 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl index 4a82cc1..30730cb 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -32,6 +32,11 @@ %% performance. The exact number depends of course on your hardware, on the %% protocol used and on the number of expected simultaneous connections. %% +%% The <em>Transport</em> option <em>max_connections</em> allows you to define +%% the maximum number of simultaneous connections for this listener. It defaults +%% to 1024. See <em>cowboy_listener</em> for more details on limiting the number +%% of connections. +%% %% Although Cowboy includes a <em>cowboy_http_protocol</em> handler, other %% handlers can be created for different protocols like IRC, FTP and more. %% diff --git a/src/cowboy_acceptor.erl b/src/cowboy_acceptor.erl index cc8dfa3..be63cef 100644 --- a/src/cowboy_acceptor.erl +++ b/src/cowboy_acceptor.erl @@ -15,29 +15,32 @@ %% @private -module(cowboy_acceptor). --export([start_link/6]). %% API. --export([acceptor/6]). %% Internal. +-export([start_link/7]). %% API. +-export([acceptor/7]). %% Internal. %% API. -spec start_link(inet:socket(), module(), module(), any(), - non_neg_integer(), pid()) -> {ok, pid()}. -start_link(LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup) -> + non_neg_integer(), pid(), pid()) -> {ok, pid()}. +start_link(LSocket, Transport, Protocol, Opts, + MaxConns, ListenerPid, ReqsSup) -> Pid = spawn_link(?MODULE, acceptor, - [LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup]), + [LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup]), {ok, Pid}. %% Internal. -spec acceptor(inet:socket(), module(), module(), any(), - non_neg_integer(), pid()) -> no_return(). -acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup) -> + non_neg_integer(), pid(), pid()) -> no_return(). +acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) -> case Transport:accept(LSocket, 2000) of {ok, CSocket} -> {ok, Pid} = supervisor:start_child(ReqsSup, [CSocket, Transport, Protocol, Opts]), Transport:controlling_process(CSocket, Pid), - limit_reqs(MaxConns, ReqsSup); + {ok, NbConns} = cowboy_listener:add_connection(ListenerPid, + default, Pid), + limit_reqs(ListenerPid, NbConns, MaxConns); {error, timeout} -> ignore; {error, _Reason} -> @@ -45,13 +48,11 @@ acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup) -> %% we may want to try and listen again on the port? ignore end, - ?MODULE:acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup). + ?MODULE:acceptor(LSocket, Transport, Protocol, Opts, + MaxConns, ListenerPid, ReqsSup). --spec limit_reqs(non_neg_integer(), pid()) -> ok. -limit_reqs(MaxConns, ReqsSup) -> - Counts = supervisor:count_children(ReqsSup), - Active = lists:keyfind(active, 1, Counts), - case Active < MaxConns of - true -> ok; - false -> timer:sleep(1) - end. +-spec limit_reqs(pid(), non_neg_integer(), non_neg_integer()) -> ok. +limit_reqs(_ListenerPid, NbConns, MaxConns) when NbConns =< MaxConns -> + ok; +limit_reqs(ListenerPid, _NbConns, MaxConns) -> + cowboy_listener:wait(ListenerPid, default, MaxConns). diff --git a/src/cowboy_acceptors_sup.erl b/src/cowboy_acceptors_sup.erl index c12aeb5..3d57610 100644 --- a/src/cowboy_acceptors_sup.erl +++ b/src/cowboy_acceptors_sup.erl @@ -16,25 +16,28 @@ -module(cowboy_acceptors_sup). -behaviour(supervisor). --export([start_link/6]). %% API. +-export([start_link/7]). %% API. -export([init/1]). %% supervisor. %% API. --spec start_link(non_neg_integer(), module(), any(), module(), any(), pid()) - -> {ok, pid()}. -start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ReqsPid) -> - supervisor:start_link(?MODULE, [NbAcceptors, - Transport, TransOpts, Protocol, ProtoOpts, ReqsPid]). +-spec start_link(non_neg_integer(), module(), any(), + module(), any(), pid(), pid()) -> {ok, pid()}. +start_link(NbAcceptors, Transport, TransOpts, + Protocol, ProtoOpts, ListenerPid, ReqsPid) -> + supervisor:start_link(?MODULE, [NbAcceptors, Transport, TransOpts, + Protocol, ProtoOpts, ListenerPid, ReqsPid]). %% supervisor. -spec init(list()) -> {ok, {{one_for_one, 10, 10}, list()}}. -init([NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ReqsPid]) -> +init([NbAcceptors, Transport, TransOpts, + Protocol, ProtoOpts, ListenerPid, ReqsPid]) -> {ok, LSocket} = Transport:listen(TransOpts), MaxConns = proplists:get_value(max_connections, TransOpts, 1024), Procs = [{{acceptor, self(), N}, {cowboy_acceptor, start_link, [ - LSocket, Transport, Protocol, ProtoOpts, MaxConns, ReqsPid + LSocket, Transport, Protocol, ProtoOpts, + MaxConns, ListenerPid, ReqsPid ]}, permanent, brutal_kill, worker, dynamic} || N <- lists:seq(1, NbAcceptors)], {ok, {{one_for_one, 10, 10}, Procs}}. diff --git a/src/cowboy_listener.erl b/src/cowboy_listener.erl new file mode 100644 index 0000000..2f5ccc2 --- /dev/null +++ b/src/cowboy_listener.erl @@ -0,0 +1,151 @@ +%% Copyright (c) 2011, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @doc Public API for managing listeners. +-module(cowboy_listener). +-behaviour(gen_server). + +-export([start_link/0, stop/1, + add_connection/3, remove_connection/2, wait/3]). %% API. +-export([init/1, handle_call/3, handle_cast/2, + handle_info/2, terminate/2, code_change/3]). %% gen_server. + +-record(state, { + req_pools = [] :: [{atom(), non_neg_integer()}], + reqs_table :: ets:tid(), + queue = [] :: [{pid(), reference()}] +}). + +%% API. + +%% @private +-spec start_link() -> {ok, pid()}. +start_link() -> + gen_server:start_link(?MODULE, [], []). + +%% @private +-spec stop(pid()) -> stopped. +stop(ServerPid) -> + gen_server:call(ServerPid, stop). + +%% @doc Add a connection to the given pool in the listener. +%% +%% Pools of connections are used to restrict the maximum number of connections +%% depending on their type. By default, Cowboy add all connections to the +%% pool <em>default</em>. It also checks for the maximum number of connections +%% in that pool before accepting again. +%% +%% When a process managing a connection dies, the process is removed from the +%% pool. If the socket has been sent to another process, it is up to the +%% protocol code to inform the listener of the new <em>ConnPid</em> by removing +%% the previous and adding the new one. +-spec add_connection(pid(), atom(), pid()) -> {ok, non_neg_integer()}. +add_connection(ServerPid, Pool, ConnPid) -> + gen_server:call(ServerPid, {add_connection, Pool, ConnPid}). + +%% @doc Remove the given connection from its pool. +-spec remove_connection(pid(), pid()) -> ok. +remove_connection(ServerPid, ConnPid) -> + gen_server:cast(ServerPid, {remove_connection, ConnPid}). + +%% @doc Wait until the number of connections in the given pool gets below +%% the given threshold. +%% +%% This function will not return until the number of connections in the pool +%% gets below <em>MaxConns</em>. It makes use of <em>gen_server:reply/2</em> +%% to make the process wait for a reply indefinitely. +-spec wait(pid(), atom(), non_neg_integer()) -> ok. +wait(ServerPid, Pool, MaxConns) -> + gen_server:call(ServerPid, {wait, Pool, MaxConns}, infinity). + +%% gen_server. + +%% @private +-spec init([]) -> {ok, #state{}}. +init([]) -> + ReqsTablePid = ets:new(requests_table, [set, private]), + {ok, #state{reqs_table=ReqsTablePid}}. + +%% @private +-spec handle_call(_, _, State) + -> {reply, ignored, State} | {stop, normal, stopped, State}. +handle_call({add_connection, Pool, ConnPid}, _From, State=#state{ + req_pools=Pools, reqs_table=ReqsTable}) -> + MonitorRef = erlang:monitor(process, ConnPid), + {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of + false -> + {1, [{Pool, 1}|Pools]}; + {Pool, NbConns} -> + NbConns2 = NbConns + 1, + {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} + end, + ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}), + {reply, {ok, NbConnsRet}, State#state{req_pools=Pools2}}; +handle_call({wait, Pool, MaxConns}, From, State=#state{ + req_pools=Pools, queue=Queue}) -> + case lists:keyfind(Pool, 1, Pools) of + {Pool, NbConns} when NbConns > MaxConns -> + {noreply, State#state{queue=[From|Queue]}}; + _Any -> + {reply, ok, State} + end; +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; +handle_call(_Request, _From, State) -> + {reply, ignored, State}. + +%% @private +-spec handle_cast(_, State) -> {noreply, State}. +handle_cast({remove_connection, ConnPid}, State) -> + State2 = remove_pid(ConnPid, State), + {noreply, State2}; +handle_cast(_Msg, State) -> + {noreply, State}. + +%% @private +-spec handle_info(_, State) -> {noreply, State}. +handle_info({'DOWN', _Ref, process, Pid, _Info}, State) -> + State2 = remove_pid(Pid, State), + {noreply, State2}; +handle_info(_Info, State) -> + {noreply, State}. + +%% @private +-spec terminate(_, _) -> ok. +terminate(_Reason, _State) -> + ok. + +%% @private +-spec code_change(_, State, _) -> {ok, State}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% Internal. + +%% @private +-spec remove_pid(pid(), State) -> State. +remove_pid(Pid, State=#state{ + req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) -> + {MonitorRef, Pool} = ets:lookup_element(ReqsTable, Pid, 2), + erlang:demonitor(MonitorRef, [flush]), + {Pool, NbConns} = lists:keyfind(Pool, 1, Pools), + Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)], + ets:delete(ReqsTable, Pid), + case Queue of + [] -> + State#state{req_pools=Pools2}; + [Client|Queue2] -> + gen_server:reply(Client, ok), + State#state{req_pools=Pools2, queue=Queue2} + end. diff --git a/src/cowboy_listener_sup.erl b/src/cowboy_listener_sup.erl index 0ed662f..adf5262 100644 --- a/src/cowboy_listener_sup.erl +++ b/src/cowboy_listener_sup.erl @@ -25,17 +25,21 @@ -> {ok, pid()}. start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> {ok, SupPid} = supervisor:start_link(?MODULE, []), + {ok, ListenerPid} = supervisor:start_child(SupPid, + {cowboy_listener, {cowboy_listener, start_link, []}, + permanent, 5000, worker, dynamic}), {ok, ReqsPid} = supervisor:start_child(SupPid, {cowboy_requests_sup, {cowboy_requests_sup, start_link, []}, permanent, 5000, supervisor, [cowboy_requests_sup]}), {ok, _PoolPid} = supervisor:start_child(SupPid, {cowboy_acceptors_sup, {cowboy_acceptors_sup, start_link, [ - NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ReqsPid + NbAcceptors, Transport, TransOpts, + Protocol, ProtoOpts, ListenerPid, ReqsPid ]}, permanent, 5000, supervisor, [cowboy_acceptors_sup]}), {ok, SupPid}. %% supervisor. --spec init([]) -> {ok, {{one_for_one, 0, 1}, []}}. +-spec init([]) -> {ok, {{one_for_all, 10, 10}, []}}. init([]) -> - {ok, {{one_for_one, 0, 1}, []}}. + {ok, {{one_for_all, 10, 10}, []}}. |