From 830cfc002e992126a2c605594f25d40a5a193968 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 30 Jan 2012 08:09:33 +0100 Subject: Move max_connections check directly inside cowboy_listener This is a big change in the internal cowboy API. This should not have any impact on existing applications as only the acceptor is expected to use these API calls. The function cowboy_listener:wait/3 has been removed. max_connections checking now occurs directly in cowboy_listener:add_connection/3. If the pool is full and the acceptor has to wait, then it doesn't return, waiting for a free space to be available. To accomodate these changes, it is now cowboy_listener that will inform the new connection that it is ready by sending {shoot, self()}. This should be a great improvement to the latency of responses as there is one less message to wait for before the request process can do its work. Overall the performance under heavy load should also be improved as we greatly reduce the number of messages sent between the acceptor and the listener process. --- src/cowboy_acceptor.erl | 28 ++++++++-------------- src/cowboy_acceptors_sup.erl | 3 +-- src/cowboy_listener.erl | 55 ++++++++++++++++++-------------------------- src/cowboy_listener_sup.erl | 3 ++- 4 files changed, 36 insertions(+), 53 deletions(-) (limited to 'src') diff --git a/src/cowboy_acceptor.erl b/src/cowboy_acceptor.erl index 4cb9fa7..d46e212 100644 --- a/src/cowboy_acceptor.erl +++ b/src/cowboy_acceptor.erl @@ -15,33 +15,31 @@ %% @private -module(cowboy_acceptor). --export([start_link/7]). %% API. --export([acceptor/7]). %% Internal. +-export([start_link/6]). %% API. +-export([acceptor/6]). %% Internal. %% API. -spec start_link(inet:socket(), module(), module(), any(), - non_neg_integer(), pid(), pid()) -> {ok, pid()}. + pid(), pid()) -> {ok, pid()}. start_link(LSocket, Transport, Protocol, Opts, - MaxConns, ListenerPid, ReqsSup) -> + ListenerPid, ReqsSup) -> Pid = spawn_link(?MODULE, acceptor, - [LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup]), + [LSocket, Transport, Protocol, Opts, ListenerPid, ReqsSup]), {ok, Pid}. %% Internal. -spec acceptor(inet:socket(), module(), module(), any(), - non_neg_integer(), pid(), pid()) -> no_return(). -acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) -> + pid(), pid()) -> no_return(). +acceptor(LSocket, Transport, Protocol, Opts, ListenerPid, ReqsSup) -> case Transport:accept(LSocket, 2000) of {ok, CSocket} -> {ok, Pid} = supervisor:start_child(ReqsSup, [ListenerPid, CSocket, Transport, Protocol, Opts]), Transport:controlling_process(CSocket, Pid), - {ok, NbConns} = cowboy_listener:add_connection(ListenerPid, - default, Pid), - Pid ! {shoot, ListenerPid}, - limit_reqs(ListenerPid, NbConns, MaxConns); + ok = cowboy_listener:add_connection(ListenerPid, + default, Pid); {error, timeout} -> ignore; {error, _Reason} -> @@ -50,10 +48,4 @@ acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) -> ignore end, ?MODULE:acceptor(LSocket, Transport, Protocol, Opts, - MaxConns, ListenerPid, ReqsSup). - --spec limit_reqs(pid(), non_neg_integer(), non_neg_integer()) -> ok. -limit_reqs(_ListenerPid, NbConns, MaxConns) when NbConns =< MaxConns -> - ok; -limit_reqs(ListenerPid, _NbConns, MaxConns) -> - cowboy_listener:wait(ListenerPid, default, MaxConns). + ListenerPid, ReqsSup). diff --git a/src/cowboy_acceptors_sup.erl b/src/cowboy_acceptors_sup.erl index 17849a6..625028c 100644 --- a/src/cowboy_acceptors_sup.erl +++ b/src/cowboy_acceptors_sup.erl @@ -34,10 +34,9 @@ start_link(NbAcceptors, Transport, TransOpts, init([NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ListenerPid, ReqsPid]) -> {ok, LSocket} = Transport:listen(TransOpts), - MaxConns = proplists:get_value(max_connections, TransOpts, 1024), Procs = [{{acceptor, self(), N}, {cowboy_acceptor, start_link, [ LSocket, Transport, Protocol, ProtoOpts, - MaxConns, ListenerPid, ReqsPid + ListenerPid, ReqsPid ]}, permanent, brutal_kill, worker, []} || N <- lists:seq(1, NbAcceptors)], {ok, {{one_for_one, 10, 10}, Procs}}. diff --git a/src/cowboy_listener.erl b/src/cowboy_listener.erl index c19d079..8998e13 100644 --- a/src/cowboy_listener.erl +++ b/src/cowboy_listener.erl @@ -16,15 +16,16 @@ -module(cowboy_listener). -behaviour(gen_server). --export([start_link/0, stop/1, - add_connection/3, move_connection/3, remove_connection/2, wait/3]). %% API. +-export([start_link/1, stop/1, + add_connection/3, move_connection/3, remove_connection/2]). %% API. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% gen_server. -record(state, { req_pools = [] :: [{atom(), non_neg_integer()}], reqs_table :: ets:tid(), - queue = [] :: [{pid(), reference()}] + queue = [] :: [{pid(), reference()}], + max_conns = undefined :: non_neg_integer() }). %% API. @@ -36,9 +37,10 @@ %% Setting the process priority to high ensures the connection-related code %% will always be executed when a connection needs it, allowing Cowboy to %% scale far beyond what it would with a normal priority. --spec start_link() -> {ok, pid()}. -start_link() -> - gen_server:start_link(?MODULE, [], [{spawn_opt, [{priority, high}]}]). +-spec start_link(non_neg_integer()) -> {ok, pid()}. +start_link(MaxConns) -> + gen_server:start_link(?MODULE, [MaxConns], + [{spawn_opt, [{priority, high}]}]). %% @private -spec stop(pid()) -> stopped. @@ -50,15 +52,16 @@ stop(ServerPid) -> %% Pools of connections are used to restrict the maximum number of connections %% depending on their type. By default, Cowboy add all connections to the %% pool default. It also checks for the maximum number of connections -%% in that pool before accepting again. +%% in that pool before accepting again. This function only returns when there +%% is free space in the pool. %% %% When a process managing a connection dies, the process is removed from the %% pool. If the socket has been sent to another process, it is up to the %% protocol code to inform the listener of the new ConnPid by removing %% the previous and adding the new one. --spec add_connection(pid(), atom(), pid()) -> {ok, non_neg_integer()}. +-spec add_connection(pid(), atom(), pid()) -> ok. add_connection(ServerPid, Pool, ConnPid) -> - gen_server:call(ServerPid, {add_connection, Pool, ConnPid}). + gen_server:call(ServerPid, {add_connection, Pool, ConnPid}, infinity). %% @doc Move a connection from one pool to another. -spec move_connection(pid(), atom(), pid()) -> ok. @@ -70,30 +73,22 @@ move_connection(ServerPid, DestPool, ConnPid) -> remove_connection(ServerPid, ConnPid) -> gen_server:cast(ServerPid, {remove_connection, ConnPid}). -%% @doc Wait until the number of connections in the given pool gets below -%% the given threshold. -%% -%% This function will not return until the number of connections in the pool -%% gets below MaxConns. It makes use of gen_server:reply/2 -%% to make the process wait for a reply indefinitely. --spec wait(pid(), atom(), non_neg_integer()) -> ok. -wait(ServerPid, Pool, MaxConns) -> - gen_server:call(ServerPid, {wait, Pool, MaxConns}, infinity). - %% gen_server. %% @private --spec init([]) -> {ok, #state{}}. -init([]) -> +-spec init(list()) -> {ok, #state{}}. +init([MaxConns]) -> ReqsTablePid = ets:new(requests_table, [set, private]), - {ok, #state{reqs_table=ReqsTablePid}}. + {ok, #state{reqs_table=ReqsTablePid, max_conns=MaxConns}}. %% @private -spec handle_call(_, _, State) -> {reply, ignored, State} | {stop, normal, stopped, State}. -handle_call({add_connection, Pool, ConnPid}, _From, State=#state{ - req_pools=Pools, reqs_table=ReqsTable}) -> +handle_call({add_connection, Pool, ConnPid}, From, State=#state{ + req_pools=Pools, reqs_table=ReqsTable, + queue=Queue, max_conns=MaxConns}) -> MonitorRef = erlang:monitor(process, ConnPid), + ConnPid ! {shoot, self()}, {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of false -> {1, [{Pool, 1}|Pools]}; @@ -102,14 +97,10 @@ handle_call({add_connection, Pool, ConnPid}, _From, State=#state{ {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} end, ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}), - {reply, {ok, NbConnsRet}, State#state{req_pools=Pools2}}; -handle_call({wait, Pool, MaxConns}, From, State=#state{ - req_pools=Pools, queue=Queue}) -> - case lists:keyfind(Pool, 1, Pools) of - {Pool, NbConns} when NbConns > MaxConns -> - {noreply, State#state{queue=[From|Queue]}}; - _Any -> - {reply, ok, State} + if NbConnsRet > MaxConns -> + {noreply, State#state{req_pools=Pools2, queue=[From|Queue]}}; + true -> + {reply, ok, State#state{req_pools=Pools2}} end; handle_call(stop, _From, State) -> {stop, normal, stopped, State}; diff --git a/src/cowboy_listener_sup.erl b/src/cowboy_listener_sup.erl index aca2b0b..1304cbc 100644 --- a/src/cowboy_listener_sup.erl +++ b/src/cowboy_listener_sup.erl @@ -24,9 +24,10 @@ -spec start_link(non_neg_integer(), module(), any(), module(), any()) -> {ok, pid()}. start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> + MaxConns = proplists:get_value(max_connections, TransOpts, 1024), {ok, SupPid} = supervisor:start_link(?MODULE, []), {ok, ListenerPid} = supervisor:start_child(SupPid, - {cowboy_listener, {cowboy_listener, start_link, []}, + {cowboy_listener, {cowboy_listener, start_link, [MaxConns]}, permanent, 5000, worker, [cowboy_listener]}), {ok, ReqsPid} = supervisor:start_child(SupPid, {cowboy_requests_sup, {cowboy_requests_sup, start_link, []}, -- cgit v1.2.3