aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/cowboy.erl5
-rw-r--r--src/cowboy_acceptor.erl35
-rw-r--r--src/cowboy_acceptors_sup.erl19
-rw-r--r--src/cowboy_listener.erl151
-rw-r--r--src/cowboy_listener_sup.erl10
5 files changed, 192 insertions, 28 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl
index 4a82cc1..30730cb 100644
--- a/src/cowboy.erl
+++ b/src/cowboy.erl
@@ -32,6 +32,11 @@
%% performance. The exact number depends of course on your hardware, on the
%% protocol used and on the number of expected simultaneous connections.
%%
+%% The <em>Transport</em> option <em>max_connections</em> allows you to define
+%% the maximum number of simultaneous connections for this listener. It defaults
+%% to 1024. See <em>cowboy_listener</em> for more details on limiting the number
+%% of connections.
+%%
%% Although Cowboy includes a <em>cowboy_http_protocol</em> handler, other
%% handlers can be created for different protocols like IRC, FTP and more.
%%
diff --git a/src/cowboy_acceptor.erl b/src/cowboy_acceptor.erl
index cc8dfa3..be63cef 100644
--- a/src/cowboy_acceptor.erl
+++ b/src/cowboy_acceptor.erl
@@ -15,29 +15,32 @@
%% @private
-module(cowboy_acceptor).
--export([start_link/6]). %% API.
--export([acceptor/6]). %% Internal.
+-export([start_link/7]). %% API.
+-export([acceptor/7]). %% Internal.
%% API.
-spec start_link(inet:socket(), module(), module(), any(),
- non_neg_integer(), pid()) -> {ok, pid()}.
-start_link(LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup) ->
+ non_neg_integer(), pid(), pid()) -> {ok, pid()}.
+start_link(LSocket, Transport, Protocol, Opts,
+ MaxConns, ListenerPid, ReqsSup) ->
Pid = spawn_link(?MODULE, acceptor,
- [LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup]),
+ [LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup]),
{ok, Pid}.
%% Internal.
-spec acceptor(inet:socket(), module(), module(), any(),
- non_neg_integer(), pid()) -> no_return().
-acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup) ->
+ non_neg_integer(), pid(), pid()) -> no_return().
+acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) ->
case Transport:accept(LSocket, 2000) of
{ok, CSocket} ->
{ok, Pid} = supervisor:start_child(ReqsSup,
[CSocket, Transport, Protocol, Opts]),
Transport:controlling_process(CSocket, Pid),
- limit_reqs(MaxConns, ReqsSup);
+ {ok, NbConns} = cowboy_listener:add_connection(ListenerPid,
+ default, Pid),
+ limit_reqs(ListenerPid, NbConns, MaxConns);
{error, timeout} ->
ignore;
{error, _Reason} ->
@@ -45,13 +48,11 @@ acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup) ->
%% we may want to try and listen again on the port?
ignore
end,
- ?MODULE:acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ReqsSup).
+ ?MODULE:acceptor(LSocket, Transport, Protocol, Opts,
+ MaxConns, ListenerPid, ReqsSup).
--spec limit_reqs(non_neg_integer(), pid()) -> ok.
-limit_reqs(MaxConns, ReqsSup) ->
- Counts = supervisor:count_children(ReqsSup),
- Active = lists:keyfind(active, 1, Counts),
- case Active < MaxConns of
- true -> ok;
- false -> timer:sleep(1)
- end.
+-spec limit_reqs(pid(), non_neg_integer(), non_neg_integer()) -> ok.
+limit_reqs(_ListenerPid, NbConns, MaxConns) when NbConns =< MaxConns ->
+ ok;
+limit_reqs(ListenerPid, _NbConns, MaxConns) ->
+ cowboy_listener:wait(ListenerPid, default, MaxConns).
diff --git a/src/cowboy_acceptors_sup.erl b/src/cowboy_acceptors_sup.erl
index c12aeb5..3d57610 100644
--- a/src/cowboy_acceptors_sup.erl
+++ b/src/cowboy_acceptors_sup.erl
@@ -16,25 +16,28 @@
-module(cowboy_acceptors_sup).
-behaviour(supervisor).
--export([start_link/6]). %% API.
+-export([start_link/7]). %% API.
-export([init/1]). %% supervisor.
%% API.
--spec start_link(non_neg_integer(), module(), any(), module(), any(), pid())
- -> {ok, pid()}.
-start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ReqsPid) ->
- supervisor:start_link(?MODULE, [NbAcceptors,
- Transport, TransOpts, Protocol, ProtoOpts, ReqsPid]).
+-spec start_link(non_neg_integer(), module(), any(),
+ module(), any(), pid(), pid()) -> {ok, pid()}.
+start_link(NbAcceptors, Transport, TransOpts,
+ Protocol, ProtoOpts, ListenerPid, ReqsPid) ->
+ supervisor:start_link(?MODULE, [NbAcceptors, Transport, TransOpts,
+ Protocol, ProtoOpts, ListenerPid, ReqsPid]).
%% supervisor.
-spec init(list()) -> {ok, {{one_for_one, 10, 10}, list()}}.
-init([NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ReqsPid]) ->
+init([NbAcceptors, Transport, TransOpts,
+ Protocol, ProtoOpts, ListenerPid, ReqsPid]) ->
{ok, LSocket} = Transport:listen(TransOpts),
MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
Procs = [{{acceptor, self(), N}, {cowboy_acceptor, start_link, [
- LSocket, Transport, Protocol, ProtoOpts, MaxConns, ReqsPid
+ LSocket, Transport, Protocol, ProtoOpts,
+ MaxConns, ListenerPid, ReqsPid
]}, permanent, brutal_kill, worker, dynamic}
|| N <- lists:seq(1, NbAcceptors)],
{ok, {{one_for_one, 10, 10}, Procs}}.
diff --git a/src/cowboy_listener.erl b/src/cowboy_listener.erl
new file mode 100644
index 0000000..2f5ccc2
--- /dev/null
+++ b/src/cowboy_listener.erl
@@ -0,0 +1,151 @@
+%% Copyright (c) 2011, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% @doc Public API for managing listeners.
+-module(cowboy_listener).
+-behaviour(gen_server).
+
+-export([start_link/0, stop/1,
+ add_connection/3, remove_connection/2, wait/3]). %% API.
+-export([init/1, handle_call/3, handle_cast/2,
+ handle_info/2, terminate/2, code_change/3]). %% gen_server.
+
+-record(state, {
+ req_pools = [] :: [{atom(), non_neg_integer()}],
+ reqs_table :: ets:tid(),
+ queue = [] :: [{pid(), reference()}]
+}).
+
+%% API.
+
+%% @private
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+%% @private
+-spec stop(pid()) -> stopped.
+stop(ServerPid) ->
+ gen_server:call(ServerPid, stop).
+
+%% @doc Add a connection to the given pool in the listener.
+%%
+%% Pools of connections are used to restrict the maximum number of connections
+%% depending on their type. By default, Cowboy add all connections to the
+%% pool <em>default</em>. It also checks for the maximum number of connections
+%% in that pool before accepting again.
+%%
+%% When a process managing a connection dies, the process is removed from the
+%% pool. If the socket has been sent to another process, it is up to the
+%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
+%% the previous and adding the new one.
+-spec add_connection(pid(), atom(), pid()) -> {ok, non_neg_integer()}.
+add_connection(ServerPid, Pool, ConnPid) ->
+ gen_server:call(ServerPid, {add_connection, Pool, ConnPid}).
+
+%% @doc Remove the given connection from its pool.
+-spec remove_connection(pid(), pid()) -> ok.
+remove_connection(ServerPid, ConnPid) ->
+ gen_server:cast(ServerPid, {remove_connection, ConnPid}).
+
+%% @doc Wait until the number of connections in the given pool gets below
+%% the given threshold.
+%%
+%% This function will not return until the number of connections in the pool
+%% gets below <em>MaxConns</em>. It makes use of <em>gen_server:reply/2</em>
+%% to make the process wait for a reply indefinitely.
+-spec wait(pid(), atom(), non_neg_integer()) -> ok.
+wait(ServerPid, Pool, MaxConns) ->
+ gen_server:call(ServerPid, {wait, Pool, MaxConns}, infinity).
+
+%% gen_server.
+
+%% @private
+-spec init([]) -> {ok, #state{}}.
+init([]) ->
+ ReqsTablePid = ets:new(requests_table, [set, private]),
+ {ok, #state{reqs_table=ReqsTablePid}}.
+
+%% @private
+-spec handle_call(_, _, State)
+ -> {reply, ignored, State} | {stop, normal, stopped, State}.
+handle_call({add_connection, Pool, ConnPid}, _From, State=#state{
+ req_pools=Pools, reqs_table=ReqsTable}) ->
+ MonitorRef = erlang:monitor(process, ConnPid),
+ {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
+ false ->
+ {1, [{Pool, 1}|Pools]};
+ {Pool, NbConns} ->
+ NbConns2 = NbConns + 1,
+ {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
+ end,
+ ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
+ {reply, {ok, NbConnsRet}, State#state{req_pools=Pools2}};
+handle_call({wait, Pool, MaxConns}, From, State=#state{
+ req_pools=Pools, queue=Queue}) ->
+ case lists:keyfind(Pool, 1, Pools) of
+ {Pool, NbConns} when NbConns > MaxConns ->
+ {noreply, State#state{queue=[From|Queue]}};
+ _Any ->
+ {reply, ok, State}
+ end;
+handle_call(stop, _From, State) ->
+ {stop, normal, stopped, State};
+handle_call(_Request, _From, State) ->
+ {reply, ignored, State}.
+
+%% @private
+-spec handle_cast(_, State) -> {noreply, State}.
+handle_cast({remove_connection, ConnPid}, State) ->
+ State2 = remove_pid(ConnPid, State),
+ {noreply, State2};
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%% @private
+-spec handle_info(_, State) -> {noreply, State}.
+handle_info({'DOWN', _Ref, process, Pid, _Info}, State) ->
+ State2 = remove_pid(Pid, State),
+ {noreply, State2};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%% @private
+-spec terminate(_, _) -> ok.
+terminate(_Reason, _State) ->
+ ok.
+
+%% @private
+-spec code_change(_, State, _) -> {ok, State}.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%% Internal.
+
+%% @private
+-spec remove_pid(pid(), State) -> State.
+remove_pid(Pid, State=#state{
+ req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) ->
+ {MonitorRef, Pool} = ets:lookup_element(ReqsTable, Pid, 2),
+ erlang:demonitor(MonitorRef, [flush]),
+ {Pool, NbConns} = lists:keyfind(Pool, 1, Pools),
+ Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)],
+ ets:delete(ReqsTable, Pid),
+ case Queue of
+ [] ->
+ State#state{req_pools=Pools2};
+ [Client|Queue2] ->
+ gen_server:reply(Client, ok),
+ State#state{req_pools=Pools2, queue=Queue2}
+ end.
diff --git a/src/cowboy_listener_sup.erl b/src/cowboy_listener_sup.erl
index 0ed662f..adf5262 100644
--- a/src/cowboy_listener_sup.erl
+++ b/src/cowboy_listener_sup.erl
@@ -25,17 +25,21 @@
-> {ok, pid()}.
start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
{ok, SupPid} = supervisor:start_link(?MODULE, []),
+ {ok, ListenerPid} = supervisor:start_child(SupPid,
+ {cowboy_listener, {cowboy_listener, start_link, []},
+ permanent, 5000, worker, dynamic}),
{ok, ReqsPid} = supervisor:start_child(SupPid,
{cowboy_requests_sup, {cowboy_requests_sup, start_link, []},
permanent, 5000, supervisor, [cowboy_requests_sup]}),
{ok, _PoolPid} = supervisor:start_child(SupPid,
{cowboy_acceptors_sup, {cowboy_acceptors_sup, start_link, [
- NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ReqsPid
+ NbAcceptors, Transport, TransOpts,
+ Protocol, ProtoOpts, ListenerPid, ReqsPid
]}, permanent, 5000, supervisor, [cowboy_acceptors_sup]}),
{ok, SupPid}.
%% supervisor.
--spec init([]) -> {ok, {{one_for_one, 0, 1}, []}}.
+-spec init([]) -> {ok, {{one_for_all, 10, 10}, []}}.
init([]) ->
- {ok, {{one_for_one, 0, 1}, []}}.
+ {ok, {{one_for_all, 10, 10}, []}}.