aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2021-01-11 15:14:49 +0100
committerLoïc Hoguin <[email protected]>2021-02-07 17:31:38 +0100
commit36fdf30fcf09a8ffe20c3498ac0b58de0971fd7d (patch)
treeaf08ff8a8416594946f22749103d2912d6d54221
parent3a3e56fb66edaaa1a7093744a0fd8303b993b3c8 (diff)
downloadgun-36fdf30fcf09a8ffe20c3498ac0b58de0971fd7d.tar.gz
gun-36fdf30fcf09a8ffe20c3498ac0b58de0971fd7d.tar.bz2
gun-36fdf30fcf09a8ffe20c3498ac0b58de0971fd7d.zip
Initial commit for Gun pools
The approach taken here is very similar to what browsers are doing. A separate pool is created for each host/port/scope. The authority (host header) is used to determine which pool will execute requests. A connection process is semi-randomly chosen, from the connections that have capacity. Maximum capacity is determined by the protocol (the HTTP/2 setting set by the server is used, for example). Multiple processes can process requests/responses on the same connection concurrently. There is no need to "give back" the response to the pool, the number of ongoing streams is maintained via an event handler. The implementation is currently not strict, there may be more attempts to create requests than there is capacity. I'm not sure if it should be made strict or if Gun should just wait before sending requests (it only matters in the HTTP/2 case at the moment). When there is no connection with capacity available in the pool (because they have too many streams, or are reconnecting, or any other reason), checking out fails. There is no timeout to wait for a connection to be available. On the other hand the checkout_retry option allows setting multiple timeouts to retry checking out a connection. Each retry attempt's wait time can have a different value. The initial implementation of this work was sponsored by Kobil and made at the suggestion of Ilya Khaprov.
-rw-r--r--doc/src/manual/gun_up.asciidoc2
-rw-r--r--ebin/gun.app2
-rw-r--r--src/gun.erl7
-rw-r--r--src/gun_app.erl1
-rw-r--r--src/gun_conns_sup.erl36
-rw-r--r--src/gun_event.erl2
-rw-r--r--src/gun_http.erl8
-rw-r--r--src/gun_http2.erl2
-rw-r--r--src/gun_pool.erl704
-rw-r--r--src/gun_pool_events_h.erl157
-rw-r--r--src/gun_pools_sup.erl37
-rw-r--r--src/gun_sup.erl13
-rw-r--r--test/handlers/pool_ws_handler.erl13
-rw-r--r--test/pool_SUITE.erl376
14 files changed, 1343 insertions, 17 deletions
diff --git a/doc/src/manual/gun_up.asciidoc b/doc/src/manual/gun_up.asciidoc
index 3853afd..2c05432 100644
--- a/doc/src/manual/gun_up.asciidoc
+++ b/doc/src/manual/gun_up.asciidoc
@@ -11,7 +11,7 @@ gun_up - The connection is up
{gun_up, ConnPid, Protocol}
ConnPid :: pid()
-Protocol :: http | http2 | socks
+Protocol :: http | http2 | raw | socks
----
The connection is up.
diff --git a/ebin/gun.app b/ebin/gun.app
index 779ccbe..4efac80 100644
--- a/ebin/gun.app
+++ b/ebin/gun.app
@@ -1,7 +1,7 @@
{application, 'gun', [
{description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."},
{vsn, "2.0.0-rc.1"},
- {modules, ['gun','gun_app','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_protocols','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tcp_proxy','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_tls_proxy_http2_connect','gun_tunnel','gun_ws','gun_ws_h']},
+ {modules, ['gun','gun_app','gun_conns_sup','gun_content_handler','gun_cookies','gun_cookies_list','gun_data_h','gun_default_event_h','gun_event','gun_http','gun_http2','gun_pool','gun_pool_events_h','gun_pools_sup','gun_protocols','gun_public_suffix','gun_raw','gun_socks','gun_sse_h','gun_sup','gun_tcp','gun_tcp_proxy','gun_tls','gun_tls_proxy','gun_tls_proxy_cb','gun_tls_proxy_http2_connect','gun_tunnel','gun_ws','gun_ws_h']},
{registered, [gun_sup]},
{applications, [kernel,stdlib,ssl,cowlib]},
{mod, {gun_app, []}},
diff --git a/src/gun.erl b/src/gun.erl
index 8a101a7..d808750 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -98,6 +98,7 @@
-export([start_link/4]).
-export([callback_mode/0]).
-export([init/1]).
+-export([default_transport/1]).
-export([not_connected/3]).
-export([domain_lookup/3]).
-export([connecting/3]).
@@ -321,7 +322,7 @@ do_open(Host, Port, Opts0) ->
case check_options(maps:to_list(Opts)) of
ok ->
Result = case maps:get(supervise, Opts, true) of
- true -> supervisor:start_child(gun_sup, [self(), Host, Port, Opts]);
+ true -> supervisor:start_child(gun_conns_sup, [self(), Host, Port, Opts]);
false -> start_link(self(), Host, Port, Opts)
end,
case Result of
@@ -508,7 +509,7 @@ intermediaries_info([Intermediary=#{transport := Transport0}|Tail], Acc) ->
-spec close(pid()) -> ok.
close(ServerPid) ->
- supervisor:terminate_child(gun_sup, ServerPid).
+ supervisor:terminate_child(gun_conns_sup, ServerPid).
-spec shutdown(pid()) -> ok.
shutdown(ServerPid) ->
@@ -832,6 +833,8 @@ await_up(ServerPid, Timeout, MRef) ->
{error, timeout}
end.
+%% Flushing gun messages.
+
-spec flush(pid() | stream_ref()) -> ok.
flush(ServerPid) when is_pid(ServerPid) ->
flush_pid(ServerPid);
diff --git a/src/gun_app.erl b/src/gun_app.erl
index 848faf2..624e65f 100644
--- a/src/gun_app.erl
+++ b/src/gun_app.erl
@@ -23,6 +23,7 @@
%% API.
start(_Type, _Args) ->
+ gun_pools = ets:new(gun_pools, [ordered_set, public, named_table]),
gun_sup:start_link().
stop(_State) ->
diff --git a/src/gun_conns_sup.erl b/src/gun_conns_sup.erl
new file mode 100644
index 0000000..bee266b
--- /dev/null
+++ b/src/gun_conns_sup.erl
@@ -0,0 +1,36 @@
+%% Copyright (c) 2013-2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_conns_sup).
+-behaviour(supervisor).
+
+%% API.
+-export([start_link/0]).
+
+%% supervisor.
+-export([init/1]).
+
+%% API.
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% supervisor.
+
+init([]) ->
+ Procs = [
+ #{id => gun, start => {gun, start_link, []}, restart => temporary}
+ ],
+ {ok, {#{strategy => simple_one_for_one}, Procs}}.
diff --git a/src/gun_event.erl b/src/gun_event.erl
index 71da77b..ab6020b 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -83,7 +83,7 @@
-type request_start_event() :: #{
stream_ref := gun:stream_ref(),
reply_to := pid(),
- function := headers | request | ws_upgrade,
+ function := headers | request | ws_upgrade, %% @todo connect?
method := iodata(),
scheme => binary(),
authority := iodata(),
diff --git a/src/gun_http.erl b/src/gun_http.erl
index cc541e2..aaa6d15 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -35,7 +35,7 @@
-export([down/1]).
-export([ws_upgrade/11]).
-%% Functions shared with gun_http2.
+%% Functions shared with gun_http2 and gun_pool.
-export([host_header/3]).
-type io() :: head | {body, non_neg_integer()} | body_close | body_chunked | body_trailer.
@@ -607,7 +607,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
end,
{Authority, Headers3} = case lists:keyfind(<<"host">>, 1, Headers2) of
false ->
- Authority0 = host_header(Transport, Host, Port),
+ Authority0 = host_header(Transport:name(), Host, Port),
{Authority0, [{<<"host">>, Authority0}|Headers2]};
{_, Authority1} ->
{Authority1, Headers2}
@@ -648,7 +648,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
end,
{Authority, Conn, Out, CookieStore, EvHandlerState}.
-host_header(Transport, Host0, Port) ->
+host_header(TransportName, Host0, Port) ->
Host = case Host0 of
{local, _SocketPath} -> <<>>;
Tuple when tuple_size(Tuple) =:= 8 -> [$[, inet:ntoa(Tuple), $]]; %% IPv6.
@@ -656,7 +656,7 @@ host_header(Transport, Host0, Port) ->
Atom when is_atom(Atom) -> atom_to_list(Atom);
_ -> Host0
end,
- case {Transport:name(), Port} of
+ case {TransportName, Port} of
{tcp, 80} -> Host;
{tls, 443} -> Host;
_ -> [Host, $:, integer_to_binary(Port)]
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 2778b4f..ec51235 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -979,7 +979,7 @@ prepare_headers(State=#http2_state{transport=Transport},
Scheme = scheme(State),
Authority = case lists:keyfind(<<"host">>, 1, Headers0) of
{_, Host} -> Host;
- _ -> gun_http:host_header(Transport, Host0, Port)
+ _ -> gun_http:host_header(Transport:name(), Host0, Port)
end,
%% @todo We also must remove any header found in the connection header.
%% @todo Much of this is duplicated in cow_http2_machine; sort things out.
diff --git a/src/gun_pool.erl b/src/gun_pool.erl
new file mode 100644
index 0000000..56734ee
--- /dev/null
+++ b/src/gun_pool.erl
@@ -0,0 +1,704 @@
+%% Copyright (c) 2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_pool).
+-behaviour(gen_statem).
+
+%% Pools.
+-export([start_pool/3]).
+-export([stop_pool/2]).
+-export([stop_pool/3]).
+% @todo shutdown pool?
+-export([info/0]).
+-export([info/1]).
+-export([info/2]).
+-export([await_up/1]).
+-export([await_up/2]).
+-export([checkout/2]). %% Use responsibly!
+
+%% Requests.
+-export([delete/2]).
+-export([delete/3]).
+-export([get/2]).
+-export([get/3]).
+-export([head/2]).
+-export([head/3]).
+-export([options/2]).
+-export([options/3]).
+-export([patch/2]).
+-export([patch/3]).
+-export([patch/4]).
+-export([post/2]).
+-export([post/3]).
+-export([post/4]).
+-export([put/2]).
+-export([put/3]).
+-export([put/4]).
+
+%% Generic requests interface.
+-export([headers/3]).
+-export([headers/4]).
+-export([request/4]).
+-export([request/5]).
+
+%% Streaming data.
+-export([data/3]).
+
+%% Tunneling. (HTTP/2+ only.)
+%% @todo -export([connect/2]).
+%% @todo -export([connect/3]).
+%% @todo -export([connect/4]).
+
+%% Cookies.
+%% @todo -export([gc_cookies/1]).
+%% @todo -export([session_gc_cookies/1]).
+
+%% Awaiting gun messages.
+-export([await/1]).
+-export([await/2]).
+-export([await/3]).
+-export([await_body/1]).
+-export([await_body/2]).
+-export([await_body/3]).
+
+%% Flushing gun messages.
+-export([flush/1]).
+
+%% Streams.
+-export([update_flow/2]).
+-export([cancel/1]).
+-export([stream_info/1]).
+
+%% Websocket. (HTTP/2+ only for upgrade.)
+%% -export([ws_upgrade/1]).
+%% -export([ws_upgrade/2]).
+%% -export([ws_upgrade/3]).
+-export([ws_send/2]).
+%% -export([ws_send/3]). (HTTP/2+ only.)
+
+%% Internals.
+-export([callback_mode/0]).
+-export([start_link/3]).
+-export([init/1]).
+-export([degraded/3]).
+-export([operational/3]).
+-export([terminate/3]).
+
+-type setup_msg() :: {gun_up, pid(), http | http2 | raw | socks}
+ | {gun_upgrade, pid(), gun:stream_ref(), [<<"websocket">>], [{binary(), binary()}]}.
+
+-type opts() :: #{
+ conn_opts => gun:opts(),
+ scope => any(),
+ setup_fun => {fun((pid(), setup_msg(), any()) -> any()), any()},
+ size => non_neg_integer()
+}.
+-export_type([opts/0]).
+
+-type pool_stream_ref() :: {pid(), gun:stream_ref()}.
+-export_type([pool_stream_ref/0]).
+
+-type error_result() :: {error, pool_not_found | no_connection_available, atom()}. %% @todo {pool_start_error, SupError}
+
+-type request_result() :: {async, pool_stream_ref()}
+ %% @todo {sync, ...} perhaps with Status, Headers, Body, and Extra info such as intermediate responses.
+ | error_result().
+
+-type req_opts() :: #{
+ %% Options common with normal Gun.
+ flow => pos_integer(),
+ reply_to => pid(),
+% @todo tunnel => stream_ref(),
+
+ %% Options specific to pools.
+ checkout_call_timeout => timeout(),
+ checkout_retry => [pos_integer()],
+ scope => any(),
+ start_pool_if_missing => boolean()
+}.
+-export_type([req_opts/0]).
+
+-type ws_send_opts() :: #{
+ authority => iodata(),
+
+ %% Options specific to pools.
+ checkout_call_timeout => timeout(),
+ checkout_retry => [pos_integer()],
+ scope => any(),
+ start_pool_if_missing => boolean()
+}.
+-export_type([ws_send_opts/0]).
+
+%% @todo tunnel
+-type meta() :: #{pid() => #{ws => gun:stream_ref()}}.
+
+-record(state, {
+ host :: inet:hostname() | inet:ip_address(),
+ port :: inet:port_number(),
+ opts :: gun:opts(),
+ table :: ets:tid(),
+ conns :: #{pid() => down | {setup, any()} | {up, http | http2 | ws | raw, map()}},
+ conns_meta = #{} :: meta(),
+ await_up = [] :: [{pid(), any()}]
+}).
+
+%% Pool management.
+
+-spec start_pool(inet:hostname() | inet:ip_address(), inet:port_number(), opts())
+ -> {ok, pid()} | {error, any()}.
+start_pool(Host, Port, Opts) ->
+ supervisor:start_child(gun_pools_sup, [Host, Port, Opts]).
+
+-spec stop_pool(inet:hostname() | inet:ip_address(), inet:port_number())
+ -> ok.
+stop_pool(Host, Port) ->
+ stop_pool(Host, Port, #{}).
+
+-spec stop_pool(inet:hostname() | inet:ip_address(), inet:port_number(), req_opts())
+ -> ok | {error, pool_not_found, atom()}.
+stop_pool(Host, Port, ReqOpts) ->
+ case get_pool(iolist_to_binary([Host, $:, integer_to_binary(Port)]), ReqOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ supervisor:terminate_child(gun_pools_sup, ManagerPid)
+ end.
+
+-spec info() -> [map()].
+info() ->
+ ets:foldl(fun({{Scope, _}, ManagerPid}, Acc) ->
+ {StateName, Info} = info(ManagerPid),
+ [Info#{scope => Scope, state => StateName}|Acc]
+ end, [], gun_pools).
+
+-spec info(pid() | binary()) -> undefined | {degraded | operational, map()}.
+info(ManagerPid) when is_pid(ManagerPid) ->
+ gen_statem:call(ManagerPid, info);
+info(Authority) ->
+ info(Authority, default).
+
+-spec info(binary(), any()) -> undefined | {degraded | operational, map()}.
+info(Authority, Scope) ->
+ case ets:lookup(gun_pools, {Scope, Authority}) of
+ [] ->
+ undefined;
+ [{_, ManagerPid}] ->
+ gen_statem:call(ManagerPid, info)
+ end.
+
+-spec await_up(pid() | binary()) -> ok | {error, pool_not_found, atom()}.
+await_up(ManagerPid) when is_pid(ManagerPid) ->
+ gen_statem:call(ManagerPid, await_up, 5000);
+await_up(Authority) ->
+ await_up(Authority, default).
+
+-spec await_up(binary(), any()) -> ok | {error, pool_not_found, atom()}.
+await_up(Authority, Scope) ->
+ case ets:lookup(gun_pools, {Scope, Authority}) of
+ [] ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ [{_, ManagerPid}] ->
+ gen_statem:call(ManagerPid, await_up, 5000)
+ end.
+
+-spec checkout(pid(), req_opts()) -> undefined | {pid(), map()}.
+checkout(ManagerPid, ReqOpts=#{checkout_retry := Retry}) when is_list(Retry) ->
+ CallTimeout = maps:get(checkout_call_timeout, ReqOpts, 5000),
+ case gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout) of
+ undefined ->
+ checkout_retry(ManagerPid, ReqOpts, CallTimeout, Retry);
+ Result ->
+ Result
+ end;
+checkout(ManagerPid, ReqOpts) ->
+ CallTimeout = maps:get(checkout_call_timeout, ReqOpts, 5000),
+ gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout).
+
+%% When the checkout_retry option is used, and the first call resulted
+%% in no connection being given out, we wait for the configured amount
+%% of time then try again. We loop over the wait times until there is
+%% none.
+checkout_retry(_, _, _, []) ->
+ undefined;
+checkout_retry(ManagerPid, ReqOpts, CallTimeout, [Wait|Retry]) ->
+ timer:sleep(Wait),
+ case gen_server:call(ManagerPid, {checkout, ReqOpts}, CallTimeout) of
+ undefined ->
+ checkout_retry(ManagerPid, ReqOpts, CallTimeout, Retry);
+ Result ->
+ Result
+ end.
+
+%% Requests.
+
+-spec delete(iodata(), gun:req_headers()) -> request_result().
+delete(Path, Headers) ->
+ request(<<"DELETE">>, Path, Headers, <<>>).
+
+-spec delete(iodata(), gun:req_headers(), req_opts()) -> request_result().
+delete(Path, Headers, ReqOpts) ->
+ request(<<"DELETE">>, Path, Headers, <<>>, ReqOpts).
+
+-spec get(iodata(), gun:req_headers()) -> request_result().
+get(Path, Headers) ->
+ request(<<"GET">>, Path, Headers, <<>>).
+
+-spec get(iodata(), gun:req_headers(), req_opts()) -> request_result().
+get(Path, Headers, ReqOpts) ->
+ request(<<"GET">>, Path, Headers, <<>>, ReqOpts).
+
+-spec head(iodata(), gun:req_headers()) -> request_result().
+head(Path, Headers) ->
+ request(<<"HEAD">>, Path, Headers, <<>>).
+
+-spec head(iodata(), gun:req_headers(), req_opts()) -> request_result().
+head(Path, Headers, ReqOpts) ->
+ request(<<"HEAD">>, Path, Headers, <<>>, ReqOpts).
+
+-spec options(iodata(), gun:req_headers()) -> request_result().
+options(Path, Headers) ->
+ request(<<"OPTIONS">>, Path, Headers, <<>>).
+
+-spec options(iodata(), gun:req_headers(), req_opts()) -> request_result().
+options(Path, Headers, ReqOpts) ->
+ request(<<"OPTIONS">>, Path, Headers, <<>>, ReqOpts).
+
+-spec patch(iodata(), gun:req_headers()) -> request_result().
+patch(Path, Headers) ->
+ headers(<<"PATCH">>, Path, Headers).
+
+-spec patch(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
+patch(Path, Headers, ReqOpts) when is_map(ReqOpts) ->
+ headers(<<"PATCH">>, Path, Headers, ReqOpts);
+patch(Path, Headers, Body) ->
+ request(<<"PATCH">>, Path, Headers, Body).
+
+-spec patch(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+patch(Path, Headers, Body, ReqOpts) ->
+ request(<<"PATCH">>, Path, Headers, Body, ReqOpts).
+
+-spec post(iodata(), gun:req_headers()) -> request_result().
+post(Path, Headers) ->
+ headers(<<"POST">>, Path, Headers).
+
+-spec post(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
+post(Path, Headers, ReqOpts) when is_map(ReqOpts) ->
+ headers(<<"POST">>, Path, Headers, ReqOpts);
+post(Path, Headers, Body) ->
+ request(<<"POST">>, Path, Headers, Body).
+
+-spec post(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+post(Path, Headers, Body, ReqOpts) ->
+ request(<<"POST">>, Path, Headers, Body, ReqOpts).
+
+-spec put(iodata(), gun:req_headers()) -> request_result().
+put(Path, Headers) ->
+ headers(<<"PUT">>, Path, Headers).
+
+-spec put(iodata(), gun:req_headers(), iodata() | req_opts()) -> request_result().
+put(Path, Headers, ReqOpts) when is_map(ReqOpts) ->
+ headers(<<"PUT">>, Path, Headers, ReqOpts);
+put(Path, Headers, Body) ->
+ request(<<"PUT">>, Path, Headers, Body).
+
+-spec put(iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+put(Path, Headers, Body, ReqOpts) ->
+ request(<<"PUT">>, Path, Headers, Body, ReqOpts).
+
+%% Generic requests interface.
+%%
+%% @todo Accept a TargetURI map as well as a normal Path.
+
+-spec headers(iodata(), iodata(), gun:req_headers()) -> request_result().
+headers(Method, Path, Headers) ->
+ headers(Method, Path, Headers, #{}).
+
+-spec headers(iodata(), iodata(), gun:req_headers(), req_opts()) -> request_result().
+headers(Method, Path, Headers, ReqOpts) ->
+ case get_pool(authority(Headers), ReqOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ case checkout(ManagerPid, ReqOpts) of
+ undefined ->
+ {error, no_connection_available,
+ 'No connection in the pool with enough capacity available to open a new stream.'};
+ {ConnPid, _Meta} ->
+ StreamRef = gun:headers(ConnPid, Method, Path, Headers, ReqOpts),
+ %% @todo Synchronous mode.
+ {async, {ConnPid, StreamRef}}
+ end
+ end.
+
+-spec request(iodata(), iodata(), gun:req_headers(), iodata()) -> request_result().
+request(Method, Path, Headers, Body) ->
+ request(Method, Path, Headers, Body, #{}).
+
+-spec request(iodata(), iodata(), gun:req_headers(), iodata(), req_opts()) -> request_result().
+request(Method, Path, Headers, Body, ReqOpts) ->
+ case get_pool(authority(Headers), ReqOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ case checkout(ManagerPid, ReqOpts) of
+ undefined ->
+ {error, no_connection_available,
+ 'No connection in the pool with enough capacity available to open a new stream.'};
+ {ConnPid, _Meta} ->
+ StreamRef = gun:request(ConnPid, Method, Path, Headers, Body, ReqOpts),
+ %% @todo Synchronous mode.
+ {async, {ConnPid, StreamRef}}
+ end
+ end.
+
+%% We require the host to be given in the headers for the time being.
+%% @todo Allow passing it in options. Websocket send already does that.
+authority(#{<<"host">> := Authority}) ->
+ Authority;
+authority(Headers) ->
+ {_, Authority} = lists:keyfind(<<"host">>, 1, Headers),
+ Authority.
+
+get_pool(Authority0, ReqOpts) ->
+ Authority = iolist_to_binary(Authority0),
+ %% @todo Perhaps rename this to temporary.
+ %% There's two concepts: temporary pool is started and stops
+ %% when there's no traffic. Dynamic pool simply reduces its number
+ %% of connections when there's no traffic.
+ StartPoolIfMissing = maps:get(start_pool_if_missing, ReqOpts, false),
+ Scope = maps:get(scope, ReqOpts, default),
+ case ets:lookup(gun_pools, {Scope, Authority}) of
+ [] when StartPoolIfMissing ->
+ start_missing_pool(Authority, ReqOpts);
+ [] ->
+ undefined;
+ [{_, ManagerPid}] ->
+ %% @todo With temporary pool, getting a pid here doesn't mean the pool can be used.
+ %% Indeed the manager could be in process of stopping. I suppose we must
+ %% do a check but perhaps it's best to leave that detail to the user
+ %% (they can easily retry and recreate the pool if necessary).
+ ManagerPid
+ end.
+
+start_missing_pool(_Authority, _ReqOpts) ->
+ undefined.
+
+%% Streaming data.
+
+-spec data(pool_stream_ref(), fin | nofin, iodata()) -> ok.
+data({ConnPid, StreamRef}, IsFin, Data) ->
+ gun:data(ConnPid, StreamRef, IsFin, Data).
+
+%% Awaiting gun messages.
+
+-spec await(pool_stream_ref()) -> gun:await_result().
+await({ConnPid, StreamRef}) ->
+ gun:await(ConnPid, StreamRef).
+
+-spec await(pool_stream_ref(), timeout() | reference()) -> gun:await_result().
+await({ConnPid, StreamRef}, MRefOrTimeout) ->
+ gun:await(ConnPid, StreamRef, MRefOrTimeout).
+
+-spec await(pool_stream_ref(), timeout(), reference()) -> gun:await_result().
+await({ConnPid, StreamRef}, Timeout, MRef) ->
+ gun:await(ConnPid, StreamRef, Timeout, MRef).
+
+-spec await_body(pool_stream_ref()) -> gun:await_body_result().
+await_body({ConnPid, StreamRef}) ->
+ gun:await_body(ConnPid, StreamRef).
+
+-spec await_body(pool_stream_ref(), timeout() | reference()) -> gun:await_body_result().
+await_body({ConnPid, StreamRef}, MRefOrTimeout) ->
+ gun:await_body(ConnPid, StreamRef, MRefOrTimeout).
+
+-spec await_body(pool_stream_ref(), timeout(), reference()) -> gun:await_body_result().
+await_body({ConnPid, StreamRef}, Timeout, MRef) ->
+ gun:await_body(ConnPid, StreamRef, Timeout, MRef).
+
+%% Flushing gun messages.
+
+-spec flush(pool_stream_ref()) -> ok.
+flush({ConnPid, _}) ->
+ gun:flush(ConnPid).
+
+%% Flow control.
+
+-spec update_flow(pool_stream_ref(), pos_integer()) -> ok.
+update_flow({ConnPid, StreamRef}, Flow) ->
+ gun:update_flow(ConnPid, StreamRef, Flow).
+
+%% Cancelling a stream.
+
+-spec cancel(pool_stream_ref()) -> ok.
+cancel({ConnPid, StreamRef}) ->
+ gun:cancel(ConnPid, StreamRef).
+
+%% Information about a stream.
+
+-spec stream_info(pool_stream_ref()) -> {ok, map() | undefined} | {error, not_connected}.
+stream_info({ConnPid, StreamRef}) ->
+ gun:stream_info(ConnPid, StreamRef).
+
+%% Websocket.
+
+-spec ws_send(gun:ws_frame() | [gun:ws_frame()], ws_send_opts()) -> ok | error_result().
+ws_send(Frames, WsSendOpts=#{authority := Authority}) ->
+ case get_pool(Authority, WsSendOpts) of
+ undefined ->
+ {error, pool_not_found,
+ 'No pool was found for the given scope and authority.'};
+ ManagerPid ->
+ case checkout(ManagerPid, WsSendOpts) of
+ undefined ->
+ {error, no_connection_available,
+ 'No connection in the pool with enough capacity available to send Websocket frames.'};
+ {ConnPid, #{ws := StreamRef}} ->
+ gun:ws_send(ConnPid, StreamRef, Frames)
+ end
+ end.
+
+%% Pool manager internals.
+%%
+%% The pool manager is responsible for starting connection processes
+%% and restarting them as necessary. It also provides a suitable
+%% connection process to any caller that needs it.
+%%
+%% The pool manager installs an event handler into each connection.
+%% The event handler is responsible for counting the number of
+%% active streams. It updates the gun_pooled_conns ets table
+%% whenever a stream begins or ends.
+%%
+%% A connection is deemed suitable if it is possible to open new
+%% streams. How many streams can be open at any one time depends
+%% on the protocol. For HTTP/2 the manager process keeps track of
+%% the connection's settings to know the maximum. For non-stream
+%% based protocols, there is no limit.
+%%
+%% The connection to be used is otherwise chosen randomly. The
+%% first connection that is suitable is returned. There is no
+%% need to "give back" the connection to the manager.
+
+%% @todo
+%% What should happen if we always fail to reconnect? I suspect we keep the manager
+%% around and propagate errors, the same as if there's no more capacity? Perhaps have alarms?
+
+callback_mode() -> state_functions.
+
+start_link(Host, Port, Opts) ->
+ gen_statem:start_link(?MODULE, {Host, Port, Opts}, []).
+
+init({Host, Port, Opts}) ->
+ process_flag(trap_exit, true),
+ true = ets:insert_new(gun_pools, {gun_pools_key(Host, Port, Opts), self()}),
+ Tid = ets:new(gun_pooled_conns, [ordered_set, public]),
+ Size = maps:get(size, Opts, 8),
+ %% @todo Only start processes in static mode.
+ ConnOpts = conn_opts(Tid, Opts),
+ Conns = maps:from_list([begin
+ {ok, ConnPid} = gun:open(Host, Port, ConnOpts),
+ _ = monitor(process, ConnPid),
+ {ConnPid, down}
+ end || _ <- lists:seq(1, Size)]),
+ State = #state{
+ host=Host,
+ port=Port,
+ opts=Opts,
+ table=Tid,
+ conns=Conns
+ },
+ %% If Size is 0 then we can never be operational.
+ {ok, degraded, State}.
+
+gun_pools_key(Host, Port, Opts) ->
+ Transport = maps:get(transport, Opts, gun:default_transport(Port)),
+ Authority = gun_http:host_header(Transport, Host, Port),
+ Scope = maps:get(scope, Opts, default),
+ {Scope, iolist_to_binary(Authority)}.
+
+conn_opts(Tid, Opts) ->
+ ConnOpts = maps:get(conn_opts, Opts, #{}),
+ EventHandlerState = maps:with([event_handler], ConnOpts),
+ H2Opts = maps:get(http2_opts, ConnOpts, #{}),
+ ConnOpts#{
+ event_handler => {gun_pool_events_h, EventHandlerState#{
+ table => Tid
+ }},
+ http2_opts => H2Opts#{
+ notify_settings_changed => true
+ }
+ }.
+
+%% We use the degraded state as long as at least one connection is degraded.
+%% @todo Probably keep count of connections separately to avoid counting every time.
+degraded(info, Msg={gun_up, ConnPid, _}, StateData=#state{opts=Opts, conns=Conns}) ->
+ #{ConnPid := down} = Conns,
+ %% We optionally run the setup function if one is defined. The
+ %% setup function tells us whether we are fully up or not. The
+ %% setup function may be called repeatedly until the connection
+ %% is established.
+ %%
+ %% @todo It is possible that the connection never does get
+ %% fully established. We should deal with this. We probably
+ %% need to handle all messages.
+ {SetupFun, SetupState0} = setup_fun(Opts),
+ degraded_setup(ConnPid, Msg, StateData, SetupFun, SetupState0);
+%% @todo
+%degraded(info, Msg={gun_tunnel_up, ConnPid, _, _}, StateData0=#state{conns=Conns}) ->
+% ;
+degraded(info, Msg={gun_upgrade, ConnPid, _, _, _},
+ StateData=#state{opts=#{setup_fun := {SetupFun, _}}, conns=Conns}) ->
+ %% @todo Probably shouldn't crash if the state is incorrect, that's programmer error though.
+ #{ConnPid := {setup, SetupState0}} = Conns,
+ %% We run the setup function again using the state previously kept.
+ degraded_setup(ConnPid, Msg, StateData, SetupFun, SetupState0);
+degraded(Type, Event, StateData) ->
+ handle_common(Type, Event, ?FUNCTION_NAME, StateData).
+
+setup_fun(#{setup_fun := SetupFun}) ->
+ SetupFun;
+setup_fun(_) ->
+ {fun (_, {gun_up, _, Protocol}, _) ->
+ {up, Protocol, #{}}
+ end, undefined}.
+
+degraded_setup(ConnPid, Msg, StateData0=#state{conns=Conns, conns_meta=ConnsMeta,
+ await_up=AwaitUp}, SetupFun, SetupState0) ->
+ case SetupFun(ConnPid, Msg, SetupState0) of
+ Setup={setup, _SetupState} ->
+ StateData = StateData0#state{conns=Conns#{ConnPid => Setup}},
+ {keep_state, StateData};
+ %% The Meta is different from Settings. It allows passing around
+ %% Websocket or tunnel stream refs.
+ {up, Protocol, Meta} ->
+ Settings = #{},
+ StateData = StateData0#state{
+ conns=Conns#{ConnPid => {up, Protocol, Settings}},
+ conns_meta=ConnsMeta#{ConnPid => Meta}
+ },
+ case is_degraded(StateData) of
+ true -> {keep_state, StateData};
+ false -> {next_state, operational, StateData#state{await_up=[]},
+ [{reply, ReplyTo, ok} || ReplyTo <- AwaitUp]}
+ end
+ end.
+
+is_degraded(#state{conns=Conns0}) ->
+ Conns = maps:to_list(Conns0),
+ Len = length(Conns),
+ Ups = [up || {_, {up, _, _}} <- Conns],
+ Len =/= length(Ups).
+
+operational(Type, Event, StateData) ->
+ handle_common(Type, Event, ?FUNCTION_NAME, StateData).
+
+handle_common({call, From}, {checkout, _ReqOpts}, _,
+ StateData=#state{conns_meta=ConnsMeta}) ->
+ case find_available_connection(StateData) of
+ none ->
+ {keep_state_and_data, {reply, From, undefined}};
+ ConnPid ->
+ Meta = maps:get(ConnPid, ConnsMeta, #{}),
+ {keep_state_and_data, {reply, From, {ConnPid, Meta}}}
+ end;
+handle_common(info, {gun_notify, ConnPid, settings_changed, Settings}, _, StateData=#state{conns=Conns}) ->
+ %% Assert that the state is correct.
+ {up, http2, _} = maps:get(ConnPid, Conns),
+ {keep_state, StateData#state{conns=Conns#{ConnPid => {up, http2, Settings}}}};
+handle_common(info, {gun_down, ConnPid, Protocol, _Reason, _KilledStreams}, _, StateData=#state{conns=Conns}) ->
+ {up, Protocol, _} = maps:get(ConnPid, Conns),
+ {next_state, degraded, StateData#state{conns=Conns#{ConnPid => down}}};
+%% @todo We do not want to reconnect automatically when the pool is dynamic.
+handle_common(info, {'DOWN', _MRef, process, ConnPid0, _Reason}, _,
+ StateData=#state{host=Host, port=Port, opts=Opts, table=Tid, conns=Conns0, conns_meta=ConnsMeta0}) ->
+ Conns = maps:remove(ConnPid0, Conns0),
+ ConnsMeta = maps:remove(ConnPid0, ConnsMeta0),
+ ConnOpts = conn_opts(Tid, Opts),
+ {ok, ConnPid} = gun:open(Host, Port, ConnOpts),
+ _ = monitor(process, ConnPid),
+ {next_state, degraded, StateData#state{conns=Conns#{ConnPid => down}, conns_meta=ConnsMeta}};
+handle_common({call, From}, info, StateName, #state{host=Host, port=Port,
+ opts=Opts, table=Tid, conns=Conns, conns_meta=ConnsMeta}) ->
+ {keep_state_and_data, {reply, From, {StateName, #{
+ %% @todo Not sure whether all of this should be documented. Maybe not ConnsMeta for now?
+ host => Host,
+ port => Port,
+ opts => Opts,
+ table => Tid,
+ conns => Conns,
+ conns_meta => ConnsMeta
+ }}}};
+handle_common({call, From}, await_up, operational, _) ->
+ {keep_state_and_data, {reply, From, ok}};
+handle_common({call, From}, await_up, _, StateData=#state{await_up=AwaitUp}) ->
+ {keep_state, StateData#state{await_up=[From|AwaitUp]}};
+handle_common(Type, Event, StateName, StateData) ->
+ logger:error("Unexpected event in state ~p of type ~p:~n~w~n~p~n",
+ [StateName, Type, Event, StateData]),
+ keep_state_and_data.
+
+%% We go over every connection and return the first one
+%% we find that has capacity. How we determine whether
+%% capacity is available depends on the protocol. For
+%% HTTP/2 we look into the protocol settings. The
+%% current number of streams is maintained by the
+%% event handler gun_pool_events_h.
+find_available_connection(#state{table=Tid, conns=Conns}) ->
+ I = lists:sort([{rand:uniform(), K} || K <- maps:keys(Conns)]),
+ find_available_connection(I, Conns, Tid).
+
+find_available_connection([], _, _) ->
+ none;
+find_available_connection([{_, ConnPid}|I], Conns, Tid) ->
+ case maps:get(ConnPid, Conns) of
+ {up, Protocol, Settings} ->
+ MaxStreams = max_streams(Protocol, Settings),
+ CurrentStreams = case ets:lookup(Tid, ConnPid) of
+ [] ->
+ 0;
+ [{_, CS}] ->
+ CS
+ end,
+ if
+ CurrentStreams + 1 > MaxStreams ->
+ find_available_connection(I, Conns, Tid);
+ true ->
+ ConnPid
+ end;
+ _ ->
+ find_available_connection(I, Conns, Tid)
+ end.
+
+max_streams(http, _) ->
+ 1;
+max_streams(http2, #{max_concurrent_streams := MaxStreams}) ->
+ MaxStreams;
+max_streams(http2, #{}) ->
+ infinity;
+%% There are no streams or Gun is not aware of streams when
+%% the protocol is Websocket or raw.
+max_streams(ws, _) ->
+ infinity;
+max_streams(raw, _) ->
+ infinity.
+
+terminate(Reason, StateName, #state{host=Host, port=Port, opts=Opts, await_up=AwaitUp}) ->
+ gen_statem:reply([
+ {reply, ReplyTo, {error, {terminate, StateName, Reason}}}
+ || ReplyTo <- AwaitUp]),
+ true = ets:delete(gun_pools, gun_pools_key(Host, Port, Opts)),
+ ok.
diff --git a/src/gun_pool_events_h.erl b/src/gun_pool_events_h.erl
new file mode 100644
index 0000000..1871a00
--- /dev/null
+++ b/src/gun_pool_events_h.erl
@@ -0,0 +1,157 @@
+%% Copyright (c) 2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_pool_events_h).
+
+-export([init/2]).
+-export([domain_lookup_start/2]).
+-export([domain_lookup_end/2]).
+-export([connect_start/2]).
+-export([connect_end/2]).
+-export([tls_handshake_start/2]).
+-export([tls_handshake_end/2]).
+-export([request_start/2]).
+-export([request_headers/2]).
+-export([request_end/2]).
+-export([push_promise_start/2]).
+-export([push_promise_end/2]).
+-export([response_start/2]).
+-export([response_inform/2]).
+-export([response_headers/2]).
+-export([response_trailers/2]).
+-export([response_end/2]).
+-export([ws_upgrade/2]).
+-export([ws_recv_frame_start/2]).
+-export([ws_recv_frame_header/2]).
+-export([ws_recv_frame_end/2]).
+-export([ws_send_frame_start/2]).
+-export([ws_send_frame_end/2]).
+-export([protocol_changed/2]).
+-export([origin_changed/2]).
+-export([cancel/2]).
+-export([disconnect/2]).
+-export([terminate/2]).
+
+init(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+domain_lookup_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+domain_lookup_end(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+connect_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+connect_end(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+tls_handshake_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+tls_handshake_end(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+request_start(Event=#{stream_ref := StreamRef}, State=#{table := Tid}) ->
+ _ = ets:update_counter(Tid, self(), +1, {self(), 0}),
+ propagate(Event, State#{
+ StreamRef => {nofin, nofin}
+ }, ?FUNCTION_NAME).
+
+request_headers(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+request_end(Event=#{stream_ref := StreamRef}, State0=#{table := Tid}) ->
+ State = case State0 of
+ #{StreamRef := {nofin, fin}} ->
+ _ = ets:update_counter(Tid, self(), -1),
+ maps:remove(StreamRef, State0);
+ #{StreamRef := {nofin, IsFin}} ->
+ State0#{StreamRef => {fin, IsFin}}
+ end,
+ propagate(Event, State, ?FUNCTION_NAME).
+
+push_promise_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+push_promise_end(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+response_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+response_inform(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+response_headers(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+response_trailers(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+response_end(Event=#{stream_ref := StreamRef}, State0=#{table := Tid}) ->
+ State = case State0 of
+ #{StreamRef := {fin, nofin}} ->
+ _ = ets:update_counter(Tid, self(), -1),
+ maps:remove(StreamRef, State0);
+ #{StreamRef := {IsFin, nofin}} ->
+ State0#{StreamRef => {IsFin, fin}}
+ end,
+ propagate(Event, State, ?FUNCTION_NAME).
+
+ws_upgrade(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+ws_recv_frame_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+ws_recv_frame_header(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+ws_recv_frame_end(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+ws_send_frame_start(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+ws_send_frame_end(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+protocol_changed(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+origin_changed(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+cancel(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+disconnect(Event, State=#{table := Tid}) ->
+ %% The ets:delete/2 call might fail when the pool has shut down.
+ try
+ true = ets:delete(Tid, self())
+ catch _:_ ->
+ ok
+ end,
+ propagate(Event, maps:with([event_handler, table], State), ?FUNCTION_NAME).
+
+terminate(Event, State) ->
+ propagate(Event, State, ?FUNCTION_NAME).
+
+propagate(Event, State=#{event_handler := {Mod, ModState0}}, Fun) ->
+ ModState = Mod:Fun(Event, ModState0),
+ State#{event_handler => {Mod, ModState}};
+propagate(_, State, _) ->
+ State.
diff --git a/src/gun_pools_sup.erl b/src/gun_pools_sup.erl
new file mode 100644
index 0000000..157b41b
--- /dev/null
+++ b/src/gun_pools_sup.erl
@@ -0,0 +1,37 @@
+%% Copyright (c) 2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_pools_sup).
+-behaviour(supervisor).
+
+%% API.
+-export([start_link/0]).
+
+%% supervisor.
+-export([init/1]).
+
+%% API.
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% supervisor.
+
+init([]) ->
+ %% @todo Review restart strategies.
+ Procs = [
+ #{id => gun_pool, start => {gun_pool, start_link, []}, restart => transient}
+ ],
+ {ok, {#{strategy => simple_one_for_one}, Procs}}.
diff --git a/src/gun_sup.erl b/src/gun_sup.erl
index dbe6698..a1270d7 100644
--- a/src/gun_sup.erl
+++ b/src/gun_sup.erl
@@ -12,7 +12,6 @@
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-%% @private
-module(gun_sup).
-behaviour(supervisor).
@@ -22,17 +21,17 @@
%% supervisor.
-export([init/1]).
--define(SUPERVISOR, ?MODULE).
-
%% API.
-spec start_link() -> {ok, pid()}.
start_link() ->
- supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% supervisor.
init([]) ->
- Procs = [{gun, {gun, start_link, []},
- temporary, 5000, worker, [gun]}],
- {ok, {{simple_one_for_one, 10, 10}, Procs}}.
+ Procs = [
+ #{id => gun_conns_sup, start => {gun_conns_sup, start_link, []}, type => supervisor},
+ #{id => gun_pools_sup, start => {gun_pools_sup, start_link, []}, type => supervisor}
+ ],
+ {ok, {#{}, Procs}}.
diff --git a/test/handlers/pool_ws_handler.erl b/test/handlers/pool_ws_handler.erl
new file mode 100644
index 0000000..5475e88
--- /dev/null
+++ b/test/handlers/pool_ws_handler.erl
@@ -0,0 +1,13 @@
+%% Feel free to use, reuse and abuse the code in this file.
+
+-module(pool_ws_handler).
+
+-export([init/4]).
+-export([handle/2]).
+
+init(_, _, _, #{user_opts := ReplyTo}) ->
+ {ok, ReplyTo}.
+
+handle(Frame, ReplyTo) ->
+ ReplyTo ! Frame,
+ {ok, 0, ReplyTo}.
diff --git a/test/pool_SUITE.erl b/test/pool_SUITE.erl
new file mode 100644
index 0000000..a48b75d
--- /dev/null
+++ b/test/pool_SUITE.erl
@@ -0,0 +1,376 @@
+%% Copyright (c) 2021, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(pool_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(ct_helper, [doc/1]).
+-import(ct_helper, [config/2]).
+-import(gun_test, [receive_from/1]).
+
+all() ->
+ ct_helper:all(?MODULE).
+
+init_per_suite(Config) ->
+ {ok, _} = cowboy:start_clear({?MODULE, tcp}, [], do_proto_opts()),
+ Port = ranch:get_port({?MODULE, tcp}),
+ [{port, Port}|Config].
+
+end_per_suite(_) ->
+ ExtraListeners = [
+ max_streams_h2_size_1,
+ max_streams_h2_size_2,
+ reconnect_h1
+ ],
+ _ = [cowboy:stop_listener(Listener) || Listener <- ExtraListeners],
+ ok.
+
+do_proto_opts() ->
+ Routes = [
+ {"/", hello_h, []},
+ {"/delay", delayed_hello_h, 3000},
+ {"/ws", ws_echo_h, []}
+ ],
+ #{
+ env => #{dispatch => cowboy_router:compile([{'_', Routes}])}
+ }.
+
+%% Tests.
+
+hello_pool_h1(Config) ->
+ doc("Confirm the pool can be used for HTTP/1.1 connections."),
+ Port = config(port, Config),
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => ["localhost:", integer_to_binary(Port)]},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams].
+
+hello_pool_h2(Config) ->
+ doc("Confirm the pool can be used for HTTP/2 connections."),
+ Port = config(port, Config),
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => ["localhost:", integer_to_binary(Port)]},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams].
+
+hello_pool_ws(Config) ->
+ doc("Confirm the pool can be used for HTTP/1.1 connections upgraded to Websocket."),
+ Port = config(port, Config),
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{
+ protocols => [http],
+ ws_opts => #{
+ default_protocol => pool_ws_handler,
+ user_opts => self()
+ }
+ },
+ scope => ?FUNCTION_NAME,
+ setup_fun => {fun
+ (ConnPid, {gun_up, _, http}, SetupState) ->
+ _ = gun:ws_upgrade(ConnPid, "/ws"),
+ {setup, SetupState};
+ (_, {gun_upgrade, _, StreamRef, _, _}, _) ->
+ {up, ws, #{ws => StreamRef}};
+ (ConnPid, Msg, SetupState) ->
+ ct:pal("Unexpected setup message for ~p: ~p", [ConnPid, Msg]),
+ {setup, SetupState}
+ end, undefined}
+ }),
+ gun_pool:await_up(ManagerPid),
+ _ = [gun_pool:ws_send({text, <<"Hello world!">>}, #{
+ authority => ["localhost:", integer_to_binary(Port)],
+ scope => ?FUNCTION_NAME
+ }) || _ <- lists:seq(1, 8)],
+ %% The pool_ws_handler module sends frames back to us.
+ _ = [receive
+ {text, <<"Hello world!">>} ->
+ ok
+ end || _ <- lists:seq(1, 8)].
+
+max_streams_h1(Config) ->
+ doc("Confirm requests are rejected when the maximum number "
+ "of streams is reached for HTTP/1.1 connections."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME,
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ {async, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}),
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}).
+
+max_streams_h1_retry(Config) ->
+ doc("Confirm connection checkout is retried when the maximum number "
+ "of streams is reached for HTTP/1.1 connections."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME,
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ {async, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}),
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay",
+ #{<<"host">> => Authority}, #{scope => ?FUNCTION_NAME}),
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{
+ checkout_retry => [100, 500, 500, 500, 500, 500, 500],
+ scope => ?FUNCTION_NAME
+ }).
+
+max_streams_h2_size_1(_) ->
+ doc("Confirm requests are rejected when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ [{async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}) || _ <- lists:seq(1, 5)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}).
+
+max_streams_h2_size_1_retry(_) ->
+ doc("Confirm connection checkout is retried when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 1
+ }),
+ gun_pool:await_up(ManagerPid),
+ [{async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}) || _ <- lists:seq(1, 5)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{
+ checkout_retry => [100, 500, 500, 500, 500, 500, 500]
+ }).
+
+max_streams_h2_size_2(_) ->
+ doc("Confirm requests are rejected when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 2
+ }),
+ gun_pool:await_up(ManagerPid),
+ [begin
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ %% We need to wait a bit for the request to be sent because the
+ %% request is sent and counted asynchronously.
+ timer:sleep(10)
+ end || _ <- lists:seq(1, 10)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}).
+
+max_streams_h2_size_2_retry(_) ->
+ doc("Confirm connection checkout is retried when the maximum number "
+ "of streams is reached for HTTP/2 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ max_concurrent_streams => 5
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ size => 2
+ }),
+ gun_pool:await_up(ManagerPid),
+ [begin
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ %% We need to wait a bit for the request to be sent because the
+ %% request is sent and counted asynchronously.
+ timer:sleep(10)
+ end || _ <- lists:seq(1, 10)],
+ timer:sleep(500),
+ {error, no_connection_available, _} = gun_pool:get("/delay", #{<<"host">> => Authority}),
+ {async, _} = gun_pool:get("/delay", #{<<"host">> => Authority}, #{
+ checkout_retry => [100, 500, 500, 500, 500, 500, 500]
+ }).
+
+kill_restart_h1(Config) ->
+ doc("Confirm the Gun process is restarted and the pool operational "
+ "after an HTTP/1.1 Gun process has crashed."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Get a connection and kill the process.
+ {operational, #{conns := Conns}} = gun_pool:info(ManagerPid),
+ ConnPid = hd(maps:keys(Conns)),
+ MRef = monitor(process, ConnPid),
+ exit(ConnPid, {shutdown, ?FUNCTION_NAME}),
+ receive {'DOWN', MRef, process, ConnPid, _} -> ok end,
+ {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+kill_restart_h2(Config) ->
+ doc("Confirm the Gun process is restarted and the pool operational "
+ "after an HTTP/2 Gun process has crashed."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Get a connection and kill the process.
+ {operational, #{conns := Conns}} = gun_pool:info(ManagerPid),
+ ConnPid = hd(maps:keys(Conns)),
+ MRef = monitor(process, ConnPid),
+ exit(ConnPid, {shutdown, ?FUNCTION_NAME}),
+ receive {'DOWN', MRef, process, ConnPid, _} -> ok end,
+ {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+%% @todo kill_restart_ws
+
+reconnect_h1(_) ->
+ doc("Confirm the Gun process reconnects automatically for HTTP/1.1 connections."),
+ ProtoOpts = do_proto_opts(),
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], ProtoOpts#{
+ idle_timeout => 500,
+ scope => ?FUNCTION_NAME
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http]}
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/", #{<<"host">> => Authority}) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Wait for the idle timeout to trigger.
+ timer:sleep(600),
+% {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/", #{<<"host">> => Authority}) || _ <- lists:seq(1, 8)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+reconnect_h2(Config) ->
+ doc("Confirm the Gun process reconnects automatically for HTTP/2 connections."),
+ Port = config(port, Config),
+ Authority = ["localhost:", integer_to_binary(Port)],
+ {ok, ManagerPid} = gun_pool:start_pool("localhost", Port, #{
+ conn_opts => #{protocols => [http2]},
+ scope => ?FUNCTION_NAME
+ }),
+ gun_pool:await_up(ManagerPid),
+ Streams1 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams1],
+ %% Wait for the idle timeout to trigger.
+ timer:sleep(600),
+% {degraded, _} = gun_pool:info(ManagerPid),
+ gun_pool:await_up(ManagerPid),
+ Streams2 = [{async, _} = gun_pool:get("/",
+ #{<<"host">> => Authority},
+ #{scope => ?FUNCTION_NAME}
+ ) || _ <- lists:seq(1, 800)],
+ _ = [begin
+ {response, nofin, 200, _} = gun_pool:await(StreamRef),
+ {ok, <<"Hello world!">>} = gun_pool:await_body(StreamRef)
+ end || {async, StreamRef} <- Streams2].
+
+%% @todo reconnect_ws