aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2021-01-11 15:14:49 +0100
committerLoïc Hoguin <[email protected]>2021-02-07 17:31:38 +0100
commit36fdf30fcf09a8ffe20c3498ac0b58de0971fd7d (patch)
treeaf08ff8a8416594946f22749103d2912d6d54221 /test
parent3a3e56fb66edaaa1a7093744a0fd8303b993b3c8 (diff)
downloadgun-36fdf30fcf09a8ffe20c3498ac0b58de0971fd7d.tar.gz
gun-36fdf30fcf09a8ffe20c3498ac0b58de0971fd7d.tar.bz2
gun-36fdf30fcf09a8ffe20c3498ac0b58de0971fd7d.zip
Initial commit for Gun pools
The approach taken here is very similar to what browsers are doing. A separate pool is created for each host/port/scope. The authority (host header) is used to determine which pool will execute requests. A connection process is semi-randomly chosen, from the connections that have capacity. Maximum capacity is determined by the protocol (the HTTP/2 setting set by the server is used, for example). Multiple processes can process requests/responses on the same connection concurrently. There is no need to "give back" the response to the pool, the number of ongoing streams is maintained via an event handler. The implementation is currently not strict, there may be more attempts to create requests than there is capacity. I'm not sure if it should be made strict or if Gun should just wait before sending requests (it only matters in the HTTP/2 case at the moment). When there is no connection with capacity available in the pool (because they have too many streams, or are reconnecting, or any other reason), checking out fails. There is no timeout to wait for a connection to be available. On the other hand the checkout_retry option allows setting multiple timeouts to retry checking out a connection. Each retry attempt's wait time can have a different value. The initial implementation of this work was sponsored by Kobil and made at the suggestion of Ilya Khaprov.
Diffstat (limited to 'test')
-rw-r--r--test/handlers/pool_ws_handler.erl13
-rw-r--r--test/pool_SUITE.erl376
2 files changed, 389 insertions, 0 deletions
diff --git a/test/handlers/pool_ws_handler.erl b/test/handlers/pool_ws_handler.erl
new file mode 100644
index 0000000..5475e88
--- /dev/null
+++ b/test/handlers/pool_ws_handler.erl
@@ -0,0 +1,13 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(pool_ws_handler).
+
+-export([init/4]).
+-export([handle/2]).
+
+init(_, _, _, #{user_opts := ReplyTo}) ->
+ {ok, ReplyTo}.
+
+handle(Frame, ReplyTo) ->
+ ReplyTo ! Frame,
+ {ok, 0, ReplyTo}.
diff --git a/test/pool_SUITE.erl b/test/pool_SUITE.erl
new file mode 100644
index 0000000..a48b75d
--- /dev/null
+++ b/test/pool_SUITE.erl
@@ -0,0 +1,376 @@
+%% Copyright (c) 2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(pool_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(ct_helper, [doc/1]).
+-import(ct_helper, [config/2]).
+-import(gun_test, [receive_from/1]).
+
+all() ->
+ ct_helper:all(?MODULE).
+
+init_per_suite(Config) ->
+ {ok, _} = cowboy:start_clear({?MODULE, tcp}, [], do_proto_opts()),
+ Port = ranch:get_port({?MODULE, tcp}),
+ [{port, Port}|Config].
+
+end_per_suite(_) ->
+ ExtraListeners = [
+ max_streams_h2_size_1,
+ max_streams_h2_size_2,
+ reconnect_h1
+ ],
+ _ = [cowboy:stop_listener(Listener) || Listener <- ExtraListeners],
+ ok.
+
+do_proto_opts() ->
+ Routes = [
+ {"/", hello_h, []},
+ {"/delay", delayed_hello_h, 3000},
+ {"/ws", ws_echo_h, []}
+ ],
+ #{
+ env => #{dispatch => cowboy_router:compile([{'_', Routes}])}
+ }.
+
+%% Tests.
+
+hello_pool_h1(Config) ->
+ doc("Confirm the pool can be used for HTTP/1.1 connections."),
+ Port = config(port, Config),
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => ["localhost:", integer_to_binary(Port)]},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams].
+
+hello_pool_h2(Config) ->
+ doc("Confirm the pool can be used for HTTP/2 connections."),
+ Port = config(port, Config),
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => ["localhost:", integer_to_binary(Port)]},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams].
+
+hello_pool_ws(Config) ->
+ doc("Confirm the pool can be used for HTTP/1.1 connections upgraded to Websocket."),
+ Port = config(port, Config),
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{
+ protocols => [http],
+ ws_opts => #{
+ default_protocol => pool_ws_handler,
+ user_opts => self()
+ }
+ },
+ scope => ?FUNCTION_NAME,
+ setup_fun => {fun
+ (ConnPid, {gun_up, _, http}, SetupState) ->
+ _ = gun:ws_upgrade(ConnPid, "/ws"),
+ {setup, SetupState};
+ (_, {gun_upgrade, _, StreamRef, _, _}, _) ->
+ {up, ws, #{ws => StreamRef}};
+ (ConnPid, Msg, SetupState) ->
+ ct:pal("Unexpected setup message for ~p: ~p", [ConnPid, Msg]),
+ {setup, SetupState}
+ end, undefined}
+ }),
+ gun_pool:await_up(ManagerPid),
+ _ = [gun_pool:ws_send({text, <<"Hello world!">>}, #{
+ authority => ["localhost:", integer_to_binary(Port)],
+ scope => ?FUNCTION_NAME
+ }) || _ <- lists:seq(1, 8)],
+ %% The pool_ws_handler module sends frames back to us.
+ _ = [receive
+ {text, <<"Hello world!">>} ->
+ ok
+ end || _ <- lists:seq(1, 8)].
+
+max_streams_h1(Config) ->
+ doc("Confirm requests are rejected when the maximum number "
+ "of streams is reached for HTTP/1.1 connections."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME,
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ {async, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}),
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}).
+
+max_streams_h1_retry(Config) ->
+ doc("Confirm connection checkout is retried when the maximum number "
+ "of streams is reached for HTTP/1.1 connections."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME,
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ {async, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}),
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}),
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{
+ checkout_retry => [100, 500, 500, 500, 500, 500, 500],
+ scope => ?FUNCTION_NAME
+ }).
+
+max_streams_h2_size_1(_) ->
+ doc("Confirm requests are rejected when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ [{async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}) || _ <- lists:seq(1, 5)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}).
+
+max_streams_h2_size_1_retry(_) ->
+ doc("Confirm connection checkout is retried when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ [{async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}) || _ <- lists:seq(1, 5)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{
+ checkout_retry => [100, 500, 500, 500, 500, 500, 500]
+ }).
+
+max_streams_h2_size_2(_) ->
+ doc("Confirm requests are rejected when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 2
+ }),
+ gun_pool:await_up(ManagerPid),
+ [begin
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ %% We need to wait a bit for the request to be sent because the
+ %% request is sent and counted asynchronously.
+ timer:sleep(10)
+ end || _ <- lists:seq(1, 10)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}).
+
+max_streams_h2_size_2_retry(_) ->
+ doc("Confirm connection checkout is retried when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 2
+ }),
+ gun_pool:await_up(ManagerPid),
+ [begin
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ %% We need to wait a bit for the request to be sent because the
+ %% request is sent and counted asynchronously.
+ timer:sleep(10)
+ end || _ <- lists:seq(1, 10)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{
+ checkout_retry => [100, 500, 500, 500, 500, 500, 500]
+ }).
+
+kill_restart_h1(Config) ->
+ doc("Confirm the Gun process is restarted and the pool operational "
+ "after an HTTP/1.1 Gun process has crashed."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Get a connection and kill the process.
+ {operational, #{conns := Conns}} = gun_pool:info(ManagerPid),
+ ConnPid = hd(maps:keys(Conns)),
+ MRef = monitor(process, ConnPid),
+ exit(ConnPid, {shutdown, ?FUNCTION_NAME}),
+ receive {'DOWN', MRef, process, ConnPid, _} -> ok end,
+ {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+kill_restart_h2(Config) ->
+ doc("Confirm the Gun process is restarted and the pool operational "
+ "after an HTTP/2 Gun process has crashed."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Get a connection and kill the process.
+ {operational, #{conns := Conns}} = gun_pool:info(ManagerPid),
+ ConnPid = hd(maps:keys(Conns)),
+ MRef = monitor(process, ConnPid),
+ exit(ConnPid, {shutdown, ?FUNCTION_NAME}),
+ receive {'DOWN', MRef, process, ConnPid, _} -> ok end,
+ {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+%% @todo kill_restart_ws
+
+reconnect_h1(_) ->
+ doc("Confirm the Gun process reconnects automatically for HTTP/1.1 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ idle_timeout => 500,
+ scope => ?FUNCTION_NAME
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]}
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/", #{<<"host">> => Authority}) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Wait for the idle timeout to trigger.
+ timer:sleep(600),
+% {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/", #{<<"host">> => Authority}) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+reconnect_h2(Config) ->
+ doc("Confirm the Gun process reconnects automatically for HTTP/2 connections."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Wait for the idle timeout to trigger.
+ timer:sleep(600),
+% {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+%% @todo reconnect_ws