diff options
author | Loïc Hoguin <[email protected]> | 2021-01-11 15:14:49 +0100 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2021-02-07 17:31:38 +0100 |
commit | 36fdf30fcf09a8ffe20c3498ac0b58de0971fd7d (patch) | |
tree | af08ff8a8416594946f22749103d2912d6d54221 /test/pool_SUITE.erl | |
parent | 3a3e56fb66edaaa1a7093744a0fd8303b993b3c8 (diff) | |
download | gun-36fdf30fcf09a8ffe20c3498ac0b58de0971fd7d.tar.gz gun-36fdf30fcf09a8ffe20c3498ac0b58de0971fd7d.tar.bz2 gun-36fdf30fcf09a8ffe20c3498ac0b58de0971fd7d.zip |
Initial commit for Gun pools
The approach taken here is very similar to what browsers are
doing. A separate pool is created for each host/port/scope.
The authority (host header) is used to determine which pool
will execute requests. A connection process is semi-randomly
chosen, from the connections that have capacity. Maximum
capacity is determined by the protocol (the HTTP/2 setting
set by the server is used, for example). Multiple processes
can process requests/responses on the same connection
concurrently. There is no need to "give back" the response
to the pool, the number of ongoing streams is maintained via
an event handler.
The implementation is currently not strict, there may be
more attempts to create requests than there is capacity.
I'm not sure if it should be made strict or if Gun should
just wait before sending requests (it only matters in the
HTTP/2 case at the moment).
When there is no connection with capacity available in the
pool (because they have too many streams, or are reconnecting,
or any other reason), checking out fails. There is no timeout
to wait for a connection to be available. On the other hand
the checkout_retry option allows setting multiple timeouts
to retry checking out a connection. Each retry attempt's
wait time can have a different value.
The initial implementation of this work was sponsored by
Kobil and made at the suggestion of Ilya Khaprov.
Diffstat (limited to 'test/pool_SUITE.erl')
-rw-r--r-- | test/pool_SUITE.erl | 376 |
1 files changed, 376 insertions, 0 deletions
diff --git a/test/pool_SUITE.erl b/test/pool_SUITE.erl new file mode 100644 index 0000000..a48b75d --- /dev/null +++ b/test/pool_SUITE.erl @@ -0,0 +1,376 @@ +%% Copyright (c) 2021, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(pool_SUITE). +-compile(export_all). +-compile(nowarn_export_all). + +-import(ct_helper, [doc/1]). +-import(ct_helper, [config/2]). +-import(gun_test, [receive_from/1]). + +all() -> + ct_helper:all(?MODULE). + +init_per_suite(Config) -> + {ok, _} = cowboy:start_clear({?MODULE, tcp}, [], do_proto_opts()), + Port = ranch:get_port({?MODULE, tcp}), + [{port, Port}|Config]. + +end_per_suite(_) -> + ExtraListeners = [ + max_streams_h2_size_1, + max_streams_h2_size_2, + reconnect_h1 + ], + _ = [cowboy:stop_listener(Listener) || Listener <- ExtraListeners], + ok. + +do_proto_opts() -> + Routes = [ + {"/", hello_h, []}, + {"/delay", delayed_hello_h, 3000}, + {"/ws", ws_echo_h, []} + ], + #{ + env => #{dispatch => cowboy_router:compile([{'_', Routes}])} + }. + +%% Tests. + +hello_pool_h1(Config) -> + doc("Confirm the pool can be used for HTTP/1.1 connections."), + Port = config(port, Config), + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{protocols => [http]}, + scope => ?FUNCTION_NAME + }), + gun_pool:await_up(ManagerPid), + Streams = [{async, _} = gun_pool:get("/", + #{<<"host">> => ["localhost:", integer_to_binary(Port)]}, + #{scope => ?FUNCTION_NAME} + ) || _ <- lists:seq(1, 8)], + _ = [begin + {response, nofin, 200, _} = gun_pool:await(StreamRef), + {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef) + end || {async, StreamRef} <- Streams]. + +hello_pool_h2(Config) -> + doc("Confirm the pool can be used for HTTP/2 connections."), + Port = config(port, Config), + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{protocols => [http2]}, + scope => ?FUNCTION_NAME + }), + gun_pool:await_up(ManagerPid), + Streams = [{async, _} = gun_pool:get("/", + #{<<"host">> => ["localhost:", integer_to_binary(Port)]}, + #{scope => ?FUNCTION_NAME} + ) || _ <- lists:seq(1, 800)], + _ = [begin + {response, nofin, 200, _} = gun_pool:await(StreamRef), + {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef) + end || {async, StreamRef} <- Streams]. + +hello_pool_ws(Config) -> + doc("Confirm the pool can be used for HTTP/1.1 connections upgraded to Websocket."), + Port = config(port, Config), + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{ + protocols => [http], + ws_opts => #{ + default_protocol => pool_ws_handler, + user_opts => self() + } + }, + scope => ?FUNCTION_NAME, + setup_fun => {fun + (ConnPid, {gun_up, _, http}, SetupState) -> + _ = gun:ws_upgrade(ConnPid, "/ws"), + {setup, SetupState}; + (_, {gun_upgrade, _, StreamRef, _, _}, _) -> + {up, ws, #{ws => StreamRef}}; + (ConnPid, Msg, SetupState) -> + ct:pal("Unexpected setup message for ~p: ~p", [ConnPid, Msg]), + {setup, SetupState} + end, undefined} + }), + gun_pool:await_up(ManagerPid), + _ = [gun_pool:ws_send({text, <<"Hello world!">>}, #{ + authority => ["localhost:", integer_to_binary(Port)], + scope => ?FUNCTION_NAME + }) || _ <- lists:seq(1, 8)], + %% The pool_ws_handler module sends frames back to us. + _ = [receive + {text, <<"Hello world!">>} -> + ok + end || _ <- lists:seq(1, 8)]. + +max_streams_h1(Config) -> + doc("Confirm requests are rejected when the maximum number " + "of streams is reached for HTTP/1.1 connections."), + Port = config(port, Config), + Authority = ["localhost:", integer_to_binary(Port)], + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{protocols => [http]}, + scope => ?FUNCTION_NAME, + size => 1 + }), + gun_pool:await_up(ManagerPid), + {async, _} = gun_pool:get("/delay", + #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}), + timer:sleep(500), + {error, no_connection_available, _} = gun_pool:get("/delay", + #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}). + +max_streams_h1_retry(Config) -> + doc("Confirm connection checkout is retried when the maximum number " + "of streams is reached for HTTP/1.1 connections."), + Port = config(port, Config), + Authority = ["localhost:", integer_to_binary(Port)], + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{protocols => [http]}, + scope => ?FUNCTION_NAME, + size => 1 + }), + gun_pool:await_up(ManagerPid), + {async, _} = gun_pool:get("/delay", + #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}), + timer:sleep(500), + {error, no_connection_available, _} = gun_pool:get("/delay", + #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}), + {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{ + checkout_retry => [100, 500, 500, 500, 500, 500, 500], + scope => ?FUNCTION_NAME + }). + +max_streams_h2_size_1(_) -> + doc("Confirm requests are rejected when the maximum number " + "of streams is reached for HTTP/2 connections."), + ProtoOpts = do_proto_opts(), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{ + max_concurrent_streams => 5 + }), + Port = ranch:get_port(?FUNCTION_NAME), + Authority = ["localhost:", integer_to_binary(Port)], + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{protocols => [http2]}, + size => 1 + }), + gun_pool:await_up(ManagerPid), + [{async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}) || _ <- lists:seq(1, 5)], + timer:sleep(500), + {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}). + +max_streams_h2_size_1_retry(_) -> + doc("Confirm connection checkout is retried when the maximum number " + "of streams is reached for HTTP/2 connections."), + ProtoOpts = do_proto_opts(), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{ + max_concurrent_streams => 5 + }), + Port = ranch:get_port(?FUNCTION_NAME), + Authority = ["localhost:", integer_to_binary(Port)], + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{protocols => [http2]}, + size => 1 + }), + gun_pool:await_up(ManagerPid), + [{async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}) || _ <- lists:seq(1, 5)], + timer:sleep(500), + {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}), + {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{ + checkout_retry => [100, 500, 500, 500, 500, 500, 500] + }). + +max_streams_h2_size_2(_) -> + doc("Confirm requests are rejected when the maximum number " + "of streams is reached for HTTP/2 connections."), + ProtoOpts = do_proto_opts(), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{ + max_concurrent_streams => 5 + }), + Port = ranch:get_port(?FUNCTION_NAME), + Authority = ["localhost:", integer_to_binary(Port)], + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{protocols => [http2]}, + size => 2 + }), + gun_pool:await_up(ManagerPid), + [begin + {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}), + %% We need to wait a bit for the request to be sent because the + %% request is sent and counted asynchronously. + timer:sleep(10) + end || _ <- lists:seq(1, 10)], + timer:sleep(500), + {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}). + +max_streams_h2_size_2_retry(_) -> + doc("Confirm connection checkout is retried when the maximum number " + "of streams is reached for HTTP/2 connections."), + ProtoOpts = do_proto_opts(), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{ + max_concurrent_streams => 5 + }), + Port = ranch:get_port(?FUNCTION_NAME), + Authority = ["localhost:", integer_to_binary(Port)], + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{protocols => [http2]}, + size => 2 + }), + gun_pool:await_up(ManagerPid), + [begin + {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}), + %% We need to wait a bit for the request to be sent because the + %% request is sent and counted asynchronously. + timer:sleep(10) + end || _ <- lists:seq(1, 10)], + timer:sleep(500), + {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}), + {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{ + checkout_retry => [100, 500, 500, 500, 500, 500, 500] + }). + +kill_restart_h1(Config) -> + doc("Confirm the Gun process is restarted and the pool operational " + "after an HTTP/1.1 Gun process has crashed."), + Port = config(port, Config), + Authority = ["localhost:", integer_to_binary(Port)], + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{protocols => [http]}, + scope => ?FUNCTION_NAME + }), + gun_pool:await_up(ManagerPid), + Streams1 = [{async, _} = gun_pool:get("/", + #{<<"host">> => Authority}, + #{scope => ?FUNCTION_NAME} + ) || _ <- lists:seq(1, 8)], + _ = [begin + {response, nofin, 200, _} = gun_pool:await(StreamRef), + {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef) + end || {async, StreamRef} <- Streams1], + %% Get a connection and kill the process. + {operational, #{conns := Conns}} = gun_pool:info(ManagerPid), + ConnPid = hd(maps:keys(Conns)), + MRef = monitor(process, ConnPid), + exit(ConnPid, {shutdown, ?FUNCTION_NAME}), + receive {'DOWN', MRef, process, ConnPid, _} -> ok end, + {degraded, _} = gun_pool:info(ManagerPid), + gun_pool:await_up(ManagerPid), + Streams2 = [{async, _} = gun_pool:get("/", + #{<<"host">> => Authority}, + #{scope => ?FUNCTION_NAME} + ) || _ <- lists:seq(1, 8)], + _ = [begin + {response, nofin, 200, _} = gun_pool:await(StreamRef), + {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef) + end || {async, StreamRef} <- Streams2]. + +kill_restart_h2(Config) -> + doc("Confirm the Gun process is restarted and the pool operational " + "after an HTTP/2 Gun process has crashed."), + Port = config(port, Config), + Authority = ["localhost:", integer_to_binary(Port)], + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{protocols => [http2]}, + scope => ?FUNCTION_NAME + }), + gun_pool:await_up(ManagerPid), + Streams1 = [{async, _} = gun_pool:get("/", + #{<<"host">> => Authority}, + #{scope => ?FUNCTION_NAME} + ) || _ <- lists:seq(1, 800)], + _ = [begin + {response, nofin, 200, _} = gun_pool:await(StreamRef), + {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef) + end || {async, StreamRef} <- Streams1], + %% Get a connection and kill the process. + {operational, #{conns := Conns}} = gun_pool:info(ManagerPid), + ConnPid = hd(maps:keys(Conns)), + MRef = monitor(process, ConnPid), + exit(ConnPid, {shutdown, ?FUNCTION_NAME}), + receive {'DOWN', MRef, process, ConnPid, _} -> ok end, + {degraded, _} = gun_pool:info(ManagerPid), + gun_pool:await_up(ManagerPid), + Streams2 = [{async, _} = gun_pool:get("/", + #{<<"host">> => Authority}, + #{scope => ?FUNCTION_NAME} + ) || _ <- lists:seq(1, 800)], + _ = [begin + {response, nofin, 200, _} = gun_pool:await(StreamRef), + {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef) + end || {async, StreamRef} <- Streams2]. + +%% @todo kill_restart_ws + +reconnect_h1(_) -> + doc("Confirm the Gun process reconnects automatically for HTTP/1.1 connections."), + ProtoOpts = do_proto_opts(), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{ + idle_timeout => 500, + scope => ?FUNCTION_NAME + }), + Port = ranch:get_port(?FUNCTION_NAME), + Authority = ["localhost:", integer_to_binary(Port)], + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{protocols => [http]} + }), + gun_pool:await_up(ManagerPid), + Streams1 = [{async, _} = gun_pool:get("/", #{<<"host">> => Authority}) || _ <- lists:seq(1, 8)], + _ = [begin + {response, nofin, 200, _} = gun_pool:await(StreamRef), + {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef) + end || {async, StreamRef} <- Streams1], + %% Wait for the idle timeout to trigger. + timer:sleep(600), +% {degraded, _} = gun_pool:info(ManagerPid), + gun_pool:await_up(ManagerPid), + Streams2 = [{async, _} = gun_pool:get("/", #{<<"host">> => Authority}) || _ <- lists:seq(1, 8)], + _ = [begin + {response, nofin, 200, _} = gun_pool:await(StreamRef), + {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef) + end || {async, StreamRef} <- Streams2]. + +reconnect_h2(Config) -> + doc("Confirm the Gun process reconnects automatically for HTTP/2 connections."), + Port = config(port, Config), + Authority = ["localhost:", integer_to_binary(Port)], + {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{ + conn_opts => #{protocols => [http2]}, + scope => ?FUNCTION_NAME + }), + gun_pool:await_up(ManagerPid), + Streams1 = [{async, _} = gun_pool:get("/", + #{<<"host">> => Authority}, + #{scope => ?FUNCTION_NAME} + ) || _ <- lists:seq(1, 800)], + _ = [begin + {response, nofin, 200, _} = gun_pool:await(StreamRef), + {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef) + end || {async, StreamRef} <- Streams1], + %% Wait for the idle timeout to trigger. + timer:sleep(600), +% {degraded, _} = gun_pool:info(ManagerPid), + gun_pool:await_up(ManagerPid), + Streams2 = [{async, _} = gun_pool:get("/", + #{<<"host">> => Authority}, + #{scope => ?FUNCTION_NAME} + ) || _ <- lists:seq(1, 800)], + _ = [begin + {response, nofin, 200, _} = gun_pool:await(StreamRef), + {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef) + end || {async, StreamRef} <- Streams2]. + +%% @todo reconnect_ws |