aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2012-08-06 13:39:28 +0200
committerLoïc Hoguin <[email protected]>2012-08-06 13:39:28 +0200
commit6b354c1124035c54b6648665aafbe7197b34bd0e (patch)
treed31bdd1e0e6cffe8f31e50b40516e63201c6046d /src
parent7d52280c2e3fcc9e1b89435c98ef96d4758aed7a (diff)
downloadranch-6b354c1124035c54b6648665aafbe7197b34bd0e.tar.gz
ranch-6b354c1124035c54b6648665aafbe7197b34bd0e.tar.bz2
ranch-6b354c1124035c54b6648665aafbe7197b34bd0e.zip
Make accept asynchronous
Ranch now accepts connection asynchronously through a separate process. The accept process is linked to the acceptor, calls accept and does nothing else but send the socket back to the acceptor. This allows us to receive messages in the acceptor to handle upgrades instead of polling. This will also allow us later to make acceptors system processes. Remove support for connection pools in favor of a simpler max_connections setting. Connections can be removed from the count, allowing us to have as many long-lived connections as we want while still limiting the number of short-lived ones. Add max_connections, max_connections with long-lived connections, and upgrade tests.
Diffstat (limited to 'src')
-rw-r--r--src/ranch_acceptor.erl71
-rw-r--r--src/ranch_listener.erl193
-rw-r--r--src/ranch_listener_sup.erl3
-rw-r--r--src/ranch_server.erl29
4 files changed, 122 insertions, 174 deletions
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl
index 692277b..e03cde9 100644
--- a/src/ranch_acceptor.erl
+++ b/src/ranch_acceptor.erl
@@ -19,43 +19,62 @@
-export([start_link/6]).
%% Internal.
--export([acceptor/7]).
+-export([init/7]).
+-export([loop/7]).
%% API.
-spec start_link(any(), inet:socket(), module(), module(), pid(), pid())
-> {ok, pid()}.
start_link(Ref, LSocket, Transport, Protocol, ListenerPid, ConnsSup) ->
+ {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid),
{ok, Opts} = ranch_listener:get_protocol_options(ListenerPid),
- Pid = spawn_link(?MODULE, acceptor,
- [LSocket, Transport, Protocol, Opts, 1, ListenerPid, ConnsSup]),
+ Pid = spawn_link(?MODULE, init,
+ [LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup]),
ok = ranch_server:add_acceptor(Ref, Pid),
{ok, Pid}.
%% Internal.
--spec acceptor(inet:socket(), module(), module(), any(),
- non_neg_integer(), pid(), pid()) -> no_return().
-acceptor(LSocket, Transport, Protocol, Opts, OptsVsn, ListenerPid, ConnsSup) ->
- Res = case Transport:accept(LSocket, 2000) of
- {ok, CSocket} ->
- {ok, Pid} = supervisor:start_child(ConnsSup,
+-spec init(inet:socket(), module(), module(),
+ non_neg_integer(), any(), pid(), pid()) -> no_return().
+init(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
+ async_accept(LSocket, Transport),
+ loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup).
+
+-spec loop(inet:socket(), module(), module(),
+ non_neg_integer(), any(), pid(), pid()) -> no_return().
+loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
+ receive
+ {accept, CSocket} ->
+ {ok, ConnPid} = supervisor:start_child(ConnsSup,
[ListenerPid, CSocket, Transport, Protocol, Opts]),
- Transport:controlling_process(CSocket, Pid),
- ranch_listener:add_connection(ListenerPid,
- default, Pid, OptsVsn);
- {error, timeout} ->
- ranch_listener:check_upgrades(ListenerPid, OptsVsn);
- {error, _Reason} ->
- %% @todo Probably do something here. If the socket was closed,
- %% we may want to try and listen again on the port?
- ok
- end,
- case Res of
- ok ->
- ?MODULE:acceptor(LSocket, Transport, Protocol,
- Opts, OptsVsn, ListenerPid, ConnsSup);
- {upgrade, Opts2, OptsVsn2} ->
- ?MODULE:acceptor(LSocket, Transport, Protocol,
- Opts2, OptsVsn2, ListenerPid, ConnsSup)
+ Transport:controlling_process(CSocket, ConnPid),
+ ConnPid ! {shoot, ListenerPid},
+ NbConns = ranch_listener:add_connection(ListenerPid, ConnPid),
+ maybe_wait(ListenerPid, MaxConns, NbConns),
+ ?MODULE:init(LSocket, Transport, Protocol,
+ MaxConns, Opts, ListenerPid, ConnsSup);
+ {set_opts, Opts2} ->
+ ?MODULE:loop(LSocket, Transport, Protocol,
+ MaxConns, Opts2, ListenerPid, ConnsSup)
end.
+
+-spec maybe_wait(pid(), non_neg_integer(), non_neg_integer()) -> ok.
+maybe_wait(_, MaxConns, NbConns) when MaxConns > NbConns ->
+ ok;
+maybe_wait(ListenerPid, MaxConns, _) ->
+ erlang:yield(),
+ NbConns2 = ranch_server:count_connections(ListenerPid),
+ maybe_wait(ListenerPid, MaxConns, NbConns2).
+
+-spec async_accept(inet:socket(), module()) -> ok.
+async_accept(LSocket, Transport) ->
+ AcceptorPid = self(),
+ _ = spawn_link(fun() ->
+ %% @todo {error, closed} must be handled and other errors ignored.
+ {ok, CSocket} = Transport:accept(LSocket, infinity),
+ Transport:controlling_process(CSocket, AcceptorPid),
+ AcceptorPid ! {accept, CSocket}
+ end),
+ ok.
diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl
index 40528f5..83ee658 100644
--- a/src/ranch_listener.erl
+++ b/src/ranch_listener.erl
@@ -17,14 +17,13 @@
-behaviour(gen_server).
%% API.
--export([start_link/2]).
+-export([start_link/3]).
-export([stop/1]).
--export([add_connection/4]).
--export([move_connection/3]).
--export([remove_connection/2]).
--export([check_upgrades/2]).
+-export([add_connection/2]).
+-export([remove_connection/1]).
-export([get_port/1]).
-export([set_port/2]).
+-export([get_max_connections/1]).
-export([get_protocol_options/1]).
-export([set_protocol_options/2]).
@@ -36,74 +35,41 @@
-export([terminate/2]).
-export([code_change/3]).
--type pools() :: [{atom(), non_neg_integer()}].
-
-record(state, {
- conn_pools = [] :: pools(),
- conns_table :: ets:tid(),
- queue = undefined :: queue(),
+ ref :: any(),
max_conns = undefined :: non_neg_integer(),
port = undefined :: undefined | inet:port_number(),
- proto_opts :: any(),
- proto_opts_vsn = 1 :: non_neg_integer()
+ proto_opts = undefined :: any(),
+ rm_diff = 0 :: non_neg_integer()
}).
%% API.
%% @private
-%%
-%% We set the process priority to high because ranch_listener is the central
-%% gen_server in Ranch and is used to manage all the incoming connections.
-%% Setting the process priority to high ensures the connection-related code
-%% will always be executed when a connection needs it, allowing Ranch to
-%% scale far beyond what it would with a normal priority.
--spec start_link(non_neg_integer(), any()) -> {ok, pid()}.
-start_link(MaxConns, ProtoOpts) ->
- gen_server:start_link(?MODULE, [MaxConns, ProtoOpts],
- [{spawn_opt, [{priority, high}]}]).
+-spec start_link(any(), non_neg_integer(), any()) -> {ok, pid()}.
+start_link(Ref, MaxConns, ProtoOpts) ->
+ gen_server:start_link(?MODULE, [Ref, MaxConns, ProtoOpts], []).
%% @private
-spec stop(pid()) -> stopped.
stop(ServerPid) ->
gen_server:call(ServerPid, stop).
-%% @doc Add a connection to the given pool in the listener.
-%%
-%% Pools of connections are used to restrict the maximum number of connections
-%% depending on their type. By default, Ranch add all connections to the
-%% pool <em>default</em>. It also checks for the maximum number of connections
-%% in that pool before accepting again. This function only returns when there
-%% is free space in the pool.
-%%
-%% When a process managing a connection dies, the process is removed from the
-%% pool. If the socket has been sent to another process, it is up to the
-%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
-%% the previous and adding the new one.
+%% @doc Add a connection to the listener's pool.
+-spec add_connection(pid(), pid()) -> non_neg_integer().
+add_connection(ServerPid, ConnPid) ->
+ ok = gen_server:cast(ServerPid, {add_connection, ConnPid}),
+ ranch_server:add_connection(ServerPid).
+
+%% @doc Remove this process' connection from the pool.
%%
-%% This function also returns whether the protocol options have been modified.
-%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of
-%% the atom 'ok'. The acceptor can then continue with the new protocol options.
--spec add_connection(pid(), atom(), pid(), non_neg_integer())
- -> ok | {upgrade, any(), non_neg_integer()}.
-add_connection(ServerPid, Pool, ConnPid, OptsVsn) ->
- gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn},
- infinity).
-
-%% @doc Move a connection from one pool to another.
--spec move_connection(pid(), atom(), pid()) -> ok.
-move_connection(ServerPid, DestPool, ConnPid) ->
- gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}).
-
-%% @doc Remove the given connection from its pool.
--spec remove_connection(pid(), pid()) -> ok.
-remove_connection(ServerPid, ConnPid) ->
- gen_server:cast(ServerPid, {remove_connection, ConnPid}).
-
-%% @doc Return whether a protocol upgrade is required.
--spec check_upgrades(pid(), non_neg_integer())
- -> ok | {upgrade, any(), non_neg_integer()}.
-check_upgrades(ServerPid, OptsVsn) ->
- gen_server:call(ServerPid, {check_upgrades, OptsVsn}).
+%% Useful if you have long-lived connections that aren't taking up
+%% resources and shouldn't be counted in the limited number of running
+%% connections.
+-spec remove_connection(pid()) -> non_neg_integer().
+remove_connection(ServerPid) ->
+ ok = gen_server:cast(ServerPid, remove_connection),
+ ranch_server:remove_connection(ServerPid).
%% @doc Return the listener's port.
-spec get_port(pid()) -> {ok, inet:port_number()}.
@@ -115,6 +81,12 @@ get_port(ServerPid) ->
set_port(ServerPid, Port) ->
gen_server:cast(ServerPid, {set_port, Port}).
+%% @doc Return the max number of connections allowed concurrently.
+%% @todo Add set_max_connections.
+-spec get_max_connections(pid()) -> {ok, non_neg_integer()}.
+get_max_connections(ServerPid) ->
+ gen_server:call(ServerPid, get_max_connections).
+
%% @doc Return the current protocol options.
-spec get_protocol_options(pid()) -> {ok, any()}.
get_protocol_options(ServerPid) ->
@@ -128,65 +100,41 @@ set_protocol_options(ServerPid, ProtoOpts) ->
%% gen_server.
%% @private
-init([MaxConns, ProtoOpts]) ->
- ConnsTable = ets:new(connections_table, [set, private]),
- Queue = queue:new(),
- {ok, #state{conns_table=ConnsTable, max_conns=MaxConns,
- proto_opts=ProtoOpts, queue=Queue}}.
+init([Ref, MaxConns, ProtoOpts]) ->
+ {ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}.
%% @private
-handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{
- conn_pools=Pools, conns_table=ConnsTable,
- queue=Queue, max_conns=MaxConns,
- proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->
- {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ConnsTable),
- State2 = State#state{conn_pools=Pools2},
- if AccOptsVsn =/= LisOptsVsn ->
- {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2};
- NbConns > MaxConns ->
- Queue2 = queue:in(From, Queue),
- {noreply, State2#state{queue=Queue2}};
- true ->
- {reply, ok, State2}
- end;
-handle_call({check_upgrades, AccOptsVsn}, _From, State=#state{
- proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->
- if AccOptsVsn =/= LisOptsVsn ->
- {reply, {upgrade, ProtoOpts, LisOptsVsn}, State};
- true ->
- {reply, ok, State}
- end;
handle_call(get_port, _From, State=#state{port=Port}) ->
{reply, {ok, Port}, State};
+handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) ->
+ {reply, {ok, MaxConns}, State};
handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) ->
{reply, {ok, ProtoOpts}, State};
-handle_call({set_protocol_options, ProtoOpts}, _From,
- State=#state{proto_opts_vsn=OptsVsn}) ->
- {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}};
+handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) ->
+ ranch_server:send_to_acceptors(Ref, {set_opts, ProtoOpts}),
+ {reply, ok, State#state{proto_opts=ProtoOpts}};
handle_call(stop, _From, State) ->
{stop, normal, stopped, State};
handle_call(_, _From, State) ->
{reply, ignored, State}.
%% @private
+handle_cast({add_connection, ConnPid}, State) ->
+ _ = erlang:monitor(process, ConnPid),
+ {noreply, State};
+handle_cast(remove_connection, State=#state{rm_diff=RmDiff}) ->
+ {noreply, State#state{rm_diff=RmDiff + 1}};
handle_cast({set_port, Port}, State) ->
{noreply, State#state{port=Port}};
-handle_cast({move_connection, DestPool, ConnPid}, State=#state{
- conn_pools=Pools, conns_table=ConnsTable}) ->
- Pools2 = move_pid(ConnPid, DestPool, Pools, ConnsTable),
- {noreply, State#state{conn_pools=Pools2}};
-handle_cast({remove_connection, ConnPid}, State=#state{
- conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) ->
- {Pools2, Queue2} = remove_pid(ConnPid, Pools, ConnsTable, Queue),
- {noreply, State#state{conn_pools=Pools2, queue=Queue2}};
handle_cast(_Msg, State) ->
{noreply, State}.
%% @private
-handle_info({'DOWN', _Ref, process, Pid, _Info}, State=#state{
- conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) ->
- {Pools2, Queue2} = remove_pid(Pid, Pools, ConnsTable, Queue),
- {noreply, State#state{conn_pools=Pools2, queue=Queue2}};
+handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=0}) ->
+ _ = ranch_server:remove_connection(self()),
+ {noreply, State};
+handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=RmDiff}) ->
+ {noreply, State#state{rm_diff=RmDiff - 1}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -197,50 +145,3 @@ terminate(_Reason, _State) ->
%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
-%% Internal.
-
-%% @private
--spec add_pid(pid(), atom(), pools(), ets:tid())
- -> {non_neg_integer(), pools()}.
-add_pid(ConnPid, Pool, Pools, ConnsTable) ->
- MonitorRef = erlang:monitor(process, ConnPid),
- ConnPid ! {shoot, self()},
- {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
- false ->
- {1, [{Pool, 1}|Pools]};
- {Pool, NbConns} ->
- NbConns2 = NbConns + 1,
- {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
- end,
- ets:insert(ConnsTable, {ConnPid, {MonitorRef, Pool}}),
- {NbConnsRet, Pools2}.
-
-%% @private
--spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools().
-move_pid(ConnPid, DestPool, Pools, ConnsTable) ->
- {MonitorRef, SrcPool} = ets:lookup_element(ConnsTable, ConnPid, 2),
- ets:insert(ConnsTable, {ConnPid, {MonitorRef, DestPool}}),
- {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools),
- DestNbConns = case lists:keyfind(DestPool, 1, Pools) of
- false -> 1;
- {DestPool, NbConns} -> NbConns + 1
- end,
- Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)),
- [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2].
-
-%% @private
--spec remove_pid(pid(), pools(), ets:tid(), queue()) -> {pools(), queue()}.
-remove_pid(Pid, Pools, ConnsTable, Queue) ->
- {MonitorRef, Pool} = ets:lookup_element(ConnsTable, Pid, 2),
- erlang:demonitor(MonitorRef, [flush]),
- {Pool, NbConns} = lists:keyfind(Pool, 1, Pools),
- Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)],
- ets:delete(ConnsTable, Pid),
- case queue:out(Queue) of
- {{value, Client}, Queue2} ->
- gen_server:reply(Client, ok),
- {Pools2, Queue2};
- _ ->
- {Pools2, Queue}
- end.
diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl
index de35758..c8ba12d 100644
--- a/src/ranch_listener_sup.erl
+++ b/src/ranch_listener_sup.erl
@@ -30,7 +30,8 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
{ok, SupPid} = supervisor:start_link(?MODULE, []),
{ok, ListenerPid} = supervisor:start_child(SupPid,
- {ranch_listener, {ranch_listener, start_link, [MaxConns, ProtoOpts]},
+ {ranch_listener, {ranch_listener, start_link,
+ [Ref, MaxConns, ProtoOpts]},
permanent, 5000, worker, [ranch_listener]}),
ok = ranch_server:insert_listener(Ref, ListenerPid),
{ok, ConnsPid} = supervisor:start_child(SupPid,
diff --git a/src/ranch_server.erl b/src/ranch_server.erl
index 16e892d..faec9b6 100644
--- a/src/ranch_server.erl
+++ b/src/ranch_server.erl
@@ -22,6 +22,9 @@
-export([lookup_listener/1]).
-export([add_acceptor/2]).
-export([send_to_acceptors/2]).
+-export([add_connection/1]).
+-export([count_connections/1]).
+-export([remove_connection/1]).
%% gen_server.
-export([init/1]).
@@ -69,12 +72,31 @@ send_to_acceptors(Ref, Msg) ->
_ = [Pid ! Msg || Pid <- Acceptors],
ok.
+%% @doc Add a connection to the connection pool.
+%%
+%% Also return the number of connections in the pool after this operation.
+-spec add_connection(pid()) -> non_neg_integer().
+add_connection(ListenerPid) ->
+ ets:update_counter(?TAB, {connections, ListenerPid}, 1).
+
+%% @doc Count the number of connections in the connection pool.
+-spec count_connections(pid()) -> non_neg_integer().
+count_connections(ListenerPid) ->
+ ets:update_counter(?TAB, {connections, ListenerPid}, 0).
+
+%% @doc Remove a connection from the connection pool.
+%%
+%% Also return the number of connections in the pool after this operation.
+-spec remove_connection(pid()) -> non_neg_integer().
+remove_connection(ListenerPid) ->
+ ets:update_counter(?TAB, {connections, ListenerPid}, -1).
+
%% gen_server.
%% @private
init([]) ->
?TAB = ets:new(?TAB, [
- ordered_set, public, named_table, {read_concurrency, true}]),
+ ordered_set, public, named_table, {write_concurrency, true}]),
{ok, #state{}}.
%% @private
@@ -84,6 +106,7 @@ handle_call(_Request, _From, State) ->
%% @private
handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) ->
true = ets:insert_new(?TAB, {{acceptors, Ref}, []}),
+ true = ets:insert_new(?TAB, {{connections, Pid}, 0}),
MonitorRef = erlang:monitor(process, Pid),
{noreply, State#state{
monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}};
@@ -93,6 +116,9 @@ handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) ->
true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}),
{noreply, State#state{
monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}};
+handle_cast({add_connection, Pid}, State) ->
+ _ = erlang:monitor(process, Pid),
+ {noreply, State};
handle_cast(_Request, State) ->
{noreply, State}.
@@ -120,6 +146,7 @@ code_change(_OldVsn, State, _Extra) ->
remove_process(Key = {listener, Ref}, MonitorRef, Pid, Monitors) ->
true = ets:delete(?TAB, Key),
true = ets:delete(?TAB, {acceptors, Ref}),
+ true = ets:delete(?TAB, {connections, Pid}),
lists:keydelete({MonitorRef, Pid}, 1, Monitors);
remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) ->
Acceptors = ets:lookup_element(?TAB, Key, 2),