aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2012-01-30 08:09:33 +0100
committerLoïc Hoguin <[email protected]>2012-01-30 08:09:33 +0100
commit830cfc002e992126a2c605594f25d40a5a193968 (patch)
treea07bb8c0850b39e9ca93660659b1714025ea8ccc
parentbb08cf85e38f02d5785b2a23c38d3b4c8020989c (diff)
downloadcowboy-830cfc002e992126a2c605594f25d40a5a193968.tar.gz
cowboy-830cfc002e992126a2c605594f25d40a5a193968.tar.bz2
cowboy-830cfc002e992126a2c605594f25d40a5a193968.zip
Move max_connections check directly inside cowboy_listener
This is a big change in the internal cowboy API. This should not have any impact on existing applications as only the acceptor is expected to use these API calls. The function cowboy_listener:wait/3 has been removed. max_connections checking now occurs directly in cowboy_listener:add_connection/3. If the pool is full and the acceptor has to wait, then it doesn't return, waiting for a free space to be available. To accomodate these changes, it is now cowboy_listener that will inform the new connection that it is ready by sending {shoot, self()}. This should be a great improvement to the latency of responses as there is one less message to wait for before the request process can do its work. Overall the performance under heavy load should also be improved as we greatly reduce the number of messages sent between the acceptor and the listener process.
-rw-r--r--src/cowboy_acceptor.erl28
-rw-r--r--src/cowboy_acceptors_sup.erl3
-rw-r--r--src/cowboy_listener.erl55
-rw-r--r--src/cowboy_listener_sup.erl3
4 files changed, 36 insertions, 53 deletions
diff --git a/src/cowboy_acceptor.erl b/src/cowboy_acceptor.erl
index 4cb9fa7..d46e212 100644
--- a/src/cowboy_acceptor.erl
+++ b/src/cowboy_acceptor.erl
@@ -15,33 +15,31 @@
%% @private
-module(cowboy_acceptor).
--export([start_link/7]). %% API.
--export([acceptor/7]). %% Internal.
+-export([start_link/6]). %% API.
+-export([acceptor/6]). %% Internal.
%% API.
-spec start_link(inet:socket(), module(), module(), any(),
- non_neg_integer(), pid(), pid()) -> {ok, pid()}.
+ pid(), pid()) -> {ok, pid()}.
start_link(LSocket, Transport, Protocol, Opts,
- MaxConns, ListenerPid, ReqsSup) ->
+ ListenerPid, ReqsSup) ->
Pid = spawn_link(?MODULE, acceptor,
- [LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup]),
+ [LSocket, Transport, Protocol, Opts, ListenerPid, ReqsSup]),
{ok, Pid}.
%% Internal.
-spec acceptor(inet:socket(), module(), module(), any(),
- non_neg_integer(), pid(), pid()) -> no_return().
-acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) ->
+ pid(), pid()) -> no_return().
+acceptor(LSocket, Transport, Protocol, Opts, ListenerPid, ReqsSup) ->
case Transport:accept(LSocket, 2000) of
{ok, CSocket} ->
{ok, Pid} = supervisor:start_child(ReqsSup,
[ListenerPid, CSocket, Transport, Protocol, Opts]),
Transport:controlling_process(CSocket, Pid),
- {ok, NbConns} = cowboy_listener:add_connection(ListenerPid,
- default, Pid),
- Pid ! {shoot, ListenerPid},
- limit_reqs(ListenerPid, NbConns, MaxConns);
+ ok = cowboy_listener:add_connection(ListenerPid,
+ default, Pid);
{error, timeout} ->
ignore;
{error, _Reason} ->
@@ -50,10 +48,4 @@ acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) ->
ignore
end,
?MODULE:acceptor(LSocket, Transport, Protocol, Opts,
- MaxConns, ListenerPid, ReqsSup).
-
--spec limit_reqs(pid(), non_neg_integer(), non_neg_integer()) -> ok.
-limit_reqs(_ListenerPid, NbConns, MaxConns) when NbConns =< MaxConns ->
- ok;
-limit_reqs(ListenerPid, _NbConns, MaxConns) ->
- cowboy_listener:wait(ListenerPid, default, MaxConns).
+ ListenerPid, ReqsSup).
diff --git a/src/cowboy_acceptors_sup.erl b/src/cowboy_acceptors_sup.erl
index 17849a6..625028c 100644
--- a/src/cowboy_acceptors_sup.erl
+++ b/src/cowboy_acceptors_sup.erl
@@ -34,10 +34,9 @@ start_link(NbAcceptors, Transport, TransOpts,
init([NbAcceptors, Transport, TransOpts,
Protocol, ProtoOpts, ListenerPid, ReqsPid]) ->
{ok, LSocket} = Transport:listen(TransOpts),
- MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
Procs = [{{acceptor, self(), N}, {cowboy_acceptor, start_link, [
LSocket, Transport, Protocol, ProtoOpts,
- MaxConns, ListenerPid, ReqsPid
+ ListenerPid, ReqsPid
]}, permanent, brutal_kill, worker, []}
|| N <- lists:seq(1, NbAcceptors)],
{ok, {{one_for_one, 10, 10}, Procs}}.
diff --git a/src/cowboy_listener.erl b/src/cowboy_listener.erl
index c19d079..8998e13 100644
--- a/src/cowboy_listener.erl
+++ b/src/cowboy_listener.erl
@@ -16,15 +16,16 @@
-module(cowboy_listener).
-behaviour(gen_server).
--export([start_link/0, stop/1,
- add_connection/3, move_connection/3, remove_connection/2, wait/3]). %% API.
+-export([start_link/1, stop/1,
+ add_connection/3, move_connection/3, remove_connection/2]). %% API.
-export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]). %% gen_server.
-record(state, {
req_pools = [] :: [{atom(), non_neg_integer()}],
reqs_table :: ets:tid(),
- queue = [] :: [{pid(), reference()}]
+ queue = [] :: [{pid(), reference()}],
+ max_conns = undefined :: non_neg_integer()
}).
%% API.
@@ -36,9 +37,10 @@
%% Setting the process priority to high ensures the connection-related code
%% will always be executed when a connection needs it, allowing Cowboy to
%% scale far beyond what it would with a normal priority.
--spec start_link() -> {ok, pid()}.
-start_link() ->
- gen_server:start_link(?MODULE, [], [{spawn_opt, [{priority, high}]}]).
+-spec start_link(non_neg_integer()) -> {ok, pid()}.
+start_link(MaxConns) ->
+ gen_server:start_link(?MODULE, [MaxConns],
+ [{spawn_opt, [{priority, high}]}]).
%% @private
-spec stop(pid()) -> stopped.
@@ -50,15 +52,16 @@ stop(ServerPid) ->
%% Pools of connections are used to restrict the maximum number of connections
%% depending on their type. By default, Cowboy add all connections to the
%% pool <em>default</em>. It also checks for the maximum number of connections
-%% in that pool before accepting again.
+%% in that pool before accepting again. This function only returns when there
+%% is free space in the pool.
%%
%% When a process managing a connection dies, the process is removed from the
%% pool. If the socket has been sent to another process, it is up to the
%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
%% the previous and adding the new one.
--spec add_connection(pid(), atom(), pid()) -> {ok, non_neg_integer()}.
+-spec add_connection(pid(), atom(), pid()) -> ok.
add_connection(ServerPid, Pool, ConnPid) ->
- gen_server:call(ServerPid, {add_connection, Pool, ConnPid}).
+ gen_server:call(ServerPid, {add_connection, Pool, ConnPid}, infinity).
%% @doc Move a connection from one pool to another.
-spec move_connection(pid(), atom(), pid()) -> ok.
@@ -70,30 +73,22 @@ move_connection(ServerPid, DestPool, ConnPid) ->
remove_connection(ServerPid, ConnPid) ->
gen_server:cast(ServerPid, {remove_connection, ConnPid}).
-%% @doc Wait until the number of connections in the given pool gets below
-%% the given threshold.
-%%
-%% This function will not return until the number of connections in the pool
-%% gets below <em>MaxConns</em>. It makes use of <em>gen_server:reply/2</em>
-%% to make the process wait for a reply indefinitely.
--spec wait(pid(), atom(), non_neg_integer()) -> ok.
-wait(ServerPid, Pool, MaxConns) ->
- gen_server:call(ServerPid, {wait, Pool, MaxConns}, infinity).
-
%% gen_server.
%% @private
--spec init([]) -> {ok, #state{}}.
-init([]) ->
+-spec init(list()) -> {ok, #state{}}.
+init([MaxConns]) ->
ReqsTablePid = ets:new(requests_table, [set, private]),
- {ok, #state{reqs_table=ReqsTablePid}}.
+ {ok, #state{reqs_table=ReqsTablePid, max_conns=MaxConns}}.
%% @private
-spec handle_call(_, _, State)
-> {reply, ignored, State} | {stop, normal, stopped, State}.
-handle_call({add_connection, Pool, ConnPid}, _From, State=#state{
- req_pools=Pools, reqs_table=ReqsTable}) ->
+handle_call({add_connection, Pool, ConnPid}, From, State=#state{
+ req_pools=Pools, reqs_table=ReqsTable,
+ queue=Queue, max_conns=MaxConns}) ->
MonitorRef = erlang:monitor(process, ConnPid),
+ ConnPid ! {shoot, self()},
{NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
false ->
{1, [{Pool, 1}|Pools]};
@@ -102,14 +97,10 @@ handle_call({add_connection, Pool, ConnPid}, _From, State=#state{
{NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
end,
ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
- {reply, {ok, NbConnsRet}, State#state{req_pools=Pools2}};
-handle_call({wait, Pool, MaxConns}, From, State=#state{
- req_pools=Pools, queue=Queue}) ->
- case lists:keyfind(Pool, 1, Pools) of
- {Pool, NbConns} when NbConns > MaxConns ->
- {noreply, State#state{queue=[From|Queue]}};
- _Any ->
- {reply, ok, State}
+ if NbConnsRet > MaxConns ->
+ {noreply, State#state{req_pools=Pools2, queue=[From|Queue]}};
+ true ->
+ {reply, ok, State#state{req_pools=Pools2}}
end;
handle_call(stop, _From, State) ->
{stop, normal, stopped, State};
diff --git a/src/cowboy_listener_sup.erl b/src/cowboy_listener_sup.erl
index aca2b0b..1304cbc 100644
--- a/src/cowboy_listener_sup.erl
+++ b/src/cowboy_listener_sup.erl
@@ -24,9 +24,10 @@
-spec start_link(non_neg_integer(), module(), any(), module(), any())
-> {ok, pid()}.
start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
+ MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
{ok, SupPid} = supervisor:start_link(?MODULE, []),
{ok, ListenerPid} = supervisor:start_child(SupPid,
- {cowboy_listener, {cowboy_listener, start_link, []},
+ {cowboy_listener, {cowboy_listener, start_link, [MaxConns]},
permanent, 5000, worker, [cowboy_listener]}),
{ok, ReqsPid} = supervisor:start_child(SupPid,
{cowboy_requests_sup, {cowboy_requests_sup, start_link, []},