diff options
author | Loïc Hoguin <[email protected]> | 2012-08-06 13:39:28 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2012-08-06 13:39:28 +0200 |
commit | 6b354c1124035c54b6648665aafbe7197b34bd0e (patch) | |
tree | d31bdd1e0e6cffe8f31e50b40516e63201c6046d /src | |
parent | 7d52280c2e3fcc9e1b89435c98ef96d4758aed7a (diff) | |
download | ranch-6b354c1124035c54b6648665aafbe7197b34bd0e.tar.gz ranch-6b354c1124035c54b6648665aafbe7197b34bd0e.tar.bz2 ranch-6b354c1124035c54b6648665aafbe7197b34bd0e.zip |
Make accept asynchronous
Ranch now accepts connection asynchronously through a separate
process. The accept process is linked to the acceptor, calls
accept and does nothing else but send the socket back to the
acceptor. This allows us to receive messages in the acceptor
to handle upgrades instead of polling. This will also allow us
later to make acceptors system processes.
Remove support for connection pools in favor of a simpler
max_connections setting. Connections can be removed from the
count, allowing us to have as many long-lived connections as
we want while still limiting the number of short-lived ones.
Add max_connections, max_connections with long-lived connections,
and upgrade tests.
Diffstat (limited to 'src')
-rw-r--r-- | src/ranch_acceptor.erl | 71 | ||||
-rw-r--r-- | src/ranch_listener.erl | 193 | ||||
-rw-r--r-- | src/ranch_listener_sup.erl | 3 | ||||
-rw-r--r-- | src/ranch_server.erl | 29 |
4 files changed, 122 insertions, 174 deletions
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl index 692277b..e03cde9 100644 --- a/src/ranch_acceptor.erl +++ b/src/ranch_acceptor.erl @@ -19,43 +19,62 @@ -export([start_link/6]). %% Internal. --export([acceptor/7]). +-export([init/7]). +-export([loop/7]). %% API. -spec start_link(any(), inet:socket(), module(), module(), pid(), pid()) -> {ok, pid()}. start_link(Ref, LSocket, Transport, Protocol, ListenerPid, ConnsSup) -> + {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid), {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid), - Pid = spawn_link(?MODULE, acceptor, - [LSocket, Transport, Protocol, Opts, 1, ListenerPid, ConnsSup]), + Pid = spawn_link(?MODULE, init, + [LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup]), ok = ranch_server:add_acceptor(Ref, Pid), {ok, Pid}. %% Internal. --spec acceptor(inet:socket(), module(), module(), any(), - non_neg_integer(), pid(), pid()) -> no_return(). -acceptor(LSocket, Transport, Protocol, Opts, OptsVsn, ListenerPid, ConnsSup) -> - Res = case Transport:accept(LSocket, 2000) of - {ok, CSocket} -> - {ok, Pid} = supervisor:start_child(ConnsSup, +-spec init(inet:socket(), module(), module(), + non_neg_integer(), any(), pid(), pid()) -> no_return(). +init(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) -> + async_accept(LSocket, Transport), + loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup). + +-spec loop(inet:socket(), module(), module(), + non_neg_integer(), any(), pid(), pid()) -> no_return(). +loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) -> + receive + {accept, CSocket} -> + {ok, ConnPid} = supervisor:start_child(ConnsSup, [ListenerPid, CSocket, Transport, Protocol, Opts]), - Transport:controlling_process(CSocket, Pid), - ranch_listener:add_connection(ListenerPid, - default, Pid, OptsVsn); - {error, timeout} -> - ranch_listener:check_upgrades(ListenerPid, OptsVsn); - {error, _Reason} -> - %% @todo Probably do something here. If the socket was closed, - %% we may want to try and listen again on the port? - ok - end, - case Res of - ok -> - ?MODULE:acceptor(LSocket, Transport, Protocol, - Opts, OptsVsn, ListenerPid, ConnsSup); - {upgrade, Opts2, OptsVsn2} -> - ?MODULE:acceptor(LSocket, Transport, Protocol, - Opts2, OptsVsn2, ListenerPid, ConnsSup) + Transport:controlling_process(CSocket, ConnPid), + ConnPid ! {shoot, ListenerPid}, + NbConns = ranch_listener:add_connection(ListenerPid, ConnPid), + maybe_wait(ListenerPid, MaxConns, NbConns), + ?MODULE:init(LSocket, Transport, Protocol, + MaxConns, Opts, ListenerPid, ConnsSup); + {set_opts, Opts2} -> + ?MODULE:loop(LSocket, Transport, Protocol, + MaxConns, Opts2, ListenerPid, ConnsSup) end. + +-spec maybe_wait(pid(), non_neg_integer(), non_neg_integer()) -> ok. +maybe_wait(_, MaxConns, NbConns) when MaxConns > NbConns -> + ok; +maybe_wait(ListenerPid, MaxConns, _) -> + erlang:yield(), + NbConns2 = ranch_server:count_connections(ListenerPid), + maybe_wait(ListenerPid, MaxConns, NbConns2). + +-spec async_accept(inet:socket(), module()) -> ok. +async_accept(LSocket, Transport) -> + AcceptorPid = self(), + _ = spawn_link(fun() -> + %% @todo {error, closed} must be handled and other errors ignored. + {ok, CSocket} = Transport:accept(LSocket, infinity), + Transport:controlling_process(CSocket, AcceptorPid), + AcceptorPid ! {accept, CSocket} + end), + ok. diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl index 40528f5..83ee658 100644 --- a/src/ranch_listener.erl +++ b/src/ranch_listener.erl @@ -17,14 +17,13 @@ -behaviour(gen_server). %% API. --export([start_link/2]). +-export([start_link/3]). -export([stop/1]). --export([add_connection/4]). --export([move_connection/3]). --export([remove_connection/2]). --export([check_upgrades/2]). +-export([add_connection/2]). +-export([remove_connection/1]). -export([get_port/1]). -export([set_port/2]). +-export([get_max_connections/1]). -export([get_protocol_options/1]). -export([set_protocol_options/2]). @@ -36,74 +35,41 @@ -export([terminate/2]). -export([code_change/3]). --type pools() :: [{atom(), non_neg_integer()}]. - -record(state, { - conn_pools = [] :: pools(), - conns_table :: ets:tid(), - queue = undefined :: queue(), + ref :: any(), max_conns = undefined :: non_neg_integer(), port = undefined :: undefined | inet:port_number(), - proto_opts :: any(), - proto_opts_vsn = 1 :: non_neg_integer() + proto_opts = undefined :: any(), + rm_diff = 0 :: non_neg_integer() }). %% API. %% @private -%% -%% We set the process priority to high because ranch_listener is the central -%% gen_server in Ranch and is used to manage all the incoming connections. -%% Setting the process priority to high ensures the connection-related code -%% will always be executed when a connection needs it, allowing Ranch to -%% scale far beyond what it would with a normal priority. --spec start_link(non_neg_integer(), any()) -> {ok, pid()}. -start_link(MaxConns, ProtoOpts) -> - gen_server:start_link(?MODULE, [MaxConns, ProtoOpts], - [{spawn_opt, [{priority, high}]}]). +-spec start_link(any(), non_neg_integer(), any()) -> {ok, pid()}. +start_link(Ref, MaxConns, ProtoOpts) -> + gen_server:start_link(?MODULE, [Ref, MaxConns, ProtoOpts], []). %% @private -spec stop(pid()) -> stopped. stop(ServerPid) -> gen_server:call(ServerPid, stop). -%% @doc Add a connection to the given pool in the listener. -%% -%% Pools of connections are used to restrict the maximum number of connections -%% depending on their type. By default, Ranch add all connections to the -%% pool <em>default</em>. It also checks for the maximum number of connections -%% in that pool before accepting again. This function only returns when there -%% is free space in the pool. -%% -%% When a process managing a connection dies, the process is removed from the -%% pool. If the socket has been sent to another process, it is up to the -%% protocol code to inform the listener of the new <em>ConnPid</em> by removing -%% the previous and adding the new one. +%% @doc Add a connection to the listener's pool. +-spec add_connection(pid(), pid()) -> non_neg_integer(). +add_connection(ServerPid, ConnPid) -> + ok = gen_server:cast(ServerPid, {add_connection, ConnPid}), + ranch_server:add_connection(ServerPid). + +%% @doc Remove this process' connection from the pool. %% -%% This function also returns whether the protocol options have been modified. -%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of -%% the atom 'ok'. The acceptor can then continue with the new protocol options. --spec add_connection(pid(), atom(), pid(), non_neg_integer()) - -> ok | {upgrade, any(), non_neg_integer()}. -add_connection(ServerPid, Pool, ConnPid, OptsVsn) -> - gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, - infinity). - -%% @doc Move a connection from one pool to another. --spec move_connection(pid(), atom(), pid()) -> ok. -move_connection(ServerPid, DestPool, ConnPid) -> - gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}). - -%% @doc Remove the given connection from its pool. --spec remove_connection(pid(), pid()) -> ok. -remove_connection(ServerPid, ConnPid) -> - gen_server:cast(ServerPid, {remove_connection, ConnPid}). - -%% @doc Return whether a protocol upgrade is required. --spec check_upgrades(pid(), non_neg_integer()) - -> ok | {upgrade, any(), non_neg_integer()}. -check_upgrades(ServerPid, OptsVsn) -> - gen_server:call(ServerPid, {check_upgrades, OptsVsn}). +%% Useful if you have long-lived connections that aren't taking up +%% resources and shouldn't be counted in the limited number of running +%% connections. +-spec remove_connection(pid()) -> non_neg_integer(). +remove_connection(ServerPid) -> + ok = gen_server:cast(ServerPid, remove_connection), + ranch_server:remove_connection(ServerPid). %% @doc Return the listener's port. -spec get_port(pid()) -> {ok, inet:port_number()}. @@ -115,6 +81,12 @@ get_port(ServerPid) -> set_port(ServerPid, Port) -> gen_server:cast(ServerPid, {set_port, Port}). +%% @doc Return the max number of connections allowed concurrently. +%% @todo Add set_max_connections. +-spec get_max_connections(pid()) -> {ok, non_neg_integer()}. +get_max_connections(ServerPid) -> + gen_server:call(ServerPid, get_max_connections). + %% @doc Return the current protocol options. -spec get_protocol_options(pid()) -> {ok, any()}. get_protocol_options(ServerPid) -> @@ -128,65 +100,41 @@ set_protocol_options(ServerPid, ProtoOpts) -> %% gen_server. %% @private -init([MaxConns, ProtoOpts]) -> - ConnsTable = ets:new(connections_table, [set, private]), - Queue = queue:new(), - {ok, #state{conns_table=ConnsTable, max_conns=MaxConns, - proto_opts=ProtoOpts, queue=Queue}}. +init([Ref, MaxConns, ProtoOpts]) -> + {ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}. %% @private -handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{ - conn_pools=Pools, conns_table=ConnsTable, - queue=Queue, max_conns=MaxConns, - proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> - {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ConnsTable), - State2 = State#state{conn_pools=Pools2}, - if AccOptsVsn =/= LisOptsVsn -> - {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2}; - NbConns > MaxConns -> - Queue2 = queue:in(From, Queue), - {noreply, State2#state{queue=Queue2}}; - true -> - {reply, ok, State2} - end; -handle_call({check_upgrades, AccOptsVsn}, _From, State=#state{ - proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> - if AccOptsVsn =/= LisOptsVsn -> - {reply, {upgrade, ProtoOpts, LisOptsVsn}, State}; - true -> - {reply, ok, State} - end; handle_call(get_port, _From, State=#state{port=Port}) -> {reply, {ok, Port}, State}; +handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) -> + {reply, {ok, MaxConns}, State}; handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) -> {reply, {ok, ProtoOpts}, State}; -handle_call({set_protocol_options, ProtoOpts}, _From, - State=#state{proto_opts_vsn=OptsVsn}) -> - {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}}; +handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) -> + ranch_server:send_to_acceptors(Ref, {set_opts, ProtoOpts}), + {reply, ok, State#state{proto_opts=ProtoOpts}}; handle_call(stop, _From, State) -> {stop, normal, stopped, State}; handle_call(_, _From, State) -> {reply, ignored, State}. %% @private +handle_cast({add_connection, ConnPid}, State) -> + _ = erlang:monitor(process, ConnPid), + {noreply, State}; +handle_cast(remove_connection, State=#state{rm_diff=RmDiff}) -> + {noreply, State#state{rm_diff=RmDiff + 1}}; handle_cast({set_port, Port}, State) -> {noreply, State#state{port=Port}}; -handle_cast({move_connection, DestPool, ConnPid}, State=#state{ - conn_pools=Pools, conns_table=ConnsTable}) -> - Pools2 = move_pid(ConnPid, DestPool, Pools, ConnsTable), - {noreply, State#state{conn_pools=Pools2}}; -handle_cast({remove_connection, ConnPid}, State=#state{ - conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) -> - {Pools2, Queue2} = remove_pid(ConnPid, Pools, ConnsTable, Queue), - {noreply, State#state{conn_pools=Pools2, queue=Queue2}}; handle_cast(_Msg, State) -> {noreply, State}. %% @private -handle_info({'DOWN', _Ref, process, Pid, _Info}, State=#state{ - conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) -> - {Pools2, Queue2} = remove_pid(Pid, Pools, ConnsTable, Queue), - {noreply, State#state{conn_pools=Pools2, queue=Queue2}}; +handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=0}) -> + _ = ranch_server:remove_connection(self()), + {noreply, State}; +handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=RmDiff}) -> + {noreply, State#state{rm_diff=RmDiff - 1}}; handle_info(_Info, State) -> {noreply, State}. @@ -197,50 +145,3 @@ terminate(_Reason, _State) -> %% @private code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%% Internal. - -%% @private --spec add_pid(pid(), atom(), pools(), ets:tid()) - -> {non_neg_integer(), pools()}. -add_pid(ConnPid, Pool, Pools, ConnsTable) -> - MonitorRef = erlang:monitor(process, ConnPid), - ConnPid ! {shoot, self()}, - {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of - false -> - {1, [{Pool, 1}|Pools]}; - {Pool, NbConns} -> - NbConns2 = NbConns + 1, - {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} - end, - ets:insert(ConnsTable, {ConnPid, {MonitorRef, Pool}}), - {NbConnsRet, Pools2}. - -%% @private --spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools(). -move_pid(ConnPid, DestPool, Pools, ConnsTable) -> - {MonitorRef, SrcPool} = ets:lookup_element(ConnsTable, ConnPid, 2), - ets:insert(ConnsTable, {ConnPid, {MonitorRef, DestPool}}), - {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools), - DestNbConns = case lists:keyfind(DestPool, 1, Pools) of - false -> 1; - {DestPool, NbConns} -> NbConns + 1 - end, - Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)), - [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2]. - -%% @private --spec remove_pid(pid(), pools(), ets:tid(), queue()) -> {pools(), queue()}. -remove_pid(Pid, Pools, ConnsTable, Queue) -> - {MonitorRef, Pool} = ets:lookup_element(ConnsTable, Pid, 2), - erlang:demonitor(MonitorRef, [flush]), - {Pool, NbConns} = lists:keyfind(Pool, 1, Pools), - Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)], - ets:delete(ConnsTable, Pid), - case queue:out(Queue) of - {{value, Client}, Queue2} -> - gen_server:reply(Client, ok), - {Pools2, Queue2}; - _ -> - {Pools2, Queue} - end. diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl index de35758..c8ba12d 100644 --- a/src/ranch_listener_sup.erl +++ b/src/ranch_listener_sup.erl @@ -30,7 +30,8 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> MaxConns = proplists:get_value(max_connections, TransOpts, 1024), {ok, SupPid} = supervisor:start_link(?MODULE, []), {ok, ListenerPid} = supervisor:start_child(SupPid, - {ranch_listener, {ranch_listener, start_link, [MaxConns, ProtoOpts]}, + {ranch_listener, {ranch_listener, start_link, + [Ref, MaxConns, ProtoOpts]}, permanent, 5000, worker, [ranch_listener]}), ok = ranch_server:insert_listener(Ref, ListenerPid), {ok, ConnsPid} = supervisor:start_child(SupPid, diff --git a/src/ranch_server.erl b/src/ranch_server.erl index 16e892d..faec9b6 100644 --- a/src/ranch_server.erl +++ b/src/ranch_server.erl @@ -22,6 +22,9 @@ -export([lookup_listener/1]). -export([add_acceptor/2]). -export([send_to_acceptors/2]). +-export([add_connection/1]). +-export([count_connections/1]). +-export([remove_connection/1]). %% gen_server. -export([init/1]). @@ -69,12 +72,31 @@ send_to_acceptors(Ref, Msg) -> _ = [Pid ! Msg || Pid <- Acceptors], ok. +%% @doc Add a connection to the connection pool. +%% +%% Also return the number of connections in the pool after this operation. +-spec add_connection(pid()) -> non_neg_integer(). +add_connection(ListenerPid) -> + ets:update_counter(?TAB, {connections, ListenerPid}, 1). + +%% @doc Count the number of connections in the connection pool. +-spec count_connections(pid()) -> non_neg_integer(). +count_connections(ListenerPid) -> + ets:update_counter(?TAB, {connections, ListenerPid}, 0). + +%% @doc Remove a connection from the connection pool. +%% +%% Also return the number of connections in the pool after this operation. +-spec remove_connection(pid()) -> non_neg_integer(). +remove_connection(ListenerPid) -> + ets:update_counter(?TAB, {connections, ListenerPid}, -1). + %% gen_server. %% @private init([]) -> ?TAB = ets:new(?TAB, [ - ordered_set, public, named_table, {read_concurrency, true}]), + ordered_set, public, named_table, {write_concurrency, true}]), {ok, #state{}}. %% @private @@ -84,6 +106,7 @@ handle_call(_Request, _From, State) -> %% @private handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) -> true = ets:insert_new(?TAB, {{acceptors, Ref}, []}), + true = ets:insert_new(?TAB, {{connections, Pid}, 0}), MonitorRef = erlang:monitor(process, Pid), {noreply, State#state{ monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}}; @@ -93,6 +116,9 @@ handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) -> true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}), {noreply, State#state{ monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}}; +handle_cast({add_connection, Pid}, State) -> + _ = erlang:monitor(process, Pid), + {noreply, State}; handle_cast(_Request, State) -> {noreply, State}. @@ -120,6 +146,7 @@ code_change(_OldVsn, State, _Extra) -> remove_process(Key = {listener, Ref}, MonitorRef, Pid, Monitors) -> true = ets:delete(?TAB, Key), true = ets:delete(?TAB, {acceptors, Ref}), + true = ets:delete(?TAB, {connections, Pid}), lists:keydelete({MonitorRef, Pid}, 1, Monitors); remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) -> Acceptors = ets:lookup_element(?TAB, Key, 2), |