aboutsummaryrefslogtreecommitdiffstats
path: root/test/pool_SUITE.erl
diff options
context:
space:
mode:
Diffstat (limited to 'test/pool_SUITE.erl')
-rw-r--r--test/pool_SUITE.erl376
1 files changed, 376 insertions, 0 deletions
diff --git a/test/pool_SUITE.erl b/test/pool_SUITE.erl
new file mode 100644
index 0000000..a48b75d
--- /dev/null
+++ b/test/pool_SUITE.erl
@@ -0,0 +1,376 @@
+%% Copyright (c) 2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(pool_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(ct_helper, [doc/1]).
+-import(ct_helper, [config/2]).
+-import(gun_test, [receive_from/1]).
+
+all() ->
+ ct_helper:all(?MODULE).
+
+init_per_suite(Config) ->
+ {ok, _} = cowboy:start_clear({?MODULE, tcp}, [], do_proto_opts()),
+ Port = ranch:get_port({?MODULE, tcp}),
+ [{port, Port}|Config].
+
+end_per_suite(_) ->
+ ExtraListeners = [
+ max_streams_h2_size_1,
+ max_streams_h2_size_2,
+ reconnect_h1
+ ],
+ _ = [cowboy:stop_listener(Listener) || Listener <- ExtraListeners],
+ ok.
+
+do_proto_opts() ->
+ Routes = [
+ {"/", hello_h, []},
+ {"/delay", delayed_hello_h, 3000},
+ {"/ws", ws_echo_h, []}
+ ],
+ #{
+ env => #{dispatch => cowboy_router:compile([{'_', Routes}])}
+ }.
+
+%% Tests.
+
+hello_pool_h1(Config) ->
+ doc("Confirm the pool can be used for HTTP/1.1 connections."),
+ Port = config(port, Config),
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => ["localhost:", integer_to_binary(Port)]},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams].
+
+hello_pool_h2(Config) ->
+ doc("Confirm the pool can be used for HTTP/2 connections."),
+ Port = config(port, Config),
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => ["localhost:", integer_to_binary(Port)]},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams].
+
+hello_pool_ws(Config) ->
+ doc("Confirm the pool can be used for HTTP/1.1 connections upgraded to Websocket."),
+ Port = config(port, Config),
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{
+ protocols => [http],
+ ws_opts => #{
+ default_protocol => pool_ws_handler,
+ user_opts => self()
+ }
+ },
+ scope => ?FUNCTION_NAME,
+ setup_fun => {fun
+ (ConnPid, {gun_up, _, http}, SetupState) ->
+ _ = gun:ws_upgrade(ConnPid, "/ws"),
+ {setup, SetupState};
+ (_, {gun_upgrade, _, StreamRef, _, _}, _) ->
+ {up, ws, #{ws => StreamRef}};
+ (ConnPid, Msg, SetupState) ->
+ ct:pal("Unexpected setup message for ~p: ~p", [ConnPid, Msg]),
+ {setup, SetupState}
+ end, undefined}
+ }),
+ gun_pool:await_up(ManagerPid),
+ _ = [gun_pool:ws_send({text, <<"Hello world!">>}, #{
+ authority => ["localhost:", integer_to_binary(Port)],
+ scope => ?FUNCTION_NAME
+ }) || _ <- lists:seq(1, 8)],
+ %% The pool_ws_handler module sends frames back to us.
+ _ = [receive
+ {text, <<"Hello world!">>} ->
+ ok
+ end || _ <- lists:seq(1, 8)].
+
+max_streams_h1(Config) ->
+ doc("Confirm requests are rejected when the maximum number "
+ "of streams is reached for HTTP/1.1 connections."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME,
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ {async, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}),
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}).
+
+max_streams_h1_retry(Config) ->
+ doc("Confirm connection checkout is retried when the maximum number "
+ "of streams is reached for HTTP/1.1 connections."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME,
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ {async, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}),
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}),
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{
+ checkout_retry => [100, 500, 500, 500, 500, 500, 500],
+ scope => ?FUNCTION_NAME
+ }).
+
+max_streams_h2_size_1(_) ->
+ doc("Confirm requests are rejected when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ [{async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}) || _ <- lists:seq(1, 5)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}).
+
+max_streams_h2_size_1_retry(_) ->
+ doc("Confirm connection checkout is retried when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ [{async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}) || _ <- lists:seq(1, 5)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{
+ checkout_retry => [100, 500, 500, 500, 500, 500, 500]
+ }).
+
+max_streams_h2_size_2(_) ->
+ doc("Confirm requests are rejected when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 2
+ }),
+ gun_pool:await_up(ManagerPid),
+ [begin
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ %% We need to wait a bit for the request to be sent because the
+ %% request is sent and counted asynchronously.
+ timer:sleep(10)
+ end || _ <- lists:seq(1, 10)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}).
+
+max_streams_h2_size_2_retry(_) ->
+ doc("Confirm connection checkout is retried when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 2
+ }),
+ gun_pool:await_up(ManagerPid),
+ [begin
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ %% We need to wait a bit for the request to be sent because the
+ %% request is sent and counted asynchronously.
+ timer:sleep(10)
+ end || _ <- lists:seq(1, 10)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{
+ checkout_retry => [100, 500, 500, 500, 500, 500, 500]
+ }).
+
+kill_restart_h1(Config) ->
+ doc("Confirm the Gun process is restarted and the pool operational "
+ "after an HTTP/1.1 Gun process has crashed."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Get a connection and kill the process.
+ {operational, #{conns := Conns}} = gun_pool:info(ManagerPid),
+ ConnPid = hd(maps:keys(Conns)),
+ MRef = monitor(process, ConnPid),
+ exit(ConnPid, {shutdown, ?FUNCTION_NAME}),
+ receive {'DOWN', MRef, process, ConnPid, _} -> ok end,
+ {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+kill_restart_h2(Config) ->
+ doc("Confirm the Gun process is restarted and the pool operational "
+ "after an HTTP/2 Gun process has crashed."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Get a connection and kill the process.
+ {operational, #{conns := Conns}} = gun_pool:info(ManagerPid),
+ ConnPid = hd(maps:keys(Conns)),
+ MRef = monitor(process, ConnPid),
+ exit(ConnPid, {shutdown, ?FUNCTION_NAME}),
+ receive {'DOWN', MRef, process, ConnPid, _} -> ok end,
+ {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+%% @todo kill_restart_ws
+
+reconnect_h1(_) ->
+ doc("Confirm the Gun process reconnects automatically for HTTP/1.1 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ idle_timeout => 500,
+ scope => ?FUNCTION_NAME
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]}
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/", #{<<"host">> => Authority}) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Wait for the idle timeout to trigger.
+ timer:sleep(600),
+% {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/", #{<<"host">> => Authority}) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+reconnect_h2(Config) ->
+ doc("Confirm the Gun process reconnects automatically for HTTP/2 connections."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Wait for the idle timeout to trigger.
+ timer:sleep(600),
+% {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+%% @todo reconnect_ws