aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/ranch_acceptor.erl71
-rw-r--r--src/ranch_listener.erl193
-rw-r--r--src/ranch_listener_sup.erl3
-rw-r--r--src/ranch_server.erl29
-rw-r--r--test/acceptor_SUITE.erl67
-rw-r--r--test/notify_and_wait_protocol.erl11
-rw-r--r--test/remove_conn_and_wait_protocol.erl17
7 files changed, 216 insertions, 175 deletions
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl
index 692277b..e03cde9 100644
--- a/src/ranch_acceptor.erl
+++ b/src/ranch_acceptor.erl
@@ -19,43 +19,62 @@
-export([start_link/6]).
%% Internal.
--export([acceptor/7]).
+-export([init/7]).
+-export([loop/7]).
%% API.
-spec start_link(any(), inet:socket(), module(), module(), pid(), pid())
-> {ok, pid()}.
start_link(Ref, LSocket, Transport, Protocol, ListenerPid, ConnsSup) ->
+ {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid),
{ok, Opts} = ranch_listener:get_protocol_options(ListenerPid),
- Pid = spawn_link(?MODULE, acceptor,
- [LSocket, Transport, Protocol, Opts, 1, ListenerPid, ConnsSup]),
+ Pid = spawn_link(?MODULE, init,
+ [LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup]),
ok = ranch_server:add_acceptor(Ref, Pid),
{ok, Pid}.
%% Internal.
--spec acceptor(inet:socket(), module(), module(), any(),
- non_neg_integer(), pid(), pid()) -> no_return().
-acceptor(LSocket, Transport, Protocol, Opts, OptsVsn, ListenerPid, ConnsSup) ->
- Res = case Transport:accept(LSocket, 2000) of
- {ok, CSocket} ->
- {ok, Pid} = supervisor:start_child(ConnsSup,
+-spec init(inet:socket(), module(), module(),
+ non_neg_integer(), any(), pid(), pid()) -> no_return().
+init(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
+ async_accept(LSocket, Transport),
+ loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup).
+
+-spec loop(inet:socket(), module(), module(),
+ non_neg_integer(), any(), pid(), pid()) -> no_return().
+loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
+ receive
+ {accept, CSocket} ->
+ {ok, ConnPid} = supervisor:start_child(ConnsSup,
[ListenerPid, CSocket, Transport, Protocol, Opts]),
- Transport:controlling_process(CSocket, Pid),
- ranch_listener:add_connection(ListenerPid,
- default, Pid, OptsVsn);
- {error, timeout} ->
- ranch_listener:check_upgrades(ListenerPid, OptsVsn);
- {error, _Reason} ->
- %% @todo Probably do something here. If the socket was closed,
- %% we may want to try and listen again on the port?
- ok
- end,
- case Res of
- ok ->
- ?MODULE:acceptor(LSocket, Transport, Protocol,
- Opts, OptsVsn, ListenerPid, ConnsSup);
- {upgrade, Opts2, OptsVsn2} ->
- ?MODULE:acceptor(LSocket, Transport, Protocol,
- Opts2, OptsVsn2, ListenerPid, ConnsSup)
+ Transport:controlling_process(CSocket, ConnPid),
+ ConnPid ! {shoot, ListenerPid},
+ NbConns = ranch_listener:add_connection(ListenerPid, ConnPid),
+ maybe_wait(ListenerPid, MaxConns, NbConns),
+ ?MODULE:init(LSocket, Transport, Protocol,
+ MaxConns, Opts, ListenerPid, ConnsSup);
+ {set_opts, Opts2} ->
+ ?MODULE:loop(LSocket, Transport, Protocol,
+ MaxConns, Opts2, ListenerPid, ConnsSup)
end.
+
+-spec maybe_wait(pid(), non_neg_integer(), non_neg_integer()) -> ok.
+maybe_wait(_, MaxConns, NbConns) when MaxConns > NbConns ->
+ ok;
+maybe_wait(ListenerPid, MaxConns, _) ->
+ erlang:yield(),
+ NbConns2 = ranch_server:count_connections(ListenerPid),
+ maybe_wait(ListenerPid, MaxConns, NbConns2).
+
+-spec async_accept(inet:socket(), module()) -> ok.
+async_accept(LSocket, Transport) ->
+ AcceptorPid = self(),
+ _ = spawn_link(fun() ->
+ %% @todo {error, closed} must be handled and other errors ignored.
+ {ok, CSocket} = Transport:accept(LSocket, infinity),
+ Transport:controlling_process(CSocket, AcceptorPid),
+ AcceptorPid ! {accept, CSocket}
+ end),
+ ok.
diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl
index 40528f5..83ee658 100644
--- a/src/ranch_listener.erl
+++ b/src/ranch_listener.erl
@@ -17,14 +17,13 @@
-behaviour(gen_server).
%% API.
--export([start_link/2]).
+-export([start_link/3]).
-export([stop/1]).
--export([add_connection/4]).
--export([move_connection/3]).
--export([remove_connection/2]).
--export([check_upgrades/2]).
+-export([add_connection/2]).
+-export([remove_connection/1]).
-export([get_port/1]).
-export([set_port/2]).
+-export([get_max_connections/1]).
-export([get_protocol_options/1]).
-export([set_protocol_options/2]).
@@ -36,74 +35,41 @@
-export([terminate/2]).
-export([code_change/3]).
--type pools() :: [{atom(), non_neg_integer()}].
-
-record(state, {
- conn_pools = [] :: pools(),
- conns_table :: ets:tid(),
- queue = undefined :: queue(),
+ ref :: any(),
max_conns = undefined :: non_neg_integer(),
port = undefined :: undefined | inet:port_number(),
- proto_opts :: any(),
- proto_opts_vsn = 1 :: non_neg_integer()
+ proto_opts = undefined :: any(),
+ rm_diff = 0 :: non_neg_integer()
}).
%% API.
%% @private
-%%
-%% We set the process priority to high because ranch_listener is the central
-%% gen_server in Ranch and is used to manage all the incoming connections.
-%% Setting the process priority to high ensures the connection-related code
-%% will always be executed when a connection needs it, allowing Ranch to
-%% scale far beyond what it would with a normal priority.
--spec start_link(non_neg_integer(), any()) -> {ok, pid()}.
-start_link(MaxConns, ProtoOpts) ->
- gen_server:start_link(?MODULE, [MaxConns, ProtoOpts],
- [{spawn_opt, [{priority, high}]}]).
+-spec start_link(any(), non_neg_integer(), any()) -> {ok, pid()}.
+start_link(Ref, MaxConns, ProtoOpts) ->
+ gen_server:start_link(?MODULE, [Ref, MaxConns, ProtoOpts], []).
%% @private
-spec stop(pid()) -> stopped.
stop(ServerPid) ->
gen_server:call(ServerPid, stop).
-%% @doc Add a connection to the given pool in the listener.
-%%
-%% Pools of connections are used to restrict the maximum number of connections
-%% depending on their type. By default, Ranch add all connections to the
-%% pool <em>default</em>. It also checks for the maximum number of connections
-%% in that pool before accepting again. This function only returns when there
-%% is free space in the pool.
-%%
-%% When a process managing a connection dies, the process is removed from the
-%% pool. If the socket has been sent to another process, it is up to the
-%% protocol code to inform the listener of the new <em>ConnPid</em> by removing
-%% the previous and adding the new one.
+%% @doc Add a connection to the listener's pool.
+-spec add_connection(pid(), pid()) -> non_neg_integer().
+add_connection(ServerPid, ConnPid) ->
+ ok = gen_server:cast(ServerPid, {add_connection, ConnPid}),
+ ranch_server:add_connection(ServerPid).
+
+%% @doc Remove this process' connection from the pool.
%%
-%% This function also returns whether the protocol options have been modified.
-%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of
-%% the atom 'ok'. The acceptor can then continue with the new protocol options.
--spec add_connection(pid(), atom(), pid(), non_neg_integer())
- -> ok | {upgrade, any(), non_neg_integer()}.
-add_connection(ServerPid, Pool, ConnPid, OptsVsn) ->
- gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn},
- infinity).
-
-%% @doc Move a connection from one pool to another.
--spec move_connection(pid(), atom(), pid()) -> ok.
-move_connection(ServerPid, DestPool, ConnPid) ->
- gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}).
-
-%% @doc Remove the given connection from its pool.
--spec remove_connection(pid(), pid()) -> ok.
-remove_connection(ServerPid, ConnPid) ->
- gen_server:cast(ServerPid, {remove_connection, ConnPid}).
-
-%% @doc Return whether a protocol upgrade is required.
--spec check_upgrades(pid(), non_neg_integer())
- -> ok | {upgrade, any(), non_neg_integer()}.
-check_upgrades(ServerPid, OptsVsn) ->
- gen_server:call(ServerPid, {check_upgrades, OptsVsn}).
+%% Useful if you have long-lived connections that aren't taking up
+%% resources and shouldn't be counted in the limited number of running
+%% connections.
+-spec remove_connection(pid()) -> non_neg_integer().
+remove_connection(ServerPid) ->
+ ok = gen_server:cast(ServerPid, remove_connection),
+ ranch_server:remove_connection(ServerPid).
%% @doc Return the listener's port.
-spec get_port(pid()) -> {ok, inet:port_number()}.
@@ -115,6 +81,12 @@ get_port(ServerPid) ->
set_port(ServerPid, Port) ->
gen_server:cast(ServerPid, {set_port, Port}).
+%% @doc Return the max number of connections allowed concurrently.
+%% @todo Add set_max_connections.
+-spec get_max_connections(pid()) -> {ok, non_neg_integer()}.
+get_max_connections(ServerPid) ->
+ gen_server:call(ServerPid, get_max_connections).
+
%% @doc Return the current protocol options.
-spec get_protocol_options(pid()) -> {ok, any()}.
get_protocol_options(ServerPid) ->
@@ -128,65 +100,41 @@ set_protocol_options(ServerPid, ProtoOpts) ->
%% gen_server.
%% @private
-init([MaxConns, ProtoOpts]) ->
- ConnsTable = ets:new(connections_table, [set, private]),
- Queue = queue:new(),
- {ok, #state{conns_table=ConnsTable, max_conns=MaxConns,
- proto_opts=ProtoOpts, queue=Queue}}.
+init([Ref, MaxConns, ProtoOpts]) ->
+ {ok, #state{ref=Ref, max_conns=MaxConns, proto_opts=ProtoOpts}}.
%% @private
-handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{
- conn_pools=Pools, conns_table=ConnsTable,
- queue=Queue, max_conns=MaxConns,
- proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->
- {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ConnsTable),
- State2 = State#state{conn_pools=Pools2},
- if AccOptsVsn =/= LisOptsVsn ->
- {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2};
- NbConns > MaxConns ->
- Queue2 = queue:in(From, Queue),
- {noreply, State2#state{queue=Queue2}};
- true ->
- {reply, ok, State2}
- end;
-handle_call({check_upgrades, AccOptsVsn}, _From, State=#state{
- proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->
- if AccOptsVsn =/= LisOptsVsn ->
- {reply, {upgrade, ProtoOpts, LisOptsVsn}, State};
- true ->
- {reply, ok, State}
- end;
handle_call(get_port, _From, State=#state{port=Port}) ->
{reply, {ok, Port}, State};
+handle_call(get_max_connections, _From, State=#state{max_conns=MaxConns}) ->
+ {reply, {ok, MaxConns}, State};
handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) ->
{reply, {ok, ProtoOpts}, State};
-handle_call({set_protocol_options, ProtoOpts}, _From,
- State=#state{proto_opts_vsn=OptsVsn}) ->
- {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}};
+handle_call({set_protocol_options, ProtoOpts}, _From, State=#state{ref=Ref}) ->
+ ranch_server:send_to_acceptors(Ref, {set_opts, ProtoOpts}),
+ {reply, ok, State#state{proto_opts=ProtoOpts}};
handle_call(stop, _From, State) ->
{stop, normal, stopped, State};
handle_call(_, _From, State) ->
{reply, ignored, State}.
%% @private
+handle_cast({add_connection, ConnPid}, State) ->
+ _ = erlang:monitor(process, ConnPid),
+ {noreply, State};
+handle_cast(remove_connection, State=#state{rm_diff=RmDiff}) ->
+ {noreply, State#state{rm_diff=RmDiff + 1}};
handle_cast({set_port, Port}, State) ->
{noreply, State#state{port=Port}};
-handle_cast({move_connection, DestPool, ConnPid}, State=#state{
- conn_pools=Pools, conns_table=ConnsTable}) ->
- Pools2 = move_pid(ConnPid, DestPool, Pools, ConnsTable),
- {noreply, State#state{conn_pools=Pools2}};
-handle_cast({remove_connection, ConnPid}, State=#state{
- conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) ->
- {Pools2, Queue2} = remove_pid(ConnPid, Pools, ConnsTable, Queue),
- {noreply, State#state{conn_pools=Pools2, queue=Queue2}};
handle_cast(_Msg, State) ->
{noreply, State}.
%% @private
-handle_info({'DOWN', _Ref, process, Pid, _Info}, State=#state{
- conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) ->
- {Pools2, Queue2} = remove_pid(Pid, Pools, ConnsTable, Queue),
- {noreply, State#state{conn_pools=Pools2, queue=Queue2}};
+handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=0}) ->
+ _ = ranch_server:remove_connection(self()),
+ {noreply, State};
+handle_info({'DOWN', _, process, _, _}, State=#state{rm_diff=RmDiff}) ->
+ {noreply, State#state{rm_diff=RmDiff - 1}};
handle_info(_Info, State) ->
{noreply, State}.
@@ -197,50 +145,3 @@ terminate(_Reason, _State) ->
%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
-%% Internal.
-
-%% @private
--spec add_pid(pid(), atom(), pools(), ets:tid())
- -> {non_neg_integer(), pools()}.
-add_pid(ConnPid, Pool, Pools, ConnsTable) ->
- MonitorRef = erlang:monitor(process, ConnPid),
- ConnPid ! {shoot, self()},
- {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
- false ->
- {1, [{Pool, 1}|Pools]};
- {Pool, NbConns} ->
- NbConns2 = NbConns + 1,
- {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
- end,
- ets:insert(ConnsTable, {ConnPid, {MonitorRef, Pool}}),
- {NbConnsRet, Pools2}.
-
-%% @private
--spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools().
-move_pid(ConnPid, DestPool, Pools, ConnsTable) ->
- {MonitorRef, SrcPool} = ets:lookup_element(ConnsTable, ConnPid, 2),
- ets:insert(ConnsTable, {ConnPid, {MonitorRef, DestPool}}),
- {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools),
- DestNbConns = case lists:keyfind(DestPool, 1, Pools) of
- false -> 1;
- {DestPool, NbConns} -> NbConns + 1
- end,
- Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)),
- [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2].
-
-%% @private
--spec remove_pid(pid(), pools(), ets:tid(), queue()) -> {pools(), queue()}.
-remove_pid(Pid, Pools, ConnsTable, Queue) ->
- {MonitorRef, Pool} = ets:lookup_element(ConnsTable, Pid, 2),
- erlang:demonitor(MonitorRef, [flush]),
- {Pool, NbConns} = lists:keyfind(Pool, 1, Pools),
- Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)],
- ets:delete(ConnsTable, Pid),
- case queue:out(Queue) of
- {{value, Client}, Queue2} ->
- gen_server:reply(Client, ok),
- {Pools2, Queue2};
- _ ->
- {Pools2, Queue}
- end.
diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl
index de35758..c8ba12d 100644
--- a/src/ranch_listener_sup.erl
+++ b/src/ranch_listener_sup.erl
@@ -30,7 +30,8 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) ->
MaxConns = proplists:get_value(max_connections, TransOpts, 1024),
{ok, SupPid} = supervisor:start_link(?MODULE, []),
{ok, ListenerPid} = supervisor:start_child(SupPid,
- {ranch_listener, {ranch_listener, start_link, [MaxConns, ProtoOpts]},
+ {ranch_listener, {ranch_listener, start_link,
+ [Ref, MaxConns, ProtoOpts]},
permanent, 5000, worker, [ranch_listener]}),
ok = ranch_server:insert_listener(Ref, ListenerPid),
{ok, ConnsPid} = supervisor:start_child(SupPid,
diff --git a/src/ranch_server.erl b/src/ranch_server.erl
index 16e892d..faec9b6 100644
--- a/src/ranch_server.erl
+++ b/src/ranch_server.erl
@@ -22,6 +22,9 @@
-export([lookup_listener/1]).
-export([add_acceptor/2]).
-export([send_to_acceptors/2]).
+-export([add_connection/1]).
+-export([count_connections/1]).
+-export([remove_connection/1]).
%% gen_server.
-export([init/1]).
@@ -69,12 +72,31 @@ send_to_acceptors(Ref, Msg) ->
_ = [Pid ! Msg || Pid <- Acceptors],
ok.
+%% @doc Add a connection to the connection pool.
+%%
+%% Also return the number of connections in the pool after this operation.
+-spec add_connection(pid()) -> non_neg_integer().
+add_connection(ListenerPid) ->
+ ets:update_counter(?TAB, {connections, ListenerPid}, 1).
+
+%% @doc Count the number of connections in the connection pool.
+-spec count_connections(pid()) -> non_neg_integer().
+count_connections(ListenerPid) ->
+ ets:update_counter(?TAB, {connections, ListenerPid}, 0).
+
+%% @doc Remove a connection from the connection pool.
+%%
+%% Also return the number of connections in the pool after this operation.
+-spec remove_connection(pid()) -> non_neg_integer().
+remove_connection(ListenerPid) ->
+ ets:update_counter(?TAB, {connections, ListenerPid}, -1).
+
%% gen_server.
%% @private
init([]) ->
?TAB = ets:new(?TAB, [
- ordered_set, public, named_table, {read_concurrency, true}]),
+ ordered_set, public, named_table, {write_concurrency, true}]),
{ok, #state{}}.
%% @private
@@ -84,6 +106,7 @@ handle_call(_Request, _From, State) ->
%% @private
handle_cast({insert_listener, Ref, Pid}, State=#state{monitors=Monitors}) ->
true = ets:insert_new(?TAB, {{acceptors, Ref}, []}),
+ true = ets:insert_new(?TAB, {{connections, Pid}, 0}),
MonitorRef = erlang:monitor(process, Pid),
{noreply, State#state{
monitors=[{{MonitorRef, Pid}, {listener, Ref}}|Monitors]}};
@@ -93,6 +116,9 @@ handle_cast({add_acceptor, Ref, Pid}, State=#state{monitors=Monitors}) ->
true = ets:insert(?TAB, {{acceptors, Ref}, [Pid|Acceptors]}),
{noreply, State#state{
monitors=[{{MonitorRef, Pid}, {acceptors, Ref}}|Monitors]}};
+handle_cast({add_connection, Pid}, State) ->
+ _ = erlang:monitor(process, Pid),
+ {noreply, State};
handle_cast(_Request, State) ->
{noreply, State}.
@@ -120,6 +146,7 @@ code_change(_OldVsn, State, _Extra) ->
remove_process(Key = {listener, Ref}, MonitorRef, Pid, Monitors) ->
true = ets:delete(?TAB, Key),
true = ets:delete(?TAB, {acceptors, Ref}),
+ true = ets:delete(?TAB, {connections, Pid}),
lists:keydelete({MonitorRef, Pid}, 1, Monitors);
remove_process(Key = {acceptors, _}, MonitorRef, Pid, Monitors) ->
Acceptors = ets:lookup_element(?TAB, Key, 2),
diff --git a/test/acceptor_SUITE.erl b/test/acceptor_SUITE.erl
index c1313cc..389f375 100644
--- a/test/acceptor_SUITE.erl
+++ b/test/acceptor_SUITE.erl
@@ -29,6 +29,9 @@
%% tcp.
-export([tcp_echo/1]).
+-export([tcp_max_connections/1]).
+-export([tcp_max_connections_and_beyond/1]).
+-export([tcp_upgrade/1]).
%% ct.
@@ -37,7 +40,10 @@ all() ->
groups() ->
[{tcp, [
- tcp_echo
+ tcp_echo,
+ tcp_max_connections,
+ tcp_max_connections_and_beyond,
+ tcp_upgrade
]}, {ssl, [
ssl_echo
]}].
@@ -100,3 +106,62 @@ tcp_echo(_) ->
%% Make sure the listener stopped.
{'EXIT', _} = begin catch ranch:get_port(tcp_echo) end,
ok.
+
+tcp_max_connections(_) ->
+ {ok, _} = ranch:start_listener(tcp_max_connections, 1,
+ ranch_tcp, [{port, 0}, {max_connections, 10}],
+ notify_and_wait_protocol, [{msg, connected}, {pid, self()}]),
+ Port = ranch:get_port(tcp_max_connections),
+ %% @todo We'll probably want a more direct interface to count_connections.
+ ListenerPid = ranch_server:lookup_listener(tcp_max_connections),
+ ok = connect_loop(Port, 11, 150),
+ 10 = ranch_server:count_connections(ListenerPid),
+ 10 = receive_loop(connected, 400),
+ 1 = receive_loop(connected, 1000).
+
+tcp_max_connections_and_beyond(_) ->
+ {ok, _} = ranch:start_listener(tcp_max_connections_and_beyond, 1,
+ ranch_tcp, [{port, 0}, {max_connections, 10}],
+ remove_conn_and_wait_protocol, [{remove, true}]),
+ Port = ranch:get_port(tcp_max_connections_and_beyond),
+ %% @todo We'll probably want a more direct interface to count_connections.
+ ListenerPid = ranch_server:lookup_listener(tcp_max_connections_and_beyond),
+ ok = connect_loop(Port, 10, 0),
+ 0 = ranch_server:count_connections(ListenerPid),
+ ranch:set_protocol_options(tcp_max_connections_and_beyond,
+ [{remove, false}]),
+ receive after 500 -> ok end,
+ ok = connect_loop(Port, 10, 0),
+ receive after 500 -> ok end,
+ 10 = ranch_server:count_connections(ListenerPid).
+
+tcp_upgrade(_) ->
+ receive after 20000 -> ok end,
+ {ok, _} = ranch:start_listener(tcp_upgrade, 1,
+ ranch_tcp, [{port, 0}],
+ notify_and_wait_protocol, [{msg, connected}, {pid, self()}]),
+ Port = ranch:get_port(tcp_upgrade),
+ ok = connect_loop(Port, 1, 0),
+ receive connected -> ok after 1000 -> error(timeout) end,
+ ranch:set_protocol_options(tcp_upgrade, [{msg, upgraded}, {pid, self()}]),
+ ok = connect_loop(Port, 1, 0),
+ receive upgraded -> ok after 1000 -> error(timeout) end.
+
+%% Utility functions.
+
+connect_loop(_, 0, _) ->
+ ok;
+connect_loop(Port, N, Sleep) ->
+ {ok, _} = gen_tcp:connect("localhost", Port,
+ [binary, {active, false}, {packet, raw}]),
+ receive after Sleep -> ok end,
+ connect_loop(Port, N - 1, Sleep).
+
+receive_loop(Message, Timeout) ->
+ receive_loop(Message, Timeout, 0).
+receive_loop(Message, Timeout, N) ->
+ receive Message ->
+ receive_loop(Message, Timeout, N + 1)
+ after Timeout ->
+ N
+ end.
diff --git a/test/notify_and_wait_protocol.erl b/test/notify_and_wait_protocol.erl
new file mode 100644
index 0000000..27542f2
--- /dev/null
+++ b/test/notify_and_wait_protocol.erl
@@ -0,0 +1,11 @@
+-module(notify_and_wait_protocol).
+-export([start_link/4]).
+-export([init/2]).
+
+start_link(_, _, _, [{msg, Msg}, {pid, TestPid}]) ->
+ Pid = spawn_link(?MODULE, init, [Msg, TestPid]),
+ {ok, Pid}.
+
+init(Msg, Pid) ->
+ Pid ! Msg,
+ receive after 2500 -> ok end.
diff --git a/test/remove_conn_and_wait_protocol.erl b/test/remove_conn_and_wait_protocol.erl
new file mode 100644
index 0000000..8fffca7
--- /dev/null
+++ b/test/remove_conn_and_wait_protocol.erl
@@ -0,0 +1,17 @@
+-module(remove_conn_and_wait_protocol).
+-export([start_link/4]).
+-export([init/2]).
+
+start_link(ListenerPid, _, _, [{remove, MaybeRemove}]) ->
+ Pid = spawn_link(?MODULE, init, [ListenerPid, MaybeRemove]),
+ {ok, Pid}.
+
+init(ListenerPid, MaybeRemove) ->
+ ranch:accept_ack(ListenerPid),
+ case MaybeRemove of
+ true ->
+ ranch_listener:remove_connection(ListenerPid);
+ false ->
+ ok
+ end,
+ receive after 2500 -> ok end.