aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ranch.erl63
-rw-r--r--src/ranch_acceptors_sup.erl5
-rw-r--r--src/ranch_conns_sup.erl17
-rw-r--r--src/ranch_listener.erl142
-rw-r--r--src/ranch_listener_sup.erl9
-rw-r--r--src/ranch_protocol.erl2
-rw-r--r--src/ranch_server.erl111
7 files changed, 131 insertions, 218 deletions
diff --git a/src/ranch.erl b/src/ranch.erl
index 6f1b6fe..ea5f59b 100644
--- a/src/ranch.erl
+++ b/src/ranch.erl
@@ -19,6 +19,7 @@
-export([stop_listener/1]).
-export([child_spec/6]).
-export([accept_ack/1]).
+-export([remove_connection/1]).
-export([get_port/1]).
-export([get_max_connections/1]).
-export([set_max_connections/2]).
@@ -67,17 +68,23 @@ start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)
true ->
Res = supervisor:start_child(ranch_sup, child_spec(Ref, NbAcceptors,
Transport, TransOpts, Protocol, ProtoOpts)),
- case proplists:get_value(socket, TransOpts) of
- undefined ->
- ok;
- Socket ->
- %% change the controlling process so the caller dying doesn't
- %% close the port
- ListenerPid = ranch_server:lookup_listener(Ref),
+ Socket = proplists:get_value(socket, TransOpts),
+ case Res of
+ {ok, Pid} when Socket =/= undefined ->
+ %% Give ownership of the socket to ranch_acceptors_sup
+ %% to make sure the socket stays open as long as the
+ %% listener is alive. If the socket closes however there
+ %% will be no way to recover because we don't know how
+ %% to open it again.
+ Children = supervisor:which_children(Pid),
+ {_, AcceptorsSup, _, _}
+ = lists:keyfind(ranch_acceptors_sup, 1, Children),
%%% Note: the catch is here because SSL crashes when you change
%%% the controlling process of a listen socket because of a bug.
%%% The bug will be fixed in R16.
- catch(Transport:controlling_process(Socket, ListenerPid))
+ catch Transport:controlling_process(Socket, AcceptorsSup);
+ _ ->
+ ok
end,
Res
end.
@@ -90,7 +97,8 @@ start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)
stop_listener(Ref) ->
case supervisor:terminate_child(ranch_sup, {ranch_listener_sup, Ref}) of
ok ->
- supervisor:delete_child(ranch_sup, {ranch_listener_sup, Ref});
+ _ = supervisor:delete_child(ranch_sup, {ranch_listener_sup, Ref}),
+ ranch_server:cleanup_listener_opts(Ref);
{error, Reason} ->
{error, Reason}
end.
@@ -115,36 +123,40 @@ child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts)
%%
%% Effectively used to make sure the socket control has been given to
%% the protocol process before starting to use it.
--spec accept_ack(pid()) -> ok.
-accept_ack(ListenerPid) ->
- receive {shoot, ListenerPid} -> ok end.
+-spec accept_ack(any()) -> ok.
+accept_ack(Ref) ->
+ receive {shoot, Ref} -> ok end.
+
+%% @doc Remove the calling process' connection from the pool.
+%%
+%% Useful if you have long-lived connections that aren't taking up
+%% resources and shouldn't be counted in the limited number of running
+%% connections.
+-spec remove_connection(any()) -> ok.
+remove_connection(Ref) ->
+ ConnsSup = ranch_server:get_connections_sup(Ref),
+ ConnsSup ! {remove_connection, Ref},
+ ok.
%% @doc Return the listener's port.
-spec get_port(any()) -> inet:port_number().
get_port(Ref) ->
- ListenerPid = ranch_server:lookup_listener(Ref),
- {ok, Port} = ranch_listener:get_port(ListenerPid),
- Port.
+ ranch_server:get_port(Ref).
%% @doc Return the max number of connections allowed concurrently.
-spec get_max_connections(any()) -> max_conns().
get_max_connections(Ref) ->
- ListenerPid = ranch_server:lookup_listener(Ref),
- {ok, MaxConnections} = ranch_listener:get_max_connections(ListenerPid),
- MaxConnections.
+ ranch_server:get_max_connections(Ref).
%% @doc Set the max number of connections allowed concurrently.
-spec set_max_connections(any(), max_conns()) -> ok.
set_max_connections(Ref, MaxConnections) ->
- ListenerPid = ranch_server:lookup_listener(Ref),
- ok = ranch_listener:set_max_connections(ListenerPid, MaxConnections).
+ ranch_server:set_max_connections(Ref, MaxConnections).
%% @doc Return the current protocol options for the given listener.
-spec get_protocol_options(any()) -> any().
get_protocol_options(Ref) ->
- ListenerPid = ranch_server:lookup_listener(Ref),
- {ok, ProtoOpts} = ranch_listener:get_protocol_options(ListenerPid),
- ProtoOpts.
+ ranch_server:get_protocol_options(Ref).
%% @doc Upgrade the protocol options for the given listener.
%%
@@ -152,9 +164,8 @@ get_protocol_options(Ref) ->
%% newly accepted connections receive the new protocol options. This has
%% no effect on the currently opened connections.
-spec set_protocol_options(any(), any()) -> ok.
-set_protocol_options(Ref, ProtoOpts) ->
- ListenerPid = ranch_server:lookup_listener(Ref),
- ok = ranch_listener:set_protocol_options(ListenerPid, ProtoOpts).
+set_protocol_options(Ref, Opts) ->
+ ranch_server:set_protocol_options(Ref, Opts).
%% @doc Filter a list of options and remove all unwanted values.
%%
diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl
index 97df74f..18574fa 100644
--- a/src/ranch_acceptors_sup.erl
+++ b/src/ranch_acceptors_sup.erl
@@ -32,8 +32,7 @@ start_link(Ref, NbAcceptors, Transport, TransOpts) ->
%% supervisor.
init([Ref, NbAcceptors, Transport, TransOpts]) ->
- ListenerPid = ranch_server:lookup_listener(Ref),
- ConnsSup = ranch_server:lookup_connections_sup(Ref),
+ ConnsSup = ranch_server:get_connections_sup(Ref),
LSocket = case proplists:get_value(socket, TransOpts) of
undefined ->
{ok, Socket} = Transport:listen(TransOpts),
@@ -42,7 +41,7 @@ init([Ref, NbAcceptors, Transport, TransOpts]) ->
Socket
end,
{ok, {_, Port}} = Transport:sockname(LSocket),
- ranch_listener:set_port(ListenerPid, Port),
+ ranch_server:set_port(Ref, Port),
Procs = [
{{acceptor, self(), N}, {ranch_acceptor, start_link, [
LSocket, Transport, ConnsSup
diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl
index 29b550f..eaa4d00 100644
--- a/src/ranch_conns_sup.erl
+++ b/src/ranch_conns_sup.erl
@@ -32,7 +32,7 @@
-record(state, {
parent = undefined :: pid(),
- listener_pid = undefined :: pid(),
+ ref :: any(),
transport = undefined :: module(),
protocol = undefined :: module(),
opts :: any(),
@@ -92,22 +92,21 @@ active_connections(SupPid) ->
init(Parent, Ref, Transport, Protocol) ->
process_flag(trap_exit, true),
ok = ranch_server:set_connections_sup(Ref, self()),
- ListenerPid = ranch_server:lookup_listener(Ref),
- {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid),
- {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid),
+ MaxConns = ranch_server:get_max_connections(Ref),
+ Opts = ranch_server:get_protocol_options(Ref),
ok = proc_lib:init_ack(Parent, {ok, self()}),
- loop(#state{parent=Parent, listener_pid=ListenerPid, transport=Transport,
+ loop(#state{parent=Parent, ref=Ref, transport=Transport,
protocol=Protocol, opts=Opts, max_conns=MaxConns}, 0, 0, []).
-loop(State=#state{parent=Parent, listener_pid=ListenerPid,
+loop(State=#state{parent=Parent, ref=Ref,
transport=Transport, protocol=Protocol, opts=Opts,
max_conns=MaxConns}, CurConns, NbChildren, Sleepers) ->
receive
{?MODULE, start_protocol, To, Socket} ->
- case Protocol:start_link(ListenerPid, Socket, Transport, Opts) of
+ case Protocol:start_link(Ref, Socket, Transport, Opts) of
{ok, Pid} ->
Transport:controlling_process(Socket, Pid),
- Pid ! {shoot, ListenerPid},
+ Pid ! {shoot, Ref},
put(Pid, true),
CurConns2 = CurConns + 1,
if CurConns2 < MaxConns ->
@@ -126,7 +125,7 @@ loop(State=#state{parent=Parent, listener_pid=ListenerPid,
To ! {Tag, CurConns},
loop(State, CurConns, NbChildren, Sleepers);
%% Remove a connection from the count of connections.
- {remove_connection, ListenerPid} ->
+ {remove_connection, Ref} ->
loop(State, CurConns - 1, NbChildren, Sleepers);
%% Upgrade the max number of connections allowed concurrently.
%% We resume all sleeping acceptors if this number increases.
diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl
deleted file mode 100644
index a53cc75..0000000
--- a/src/ranch_listener.erl
+++ /dev/null
@@ -1,142 +0,0 @@
-%% Copyright (c) 2011-2012, Loïc Hoguin <[email protected]>
-%%
-%% Permission to use, copy, modify, and/or distribute this software for any
-%% purpose with or without fee is hereby granted, provided that the above
-%% copyright notice and this permission notice appear in all copies.
-%%
-%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
-%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
-%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
-%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
-%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
-%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
-%% @doc Public API for managing listeners.
--module(ranch_listener).
--behaviour(gen_server).
-
-%% API.
--export([start_link/3]).
--export([stop/1]).
--export([remove_connection/1]).
--export([get_port/1]).
--export([set_port/2]).
--export([get_max_connections/1]).
--export([set_max_connections/2]).
--export([get_protocol_options/1]).
--export([set_protocol_options/2]).
-
-%% gen_server.
--export([init/1]).
--export([handle_call/3]).
--export([handle_cast/2]).
--export([handle_info/2]).
--export([terminate/2]).
--export([code_change/3]).
-
--record(state, {
- ref :: any(),
- max_conns = undefined :: ranch:max_conns(),
- port = undefined :: undefined | inet:port_number(),
- proto_opts = undefined :: any()
-}).
-
-%% API.
-
-%% @private
--spec start_link(any(), non_neg_integer(), any()) -> {ok, pid()}.
-start_link(Ref, MaxConns, ProtoOpts) ->
- gen_server:start_link(?MODULE, [Ref, MaxConns, ProtoOpts], []).
-
-%% @private
--spec stop(pid()) -> stopped.
-stop(ServerPid) ->
- gen_server:call(ServerPid, stop).
-
-%% @doc Remove this process' connection from the pool.
-%%
-%% Useful if you have long-lived connections that aren't taking up
-%% resources and shouldn't be counted in the limited number of running
-%% connections.
--spec remove_connection(pid()) -> ok.
-remove_connection(ServerPid) ->
- ConnsSup = ranch_server:find_connections_sup(ServerPid),
- ConnsSup ! {remove_connection, ServerPid},
- ok.
-
-%% @doc Return the listener's port.
--spec get_port(pid()) -> {ok, inet:port_number()}.
-get_port(ServerPid) ->
- gen_server:call(ServerPid, get_port).
-
-%% @private
--spec set_port(pid(), inet:port_number()) -> ok.
-set_port(ServerPid, Port) ->
- gen_server:cast(ServerPid, {set_port, Port}).
-
-%% @doc Return the max number of connections allowed concurrently.
--spec get_max_connections(pid()) -> {ok, ranch:max_conns()}.
-get_max_connections(ServerPid) ->
- gen_server:call(ServerPid, get_max_connections).
-
-%% @doc Set the max number of connections allowed concurrently.
--spec set_max_connections(pid(), ranch:max_conns()) -> ok.
-set_max_connections(ServerPid, MaxConnections) ->
- gen_server:call(ServerPid, {set_max_connections, MaxConnections}).
-
-%% @doc Return the current protocol options.
--spec get_protocol_options(pid()) -> {ok, any()}.
-get_protocol_options(ServerPid) ->
- gen_server:call(ServerPid, get_protocol_options).
-
-%% @doc Upgrade the protocol options.
--spec set_protocol_options(pid(), any()) -> ok.
-set_protocol_options(ServerPid, ProtoOpts) ->
- gen_server:call(ServerPid, {set_protocol_options, ProtoOpts}).
-
-%% gen_server.
-
-%% @private
-init([Ref, MaxConns, ProtoOpts]) ->
- ok = ranch_server:insert_listener(Ref, self()),
- {ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}.
-
-%% @private
-handle_call(get_port, _From, State=#state{port=Port}) ->
- {reply, {ok, Port}, State};
-handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) ->
- {reply, {ok, MaxConns}, State};
-handle_call({set_max_connections, MaxConnections}, _From,
- State=#state{ref=Ref}) ->
- ConnsSup = ranch_server:lookup_connections_sup(Ref),
- ConnsSup ! {set_max_conns, MaxConnections},
- {reply, ok, State#state{max_conns=MaxConnections}};
-handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) ->
- {reply, {ok, ProtoOpts}, State};
-handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) ->
- ConnsSup = ranch_server:lookup_connections_sup(Ref),
- ConnsSup ! {set_opts, ProtoOpts},
- {reply, ok, State#state{proto_opts=ProtoOpts}};
-handle_call(stop, _From, State) ->
- {stop, normal, stopped, State};
-handle_call(_, _From, State) ->
- {reply, ignored, State}.
-
-%% @private
-handle_cast({set_port, Port}, State) ->
- {noreply, State#state{port=Port}};
-handle_cast(_Msg, State) ->
- {noreply, State}.
-
-%% @private
-handle_info(_Info, State) ->
- {noreply, State}.
-
-%% @private
-terminate(_Reason, _State) ->
- ok.
-
-%% @private
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl
index 1d346aa..9f19123 100644
--- a/src/ranch_listener_sup.erl
+++ b/src/ranch_listener_sup.erl
@@ -28,18 +28,15 @@
-> {ok, pid()}.
start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
+ ranch_server:set_new_listener_opts(Ref, MaxConns, ProtoOpts),
supervisor:start_link(?MODULE, {
- Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts
+ Ref, NbAcceptors, Transport, TransOpts, Protocol
}).
%% supervisor.
-init({Ref, NbAcceptors, MaxConns, Transport, TransOpts, Protocol, ProtoOpts}) ->
+init({Ref, NbAcceptors, Transport, TransOpts, Protocol}) ->
ChildSpecs = [
- %% listener
- {ranch_listener, {ranch_listener, start_link,
- [Ref, MaxConns, ProtoOpts]},
- permanent, 5000, worker, [ranch_listener]},
%% conns_sup
{ranch_conns_sup, {ranch_conns_sup, start_link,
[Ref, Transport, Protocol]},
diff --git a/src/ranch_protocol.erl b/src/ranch_protocol.erl
index 38a56c8..c788547 100644
--- a/src/ranch_protocol.erl
+++ b/src/ranch_protocol.erl
@@ -17,7 +17,7 @@
%% Start a new connection process for the given socket.
-callback start_link(
- ListenerPid::pid(),
+ Ref::any(),
Socket::any(),
Transport::module(),
ProtocolOptions::any())
diff --git a/src/ranch_server.erl b/src/ranch_server.erl
index 77f11c5..d827ae2 100644
--- a/src/ranch_server.erl
+++ b/src/ranch_server.erl
@@ -18,11 +18,16 @@
%% API.
-export([start_link/0]).
--export([insert_listener/2]).
--export([lookup_listener/1]).
+-export([set_new_listener_opts/3]).
+-export([cleanup_listener_opts/1]).
-export([set_connections_sup/2]).
--export([lookup_connections_sup/1]).
--export([find_connections_sup/1]).
+-export([get_connections_sup/1]).
+-export([set_port/2]).
+-export([get_port/1]).
+-export([set_max_connections/2]).
+-export([get_max_connections/1]).
+-export([set_protocol_options/2]).
+-export([get_protocol_options/1]).
-export([count_connections/1]).
%% gen_server.
@@ -47,38 +52,63 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-%% @doc Insert a listener into the database.
--spec insert_listener(any(), pid()) -> ok.
-insert_listener(Ref, Pid) ->
- true = ets:insert_new(?TAB, {{listener, Ref}, Pid, undefined}),
- gen_server:cast(?MODULE, {insert_listener, Ref, Pid}).
-
-%% @doc Lookup a listener in the database.
--spec lookup_listener(any()) -> pid().
-lookup_listener(Ref) ->
- ets:lookup_element(?TAB, {listener, Ref}, 2).
+%% @private
+-spec set_new_listener_opts(any(), ranch:max_conns(), any()) -> ok.
+set_new_listener_opts(Ref, MaxConns, Opts) ->
+ gen_server:call(?MODULE, {set_new_listener_opts, Ref, MaxConns, Opts}).
+
+%% @doc Cleanup listener options after it has been stopped.
+-spec cleanup_listener_opts(any()) -> ok.
+cleanup_listener_opts(Ref) ->
+ _ = ets:delete(?TAB, {port, Ref}),
+ _ = ets:delete(?TAB, {max_conns, Ref}),
+ _ = ets:delete(?TAB, {opts, Ref}),
+ ok.
%% @doc Set a connection supervisor associated with specific listener.
-spec set_connections_sup(any(), pid()) -> ok.
set_connections_sup(Ref, Pid) ->
- true = ets:update_element(?TAB, {listener, Ref}, {3, Pid}),
- true = ets:insert_new(?TAB, {{conns_sup, lookup_listener(Ref)}, Pid}),
- ok.
+ gen_server:call(?MODULE, {set_connections_sup, Ref, Pid}).
-%% @doc Lookup a connection supervisor used by specific listener.
--spec lookup_connections_sup(any()) -> pid() | undefined.
-lookup_connections_sup(Ref) ->
- ets:lookup_element(?TAB, {listener, Ref}, 3).
+%% @doc Return the connection supervisor used by specific listener.
+-spec get_connections_sup(any()) -> pid().
+get_connections_sup(Ref) ->
+ ets:lookup_element(?TAB, {conns_sup, Ref}, 2).
-%% @doc Find a connection supervisor using the listener pid.
--spec find_connections_sup(pid()) -> pid().
-find_connections_sup(Pid) ->
- ets:lookup_element(?TAB, {conns_sup, Pid}, 2).
+%% @private
+-spec set_port(any(), inet:port_number()) -> ok.
+set_port(Ref, Port) ->
+ gen_server:call(?MODULE, {set_port, Ref, Port}).
+
+%% @doc Return the listener's port.
+-spec get_port(any()) -> inet:port_number().
+get_port(Ref) ->
+ ets:lookup_element(?TAB, {port, Ref}, 2).
+
+%% @doc Set the max number of connections allowed concurrently.
+-spec set_max_connections(any(), ranch:max_conns()) -> ok.
+set_max_connections(Ref, MaxConnections) ->
+ gen_server:call(?MODULE, {set_max_conns, Ref, MaxConnections}).
+
+%% @doc Return the max number of connections allowed concurrently.
+-spec get_max_connections(any()) -> ranch:max_conns().
+get_max_connections(Ref) ->
+ ets:lookup_element(?TAB, {max_conns, Ref}, 2).
+
+%% @doc Upgrade the protocol options.
+-spec set_protocol_options(any(), any()) -> ok.
+set_protocol_options(Ref, ProtoOpts) ->
+ gen_server:call(?MODULE, {set_opts, Ref, ProtoOpts}).
+
+%% @doc Return the current protocol options.
+-spec get_protocol_options(any()) -> any().
+get_protocol_options(Ref) ->
+ ets:lookup_element(?TAB, {opts, Ref}, 2).
%% @doc Count the number of connections in the connection pool.
-spec count_connections(any()) -> non_neg_integer().
count_connections(Ref) ->
- ranch_conns_sup:active_connections(lookup_connections_sup(Ref)).
+ ranch_conns_sup:active_connections(get_connections_sup(Ref)).
%% gen_server.
@@ -87,14 +117,33 @@ init([]) ->
{ok, #state{}}.
%% @private
+handle_call({set_new_listener_opts, Ref, MaxConns, Opts}, _, State) ->
+ ets:insert(?TAB, {{max_conns, Ref}, MaxConns}),
+ ets:insert(?TAB, {{opts, Ref}, Opts}),
+ {reply, ok, State};
+handle_call({set_connections_sup, Ref, Pid}, _,
+ State=#state{monitors=Monitors}) ->
+ true = ets:insert_new(?TAB, {{conns_sup, Ref}, Pid}),
+ MonitorRef = erlang:monitor(process, Pid),
+ {reply, ok, State#state{
+ monitors=[{{MonitorRef, Pid}, Ref}|Monitors]}};
+handle_call({set_port, Ref, Port}, _, State) ->
+ true = ets:insert(?TAB, {{port, Ref}, Port}),
+ {reply, ok, State};
+handle_call({set_max_conns, Ref, MaxConns}, _, State) ->
+ ets:insert(?TAB, {{max_conns, Ref}, MaxConns}),
+ ConnsSup = get_connections_sup(Ref),
+ ConnsSup ! {set_max_conns, MaxConns},
+ {reply, ok, State};
+handle_call({set_opts, Ref, Opts}, _, State) ->
+ ets:insert(?TAB, {{opts, Ref}, Opts}),
+ ConnsSup = get_connections_sup(Ref),
+ ConnsSup ! {set_opts, Opts},
+ {reply, ok, State};
handle_call(_Request, _From, State) ->
{reply, ignore, State}.
%% @private
-handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) ->
- MonitorRef = erlang:monitor(process, Pid),
- {noreply, State#state{
- monitors=[{{MonitorRef, Pid}, Ref}|Monitors]}};
handle_cast(_Request, State) ->
{noreply, State}.
@@ -102,7 +151,7 @@ handle_cast(_Request, State) ->
handle_info({'DOWN', MonitorRef, process, Pid, _},
State=#state{monitors=Monitors}) ->
{_, Ref} = lists:keyfind({MonitorRef, Pid}, 1, Monitors),
- true = ets:delete(?TAB, {listener, Ref}),
+ true = ets:delete(?TAB, {conns_sup, Ref}),
Monitors2 = lists:keydelete({MonitorRef, Pid}, 1, Monitors),
{noreply, State#state{monitors=Monitors2}};
handle_info(_Info, State) ->