diff options
author | Loïc Hoguin <[email protected]> | 2012-01-30 08:09:33 +0100 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2012-01-30 08:09:33 +0100 |
commit | 830cfc002e992126a2c605594f25d40a5a193968 (patch) | |
tree | a07bb8c0850b39e9ca93660659b1714025ea8ccc | |
parent | bb08cf85e38f02d5785b2a23c38d3b4c8020989c (diff) | |
download | cowboy-830cfc002e992126a2c605594f25d40a5a193968.tar.gz cowboy-830cfc002e992126a2c605594f25d40a5a193968.tar.bz2 cowboy-830cfc002e992126a2c605594f25d40a5a193968.zip |
Move max_connections check directly inside cowboy_listener
This is a big change in the internal cowboy API. This should not
have any impact on existing applications as only the acceptor is
expected to use these API calls.
The function cowboy_listener:wait/3 has been removed. max_connections
checking now occurs directly in cowboy_listener:add_connection/3.
If the pool is full and the acceptor has to wait, then it doesn't
return, waiting for a free space to be available.
To accomodate these changes, it is now cowboy_listener that will
inform the new connection that it is ready by sending {shoot, self()}.
This should be a great improvement to the latency of responses as
there is one less message to wait for before the request process
can do its work.
Overall the performance under heavy load should also be improved as
we greatly reduce the number of messages sent between the acceptor
and the listener process.
-rw-r--r-- | src/cowboy_acceptor.erl | 28 | ||||
-rw-r--r-- | src/cowboy_acceptors_sup.erl | 3 | ||||
-rw-r--r-- | src/cowboy_listener.erl | 55 | ||||
-rw-r--r-- | src/cowboy_listener_sup.erl | 3 |
4 files changed, 36 insertions, 53 deletions
diff --git a/src/cowboy_acceptor.erl b/src/cowboy_acceptor.erl index 4cb9fa7..d46e212 100644 --- a/src/cowboy_acceptor.erl +++ b/src/cowboy_acceptor.erl @@ -15,33 +15,31 @@ %% @private -module(cowboy_acceptor). --export([start_link/7]). %% API. --export([acceptor/7]). %% Internal. +-export([start_link/6]). %% API. +-export([acceptor/6]). %% Internal. %% API. -spec start_link(inet:socket(), module(), module(), any(), - non_neg_integer(), pid(), pid()) -> {ok, pid()}. + pid(), pid()) -> {ok, pid()}. start_link(LSocket, Transport, Protocol, Opts, - MaxConns, ListenerPid, ReqsSup) -> + ListenerPid, ReqsSup) -> Pid = spawn_link(?MODULE, acceptor, - [LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup]), + [LSocket, Transport, Protocol, Opts, ListenerPid, ReqsSup]), {ok, Pid}. %% Internal. -spec acceptor(inet:socket(), module(), module(), any(), - non_neg_integer(), pid(), pid()) -> no_return(). -acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) -> + pid(), pid()) -> no_return(). +acceptor(LSocket, Transport, Protocol, Opts, ListenerPid, ReqsSup) -> case Transport:accept(LSocket, 2000) of {ok, CSocket} -> {ok, Pid} = supervisor:start_child(ReqsSup, [ListenerPid, CSocket, Transport, Protocol, Opts]), Transport:controlling_process(CSocket, Pid), - {ok, NbConns} = cowboy_listener:add_connection(ListenerPid, - default, Pid), - Pid ! {shoot, ListenerPid}, - limit_reqs(ListenerPid, NbConns, MaxConns); + ok = cowboy_listener:add_connection(ListenerPid, + default, Pid); {error, timeout} -> ignore; {error, _Reason} -> @@ -50,10 +48,4 @@ acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) -> ignore end, ?MODULE:acceptor(LSocket, Transport, Protocol, Opts, - MaxConns, ListenerPid, ReqsSup). - --spec limit_reqs(pid(), non_neg_integer(), non_neg_integer()) -> ok. -limit_reqs(_ListenerPid, NbConns, MaxConns) when NbConns =< MaxConns -> - ok; -limit_reqs(ListenerPid, _NbConns, MaxConns) -> - cowboy_listener:wait(ListenerPid, default, MaxConns). + ListenerPid, ReqsSup). diff --git a/src/cowboy_acceptors_sup.erl b/src/cowboy_acceptors_sup.erl index 17849a6..625028c 100644 --- a/src/cowboy_acceptors_sup.erl +++ b/src/cowboy_acceptors_sup.erl @@ -34,10 +34,9 @@ start_link(NbAcceptors, Transport, TransOpts, init([NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ListenerPid, ReqsPid]) -> {ok, LSocket} = Transport:listen(TransOpts), - MaxConns = proplists:get_value(max_connections, TransOpts, 1024), Procs = [{{acceptor, self(), N}, {cowboy_acceptor, start_link, [ LSocket, Transport, Protocol, ProtoOpts, - MaxConns, ListenerPid, ReqsPid + ListenerPid, ReqsPid ]}, permanent, brutal_kill, worker, []} || N <- lists:seq(1, NbAcceptors)], {ok, {{one_for_one, 10, 10}, Procs}}. diff --git a/src/cowboy_listener.erl b/src/cowboy_listener.erl index c19d079..8998e13 100644 --- a/src/cowboy_listener.erl +++ b/src/cowboy_listener.erl @@ -16,15 +16,16 @@ -module(cowboy_listener). -behaviour(gen_server). --export([start_link/0, stop/1, - add_connection/3, move_connection/3, remove_connection/2, wait/3]). %% API. +-export([start_link/1, stop/1, + add_connection/3, move_connection/3, remove_connection/2]). %% API. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% gen_server. -record(state, { req_pools = [] :: [{atom(), non_neg_integer()}], reqs_table :: ets:tid(), - queue = [] :: [{pid(), reference()}] + queue = [] :: [{pid(), reference()}], + max_conns = undefined :: non_neg_integer() }). %% API. @@ -36,9 +37,10 @@ %% Setting the process priority to high ensures the connection-related code %% will always be executed when a connection needs it, allowing Cowboy to %% scale far beyond what it would with a normal priority. --spec start_link() -> {ok, pid()}. -start_link() -> - gen_server:start_link(?MODULE, [], [{spawn_opt, [{priority, high}]}]). +-spec start_link(non_neg_integer()) -> {ok, pid()}. +start_link(MaxConns) -> + gen_server:start_link(?MODULE, [MaxConns], + [{spawn_opt, [{priority, high}]}]). %% @private -spec stop(pid()) -> stopped. @@ -50,15 +52,16 @@ stop(ServerPid) -> %% Pools of connections are used to restrict the maximum number of connections %% depending on their type. By default, Cowboy add all connections to the %% pool <em>default</em>. It also checks for the maximum number of connections -%% in that pool before accepting again. +%% in that pool before accepting again. This function only returns when there +%% is free space in the pool. %% %% When a process managing a connection dies, the process is removed from the %% pool. If the socket has been sent to another process, it is up to the %% protocol code to inform the listener of the new <em>ConnPid</em> by removing %% the previous and adding the new one. --spec add_connection(pid(), atom(), pid()) -> {ok, non_neg_integer()}. +-spec add_connection(pid(), atom(), pid()) -> ok. add_connection(ServerPid, Pool, ConnPid) -> - gen_server:call(ServerPid, {add_connection, Pool, ConnPid}). + gen_server:call(ServerPid, {add_connection, Pool, ConnPid}, infinity). %% @doc Move a connection from one pool to another. -spec move_connection(pid(), atom(), pid()) -> ok. @@ -70,30 +73,22 @@ move_connection(ServerPid, DestPool, ConnPid) -> remove_connection(ServerPid, ConnPid) -> gen_server:cast(ServerPid, {remove_connection, ConnPid}). -%% @doc Wait until the number of connections in the given pool gets below -%% the given threshold. -%% -%% This function will not return until the number of connections in the pool -%% gets below <em>MaxConns</em>. It makes use of <em>gen_server:reply/2</em> -%% to make the process wait for a reply indefinitely. --spec wait(pid(), atom(), non_neg_integer()) -> ok. -wait(ServerPid, Pool, MaxConns) -> - gen_server:call(ServerPid, {wait, Pool, MaxConns}, infinity). - %% gen_server. %% @private --spec init([]) -> {ok, #state{}}. -init([]) -> +-spec init(list()) -> {ok, #state{}}. +init([MaxConns]) -> ReqsTablePid = ets:new(requests_table, [set, private]), - {ok, #state{reqs_table=ReqsTablePid}}. + {ok, #state{reqs_table=ReqsTablePid, max_conns=MaxConns}}. %% @private -spec handle_call(_, _, State) -> {reply, ignored, State} | {stop, normal, stopped, State}. -handle_call({add_connection, Pool, ConnPid}, _From, State=#state{ - req_pools=Pools, reqs_table=ReqsTable}) -> +handle_call({add_connection, Pool, ConnPid}, From, State=#state{ + req_pools=Pools, reqs_table=ReqsTable, + queue=Queue, max_conns=MaxConns}) -> MonitorRef = erlang:monitor(process, ConnPid), + ConnPid ! {shoot, self()}, {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of false -> {1, [{Pool, 1}|Pools]}; @@ -102,14 +97,10 @@ handle_call({add_connection, Pool, ConnPid}, _From, State=#state{ {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} end, ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}), - {reply, {ok, NbConnsRet}, State#state{req_pools=Pools2}}; -handle_call({wait, Pool, MaxConns}, From, State=#state{ - req_pools=Pools, queue=Queue}) -> - case lists:keyfind(Pool, 1, Pools) of - {Pool, NbConns} when NbConns > MaxConns -> - {noreply, State#state{queue=[From|Queue]}}; - _Any -> - {reply, ok, State} + if NbConnsRet > MaxConns -> + {noreply, State#state{req_pools=Pools2, queue=[From|Queue]}}; + true -> + {reply, ok, State#state{req_pools=Pools2}} end; handle_call(stop, _From, State) -> {stop, normal, stopped, State}; diff --git a/src/cowboy_listener_sup.erl b/src/cowboy_listener_sup.erl index aca2b0b..1304cbc 100644 --- a/src/cowboy_listener_sup.erl +++ b/src/cowboy_listener_sup.erl @@ -24,9 +24,10 @@ -spec start_link(non_neg_integer(), module(), any(), module(), any()) -> {ok, pid()}. start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> + MaxConns = proplists:get_value(max_connections, TransOpts, 1024), {ok, SupPid} = supervisor:start_link(?MODULE, []), {ok, ListenerPid} = supervisor:start_child(SupPid, - {cowboy_listener, {cowboy_listener, start_link, []}, + {cowboy_listener, {cowboy_listener, start_link, [MaxConns]}, permanent, 5000, worker, [cowboy_listener]}), {ok, ReqsPid} = supervisor:start_child(SupPid, {cowboy_requests_sup, {cowboy_requests_sup, start_link, []}, |