aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/ranch_acceptor.erl7
-rw-r--r--src/ranch_acceptors_sup.erl12
-rw-r--r--src/ranch_listener_sup.erl2
-rw-r--r--src/ranch_server.erl46
4 files changed, 51 insertions, 16 deletions
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl
index 8de0de7..040145b 100644
--- a/src/ranch_acceptor.erl
+++ b/src/ranch_acceptor.erl
@@ -16,19 +16,20 @@
-module(ranch_acceptor).
%% API.
--export([start_link/6]).
+-export([start_link/7]).
%% Internal.
-export([acceptor/7]).
%% API.
--spec start_link(inet:socket(), module(), module(), any(),
+-spec start_link(any(), inet:socket(), module(), module(), any(),
pid(), pid()) -> {ok, pid()}.
-start_link(LSocket, Transport, Protocol, Opts,
+start_link(Ref, LSocket, Transport, Protocol, Opts,
ListenerPid, ConnsSup) ->
Pid = spawn_link(?MODULE, acceptor,
[LSocket, Transport, Protocol, Opts, 1, ListenerPid, ConnsSup]),
+ ok = ranch_server:add_acceptor(Ref, Pid),
{ok, Pid}.
%% Internal.
diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl
index 5617873..8cdef17 100644
--- a/src/ranch_acceptors_sup.erl
+++ b/src/ranch_acceptors_sup.erl
@@ -17,29 +17,29 @@
-behaviour(supervisor).
%% API.
--export([start_link/7]).
+-export([start_link/8]).
%% supervisor.
-export([init/1]).
%% API.
--spec start_link(non_neg_integer(), module(), any(),
+-spec start_link(any(), non_neg_integer(), module(), any(),
module(), any(), pid(), pid()) -> {ok, pid()}.
-start_link(NbAcceptors, Transport, TransOpts,
+start_link(Ref, NbAcceptors, Transport, TransOpts,
Protocol, ProtoOpts, ListenerPid, ConnsPid) ->
- supervisor:start_link(?MODULE, [NbAcceptors, Transport, TransOpts,
+ supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts,
Protocol, ProtoOpts, ListenerPid, ConnsPid]).
%% supervisor.
-init([NbAcceptors, Transport, TransOpts,
+init([Ref, NbAcceptors, Transport, TransOpts,
Protocol, ProtoOpts, ListenerPid, ConnsPid]) ->
{ok, LSocket} = Transport:listen(TransOpts),
{ok, {_, Port}} = Transport:sockname(LSocket),
ranch_listener:set_port(ListenerPid, Port),
Procs = [{{acceptor, self(), N}, {ranch_acceptor, start_link, [
- LSocket, Transport, Protocol, ProtoOpts,
+ Ref, LSocket, Transport, Protocol, ProtoOpts,
ListenerPid, ConnsPid
]}, permanent, brutal_kill, worker, []}
|| N <- lists:seq(1, NbAcceptors)],
diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl
index b42732d..6adf618 100644
--- a/src/ranch_listener_sup.erl
+++ b/src/ranch_listener_sup.erl
@@ -38,7 +38,7 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
permanent, 5000, supervisor, [ranch_conns_sup]}),
{ok, _PoolPid} = supervisor:start_child(SupPid,
{ranch_acceptors_sup, {ranch_acceptors_sup, start_link, [
- NbAcceptors, Transport, TransOpts,
+ Ref, NbAcceptors, Transport, TransOpts,
Protocol, ProtoOpts, ListenerPid, ConnsPid
]}, permanent, 5000, supervisor, [ranch_acceptors_sup]}),
{ok, SupPid}.
diff --git a/src/ranch_server.erl b/src/ranch_server.erl
index bafdeb5..7ded858 100644
--- a/src/ranch_server.erl
+++ b/src/ranch_server.erl
@@ -20,6 +20,8 @@
-export([start_link/0]).
-export([insert_listener/2]).
-export([lookup_listener/1]).
+-export([add_acceptor/2]).
+-export([send_to_acceptors/2]).
%% gen_server.
-export([init/1]).
@@ -31,9 +33,10 @@
-define(TAB, ?MODULE).
--type key() :: {listener, any()}.
+-type key() :: {listener | acceptors, any()}.
+-type monitors() :: [{{reference(), pid()}, key()}].
-record(state, {
- monitors = [] :: [{{reference(), pid()}, key()}]
+ monitors = [] :: monitors()
}).
%% API.
@@ -54,6 +57,18 @@ insert_listener(Ref, Pid) ->
lookup_listener(Ref) ->
ets:lookup_element(?TAB, {listener, Ref}, 2).
+%% @doc Add an acceptor for the given listener.
+-spec add_acceptor(any(), pid()) -> ok.
+add_acceptor(Ref, Pid) ->
+ gen_server:cast(?MODULE, {add_acceptor, Ref, Pid}).
+
+%% @doc Send a message to all acceptors of the given listener.
+-spec send_to_acceptors(any(), any()) -> ok.
+send_to_acceptors(Ref, Msg) ->
+ Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2),
+ _ = [Pid ! Msg || Pid <- Acceptors],
+ ok.
+
%% gen_server.
%% @private
@@ -68,17 +83,24 @@ handle_call(_Request, _From, State) ->
%% @private
handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) ->
+ true = ets:insert_new(?TAB, {{acceptors, Ref}, []}),
MonitorRef = erlang:monitor(process, Pid),
{noreply, State#state{
monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}};
+handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) ->
+ MonitorRef = erlang:monitor(process, Pid),
+ Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2),
+ true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}),
+ {noreply, State#state{
+ monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}};
handle_cast(_Request, State) ->
{noreply, State}.
%% @private
-handle_info({'DOWN', Ref, process, Pid, _}, State=#state{monitors=Monitors}) ->
- {_, Key} = lists:keyfind({Ref, Pid}, 1, Monitors),
- true = ets:delete(?TAB, Key),
- Monitors2 = lists:keydelete({Ref, Pid}, 1, Monitors),
+handle_info({'DOWN', MonitorRef, process, Pid, _},
+ State=#state{monitors=Monitors}) ->
+ {_, Key} = lists:keyfind({MonitorRef, Pid}, 1, Monitors),
+ Monitors2 = remove_process(Key, MonitorRef, Pid, Monitors),
{noreply, State#state{monitors=Monitors2}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -90,3 +112,15 @@ terminate(_Reason, _State) ->
%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+%% Internal.
+
+-spec remove_process(key(), reference(), pid(), Monitors)
+ -> Monitors when Monitors::monitors() .
+remove_process(Key = {listener, _}, MonitorRef, Pid, Monitors) ->
+ true = ets:delete(?TAB, Key),
+ lists:keydelete({MonitorRef, Pid}, 1, Monitors);
+remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) ->
+ Acceptors = ets:lookup_element(?TAB, Key, 2),
+ true = ets:insert(?TAB, {Key, lists:delete(Pid, Acceptors)}),
+ lists:keydelete({MonitorRef, Pid}, 1, Monitors).