diff options
Diffstat (limited to 'src/ranch_listener.erl')
-rw-r--r-- | src/ranch_listener.erl | 193 |
1 files changed, 47 insertions, 146 deletions
diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl index 40528f5..83ee658 100644 --- a/src/ranch_listener.erl +++ b/src/ranch_listener.erl @@ -17,14 +17,13 @@ -behaviour(gen_server). %% API. --export([start_link/2]). +-export([start_link/3]). -export([stop/1]). --export([add_connection/4]). --export([move_connection/3]). --export([remove_connection/2]). --export([check_upgrades/2]). +-export([add_connection/2]). +-export([remove_connection/1]). -export([get_port/1]). -export([set_port/2]). +-export([get_max_connections/1]). -export([get_protocol_options/1]). -export([set_protocol_options/2]). @@ -36,74 +35,41 @@ -export([terminate/2]). -export([code_change/3]). --type pools() :: [{atom(), non_neg_integer()}]. - -record(state, { - conn_pools = [] :: pools(), - conns_table :: ets:tid(), - queue = undefined :: queue(), + ref :: any(), max_conns = undefined :: non_neg_integer(), port = undefined :: undefined | inet:port_number(), - proto_opts :: any(), - proto_opts_vsn = 1 :: non_neg_integer() + proto_opts = undefined :: any(), + rm_diff = 0 :: non_neg_integer() }). %% API. %% @private -%% -%% We set the process priority to high because ranch_listener is the central -%% gen_server in Ranch and is used to manage all the incoming connections. -%% Setting the process priority to high ensures the connection-related code -%% will always be executed when a connection needs it, allowing Ranch to -%% scale far beyond what it would with a normal priority. --spec start_link(non_neg_integer(), any()) -> {ok, pid()}. -start_link(MaxConns, ProtoOpts) -> - gen_server:start_link(?MODULE, [MaxConns, ProtoOpts], - [{spawn_opt, [{priority, high}]}]). +-spec start_link(any(), non_neg_integer(), any()) -> {ok, pid()}. +start_link(Ref, MaxConns, ProtoOpts) -> + gen_server:start_link(?MODULE, [Ref, MaxConns, ProtoOpts], []). %% @private -spec stop(pid()) -> stopped. stop(ServerPid) -> gen_server:call(ServerPid, stop). -%% @doc Add a connection to the given pool in the listener. -%% -%% Pools of connections are used to restrict the maximum number of connections -%% depending on their type. By default, Ranch add all connections to the -%% pool <em>default</em>. It also checks for the maximum number of connections -%% in that pool before accepting again. This function only returns when there -%% is free space in the pool. -%% -%% When a process managing a connection dies, the process is removed from the -%% pool. If the socket has been sent to another process, it is up to the -%% protocol code to inform the listener of the new <em>ConnPid</em> by removing -%% the previous and adding the new one. +%% @doc Add a connection to the listener's pool. +-spec add_connection(pid(), pid()) -> non_neg_integer(). +add_connection(ServerPid, ConnPid) -> + ok = gen_server:cast(ServerPid, {add_connection, ConnPid}), + ranch_server:add_connection(ServerPid). + +%% @doc Remove this process' connection from the pool. %% -%% This function also returns whether the protocol options have been modified. -%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of -%% the atom 'ok'. The acceptor can then continue with the new protocol options. --spec add_connection(pid(), atom(), pid(), non_neg_integer()) - -> ok | {upgrade, any(), non_neg_integer()}. -add_connection(ServerPid, Pool, ConnPid, OptsVsn) -> - gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, - infinity). - -%% @doc Move a connection from one pool to another. --spec move_connection(pid(), atom(), pid()) -> ok. -move_connection(ServerPid, DestPool, ConnPid) -> - gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}). - -%% @doc Remove the given connection from its pool. --spec remove_connection(pid(), pid()) -> ok. -remove_connection(ServerPid, ConnPid) -> - gen_server:cast(ServerPid, {remove_connection, ConnPid}). - -%% @doc Return whether a protocol upgrade is required. --spec check_upgrades(pid(), non_neg_integer()) - -> ok | {upgrade, any(), non_neg_integer()}. -check_upgrades(ServerPid, OptsVsn) -> - gen_server:call(ServerPid, {check_upgrades, OptsVsn}). +%% Useful if you have long-lived connections that aren't taking up +%% resources and shouldn't be counted in the limited number of running +%% connections. +-spec remove_connection(pid()) -> non_neg_integer(). +remove_connection(ServerPid) -> + ok = gen_server:cast(ServerPid, remove_connection), + ranch_server:remove_connection(ServerPid). %% @doc Return the listener's port. -spec get_port(pid()) -> {ok, inet:port_number()}. @@ -115,6 +81,12 @@ get_port(ServerPid) -> set_port(ServerPid, Port) -> gen_server:cast(ServerPid, {set_port, Port}). +%% @doc Return the max number of connections allowed concurrently. +%% @todo Add set_max_connections. +-spec get_max_connections(pid()) -> {ok, non_neg_integer()}. +get_max_connections(ServerPid) -> + gen_server:call(ServerPid, get_max_connections). + %% @doc Return the current protocol options. -spec get_protocol_options(pid()) -> {ok, any()}. get_protocol_options(ServerPid) -> @@ -128,65 +100,41 @@ set_protocol_options(ServerPid, ProtoOpts) -> %% gen_server. %% @private -init([MaxConns, ProtoOpts]) -> - ConnsTable = ets:new(connections_table, [set, private]), - Queue = queue:new(), - {ok, #state{conns_table=ConnsTable, max_conns=MaxConns, - proto_opts=ProtoOpts, queue=Queue}}. +init([Ref, MaxConns, ProtoOpts]) -> + {ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}. %% @private -handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{ - conn_pools=Pools, conns_table=ConnsTable, - queue=Queue, max_conns=MaxConns, - proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> - {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ConnsTable), - State2 = State#state{conn_pools=Pools2}, - if AccOptsVsn =/= LisOptsVsn -> - {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2}; - NbConns > MaxConns -> - Queue2 = queue:in(From, Queue), - {noreply, State2#state{queue=Queue2}}; - true -> - {reply, ok, State2} - end; -handle_call({check_upgrades, AccOptsVsn}, _From, State=#state{ - proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> - if AccOptsVsn =/= LisOptsVsn -> - {reply, {upgrade, ProtoOpts, LisOptsVsn}, State}; - true -> - {reply, ok, State} - end; handle_call(get_port, _From, State=#state{port=Port}) -> {reply, {ok, Port}, State}; +handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) -> + {reply, {ok, MaxConns}, State}; handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) -> {reply, {ok, ProtoOpts}, State}; -handle_call({set_protocol_options, ProtoOpts}, _From, - State=#state{proto_opts_vsn=OptsVsn}) -> - {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}}; +handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) -> + ranch_server:send_to_acceptors(Ref, {set_opts, ProtoOpts}), + {reply, ok, State#state{proto_opts=ProtoOpts}}; handle_call(stop, _From, State) -> {stop, normal, stopped, State}; handle_call(_, _From, State) -> {reply, ignored, State}. %% @private +handle_cast({add_connection, ConnPid}, State) -> + _ = erlang:monitor(process, ConnPid), + {noreply, State}; +handle_cast(remove_connection, State=#state{rm_diff=RmDiff}) -> + {noreply, State#state{rm_diff=RmDiff + 1}}; handle_cast({set_port, Port}, State) -> {noreply, State#state{port=Port}}; -handle_cast({move_connection, DestPool, ConnPid}, State=#state{ - conn_pools=Pools, conns_table=ConnsTable}) -> - Pools2 = move_pid(ConnPid, DestPool, Pools, ConnsTable), - {noreply, State#state{conn_pools=Pools2}}; -handle_cast({remove_connection, ConnPid}, State=#state{ - conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) -> - {Pools2, Queue2} = remove_pid(ConnPid, Pools, ConnsTable, Queue), - {noreply, State#state{conn_pools=Pools2, queue=Queue2}}; handle_cast(_Msg, State) -> {noreply, State}. %% @private -handle_info({'DOWN', _Ref, process, Pid, _Info}, State=#state{ - conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) -> - {Pools2, Queue2} = remove_pid(Pid, Pools, ConnsTable, Queue), - {noreply, State#state{conn_pools=Pools2, queue=Queue2}}; +handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=0}) -> + _ = ranch_server:remove_connection(self()), + {noreply, State}; +handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=RmDiff}) -> + {noreply, State#state{rm_diff=RmDiff - 1}}; handle_info(_Info, State) -> {noreply, State}. @@ -197,50 +145,3 @@ terminate(_Reason, _State) -> %% @private code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%% Internal. - -%% @private --spec add_pid(pid(), atom(), pools(), ets:tid()) - -> {non_neg_integer(), pools()}. -add_pid(ConnPid, Pool, Pools, ConnsTable) -> - MonitorRef = erlang:monitor(process, ConnPid), - ConnPid ! {shoot, self()}, - {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of - false -> - {1, [{Pool, 1}|Pools]}; - {Pool, NbConns} -> - NbConns2 = NbConns + 1, - {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} - end, - ets:insert(ConnsTable, {ConnPid, {MonitorRef, Pool}}), - {NbConnsRet, Pools2}. - -%% @private --spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools(). -move_pid(ConnPid, DestPool, Pools, ConnsTable) -> - {MonitorRef, SrcPool} = ets:lookup_element(ConnsTable, ConnPid, 2), - ets:insert(ConnsTable, {ConnPid, {MonitorRef, DestPool}}), - {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools), - DestNbConns = case lists:keyfind(DestPool, 1, Pools) of - false -> 1; - {DestPool, NbConns} -> NbConns + 1 - end, - Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)), - [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2]. - -%% @private --spec remove_pid(pid(), pools(), ets:tid(), queue()) -> {pools(), queue()}. -remove_pid(Pid, Pools, ConnsTable, Queue) -> - {MonitorRef, Pool} = ets:lookup_element(ConnsTable, Pid, 2), - erlang:demonitor(MonitorRef, [flush]), - {Pool, NbConns} = lists:keyfind(Pool, 1, Pools), - Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)], - ets:delete(ConnsTable, Pid), - case queue:out(Queue) of - {{value, Client}, Queue2} -> - gen_server:reply(Client, ok), - {Pools2, Queue2}; - _ -> - {Pools2, Queue} - end. |