aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/cowboy_acceptor.erl28
-rw-r--r--src/cowboy_acceptors_sup.erl3
-rw-r--r--src/cowboy_listener.erl55
-rw-r--r--src/cowboy_listener_sup.erl3
4 files changed, 36 insertions, 53 deletions
diff --git a/src/cowboy_acceptor.erl b/src/cowboy_acceptor.erl
index 4cb9fa7..d46e212 100644
--- a/src/cowboy_acceptor.erl
+++ b/src/cowboy_acceptor.erl
@@ -15,33 +15,31 @@
%% @private
-module(cowboy_acceptor).
--export([start_link/7]). %% API.
--export([acceptor/7]). %% Internal.
+-export([start_link/6]). %% API.
+-export([acceptor/6]). %% Internal.
%% API.
-spec start_link(inet:socket(), module(), module(), any(),
- non_neg_integer(), pid(), pid()) -> {ok, pid()}.
+ pid(), pid()) -> {ok, pid()}.
start_link(LSocket, Transport, Protocol, Opts,
- MaxConns, ListenerPid, ReqsSup) ->
+ ListenerPid, ReqsSup) ->
Pid = spawn_link(?MODULE, acceptor,
- [LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup]),
+ [LSocket, Transport, Protocol, Opts, ListenerPid, ReqsSup]),
{ok, Pid}.
%% Internal.
-spec acceptor(inet:socket(), module(), module(), any(),
- non_neg_integer(), pid(), pid()) -> no_return().
-acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) ->
+ pid(), pid()) -> no_return().
+acceptor(LSocket, Transport, Protocol, Opts, ListenerPid, ReqsSup) ->
case Transport:accept(LSocket, 2000) of
{ok, CSocket} ->
{ok, Pid} = supervisor:start_child(ReqsSup,
[ListenerPid, CSocket, Transport, Protocol, Opts]),
Transport:controlling_process(CSocket, Pid),
- {ok, NbConns} = cowboy_listener:add_connection(ListenerPid,
- default, Pid),
- Pid ! {shoot, ListenerPid},
- limit_reqs(ListenerPid, NbConns, MaxConns);
+ ok = cowboy_listener:add_connection(ListenerPid,
+ default, Pid);
{error, timeout} ->
ignore;
{error, _Reason} ->
@@ -50,10 +48,4 @@ acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) ->
ignore
end,
?MODULE:acceptor(LSocket, Transport, Protocol, Opts,
- MaxConns, ListenerPid, ReqsSup).
-
--spec limit_reqs(pid(), non_neg_integer(), non_neg_integer()) -> ok.
-limit_reqs(_ListenerPid, NbConns, MaxConns) when NbConns =< MaxConns ->
- ok;
-limit_reqs(ListenerPid, _NbConns, MaxConns) ->
- cowboy_listener:wait(ListenerPid, default, MaxConns).
+ ListenerPid, ReqsSup).
diff --git a/src/cowboy_acceptors_sup.erl b/src/cowboy_acceptors_sup.erl
index 17849a6..625028c 100644
--- a/src/cowboy_acceptors_sup.erl
+++ b/src/cowboy_acceptors_sup.erl
@@ -34,10 +34,9 @@ start_link(NbAcceptors, Transport, TransOpts,
init([NbAcceptors, Transport, TransOpts,
Protocol, ProtoOpts, ListenerPid, ReqsPid]) ->
{ok, LSocket} = Transport:listen(TransOpts),
- MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
Procs = [{{acceptor, self(), N}, {cowboy_acceptor, start_link, [
LSocket, Transport, Protocol, ProtoOpts,
- MaxConns, ListenerPid, ReqsPid
+ ListenerPid, ReqsPid
]}, permanent, brutal_kill, worker, []}
|| N <- lists:seq(1, NbAcceptors)],
{ok, {{one_for_one, 10, 10}, Procs}}.
diff --git a/src/cowboy_listener.erl b/src/cowboy_listener.erl
index c19d079..8998e13 100644
--- a/src/cowboy_listener.erl
+++ b/src/cowboy_listener.erl
@@ -16,15 +16,16 @@
-module(cowboy_listener).
-behaviour(gen_server).
--export([start_link/0, stop/1,
- add_connection/3, move_connection/3, remove_connection/2, wait/3]). %% API.
+-export([start_link/1, stop/1,
+ add_connection/3, move_connection/3, remove_connection/2]). %% API.
-export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]). %% gen_server.
-record(state, {
req_pools = [] :: [{atom(), non_neg_integer()}],
reqs_table :: ets:tid(),
- queue = [] :: [{pid(), reference()}]
+ queue = [] :: [{pid(), reference()}],
+ max_conns = undefined :: non_neg_integer()
}).
%% API.
@@ -36,9 +37,10 @@
%% Setting the process priority to high ensures the connection-related code
%% will always be executed when a connection needs it, allowing Cowboy to
%% scale far beyond what it would with a normal priority.
--spec start_link() -> {ok, pid()}.
-start_link() ->
- gen_server:start_link(?MODULE, [], [{spawn_opt, [{priority, high}]}]).
+-spec start_link(non_neg_integer()) -> {ok, pid()}.
+start_link(MaxConns) ->
+ gen_server:start_link(?MODULE, [MaxConns],
+ [{spawn_opt, [{priority, high}]}]).
%% @private
-spec stop(pid()) -> stopped.
@@ -50,15 +52,16 @@ stop(ServerPid) ->
%% Pools of connections are used to restrict the maximum number of connections
%% depending on their type. By default, Cowboy add all connections to the
%% pool <em>default</em>. It also checks for the maximum number of connections
-%% in that pool before accepting again.
+%% in that pool before accepting again. This function only returns when there
+%% is free space in the pool.
%%
%% When a process managing a connection dies, the process is removed from the
%% pool. If the socket has been sent to another process, it is up to the
%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
%% the previous and adding the new one.
--spec add_connection(pid(), atom(), pid()) -> {ok, non_neg_integer()}.
+-spec add_connection(pid(), atom(), pid()) -> ok.
add_connection(ServerPid, Pool, ConnPid) ->
- gen_server:call(ServerPid, {add_connection, Pool, ConnPid}).
+ gen_server:call(ServerPid, {add_connection, Pool, ConnPid}, infinity).
%% @doc Move a connection from one pool to another.
-spec move_connection(pid(), atom(), pid()) -> ok.
@@ -70,30 +73,22 @@ move_connection(ServerPid, DestPool, ConnPid) ->
remove_connection(ServerPid, ConnPid) ->
gen_server:cast(ServerPid, {remove_connection, ConnPid}).
-%% @doc Wait until the number of connections in the given pool gets below
-%% the given threshold.
-%%
-%% This function will not return until the number of connections in the pool
-%% gets below <em>MaxConns</em>. It makes use of <em>gen_server:reply/2</em>
-%% to make the process wait for a reply indefinitely.
--spec wait(pid(), atom(), non_neg_integer()) -> ok.
-wait(ServerPid, Pool, MaxConns) ->
- gen_server:call(ServerPid, {wait, Pool, MaxConns}, infinity).
-
%% gen_server.
%% @private
--spec init([]) -> {ok, #state{}}.
-init([]) ->
+-spec init(list()) -> {ok, #state{}}.
+init([MaxConns]) ->
ReqsTablePid = ets:new(requests_table, [set, private]),
- {ok, #state{reqs_table=ReqsTablePid}}.
+ {ok, #state{reqs_table=ReqsTablePid, max_conns=MaxConns}}.
%% @private
-spec handle_call(_, _, State)
-> {reply, ignored, State} | {stop, normal, stopped, State}.
-handle_call({add_connection, Pool, ConnPid}, _From, State=#state{
- req_pools=Pools, reqs_table=ReqsTable}) ->
+handle_call({add_connection, Pool, ConnPid}, From, State=#state{
+ req_pools=Pools, reqs_table=ReqsTable,
+ queue=Queue, max_conns=MaxConns}) ->
MonitorRef = erlang:monitor(process, ConnPid),
+ ConnPid ! {shoot, self()},
{NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
false ->
{1, [{Pool, 1}|Pools]};
@@ -102,14 +97,10 @@ handle_call({add_connection, Pool, ConnPid}, _From, State=#state{
{NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
end,
ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
- {reply, {ok, NbConnsRet}, State#state{req_pools=Pools2}};
-handle_call({wait, Pool, MaxConns}, From, State=#state{
- req_pools=Pools, queue=Queue}) ->
- case lists:keyfind(Pool, 1, Pools) of
- {Pool, NbConns} when NbConns > MaxConns ->
- {noreply, State#state{queue=[From|Queue]}};
- _Any ->
- {reply, ok, State}
+ if NbConnsRet > MaxConns ->
+ {noreply, State#state{req_pools=Pools2, queue=[From|Queue]}};
+ true ->
+ {reply, ok, State#state{req_pools=Pools2}}
end;
handle_call(stop, _From, State) ->
{stop, normal, stopped, State};
diff --git a/src/cowboy_listener_sup.erl b/src/cowboy_listener_sup.erl
index aca2b0b..1304cbc 100644
--- a/src/cowboy_listener_sup.erl
+++ b/src/cowboy_listener_sup.erl
@@ -24,9 +24,10 @@
-spec start_link(non_neg_integer(), module(), any(), module(), any())
-> {ok, pid()}.
start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
+ MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
{ok, SupPid} = supervisor:start_link(?MODULE, []),
{ok, ListenerPid} = supervisor:start_child(SupPid,
- {cowboy_listener, {cowboy_listener, start_link, []},
+ {cowboy_listener, {cowboy_listener, start_link, [MaxConns]},
permanent, 5000, worker, [cowboy_listener]}),
{ok, ReqsPid} = supervisor:start_child(SupPid,
{cowboy_requests_sup, {cowboy_requests_sup, start_link, []},