diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ranch_acceptor.erl | 7 | ||||
-rw-r--r-- | src/ranch_acceptors_sup.erl | 12 | ||||
-rw-r--r-- | src/ranch_listener_sup.erl | 2 | ||||
-rw-r--r-- | src/ranch_server.erl | 46 |
4 files changed, 51 insertions, 16 deletions
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl index 8de0de7..040145b 100644 --- a/src/ranch_acceptor.erl +++ b/src/ranch_acceptor.erl @@ -16,19 +16,20 @@ -module(ranch_acceptor). %% API. --export([start_link/6]). +-export([start_link/7]). %% Internal. -export([acceptor/7]). %% API. --spec start_link(inet:socket(), module(), module(), any(), +-spec start_link(any(), inet:socket(), module(), module(), any(), pid(), pid()) -> {ok, pid()}. -start_link(LSocket, Transport, Protocol, Opts, +start_link(Ref, LSocket, Transport, Protocol, Opts, ListenerPid, ConnsSup) -> Pid = spawn_link(?MODULE, acceptor, [LSocket, Transport, Protocol, Opts, 1, ListenerPid, ConnsSup]), + ok = ranch_server:add_acceptor(Ref, Pid), {ok, Pid}. %% Internal. diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl index 5617873..8cdef17 100644 --- a/src/ranch_acceptors_sup.erl +++ b/src/ranch_acceptors_sup.erl @@ -17,29 +17,29 @@ -behaviour(supervisor). %% API. --export([start_link/7]). +-export([start_link/8]). %% supervisor. -export([init/1]). %% API. --spec start_link(non_neg_integer(), module(), any(), +-spec start_link(any(), non_neg_integer(), module(), any(), module(), any(), pid(), pid()) -> {ok, pid()}. -start_link(NbAcceptors, Transport, TransOpts, +start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ListenerPid, ConnsPid) -> - supervisor:start_link(?MODULE, [NbAcceptors, Transport, TransOpts, + supervisor:start_link(?MODULE, [Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ListenerPid, ConnsPid]). %% supervisor. -init([NbAcceptors, Transport, TransOpts, +init([Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ListenerPid, ConnsPid]) -> {ok, LSocket} = Transport:listen(TransOpts), {ok, {_, Port}} = Transport:sockname(LSocket), ranch_listener:set_port(ListenerPid, Port), Procs = [{{acceptor, self(), N}, {ranch_acceptor, start_link, [ - LSocket, Transport, Protocol, ProtoOpts, + Ref, LSocket, Transport, Protocol, ProtoOpts, ListenerPid, ConnsPid ]}, permanent, brutal_kill, worker, []} || N <- lists:seq(1, NbAcceptors)], diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl index b42732d..6adf618 100644 --- a/src/ranch_listener_sup.erl +++ b/src/ranch_listener_sup.erl @@ -38,7 +38,7 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> permanent, 5000, supervisor, [ranch_conns_sup]}), {ok, _PoolPid} = supervisor:start_child(SupPid, {ranch_acceptors_sup, {ranch_acceptors_sup, start_link, [ - NbAcceptors, Transport, TransOpts, + Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ListenerPid, ConnsPid ]}, permanent, 5000, supervisor, [ranch_acceptors_sup]}), {ok, SupPid}. diff --git a/src/ranch_server.erl b/src/ranch_server.erl index bafdeb5..7ded858 100644 --- a/src/ranch_server.erl +++ b/src/ranch_server.erl @@ -20,6 +20,8 @@ -export([start_link/0]). -export([insert_listener/2]). -export([lookup_listener/1]). +-export([add_acceptor/2]). +-export([send_to_acceptors/2]). %% gen_server. -export([init/1]). @@ -31,9 +33,10 @@ -define(TAB, ?MODULE). --type key() :: {listener, any()}. +-type key() :: {listener | acceptors, any()}. +-type monitors() :: [{{reference(), pid()}, key()}]. -record(state, { - monitors = [] :: [{{reference(), pid()}, key()}] + monitors = [] :: monitors() }). %% API. @@ -54,6 +57,18 @@ insert_listener(Ref, Pid) -> lookup_listener(Ref) -> ets:lookup_element(?TAB, {listener, Ref}, 2). +%% @doc Add an acceptor for the given listener. +-spec add_acceptor(any(), pid()) -> ok. +add_acceptor(Ref, Pid) -> + gen_server:cast(?MODULE, {add_acceptor, Ref, Pid}). + +%% @doc Send a message to all acceptors of the given listener. +-spec send_to_acceptors(any(), any()) -> ok. +send_to_acceptors(Ref, Msg) -> + Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2), + _ = [Pid ! Msg || Pid <- Acceptors], + ok. + %% gen_server. %% @private @@ -68,17 +83,24 @@ handle_call(_Request, _From, State) -> %% @private handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) -> + true = ets:insert_new(?TAB, {{acceptors, Ref}, []}), MonitorRef = erlang:monitor(process, Pid), {noreply, State#state{ monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}}; +handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) -> + MonitorRef = erlang:monitor(process, Pid), + Acceptors = ets:lookup_element(?TAB, {acceptors, Ref}, 2), + true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}), + {noreply, State#state{ + monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}}; handle_cast(_Request, State) -> {noreply, State}. %% @private -handle_info({'DOWN', Ref, process, Pid, _}, State=#state{monitors=Monitors}) -> - {_, Key} = lists:keyfind({Ref, Pid}, 1, Monitors), - true = ets:delete(?TAB, Key), - Monitors2 = lists:keydelete({Ref, Pid}, 1, Monitors), +handle_info({'DOWN', MonitorRef, process, Pid, _}, + State=#state{monitors=Monitors}) -> + {_, Key} = lists:keyfind({MonitorRef, Pid}, 1, Monitors), + Monitors2 = remove_process(Key, MonitorRef, Pid, Monitors), {noreply, State#state{monitors=Monitors2}}; handle_info(_Info, State) -> {noreply, State}. @@ -90,3 +112,15 @@ terminate(_Reason, _State) -> %% @private code_change(_OldVsn, State, _Extra) -> {ok, State}. + +%% Internal. + +-spec remove_process(key(), reference(), pid(), Monitors) + -> Monitors when Monitors::monitors() . +remove_process(Key = {listener, _}, MonitorRef, Pid, Monitors) -> + true = ets:delete(?TAB, Key), + lists:keydelete({MonitorRef, Pid}, 1, Monitors); +remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) -> + Acceptors = ets:lookup_element(?TAB, Key, 2), + true = ets:insert(?TAB, {Key, lists:delete(Pid, Acceptors)}), + lists:keydelete({MonitorRef, Pid}, 1, Monitors). |