aboutsummaryrefslogtreecommitdiffstats
path: root/src/ranch_listener.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ranch_listener.erl')
-rw-r--r--src/ranch_listener.erl193
1 files changed, 47 insertions, 146 deletions
diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl
index 40528f5..83ee658 100644
--- a/src/ranch_listener.erl
+++ b/src/ranch_listener.erl
@@ -17,14 +17,13 @@
-behaviour(gen_server).
%% API.
--export([start_link/2]).
+-export([start_link/3]).
-export([stop/1]).
--export([add_connection/4]).
--export([move_connection/3]).
--export([remove_connection/2]).
--export([check_upgrades/2]).
+-export([add_connection/2]).
+-export([remove_connection/1]).
-export([get_port/1]).
-export([set_port/2]).
+-export([get_max_connections/1]).
-export([get_protocol_options/1]).
-export([set_protocol_options/2]).
@@ -36,74 +35,41 @@
-export([terminate/2]).
-export([code_change/3]).
--type pools() :: [{atom(), non_neg_integer()}].
-
-record(state, {
- conn_pools = [] :: pools(),
- conns_table :: ets:tid(),
- queue = undefined :: queue(),
+ ref :: any(),
max_conns = undefined :: non_neg_integer(),
port = undefined :: undefined | inet:port_number(),
- proto_opts :: any(),
- proto_opts_vsn = 1 :: non_neg_integer()
+ proto_opts = undefined :: any(),
+ rm_diff = 0 :: non_neg_integer()
}).
%% API.
%% @private
-%%
-%% We set the process priority to high because ranch_listener is the central
-%% gen_server in Ranch and is used to manage all the incoming connections.
-%% Setting the process priority to high ensures the connection-related code
-%% will always be executed when a connection needs it, allowing Ranch to
-%% scale far beyond what it would with a normal priority.
--spec start_link(non_neg_integer(), any()) -> {ok, pid()}.
-start_link(MaxConns, ProtoOpts) ->
- gen_server:start_link(?MODULE, [MaxConns, ProtoOpts],
- [{spawn_opt, [{priority, high}]}]).
+-spec start_link(any(), non_neg_integer(), any()) -> {ok, pid()}.
+start_link(Ref, MaxConns, ProtoOpts) ->
+ gen_server:start_link(?MODULE, [Ref, MaxConns, ProtoOpts], []).
%% @private
-spec stop(pid()) -> stopped.
stop(ServerPid) ->
gen_server:call(ServerPid, stop).
-%% @doc Add a connection to the given pool in the listener.
-%%
-%% Pools of connections are used to restrict the maximum number of connections
-%% depending on their type. By default, Ranch add all connections to the
-%% pool <em>default</em>. It also checks for the maximum number of connections
-%% in that pool before accepting again. This function only returns when there
-%% is free space in the pool.
-%%
-%% When a process managing a connection dies, the process is removed from the
-%% pool. If the socket has been sent to another process, it is up to the
-%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
-%% the previous and adding the new one.
+%% @doc Add a connection to the listener's pool.
+-spec add_connection(pid(), pid()) -> non_neg_integer().
+add_connection(ServerPid, ConnPid) ->
+ ok = gen_server:cast(ServerPid, {add_connection, ConnPid}),
+ ranch_server:add_connection(ServerPid).
+
+%% @doc Remove this process' connection from the pool.
%%
-%% This function also returns whether the protocol options have been modified.
-%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of
-%% the atom 'ok'. The acceptor can then continue with the new protocol options.
--spec add_connection(pid(), atom(), pid(), non_neg_integer())
- -> ok | {upgrade, any(), non_neg_integer()}.
-add_connection(ServerPid, Pool, ConnPid, OptsVsn) ->
- gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn},
- infinity).
-
-%% @doc Move a connection from one pool to another.
--spec move_connection(pid(), atom(), pid()) -> ok.
-move_connection(ServerPid, DestPool, ConnPid) ->
- gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}).
-
-%% @doc Remove the given connection from its pool.
--spec remove_connection(pid(), pid()) -> ok.
-remove_connection(ServerPid, ConnPid) ->
- gen_server:cast(ServerPid, {remove_connection, ConnPid}).
-
-%% @doc Return whether a protocol upgrade is required.
--spec check_upgrades(pid(), non_neg_integer())
- -> ok | {upgrade, any(), non_neg_integer()}.
-check_upgrades(ServerPid, OptsVsn) ->
- gen_server:call(ServerPid, {check_upgrades, OptsVsn}).
+%% Useful if you have long-lived connections that aren't taking up
+%% resources and shouldn't be counted in the limited number of running
+%% connections.
+-spec remove_connection(pid()) -> non_neg_integer().
+remove_connection(ServerPid) ->
+ ok = gen_server:cast(ServerPid, remove_connection),
+ ranch_server:remove_connection(ServerPid).
%% @doc Return the listener's port.
-spec get_port(pid()) -> {ok, inet:port_number()}.
@@ -115,6 +81,12 @@ get_port(ServerPid) ->
set_port(ServerPid, Port) ->
gen_server:cast(ServerPid, {set_port, Port}).
+%% @doc Return the max number of connections allowed concurrently.
+%% @todo Add set_max_connections.
+-spec get_max_connections(pid()) -> {ok, non_neg_integer()}.
+get_max_connections(ServerPid) ->
+ gen_server:call(ServerPid, get_max_connections).
+
%% @doc Return the current protocol options.
-spec get_protocol_options(pid()) -> {ok, any()}.
get_protocol_options(ServerPid) ->
@@ -128,65 +100,41 @@ set_protocol_options(ServerPid, ProtoOpts) ->
%% gen_server.
%% @private
-init([MaxConns, ProtoOpts]) ->
- ConnsTable = ets:new(connections_table, [set, private]),
- Queue = queue:new(),
- {ok, #state{conns_table=ConnsTable, max_conns=MaxConns,
- proto_opts=ProtoOpts, queue=Queue}}.
+init([Ref, MaxConns, ProtoOpts]) ->
+ {ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}.
%% @private
-handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{
- conn_pools=Pools, conns_table=ConnsTable,
- queue=Queue, max_conns=MaxConns,
- proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->
- {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ConnsTable),
- State2 = State#state{conn_pools=Pools2},
- if AccOptsVsn =/= LisOptsVsn ->
- {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2};
- NbConns > MaxConns ->
- Queue2 = queue:in(From, Queue),
- {noreply, State2#state{queue=Queue2}};
- true ->
- {reply, ok, State2}
- end;
-handle_call({check_upgrades, AccOptsVsn}, _From, State=#state{
- proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->
- if AccOptsVsn =/= LisOptsVsn ->
- {reply, {upgrade, ProtoOpts, LisOptsVsn}, State};
- true ->
- {reply, ok, State}
- end;
handle_call(get_port, _From, State=#state{port=Port}) ->
{reply, {ok, Port}, State};
+handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) ->
+ {reply, {ok, MaxConns}, State};
handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) ->
{reply, {ok, ProtoOpts}, State};
-handle_call({set_protocol_options, ProtoOpts}, _From,
- State=#state{proto_opts_vsn=OptsVsn}) ->
- {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}};
+handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) ->
+ ranch_server:send_to_acceptors(Ref, {set_opts, ProtoOpts}),
+ {reply, ok, State#state{proto_opts=ProtoOpts}};
handle_call(stop, _From, State) ->
{stop, normal, stopped, State};
handle_call(_, _From, State) ->
{reply, ignored, State}.
%% @private
+handle_cast({add_connection, ConnPid}, State) ->
+ _ = erlang:monitor(process, ConnPid),
+ {noreply, State};
+handle_cast(remove_connection, State=#state{rm_diff=RmDiff}) ->
+ {noreply, State#state{rm_diff=RmDiff + 1}};
handle_cast({set_port, Port}, State) ->
{noreply, State#state{port=Port}};
-handle_cast({move_connection, DestPool, ConnPid}, State=#state{
- conn_pools=Pools, conns_table=ConnsTable}) ->
- Pools2 = move_pid(ConnPid, DestPool, Pools, ConnsTable),
- {noreply, State#state{conn_pools=Pools2}};
-handle_cast({remove_connection, ConnPid}, State=#state{
- conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) ->
- {Pools2, Queue2} = remove_pid(ConnPid, Pools, ConnsTable, Queue),
- {noreply, State#state{conn_pools=Pools2, queue=Queue2}};
handle_cast(_Msg, State) ->
{noreply, State}.
%% @private
-handle_info({'DOWN', _Ref, process, Pid, _Info}, State=#state{
- conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) ->
- {Pools2, Queue2} = remove_pid(Pid, Pools, ConnsTable, Queue),
- {noreply, State#state{conn_pools=Pools2, queue=Queue2}};
+handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=0}) ->
+ _ = ranch_server:remove_connection(self()),
+ {noreply, State};
+handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=RmDiff}) ->
+ {noreply, State#state{rm_diff=RmDiff - 1}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -197,50 +145,3 @@ terminate(_Reason, _State) ->
%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
-%% Internal.
-
-%% @private
--spec add_pid(pid(), atom(), pools(), ets:tid())
- -> {non_neg_integer(), pools()}.
-add_pid(ConnPid, Pool, Pools, ConnsTable) ->
- MonitorRef = erlang:monitor(process, ConnPid),
- ConnPid ! {shoot, self()},
- {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
- false ->
- {1, [{Pool, 1}|Pools]};
- {Pool, NbConns} ->
- NbConns2 = NbConns + 1,
- {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
- end,
- ets:insert(ConnsTable, {ConnPid, {MonitorRef, Pool}}),
- {NbConnsRet, Pools2}.
-
-%% @private
--spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools().
-move_pid(ConnPid, DestPool, Pools, ConnsTable) ->
- {MonitorRef, SrcPool} = ets:lookup_element(ConnsTable, ConnPid, 2),
- ets:insert(ConnsTable, {ConnPid, {MonitorRef, DestPool}}),
- {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools),
- DestNbConns = case lists:keyfind(DestPool, 1, Pools) of
- false -> 1;
- {DestPool, NbConns} -> NbConns + 1
- end,
- Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)),
- [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2].
-
-%% @private
--spec remove_pid(pid(), pools(), ets:tid(), queue()) -> {pools(), queue()}.
-remove_pid(Pid, Pools, ConnsTable, Queue) ->
- {MonitorRef, Pool} = ets:lookup_element(ConnsTable, Pid, 2),
- erlang:demonitor(MonitorRef, [flush]),
- {Pool, NbConns} = lists:keyfind(Pool, 1, Pools),
- Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)],
- ets:delete(ConnsTable, Pid),
- case queue:out(Queue) of
- {{value, Client}, Queue2} ->
- gen_server:reply(Client, ok),
- {Pools2, Queue2};
- _ ->
- {Pools2, Queue}
- end.