diff options
-rw-r--r-- | src/ranch_acceptor.erl | 71 | ||||
-rw-r--r-- | src/ranch_listener.erl | 193 | ||||
-rw-r--r-- | src/ranch_listener_sup.erl | 3 | ||||
-rw-r--r-- | src/ranch_server.erl | 29 | ||||
-rw-r--r-- | test/acceptor_SUITE.erl | 67 | ||||
-rw-r--r-- | test/notify_and_wait_protocol.erl | 11 | ||||
-rw-r--r-- | test/remove_conn_and_wait_protocol.erl | 17 |
7 files changed, 216 insertions, 175 deletions
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl index 692277b..e03cde9 100644 --- a/src/ranch_acceptor.erl +++ b/src/ranch_acceptor.erl @@ -19,43 +19,62 @@ -export([start_link/6]). %% Internal. --export([acceptor/7]). +-export([init/7]). +-export([loop/7]). %% API. -spec start_link(any(), inet:socket(), module(), module(), pid(), pid()) -> {ok, pid()}. start_link(Ref, LSocket, Transport, Protocol, ListenerPid, ConnsSup) -> + {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid), {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid), - Pid = spawn_link(?MODULE, acceptor, - [LSocket, Transport, Protocol, Opts, 1, ListenerPid, ConnsSup]), + Pid = spawn_link(?MODULE, init, + [LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup]), ok = ranch_server:add_acceptor(Ref, Pid), {ok, Pid}. %% Internal. --spec acceptor(inet:socket(), module(), module(), any(), - non_neg_integer(), pid(), pid()) -> no_return(). -acceptor(LSocket, Transport, Protocol, Opts, OptsVsn, ListenerPid, ConnsSup) -> - Res = case Transport:accept(LSocket, 2000) of - {ok, CSocket} -> - {ok, Pid} = supervisor:start_child(ConnsSup, +-spec init(inet:socket(), module(), module(), + non_neg_integer(), any(), pid(), pid()) -> no_return(). +init(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) -> + async_accept(LSocket, Transport), + loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup). + +-spec loop(inet:socket(), module(), module(), + non_neg_integer(), any(), pid(), pid()) -> no_return(). +loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) -> + receive + {accept, CSocket} -> + {ok, ConnPid} = supervisor:start_child(ConnsSup, [ListenerPid, CSocket, Transport, Protocol, Opts]), - Transport:controlling_process(CSocket, Pid), - ranch_listener:add_connection(ListenerPid, - default, Pid, OptsVsn); - {error, timeout} -> - ranch_listener:check_upgrades(ListenerPid, OptsVsn); - {error, _Reason} -> - %% @todo Probably do something here. If the socket was closed, - %% we may want to try and listen again on the port? - ok - end, - case Res of - ok -> - ?MODULE:acceptor(LSocket, Transport, Protocol, - Opts, OptsVsn, ListenerPid, ConnsSup); - {upgrade, Opts2, OptsVsn2} -> - ?MODULE:acceptor(LSocket, Transport, Protocol, - Opts2, OptsVsn2, ListenerPid, ConnsSup) + Transport:controlling_process(CSocket, ConnPid), + ConnPid ! {shoot, ListenerPid}, + NbConns = ranch_listener:add_connection(ListenerPid, ConnPid), + maybe_wait(ListenerPid, MaxConns, NbConns), + ?MODULE:init(LSocket, Transport, Protocol, + MaxConns, Opts, ListenerPid, ConnsSup); + {set_opts, Opts2} -> + ?MODULE:loop(LSocket, Transport, Protocol, + MaxConns, Opts2, ListenerPid, ConnsSup) end. + +-spec maybe_wait(pid(), non_neg_integer(), non_neg_integer()) -> ok. +maybe_wait(_, MaxConns, NbConns) when MaxConns > NbConns -> + ok; +maybe_wait(ListenerPid, MaxConns, _) -> + erlang:yield(), + NbConns2 = ranch_server:count_connections(ListenerPid), + maybe_wait(ListenerPid, MaxConns, NbConns2). + +-spec async_accept(inet:socket(), module()) -> ok. +async_accept(LSocket, Transport) -> + AcceptorPid = self(), + _ = spawn_link(fun() -> + %% @todo {error, closed} must be handled and other errors ignored. + {ok, CSocket} = Transport:accept(LSocket, infinity), + Transport:controlling_process(CSocket, AcceptorPid), + AcceptorPid ! {accept, CSocket} + end), + ok. diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl index 40528f5..83ee658 100644 --- a/src/ranch_listener.erl +++ b/src/ranch_listener.erl @@ -17,14 +17,13 @@ -behaviour(gen_server). %% API. --export([start_link/2]). +-export([start_link/3]). -export([stop/1]). --export([add_connection/4]). --export([move_connection/3]). --export([remove_connection/2]). --export([check_upgrades/2]). +-export([add_connection/2]). +-export([remove_connection/1]). -export([get_port/1]). -export([set_port/2]). +-export([get_max_connections/1]). -export([get_protocol_options/1]). -export([set_protocol_options/2]). @@ -36,74 +35,41 @@ -export([terminate/2]). -export([code_change/3]). --type pools() :: [{atom(), non_neg_integer()}]. - -record(state, { - conn_pools = [] :: pools(), - conns_table :: ets:tid(), - queue = undefined :: queue(), + ref :: any(), max_conns = undefined :: non_neg_integer(), port = undefined :: undefined | inet:port_number(), - proto_opts :: any(), - proto_opts_vsn = 1 :: non_neg_integer() + proto_opts = undefined :: any(), + rm_diff = 0 :: non_neg_integer() }). %% API. %% @private -%% -%% We set the process priority to high because ranch_listener is the central -%% gen_server in Ranch and is used to manage all the incoming connections. -%% Setting the process priority to high ensures the connection-related code -%% will always be executed when a connection needs it, allowing Ranch to -%% scale far beyond what it would with a normal priority. --spec start_link(non_neg_integer(), any()) -> {ok, pid()}. -start_link(MaxConns, ProtoOpts) -> - gen_server:start_link(?MODULE, [MaxConns, ProtoOpts], - [{spawn_opt, [{priority, high}]}]). +-spec start_link(any(), non_neg_integer(), any()) -> {ok, pid()}. +start_link(Ref, MaxConns, ProtoOpts) -> + gen_server:start_link(?MODULE, [Ref, MaxConns, ProtoOpts], []). %% @private -spec stop(pid()) -> stopped. stop(ServerPid) -> gen_server:call(ServerPid, stop). -%% @doc Add a connection to the given pool in the listener. -%% -%% Pools of connections are used to restrict the maximum number of connections -%% depending on their type. By default, Ranch add all connections to the -%% pool <em>default</em>. It also checks for the maximum number of connections -%% in that pool before accepting again. This function only returns when there -%% is free space in the pool. -%% -%% When a process managing a connection dies, the process is removed from the -%% pool. If the socket has been sent to another process, it is up to the -%% protocol code to inform the listener of the new <em>ConnPid</em> by removing -%% the previous and adding the new one. +%% @doc Add a connection to the listener's pool. +-spec add_connection(pid(), pid()) -> non_neg_integer(). +add_connection(ServerPid, ConnPid) -> + ok = gen_server:cast(ServerPid, {add_connection, ConnPid}), + ranch_server:add_connection(ServerPid). + +%% @doc Remove this process' connection from the pool. %% -%% This function also returns whether the protocol options have been modified. -%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of -%% the atom 'ok'. The acceptor can then continue with the new protocol options. --spec add_connection(pid(), atom(), pid(), non_neg_integer()) - -> ok | {upgrade, any(), non_neg_integer()}. -add_connection(ServerPid, Pool, ConnPid, OptsVsn) -> - gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, - infinity). - -%% @doc Move a connection from one pool to another. --spec move_connection(pid(), atom(), pid()) -> ok. -move_connection(ServerPid, DestPool, ConnPid) -> - gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}). - -%% @doc Remove the given connection from its pool. --spec remove_connection(pid(), pid()) -> ok. -remove_connection(ServerPid, ConnPid) -> - gen_server:cast(ServerPid, {remove_connection, ConnPid}). - -%% @doc Return whether a protocol upgrade is required. --spec check_upgrades(pid(), non_neg_integer()) - -> ok | {upgrade, any(), non_neg_integer()}. -check_upgrades(ServerPid, OptsVsn) -> - gen_server:call(ServerPid, {check_upgrades, OptsVsn}). +%% Useful if you have long-lived connections that aren't taking up +%% resources and shouldn't be counted in the limited number of running +%% connections. +-spec remove_connection(pid()) -> non_neg_integer(). +remove_connection(ServerPid) -> + ok = gen_server:cast(ServerPid, remove_connection), + ranch_server:remove_connection(ServerPid). %% @doc Return the listener's port. -spec get_port(pid()) -> {ok, inet:port_number()}. @@ -115,6 +81,12 @@ get_port(ServerPid) -> set_port(ServerPid, Port) -> gen_server:cast(ServerPid, {set_port, Port}). +%% @doc Return the max number of connections allowed concurrently. +%% @todo Add set_max_connections. +-spec get_max_connections(pid()) -> {ok, non_neg_integer()}. +get_max_connections(ServerPid) -> + gen_server:call(ServerPid, get_max_connections). + %% @doc Return the current protocol options. -spec get_protocol_options(pid()) -> {ok, any()}. get_protocol_options(ServerPid) -> @@ -128,65 +100,41 @@ set_protocol_options(ServerPid, ProtoOpts) -> %% gen_server. %% @private -init([MaxConns, ProtoOpts]) -> - ConnsTable = ets:new(connections_table, [set, private]), - Queue = queue:new(), - {ok, #state{conns_table=ConnsTable, max_conns=MaxConns, - proto_opts=ProtoOpts, queue=Queue}}. +init([Ref, MaxConns, ProtoOpts]) -> + {ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}. %% @private -handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{ - conn_pools=Pools, conns_table=ConnsTable, - queue=Queue, max_conns=MaxConns, - proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> - {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ConnsTable), - State2 = State#state{conn_pools=Pools2}, - if AccOptsVsn =/= LisOptsVsn -> - {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2}; - NbConns > MaxConns -> - Queue2 = queue:in(From, Queue), - {noreply, State2#state{queue=Queue2}}; - true -> - {reply, ok, State2} - end; -handle_call({check_upgrades, AccOptsVsn}, _From, State=#state{ - proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> - if AccOptsVsn =/= LisOptsVsn -> - {reply, {upgrade, ProtoOpts, LisOptsVsn}, State}; - true -> - {reply, ok, State} - end; handle_call(get_port, _From, State=#state{port=Port}) -> {reply, {ok, Port}, State}; +handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) -> + {reply, {ok, MaxConns}, State}; handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) -> {reply, {ok, ProtoOpts}, State}; -handle_call({set_protocol_options, ProtoOpts}, _From, - State=#state{proto_opts_vsn=OptsVsn}) -> - {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}}; +handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) -> + ranch_server:send_to_acceptors(Ref, {set_opts, ProtoOpts}), + {reply, ok, State#state{proto_opts=ProtoOpts}}; handle_call(stop, _From, State) -> {stop, normal, stopped, State}; handle_call(_, _From, State) -> {reply, ignored, State}. %% @private +handle_cast({add_connection, ConnPid}, State) -> + _ = erlang:monitor(process, ConnPid), + {noreply, State}; +handle_cast(remove_connection, State=#state{rm_diff=RmDiff}) -> + {noreply, State#state{rm_diff=RmDiff + 1}}; handle_cast({set_port, Port}, State) -> {noreply, State#state{port=Port}}; -handle_cast({move_connection, DestPool, ConnPid}, State=#state{ - conn_pools=Pools, conns_table=ConnsTable}) -> - Pools2 = move_pid(ConnPid, DestPool, Pools, ConnsTable), - {noreply, State#state{conn_pools=Pools2}}; -handle_cast({remove_connection, ConnPid}, State=#state{ - conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) -> - {Pools2, Queue2} = remove_pid(ConnPid, Pools, ConnsTable, Queue), - {noreply, State#state{conn_pools=Pools2, queue=Queue2}}; handle_cast(_Msg, State) -> {noreply, State}. %% @private -handle_info({'DOWN', _Ref, process, Pid, _Info}, State=#state{ - conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) -> - {Pools2, Queue2} = remove_pid(Pid, Pools, ConnsTable, Queue), - {noreply, State#state{conn_pools=Pools2, queue=Queue2}}; +handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=0}) -> + _ = ranch_server:remove_connection(self()), + {noreply, State}; +handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=RmDiff}) -> + {noreply, State#state{rm_diff=RmDiff - 1}}; handle_info(_Info, State) -> {noreply, State}. @@ -197,50 +145,3 @@ terminate(_Reason, _State) -> %% @private code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%% Internal. - -%% @private --spec add_pid(pid(), atom(), pools(), ets:tid()) - -> {non_neg_integer(), pools()}. -add_pid(ConnPid, Pool, Pools, ConnsTable) -> - MonitorRef = erlang:monitor(process, ConnPid), - ConnPid ! {shoot, self()}, - {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of - false -> - {1, [{Pool, 1}|Pools]}; - {Pool, NbConns} -> - NbConns2 = NbConns + 1, - {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} - end, - ets:insert(ConnsTable, {ConnPid, {MonitorRef, Pool}}), - {NbConnsRet, Pools2}. - -%% @private --spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools(). -move_pid(ConnPid, DestPool, Pools, ConnsTable) -> - {MonitorRef, SrcPool} = ets:lookup_element(ConnsTable, ConnPid, 2), - ets:insert(ConnsTable, {ConnPid, {MonitorRef, DestPool}}), - {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools), - DestNbConns = case lists:keyfind(DestPool, 1, Pools) of - false -> 1; - {DestPool, NbConns} -> NbConns + 1 - end, - Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)), - [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2]. - -%% @private --spec remove_pid(pid(), pools(), ets:tid(), queue()) -> {pools(), queue()}. -remove_pid(Pid, Pools, ConnsTable, Queue) -> - {MonitorRef, Pool} = ets:lookup_element(ConnsTable, Pid, 2), - erlang:demonitor(MonitorRef, [flush]), - {Pool, NbConns} = lists:keyfind(Pool, 1, Pools), - Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)], - ets:delete(ConnsTable, Pid), - case queue:out(Queue) of - {{value, Client}, Queue2} -> - gen_server:reply(Client, ok), - {Pools2, Queue2}; - _ -> - {Pools2, Queue} - end. diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl index de35758..c8ba12d 100644 --- a/src/ranch_listener_sup.erl +++ b/src/ranch_listener_sup.erl @@ -30,7 +30,8 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> MaxConns = proplists:get_value(max_connections, TransOpts, 1024), {ok, SupPid} = supervisor:start_link(?MODULE, []), {ok, ListenerPid} = supervisor:start_child(SupPid, - {ranch_listener, {ranch_listener, start_link, [MaxConns, ProtoOpts]}, + {ranch_listener, {ranch_listener, start_link, + [Ref, MaxConns, ProtoOpts]}, permanent, 5000, worker, [ranch_listener]}), ok = ranch_server:insert_listener(Ref, ListenerPid), {ok, ConnsPid} = supervisor:start_child(SupPid, diff --git a/src/ranch_server.erl b/src/ranch_server.erl index 16e892d..faec9b6 100644 --- a/src/ranch_server.erl +++ b/src/ranch_server.erl @@ -22,6 +22,9 @@ -export([lookup_listener/1]). -export([add_acceptor/2]). -export([send_to_acceptors/2]). +-export([add_connection/1]). +-export([count_connections/1]). +-export([remove_connection/1]). %% gen_server. -export([init/1]). @@ -69,12 +72,31 @@ send_to_acceptors(Ref, Msg) -> _ = [Pid ! Msg || Pid <- Acceptors], ok. +%% @doc Add a connection to the connection pool. +%% +%% Also return the number of connections in the pool after this operation. +-spec add_connection(pid()) -> non_neg_integer(). +add_connection(ListenerPid) -> + ets:update_counter(?TAB, {connections, ListenerPid}, 1). + +%% @doc Count the number of connections in the connection pool. +-spec count_connections(pid()) -> non_neg_integer(). +count_connections(ListenerPid) -> + ets:update_counter(?TAB, {connections, ListenerPid}, 0). + +%% @doc Remove a connection from the connection pool. +%% +%% Also return the number of connections in the pool after this operation. +-spec remove_connection(pid()) -> non_neg_integer(). +remove_connection(ListenerPid) -> + ets:update_counter(?TAB, {connections, ListenerPid}, -1). + %% gen_server. %% @private init([]) -> ?TAB = ets:new(?TAB, [ - ordered_set, public, named_table, {read_concurrency, true}]), + ordered_set, public, named_table, {write_concurrency, true}]), {ok, #state{}}. %% @private @@ -84,6 +106,7 @@ handle_call(_Request, _From, State) -> %% @private handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) -> true = ets:insert_new(?TAB, {{acceptors, Ref}, []}), + true = ets:insert_new(?TAB, {{connections, Pid}, 0}), MonitorRef = erlang:monitor(process, Pid), {noreply, State#state{ monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}}; @@ -93,6 +116,9 @@ handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) -> true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}), {noreply, State#state{ monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}}; +handle_cast({add_connection, Pid}, State) -> + _ = erlang:monitor(process, Pid), + {noreply, State}; handle_cast(_Request, State) -> {noreply, State}. @@ -120,6 +146,7 @@ code_change(_OldVsn, State, _Extra) -> remove_process(Key = {listener, Ref}, MonitorRef, Pid, Monitors) -> true = ets:delete(?TAB, Key), true = ets:delete(?TAB, {acceptors, Ref}), + true = ets:delete(?TAB, {connections, Pid}), lists:keydelete({MonitorRef, Pid}, 1, Monitors); remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) -> Acceptors = ets:lookup_element(?TAB, Key, 2), diff --git a/test/acceptor_SUITE.erl b/test/acceptor_SUITE.erl index c1313cc..389f375 100644 --- a/test/acceptor_SUITE.erl +++ b/test/acceptor_SUITE.erl @@ -29,6 +29,9 @@ %% tcp. -export([tcp_echo/1]). +-export([tcp_max_connections/1]). +-export([tcp_max_connections_and_beyond/1]). +-export([tcp_upgrade/1]). %% ct. @@ -37,7 +40,10 @@ all() -> groups() -> [{tcp, [ - tcp_echo + tcp_echo, + tcp_max_connections, + tcp_max_connections_and_beyond, + tcp_upgrade ]}, {ssl, [ ssl_echo ]}]. @@ -100,3 +106,62 @@ tcp_echo(_) -> %% Make sure the listener stopped. {'EXIT', _} = begin catch ranch:get_port(tcp_echo) end, ok. + +tcp_max_connections(_) -> + {ok, _} = ranch:start_listener(tcp_max_connections, 1, + ranch_tcp, [{port, 0}, {max_connections, 10}], + notify_and_wait_protocol, [{msg, connected}, {pid, self()}]), + Port = ranch:get_port(tcp_max_connections), + %% @todo We'll probably want a more direct interface to count_connections. + ListenerPid = ranch_server:lookup_listener(tcp_max_connections), + ok = connect_loop(Port, 11, 150), + 10 = ranch_server:count_connections(ListenerPid), + 10 = receive_loop(connected, 400), + 1 = receive_loop(connected, 1000). + +tcp_max_connections_and_beyond(_) -> + {ok, _} = ranch:start_listener(tcp_max_connections_and_beyond, 1, + ranch_tcp, [{port, 0}, {max_connections, 10}], + remove_conn_and_wait_protocol, [{remove, true}]), + Port = ranch:get_port(tcp_max_connections_and_beyond), + %% @todo We'll probably want a more direct interface to count_connections. + ListenerPid = ranch_server:lookup_listener(tcp_max_connections_and_beyond), + ok = connect_loop(Port, 10, 0), + 0 = ranch_server:count_connections(ListenerPid), + ranch:set_protocol_options(tcp_max_connections_and_beyond, + [{remove, false}]), + receive after 500 -> ok end, + ok = connect_loop(Port, 10, 0), + receive after 500 -> ok end, + 10 = ranch_server:count_connections(ListenerPid). + +tcp_upgrade(_) -> + receive after 20000 -> ok end, + {ok, _} = ranch:start_listener(tcp_upgrade, 1, + ranch_tcp, [{port, 0}], + notify_and_wait_protocol, [{msg, connected}, {pid, self()}]), + Port = ranch:get_port(tcp_upgrade), + ok = connect_loop(Port, 1, 0), + receive connected -> ok after 1000 -> error(timeout) end, + ranch:set_protocol_options(tcp_upgrade, [{msg, upgraded}, {pid, self()}]), + ok = connect_loop(Port, 1, 0), + receive upgraded -> ok after 1000 -> error(timeout) end. + +%% Utility functions. + +connect_loop(_, 0, _) -> + ok; +connect_loop(Port, N, Sleep) -> + {ok, _} = gen_tcp:connect("localhost", Port, + [binary, {active, false}, {packet, raw}]), + receive after Sleep -> ok end, + connect_loop(Port, N - 1, Sleep). + +receive_loop(Message, Timeout) -> + receive_loop(Message, Timeout, 0). +receive_loop(Message, Timeout, N) -> + receive Message -> + receive_loop(Message, Timeout, N + 1) + after Timeout -> + N + end. diff --git a/test/notify_and_wait_protocol.erl b/test/notify_and_wait_protocol.erl new file mode 100644 index 0000000..27542f2 --- /dev/null +++ b/test/notify_and_wait_protocol.erl @@ -0,0 +1,11 @@ +-module(notify_and_wait_protocol). +-export([start_link/4]). +-export([init/2]). + +start_link(_, _, _, [{msg, Msg}, {pid, TestPid}]) -> + Pid = spawn_link(?MODULE, init, [Msg, TestPid]), + {ok, Pid}. + +init(Msg, Pid) -> + Pid ! Msg, + receive after 2500 -> ok end. diff --git a/test/remove_conn_and_wait_protocol.erl b/test/remove_conn_and_wait_protocol.erl new file mode 100644 index 0000000..8fffca7 --- /dev/null +++ b/test/remove_conn_and_wait_protocol.erl @@ -0,0 +1,17 @@ +-module(remove_conn_and_wait_protocol). +-export([start_link/4]). +-export([init/2]). + +start_link(ListenerPid, _, _, [{remove, MaybeRemove}]) -> + Pid = spawn_link(?MODULE, init, [ListenerPid, MaybeRemove]), + {ok, Pid}. + +init(ListenerPid, MaybeRemove) -> + ranch:accept_ack(ListenerPid), + case MaybeRemove of + true -> + ranch_listener:remove_connection(ListenerPid); + false -> + ok + end, + receive after 2500 -> ok end. |