diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cowboy_acceptor.erl | 28 | ||||
-rw-r--r-- | src/cowboy_acceptors_sup.erl | 3 | ||||
-rw-r--r-- | src/cowboy_listener.erl | 55 | ||||
-rw-r--r-- | src/cowboy_listener_sup.erl | 3 |
4 files changed, 36 insertions, 53 deletions
diff --git a/src/cowboy_acceptor.erl b/src/cowboy_acceptor.erl index 4cb9fa7..d46e212 100644 --- a/src/cowboy_acceptor.erl +++ b/src/cowboy_acceptor.erl @@ -15,33 +15,31 @@ %% @private -module(cowboy_acceptor). --export([start_link/7]). %% API. --export([acceptor/7]). %% Internal. +-export([start_link/6]). %% API. +-export([acceptor/6]). %% Internal. %% API. -spec start_link(inet:socket(), module(), module(), any(), - non_neg_integer(), pid(), pid()) -> {ok, pid()}. + pid(), pid()) -> {ok, pid()}. start_link(LSocket, Transport, Protocol, Opts, - MaxConns, ListenerPid, ReqsSup) -> + ListenerPid, ReqsSup) -> Pid = spawn_link(?MODULE, acceptor, - [LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup]), + [LSocket, Transport, Protocol, Opts, ListenerPid, ReqsSup]), {ok, Pid}. %% Internal. -spec acceptor(inet:socket(), module(), module(), any(), - non_neg_integer(), pid(), pid()) -> no_return(). -acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) -> + pid(), pid()) -> no_return(). +acceptor(LSocket, Transport, Protocol, Opts, ListenerPid, ReqsSup) -> case Transport:accept(LSocket, 2000) of {ok, CSocket} -> {ok, Pid} = supervisor:start_child(ReqsSup, [ListenerPid, CSocket, Transport, Protocol, Opts]), Transport:controlling_process(CSocket, Pid), - {ok, NbConns} = cowboy_listener:add_connection(ListenerPid, - default, Pid), - Pid ! {shoot, ListenerPid}, - limit_reqs(ListenerPid, NbConns, MaxConns); + ok = cowboy_listener:add_connection(ListenerPid, + default, Pid); {error, timeout} -> ignore; {error, _Reason} -> @@ -50,10 +48,4 @@ acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) -> ignore end, ?MODULE:acceptor(LSocket, Transport, Protocol, Opts, - MaxConns, ListenerPid, ReqsSup). - --spec limit_reqs(pid(), non_neg_integer(), non_neg_integer()) -> ok. -limit_reqs(_ListenerPid, NbConns, MaxConns) when NbConns =< MaxConns -> - ok; -limit_reqs(ListenerPid, _NbConns, MaxConns) -> - cowboy_listener:wait(ListenerPid, default, MaxConns). + ListenerPid, ReqsSup). diff --git a/src/cowboy_acceptors_sup.erl b/src/cowboy_acceptors_sup.erl index 17849a6..625028c 100644 --- a/src/cowboy_acceptors_sup.erl +++ b/src/cowboy_acceptors_sup.erl @@ -34,10 +34,9 @@ start_link(NbAcceptors, Transport, TransOpts, init([NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ListenerPid, ReqsPid]) -> {ok, LSocket} = Transport:listen(TransOpts), - MaxConns = proplists:get_value(max_connections, TransOpts, 1024), Procs = [{{acceptor, self(), N}, {cowboy_acceptor, start_link, [ LSocket, Transport, Protocol, ProtoOpts, - MaxConns, ListenerPid, ReqsPid + ListenerPid, ReqsPid ]}, permanent, brutal_kill, worker, []} || N <- lists:seq(1, NbAcceptors)], {ok, {{one_for_one, 10, 10}, Procs}}. diff --git a/src/cowboy_listener.erl b/src/cowboy_listener.erl index c19d079..8998e13 100644 --- a/src/cowboy_listener.erl +++ b/src/cowboy_listener.erl @@ -16,15 +16,16 @@ -module(cowboy_listener). -behaviour(gen_server). --export([start_link/0, stop/1, - add_connection/3, move_connection/3, remove_connection/2, wait/3]). %% API. +-export([start_link/1, stop/1, + add_connection/3, move_connection/3, remove_connection/2]). %% API. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% gen_server. -record(state, { req_pools = [] :: [{atom(), non_neg_integer()}], reqs_table :: ets:tid(), - queue = [] :: [{pid(), reference()}] + queue = [] :: [{pid(), reference()}], + max_conns = undefined :: non_neg_integer() }). %% API. @@ -36,9 +37,10 @@ %% Setting the process priority to high ensures the connection-related code %% will always be executed when a connection needs it, allowing Cowboy to %% scale far beyond what it would with a normal priority. --spec start_link() -> {ok, pid()}. -start_link() -> - gen_server:start_link(?MODULE, [], [{spawn_opt, [{priority, high}]}]). +-spec start_link(non_neg_integer()) -> {ok, pid()}. +start_link(MaxConns) -> + gen_server:start_link(?MODULE, [MaxConns], + [{spawn_opt, [{priority, high}]}]). %% @private -spec stop(pid()) -> stopped. @@ -50,15 +52,16 @@ stop(ServerPid) -> %% Pools of connections are used to restrict the maximum number of connections %% depending on their type. By default, Cowboy add all connections to the %% pool <em>default</em>. It also checks for the maximum number of connections -%% in that pool before accepting again. +%% in that pool before accepting again. This function only returns when there +%% is free space in the pool. %% %% When a process managing a connection dies, the process is removed from the %% pool. If the socket has been sent to another process, it is up to the %% protocol code to inform the listener of the new <em>ConnPid</em> by removing %% the previous and adding the new one. --spec add_connection(pid(), atom(), pid()) -> {ok, non_neg_integer()}. +-spec add_connection(pid(), atom(), pid()) -> ok. add_connection(ServerPid, Pool, ConnPid) -> - gen_server:call(ServerPid, {add_connection, Pool, ConnPid}). + gen_server:call(ServerPid, {add_connection, Pool, ConnPid}, infinity). %% @doc Move a connection from one pool to another. -spec move_connection(pid(), atom(), pid()) -> ok. @@ -70,30 +73,22 @@ move_connection(ServerPid, DestPool, ConnPid) -> remove_connection(ServerPid, ConnPid) -> gen_server:cast(ServerPid, {remove_connection, ConnPid}). -%% @doc Wait until the number of connections in the given pool gets below -%% the given threshold. -%% -%% This function will not return until the number of connections in the pool -%% gets below <em>MaxConns</em>. It makes use of <em>gen_server:reply/2</em> -%% to make the process wait for a reply indefinitely. --spec wait(pid(), atom(), non_neg_integer()) -> ok. -wait(ServerPid, Pool, MaxConns) -> - gen_server:call(ServerPid, {wait, Pool, MaxConns}, infinity). - %% gen_server. %% @private --spec init([]) -> {ok, #state{}}. -init([]) -> +-spec init(list()) -> {ok, #state{}}. +init([MaxConns]) -> ReqsTablePid = ets:new(requests_table, [set, private]), - {ok, #state{reqs_table=ReqsTablePid}}. + {ok, #state{reqs_table=ReqsTablePid, max_conns=MaxConns}}. %% @private -spec handle_call(_, _, State) -> {reply, ignored, State} | {stop, normal, stopped, State}. -handle_call({add_connection, Pool, ConnPid}, _From, State=#state{ - req_pools=Pools, reqs_table=ReqsTable}) -> +handle_call({add_connection, Pool, ConnPid}, From, State=#state{ + req_pools=Pools, reqs_table=ReqsTable, + queue=Queue, max_conns=MaxConns}) -> MonitorRef = erlang:monitor(process, ConnPid), + ConnPid ! {shoot, self()}, {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of false -> {1, [{Pool, 1}|Pools]}; @@ -102,14 +97,10 @@ handle_call({add_connection, Pool, ConnPid}, _From, State=#state{ {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} end, ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}), - {reply, {ok, NbConnsRet}, State#state{req_pools=Pools2}}; -handle_call({wait, Pool, MaxConns}, From, State=#state{ - req_pools=Pools, queue=Queue}) -> - case lists:keyfind(Pool, 1, Pools) of - {Pool, NbConns} when NbConns > MaxConns -> - {noreply, State#state{queue=[From|Queue]}}; - _Any -> - {reply, ok, State} + if NbConnsRet > MaxConns -> + {noreply, State#state{req_pools=Pools2, queue=[From|Queue]}}; + true -> + {reply, ok, State#state{req_pools=Pools2}} end; handle_call(stop, _From, State) -> {stop, normal, stopped, State}; diff --git a/src/cowboy_listener_sup.erl b/src/cowboy_listener_sup.erl index aca2b0b..1304cbc 100644 --- a/src/cowboy_listener_sup.erl +++ b/src/cowboy_listener_sup.erl @@ -24,9 +24,10 @@ -spec start_link(non_neg_integer(), module(), any(), module(), any()) -> {ok, pid()}. start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> + MaxConns = proplists:get_value(max_connections, TransOpts, 1024), {ok, SupPid} = supervisor:start_link(?MODULE, []), {ok, ListenerPid} = supervisor:start_child(SupPid, - {cowboy_listener, {cowboy_listener, start_link, []}, + {cowboy_listener, {cowboy_listener, start_link, [MaxConns]}, permanent, 5000, worker, [cowboy_listener]}), {ok, ReqsPid} = supervisor:start_child(SupPid, {cowboy_requests_sup, {cowboy_requests_sup, start_link, []}, |